]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blobdiff - net/ceph/messenger.c
libceph: use local variables for message positions
[mirror_ubuntu-artful-kernel.git] / net / ceph / messenger.c
index 2c0669fb54e33181f2834ea6725dd15291ec5a64..7788170524e3c410741163cc08159c74f92d8dc9 100644 (file)
@@ -493,7 +493,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 }
 
 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
-                    int offset, size_t size, int more)
+                    int offset, size_t size, bool more)
 {
        int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
        int ret;
@@ -697,18 +697,19 @@ static void con_out_kvec_add(struct ceph_connection *con,
 }
 
 #ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+static void init_bio_iter(struct bio *bio, struct bio **bio_iter,
+                       unsigned int *bio_seg)
 {
        if (!bio) {
-               *iter = NULL;
-               *seg = 0;
+               *bio_iter = NULL;
+               *bio_seg = 0;
                return;
        }
-       *iter = bio;
-       *seg = bio->bi_idx;
+       *bio_iter = bio;
+       *bio_seg = (unsigned int) bio->bi_idx;
 }
 
-static void iter_bio_next(struct bio **bio_iter, int *seg)
+static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
 {
        if (*bio_iter == NULL)
                return;
@@ -724,22 +725,23 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
 static void prepare_write_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
+       struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
 
        BUG_ON(!msg);
        BUG_ON(!msg->hdr.data_len);
 
        /* initialize page iterator */
-       con->out_msg_pos.page = 0;
+       msg_pos->page = 0;
        if (msg->pages)
-               con->out_msg_pos.page_pos = msg->page_alignment;
+               msg_pos->page_pos = msg->page_alignment;
        else
-               con->out_msg_pos.page_pos = 0;
+               msg_pos->page_pos = 0;
 #ifdef CONFIG_BLOCK
        if (msg->bio)
                init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
 #endif
-       con->out_msg_pos.data_pos = 0;
-       con->out_msg_pos.did_page_crc = false;
+       msg_pos->data_pos = 0;
+       msg_pos->did_page_crc = false;
        con->out_more = 1;  /* data + footer will follow */
 }
 
@@ -803,16 +805,11 @@ static void prepare_write_message(struct ceph_connection *con)
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
-#ifdef CONFIG_BLOCK
-       else
-               m->bio_iter = NULL;
-#endif
 
-       dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
+       dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-            le32_to_cpu(m->hdr.data_len),
-            m->nr_pages);
+            le32_to_cpu(m->hdr.data_len), m->length);
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
@@ -1026,31 +1023,53 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
                        size_t len, size_t sent, bool in_trail)
 {
        struct ceph_msg *msg = con->out_msg;
+       struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
 
        BUG_ON(!msg);
        BUG_ON(!sent);
 
-       con->out_msg_pos.data_pos += sent;
-       con->out_msg_pos.page_pos += sent;
+       msg_pos->data_pos += sent;
+       msg_pos->page_pos += sent;
        if (sent < len)
                return;
 
        BUG_ON(sent != len);
-       con->out_msg_pos.page_pos = 0;
-       con->out_msg_pos.page++;
-       con->out_msg_pos.did_page_crc = false;
+       msg_pos->page_pos = 0;
+       msg_pos->page++;
+       msg_pos->did_page_crc = false;
        if (in_trail)
-               list_move_tail(&page->lru,
-                              &msg->trail->head);
+               list_rotate_left(&msg->trail->head);
        else if (msg->pagelist)
-               list_move_tail(&page->lru,
-                              &msg->pagelist->head);
+               list_rotate_left(&msg->pagelist->head);
 #ifdef CONFIG_BLOCK
        else if (msg->bio)
                iter_bio_next(&msg->bio_iter, &msg->bio_seg);
 #endif
 }
 
+static void in_msg_pos_next(struct ceph_connection *con, size_t len,
+                               size_t received)
+{
+       struct ceph_msg *msg = con->in_msg;
+       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+
+       BUG_ON(!msg);
+       BUG_ON(!received);
+
+       msg_pos->data_pos += received;
+       msg_pos->page_pos += received;
+       if (received < len)
+               return;
+
+       BUG_ON(received != len);
+       msg_pos->page_pos = 0;
+       msg_pos->page++;
+#ifdef CONFIG_BLOCK
+       if (msg->bio)
+               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif /* CONFIG_BLOCK */
+}
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -1061,6 +1080,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 static int write_partial_msg_pages(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
+       struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
        unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
@@ -1070,9 +1090,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        const size_t trail_len = (msg->trail ? msg->trail->length : 0);
        const size_t trail_off = data_len - trail_len;
 
-       dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
-            con, msg, con->out_msg_pos.page, msg->nr_pages,
-            con->out_msg_pos.page_pos);
+       dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
+            con, msg, msg_pos->page, msg_pos->page_pos);
 
        /*
         * Iterate through each page that contains data to be
@@ -1082,22 +1101,22 @@ static int write_partial_msg_pages(struct ceph_connection *con)
         * need to map the page.  If we have no pages, they have
         * been revoked, so use the zero page.
         */
