drbd: allow read requests to be retried after force-detach

diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 77c957ff7a0e861f8bdd00a0327083617eb1d330..c0acd86c84153185cfc8c18716a25dffe0928d65 100644
@@ -120,7 +120,6 @@ module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0
  */
 struct idr minors;
 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
-DEFINE_MUTEX(drbd_cfg_mutex);
 
 struct kmem_cache *drbd_request_cache;
 struct kmem_cache *drbd_ee_cache;      /* peer requests */
@@ -189,151 +188,75 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
 #endif
 
 /**
- * DOC: The transfer log
- *
- * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
- * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
- * of the list. There is always at least one &struct drbd_tl_epoch object.
- *
- * Each &struct drbd_tl_epoch has a circular double linked list of requests
- * attached.
- */
-static int tl_init(struct drbd_tconn *tconn)
-{
-       struct drbd_tl_epoch *b;
-
-       /* during device minor initialization, we may well use GFP_KERNEL */
-       b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
-       if (!b)
-               return 0;
-       INIT_LIST_HEAD(&b->requests);
-       INIT_LIST_HEAD(&b->w.list);
-       b->next = NULL;
-       b->br_number = 4711;
-       b->n_writes = 0;
-       b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-
-       tconn->oldest_tle = b;
-       tconn->newest_tle = b;
-       INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
-
-       return 1;
-}
-
-static void tl_cleanup(struct drbd_tconn *tconn)
-{
-       if (tconn->oldest_tle != tconn->newest_tle)
-               conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
-       if (!list_empty(&tconn->out_of_sequence_requests))
-               conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
-       kfree(tconn->oldest_tle);
-       tconn->oldest_tle = NULL;
-       kfree(tconn->unused_spare_tle);
-       tconn->unused_spare_tle = NULL;
-}
-
-/**
- * _tl_add_barrier() - Adds a barrier to the transfer log
- * @mdev:      DRBD device.
- * @new:       Barrier to be added before the current head of the TL.
- *
- * The caller must hold the req_lock.
- */
-void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
-{
-       struct drbd_tl_epoch *newest_before;
-
-       INIT_LIST_HEAD(&new->requests);
-       INIT_LIST_HEAD(&new->w.list);
-       new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-       new->next = NULL;
-       new->n_writes = 0;
-
-       newest_before = tconn->newest_tle;
-       /* never send a barrier number == 0, because that is special-cased
-        * when using TCQ for our write ordering code */
-       new->br_number = (newest_before->br_number+1) ?: 1;
-       if (tconn->newest_tle != new) {
-               tconn->newest_tle->next = new;
-               tconn->newest_tle = new;
-       }
-}
-
-/**
- * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
- * @mdev:      DRBD device.
+ * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
+ * @tconn:     DRBD connection.
  * @barrier_nr:        Expected identifier of the DRBD write barrier packet.
  * @set_size:  Expected number of requests before that barrier.
  *
  * In case the passed barrier_nr or set_size does not match the oldest
- * &struct drbd_tl_epoch objects this function will cause a termination
- * of the connection.
+ * epoch of not yet barrier-acked requests, this function will cause a
+ * termination of the connection.
  */
 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
                unsigned int set_size)
 {
-       struct drbd_conf *mdev;
-       struct drbd_tl_epoch *b, *nob; /* next old barrier */
-       struct list_head *le, *tle;
        struct drbd_request *r;
+       struct drbd_request *req = NULL;
+       int expect_epoch = 0;
+       int expect_size = 0;
 
        spin_lock_irq(&tconn->req_lock);
 
-       b = tconn->oldest_tle;
+       /* find oldest not yet barrier-acked write request,
+        * count writes in its epoch. */
+       list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+               const unsigned s = r->rq_state;
+               if (!req) {
+                       if (!(s & RQ_WRITE))
+                               continue;
+                       if (!(s & RQ_NET_MASK))
+                               continue;
+                       if (s & RQ_NET_DONE)
+                               continue;
+                       req = r;
+                       expect_epoch = req->epoch;
+                       expect_size++;
+               } else {
+                       if (r->epoch != expect_epoch)
+                               break;
+                       if (!(s & RQ_WRITE))
+                               continue;
+                       /* if (s & RQ_DONE): not expected */
+                       /* if (!(s & RQ_NET_MASK)): not expected */
+                       expect_size++;
+               }
+       }
 
        /* first some paranoia code */
-       if (b == NULL) {
+       if (req == NULL) {
                conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
                         barrier_nr);
                goto bail;
        }
-       if (b->br_number != barrier_nr) {
+       if (expect_epoch != barrier_nr) {
                conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
-                        barrier_nr, b->br_number);
+                        barrier_nr, expect_epoch);
                goto bail;
        }
-       if (b->n_writes != set_size) {
+
+       if (expect_size != set_size) {
                conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
-                        barrier_nr, set_size, b->n_writes);
+                        barrier_nr, set_size, expect_size);
                goto bail;
        }
 
        /* Clean up list of requests processed during current epoch */
-       list_for_each_safe(le, tle, &b->requests) {
-               r = list_entry(le, struct drbd_request, tl_requests);
-               _req_mod(r, BARRIER_ACKED);
-       }
-       /* There could be requests on the list waiting for completion
-          of the write to the local disk. To avoid corruptions of
-          slab's data structures we have to remove the lists head.
-
-          Also there could have been a barrier ack out of sequence, overtaking
-          the write acks - which would be a bug and violating write ordering.
-          To not deadlock in case we lose connection while such requests are
-          still pending, we need some way to find them for the
-          _req_mode(CONNECTION_LOST_WHILE_PENDING).
-
-          These have been list_move'd to the out_of_sequence_requests list in
-          _req_mod(, BARRIER_ACKED) above.
-          */
-       list_del_init(&b->requests);
-       mdev = b->w.mdev;
-
-       nob = b->next;
-       if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
-               _tl_add_barrier(tconn, b);
-               if (nob)
-                       tconn->oldest_tle = nob;
-               /* if nob == NULL b was the only barrier, and becomes the new
-                  barrier. Therefore tconn->oldest_tle points already to b */
-       } else {
-               D_ASSERT(nob != NULL);
-               tconn->oldest_tle = nob;
-               kfree(b);
+       list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
+               if (req->epoch != expect_epoch)
+                       break;
+               _req_mod(req, BARRIER_ACKED);
        }
-
        spin_unlock_irq(&tconn->req_lock);
-       dec_ap_pending(mdev);
 
        return;
 
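For orientation: tl_release() is driven from the meta-socket receiver when a P_BARRIER_ACK packet arrives. A minimal sketch of that caller, assuming the got_BarrierAck() handler of this era (it lives in drbd_receiver.c, outside this diff):

	static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
	{
		struct p_barrier_ack *p = pi->data;

		/* the barrier number travels as an opaque cookie, set_size in
		 * big-endian; any mismatch makes tl_release() bail out and
		 * terminate the connection */
		tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
		return 0;
	}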
@@ -351,75 +274,21 @@ bail:
  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
  * RESTART_FROZEN_DISK_IO.
  */
+/* must hold tconn->req_lock */
 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
 {
-       struct drbd_tl_epoch *b, *tmp, **pn;
-       struct list_head *le, *tle, carry_reads;
-       struct drbd_request *req;
-       int rv, n_writes, n_reads;
-
-       b = tconn->oldest_tle;
-       pn = &tconn->oldest_tle;
-       while (b) {
-               n_writes = 0;
-               n_reads = 0;
-               INIT_LIST_HEAD(&carry_reads);
-               list_for_each_safe(le, tle, &b->requests) {
-                       req = list_entry(le, struct drbd_request, tl_requests);
-                       rv = _req_mod(req, what);
-
-                       n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
-                       n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
-               }
-               tmp = b->next;
-
-               if (n_writes) {
-                       if (what == RESEND) {
-                               b->n_writes = n_writes;
-                               if (b->w.cb == NULL) {
-                                       b->w.cb = w_send_barrier;
-                                       inc_ap_pending(b->w.mdev);
-                                       set_bit(CREATE_BARRIER, &b->w.mdev->flags);
-                               }
-
-                               drbd_queue_work(&tconn->data.work, &b->w);
-                       }
-                       pn = &b->next;
-               } else {
-                       if (n_reads)
-                               list_add(&carry_reads, &b->requests);
-                       /* there could still be requests on that ring list,
-                        * in case local io is still pending */
-                       list_del(&b->requests);
-
-                       /* dec_ap_pending corresponding to queue_barrier.
-                        * the newest barrier may not have been queued yet,
-                        * in which case w.cb is still NULL. */
-                       if (b->w.cb != NULL)
-                               dec_ap_pending(b->w.mdev);
-
-                       if (b == tconn->newest_tle) {
-                               /* recycle, but reinit! */
-                               if (tmp != NULL)
-                                       conn_err(tconn, "ASSERT FAILED tmp == NULL");
-                               INIT_LIST_HEAD(&b->requests);
-                               list_splice(&carry_reads, &b->requests);
-                               INIT_LIST_HEAD(&b->w.list);
-                               b->w.cb = NULL;
-                               b->br_number = net_random();
-                               b->n_writes = 0;
-
-                               *pn = b;
-                               break;
-                       }
-                       *pn = tmp;
-                       kfree(b);
-               }
-               b = tmp;
-               list_splice(&carry_reads, &b->requests);
-       }
+       struct drbd_request *req, *r;
+
+       list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
+               _req_mod(req, what);
 }
 
+void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+{
+       spin_lock_irq(&tconn->req_lock);
+       _tl_restart(tconn, what);
+       spin_unlock_irq(&tconn->req_lock);
+}
 
 /**
  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
@@ -431,38 +300,26 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
  */
 void tl_clear(struct drbd_tconn *tconn)
 {
-       struct drbd_conf *mdev;
-       struct list_head *le, *tle;
-       struct drbd_request *r;
-       int vnr;
-
-       spin_lock_irq(&tconn->req_lock);
-
-       _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
-
-       /* we expect this list to be empty. */
-       if (!list_empty(&tconn->out_of_sequence_requests))
-               conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
-
-       /* but just in case, clean it up anyways! */
-       list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
-               r = list_entry(le, struct drbd_request, tl_requests);
-               /* It would be nice to complete outside of spinlock.
-                * But this is easier for now. */
-               _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
-       }
-
-       /* ensure bit indicating barrier is required is clear */
-       idr_for_each_entry(&tconn->volumes, mdev, vnr)
-               clear_bit(CREATE_BARRIER, &mdev->flags);
-
-       spin_unlock_irq(&tconn->req_lock);
+       tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
 }
 
-void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+/**
+ * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
+ * @mdev:      DRBD device.
+ */
+void tl_abort_disk_io(struct drbd_conf *mdev)
 {
+       struct drbd_tconn *tconn = mdev->tconn;
+       struct drbd_request *req, *r;
+
        spin_lock_irq(&tconn->req_lock);
-       _tl_restart(tconn, what);
+       list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
+               if (!(req->rq_state & RQ_LOCAL_PENDING))
+                       continue;
+               if (req->w.mdev != mdev)
+                       continue;
+               _req_mod(req, ABORT_DISK_IO);
+       }
        spin_unlock_irq(&tconn->req_lock);
 }
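tl_abort_disk_io() is what connects this file to the commit subject: on force-detach, pending local disk I/O is pushed through ABORT_DISK_IO while the requests' network state is left untouched, which is what later allows read requests to be retried. A hypothetical call-site sketch, assuming a state-change handler that reacts to the disk reaching D_FAILED (the real caller sits in the state machine, outside this diff):

	/* hypothetical excerpt from an after-state-change handler */
	if (os.disk != D_FAILED && ns.disk == D_FAILED)
		tl_abort_disk_io(mdev);	/* abort local I/O, keep network state */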
 
@@ -501,12 +358,14 @@ restart:
        thi->task = NULL;
        thi->t_state = NONE;
        smp_mb();
-       complete(&thi->stop);
+       complete_all(&thi->stop);
        spin_unlock_irqrestore(&thi->t_lock, flags);
 
        conn_info(tconn, "Terminating %s\n", current->comm);
 
        /* Release mod reference taken when thread was started */
+
+       kref_put(&tconn->kref, &conn_destroy);
        module_put(THIS_MODULE);
        return retval;
 }
@@ -544,6 +403,8 @@ int drbd_thread_start(struct drbd_thread *thi)
                        return false;
                }
 
+               kref_get(&thi->tconn->kref);
+
                init_completion(&thi->stop);
                thi->reset_cpu_mask = 1;
                thi->t_state = RUNNING;
@@ -556,6 +417,7 @@ int drbd_thread_start(struct drbd_thread *thi)
                if (IS_ERR(nt)) {
                        conn_err(tconn, "Couldn't start thread\n");
 
+                       kref_put(&tconn->kref, &conn_destroy);
                        module_put(THIS_MODULE);
                        return false;
                }
@@ -634,13 +496,15 @@ char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *tas
 
 int conn_lowest_minor(struct drbd_tconn *tconn)
 {
-       int vnr = 0;
        struct drbd_conf *mdev;
+       int vnr = 0, m;
 
+       rcu_read_lock();
        mdev = idr_get_next(&tconn->volumes, &vnr);
-       if (!mdev)
-               return -1;
-       return mdev_to_minor(mdev);
+       m = mdev ? mdev_to_minor(mdev) : -1;
+       rcu_read_unlock();
+
+       return m;
 }
 
 #ifdef CONFIG_SMP
@@ -689,204 +553,305 @@ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
 }
 #endif
 
-static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
+/**
+ * drbd_header_size  -  size of a packet header
+ *
+ * The header size is a multiple of 8, so any payload following the header is
+ * word aligned on 64-bit architectures.  (The bitmap send and receive code
+ * relies on this.)
+ */
+unsigned int drbd_header_size(struct drbd_tconn *tconn)
+{
+       if (tconn->agreed_pro_version >= 100) {
+               BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
+               return sizeof(struct p_header100);
+       } else {
+               BUILD_BUG_ON(sizeof(struct p_header80) !=
+                            sizeof(struct p_header95));
+               BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
+               return sizeof(struct p_header80);
+       }
+}
+
+static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
 {
        h->magic   = cpu_to_be32(DRBD_MAGIC);
        h->command = cpu_to_be16(cmd);
        h->length  = cpu_to_be16(size);
+       return sizeof(struct p_header80);
 }
 
-static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
+static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
 {
        h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
        h->command = cpu_to_be16(cmd);
-       h->length  = cpu_to_be32(size);
+       h->length = cpu_to_be32(size);
+       return sizeof(struct p_header95);
 }
 
-static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
-                           enum drbd_packet cmd, int size)
+static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
+                                     int size, int vnr)
 {
-       if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
-               prepare_header95(&h->h95, cmd, size);
+       h->magic = cpu_to_be32(DRBD_MAGIC_100);
+       h->volume = cpu_to_be16(vnr);
+       h->command = cpu_to_be16(cmd);
+       h->length = cpu_to_be32(size);
+       h->pad = 0;
+       return sizeof(struct p_header100);
+}
+
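prepare_header100() below fills the protocol-100 header field by field. As a sketch of the wire layout this implies, with field widths assumed from the cpu_to_be*() conversions (the authoritative definition lives in DRBD's protocol headers):

	struct p_header100 {
		u32 magic;	/* DRBD_MAGIC_100 */
		u16 volume;	/* volume number, new in protocol 100 */
		u16 command;
		u32 length;	/* payload length, header not included */
		u32 pad;	/* keeps sizeof() a multiple of 8 */
	} __packed;		/* 16 bytes, matching the BUILD_BUG_ON in drbd_header_size() */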
+static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
+                                  void *buffer, enum drbd_packet cmd, int size)
+{
+       if (tconn->agreed_pro_version >= 100)
+               return prepare_header100(buffer, cmd, size, vnr);
+       else if (tconn->agreed_pro_version >= 95 &&
+                size > DRBD_MAX_SIZE_H80_PACKET)
+               return prepare_header95(buffer, cmd, size);
        else
-               prepare_header80(&h->h80, cmd, size);
+               return prepare_header80(buffer, cmd, size);
 }
 
-static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
-                          enum drbd_packet cmd, int size)
+static void *__conn_prepare_command(struct drbd_tconn *tconn,
+                                   struct drbd_socket *sock)
 {
-       _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
+       if (!sock->socket)
+               return NULL;
+       return sock->sbuf + drbd_header_size(tconn);
 }
 
-/* the appropriate socket mutex must be held already */
-int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
-                  enum drbd_packet cmd, struct p_header *h, size_t size,
-                  unsigned msg_flags)
+void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
 {
+       void *p;
+
+       mutex_lock(&sock->mutex);
+       p = __conn_prepare_command(tconn, sock);
+       if (!p)
+               mutex_unlock(&sock->mutex);
+
+       return p;
+}
+
+void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
+{
+       return conn_prepare_command(mdev->tconn, sock);
+}
+
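On success the *_prepare_command() helpers return with sock->mutex held and the pointer aimed just past the packet header inside the preallocated send buffer; the matching *_send_command() (or an explicit mutex_unlock()) releases it. A minimal usage sketch with a hypothetical sender:

	static int example_send_req(struct drbd_conf *mdev)	/* hypothetical */
	{
		struct drbd_socket *sock = &mdev->tconn->data;
		struct p_block_req *p;

		p = drbd_prepare_command(mdev, sock);	/* sock->mutex now held */
		if (!p)
			return -EIO;			/* mutex already dropped */
		p->sector   = cpu_to_be64(0);
		p->block_id = ID_SYNCER;
		p->blksize  = cpu_to_be32(4096);
		/* sends the packet and unlocks sock->mutex */
		return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
	}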
+static int __send_command(struct drbd_tconn *tconn, int vnr,
+                         struct drbd_socket *sock, enum drbd_packet cmd,
+                         unsigned int header_size, void *data,
+                         unsigned int size)
+{
+       int msg_flags;
        int err;
 
-       _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
-       err = drbd_send_all(tconn, sock, h, size, msg_flags);
-       if (err && !signal_pending(current))
-               conn_warn(tconn, "short send %s size=%d\n",
-                         cmdname(cmd), (int)size);
+       /*
+        * Called with @data == NULL and the size of the data blocks in @size
+        * for commands that send data blocks.  For those commands, omit the
+        * MSG_MORE flag: this will increase the likelihood that data blocks
+        * which are page aligned on the sender will end up page aligned on the
+        * receiver.
+        */
+       msg_flags = data ? MSG_MORE : 0;
+
+       header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
+                                     header_size + size);
+       err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
+                           msg_flags);
+       if (data && !err)
+               err = drbd_send_all(tconn, sock->socket, data, size, 0);
        return err;
 }
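Note the two ways of feeding __send_command(): small payloads are passed in @data, which sets MSG_MORE on the header send, while commands that stream a large payload themselves (P_DATA, P_BITMAP) pass @data == NULL and account for the payload only in @size, so the header still advertises the full length but MSG_MORE stays off, as the comment above explains. drbd_send_dblock() further down uses exactly that shape:

	err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA,
			     sizeof(*p) + dgs, NULL, req->i.size);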
 
-/* don't pass the socket. we may only look at it
- * when we hold the appropriate socket mutex.
- */
-int conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
-                 enum drbd_packet cmd, struct p_header *h, size_t size)
+static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
+                              enum drbd_packet cmd, unsigned int header_size,
+                              void *data, unsigned int size)
 {
-       int err = -EIO;
+       return __send_command(tconn, 0, sock, cmd, header_size, data, size);
+}
 
-       mutex_lock(&sock->mutex);
-       if (sock->socket)
-               err = _conn_send_cmd(tconn, vnr, sock->socket, cmd, h, size, 0);
+int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
+                     enum drbd_packet cmd, unsigned int header_size,
+                     void *data, unsigned int size)
+{
+       int err;
+
+       err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
        mutex_unlock(&sock->mutex);
        return err;
 }
 
-int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
-                  size_t size)
+int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
+                     enum drbd_packet cmd, unsigned int header_size,
+                     void *data, unsigned int size)
 {
-       struct p_header80 h;
        int err;
 
-       prepare_header80(&h, cmd, size);
-       err = drbd_get_data_sock(tconn);
-       if (!err) {
-               err = drbd_send_all(tconn, tconn->data.socket, &h, sizeof(h), 0);
-               if (!err)
-                       err = drbd_send_all(tconn, tconn->data.socket, data, size, 0);
-               drbd_put_data_sock(tconn);
-       }
+       err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
+                            data, size);
+       mutex_unlock(&sock->mutex);
        return err;
 }
 
+int drbd_send_ping(struct drbd_tconn *tconn)
+{
+       struct drbd_socket *sock;
+
+       sock = &tconn->meta;
+       if (!conn_prepare_command(tconn, sock))
+               return -EIO;
+       return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
+}
+
+int drbd_send_ping_ack(struct drbd_tconn *tconn)
+{
+       struct drbd_socket *sock;
+
+       sock = &tconn->meta;
+       if (!conn_prepare_command(tconn, sock))
+               return -EIO;
+       return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
+}
+
 int drbd_send_sync_param(struct drbd_conf *mdev)
 {
+       struct drbd_socket *sock;
        struct p_rs_param_95 *p;
-       struct socket *sock;
-       int size, err;
+       int size;
        const int apv = mdev->tconn->agreed_pro_version;
+       enum drbd_packet cmd;
+       struct net_conf *nc;
+       struct disk_conf *dc;
+
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+
+       rcu_read_lock();
+       nc = rcu_dereference(mdev->tconn->net_conf);
 
        size = apv <= 87 ? sizeof(struct p_rs_param)
                : apv == 88 ? sizeof(struct p_rs_param)
-                       + strlen(mdev->tconn->net_conf->verify_alg) + 1
+                       + strlen(nc->verify_alg) + 1
                : apv <= 94 ? sizeof(struct p_rs_param_89)
                : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
-       /* used from admin command context and receiver/worker context.
-        * to avoid kmalloc, grab the socket right here,
-        * then use the pre-allocated sbuf there */
-       mutex_lock(&mdev->tconn->data.mutex);
-       sock = mdev->tconn->data.socket;
-
-       if (likely(sock != NULL)) {
-               enum drbd_packet cmd =
-                       apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
-
-               p = &mdev->tconn->data.sbuf.rs_param_95;
+       cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
 
-               /* initialize verify_alg and csums_alg */
-               memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
-
-               if (get_ldev(mdev)) {
-                       p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
-                       p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
-                       p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
-                       p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
-                       p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
-                       put_ldev(mdev);
-               } else {
-                       p->rate = cpu_to_be32(DRBD_RATE_DEF);
-                       p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
-                       p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
-                       p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
-                       p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
-               }
+       /* initialize verify_alg and csums_alg */
+       memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
-               if (apv >= 88)
-                       strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
-               if (apv >= 89)
-                       strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
-
-               err = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
-       } else
-               err = -EIO;
+       if (get_ldev(mdev)) {
+               dc = rcu_dereference(mdev->ldev->disk_conf);
+               p->resync_rate = cpu_to_be32(dc->resync_rate);
+               p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
+               p->c_delay_target = cpu_to_be32(dc->c_delay_target);
+               p->c_fill_target = cpu_to_be32(dc->c_fill_target);
+               p->c_max_rate = cpu_to_be32(dc->c_max_rate);
+               put_ldev(mdev);
+       } else {
+               p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
+               p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
+               p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
+               p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
+               p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
+       }
 
-       mutex_unlock(&mdev->tconn->data.mutex);
+       if (apv >= 88)
+               strcpy(p->verify_alg, nc->verify_alg);
+       if (apv >= 89)
+               strcpy(p->csums_alg, nc->csums_alg);
+       rcu_read_unlock();
 
-       return err;
+       return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
 }
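A pattern that recurs throughout this diff: net_conf and disk_conf are now RCU-protected, so readers bracket rcu_dereference() with rcu_read_lock()/rcu_read_unlock() and copy out whatever they need before unlocking. A minimal sketch, with the field choice merely illustrative:

	struct net_conf *nc;
	int timeout;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	timeout = nc->timeout;	/* copy while the reference is still valid */
	rcu_read_unlock();
	/* nc must not be dereferenced past this point */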
 
-int drbd_send_protocol(struct drbd_tconn *tconn)
+int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
 {
+       struct drbd_socket *sock;
        struct p_protocol *p;
-       int size, cf, err;
+       struct net_conf *nc;
+       int size, cf;
 
-       size = sizeof(struct p_protocol);
+       sock = &tconn->data;
+       p = __conn_prepare_command(tconn, sock);
+       if (!p)
+               return -EIO;
 
-       if (tconn->agreed_pro_version >= 87)
-               size += strlen(tconn->net_conf->integrity_alg) + 1;
+       rcu_read_lock();
+       nc = rcu_dereference(tconn->net_conf);
 
-       /* we must not recurse into our own queue,
-        * as that is blocked during handshake */
-       p = kmalloc(size, GFP_NOIO);
-       if (p == NULL)
-               return -ENOMEM;
+       if (nc->tentative && tconn->agreed_pro_version < 92) {
+               rcu_read_unlock();
+               mutex_unlock(&sock->mutex);
+               conn_err(tconn, "--dry-run is not supported by peer");
+               return -EOPNOTSUPP;
+       }
 
-       p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
-       p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
-       p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
-       p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
-       p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
+       size = sizeof(*p);
+       if (tconn->agreed_pro_version >= 87)
+               size += strlen(nc->integrity_alg) + 1;
 
+       p->protocol      = cpu_to_be32(nc->wire_protocol);
+       p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
+       p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
+       p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
+       p->two_primaries = cpu_to_be32(nc->two_primaries);
        cf = 0;
-       if (tconn->net_conf->want_lose)
-               cf |= CF_WANT_LOSE;
-       if (tconn->net_conf->dry_run) {
-               if (tconn->agreed_pro_version >= 92)
-                       cf |= CF_DRY_RUN;
-               else {
-                       conn_err(tconn, "--dry-run is not supported by peer");
-                       kfree(p);
-                       return -EOPNOTSUPP;
-               }
-       }
+       if (nc->discard_my_data)
+               cf |= CF_DISCARD_MY_DATA;
+       if (nc->tentative)
+               cf |= CF_DRY_RUN;
        p->conn_flags    = cpu_to_be32(cf);
 
        if (tconn->agreed_pro_version >= 87)
-               strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
+               strcpy(p->integrity_alg, nc->integrity_alg);
+       rcu_read_unlock();
+
+       return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
+}
+
+int drbd_send_protocol(struct drbd_tconn *tconn)
+{
+       int err;
+
+       mutex_lock(&tconn->data.mutex);
+       err = __drbd_send_protocol(tconn, P_PROTOCOL);
+       mutex_unlock(&tconn->data.mutex);
 
-       err = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
-       kfree(p);
        return err;
 }
 
 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
 {
-       struct p_uuids p;
+       struct drbd_socket *sock;
+       struct p_uuids *p;
        int i;
 
        if (!get_ldev_if_state(mdev, D_NEGOTIATING))
                return 0;
 
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p) {
+               put_ldev(mdev);
+               return -EIO;
+       }
        for (i = UI_CURRENT; i < UI_SIZE; i++)
-               p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
+               p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
 
        mdev->comm_bm_set = drbd_bm_total_weight(mdev);
-       p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
-       uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
+       p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
+       rcu_read_lock();
+       uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
+       rcu_read_unlock();
        uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
        uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
-       p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
+       p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
 
        put_ldev(mdev);
-
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_UUIDS, &p.head, sizeof(p));
+       return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
 }
 
 int drbd_send_uuids(struct drbd_conf *mdev)
@@ -919,30 +884,42 @@ void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
 
 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
 {
-       struct p_rs_uuid p;
+       struct drbd_socket *sock;
+       struct p_rs_uuid *p;
        u64 uuid;
 
        D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
 
-       uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
+       uuid = mdev->ldev->md.uuid[UI_BITMAP];
+       if (uuid && uuid != UUID_JUST_CREATED)
+               uuid = uuid + UUID_NEW_BM_OFFSET;
+       else
+               get_random_bytes(&uuid, sizeof(u64));
        drbd_uuid_set(mdev, UI_BITMAP, uuid);
        drbd_print_uuids(mdev, "updated sync UUID");
        drbd_md_sync(mdev);
-       p.uuid = cpu_to_be64(uuid);
 
-       drbd_send_cmd(mdev, &mdev->tconn->data, P_SYNC_UUID, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (p) {
+               p->uuid = cpu_to_be64(uuid);
+               drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
+       }
 }
 
 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
 {
-       struct p_sizes p;
+       struct drbd_socket *sock;
+       struct p_sizes *p;
        sector_t d_size, u_size;
        int q_order_type, max_bio_size;
 
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
                D_ASSERT(mdev->ldev->backing_bdev);
                d_size = drbd_get_max_capacity(mdev->ldev);
-               u_size = mdev->ldev->dc.disk_size;
+               rcu_read_lock();
+               u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
+               rcu_read_unlock();
                q_order_type = drbd_queue_order_type(mdev);
                max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
                max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
@@ -954,72 +931,143 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
                max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
        }
 
-       p.d_size = cpu_to_be64(d_size);
-       p.u_size = cpu_to_be64(u_size);
-       p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
-       p.max_bio_size = cpu_to_be32(max_bio_size);
-       p.queue_order_type = cpu_to_be16(q_order_type);
-       p.dds_flags = cpu_to_be16(flags);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+
+       if (mdev->tconn->agreed_pro_version <= 94)
+               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
+       else if (mdev->tconn->agreed_pro_version < 100)
+               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_SIZES, &p.head, sizeof(p));
+       p->d_size = cpu_to_be64(d_size);
+       p->u_size = cpu_to_be64(u_size);
+       p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
+       p->max_bio_size = cpu_to_be32(max_bio_size);
+       p->queue_order_type = cpu_to_be16(q_order_type);
+       p->dds_flags = cpu_to_be16(flags);
+       return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
 }
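The version checks added above clamp what each header generation can describe on the wire; summarizing the code:

	apv <= 94     max_bio_size capped at DRBD_MAX_SIZE_H80_PACKET
	apv 95..99    capped at DRBD_MAX_BIO_SIZE_P95
	apv >= 100    only the general DRBD_MAX_BIO_SIZE limit applies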
 
 /**
- * drbd_send_state() - Sends the drbd state to the peer
+ * drbd_send_current_state() - Sends the drbd state to the peer
  * @mdev:      DRBD device.
  */
-int drbd_send_state(struct drbd_conf *mdev)
+int drbd_send_current_state(struct drbd_conf *mdev)
 {
-       struct socket *sock;
-       struct p_state p;
-       int err = -EIO;
+       struct drbd_socket *sock;
+       struct p_state *p;
 
-       mutex_lock(&mdev->tconn->data.mutex);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
+       return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
+}
 
-       p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
-       sock = mdev->tconn->data.socket;
+/**
+ * drbd_send_state() - After a state change, sends the new state to the peer
+ * @mdev:      DRBD device.
+ * @state:     the state to send, not necessarily the current state.
+ *
+ * Each state change queues an "after_state_ch" work, which will eventually
+ * send the resulting new state to the peer. If more state changes happen
+ * between queuing and processing of the after_state_ch work, we still
+ * want to send each intermediary state in the order it occurred.
+ */
+int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
+{
+       struct drbd_socket *sock;
+       struct p_state *p;
 
-       if (likely(sock != NULL))
-               err = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->state = cpu_to_be32(state.i); /* Within the send mutex */
+       return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
+}
 
-       mutex_unlock(&mdev->tconn->data.mutex);
+int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
+{
+       struct drbd_socket *sock;
+       struct p_req_state *p;
 
-       return err;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->mask = cpu_to_be32(mask.i);
+       p->val = cpu_to_be32(val.i);
+       return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
 }
 
-int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
-                        union drbd_state mask, union drbd_state val)
+int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
 {
-       struct p_req_state p;
-
-       p.mask    = cpu_to_be32(mask.i);
-       p.val     = cpu_to_be32(val.i);
+       enum drbd_packet cmd;
+       struct drbd_socket *sock;
+       struct p_req_state *p;
 
-       return conn_send_cmd(tconn, vnr, &tconn->data, cmd, &p.head, sizeof(p));
+       cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
+       sock = &tconn->data;
+       p = conn_prepare_command(tconn, sock);
+       if (!p)
+               return -EIO;
+       p->mask = cpu_to_be32(mask.i);
+       p->val = cpu_to_be32(val.i);
+       return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
 {
-       struct p_req_state_reply p;
-
-       p.retcode    = cpu_to_be32(retcode);
+       struct drbd_socket *sock;
+       struct p_req_state_reply *p;
 
-       drbd_send_cmd(mdev, &mdev->tconn->meta, P_STATE_CHG_REPLY, &p.head, sizeof(p));
+       sock = &mdev->tconn->meta;
+       p = drbd_prepare_command(mdev, sock);
+       if (p) {
+               p->retcode = cpu_to_be32(retcode);
+               drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
+       }
 }
 
-int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
+void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
 {
-       struct p_req_state_reply p;
+       struct drbd_socket *sock;
+       struct p_req_state_reply *p;
        enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
 
-       p.retcode    = cpu_to_be32(retcode);
+       sock = &tconn->meta;
+       p = conn_prepare_command(tconn, sock);
+       if (p) {
+               p->retcode = cpu_to_be32(retcode);
+               conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
+       }
+}
+
+static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
+{
+       BUG_ON(code & ~0xf);
+       p->encoding = (p->encoding & ~0xf) | code;
+}
 
-       return !conn_send_cmd(tconn, 0, &tconn->meta, cmd, &p.head, sizeof(p));
+static void dcbp_set_start(struct p_compressed_bm *p, int set)
+{
+       p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
+}
+
+static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
+{
+       BUG_ON(n & ~0x7);
+       p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
 }
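Taken together, the three dcbp_set_*() helpers pack a single encoding byte; its layout, as implied by the masks above:

	/* p_compressed_bm->encoding:
	 *   bit  7     value of the first bit of the covered bitmap range
	 *   bits 6..4  number of pad bits in the last code byte
	 *   bits 3..0  enum drbd_bitmap_code, e.g. RLE_VLI_Bits
	 *
	 * dcbp_set_pad_bits() clears bits 6..0, so it must run before
	 * dcbp_set_code(); fill_bitmap_rle_bits() and its caller keep
	 * that order. */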
 
 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
-       struct p_compressed_bm *p,
-       struct bm_xfer_ctx *c)
+                        struct p_compressed_bm *p,
+                        unsigned int size,
+                        struct bm_xfer_ctx *c)
 {
        struct bitstream bs;
        unsigned long plain_bits;
@@ -1027,19 +1075,21 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
        unsigned long rl;
        unsigned len;
        unsigned toggle;
-       int bits;
+       int bits, use_rle;
 
        /* may we use this feature? */
-       if ((mdev->tconn->net_conf->use_rle == 0) ||
-               (mdev->tconn->agreed_pro_version < 90))
-                       return 0;
+       rcu_read_lock();
+       use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
+       rcu_read_unlock();
+       if (!use_rle || mdev->tconn->agreed_pro_version < 90)
+               return 0;
 
        if (c->bit_offset >= c->bm_bits)
                return 0; /* nothing to do. */
 
        /* use at most thus many bytes */
-       bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
-       memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
+       bitstream_init(&bs, p->code, size, 0);
+       memset(p->code, 0, size);
        /* plain bits covered in this code string */
        plain_bits = 0;
 
@@ -1061,12 +1111,12 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
                        if (rl == 0) {
                                /* the first checked bit was set,
                                 * store start value, */
-                               DCBP_set_start(p, 1);
+                               dcbp_set_start(p, 1);
                                /* but skip encoding of zero run length */
                                toggle = !toggle;
                                continue;
                        }
-                       DCBP_set_start(p, 0);
+                       dcbp_set_start(p, 0);
                }
 
                /* paranoia: catch zero runlength.
@@ -1106,7 +1156,7 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
        bm_xfer_ctx_bit_to_word_offset(c);
 
        /* store pad_bits */
-       DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
+       dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
 
        return len;
 }
@@ -1118,48 +1168,52 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
  * code upon failure.
  */
 static int
-send_bitmap_rle_or_plain(struct drbd_conf *mdev,
-                        struct p_header *h, struct bm_xfer_ctx *c)
+send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
 {
-       struct p_compressed_bm *p = (void*)h;
-       unsigned long num_words;
-       int len;
-       int ok;
-
-       len = fill_bitmap_rle_bits(mdev, p, c);
+       struct drbd_socket *sock = &mdev->tconn->data;
+       unsigned int header_size = drbd_header_size(mdev->tconn);
+       struct p_compressed_bm *p = sock->sbuf + header_size;
+       int len, err;
 
+       len = fill_bitmap_rle_bits(mdev, p,
+                       DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
        if (len < 0)
                return -EIO;
 
        if (len) {
-               DCBP_set_code(p, RLE_VLI_Bits);
-               ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
-                                    sizeof(*p) + len, 0);
-
+               dcbp_set_code(p, RLE_VLI_Bits);
+               err = __send_command(mdev->tconn, mdev->vnr, sock,
+                                    P_COMPRESSED_BITMAP, sizeof(*p) + len,
+                                    NULL, 0);
                c->packets[0]++;
-               c->bytes[0] += sizeof(*p) + len;
+               c->bytes[0] += header_size + sizeof(*p) + len;
 
                if (c->bit_offset >= c->bm_bits)
                        len = 0; /* DONE */
        } else {
                /* was not compressible.
                 * send a buffer full of plain text bits instead. */
-               num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
-               len = num_words * sizeof(long);
+               unsigned int data_size;
+               unsigned long num_words;
+               unsigned long *p = sock->sbuf + header_size;
+
+               data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
+               num_words = min_t(size_t, data_size / sizeof(*p),
+                                 c->bm_words - c->word_offset);
+               len = num_words * sizeof(*p);
                if (len)
-                       drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
-               ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
-                                    h, sizeof(struct p_header80) + len, 0);
+                       drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
+               err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
                c->word_offset += num_words;
                c->bit_offset = c->word_offset * BITS_PER_LONG;
 
                c->packets[1]++;
-               c->bytes[1] += sizeof(struct p_header80) + len;
+               c->bytes[1] += header_size + len;
 
                if (c->bit_offset > c->bm_bits)
                        c->bit_offset = c->bm_bits;
        }
-       if (ok) {
+       if (!err) {
                if (len == 0) {
                        INFO_bm_xfer_stats(mdev, "send", c);
                        return 0;
@@ -1170,23 +1224,14 @@ send_bitmap_rle_or_plain(struct drbd_conf *mdev,
 }
 
 /* See the comment at receive_bitmap() */
-int _drbd_send_bitmap(struct drbd_conf *mdev)
+static int _drbd_send_bitmap(struct drbd_conf *mdev)
 {
        struct bm_xfer_ctx c;
-       struct p_header *p;
        int err;
 
        if (!expect(mdev->bitmap))
                return false;
 
-       /* maybe we should use some per thread scratch page,
-        * and allocate that during initial device creation? */
-       p = (struct p_header *) __get_free_page(GFP_NOIO);
-       if (!p) {
-               dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
-               return false;
-       }
-
        if (get_ldev(mdev)) {
                if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
                        dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
@@ -1210,32 +1255,39 @@ int _drbd_send_bitmap(struct drbd_conf *mdev)
        };
 
        do {
-               err = send_bitmap_rle_or_plain(mdev, p, &c);
+               err = send_bitmap_rle_or_plain(mdev, &c);
        } while (err > 0);
 
-       free_page((unsigned long) p);
        return err == 0;
 }
 
 int drbd_send_bitmap(struct drbd_conf *mdev)
 {
-       int err;
+       struct drbd_socket *sock = &mdev->tconn->data;
+       int err = -1;
 
-       if (drbd_get_data_sock(mdev->tconn))
-               return -1;
-       err = !_drbd_send_bitmap(mdev);
-       drbd_put_data_sock(mdev->tconn);
+       mutex_lock(&sock->mutex);
+       if (sock->socket)
+               err = !_drbd_send_bitmap(mdev);
+       mutex_unlock(&sock->mutex);
        return err;
 }
-void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
+
+void drbd_send_b_ack(struct drbd_tconn *tconn, u32 barrier_nr, u32 set_size)
 {
-       struct p_barrier_ack p;
+       struct drbd_socket *sock;
+       struct p_barrier_ack *p;
 
-       p.barrier  = barrier_nr;
-       p.set_size = cpu_to_be32(set_size);
+       if (tconn->cstate < C_WF_REPORT_PARAMS)
+               return;
 
-       if (mdev->state.conn >= C_CONNECTED)
-               drbd_send_cmd(mdev, &mdev->tconn->meta, P_BARRIER_ACK, &p.head, sizeof(p));
+       sock = &tconn->meta;
+       p = conn_prepare_command(tconn, sock);
+       if (!p)
+               return;
+       p->barrier = barrier_nr;
+       p->set_size = cpu_to_be32(set_size);
+       conn_send_command(tconn, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
 }
 
 /**
@@ -1249,16 +1301,21 @@ void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
                          u64 sector, u32 blksize, u64 block_id)
 {
-       struct p_block_ack p;
+       struct drbd_socket *sock;
+       struct p_block_ack *p;
 
-       p.sector   = sector;
-       p.block_id = block_id;
-       p.blksize  = blksize;
-       p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
+       if (mdev->state.conn < C_CONNECTED)
+               return -EIO;
 
-       if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
+       sock = &mdev->tconn->meta;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
                return -EIO;
-       return drbd_send_cmd(mdev, &mdev->tconn->meta, cmd, &p.head, sizeof(p));
+       p->sector = sector;
+       p->block_id = block_id;
+       p->blksize = blksize;
+       p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 /* dp->sector and dp->block_id already/still in network byte order,
@@ -1267,8 +1324,8 @@ static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
                      struct p_data *dp, int data_size)
 {
-       data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
+       if (mdev->tconn->peer_integrity_tfm)
+               data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
        _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
                       dp->block_id);
 }
@@ -1308,43 +1365,51 @@ int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
                       sector_t sector, int size, u64 block_id)
 {
-       struct p_block_req p;
-
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = block_id;
-       p.blksize  = cpu_to_be32(size);
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, cmd, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = block_id;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
                            void *digest, int digest_size, enum drbd_packet cmd)
 {
-       int err;
-       struct p_block_req p;
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = ID_SYNCER /* unused */;
-       p.blksize  = cpu_to_be32(size);
+       /* FIXME: Put the digest into the preallocated socket buffer.  */
 
-       mutex_lock(&mdev->tconn->data.mutex);
-       err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0);
-       if (!err)
-               err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0);
-       mutex_unlock(&mdev->tconn->data.mutex);
-       return err;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = ID_SYNCER /* unused */;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p),
+                                digest, digest_size);
 }
 
 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
 {
-       struct p_block_req p;
-
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = ID_SYNCER /* unused */;
-       p.blksize  = cpu_to_be32(size);
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_OV_REQUEST, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = ID_SYNCER /* unused */;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
 }
 
 /* called on sndtimeo
@@ -1472,7 +1537,7 @@ static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
        struct bio_vec *bvec;
        int i;
        /* hint all but last page with MSG_MORE */
-       __bio_for_each_segment(bvec, bio, i, 0) {
+       bio_for_each_segment(bvec, bio, i) {
                int err;
 
                err = _drbd_no_send_page(mdev, bvec->bv_page,
@@ -1489,7 +1554,7 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
        struct bio_vec *bvec;
        int i;
        /* hint all but last page with MSG_MORE */
-       __bio_for_each_segment(bvec, bio, i, 0) {
+       bio_for_each_segment(bvec, bio, i) {
                int err;
 
                err = _drbd_send_page(mdev, bvec->bv_page,
@@ -1537,39 +1602,36 @@ static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
  */
 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
 {
-       int ok = 1;
-       struct p_data p;
+       struct drbd_socket *sock;
+       struct p_data *p;
        unsigned int dp_flags = 0;
-       void *dgb;
        int dgs;
+       int err;
 
-       if (drbd_get_data_sock(mdev->tconn))
-               return 0;
-
-       dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
-
-       prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
-       p.sector   = cpu_to_be64(req->i.sector);
-       p.block_id = (unsigned long)req;
-       p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
 
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(req->i.sector);
+       p->block_id = (unsigned long)req;
+       p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
        dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
-
        if (mdev->state.conn >= C_SYNC_SOURCE &&
            mdev->state.conn <= C_PAUSED_SYNC_T)
                dp_flags |= DP_MAY_SET_IN_SYNC;
-
-       p.dp_flags = cpu_to_be32(dp_flags);
-       set_bit(UNPLUG_REMOTE, &mdev->flags);
-       ok = (sizeof(p) ==
-               drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
-       if (ok && dgs) {
-               dgb = mdev->tconn->int_dig_out;
-               drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
-               ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
-       }
-       if (ok) {
+       if (mdev->tconn->agreed_pro_version >= 100) {
+               if (req->rq_state & RQ_EXP_RECEIVE_ACK)
+                       dp_flags |= DP_SEND_RECEIVE_ACK;
+               if (req->rq_state & RQ_EXP_WRITE_ACK)
+                       dp_flags |= DP_SEND_WRITE_ACK;
+       }
+       p->dp_flags = cpu_to_be32(dp_flags);
+       if (dgs)
+               drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
+       err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
+       if (!err) {
                /* For protocol A, we have to memcpy the payload into
                 * socket buffers, as we may complete right away
                 * as soon as we handed it over to tcp, at which point the data
@@ -1581,18 +1643,18 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                 * out ok after sending on this side, but does not fit on the
                 * receiving side, we sure have detected corruption elsewhere.
                 */
-               if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
-                       ok = !_drbd_send_bio(mdev, req->master_bio);
+               if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
+                       err = _drbd_send_bio(mdev, req->master_bio);
                else
-                       ok = !_drbd_send_zc_bio(mdev, req->master_bio);
+                       err = _drbd_send_zc_bio(mdev, req->master_bio);
 
                /* double check digest, sometimes buffers have been modified in flight. */
                if (dgs > 0 && dgs <= 64) {
                        /* 64 byte, 512 bit, is the largest digest size
                         * currently supported in kernel crypto. */
                        unsigned char digest[64];
-                       drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
-                       if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
+                       drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
+                       if (memcmp(p + 1, digest, dgs)) {
                                dev_warn(DEV,
                                        "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
                                        (unsigned long long)req->i.sector, req->i.size);
@@ -1601,10 +1663,9 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                     ... Be noisy about digest too large ...
                } */
        }
+       mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
 
-       drbd_put_data_sock(mdev->tconn);
-
-       return ok;
+       return err;
 }
 
 /* answer packet, used to send data back for read requests:
@@ -1614,51 +1675,44 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
                    struct drbd_peer_request *peer_req)
 {
+       struct drbd_socket *sock;
+       struct p_data *p;
        int err;
-       struct p_data p;
-       void *dgb;
        int dgs;
 
-       dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
 
-       prepare_header(mdev, &p.head, cmd, sizeof(p) -
-                                          sizeof(struct p_header80) +
-                                          dgs + peer_req->i.size);
-       p.sector   = cpu_to_be64(peer_req->i.sector);
-       p.block_id = peer_req->block_id;
-       p.seq_num = 0;  /* unused */
+       dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
 
-       /* Only called by our kernel thread.
-        * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
-        * in response to admin command or module unload.
-        */
-       err = drbd_get_data_sock(mdev->tconn);
-       if (err)
-               return err;
-       err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
-                           sizeof(p), dgs ? MSG_MORE : 0);
-       if (!err && dgs) {
-               dgb = mdev->tconn->int_dig_out;
-               drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
-               err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb,
-                                   dgs, 0);
-       }
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(peer_req->i.sector);
+       p->block_id = peer_req->block_id;
+       p->seq_num = 0;  /* unused */
+       p->dp_flags = 0;
+       if (dgs)
+               drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
+       err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
        if (!err)
                err = _drbd_send_zc_ee(mdev, peer_req);
-       drbd_put_data_sock(mdev->tconn);
+       mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
 
        return err;
 }
 
-int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
+int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
 {
-       struct p_block_desc p;
-
-       p.sector  = cpu_to_be64(req->i.sector);
-       p.blksize = cpu_to_be32(req->i.size);
+       struct drbd_socket *sock;
+       struct p_block_desc *p;
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_OUT_OF_SYNC, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(req->i.sector);
+       p->blksize = cpu_to_be32(req->i.size);
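+       /* note: drbd_send_command() also drops sock->mutex for us, so no
+        * explicit unlock is needed here */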
+       return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
 }
 
 /*
@@ -1699,7 +1753,9 @@ int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
        msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
 
        if (sock == tconn->data.socket) {
-               tconn->ko_count = tconn->net_conf->ko_count;
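+               /* net_conf may now be replaced at runtime, hence the RCU access */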
+               rcu_read_lock();
+               tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
+               rcu_read_unlock();
                drbd_update_congested(tconn);
        }
        do {
@@ -1803,15 +1859,12 @@ static void drbd_set_defaults(struct drbd_conf *mdev)
 {
        /* Beware! The actual layout differs
         * between big endian and little endian */
-       mdev->state = (union drbd_state) {
+       mdev->state = (union drbd_dev_state) {
                { .role = R_SECONDARY,
                  .peer = R_UNKNOWN,
                  .conn = C_STANDALONE,
                  .disk = D_DISKLESS,
                  .pdsk = D_UNKNOWN,
-                 .susp = 0,
-                 .susp_nod = 0,
-                 .susp_fen = 0
                } };
 }
 
@@ -1827,19 +1880,17 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        atomic_set(&mdev->rs_pending_cnt, 0);
        atomic_set(&mdev->unacked_cnt, 0);
        atomic_set(&mdev->local_cnt, 0);
-       atomic_set(&mdev->pp_in_use, 0);
        atomic_set(&mdev->pp_in_use_by_net, 0);
        atomic_set(&mdev->rs_sect_in, 0);
        atomic_set(&mdev->rs_sect_ev, 0);
        atomic_set(&mdev->ap_in_flight, 0);
+       atomic_set(&mdev->md_io_in_use, 0);
 
-       mutex_init(&mdev->md_io_mutex);
        mutex_init(&mdev->own_state_mutex);
        mdev->state_mutex = &mdev->own_state_mutex;
 
        spin_lock_init(&mdev->al_lock);
        spin_lock_init(&mdev->peer_seq_lock);
-       spin_lock_init(&mdev->epoch_lock);
 
        INIT_LIST_HEAD(&mdev->active_ee);
        INIT_LIST_HEAD(&mdev->sync_ee);
@@ -1887,8 +1938,6 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        init_waitqueue_head(&mdev->al_wait);
        init_waitqueue_head(&mdev->seq_wait);
 
-       /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
-       mdev->write_ordering = WO_bdev_flush;
        mdev->resync_wenr = LC_FREE;
        mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
        mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
@@ -1901,9 +1950,6 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
                dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
                                mdev->tconn->receiver.t_state);
 
-       /* no need to lock it, I'm the only thread alive */
-       if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
-               dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
        mdev->al_writ_cnt  =
        mdev->bm_writ_cnt  =
        mdev->read_cnt     =
@@ -1929,21 +1975,18 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
                drbd_bm_cleanup(mdev);
        }
 
-       drbd_free_resources(mdev);
+       drbd_free_bc(mdev->ldev);
+       mdev->ldev = NULL;
+
        clear_bit(AL_SUSPENDED, &mdev->flags);
 
-       /*
-        * currently we drbd_init_ee only on module load, so
-        * we may do drbd_release_ee only on module unload!
-        */
        D_ASSERT(list_empty(&mdev->active_ee));
        D_ASSERT(list_empty(&mdev->sync_ee));
        D_ASSERT(list_empty(&mdev->done_ee));
        D_ASSERT(list_empty(&mdev->read_ee));
        D_ASSERT(list_empty(&mdev->net_ee));
        D_ASSERT(list_empty(&mdev->resync_reads));
-       D_ASSERT(list_empty(&mdev->tconn->data.work.q));
-       D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
+       D_ASSERT(list_empty(&mdev->tconn->sender_work.q));
        D_ASSERT(list_empty(&mdev->resync_work.list));
        D_ASSERT(list_empty(&mdev->unplug_work.list));
        D_ASSERT(list_empty(&mdev->go_diskless.list));
@@ -2083,59 +2126,53 @@ static struct notifier_block drbd_notifier = {
        .notifier_call = drbd_notify_sys,
 };
 
-static void drbd_release_ee_lists(struct drbd_conf *mdev)
+static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
 {
        int rr;
 
-       rr = drbd_release_ee(mdev, &mdev->active_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
        if (rr)
                dev_err(DEV, "%d EEs in active list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->sync_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
        if (rr)
                dev_err(DEV, "%d EEs in sync list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->read_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
        if (rr)
                dev_err(DEV, "%d EEs in read list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->done_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
        if (rr)
                dev_err(DEV, "%d EEs in done list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->net_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
        if (rr)
                dev_err(DEV, "%d EEs in net list found!\n", rr);
 }
 
 /* caution. no locking. */
-void drbd_delete_device(unsigned int minor)
+void drbd_minor_destroy(struct kref *kref)
 {
-       struct drbd_conf *mdev = minor_to_mdev(minor);
-
-       if (!mdev)
-               return;
+       struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
+       struct drbd_tconn *tconn = mdev->tconn;
 
-       idr_remove(&mdev->tconn->volumes, mdev->vnr);
-       idr_remove(&minors, minor);
-       synchronize_rcu();
+       del_timer_sync(&mdev->request_timer);
 
        /* paranoia asserts */
        D_ASSERT(mdev->open_cnt == 0);
-       D_ASSERT(list_empty(&mdev->tconn->data.work.q));
        /* end paranoia asserts */
 
-       del_gendisk(mdev->vdisk);
-
        /* cleanup stuff that may have been allocated during
         * device (re-)configuration or state changes */
 
        if (mdev->this_bdev)
                bdput(mdev->this_bdev);
 
-       drbd_free_resources(mdev);
+       drbd_free_bc(mdev->ldev);
+       mdev->ldev = NULL;
 
-       drbd_release_ee_lists(mdev);
+       drbd_release_all_peer_reqs(mdev);
 
        lc_destroy(mdev->act_log);
        lc_destroy(mdev->resync);
@@ -2143,16 +2180,101 @@ void drbd_delete_device(unsigned int minor)
        kfree(mdev->p_uuid);
        /* mdev->p_uuid = NULL; */
 
-       /* cleanup the rest that has been
-        * allocated from drbd_new_device
-        * and actually free the mdev itself */
-       drbd_free_mdev(mdev);
+       if (mdev->bitmap) /* should no longer be there. */
+               drbd_bm_cleanup(mdev);
+       __free_page(mdev->md_io_page);
+       put_disk(mdev->vdisk);
+       blk_cleanup_queue(mdev->rq_queue);
+       kfree(mdev->rs_plan_s);
+       kfree(mdev);
+
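+       /* finally drop the reference on our connection, taken in
+        * conn_new_minor() */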
+       kref_put(&tconn->kref, &conn_destroy);
+}
+
+/* One global retry thread, if we need to push back some bio and have it
+ * reinserted through our make request function.
+ */
+static struct retry_worker {
+       struct workqueue_struct *wq;
+       struct work_struct worker;
+
+       spinlock_t lock;
+       struct list_head writes;
+} retry;
+
+static void do_retry(struct work_struct *ws)
+{
+       struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
+       LIST_HEAD(writes);
+       struct drbd_request *req, *tmp;
+
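+       /* splice everything over to a private list head, so that new requests
+        * can be queued concurrently while we work through this batch */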
+       spin_lock_irq(&retry->lock);
+       list_splice_init(&retry->writes, &writes);
+       spin_unlock_irq(&retry->lock);
+
+       list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+               struct drbd_conf *mdev = req->w.mdev;
+               struct bio *bio = req->master_bio;
+               unsigned long start_time = req->start_time;
+               bool expected;
+
+               expected =
+                       expect(atomic_read(&req->completion_ref) == 0) &&
+                       expect(req->rq_state & RQ_POSTPONED) &&
+                       expect((req->rq_state & RQ_LOCAL_PENDING) == 0 ||
+                               (req->rq_state & RQ_LOCAL_ABORTED) != 0);
+
+               if (!expected)
+                       dev_err(DEV, "req=%p completion_ref=%d rq_state=%x\n",
+                               req, atomic_read(&req->completion_ref),
+                               req->rq_state);
+
+               /* We still need to put one kref associated with the
+                * "completion_ref" going zero in the code path that queued it
+                * here.  The request object may still be referenced by a
+                * frozen local req->private_bio, in case we force-detached.
+                */
+               kref_put(&req->kref, drbd_req_destroy);
+
+               /* A single suspended or otherwise blocking device may stall
+                * all others as well.  Fortunately, this code path is to
+                * recover from a situation that "should not happen":
+                * concurrent writes in multi-primary setup.
+                * In a "normal" lifecycle, this workqueue is supposed to be
+                * destroyed without ever doing anything.
+                * If it turns out to be an issue anyway, we can do per
+                * resource (replication group) or per device (minor) retry
+                * workqueues instead.
+                */
+
+               /* We are not just doing generic_make_request(),
+                * as we want to keep the start_time information. */
+               inc_ap_bio(mdev);
+               __drbd_make_request(mdev, bio, start_time);
+       }
+}
+
+void drbd_restart_request(struct drbd_request *req)
+{
+       unsigned long flags;
+       spin_lock_irqsave(&retry.lock, flags);
+       list_move_tail(&req->tl_requests, &retry.writes);
+       spin_unlock_irqrestore(&retry.lock, flags);
+
+       /* Drop the extra reference that would otherwise
+        * have been dropped by complete_master_bio.
+        * do_retry() needs to grab a new one. */
+       dec_ap_bio(req->w.mdev);
+
+       queue_work(retry.wq, &retry.worker);
 }
 
 static void drbd_cleanup(void)
 {
        unsigned int i;
        struct drbd_conf *mdev;
+       struct drbd_tconn *tconn, *tmp;
 
        unregister_reboot_notifier(&drbd_notifier);
 
@@ -2167,10 +2289,26 @@ static void drbd_cleanup(void)
        if (drbd_proc)
                remove_proc_entry("drbd", NULL);
 
+       if (retry.wq)
+               destroy_workqueue(retry.wq);
+
        drbd_genl_unregister();
 
-       idr_for_each_entry(&minors, mdev, i)
-               drbd_delete_device(i);
+       idr_for_each_entry(&minors, mdev, i) {
+               idr_remove(&minors, mdev_to_minor(mdev));
+               idr_remove(&mdev->tconn->volumes, mdev->vnr);
+               del_gendisk(mdev->vdisk);
+               /* synchronize_rcu(); No other threads running at this point */
+               kref_put(&mdev->kref, &drbd_minor_destroy);
+       }
+
+       /* not _rcu: genl is already unregistered, so there is no other updater left */
+       list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
+               list_del(&tconn->all_tconn); /* not _rcu: proc is gone, no other threads left */
+               /* synchronize_rcu(); */
+               kref_put(&tconn->kref, &conn_destroy);
+       }
+
        drbd_destroy_mempools();
        unregister_blkdev(DRBD_MAJOR, "drbd");
 
@@ -2220,30 +2358,131 @@ out:
 
 static void drbd_init_workqueue(struct drbd_work_queue* wq)
 {
-       sema_init(&wq->s, 0);
        spin_lock_init(&wq->q_lock);
        INIT_LIST_HEAD(&wq->q);
+       init_waitqueue_head(&wq->q_wait);
 }
 
-struct drbd_tconn *conn_by_name(const char *name)
+struct drbd_tconn *conn_get_by_name(const char *name)
 {
        struct drbd_tconn *tconn;
 
        if (!name || !name[0])
                return NULL;
 
-       mutex_lock(&drbd_cfg_mutex);
-       list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
-               if (!strcmp(tconn->name, name))
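+       /* find the connection under RCU and take a reference before dropping
+        * the read lock; the caller owns that reference and must kref_put() it */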
+       rcu_read_lock();
+       list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
+               if (!strcmp(tconn->name, name)) {
+                       kref_get(&tconn->kref);
                        goto found;
+               }
        }
        tconn = NULL;
 found:
-       mutex_unlock(&drbd_cfg_mutex);
+       rcu_read_unlock();
        return tconn;
 }
 
-struct drbd_tconn *drbd_new_tconn(const char *name)
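+/* Like conn_get_by_name(), but match on the exact local/peer address pair.
+ * The caller must kref_put() the returned tconn. */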
+struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
+                                    void *peer_addr, int peer_addr_len)
+{
+       struct drbd_tconn *tconn;
+
+       rcu_read_lock();
+       list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
+               if (tconn->my_addr_len == my_addr_len &&
+                   tconn->peer_addr_len == peer_addr_len &&
+                   !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
+                   !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
+                       kref_get(&tconn->kref);
+                       goto found;
+               }
+       }
+       tconn = NULL;
+found:
+       rcu_read_unlock();
+       return tconn;
+}
+
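+/* One page each for the receive and the send buffer.  On failure the caller
+ * cleans up with drbd_free_socket(); free_page() on a still-NULL buffer is a
+ * no-op, so a partial allocation is fine. */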
+static int drbd_alloc_socket(struct drbd_socket *socket)
+{
+       socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
+       if (!socket->rbuf)
+               return -ENOMEM;
+       socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
+       if (!socket->sbuf)
+               return -ENOMEM;
+       return 0;
+}
+
+static void drbd_free_socket(struct drbd_socket *socket)
+{
+       free_page((unsigned long) socket->sbuf);
+       free_page((unsigned long) socket->rbuf);
+}
+
+void conn_free_crypto(struct drbd_tconn *tconn)
+{
+       drbd_free_sock(tconn);
+
+       crypto_free_hash(tconn->csums_tfm);
+       crypto_free_hash(tconn->verify_tfm);
+       crypto_free_hash(tconn->cram_hmac_tfm);
+       crypto_free_hash(tconn->integrity_tfm);
+       crypto_free_hash(tconn->peer_integrity_tfm);
+       kfree(tconn->int_dig_in);
+       kfree(tconn->int_dig_vv);
+
+       tconn->csums_tfm = NULL;
+       tconn->verify_tfm = NULL;
+       tconn->cram_hmac_tfm = NULL;
+       tconn->integrity_tfm = NULL;
+       tconn->peer_integrity_tfm = NULL;
+       tconn->int_dig_in = NULL;
+       tconn->int_dig_vv = NULL;
+}
+
+int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
+{
+       cpumask_var_t new_cpu_mask;
+       int err;
+
+       if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
+               return -ENOMEM;
+               /*
+               retcode = ERR_NOMEM;
+               drbd_msg_put_info("unable to allocate cpumask");
+               */
+
+       /* silently ignore cpu mask on UP kernel */
+       if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
+               /* FIXME: Get rid of constant 32 here */
+               err = bitmap_parse(res_opts->cpu_mask, 32,
+                                  cpumask_bits(new_cpu_mask), nr_cpu_ids);
+               if (err) {
+                       conn_warn(tconn, "bitmap_parse() failed with %d\n", err);
+                       /* retcode = ERR_CPU_MASK_PARSE; */
+                       goto fail;
+               }
+       }
+       tconn->res_opts = *res_opts;
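+       /* if the mask changed, have each DRBD thread migrate itself the next
+        * time it passes through its main loop */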
+       if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
+               cpumask_copy(tconn->cpu_mask, new_cpu_mask);
+               drbd_calc_cpu_mask(tconn);
+               tconn->receiver.reset_cpu_mask = 1;
+               tconn->asender.reset_cpu_mask = 1;
+               tconn->worker.reset_cpu_mask = 1;
+       }
+       err = 0;
+
+fail:
+       free_cpumask_var(new_cpu_mask);
+       return err;
+}
+
+/* caller must be under genl_lock() */
+struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
 {
        struct drbd_tconn *tconn;
 
@@ -2255,58 +2494,77 @@ struct drbd_tconn *drbd_new_tconn(const char *name)
        if (!tconn->name)
                goto fail;
 
+       if (drbd_alloc_socket(&tconn->data))
+               goto fail;
+       if (drbd_alloc_socket(&tconn->meta))
+               goto fail;
+
        if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
                goto fail;
 
-       if (!tl_init(tconn))
+       if (set_resource_options(tconn, res_opts))
+               goto fail;
+
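+       /* epochs and write ordering are tracked per connection now, no longer
+        * per volume */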
+       tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
+       if (!tconn->current_epoch)
                goto fail;
 
+       INIT_LIST_HEAD(&tconn->transfer_log);
+
+       INIT_LIST_HEAD(&tconn->current_epoch->list);
+       tconn->epochs = 1;
+       spin_lock_init(&tconn->epoch_lock);
+       tconn->write_ordering = WO_bdev_flush;
+
+       tconn->send.seen_any_write_yet = false;
+       tconn->send.current_epoch_nr = 0;
+       tconn->send.current_epoch_writes = 0;
+
        tconn->cstate = C_STANDALONE;
        mutex_init(&tconn->cstate_mutex);
        spin_lock_init(&tconn->req_lock);
-       atomic_set(&tconn->net_cnt, 0);
-       init_waitqueue_head(&tconn->net_cnt_wait);
+       mutex_init(&tconn->conf_update);
        init_waitqueue_head(&tconn->ping_wait);
        idr_init(&tconn->volumes);
 
-       drbd_init_workqueue(&tconn->data.work);
+       drbd_init_workqueue(&tconn->sender_work);
        mutex_init(&tconn->data.mutex);
-
-       drbd_init_workqueue(&tconn->meta.work);
        mutex_init(&tconn->meta.mutex);
 
        drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
        drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
        drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
 
-       tconn->res_opts = (struct res_opts) {
-               {}, 0, /* cpu_mask */
-               DRBD_ON_NO_DATA_DEF, /* on_no_data */
-       };
-
-       mutex_lock(&drbd_cfg_mutex);
-       list_add_tail(&tconn->all_tconn, &drbd_tconns);
-       mutex_unlock(&drbd_cfg_mutex);
+       kref_init(&tconn->kref);
+       list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
 
        return tconn;
 
 fail:
-       tl_cleanup(tconn);
+       kfree(tconn->current_epoch);
        free_cpumask_var(tconn->cpu_mask);
+       drbd_free_socket(&tconn->meta);
+       drbd_free_socket(&tconn->data);
        kfree(tconn->name);
        kfree(tconn);
 
        return NULL;
 }
 
-void drbd_free_tconn(struct drbd_tconn *tconn)
+void conn_destroy(struct kref *kref)
 {
-       list_del(&tconn->all_tconn);
+       struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
+
+       if (atomic_read(&tconn->current_epoch->epoch_size) != 0)
+               conn_err(tconn, "epoch_size:%d\n", atomic_read(&tconn->current_epoch->epoch_size));
+       kfree(tconn->current_epoch);
+
        idr_destroy(&tconn->volumes);
 
        free_cpumask_var(tconn->cpu_mask);
+       drbd_free_socket(&tconn->meta);
+       drbd_free_socket(&tconn->data);
        kfree(tconn->name);
-       kfree(tconn->int_dig_out);
        kfree(tconn->int_dig_in);
        kfree(tconn->int_dig_vv);
        kfree(tconn);
@@ -2330,7 +2588,9 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
        if (!mdev)
                return ERR_NOMEM;
 
+       kref_get(&tconn->kref);
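+       /* the volume holds this reference on its connection for its whole
+        * lifetime; it is dropped again in drbd_minor_destroy() */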
        mdev->tconn = tconn;
+
        mdev->minor = minor;
        mdev->vnr = vnr;
 
@@ -2380,13 +2640,6 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
        mdev->read_requests = RB_ROOT;
        mdev->write_requests = RB_ROOT;
 
-       mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
-       if (!mdev->current_epoch)
-               goto out_no_epoch;
-
-       INIT_LIST_HEAD(&mdev->current_epoch->list);
-       mdev->epochs = 1;
-
        if (!idr_pre_get(&minors, GFP_KERNEL))
                goto out_no_minor_idr;
        if (idr_get_new_above(&minors, mdev, minor, &minor_got))
@@ -2407,11 +2660,12 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
                goto out_idr_remove_vol;
        }
        add_disk(disk);
+       kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
 
        /* inherit the connection state */
        mdev->state.conn = tconn->cstate;
        if (mdev->state.conn == C_WF_REPORT_PARAMS)
-               drbd_connected(vnr, mdev, tconn);
+               drbd_connected(mdev);
 
        return NO_ERROR;
 
@@ -2421,8 +2675,6 @@ out_idr_remove_minor:
        idr_remove(&minors, minor_got);
        synchronize_rcu();
 out_no_minor_idr:
-       kfree(mdev->current_epoch);
-out_no_epoch:
        drbd_bm_cleanup(mdev);
 out_no_bitmap:
        __free_page(mdev->md_io_page);
@@ -2432,37 +2684,21 @@ out_no_disk:
        blk_cleanup_queue(q);
 out_no_q:
        kfree(mdev);
+       kref_put(&tconn->kref, &conn_destroy);
        return err;
 }
 
-/* counterpart of drbd_new_device.
- * last part of drbd_delete_device. */
-void drbd_free_mdev(struct drbd_conf *mdev)
-{
-       kfree(mdev->current_epoch);
-       if (mdev->bitmap) /* should no longer be there. */
-               drbd_bm_cleanup(mdev);
-       __free_page(mdev->md_io_page);
-       put_disk(mdev->vdisk);
-       blk_cleanup_queue(mdev->rq_queue);
-       kfree(mdev);
-}
-
-
 int __init drbd_init(void)
 {
        int err;
 
-       BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
-       BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
-
        if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
                printk(KERN_ERR
                       "drbd: invalid minor_count (%d)\n", minor_count);
 #ifdef MODULE
                return -EINVAL;
 #else
-               minor_count = 8;
+               minor_count = DRBD_MINOR_COUNT_DEF;
 #endif
        }
 
@@ -2506,6 +2742,15 @@ int __init drbd_init(void)
        rwlock_init(&global_state_lock);
        INIT_LIST_HEAD(&drbd_tconns);
 
+       retry.wq = create_singlethread_workqueue("drbd-reissue");
+       if (!retry.wq) {
+               printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+               goto fail;
+       }
+       INIT_WORK(&retry.worker, do_retry);
+       spin_lock_init(&retry.lock);
+       INIT_LIST_HEAD(&retry.writes);
+
        printk(KERN_INFO "drbd: initialized. "
               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
@@ -2554,27 +2799,6 @@ void drbd_free_sock(struct drbd_tconn *tconn)
        }
 }
 
-
-void drbd_free_resources(struct drbd_conf *mdev)
-{
-       crypto_free_hash(mdev->tconn->csums_tfm);
-       mdev->tconn->csums_tfm = NULL;
-       crypto_free_hash(mdev->tconn->verify_tfm);
-       mdev->tconn->verify_tfm = NULL;
-       crypto_free_hash(mdev->tconn->cram_hmac_tfm);
-       mdev->tconn->cram_hmac_tfm = NULL;
-       crypto_free_hash(mdev->tconn->integrity_w_tfm);
-       mdev->tconn->integrity_w_tfm = NULL;
-       crypto_free_hash(mdev->tconn->integrity_r_tfm);
-       mdev->tconn->integrity_r_tfm = NULL;
-
-       drbd_free_sock(mdev->tconn);
-
-       __no_warn(local,
-                 drbd_free_bc(mdev->ldev);
-                 mdev->ldev = NULL;);
-}
-
 /* meta data management */
 
 struct meta_data_on_disk {
@@ -2615,15 +2839,17 @@ void drbd_md_sync(struct drbd_conf *mdev)
        if (!get_ldev_if_state(mdev, D_FAILED))
                return;
 
-       mutex_lock(&mdev->md_io_mutex);
-       buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
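+       /* the md_io page is now guarded by the md_io_in_use atomic instead of
+        * a mutex; drbd_md_get_buffer() returns NULL if the buffer cannot be
+        * claimed (e.g. because the disk failed meanwhile) */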
+       buffer = drbd_md_get_buffer(mdev);
+       if (!buffer)
+               goto out;
+
        memset(buffer, 0, 512);
 
        buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
        for (i = UI_CURRENT; i < UI_SIZE; i++)
                buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
        buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
-       buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
+       buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
 
        buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
        buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
@@ -2637,7 +2863,7 @@ void drbd_md_sync(struct drbd_conf *mdev)
        D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
        sector = mdev->ldev->md.md_offset;
 
-       if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
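+       /* drbd_md_sync_page_io() follows the usual kernel convention now:
+        * 0 on success, negative error code on failure */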
+       if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
                /* this was a try anyways ... */
                dev_err(DEV, "meta data update failed!\n");
                drbd_chk_io_error(mdev, 1, true);
@@ -2647,7 +2873,8 @@ void drbd_md_sync(struct drbd_conf *mdev)
         * since we updated it on metadata. */
        mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
 
-       mutex_unlock(&mdev->md_io_mutex);
+       drbd_md_put_buffer(mdev);
+out:
        put_ldev(mdev);
 }
 
@@ -2657,20 +2884,22 @@ void drbd_md_sync(struct drbd_conf *mdev)
  * @bdev:      Device from which the meta data should be read in.
  *
  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
- * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
+ * something goes wrong.
  */
 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
 {
        struct meta_data_on_disk *buffer;
+       u32 magic, flags;
        int i, rv = NO_ERROR;
 
        if (!get_ldev_if_state(mdev, D_ATTACHING))
                return ERR_IO_MD_DISK;
 
-       mutex_lock(&mdev->md_io_mutex);
-       buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
+       buffer = drbd_md_get_buffer(mdev);
+       if (!buffer)
+               goto out;
 
-       if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
+       if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
                /* NOTE: can't do normal error processing here as this is
                   called BEFORE disk is attached */
                dev_err(DEV, "Error while reading metadata.\n");
@@ -2678,8 +2907,20 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
                goto err;
        }
 
-       if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
-               dev_err(DEV, "Error while reading metadata, magic not found.\n");
+       magic = be32_to_cpu(buffer->magic);
+       flags = be32_to_cpu(buffer->flags);
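+       /* drbd_md_sync() stamps the meta data with DRBD_MD_MAGIC_84_UNCLEAN
+        * while we are attached (see above); finding that magic here means
+        * the device was not cleanly detached */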
+       if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
+           (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
+               /* btw: that's Activity Log clean, not "all" clean. */
+               dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
+               rv = ERR_MD_UNCLEAN;
+               goto err;
+       }
+       if (magic != DRBD_MD_MAGIC_08) {
+               if (magic == DRBD_MD_MAGIC_07)
+                       dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
+               else
+                       dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
                rv = ERR_MD_INVALID;
                goto err;
        }
@@ -2713,7 +2954,6 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        for (i = UI_CURRENT; i < UI_SIZE; i++)
                bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
        bdev->md.flags = be32_to_cpu(buffer->flags);
-       bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
        bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
 
        spin_lock_irq(&mdev->tconn->req_lock);
@@ -2725,11 +2965,9 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        }
        spin_unlock_irq(&mdev->tconn->req_lock);
 
-       if (bdev->dc.al_extents < 7)
-               bdev->dc.al_extents = 127;
-
  err:
-       mutex_unlock(&mdev->md_io_mutex);
+       drbd_md_put_buffer(mdev);
+ out:
        put_ldev(mdev);
 
        return rv;
@@ -2909,7 +3147,7 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
        work->why = NULL;
        work->flags = 0;
 
-       return 1;
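+       /* work callbacks return 0 on success now, instead of a boolean */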
+       return 0;
 }
 
 void drbd_ldev_destroy(struct drbd_conf *mdev)
@@ -2935,14 +3173,14 @@ static int w_go_diskless(struct drbd_work *w, int unused)
         * the protected members anymore, though, so once put_ldev reaches zero
         * again, it will be safe to free them. */
        drbd_force_state(mdev, NS(disk, D_DISKLESS));
-       return 1;
+       return 0;
 }
 
 void drbd_go_diskless(struct drbd_conf *mdev)
 {
        D_ASSERT(mdev->state.disk == D_FAILED);
        if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
-               drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
+               drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
 }
 
 /**
@@ -2980,7 +3218,7 @@ void drbd_queue_bitmap_io(struct drbd_conf *mdev,
        set_bit(BITMAP_IO, &mdev->flags);
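+       /* start the bitmap IO only once all application IO has drained;
+        * otherwise the last dec_ap_bio() will queue it for us */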
        if (atomic_read(&mdev->ap_bio_cnt) == 0) {
                if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
-                       drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
+                       drbd_queue_work(&mdev->tconn->sender_work, &mdev->bm_io_work.w);
        }
        spin_unlock_irq(&mdev->tconn->req_lock);
 }
@@ -3038,7 +3276,7 @@ static void md_sync_timer_fn(unsigned long data)
 {
        struct drbd_conf *mdev = (struct drbd_conf *) data;
 
-       drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
+       drbd_queue_work_front(&mdev->tconn->sender_work, &mdev->md_sync_work);
 }
 
 static int w_md_sync(struct drbd_work *w, int unused)
@@ -3051,7 +3289,7 @@ static int w_md_sync(struct drbd_work *w, int unused)
                mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
 #endif
        drbd_md_sync(mdev);
-       return 1;
+       return 0;
 }
 
 const char *cmdname(enum drbd_packet cmd)
@@ -3100,14 +3338,25 @@ const char *cmdname(enum drbd_packet cmd)
                [P_DELAY_PROBE]         = "DelayProbe",
                [P_OUT_OF_SYNC]         = "OutOfSync",
                [P_RETRY_WRITE]         = "RetryWrite",
+               [P_RS_CANCEL]           = "RSCancel",
+               [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
+               [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
+               [P_PROTOCOL_UPDATE]     = "protocol_update",
+
+               /* enum drbd_packet, but not commands - obsoleted flags:
+                *      P_MAY_IGNORE
+                *      P_MAX_OPT_CMD
+                */
        };
 
-       if (cmd == P_HAND_SHAKE_M)
-               return "HandShakeM";
-       if (cmd == P_HAND_SHAKE_S)
-               return "HandShakeS";
-       if (cmd == P_HAND_SHAKE)
-               return "HandShake";
+       /* too big for the array: 0xfffX */
+       if (cmd == P_INITIAL_META)
+               return "InitialMeta";
+       if (cmd == P_INITIAL_DATA)
+               return "InitialData";
+       if (cmd == P_CONNECTION_FEATURES)
+               return "ConnectionFeatures";
        if (cmd >= ARRAY_SIZE(cmdnames))
                return "Unknown";
        return cmdnames[cmd];
@@ -3121,15 +3370,18 @@ const char *cmdname(enum drbd_packet cmd)
  */
 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
 {
-       struct net_conf *net_conf = mdev->tconn->net_conf;
+       struct net_conf *nc;
        DEFINE_WAIT(wait);
        long timeout;
 
-       if (!net_conf)
+       rcu_read_lock();
+       nc = rcu_dereference(mdev->tconn->net_conf);
+       if (!nc) {
+               rcu_read_unlock();
                return -ETIMEDOUT;
-       timeout = MAX_SCHEDULE_TIMEOUT;
-       if (net_conf->ko_count)
-               timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
+       }
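+       /* nc->timeout is in tenths of a second; a ko_count of zero means
+        * wait indefinitely */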
+       timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
+       rcu_read_unlock();
 
        /* Indicate to wake up mdev->misc_wait on progress.  */
        i->waiting = true;