]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blame - drivers/block/drbd/drbd_receiver.c
drbd: Get rid of typedef drbd_work_cb
[mirror_ubuntu-bionic-kernel.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
b411b363
PR
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
b411b363
PR
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
b411b363
PR
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
77351055
PR
51struct packet_info {
52 enum drbd_packet cmd;
53 int size;
54 int vnr;
55};
56
b411b363
PR
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
a4fbda8e
PR
63enum mdev_or_conn {
64 MDEV,
65 CONN,
66};
67
65d11ed6 68static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 69static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 70static int drbd_disconnected(int vnr, void *p, void *data);
b411b363
PR
71
72static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
99920dc5 73static int e_end_block(struct drbd_work *, int);
b411b363 74
b411b363
PR
75
76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
45bb912b
LE
78/*
79 * some helper functions to deal with single linked page lists,
80 * page->private being our "next" pointer.
81 */
82
83/* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
86 */
87static struct page *page_chain_del(struct page **head, int n)
88{
89 struct page *page;
90 struct page *tmp;
91
92 BUG_ON(!n);
93 BUG_ON(!head);
94
95 page = *head;
23ce4227
PR
96
97 if (!page)
98 return NULL;
99
45bb912b
LE
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
108 }
109
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
116}
117
118/* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121static struct page *page_chain_tail(struct page *page, int *len)
122{
123 struct page *tmp;
124 int i = 1;
125 while ((tmp = page_chain_next(page)))
126 ++i, page = tmp;
127 if (len)
128 *len = i;
129 return page;
130}
131
132static int page_chain_free(struct page *page)
133{
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
139 }
140 return i;
141}
142
143static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
145{
146#if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150#endif
151
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
155}
156
157static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
b411b363
PR
158{
159 struct page *page = NULL;
45bb912b
LE
160 struct page *tmp = NULL;
161 int i = 0;
b411b363
PR
162
163 /* Yes, testing drbd_pp_vacant outside the lock is racy.
164 * So what. It saves a spin_lock. */
45bb912b 165 if (drbd_pp_vacant >= number) {
b411b363 166 spin_lock(&drbd_pp_lock);
45bb912b
LE
167 page = page_chain_del(&drbd_pp_pool, number);
168 if (page)
169 drbd_pp_vacant -= number;
b411b363 170 spin_unlock(&drbd_pp_lock);
45bb912b
LE
171 if (page)
172 return page;
b411b363 173 }
45bb912b 174
b411b363
PR
175 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
176 * "criss-cross" setup, that might cause write-out on some other DRBD,
177 * which in turn might block on the other node at this very place. */
45bb912b
LE
178 for (i = 0; i < number; i++) {
179 tmp = alloc_page(GFP_TRY);
180 if (!tmp)
181 break;
182 set_page_private(tmp, (unsigned long)page);
183 page = tmp;
184 }
185
186 if (i == number)
187 return page;
188
189 /* Not enough pages immediately available this time.
190 * No need to jump around here, drbd_pp_alloc will retry this
191 * function "soon". */
192 if (page) {
193 tmp = page_chain_tail(page, NULL);
194 spin_lock(&drbd_pp_lock);
195 page_chain_add(&drbd_pp_pool, page, tmp);
196 drbd_pp_vacant += i;
197 spin_unlock(&drbd_pp_lock);
198 }
199 return NULL;
b411b363
PR
200}
201
b411b363
PR
202static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
203{
db830c46 204 struct drbd_peer_request *peer_req;
b411b363
PR
205 struct list_head *le, *tle;
206
207 /* The EEs are always appended to the end of the list. Since
208 they are sent in order over the wire, they have to finish
209 in order. As soon as we see the first not finished we can
210 stop to examine the list... */
211
212 list_for_each_safe(le, tle, &mdev->net_ee) {
db830c46
AG
213 peer_req = list_entry(le, struct drbd_peer_request, w.list);
214 if (drbd_ee_has_active_page(peer_req))
b411b363
PR
215 break;
216 list_move(le, to_be_freed);
217 }
218}
219
220static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
221{
222 LIST_HEAD(reclaimed);
db830c46 223 struct drbd_peer_request *peer_req, *t;
b411b363 224
87eeee41 225 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 226 reclaim_net_ee(mdev, &reclaimed);
87eeee41 227 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 228
db830c46
AG
229 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
230 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
231}
232
233/**
45bb912b 234 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 235 * @mdev: DRBD device.
45bb912b
LE
236 * @number: number of pages requested
237 * @retry: whether to retry, if not enough pages are available right now
238 *
239 * Tries to allocate number pages, first from our own page pool, then from
240 * the kernel, unless this allocation would exceed the max_buffers setting.
241 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 242 *
45bb912b 243 * Returns a page chain linked via page->private.
b411b363 244 */
45bb912b 245static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
246{
247 struct page *page = NULL;
248 DEFINE_WAIT(wait);
249
45bb912b
LE
250 /* Yes, we may run up to @number over max_buffers. If we
251 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 252 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 253 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 254
45bb912b 255 while (page == NULL) {
b411b363
PR
256 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258 drbd_kick_lo_and_reclaim_net(mdev);
259
89e58e75 260 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 261 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
262 if (page)
263 break;
264 }
265
266 if (!retry)
267 break;
268
269 if (signal_pending(current)) {
270 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271 break;
272 }
273
274 schedule();
275 }
276 finish_wait(&drbd_pp_wait, &wait);
277
45bb912b
LE
278 if (page)
279 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
280 return page;
281}
282
283/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 284 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
285 * Either links the page chain back to the global pool,
286 * or returns all pages to the system. */
435f0740 287static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 288{
435f0740 289 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 290 int i;
435f0740 291
81a5d60e 292 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
45bb912b
LE
293 i = page_chain_free(page);
294 else {
295 struct page *tmp;
296 tmp = page_chain_tail(page, &i);
297 spin_lock(&drbd_pp_lock);
298 page_chain_add(&drbd_pp_pool, page, tmp);
299 drbd_pp_vacant += i;
300 spin_unlock(&drbd_pp_lock);
b411b363 301 }
435f0740 302 i = atomic_sub_return(i, a);
45bb912b 303 if (i < 0)
435f0740
LE
304 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
306 wake_up(&drbd_pp_wait);
307}
308
309/*
310You need to hold the req_lock:
311 _drbd_wait_ee_list_empty()
312
313You must not have the req_lock:
314 drbd_free_ee()
315 drbd_alloc_ee()
316 drbd_init_ee()
317 drbd_release_ee()
318 drbd_ee_fix_bhs()
319 drbd_process_done_ee()
320 drbd_clear_done_ee()
321 drbd_wait_ee_list_empty()
322*/
323
f6ffca9f
AG
324struct drbd_peer_request *
325drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
326 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 327{
db830c46 328 struct drbd_peer_request *peer_req;
b411b363 329 struct page *page;
45bb912b 330 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 331
0cf9d27e 332 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
333 return NULL;
334
db830c46
AG
335 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336 if (!peer_req) {
b411b363
PR
337 if (!(gfp_mask & __GFP_NOWARN))
338 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339 return NULL;
340 }
341
45bb912b
LE
342 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343 if (!page)
344 goto fail;
b411b363 345
db830c46
AG
346 drbd_clear_interval(&peer_req->i);
347 peer_req->i.size = data_size;
348 peer_req->i.sector = sector;
349 peer_req->i.local = false;
350 peer_req->i.waiting = false;
351
352 peer_req->epoch = NULL;
a21e9298 353 peer_req->w.mdev = mdev;
db830c46
AG
354 peer_req->pages = page;
355 atomic_set(&peer_req->pending_bios, 0);
356 peer_req->flags = 0;
9a8e7753
AG
357 /*
358 * The block_id is opaque to the receiver. It is not endianness
359 * converted, and sent back to the sender unchanged.
360 */
db830c46 361 peer_req->block_id = id;
b411b363 362
db830c46 363 return peer_req;
b411b363 364
45bb912b 365 fail:
db830c46 366 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
367 return NULL;
368}
369
db830c46 370void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 371 int is_net)
b411b363 372{
db830c46
AG
373 if (peer_req->flags & EE_HAS_DIGEST)
374 kfree(peer_req->digest);
375 drbd_pp_free(mdev, peer_req->pages, is_net);
376 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377 D_ASSERT(drbd_interval_empty(&peer_req->i));
378 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
379}
380
381int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
382{
383 LIST_HEAD(work_list);
db830c46 384 struct drbd_peer_request *peer_req, *t;
b411b363 385 int count = 0;
435f0740 386 int is_net = list == &mdev->net_ee;
b411b363 387
87eeee41 388 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 389 list_splice_init(list, &work_list);
87eeee41 390 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 391
db830c46
AG
392 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
393 drbd_free_some_ee(mdev, peer_req, is_net);
b411b363
PR
394 count++;
395 }
396 return count;
397}
398
399
32862ec7 400/* See also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
401 * and receive_Barrier.
402 *
403 * Move entries from net_ee to done_ee, if ready.
404 * Grab done_ee, call all callbacks, free the entries.
405 * The callbacks typically send out ACKs.
406 */
407static int drbd_process_done_ee(struct drbd_conf *mdev)
408{
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
db830c46 411 struct drbd_peer_request *peer_req, *t;
e2b3032b 412 int err = 0;
b411b363 413
87eeee41 414 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
415 reclaim_net_ee(mdev, &reclaimed);
416 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 417 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 418
db830c46
AG
419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
421
422 /* possible callbacks here:
7be8da07 423 * e_end_block, and e_end_resync_block, e_send_discard_write.
b411b363
PR
424 * all ignore the last argument.
425 */
db830c46 426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
e2b3032b
AG
427 int err2;
428
b411b363 429 /* list_del not necessary, next/prev members not touched */
e2b3032b
AG
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
431 if (!err)
432 err = err2;
db830c46 433 drbd_free_ee(mdev, peer_req);
b411b363
PR
434 }
435 wake_up(&mdev->ee_wait);
436
e2b3032b 437 return err;
b411b363
PR
438}
439
440void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442 DEFINE_WAIT(wait);
443
444 /* avoids spin_lock/unlock
445 * and calling prepare_to_wait in the fast path */
446 while (!list_empty(head)) {
447 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 448 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 449 io_schedule();
b411b363 450 finish_wait(&mdev->ee_wait, &wait);
87eeee41 451 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
87eeee41 457 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 458 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 459 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
460}
461
462/* see also kernel_accept; which is only present since 2.6.18.
463 * also we want to log which part of it failed, exactly */
7653620d 464static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
465{
466 struct sock *sk = sock->sk;
467 int err = 0;
468
469 *what = "listen";
470 err = sock->ops->listen(sock, 5);
471 if (err < 0)
472 goto out;
473
474 *what = "sock_create_lite";
475 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
476 newsock);
477 if (err < 0)
478 goto out;
479
480 *what = "accept";
481 err = sock->ops->accept(sock, *newsock, 0);
482 if (err < 0) {
483 sock_release(*newsock);
484 *newsock = NULL;
485 goto out;
486 }
487 (*newsock)->ops = sock->ops;
488
489out:
490 return err;
491}
492
dbd9eea0 493static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
494{
495 mm_segment_t oldfs;
496 struct kvec iov = {
497 .iov_base = buf,
498 .iov_len = size,
499 };
500 struct msghdr msg = {
501 .msg_iovlen = 1,
502 .msg_iov = (struct iovec *)&iov,
503 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
504 };
505 int rv;
506
507 oldfs = get_fs();
508 set_fs(KERNEL_DS);
509 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
510 set_fs(oldfs);
511
512 return rv;
513}
514
de0ff338 515static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
516{
517 mm_segment_t oldfs;
518 struct kvec iov = {
519 .iov_base = buf,
520 .iov_len = size,
521 };
522 struct msghdr msg = {
523 .msg_iovlen = 1,
524 .msg_iov = (struct iovec *)&iov,
525 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
526 };
527 int rv;
528
529 oldfs = get_fs();
530 set_fs(KERNEL_DS);
531
532 for (;;) {
de0ff338 533 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
534 if (rv == size)
535 break;
536
537 /* Note:
538 * ECONNRESET other side closed the connection
539 * ERESTARTSYS (on sock) we got a signal
540 */
541
542 if (rv < 0) {
543 if (rv == -ECONNRESET)
de0ff338 544 conn_info(tconn, "sock was reset by peer\n");
b411b363 545 else if (rv != -ERESTARTSYS)
de0ff338 546 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
547 break;
548 } else if (rv == 0) {
de0ff338 549 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
550 break;
551 } else {
552 /* signal came in, or peer/link went down,
553 * after we read a partial message
554 */
555 /* D_ASSERT(signal_pending(current)); */
556 break;
557 }
558 };
559
560 set_fs(oldfs);
561
562 if (rv != size)
bbeb641c 563 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
564
565 return rv;
566}
567
c6967746
AG
568static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
569{
570 int err;
571
572 err = drbd_recv(tconn, buf, size);
573 if (err != size) {
574 if (err >= 0)
575 err = -EIO;
576 } else
577 err = 0;
578 return err;
579}
580
5dbf1673
LE
581/* quoting tcp(7):
582 * On individual connections, the socket buffer size must be set prior to the
583 * listen(2) or connect(2) calls in order to have it take effect.
584 * This is our wrapper to do so.
585 */
586static void drbd_setbufsize(struct socket *sock, unsigned int snd,
587 unsigned int rcv)
588{
589 /* open coded SO_SNDBUF, SO_RCVBUF */
590 if (snd) {
591 sock->sk->sk_sndbuf = snd;
592 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
593 }
594 if (rcv) {
595 sock->sk->sk_rcvbuf = rcv;
596 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
597 }
598}
599
eac3e990 600static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
b411b363
PR
601{
602 const char *what;
603 struct socket *sock;
604 struct sockaddr_in6 src_in6;
605 int err;
606 int disconnect_on_error = 1;
607
eac3e990 608 if (!get_net_conf(tconn))
b411b363
PR
609 return NULL;
610
611 what = "sock_create_kern";
eac3e990 612 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
613 SOCK_STREAM, IPPROTO_TCP, &sock);
614 if (err < 0) {
615 sock = NULL;
616 goto out;
617 }
618
619 sock->sk->sk_rcvtimeo =
eac3e990
PR
620 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
621 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
622 tconn->net_conf->rcvbuf_size);
b411b363
PR
623
624 /* explicitly bind to the configured IP as source IP
625 * for the outgoing connections.
626 * This is needed for multihomed hosts and to be
627 * able to use lo: interfaces for drbd.
628 * Make sure to use 0 as port number, so linux selects
629 * a free one dynamically.
630 */
eac3e990
PR
631 memcpy(&src_in6, tconn->net_conf->my_addr,
632 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
633 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
634 src_in6.sin6_port = 0;
635 else
636 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
637
638 what = "bind before connect";
639 err = sock->ops->bind(sock,
640 (struct sockaddr *) &src_in6,
eac3e990 641 tconn->net_conf->my_addr_len);
b411b363
PR
642 if (err < 0)
643 goto out;
644
645 /* connect may fail, peer not yet available.
646 * stay C_WF_CONNECTION, don't go Disconnecting! */
647 disconnect_on_error = 0;
648 what = "connect";
649 err = sock->ops->connect(sock,
eac3e990
PR
650 (struct sockaddr *)tconn->net_conf->peer_addr,
651 tconn->net_conf->peer_addr_len, 0);
b411b363
PR
652
653out:
654 if (err < 0) {
655 if (sock) {
656 sock_release(sock);
657 sock = NULL;
658 }
659 switch (-err) {
660 /* timeout, busy, signal pending */
661 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
662 case EINTR: case ERESTARTSYS:
663 /* peer not (yet) available, network problem */
664 case ECONNREFUSED: case ENETUNREACH:
665 case EHOSTDOWN: case EHOSTUNREACH:
666 disconnect_on_error = 0;
667 break;
668 default:
eac3e990 669 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
670 }
671 if (disconnect_on_error)
bbeb641c 672 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 673 }
eac3e990 674 put_net_conf(tconn);
b411b363
PR
675 return sock;
676}
677
7653620d 678static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
679{
680 int timeo, err;
681 struct socket *s_estab = NULL, *s_listen;
682 const char *what;
683
7653620d 684 if (!get_net_conf(tconn))
b411b363
PR
685 return NULL;
686
687 what = "sock_create_kern";
7653620d 688 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
689 SOCK_STREAM, IPPROTO_TCP, &s_listen);
690 if (err) {
691 s_listen = NULL;
692 goto out;
693 }
694
7653620d 695 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
696 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
697
698 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
699 s_listen->sk->sk_rcvtimeo = timeo;
700 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
701 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
702 tconn->net_conf->rcvbuf_size);
b411b363
PR
703
704 what = "bind before listen";
705 err = s_listen->ops->bind(s_listen,
7653620d
PR
706 (struct sockaddr *) tconn->net_conf->my_addr,
707 tconn->net_conf->my_addr_len);
b411b363
PR
708 if (err < 0)
709 goto out;
710
7653620d 711 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
712
713out:
714 if (s_listen)
715 sock_release(s_listen);
716 if (err < 0) {
717 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 718 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 719 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
720 }
721 }
7653620d 722 put_net_conf(tconn);
b411b363
PR
723
724 return s_estab;
725}
726
d38e787e 727static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 728{
d38e787e 729 struct p_header *h = &tconn->data.sbuf.header;
b411b363 730
ecf2363c 731 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
732}
733
a25b63f1 734static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 735{
a25b63f1 736 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
737 int rr;
738
dbd9eea0 739 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 740
ca9bc12b 741 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
742 return be16_to_cpu(h->command);
743
744 return 0xffff;
745}
746
747/**
748 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
749 * @sock: pointer to the pointer to the socket.
750 */
dbd9eea0 751static int drbd_socket_okay(struct socket **sock)
b411b363
PR
752{
753 int rr;
754 char tb[4];
755
756 if (!*sock)
81e84650 757 return false;
b411b363 758
dbd9eea0 759 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
760
761 if (rr > 0 || rr == -EAGAIN) {
81e84650 762 return true;
b411b363
PR
763 } else {
764 sock_release(*sock);
765 *sock = NULL;
81e84650 766 return false;
b411b363
PR
767 }
768}
2325eb66
PR
769/* Gets called if a connection is established, or if a new minor gets created
770 in a connection */
771int drbd_connected(int vnr, void *p, void *data)
907599e0
PR
772{
773 struct drbd_conf *mdev = (struct drbd_conf *)p;
774 int ok = 1;
775
776 atomic_set(&mdev->packet_seq, 0);
777 mdev->peer_seq = 0;
778
8410da8f
PR
779 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
780 &mdev->tconn->cstate_mutex :
781 &mdev->own_state_mutex;
782
103ea275 783 ok &= !drbd_send_sync_param(mdev);
f02d4d0a 784 ok &= !drbd_send_sizes(mdev, 0, 0);
2ae5f95b 785 ok &= !drbd_send_uuids(mdev);
927036f9 786 ok &= !drbd_send_state(mdev);
907599e0
PR
787 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
788 clear_bit(RESIZE_PENDING, &mdev->flags);
789
8410da8f 790
907599e0
PR
791 return !ok;
792}
793
b411b363
PR
794/*
795 * return values:
796 * 1 yes, we have a valid connection
797 * 0 oops, did not work out, please try again
798 * -1 peer talks different language,
799 * no point in trying again, please go standalone.
800 * -2 We do not have a network config...
801 */
907599e0 802static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
803{
804 struct socket *s, *sock, *msock;
805 int try, h, ok;
806
bbeb641c 807 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
808 return -2;
809
907599e0
PR
810 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
811 tconn->agreed_pro_version = 99;
fd340c12
PR
812 /* agreed_pro_version must be smaller than 100 so we send the old
813 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
814
815 sock = NULL;
816 msock = NULL;
817
818 do {
819 for (try = 0;;) {
820 /* 3 tries, this should take less than a second! */
907599e0 821 s = drbd_try_connect(tconn);
b411b363
PR
822 if (s || ++try >= 3)
823 break;
824 /* give the other side time to call bind() & listen() */
20ee6390 825 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
826 }
827
828 if (s) {
829 if (!sock) {
907599e0 830 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
831 sock = s;
832 s = NULL;
833 } else if (!msock) {
907599e0 834 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
835 msock = s;
836 s = NULL;
837 } else {
907599e0 838 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
839 goto out_release_sockets;
840 }
841 }
842
843 if (sock && msock) {
907599e0 844 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
845 ok = drbd_socket_okay(&sock);
846 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
847 if (ok)
848 break;
849 }
850
851retry:
907599e0 852 s = drbd_wait_for_connect(tconn);
b411b363 853 if (s) {
907599e0 854 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
855 drbd_socket_okay(&sock);
856 drbd_socket_okay(&msock);
b411b363
PR
857 switch (try) {
858 case P_HAND_SHAKE_S:
859 if (sock) {
907599e0 860 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
861 sock_release(sock);
862 }
863 sock = s;
864 break;
865 case P_HAND_SHAKE_M:
866 if (msock) {
907599e0 867 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
868 sock_release(msock);
869 }
870 msock = s;
907599e0 871 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
872 break;
873 default:
907599e0 874 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
875 sock_release(s);
876 if (random32() & 1)
877 goto retry;
878 }
879 }
880
bbeb641c 881 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
882 goto out_release_sockets;
883 if (signal_pending(current)) {
884 flush_signals(current);
885 smp_rmb();
907599e0 886 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
887 goto out_release_sockets;
888 }
889
890 if (sock && msock) {
dbd9eea0
PR
891 ok = drbd_socket_okay(&sock);
892 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
893 if (ok)
894 break;
895 }
896 } while (1);
897
898 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
899 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
900
901 sock->sk->sk_allocation = GFP_NOIO;
902 msock->sk->sk_allocation = GFP_NOIO;
903
904 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
905 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
906
b411b363 907 /* NOT YET ...
907599e0 908 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
909 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
910 * first set it to the P_HAND_SHAKE timeout,
911 * which we set to 4x the configured ping_timeout. */
912 sock->sk->sk_sndtimeo =
907599e0 913 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 914
907599e0
PR
915 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
916 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
917
918 /* we don't want delays.
25985edc 919 * we use TCP_CORK where appropriate, though */
b411b363
PR
920 drbd_tcp_nodelay(sock);
921 drbd_tcp_nodelay(msock);
922
907599e0
PR
923 tconn->data.socket = sock;
924 tconn->meta.socket = msock;
925 tconn->last_received = jiffies;
b411b363 926
907599e0 927 h = drbd_do_handshake(tconn);
b411b363
PR
928 if (h <= 0)
929 return h;
930
907599e0 931 if (tconn->cram_hmac_tfm) {
b411b363 932 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 933 switch (drbd_do_auth(tconn)) {
b10d96cb 934 case -1:
907599e0 935 conn_err(tconn, "Authentication of peer failed\n");
b411b363 936 return -1;
b10d96cb 937 case 0:
907599e0 938 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 939 return 0;
b411b363
PR
940 }
941 }
942
bbeb641c 943 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
944 return 0;
945
907599e0 946 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
947 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
948
907599e0 949 drbd_thread_start(&tconn->asender);
b411b363 950
387eb308 951 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 952 return -1;
b411b363 953
907599e0 954 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
955
956out_release_sockets:
957 if (sock)
958 sock_release(sock);
959 if (msock)
960 sock_release(msock);
961 return -1;
962}
963
8172f3e9 964static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 965{
fd340c12 966 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
77351055
PR
967 pi->cmd = be16_to_cpu(h->h80.command);
968 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 969 pi->vnr = 0;
ca9bc12b 970 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
77351055
PR
971 pi->cmd = be16_to_cpu(h->h95.command);
972 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
973 pi->vnr = 0;
02918be2 974 } else {
ce243853 975 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
004352fa
LE
976 be32_to_cpu(h->h80.magic),
977 be16_to_cpu(h->h80.command),
978 be16_to_cpu(h->h80.length));
8172f3e9 979 return -EINVAL;
b411b363 980 }
8172f3e9 981 return 0;
257d0af6
PR
982}
983
9ba7aa00 984static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 985{
9ba7aa00 986 struct p_header *h = &tconn->data.rbuf.header;
69bc7bc3 987 int err;
257d0af6 988
69bc7bc3
AG
989 err = drbd_recv(tconn, h, sizeof(*h));
990 if (unlikely(err != sizeof(*h))) {
257d0af6 991 if (!signal_pending(current))
69bc7bc3
AG
992 conn_warn(tconn, "short read expecting header on sock: r=%d\n", err);
993 if (err >= 0)
994 err = -EIO;
995 return err;
257d0af6
PR
996 }
997
69bc7bc3 998 err = decode_header(tconn, h, pi);
9ba7aa00 999 tconn->last_received = jiffies;
b411b363 1000
69bc7bc3 1001 return err;
b411b363
PR
1002}
1003
2451fc3b 1004static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
1005{
1006 int rv;
1007
1008 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 1009 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 1010 NULL);
b411b363
PR
1011 if (rv) {
1012 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1013 /* would rather check on EOPNOTSUPP, but that is not reliable.
1014 * don't try again for ANY return value != 0
1015 * if (rv == -EOPNOTSUPP) */
1016 drbd_bump_write_ordering(mdev, WO_drain_io);
1017 }
1018 put_ldev(mdev);
1019 }
b411b363
PR
1020}
1021
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev: DRBD device.
 * @epoch: Epoch object.
 * @ev: Epoch event.
 *
 * Returns FE_STILL_LIVE if the epoch remains in use, FE_DESTROYED if it
 * was unlinked and freed, or FE_RECYCLED if it was reset for reuse.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		/* Apply the event itself (EV_CLEANUP is a modifier bit). */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		/* An epoch may be finished once it saw at least one write,
		 * has no active requests left, and its barrier number has
		 * arrived. */
		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				/* Drop the lock while sending the barrier ack
				 * on the network. */
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				/* Not the newest epoch: unlink and free it,
				 * then continue the loop with its successor. */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* Newest epoch: reset it in place for reuse. */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}