-       while (data_len > con->out_msg_pos.data_pos) {
+       while (data_len > msg_pos->data_pos) {
                struct page *page = NULL;
                int max_write = PAGE_SIZE;
                int bio_offset = 0;
 
-               in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+               in_trail = in_trail || msg_pos->data_pos >= trail_off;
                if (!in_trail)
-                       total_max_write = trail_off - con->out_msg_pos.data_pos;
+                       total_max_write = trail_off - msg_pos->data_pos;
 
                if (in_trail) {
-                       total_max_write = data_len - con->out_msg_pos.data_pos;
+                       total_max_write = data_len - msg_pos->data_pos;
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
                } else if (msg->pages) {
-                       page = msg->pages[con->out_msg_pos.page];
+                       page = msg->pages[msg_pos->page];
                } else if (msg->pagelist) {
                        page = list_first_entry(&msg->pagelist->head,
                                                struct page, lru);
@@ -1113,25 +1132,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                } else {
                        page = zero_page;
                }
-               len = min_t(int, max_write - con->out_msg_pos.page_pos,
+               len = min_t(int, max_write - msg_pos->page_pos,
                            total_max_write);
 
-               if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+               if (do_datacrc && !msg_pos->did_page_crc) {
                        void *base;
                        u32 crc = le32_to_cpu(msg->footer.data_crc);
                        char *kaddr;
 
                        kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
-                       base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+                       base = kaddr + msg_pos->page_pos + bio_offset;
                        crc = crc32c(crc, base, len);
                        kunmap(page);
                        msg->footer.data_crc = cpu_to_le32(crc);
-                       con->out_msg_pos.did_page_crc = true;
+                       msg_pos->did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page,
-                                     con->out_msg_pos.page_pos + bio_offset,
-                                     len, 1);
+                                     msg_pos->page_pos + bio_offset,
+                                     len, true);
                if (ret <= 0)
                        goto out;
 
@@ -1160,7 +1179,7 @@ static int write_partial_skip(struct ceph_connection *con)
        while (con->out_skip > 0) {
                size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
-               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
+               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
                if (ret <= 0)
                        goto out;
                con->out_skip -= ret;
@@ -1788,63 +1807,63 @@ static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
                                      unsigned int data_len, bool do_datacrc)
 {
+       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+       struct page *page;
        void *p;
        int ret;
        int left;
 
-       left = min((int)(data_len - con->in_msg_pos.data_pos),
-                  (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+       left = min((int)(data_len - msg_pos->data_pos),
+                  (int)(PAGE_SIZE - msg_pos->page_pos));
        /* (page) data */
        BUG_ON(pages == NULL);
-       p = kmap(pages[con->in_msg_pos.page]);
-       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                              left);
+       page = pages[msg_pos->page];
+       p = kmap(page);
+       ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
        if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
-                                 p + con->in_msg_pos.page_pos, ret);
-       kunmap(pages[con->in_msg_pos.page]);
+                                 p + msg_pos->page_pos, ret);
+       kunmap(page);
        if (ret <= 0)
                return ret;
-       con->in_msg_pos.data_pos += ret;
-       con->in_msg_pos.page_pos += ret;
-       if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-               con->in_msg_pos.page_pos = 0;
-               con->in_msg_pos.page++;
-       }
+
+       in_msg_pos_next(con, left, ret);
 
        return ret;
 }
 
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
-                                   struct bio **bio_iter, int *bio_seg,
                                    unsigned int data_len, bool do_datacrc)
 {
-       struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+       struct ceph_msg *msg = con->in_msg;
+       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+       struct bio_vec *bv;
+       struct page *page;
        void *p;
        int ret, left;
 
-       left = min((int)(data_len - con->in_msg_pos.data_pos),
-                  (int)(bv->bv_len - con->in_msg_pos.page_pos));
+       BUG_ON(!msg);
+       BUG_ON(!msg->bio_iter);
+       bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+
+       left = min((int)(data_len - msg_pos->data_pos),
+                  (int)(bv->bv_len - msg_pos->page_pos));
 
-       p = kmap(bv->bv_page) + bv->bv_offset;
+       page = bv->bv_page;
+       p = kmap(page) + bv->bv_offset;
 
-       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                              left);
+       ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
        if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
