]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/commitdiff
Merge tag 'v3.5-rc1'
authorSage Weil <sage@inktank.com>
Fri, 15 Jun 2012 19:32:04 +0000 (12:32 -0700)
committerSage Weil <sage@inktank.com>
Fri, 15 Jun 2012 19:32:04 +0000 (12:32 -0700)
Linux 3.5-rc1

Conflicts:
net/ceph/messenger.c

1  2 
include/linux/ceph/libceph.h
net/ceph/ceph_common.c
net/ceph/messenger.c
net/ceph/mon_client.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index 927361c4b0a86403c456c7e7f8d2a020631abe26,e71d683982a6f651a83a22fd7ef12bf7af6f655f..98ec36ae8a3b1a3e678e79977df064aaf4cafc88
@@@ -7,6 -7,7 +7,7 @@@
  #include <linux/backing-dev.h>
  #include <linux/completion.h>
  #include <linux/exportfs.h>
+ #include <linux/bug.h>
  #include <linux/fs.h>
  #include <linux/mempool.h>
  #include <linux/pagemap.h>
@@@ -131,7 -132,7 +132,7 @@@ struct ceph_client 
        u32 supported_features;
        u32 required_features;
  
 -      struct ceph_messenger *msgr;   /* messenger instance */
 +      struct ceph_messenger msgr;   /* messenger instance */
        struct ceph_mon_client monc;
        struct ceph_osd_client osdc;
  
diff --combined net/ceph/ceph_common.c
index c815f31a1a3fc470c97ed1e66fb53a1ec5325043,a776f751edbf223220b63cdac9dadb38187bb2b6..58b09efb528d15a40a9dc57aabb2b67304052267
@@@ -441,8 -441,8 +441,8 @@@ EXPORT_SYMBOL(ceph_client_id)
   * create a fresh client instance
   */
  struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