1093
/**
 * drbd_bump_write_ordering() - Fall back to an other write ordering method
 * @mdev: DRBD device.
 * @wo: Write ordering method to try.
 *
 * The effective method is min(current, @wo), further constrained by the
 * disk configuration (no_disk_flush / no_disk_drain) — i.e. this function
 * can only ever downgrade the ordering method, never upgrade it.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	/* Respect the user's configuration: skip methods that are disabled. */
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
1118
/**
 * drbd_submit_peer_request()
 * @mdev: DRBD device.
 * @peer_req: peer request
 * @rw: flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 * single page to an empty bio (which should never happen and likely indicates
 * that the lower level IO stack is in some way broken). This has been observed
 * on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;	/* singly-linked via bi_next */
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;	/* bytes remaining */
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	/* Prepend to the local list; submitted together at the end. */
	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			/* Current bio is full: open another one for the
			 * remaining pages. */
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	/* The endio handler uses pending_bios to detect the last
	 * completion of this peer request. */
	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	/* Nothing was submitted yet, so simply release everything built
	 * so far. */
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
1215
/* Remove the peer request's interval from the write_requests tree and
 * clear it.  NOTE(review): callers visible in this file take
 * mdev->tconn->req_lock around this — confirm that is a requirement. */
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}
1228
/* Handle an incoming P_BARRIER packet: close the current write epoch
 * according to the active write ordering method and, if needed, install a
 * freshly allocated epoch as the new current one.
 * Returns 0 on success, -EIO on an impossible write_ordering value. */
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	inc_unacked(mdev);	/* balanced when the barrier ack goes out */

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		/* Drain all in-flight writes of this epoch, then flush. */
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* No new epoch needed (or allocatable): wait until the
		 * current one drained completely, then reuse it. */
		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return 0;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return -EIO;
	}

	/* We reach this point only with a freshly allocated epoch. */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return 0;
}
1300
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
/* Receive @data_size bytes (plus an optional integrity digest) from the
 * data socket into a freshly allocated peer request.  Validates size,
 * alignment, capacity bounds, and the digest when one is negotiated.
 * Returns the peer request, or NULL on any error (the request is freed). */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	/* Digest size is nonzero only with protocol >= 87 and a configured
	 * receive integrity transform. */
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	/* Sanity checks on the payload size announced by the peer. */
	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
				rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		/* Verify the received payload against the peer's digest. */
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
1388
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 *
 * Uses a single scratch page as the receive target, overwriting it until
 * data_size bytes have been consumed.  Returns 0 on success, or a
 * negative error code if the receive came up short. */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		rr = drbd_recv(mdev->tconn, data, len);
		if (rr != len) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, len);
			/* Preserve a real error from drbd_recv(); map a
			 * short positive read to -EIO. */
			err = (rr < 0) ? rr : -EIO;
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page, 0);
	return err;
}
1422
/* Receive a data reply directly into the pages of the original request's
 * master bio (a "disk-less" read: we asked the peer because we could not
 * read locally).  Verifies the integrity digest when one is negotiated.
 * Returns 0 on success, a negative error code on short read or digest
 * mismatch. */
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return rr < 0 ? rr : -EIO;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	/* Receive straight into the bio's segments, one bvec at a time. */
	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			       kmap(bvec->bv_page)+bvec->bv_offset,
			       expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return rr < 0 ? rr : -EIO;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
	return 0;
}
1482
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
/* Completion work for a resync write: on success mark the area in sync
 * and ack it, on failure record the failed resync I/O and send a
 * negative ack.  Balances the inc_unacked() done in recv_resync_read(). */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return err;
}
1508
/* Receive one resync data block from the peer and submit it as a local
 * write.  On success the put_ldev() happens later in the endio path; on
 * failure it happens here (hence __releases(local)).
 * Returns 0 if the write was submitted, -EIO otherwise. */
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	/* Queue on sync_ee before submitting, so completion can find it. */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	/* Undo the queuing done above before freeing the request. */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
fail:
	put_ldev(mdev);
	return -EIO;
}
1544
/* Validate the request pointer the peer echoed back to us (as @id) by
 * checking that it is really contained in @root at @sector and is a
 * local request.  Returns the request, or NULL (with an error message
 * unless @missing_ok) when validation fails. */
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
1561
/* Handle a P_DATA_REPLY packet: locate the read request this reply
 * answers and receive the payload directly into its bio.
 * Returns 0 on success, a negative error code otherwise. */
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, data_size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}
1590
/* Handle a P_RS_DATA_REPLY packet: write the resync block to the local
 * disk if attached; otherwise drain the payload from the socket and
 * negatively ack it.  Always accounts the incoming resync sectors. */