-                                 p + con->in_msg_pos.page_pos, ret);
-       kunmap(bv->bv_page);
+                                 p + msg_pos->page_pos, ret);
+       kunmap(page);
        if (ret <= 0)
                return ret;
-       con->in_msg_pos.data_pos += ret;
-       con->in_msg_pos.page_pos += ret;
-       if (con->in_msg_pos.page_pos == bv->bv_len) {
-               con->in_msg_pos.page_pos = 0;
-               iter_bio_next(bio_iter, bio_seg);
-       }
+
+       in_msg_pos_next(con, left, ret);
 
        return ret;
 }
@@ -1856,6 +1875,7 @@ static int read_partial_message_bio(struct ceph_connection *con,
 static int read_partial_message(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->in_msg;
+       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
        int size;
        int end;
        int ret;
@@ -1885,7 +1905,7 @@ static int read_partial_message(struct ceph_connection *con)
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
        middle_len = le32_to_cpu(con->in_hdr.middle_len);
-       if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+       if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
                return -EIO;
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
@@ -1914,7 +1934,7 @@ static int read_partial_message(struct ceph_connection *con)
                int skip = 0;
 
                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
-                    con->in_hdr.front_len, con->in_hdr.data_len);
+                    front_len, data_len);
                ret = ceph_con_in_msg_alloc(con, &skip);
                if (ret < 0)
                        return ret;
@@ -1936,12 +1956,12 @@ static int read_partial_message(struct ceph_connection *con)
                if (m->middle)
                        m->middle->vec.iov_len = 0;
 
-               con->in_msg_pos.page = 0;
+               msg_pos->page = 0;
                if (m->pages)
-                       con->in_msg_pos.page_pos = m->page_alignment;
+                       msg_pos->page_pos = m->page_alignment;
                else
-                       con->in_msg_pos.page_pos = 0;
-               con->in_msg_pos.data_pos = 0;
+                       msg_pos->page_pos = 0;
+               msg_pos->data_pos = 0;
 
 #ifdef CONFIG_BLOCK
                if (m->bio)
