}
static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
- int offset, size_t size, int more)
+ int offset, size_t size, bool more)
{
int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
int ret;
}
#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+static void init_bio_iter(struct bio *bio, struct bio **bio_iter,
+ unsigned int *bio_seg) /* point the iterator at bio's current index, or clear it when bio is NULL */
{
if (!bio) {
- *iter = NULL;
- *seg = 0;
+ *bio_iter = NULL;
+ *bio_seg = 0;
return;
}
- *iter = bio;
- *seg = bio->bi_idx;
+ *bio_iter = bio;
+ *bio_seg = (unsigned int) bio->bi_idx; /* explicit cast: the segment index is now kept as unsigned int */
}
-static void iter_bio_next(struct bio **bio_iter, int *seg)
+static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
{
if (*bio_iter == NULL)
return;
static void prepare_write_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
+ struct ceph_msg_pos *msg_pos = &con->out_msg_pos; /* outgoing position state, reset below */
BUG_ON(!msg);
BUG_ON(!msg->hdr.data_len);
/* initialize page iterator */
- con->out_msg_pos.page = 0;
+ msg_pos->page = 0;
if (msg->pages)
- con->out_msg_pos.page_pos = msg->page_alignment;
+ msg_pos->page_pos = msg->page_alignment; /* page array: begin at the message's alignment offset */
else
- con->out_msg_pos.page_pos = 0;
+ msg_pos->page_pos = 0;
#ifdef CONFIG_BLOCK
if (msg->bio)
init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif
- con->out_msg_pos.data_pos = 0;
- con->out_msg_pos.did_page_crc = false;
+ msg_pos->data_pos = 0; /* nothing of the data payload accounted for yet */
+ msg_pos->did_page_crc = false; /* cleared here; the send path sets it once it has CRC'd the current page */
con->out_more = 1; /* data + footer will follow */
}
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
-#ifdef CONFIG_BLOCK
- else
- m->bio_iter = NULL;
-#endif
- dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
- le32_to_cpu(m->hdr.data_len),
- m->nr_pages);
+ le32_to_cpu(m->hdr.data_len), m->length);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
size_t len, size_t sent, bool in_trail)
{
struct ceph_msg *msg = con->out_msg;
+ struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
BUG_ON(!msg);
BUG_ON(!sent);
- con->out_msg_pos.data_pos += sent;
- con->out_msg_pos.page_pos += sent;
+ msg_pos->data_pos += sent;
+ msg_pos->page_pos += sent;
if (sent < len)
return;
BUG_ON(sent != len);
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.page++;
- con->out_msg_pos.did_page_crc = false;
+ msg_pos->page_pos = 0;
+ msg_pos->page++;
+ msg_pos->did_page_crc = false;
if (in_trail)
- list_move_tail(&page->lru,
- &msg->trail->head);
+ list_rotate_left(&msg->trail->head);
else if (msg->pagelist)
- list_move_tail(&page->lru,
- &msg->pagelist->head);
+ list_rotate_left(&msg->pagelist->head);
#ifdef CONFIG_BLOCK
else if (msg->bio)
iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
}
+static void in_msg_pos_next(struct ceph_connection *con, size_t len,
+ size_t received)
+{ /* Advance con->in_msg_pos after `received` of the `len` requested bytes arrived. */
+ struct ceph_msg *msg = con->in_msg;
+ struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+
+ BUG_ON(!msg);
+ BUG_ON(!received); /* callers must only report a non-zero byte count */
+
+ msg_pos->data_pos += received;
+ msg_pos->page_pos += received;
+ if (received < len)
+ return; /* short receive: stay on the current page/segment */
+
+ BUG_ON(received != len); /* receiving more than was requested would be a bug */
+ msg_pos->page_pos = 0; /* requested span fully received: restart at offset 0 of the next page */
+ msg_pos->page++;
+#ifdef CONFIG_BLOCK
+ if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg); /* bio-backed data: step the bio iterator as well */
+#endif /* CONFIG_BLOCK */
+}
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
static int write_partial_msg_pages(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
+ struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
size_t len;
bool do_datacrc = !con->msgr->nocrc;
const size_t trail_len = (msg->trail ? msg->trail->length : 0);
const size_t trail_off = data_len - trail_len;
- dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
- con, msg, con->out_msg_pos.page, msg->nr_pages,
- con->out_msg_pos.page_pos);
+ dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
+ con, msg, msg_pos->page, msg_pos->page_pos);
/*
* Iterate through each page that contains data to be
* need to map the page. If we have no pages, they have
* been revoked, so use the zero page.
*/
- while (data_len > con->out_msg_pos.data_pos) {
+ while (data_len > msg_pos->data_pos) {
struct page *page = NULL;
int max_write = PAGE_SIZE;
int bio_offset = 0;
- in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+ in_trail = in_trail || msg_pos->data_pos >= trail_off;
if (!in_trail)
- total_max_write = trail_off - con->out_msg_pos.data_pos;
+ total_max_write = trail_off - msg_pos->data_pos;
if (in_trail) {
- total_max_write = data_len - con->out_msg_pos.data_pos;
+ total_max_write = data_len - msg_pos->data_pos;
page = list_first_entry(&msg->trail->head,
struct page, lru);
} else if (msg->pages) {
- page = msg->pages[con->out_msg_pos.page];
+ page = msg->pages[msg_pos->page];
} else if (msg->pagelist) {
page = list_first_entry(&msg->pagelist->head,
struct page, lru);
} else {
page = zero_page;
}
- len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ len = min_t(int, max_write - msg_pos->page_pos,
total_max_write);
- if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+ if (do_datacrc && !msg_pos->did_page_crc) {
void *base;
u32 crc = le32_to_cpu(msg->footer.data_crc);
char *kaddr;
kaddr = kmap(page);
BUG_ON(kaddr == NULL);
- base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+ base = kaddr + msg_pos->page_pos + bio_offset;
crc = crc32c(crc, base, len);
kunmap(page);
msg->footer.data_crc = cpu_to_le32(crc);
- con->out_msg_pos.did_page_crc = true;
+ msg_pos->did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page,
- con->out_msg_pos.page_pos + bio_offset,
- len, 1);
+ msg_pos->page_pos + bio_offset,
+ len, true);
if (ret <= 0)
goto out;
while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
- ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
+ ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
if (ret <= 0)
goto out;
con->out_skip -= ret;
struct page **pages,
unsigned int data_len, bool do_datacrc)
{
+ struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+ struct page *page;
void *p;
int ret;
int left;
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ left = min((int)(data_len - msg_pos->data_pos),
+ (int)(PAGE_SIZE - msg_pos->page_pos));
/* (page) data */
BUG_ON(pages == NULL);
- p = kmap(pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
+ page = pages[msg_pos->page];
+ p = kmap(page);
+ ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
if (ret > 0 && do_datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(pages[con->in_msg_pos.page]);
+ p + msg_pos->page_pos, ret);
+ kunmap(page);
if (ret <= 0)
return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
- }
+
+ in_msg_pos_next(con, left, ret);
return ret;
}
#ifdef CONFIG_BLOCK
static int read_partial_message_bio(struct ceph_connection *con,
- struct bio **bio_iter, int *bio_seg,
unsigned int data_len, bool do_datacrc)
{
- struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ struct ceph_msg *msg = con->in_msg;
+ struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+ struct bio_vec *bv;
+ struct page *page;
void *p;
int ret, left;
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(bv->bv_len - con->in_msg_pos.page_pos));
+ BUG_ON(!msg);
+ BUG_ON(!msg->bio_iter); /* iterator must be set up before reading bio data */
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); /* current segment now comes from the message, not from parameters */
+
+ left = min((int)(data_len - msg_pos->data_pos),
+ (int)(bv->bv_len - msg_pos->page_pos)); /* cap at what remains of the data and of this segment */
- p = kmap(bv->bv_page) + bv->bv_offset;
+ page = bv->bv_page; /* keep the page so kunmap() below pairs with this kmap() */
+ p = kmap(page) + bv->bv_offset;
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
+ ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
if (ret > 0 && do_datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(bv->bv_page);
+ p + msg_pos->page_pos, ret); /* fold only the bytes actually received into the CRC */
+ kunmap(page);
if (ret <= 0)
return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == bv->bv_len) {
- con->in_msg_pos.page_pos = 0;
- iter_bio_next(bio_iter, bio_seg);
- }
+
+ in_msg_pos_next(con, left, ret); /* shared position bookkeeping replaces the open-coded update */
return ret;
}
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
+ struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
int size;
int end;
int ret;
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
middle_len = le32_to_cpu(con->in_hdr.middle_len);
- if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+ if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
int skip = 0;
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
- con->in_hdr.front_len, con->in_hdr.data_len);
+ front_len, data_len);
ret = ceph_con_in_msg_alloc(con, &skip);
if (ret < 0)
return ret;
if (m->middle)
m->middle->vec.iov_len = 0;
- con->in_msg_pos.page = 0;
+ msg_pos->page = 0;
if (m->pages)
- con->in_msg_pos.page_pos = m->page_alignment;
+ msg_pos->page_pos = m->page_alignment;
else
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.data_pos = 0;
+ msg_pos->page_pos = 0;
+ msg_pos->data_pos = 0;
#ifdef CONFIG_BLOCK
if (m->bio)
}
/* (page) data */
- while (con->in_msg_pos.data_pos < data_len) {
+ while (msg_pos->data_pos < data_len) {
if (m->pages) {
ret = read_partial_message_pages(con, m->pages,
data_len, do_datacrc);
return ret;
#ifdef CONFIG_BLOCK
} else if (m->bio) {
- BUG_ON(!m->bio_iter);
ret = read_partial_message_bio(con,
- &m->bio_iter, &m->bio_seg,
data_len, do_datacrc);
if (ret <= 0)
return ret;
}
EXPORT_SYMBOL(ceph_con_keepalive);
+void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+ size_t length, size_t alignment)
+{ /* Record a page array, its length and its alignment on msg; each may be set only once. */
+ BUG_ON(!pages);
+ BUG_ON(!length);
+ BUG_ON(msg->pages); /* a page array was already attached */
+ BUG_ON(msg->length); /* a data length was already recorded */
+
+ msg->pages = pages;
+ msg->length = length;
+ msg->page_alignment = alignment & ~PAGE_MASK; /* keep only the bits outside PAGE_MASK (presumably the offset within a page) */
+}
+EXPORT_SYMBOL(ceph_msg_data_set_pages);
+
+void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+ struct ceph_pagelist *pagelist)
+{ /* Record `pagelist` as msg->pagelist; it must be non-empty and msg must not already have one. */
+ BUG_ON(!pagelist);
+ BUG_ON(!pagelist->length); /* an empty pagelist is rejected */
+ BUG_ON(msg->pagelist); /* a pagelist was already attached */
+
+ msg->pagelist = pagelist;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+
+void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+{ /* Record `bio` as msg->bio; msg must not already have one. */
+ BUG_ON(!bio);
+ BUG_ON(msg->bio); /* a bio was already attached */
+
+ msg->bio = bio;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_bio);
+
+void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail)
+{ /* Record `trail` as msg->trail; it must be non-empty and msg must not already have one. */
+ BUG_ON(!trail);
+ BUG_ON(!trail->length); /* an empty trail is rejected */
+ BUG_ON(msg->trail); /* a trail was already attached */
+
+ msg->trail = trail;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_trail);
/*
* construct a new message with given type, size
{
struct ceph_msg *m;
- m = kmalloc(sizeof(*m), flags);
+ m = kzalloc(sizeof(*m), flags);
if (m == NULL)
goto out;
- kref_init(&m->kref);
- m->con = NULL;
- INIT_LIST_HEAD(&m->list_head);
-
- m->hdr.tid = 0;
m->hdr.type = cpu_to_le16(type);
m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
- m->hdr.version = 0;
m->hdr.front_len = cpu_to_le32(front_len);
- m->hdr.middle_len = 0;
- m->hdr.data_len = 0;
- m->hdr.data_off = 0;
- m->hdr.reserved = 0;
- m->footer.front_crc = 0;
- m->footer.middle_crc = 0;
- m->footer.data_crc = 0;
- m->footer.flags = 0;
- m->front_max = front_len;
- m->front_is_vmalloc = false;
- m->more_to_follow = false;
- m->ack_stamp = 0;
- m->pool = NULL;
-
- /* middle */
- m->middle = NULL;
- /* data */
- m->nr_pages = 0;
- m->page_alignment = 0;
- m->pages = NULL;
- m->pagelist = NULL;
-#ifdef CONFIG_BLOCK
- m->bio = NULL;
- m->bio_iter = NULL;
- m->bio_seg = 0;
-#endif /* CONFIG_BLOCK */
- m->trail = NULL;
+ INIT_LIST_HEAD(&m->list_head);
+ kref_init(&m->kref);
/* front */
+ m->front_max = front_len;
if (front_len) {
if (front_len > PAGE_CACHE_SIZE) {
m->front.iov_base = __vmalloc(front_len, flags,
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
{
struct ceph_msg_header *hdr = &con->in_hdr;
- int type = le16_to_cpu(hdr->type);
- int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
+ struct ceph_msg *msg;
int ret = 0;
BUG_ON(con->in_msg != NULL);
+ BUG_ON(!con->ops->alloc_msg);
- if (con->ops->alloc_msg) {
- struct ceph_msg *msg;
-
- mutex_unlock(&con->mutex);
- msg = con->ops->alloc_msg(con, hdr, skip);
- mutex_lock(&con->mutex);
- if (con->state != CON_STATE_OPEN) {
- if (msg)
- ceph_msg_put(msg);
- return -EAGAIN;
- }
- con->in_msg = msg;
- if (con->in_msg) {
- con->in_msg->con = con->ops->get(con);
- BUG_ON(con->in_msg->con == NULL);
- }
- if (*skip) {
- con->in_msg = NULL;
- return 0;
- }
- if (!con->in_msg) {
- con->error_msg =
- "error allocating memory for incoming message";
- return -ENOMEM;
- }
+ mutex_unlock(&con->mutex);
+ msg = con->ops->alloc_msg(con, hdr, skip);
+ mutex_lock(&con->mutex);
+ if (con->state != CON_STATE_OPEN) {
+ if (msg)
+ ceph_msg_put(msg);
+ return -EAGAIN;
}
- if (!con->in_msg) {
- con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
- if (!con->in_msg) {
- pr_err("unable to allocate msg type %d len %d\n",
- type, front_len);
- return -ENOMEM;
- }
+ if (msg) {
+ BUG_ON(*skip);
+ con->in_msg = msg;
con->in_msg->con = con->ops->get(con);
BUG_ON(con->in_msg->con == NULL);
- con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
+ } else {
+ /*
+ * Null message pointer means either we should skip
+ * this message or we couldn't allocate memory. The
+ * former is not an error.
+ */
+ if (*skip)
+ return 0;
+ con->error_msg = "error allocating memory for incoming message";
+
+ return -ENOMEM;
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- m->nr_pages = 0;
+ m->length = 0;
m->pages = NULL;
if (m->pagelist) {
void ceph_msg_dump(struct ceph_msg *msg)
{
- pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
- msg->front_max, msg->nr_pages);
+ pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
+ msg->front_max, msg->length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);