static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	sector_t sector;
	int err;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		/* Still must consume the payload to keep the stream sane. */
		err = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
	}

	atomic_add(data_size >> 9, &mdev->rs_sect_in);

	return err;
}
1619
/* Work callback: resubmit a postponed (conflicting) local write after
 * the conflict was resolved.  The request is first completed as a
 * discard (without finishing its master_bio), then the bio is fed back
 * into __drbd_make_request(). */
static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return -EIO;
	}
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	/* Keep retrying until the request is accepted. */
	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 0;
}
1643
/* Queue w_restart_write work for every postponed local write that
 * overlaps [sector, sector+size) and is no longer locally pending. */
static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		/* Only restart requests that are postponed and have
		 * finished their local I/O. */
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {
			req->w.mdev = mdev;
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
		}
	}
}
1664
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
/* Completion work for a mirrored peer write: send the protocol-C ack
 * (positive or negative), clean the conflict-detection interval, restart
 * conflicting writes if flagged, and drop the epoch reference. */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			/* During resync, a successful write may also mark
			 * the area in sync — use P_RS_WRITE_ACK then. */
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}
1708
/* Send the given ack packet for a completed peer request, then drop the
 * corresponding unacked reference. */
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	int err;

	err = drbd_send_ack(mdev, ack, peer_req);
	dec_unacked(mdev);

	return err;
}
1721
/* Work callback: tell the peer its conflicting write was discarded. */
static int e_send_discard_write(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_DISCARD_WRITE);
}
1726
/* Work callback: ask the peer to retry a conflicting write.  Protocol
 * versions >= 100 understand P_RETRY_WRITE; older peers are sent
 * P_DISCARD_WRITE instead. */
static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_DISCARD_WRITE);
}
1734
3e394da1
AG
1735static bool seq_greater(u32 a, u32 b)
1736{
1737 /*
1738 * We assume 32-bit wrap-around here.
1739 * For 24-bit wrap-around, we would have to shift:
1740 * a <<= 8; b <<= 8;
1741 */
1742 return (s32)a - (s32)b > 0;
1743}
1744
1745static u32 seq_max(u32 a, u32 b)
1746{
1747 return seq_greater(a, b) ? a : b;
1748}
1749
7be8da07
AG
1750static bool need_peer_seq(struct drbd_conf *mdev)
1751{
1752 struct drbd_tconn *tconn = mdev->tconn;
1753
1754 /*
1755 * We only need to keep track of the last packet_seq number of our peer
1756 * if we are in dual-primary mode and we have the discard flag set; see
1757 * handle_write_conflicts().
1758 */
1759 return tconn->net_conf->two_primaries &&
1760 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1761}
1762
/* Record a newly seen peer sequence number (if tracking is enabled) and
 * wake waiters in wait_for_and_update_peer_seq() when the number moved
 * forward. */