@@ -1965,7 +1985,7 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        /* (page) data */
-       while (con->in_msg_pos.data_pos < data_len) {
+       while (msg_pos->data_pos < data_len) {
                if (m->pages) {
                        ret = read_partial_message_pages(con, m->pages,
                                                 data_len, do_datacrc);
@@ -1973,9 +1993,7 @@ static int read_partial_message(struct ceph_connection *con)
                                return ret;
 #ifdef CONFIG_BLOCK
                } else if (m->bio) {
-                       BUG_ON(!m->bio_iter);
                        ret = read_partial_message_bio(con,
-                                                &m->bio_iter, &m->bio_seg,
                                                 data_len, do_datacrc);
                        if (ret <= 0)
                                return ret;
@@ -2672,6 +2690,49 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+               size_t length, size_t alignment)
+{
+       BUG_ON(!pages);
+       BUG_ON(!length);
+       BUG_ON(msg->pages);
+       BUG_ON(msg->length);
+
+       msg->pages = pages;
+       msg->length = length;
+       msg->page_alignment = alignment & ~PAGE_MASK;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_pages);
+
+void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+                               struct ceph_pagelist *pagelist)
+{
+       BUG_ON(!pagelist);
+       BUG_ON(!pagelist->length);
+       BUG_ON(msg->pagelist);
+
+       msg->pagelist = pagelist;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+
+void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+{
+       BUG_ON(!bio);
+       BUG_ON(msg->bio);
+
+       msg->bio = bio;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_bio);
+
+void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail)
+{
+       BUG_ON(!trail);
+       BUG_ON(!trail->length);
+       BUG_ON(msg->trail);
+
+       msg->trail = trail;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_trail);
 
 /*
  * construct a new message with given type, size
@@ -2682,49 +2743,19 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 {
        struct ceph_msg *m;
 
-       m = kmalloc(sizeof(*m), flags);
+       m = kzalloc(sizeof(*m), flags);
        if (m == NULL)
                goto out;
-       kref_init(&m->kref);
 
-       m->con = NULL;
-       INIT_LIST_HEAD(&m->list_head);
-
-       m->hdr.tid = 0;
        m->hdr.type = cpu_to_le16(type);
        m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
-       m->hdr.version = 0;
        m->hdr.front_len = cpu_to_le32(front_len);
-       m->hdr.middle_len = 0;
-       m->hdr.data_len = 0;
-       m->hdr.data_off = 0;
-       m->hdr.reserved = 0;
-       m->footer.front_crc = 0;
-       m->footer.middle_crc = 0;
-       m->footer.data_crc = 0;
-       m->footer.flags = 0;
-       m->front_max = front_len;
-       m->front_is_vmalloc = false;
-       m->more_to_follow = false;
-       m->ack_stamp = 0;
-       m->pool = NULL;
-
-       /* middle */
-       m->middle = NULL;
 
-       /* data */
-       m->nr_pages = 0;
-       m->page_alignment = 0;
-       m->pages = NULL;
-       m->pagelist = NULL;
-#ifdef CONFIG_BLOCK
-       m->bio = NULL;
-       m->bio_iter = NULL;
-       m->bio_seg = 0;
-#endif /* CONFIG_BLOCK */
-       m->trail = NULL;
+       INIT_LIST_HEAD(&m->list_head);
+       kref_init(&m->kref);
 
        /* front */
+       m->front_max = front_len;
        if (front_len) {
                if (front_len > PAGE_CACHE_SIZE) {
                        m->front.iov_base = __vmalloc(front_len, flags,
@@ -2802,49 +2833,37 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 {
        struct ceph_msg_header *hdr = &con->in_hdr;
-       int type = le16_to_cpu(hdr->type);
-       int front_len = le32_to_cpu(hdr->front_len);
        int middle_len = le32_to_cpu(hdr->middle_len);
+       struct ceph_msg *msg;
        int ret = 0;
 
        BUG_ON(con->in_msg != NULL);
+       BUG_ON(!con->ops->alloc_msg);
 
-       if (con->ops->alloc_msg) {
-               struct ceph_msg *msg;
-
-               mutex_unlock(&con->mutex);
-               msg = con->ops->alloc_msg(con, hdr, skip);
-               mutex_lock(&con->mutex);
-               if (con->state != CON_STATE_OPEN) {
-                       if (msg)
-                               ceph_msg_put(msg);
-                       return -EAGAIN;
-               }
-               con->in_msg = msg;
-               if (con->in_msg) {
-                       con->in_msg->con = con->ops->get(con);
-                       BUG_ON(con->in_msg->con == NULL);
-               }
-               if (*skip) {
-                       con->in_msg = NULL;
-                       return 0;
-               }
-               if (!con->in_msg) {
-                       con->error_msg =
-                               "error allocating memory for incoming message";
-                       return -ENOMEM;
-               }
+       mutex_unlock(&con->mutex);
+       msg = con->ops->alloc_msg(con, hdr, skip);
+       mutex_lock(&con->mutex);
+       if (con->state != CON_STATE_OPEN) {
+               if (msg)
+                       ceph_msg_put(msg);
+               return -EAGAIN;
        }
-       if (!con->in_msg) {
-               con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
-               if (!con->in_msg) {
-                       pr_err("unable to allocate msg type %d len %d\n",
-                              type, front_len);
-                       return -ENOMEM;
-               }
+       if (msg) {
+               BUG_ON(*skip);
+               con->in_msg = msg;
                con->in_msg->con = con->ops->get(con);
                BUG_ON(con->in_msg->con == NULL);
-               con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
+       } else {
+               /*
+                * Null message pointer means either we should skip
+                * this message or we couldn't allocate memory.  The
+                * former is not an error.
+                */
+               if (*skip)
+                       return 0;
+               con->error_msg = "error allocating memory for incoming message";
+
+               return -ENOMEM;
        }
        memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
@@ -2888,7 +2907,7 @@ void ceph_msg_last_put(struct kref *kref)
                ceph_buffer_put(m->middle);
                m->middle = NULL;
        }
-       m->nr_pages = 0;
+       m->length = 0;
        m->pages = NULL;
 
        if (m->pagelist) {
@@ -2908,8 +2927,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 
 void ceph_msg_dump(struct ceph_msg *msg)
 {
-       pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
-                msg->front_max, msg->nr_pages);
+       pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
+                msg->front_max, msg->length);
        print_hex_dump(KERN_DEBUG, "header: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       &msg->hdr, sizeof(msg->hdr), true);