-                                      unsigned supported_features,
-                                      unsigned required_features)
+                                      unsigned int supported_features,
+                                      unsigned int required_features)
  {
        struct ceph_client *client;
        struct ceph_entity_addr *myaddr = NULL;
        /* msgr */
        if (ceph_test_opt(client, MYIP))
                myaddr = &client->options->my_addr;
 -      client->msgr = ceph_messenger_create(myaddr,
 -                                           client->supported_features,
 -                                           client->required_features);
 -      if (IS_ERR(client->msgr)) {
 -              err = PTR_ERR(client->msgr);
 -              goto fail;
 -      }
 -      client->msgr->nocrc = ceph_test_opt(client, NOCRC);
 +      ceph_messenger_init(&client->msgr, myaddr,
 +              client->supported_features,
 +              client->required_features,
 +              ceph_test_opt(client, NOCRC));
  
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
        if (err < 0)
 -              goto fail_msgr;
 +              goto fail;
        err = ceph_osdc_init(&client->osdc, client);
        if (err < 0)
                goto fail_monc;
  
  fail_monc:
        ceph_monc_stop(&client->monc);
 -fail_msgr:
 -      ceph_messenger_destroy(client->msgr);
  fail:
        kfree(client);
        return ERR_PTR(err);
@@@ -498,10 -504,19 +498,10 @@@ void ceph_destroy_client(struct ceph_cl
        /* unmount */
        ceph_osdc_stop(&client->osdc);
  
 -      /*
 -       * make sure osd connections close out before destroying the
 -       * auth module, which is needed to free those connections'
 -       * ceph_authorizers.
 -       */
 -      ceph_msgr_flush();
 -
        ceph_monc_stop(&client->monc);
  
        ceph_debugfs_client_cleanup(client);
  
 -      ceph_messenger_destroy(client->msgr);
 -
        ceph_destroy_options(client->options);
  
        kfree(client);
diff --combined net/ceph/messenger.c
index bdbecac2d69db2d50587b34774f2f17a5c87ec06,524f4e4f598b845a7242c0243efb1a4e6a843955..5e9f61d6d2340ef932068dc88d65609b593d4f63
   * the sender.
   */
  
 +/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */
 +
 +#define CON_SOCK_STATE_NEW            0       /* -> CLOSED */
 +#define CON_SOCK_STATE_CLOSED         1       /* -> CONNECTING */
 +#define CON_SOCK_STATE_CONNECTING     2       /* -> CONNECTED or -> CLOSING */
 +#define CON_SOCK_STATE_CONNECTED      3       /* -> CLOSING or -> CLOSED */
 +#define CON_SOCK_STATE_CLOSING                4       /* -> CLOSED */
 +
  /* static tag bytes (protocol control messages) */
  static char tag_msg = CEPH_MSGR_TAG_MSG;
  static char tag_ack = CEPH_MSGR_TAG_ACK;
@@@ -155,101 -147,52 +155,101 @@@ void ceph_msgr_flush(void
  }
  EXPORT_SYMBOL(ceph_msgr_flush);
  
 +/* Connection socket state transition functions */
 +
 +static void con_sock_state_init(struct ceph_connection *con)
 +{
 +      int old_state;
 +
 +      old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 +      if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
 +              printk("%s: unexpected old state %d\n", __func__, old_state);
 +}
 +
 +static void con_sock_state_connecting(struct ceph_connection *con)
 +{
 +      int old_state;
 +
 +      old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
 +      if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
 +              printk("%s: unexpected old state %d\n", __func__, old_state);
 +}
 +
 +static void con_sock_state_connected(struct ceph_connection *con)
 +{
 +      int old_state;
 +
 +      old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
 +      if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
 +              printk("%s: unexpected old state %d\n", __func__, old_state);
 +}
 +
 +static void con_sock_state_closing(struct ceph_connection *con)
 +{
 +      int old_state;
 +
 +      old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
 +      if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
 +                      old_state != CON_SOCK_STATE_CONNECTED &&
 +                      old_state != CON_SOCK_STATE_CLOSING))
 +              printk("%s: unexpected old state %d\n", __func__, old_state);
 +}
 +
 +static void con_sock_state_closed(struct ceph_connection *con)
 +{
 +      int old_state;
 +
 +      old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 +      if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
 +                      old_state != CON_SOCK_STATE_CLOSING))
 +              printk("%s: unexpected old state %d\n", __func__, old_state);
 +}
  
  /*
   * socket callback functions
   */
  
  /* data available on socket, or listen socket received a connect */
 -static void ceph_data_ready(struct sock *sk, int count_unused)
 +static void ceph_sock_data_ready(struct sock *sk, int count_unused)
  {
        struct ceph_connection *con = sk->sk_user_data;
  
        if (sk->sk_state != TCP_CLOSE_WAIT) {
 -              dout("ceph_data_ready on %p state = %lu, queueing work\n",
 +              dout("%s on %p state = %lu, queueing work\n", __func__,
                     con, con->state);
                queue_con(con);
        }
  }
  
  /* socket has buffer space for writing */
 -static void ceph_write_space(struct sock *sk)
 +static void ceph_sock_write_space(struct sock *sk)
  {
        struct ceph_connection *con = sk->sk_user_data;
  
        /* only queue to workqueue if there is data we want to write,
         * and there is sufficient space in the socket buffer to accept
 -       * more data.  clear SOCK_NOSPACE so that ceph_write_space()
 +       * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
         * doesn't get called again until try_write() fills the socket
         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
         * and net/core/stream.c:sk_stream_write_space().
         */
 -      if (test_bit(WRITE_PENDING, &con->state)) {
 +      if (test_bit(WRITE_PENDING, &con->flags)) {
                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
 -                      dout("ceph_write_space %p queueing write work\n", con);
 +                      dout("%s %p queueing write work\n", __func__, con);
                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        queue_con(con);
                }
        } else {
 -              dout("ceph_write_space %p nothing to write\n", con);
 +              dout("%s %p nothing to write\n", __func__, con);
        }
  }
  
  /* socket's state has changed */
 -static void ceph_state_change(struct sock *sk)
 +static void ceph_sock_state_change(struct sock *sk)
  {
        struct ceph_connection *con = sk->sk_user_data;
  
 -      dout("ceph_state_change %p state = %lu sk_state = %u\n",
 +      dout("%s %p state = %lu sk_state = %u\n", __func__,
             con, con->state, sk->sk_state);
  
        if (test_bit(CLOSED, &con->state))
  
        switch (sk->sk_state) {
        case TCP_CLOSE:
 -              dout("ceph_state_change TCP_CLOSE\n");
 +              dout("%s TCP_CLOSE\n", __func__);
        case TCP_CLOSE_WAIT:
 -              dout("ceph_state_change TCP_CLOSE_WAIT\n");
 -              if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
 +              dout("%s TCP_CLOSE_WAIT\n", __func__);
 +              con_sock_state_closing(con);
 +              if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) {
                        if (test_bit(CONNECTING, &con->state))
                                con->error_msg = "connection failed";
                        else
                }
                break;
        case TCP_ESTABLISHED:
 -              dout("ceph_state_change TCP_ESTABLISHED\n");
 +              dout("%s TCP_ESTABLISHED\n", __func__);
 +              con_sock_state_connected(con);
                queue_con(con);
                break;
        default:        /* Everything else is uninteresting */
@@@ -287,9 -228,9 +287,9 @@@ static void set_sock_callbacks(struct s
  {
        struct sock *sk = sock->sk;
        sk->sk_user_data = con;
 -      sk->sk_data_ready = ceph_data_ready;
 -      sk->sk_write_space = ceph_write_space;
 -      sk->sk_state_change = ceph_state_change;
 +      sk->sk_data_ready = ceph_sock_data_ready;
 +      sk->sk_write_space = ceph_sock_write_space;
 +      sk->sk_state_change = ceph_sock_state_change;
  }
  
  
@@@ -321,7 -262,6 +321,7 @@@ static int ceph_tcp_connect(struct ceph
  
        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
  
 +      con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
                return ret;
        }
        con->sock = sock;
 -
        return 0;
  }
  
@@@ -402,7 -343,6 +402,7 @@@ static int con_close_socket(struct ceph
        sock_release(con->sock);
        con->sock = NULL;
        clear_bit(SOCK_CLOSED, &con->state);
 +      con_sock_state_closed(con);
        return rc;
  }
  
  static void ceph_msg_remove(struct ceph_msg *msg)
  {
        list_del_init(&msg->list_head);
 +      BUG_ON(msg->con == NULL);
 +      ceph_con_put(msg->con);
 +      msg->con = NULL;
 +
        ceph_msg_put(msg);
  }
  static void ceph_msg_remove_list(struct list_head *head)
@@@ -436,11 -372,8 +436,11 @@@ static void reset_connection(struct cep
        ceph_msg_remove_list(&con->out_sent);
  
        if (con->in_msg) {
 +              BUG_ON(con->in_msg->con != con);
 +              con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
 +              ceph_con_put(con->in_msg->con);
        }
  
        con->connect_seq = 0;
@@@ -460,14 -393,11 +460,14 @@@ void ceph_con_close(struct ceph_connect
  {
        dout("con_close %p peer %s\n", con,
             ceph_pr_addr(&con->peer_addr.in_addr));
 -      set_bit(CLOSED, &con->state);  /* in case there's queued work */
 +      clear_bit(NEGOTIATING, &con->state);
        clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
 -      clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
 -      clear_bit(KEEPALIVE_PENDING, &con->state);
 -      clear_bit(WRITE_PENDING, &con->state);
 +      set_bit(CLOSED, &con->state);
 +
 +      clear_bit(LOSSYTX, &con->flags);  /* so we retry next connect */
 +      clear_bit(KEEPALIVE_PENDING, &con->flags);
 +      clear_bit(WRITE_PENDING, &con->flags);
 +
        mutex_lock(&con->mutex);
        reset_connection(con);
        con->peer_global_seq = 0;
@@@ -484,8 -414,7 +484,8 @@@ void ceph_con_open(struct ceph_connecti
  {
        dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
        set_bit(OPENING, &con->state);
 -      clear_bit(CLOSED, &con->state);
 +      WARN_ON(!test_and_clear_bit(CLOSED, &con->state));
 +
        memcpy(&con->peer_addr, addr, sizeof(*addr));
        con->delay = 0;      /* reset backoff memory */
        queue_con(con);
@@@ -527,28 -456,16 +527,28 @@@ void ceph_con_put(struct ceph_connectio
  /*
   * initialize a new connection.
   */
 -void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
 +void ceph_con_init(struct ceph_connection *con, void *private,
 +      const struct ceph_connection_operations *ops,
 +      struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num)
  {
        dout("con_init %p\n", con);
        memset(con, 0, sizeof(*con));
 +      con->private = private;
 +      con->ops = ops;
        atomic_set(&con->nref, 1);
        con->msgr = msgr;
 +
 +      con_sock_state_init(con);
 +
 +      con->peer_name.type = (__u8) entity_type;
 +      con->peer_name.num = cpu_to_le64(entity_num);
 +
        mutex_init(&con->mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
 +
 +      set_bit(CLOSED, &con->state);
  }
  EXPORT_SYMBOL(ceph_con_init);
  
@@@ -569,14 -486,14 +569,14 @@@ static u32 get_global_seq(struct ceph_m
        return ret;
  }
  
 -static void ceph_con_out_kvec_reset(struct ceph_connection *con)
 +static void con_out_kvec_reset(struct ceph_connection *con)
  {
        con->out_kvec_left = 0;
        con->out_kvec_bytes = 0;
        con->out_kvec_cur = &con->out_kvec[0];
  }
  
 -static void ceph_con_out_kvec_add(struct ceph_connection *con,
 +static void con_out_kvec_add(struct ceph_connection *con,
                                size_t size, void *data)
  {
        int index;
@@@ -617,7 -534,7 +617,7 @@@ static void prepare_write_message(struc
        struct ceph_msg *m;
        u32 crc;
  
 -      ceph_con_out_kvec_reset(con);
 +      con_out_kvec_reset(con);
        con->out_kvec_is_msg = true;
        con->out_msg_done = false;
  
         * TCP packet that's a good thing. */
        if (con->in_seq > con->in_seq_acked) {
                con->in_seq_acked = con->in_seq;
 -              ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 +              con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
 -              ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
 +              con_out_kvec_add(con, sizeof (con->out_temp_ack),
                        &con->out_temp_ack);
        }
  
 +      BUG_ON(list_empty(&con->out_queue));
        m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
        con->out_msg = m;
 +      BUG_ON(m->con != con);
  
        /* put message on sent list */
        ceph_msg_get(m);
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
 +#ifdef CONFIG_BLOCK
 +      else
 +              m->bio_iter = NULL;
 +#endif
  
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
  
        /* tag + hdr + front + middle */
 -      ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 -      ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
 -      ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 +      con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 +      con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
 +      con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
  
        if (m->middle)
 -              ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
 +              con_out_kvec_add(con, m->middle->vec.iov_len,
                        m->middle->vec.iov_base);
  
        /* fill in crc (except data pages), footer */
                prepare_write_message_footer(con);
        }
  
 -      set_bit(WRITE_PENDING, &con->state);
 +      set_bit(WRITE_PENDING, &con->flags);
  }
  
  /*
@@@ -715,16 -626,16 +715,16 @@@ static void prepare_write_ack(struct ce
             con->in_seq_acked, con->in_seq);
        con->in_seq_acked = con->in_seq;
  
 -      ceph_con_out_kvec_reset(con);
 +      con_out_kvec_reset(con);
  
 -      ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 +      con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
  
        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
 -      ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
 +      con_out_kvec_add(con, sizeof (con->out_temp_ack),
                                &con->out_temp_ack);
  
        con->out_more = 1;  /* more will follow.. eventually.. */
 -      set_bit(WRITE_PENDING, &con->state);
 +      set_bit(WRITE_PENDING, &con->flags);
  }
  
  /*
  static void prepare_write_keepalive(struct ceph_connection *con)
  {
        dout("prepare_write_keepalive %p\n", con);
 -      ceph_con_out_kvec_reset(con);
 -      ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
 -      set_bit(WRITE_PENDING, &con->state);
 +      con_out_kvec_reset(con);
 +      con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
 +      set_bit(WRITE_PENDING, &con->flags);
  }
  
  /*
@@@ -764,7 -675,7 +764,7 @@@ static struct ceph_auth_handshake *get_
  
        if (IS_ERR(auth))
                return auth;
 -      if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
 +      if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
                return ERR_PTR(-EAGAIN);
  
        con->auth_reply_buf = auth->authorizer_reply_buf;
   */
  static void prepare_write_banner(struct ceph_connection *con)
  {
 -      ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 -      ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 +      con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 +      con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
                                        &con->msgr->my_enc_addr);
  
        con->out_more = 0;
 -      set_bit(WRITE_PENDING, &con->state);
 +      set_bit(WRITE_PENDING, &con->flags);
  }
  
  static int prepare_write_connect(struct ceph_connection *con)
  {
-       unsigned global_seq = get_global_seq(con->msgr, 0);
+       unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
        int auth_proto;
        struct ceph_auth_handshake *auth;
        con->out_connect.authorizer_len = auth ?
                cpu_to_le32(auth->authorizer_buf_len) : 0;
  
 -      ceph_con_out_kvec_add(con, sizeof (con->out_connect),
 +      con_out_kvec_add(con, sizeof (con->out_connect),
                                        &con->out_connect);
        if (auth && auth->authorizer_buf_len)
 -              ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
 +              con_out_kvec_add(con, auth->authorizer_buf_len,
                                        auth->authorizer_buf);
  
        con->out_more = 0;
 -      set_bit(WRITE_PENDING, &con->state);
 +      set_bit(WRITE_PENDING, &con->flags);
  
        return 0;
  }
@@@ -917,7 -828,7 +917,7 @@@ static void iter_bio_next(struct bio **
  static int write_partial_msg_pages(struct ceph_connection *con)
  {
        struct ceph_msg *msg = con->out_msg;
-       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
        /* prepare and queue up footer, too */
        if (!do_datacrc)
                con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
 -      ceph_con_out_kvec_reset(con);
 +      con_out_kvec_reset(con);
        prepare_write_message_footer(con);
        ret = 1;
  out:
@@@ -1445,6 -1356,11 +1445,6 @@@ static void fail_protocol(struct ceph_c
  {
        reset_connection(con);
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
 -
 -      mutex_unlock(&con->mutex);
 -      if (con->ops->bad_proto)
 -              con->ops->bad_proto(con);
 -      mutex_lock(&con->mutex);
  }
  
  static int process_connect(struct ceph_connection *con)
                        return -1;
                }
                con->auth_retry = 1;
 -              ceph_con_out_kvec_reset(con);
 +              con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
 -              ceph_con_out_kvec_reset(con);
 +              con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
 -              ceph_con_out_kvec_reset(con);
 +              con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
 -              ceph_con_out_kvec_reset(con);
 +              con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                        le32_to_cpu(con->in_reply.connect_seq));
  
                if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
 -                      set_bit(LOSSYTX, &con->state);
 +                      set_bit(LOSSYTX, &con->flags);
  
                prepare_read_tag(con);
                break;
@@@ -1667,13 -1583,14 +1667,13 @@@ static int read_partial_message_section
        return 1;
  }
  
 -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 -                              struct ceph_msg_header *hdr,
 -                              int *skip);
 +static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 +                              struct ceph_msg_header *hdr);
  
  
  static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, bool do_datacrc)
+                                     unsigned int data_len, bool do_datacrc)
  {
        void *p;
        int ret;
  #ifdef CONFIG_BLOCK
  static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, bool do_datacrc)
+                                   unsigned int data_len, bool do_datacrc)
  {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
@@@ -1749,8 -1666,9 +1749,8 @@@ static int read_partial_message(struct 
        int size;
        int end;
        int ret;
-       unsigned front_len, middle_len, data_len;
+       unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
 -      int skip;
        u64 seq;
        u32 crc;
  
        if (!con->in_msg) {
                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
                     con->in_hdr.front_len, con->in_hdr.data_len);
 -              skip = 0;
 -              con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
 -              if (skip) {
 +              if (ceph_con_in_msg_alloc(con, &con->in_hdr)) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
                        BUG_ON(con->in_msg);
                                "error allocating memory for incoming message";
                        return -ENOMEM;
                }
 +
 +              BUG_ON(con->in_msg->con != con);
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
                if (m->middle)
@@@ -1915,11 -1833,8 +1915,11 @@@ static void process_message(struct ceph
  {
        struct ceph_msg *msg;
  
 +      BUG_ON(con->in_msg->con != con);
 +      con->in_msg->con = NULL;
        msg = con->in_msg;
        con->in_msg = NULL;
 +      ceph_con_put(con);
  
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@@ -1959,15 -1874,14 +1959,15 @@@ more
  
        /* open the socket first? */
        if (con->sock == NULL) {
 -              ceph_con_out_kvec_reset(con);
 +              clear_bit(NEGOTIATING, &con->state);
 +              set_bit(CONNECTING, &con->state);
 +
 +              con_out_kvec_reset(con);
                prepare_write_banner(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        goto out;
                prepare_read_banner(con);
 -              set_bit(CONNECTING, &con->state);
 -              clear_bit(NEGOTIATING, &con->state);
  
                BUG_ON(con->in_msg);
                con->in_tag = CEPH_MSGR_TAG_READY;
@@@ -2024,14 -1938,14 +2024,14 @@@ do_next
                        prepare_write_ack(con);
                        goto more;
                }
 -              if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
 +              if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) {
                        prepare_write_keepalive(con);
                        goto more;
                }
        }
  
        /* Nothing to do! */
 -      clear_bit(WRITE_PENDING, &con->state);
 +      clear_bit(WRITE_PENDING, &con->flags);
        dout("try_write nothing else to write.\n");
        ret = 0;
  out:
@@@ -2173,6 -2087,12 +2173,6 @@@ bad_tag
   */
  static void queue_con(struct ceph_connection *con)
  {
 -      if (test_bit(DEAD, &con->state)) {
 -              dout("queue_con %p ignoring: DEAD\n",
 -                   con);
 -              return;
 -      }
 -
        if (!con->ops->get(con)) {
                dout("queue_con %p ref count 0\n", con);
                return;
@@@ -2197,7 -2117,7 +2197,7 @@@ static void con_work(struct work_struc
  
        mutex_lock(&con->mutex);
  restart:
 -      if (test_and_clear_bit(BACKOFF, &con->state)) {
 +      if (test_and_clear_bit(BACKOFF, &con->flags)) {
                dout("con_work %p backing off\n", con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
                                       round_jiffies_relative(con->delay))) {
                con_close_socket(con);
        }
  
 -      if (test_and_clear_bit(SOCK_CLOSED, &con->state))
 +      if (test_and_clear_bit(SOCK_CLOSED, &con->flags))
                goto fault;
  
        ret = try_read(con);
@@@ -2265,7 -2185,7 +2265,7 @@@ static void ceph_fault(struct ceph_conn
        dout("fault %p state %lu to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
  
 -      if (test_bit(LOSSYTX, &con->state)) {
 +      if (test_bit(LOSSYTX, &con->flags)) {
                dout("fault on LOSSYTX channel\n");
                goto out;
        }
        con_close_socket(con);
  
        if (con->in_msg) {
 +              BUG_ON(con->in_msg->con != con);
 +              con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
 +              ceph_con_put(con);
        }
  
        /* Requeue anything that hasn't been acked */
        /* If there are no messages queued or keepalive pending, place
         * the connection in a STANDBY state */
        if (list_empty(&con->out_queue) &&
 -          !test_bit(KEEPALIVE_PENDING, &con->state)) {
 +          !test_bit(KEEPALIVE_PENDING, &con->flags)) {
                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
 -              clear_bit(WRITE_PENDING, &con->state);
 +              clear_bit(WRITE_PENDING, &con->flags);
                set_bit(STANDBY, &con->state);
        } else {
                /* retry after a delay. */
                         * that when con_work restarts we schedule the
                         * delay then.
                         */
 -                      set_bit(BACKOFF, &con->state);
 +                      set_bit(BACKOFF, &con->flags);
                }
        }
  
  
  
  /*
 - * create a new messenger instance
 + * initialize a new messenger instance
   */
 -struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
 -                                           u32 supported_features,
 -                                           u32 required_features)
 +void ceph_messenger_init(struct ceph_messenger *msgr,
 +                      struct ceph_entity_addr *myaddr,
 +                      u32 supported_features,
 +                      u32 required_features,
 +                      bool nocrc)
  {
 -      struct ceph_messenger *msgr;
 -
 -      msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
 -      if (msgr == NULL)
 -              return ERR_PTR(-ENOMEM);
 -
        msgr->supported_features = supported_features;
        msgr->required_features = required_features;
  
        msgr->inst.addr.type = 0;
        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
        encode_my_addr(msgr);
 +      msgr->nocrc = nocrc;
  
 -      dout("messenger_create %p\n", msgr);
 -      return msgr;
 +      dout("%s %p\n", __func__, msgr);
  }
 -EXPORT_SYMBOL(ceph_messenger_create);
 -
 -void ceph_messenger_destroy(struct ceph_messenger *msgr)
 -{
 -      dout("destroy %p\n", msgr);
 -      kfree(msgr);
 -      dout("destroyed messenger %p\n", msgr);
 -}
 -EXPORT_SYMBOL(ceph_messenger_destroy);
 +EXPORT_SYMBOL(ceph_messenger_init);
  
  static void clear_standby(struct ceph_connection *con)
  {
                mutex_lock(&con->mutex);
                dout("clear_standby %p and ++connect_seq\n", con);
                con->connect_seq++;
 -              WARN_ON(test_bit(WRITE_PENDING, &con->state));
 -              WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
 +              WARN_ON(test_bit(WRITE_PENDING, &con->flags));
 +              WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
                mutex_unlock(&con->mutex);
        }
  }
@@@ -2398,11 -2327,6 +2398,11 @@@ void ceph_con_send(struct ceph_connecti
  
        /* queue */
        mutex_lock(&con->mutex);
 +
 +      BUG_ON(msg->con != NULL);
 +      msg->con = ceph_con_get(con);
 +      BUG_ON(msg->con == NULL);
 +
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
        /* if there wasn't anything waiting to send before, queue
         * new work */
        clear_standby(con);
 -      if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
 +      if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
  }
  EXPORT_SYMBOL(ceph_con_send);
  /*
   * Revoke a message that was previously queued for send
   */
 -void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 +void ceph_msg_revoke(struct ceph_msg *msg)
  {
 +      struct ceph_connection *con = msg->con;
 +
 +      if (!con)
 +              return;         /* Message not in our possession */
 +
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
 -              dout("con_revoke %p msg %p - was on queue\n", con, msg);
 +              dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
 -              ceph_msg_put(msg);
 +              BUG_ON(msg->con == NULL);
 +              ceph_con_put(msg->con);
 +              msg->con = NULL;
                msg->hdr.seq = 0;
 +
 +              ceph_msg_put(msg);
        }
        if (con->out_msg == msg) {
 -              dout("con_revoke %p msg %p - was sending\n", con, msg);
 +              dout("%s %p msg %p - was sending\n", __func__, con, msg);
                con->out_msg = NULL;
                if (con->out_kvec_is_msg) {
                        con->out_skip = con->out_kvec_bytes;
                        con->out_kvec_is_msg = false;
                }
 -              ceph_msg_put(msg);
                msg->hdr.seq = 0;
 +
 +              ceph_msg_put(msg);
        }
        mutex_unlock(&con->mutex);
  }
  /*
   * Revoke a message that we may be reading data into
   */
 -void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
 +void ceph_msg_revoke_incoming(struct ceph_msg *msg)
  {
 +      struct ceph_connection *con;
 +
 +      BUG_ON(msg == NULL);
 +      if (!msg->con) {
 +              dout("%s msg %p null con\n", __func__, msg);
 +
 +              return;         /* Message not in our possession */
 +      }
 +
 +      con = msg->con;
        mutex_lock(&con->mutex);
 -      if (con->in_msg && con->in_msg == msg) {
 +      if (con->in_msg == msg) {
-               unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
  
                /* skip rest of message */
 -              dout("con_revoke_pages %p msg %p revoked\n", con, msg);
 -                      con->in_base_pos = con->in_base_pos -
 +              dout("%s %p msg %p revoked\n", __func__, con, msg);
 +              con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
                                front_len -
                                middle_len -
                con->in_tag = CEPH_MSGR_TAG_READY;
                con->in_seq++;
        } else {
 -              dout("con_revoke_pages %p msg %p pages %p no-op\n",
 -                   con, con->in_msg, msg);
 +              dout("%s %p in_msg %p msg %p no-op\n",
 +                   __func__, con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
  }
@@@ -2503,8 -2407,8 +2503,8 @@@ void ceph_con_keepalive(struct ceph_con
  {
        dout("con_keepalive %p\n", con);
        clear_standby(con);
 -      if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
 -          test_and_set_bit(WRITE_PENDING, &con->state) == 0)
 +      if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 &&
 +          test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
  }
  EXPORT_SYMBOL(ceph_con_keepalive);
@@@ -2523,8 -2427,6 +2523,8 @@@ struct ceph_msg *ceph_msg_new(int type
        if (m == NULL)
                goto out;
        kref_init(&m->kref);
 +
 +      m->con = NULL;
        INIT_LIST_HEAD(&m->list_head);
  
        m->hdr.tid = 0;
@@@ -2620,63 -2522,46 +2620,63 @@@ static int ceph_alloc_middle(struct cep
  }
  
  /*
 - * Generic message allocator, for incoming messages.
 + * Allocate a message for receiving an incoming message on a
 + * connection, and save the result in con->in_msg.  Uses the
 + * connection's private alloc_msg op if available.
 + *
 + * Returns true if the message should be skipped, false otherwise.
 + * If true is returned (skip message), con->in_msg will be NULL.
 + * If false is returned, con->in_msg will contain a pointer to the
 + * newly-allocated message, or NULL in case of memory exhaustion.
   */
 -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 -                              struct ceph_msg_header *hdr,
 -                              int *skip)
 +static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 +                              struct ceph_msg_header *hdr)
  {
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        int middle_len = le32_to_cpu(hdr->middle_len);
 -      struct ceph_msg *msg = NULL;
        int ret;
  
 +      BUG_ON(con->in_msg != NULL);
 +
        if (con->ops->alloc_msg) {
 +              int skip = 0;
 +
                mutex_unlock(&con->mutex);
 -              msg = con->ops->alloc_msg(con, hdr, skip);
 +              con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
                mutex_lock(&con->mutex);
 -              if (!msg || *skip)
 -                      return NULL;
 +              if (con->in_msg) {
 +                      con->in_msg->con = ceph_con_get(con);
 +                      BUG_ON(con->in_msg->con == NULL);
 +              }
 +              if (skip)
 +                      con->in_msg = NULL;
 +
 +              if (!con->in_msg)
 +                      return skip != 0;
        }
 -      if (!msg) {
 -              *skip = 0;
 -              msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
 -              if (!msg) {
 +      if (!con->in_msg) {
 +              con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
 +              if (!con->in_msg) {
                        pr_err("unable to allocate msg type %d len %d\n",
                               type, front_len);
 -                      return NULL;
 +                      return false;
                }
 -              msg->page_alignment = le16_to_cpu(hdr->data_off);
 +              con->in_msg->con = ceph_con_get(con);
 +              BUG_ON(con->in_msg->con == NULL);
 +              con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
 -      memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 +      memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
  
 -      if (middle_len && !msg->middle) {
 -              ret = ceph_alloc_middle(con, msg);
 +      if (middle_len && !con->in_msg->middle) {
 +              ret = ceph_alloc_middle(con, con->in_msg);
                if (ret < 0) {
 -                      ceph_msg_put(msg);
 -                      return NULL;
 +                      ceph_msg_put(con->in_msg);
 +                      con->in_msg = NULL;
                }
        }
  
 -      return msg;
 +      return false;
  }
  
  
diff --combined net/ceph/mon_client.c
index dc16595d68850fb09e2814a1235469c3d4e00264,10d6008d31f21f982fa929a023a4f0a5a1b02d2a..e9db3de20b2e8957c25cc231f272a26968a2eb4a
@@@ -106,9 -106,9 +106,9 @@@ static void __send_prepared_auth_reques
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
 -      ceph_con_revoke(monc->con, monc->m_auth);
 +      ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
 -      ceph_con_send(monc->con, monc->m_auth);
 +      ceph_con_send(&monc->con, monc->m_auth);
  }
  
  /*
  static void __close_session(struct ceph_mon_client *monc)
  {
        dout("__close_session closing mon%d\n", monc->cur_mon);
 -      ceph_con_revoke(monc->con, monc->m_auth);
 -      ceph_con_close(monc->con);
 +      ceph_msg_revoke(monc->m_auth);
 +      ceph_con_close(&monc->con);
 +      monc->con.private = NULL;
        monc->cur_mon = -1;
        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
@@@ -142,12 -141,10 +142,12 @@@ static int __open_session(struct ceph_m
                monc->sub_renew_after = jiffies;  /* i.e., expired */
                monc->want_next_osdmap = !!monc->want_next_osdmap;
  
 +              ceph_con_init(&monc->con, monc, &mon_con_ops,
 +                      &monc->client->msgr,
 +                      CEPH_ENTITY_TYPE_MON, monc->cur_mon);
 +
                dout("open_session mon%d opening\n", monc->cur_mon);
 -              monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
 -              monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
 -              ceph_con_open(monc->con,
 +              ceph_con_open(&monc->con,
                              &monc->monmap->mon_inst[monc->cur_mon].addr);
  
                /* initiatiate authentication handshake */
@@@ -171,7 -168,7 +171,7 @@@ static bool __sub_expired(struct ceph_m
   */
  static void __schedule_delayed(struct ceph_mon_client *monc)
  {
-       unsigned delay;
+       unsigned int delay;
  
        if (monc->cur_mon < 0 || __sub_expired(monc))
                delay = 10 * HZ;
  static void __send_subscribe(struct ceph_mon_client *monc)
  {
        dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-            (unsigned)monc->sub_sent, __sub_expired(monc),
+            (unsigned int)monc->sub_sent, __sub_expired(monc),
             monc->want_next_osdmap);
        if ((__sub_expired(monc) && !monc->sub_sent) ||
            monc->want_next_osdmap == 1) {
  
                if (monc->want_next_osdmap) {
                        dout("__send_subscribe to 'osdmap' %u\n",
-                            (unsigned)monc->have_osdmap);
+                            (unsigned int)monc->have_osdmap);
                        ceph_encode_string(&p, end, "osdmap", 6);
                        i = p;
                        i->have = cpu_to_le64(monc->have_osdmap);
                }
                if (monc->want_mdsmap) {
                        dout("__send_subscribe to 'mdsmap' %u+\n",
-                            (unsigned)monc->have_mdsmap);
+                            (unsigned int)monc->have_mdsmap);
                        ceph_encode_string(&p, end, "mdsmap", 6);
                        i = p;
                        i->have = cpu_to_le64(monc->have_mdsmap);
  
                msg->front.iov_len = p - msg->front.iov_base;
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 -              ceph_con_revoke(monc->con, msg);
 -              ceph_con_send(monc->con, ceph_msg_get(msg));
 +              ceph_msg_revoke(msg);
 +              ceph_con_send(&monc->con, ceph_msg_get(msg));
  
                monc->sub_sent = jiffies | 1;  /* never 0 */
        }
  static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
  {
-       unsigned seconds;
+       unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
  
        if (msg->front.iov_len < sizeof(*h))
        if (monc->hunting) {
                pr_info("mon%d %s session established\n",
                        monc->cur_mon,
 -                      ceph_pr_addr(&monc->con->peer_addr.in_addr));
 +                      ceph_pr_addr(&monc->con.peer_addr.in_addr));
                monc->hunting = false;
        }
        dout("handle_subscribe_ack after %d seconds\n", seconds);
@@@ -442,7 -439,6 +442,7 @@@ static struct ceph_msg *get_generic_rep
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
 +              *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
@@@ -465,7 -461,7 +465,7 @@@ static int do_generic_request(struct ce
        req->request->hdr.tid = cpu_to_le64(req->tid);
        __insert_generic_request(monc, req);
        monc->num_generic_requests++;
 -      ceph_con_send(monc->con, ceph_msg_get(req->request));
 +      ceph_con_send(&monc->con, ceph_msg_get(req->request));
        mutex_unlock(&monc->mutex);
  
        err = wait_for_completion_interruptible(&req->completion);
@@@ -688,8 -684,8 +688,8 @@@ static void __resend_generic_request(st
  
        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
 -              ceph_con_revoke(monc->con, req->request);
 -              ceph_con_send(monc->con, ceph_msg_get(req->request));
 +              ceph_msg_revoke(req->request);
 +              ceph_con_send(&monc->con, ceph_msg_get(req->request));
        }
  }
  
@@@ -709,7 -705,7 +709,7 @@@ static void delayed_work(struct work_st
                __close_session(monc);
                __open_session(monc);  /* continue hunting */
        } else {
 -              ceph_con_keepalive(monc->con);
 +              ceph_con_keepalive(&monc->con);
  
                __validate_auth(monc);
  
@@@ -764,12 -760,19 +764,12 @@@ int ceph_monc_init(struct ceph_mon_clie
                goto out;
  
        /* connection */
 -      monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
 -      if (!monc->con)
 -              goto out_monmap;
 -      ceph_con_init(monc->client->msgr, monc->con);
 -      monc->con->private = monc;
 -      monc->con->ops = &mon_con_ops;
 -
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
 -              goto out_con;
 +              goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@@ -821,6 -824,8 +821,6 @@@ out_subscribe_ack
        ceph_msg_put(monc->m_subscribe_ack);
  out_auth:
        ceph_auth_destroy(monc->auth);
 -out_con:
 -      monc->con->ops->put(monc->con);
  out_monmap:
        kfree(monc->monmap);
  out:
@@@ -836,16 -841,12 +836,16 @@@ void ceph_monc_stop(struct ceph_mon_cli
        mutex_lock(&monc->mutex);
        __close_session(monc);
  
 -      monc->con->private = NULL;
 -      monc->con->ops->put(monc->con);
 -      monc->con = NULL;
 -
        mutex_unlock(&monc->mutex);
  
 +      /*
 +       * flush msgr queue before we destroy ourselves to ensure that:
 +       *  - any work that references our embedded con is finished.
 +       *  - any osd_client or other work that may reference an authorizer
 +       *    finishes before we shut down the auth subsystem.
 +       */
 +      ceph_msgr_flush();
 +
        ceph_auth_destroy(monc->auth);
  
        ceph_msg_put(monc->m_auth);
@@@ -879,8 -880,8 +879,8 @@@ static void handle_auth_reply(struct ce
        } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");
  
 -              monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
 -              monc->client->msgr->inst.name.num =
 +              monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
 +              monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);
  
                __send_subscribe(monc);
@@@ -991,8 -992,6 +991,8 @@@ static struct ceph_msg *mon_alloc_msg(s
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
 +              if (!m)
 +                      return NULL;    /* ENOMEM--return skip == 0 */
                break;
        }
  
@@@ -1022,7 -1021,7 +1022,7 @@@ static void mon_fault(struct ceph_conne
        if (!monc->hunting)
                pr_info("mon%d %s session lost, "
                        "hunting for new mon\n", monc->cur_mon,
 -                      ceph_pr_addr(&monc->con->peer_addr.in_addr));
 +                      ceph_pr_addr(&monc->con.peer_addr.in_addr));
  
        __close_session(monc);
        if (!monc->hunting) {
        mutex_unlock(&monc->mutex);
  }
  
 +/*
 + * We can ignore refcounting on the connection struct, as all references
 + * will come from the messenger workqueue, which is drained prior to
 + * mon_client destruction.
 + */
 +static struct ceph_connection *con_get(struct ceph_connection *con)
 +{
 +      return con;
 +}
 +
 +static void con_put(struct ceph_connection *con)
 +{
 +}
 +
  static const struct ceph_connection_operations mon_con_ops = {
 -      .get = ceph_con_get,
 -      .put = ceph_con_put,
 +      .get = con_get,
 +      .put = con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
diff --combined net/ceph/osd_client.c
index c178c770acb4845964efac43782fafd8983670fc,1ffebed5ce0f9a629ad2733349b8e33c326850d5..db2da54f7336f12a31932791aa67e5c3ff3edbb9
@@@ -139,14 -139,15 +139,14 @@@ void ceph_osdc_release_request(struct k
  
        if (req->r_request)
                ceph_msg_put(req->r_request);
 -      if (req->r_reply)
 -              ceph_msg_put(req->r_reply);
        if (req->r_con_filling_msg) {
 -              dout("release_request revoking pages %p from con %p\n",
 +              dout("%s revoking pages %p from con %p\n", __func__,
                     req->r_pages, req->r_con_filling_msg);
 -              ceph_con_revoke_message(req->r_con_filling_msg,
 -                                    req->r_reply);
 -              ceph_con_put(req->r_con_filling_msg);
 +              ceph_msg_revoke_incoming(req->r_reply);
 +              req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
        }
 +      if (req->r_reply)
 +              ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
@@@ -623,7 -624,7 +623,7 @@@ static void osd_reset(struct ceph_conne
  /*
   * Track open sessions with osds.
   */
 -static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 +static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
  {
        struct ceph_osd *osd;
  
  
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
 +      osd->o_osd = onum;
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
  
 -      ceph_con_init(osdc->client->msgr, &osd->o_con);
 -      osd->o_con.private = osd;
 -      osd->o_con.ops = &osd_con_ops;
 -      osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 +      ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr,
 +              CEPH_ENTITY_TYPE_OSD, onum);
  
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
@@@ -851,7 -853,7 +851,7 @@@ static void __unregister_request(struc
  
        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
 -              ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 +              ceph_msg_revoke(req->r_request);
  
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
  static void __cancel_request(struct ceph_osd_request *req)
  {
        if (req->r_sent && req->r_osd) {
 -              ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 +              ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
  }
@@@ -996,13 -998,15 +996,13 @@@ static int __map_request(struct ceph_os
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
 -              req->r_osd = create_osd(osdc);
 +              req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }
  
                dout("map_request osd %p is osd%d\n", req->r_osd, o);
 -              req->r_osd->o_osd = o;
 -              req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
  
                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
@@@ -1212,11 -1216,11 +1212,11 @@@ static void handle_reply(struct ceph_os
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
                dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
 -              ceph_con_put(con);
 +              con->ops->put(con);
        }
  
        if (!req->r_got_reply) {
-               unsigned bytes;
+               unsigned int bytes;
  
                req->r_result = le32_to_cpu(rhead->result);
                bytes = le32_to_cpu(msg->hdr.data_len);
@@@ -1387,7 -1391,7 +1387,7 @@@ void ceph_osdc_handle_map(struct ceph_o
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
                                                          osdc->osdmap,
 -                                                        osdc->client->msgr);
 +                                                        &osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
@@@ -2021,10 -2025,10 +2021,10 @@@ static struct ceph_msg *get_reply(struc
        }
  
        if (req->r_con_filling_msg) {
 -              dout("get_reply revoking msg %p from old con %p\n",
 +              dout("%s revoking msg %p from old con %p\n", __func__,
                     req->r_reply, req->r_con_filling_msg);
 -              ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
 -              ceph_con_put(req->r_con_filling_msg);
 +              ceph_msg_revoke_incoming(req->r_reply);
 +              req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }
  
  #endif
        }
        *skip = 0;
 -      req->r_con_filling_msg = ceph_con_get(con);
 +      req->r_con_filling_msg = con->ops->get(con);
        dout("get_reply tid %lld %p\n", tid, m);
  
  out:
@@@ -2076,7 -2080,6 +2076,7 @@@ static struct ceph_msg *alloc_msg(struc
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
  
 +      *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
diff --combined net/ceph/osdmap.c
index d3de09f519b27455719120f78902bb41fd07d38d,81e3b84a77efdecb6c44603e7784a083fe94b980..9600674c2c3978fc79912235d7cdb708a572501c
@@@ -38,7 -38,7 +38,7 @@@ done
  
  /* maps */
  
- static int calc_bits_of(unsigned t)
+ static int calc_bits_of(unsigned int t)
  {
        int b = 0;
        while (t) {
@@@ -154,7 -154,7 +154,7 @@@ static struct crush_map *crush_decode(v
        magic = ceph_decode_32(p);
        if (magic != CRUSH_MAGIC) {
                pr_err("crush_decode magic %x != current %x\n",
-                      (unsigned)magic, (unsigned)CRUSH_MAGIC);
+                      (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
                goto bad;
        }
        c->max_buckets = ceph_decode_32(p);
@@@ -453,7 -453,7 +453,7 @@@ static void __remove_pg_pool(struct rb_
  
  static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
  {
-       unsigned n, m;
+       unsigned int n, m;
  
        ceph_decode_copy(p, &pi->v, sizeof(pi->v));
        calc_pg_masks(pi);
@@@ -488,16 -488,15 +488,16 @@@ static int __decode_pool_names(void **p
                ceph_decode_32_safe(p, end, pool, bad);
                ceph_decode_32_safe(p, end, len, bad);
                dout("  pool %d len %d\n", pool, len);
 +              ceph_decode_need(p, end, len, bad);
                pi = __lookup_pg_pool(&map->pg_pools, pool);
                if (pi) {
 +                      char *name = kstrndup(*p, len, GFP_NOFS);
 +
 +                      if (!name)
 +                              return -ENOMEM;
                        kfree(pi->name);
 -                      pi->name = kmalloc(len + 1, GFP_NOFS);
 -                      if (pi->name) {
 -                              memcpy(pi->name, *p, len);
 -                              pi->name[len] = '\0';
 -                              dout("  name is %s\n", pi->name);
 -                      }
 +                      pi->name = name;
 +                      dout("  name is %s\n", pi->name);
                }
                *p += len;
        }
@@@ -667,9 -666,6 +667,9 @@@ struct ceph_osdmap *osdmap_decode(void 
                ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
                ceph_decode_copy(p, &pgid, sizeof(pgid));
                n = ceph_decode_32(p);
 +              err = -EINVAL;
 +              if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
 +                      goto bad;
                ceph_decode_need(p, end, n * sizeof(u32), bad);
                err = -ENOMEM;
                pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
@@@ -893,10 -889,6 +893,10 @@@ struct ceph_osdmap *osdmap_apply_increm
                        (void) __remove_pg_mapping(&map->pg_temp, pgid);
  
                        /* insert */
 +                      if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
 +                              err = -EINVAL;
 +                              goto bad;
 +                      }
                        pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
                        if (!pg) {
                                err = -ENOMEM;
@@@ -975,7 -967,7 +975,7 @@@ void ceph_calc_file_object_mapping(stru
        objsetno = stripeno / su_per_object;
  
        *ono = objsetno * sc + stripepos;
-       dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned)*ono);
+       dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);
  
        /* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
        t = off;
@@@ -1003,11 -995,11 +1003,11 @@@ int ceph_calc_object_layout(struct ceph
                            struct ceph_file_layout *fl,
                            struct ceph_osdmap *osdmap)
  {
-       unsigned num, num_mask;
+       unsigned int num, num_mask;
        struct ceph_pg pgid;
        int poolid = le32_to_cpu(fl->fl_pg_pool);
        struct ceph_pg_pool_info *pool;
-       unsigned ps;
+       unsigned int ps;
  
        BUG_ON(!osdmap);
  
@@@ -1039,7 -1031,7 +1039,7 @@@ static int *calc_pg_raw(struct ceph_osd
        struct ceph_pg_mapping *pg;
        struct ceph_pg_pool_info *pool;
        int ruleno;
-       unsigned poolid, ps, pps, t, r;
+       unsigned int poolid, ps, pps, t, r;
  
        poolid = le32_to_cpu(pgid.pool);
        ps = le16_to_cpu(pgid.ps);