static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
{
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);
	}
}
1777
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
{
	DEFINE_WAIT(wait);
	long timeout;
	int ret;

	if (!need_peer_seq(mdev))
		return 0;

	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		/* We are the logically next packet iff peer_seq - 1 is not
		 * newer than what we have already seen. */
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
			ret = 0;
			break;
		}
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		/* Sleep without the lock; update_peer_seq() wakes us. */
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);
	return ret;
}
1834
688593c5
LE
1835/* see also bio_flags_to_wire()
1836 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1837 * flags and back. We may replicate to other kernel versions. */
1838static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1839{
688593c5
LE
1840 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1841 (dpf & DP_FUA ? REQ_FUA : 0) |
1842 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1843 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1844}
1845
/* Negatively complete every postponed local write overlapping
 * [sector, sector+size).  Because completing a master_bio requires
 * dropping the req_lock, the overlap scan restarts from scratch after
 * each completion.  NOTE(review): caller appears to hold
 * mdev->tconn->req_lock (it is unlocked/relocked here) — confirm. */
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
		if (m.bio)
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);
		/* The tree may have changed while the lock was dropped. */
		goto repeat;
	}
}
1870
/* Resolve conflicts between a new peer write and overlapping entries in
 * the write_requests tree, honoring the DISCARD_CONCURRENT role.
 * Returns 0 when the peer request may be submitted, -ENOENT when it was
 * handed to the asender as discarded/retried, or another negative error.
 * NOTE(review): callers are expected to hold mdev->tconn->req_lock —
 * confirm against receive_Data. */
static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup. Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			/* Hand the peer request to the asender, which will
			 * send the discard/retry ack; dec_unacked() happens
			 * in e_send_ack(). */
			inc_unacked(mdev);
			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried. Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
				if (err) {
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
							    CS_HARD);
					fail_postponed_requests(mdev, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

 out:
	if (err)
		drbd_remove_epoch_entry_interval(mdev, peer_req);
	return err;
}
1978
/* mirrored write */
/*
 * receive_Data() - process an incoming P_DATA packet (a write mirrored by the peer)
 * @mdev:	DRBD device context this packet is addressed to
 * @cmd:	packet command code (not inspected here)
 * @data_size:	payload size in bytes, as announced in the packet header
 *
 * Reads the write payload off the data socket into a peer request, accounts
 * it in the currently open write epoch, resolves write conflicts in
 * two-primaries mode, acks according to the wire protocol (A/B/C), and
 * submits the write to the local backing device.
 *
 * Returns 0 on success (including the case where conflict resolution
 * deliberately discarded the request); a negative error code otherwise,
 * which makes the caller tear down the connection.
 */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err;

	if (!get_ldev(mdev)) {
		/* No usable local disk: keep the peer-seq and epoch
		 * accounting consistent, negatively ack the write, and drain
		 * the payload so the data stream stays in sync. */
		int err2;

		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, data_size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	/* reads data_size bytes of payload from the socket */
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);
	if (!peer_req) {
		put_ldev(mdev);
		return -EIO;
	}

	/* completion callback, invoked once the local write finishes */
	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* account this write in the currently open epoch */
	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		/* With two primaries concurrent writes to the same area are
		 * possible: order by the peer's sequence number, then resolve
		 * conflicts with local requests under the req_lock. */
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
		if (err) {
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {
				/* request was discarded during conflict
				 * resolution; this is not an error */
				put_ldev(mdev);
				return 0;
			}
			goto out_interrupted;
		}
	} else
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		/* protocol C acks only after the local write completed */
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* protocol B acks reception immediately.
		 * I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);
	}

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* submission failed: undo the list/interval bookkeeping done above,
	 * then fall through into the common cleanup path.
	 * don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return err;
}
2092
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 *
 * Returns 1 if the resync request for @sector should be throttled,
 * 0 otherwise.  Requires a held local disk reference (mdev->ldev is
 * dereferenced without a get_ldev here — callers must hold it).
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;	/* delta blocks, delta time, rate */
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled? */
	if (mdev->ldev->dc.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	/* total sectors of IO seen by the backing device (read + write),
	 * minus what we caused ourselves for resync */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;	/* avoid division by zero below */
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		/* throttle only while we are faster than the configured
		 * minimum resync rate */
		if (dbdt > mdev->ldev->dc.c_min_rate)
			throttle = 1;
	}
	return throttle;
}
2158
2159
/*
 * receive_DataRequest() - serve a read request from the peer
 * @mdev:	DRBD device context
 * @cmd:	flavor of the request: P_DATA_REQUEST (application read),
 *		P_RS_DATA_REQUEST (resync), P_CSUM_RS_REQUEST (checksum based
 *		resync), P_OV_REQUEST / P_OV_REPLY (online verify)
 * @digest_size: size in bytes of the digest payload that follows the header
 *		(only used for P_CSUM_RS_REQUEST / P_OV_REPLY)
 *
 * Validates the requested area against the local capacity, allocates a peer
 * request with the completion callback matching @cmd, optionally reads a
 * digest off the socket, applies resync throttling, and submits the local
 * read.
 *
 * Returns 0 on success, a negative error code if the connection should be
 * torn down.
 */
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p =	&mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	/* sanity check the requested extent: positive, sector aligned,
	 * bounded by the maximum bio size, and within the device */
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		/* no up-to-date local data: negatively ack per request type */
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(mdev);
		return -ENOMEM;
	}

	/* select completion callback and fault-injection type per request */
	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = digest_size;
		/* digest storage lives directly behind the digest_info */
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		/* the digest is part of the packet payload; read it now */
		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
			goto out_free_e;

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			/* first verify request: initialize progress tracking */
			unsigned long now = jiffies;
			int i;
			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		    cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;
		goto out_free_e;
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	inc_unacked(mdev);
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(mdev);
	drbd_free_ee(mdev, peer_req);
	return -EIO;
}
2346
/*
 * drbd_asb_recover_0p() - apply the after-sb-0pri split-brain policy
 *
 * Return convention (NOTE(review): inferred from the callers in
 * drbd_sync_handshake — confirm):
 *    1	keep our data, sync to the peer
 *   -1	discard our changes, sync from the peer
 * -100	no automatic decision could be made
 */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	/* lowest bit of the bitmap UUID flags whether that node was primary
	 * when the UUID was created */
	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	/* amount of changed data on each side */
	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		/* these policies are only meaningful with at least one
		 * primary; they must not be configured for after-sb-0pri */
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
			 "Using discard-least-changes instead\n");
		/* fall through */
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* tie: break symmetry via the DISCARD_CONCURRENT
			 * flag, which is set on exactly one side */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* fall through (we got here from younger/older-pri) */
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
2418
/*
 * drbd_asb_recover_1p() - apply the after-sb-1pri policy (one node primary)
 *
 * Same return convention as drbd_asb_recover_0p(): 1 keep our data,
 * -1 sync from the peer, -100 no decision (disconnect).
 */
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
{
	int hg, rv = -100;

	switch (mdev->tconn->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		/* only valid as after-sb-0pri policies */
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		/* accept the 0p verdict only if it agrees with the roles:
		 * the secondary may lose, the primary may win */
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1  && mdev->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(mdev);
		break;
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_PRIMARY) {
			/* we lost: try to demote ourselves; only if that
			 * succeeds do we accept becoming sync target */
			enum drbd_state_rv rv2;

			drbd_set_role(mdev, R_SECONDARY, 0);
			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			  * we might be here in C_WF_REPORT_PARAMS which is transient.
			  * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
2467
2468static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2469{
6184ea21 2470 int hg, rv = -100;
b411b363 2471
89e58e75 2472 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2473 case ASB_DISCARD_YOUNGER_PRI:
2474 case ASB_DISCARD_OLDER_PRI:
2475 case ASB_DISCARD_LEAST_CHG:
2476 case ASB_DISCARD_LOCAL:
2477 case ASB_DISCARD_REMOTE:
2478 case ASB_CONSENSUS:
2479 case ASB_DISCARD_SECONDARY:
2480 dev_err(DEV, "Configuration error.\n");
2481 break;
2482 case ASB_VIOLENTLY:
2483 rv = drbd_asb_recover_0p(mdev);
2484 break;
2485 case ASB_DISCONNECT:
2486 break;
2487 case ASB_CALL_HELPER:
2488 hg = drbd_asb_recover_0p(mdev);
2489 if (hg == -1) {
bb437946
AG
2490 enum drbd_state_rv rv2;
2491
b411b363
PR
2492 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2493 * we might be here in C_WF_REPORT_PARAMS which is transient.
2494 * we do not need to wait for the after state change work either. */
bb437946
AG
2495 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2496 if (rv2 != SS_SUCCESS) {
b411b363
PR
2497 drbd_khelper(mdev, "pri-lost-after-sb");
2498 } else {
2499 dev_warn(DEV, "Successfully gave up primary role.\n");
2500 rv = hg;
2501 }
2502 } else
2503 rv = hg;
2504 }
2505
2506 return rv;
2507}
2508
2509static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2510 u64 bits, u64 flags)
2511{
2512 if (!uuid) {
2513 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2514 return;
2515 }
2516 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2517 text,
2518 (unsigned long long)uuid[UI_CURRENT],
2519 (unsigned long long)uuid[UI_BITMAP],
2520 (unsigned long long)uuid[UI_HISTORY_START],
2521 (unsigned long long)uuid[UI_HISTORY_END],
2522 (unsigned long long)bits,
2523 (unsigned long long)flags);
2524}
2525
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96

  Compares our UUID set against the peer's (received in p_uuid) and decides
  who has the newer data; *rule_nr reports which rule fired, for logging.
  Some rules also repair UUIDs in place when a missed "resync finished"
  event or a lost P_SYNC_UUID packet is detected.
 */
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	/* the lowest UUID bit carries the "was primary" flag and is masked
	 * out for all comparisons in this function */
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	/* rules 10-30: one or both devices are freshly created */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
			/* we still carry a bitmap UUID the peer already
			 * dropped: we were SyncSource and missed the
			 * "resync finished" event */
			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
			/* mirror image of the case above: the peer missed
			 * the "resync finished" event; repair its UUIDs in
			 * our local copy */
			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
			return dc ? -1 : 1;
		}
	}

	/* rule 50: our current UUID is the peer's bitmap UUID — the peer is
	 * ahead, we are the sync target */
	*rule_nr = 50;
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];

			dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* rule 60: our current UUID appears in the peer's history — the
	 * peer is far ahead, full sync from the peer */
	*rule_nr = 60;
	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = mdev->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* rules 70/71: mirror image of rules 50/51 — we are ahead */
	*rule_nr = 70;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (mdev->tconn->agreed_pro_version < 96 ?
		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (mdev->tconn->agreed_pro_version < 91)
				return -1091;

			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);

			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);

			return 1;
		}
	}


	/* rule 80: the peer's current UUID is in our history — we are far
	 * ahead, full sync to the peer */
	*rule_nr = 80;
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* rule 90: identical non-zero bitmap UUIDs — split brain, try
	 * automatic recovery */
	*rule_nr = 90;
	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* rule 100: any common history UUID — split brain, disconnect */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = mdev->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = mdev->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* no common ancestry at all */
	return -1000;
}
2717
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.

   Combines the drbd_uuid_compare() verdict with disk states, the
   configured after-split-brain policies, and the manual discard-my-data
   flag, then decides whether we become sync source (C_WF_BITMAP_S),
   sync target (C_WF_BITMAP_T), or just connected (C_CONNECTED).
 */
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	int hg, rule_nr;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;

	mydisk = mdev->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = mdev->new_state_tmp.disk;

	dev_info(DEV, "drbd_sync_handshake:\n");
	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(mdev, &rule_nr);

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	if (hg < -1000) {
		/* -1091/-1096 encode "requires protocol 91/96" */
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
		/* disk states override the uuid verdict: the side with the
		 * better disk becomes the sync source */
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;	/* force a full sync */
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
			 hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(mdev, "initial-split-brain");

	if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
		/* automatic split-brain recovery; which policy applies
		 * depends on how many nodes are currently primary */
		int pcount = (mdev->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(mdev);
			break;
		case 1:
			hg = drbd_asb_recover_1p(mdev);
			break;
		case 2:
			hg = drbd_asb_recover_2p(mdev);
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
		/* manual split-brain resolution via the discard-my-data
		 * ("want_lose") flag; UI_FLAGS bit 0 carries the peer's */
		if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
				 "Sync from %s node\n",
				 (hg < 0) ? "peer" : "this");
	}

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		/* we would have to sync away data a primary is serving;
		 * the rr-conflict policy decides what to do */
		switch (mdev->tconn->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
				 "assumption\n");
		}
	}

	if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
		/* dry-run: report what would happen, then bail out */
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
				 drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}
