/*
 * drivers/block/drbd/drbd_receiver.c
 * (mirror_ubuntu-artful-kernel.git, blame view at commit
 *  "drbd: Add drbd_recv_all(): Receive an entire buffer")
 */
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	int size;
	int vnr;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

enum mdev_or_conn {
	MDEV,
	CONN,
};

static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

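/*
 * Illustrative sketch (not part of the original file): the pool helpers above
 * link pages into a singly linked chain through page->private, terminated by
 * 0.  Mirroring how the rest of this file walks such chains (see
 * drbd_submit_peer_request() below), counting the pages of a chain could look
 * like this:
 */
#if 0
static unsigned int example_count_chain_pages(struct page *chain)
{
	struct page *page = chain;
	unsigned int n = 0;

	page_chain_for_each(page) {	/* advances via page->private */
		n++;
	}
	return n;
}
#endif
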
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
		return NULL;
	}

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
		count++;
	}
	return count;
}


/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_ee(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return err;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}

/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;

out:
	return err;
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal
		 */

		if (rv < 0) {
			if (rv == -ECONNRESET)
				conn_info(tconn, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			conn_info(tconn, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	};

	set_fs(oldfs);

	if (rv != size)
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

	return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv(tconn, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

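/*
 * Illustrative sketch (not part of the original file): with drbd_recv_all(),
 * callers no longer have to compare the return value against the requested
 * size; a short read is already folded into -EIO.  A hypothetical caller:
 */
#if 0
static int example_recv_header(struct drbd_tconn *tconn, struct p_header *h)
{
	int err = drbd_recv_all(tconn, h, sizeof(*h));

	if (err)	/* 0 on success, negative errno otherwise */
		return err;
	/* ... decode *h ... */
	return 0;
}
#endif
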
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

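/*
 * Illustrative note (not part of the original file): per the tcp(7) quote
 * above, the buffer sizes only take effect when applied before connect() or
 * listen(), which is the order the connection setup paths below follow:
 *
 *	sock_create_kern(...);
 *	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *	sock->ops->bind(...);
 *	sock->ops->connect(...);   or listen()/accept()
 */
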
eac3e990 600static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
b411b363
PR
601{
602 const char *what;
603 struct socket *sock;
604 struct sockaddr_in6 src_in6;
605 int err;
606 int disconnect_on_error = 1;
607
eac3e990 608 if (!get_net_conf(tconn))
b411b363
PR
609 return NULL;
610
611 what = "sock_create_kern";
eac3e990 612 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
613 SOCK_STREAM, IPPROTO_TCP, &sock);
614 if (err < 0) {
615 sock = NULL;
616 goto out;
617 }
618
619 sock->sk->sk_rcvtimeo =
eac3e990
PR
620 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
621 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
622 tconn->net_conf->rcvbuf_size);
b411b363
PR
623
624 /* explicitly bind to the configured IP as source IP
625 * for the outgoing connections.
626 * This is needed for multihomed hosts and to be
627 * able to use lo: interfaces for drbd.
628 * Make sure to use 0 as port number, so linux selects
629 * a free one dynamically.
630 */
eac3e990
PR
631 memcpy(&src_in6, tconn->net_conf->my_addr,
632 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
633 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
634 src_in6.sin6_port = 0;
635 else
636 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
637
638 what = "bind before connect";
639 err = sock->ops->bind(sock,
640 (struct sockaddr *) &src_in6,
eac3e990 641 tconn->net_conf->my_addr_len);
b411b363
PR
642 if (err < 0)
643 goto out;
644
645 /* connect may fail, peer not yet available.
646 * stay C_WF_CONNECTION, don't go Disconnecting! */
647 disconnect_on_error = 0;
648 what = "connect";
649 err = sock->ops->connect(sock,
eac3e990
PR
650 (struct sockaddr *)tconn->net_conf->peer_addr,
651 tconn->net_conf->peer_addr_len, 0);
b411b363
PR
652
653out:
654 if (err < 0) {
655 if (sock) {
656 sock_release(sock);
657 sock = NULL;
658 }
659 switch (-err) {
660 /* timeout, busy, signal pending */
661 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
662 case EINTR: case ERESTARTSYS:
663 /* peer not (yet) available, network problem */
664 case ECONNREFUSED: case ENETUNREACH:
665 case EHOSTDOWN: case EHOSTUNREACH:
666 disconnect_on_error = 0;
667 break;
668 default:
eac3e990 669 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
670 }
671 if (disconnect_on_error)
bbeb641c 672 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 673 }
eac3e990 674 put_net_conf(tconn);
b411b363
PR
675 return sock;
676}
677
7653620d 678static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
679{
680 int timeo, err;
681 struct socket *s_estab = NULL, *s_listen;
682 const char *what;
683
7653620d 684 if (!get_net_conf(tconn))
b411b363
PR
685 return NULL;
686
687 what = "sock_create_kern";
7653620d 688 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
689 SOCK_STREAM, IPPROTO_TCP, &s_listen);
690 if (err) {
691 s_listen = NULL;
692 goto out;
693 }
694
7653620d 695 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
696 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
697
698 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
699 s_listen->sk->sk_rcvtimeo = timeo;
700 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
701 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
702 tconn->net_conf->rcvbuf_size);
b411b363
PR
703
704 what = "bind before listen";
705 err = s_listen->ops->bind(s_listen,
7653620d
PR
706 (struct sockaddr *) tconn->net_conf->my_addr,
707 tconn->net_conf->my_addr_len);
b411b363
PR
708 if (err < 0)
709 goto out;
710
7653620d 711 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
712
713out:
714 if (s_listen)
715 sock_release(s_listen);
716 if (err < 0) {
717 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 718 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 719 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
720 }
721 }
7653620d 722 put_net_conf(tconn);
b411b363
PR
723
724 return s_estab;
725}
726
d38e787e 727static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 728{
d38e787e 729 struct p_header *h = &tconn->data.sbuf.header;
b411b363 730
ecf2363c 731 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
732}
733
a25b63f1 734static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 735{
a25b63f1 736 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
737 int rr;
738
dbd9eea0 739 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 740
ca9bc12b 741 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
742 return be16_to_cpu(h->command);
743
744 return 0xffff;
745}
746
747/**
748 * drbd_socket_okay() - Free the socket if its connection is not okay
b411b363
PR
749 * @sock: pointer to the pointer to the socket.
750 */
dbd9eea0 751static int drbd_socket_okay(struct socket **sock)
b411b363
PR
752{
753 int rr;
754 char tb[4];
755
756 if (!*sock)
81e84650 757 return false;
b411b363 758
dbd9eea0 759 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
b411b363
PR
760
761 if (rr > 0 || rr == -EAGAIN) {
81e84650 762 return true;
b411b363
PR
763 } else {
764 sock_release(*sock);
765 *sock = NULL;
81e84650 766 return false;
b411b363
PR
767 }
768}
2325eb66
PR
769/* Gets called if a connection is established, or if a new minor gets created
770 in a connection */
771int drbd_connected(int vnr, void *p, void *data)
907599e0
PR
772{
773 struct drbd_conf *mdev = (struct drbd_conf *)p;
774 int ok = 1;
775
776 atomic_set(&mdev->packet_seq, 0);
777 mdev->peer_seq = 0;
778
8410da8f
PR
779 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
780 &mdev->tconn->cstate_mutex :
781 &mdev->own_state_mutex;
782
103ea275 783 ok &= !drbd_send_sync_param(mdev);
f02d4d0a 784 ok &= !drbd_send_sizes(mdev, 0, 0);
2ae5f95b 785 ok &= !drbd_send_uuids(mdev);
927036f9 786 ok &= !drbd_send_state(mdev);
907599e0
PR
787 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
788 clear_bit(RESIZE_PENDING, &mdev->flags);
789
8410da8f 790
907599e0
PR
791 return !ok;
792}
793
b411b363
PR
794/*
795 * return values:
796 * 1 yes, we have a valid connection
797 * 0 oops, did not work out, please try again
798 * -1 peer talks different language,
799 * no point in trying again, please go standalone.
800 * -2 We do not have a network config...
801 */
907599e0 802static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
803{
804 struct socket *s, *sock, *msock;
805 int try, h, ok;
806
bbeb641c 807 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
808 return -2;
809
907599e0
PR
810 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
811 tconn->agreed_pro_version = 99;
fd340c12
PR
812 /* agreed_pro_version must be smaller than 100 so we send the old
813 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
814
815 sock = NULL;
816 msock = NULL;
817
818 do {
819 for (try = 0;;) {
820 /* 3 tries, this should take less than a second! */
907599e0 821 s = drbd_try_connect(tconn);
b411b363
PR
822 if (s || ++try >= 3)
823 break;
824 /* give the other side time to call bind() & listen() */
20ee6390 825 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
826 }
827
828 if (s) {
829 if (!sock) {
907599e0 830 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
831 sock = s;
832 s = NULL;
833 } else if (!msock) {
907599e0 834 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
835 msock = s;
836 s = NULL;
837 } else {
907599e0 838 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
839 goto out_release_sockets;
840 }
841 }
842
843 if (sock && msock) {
907599e0 844 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
845 ok = drbd_socket_okay(&sock);
846 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
847 if (ok)
848 break;
849 }
850
851retry:
907599e0 852 s = drbd_wait_for_connect(tconn);
b411b363 853 if (s) {
907599e0 854 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
855 drbd_socket_okay(&sock);
856 drbd_socket_okay(&msock);
b411b363
PR
857 switch (try) {
858 case P_HAND_SHAKE_S:
859 if (sock) {
907599e0 860 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
861 sock_release(sock);
862 }
863 sock = s;
864 break;
865 case P_HAND_SHAKE_M:
866 if (msock) {
907599e0 867 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
868 sock_release(msock);
869 }
870 msock = s;
907599e0 871 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
872 break;
873 default:
907599e0 874 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
875 sock_release(s);
876 if (random32() & 1)
877 goto retry;
878 }
879 }
880
bbeb641c 881 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
882 goto out_release_sockets;
883 if (signal_pending(current)) {
884 flush_signals(current);
885 smp_rmb();
907599e0 886 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
887 goto out_release_sockets;
888 }
889
890 if (sock && msock) {
dbd9eea0
PR
891 ok = drbd_socket_okay(&sock);
892 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
893 if (ok)
894 break;
895 }
896 } while (1);
897
898 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
899 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
900
901 sock->sk->sk_allocation = GFP_NOIO;
902 msock->sk->sk_allocation = GFP_NOIO;
903
904 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
905 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
906
b411b363 907 /* NOT YET ...
907599e0 908 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
909 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
910 * first set it to the P_HAND_SHAKE timeout,
911 * which we set to 4x the configured ping_timeout. */
912 sock->sk->sk_sndtimeo =
907599e0 913 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 914
907599e0
PR
915 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
916 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
917
918 /* we don't want delays.
25985edc 919 * we use TCP_CORK where appropriate, though */
b411b363
PR
920 drbd_tcp_nodelay(sock);
921 drbd_tcp_nodelay(msock);
922
907599e0
PR
923 tconn->data.socket = sock;
924 tconn->meta.socket = msock;
925 tconn->last_received = jiffies;
b411b363 926
907599e0 927 h = drbd_do_handshake(tconn);
b411b363
PR
928 if (h <= 0)
929 return h;
930
907599e0 931 if (tconn->cram_hmac_tfm) {
b411b363 932 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 933 switch (drbd_do_auth(tconn)) {
b10d96cb 934 case -1:
907599e0 935 conn_err(tconn, "Authentication of peer failed\n");
b411b363 936 return -1;
b10d96cb 937 case 0:
907599e0 938 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 939 return 0;
b411b363
PR
940 }
941 }
942
bbeb641c 943 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
944 return 0;
945
907599e0 946 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
947 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
948
907599e0 949 drbd_thread_start(&tconn->asender);
b411b363 950
387eb308 951 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 952 return -1;
b411b363 953
907599e0 954 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
955
956out_release_sockets:
957 if (sock)
958 sock_release(sock);
959 if (msock)
960 sock_release(msock);
961 return -1;
962}
963
8172f3e9 964static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 965{
fd340c12 966 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
77351055
PR
967 pi->cmd = be16_to_cpu(h->h80.command);
968 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 969 pi->vnr = 0;
ca9bc12b 970 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
77351055
PR
971 pi->cmd = be16_to_cpu(h->h95.command);
972 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
973 pi->vnr = 0;
02918be2 974 } else {
ce243853 975 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
004352fa
LE
976 be32_to_cpu(h->h80.magic),
977 be16_to_cpu(h->h80.command),
978 be16_to_cpu(h->h80.length));
8172f3e9 979 return -EINVAL;
b411b363 980 }
8172f3e9 981 return 0;
257d0af6
PR
982}
983
9ba7aa00 984static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 985{
9ba7aa00 986 struct p_header *h = &tconn->data.rbuf.header;
69bc7bc3 987 int err;
257d0af6 988
69bc7bc3
AG
989 err = drbd_recv(tconn, h, sizeof(*h));
990 if (unlikely(err != sizeof(*h))) {
257d0af6 991 if (!signal_pending(current))
69bc7bc3
AG
992 conn_warn(tconn, "short read expecting header on sock: r=%d\n", err);
993 if (err >= 0)
994 err = -EIO;
995 return err;
257d0af6
PR
996 }
997
69bc7bc3 998 err = decode_header(tconn, h, pi);
9ba7aa00 999 tconn->last_received = jiffies;
b411b363 1000
69bc7bc3 1001 return err;
b411b363
PR
1002}
1003
2451fc3b 1004static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
1005{
1006 int rv;
1007
1008 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 1009 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 1010 NULL);
b411b363
PR
1011 if (rv) {
1012 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1013 /* would rather check on EOPNOTSUPP, but that is not reliable.
1014 * don't try again for ANY return value != 0
1015 * if (rv == -EOPNOTSUPP) */
1016 drbd_bump_write_ordering(mdev, WO_drain_io);
1017 }
1018 put_ldev(mdev);
1019 }
b411b363
PR
1020}
1021
1022/**
1023 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1024 * @mdev: DRBD device.
1025 * @epoch: Epoch object.
1026 * @ev: Epoch event.
1027 */
1028static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1029 struct drbd_epoch *epoch,
1030 enum epoch_event ev)
1031{
2451fc3b 1032 int epoch_size;
b411b363 1033 struct drbd_epoch *next_epoch;
b411b363
PR
1034 enum finish_epoch rv = FE_STILL_LIVE;
1035
1036 spin_lock(&mdev->epoch_lock);
1037 do {
1038 next_epoch = NULL;
b411b363
PR
1039
1040 epoch_size = atomic_read(&epoch->epoch_size);
1041
1042 switch (ev & ~EV_CLEANUP) {
1043 case EV_PUT:
1044 atomic_dec(&epoch->active);
1045 break;
1046 case EV_GOT_BARRIER_NR:
1047 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1048 break;
1049 case EV_BECAME_LAST:
1050 /* nothing to do*/
1051 break;
1052 }
1053
b411b363
PR
1054 if (epoch_size != 0 &&
1055 atomic_read(&epoch->active) == 0 &&
2451fc3b 1056 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1057 if (!(ev & EV_CLEANUP)) {
1058 spin_unlock(&mdev->epoch_lock);
1059 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1060 spin_lock(&mdev->epoch_lock);
1061 }
1062 dec_unacked(mdev);
1063
1064 if (mdev->current_epoch != epoch) {
1065 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1066 list_del(&epoch->list);
1067 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1068 mdev->epochs--;
b411b363
PR
1069 kfree(epoch);
1070
1071 if (rv == FE_STILL_LIVE)
1072 rv = FE_DESTROYED;
1073 } else {
1074 epoch->flags = 0;
1075 atomic_set(&epoch->epoch_size, 0);
698f9315 1076 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1077 if (rv == FE_STILL_LIVE)
1078 rv = FE_RECYCLED;
2451fc3b 1079 wake_up(&mdev->ee_wait);
b411b363
PR
1080 }
1081 }
1082
1083 if (!next_epoch)
1084 break;
1085
1086 epoch = next_epoch;
1087 } while (1);
1088
1089 spin_unlock(&mdev->epoch_lock);
1090
b411b363
PR
1091 return rv;
1092}
1093
1094/**
1095 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1096 * @mdev: DRBD device.
1097 * @wo: Write ordering method to try.
1098 */
1099void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1100{
1101 enum write_ordering_e pwo;
1102 static char *write_ordering_str[] = {
1103 [WO_none] = "none",
1104 [WO_drain_io] = "drain",
1105 [WO_bdev_flush] = "flush",
b411b363
PR
1106 };
1107
1108 pwo = mdev->write_ordering;
1109 wo = min(pwo, wo);
b411b363
PR
1110 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1111 wo = WO_drain_io;
1112 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1113 wo = WO_none;
1114 mdev->write_ordering = wo;
2451fc3b 1115 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1116 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1117}
1118
45bb912b 1119/**
fbe29dec 1120 * drbd_submit_peer_request()
45bb912b 1121 * @mdev: DRBD device.
db830c46 1122 * @peer_req: peer request
45bb912b 1123 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1124 *
1125 * May spread the pages to multiple bios,
1126 * depending on bio_add_page restrictions.
1127 *
1128 * Returns 0 if all bios have been submitted,
1129 * -ENOMEM if we could not allocate enough bios,
1130 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1131 * single page to an empty bio (which should never happen and likely indicates
1132 * that the lower level IO stack is in some way broken). This has been observed
1133 * on certain Xen deployments.
45bb912b
LE
1134 */
1135/* TODO allocate from our own bio_set. */
fbe29dec
AG
1136int drbd_submit_peer_request(struct drbd_conf *mdev,
1137 struct drbd_peer_request *peer_req,
1138 const unsigned rw, const int fault_type)
45bb912b
LE
1139{
1140 struct bio *bios = NULL;
1141 struct bio *bio;
db830c46
AG
1142 struct page *page = peer_req->pages;
1143 sector_t sector = peer_req->i.sector;
1144 unsigned ds = peer_req->i.size;
45bb912b
LE
1145 unsigned n_bios = 0;
1146 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1147 int err = -ENOMEM;
45bb912b
LE
1148
1149 /* In most cases, we will only need one bio. But in case the lower
1150 * level restrictions happen to be different at this offset on this
1151 * side than those of the sending peer, we may need to submit the
da4a75d2
LE
1152 * request in more than one bio.
1153 *
1154 * Plain bio_alloc is good enough here, this is no DRBD internally
1155 * generated bio, but a bio allocated on behalf of the peer.
1156 */
45bb912b
LE
1157next_bio:
1158 bio = bio_alloc(GFP_NOIO, nr_pages);
1159 if (!bio) {
1160 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1161 goto fail;
1162 }
db830c46 1163 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1164 bio->bi_sector = sector;
1165 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1166 bio->bi_rw = rw;
db830c46 1167 bio->bi_private = peer_req;
fcefa62e 1168 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1169
1170 bio->bi_next = bios;
1171 bios = bio;
1172 ++n_bios;
1173
1174 page_chain_for_each(page) {
1175 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1176 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1177 /* A single page must always be possible!
1178 * But in case it fails anyways,
1179 * we deal with it, and complain (below). */
1180 if (bio->bi_vcnt == 0) {
1181 dev_err(DEV,
1182 "bio_add_page failed for len=%u, "
1183 "bi_vcnt=0 (bi_sector=%llu)\n",
1184 len, (unsigned long long)bio->bi_sector);
1185 err = -ENOSPC;
1186 goto fail;
1187 }
45bb912b
LE
1188 goto next_bio;
1189 }
1190 ds -= len;
1191 sector += len >> 9;
1192 --nr_pages;
1193 }
1194 D_ASSERT(page == NULL);
1195 D_ASSERT(ds == 0);
1196
db830c46 1197 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1198 do {
1199 bio = bios;
1200 bios = bios->bi_next;
1201 bio->bi_next = NULL;
1202
45bb912b 1203 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1204 } while (bios);
45bb912b
LE
1205 return 0;
1206
1207fail:
1208 while (bios) {
1209 bio = bios;
1210 bios = bios->bi_next;
1211 bio_put(bio);
1212 }
10f6d992 1213 return err;
45bb912b
LE
1214}
1215
53840641 1216static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1217 struct drbd_peer_request *peer_req)
53840641 1218{
db830c46 1219 struct drbd_interval *i = &peer_req->i;
53840641
AG
1220
1221 drbd_remove_interval(&mdev->write_requests, i);
1222 drbd_clear_interval(i);
1223
6c852bec 1224 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1225 if (i->waiting)
1226 wake_up(&mdev->misc_wait);
1227}
1228
d8763023
AG
1229static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1230 unsigned int data_size)
b411b363 1231{
2451fc3b 1232 int rv;
e42325a5 1233 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
b411b363
PR
1234 struct drbd_epoch *epoch;
1235
b411b363
PR
1236 inc_unacked(mdev);
1237
b411b363
PR
1238 mdev->current_epoch->barrier_nr = p->barrier;
1239 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1240
1241 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1242 * the activity log, which means it would not be resynced in case the
1243 * R_PRIMARY crashes now.
1244 * Therefore we must send the barrier_ack after the barrier request was
1245 * completed. */
1246 switch (mdev->write_ordering) {
b411b363
PR
1247 case WO_none:
1248 if (rv == FE_RECYCLED)
81e84650 1249 return true;
2451fc3b
PR
1250
1251 /* receiver context, in the writeout path of the other node.
1252 * avoid potential distributed deadlock */
1253 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1254 if (epoch)
1255 break;
1256 else
1257 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1258 /* Fall through */
b411b363
PR
1259
1260 case WO_bdev_flush:
1261 case WO_drain_io:
b411b363 1262 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1263 drbd_flush(mdev);
1264
1265 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1266 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1267 if (epoch)
1268 break;
b411b363
PR
1269 }
1270
2451fc3b
PR
1271 epoch = mdev->current_epoch;
1272 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1273
1274 D_ASSERT(atomic_read(&epoch->active) == 0);
1275 D_ASSERT(epoch->flags == 0);
b411b363 1276
81e84650 1277 return true;
2451fc3b
PR
1278 default:
1279 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1280 return false;
b411b363
PR
1281 }
1282
1283 epoch->flags = 0;
1284 atomic_set(&epoch->epoch_size, 0);
1285 atomic_set(&epoch->active, 0);
1286
1287 spin_lock(&mdev->epoch_lock);
1288 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1289 list_add(&epoch->list, &mdev->current_epoch->list);
1290 mdev->current_epoch = epoch;
1291 mdev->epochs++;
b411b363
PR
1292 } else {
1293 /* The current_epoch got recycled while we allocated this one... */
1294 kfree(epoch);
1295 }
1296 spin_unlock(&mdev->epoch_lock);
1297
81e84650 1298 return true;
b411b363
PR
1299}
1300
1301/* used from receive_RSDataReply (recv_resync_read)
1302 * and from receive_Data */
f6ffca9f
AG
1303static struct drbd_peer_request *
1304read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1305 int data_size) __must_hold(local)
b411b363 1306{
6666032a 1307 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1308 struct drbd_peer_request *peer_req;
b411b363 1309 struct page *page;
45bb912b 1310 int dgs, ds, rr;
a0638456
PR
1311 void *dig_in = mdev->tconn->int_dig_in;
1312 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1313 unsigned long *data;
b411b363 1314
a0638456
PR
1315 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1316 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1317
1318 if (dgs) {
de0ff338 1319 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1320 if (rr != dgs) {
0ddc5549
LE
1321 if (!signal_pending(current))
1322 dev_warn(DEV,
1323 "short read receiving data digest: read %d expected %d\n",
1324 rr, dgs);
b411b363
PR
1325 return NULL;
1326 }
1327 }
1328
1329 data_size -= dgs;
1330
841ce241
AG
1331 if (!expect(data_size != 0))
1332 return NULL;
1333 if (!expect(IS_ALIGNED(data_size, 512)))
1334 return NULL;
1335 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1336 return NULL;
b411b363 1337
6666032a
LE
1338 /* even though we trust out peer,
1339 * we sometimes have to double check. */
1340 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1341 dev_err(DEV, "request from peer beyond end of local disk: "
1342 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1343 (unsigned long long)capacity,
1344 (unsigned long long)sector, data_size);
1345 return NULL;
1346 }
1347
b411b363
PR
1348 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1349 * "criss-cross" setup, that might cause write-out on some other DRBD,
1350 * which in turn might block on the other node at this very place. */
db830c46
AG
1351 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1352 if (!peer_req)
b411b363 1353 return NULL;
45bb912b 1354
b411b363 1355 ds = data_size;
db830c46 1356 page = peer_req->pages;
45bb912b
LE
1357 page_chain_for_each(page) {
1358 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1359 data = kmap(page);
de0ff338 1360 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1361 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1362 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1363 data[0] = data[0] ^ (unsigned long)-1;
1364 }
b411b363 1365 kunmap(page);
45bb912b 1366 if (rr != len) {
db830c46 1367 drbd_free_ee(mdev, peer_req);
0ddc5549
LE
1368 if (!signal_pending(current))
1369 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1370 rr, len);
b411b363
PR
1371 return NULL;
1372 }
1373 ds -= rr;
1374 }
1375
1376 if (dgs) {
db830c46 1377 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1378 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1379 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1380 (unsigned long long)sector, data_size);
db830c46 1381 drbd_free_ee(mdev, peer_req);
b411b363
PR
1382 return NULL;
1383 }
1384 }
1385 mdev->recv_cnt += data_size>>9;
db830c46 1386 return peer_req;
b411b363
PR
1387}
1388
1389/* drbd_drain_block() just takes a data block
1390 * out of the socket input buffer, and discards it.
1391 */
1392static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1393{
1394 struct page *page;
fc5be839 1395 int rr, err = 0;
b411b363
PR
1396 void *data;
1397
c3470cde 1398 if (!data_size)
fc5be839 1399 return 0;
c3470cde 1400
45bb912b 1401 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1402
1403 data = kmap(page);
1404 while (data_size) {
fc5be839
AG
1405 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1406
1407 rr = drbd_recv(mdev->tconn, data, len);
1408 if (rr != len) {
0ddc5549
LE
1409 if (!signal_pending(current))
1410 dev_warn(DEV,
1411 "short read receiving data: read %d expected %d\n",
fc5be839
AG
1412 rr, len);
1413 err = (rr < 0) ? rr : -EIO;
b411b363
PR
1414 break;
1415 }
1416 data_size -= rr;
1417 }
1418 kunmap(page);
435f0740 1419 drbd_pp_free(mdev, page, 0);
fc5be839 1420 return err;
b411b363
PR
1421}
1422
1423static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1424 sector_t sector, int data_size)
1425{
1426 struct bio_vec *bvec;
1427 struct bio *bio;
1428 int dgs, rr, i, expect;
a0638456
PR
1429 void *dig_in = mdev->tconn->int_dig_in;
1430 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1431
a0638456
PR
1432 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1433 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1434
1435 if (dgs) {
de0ff338 1436 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1437 if (rr != dgs) {
0ddc5549
LE
1438 if (!signal_pending(current))
1439 dev_warn(DEV,
1440 "short read receiving data reply digest: read %d expected %d\n",
1441 rr, dgs);
28284cef 1442 return rr < 0 ? rr : -EIO;
b411b363
PR
1443 }
1444 }
1445
1446 data_size -= dgs;
1447
1448 /* optimistically update recv_cnt. if receiving fails below,
1449 * we disconnect anyways, and counters will be reset. */
1450 mdev->recv_cnt += data_size>>9;
1451
1452 bio = req->master_bio;
1453 D_ASSERT(sector == bio->bi_sector);
1454
1455 bio_for_each_segment(bvec, bio, i) {
1456 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1457 rr = drbd_recv(mdev->tconn,
b411b363
PR
1458 kmap(bvec->bv_page)+bvec->bv_offset,
1459 expect);
1460 kunmap(bvec->bv_page);
1461 if (rr != expect) {
0ddc5549
LE
1462 if (!signal_pending(current))
1463 dev_warn(DEV, "short read receiving data reply: "
1464 "read %d expected %d\n",
1465 rr, expect);
28284cef 1466 return rr < 0 ? rr : -EIO;
b411b363
PR
1467 }
1468 data_size -= rr;
1469 }
1470
1471 if (dgs) {
a0638456 1472 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1473 if (memcmp(dig_in, dig_vv, dgs)) {
1474 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1475 return -EINVAL;
b411b363
PR
1476 }
1477 }
1478
1479 D_ASSERT(data_size == 0);
28284cef 1480 return 0;
b411b363
PR
1481}
1482
1483/* e_end_resync_block() is called via
1484 * drbd_process_done_ee() by asender only */
99920dc5 1485static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1486{
8050e6d0
AG
1487 struct drbd_peer_request *peer_req =
1488 container_of(w, struct drbd_peer_request, w);
00d56944 1489 struct drbd_conf *mdev = w->mdev;
db830c46 1490 sector_t sector = peer_req->i.sector;
99920dc5 1491 int err;
b411b363 1492
db830c46 1493 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1494
db830c46
AG
1495 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1496 drbd_set_in_sync(mdev, sector, peer_req->i.size);
99920dc5 1497 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1498 } else {
1499 /* Record failure to sync */
db830c46 1500 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1501
99920dc5 1502 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1503 }
1504 dec_unacked(mdev);
1505
99920dc5 1506 return err;
b411b363
PR
1507}
1508
1509static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1510{
db830c46 1511 struct drbd_peer_request *peer_req;
b411b363 1512
db830c46
AG
1513 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1514 if (!peer_req)
45bb912b 1515 goto fail;
b411b363
PR
1516
1517 dec_rs_pending(mdev);
1518
b411b363
PR
1519 inc_unacked(mdev);
1520 /* corresponding dec_unacked() in e_end_resync_block()
1521 * respective _drbd_clear_done_ee */
1522
db830c46 1523 peer_req->w.cb = e_end_resync_block;
45bb912b 1524
87eeee41 1525 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1526 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1527 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1528
0f0601f4 1529 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1530 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1531 return 0;
b411b363 1532
10f6d992
LE
1533 /* don't care for the reason here */
1534 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1535 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1536 list_del(&peer_req->w.list);
87eeee41 1537 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1538
db830c46 1539 drbd_free_ee(mdev, peer_req);
45bb912b
LE
1540fail:
1541 put_ldev(mdev);
e1c1b0fc 1542 return -EIO;
b411b363
PR
1543}
1544
668eebc6 1545static struct drbd_request *
bc9c5c41
AG
1546find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1547 sector_t sector, bool missing_ok, const char *func)
51624585 1548{
51624585
AG
1549 struct drbd_request *req;
1550
bc9c5c41
AG
1551 /* Request object according to our peer */
1552 req = (struct drbd_request *)(unsigned long)id;
5e472264 1553 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1554 return req;
c3afd8f5
AG
1555 if (!missing_ok) {
1556 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1557 (unsigned long)id, (unsigned long long)sector);
1558 }
51624585
AG
1559 return NULL;
1560}
1561
d8763023
AG
1562static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1563 unsigned int data_size)
b411b363
PR
1564{
1565 struct drbd_request *req;
1566 sector_t sector;
b411b363 1567 int ok;
e42325a5 1568 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1569
1570 sector = be64_to_cpu(p->sector);
1571
87eeee41 1572 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1573 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1574 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1575 if (unlikely(!req))
81e84650 1576 return false;
b411b363 1577
24c4830c 1578 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1579 * special casing it there for the various failure cases.
1580 * still no race with drbd_fail_pending_reads */
28284cef 1581 ok = !recv_dless_read(mdev, req, sector, data_size);
b411b363
PR
1582
1583 if (ok)
8554df1c 1584 req_mod(req, DATA_RECEIVED);
b411b363
PR
1585 /* else: nothing. handled from drbd_disconnect...
1586 * I don't think we may complete this just yet
1587 * in case we are "on-disconnect: freeze" */
1588
1589 return ok;
1590}
1591
d8763023
AG
1592static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1593 unsigned int data_size)
b411b363
PR
1594{
1595 sector_t sector;
b411b363 1596 int ok;
e42325a5 1597 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1598
1599 sector = be64_to_cpu(p->sector);
1600 D_ASSERT(p->block_id == ID_SYNCER);
1601
1602 if (get_ldev(mdev)) {
1603 /* data is submitted to disk within recv_resync_read.
1604 * corresponding put_ldev done below on error,
fcefa62e 1605 * or in drbd_peer_request_endio. */
e1c1b0fc 1606 ok = !recv_resync_read(mdev, sector, data_size);
b411b363
PR
1607 } else {
1608 if (__ratelimit(&drbd_ratelimit_state))
1609 dev_err(DEV, "Can not write resync data to local disk.\n");
1610
fc5be839 1611 ok = !drbd_drain_block(mdev, data_size);
b411b363 1612
2b2bf214 1613 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1614 }
1615
778f271d
PR
1616 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1617
b411b363
PR
1618 return ok;
1619}
1620
99920dc5 1621static int w_restart_write(struct drbd_work *w, int cancel)
7be8da07
AG
1622{
1623 struct drbd_request *req = container_of(w, struct drbd_request, w);
1624 struct drbd_conf *mdev = w->mdev;
1625 struct bio *bio;
1626 unsigned long start_time;
1627 unsigned long flags;
1628
1629 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1630 if (!expect(req->rq_state & RQ_POSTPONED)) {
1631 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
99920dc5 1632 return -EIO;
7be8da07
AG
1633 }
1634 bio = req->master_bio;
1635 start_time = req->start_time;
1636 /* Postponed requests will not have their master_bio completed! */
1637 __req_mod(req, DISCARD_WRITE, NULL);
1638 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1639
1640 while (__drbd_make_request(mdev, bio, start_time))
1641 /* retry */ ;
99920dc5 1642 return 0;
7be8da07
AG
1643}
1644
1645static void restart_conflicting_writes(struct drbd_conf *mdev,
1646 sector_t sector, int size)
1647{
1648 struct drbd_interval *i;
1649 struct drbd_request *req;
1650
1651 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1652 if (!i->local)
1653 continue;
1654 req = container_of(i, struct drbd_request, i);
1655 if (req->rq_state & RQ_LOCAL_PENDING ||
1656 !(req->rq_state & RQ_POSTPONED))
1657 continue;
1658 if (expect(list_empty(&req->w.list))) {
1659 req->w.mdev = mdev;
1660 req->w.cb = w_restart_write;
1661 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1662 }
1663 }
1664}
1665
b411b363
PR
1666/* e_end_block() is called via drbd_process_done_ee().
1667 * this means this function only runs in the asender thread
1668 */
99920dc5 1669static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1670{
8050e6d0
AG
1671 struct drbd_peer_request *peer_req =
1672 container_of(w, struct drbd_peer_request, w);
00d56944 1673 struct drbd_conf *mdev = w->mdev;
db830c46 1674 sector_t sector = peer_req->i.sector;
99920dc5 1675 int err = 0, pcmd;
b411b363 1676
89e58e75 1677 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1678 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1679 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1680 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1681 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1682 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1683 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1684 if (pcmd == P_RS_WRITE_ACK)
db830c46 1685 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1686 } else {
99920dc5 1687 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1688 /* we expect it to be marked out of sync anyways...
1689 * maybe assert this? */
1690 }
1691 dec_unacked(mdev);
1692 }
1693 /* we delete from the conflict detection hash _after_ we sent out the
1694 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1695 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1696 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1697 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1698 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1699 if (peer_req->flags & EE_RESTART_REQUESTS)
1700 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1701 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1702 } else
db830c46 1703 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1704
db830c46 1705 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1706
99920dc5 1707 return err;
b411b363
PR
1708}
1709
7be8da07 1710static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1711{
7be8da07 1712 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1713 struct drbd_peer_request *peer_req =
1714 container_of(w, struct drbd_peer_request, w);
99920dc5 1715 int err;
b411b363 1716
99920dc5 1717 err = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1718 dec_unacked(mdev);
1719
99920dc5 1720 return err;
b411b363
PR
1721}
1722
99920dc5 1723static int e_send_discard_write(struct drbd_work *w, int unused)
7be8da07
AG
1724{
1725 return e_send_ack(w, P_DISCARD_WRITE);
1726}
1727
99920dc5 1728static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07
AG
1729{
1730 struct drbd_tconn *tconn = w->mdev->tconn;
1731
1732 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1733 P_RETRY_WRITE : P_DISCARD_WRITE);
1734}
1735
3e394da1
AG
1736static bool seq_greater(u32 a, u32 b)
1737{
1738 /*
1739 * We assume 32-bit wrap-around here.
1740 * For 24-bit wrap-around, we would have to shift:
1741 * a <<= 8; b <<= 8;
1742 */
1743 return (s32)a - (s32)b > 0;
1744}
1745
1746static u32 seq_max(u32 a, u32 b)
1747{
1748 return seq_greater(a, b) ? a : b;
1749}
1750
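/*
 * A small worked example of the comparison above (illustrative values only):
 * with a = 0x00000002 and b = 0xfffffffe, (s32)a - (s32)b is 2 - (-2) = 4 > 0,
 * so seq_greater(a, b) correctly treats a as the newer sequence number even
 * though it is numerically smaller; seq_greater(b, a) gives -4 and is false,
 * and seq_max(a, b) therefore returns 0x00000002.
 */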
7be8da07
AG
1751static bool need_peer_seq(struct drbd_conf *mdev)
1752{
1753 struct drbd_tconn *tconn = mdev->tconn;
1754
1755 /*
1756 * We only need to keep track of the last packet_seq number of our peer
1757 * if we are in dual-primary mode and we have the discard flag set; see
1758 * handle_write_conflicts().
1759 */
1760 return tconn->net_conf->two_primaries &&
1761 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1762}
1763
43ae077d 1764static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1765{
3c13b680 1766 unsigned int newest_peer_seq;
3e394da1 1767
7be8da07
AG
1768 if (need_peer_seq(mdev)) {
1769 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1770 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1771 mdev->peer_seq = newest_peer_seq;
7be8da07 1772 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1773 /* wake up only if we actually changed mdev->peer_seq */
1774 if (peer_seq == newest_peer_seq)
7be8da07
AG
1775 wake_up(&mdev->seq_wait);
1776 }
3e394da1
AG
1777}
1778
b411b363
PR
1779/* Called from receive_Data.
1780 * Synchronize packets on sock with packets on msock.
1781 *
 1782 * This is here so that even when a P_DATA packet traveling via sock overtakes an Ack
 1783 * packet traveling on msock, they are still processed in the order they were
 1784 * sent.
1785 *
1786 * Note: we don't care for Ack packets overtaking P_DATA packets.
1787 *
 1788 * In case packet_seq is larger than mdev->peer_seq, there are
1789 * outstanding packets on the msock. We wait for them to arrive.
1790 * In case we are the logically next packet, we update mdev->peer_seq
1791 * ourselves. Correctly handles 32bit wrap around.
1792 *
1793 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1794 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1795 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 1796 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1797 *
1798 * returns 0 if we may process the packet,
1799 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1800static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1801{
1802 DEFINE_WAIT(wait);
b411b363 1803 long timeout;
7be8da07
AG
1804 int ret;
1805
1806 if (!need_peer_seq(mdev))
1807 return 0;
1808
b411b363
PR
1809 spin_lock(&mdev->peer_seq_lock);
1810 for (;;) {
7be8da07
AG
1811 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1812 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1813 ret = 0;
b411b363 1814 break;
7be8da07 1815 }
b411b363
PR
1816 if (signal_pending(current)) {
1817 ret = -ERESTARTSYS;
1818 break;
1819 }
7be8da07 1820 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1821 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1822 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1823 timeout = schedule_timeout(timeout);
b411b363 1824 spin_lock(&mdev->peer_seq_lock);
7be8da07 1825 if (!timeout) {
b411b363 1826 ret = -ETIMEDOUT;
71b1c1eb 1827 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1828 break;
1829 }
1830 }
b411b363 1831 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1832 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1833 return ret;
1834}
1835
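/*
 * Illustrative scenario (hypothetical numbers): with mdev->peer_seq == 5, a
 * P_DATA packet carrying peer_seq == 6 passes the !seq_greater(5, 5) test
 * immediately and bumps peer_seq to 6; a packet carrying peer_seq == 8 would
 * find seq_greater(7, 5) true and sleep on seq_wait until the packets with
 * sequence numbers 6 and 7 have arrived on the msock and advanced
 * mdev->peer_seq, or until the ping timeout expires.
 */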
688593c5
LE
1836/* see also bio_flags_to_wire()
1837 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1838 * flags and back. We may replicate to other kernel versions. */
1839static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1840{
688593c5
LE
1841 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1842 (dpf & DP_FUA ? REQ_FUA : 0) |
1843 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1844 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1845}
1846
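/*
 * Example: a P_DATA packet with dp_flags = DP_RW_SYNC | DP_FUA is mapped to
 * REQ_SYNC | REQ_FUA here; receive_Data() below ORs that into its WRITE
 * request flags before submitting the peer request.
 */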
7be8da07
AG
1847static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1848 unsigned int size)
1849{
1850 struct drbd_interval *i;
1851
1852 repeat:
1853 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1854 struct drbd_request *req;
1855 struct bio_and_error m;
1856
1857 if (!i->local)
1858 continue;
1859 req = container_of(i, struct drbd_request, i);
1860 if (!(req->rq_state & RQ_POSTPONED))
1861 continue;
1862 req->rq_state &= ~RQ_POSTPONED;
1863 __req_mod(req, NEG_ACKED, &m);
1864 spin_unlock_irq(&mdev->tconn->req_lock);
1865 if (m.bio)
1866 complete_master_bio(mdev, &m);
1867 spin_lock_irq(&mdev->tconn->req_lock);
1868 goto repeat;
1869 }
1870}
1871
1872static int handle_write_conflicts(struct drbd_conf *mdev,
1873 struct drbd_peer_request *peer_req)
1874{
1875 struct drbd_tconn *tconn = mdev->tconn;
1876 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1877 sector_t sector = peer_req->i.sector;
1878 const unsigned int size = peer_req->i.size;
1879 struct drbd_interval *i;
1880 bool equal;
1881 int err;
1882
1883 /*
1884 * Inserting the peer request into the write_requests tree will prevent
1885 * new conflicting local requests from being added.
1886 */
1887 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1888
1889 repeat:
1890 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1891 if (i == &peer_req->i)
1892 continue;
1893
1894 if (!i->local) {
1895 /*
1896 * Our peer has sent a conflicting remote request; this
1897 * should not happen in a two-node setup. Wait for the
1898 * earlier peer request to complete.
1899 */
1900 err = drbd_wait_misc(mdev, i);
1901 if (err)
1902 goto out;
1903 goto repeat;
1904 }
1905
1906 equal = i->sector == sector && i->size == size;
1907 if (resolve_conflicts) {
1908 /*
1909 * If the peer request is fully contained within the
1910 * overlapping request, it can be discarded; otherwise,
1911 * it will be retried once all overlapping requests
1912 * have completed.
1913 */
1914 bool discard = i->sector <= sector && i->sector +
1915 (i->size >> 9) >= sector + (size >> 9);
1916
1917 if (!equal)
1918 dev_alert(DEV, "Concurrent writes detected: "
1919 "local=%llus +%u, remote=%llus +%u, "
1920 "assuming %s came first\n",
1921 (unsigned long long)i->sector, i->size,
1922 (unsigned long long)sector, size,
1923 discard ? "local" : "remote");
1924
1925 inc_unacked(mdev);
1926 peer_req->w.cb = discard ? e_send_discard_write :
1927 e_send_retry_write;
1928 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1929 wake_asender(mdev->tconn);
1930
1931 err = -ENOENT;
1932 goto out;
1933 } else {
1934 struct drbd_request *req =
1935 container_of(i, struct drbd_request, i);
1936
1937 if (!equal)
1938 dev_alert(DEV, "Concurrent writes detected: "
1939 "local=%llus +%u, remote=%llus +%u\n",
1940 (unsigned long long)i->sector, i->size,
1941 (unsigned long long)sector, size);
1942
1943 if (req->rq_state & RQ_LOCAL_PENDING ||
1944 !(req->rq_state & RQ_POSTPONED)) {
1945 /*
1946 * Wait for the node with the discard flag to
1947 * decide if this request will be discarded or
1948 * retried. Requests that are discarded will
1949 * disappear from the write_requests tree.
1950 *
1951 * In addition, wait for the conflicting
1952 * request to finish locally before submitting
1953 * the conflicting peer request.
1954 */
1955 err = drbd_wait_misc(mdev, &req->i);
1956 if (err) {
1957 _conn_request_state(mdev->tconn,
1958 NS(conn, C_TIMEOUT),
1959 CS_HARD);
1960 fail_postponed_requests(mdev, sector, size);
1961 goto out;
1962 }
1963 goto repeat;
1964 }
1965 /*
1966 * Remember to restart the conflicting requests after
1967 * the new peer request has completed.
1968 */
1969 peer_req->flags |= EE_RESTART_REQUESTS;
1970 }
1971 }
1972 err = 0;
1973
1974 out:
1975 if (err)
1976 drbd_remove_epoch_entry_interval(mdev, peer_req);
1977 return err;
1978}
1979
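/*
 * Sketch of the containment test above, with made-up sector numbers: a local
 * request at sector 1000 with size 8192 bytes covers sectors [1000, 1016); a
 * conflicting peer request at sector 1004 with size 2048 bytes covers
 * [1004, 1008), is fully contained, and is answered with P_DISCARD_WRITE on
 * the node holding the discard flag. A peer request at sector 1012 with size
 * 4096 bytes would reach sector 1020, is not contained, and gets
 * P_RETRY_WRITE (P_DISCARD_WRITE before protocol 100), i.e. it is retried
 * once the overlapping requests have completed.
 */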
b411b363 1980/* mirrored write */
d8763023
AG
1981static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1982 unsigned int data_size)
b411b363
PR
1983{
1984 sector_t sector;
db830c46 1985 struct drbd_peer_request *peer_req;
e42325a5 1986 struct p_data *p = &mdev->tconn->data.rbuf.data;
7be8da07 1987 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1988 int rw = WRITE;
1989 u32 dp_flags;
7be8da07 1990 int err;
b411b363 1991
b411b363 1992
7be8da07
AG
1993 if (!get_ldev(mdev)) {
1994 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2b2bf214 1995 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363 1996 atomic_inc(&mdev->current_epoch->epoch_size);
fc5be839 1997 return !drbd_drain_block(mdev, data_size) && err == 0;
b411b363
PR
1998 }
1999
fcefa62e
AG
2000 /*
2001 * Corresponding put_ldev done either below (on various errors), or in
2002 * drbd_peer_request_endio, if we successfully submit the data at the
2003 * end of this function.
2004 */
b411b363
PR
2005
2006 sector = be64_to_cpu(p->sector);
db830c46
AG
2007 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
2008 if (!peer_req) {
b411b363 2009 put_ldev(mdev);
81e84650 2010 return false;
b411b363
PR
2011 }
2012
db830c46 2013 peer_req->w.cb = e_end_block;
b411b363 2014
688593c5
LE
2015 dp_flags = be32_to_cpu(p->dp_flags);
2016 rw |= wire_flags_to_bio(mdev, dp_flags);
2017
2018 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2019 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2020
b411b363 2021 spin_lock(&mdev->epoch_lock);
db830c46
AG
2022 peer_req->epoch = mdev->current_epoch;
2023 atomic_inc(&peer_req->epoch->epoch_size);
2024 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2025 spin_unlock(&mdev->epoch_lock);
2026
7be8da07
AG
2027 if (mdev->tconn->net_conf->two_primaries) {
2028 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2029 if (err)
b411b363 2030 goto out_interrupted;
87eeee41 2031 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2032 err = handle_write_conflicts(mdev, peer_req);
2033 if (err) {
2034 spin_unlock_irq(&mdev->tconn->req_lock);
2035 if (err == -ENOENT) {
b411b363 2036 put_ldev(mdev);
81e84650 2037 return true;
b411b363 2038 }
7be8da07 2039 goto out_interrupted;
b411b363 2040 }
7be8da07
AG
2041 } else
2042 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2043 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2044 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2045
89e58e75 2046 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2047 case DRBD_PROT_C:
2048 inc_unacked(mdev);
2049 /* corresponding dec_unacked() in e_end_block()
2050 * respective _drbd_clear_done_ee */
2051 break;
2052 case DRBD_PROT_B:
2053 /* I really don't like it that the receiver thread
2054 * sends on the msock, but anyways */
db830c46 2055 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2056 break;
2057 case DRBD_PROT_A:
2058 /* nothing to do */
2059 break;
2060 }
2061
6719fb03 2062 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2063 /* In case we have the only disk of the cluster, */
db830c46
AG
2064 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2065 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2066 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2067 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2068 }
2069
fbe29dec 2070 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 2071 return true;
b411b363 2072
10f6d992
LE
2073 /* don't care for the reason here */
2074 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2075 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2076 list_del(&peer_req->w.list);
2077 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2078 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2079 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2080 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2081
b411b363 2082out_interrupted:
db830c46 2083 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2084 put_ldev(mdev);
db830c46 2085 drbd_free_ee(mdev, peer_req);
81e84650 2086 return false;
b411b363
PR
2087}
2088
0f0601f4
LE
2089/* We may throttle resync, if the lower device seems to be busy,
2090 * and current sync rate is above c_min_rate.
2091 *
2092 * To decide whether or not the lower device is busy, we use a scheme similar
 2093 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 2094 * amount of activity (more than 64 sectors) that we cannot account for with
 2095 * our own resync activity, the device obviously is "busy".
 2096 *
 2097 * The current sync rate used here is based only on the most recent two step
 2098 * marks, giving a short-time average so we can react faster.
2099 */
e3555d85 2100int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2101{
2102 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2103 unsigned long db, dt, dbdt;
e3555d85 2104 struct lc_element *tmp;
0f0601f4
LE
2105 int curr_events;
2106 int throttle = 0;
2107
2108 /* feature disabled? */
f399002e 2109 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2110 return 0;
2111
e3555d85
PR
2112 spin_lock_irq(&mdev->al_lock);
2113 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2114 if (tmp) {
2115 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2116 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2117 spin_unlock_irq(&mdev->al_lock);
2118 return 0;
2119 }
2120 /* Do not slow down if app IO is already waiting for this extent */
2121 }
2122 spin_unlock_irq(&mdev->al_lock);
2123
0f0601f4
LE
2124 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2125 (int)part_stat_read(&disk->part0, sectors[1]) -
2126 atomic_read(&mdev->rs_sect_ev);
e3555d85 2127
0f0601f4
LE
2128 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2129 unsigned long rs_left;
2130 int i;
2131
2132 mdev->rs_last_events = curr_events;
2133
2134 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2135 * approx. */
2649f080
LE
2136 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2137
2138 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2139 rs_left = mdev->ov_left;
2140 else
2141 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2142
2143 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2144 if (!dt)
2145 dt++;
2146 db = mdev->rs_mark_left[i] - rs_left;
2147 dbdt = Bit2KB(db/dt);
2148
f399002e 2149 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2150 throttle = 1;
2151 }
2152 return throttle;
2153}
2154
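/*
 * Rough worked example of the throttle check (hypothetical numbers, assuming
 * the usual 4 KiB of data per bitmap bit): if the chosen sync mark is
 * dt = 3 seconds old and rs_left dropped by db = 7680 bits since then,
 * dbdt = Bit2KB(7680 / 3) = 10240 KiB/s, roughly 10 MiB/s. With c_min_rate
 * configured at, say, 4096 KiB/s, the resync is already above its guaranteed
 * minimum, so throttle is set and the request is deferred while the backing
 * device looks busy.
 */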
2155
d8763023
AG
2156static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2157 unsigned int digest_size)
b411b363
PR
2158{
2159 sector_t sector;
2160 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2161 struct drbd_peer_request *peer_req;
b411b363 2162 struct digest_info *di = NULL;
b18b37be 2163 int size, verb;
b411b363 2164 unsigned int fault_type;
e42325a5 2165 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2166
2167 sector = be64_to_cpu(p->sector);
2168 size = be32_to_cpu(p->blksize);
2169
c670a398 2170 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2171 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2172 (unsigned long long)sector, size);
81e84650 2173 return false;
b411b363
PR
2174 }
2175 if (sector + (size>>9) > capacity) {
2176 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2177 (unsigned long long)sector, size);
81e84650 2178 return false;
b411b363
PR
2179 }
2180
2181 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2182 verb = 1;
2183 switch (cmd) {
2184 case P_DATA_REQUEST:
2185 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2186 break;
2187 case P_RS_DATA_REQUEST:
2188 case P_CSUM_RS_REQUEST:
2189 case P_OV_REQUEST:
2190 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2191 break;
2192 case P_OV_REPLY:
2193 verb = 0;
2194 dec_rs_pending(mdev);
2195 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2196 break;
2197 default:
2198 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2199 cmdname(cmd));
2200 }
2201 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2202 dev_err(DEV, "Can not satisfy peer's read request, "
2203 "no local data.\n");
b18b37be 2204
a821cc4a 2205 /* drain possible payload */
fc5be839 2206 return !drbd_drain_block(mdev, digest_size);
b411b363
PR
2207 }
2208
2209 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2210 * "criss-cross" setup, that might cause write-out on some other DRBD,
2211 * which in turn might block on the other node at this very place. */
db830c46
AG
2212 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2213 if (!peer_req) {
b411b363 2214 put_ldev(mdev);
81e84650 2215 return false;
b411b363
PR
2216 }
2217
02918be2 2218 switch (cmd) {
b411b363 2219 case P_DATA_REQUEST:
db830c46 2220 peer_req->w.cb = w_e_end_data_req;
b411b363 2221 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2222 /* application IO, don't drbd_rs_begin_io */
2223 goto submit;
2224
b411b363 2225 case P_RS_DATA_REQUEST:
db830c46 2226 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2227 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2228 /* used in the sector offset progress display */
2229 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2230 break;
2231
2232 case P_OV_REPLY:
2233 case P_CSUM_RS_REQUEST:
2234 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2235 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2236 if (!di)
2237 goto out_free_e;
2238
2239 di->digest_size = digest_size;
2240 di->digest = (((char *)di)+sizeof(struct digest_info));
2241
db830c46
AG
2242 peer_req->digest = di;
2243 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2244
de0ff338 2245 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2246 goto out_free_e;
2247
02918be2 2248 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2249 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2250 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2251 /* used in the sector offset progress display */
2252 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2253 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2254 /* track progress, we may need to throttle */
2255 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2256 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2257 dec_rs_pending(mdev);
0f0601f4
LE
2258 /* drbd_rs_begin_io done when we sent this request,
2259 * but accounting still needs to be done. */
2260 goto submit_for_resync;
b411b363
PR
2261 }
2262 break;
2263
2264 case P_OV_REQUEST:
b411b363 2265 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2266 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2267 unsigned long now = jiffies;
2268 int i;
b411b363
PR
2269 mdev->ov_start_sector = sector;
2270 mdev->ov_position = sector;
30b743a2
LE
2271 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2272 mdev->rs_total = mdev->ov_left;
de228bba
LE
2273 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2274 mdev->rs_mark_left[i] = mdev->ov_left;
2275 mdev->rs_mark_time[i] = now;
2276 }
b411b363
PR
2277 dev_info(DEV, "Online Verify start sector: %llu\n",
2278 (unsigned long long)sector);
2279 }
db830c46 2280 peer_req->w.cb = w_e_end_ov_req;
b411b363 2281 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2282 break;
2283
b411b363
PR
2284 default:
2285 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2286 cmdname(cmd));
b411b363 2287 fault_type = DRBD_FAULT_MAX;
80a40e43 2288 goto out_free_e;
b411b363
PR
2289 }
2290
0f0601f4
LE
2291 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2292 * wrt the receiver, but it is not as straightforward as it may seem.
2293 * Various places in the resync start and stop logic assume resync
2294 * requests are processed in order, requeuing this on the worker thread
2295 * introduces a bunch of new code for synchronization between threads.
2296 *
2297 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2298 * "forever", throttling after drbd_rs_begin_io will lock that extent
2299 * for application writes for the same time. For now, just throttle
2300 * here, where the rest of the code expects the receiver to sleep for
2301 * a while, anyways.
2302 */
2303
2304 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2305 * this defers syncer requests for some time, before letting at least
 2306 * one request through. The resync controller on the receiving side
2307 * will adapt to the incoming rate accordingly.
2308 *
2309 * We cannot throttle here if remote is Primary/SyncTarget:
2310 * we would also throttle its application reads.
2311 * In that case, throttling is done on the SyncTarget only.
2312 */
e3555d85
PR
2313 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2314 schedule_timeout_uninterruptible(HZ/10);
2315 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2316 goto out_free_e;
b411b363 2317
0f0601f4
LE
2318submit_for_resync:
2319 atomic_add(size >> 9, &mdev->rs_sect_ev);
2320
80a40e43 2321submit:
b411b363 2322 inc_unacked(mdev);
87eeee41 2323 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2324 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2325 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2326
fbe29dec 2327 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
81e84650 2328 return true;
b411b363 2329
10f6d992
LE
2330 /* don't care for the reason here */
2331 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2332 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2333 list_del(&peer_req->w.list);
87eeee41 2334 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2335 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2336
b411b363 2337out_free_e:
b411b363 2338 put_ldev(mdev);
db830c46 2339 drbd_free_ee(mdev, peer_req);
81e84650 2340 return false;
b411b363
PR
2341}
2342
2343static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2344{
2345 int self, peer, rv = -100;
2346 unsigned long ch_self, ch_peer;
2347
2348 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2349 peer = mdev->p_uuid[UI_BITMAP] & 1;
2350
2351 ch_peer = mdev->p_uuid[UI_SIZE];
2352 ch_self = mdev->comm_bm_set;
2353
89e58e75 2354 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2355 case ASB_CONSENSUS:
2356 case ASB_DISCARD_SECONDARY:
2357 case ASB_CALL_HELPER:
2358 dev_err(DEV, "Configuration error.\n");
2359 break;
2360 case ASB_DISCONNECT:
2361 break;
2362 case ASB_DISCARD_YOUNGER_PRI:
2363 if (self == 0 && peer == 1) {
2364 rv = -1;
2365 break;
2366 }
2367 if (self == 1 && peer == 0) {
2368 rv = 1;
2369 break;
2370 }
2371 /* Else fall through to one of the other strategies... */
2372 case ASB_DISCARD_OLDER_PRI:
2373 if (self == 0 && peer == 1) {
2374 rv = 1;
2375 break;
2376 }
2377 if (self == 1 && peer == 0) {
2378 rv = -1;
2379 break;
2380 }
2381 /* Else fall through to one of the other strategies... */
ad19bf6e 2382 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2383 "Using discard-least-changes instead\n");
2384 case ASB_DISCARD_ZERO_CHG:
2385 if (ch_peer == 0 && ch_self == 0) {
25703f83 2386 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2387 ? -1 : 1;
2388 break;
2389 } else {
2390 if (ch_peer == 0) { rv = 1; break; }
2391 if (ch_self == 0) { rv = -1; break; }
2392 }
89e58e75 2393 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2394 break;
2395 case ASB_DISCARD_LEAST_CHG:
2396 if (ch_self < ch_peer)
2397 rv = -1;
2398 else if (ch_self > ch_peer)
2399 rv = 1;
2400 else /* ( ch_self == ch_peer ) */
2401 /* Well, then use something else. */
25703f83 2402 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2403 ? -1 : 1;
2404 break;
2405 case ASB_DISCARD_LOCAL:
2406 rv = -1;
2407 break;
2408 case ASB_DISCARD_REMOTE:
2409 rv = 1;
2410 }
2411
2412 return rv;
2413}
2414
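/*
 * Example for the least-changes policy (hypothetical counts): with
 * after-sb-0pri set to discard-least-changes, ch_self = 4 locally modified
 * blocks versus ch_peer = 1000 yields rv = -1, i.e. the local changes are
 * discarded and this node becomes the sync target; equal counts fall back to
 * the DISCARD_CONCURRENT tie-breaker.
 */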
2415static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2416{
6184ea21 2417 int hg, rv = -100;
b411b363 2418
89e58e75 2419 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2420 case ASB_DISCARD_YOUNGER_PRI:
2421 case ASB_DISCARD_OLDER_PRI:
2422 case ASB_DISCARD_LEAST_CHG:
2423 case ASB_DISCARD_LOCAL:
2424 case ASB_DISCARD_REMOTE:
2425 dev_err(DEV, "Configuration error.\n");
2426 break;
2427 case ASB_DISCONNECT:
2428 break;
2429 case ASB_CONSENSUS:
2430 hg = drbd_asb_recover_0p(mdev);
2431 if (hg == -1 && mdev->state.role == R_SECONDARY)
2432 rv = hg;
2433 if (hg == 1 && mdev->state.role == R_PRIMARY)
2434 rv = hg;
2435 break;
2436 case ASB_VIOLENTLY:
2437 rv = drbd_asb_recover_0p(mdev);
2438 break;
2439 case ASB_DISCARD_SECONDARY:
2440 return mdev->state.role == R_PRIMARY ? 1 : -1;
2441 case ASB_CALL_HELPER:
2442 hg = drbd_asb_recover_0p(mdev);
2443 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2444 enum drbd_state_rv rv2;
2445
2446 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2447 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2448 * we might be here in C_WF_REPORT_PARAMS which is transient.
2449 * we do not need to wait for the after state change work either. */
bb437946
AG
2450 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2451 if (rv2 != SS_SUCCESS) {
b411b363
PR
2452 drbd_khelper(mdev, "pri-lost-after-sb");
2453 } else {
2454 dev_warn(DEV, "Successfully gave up primary role.\n");
2455 rv = hg;
2456 }
2457 } else
2458 rv = hg;
2459 }
2460
2461 return rv;
2462}
2463
2464static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2465{
6184ea21 2466 int hg, rv = -100;
b411b363 2467
89e58e75 2468 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2469 case ASB_DISCARD_YOUNGER_PRI:
2470 case ASB_DISCARD_OLDER_PRI:
2471 case ASB_DISCARD_LEAST_CHG:
2472 case ASB_DISCARD_LOCAL:
2473 case ASB_DISCARD_REMOTE:
2474 case ASB_CONSENSUS:
2475 case ASB_DISCARD_SECONDARY:
2476 dev_err(DEV, "Configuration error.\n");
2477 break;
2478 case ASB_VIOLENTLY:
2479 rv = drbd_asb_recover_0p(mdev);
2480 break;
2481 case ASB_DISCONNECT:
2482 break;
2483 case ASB_CALL_HELPER:
2484 hg = drbd_asb_recover_0p(mdev);
2485 if (hg == -1) {
bb437946
AG
2486 enum drbd_state_rv rv2;
2487
b411b363
PR
2488 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2489 * we might be here in C_WF_REPORT_PARAMS which is transient.
2490 * we do not need to wait for the after state change work either. */
bb437946
AG
2491 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2492 if (rv2 != SS_SUCCESS) {
b411b363
PR
2493 drbd_khelper(mdev, "pri-lost-after-sb");
2494 } else {
2495 dev_warn(DEV, "Successfully gave up primary role.\n");
2496 rv = hg;
2497 }
2498 } else
2499 rv = hg;
2500 }
2501
2502 return rv;
2503}
2504
2505static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2506 u64 bits, u64 flags)
2507{
2508 if (!uuid) {
2509 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2510 return;
2511 }
2512 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2513 text,
2514 (unsigned long long)uuid[UI_CURRENT],
2515 (unsigned long long)uuid[UI_BITMAP],
2516 (unsigned long long)uuid[UI_HISTORY_START],
2517 (unsigned long long)uuid[UI_HISTORY_END],
2518 (unsigned long long)bits,
2519 (unsigned long long)flags);
2520}
2521
2522/*
2523 100 after split brain try auto recover
2524 2 C_SYNC_SOURCE set BitMap
2525 1 C_SYNC_SOURCE use BitMap
2526 0 no Sync
2527 -1 C_SYNC_TARGET use BitMap
2528 -2 C_SYNC_TARGET set BitMap
2529 -100 after split brain, disconnect
2530-1000 unrelated data
4a23f264
PR
2531-1091 requires proto 91
2532-1096 requires proto 96
b411b363
PR
2533 */
2534static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2535{
2536 u64 self, peer;
2537 int i, j;
2538
2539 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2540 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2541
2542 *rule_nr = 10;
2543 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2544 return 0;
2545
2546 *rule_nr = 20;
2547 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2548 peer != UUID_JUST_CREATED)
2549 return -2;
2550
2551 *rule_nr = 30;
2552 if (self != UUID_JUST_CREATED &&
2553 (peer == UUID_JUST_CREATED || peer == (u64)0))
2554 return 2;
2555
2556 if (self == peer) {
2557 int rct, dc; /* roles at crash time */
2558
2559 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2560
31890f4a 2561 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2562 return -1091;
b411b363
PR
2563
2564 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2565 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2566 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2567 drbd_uuid_set_bm(mdev, 0UL);
2568
2569 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2570 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2571 *rule_nr = 34;
2572 } else {
2573 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2574 *rule_nr = 36;
2575 }
2576
2577 return 1;
2578 }
2579
2580 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2581
31890f4a 2582 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2583 return -1091;
b411b363
PR
2584
2585 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2586 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2587 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2588
2589 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2590 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2591 mdev->p_uuid[UI_BITMAP] = 0UL;
2592
2593 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2594 *rule_nr = 35;
2595 } else {
2596 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2597 *rule_nr = 37;
2598 }
2599
2600 return -1;
2601 }
2602
2603 /* Common power [off|failure] */
2604 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2605 (mdev->p_uuid[UI_FLAGS] & 2);
2606 /* lowest bit is set when we were primary,
2607 * next bit (weight 2) is set when peer was primary */
2608 *rule_nr = 40;
2609
2610 switch (rct) {
2611 case 0: /* !self_pri && !peer_pri */ return 0;
2612 case 1: /* self_pri && !peer_pri */ return 1;
2613 case 2: /* !self_pri && peer_pri */ return -1;
2614 case 3: /* self_pri && peer_pri */
25703f83 2615 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2616 return dc ? -1 : 1;
2617 }
2618 }
2619
2620 *rule_nr = 50;
2621 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2622 if (self == peer)
2623 return -1;
2624
2625 *rule_nr = 51;
2626 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2627 if (self == peer) {
31890f4a 2628 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2629 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2630 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2631 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2632 /* The last P_SYNC_UUID did not get through. Undo the modifications of
 2633 the peer's UUIDs made at the last start of a resync as sync source. */
2634
31890f4a 2635 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2636 return -1091;
b411b363
PR
2637
2638 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2639 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2640
2641 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2642 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2643
b411b363
PR
2644 return -1;
2645 }
2646 }
2647
2648 *rule_nr = 60;
2649 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2650 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2651 peer = mdev->p_uuid[i] & ~((u64)1);
2652 if (self == peer)
2653 return -2;
2654 }
2655
2656 *rule_nr = 70;
2657 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2658 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2659 if (self == peer)
2660 return 1;
2661
2662 *rule_nr = 71;
2663 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2664 if (self == peer) {
31890f4a 2665 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2666 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2667 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2668 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2669 /* The last P_SYNC_UUID did not get through. Undo the modifications of
 2670 our UUIDs made at the last start of a resync as sync source. */
2671
31890f4a 2672 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2673 return -1091;
b411b363
PR
2674
2675 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2676 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2677
4a23f264 2678 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2679 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2680 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2681
2682 return 1;
2683 }
2684 }
2685
2686
2687 *rule_nr = 80;
d8c2a36b 2688 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2689 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2690 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2691 if (self == peer)
2692 return 2;
2693 }
2694
2695 *rule_nr = 90;
2696 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2697 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2698 if (self == peer && self != ((u64)0))
2699 return 100;
2700
2701 *rule_nr = 100;
2702 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2703 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2704 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2705 peer = mdev->p_uuid[j] & ~((u64)1);
2706 if (self == peer)
2707 return -100;
2708 }
2709 }
2710
2711 return -1000;
2712}
2713
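/*
 * Example for rule 40 (hypothetical crash scenario): if the current UUIDs
 * match, this node has CRASHED_PRIMARY set and the peer's UI_FLAGS bit of
 * weight 2 is also set, then rct = 1 + 2 = 3 and the DISCARD_CONCURRENT flag
 * breaks the tie: the node holding it returns -1 (sync target), the other
 * returns 1 (sync source).
 */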
2714/* drbd_sync_handshake() returns the new conn state on success, or
2715 CONN_MASK (-1) on failure.
2716 */
2717static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2718 enum drbd_disk_state peer_disk) __must_hold(local)
2719{
2720 int hg, rule_nr;
2721 enum drbd_conns rv = C_MASK;
2722 enum drbd_disk_state mydisk;
2723
2724 mydisk = mdev->state.disk;
2725 if (mydisk == D_NEGOTIATING)
2726 mydisk = mdev->new_state_tmp.disk;
2727
2728 dev_info(DEV, "drbd_sync_handshake:\n");
2729 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2730 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2731 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2732
2733 hg = drbd_uuid_compare(mdev, &rule_nr);
2734
2735 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2736
2737 if (hg == -1000) {
2738 dev_alert(DEV, "Unrelated data, aborting!\n");
2739 return C_MASK;
2740 }
4a23f264
PR
2741 if (hg < -1000) {
2742 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2743 return C_MASK;
2744 }
2745
2746 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2747 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2748 int f = (hg == -100) || abs(hg) == 2;
2749 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2750 if (f)
2751 hg = hg*2;
2752 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2753 hg > 0 ? "source" : "target");
2754 }
2755
3a11a487
AG
2756 if (abs(hg) == 100)
2757 drbd_khelper(mdev, "initial-split-brain");
2758
89e58e75 2759 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2760 int pcount = (mdev->state.role == R_PRIMARY)
2761 + (peer_role == R_PRIMARY);
2762 int forced = (hg == -100);
2763
2764 switch (pcount) {
2765 case 0:
2766 hg = drbd_asb_recover_0p(mdev);
2767 break;
2768 case 1:
2769 hg = drbd_asb_recover_1p(mdev);
2770 break;
2771 case 2:
2772 hg = drbd_asb_recover_2p(mdev);
2773 break;
2774 }
2775 if (abs(hg) < 100) {
2776 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2777 "automatically solved. Sync from %s node\n",
2778 pcount, (hg < 0) ? "peer" : "this");
2779 if (forced) {
2780 dev_warn(DEV, "Doing a full sync, since"
2781 " UUIDs where ambiguous.\n");
2782 hg = hg*2;
2783 }
2784 }
2785 }
2786
2787 if (hg == -100) {
89e58e75 2788 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2789 hg = -1;
89e58e75 2790 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2791 hg = 1;
2792
2793 if (abs(hg) < 100)
2794 dev_warn(DEV, "Split-Brain detected, manually solved. "
2795 "Sync from %s node\n",
2796 (hg < 0) ? "peer" : "this");
2797 }
2798
2799 if (hg == -100) {
580b9767
LE
2800 /* FIXME this log message is not correct if we end up here
2801 * after an attempted attach on a diskless node.
2802 * We just refuse to attach -- well, we drop the "connection"
2803 * to that disk, in a way... */
3a11a487 2804 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2805 drbd_khelper(mdev, "split-brain");
2806 return C_MASK;
2807 }
2808
2809 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2810 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2811 return C_MASK;
2812 }
2813
2814 if (hg < 0 && /* by intention we do not use mydisk here. */
2815 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2816 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2817 case ASB_CALL_HELPER:
2818 drbd_khelper(mdev, "pri-lost");
2819 /* fall through */
2820 case ASB_DISCONNECT:
2821 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2822 return C_MASK;
2823 case ASB_VIOLENTLY:
2824 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2825 "assumption\n");
2826 }
2827 }
2828
8169e41b 2829 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2830 if (hg == 0)
2831 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2832 else
2833 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2834 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2835 abs(hg) >= 2 ? "full" : "bit-map based");
2836 return C_MASK;
2837 }
2838
b411b363
PR
2839 if (abs(hg) >= 2) {
2840 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2841 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2842 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2843 return C_MASK;
2844 }
2845
2846 if (hg > 0) { /* become sync source. */
2847 rv = C_WF_BITMAP_S;
2848 } else if (hg < 0) { /* become sync target */
2849 rv = C_WF_BITMAP_T;
2850 } else {
2851 rv = C_CONNECTED;
2852 if (drbd_bm_total_weight(mdev)) {
2853 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2854 drbd_bm_total_weight(mdev));
2855 }
2856 }
2857
2858 return rv;
2859}
2860
2861/* returns 1 if invalid */
2862static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2863{
2864 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2865 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2866 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2867 return 0;
2868
2869 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2870 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2871 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2872 return 1;
2873
2874 /* everything else is valid if they are equal on both sides. */
2875 if (peer == self)
2876 return 0;
2877
 2878 /* everything else is invalid. */
2879 return 1;
2880}
2881
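/*
 * Illustration with example settings: peer = ASB_DISCARD_REMOTE together with
 * self = ASB_DISCARD_LOCAL is the one asymmetric pair that is accepted
 * (returns 0); both sides ASB_DISCARD_REMOTE returns 1 (invalid); any other
 * policy, e.g. ASB_DISCONNECT, is only valid when configured identically on
 * both sides.
 */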
7204624c 2882static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
d8763023 2883 unsigned int data_size)
b411b363 2884{
7204624c 2885 struct p_protocol *p = &tconn->data.rbuf.protocol;
b411b363 2886 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2887 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2888 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2889
b411b363
PR
2890 p_proto = be32_to_cpu(p->protocol);
2891 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2892 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2893 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2894 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2895 cf = be32_to_cpu(p->conn_flags);
2896 p_want_lose = cf & CF_WANT_LOSE;
2897
7204624c 2898 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2899
2900 if (cf & CF_DRY_RUN)
7204624c 2901 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2902
7204624c
PR
2903 if (p_proto != tconn->net_conf->wire_protocol) {
2904 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2905 goto disconnect;
2906 }
2907
7204624c
PR
2908 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2909 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2910 goto disconnect;
2911 }
2912
7204624c
PR
2913 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2914 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2915 goto disconnect;
2916 }
2917
7204624c
PR
2918 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2919 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2920 goto disconnect;
2921 }
2922
7204624c
PR
2923 if (p_want_lose && tconn->net_conf->want_lose) {
2924 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2925 goto disconnect;
2926 }
2927
7204624c
PR
2928 if (p_two_primaries != tconn->net_conf->two_primaries) {
2929 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2930 goto disconnect;
2931 }
2932
7204624c
PR
2933 if (tconn->agreed_pro_version >= 87) {
2934 unsigned char *my_alg = tconn->net_conf->integrity_alg;
b411b363 2935
7204624c 2936 if (drbd_recv(tconn, p_integrity_alg, data_size) != data_size)
81e84650 2937 return false;
b411b363
PR
2938
2939 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2940 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2941 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2942 goto disconnect;
2943 }
7204624c 2944 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2945 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2946 }
2947
81e84650 2948 return true;
b411b363
PR
2949
2950disconnect:
7204624c 2951 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 2952 return false;
b411b363
PR
2953}
2954
2955/* helper function
2956 * input: alg name, feature name
2957 * return: NULL (alg name was "")
2958 * ERR_PTR(error) if something goes wrong
2959 * or the crypto hash ptr, if it worked out ok. */
2960struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2961 const char *alg, const char *name)
2962{
2963 struct crypto_hash *tfm;
2964
2965 if (!alg[0])
2966 return NULL;
2967
2968 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2969 if (IS_ERR(tfm)) {
2970 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2971 alg, name, PTR_ERR(tfm));
2972 return tfm;
2973 }
2974 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2975 crypto_free_hash(tfm);
2976 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2977 return ERR_PTR(-EINVAL);
2978 }
2979 return tfm;
2980}
2981
d8763023
AG
2982static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2983 unsigned int packet_size)
b411b363 2984{
81e84650 2985 int ok = true;
e42325a5 2986 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2987 unsigned int header_size, data_size, exp_max_sz;
2988 struct crypto_hash *verify_tfm = NULL;
2989 struct crypto_hash *csums_tfm = NULL;
31890f4a 2990 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2991 int *rs_plan_s = NULL;
2992 int fifo_size = 0;
b411b363
PR
2993
2994 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2995 : apv == 88 ? sizeof(struct p_rs_param)
2996 + SHARED_SECRET_MAX
8e26f9cc
PR
2997 : apv <= 94 ? sizeof(struct p_rs_param_89)
2998 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2999
02918be2 3000 if (packet_size > exp_max_sz) {
b411b363 3001 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 3002 packet_size, exp_max_sz);
81e84650 3003 return false;
b411b363
PR
3004 }
3005
3006 if (apv <= 88) {
257d0af6 3007 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 3008 data_size = packet_size - header_size;
8e26f9cc 3009 } else if (apv <= 94) {
257d0af6 3010 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 3011 data_size = packet_size - header_size;
b411b363 3012 D_ASSERT(data_size == 0);
8e26f9cc 3013 } else {
257d0af6 3014 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 3015 data_size = packet_size - header_size;
b411b363
PR
3016 D_ASSERT(data_size == 0);
3017 }
3018
3019 /* initialize verify_alg and csums_alg */
3020 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3021
de0ff338 3022 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 3023 return false;
b411b363 3024
f399002e
LE
3025 if (get_ldev(mdev)) {
3026 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3027 put_ldev(mdev);
3028 }
b411b363
PR
3029
3030 if (apv >= 88) {
3031 if (apv == 88) {
3032 if (data_size > SHARED_SECRET_MAX) {
3033 dev_err(DEV, "verify-alg too long, "
3034 "peer wants %u, accepting only %u byte\n",
3035 data_size, SHARED_SECRET_MAX);
81e84650 3036 return false;
b411b363
PR
3037 }
3038
de0ff338 3039 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 3040 return false;
b411b363
PR
3041
3042 /* we expect NUL terminated string */
3043 /* but just in case someone tries to be evil */
3044 D_ASSERT(p->verify_alg[data_size-1] == 0);
3045 p->verify_alg[data_size-1] = 0;
3046
3047 } else /* apv >= 89 */ {
3048 /* we still expect NUL terminated strings */
3049 /* but just in case someone tries to be evil */
3050 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3051 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3052 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3053 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3054 }
3055
f399002e 3056 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3057 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3058 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3059 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3060 goto disconnect;
3061 }
3062 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3063 p->verify_alg, "verify-alg");
3064 if (IS_ERR(verify_tfm)) {
3065 verify_tfm = NULL;
3066 goto disconnect;
3067 }
3068 }
3069
f399002e 3070 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3071 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3072 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3073 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3074 goto disconnect;
3075 }
3076 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3077 p->csums_alg, "csums-alg");
3078 if (IS_ERR(csums_tfm)) {
3079 csums_tfm = NULL;
3080 goto disconnect;
3081 }
3082 }
3083
f399002e
LE
3084 if (apv > 94 && get_ldev(mdev)) {
3085 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3086 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3087 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3088 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3089 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3090
f399002e 3091 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3092 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3093 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3094 if (!rs_plan_s) {
3095 dev_err(DEV, "kmalloc of fifo_buffer failed");
f399002e 3096 put_ldev(mdev);
778f271d
PR
3097 goto disconnect;
3098 }
3099 }
f399002e 3100 put_ldev(mdev);
8e26f9cc 3101 }
b411b363
PR
3102
3103 spin_lock(&mdev->peer_seq_lock);
3104 /* lock against drbd_nl_syncer_conf() */
3105 if (verify_tfm) {
f399002e
LE
3106 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3107 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3108 crypto_free_hash(mdev->tconn->verify_tfm);
3109 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3110 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3111 }
3112 if (csums_tfm) {
f399002e
LE
3113 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3114 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3115 crypto_free_hash(mdev->tconn->csums_tfm);
3116 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3117 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3118 }
778f271d
PR
3119 if (fifo_size != mdev->rs_plan_s.size) {
3120 kfree(mdev->rs_plan_s.values);
3121 mdev->rs_plan_s.values = rs_plan_s;
3122 mdev->rs_plan_s.size = fifo_size;
3123 mdev->rs_planed = 0;
3124 }
b411b363
PR
3125 spin_unlock(&mdev->peer_seq_lock);
3126 }
3127
3128 return ok;
3129disconnect:
3130 /* just for completeness: actually not needed,
3131 * as this is not reached if csums_tfm was ok. */
3132 crypto_free_hash(csums_tfm);
3133 /* but free the verify_tfm again, if csums_tfm did not work out */
3134 crypto_free_hash(verify_tfm);
38fa9988 3135 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3136 return false;
b411b363
PR
3137}
3138
b411b363
PR
3139/* warn if the arguments differ by more than 12.5% */
3140static void warn_if_differ_considerably(struct drbd_conf *mdev,
3141 const char *s, sector_t a, sector_t b)
3142{
3143 sector_t d;
3144 if (a == 0 || b == 0)
3145 return;
3146 d = (a > b) ? (a - b) : (b - a);
3147 if (d > (a>>3) || d > (b>>3))
3148 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3149 (unsigned long long)a, (unsigned long long)b);
3150}
3151
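/*
 * Worked example with made-up sizes: a = 1000 and b = 900 sectors give
 * d = 100, below both a>>3 = 125 and b>>3 = 112, so nothing is logged;
 * a = 1000 vs b = 800 gives d = 200 > 125, i.e. more than 12.5% apart, and
 * triggers the warning.
 */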
d8763023
AG
3152static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3153 unsigned int data_size)
b411b363 3154{
e42325a5 3155 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3156 enum determine_dev_size dd = unchanged;
b411b363
PR
3157 sector_t p_size, p_usize, my_usize;
3158 int ldsc = 0; /* local disk size changed */
e89b591c 3159 enum dds_flags ddsf;
b411b363 3160
b411b363
PR
3161 p_size = be64_to_cpu(p->d_size);
3162 p_usize = be64_to_cpu(p->u_size);
3163
b411b363
PR
3164 /* just store the peer's disk size for now.
3165 * we still need to figure out whether we accept that. */
3166 mdev->p_size = p_size;
3167
b411b363
PR
3168 if (get_ldev(mdev)) {
3169 warn_if_differ_considerably(mdev, "lower level device sizes",
3170 p_size, drbd_get_max_capacity(mdev->ldev));
3171 warn_if_differ_considerably(mdev, "user requested size",
3172 p_usize, mdev->ldev->dc.disk_size);
3173
3174 /* if this is the first connect, or an otherwise expected
3175 * param exchange, choose the minimum */
3176 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3177 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3178 p_usize);
3179
3180 my_usize = mdev->ldev->dc.disk_size;
3181
3182 if (mdev->ldev->dc.disk_size != p_usize) {
3183 mdev->ldev->dc.disk_size = p_usize;
3184 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3185 (unsigned long)mdev->ldev->dc.disk_size);
3186 }
3187
3188 /* Never shrink a device with usable data during connect.
3189 But allow online shrinking if we are connected. */
a393db6f 3190 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3191 drbd_get_capacity(mdev->this_bdev) &&
3192 mdev->state.disk >= D_OUTDATED &&
3193 mdev->state.conn < C_CONNECTED) {
3194 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3195 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3196 mdev->ldev->dc.disk_size = my_usize;
3197 put_ldev(mdev);
81e84650 3198 return false;
b411b363
PR
3199 }
3200 put_ldev(mdev);
3201 }
b411b363 3202
e89b591c 3203 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3204 if (get_ldev(mdev)) {
24c4830c 3205 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3206 put_ldev(mdev);
3207 if (dd == dev_size_error)
81e84650 3208 return false;
b411b363
PR
3209 drbd_md_sync(mdev);
3210 } else {
3211 /* I am diskless, need to accept the peer's size. */
3212 drbd_set_my_capacity(mdev, p_size);
3213 }
3214
99432fcc
PR
3215 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3216 drbd_reconsider_max_bio_size(mdev);
3217
b411b363
PR
3218 if (get_ldev(mdev)) {
3219 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3220 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3221 ldsc = 1;
3222 }
3223
b411b363
PR
3224 put_ldev(mdev);
3225 }
3226
3227 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3228 if (be64_to_cpu(p->c_size) !=
3229 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3230 /* we have different sizes, probably peer
3231 * needs to know my new size... */
e89b591c 3232 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3233 }
3234 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3235 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3236 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3237 mdev->state.disk >= D_INCONSISTENT) {
3238 if (ddsf & DDSF_NO_RESYNC)
3239 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3240 else
3241 resync_after_online_grow(mdev);
3242 } else
b411b363
PR
3243 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3244 }
3245 }
3246
81e84650 3247 return true;
b411b363
PR
3248}
3249
d8763023
AG
3250static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3251 unsigned int data_size)
b411b363 3252{
e42325a5 3253 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3254 u64 *p_uuid;
62b0da3a 3255 int i, updated_uuids = 0;
b411b363 3256
b411b363
PR
3257 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3258
3259 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3260 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3261
3262 kfree(mdev->p_uuid);
3263 mdev->p_uuid = p_uuid;
3264
3265 if (mdev->state.conn < C_CONNECTED &&
3266 mdev->state.disk < D_INCONSISTENT &&
3267 mdev->state.role == R_PRIMARY &&
3268 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3269 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3270 (unsigned long long)mdev->ed_uuid);
38fa9988 3271 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3272 return false;
b411b363
PR
3273 }
3274
3275 if (get_ldev(mdev)) {
3276 int skip_initial_sync =
3277 mdev->state.conn == C_CONNECTED &&
31890f4a 3278 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3279 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3280 (p_uuid[UI_FLAGS] & 8);
3281 if (skip_initial_sync) {
3282 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3283 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3284 "clear_n_write from receive_uuids",
3285 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3286 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3287 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3288 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3289 CS_VERBOSE, NULL);
3290 drbd_md_sync(mdev);
62b0da3a 3291 updated_uuids = 1;
b411b363
PR
3292 }
3293 put_ldev(mdev);
18a50fa2
PR
3294 } else if (mdev->state.disk < D_INCONSISTENT &&
3295 mdev->state.role == R_PRIMARY) {
3296 /* I am a diskless primary, the peer just created a new current UUID
3297 for me. */
62b0da3a 3298 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3299 }
3300
 3301 /* Before we test for the disk state, we should wait until a possibly
 3302 ongoing cluster-wide state change is finished. That is important if
3303 we are primary and are detaching from our disk. We need to see the
3304 new disk state... */
8410da8f
PR
3305 mutex_lock(mdev->state_mutex);
3306 mutex_unlock(mdev->state_mutex);
b411b363 3307 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3308 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3309
3310 if (updated_uuids)
3311 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3312
81e84650 3313 return true;
b411b363
PR
3314}
3315
3316/**
3317 * convert_state() - Converts the peer's view of the cluster state to our point of view
3318 * @ps: The state as seen by the peer.
3319 */
3320static union drbd_state convert_state(union drbd_state ps)
3321{
3322 union drbd_state ms;
3323
3324 static enum drbd_conns c_tab[] = {
3325 [C_CONNECTED] = C_CONNECTED,
3326
3327 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3328 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3329 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3330 [C_VERIFY_S] = C_VERIFY_T,
3331 [C_MASK] = C_MASK,
3332 };
3333
3334 ms.i = ps.i;
3335
3336 ms.conn = c_tab[ps.conn];
3337 ms.peer = ps.role;
3338 ms.role = ps.peer;
3339 ms.pdsk = ps.disk;
3340 ms.disk = ps.pdsk;
3341 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3342
3343 return ms;
3344}
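The helper above simply mirrors the packet: role and peer swap, disk and pdsk swap, and the asymmetric connection states map to their counterpart (StartingSyncS on the peer is StartingSyncT for us, and so on). A minimal userspace sketch of the same "swap the point of view" idea, using a made-up simplified state struct instead of drbd's packed union:

	/* illustrative only -- not part of the driver */
	#include <stdio.h>

	struct toy_state {
		int role;	/* my role               */
		int peer;	/* the peer's role        */
		int disk;	/* my disk state          */
		int pdsk;	/* the peer's disk state  */
	};

	static struct toy_state toy_convert_state(struct toy_state ps)
	{
		struct toy_state ms = ps;

		ms.role = ps.peer;	/* what the peer calls "peer" is me */
		ms.peer = ps.role;
		ms.disk = ps.pdsk;	/* and its view of my disk is my disk */
		ms.pdsk = ps.disk;
		return ms;
	}

	int main(void)
	{
		struct toy_state ps = { .role = 1, .peer = 2, .disk = 8, .pdsk = 4 };
		struct toy_state ms = toy_convert_state(ps);

		printf("role %d -> %d, disk %d -> %d\n", ps.role, ms.role, ps.disk, ms.disk);
		return 0;
	}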
3345
d8763023
AG
3346static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3347 unsigned int data_size)
b411b363 3348{
e42325a5 3349 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3350 union drbd_state mask, val;
bf885f8a 3351 enum drbd_state_rv rv;
b411b363 3352
b411b363
PR
3353 mask.i = be32_to_cpu(p->mask);
3354 val.i = be32_to_cpu(p->val);
3355
25703f83 3356 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3357 mutex_is_locked(mdev->state_mutex)) {
b411b363 3358 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3359 return true;
b411b363
PR
3360 }
3361
3362 mask = convert_state(mask);
3363 val = convert_state(val);
3364
dfafcc8a
PR
3365 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3366 drbd_send_sr_reply(mdev, rv);
b411b363 3367
b411b363
PR
3368 drbd_md_sync(mdev);
3369
81e84650 3370 return true;
b411b363
PR
3371}
3372
dfafcc8a
PR
3373static int receive_req_conn_state(struct drbd_tconn *tconn, enum drbd_packet cmd,
3374 unsigned int data_size)
3375{
3376 struct p_req_state *p = &tconn->data.rbuf.req_state;
3377 union drbd_state mask, val;
3378 enum drbd_state_rv rv;
3379
3380 mask.i = be32_to_cpu(p->mask);
3381 val.i = be32_to_cpu(p->val);
3382
3383 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3384 mutex_is_locked(&tconn->cstate_mutex)) {
3385 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3386 return true;
3387 }
3388
3389 mask = convert_state(mask);
3390 val = convert_state(val);
3391
3392 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3393 conn_send_sr_reply(tconn, rv);
3394
3395 return true;
3396}
3397
d8763023
AG
3398static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3399 unsigned int data_size)
b411b363 3400{
e42325a5 3401 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3402 union drbd_state os, ns, peer_state;
b411b363 3403 enum drbd_disk_state real_peer_disk;
65d922c3 3404 enum chg_state_flags cs_flags;
b411b363
PR
3405 int rv;
3406
b411b363
PR
3407 peer_state.i = be32_to_cpu(p->state);
3408
3409 real_peer_disk = peer_state.disk;
3410 if (peer_state.disk == D_NEGOTIATING) {
3411 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3412 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3413 }
3414
87eeee41 3415 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3416 retry:
4ac4aada 3417 os = ns = mdev->state;
87eeee41 3418 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3419
e9ef7bb6
LE
3420 /* peer says his disk is uptodate, while we think it is inconsistent,
3421 * and this happens while we think we have a sync going on. */
3422 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3423 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3424 /* If we are (becoming) SyncSource, but peer is still in sync
3425 * preparation, ignore its uptodate-ness to avoid flapping, it
3426 * will change to inconsistent once the peer reaches active
3427 * syncing states.
3428 * It may have changed syncer-paused flags, however, so we
3429 * cannot ignore this completely. */
3430 if (peer_state.conn > C_CONNECTED &&
3431 peer_state.conn < C_SYNC_SOURCE)
3432 real_peer_disk = D_INCONSISTENT;
3433
3434 /* if peer_state changes to connected at the same time,
3435 * it explicitly notifies us that it finished resync.
3436 * Maybe we should finish it up, too? */
3437 else if (os.conn >= C_SYNC_SOURCE &&
3438 peer_state.conn == C_CONNECTED) {
3439 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3440 drbd_resync_finished(mdev);
81e84650 3441 return true;
e9ef7bb6
LE
3442 }
3443 }
3444
3445 /* peer says his disk is inconsistent, while we think it is uptodate,
3446 * and this happens while the peer still thinks we have a sync going on,
3447 * but we think we are already done with the sync.
3448 * We ignore this to avoid flapping pdsk.
 3449 * This should not happen if the peer is a recent version of drbd. */
3450 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3451 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3452 real_peer_disk = D_UP_TO_DATE;
3453
4ac4aada
LE
3454 if (ns.conn == C_WF_REPORT_PARAMS)
3455 ns.conn = C_CONNECTED;
b411b363 3456
67531718
PR
3457 if (peer_state.conn == C_AHEAD)
3458 ns.conn = C_BEHIND;
3459
b411b363
PR
3460 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3461 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3462 int cr; /* consider resync */
3463
3464 /* if we established a new connection */
4ac4aada 3465 cr = (os.conn < C_CONNECTED);
b411b363
PR
3466 /* if we had an established connection
3467 * and one of the nodes newly attaches a disk */
4ac4aada 3468 cr |= (os.conn == C_CONNECTED &&
b411b363 3469 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3470 os.disk == D_NEGOTIATING));
b411b363
PR
3471 /* if we have both been inconsistent, and the peer has been
3472 * forced to be UpToDate with --overwrite-data */
3473 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3474 /* if we had been plain connected, and the admin requested to
3475 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3476 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3477 (peer_state.conn >= C_STARTING_SYNC_S &&
3478 peer_state.conn <= C_WF_BITMAP_T));
3479
3480 if (cr)
4ac4aada 3481 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3482
3483 put_ldev(mdev);
4ac4aada
LE
3484 if (ns.conn == C_MASK) {
3485 ns.conn = C_CONNECTED;
b411b363 3486 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3487 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3488 } else if (peer_state.disk == D_NEGOTIATING) {
3489 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3490 peer_state.disk = D_DISKLESS;
580b9767 3491 real_peer_disk = D_DISKLESS;
b411b363 3492 } else {
8169e41b 3493 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
81e84650 3494 return false;
4ac4aada 3495 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3496 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3497 return false;
b411b363
PR
3498 }
3499 }
3500 }
3501
87eeee41 3502 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3503 if (mdev->state.i != os.i)
b411b363
PR
3504 goto retry;
3505 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3506 ns.peer = peer_state.role;
3507 ns.pdsk = real_peer_disk;
3508 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3509 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3510 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3511 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3512 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3513 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3514 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3515 for temporary network outages! */
87eeee41 3516 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3517 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3518 tl_clear(mdev->tconn);
481c6f50
PR
3519 drbd_uuid_new_current(mdev);
3520 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3521 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
81e84650 3522 return false;
481c6f50 3523 }
65d922c3 3524 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3525 ns = mdev->state;
87eeee41 3526 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3527
3528 if (rv < SS_SUCCESS) {
38fa9988 3529 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3530 return false;
b411b363
PR
3531 }
3532
4ac4aada
LE
3533 if (os.conn > C_WF_REPORT_PARAMS) {
3534 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3535 peer_state.disk != D_NEGOTIATING ) {
3536 /* we want resync, peer has not yet decided to sync... */
3537 /* Nowadays only used when forcing a node into primary role and
3538 setting its disk to UpToDate with that */
3539 drbd_send_uuids(mdev);
3540 drbd_send_state(mdev);
3541 }
3542 }
3543
89e58e75 3544 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3545
3546 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3547
81e84650 3548 return true;
b411b363
PR
3549}
3550
d8763023
AG
3551static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3552 unsigned int data_size)
b411b363 3553{
e42325a5 3554 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3555
3556 wait_event(mdev->misc_wait,
3557 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3558 mdev->state.conn == C_BEHIND ||
b411b363
PR
3559 mdev->state.conn < C_CONNECTED ||
3560 mdev->state.disk < D_NEGOTIATING);
3561
3562 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3563
b411b363
PR
3564 /* Here the _drbd_uuid_ functions are right, current should
3565 _not_ be rotated into the history */
3566 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3567 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3568 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3569
62b0da3a 3570 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3571 drbd_start_resync(mdev, C_SYNC_TARGET);
3572
3573 put_ldev(mdev);
3574 } else
3575 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3576
81e84650 3577 return true;
b411b363
PR
3578}
3579
2c46407d
AG
3580/**
3581 * receive_bitmap_plain
3582 *
3583 * Return 0 when done, 1 when another iteration is needed, and a negative error
3584 * code upon failure.
3585 */
3586static int
02918be2
PR
3587receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3588 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3589{
3590 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3591 unsigned want = num_words * sizeof(long);
2c46407d 3592 int err;
b411b363 3593
02918be2
PR
3594 if (want != data_size) {
3595 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3596 return -EIO;
b411b363
PR
3597 }
3598 if (want == 0)
2c46407d 3599 return 0;
de0ff338 3600 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3601 if (err != want) {
3602 if (err >= 0)
3603 err = -EIO;
3604 return err;
3605 }
b411b363
PR
3606
3607 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3608
3609 c->word_offset += num_words;
3610 c->bit_offset = c->word_offset * BITS_PER_LONG;
3611 if (c->bit_offset > c->bm_bits)
3612 c->bit_offset = c->bm_bits;
3613
2c46407d 3614 return 1;
b411b363
PR
3615}
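receive_bitmap_plain() hands its caller a tri-state result: 0 when the whole bitmap has been merged, 1 when another packet is needed, and a negative value on error; receive_bitmap() below loops on exactly that. A hedged userspace sketch of such a chunked-consumption loop (chunk size and data are assumptions, not the wire format):

	#include <stdio.h>

	#define WORDS_PER_PACKET 4

	/* 0 = done, 1 = another iteration needed, <0 = error */
	static int consume_chunk(const unsigned long *src, unsigned long *dst,
				 unsigned total_words, unsigned *offset)
	{
		unsigned want = total_words - *offset;
		unsigned i;

		if (want > WORDS_PER_PACKET)
			want = WORDS_PER_PACKET;
		if (want == 0)
			return 0;
		for (i = 0; i < want; i++)	/* "merge", like drbd_bm_merge_lel() */
			dst[*offset + i] |= src[*offset + i];
		*offset += want;
		return *offset < total_words;
	}

	int main(void)
	{
		unsigned long src[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
		unsigned long dst[10] = { 0 };
		unsigned off = 0;
		int err;

		while ((err = consume_chunk(src, dst, 10, &off)) > 0)
			;
		printf("err=%d, words merged=%u\n", err, off);
		return err < 0;
	}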
3616
2c46407d
AG
3617/**
3618 * recv_bm_rle_bits
3619 *
3620 * Return 0 when done, 1 when another iteration is needed, and a negative error
3621 * code upon failure.
3622 */
3623static int
b411b363
PR
3624recv_bm_rle_bits(struct drbd_conf *mdev,
3625 struct p_compressed_bm *p,
c6d25cfe
PR
3626 struct bm_xfer_ctx *c,
3627 unsigned int len)
b411b363
PR
3628{
3629 struct bitstream bs;
3630 u64 look_ahead;
3631 u64 rl;
3632 u64 tmp;
3633 unsigned long s = c->bit_offset;
3634 unsigned long e;
b411b363
PR
3635 int toggle = DCBP_get_start(p);
3636 int have;
3637 int bits;
3638
3639 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3640
3641 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3642 if (bits < 0)
2c46407d 3643 return -EIO;
b411b363
PR
3644
3645 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3646 bits = vli_decode_bits(&rl, look_ahead);
3647 if (bits <= 0)
2c46407d 3648 return -EIO;
b411b363
PR
3649
3650 if (toggle) {
3651 e = s + rl -1;
3652 if (e >= c->bm_bits) {
3653 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3654 return -EIO;
b411b363
PR
3655 }
3656 _drbd_bm_set_bits(mdev, s, e);
3657 }
3658
3659 if (have < bits) {
3660 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3661 have, bits, look_ahead,
3662 (unsigned int)(bs.cur.b - p->code),
3663 (unsigned int)bs.buf_len);
2c46407d 3664 return -EIO;
b411b363
PR
3665 }
3666 look_ahead >>= bits;
3667 have -= bits;
3668
3669 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3670 if (bits < 0)
2c46407d 3671 return -EIO;
b411b363
PR
3672 look_ahead |= tmp << have;
3673 have += bits;
3674 }
3675
3676 c->bit_offset = s;
3677 bm_xfer_ctx_bit_to_word_offset(c);
3678
2c46407d 3679 return (s != c->bm_bits);
b411b363
PR
3680}
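recv_bm_rle_bits() walks alternating runs of clear and set bits: DCBP_get_start() gives the polarity of the first run, each VLI code yields the next run length, and only the "set" runs touch the bitmap. A toy userspace model of that toggle/run-length idea (plain integer run lengths stand in for the VLI bit codes, so this shows the shape of the algorithm, not the wire encoding):

	#include <stdio.h>

	static void apply_runs(unsigned char *bitmap, unsigned nbits,
			       const unsigned *rl, unsigned nruns, int first_run_set)
	{
		unsigned bit = 0, i, j;
		int set = first_run_set;

		for (i = 0; i < nruns && bit < nbits; i++, set = !set)
			for (j = 0; j < rl[i] && bit < nbits; j++, bit++)
				if (set)	/* only set runs are applied */
					bitmap[bit / 8] |= 1 << (bit % 8);
	}

	int main(void)
	{
		unsigned char bm[2] = { 0, 0 };
		unsigned runs[] = { 3, 5, 4, 4 };	/* clear 3, set 5, clear 4, set 4 */

		apply_runs(bm, 16, runs, 4, 0);
		printf("%02x %02x\n", bm[0], bm[1]);	/* prints: f8 f0 */
		return 0;
	}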
3681
2c46407d
AG
3682/**
3683 * decode_bitmap_c
3684 *
3685 * Return 0 when done, 1 when another iteration is needed, and a negative error
3686 * code upon failure.
3687 */
3688static int
b411b363
PR
3689decode_bitmap_c(struct drbd_conf *mdev,
3690 struct p_compressed_bm *p,
c6d25cfe
PR
3691 struct bm_xfer_ctx *c,
3692 unsigned int len)
b411b363
PR
3693{
3694 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3695 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3696
3697 /* other variants had been implemented for evaluation,
3698 * but have been dropped as this one turned out to be "best"
3699 * during all our tests. */
3700
3701 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3702 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3703 return -EIO;
b411b363
PR
3704}
3705
3706void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3707 const char *direction, struct bm_xfer_ctx *c)
3708{
3709 /* what would it take to transfer it "plaintext" */
c012949a 3710 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3711 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3712 + c->bm_words * sizeof(long);
3713 unsigned total = c->bytes[0] + c->bytes[1];
3714 unsigned r;
3715
3716 /* total can not be zero. but just in case: */
3717 if (total == 0)
3718 return;
3719
3720 /* don't report if not compressed */
3721 if (total >= plain)
3722 return;
3723
3724 /* total < plain. check for overflow, still */
3725 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3726 : (1000 * total / plain);
3727
3728 if (r > 1000)
3729 r = 1000;
3730
3731 r = 1000 - r;
3732 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3733 "total %u; compression: %u.%u%%\n",
3734 direction,
3735 c->bytes[1], c->packets[1],
3736 c->bytes[0], c->packets[0],
3737 total, r/10, r % 10);
3738}
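The ratio above is computed in per mille so no floating point is needed: r = 1000 - 1000*total/plain, printed as r/10 "." r%10 percent. A quick userspace check of that arithmetic with assumed numbers (not measurements from any real transfer):

	#include <stdio.h>

	int main(void)
	{
		unsigned plain = 100000, total = 12345;
		unsigned r = 1000 * total / plain;	/* 123 per mille actually sent */

		r = 1000 - r;				/* 877 per mille saved */
		printf("compression: %u.%u%%\n", r / 10, r % 10);	/* 87.7% */
		return 0;
	}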
3739
3740/* Since we are processing the bitfield from lower addresses to higher,
 3741 it does not matter if we process it in 32 bit chunks or 64 bit
 3742 chunks, as long as it is little endian. (Understand it as a byte stream,
 3743 beginning with the lowest byte...) If we used big endian,
 3744 we would need to process it from the highest address to the lowest,
3745 in order to be agnostic to the 32 vs 64 bits issue.
3746
3747 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3748static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3749 unsigned int data_size)
b411b363
PR
3750{
3751 struct bm_xfer_ctx c;
3752 void *buffer;
2c46407d 3753 int err;
81e84650 3754 int ok = false;
257d0af6 3755 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3756 struct packet_info pi;
b411b363 3757
20ceb2b2
LE
3758 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3759 /* you are supposed to send additional out-of-sync information
3760 * if you actually set bits during this phase */
b411b363
PR
3761
3762 /* maybe we should use some per thread scratch page,
3763 * and allocate that during initial device creation? */
3764 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3765 if (!buffer) {
3766 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3767 goto out;
3768 }
3769
3770 c = (struct bm_xfer_ctx) {
3771 .bm_bits = drbd_bm_bits(mdev),
3772 .bm_words = drbd_bm_words(mdev),
3773 };
3774
2c46407d 3775 for(;;) {
02918be2 3776 if (cmd == P_BITMAP) {
2c46407d 3777 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3778 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3779 /* MAYBE: sanity check that we speak proto >= 90,
3780 * and the feature is enabled! */
3781 struct p_compressed_bm *p;
3782
02918be2 3783 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3784 dev_err(DEV, "ReportCBitmap packet too large\n");
3785 goto out;
3786 }
3787 /* use the page buff */
3788 p = buffer;
3789 memcpy(p, h, sizeof(*h));
de0ff338 3790 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3791 goto out;
004352fa
LE
3792 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3793 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3794 goto out;
b411b363 3795 }
c6d25cfe 3796 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3797 } else {
02918be2 3798 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3799 goto out;
3800 }
3801
02918be2 3802 c.packets[cmd == P_BITMAP]++;
257d0af6 3803 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3804
2c46407d
AG
3805 if (err <= 0) {
3806 if (err < 0)
3807 goto out;
b411b363 3808 break;
2c46407d 3809 }
69bc7bc3 3810 if (drbd_recv_header(mdev->tconn, &pi))
b411b363 3811 goto out;
77351055
PR
3812 cmd = pi.cmd;
3813 data_size = pi.size;
2c46407d 3814 }
b411b363
PR
3815
3816 INFO_bm_xfer_stats(mdev, "receive", &c);
3817
3818 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3819 enum drbd_state_rv rv;
3820
b411b363
PR
3821 ok = !drbd_send_bitmap(mdev);
3822 if (!ok)
3823 goto out;
3824 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3825 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3826 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3827 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3828 /* admin may have requested C_DISCONNECTING,
3829 * other threads may have noticed network errors */
3830 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3831 drbd_conn_str(mdev->state.conn));
3832 }
3833
81e84650 3834 ok = true;
b411b363 3835 out:
20ceb2b2 3836 drbd_bm_unlock(mdev);
b411b363
PR
3837 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3838 drbd_start_resync(mdev, C_SYNC_SOURCE);
3839 free_page((unsigned long) buffer);
3840 return ok;
3841}
3842
2de876ef 3843static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3844{
3845 /* TODO zero copy sink :) */
3846 static char sink[128];
3847 int size, want, r;
3848
02918be2 3849 size = data_size;
b411b363
PR
3850 while (size > 0) {
3851 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3852 r = drbd_recv(tconn, sink, want);
3853 if (r <= 0)
841ce241 3854 break;
b411b363
PR
3855 size -= r;
3856 }
3857 return size == 0;
3858}
3859
2de876ef
PR
3860static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3861 unsigned int data_size)
3862{
3863 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3864 cmd, data_size);
3865
3866 return _tconn_receive_skip(mdev->tconn, data_size);
3867}
3868
3869static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
3870{
 3871 conn_warn(tconn, "skipping packet for non-existent volume, type %d, l: %d!\n",
3872 cmd, data_size);
3873
3874 return _tconn_receive_skip(tconn, data_size);
3875}
3876
d8763023
AG
3877static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3878 unsigned int data_size)
0ced55a3 3879{
e7f52dfb
LE
3880 /* Make sure we've acked all the TCP data associated
3881 * with the data requests being unplugged */
e42325a5 3882 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3883
81e84650 3884 return true;
0ced55a3
PR
3885}
3886
d8763023
AG
3887static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3888 unsigned int data_size)
73a01a18 3889{
e42325a5 3890 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3891
f735e363
LE
3892 switch (mdev->state.conn) {
3893 case C_WF_SYNC_UUID:
3894 case C_WF_BITMAP_T:
3895 case C_BEHIND:
3896 break;
3897 default:
3898 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3899 drbd_conn_str(mdev->state.conn));
3900 }
3901
73a01a18
PR
3902 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3903
81e84650 3904 return true;
73a01a18
PR
3905}
3906
02918be2
PR
3907struct data_cmd {
3908 int expect_payload;
3909 size_t pkt_size;
a4fbda8e 3910 enum mdev_or_conn fa_type; /* first argument's type */
d9ae84e7
PR
3911 union {
3912 int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
3913 unsigned int to_receive);
3914 int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
3915 unsigned int to_receive);
3916 };
02918be2
PR
3917};
3918
3919static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3920 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3921 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3922 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3923 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3924 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3925 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3926 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3927 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3928 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3929 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3930 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3931 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3932 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3933 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3934 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3935 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3936 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3937 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3938 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3939 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3940 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3941 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
dfafcc8a 3942 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
b411b363
PR
3943};
3944
02918be2 3945/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3946 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3947
e42325a5 3948 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
 3949 p_header, but they must not rely on that, since there is also a p_header95!
3950 */
b411b363 3951
eefc2f7d 3952static void drbdd(struct drbd_tconn *tconn)
b411b363 3953{
eefc2f7d 3954 struct p_header *header = &tconn->data.rbuf.header;
77351055 3955 struct packet_info pi;
02918be2
PR
3956 size_t shs; /* sub header size */
3957 int rv;
b411b363 3958
eefc2f7d
PR
3959 while (get_t_state(&tconn->receiver) == RUNNING) {
3960 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 3961 if (drbd_recv_header(tconn, &pi))
02918be2 3962 goto err_out;
b411b363 3963
6e849ce8 3964 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
d9ae84e7 3965 !drbd_cmd_handler[pi.cmd].mdev_fn)) {
eefc2f7d 3966 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3967 goto err_out;
0b33a916 3968 }
b411b363 3969
77351055
PR
3970 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3971 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3972 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3973 goto err_out;
b411b363 3974 }
b411b363 3975
c13f7e1a 3976 if (shs) {
eefc2f7d 3977 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3978 if (unlikely(rv != shs)) {
0ddc5549 3979 if (!signal_pending(current))
eefc2f7d 3980 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3981 goto err_out;
3982 }
3983 }
3984
a4fbda8e 3985 if (drbd_cmd_handler[pi.cmd].fa_type == CONN) {
d9ae84e7
PR
3986 rv = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
3987 } else {
3988 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
3989 rv = mdev ?
3990 drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
3991 tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
3992 }
b411b363 3993
02918be2 3994 if (unlikely(!rv)) {
eefc2f7d 3995 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3996 cmdname(pi.cmd), pi.size);
02918be2 3997 goto err_out;
b411b363
PR
3998 }
3999 }
b411b363 4000
02918be2
PR
4001 if (0) {
4002 err_out:
bbeb641c 4003 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 4004 }
b411b363
PR
4005}
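drbdd() is a table-driven dispatcher: the packet type indexes drbd_cmd_handler[], the entry records the sub-header size and whether a payload is legal, and fa_type chooses between the per-connection and the per-device callback. A stripped-down userspace sketch of the same pattern (packet names and handlers below are invented for illustration):

	#include <stdio.h>

	enum pkt { PKT_PING, PKT_DATA, PKT_MAX };

	struct handler {
		int expect_payload;
		int (*fn)(unsigned size);
	};

	static int do_ping(unsigned size) { printf("ping, %u bytes\n", size); return 1; }
	static int do_data(unsigned size) { printf("data, %u bytes\n", size); return 1; }

	static const struct handler tbl[PKT_MAX] = {
		[PKT_PING] = { 0, do_ping },
		[PKT_DATA] = { 1, do_data },
	};

	static int dispatch(enum pkt cmd, unsigned size)
	{
		if (cmd >= PKT_MAX || !tbl[cmd].fn)
			return 0;	/* unknown packet type: protocol error */
		if (size > 0 && !tbl[cmd].expect_payload)
			return 0;	/* no payload expected for this type */
		return tbl[cmd].fn(size);
	}

	int main(void)
	{
		return dispatch(PKT_DATA, 4096) && dispatch(PKT_PING, 0) ? 0 : 1;
	}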
4006
0e29d163 4007void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4008{
4009 struct drbd_wq_barrier barr;
4010
4011 barr.w.cb = w_prev_work_done;
0e29d163 4012 barr.w.tconn = tconn;
b411b363 4013 init_completion(&barr.done);
0e29d163 4014 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4015 wait_for_completion(&barr.done);
4016}
4017
360cc740 4018static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 4019{
bbeb641c 4020 enum drbd_conns oc;
b411b363 4021 int rv = SS_UNKNOWN_ERROR;
b411b363 4022
bbeb641c 4023 if (tconn->cstate == C_STANDALONE)
b411b363 4024 return;
b411b363
PR
4025
4026 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4027 drbd_thread_stop(&tconn->asender);
4028 drbd_free_sock(tconn);
4029
4030 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4031
4032 conn_info(tconn, "Connection closed\n");
4033
4034 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4035 oc = tconn->cstate;
4036 if (oc >= C_UNCONNECTED)
4037 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4038
360cc740
PR
4039 spin_unlock_irq(&tconn->req_lock);
4040
bbeb641c 4041 if (oc == C_DISCONNECTING) {
360cc740
PR
4042 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4043
4044 crypto_free_hash(tconn->cram_hmac_tfm);
4045 tconn->cram_hmac_tfm = NULL;
4046
4047 kfree(tconn->net_conf);
4048 tconn->net_conf = NULL;
bbeb641c 4049 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4050 }
4051}
4052
4053static int drbd_disconnected(int vnr, void *p, void *data)
4054{
4055 struct drbd_conf *mdev = (struct drbd_conf *)p;
4056 enum drbd_fencing_p fp;
4057 unsigned int i;
b411b363 4058
85719573 4059 /* wait for current activity to cease. */
87eeee41 4060 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4061 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4062 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4063 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4064 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4065
4066 /* We do not have data structures that would allow us to
4067 * get the rs_pending_cnt down to 0 again.
4068 * * On C_SYNC_TARGET we do not have any data structures describing
4069 * the pending RSDataRequest's we have sent.
4070 * * On C_SYNC_SOURCE there is no data structure that tracks
4071 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4072 * And no, it is not the sum of the reference counts in the
4073 * resync_LRU. The resync_LRU tracks the whole operation including
4074 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4075 * on the fly. */
4076 drbd_rs_cancel_all(mdev);
4077 mdev->rs_total = 0;
4078 mdev->rs_failed = 0;
4079 atomic_set(&mdev->rs_pending_cnt, 0);
4080 wake_up(&mdev->misc_wait);
4081
7fde2be9
PR
4082 del_timer(&mdev->request_timer);
4083
b411b363 4084 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4085 resync_timer_fn((unsigned long)mdev);
4086
b411b363
PR
4087 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4088 * w_make_resync_request etc. which may still be on the worker queue
4089 * to be "canceled" */
a21e9298 4090 drbd_flush_workqueue(mdev);
b411b363
PR
4091
4092 /* This also does reclaim_net_ee(). If we do this too early, we might
4093 * miss some resync ee and pages.*/
4094 drbd_process_done_ee(mdev);
4095
4096 kfree(mdev->p_uuid);
4097 mdev->p_uuid = NULL;
4098
fb22c402 4099 if (!is_susp(mdev->state))
2f5cdd0b 4100 tl_clear(mdev->tconn);
b411b363 4101
b411b363
PR
4102 drbd_md_sync(mdev);
4103
4104 fp = FP_DONT_CARE;
4105 if (get_ldev(mdev)) {
4106 fp = mdev->ldev->dc.fencing;
4107 put_ldev(mdev);
4108 }
4109
87f7be4c
PR
4110 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
4111 drbd_try_outdate_peer_async(mdev);
b411b363 4112
20ceb2b2
LE
4113 /* serialize with bitmap writeout triggered by the state change,
4114 * if any. */
4115 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4116
b411b363
PR
4117 /* tcp_close and release of sendpage pages can be deferred. I don't
4118 * want to use SO_LINGER, because apparently it can be deferred for
4119 * more than 20 seconds (longest time I checked).
4120 *
 4121 * Actually we don't care exactly when the network stack does its
4122 * put_page(), but release our reference on these pages right here.
4123 */
4124 i = drbd_release_ee(mdev, &mdev->net_ee);
4125 if (i)
4126 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4127 i = atomic_read(&mdev->pp_in_use_by_net);
4128 if (i)
4129 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4130 i = atomic_read(&mdev->pp_in_use);
4131 if (i)
45bb912b 4132 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4133
4134 D_ASSERT(list_empty(&mdev->read_ee));
4135 D_ASSERT(list_empty(&mdev->active_ee));
4136 D_ASSERT(list_empty(&mdev->sync_ee));
4137 D_ASSERT(list_empty(&mdev->done_ee));
4138
4139 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4140 atomic_set(&mdev->current_epoch->epoch_size, 0);
4141 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4142
4143 return 0;
b411b363
PR
4144}
4145
4146/*
4147 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4148 * we can agree on is stored in agreed_pro_version.
4149 *
4150 * feature flags and the reserved array should be enough room for future
4151 * enhancements of the handshake protocol, and possible plugins...
4152 *
4153 * for now, they are expected to be zero, but ignored.
4154 */
8a22cccc 4155static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4156{
e6b3ea83 4157 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4158 struct p_handshake *p = &tconn->data.sbuf.handshake;
e8d17b01 4159 int err;
b411b363 4160
8a22cccc
PR
4161 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4162 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4163 return -EINTR;
b411b363
PR
4164 }
4165
8a22cccc
PR
4166 if (tconn->data.socket == NULL) {
4167 mutex_unlock(&tconn->data.mutex);
e8d17b01 4168 return -EIO;
b411b363
PR
4169 }
4170
4171 memset(p, 0, sizeof(*p));
4172 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4173 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4174 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4175 &p->head, sizeof(*p), 0);
8a22cccc 4176 mutex_unlock(&tconn->data.mutex);
e8d17b01 4177 return err;
b411b363
PR
4178}
4179
4180/*
4181 * return values:
4182 * 1 yes, we have a valid connection
4183 * 0 oops, did not work out, please try again
4184 * -1 peer talks different language,
4185 * no point in trying again, please go standalone.
4186 */
65d11ed6 4187static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4188{
65d11ed6
PR
4189 /* ASSERT current == tconn->receiver ... */
4190 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4191 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4192 struct packet_info pi;
e8d17b01 4193 int err, rv;
b411b363 4194
e8d17b01
AG
4195 err = drbd_send_handshake(tconn);
4196 if (err)
b411b363
PR
4197 return 0;
4198
69bc7bc3
AG
4199 err = drbd_recv_header(tconn, &pi);
4200 if (err)
b411b363
PR
4201 return 0;
4202
77351055 4203 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4204 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4205 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4206 return -1;
4207 }
4208
77351055 4209 if (pi.size != expect) {
65d11ed6 4210 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4211 expect, pi.size);
b411b363
PR
4212 return -1;
4213 }
4214
65d11ed6 4215 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4216
4217 if (rv != expect) {
0ddc5549 4218 if (!signal_pending(current))
65d11ed6 4219 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4220 return 0;
4221 }
4222
b411b363
PR
4223 p->protocol_min = be32_to_cpu(p->protocol_min);
4224 p->protocol_max = be32_to_cpu(p->protocol_max);
4225 if (p->protocol_max == 0)
4226 p->protocol_max = p->protocol_min;
4227
4228 if (PRO_VERSION_MAX < p->protocol_min ||
4229 PRO_VERSION_MIN > p->protocol_max)
4230 goto incompat;
4231
65d11ed6 4232 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4233
65d11ed6
PR
4234 conn_info(tconn, "Handshake successful: "
4235 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4236
4237 return 1;
4238
4239 incompat:
65d11ed6 4240 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4241 "I support %d-%d, peer supports %d-%d\n",
4242 PRO_VERSION_MIN, PRO_VERSION_MAX,
4243 p->protocol_min, p->protocol_max);
4244 return -1;
4245}
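The version negotiation above boils down to a range-overlap test followed by taking the highest protocol version both sides support; peers that predate the two-field handshake report protocol_max == 0, which is treated as "protocol_min only". A small sketch with assumed version numbers:

	#include <stdio.h>

	#define MY_VERSION_MIN 86
	#define MY_VERSION_MAX 100

	static int negotiate(int peer_min, int peer_max, int *agreed)
	{
		if (peer_max == 0)		/* old peer: only one version */
			peer_max = peer_min;
		if (MY_VERSION_MAX < peer_min || MY_VERSION_MIN > peer_max)
			return -1;		/* incompatible dialects */
		*agreed = MY_VERSION_MAX < peer_max ? MY_VERSION_MAX : peer_max;
		return 0;
	}

	int main(void)
	{
		int agreed;

		if (negotiate(86, 96, &agreed) == 0)
			printf("agreed network protocol version %d\n", agreed);
		return 0;
	}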
4246
4247#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4248static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4249{
4250 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4251 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4252 return -1;
b411b363
PR
4253}
4254#else
4255#define CHALLENGE_LEN 64
b10d96cb
JT
4256
4257/* Return value:
4258 1 - auth succeeded,
4259 0 - failed, try again (network error),
4260 -1 - auth failed, don't try again.
4261*/
4262
13e6037d 4263static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4264{
4265 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4266 struct scatterlist sg;
4267 char *response = NULL;
4268 char *right_response = NULL;
4269 char *peers_ch = NULL;
13e6037d 4270 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4271 unsigned int resp_size;
4272 struct hash_desc desc;
77351055 4273 struct packet_info pi;
69bc7bc3 4274 int err, rv;
b411b363 4275
13e6037d 4276 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4277 desc.flags = 0;
4278
13e6037d
PR
4279 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4280 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4281 if (rv) {
13e6037d 4282 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4283 rv = -1;
b411b363
PR
4284 goto fail;
4285 }
4286
4287 get_random_bytes(my_challenge, CHALLENGE_LEN);
4288
ce9879cb 4289 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4290 if (!rv)
4291 goto fail;
4292
69bc7bc3
AG
4293 err = drbd_recv_header(tconn, &pi);
4294 if (err) {
4295 rv = 0;
b411b363 4296 goto fail;
69bc7bc3 4297 }
b411b363 4298
77351055 4299 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4300 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4301 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4302 rv = 0;
4303 goto fail;
4304 }
4305
77351055 4306 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4307 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4308 rv = -1;
b411b363
PR
4309 goto fail;
4310 }
4311
77351055 4312 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4313 if (peers_ch == NULL) {
13e6037d 4314 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4315 rv = -1;
b411b363
PR
4316 goto fail;
4317 }
4318
13e6037d 4319 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4320
77351055 4321 if (rv != pi.size) {
0ddc5549 4322 if (!signal_pending(current))
13e6037d 4323 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4324 rv = 0;
4325 goto fail;
4326 }
4327
13e6037d 4328 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4329 response = kmalloc(resp_size, GFP_NOIO);
4330 if (response == NULL) {
13e6037d 4331 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4332 rv = -1;
b411b363
PR
4333 goto fail;
4334 }
4335
4336 sg_init_table(&sg, 1);
77351055 4337 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4338
4339 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4340 if (rv) {
13e6037d 4341 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4342 rv = -1;
b411b363
PR
4343 goto fail;
4344 }
4345
ce9879cb 4346 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4347 if (!rv)
4348 goto fail;
4349
69bc7bc3
AG
4350 err = drbd_recv_header(tconn, &pi);
4351 if (err) {
4352 rv = 0;
b411b363 4353 goto fail;
69bc7bc3 4354 }
b411b363 4355
77351055 4356 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4357 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4358 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4359 rv = 0;
4360 goto fail;
4361 }
4362
77351055 4363 if (pi.size != resp_size) {
13e6037d 4364 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4365 rv = 0;
4366 goto fail;
4367 }
4368
13e6037d 4369 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4370
4371 if (rv != resp_size) {
0ddc5549 4372 if (!signal_pending(current))
13e6037d 4373 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4374 rv = 0;
4375 goto fail;
4376 }
4377
4378 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4379 if (right_response == NULL) {
13e6037d 4380 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4381 rv = -1;
b411b363
PR
4382 goto fail;
4383 }
4384
4385 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4386
4387 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4388 if (rv) {
13e6037d 4389 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4390 rv = -1;
b411b363
PR
4391 goto fail;
4392 }
4393
4394 rv = !memcmp(response, right_response, resp_size);
4395
4396 if (rv)
13e6037d
PR
4397 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4398 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4399 else
4400 rv = -1;
b411b363
PR
4401
4402 fail:
4403 kfree(peers_ch);
4404 kfree(response);
4405 kfree(right_response);
4406
4407 return rv;
4408}
4409#endif
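drbd_do_auth() is a symmetric challenge/response: each side sends CHALLENGE_LEN random bytes, gets back HMAC(shared_secret, challenge), and compares that with its own locally computed value. The userspace sketch below models only the protocol shape; toy_mac() is a stand-in for crypto_hash_digest() and is not cryptographically meaningful, and the "peer" is simulated in-process:

	#include <stdio.h>
	#include <stddef.h>

	static unsigned toy_mac(const char *secret, const unsigned char *msg, size_t len)
	{
		unsigned h = 2166136261u;	/* FNV-1a, NOT a real HMAC */
		size_t i;

		for (i = 0; secret[i]; i++)
			h = (h ^ (unsigned char)secret[i]) * 16777619u;
		for (i = 0; i < len; i++)
			h = (h ^ msg[i]) * 16777619u;
		return h;
	}

	int main(void)
	{
		const char *secret = "shared-secret";
		unsigned char my_challenge[8] = "1234567";	/* would come from get_random_bytes() */
		unsigned expected, received;

		/* the peer must answer with MAC(secret, my_challenge) */
		expected = toy_mac(secret, my_challenge, sizeof(my_challenge));
		received = expected;		/* an honest peer that knows the secret */

		puts(received == expected ? "peer authenticated" : "auth failed, disconnecting");
		return received != expected;
	}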
4410
4411int drbdd_init(struct drbd_thread *thi)
4412{
392c8801 4413 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4414 int h;
4415
4d641dd7 4416 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4417
4418 do {
4d641dd7 4419 h = drbd_connect(tconn);
b411b363 4420 if (h == 0) {
4d641dd7 4421 drbd_disconnect(tconn);
20ee6390 4422 schedule_timeout_interruptible(HZ);
b411b363
PR
4423 }
4424 if (h == -1) {
4d641dd7 4425 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4426 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4427 }
4428 } while (h == 0);
4429
4430 if (h > 0) {
4d641dd7
PR
4431 if (get_net_conf(tconn)) {
4432 drbdd(tconn);
4433 put_net_conf(tconn);
b411b363
PR
4434 }
4435 }
4436
4d641dd7 4437 drbd_disconnect(tconn);
b411b363 4438
4d641dd7 4439 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4440 return 0;
4441}
4442
4443/* ********* acknowledge sender ******** */
4444
e4f78ede
PR
4445static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4446{
4447 struct p_req_state_reply *p = &tconn->meta.rbuf.req_state_reply;
4448 int retcode = be32_to_cpu(p->retcode);
4449
4450 if (retcode >= SS_SUCCESS) {
4451 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4452 } else {
4453 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4454 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4455 drbd_set_st_err_str(retcode), retcode);
4456 }
4457 wake_up(&tconn->ping_wait);
4458
4459 return true;
4460}
4461
d8763023 4462static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4463{
257d0af6 4464 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4465 int retcode = be32_to_cpu(p->retcode);
4466
e4f78ede
PR
4467 if (retcode >= SS_SUCCESS) {
4468 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4469 } else {
4470 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4471 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4472 drbd_set_st_err_str(retcode), retcode);
b411b363 4473 }
e4f78ede
PR
4474 wake_up(&mdev->state_wait);
4475
81e84650 4476 return true;
b411b363
PR
4477}
4478
f19e4f8b 4479static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4480{
f19e4f8b 4481 return drbd_send_ping_ack(tconn);
b411b363
PR
4482
4483}
4484
f19e4f8b 4485static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363
PR
4486{
4487 /* restore idle timeout */
2a67d8b9
PR
4488 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4489 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4490 wake_up(&tconn->ping_wait);
b411b363 4491
81e84650 4492 return true;
b411b363
PR
4493}
4494
d8763023 4495static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4496{
257d0af6 4497 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4498 sector_t sector = be64_to_cpu(p->sector);
4499 int blksize = be32_to_cpu(p->blksize);
4500
31890f4a 4501 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4502
4503 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4504
1d53f09e
LE
4505 if (get_ldev(mdev)) {
4506 drbd_rs_complete_io(mdev, sector);
4507 drbd_set_in_sync(mdev, sector, blksize);
4508 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4509 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4510 put_ldev(mdev);
4511 }
b411b363 4512 dec_rs_pending(mdev);
778f271d 4513 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4514
81e84650 4515 return true;
b411b363
PR
4516}
4517
bc9c5c41
AG
4518static int
4519validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4520 struct rb_root *root, const char *func,
4521 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4522{
4523 struct drbd_request *req;
4524 struct bio_and_error m;
4525
87eeee41 4526 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4527 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4528 if (unlikely(!req)) {
87eeee41 4529 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4530 return false;
b411b363
PR
4531 }
4532 __req_mod(req, what, &m);
87eeee41 4533 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4534
4535 if (m.bio)
4536 complete_master_bio(mdev, &m);
81e84650 4537 return true;
b411b363
PR
4538}
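Note the pattern above: the request lookup and the __req_mod() transition happen under req_lock, but the master bio is completed only after the lock is dropped. A minimal pthread sketch of that "mutate under the lock, complete outside it" pattern (the request table here is a plain array, not drbd's rb_tree):

	#include <pthread.h>
	#include <stdio.h>

	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static int pending[4] = { 1, 1, 1, 1 };		/* 1 = in flight, 0 = done */

	static int ack_request(int id, int *complete_me)
	{
		pthread_mutex_lock(&lock);
		if (id < 0 || id >= 4 || !pending[id]) {
			pthread_mutex_unlock(&lock);
			return 0;		/* not found; missing_ok decides if that is fatal */
		}
		pending[id] = 0;		/* the state change happens under the lock */
		*complete_me = id;
		pthread_mutex_unlock(&lock);
		return 1;
	}

	int main(void)
	{
		int id;

		if (ack_request(2, &id))
			printf("completing request %d outside the lock\n", id);
		return 0;
	}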
4539
d8763023 4540static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4541{
257d0af6 4542 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4543 sector_t sector = be64_to_cpu(p->sector);
4544 int blksize = be32_to_cpu(p->blksize);
4545 enum drbd_req_event what;
4546
4547 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4548
579b57ed 4549 if (p->block_id == ID_SYNCER) {
b411b363
PR
4550 drbd_set_in_sync(mdev, sector, blksize);
4551 dec_rs_pending(mdev);
81e84650 4552 return true;
b411b363 4553 }
257d0af6 4554 switch (cmd) {
b411b363 4555 case P_RS_WRITE_ACK:
89e58e75 4556 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4557 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4558 break;
4559 case P_WRITE_ACK:
89e58e75 4560 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4561 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4562 break;
4563 case P_RECV_ACK:
89e58e75 4564 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4565 what = RECV_ACKED_BY_PEER;
b411b363 4566 break;
7be8da07 4567 case P_DISCARD_WRITE:
89e58e75 4568 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4569 what = DISCARD_WRITE;
4570 break;
4571 case P_RETRY_WRITE:
4572 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4573 what = POSTPONE_WRITE;
b411b363
PR
4574 break;
4575 default:
4576 D_ASSERT(0);
81e84650 4577 return false;
b411b363
PR
4578 }
4579
4580 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4581 &mdev->write_requests, __func__,
4582 what, false);
b411b363
PR
4583}
4584
d8763023 4585static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4586{
257d0af6 4587 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4588 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4589 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4590 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4591 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4592 bool found;
b411b363
PR
4593
4594 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4595
579b57ed 4596 if (p->block_id == ID_SYNCER) {
b411b363
PR
4597 dec_rs_pending(mdev);
4598 drbd_rs_failed_io(mdev, sector, size);
81e84650 4599 return true;
b411b363 4600 }
2deb8336 4601
c3afd8f5 4602 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4603 &mdev->write_requests, __func__,
8554df1c 4604 NEG_ACKED, missing_ok);
c3afd8f5
AG
4605 if (!found) {
4606 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4607 The master bio might already be completed, therefore the
4608 request is no longer in the collision hash. */
4609 /* In Protocol B we might already have got a P_RECV_ACK
4610 but then get a P_NEG_ACK afterwards. */
4611 if (!missing_ok)
2deb8336 4612 return false;
c3afd8f5 4613 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4614 }
2deb8336 4615 return true;
b411b363
PR
4616}
4617
d8763023 4618static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4619{
257d0af6 4620 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4621 sector_t sector = be64_to_cpu(p->sector);
4622
4623 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4624
b411b363
PR
4625 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4626 (unsigned long long)sector, be32_to_cpu(p->blksize));
4627
4628 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4629 &mdev->read_requests, __func__,
8554df1c 4630 NEG_ACKED, false);
b411b363
PR
4631}
4632
d8763023 4633static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4634{
4635 sector_t sector;
4636 int size;
257d0af6 4637 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4638
4639 sector = be64_to_cpu(p->sector);
4640 size = be32_to_cpu(p->blksize);
b411b363
PR
4641
4642 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4643
4644 dec_rs_pending(mdev);
4645
4646 if (get_ldev_if_state(mdev, D_FAILED)) {
4647 drbd_rs_complete_io(mdev, sector);
257d0af6 4648 switch (cmd) {
d612d309
PR
4649 case P_NEG_RS_DREPLY:
4650 drbd_rs_failed_io(mdev, sector, size);
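		/* fall through */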
4651 case P_RS_CANCEL:
4652 break;
4653 default:
4654 D_ASSERT(0);
4655 put_ldev(mdev);
4656 return false;
4657 }
b411b363
PR
4658 put_ldev(mdev);
4659 }
4660
81e84650 4661 return true;
b411b363
PR
4662}
4663
d8763023 4664static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4665{
257d0af6 4666 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363 4667
2f5cdd0b 4668 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4669
c4752ef1
PR
4670 if (mdev->state.conn == C_AHEAD &&
4671 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4672 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4673 mdev->start_resync_timer.expires = jiffies + HZ;
4674 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4675 }
4676
81e84650 4677 return true;
b411b363
PR
4678}
4679
d8763023 4680static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4681{
257d0af6 4682 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4683 struct drbd_work *w;
4684 sector_t sector;
4685 int size;
4686
4687 sector = be64_to_cpu(p->sector);
4688 size = be32_to_cpu(p->blksize);
4689
4690 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4691
4692 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4693 drbd_ov_oos_found(mdev, sector, size);
4694 else
4695 ov_oos_print(mdev);
4696
1d53f09e 4697 if (!get_ldev(mdev))
81e84650 4698 return true;
1d53f09e 4699
b411b363
PR
4700 drbd_rs_complete_io(mdev, sector);
4701 dec_rs_pending(mdev);
4702
ea5442af
LE
4703 --mdev->ov_left;
4704
4705 /* let's advance progress step marks only for every other megabyte */
4706 if ((mdev->ov_left & 0x200) == 0x200)
4707 drbd_advance_rs_marks(mdev, mdev->ov_left);
4708
4709 if (mdev->ov_left == 0) {
b411b363
PR
4710 w = kmalloc(sizeof(*w), GFP_NOIO);
4711 if (w) {
4712 w->cb = w_ov_finished;
a21e9298 4713 w->mdev = mdev;
e42325a5 4714 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4715 } else {
4716 dev_err(DEV, "kmalloc(w) failed.");
4717 ov_oos_print(mdev);
4718 drbd_resync_finished(mdev);
4719 }
4720 }
1d53f09e 4721 put_ldev(mdev);
81e84650 4722 return true;
b411b363
PR
4723}
4724
d8763023 4725static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4726{
81e84650 4727 return true;
0ced55a3
PR
4728}
4729
32862ec7
PR
4730static int tconn_process_done_ee(struct drbd_tconn *tconn)
4731{
082a3439
PR
4732 struct drbd_conf *mdev;
4733 int i, not_empty = 0;
32862ec7
PR
4734
4735 do {
4736 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4737 flush_signals(current);
082a3439 4738 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4739 if (drbd_process_done_ee(mdev))
082a3439
PR
4740 return 1; /* error */
4741 }
32862ec7 4742 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4743
4744 spin_lock_irq(&tconn->req_lock);
4745 idr_for_each_entry(&tconn->volumes, mdev, i) {
4746 not_empty = !list_empty(&mdev->done_ee);
4747 if (not_empty)
4748 break;
4749 }
4750 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4751 } while (not_empty);
4752
4753 return 0;
4754}
4755
7201b972
AG
4756struct asender_cmd {
4757 size_t pkt_size;
a4fbda8e
PR
4758 enum mdev_or_conn fa_type; /* first argument's type */
4759 union {
4760 int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
4761 int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
4762 };
7201b972
AG
4763};
4764
4765static struct asender_cmd asender_tbl[] = {
f19e4f8b
PR
4766 [P_PING] = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
4767 [P_PING_ACK] = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
a4fbda8e
PR
4768 [P_RECV_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4769 [P_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4770 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4771 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4772 [P_NEG_ACK] = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
4773 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
4774 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4775 [P_OV_RESULT] = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
4776 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
4777 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
4778 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
4779 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
4780 [P_RS_CANCEL] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
e4f78ede 4781 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
a4fbda8e 4782 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
7201b972
AG
4783};
4784
b411b363
PR
4785int drbd_asender(struct drbd_thread *thi)
4786{
392c8801 4787 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4788 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4789 struct asender_cmd *cmd = NULL;
77351055 4790 struct packet_info pi;
257d0af6 4791 int rv;
b411b363
PR
4792 void *buf = h;
4793 int received = 0;
257d0af6 4794 int expect = sizeof(struct p_header);
f36af18c 4795 int ping_timeout_active = 0;
b411b363 4796
b411b363
PR
4797 current->policy = SCHED_RR; /* Make this a realtime task! */
4798 current->rt_priority = 2; /* more important than all other tasks */
4799
e77a0a5c 4800 while (get_t_state(thi) == RUNNING) {
80822284 4801 drbd_thread_current_set_cpu(thi);
32862ec7 4802 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4803 if (!drbd_send_ping(tconn)) {
32862ec7 4804 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4805 goto reconnect;
4806 }
32862ec7
PR
4807 tconn->meta.socket->sk->sk_rcvtimeo =
4808 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4809 ping_timeout_active = 1;
b411b363
PR
4810 }
4811
32862ec7
PR
4812 /* TODO: conditionally cork; it may hurt latency if we cork without
4813 much to send */
4814 if (!tconn->net_conf->no_cork)
4815 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4816 if (tconn_process_done_ee(tconn)) {
4817 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4818 goto reconnect;
082a3439 4819 }
b411b363 4820 /* but unconditionally uncork unless disabled */
32862ec7
PR
4821 if (!tconn->net_conf->no_cork)
4822 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4823
4824 /* short circuit, recv_msg would return EINTR anyways. */
4825 if (signal_pending(current))
4826 continue;
4827
32862ec7
PR
4828 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4829 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4830
4831 flush_signals(current);
4832
4833 /* Note:
4834 * -EINTR (on meta) we got a signal
4835 * -EAGAIN (on meta) rcvtimeo expired
4836 * -ECONNRESET other side closed the connection
4837 * -ERESTARTSYS (on data) we got a signal
4838 * rv < 0 other than above: unexpected error!
4839 * rv == expected: full header or command
4840 * rv < expected: "woken" by signal during receive
4841 * rv == 0 : "connection shut down by peer"
4842 */
4843 if (likely(rv > 0)) {
4844 received += rv;
4845 buf += rv;
4846 } else if (rv == 0) {
32862ec7 4847 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4848 goto reconnect;
4849 } else if (rv == -EAGAIN) {
cb6518cb
LE
4850 /* If the data socket received something meanwhile,
4851 * that is good enough: peer is still alive. */
32862ec7
PR
4852 if (time_after(tconn->last_received,
4853 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4854 continue;
f36af18c 4855 if (ping_timeout_active) {
32862ec7 4856 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4857 goto reconnect;
4858 }
32862ec7 4859 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4860 continue;
4861 } else if (rv == -EINTR) {
4862 continue;
4863 } else {
32862ec7 4864 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4865 goto reconnect;
4866 }
4867
4868 if (received == expect && cmd == NULL) {
8172f3e9 4869 if (decode_header(tconn, h, &pi))
b411b363 4870 goto reconnect;
7201b972
AG
4871 cmd = &asender_tbl[pi.cmd];
4872 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4873 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4874 pi.cmd, pi.size);
b411b363
PR
4875 goto disconnect;
4876 }
4877 expect = cmd->pkt_size;
77351055 4878 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4879 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4880 pi.cmd, pi.size);
b411b363 4881 goto reconnect;
257d0af6 4882 }
b411b363
PR
4883 }
4884 if (received == expect) {
a4fbda8e
PR
4885 bool rv;
4886
4887 if (cmd->fa_type == CONN) {
4888 rv = cmd->conn_fn(tconn, pi.cmd);
4889 } else {
4890 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4891 rv = cmd->mdev_fn(mdev, pi.cmd);
4892 }
4893
4894 if (!rv)
b411b363
PR
4895 goto reconnect;
4896
a4fbda8e
PR
4897 tconn->last_received = jiffies;
4898
f36af18c
LE
4899 /* the idle_timeout (ping-int)
4900 * has been restored in got_PingAck() */
7201b972 4901 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4902 ping_timeout_active = 0;
4903
b411b363
PR
4904 buf = h;
4905 received = 0;
257d0af6 4906 expect = sizeof(struct p_header);
b411b363
PR
4907 cmd = NULL;
4908 }
4909 }
4910
4911 if (0) {
4912reconnect:
bbeb641c 4913 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4914 }
4915 if (0) {
4916disconnect:
bbeb641c 4917 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4918 }
32862ec7 4919 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4920
32862ec7 4921 conn_info(tconn, "asender terminated\n");
b411b363
PR
4922
4923 return 0;
4924}