2864
2865/* returns 1 if invalid */
2866static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2867{
2868 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2869 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2870 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2871 return 0;
2872
2873 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2874 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2875 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2876 return 1;
2877
2878 /* everything else is valid if they are equal on both sides. */
2879 if (peer == self)
2880 return 0;
2881
2882 /* everything es is invalid. */
2883 return 1;
2884}
2885
7204624c 2886static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
d8763023 2887 unsigned int data_size)
b411b363 2888{
7204624c 2889 struct p_protocol *p = &tconn->data.rbuf.protocol;
b411b363 2890 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2891 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2892 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2893
b411b363
PR
2894 p_proto = be32_to_cpu(p->protocol);
2895 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2896 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2897 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2898 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2899 cf = be32_to_cpu(p->conn_flags);
2900 p_want_lose = cf & CF_WANT_LOSE;
2901
7204624c 2902 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2903
2904 if (cf & CF_DRY_RUN)
7204624c 2905 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2906
7204624c
PR
2907 if (p_proto != tconn->net_conf->wire_protocol) {
2908 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2909 goto disconnect;
2910 }
2911
7204624c
PR
2912 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2913 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2914 goto disconnect;
2915 }
2916
7204624c
PR
2917 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2918 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2919 goto disconnect;
2920 }
2921
7204624c
PR
2922 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2923 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2924 goto disconnect;
2925 }
2926
7204624c
PR
2927 if (p_want_lose && tconn->net_conf->want_lose) {
2928 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2929 goto disconnect;
2930 }
2931
7204624c
PR
2932 if (p_two_primaries != tconn->net_conf->two_primaries) {
2933 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2934 goto disconnect;
2935 }
2936
7204624c
PR
2937 if (tconn->agreed_pro_version >= 87) {
2938 unsigned char *my_alg = tconn->net_conf->integrity_alg;
82bc0194 2939 int err;
b411b363 2940
82bc0194
AG
2941 err = drbd_recv_all(tconn, p_integrity_alg, data_size);
2942 if (err)
2943 return err;
b411b363
PR
2944
2945 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2946 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2947 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2948 goto disconnect;
2949 }
7204624c 2950 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2951 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2952 }
2953
82bc0194 2954 return 0;
b411b363
PR
2955
2956disconnect:
7204624c 2957 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 2958 return -EIO;
b411b363
PR
2959}
2960
2961/* helper function
2962 * input: alg name, feature name
2963 * return: NULL (alg name was "")
2964 * ERR_PTR(error) if something goes wrong
2965 * or the crypto hash ptr, if it worked out ok. */
2966struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2967 const char *alg, const char *name)
2968{
2969 struct crypto_hash *tfm;
2970
2971 if (!alg[0])
2972 return NULL;
2973
2974 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2975 if (IS_ERR(tfm)) {
2976 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2977 alg, name, PTR_ERR(tfm));
2978 return tfm;
2979 }
2980 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2981 crypto_free_hash(tfm);
2982 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2983 return ERR_PTR(-EINVAL);
2984 }
2985 return tfm;
2986}
2987
/* Handle an incoming P_SYNC_PARAM/P_SYNC_PARAM89 packet: receive the resync
 * rate and (protocol >= 88) verify/csums algorithm names, plus (protocol
 * >= 95) the dynamic resync controller parameters.  Newly chosen hash
 * transforms and the resync fifo are installed under peer_seq_lock.
 * Returns 0 on success, a negative receive error, or -EIO after requesting
 * C_DISCONNECTING on incompatible settings. */
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int packet_size)
{
	struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
	const int apv = mdev->tconn->agreed_pro_version;
	int *rs_plan_s = NULL;
	int fifo_size = 0;
	int err;

	/* the on-the-wire struct grew over the protocol versions;
	 * reject packets larger than the agreed version allows */
	exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
			+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (packet_size > exp_max_sz) {
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    packet_size, exp_max_sz);
		return -EIO;
	}

	/* only apv 88 carries variable length trailing data (the alg name);
	 * for newer versions the whole struct must have been sent */
	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
		data_size = packet_size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
		data_size = packet_size - header_size;
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
	if (err)
		return err;

	if (get_ldev(mdev)) {
		mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
		put_ldev(mdev);
	}

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX) {
				dev_err(DEV, "verify-alg too long, "
				    "peer wants %u, accepting only %u byte\n",
						data_size, SHARED_SECRET_MAX);
				return -EIO;
			}

			/* apv 88: the verify alg name follows as payload */
			err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
			if (err)
				return err;

			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		/* the verify alg may only change while not connected yet;
		 * otherwise it must match our configured one */
		if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->tconn->net_conf->verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		/* same rules for the csums alg (protocol >= 89) */
		if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    mdev->tconn->net_conf->csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		/* protocol >= 95: dynamic resync speed controller parameters;
		 * preallocate the new fifo outside the spinlock below */
		if (apv > 94 && get_ldev(mdev)) {
			mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
			mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
			mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
			mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
				rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
				if (!rs_plan_s) {
					dev_err(DEV, "kmalloc of fifo_buffer failed");
					put_ldev(mdev);
					goto disconnect;
				}
			}
			put_ldev(mdev);
		}

		spin_lock(&mdev->peer_seq_lock);
		/* lock against drbd_nl_syncer_conf() */
		if (verify_tfm) {
			/* commit the new verify transform and its name */
			strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
			mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
			crypto_free_hash(mdev->tconn->verify_tfm);
			mdev->tconn->verify_tfm = verify_tfm;
			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
		}
		if (csums_tfm) {
			/* commit the new csums transform and its name */
			strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
			mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
			crypto_free_hash(mdev->tconn->csums_tfm);
			mdev->tconn->csums_tfm = csums_tfm;
			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
		}
		/* swap in the preallocated resync plan fifo, if the size changed */
		if (fifo_size != mdev->rs_plan_s.size) {
			kfree(mdev->rs_plan_s.values);
			mdev->rs_plan_s.values = rs_plan_s;
			mdev->rs_plan_s.size = fifo_size;
			mdev->rs_planed = 0;
		}
		spin_unlock(&mdev->peer_seq_lock);
	}
	return 0;

disconnect:
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
	conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
3146
b411b363
PR
3147/* warn if the arguments differ by more than 12.5% */
3148static void warn_if_differ_considerably(struct drbd_conf *mdev,
3149 const char *s, sector_t a, sector_t b)
3150{
3151 sector_t d;
3152 if (a == 0 || b == 0)
3153 return;
3154 d = (a > b) ? (a - b) : (b - a);
3155 if (d > (a>>3) || d > (b>>3))
3156 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3157 (unsigned long long)a, (unsigned long long)b);
3158}
3159
/* Handle an incoming P_SIZES packet: record the peer's disk and requested
 * user sizes, possibly resize our device, propagate the peer's max bio
 * size, and trigger a resync or report our own size back when needed.
 * Returns 0 on success, -EIO on fatal size conflicts (after requesting
 * C_DISCONNECTING where appropriate). */
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
	enum determine_dev_size dd = unchanged;
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	mdev->p_size = p_size;

	if (get_ldev(mdev)) {
		warn_if_differ_considerably(mdev, "lower level device sizes",
			   p_size, drbd_get_max_capacity(mdev->ldev));
		warn_if_differ_considerably(mdev, "user requested size",
					    p_usize, mdev->ldev->dc.disk_size);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (mdev->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
					     p_usize);

		/* remember the old value so we can roll back below */
		my_usize = mdev->ldev->dc.disk_size;

		if (mdev->ldev->dc.disk_size != p_usize) {
			mdev->ldev->dc.disk_size = p_usize;
			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)mdev->ldev->dc.disk_size);
		}

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
		   drbd_get_capacity(mdev->this_bdev) &&
		   mdev->state.disk >= D_OUTDATED &&
		   mdev->state.conn < C_CONNECTED) {
			dev_err(DEV, "The peer's disk size is too small!\n");
			conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
			/* undo the u_size change from above */
			mdev->ldev->dc.disk_size = my_usize;
			put_ldev(mdev);
			return -EIO;
		}
		put_ldev(mdev);
	}

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(mdev)) {
		dd = drbd_determine_dev_size(mdev, ddsf);
		put_ldev(mdev);
		if (dd == dev_size_error)
			return -EIO;
		drbd_md_sync(mdev);
	} else {
		/* I am diskless, need to accept the peer's size. */
		drbd_set_my_capacity(mdev, p_size);
	}

	/* adopt the peer's max bio size and re-evaluate our queue limits */
	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(mdev);

	if (get_ldev(mdev)) {
		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(mdev);
	}

	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(mdev, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
			if (mdev->state.pdsk >= D_INCONSISTENT &&
			    mdev->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(mdev);
			} else
				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
		}
	}

	return 0;
}
3257
d8763023
AG
3258static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3259 unsigned int data_size)
b411b363 3260{
e42325a5 3261 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3262 u64 *p_uuid;
62b0da3a 3263 int i, updated_uuids = 0;
b411b363 3264
b411b363
PR
3265 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3266
3267 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3268 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3269
3270 kfree(mdev->p_uuid);
3271 mdev->p_uuid = p_uuid;
3272
3273 if (mdev->state.conn < C_CONNECTED &&
3274 mdev->state.disk < D_INCONSISTENT &&
3275 mdev->state.role == R_PRIMARY &&
3276 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3277 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3278 (unsigned long long)mdev->ed_uuid);
38fa9988 3279 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3280 return -EIO;
b411b363
PR
3281 }
3282
3283 if (get_ldev(mdev)) {
3284 int skip_initial_sync =
3285 mdev->state.conn == C_CONNECTED &&
31890f4a 3286 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3287 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3288 (p_uuid[UI_FLAGS] & 8);
3289 if (skip_initial_sync) {
3290 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3291 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3292 "clear_n_write from receive_uuids",
3293 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3294 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3295 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3296 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3297 CS_VERBOSE, NULL);
3298 drbd_md_sync(mdev);
62b0da3a 3299 updated_uuids = 1;
b411b363
PR
3300 }
3301 put_ldev(mdev);
18a50fa2
PR
3302 } else if (mdev->state.disk < D_INCONSISTENT &&
3303 mdev->state.role == R_PRIMARY) {
3304 /* I am a diskless primary, the peer just created a new current UUID
3305 for me. */
62b0da3a 3306 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3307 }
3308
3309 /* Before we test for the disk state, we should wait until an eventually
3310 ongoing cluster wide state change is finished. That is important if
3311 we are primary and are detaching from our disk. We need to see the
3312 new disk state... */
8410da8f
PR
3313 mutex_lock(mdev->state_mutex);
3314 mutex_unlock(mdev->state_mutex);
b411b363 3315 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3316 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3317
3318 if (updated_uuids)
3319 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3320
82bc0194 3321 return 0;
b411b363
PR
3322}
3323
3324/**
3325 * convert_state() - Converts the peer's view of the cluster state to our point of view
3326 * @ps: The state as seen by the peer.
3327 */
3328static union drbd_state convert_state(union drbd_state ps)
3329{
3330 union drbd_state ms;
3331
3332 static enum drbd_conns c_tab[] = {
3333 [C_CONNECTED] = C_CONNECTED,
3334
3335 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3336 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3337 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3338 [C_VERIFY_S] = C_VERIFY_T,
3339 [C_MASK] = C_MASK,
3340 };
3341
3342 ms.i = ps.i;
3343
3344 ms.conn = c_tab[ps.conn];
3345 ms.peer = ps.role;
3346 ms.role = ps.peer;
3347 ms.pdsk = ps.disk;
3348 ms.disk = ps.pdsk;
3349 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3350
3351 return ms;
3352}
3353
/* Handle an incoming P_STATE_CHG_REQ packet: apply the peer-requested
 * state change (translated to our point of view) and send the result
 * back.  Always returns 0; errors are communicated via the reply. */
static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	/* a local state change is in progress (state_mutex held) and we
	 * hold the DISCARD_CONCURRENT token: tell the peer to retry */
	if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
	    mutex_is_locked(mdev->state_mutex)) {
		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	/* translate the request from the peer's point of view to ours */
	mask = convert_state(mask);
	val = convert_state(val);

	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(mdev, rv);

	drbd_md_sync(mdev);

	return 0;
}
3380
dfafcc8a
PR
3381static int receive_req_conn_state(struct drbd_tconn *tconn, enum drbd_packet cmd,
3382 unsigned int data_size)
3383{
3384 struct p_req_state *p = &tconn->data.rbuf.req_state;
3385 union drbd_state mask, val;
3386 enum drbd_state_rv rv;
3387
3388 mask.i = be32_to_cpu(p->mask);
3389 val.i = be32_to_cpu(p->val);
3390
3391 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3392 mutex_is_locked(&tconn->cstate_mutex)) {
3393 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3394 return 0;
dfafcc8a
PR
3395 }
3396
3397 mask = convert_state(mask);
3398 val = convert_state(val);
3399
3400 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3401 conn_send_sr_reply(tconn, rv);
3402
82bc0194 3403 return 0;
dfafcc8a
PR
3404}
3405
/* Handle an incoming P_STATE packet: reconcile the peer's reported state
 * with our own, possibly run the sync handshake, and commit the combined
 * state under req_lock (retrying if our state changed concurrently).
 * Returns 0 on success, -EIO on fatal disagreement (after requesting
 * C_DISCONNECTING / C_PROTOCOL_ERROR where appropriate). */
static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
			 unsigned int data_size)
{
	struct p_state *p = &mdev->tconn->data.rbuf.state;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_state.i = be32_to_cpu(p->state);

	/* while the peer is still negotiating its disk, derive its real
	 * disk state from the UUID flags it sent us earlier */
	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&mdev->tconn->req_lock);
 retry:
	os = ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* peer says his disk is uptodate, while we think it is inconsistent,
	 * and this happens while we think we have a sync going on. */
	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
				drbd_resync_finished(mdev);
			return 0;
		}
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		/* C_MASK means the handshake could not agree on a sync direction */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
					return -EIO;
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	/* our state changed while we dropped the lock: start over */
	if (mdev->state.i != os.i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(mdev->tconn);
		drbd_uuid_new_current(mdev);
		clear_bit(NEW_CUR_UUID, &mdev->flags);
		conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
	ns = mdev->state;
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	mdev->tconn->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return 0;
}
3558
/* Handle an incoming P_SYNC_UUID packet: wait until we reached a state in
 * which the sync UUID may be applied, then set the current UUID from the
 * packet and start resync as target.  Always returns 0. */
static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;

	/* wait for a state in which the sync uuid is applicable, or for
	 * the connection/disk to go away underneath us */
	wait_event(mdev->misc_wait,
		   mdev->state.conn == C_WF_SYNC_UUID ||
		   mdev->state.conn == C_BEHIND ||
		   mdev->state.conn < C_CONNECTED ||
		   mdev->state.disk < D_NEGOTIATING);

	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);

		drbd_print_uuids(mdev, "updated sync uuid");
		drbd_start_resync(mdev, C_SYNC_TARGET);

		put_ldev(mdev);
	} else
		dev_err(DEV, "Ignoring SyncUUID packet!\n");

	return 0;
}
3587
/**
 * receive_bitmap_plain
 *
 * Receive one chunk of an uncompressed bitmap transfer into @buffer and
 * merge it into the local bitmap, advancing the transfer context @c.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
		     unsigned long *buffer, struct bm_xfer_ctx *c)
{
	/* number of words still outstanding, capped at one packet's worth */
	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
	unsigned want = num_words * sizeof(long);
	int err;

	/* the peer must send exactly the amount we expect for this chunk */
	if (want != data_size) {
		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv_all(mdev->tconn, buffer, want);
	if (err)
		return err;

	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);

	/* advance the transfer context; clamp the bit offset at the end */
	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
3621
/**
 * recv_bm_rle_bits
 *
 * Decode one VLI/RLE compressed bitmap packet: alternating run lengths of
 * clear and set bits, starting with DCBP_get_start(p), are applied to the
 * local bitmap and the transfer context @c is advanced.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = DCBP_get_start(p);
	int have;
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	/* prime the 64 bit look-ahead window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		/* only runs of set bits need to touch the bitmap */
		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* consume the decoded bits and refill the look-ahead window */
		look_ahead >>= bits;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
3686
2c46407d
AG
3687/**
3688 * decode_bitmap_c
3689 *
3690 * Return 0 when done, 1 when another iteration is needed, and a negative error
3691 * code upon failure.
3692 */
3693static int
b411b363
PR
3694decode_bitmap_c(struct drbd_conf *mdev,
3695 struct p_compressed_bm *p,
c6d25cfe
PR
3696 struct bm_xfer_ctx *c,
3697 unsigned int len)
b411b363
PR
3698{
3699 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3700 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3701
3702 /* other variants had been implemented for evaluation,
3703 * but have been dropped as this one turned out to be "best"
3704 * during all our tests. */
3705
3706 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3707 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3708 return -EIO;
b411b363
PR
3709}
3710
3711void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3712 const char *direction, struct bm_xfer_ctx *c)
3713{
3714 /* what would it take to transfer it "plaintext" */
c012949a 3715 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3716 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3717 + c->bm_words * sizeof(long);
3718 unsigned total = c->bytes[0] + c->bytes[1];
3719 unsigned r;
3720
3721 /* total can not be zero. but just in case: */
3722 if (total == 0)
3723 return;
3724
3725 /* don't report if not compressed */
3726 if (total >= plain)
3727 return;
3728
3729 /* total < plain. check for overflow, still */
3730 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3731 : (1000 * total / plain);
3732
3733 if (r > 1000)
3734 r = 1000;
3735
3736 r = 1000 - r;
3737 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3738 "total %u; compression: %u.%u%%\n",
3739 direction,
3740 c->bytes[1], c->packets[1],
3741 c->bytes[0], c->packets[0],
3742 total, r/10, r % 10);
3743}
3744
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Receives the peer's full bitmap (plain and/or compressed packets) and,
   depending on our state, answers with our own bitmap or starts resync.
   Returns 0 on success, a negative error code otherwise. */
static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
			  unsigned int data_size)
{
	struct bm_xfer_ctx c;
	void *buffer;
	int err;
	struct p_header *h = &mdev->tconn->data.rbuf.header;
	struct packet_info pi;

	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	/* maybe we should use some per thread scratch page,
	 * and allocate that during initial device creation? */
	buffer = (unsigned long *) __get_free_page(GFP_NOIO);
	if (!buffer) {
		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
		err = -ENOMEM;
		goto out;
	}

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(mdev),
		.bm_words = drbd_bm_words(mdev),
	};

	/* Loop over incoming bitmap packets until the whole bitmap arrived. */
	for(;;) {
		if (cmd == P_BITMAP) {
			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
		} else if (cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p;

			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
				dev_err(DEV, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			/* use the page buff */
			p = buffer;
			memcpy(p, h, sizeof(*h));
			err = drbd_recv_all(mdev->tconn, p->head.payload, data_size);
			if (err)
				goto out;
			/* the payload must at least hold the sub-header */
			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
				err = -EIO;
				goto out;
			}
			err = decode_bitmap_c(mdev, p, &c, data_size);
		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
			err = -EIO;
			goto out;
		}

		/* per-encoding transfer statistics */
		c.packets[cmd == P_BITMAP]++;
		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;

		/* err > 0: another packet is expected; err == 0: done */
		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(mdev->tconn, &pi);
		if (err)
			goto out;
		cmd = pi.cmd;
		data_size = pi.size;
	}

	INFO_bm_xfer_stats(mdev, "receive", &c);

	if (mdev->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* as sync target, answer with our own bitmap */
		err = drbd_send_bitmap(mdev);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(rv == SS_SUCCESS);
	} else if (mdev->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(mdev->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(mdev);
	/* as sync source, kick off the resync now that bitmaps are exchanged */
	if (!err && mdev->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(mdev, C_SYNC_SOURCE);
	free_page((unsigned long) buffer);
	return err;
}
3852
2de876ef 3853static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3854{
3855 /* TODO zero copy sink :) */
3856 static char sink[128];
3857 int size, want, r;
3858
02918be2 3859 size = data_size;
b411b363
PR
3860 while (size > 0) {
3861 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3862 r = drbd_recv(tconn, sink, want);
3863 if (r <= 0)
841ce241 3864 break;
b411b363
PR
3865 size -= r;
3866 }
82bc0194 3867 return size ? -EIO : 0;
b411b363
PR
3868}
3869
2de876ef
PR
/* Handler for unknown-but-optional packets on a known volume:
 * log and drain the payload. */
static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
{
	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
		 cmd, data_size);

	return _tconn_receive_skip(mdev->tconn, data_size);
}
3878
/* Like receive_skip(), but for packets addressed to a volume number
 * that does not exist on this connection. */
static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
{
	conn_warn(tconn, "skipping packet for non existing volume type %d, l: %d!\n",
		  cmd, data_size);

	return _tconn_receive_skip(tconn, data_size);
}
3886
d8763023
AG
/* Peer told us it unplugged its request queue; nudge TCP so our ACKs
 * for the associated data go out promptly. */
static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
				unsigned int data_size)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(mdev->tconn->data.socket);

	return 0;
}
3896
d8763023
AG
3897static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3898 unsigned int data_size)
73a01a18 3899{
e42325a5 3900 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3901
f735e363
LE
3902 switch (mdev->state.conn) {
3903 case C_WF_SYNC_UUID:
3904 case C_WF_BITMAP_T:
3905 case C_BEHIND:
3906 break;
3907 default:
3908 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3909 drbd_conn_str(mdev->state.conn));
3910 }
3911
73a01a18
PR
3912 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3913
82bc0194 3914 return 0;
73a01a18
PR
3915}
3916
02918be2
PR
/* Dispatch-table entry for one packet type arriving on the data socket. */
struct data_cmd {
	int expect_payload;	/* nonzero if the packet may carry payload beyond pkt_size */
	size_t pkt_size;	/* size of the fixed (sub-)header to read up front */
	enum mdev_or_conn fa_type; /* first argument's type */
	union {
		/* per-device handler (fa_type == MDEV) */
		int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
				  unsigned int to_receive);
		/* per-connection handler (fa_type == CONN) */
		int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
				  unsigned int to_receive);
	};
};
3928
/* Packet-type -> handler dispatch table, indexed by enum drbd_packet.
 * Entries not listed here are rejected by drbdd() as unknown. */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
	[P_BITMAP]	    = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
	[P_STATE]	    = { 0, sizeof(struct p_state), MDEV, { receive_state } },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
};
3954
02918be2 3955/* All handler functions that expect a sub-header get that sub-heder in
e42325a5 3956 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3957
e42325a5 3958 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3959 p_header, but they may not rely on that. Since there is also p_header95 !
3960 */
b411b363 3961
/* Main receive loop of the receiver thread: read one packet header at a
 * time, validate it against drbd_cmd_handler[], read the sub-header, and
 * dispatch to the per-device or per-connection handler.  Any protocol
 * violation forces the connection into C_PROTOCOL_ERROR. */
static void drbdd(struct drbd_tconn *tconn)
{
	struct p_header *header = &tconn->data.rbuf.header;
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&tconn->receiver) == RUNNING) {
		drbd_thread_current_set_cpu(&tconn->receiver);
		if (drbd_recv_header(tconn, &pi))
			goto err_out;

		/* reject packet types we have no handler for */
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
			     !drbd_cmd_handler[pi.cmd].mdev_fn)) {
			conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
			goto err_out;
		}

		shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
		/* extra payload is only allowed where the table says so */
		if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
			conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		/* read the fixed-size sub-header into the receive buffer */
		if (shs) {
			err = drbd_recv_all(tconn, &header->payload, shs);
			if (err) {
				if (!signal_pending(current))
					conn_warn(tconn, "short read while reading sub header: rv=%d\n", err);
				goto err_out;
			}
		}

		if (drbd_cmd_handler[pi.cmd].fa_type == CONN) {
			err = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
		} else {
			/* per-device packet: resolve the volume number; drain
			 * the payload if the volume does not exist */
			struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
			err = mdev ?
				drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
				tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
		}

		if (unlikely(err)) {
			conn_err(tconn, "error receiving %s, l: %d!\n",
			    cmdname(pi.cmd), pi.size);
			goto err_out;
		}
	}
	return;

    err_out:
	conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
4015
0e29d163 4016void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4017{
4018 struct drbd_wq_barrier barr;
4019
4020 barr.w.cb = w_prev_work_done;
0e29d163 4021 barr.w.tconn = tconn;
b411b363 4022 init_completion(&barr.done);
0e29d163 4023 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4024 wait_for_completion(&barr.done);
4025}
4026
360cc740 4027static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4028{
bbeb641c 4029 enum drbd_conns oc;
b411b363 4030 int rv = SS_UNKNOWN_ERROR;
b411b363 4031
bbeb641c 4032 if (tconn->cstate == C_STANDALONE)
b411b363 4033 return;
b411b363
PR
4034
4035 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4036 drbd_thread_stop(&tconn->asender);
4037 drbd_free_sock(tconn);
4038
4039 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4040
4041 conn_info(tconn, "Connection closed\n");
4042
4043 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4044 oc = tconn->cstate;
4045 if (oc >= C_UNCONNECTED)
4046 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4047
360cc740
PR
4048 spin_unlock_irq(&tconn->req_lock);
4049
bbeb641c 4050 if (oc == C_DISCONNECTING) {
360cc740
PR
4051 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4052
4053 crypto_free_hash(tconn->cram_hmac_tfm);
4054 tconn->cram_hmac_tfm = NULL;
4055
4056 kfree(tconn->net_conf);
4057 tconn->net_conf = NULL;
bbeb641c 4058 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4059 }
4060}
4061
/* Per-volume cleanup after the connection was lost.  Called via
 * idr_for_each() from drbd_disconnect(); always returns 0 so the
 * iteration continues over all volumes. */
static int drbd_disconnected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	enum drbd_fencing_p fp;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 * * On C_SYNC_TARGET we do not have any data structures describing
	 * the pending RSDataRequest's we have sent.
	 * * On C_SYNC_SOURCE there is no data structure that tracks
	 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 * And no, it is not the sum of the reference counts in the
	 * resync_LRU. The resync_LRU tracks the whole operation including
	 * the disk-IO, while the rs_pending_cnt only tracks the blocks
	 * on the fly. */
	drbd_rs_cancel_all(mdev);
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	atomic_set(&mdev->rs_pending_cnt, 0);
	wake_up(&mdev->misc_wait);

	del_timer(&mdev->request_timer);

	/* stop the resync timer, then run its function once more by hand */
	del_timer_sync(&mdev->resync_timer);
	resync_timer_fn((unsigned long)mdev);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(mdev);

	/* This also does reclaim_net_ee(). If we do this too early, we might
	 * miss some resync ee and pages.*/
	drbd_process_done_ee(mdev);

	kfree(mdev->p_uuid);
	mdev->p_uuid = NULL;

	if (!is_susp(mdev->state))
		tl_clear(mdev->tconn);

	drbd_md_sync(mdev);

	fp = FP_DONT_CARE;
	if (get_ldev(mdev)) {
		fp = mdev->ldev->dc.fencing;
		put_ldev(mdev);
	}

	/* a primary with fencing configured tries to outdate the peer */
	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
		drbd_try_outdate_peer_async(mdev);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));

	/* tcp_close and release of sendpage pages can be deferred. I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_release_ee(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&mdev->current_epoch->epoch_size, 0);
	D_ASSERT(list_empty(&mdev->current_epoch->list));

	return 0;
}
4154
4155/*
4156 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4157 * we can agree on is stored in agreed_pro_version.
4158 *
4159 * feature flags and the reserved array should be enough room for future
4160 * enhancements of the handshake protocol, and possible plugins...
4161 *
4162 * for now, they are expected to be zero, but ignored.
4163 */
8a22cccc 4164static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4165{
e6b3ea83 4166 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4167 struct p_handshake *p = &tconn->data.sbuf.handshake;
e8d17b01 4168 int err;
b411b363 4169
8a22cccc
PR
4170 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4171 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4172 return -EINTR;
b411b363
PR
4173 }
4174
8a22cccc
PR
4175 if (tconn->data.socket == NULL) {
4176 mutex_unlock(&tconn->data.mutex);
e8d17b01 4177 return -EIO;
b411b363
PR
4178 }
4179
4180 memset(p, 0, sizeof(*p));
4181 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4182 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4183 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4184 &p->head, sizeof(*p), 0);
8a22cccc 4185 mutex_unlock(&tconn->data.mutex);
e8d17b01 4186 return err;
b411b363
PR
4187}
4188
/*
 * return values:
 *  1 yes, we have a valid connection
 *  0 oops, did not work out, please try again
 * -1 peer talks different language,
 *    no point in trying again, please go standalone.
 */
static int drbd_do_handshake(struct drbd_tconn *tconn)
{
	/* ASSERT current == tconn->receiver ... */
	struct p_handshake *p = &tconn->data.rbuf.handshake;
	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
	struct packet_info pi;
	int err, rv;

	/* send our own handshake first, then expect the peer's */
	err = drbd_send_handshake(tconn);
	if (err)
		return 0;

	err = drbd_recv_header(tconn, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_HAND_SHAKE) {
		conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		conn_err(tconn, "expected HandShake length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	rv = drbd_recv(tconn, &p->head.payload, expect);

	if (rv != expect) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	/* very old peers sent only protocol_min; treat max == 0 as min */
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	/* agree on the highest version both sides support */
	tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	conn_info(tconn, "Handshake successful: "
	     "Agreed network protocol version %d\n", tconn->agreed_pro_version);

	return 1;

 incompat:
	conn_err(tconn, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
4255
4256#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4257static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4258{
4259 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4260 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4261 return -1;
b411b363
PR
4262}
4263#else
#define CHALLENGE_LEN 	64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.

   Challenge-response authentication: both sides exchange random
   challenges and prove knowledge of the shared secret by returning an
   HMAC over the peer's challenge. */
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len = strlen(tconn->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	int err, rv;

	desc.tfm = tconn->cram_hmac_tfm;
	desc.flags = 0;

	/* key the HMAC with the configured shared secret */
	rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
				(u8 *)tconn->net_conf->shared_secret, key_len);
	if (rv) {
		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	/* send our challenge to the peer */
	rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(tconn, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		conn_err(tconn, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	rv = drbd_recv(tconn, peers_ch, pi.size);

	if (rv != pi.size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	/* compute our response: HMAC over the peer's challenge */
	resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		conn_err(tconn, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(tconn, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		conn_err(tconn, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* receive the peer's response to OUR challenge (reuses 'response') */
	rv = drbd_recv(tconn, response , resp_size);

	if (rv != resp_size) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		conn_err(tconn, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	/* compute the expected answer over our own challenge */
	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/* NOTE(review): memcmp is not constant-time; a timing side channel
	 * may be acceptable here given the challenge is fresh per attempt —
	 * confirm against the project's threat model. */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
		     resp_size, tconn->net_conf->cram_hmac_alg);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
4418#endif
4419
4420int drbdd_init(struct drbd_thread *thi)
4421{
392c8801 4422 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4423 int h;
4424
4d641dd7 4425 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4426
4427 do {
4d641dd7 4428 h = drbd_connect(tconn);
b411b363 4429 if (h == 0) {
4d641dd7 4430 drbd_disconnect(tconn);
20ee6390 4431 schedule_timeout_interruptible(HZ);
b411b363
PR
4432 }
4433 if (h == -1) {
4d641dd7 4434 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4435 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4436 }
4437 } while (h == 0);
4438
4439 if (h > 0) {
4d641dd7
PR
4440 if (get_net_conf(tconn)) {
4441 drbdd(tconn);
4442 put_net_conf(tconn);
b411b363
PR
4443 }
4444 }
4445
4d641dd7 4446 drbd_disconnect(tconn);
b411b363 4447
4d641dd7 4448 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4449 return 0;
4450}
4451
4452/* ********* acknowledge sender ******** */
4453
e4f78ede
PR
4454static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4455{
4456 struct p_req_state_reply *p = &tconn->meta.rbuf.req_state_reply;
4457 int retcode = be32_to_cpu(p->retcode);
4458
4459 if (retcode >= SS_SUCCESS) {
4460 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4461 } else {
4462 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4463 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4464 drbd_set_st_err_str(retcode), retcode);
4465 }
4466 wake_up(&tconn->ping_wait);
4467
4468 return true;
4469}
4470
d8763023 4471static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4472{
257d0af6 4473 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4474 int retcode = be32_to_cpu(p->retcode);
4475
e4f78ede
PR
4476 if (retcode >= SS_SUCCESS) {
4477 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4478 } else {
4479 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4480 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4481 drbd_set_st_err_str(retcode), retcode);
b411b363 4482 }
e4f78ede
PR
4483 wake_up(&mdev->state_wait);
4484
81e84650 4485 return true;
b411b363
PR
4486}
4487
f19e4f8b 4488static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4489{
f19e4f8b 4490 return drbd_send_ping_ack(tconn);
b411b363
PR
4491
4492}
4493
/* The peer answered our ping: restore the idle receive timeout and wake
 * whoever is waiting for the ack. */
static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
{
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return true;
}
4503
/* Peer confirmed (via checksum-based resync) that a block is already in
 * sync: mark it in sync locally and account the resync progress. */
static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	/* P_RS_IS_IN_SYNC only exists since protocol 89 */
	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	/* account resynced sectors (blksize in bytes -> 512-byte sectors) */
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}
4526
bc9c5c41
AG
/* Look up the request identified by (@id, @sector) in @root and apply the
 * state transition @what to it under the request lock.  Returns false if
 * the request was not found (unless @missing_ok handling applies inside
 * find_request), true otherwise; completes the master bio if the
 * transition finished it. */
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* complete outside the lock */
	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}
4548
/* Positive write acknowledgment from the peer: either a resync block
 * (ID_SYNCER) or an application write, in which case the matching
 * request gets the state transition that corresponds to the ack type. */
static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	/* resync traffic carries no request; just update the bitmap */
	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	/* map the ack packet type to the request state transition;
	 * the asserts document which wire protocol sends which ack */
	switch (cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = DISCARD_WRITE;
		break;
	case P_RETRY_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = POSTPONE_WRITE;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
4593
/* Negative acknowledgment: the peer could not apply a write.  For resync
 * blocks record the failed I/O; for application writes fail the request,
 * tolerating a missing request under protocols A/B. */
static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	/* under protocols A/B the request may legitimately be gone already */
	bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}
4626
d8763023 4627static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4628{
257d0af6 4629 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4630 sector_t sector = be64_to_cpu(p->sector);
4631
4632 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4633
b411b363
PR
4634 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4635 (unsigned long long)sector, be32_to_cpu(p->blksize));
4636
4637 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4638 &mdev->read_requests, __func__,
8554df1c 4639 NEG_ACKED, false);
b411b363
PR
4640}
4641
/*
 * Handle P_NEG_RS_DREPLY / P_RS_CANCEL: the peer could not (or was asked
 * not to) serve a resync read request for the given block.
 */
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through: a cancel needs no failed-io
			 * accounting, a neg reply needs it in addition */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
4672
d8763023 4673static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4674{
257d0af6 4675 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363 4676
2f5cdd0b 4677 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4678
c4752ef1
PR
4679 if (mdev->state.conn == C_AHEAD &&
4680 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4681 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4682 mdev->start_resync_timer.expires = jiffies + HZ;
4683 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4684 }
4685
81e84650 4686 return true;
b411b363
PR
4687}
4688
/*
 * Handle the peer's reply to one online-verify request: record whether
 * the block matched, update verify progress, and finish the verify run
 * once all replies are in.
 */
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		/* an in-sync result terminates the current out-of-sync run */
		ov_out_of_sync_print(mdev);

	/* without a local disk there is no resync bookkeeping to update */
	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		/* all replies received: hand the finish-up off to the worker */
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			/* allocation failed: finish synchronously instead */
			dev_err(DEV, "kmalloc(w) failed.");
			ov_out_of_sync_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
4733
/* Handler for packets we deliberately ignore (P_DELAY_PROBE, see
 * asender_tbl[]): consume and succeed. */
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}
4738
/*
 * Process the done_ee lists of every volume of this connection, looping
 * until no volume has newly completed entries left.
 * Returns 1 on error (drbd_process_done_ee() failed for a volume),
 * 0 once all lists are drained.
 */
static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int i, not_empty = 0;

	do {
		/* drop the asender-signal flag while we may block below */
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			if (drbd_process_done_ee(mdev))
				return 1; /* error */
		}
		set_bit(SIGNAL_ASENDER, &tconn->flags);

		/* re-check under req_lock whether more entries were queued
		 * while we were processing without the flag set */
		spin_lock_irq(&tconn->req_lock);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}
4764
/* One dispatch-table entry for a packet type received on the meta socket. */
struct asender_cmd {
	size_t pkt_size;	/* expected total packet size, incl. header */
	enum mdev_or_conn fa_type; /* first argument's type */
	union {
		/* fa_type == MDEV: handler takes the volume, ... */
		int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
		/* ... fa_type == CONN: handler takes the whole connection */
		int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
	};
};
4773
/* Dispatch table for the asender, indexed by packet command number.
 * Commands not listed here are zero-initialized (pkt_size == 0). */
static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
	[P_PING_ACK]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
};
4793
b411b363
PR
4794int drbd_asender(struct drbd_thread *thi)
4795{
392c8801 4796 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4797 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4798 struct asender_cmd *cmd = NULL;
77351055 4799 struct packet_info pi;
257d0af6 4800 int rv;
b411b363
PR
4801 void *buf = h;
4802 int received = 0;
257d0af6 4803 int expect = sizeof(struct p_header);
f36af18c 4804 int ping_timeout_active = 0;
b411b363 4805
b411b363
PR
4806 current->policy = SCHED_RR; /* Make this a realtime task! */
4807 current->rt_priority = 2; /* more important than all other tasks */
4808
e77a0a5c 4809 while (get_t_state(thi) == RUNNING) {
80822284 4810 drbd_thread_current_set_cpu(thi);
32862ec7 4811 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4812 if (!drbd_send_ping(tconn)) {
32862ec7 4813 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4814 goto reconnect;
4815 }
32862ec7
PR
4816 tconn->meta.socket->sk->sk_rcvtimeo =
4817 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4818 ping_timeout_active = 1;
b411b363
PR
4819 }
4820
32862ec7
PR
4821 /* TODO: conditionally cork; it may hurt latency if we cork without
4822 much to send */
4823 if (!tconn->net_conf->no_cork)
4824 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4825 if (tconn_process_done_ee(tconn)) {
4826 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4827 goto reconnect;
082a3439 4828 }
b411b363 4829 /* but unconditionally uncork unless disabled */
32862ec7
PR
4830 if (!tconn->net_conf->no_cork)
4831 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4832
4833 /* short circuit, recv_msg would return EINTR anyways. */
4834 if (signal_pending(current))
4835 continue;
4836
32862ec7
PR
4837 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4838 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4839
4840 flush_signals(current);
4841
4842 /* Note:
4843 * -EINTR (on meta) we got a signal
4844 * -EAGAIN (on meta) rcvtimeo expired
4845 * -ECONNRESET other side closed the connection
4846 * -ERESTARTSYS (on data) we got a signal
4847 * rv < 0 other than above: unexpected error!
4848 * rv == expected: full header or command
4849 * rv < expected: "woken" by signal during receive
4850 * rv == 0 : "connection shut down by peer"
4851 */
4852 if (likely(rv > 0)) {
4853 received += rv;
4854 buf += rv;
4855 } else if (rv == 0) {
32862ec7 4856 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4857 goto reconnect;
4858 } else if (rv == -EAGAIN) {
cb6518cb
LE
4859 /* If the data socket received something meanwhile,
4860 * that is good enough: peer is still alive. */
32862ec7
PR
4861 if (time_after(tconn->last_received,
4862 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4863 continue;
f36af18c 4864 if (ping_timeout_active) {
32862ec7 4865 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4866 goto reconnect;
4867 }
32862ec7 4868 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4869 continue;
4870 } else if (rv == -EINTR) {
4871 continue;
4872 } else {
32862ec7 4873 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4874 goto reconnect;
4875 }
4876
4877 if (received == expect && cmd == NULL) {
8172f3e9 4878 if (decode_header(tconn, h, &pi))
b411b363 4879 goto reconnect;
7201b972
AG
4880 cmd = &asender_tbl[pi.cmd];
4881 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4882 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4883 pi.cmd, pi.size);
b411b363
PR
4884 goto disconnect;
4885 }
4886 expect = cmd->pkt_size;
77351055 4887 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4888 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4889 pi.cmd, pi.size);
b411b363 4890 goto reconnect;
257d0af6 4891 }
b411b363
PR
4892 }
4893 if (received == expect) {
a4fbda8e
PR
4894 bool rv;
4895
4896 if (cmd->fa_type == CONN) {
4897 rv = cmd->conn_fn(tconn, pi.cmd);
4898 } else {
4899 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4900 rv = cmd->mdev_fn(mdev, pi.cmd);
4901 }
4902
4903 if (!rv)
b411b363
PR
4904 goto reconnect;
4905
a4fbda8e
PR
4906 tconn->last_received = jiffies;
4907
f36af18c
LE
4908 /* the idle_timeout (ping-int)
4909 * has been restored in got_PingAck() */
7201b972 4910 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4911 ping_timeout_active = 0;
4912
b411b363
PR
4913 buf = h;
4914 received = 0;
257d0af6 4915 expect = sizeof(struct p_header);
b411b363
PR
4916 cmd = NULL;
4917 }
4918 }
4919
4920 if (0) {
4921reconnect:
bbeb641c 4922 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4923 }
4924 if (0) {
4925disconnect:
bbeb641c 4926 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4927 }
32862ec7 4928 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4929
32862ec7 4930 conn_info(tconn, "asender terminated\n");
b411b363
PR
4931
4932 return 0;
4933}