]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/drbd/drbd_receiver.c
drbd: Rename "enum drbd_packets" to "enum drbd_packet"
[mirror_ubuntu-artful-kernel.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
b411b363
PR
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
b411b363
PR
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
b411b363
PR
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
b411b363
PR
51enum finish_epoch {
52 FE_STILL_LIVE,
53 FE_DESTROYED,
54 FE_RECYCLED,
55};
56
57static int drbd_do_handshake(struct drbd_conf *mdev);
58static int drbd_do_auth(struct drbd_conf *mdev);
59
60static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
b411b363
PR
63
64#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
45bb912b
LE
66/*
67 * some helper functions to deal with single linked page lists,
68 * page->private being our "next" pointer.
69 */
70
71/* If at least n pages are linked at head, get n pages off.
72 * Otherwise, don't modify head, and return NULL.
73 * Locking is the responsibility of the caller.
74 */
75static struct page *page_chain_del(struct page **head, int n)
76{
77 struct page *page;
78 struct page *tmp;
79
80 BUG_ON(!n);
81 BUG_ON(!head);
82
83 page = *head;
23ce4227
PR
84
85 if (!page)
86 return NULL;
87
45bb912b
LE
88 while (page) {
89 tmp = page_chain_next(page);
90 if (--n == 0)
91 break; /* found sufficient pages */
92 if (tmp == NULL)
93 /* insufficient pages, don't use any of them. */
94 return NULL;
95 page = tmp;
96 }
97
98 /* add end of list marker for the returned list */
99 set_page_private(page, 0);
100 /* actual return value, and adjustment of head */
101 page = *head;
102 *head = tmp;
103 return page;
104}
105
106/* may be used outside of locks to find the tail of a (usually short)
107 * "private" page chain, before adding it back to a global chain head
108 * with page_chain_add() under a spinlock. */
109static struct page *page_chain_tail(struct page *page, int *len)
110{
111 struct page *tmp;
112 int i = 1;
113 while ((tmp = page_chain_next(page)))
114 ++i, page = tmp;
115 if (len)
116 *len = i;
117 return page;
118}
119
120static int page_chain_free(struct page *page)
121{
122 struct page *tmp;
123 int i = 0;
124 page_chain_for_each_safe(page, tmp) {
125 put_page(page);
126 ++i;
127 }
128 return i;
129}
130
131static void page_chain_add(struct page **head,
132 struct page *chain_first, struct page *chain_last)
133{
134#if 1
135 struct page *tmp;
136 tmp = page_chain_tail(chain_first, NULL);
137 BUG_ON(tmp != chain_last);
138#endif
139
140 /* add chain to head */
141 set_page_private(chain_last, (unsigned long)*head);
142 *head = chain_first;
143}
144
145static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
b411b363
PR
146{
147 struct page *page = NULL;
45bb912b
LE
148 struct page *tmp = NULL;
149 int i = 0;
b411b363
PR
150
151 /* Yes, testing drbd_pp_vacant outside the lock is racy.
152 * So what. It saves a spin_lock. */
45bb912b 153 if (drbd_pp_vacant >= number) {
b411b363 154 spin_lock(&drbd_pp_lock);
45bb912b
LE
155 page = page_chain_del(&drbd_pp_pool, number);
156 if (page)
157 drbd_pp_vacant -= number;
b411b363 158 spin_unlock(&drbd_pp_lock);
45bb912b
LE
159 if (page)
160 return page;
b411b363 161 }
45bb912b 162
b411b363
PR
163 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 * which in turn might block on the other node at this very place. */
45bb912b
LE
166 for (i = 0; i < number; i++) {
167 tmp = alloc_page(GFP_TRY);
168 if (!tmp)
169 break;
170 set_page_private(tmp, (unsigned long)page);
171 page = tmp;
172 }
173
174 if (i == number)
175 return page;
176
177 /* Not enough pages immediately available this time.
178 * No need to jump around here, drbd_pp_alloc will retry this
179 * function "soon". */
180 if (page) {
181 tmp = page_chain_tail(page, NULL);
182 spin_lock(&drbd_pp_lock);
183 page_chain_add(&drbd_pp_pool, page, tmp);
184 drbd_pp_vacant += i;
185 spin_unlock(&drbd_pp_lock);
186 }
187 return NULL;
b411b363
PR
188}
189
b411b363
PR
190static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191{
192 struct drbd_epoch_entry *e;
193 struct list_head *le, *tle;
194
195 /* The EEs are always appended to the end of the list. Since
196 they are sent in order over the wire, they have to finish
197 in order. As soon as we see the first not finished we can
198 stop to examine the list... */
199
200 list_for_each_safe(le, tle, &mdev->net_ee) {
201 e = list_entry(le, struct drbd_epoch_entry, w.list);
45bb912b 202 if (drbd_ee_has_active_page(e))
b411b363
PR
203 break;
204 list_move(le, to_be_freed);
205 }
206}
207
208static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209{
210 LIST_HEAD(reclaimed);
211 struct drbd_epoch_entry *e, *t;
212
87eeee41 213 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 214 reclaim_net_ee(mdev, &reclaimed);
87eeee41 215 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
216
217 list_for_each_entry_safe(e, t, &reclaimed, w.list)
435f0740 218 drbd_free_net_ee(mdev, e);
b411b363
PR
219}
220
221/**
45bb912b 222 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 223 * @mdev: DRBD device.
45bb912b
LE
224 * @number: number of pages requested
225 * @retry: whether to retry, if not enough pages are available right now
226 *
227 * Tries to allocate number pages, first from our own page pool, then from
228 * the kernel, unless this allocation would exceed the max_buffers setting.
229 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 230 *
45bb912b 231 * Returns a page chain linked via page->private.
b411b363 232 */
45bb912b 233static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
234{
235 struct page *page = NULL;
236 DEFINE_WAIT(wait);
237
45bb912b
LE
238 /* Yes, we may run up to @number over max_buffers. If we
239 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 240 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 241 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 242
45bb912b 243 while (page == NULL) {
b411b363
PR
244 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246 drbd_kick_lo_and_reclaim_net(mdev);
247
89e58e75 248 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 249 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
250 if (page)
251 break;
252 }
253
254 if (!retry)
255 break;
256
257 if (signal_pending(current)) {
258 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 break;
260 }
261
262 schedule();
263 }
264 finish_wait(&drbd_pp_wait, &wait);
265
45bb912b
LE
266 if (page)
267 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
268 return page;
269}
270
271/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 272 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
273 * Either links the page chain back to the global pool,
274 * or returns all pages to the system. */
435f0740 275static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 276{
435f0740 277 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 278 int i;
435f0740 279
1816a2b4 280 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
45bb912b
LE
281 i = page_chain_free(page);
282 else {
283 struct page *tmp;
284 tmp = page_chain_tail(page, &i);
285 spin_lock(&drbd_pp_lock);
286 page_chain_add(&drbd_pp_pool, page, tmp);
287 drbd_pp_vacant += i;
288 spin_unlock(&drbd_pp_lock);
b411b363 289 }
435f0740 290 i = atomic_sub_return(i, a);
45bb912b 291 if (i < 0)
435f0740
LE
292 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
294 wake_up(&drbd_pp_wait);
295}
296
297/*
298You need to hold the req_lock:
299 _drbd_wait_ee_list_empty()
300
301You must not have the req_lock:
302 drbd_free_ee()
303 drbd_alloc_ee()
304 drbd_init_ee()
305 drbd_release_ee()
306 drbd_ee_fix_bhs()
307 drbd_process_done_ee()
308 drbd_clear_done_ee()
309 drbd_wait_ee_list_empty()
310*/
311
312struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313 u64 id,
314 sector_t sector,
315 unsigned int data_size,
316 gfp_t gfp_mask) __must_hold(local)
317{
b411b363
PR
318 struct drbd_epoch_entry *e;
319 struct page *page;
45bb912b 320 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 321
0cf9d27e 322 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
323 return NULL;
324
325 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326 if (!e) {
327 if (!(gfp_mask & __GFP_NOWARN))
328 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329 return NULL;
330 }
331
45bb912b
LE
332 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333 if (!page)
334 goto fail;
b411b363 335
8b946255 336 drbd_clear_interval(&e->i);
b411b363 337 e->epoch = NULL;
45bb912b
LE
338 e->mdev = mdev;
339 e->pages = page;
340 atomic_set(&e->pending_bios, 0);
010f6e67 341 e->i.size = data_size;
b411b363 342 e->flags = 0;
010f6e67 343 e->i.sector = sector;
9a8e7753
AG
344 /*
345 * The block_id is opaque to the receiver. It is not endianness
346 * converted, and sent back to the sender unchanged.
347 */
45bb912b 348 e->block_id = id;
b411b363 349
b411b363
PR
350 return e;
351
45bb912b 352 fail:
b411b363 353 mempool_free(e, drbd_ee_mempool);
b411b363
PR
354 return NULL;
355}
356
435f0740 357void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
b411b363 358{
c36c3ced
LE
359 if (e->flags & EE_HAS_DIGEST)
360 kfree(e->digest);
435f0740 361 drbd_pp_free(mdev, e->pages, is_net);
45bb912b 362 D_ASSERT(atomic_read(&e->pending_bios) == 0);
8b946255 363 D_ASSERT(drbd_interval_empty(&e->i));
b411b363
PR
364 mempool_free(e, drbd_ee_mempool);
365}
366
367int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
368{
369 LIST_HEAD(work_list);
370 struct drbd_epoch_entry *e, *t;
371 int count = 0;
435f0740 372 int is_net = list == &mdev->net_ee;
b411b363 373
87eeee41 374 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 375 list_splice_init(list, &work_list);
87eeee41 376 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
377
378 list_for_each_entry_safe(e, t, &work_list, w.list) {
435f0740 379 drbd_free_some_ee(mdev, e, is_net);
b411b363
PR
380 count++;
381 }
382 return count;
383}
384
385
386/*
387 * This function is called from _asender only_
8554df1c 388 * but see also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
389 * and receive_Barrier.
390 *
391 * Move entries from net_ee to done_ee, if ready.
392 * Grab done_ee, call all callbacks, free the entries.
393 * The callbacks typically send out ACKs.
394 */
395static int drbd_process_done_ee(struct drbd_conf *mdev)
396{
397 LIST_HEAD(work_list);
398 LIST_HEAD(reclaimed);
399 struct drbd_epoch_entry *e, *t;
400 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
401
87eeee41 402 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
403 reclaim_net_ee(mdev, &reclaimed);
404 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 405 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
406
407 list_for_each_entry_safe(e, t, &reclaimed, w.list)
435f0740 408 drbd_free_net_ee(mdev, e);
b411b363
PR
409
410 /* possible callbacks here:
411 * e_end_block, and e_end_resync_block, e_send_discard_ack.
412 * all ignore the last argument.
413 */
414 list_for_each_entry_safe(e, t, &work_list, w.list) {
b411b363
PR
415 /* list_del not necessary, next/prev members not touched */
416 ok = e->w.cb(mdev, &e->w, !ok) && ok;
417 drbd_free_ee(mdev, e);
418 }
419 wake_up(&mdev->ee_wait);
420
421 return ok;
422}
423
424void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
425{
426 DEFINE_WAIT(wait);
427
428 /* avoids spin_lock/unlock
429 * and calling prepare_to_wait in the fast path */
430 while (!list_empty(head)) {
431 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 432 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 433 io_schedule();
b411b363 434 finish_wait(&mdev->ee_wait, &wait);
87eeee41 435 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
436 }
437}
438
439void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440{
87eeee41 441 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 442 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 443 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
444}
445
446/* see also kernel_accept; which is only present since 2.6.18.
447 * also we want to log which part of it failed, exactly */
448static int drbd_accept(struct drbd_conf *mdev, const char **what,
449 struct socket *sock, struct socket **newsock)
450{
451 struct sock *sk = sock->sk;
452 int err = 0;
453
454 *what = "listen";
455 err = sock->ops->listen(sock, 5);
456 if (err < 0)
457 goto out;
458
459 *what = "sock_create_lite";
460 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
461 newsock);
462 if (err < 0)
463 goto out;
464
465 *what = "accept";
466 err = sock->ops->accept(sock, *newsock, 0);
467 if (err < 0) {
468 sock_release(*newsock);
469 *newsock = NULL;
470 goto out;
471 }
472 (*newsock)->ops = sock->ops;
473
474out:
475 return err;
476}
477
478static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
479 void *buf, size_t size, int flags)
480{
481 mm_segment_t oldfs;
482 struct kvec iov = {
483 .iov_base = buf,
484 .iov_len = size,
485 };
486 struct msghdr msg = {
487 .msg_iovlen = 1,
488 .msg_iov = (struct iovec *)&iov,
489 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
490 };
491 int rv;
492
493 oldfs = get_fs();
494 set_fs(KERNEL_DS);
495 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
496 set_fs(oldfs);
497
498 return rv;
499}
500
501static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
502{
503 mm_segment_t oldfs;
504 struct kvec iov = {
505 .iov_base = buf,
506 .iov_len = size,
507 };
508 struct msghdr msg = {
509 .msg_iovlen = 1,
510 .msg_iov = (struct iovec *)&iov,
511 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
512 };
513 int rv;
514
515 oldfs = get_fs();
516 set_fs(KERNEL_DS);
517
518 for (;;) {
e42325a5 519 rv = sock_recvmsg(mdev->tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
520 if (rv == size)
521 break;
522
523 /* Note:
524 * ECONNRESET other side closed the connection
525 * ERESTARTSYS (on sock) we got a signal
526 */
527
528 if (rv < 0) {
529 if (rv == -ECONNRESET)
530 dev_info(DEV, "sock was reset by peer\n");
531 else if (rv != -ERESTARTSYS)
532 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
533 break;
534 } else if (rv == 0) {
535 dev_info(DEV, "sock was shut down by peer\n");
536 break;
537 } else {
538 /* signal came in, or peer/link went down,
539 * after we read a partial message
540 */
541 /* D_ASSERT(signal_pending(current)); */
542 break;
543 }
544 };
545
546 set_fs(oldfs);
547
548 if (rv != size)
549 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
550
551 return rv;
552}
553
5dbf1673
LE
554/* quoting tcp(7):
555 * On individual connections, the socket buffer size must be set prior to the
556 * listen(2) or connect(2) calls in order to have it take effect.
557 * This is our wrapper to do so.
558 */
559static void drbd_setbufsize(struct socket *sock, unsigned int snd,
560 unsigned int rcv)
561{
562 /* open coded SO_SNDBUF, SO_RCVBUF */
563 if (snd) {
564 sock->sk->sk_sndbuf = snd;
565 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
566 }
567 if (rcv) {
568 sock->sk->sk_rcvbuf = rcv;
569 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
570 }
571}
572
b411b363
PR
573static struct socket *drbd_try_connect(struct drbd_conf *mdev)
574{
575 const char *what;
576 struct socket *sock;
577 struct sockaddr_in6 src_in6;
578 int err;
579 int disconnect_on_error = 1;
580
b2fb6dbe 581 if (!get_net_conf(mdev->tconn))
b411b363
PR
582 return NULL;
583
584 what = "sock_create_kern";
89e58e75 585 err = sock_create_kern(((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family,
b411b363
PR
586 SOCK_STREAM, IPPROTO_TCP, &sock);
587 if (err < 0) {
588 sock = NULL;
589 goto out;
590 }
591
592 sock->sk->sk_rcvtimeo =
89e58e75
PR
593 sock->sk->sk_sndtimeo = mdev->tconn->net_conf->try_connect_int*HZ;
594 drbd_setbufsize(sock, mdev->tconn->net_conf->sndbuf_size,
595 mdev->tconn->net_conf->rcvbuf_size);
b411b363
PR
596
597 /* explicitly bind to the configured IP as source IP
598 * for the outgoing connections.
599 * This is needed for multihomed hosts and to be
600 * able to use lo: interfaces for drbd.
601 * Make sure to use 0 as port number, so linux selects
602 * a free one dynamically.
603 */
89e58e75
PR
604 memcpy(&src_in6, mdev->tconn->net_conf->my_addr,
605 min_t(int, mdev->tconn->net_conf->my_addr_len, sizeof(src_in6)));
606 if (((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
607 src_in6.sin6_port = 0;
608 else
609 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
610
611 what = "bind before connect";
612 err = sock->ops->bind(sock,
613 (struct sockaddr *) &src_in6,
89e58e75 614 mdev->tconn->net_conf->my_addr_len);
b411b363
PR
615 if (err < 0)
616 goto out;
617
618 /* connect may fail, peer not yet available.
619 * stay C_WF_CONNECTION, don't go Disconnecting! */
620 disconnect_on_error = 0;
621 what = "connect";
622 err = sock->ops->connect(sock,
89e58e75
PR
623 (struct sockaddr *)mdev->tconn->net_conf->peer_addr,
624 mdev->tconn->net_conf->peer_addr_len, 0);
b411b363
PR
625
626out:
627 if (err < 0) {
628 if (sock) {
629 sock_release(sock);
630 sock = NULL;
631 }
632 switch (-err) {
633 /* timeout, busy, signal pending */
634 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
635 case EINTR: case ERESTARTSYS:
636 /* peer not (yet) available, network problem */
637 case ECONNREFUSED: case ENETUNREACH:
638 case EHOSTDOWN: case EHOSTUNREACH:
639 disconnect_on_error = 0;
640 break;
641 default:
642 dev_err(DEV, "%s failed, err = %d\n", what, err);
643 }
644 if (disconnect_on_error)
645 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
646 }
b2fb6dbe 647 put_net_conf(mdev->tconn);
b411b363
PR
648 return sock;
649}
650
651static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
652{
653 int timeo, err;
654 struct socket *s_estab = NULL, *s_listen;
655 const char *what;
656
b2fb6dbe 657 if (!get_net_conf(mdev->tconn))
b411b363
PR
658 return NULL;
659
660 what = "sock_create_kern";
89e58e75 661 err = sock_create_kern(((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family,
b411b363
PR
662 SOCK_STREAM, IPPROTO_TCP, &s_listen);
663 if (err) {
664 s_listen = NULL;
665 goto out;
666 }
667
89e58e75 668 timeo = mdev->tconn->net_conf->try_connect_int * HZ;
b411b363
PR
669 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
670
671 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
672 s_listen->sk->sk_rcvtimeo = timeo;
673 s_listen->sk->sk_sndtimeo = timeo;
89e58e75
PR
674 drbd_setbufsize(s_listen, mdev->tconn->net_conf->sndbuf_size,
675 mdev->tconn->net_conf->rcvbuf_size);
b411b363
PR
676
677 what = "bind before listen";
678 err = s_listen->ops->bind(s_listen,
89e58e75
PR
679 (struct sockaddr *) mdev->tconn->net_conf->my_addr,
680 mdev->tconn->net_conf->my_addr_len);
b411b363
PR
681 if (err < 0)
682 goto out;
683
684 err = drbd_accept(mdev, &what, s_listen, &s_estab);
685
686out:
687 if (s_listen)
688 sock_release(s_listen);
689 if (err < 0) {
690 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
691 dev_err(DEV, "%s failed, err = %d\n", what, err);
692 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
693 }
694 }
b2fb6dbe 695 put_net_conf(mdev->tconn);
b411b363
PR
696
697 return s_estab;
698}
699
d8763023
AG
700static int drbd_send_fp(struct drbd_conf *mdev, struct socket *sock,
701 enum drbd_packet cmd)
b411b363 702{
c012949a 703 struct p_header *h = &mdev->tconn->data.sbuf.header;
b411b363
PR
704
705 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
706}
707
d8763023
AG
708static enum drbd_packet drbd_recv_fp(struct drbd_conf *mdev,
709 struct socket *sock)
b411b363 710{
e42325a5 711 struct p_header80 *h = &mdev->tconn->data.rbuf.header.h80;
b411b363
PR
712 int rr;
713
714 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
715
ca9bc12b 716 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
717 return be16_to_cpu(h->command);
718
719 return 0xffff;
720}
721
722/**
723 * drbd_socket_okay() - Free the socket if its connection is not okay
724 * @mdev: DRBD device.
725 * @sock: pointer to the pointer to the socket.
726 */
727static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
728{
729 int rr;
730 char tb[4];
731
732 if (!*sock)
81e84650 733 return false;
b411b363
PR
734
735 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
736
737 if (rr > 0 || rr == -EAGAIN) {
81e84650 738 return true;
b411b363
PR
739 } else {
740 sock_release(*sock);
741 *sock = NULL;
81e84650 742 return false;
b411b363
PR
743 }
744}
745
746/*
747 * return values:
748 * 1 yes, we have a valid connection
749 * 0 oops, did not work out, please try again
750 * -1 peer talks different language,
751 * no point in trying again, please go standalone.
752 * -2 We do not have a network config...
753 */
754static int drbd_connect(struct drbd_conf *mdev)
755{
756 struct socket *s, *sock, *msock;
757 int try, h, ok;
758
e42325a5 759 D_ASSERT(!mdev->tconn->data.socket);
b411b363 760
b411b363
PR
761 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
762 return -2;
763
764 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
fd340c12
PR
765 mdev->tconn->agreed_pro_version = 99;
766 /* agreed_pro_version must be smaller than 100 so we send the old
767 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
768
769 sock = NULL;
770 msock = NULL;
771
772 do {
773 for (try = 0;;) {
774 /* 3 tries, this should take less than a second! */
775 s = drbd_try_connect(mdev);
776 if (s || ++try >= 3)
777 break;
778 /* give the other side time to call bind() & listen() */
20ee6390 779 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
780 }
781
782 if (s) {
783 if (!sock) {
784 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
785 sock = s;
786 s = NULL;
787 } else if (!msock) {
788 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
789 msock = s;
790 s = NULL;
791 } else {
792 dev_err(DEV, "Logic error in drbd_connect()\n");
793 goto out_release_sockets;
794 }
795 }
796
797 if (sock && msock) {
89e58e75 798 schedule_timeout_interruptible(mdev->tconn->net_conf->ping_timeo*HZ/10);
b411b363
PR
799 ok = drbd_socket_okay(mdev, &sock);
800 ok = drbd_socket_okay(mdev, &msock) && ok;
801 if (ok)
802 break;
803 }
804
805retry:
806 s = drbd_wait_for_connect(mdev);
807 if (s) {
808 try = drbd_recv_fp(mdev, s);
809 drbd_socket_okay(mdev, &sock);
810 drbd_socket_okay(mdev, &msock);
811 switch (try) {
812 case P_HAND_SHAKE_S:
813 if (sock) {
814 dev_warn(DEV, "initial packet S crossed\n");
815 sock_release(sock);
816 }
817 sock = s;
818 break;
819 case P_HAND_SHAKE_M:
820 if (msock) {
821 dev_warn(DEV, "initial packet M crossed\n");
822 sock_release(msock);
823 }
824 msock = s;
825 set_bit(DISCARD_CONCURRENT, &mdev->flags);
826 break;
827 default:
828 dev_warn(DEV, "Error receiving initial packet\n");
829 sock_release(s);
830 if (random32() & 1)
831 goto retry;
832 }
833 }
834
835 if (mdev->state.conn <= C_DISCONNECTING)
836 goto out_release_sockets;
837 if (signal_pending(current)) {
838 flush_signals(current);
839 smp_rmb();
e6b3ea83 840 if (get_t_state(&mdev->tconn->receiver) == EXITING)
b411b363
PR
841 goto out_release_sockets;
842 }
843
844 if (sock && msock) {
845 ok = drbd_socket_okay(mdev, &sock);
846 ok = drbd_socket_okay(mdev, &msock) && ok;
847 if (ok)
848 break;
849 }
850 } while (1);
851
852 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
853 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
854
855 sock->sk->sk_allocation = GFP_NOIO;
856 msock->sk->sk_allocation = GFP_NOIO;
857
858 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
859 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
860
b411b363 861 /* NOT YET ...
89e58e75 862 * sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
b411b363
PR
863 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
864 * first set it to the P_HAND_SHAKE timeout,
865 * which we set to 4x the configured ping_timeout. */
866 sock->sk->sk_sndtimeo =
89e58e75 867 sock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 868
89e58e75
PR
869 msock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
870 msock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
b411b363
PR
871
872 /* we don't want delays.
25985edc 873 * we use TCP_CORK where appropriate, though */
b411b363
PR
874 drbd_tcp_nodelay(sock);
875 drbd_tcp_nodelay(msock);
876
e42325a5
PR
877 mdev->tconn->data.socket = sock;
878 mdev->tconn->meta.socket = msock;
31890f4a 879 mdev->tconn->last_received = jiffies;
b411b363 880
e6b3ea83 881 D_ASSERT(mdev->tconn->asender.task == NULL);
b411b363
PR
882
883 h = drbd_do_handshake(mdev);
884 if (h <= 0)
885 return h;
886
a0638456 887 if (mdev->tconn->cram_hmac_tfm) {
b411b363 888 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
b10d96cb
JT
889 switch (drbd_do_auth(mdev)) {
890 case -1:
b411b363
PR
891 dev_err(DEV, "Authentication of peer failed\n");
892 return -1;
b10d96cb
JT
893 case 0:
894 dev_err(DEV, "Authentication of peer failed, trying again.\n");
895 return 0;
b411b363
PR
896 }
897 }
898
899 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
900 return 0;
901
89e58e75 902 sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
b411b363
PR
903 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
904
905 atomic_set(&mdev->packet_seq, 0);
906 mdev->peer_seq = 0;
907
e6b3ea83 908 drbd_thread_start(&mdev->tconn->asender);
b411b363 909
148efa16 910 if (drbd_send_protocol(mdev) == -1)
7e2455c1 911 return -1;
b411b363 912 drbd_send_sync_param(mdev, &mdev->sync_conf);
e89b591c 913 drbd_send_sizes(mdev, 0, 0);
b411b363
PR
914 drbd_send_uuids(mdev);
915 drbd_send_state(mdev);
916 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
917 clear_bit(RESIZE_PENDING, &mdev->flags);
7fde2be9 918 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
b411b363
PR
919
920 return 1;
921
922out_release_sockets:
923 if (sock)
924 sock_release(sock);
925 if (msock)
926 sock_release(msock);
927 return -1;
928}
929
d8763023
AG
930static bool decode_header(struct drbd_conf *mdev, struct p_header *h,
931 enum drbd_packet *cmd, unsigned int *packet_size)
b411b363 932{
fd340c12 933 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
02918be2
PR
934 *cmd = be16_to_cpu(h->h80.command);
935 *packet_size = be16_to_cpu(h->h80.length);
ca9bc12b 936 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
02918be2 937 *cmd = be16_to_cpu(h->h95.command);
fd340c12 938 *packet_size = be32_to_cpu(h->h95.length) & 0x00ffffff;
02918be2 939 } else {
004352fa
LE
940 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
941 be32_to_cpu(h->h80.magic),
942 be16_to_cpu(h->h80.command),
943 be16_to_cpu(h->h80.length));
81e84650 944 return false;
b411b363 945 }
257d0af6
PR
946 return true;
947}
948
d8763023
AG
949static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packet *cmd,
950 unsigned int *packet_size)
257d0af6
PR
951{
952 struct p_header *h = &mdev->tconn->data.rbuf.header;
953 int r;
954
955 r = drbd_recv(mdev, h, sizeof(*h));
956 if (unlikely(r != sizeof(*h))) {
957 if (!signal_pending(current))
958 dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
959 return false;
960 }
961
962 r = decode_header(mdev, h, cmd, packet_size);
31890f4a 963 mdev->tconn->last_received = jiffies;
b411b363 964
257d0af6 965 return r;
b411b363
PR
966}
967
2451fc3b 968static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
969{
970 int rv;
971
972 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 973 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 974 NULL);
b411b363
PR
975 if (rv) {
976 dev_err(DEV, "local disk flush failed with status %d\n", rv);
977 /* would rather check on EOPNOTSUPP, but that is not reliable.
978 * don't try again for ANY return value != 0
979 * if (rv == -EOPNOTSUPP) */
980 drbd_bump_write_ordering(mdev, WO_drain_io);
981 }
982 put_ldev(mdev);
983 }
b411b363
PR
984}
985
986/**
987 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
988 * @mdev: DRBD device.
989 * @epoch: Epoch object.
990 * @ev: Epoch event.
991 */
992static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
993 struct drbd_epoch *epoch,
994 enum epoch_event ev)
995{
2451fc3b 996 int epoch_size;
b411b363 997 struct drbd_epoch *next_epoch;
b411b363
PR
998 enum finish_epoch rv = FE_STILL_LIVE;
999
1000 spin_lock(&mdev->epoch_lock);
1001 do {
1002 next_epoch = NULL;
b411b363
PR
1003
1004 epoch_size = atomic_read(&epoch->epoch_size);
1005
1006 switch (ev & ~EV_CLEANUP) {
1007 case EV_PUT:
1008 atomic_dec(&epoch->active);
1009 break;
1010 case EV_GOT_BARRIER_NR:
1011 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1012 break;
1013 case EV_BECAME_LAST:
1014 /* nothing to do*/
1015 break;
1016 }
1017
b411b363
PR
1018 if (epoch_size != 0 &&
1019 atomic_read(&epoch->active) == 0 &&
2451fc3b 1020 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1021 if (!(ev & EV_CLEANUP)) {
1022 spin_unlock(&mdev->epoch_lock);
1023 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1024 spin_lock(&mdev->epoch_lock);
1025 }
1026 dec_unacked(mdev);
1027
1028 if (mdev->current_epoch != epoch) {
1029 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1030 list_del(&epoch->list);
1031 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1032 mdev->epochs--;
b411b363
PR
1033 kfree(epoch);
1034
1035 if (rv == FE_STILL_LIVE)
1036 rv = FE_DESTROYED;
1037 } else {
1038 epoch->flags = 0;
1039 atomic_set(&epoch->epoch_size, 0);
698f9315 1040 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1041 if (rv == FE_STILL_LIVE)
1042 rv = FE_RECYCLED;
2451fc3b 1043 wake_up(&mdev->ee_wait);
b411b363
PR
1044 }
1045 }
1046
1047 if (!next_epoch)
1048 break;
1049
1050 epoch = next_epoch;
1051 } while (1);
1052
1053 spin_unlock(&mdev->epoch_lock);
1054
b411b363
PR
1055 return rv;
1056}
1057
1058/**
1059 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1060 * @mdev: DRBD device.
1061 * @wo: Write ordering method to try.
1062 */
1063void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1064{
1065 enum write_ordering_e pwo;
1066 static char *write_ordering_str[] = {
1067 [WO_none] = "none",
1068 [WO_drain_io] = "drain",
1069 [WO_bdev_flush] = "flush",
b411b363
PR
1070 };
1071
1072 pwo = mdev->write_ordering;
1073 wo = min(pwo, wo);
b411b363
PR
1074 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1075 wo = WO_drain_io;
1076 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1077 wo = WO_none;
1078 mdev->write_ordering = wo;
2451fc3b 1079 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
b411b363
PR
1080 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1081}
1082
45bb912b
LE
1083/**
1084 * drbd_submit_ee()
1085 * @mdev: DRBD device.
1086 * @e: epoch entry
1087 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1088 *
1089 * May spread the pages to multiple bios,
1090 * depending on bio_add_page restrictions.
1091 *
1092 * Returns 0 if all bios have been submitted,
1093 * -ENOMEM if we could not allocate enough bios,
1094 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1095 * single page to an empty bio (which should never happen and likely indicates
1096 * that the lower level IO stack is in some way broken). This has been observed
1097 * on certain Xen deployments.
45bb912b
LE
1098 */
1099/* TODO allocate from our own bio_set. */
1100int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1101 const unsigned rw, const int fault_type)
1102{
1103 struct bio *bios = NULL;
1104 struct bio *bio;
1105 struct page *page = e->pages;
010f6e67
AG
1106 sector_t sector = e->i.sector;
1107 unsigned ds = e->i.size;
45bb912b
LE
1108 unsigned n_bios = 0;
1109 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1110 int err = -ENOMEM;
45bb912b
LE
1111
1112 /* In most cases, we will only need one bio. But in case the lower
1113 * level restrictions happen to be different at this offset on this
1114 * side than those of the sending peer, we may need to submit the
1115 * request in more than one bio. */
1116next_bio:
1117 bio = bio_alloc(GFP_NOIO, nr_pages);
1118 if (!bio) {
1119 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1120 goto fail;
1121 }
010f6e67 1122 /* > e->i.sector, unless this is the first bio */
45bb912b
LE
1123 bio->bi_sector = sector;
1124 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b
LE
1125 bio->bi_rw = rw;
1126 bio->bi_private = e;
1127 bio->bi_end_io = drbd_endio_sec;
1128
1129 bio->bi_next = bios;
1130 bios = bio;
1131 ++n_bios;
1132
1133 page_chain_for_each(page) {
1134 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1135 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1136 /* A single page must always be possible!
1137 * But in case it fails anyways,
1138 * we deal with it, and complain (below). */
1139 if (bio->bi_vcnt == 0) {
1140 dev_err(DEV,
1141 "bio_add_page failed for len=%u, "
1142 "bi_vcnt=0 (bi_sector=%llu)\n",
1143 len, (unsigned long long)bio->bi_sector);
1144 err = -ENOSPC;
1145 goto fail;
1146 }
45bb912b
LE
1147 goto next_bio;
1148 }
1149 ds -= len;
1150 sector += len >> 9;
1151 --nr_pages;
1152 }
1153 D_ASSERT(page == NULL);
1154 D_ASSERT(ds == 0);
1155
1156 atomic_set(&e->pending_bios, n_bios);
1157 do {
1158 bio = bios;
1159 bios = bios->bi_next;
1160 bio->bi_next = NULL;
1161
45bb912b 1162 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1163 } while (bios);
45bb912b
LE
1164 return 0;
1165
1166fail:
1167 while (bios) {
1168 bio = bios;
1169 bios = bios->bi_next;
1170 bio_put(bio);
1171 }
10f6d992 1172 return err;
45bb912b
LE
1173}
1174
d8763023
AG
1175static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1176 unsigned int data_size)
b411b363 1177{
2451fc3b 1178 int rv;
e42325a5 1179 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
b411b363
PR
1180 struct drbd_epoch *epoch;
1181
b411b363
PR
1182 inc_unacked(mdev);
1183
b411b363
PR
1184 mdev->current_epoch->barrier_nr = p->barrier;
1185 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1186
1187 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1188 * the activity log, which means it would not be resynced in case the
1189 * R_PRIMARY crashes now.
1190 * Therefore we must send the barrier_ack after the barrier request was
1191 * completed. */
1192 switch (mdev->write_ordering) {
b411b363
PR
1193 case WO_none:
1194 if (rv == FE_RECYCLED)
81e84650 1195 return true;
2451fc3b
PR
1196
1197 /* receiver context, in the writeout path of the other node.
1198 * avoid potential distributed deadlock */
1199 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1200 if (epoch)
1201 break;
1202 else
1203 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1204 /* Fall through */
b411b363
PR
1205
1206 case WO_bdev_flush:
1207 case WO_drain_io:
b411b363 1208 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1209 drbd_flush(mdev);
1210
1211 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1212 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1213 if (epoch)
1214 break;
b411b363
PR
1215 }
1216
2451fc3b
PR
1217 epoch = mdev->current_epoch;
1218 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1219
1220 D_ASSERT(atomic_read(&epoch->active) == 0);
1221 D_ASSERT(epoch->flags == 0);
b411b363 1222
81e84650 1223 return true;
2451fc3b
PR
1224 default:
1225 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1226 return false;
b411b363
PR
1227 }
1228
1229 epoch->flags = 0;
1230 atomic_set(&epoch->epoch_size, 0);
1231 atomic_set(&epoch->active, 0);
1232
1233 spin_lock(&mdev->epoch_lock);
1234 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1235 list_add(&epoch->list, &mdev->current_epoch->list);
1236 mdev->current_epoch = epoch;
1237 mdev->epochs++;
b411b363
PR
1238 } else {
1239 /* The current_epoch got recycled while we allocated this one... */
1240 kfree(epoch);
1241 }
1242 spin_unlock(&mdev->epoch_lock);
1243
81e84650 1244 return true;
b411b363
PR
1245}
1246
1247/* used from receive_RSDataReply (recv_resync_read)
1248 * and from receive_Data */
1249static struct drbd_epoch_entry *
1250read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1251{
6666032a 1252 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
b411b363 1253 struct drbd_epoch_entry *e;
b411b363 1254 struct page *page;
45bb912b 1255 int dgs, ds, rr;
a0638456
PR
1256 void *dig_in = mdev->tconn->int_dig_in;
1257 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1258 unsigned long *data;
b411b363 1259
a0638456
PR
1260 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1261 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1262
1263 if (dgs) {
1264 rr = drbd_recv(mdev, dig_in, dgs);
1265 if (rr != dgs) {
0ddc5549
LE
1266 if (!signal_pending(current))
1267 dev_warn(DEV,
1268 "short read receiving data digest: read %d expected %d\n",
1269 rr, dgs);
b411b363
PR
1270 return NULL;
1271 }
1272 }
1273
1274 data_size -= dgs;
1275
841ce241
AG
1276 if (!expect(data_size != 0))
1277 return NULL;
1278 if (!expect(IS_ALIGNED(data_size, 512)))
1279 return NULL;
1280 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1281 return NULL;
b411b363 1282
6666032a
LE
1283 /* even though we trust out peer,
1284 * we sometimes have to double check. */
1285 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1286 dev_err(DEV, "request from peer beyond end of local disk: "
1287 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1288 (unsigned long long)capacity,
1289 (unsigned long long)sector, data_size);
1290 return NULL;
1291 }
1292
b411b363
PR
1293 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1294 * "criss-cross" setup, that might cause write-out on some other DRBD,
1295 * which in turn might block on the other node at this very place. */
1296 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1297 if (!e)
1298 return NULL;
45bb912b 1299
b411b363 1300 ds = data_size;
45bb912b
LE
1301 page = e->pages;
1302 page_chain_for_each(page) {
1303 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1304 data = kmap(page);
45bb912b 1305 rr = drbd_recv(mdev, data, len);
0cf9d27e 1306 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1307 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1308 data[0] = data[0] ^ (unsigned long)-1;
1309 }
b411b363 1310 kunmap(page);
45bb912b 1311 if (rr != len) {
b411b363 1312 drbd_free_ee(mdev, e);
0ddc5549
LE
1313 if (!signal_pending(current))
1314 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1315 rr, len);
b411b363
PR
1316 return NULL;
1317 }
1318 ds -= rr;
1319 }
1320
1321 if (dgs) {
a0638456 1322 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, e, dig_vv);
b411b363 1323 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1324 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1325 (unsigned long long)sector, data_size);
b411b363
PR
1326 drbd_bcast_ee(mdev, "digest failed",
1327 dgs, dig_in, dig_vv, e);
1328 drbd_free_ee(mdev, e);
1329 return NULL;
1330 }
1331 }
1332 mdev->recv_cnt += data_size>>9;
1333 return e;
1334}
1335
1336/* drbd_drain_block() just takes a data block
1337 * out of the socket input buffer, and discards it.
1338 */
1339static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1340{
1341 struct page *page;
1342 int rr, rv = 1;
1343 void *data;
1344
c3470cde 1345 if (!data_size)
81e84650 1346 return true;
c3470cde 1347
45bb912b 1348 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1349
1350 data = kmap(page);
1351 while (data_size) {
1352 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1353 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1354 rv = 0;
0ddc5549
LE
1355 if (!signal_pending(current))
1356 dev_warn(DEV,
1357 "short read receiving data: read %d expected %d\n",
1358 rr, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1359 break;
1360 }
1361 data_size -= rr;
1362 }
1363 kunmap(page);
435f0740 1364 drbd_pp_free(mdev, page, 0);
b411b363
PR
1365 return rv;
1366}
1367
1368static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1369 sector_t sector, int data_size)
1370{
1371 struct bio_vec *bvec;
1372 struct bio *bio;
1373 int dgs, rr, i, expect;
a0638456
PR
1374 void *dig_in = mdev->tconn->int_dig_in;
1375 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1376
a0638456
PR
1377 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1378 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1379
1380 if (dgs) {
1381 rr = drbd_recv(mdev, dig_in, dgs);
1382 if (rr != dgs) {
0ddc5549
LE
1383 if (!signal_pending(current))
1384 dev_warn(DEV,
1385 "short read receiving data reply digest: read %d expected %d\n",
1386 rr, dgs);
b411b363
PR
1387 return 0;
1388 }
1389 }
1390
1391 data_size -= dgs;
1392
1393 /* optimistically update recv_cnt. if receiving fails below,
1394 * we disconnect anyways, and counters will be reset. */
1395 mdev->recv_cnt += data_size>>9;
1396
1397 bio = req->master_bio;
1398 D_ASSERT(sector == bio->bi_sector);
1399
1400 bio_for_each_segment(bvec, bio, i) {
1401 expect = min_t(int, data_size, bvec->bv_len);
1402 rr = drbd_recv(mdev,
1403 kmap(bvec->bv_page)+bvec->bv_offset,
1404 expect);
1405 kunmap(bvec->bv_page);
1406 if (rr != expect) {
0ddc5549
LE
1407 if (!signal_pending(current))
1408 dev_warn(DEV, "short read receiving data reply: "
1409 "read %d expected %d\n",
1410 rr, expect);
b411b363
PR
1411 return 0;
1412 }
1413 data_size -= rr;
1414 }
1415
1416 if (dgs) {
a0638456 1417 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1418 if (memcmp(dig_in, dig_vv, dgs)) {
1419 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1420 return 0;
1421 }
1422 }
1423
1424 D_ASSERT(data_size == 0);
1425 return 1;
1426}
1427
1428/* e_end_resync_block() is called via
1429 * drbd_process_done_ee() by asender only */
1430static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1431{
1432 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
010f6e67 1433 sector_t sector = e->i.sector;
b411b363
PR
1434 int ok;
1435
8b946255 1436 D_ASSERT(drbd_interval_empty(&e->i));
b411b363 1437
45bb912b 1438 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
010f6e67 1439 drbd_set_in_sync(mdev, sector, e->i.size);
b411b363
PR
1440 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1441 } else {
1442 /* Record failure to sync */
010f6e67 1443 drbd_rs_failed_io(mdev, sector, e->i.size);
b411b363
PR
1444
1445 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1446 }
1447 dec_unacked(mdev);
1448
1449 return ok;
1450}
1451
1452static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1453{
1454 struct drbd_epoch_entry *e;
1455
1456 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
45bb912b
LE
1457 if (!e)
1458 goto fail;
b411b363
PR
1459
1460 dec_rs_pending(mdev);
1461
b411b363
PR
1462 inc_unacked(mdev);
1463 /* corresponding dec_unacked() in e_end_resync_block()
1464 * respective _drbd_clear_done_ee */
1465
45bb912b
LE
1466 e->w.cb = e_end_resync_block;
1467
87eeee41 1468 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1469 list_add(&e->w.list, &mdev->sync_ee);
87eeee41 1470 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1471
0f0601f4 1472 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
45bb912b 1473 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1474 return true;
b411b363 1475
10f6d992
LE
1476 /* don't care for the reason here */
1477 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1478 spin_lock_irq(&mdev->tconn->req_lock);
22cc37a9 1479 list_del(&e->w.list);
87eeee41 1480 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1481
45bb912b
LE
1482 drbd_free_ee(mdev, e);
1483fail:
1484 put_ldev(mdev);
81e84650 1485 return false;
b411b363
PR
1486}
1487
668eebc6 1488static struct drbd_request *
bc9c5c41
AG
1489find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1490 sector_t sector, bool missing_ok, const char *func)
51624585 1491{
51624585
AG
1492 struct drbd_request *req;
1493
bc9c5c41
AG
1494 /* Request object according to our peer */
1495 req = (struct drbd_request *)(unsigned long)id;
1496 if (drbd_contains_interval(root, sector, &req->i))
668eebc6 1497 return req;
c3afd8f5
AG
1498 if (!missing_ok) {
1499 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1500 (unsigned long)id, (unsigned long long)sector);
1501 }
51624585
AG
1502 return NULL;
1503}
1504
d8763023
AG
1505static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1506 unsigned int data_size)
b411b363
PR
1507{
1508 struct drbd_request *req;
1509 sector_t sector;
b411b363 1510 int ok;
e42325a5 1511 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1512
1513 sector = be64_to_cpu(p->sector);
1514
87eeee41 1515 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1516 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1517 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1518 if (unlikely(!req))
81e84650 1519 return false;
b411b363 1520
24c4830c 1521 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1522 * special casing it there for the various failure cases.
1523 * still no race with drbd_fail_pending_reads */
1524 ok = recv_dless_read(mdev, req, sector, data_size);
1525
1526 if (ok)
8554df1c 1527 req_mod(req, DATA_RECEIVED);
b411b363
PR
1528 /* else: nothing. handled from drbd_disconnect...
1529 * I don't think we may complete this just yet
1530 * in case we are "on-disconnect: freeze" */
1531
1532 return ok;
1533}
1534
d8763023
AG
1535static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1536 unsigned int data_size)
b411b363
PR
1537{
1538 sector_t sector;
b411b363 1539 int ok;
e42325a5 1540 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1541
1542 sector = be64_to_cpu(p->sector);
1543 D_ASSERT(p->block_id == ID_SYNCER);
1544
1545 if (get_ldev(mdev)) {
1546 /* data is submitted to disk within recv_resync_read.
1547 * corresponding put_ldev done below on error,
9c50842a 1548 * or in drbd_endio_sec. */
b411b363
PR
1549 ok = recv_resync_read(mdev, sector, data_size);
1550 } else {
1551 if (__ratelimit(&drbd_ratelimit_state))
1552 dev_err(DEV, "Can not write resync data to local disk.\n");
1553
1554 ok = drbd_drain_block(mdev, data_size);
1555
2b2bf214 1556 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1557 }
1558
778f271d
PR
1559 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1560
b411b363
PR
1561 return ok;
1562}
1563
1564/* e_end_block() is called via drbd_process_done_ee().
1565 * this means this function only runs in the asender thread
1566 */
1567static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1568{
1569 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
010f6e67 1570 sector_t sector = e->i.sector;
b411b363
PR
1571 int ok = 1, pcmd;
1572
89e58e75 1573 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
45bb912b 1574 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1575 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1576 mdev->state.conn <= C_PAUSED_SYNC_T &&
1577 e->flags & EE_MAY_SET_IN_SYNC) ?
1578 P_RS_WRITE_ACK : P_WRITE_ACK;
1579 ok &= drbd_send_ack(mdev, pcmd, e);
1580 if (pcmd == P_RS_WRITE_ACK)
010f6e67 1581 drbd_set_in_sync(mdev, sector, e->i.size);
b411b363
PR
1582 } else {
1583 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1584 /* we expect it to be marked out of sync anyways...
1585 * maybe assert this? */
1586 }
1587 dec_unacked(mdev);
1588 }
1589 /* we delete from the conflict detection hash _after_ we sent out the
1590 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1591 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1592 spin_lock_irq(&mdev->tconn->req_lock);
8b946255
AG
1593 D_ASSERT(!drbd_interval_empty(&e->i));
1594 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1595 drbd_clear_interval(&e->i);
87eeee41 1596 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1597 } else
8b946255 1598 D_ASSERT(drbd_interval_empty(&e->i));
b411b363
PR
1599
1600 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1601
1602 return ok;
1603}
1604
1605static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1606{
1607 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1608 int ok = 1;
1609
89e58e75 1610 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1611 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1612
87eeee41 1613 spin_lock_irq(&mdev->tconn->req_lock);
8b946255
AG
1614 D_ASSERT(!drbd_interval_empty(&e->i));
1615 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1616 drbd_clear_interval(&e->i);
87eeee41 1617 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1618
1619 dec_unacked(mdev);
1620
1621 return ok;
1622}
1623
1624/* Called from receive_Data.
1625 * Synchronize packets on sock with packets on msock.
1626 *
1627 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1628 * packet traveling on msock, they are still processed in the order they have
1629 * been sent.
1630 *
1631 * Note: we don't care for Ack packets overtaking P_DATA packets.
1632 *
1633 * In case packet_seq is larger than mdev->peer_seq number, there are
1634 * outstanding packets on the msock. We wait for them to arrive.
1635 * In case we are the logically next packet, we update mdev->peer_seq
1636 * ourselves. Correctly handles 32bit wrap around.
1637 *
1638 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1639 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1640 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1641 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1642 *
1643 * returns 0 if we may process the packet,
1644 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1645static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1646{
1647 DEFINE_WAIT(wait);
1648 unsigned int p_seq;
1649 long timeout;
1650 int ret = 0;
1651 spin_lock(&mdev->peer_seq_lock);
1652 for (;;) {
1653 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1654 if (seq_le(packet_seq, mdev->peer_seq+1))
1655 break;
1656 if (signal_pending(current)) {
1657 ret = -ERESTARTSYS;
1658 break;
1659 }
1660 p_seq = mdev->peer_seq;
1661 spin_unlock(&mdev->peer_seq_lock);
1662 timeout = schedule_timeout(30*HZ);
1663 spin_lock(&mdev->peer_seq_lock);
1664 if (timeout == 0 && p_seq == mdev->peer_seq) {
1665 ret = -ETIMEDOUT;
1666 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1667 break;
1668 }
1669 }
1670 finish_wait(&mdev->seq_wait, &wait);
1671 if (mdev->peer_seq+1 == packet_seq)
1672 mdev->peer_seq++;
1673 spin_unlock(&mdev->peer_seq_lock);
1674 return ret;
1675}
1676
688593c5
LE
1677/* see also bio_flags_to_wire()
1678 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1679 * flags and back. We may replicate to other kernel versions. */
1680static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1681{
688593c5
LE
1682 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1683 (dpf & DP_FUA ? REQ_FUA : 0) |
1684 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1685 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1686}
1687
b411b363 1688/* mirrored write */
d8763023
AG
1689static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1690 unsigned int data_size)
b411b363
PR
1691{
1692 sector_t sector;
1693 struct drbd_epoch_entry *e;
e42325a5 1694 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1695 int rw = WRITE;
1696 u32 dp_flags;
1697
b411b363 1698 if (!get_ldev(mdev)) {
b411b363
PR
1699 spin_lock(&mdev->peer_seq_lock);
1700 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701 mdev->peer_seq++;
1702 spin_unlock(&mdev->peer_seq_lock);
1703
2b2bf214 1704 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1705 atomic_inc(&mdev->current_epoch->epoch_size);
1706 return drbd_drain_block(mdev, data_size);
1707 }
1708
1709 /* get_ldev(mdev) successful.
1710 * Corresponding put_ldev done either below (on various errors),
9c50842a 1711 * or in drbd_endio_sec, if we successfully submit the data at
b411b363
PR
1712 * the end of this function. */
1713
1714 sector = be64_to_cpu(p->sector);
1715 e = read_in_block(mdev, p->block_id, sector, data_size);
1716 if (!e) {
1717 put_ldev(mdev);
81e84650 1718 return false;
b411b363
PR
1719 }
1720
b411b363
PR
1721 e->w.cb = e_end_block;
1722
688593c5
LE
1723 dp_flags = be32_to_cpu(p->dp_flags);
1724 rw |= wire_flags_to_bio(mdev, dp_flags);
1725
1726 if (dp_flags & DP_MAY_SET_IN_SYNC)
1727 e->flags |= EE_MAY_SET_IN_SYNC;
1728
b411b363
PR
1729 spin_lock(&mdev->epoch_lock);
1730 e->epoch = mdev->current_epoch;
1731 atomic_inc(&e->epoch->epoch_size);
1732 atomic_inc(&e->epoch->active);
b411b363
PR
1733 spin_unlock(&mdev->epoch_lock);
1734
b411b363 1735 /* I'm the receiver, I do hold a net_cnt reference. */
89e58e75 1736 if (!mdev->tconn->net_conf->two_primaries) {
87eeee41 1737 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1738 } else {
1739 /* don't get the req_lock yet,
1740 * we may sleep in drbd_wait_peer_seq */
010f6e67 1741 const int size = e->i.size;
b411b363
PR
1742 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1743 DEFINE_WAIT(wait);
b411b363
PR
1744 int first;
1745
89e58e75 1746 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1747
1748 /* conflict detection and handling:
1749 * 1. wait on the sequence number,
1750 * in case this data packet overtook ACK packets.
bb3bfe96
AG
1751 * 2. check our interval trees for conflicting requests:
1752 * we only need to check the write_requests tree; the
1753 * epoch_entries tree cannot contain any overlaps because
1754 * they were already eliminated on the submitting node.
b411b363
PR
1755 *
1756 * Note: for two_primaries, we are protocol C,
1757 * so there cannot be any request that is DONE
1758 * but still on the transfer log.
1759 *
bb3bfe96 1760 * unconditionally add to the epoch_entries tree.
b411b363
PR
1761 *
1762 * if no conflicting request is found:
1763 * submit.
1764 *
1765 * if any conflicting request is found
1766 * that has not yet been acked,
1767 * AND I have the "discard concurrent writes" flag:
1768 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1769 *
1770 * if any conflicting request is found:
1771 * block the receiver, waiting on misc_wait
1772 * until no more conflicting requests are there,
1773 * or we get interrupted (disconnect).
1774 *
1775 * we do not just write after local io completion of those
1776 * requests, but only after req is done completely, i.e.
1777 * we wait for the P_DISCARD_ACK to arrive!
1778 *
1779 * then proceed normally, i.e. submit.
1780 */
1781 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1782 goto out_interrupted;
1783
87eeee41 1784 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1785
8b946255 1786 drbd_insert_interval(&mdev->epoch_entries, &e->i);
b411b363 1787
b411b363
PR
1788 first = 1;
1789 for (;;) {
de696716 1790 struct drbd_interval *i;
b411b363
PR
1791 int have_unacked = 0;
1792 int have_conflict = 0;
1793 prepare_to_wait(&mdev->misc_wait, &wait,
1794 TASK_INTERRUPTIBLE);
de696716
AG
1795
1796 i = drbd_find_overlap(&mdev->write_requests, sector, size);
1797 if (i) {
1798 struct drbd_request *req2 =
1799 container_of(i, struct drbd_request, i);
1800
1801 /* only ALERT on first iteration,
1802 * we may be woken up early... */
1803 if (first)
1804 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1805 " new: %llus +%u; pending: %llus +%u\n",
1806 current->comm, current->pid,
1807 (unsigned long long)sector, size,
1808 (unsigned long long)req2->i.sector, req2->i.size);
1809 if (req2->rq_state & RQ_NET_PENDING)
1810 ++have_unacked;
1811 ++have_conflict;
b411b363 1812 }
b411b363
PR
1813 if (!have_conflict)
1814 break;
1815
1816 /* Discard Ack only for the _first_ iteration */
1817 if (first && discard && have_unacked) {
1818 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1819 (unsigned long long)sector);
1820 inc_unacked(mdev);
1821 e->w.cb = e_send_discard_ack;
1822 list_add_tail(&e->w.list, &mdev->done_ee);
1823
87eeee41 1824 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1825
1826 /* we could probably send that P_DISCARD_ACK ourselves,
1827 * but I don't like the receiver using the msock */
1828
1829 put_ldev(mdev);
1830 wake_asender(mdev);
1831 finish_wait(&mdev->misc_wait, &wait);
81e84650 1832 return true;
b411b363
PR
1833 }
1834
1835 if (signal_pending(current)) {
8b946255
AG
1836 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1837 drbd_clear_interval(&e->i);
b411b363 1838
87eeee41 1839 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1840
1841 finish_wait(&mdev->misc_wait, &wait);
1842 goto out_interrupted;
1843 }
1844
87eeee41 1845 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1846 if (first) {
1847 first = 0;
1848 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1849 "sec=%llus\n", (unsigned long long)sector);
1850 } else if (discard) {
1851 /* we had none on the first iteration.
1852 * there must be none now. */
1853 D_ASSERT(have_unacked == 0);
1854 }
1855 schedule();
87eeee41 1856 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1857 }
1858 finish_wait(&mdev->misc_wait, &wait);
1859 }
1860
1861 list_add(&e->w.list, &mdev->active_ee);
87eeee41 1862 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1863
89e58e75 1864 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
1865 case DRBD_PROT_C:
1866 inc_unacked(mdev);
1867 /* corresponding dec_unacked() in e_end_block()
1868 * respective _drbd_clear_done_ee */
1869 break;
1870 case DRBD_PROT_B:
1871 /* I really don't like it that the receiver thread
1872 * sends on the msock, but anyways */
1873 drbd_send_ack(mdev, P_RECV_ACK, e);
1874 break;
1875 case DRBD_PROT_A:
1876 /* nothing to do */
1877 break;
1878 }
1879
6719fb03 1880 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 1881 /* In case we have the only disk of the cluster, */
010f6e67 1882 drbd_set_out_of_sync(mdev, e->i.sector, e->i.size);
b411b363 1883 e->flags |= EE_CALL_AL_COMPLETE_IO;
6719fb03 1884 e->flags &= ~EE_MAY_SET_IN_SYNC;
010f6e67 1885 drbd_al_begin_io(mdev, e->i.sector);
b411b363
PR
1886 }
1887
45bb912b 1888 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1889 return true;
b411b363 1890
10f6d992
LE
1891 /* don't care for the reason here */
1892 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1893 spin_lock_irq(&mdev->tconn->req_lock);
22cc37a9 1894 list_del(&e->w.list);
8b946255
AG
1895 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1896 drbd_clear_interval(&e->i);
87eeee41 1897 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1898 if (e->flags & EE_CALL_AL_COMPLETE_IO)
010f6e67 1899 drbd_al_complete_io(mdev, e->i.sector);
22cc37a9 1900
b411b363 1901out_interrupted:
10f6d992 1902 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
b411b363
PR
1903 put_ldev(mdev);
1904 drbd_free_ee(mdev, e);
81e84650 1905 return false;
b411b363
PR
1906}
1907
0f0601f4
LE
1908/* We may throttle resync, if the lower device seems to be busy,
1909 * and current sync rate is above c_min_rate.
1910 *
1911 * To decide whether or not the lower device is busy, we use a scheme similar
1912 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1913 * (more than 64 sectors) of activity we cannot account for with our own resync
1914 * activity, it obviously is "busy".
1915 *
1916 * The current sync rate used here uses only the most recent two step marks,
1917 * to have a short time average so we can react faster.
1918 */
e3555d85 1919int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
1920{
1921 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1922 unsigned long db, dt, dbdt;
e3555d85 1923 struct lc_element *tmp;
0f0601f4
LE
1924 int curr_events;
1925 int throttle = 0;
1926
1927 /* feature disabled? */
1928 if (mdev->sync_conf.c_min_rate == 0)
1929 return 0;
1930
e3555d85
PR
1931 spin_lock_irq(&mdev->al_lock);
1932 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1933 if (tmp) {
1934 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1935 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1936 spin_unlock_irq(&mdev->al_lock);
1937 return 0;
1938 }
1939 /* Do not slow down if app IO is already waiting for this extent */
1940 }
1941 spin_unlock_irq(&mdev->al_lock);
1942
0f0601f4
LE
1943 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1944 (int)part_stat_read(&disk->part0, sectors[1]) -
1945 atomic_read(&mdev->rs_sect_ev);
e3555d85 1946
0f0601f4
LE
1947 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1948 unsigned long rs_left;
1949 int i;
1950
1951 mdev->rs_last_events = curr_events;
1952
1953 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1954 * approx. */
2649f080
LE
1955 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1956
1957 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1958 rs_left = mdev->ov_left;
1959 else
1960 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
1961
1962 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1963 if (!dt)
1964 dt++;
1965 db = mdev->rs_mark_left[i] - rs_left;
1966 dbdt = Bit2KB(db/dt);
1967
1968 if (dbdt > mdev->sync_conf.c_min_rate)
1969 throttle = 1;
1970 }
1971 return throttle;
1972}
1973
1974
d8763023
AG
1975static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
1976 unsigned int digest_size)
b411b363
PR
1977{
1978 sector_t sector;
1979 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1980 struct drbd_epoch_entry *e;
1981 struct digest_info *di = NULL;
b18b37be 1982 int size, verb;
b411b363 1983 unsigned int fault_type;
e42325a5 1984 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
1985
1986 sector = be64_to_cpu(p->sector);
1987 size = be32_to_cpu(p->blksize);
1988
1816a2b4 1989 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
1990 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1991 (unsigned long long)sector, size);
81e84650 1992 return false;
b411b363
PR
1993 }
1994 if (sector + (size>>9) > capacity) {
1995 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1996 (unsigned long long)sector, size);
81e84650 1997 return false;
b411b363
PR
1998 }
1999
2000 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2001 verb = 1;
2002 switch (cmd) {
2003 case P_DATA_REQUEST:
2004 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2005 break;
2006 case P_RS_DATA_REQUEST:
2007 case P_CSUM_RS_REQUEST:
2008 case P_OV_REQUEST:
2009 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2010 break;
2011 case P_OV_REPLY:
2012 verb = 0;
2013 dec_rs_pending(mdev);
2014 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2015 break;
2016 default:
2017 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2018 cmdname(cmd));
2019 }
2020 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2021 dev_err(DEV, "Can not satisfy peer's read request, "
2022 "no local data.\n");
b18b37be 2023
a821cc4a
LE
2024 /* drain possibly payload */
2025 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2026 }
2027
2028 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2029 * "criss-cross" setup, that might cause write-out on some other DRBD,
2030 * which in turn might block on the other node at this very place. */
2031 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2032 if (!e) {
2033 put_ldev(mdev);
81e84650 2034 return false;
b411b363
PR
2035 }
2036
02918be2 2037 switch (cmd) {
b411b363
PR
2038 case P_DATA_REQUEST:
2039 e->w.cb = w_e_end_data_req;
2040 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2041 /* application IO, don't drbd_rs_begin_io */
2042 goto submit;
2043
b411b363
PR
2044 case P_RS_DATA_REQUEST:
2045 e->w.cb = w_e_end_rsdata_req;
2046 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2047 /* used in the sector offset progress display */
2048 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2049 break;
2050
2051 case P_OV_REPLY:
2052 case P_CSUM_RS_REQUEST:
2053 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2054 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2055 if (!di)
2056 goto out_free_e;
2057
2058 di->digest_size = digest_size;
2059 di->digest = (((char *)di)+sizeof(struct digest_info));
2060
c36c3ced
LE
2061 e->digest = di;
2062 e->flags |= EE_HAS_DIGEST;
2063
b411b363
PR
2064 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2065 goto out_free_e;
2066
02918be2 2067 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2068 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363 2069 e->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2070 /* used in the sector offset progress display */
2071 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2072 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2073 /* track progress, we may need to throttle */
2074 atomic_add(size >> 9, &mdev->rs_sect_in);
b411b363
PR
2075 e->w.cb = w_e_end_ov_reply;
2076 dec_rs_pending(mdev);
0f0601f4
LE
2077 /* drbd_rs_begin_io done when we sent this request,
2078 * but accounting still needs to be done. */
2079 goto submit_for_resync;
b411b363
PR
2080 }
2081 break;
2082
2083 case P_OV_REQUEST:
b411b363 2084 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2085 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2086 unsigned long now = jiffies;
2087 int i;
b411b363
PR
2088 mdev->ov_start_sector = sector;
2089 mdev->ov_position = sector;
30b743a2
LE
2090 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2091 mdev->rs_total = mdev->ov_left;
de228bba
LE
2092 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2093 mdev->rs_mark_left[i] = mdev->ov_left;
2094 mdev->rs_mark_time[i] = now;
2095 }
b411b363
PR
2096 dev_info(DEV, "Online Verify start sector: %llu\n",
2097 (unsigned long long)sector);
2098 }
2099 e->w.cb = w_e_end_ov_req;
2100 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2101 break;
2102
b411b363
PR
2103 default:
2104 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2105 cmdname(cmd));
b411b363 2106 fault_type = DRBD_FAULT_MAX;
80a40e43 2107 goto out_free_e;
b411b363
PR
2108 }
2109
0f0601f4
LE
2110 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2111 * wrt the receiver, but it is not as straightforward as it may seem.
2112 * Various places in the resync start and stop logic assume resync
2113 * requests are processed in order, requeuing this on the worker thread
2114 * introduces a bunch of new code for synchronization between threads.
2115 *
2116 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2117 * "forever", throttling after drbd_rs_begin_io will lock that extent
2118 * for application writes for the same time. For now, just throttle
2119 * here, where the rest of the code expects the receiver to sleep for
2120 * a while, anyways.
2121 */
2122
2123 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2124 * this defers syncer requests for some time, before letting at least
2125 * on request through. The resync controller on the receiving side
2126 * will adapt to the incoming rate accordingly.
2127 *
2128 * We cannot throttle here if remote is Primary/SyncTarget:
2129 * we would also throttle its application reads.
2130 * In that case, throttling is done on the SyncTarget only.
2131 */
e3555d85
PR
2132 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2133 schedule_timeout_uninterruptible(HZ/10);
2134 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2135 goto out_free_e;
b411b363 2136
0f0601f4
LE
2137submit_for_resync:
2138 atomic_add(size >> 9, &mdev->rs_sect_ev);
2139
80a40e43 2140submit:
b411b363 2141 inc_unacked(mdev);
87eeee41 2142 spin_lock_irq(&mdev->tconn->req_lock);
80a40e43 2143 list_add_tail(&e->w.list, &mdev->read_ee);
87eeee41 2144 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2145
45bb912b 2146 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
81e84650 2147 return true;
b411b363 2148
10f6d992
LE
2149 /* don't care for the reason here */
2150 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2151 spin_lock_irq(&mdev->tconn->req_lock);
22cc37a9 2152 list_del(&e->w.list);
87eeee41 2153 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2154 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2155
b411b363 2156out_free_e:
b411b363
PR
2157 put_ldev(mdev);
2158 drbd_free_ee(mdev, e);
81e84650 2159 return false;
b411b363
PR
2160}
2161
2162static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2163{
2164 int self, peer, rv = -100;
2165 unsigned long ch_self, ch_peer;
2166
2167 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2168 peer = mdev->p_uuid[UI_BITMAP] & 1;
2169
2170 ch_peer = mdev->p_uuid[UI_SIZE];
2171 ch_self = mdev->comm_bm_set;
2172
89e58e75 2173 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2174 case ASB_CONSENSUS:
2175 case ASB_DISCARD_SECONDARY:
2176 case ASB_CALL_HELPER:
2177 dev_err(DEV, "Configuration error.\n");
2178 break;
2179 case ASB_DISCONNECT:
2180 break;
2181 case ASB_DISCARD_YOUNGER_PRI:
2182 if (self == 0 && peer == 1) {
2183 rv = -1;
2184 break;
2185 }
2186 if (self == 1 && peer == 0) {
2187 rv = 1;
2188 break;
2189 }
2190 /* Else fall through to one of the other strategies... */
2191 case ASB_DISCARD_OLDER_PRI:
2192 if (self == 0 && peer == 1) {
2193 rv = 1;
2194 break;
2195 }
2196 if (self == 1 && peer == 0) {
2197 rv = -1;
2198 break;
2199 }
2200 /* Else fall through to one of the other strategies... */
ad19bf6e 2201 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2202 "Using discard-least-changes instead\n");
2203 case ASB_DISCARD_ZERO_CHG:
2204 if (ch_peer == 0 && ch_self == 0) {
2205 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2206 ? -1 : 1;
2207 break;
2208 } else {
2209 if (ch_peer == 0) { rv = 1; break; }
2210 if (ch_self == 0) { rv = -1; break; }
2211 }
89e58e75 2212 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2213 break;
2214 case ASB_DISCARD_LEAST_CHG:
2215 if (ch_self < ch_peer)
2216 rv = -1;
2217 else if (ch_self > ch_peer)
2218 rv = 1;
2219 else /* ( ch_self == ch_peer ) */
2220 /* Well, then use something else. */
2221 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2222 ? -1 : 1;
2223 break;
2224 case ASB_DISCARD_LOCAL:
2225 rv = -1;
2226 break;
2227 case ASB_DISCARD_REMOTE:
2228 rv = 1;
2229 }
2230
2231 return rv;
2232}
2233
2234static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2235{
6184ea21 2236 int hg, rv = -100;
b411b363 2237
89e58e75 2238 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2239 case ASB_DISCARD_YOUNGER_PRI:
2240 case ASB_DISCARD_OLDER_PRI:
2241 case ASB_DISCARD_LEAST_CHG:
2242 case ASB_DISCARD_LOCAL:
2243 case ASB_DISCARD_REMOTE:
2244 dev_err(DEV, "Configuration error.\n");
2245 break;
2246 case ASB_DISCONNECT:
2247 break;
2248 case ASB_CONSENSUS:
2249 hg = drbd_asb_recover_0p(mdev);
2250 if (hg == -1 && mdev->state.role == R_SECONDARY)
2251 rv = hg;
2252 if (hg == 1 && mdev->state.role == R_PRIMARY)
2253 rv = hg;
2254 break;
2255 case ASB_VIOLENTLY:
2256 rv = drbd_asb_recover_0p(mdev);
2257 break;
2258 case ASB_DISCARD_SECONDARY:
2259 return mdev->state.role == R_PRIMARY ? 1 : -1;
2260 case ASB_CALL_HELPER:
2261 hg = drbd_asb_recover_0p(mdev);
2262 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2263 enum drbd_state_rv rv2;
2264
2265 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2266 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2267 * we might be here in C_WF_REPORT_PARAMS which is transient.
2268 * we do not need to wait for the after state change work either. */
bb437946
AG
2269 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2270 if (rv2 != SS_SUCCESS) {
b411b363
PR
2271 drbd_khelper(mdev, "pri-lost-after-sb");
2272 } else {
2273 dev_warn(DEV, "Successfully gave up primary role.\n");
2274 rv = hg;
2275 }
2276 } else
2277 rv = hg;
2278 }
2279
2280 return rv;
2281}
2282
2283static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2284{
6184ea21 2285 int hg, rv = -100;
b411b363 2286
89e58e75 2287 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2288 case ASB_DISCARD_YOUNGER_PRI:
2289 case ASB_DISCARD_OLDER_PRI:
2290 case ASB_DISCARD_LEAST_CHG:
2291 case ASB_DISCARD_LOCAL:
2292 case ASB_DISCARD_REMOTE:
2293 case ASB_CONSENSUS:
2294 case ASB_DISCARD_SECONDARY:
2295 dev_err(DEV, "Configuration error.\n");
2296 break;
2297 case ASB_VIOLENTLY:
2298 rv = drbd_asb_recover_0p(mdev);
2299 break;
2300 case ASB_DISCONNECT:
2301 break;
2302 case ASB_CALL_HELPER:
2303 hg = drbd_asb_recover_0p(mdev);
2304 if (hg == -1) {
bb437946
AG
2305 enum drbd_state_rv rv2;
2306
b411b363
PR
2307 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2308 * we might be here in C_WF_REPORT_PARAMS which is transient.
2309 * we do not need to wait for the after state change work either. */
bb437946
AG
2310 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2311 if (rv2 != SS_SUCCESS) {
b411b363
PR
2312 drbd_khelper(mdev, "pri-lost-after-sb");
2313 } else {
2314 dev_warn(DEV, "Successfully gave up primary role.\n");
2315 rv = hg;
2316 }
2317 } else
2318 rv = hg;
2319 }
2320
2321 return rv;
2322}
2323
2324static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2325 u64 bits, u64 flags)
2326{
2327 if (!uuid) {
2328 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2329 return;
2330 }
2331 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2332 text,
2333 (unsigned long long)uuid[UI_CURRENT],
2334 (unsigned long long)uuid[UI_BITMAP],
2335 (unsigned long long)uuid[UI_HISTORY_START],
2336 (unsigned long long)uuid[UI_HISTORY_END],
2337 (unsigned long long)bits,
2338 (unsigned long long)flags);
2339}
2340
2341/*
2342 100 after split brain try auto recover
2343 2 C_SYNC_SOURCE set BitMap
2344 1 C_SYNC_SOURCE use BitMap
2345 0 no Sync
2346 -1 C_SYNC_TARGET use BitMap
2347 -2 C_SYNC_TARGET set BitMap
2348 -100 after split brain, disconnect
2349-1000 unrelated data
4a23f264
PR
2350-1091 requires proto 91
2351-1096 requires proto 96
b411b363
PR
2352 */
2353static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2354{
2355 u64 self, peer;
2356 int i, j;
2357
2358 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2359 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2360
2361 *rule_nr = 10;
2362 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2363 return 0;
2364
2365 *rule_nr = 20;
2366 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2367 peer != UUID_JUST_CREATED)
2368 return -2;
2369
2370 *rule_nr = 30;
2371 if (self != UUID_JUST_CREATED &&
2372 (peer == UUID_JUST_CREATED || peer == (u64)0))
2373 return 2;
2374
2375 if (self == peer) {
2376 int rct, dc; /* roles at crash time */
2377
2378 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2379
31890f4a 2380 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2381 return -1091;
b411b363
PR
2382
2383 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2384 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2385 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2386 drbd_uuid_set_bm(mdev, 0UL);
2387
2388 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2389 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2390 *rule_nr = 34;
2391 } else {
2392 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2393 *rule_nr = 36;
2394 }
2395
2396 return 1;
2397 }
2398
2399 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2400
31890f4a 2401 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2402 return -1091;
b411b363
PR
2403
2404 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2405 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2406 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2407
2408 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2409 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2410 mdev->p_uuid[UI_BITMAP] = 0UL;
2411
2412 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2413 *rule_nr = 35;
2414 } else {
2415 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2416 *rule_nr = 37;
2417 }
2418
2419 return -1;
2420 }
2421
2422 /* Common power [off|failure] */
2423 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2424 (mdev->p_uuid[UI_FLAGS] & 2);
2425 /* lowest bit is set when we were primary,
2426 * next bit (weight 2) is set when peer was primary */
2427 *rule_nr = 40;
2428
2429 switch (rct) {
2430 case 0: /* !self_pri && !peer_pri */ return 0;
2431 case 1: /* self_pri && !peer_pri */ return 1;
2432 case 2: /* !self_pri && peer_pri */ return -1;
2433 case 3: /* self_pri && peer_pri */
2434 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2435 return dc ? -1 : 1;
2436 }
2437 }
2438
2439 *rule_nr = 50;
2440 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2441 if (self == peer)
2442 return -1;
2443
2444 *rule_nr = 51;
2445 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2446 if (self == peer) {
31890f4a 2447 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2448 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2449 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2450 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2451 /* The last P_SYNC_UUID did not get though. Undo the last start of
2452 resync as sync source modifications of the peer's UUIDs. */
2453
31890f4a 2454 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2455 return -1091;
b411b363
PR
2456
2457 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2458 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2459
2460 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2461 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2462
b411b363
PR
2463 return -1;
2464 }
2465 }
2466
2467 *rule_nr = 60;
2468 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2469 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2470 peer = mdev->p_uuid[i] & ~((u64)1);
2471 if (self == peer)
2472 return -2;
2473 }
2474
2475 *rule_nr = 70;
2476 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2477 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2478 if (self == peer)
2479 return 1;
2480
2481 *rule_nr = 71;
2482 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2483 if (self == peer) {
31890f4a 2484 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2485 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2486 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2487 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2488 /* The last P_SYNC_UUID did not get though. Undo the last start of
2489 resync as sync source modifications of our UUIDs. */
2490
31890f4a 2491 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2492 return -1091;
b411b363
PR
2493
2494 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2495 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2496
4a23f264 2497 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2498 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2499 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2500
2501 return 1;
2502 }
2503 }
2504
2505
2506 *rule_nr = 80;
d8c2a36b 2507 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2508 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2509 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2510 if (self == peer)
2511 return 2;
2512 }
2513
2514 *rule_nr = 90;
2515 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2516 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2517 if (self == peer && self != ((u64)0))
2518 return 100;
2519
2520 *rule_nr = 100;
2521 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2522 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2523 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2524 peer = mdev->p_uuid[j] & ~((u64)1);
2525 if (self == peer)
2526 return -100;
2527 }
2528 }
2529
2530 return -1000;
2531}
2532
2533/* drbd_sync_handshake() returns the new conn state on success, or
2534 CONN_MASK (-1) on failure.
2535 */
2536static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2537 enum drbd_disk_state peer_disk) __must_hold(local)
2538{
2539 int hg, rule_nr;
2540 enum drbd_conns rv = C_MASK;
2541 enum drbd_disk_state mydisk;
2542
2543 mydisk = mdev->state.disk;
2544 if (mydisk == D_NEGOTIATING)
2545 mydisk = mdev->new_state_tmp.disk;
2546
2547 dev_info(DEV, "drbd_sync_handshake:\n");
2548 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2549 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2550 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2551
2552 hg = drbd_uuid_compare(mdev, &rule_nr);
2553
2554 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2555
2556 if (hg == -1000) {
2557 dev_alert(DEV, "Unrelated data, aborting!\n");
2558 return C_MASK;
2559 }
4a23f264
PR
2560 if (hg < -1000) {
2561 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2562 return C_MASK;
2563 }
2564
2565 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2566 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2567 int f = (hg == -100) || abs(hg) == 2;
2568 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2569 if (f)
2570 hg = hg*2;
2571 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2572 hg > 0 ? "source" : "target");
2573 }
2574
3a11a487
AG
2575 if (abs(hg) == 100)
2576 drbd_khelper(mdev, "initial-split-brain");
2577
89e58e75 2578 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2579 int pcount = (mdev->state.role == R_PRIMARY)
2580 + (peer_role == R_PRIMARY);
2581 int forced = (hg == -100);
2582
2583 switch (pcount) {
2584 case 0:
2585 hg = drbd_asb_recover_0p(mdev);
2586 break;
2587 case 1:
2588 hg = drbd_asb_recover_1p(mdev);
2589 break;
2590 case 2:
2591 hg = drbd_asb_recover_2p(mdev);
2592 break;
2593 }
2594 if (abs(hg) < 100) {
2595 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2596 "automatically solved. Sync from %s node\n",
2597 pcount, (hg < 0) ? "peer" : "this");
2598 if (forced) {
2599 dev_warn(DEV, "Doing a full sync, since"
2600 " UUIDs where ambiguous.\n");
2601 hg = hg*2;
2602 }
2603 }
2604 }
2605
2606 if (hg == -100) {
89e58e75 2607 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2608 hg = -1;
89e58e75 2609 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2610 hg = 1;
2611
2612 if (abs(hg) < 100)
2613 dev_warn(DEV, "Split-Brain detected, manually solved. "
2614 "Sync from %s node\n",
2615 (hg < 0) ? "peer" : "this");
2616 }
2617
2618 if (hg == -100) {
580b9767
LE
2619 /* FIXME this log message is not correct if we end up here
2620 * after an attempted attach on a diskless node.
2621 * We just refuse to attach -- well, we drop the "connection"
2622 * to that disk, in a way... */
3a11a487 2623 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2624 drbd_khelper(mdev, "split-brain");
2625 return C_MASK;
2626 }
2627
2628 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2629 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2630 return C_MASK;
2631 }
2632
2633 if (hg < 0 && /* by intention we do not use mydisk here. */
2634 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2635 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2636 case ASB_CALL_HELPER:
2637 drbd_khelper(mdev, "pri-lost");
2638 /* fall through */
2639 case ASB_DISCONNECT:
2640 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2641 return C_MASK;
2642 case ASB_VIOLENTLY:
2643 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2644 "assumption\n");
2645 }
2646 }
2647
89e58e75 2648 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
cf14c2e9
PR
2649 if (hg == 0)
2650 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2651 else
2652 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2653 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2654 abs(hg) >= 2 ? "full" : "bit-map based");
2655 return C_MASK;
2656 }
2657
b411b363
PR
2658 if (abs(hg) >= 2) {
2659 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2660 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2661 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2662 return C_MASK;
2663 }
2664
2665 if (hg > 0) { /* become sync source. */
2666 rv = C_WF_BITMAP_S;
2667 } else if (hg < 0) { /* become sync target */
2668 rv = C_WF_BITMAP_T;
2669 } else {
2670 rv = C_CONNECTED;
2671 if (drbd_bm_total_weight(mdev)) {
2672 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2673 drbd_bm_total_weight(mdev));
2674 }
2675 }
2676
2677 return rv;
2678}
2679
2680/* returns 1 if invalid */
2681static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2682{
2683 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2684 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2685 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2686 return 0;
2687
2688 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2689 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2690 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2691 return 1;
2692
2693 /* everything else is valid if they are equal on both sides. */
2694 if (peer == self)
2695 return 0;
2696
2697 /* everything es is invalid. */
2698 return 1;
2699}
2700
d8763023
AG
2701static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2702 unsigned int data_size)
b411b363 2703{
e42325a5 2704 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
b411b363 2705 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2706 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2707 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2708
b411b363
PR
2709 p_proto = be32_to_cpu(p->protocol);
2710 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2711 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2712 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2713 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2714 cf = be32_to_cpu(p->conn_flags);
2715 p_want_lose = cf & CF_WANT_LOSE;
2716
2717 clear_bit(CONN_DRY_RUN, &mdev->flags);
2718
2719 if (cf & CF_DRY_RUN)
2720 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363 2721
89e58e75 2722 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2723 dev_err(DEV, "incompatible communication protocols\n");
2724 goto disconnect;
2725 }
2726
89e58e75 2727 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
b411b363
PR
2728 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2729 goto disconnect;
2730 }
2731
89e58e75 2732 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
b411b363
PR
2733 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2734 goto disconnect;
2735 }
2736
89e58e75 2737 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
b411b363
PR
2738 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2739 goto disconnect;
2740 }
2741
89e58e75 2742 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
b411b363
PR
2743 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2744 goto disconnect;
2745 }
2746
89e58e75 2747 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
b411b363
PR
2748 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2749 goto disconnect;
2750 }
2751
31890f4a 2752 if (mdev->tconn->agreed_pro_version >= 87) {
89e58e75 2753 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
b411b363
PR
2754
2755 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
81e84650 2756 return false;
b411b363
PR
2757
2758 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2759 if (strcmp(p_integrity_alg, my_alg)) {
2760 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2761 goto disconnect;
2762 }
2763 dev_info(DEV, "data-integrity-alg: %s\n",
2764 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2765 }
2766
81e84650 2767 return true;
b411b363
PR
2768
2769disconnect:
2770 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2771 return false;
b411b363
PR
2772}
2773
2774/* helper function
2775 * input: alg name, feature name
2776 * return: NULL (alg name was "")
2777 * ERR_PTR(error) if something goes wrong
2778 * or the crypto hash ptr, if it worked out ok. */
2779struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2780 const char *alg, const char *name)
2781{
2782 struct crypto_hash *tfm;
2783
2784 if (!alg[0])
2785 return NULL;
2786
2787 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2788 if (IS_ERR(tfm)) {
2789 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2790 alg, name, PTR_ERR(tfm));
2791 return tfm;
2792 }
2793 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2794 crypto_free_hash(tfm);
2795 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2796 return ERR_PTR(-EINVAL);
2797 }
2798 return tfm;
2799}
2800
d8763023
AG
2801static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2802 unsigned int packet_size)
b411b363 2803{
81e84650 2804 int ok = true;
e42325a5 2805 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2806 unsigned int header_size, data_size, exp_max_sz;
2807 struct crypto_hash *verify_tfm = NULL;
2808 struct crypto_hash *csums_tfm = NULL;
31890f4a 2809 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2810 int *rs_plan_s = NULL;
2811 int fifo_size = 0;
b411b363
PR
2812
2813 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2814 : apv == 88 ? sizeof(struct p_rs_param)
2815 + SHARED_SECRET_MAX
8e26f9cc
PR
2816 : apv <= 94 ? sizeof(struct p_rs_param_89)
2817 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2818
02918be2 2819 if (packet_size > exp_max_sz) {
b411b363 2820 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2821 packet_size, exp_max_sz);
81e84650 2822 return false;
b411b363
PR
2823 }
2824
2825 if (apv <= 88) {
257d0af6 2826 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2827 data_size = packet_size - header_size;
8e26f9cc 2828 } else if (apv <= 94) {
257d0af6 2829 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2830 data_size = packet_size - header_size;
b411b363 2831 D_ASSERT(data_size == 0);
8e26f9cc 2832 } else {
257d0af6 2833 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 2834 data_size = packet_size - header_size;
b411b363
PR
2835 D_ASSERT(data_size == 0);
2836 }
2837
2838 /* initialize verify_alg and csums_alg */
2839 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2840
02918be2 2841 if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
81e84650 2842 return false;
b411b363
PR
2843
2844 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2845
2846 if (apv >= 88) {
2847 if (apv == 88) {
2848 if (data_size > SHARED_SECRET_MAX) {
2849 dev_err(DEV, "verify-alg too long, "
2850 "peer wants %u, accepting only %u byte\n",
2851 data_size, SHARED_SECRET_MAX);
81e84650 2852 return false;
b411b363
PR
2853 }
2854
2855 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
81e84650 2856 return false;
b411b363
PR
2857
2858 /* we expect NUL terminated string */
2859 /* but just in case someone tries to be evil */
2860 D_ASSERT(p->verify_alg[data_size-1] == 0);
2861 p->verify_alg[data_size-1] = 0;
2862
2863 } else /* apv >= 89 */ {
2864 /* we still expect NUL terminated strings */
2865 /* but just in case someone tries to be evil */
2866 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2867 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2868 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2869 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2870 }
2871
2872 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2873 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2874 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2875 mdev->sync_conf.verify_alg, p->verify_alg);
2876 goto disconnect;
2877 }
2878 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2879 p->verify_alg, "verify-alg");
2880 if (IS_ERR(verify_tfm)) {
2881 verify_tfm = NULL;
2882 goto disconnect;
2883 }
2884 }
2885
2886 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2887 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2888 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2889 mdev->sync_conf.csums_alg, p->csums_alg);
2890 goto disconnect;
2891 }
2892 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2893 p->csums_alg, "csums-alg");
2894 if (IS_ERR(csums_tfm)) {
2895 csums_tfm = NULL;
2896 goto disconnect;
2897 }
2898 }
2899
8e26f9cc
PR
2900 if (apv > 94) {
2901 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2902 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2903 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2904 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2905 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2906
2907 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2908 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2909 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2910 if (!rs_plan_s) {
2911 dev_err(DEV, "kmalloc of fifo_buffer failed");
2912 goto disconnect;
2913 }
2914 }
8e26f9cc 2915 }
b411b363
PR
2916
2917 spin_lock(&mdev->peer_seq_lock);
2918 /* lock against drbd_nl_syncer_conf() */
2919 if (verify_tfm) {
2920 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2921 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2922 crypto_free_hash(mdev->verify_tfm);
2923 mdev->verify_tfm = verify_tfm;
2924 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2925 }
2926 if (csums_tfm) {
2927 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2928 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2929 crypto_free_hash(mdev->csums_tfm);
2930 mdev->csums_tfm = csums_tfm;
2931 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2932 }
778f271d
PR
2933 if (fifo_size != mdev->rs_plan_s.size) {
2934 kfree(mdev->rs_plan_s.values);
2935 mdev->rs_plan_s.values = rs_plan_s;
2936 mdev->rs_plan_s.size = fifo_size;
2937 mdev->rs_planed = 0;
2938 }
b411b363
PR
2939 spin_unlock(&mdev->peer_seq_lock);
2940 }
2941
2942 return ok;
2943disconnect:
2944 /* just for completeness: actually not needed,
2945 * as this is not reached if csums_tfm was ok. */
2946 crypto_free_hash(csums_tfm);
2947 /* but free the verify_tfm again, if csums_tfm did not work out */
2948 crypto_free_hash(verify_tfm);
2949 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2950 return false;
b411b363
PR
2951}
2952
b411b363
PR
2953/* warn if the arguments differ by more than 12.5% */
2954static void warn_if_differ_considerably(struct drbd_conf *mdev,
2955 const char *s, sector_t a, sector_t b)
2956{
2957 sector_t d;
2958 if (a == 0 || b == 0)
2959 return;
2960 d = (a > b) ? (a - b) : (b - a);
2961 if (d > (a>>3) || d > (b>>3))
2962 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2963 (unsigned long long)a, (unsigned long long)b);
2964}
2965
d8763023
AG
2966static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
2967 unsigned int data_size)
b411b363 2968{
e42325a5 2969 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 2970 enum determine_dev_size dd = unchanged;
b411b363
PR
2971 sector_t p_size, p_usize, my_usize;
2972 int ldsc = 0; /* local disk size changed */
e89b591c 2973 enum dds_flags ddsf;
b411b363 2974
b411b363
PR
2975 p_size = be64_to_cpu(p->d_size);
2976 p_usize = be64_to_cpu(p->u_size);
2977
2978 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2979 dev_err(DEV, "some backing storage is needed\n");
2980 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2981 return false;
b411b363
PR
2982 }
2983
2984 /* just store the peer's disk size for now.
2985 * we still need to figure out whether we accept that. */
2986 mdev->p_size = p_size;
2987
b411b363
PR
2988 if (get_ldev(mdev)) {
2989 warn_if_differ_considerably(mdev, "lower level device sizes",
2990 p_size, drbd_get_max_capacity(mdev->ldev));
2991 warn_if_differ_considerably(mdev, "user requested size",
2992 p_usize, mdev->ldev->dc.disk_size);
2993
2994 /* if this is the first connect, or an otherwise expected
2995 * param exchange, choose the minimum */
2996 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2997 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2998 p_usize);
2999
3000 my_usize = mdev->ldev->dc.disk_size;
3001
3002 if (mdev->ldev->dc.disk_size != p_usize) {
3003 mdev->ldev->dc.disk_size = p_usize;
3004 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3005 (unsigned long)mdev->ldev->dc.disk_size);
3006 }
3007
3008 /* Never shrink a device with usable data during connect.
3009 But allow online shrinking if we are connected. */
a393db6f 3010 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3011 drbd_get_capacity(mdev->this_bdev) &&
3012 mdev->state.disk >= D_OUTDATED &&
3013 mdev->state.conn < C_CONNECTED) {
3014 dev_err(DEV, "The peer's disk size is too small!\n");
3015 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3016 mdev->ldev->dc.disk_size = my_usize;
3017 put_ldev(mdev);
81e84650 3018 return false;
b411b363
PR
3019 }
3020 put_ldev(mdev);
3021 }
b411b363 3022
e89b591c 3023 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3024 if (get_ldev(mdev)) {
24c4830c 3025 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3026 put_ldev(mdev);
3027 if (dd == dev_size_error)
81e84650 3028 return false;
b411b363
PR
3029 drbd_md_sync(mdev);
3030 } else {
3031 /* I am diskless, need to accept the peer's size. */
3032 drbd_set_my_capacity(mdev, p_size);
3033 }
3034
99432fcc
PR
3035 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3036 drbd_reconsider_max_bio_size(mdev);
3037
b411b363
PR
3038 if (get_ldev(mdev)) {
3039 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3040 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3041 ldsc = 1;
3042 }
3043
b411b363
PR
3044 put_ldev(mdev);
3045 }
3046
3047 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3048 if (be64_to_cpu(p->c_size) !=
3049 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3050 /* we have different sizes, probably peer
3051 * needs to know my new size... */
e89b591c 3052 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3053 }
3054 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3055 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3056 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3057 mdev->state.disk >= D_INCONSISTENT) {
3058 if (ddsf & DDSF_NO_RESYNC)
3059 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3060 else
3061 resync_after_online_grow(mdev);
3062 } else
b411b363
PR
3063 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3064 }
3065 }
3066
81e84650 3067 return true;
b411b363
PR
3068}
3069
d8763023
AG
3070static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3071 unsigned int data_size)
b411b363 3072{
e42325a5 3073 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3074 u64 *p_uuid;
62b0da3a 3075 int i, updated_uuids = 0;
b411b363 3076
b411b363
PR
3077 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3078
3079 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3080 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3081
3082 kfree(mdev->p_uuid);
3083 mdev->p_uuid = p_uuid;
3084
3085 if (mdev->state.conn < C_CONNECTED &&
3086 mdev->state.disk < D_INCONSISTENT &&
3087 mdev->state.role == R_PRIMARY &&
3088 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3089 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3090 (unsigned long long)mdev->ed_uuid);
3091 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3092 return false;
b411b363
PR
3093 }
3094
3095 if (get_ldev(mdev)) {
3096 int skip_initial_sync =
3097 mdev->state.conn == C_CONNECTED &&
31890f4a 3098 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3099 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3100 (p_uuid[UI_FLAGS] & 8);
3101 if (skip_initial_sync) {
3102 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3103 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3104 "clear_n_write from receive_uuids",
3105 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3106 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3107 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3108 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3109 CS_VERBOSE, NULL);
3110 drbd_md_sync(mdev);
62b0da3a 3111 updated_uuids = 1;
b411b363
PR
3112 }
3113 put_ldev(mdev);
18a50fa2
PR
3114 } else if (mdev->state.disk < D_INCONSISTENT &&
3115 mdev->state.role == R_PRIMARY) {
3116 /* I am a diskless primary, the peer just created a new current UUID
3117 for me. */
62b0da3a 3118 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3119 }
3120
3121 /* Before we test for the disk state, we should wait until an eventually
3122 ongoing cluster wide state change is finished. That is important if
3123 we are primary and are detaching from our disk. We need to see the
3124 new disk state... */
3125 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3126 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3127 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3128
3129 if (updated_uuids)
3130 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3131
81e84650 3132 return true;
b411b363
PR
3133}
3134
3135/**
3136 * convert_state() - Converts the peer's view of the cluster state to our point of view
3137 * @ps: The state as seen by the peer.
3138 */
3139static union drbd_state convert_state(union drbd_state ps)
3140{
3141 union drbd_state ms;
3142
3143 static enum drbd_conns c_tab[] = {
3144 [C_CONNECTED] = C_CONNECTED,
3145
3146 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3147 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3148 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3149 [C_VERIFY_S] = C_VERIFY_T,
3150 [C_MASK] = C_MASK,
3151 };
3152
3153 ms.i = ps.i;
3154
3155 ms.conn = c_tab[ps.conn];
3156 ms.peer = ps.role;
3157 ms.role = ps.peer;
3158 ms.pdsk = ps.disk;
3159 ms.disk = ps.pdsk;
3160 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3161
3162 return ms;
3163}
3164
d8763023
AG
3165static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3166 unsigned int data_size)
b411b363 3167{
e42325a5 3168 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3169 union drbd_state mask, val;
bf885f8a 3170 enum drbd_state_rv rv;
b411b363 3171
b411b363
PR
3172 mask.i = be32_to_cpu(p->mask);
3173 val.i = be32_to_cpu(p->val);
3174
3175 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3176 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3177 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3178 return true;
b411b363
PR
3179 }
3180
3181 mask = convert_state(mask);
3182 val = convert_state(val);
3183
3184 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3185
3186 drbd_send_sr_reply(mdev, rv);
3187 drbd_md_sync(mdev);
3188
81e84650 3189 return true;
b411b363
PR
3190}
3191
d8763023
AG
3192static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3193 unsigned int data_size)
b411b363 3194{
e42325a5 3195 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3196 union drbd_state os, ns, peer_state;
b411b363 3197 enum drbd_disk_state real_peer_disk;
65d922c3 3198 enum chg_state_flags cs_flags;
b411b363
PR
3199 int rv;
3200
b411b363
PR
3201 peer_state.i = be32_to_cpu(p->state);
3202
3203 real_peer_disk = peer_state.disk;
3204 if (peer_state.disk == D_NEGOTIATING) {
3205 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3206 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3207 }
3208
87eeee41 3209 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3210 retry:
4ac4aada 3211 os = ns = mdev->state;
87eeee41 3212 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3213
e9ef7bb6
LE
3214 /* peer says his disk is uptodate, while we think it is inconsistent,
3215 * and this happens while we think we have a sync going on. */
3216 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3217 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3218 /* If we are (becoming) SyncSource, but peer is still in sync
3219 * preparation, ignore its uptodate-ness to avoid flapping, it
3220 * will change to inconsistent once the peer reaches active
3221 * syncing states.
3222 * It may have changed syncer-paused flags, however, so we
3223 * cannot ignore this completely. */
3224 if (peer_state.conn > C_CONNECTED &&
3225 peer_state.conn < C_SYNC_SOURCE)
3226 real_peer_disk = D_INCONSISTENT;
3227
3228 /* if peer_state changes to connected at the same time,
3229 * it explicitly notifies us that it finished resync.
3230 * Maybe we should finish it up, too? */
3231 else if (os.conn >= C_SYNC_SOURCE &&
3232 peer_state.conn == C_CONNECTED) {
3233 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3234 drbd_resync_finished(mdev);
81e84650 3235 return true;
e9ef7bb6
LE
3236 }
3237 }
3238
3239 /* peer says his disk is inconsistent, while we think it is uptodate,
3240 * and this happens while the peer still thinks we have a sync going on,
3241 * but we think we are already done with the sync.
3242 * We ignore this to avoid flapping pdsk.
3243 * This should not happen, if the peer is a recent version of drbd. */
3244 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3245 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3246 real_peer_disk = D_UP_TO_DATE;
3247
4ac4aada
LE
3248 if (ns.conn == C_WF_REPORT_PARAMS)
3249 ns.conn = C_CONNECTED;
b411b363 3250
67531718
PR
3251 if (peer_state.conn == C_AHEAD)
3252 ns.conn = C_BEHIND;
3253
b411b363
PR
3254 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3255 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3256 int cr; /* consider resync */
3257
3258 /* if we established a new connection */
4ac4aada 3259 cr = (os.conn < C_CONNECTED);
b411b363
PR
3260 /* if we had an established connection
3261 * and one of the nodes newly attaches a disk */
4ac4aada 3262 cr |= (os.conn == C_CONNECTED &&
b411b363 3263 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3264 os.disk == D_NEGOTIATING));
b411b363
PR
3265 /* if we have both been inconsistent, and the peer has been
3266 * forced to be UpToDate with --overwrite-data */
3267 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3268 /* if we had been plain connected, and the admin requested to
3269 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3270 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3271 (peer_state.conn >= C_STARTING_SYNC_S &&
3272 peer_state.conn <= C_WF_BITMAP_T));
3273
3274 if (cr)
4ac4aada 3275 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3276
3277 put_ldev(mdev);
4ac4aada
LE
3278 if (ns.conn == C_MASK) {
3279 ns.conn = C_CONNECTED;
b411b363 3280 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3281 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3282 } else if (peer_state.disk == D_NEGOTIATING) {
3283 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3284 peer_state.disk = D_DISKLESS;
580b9767 3285 real_peer_disk = D_DISKLESS;
b411b363 3286 } else {
cf14c2e9 3287 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
81e84650 3288 return false;
4ac4aada 3289 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363 3290 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3291 return false;
b411b363
PR
3292 }
3293 }
3294 }
3295
87eeee41 3296 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3297 if (mdev->state.i != os.i)
b411b363
PR
3298 goto retry;
3299 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3300 ns.peer = peer_state.role;
3301 ns.pdsk = real_peer_disk;
3302 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3303 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3304 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3305 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3306 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3307 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3308 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3309 for temporal network outages! */
87eeee41 3310 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50
PR
3311 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3312 tl_clear(mdev);
3313 drbd_uuid_new_current(mdev);
3314 clear_bit(NEW_CUR_UUID, &mdev->flags);
3315 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
81e84650 3316 return false;
481c6f50 3317 }
65d922c3 3318 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3319 ns = mdev->state;
87eeee41 3320 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3321
3322 if (rv < SS_SUCCESS) {
3323 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3324 return false;
b411b363
PR
3325 }
3326
4ac4aada
LE
3327 if (os.conn > C_WF_REPORT_PARAMS) {
3328 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3329 peer_state.disk != D_NEGOTIATING ) {
3330 /* we want resync, peer has not yet decided to sync... */
3331 /* Nowadays only used when forcing a node into primary role and
3332 setting its disk to UpToDate with that */
3333 drbd_send_uuids(mdev);
3334 drbd_send_state(mdev);
3335 }
3336 }
3337
89e58e75 3338 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3339
3340 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3341
81e84650 3342 return true;
b411b363
PR
3343}
3344
d8763023
AG
3345static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3346 unsigned int data_size)
b411b363 3347{
e42325a5 3348 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3349
3350 wait_event(mdev->misc_wait,
3351 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3352 mdev->state.conn == C_BEHIND ||
b411b363
PR
3353 mdev->state.conn < C_CONNECTED ||
3354 mdev->state.disk < D_NEGOTIATING);
3355
3356 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3357
b411b363
PR
3358 /* Here the _drbd_uuid_ functions are right, current should
3359 _not_ be rotated into the history */
3360 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3361 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3362 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3363
62b0da3a 3364 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3365 drbd_start_resync(mdev, C_SYNC_TARGET);
3366
3367 put_ldev(mdev);
3368 } else
3369 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3370
81e84650 3371 return true;
b411b363
PR
3372}
3373
2c46407d
AG
3374/**
3375 * receive_bitmap_plain
3376 *
3377 * Return 0 when done, 1 when another iteration is needed, and a negative error
3378 * code upon failure.
3379 */
3380static int
02918be2
PR
3381receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3382 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3383{
3384 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3385 unsigned want = num_words * sizeof(long);
2c46407d 3386 int err;
b411b363 3387
02918be2
PR
3388 if (want != data_size) {
3389 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3390 return -EIO;
b411b363
PR
3391 }
3392 if (want == 0)
2c46407d
AG
3393 return 0;
3394 err = drbd_recv(mdev, buffer, want);
3395 if (err != want) {
3396 if (err >= 0)
3397 err = -EIO;
3398 return err;
3399 }
b411b363
PR
3400
3401 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3402
3403 c->word_offset += num_words;
3404 c->bit_offset = c->word_offset * BITS_PER_LONG;
3405 if (c->bit_offset > c->bm_bits)
3406 c->bit_offset = c->bm_bits;
3407
2c46407d 3408 return 1;
b411b363
PR
3409}
3410
2c46407d
AG
3411/**
3412 * recv_bm_rle_bits
3413 *
3414 * Return 0 when done, 1 when another iteration is needed, and a negative error
3415 * code upon failure.
3416 */
3417static int
b411b363
PR
3418recv_bm_rle_bits(struct drbd_conf *mdev,
3419 struct p_compressed_bm *p,
c6d25cfe
PR
3420 struct bm_xfer_ctx *c,
3421 unsigned int len)
b411b363
PR
3422{
3423 struct bitstream bs;
3424 u64 look_ahead;
3425 u64 rl;
3426 u64 tmp;
3427 unsigned long s = c->bit_offset;
3428 unsigned long e;
b411b363
PR
3429 int toggle = DCBP_get_start(p);
3430 int have;
3431 int bits;
3432
3433 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3434
3435 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3436 if (bits < 0)
2c46407d 3437 return -EIO;
b411b363
PR
3438
3439 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3440 bits = vli_decode_bits(&rl, look_ahead);
3441 if (bits <= 0)
2c46407d 3442 return -EIO;
b411b363
PR
3443
3444 if (toggle) {
3445 e = s + rl -1;
3446 if (e >= c->bm_bits) {
3447 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3448 return -EIO;
b411b363
PR
3449 }
3450 _drbd_bm_set_bits(mdev, s, e);
3451 }
3452
3453 if (have < bits) {
3454 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3455 have, bits, look_ahead,
3456 (unsigned int)(bs.cur.b - p->code),
3457 (unsigned int)bs.buf_len);
2c46407d 3458 return -EIO;
b411b363
PR
3459 }
3460 look_ahead >>= bits;
3461 have -= bits;
3462
3463 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3464 if (bits < 0)
2c46407d 3465 return -EIO;
b411b363
PR
3466 look_ahead |= tmp << have;
3467 have += bits;
3468 }
3469
3470 c->bit_offset = s;
3471 bm_xfer_ctx_bit_to_word_offset(c);
3472
2c46407d 3473 return (s != c->bm_bits);
b411b363
PR
3474}
3475
2c46407d
AG
3476/**
3477 * decode_bitmap_c
3478 *
3479 * Return 0 when done, 1 when another iteration is needed, and a negative error
3480 * code upon failure.
3481 */
3482static int
b411b363
PR
3483decode_bitmap_c(struct drbd_conf *mdev,
3484 struct p_compressed_bm *p,
c6d25cfe
PR
3485 struct bm_xfer_ctx *c,
3486 unsigned int len)
b411b363
PR
3487{
3488 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3489 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3490
3491 /* other variants had been implemented for evaluation,
3492 * but have been dropped as this one turned out to be "best"
3493 * during all our tests. */
3494
3495 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3496 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3497 return -EIO;
b411b363
PR
3498}
3499
3500void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3501 const char *direction, struct bm_xfer_ctx *c)
3502{
3503 /* what would it take to transfer it "plaintext" */
c012949a 3504 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3505 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3506 + c->bm_words * sizeof(long);
3507 unsigned total = c->bytes[0] + c->bytes[1];
3508 unsigned r;
3509
3510 /* total can not be zero. but just in case: */
3511 if (total == 0)
3512 return;
3513
3514 /* don't report if not compressed */
3515 if (total >= plain)
3516 return;
3517
3518 /* total < plain. check for overflow, still */
3519 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3520 : (1000 * total / plain);
3521
3522 if (r > 1000)
3523 r = 1000;
3524
3525 r = 1000 - r;
3526 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3527 "total %u; compression: %u.%u%%\n",
3528 direction,
3529 c->bytes[1], c->packets[1],
3530 c->bytes[0], c->packets[0],
3531 total, r/10, r % 10);
3532}
3533
3534/* Since we are processing the bitfield from lower addresses to higher,
3535 it does not matter if the process it in 32 bit chunks or 64 bit
3536 chunks as long as it is little endian. (Understand it as byte stream,
3537 beginning with the lowest byte...) If we would use big endian
3538 we would need to process it from the highest address to the lowest,
3539 in order to be agnostic to the 32 vs 64 bits issue.
3540
3541 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3542static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3543 unsigned int data_size)
b411b363
PR
3544{
3545 struct bm_xfer_ctx c;
3546 void *buffer;
2c46407d 3547 int err;
81e84650 3548 int ok = false;
257d0af6 3549 struct p_header *h = &mdev->tconn->data.rbuf.header;
b411b363 3550
20ceb2b2
LE
3551 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3552 /* you are supposed to send additional out-of-sync information
3553 * if you actually set bits during this phase */
b411b363
PR
3554
3555 /* maybe we should use some per thread scratch page,
3556 * and allocate that during initial device creation? */
3557 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3558 if (!buffer) {
3559 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3560 goto out;
3561 }
3562
3563 c = (struct bm_xfer_ctx) {
3564 .bm_bits = drbd_bm_bits(mdev),
3565 .bm_words = drbd_bm_words(mdev),
3566 };
3567
2c46407d 3568 for(;;) {
02918be2 3569 if (cmd == P_BITMAP) {
2c46407d 3570 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3571 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3572 /* MAYBE: sanity check that we speak proto >= 90,
3573 * and the feature is enabled! */
3574 struct p_compressed_bm *p;
3575
02918be2 3576 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3577 dev_err(DEV, "ReportCBitmap packet too large\n");
3578 goto out;
3579 }
3580 /* use the page buff */
3581 p = buffer;
3582 memcpy(p, h, sizeof(*h));
02918be2 3583 if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
b411b363 3584 goto out;
004352fa
LE
3585 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3586 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3587 goto out;
b411b363 3588 }
c6d25cfe 3589 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3590 } else {
02918be2 3591 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3592 goto out;
3593 }
3594
02918be2 3595 c.packets[cmd == P_BITMAP]++;
257d0af6 3596 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3597
2c46407d
AG
3598 if (err <= 0) {
3599 if (err < 0)
3600 goto out;
b411b363 3601 break;
2c46407d 3602 }
02918be2 3603 if (!drbd_recv_header(mdev, &cmd, &data_size))
b411b363 3604 goto out;
2c46407d 3605 }
b411b363
PR
3606
3607 INFO_bm_xfer_stats(mdev, "receive", &c);
3608
3609 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3610 enum drbd_state_rv rv;
3611
b411b363
PR
3612 ok = !drbd_send_bitmap(mdev);
3613 if (!ok)
3614 goto out;
3615 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3616 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3617 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3618 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3619 /* admin may have requested C_DISCONNECTING,
3620 * other threads may have noticed network errors */
3621 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3622 drbd_conn_str(mdev->state.conn));
3623 }
3624
81e84650 3625 ok = true;
b411b363 3626 out:
20ceb2b2 3627 drbd_bm_unlock(mdev);
b411b363
PR
3628 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3629 drbd_start_resync(mdev, C_SYNC_SOURCE);
3630 free_page((unsigned long) buffer);
3631 return ok;
3632}
3633
d8763023
AG
3634static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3635 unsigned int data_size)
b411b363
PR
3636{
3637 /* TODO zero copy sink :) */
3638 static char sink[128];
3639 int size, want, r;
3640
02918be2
PR
3641 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3642 cmd, data_size);
b411b363 3643
02918be2 3644 size = data_size;
b411b363
PR
3645 while (size > 0) {
3646 want = min_t(int, size, sizeof(sink));
3647 r = drbd_recv(mdev, sink, want);
841ce241
AG
3648 if (!expect(r > 0))
3649 break;
b411b363
PR
3650 size -= r;
3651 }
3652 return size == 0;
3653}
3654
d8763023
AG
3655static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3656 unsigned int data_size)
0ced55a3 3657{
e7f52dfb
LE
3658 /* Make sure we've acked all the TCP data associated
3659 * with the data requests being unplugged */
e42325a5 3660 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3661
81e84650 3662 return true;
0ced55a3
PR
3663}
3664
d8763023
AG
3665static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3666 unsigned int data_size)
73a01a18 3667{
e42325a5 3668 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3669
f735e363
LE
3670 switch (mdev->state.conn) {
3671 case C_WF_SYNC_UUID:
3672 case C_WF_BITMAP_T:
3673 case C_BEHIND:
3674 break;
3675 default:
3676 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3677 drbd_conn_str(mdev->state.conn));
3678 }
3679
73a01a18
PR
3680 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3681
81e84650 3682 return true;
73a01a18
PR
3683}
3684
d8763023
AG
3685typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3686 unsigned int to_receive);
02918be2
PR
3687
3688struct data_cmd {
3689 int expect_payload;
3690 size_t pkt_size;
3691 drbd_cmd_handler_f function;
3692};
3693
3694static struct data_cmd drbd_cmd_handler[] = {
3695 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3696 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3697 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3698 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
257d0af6
PR
3699 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3700 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3701 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
02918be2
PR
3702 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3703 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
257d0af6
PR
3704 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3705 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
02918be2
PR
3706 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3707 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3708 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3709 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3710 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3711 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3712 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3713 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3714 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3715 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 3716 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
b411b363
PR
3717 /* anything missing from this table is in
3718 * the asender_tbl, see get_asender_cmd */
02918be2 3719 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3720};
3721
02918be2 3722/* All handler functions that expect a sub-header get that sub-heder in
e42325a5 3723 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3724
e42325a5 3725 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3726 p_header, but they may not rely on that. Since there is also p_header95 !
3727 */
b411b363
PR
3728
3729static void drbdd(struct drbd_conf *mdev)
3730{
c012949a 3731 struct p_header *header = &mdev->tconn->data.rbuf.header;
02918be2 3732 unsigned int packet_size;
d8763023 3733 enum drbd_packet cmd;
02918be2
PR
3734 size_t shs; /* sub header size */
3735 int rv;
b411b363 3736
e6b3ea83 3737 while (get_t_state(&mdev->tconn->receiver) == RUNNING) {
b411b363 3738 drbd_thread_current_set_cpu(mdev);
02918be2
PR
3739 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3740 goto err_out;
b411b363 3741
02918be2
PR
3742 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3743 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3744 goto err_out;
0b33a916 3745 }
b411b363 3746
c012949a 3747 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(struct p_header);
02918be2
PR
3748 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3749 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3750 goto err_out;
b411b363 3751 }
b411b363 3752
c13f7e1a 3753 if (shs) {
c012949a 3754 rv = drbd_recv(mdev, &header->payload, shs);
c13f7e1a 3755 if (unlikely(rv != shs)) {
0ddc5549
LE
3756 if (!signal_pending(current))
3757 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3758 goto err_out;
3759 }
3760 }
3761
02918be2 3762 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
b411b363 3763
02918be2 3764 if (unlikely(!rv)) {
b411b363 3765 dev_err(DEV, "error receiving %s, l: %d!\n",
02918be2
PR
3766 cmdname(cmd), packet_size);
3767 goto err_out;
b411b363
PR
3768 }
3769 }
b411b363 3770
02918be2
PR
3771 if (0) {
3772 err_out:
3773 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3774 }
856c50c7
LE
3775 /* If we leave here, we probably want to update at least the
3776 * "Connected" indicator on stable storage. Do so explicitly here. */
3777 drbd_md_sync(mdev);
b411b363
PR
3778}
3779
191d3cc8 3780void drbd_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3781{
3782 struct drbd_wq_barrier barr;
3783
3784 barr.w.cb = w_prev_work_done;
3785 init_completion(&barr.done);
191d3cc8 3786 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
3787 wait_for_completion(&barr.done);
3788}
3789
3790static void drbd_disconnect(struct drbd_conf *mdev)
3791{
3792 enum drbd_fencing_p fp;
3793 union drbd_state os, ns;
3794 int rv = SS_UNKNOWN_ERROR;
3795 unsigned int i;
3796
3797 if (mdev->state.conn == C_STANDALONE)
3798 return;
b411b363
PR
3799
3800 /* asender does not clean up anything. it must not interfere, either */
e6b3ea83 3801 drbd_thread_stop(&mdev->tconn->asender);
b411b363 3802 drbd_free_sock(mdev);
b411b363 3803
85719573 3804 /* wait for current activity to cease. */
87eeee41 3805 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3806 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3807 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3808 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 3809 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3810
3811 /* We do not have data structures that would allow us to
3812 * get the rs_pending_cnt down to 0 again.
3813 * * On C_SYNC_TARGET we do not have any data structures describing
3814 * the pending RSDataRequest's we have sent.
3815 * * On C_SYNC_SOURCE there is no data structure that tracks
3816 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3817 * And no, it is not the sum of the reference counts in the
3818 * resync_LRU. The resync_LRU tracks the whole operation including
3819 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3820 * on the fly. */
3821 drbd_rs_cancel_all(mdev);
3822 mdev->rs_total = 0;
3823 mdev->rs_failed = 0;
3824 atomic_set(&mdev->rs_pending_cnt, 0);
3825 wake_up(&mdev->misc_wait);
3826
7fde2be9
PR
3827 del_timer(&mdev->request_timer);
3828
b411b363
PR
3829 /* make sure syncer is stopped and w_resume_next_sg queued */
3830 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3831 resync_timer_fn((unsigned long)mdev);
3832
b411b363
PR
3833 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3834 * w_make_resync_request etc. which may still be on the worker queue
3835 * to be "canceled" */
191d3cc8 3836 drbd_flush_workqueue(mdev->tconn);
b411b363
PR
3837
3838 /* This also does reclaim_net_ee(). If we do this too early, we might
3839 * miss some resync ee and pages.*/
3840 drbd_process_done_ee(mdev);
3841
3842 kfree(mdev->p_uuid);
3843 mdev->p_uuid = NULL;
3844
fb22c402 3845 if (!is_susp(mdev->state))
b411b363
PR
3846 tl_clear(mdev);
3847
b411b363
PR
3848 dev_info(DEV, "Connection closed\n");
3849
3850 drbd_md_sync(mdev);
3851
3852 fp = FP_DONT_CARE;
3853 if (get_ldev(mdev)) {
3854 fp = mdev->ldev->dc.fencing;
3855 put_ldev(mdev);
3856 }
3857
87f7be4c
PR
3858 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3859 drbd_try_outdate_peer_async(mdev);
b411b363 3860
87eeee41 3861 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3862 os = mdev->state;
3863 if (os.conn >= C_UNCONNECTED) {
3864 /* Do not restart in case we are C_DISCONNECTING */
3865 ns = os;
3866 ns.conn = C_UNCONNECTED;
3867 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3868 }
87eeee41 3869 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3870
3871 if (os.conn == C_DISCONNECTING) {
b2fb6dbe 3872 wait_event(mdev->tconn->net_cnt_wait, atomic_read(&mdev->tconn->net_cnt) == 0);
b411b363 3873
a0638456
PR
3874 crypto_free_hash(mdev->tconn->cram_hmac_tfm);
3875 mdev->tconn->cram_hmac_tfm = NULL;
b411b363 3876
89e58e75
PR
3877 kfree(mdev->tconn->net_conf);
3878 mdev->tconn->net_conf = NULL;
b411b363
PR
3879 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3880 }
3881
20ceb2b2
LE
3882 /* serialize with bitmap writeout triggered by the state change,
3883 * if any. */
3884 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3885
b411b363
PR
3886 /* tcp_close and release of sendpage pages can be deferred. I don't
3887 * want to use SO_LINGER, because apparently it can be deferred for
3888 * more than 20 seconds (longest time I checked).
3889 *
3890 * Actually we don't care for exactly when the network stack does its
3891 * put_page(), but release our reference on these pages right here.
3892 */
3893 i = drbd_release_ee(mdev, &mdev->net_ee);
3894 if (i)
3895 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3896 i = atomic_read(&mdev->pp_in_use_by_net);
3897 if (i)
3898 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3899 i = atomic_read(&mdev->pp_in_use);
3900 if (i)
45bb912b 3901 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3902
3903 D_ASSERT(list_empty(&mdev->read_ee));
3904 D_ASSERT(list_empty(&mdev->active_ee));
3905 D_ASSERT(list_empty(&mdev->sync_ee));
3906 D_ASSERT(list_empty(&mdev->done_ee));
3907
3908 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3909 atomic_set(&mdev->current_epoch->epoch_size, 0);
3910 D_ASSERT(list_empty(&mdev->current_epoch->list));
3911}
3912
3913/*
3914 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3915 * we can agree on is stored in agreed_pro_version.
3916 *
3917 * feature flags and the reserved array should be enough room for future
3918 * enhancements of the handshake protocol, and possible plugins...
3919 *
3920 * for now, they are expected to be zero, but ignored.
3921 */
3922static int drbd_send_handshake(struct drbd_conf *mdev)
3923{
e6b3ea83 3924 /* ASSERT current == mdev->tconn->receiver ... */
e42325a5 3925 struct p_handshake *p = &mdev->tconn->data.sbuf.handshake;
b411b363
PR
3926 int ok;
3927
e42325a5 3928 if (mutex_lock_interruptible(&mdev->tconn->data.mutex)) {
b411b363
PR
3929 dev_err(DEV, "interrupted during initial handshake\n");
3930 return 0; /* interrupted. not ok. */
3931 }
3932
e42325a5
PR
3933 if (mdev->tconn->data.socket == NULL) {
3934 mutex_unlock(&mdev->tconn->data.mutex);
b411b363
PR
3935 return 0;
3936 }
3937
3938 memset(p, 0, sizeof(*p));
3939 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3940 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
c012949a
PR
3941 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_HAND_SHAKE,
3942 &p->head, sizeof(*p), 0 );
e42325a5 3943 mutex_unlock(&mdev->tconn->data.mutex);
b411b363
PR
3944 return ok;
3945}
3946
3947/*
3948 * return values:
3949 * 1 yes, we have a valid connection
3950 * 0 oops, did not work out, please try again
3951 * -1 peer talks different language,
3952 * no point in trying again, please go standalone.
3953 */
3954static int drbd_do_handshake(struct drbd_conf *mdev)
3955{
e6b3ea83 3956 /* ASSERT current == mdev->tconn->receiver ... */
e42325a5 3957 struct p_handshake *p = &mdev->tconn->data.rbuf.handshake;
02918be2
PR
3958 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3959 unsigned int length;
d8763023 3960 enum drbd_packet cmd;
b411b363
PR
3961 int rv;
3962
3963 rv = drbd_send_handshake(mdev);
3964 if (!rv)
3965 return 0;
3966
02918be2 3967 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
3968 if (!rv)
3969 return 0;
3970
02918be2 3971 if (cmd != P_HAND_SHAKE) {
b411b363 3972 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
02918be2 3973 cmdname(cmd), cmd);
b411b363
PR
3974 return -1;
3975 }
3976
02918be2 3977 if (length != expect) {
b411b363 3978 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
02918be2 3979 expect, length);
b411b363
PR
3980 return -1;
3981 }
3982
3983 rv = drbd_recv(mdev, &p->head.payload, expect);
3984
3985 if (rv != expect) {
0ddc5549
LE
3986 if (!signal_pending(current))
3987 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
3988 return 0;
3989 }
3990
b411b363
PR
3991 p->protocol_min = be32_to_cpu(p->protocol_min);
3992 p->protocol_max = be32_to_cpu(p->protocol_max);
3993 if (p->protocol_max == 0)
3994 p->protocol_max = p->protocol_min;
3995
3996 if (PRO_VERSION_MAX < p->protocol_min ||
3997 PRO_VERSION_MIN > p->protocol_max)
3998 goto incompat;
3999
31890f4a 4000 mdev->tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363
PR
4001
4002 dev_info(DEV, "Handshake successful: "
31890f4a 4003 "Agreed network protocol version %d\n", mdev->tconn->agreed_pro_version);
b411b363
PR
4004
4005 return 1;
4006
4007 incompat:
4008 dev_err(DEV, "incompatible DRBD dialects: "
4009 "I support %d-%d, peer supports %d-%d\n",
4010 PRO_VERSION_MIN, PRO_VERSION_MAX,
4011 p->protocol_min, p->protocol_max);
4012 return -1;
4013}
4014
4015#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4016static int drbd_do_auth(struct drbd_conf *mdev)
4017{
4018 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4019 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4020 return -1;
b411b363
PR
4021}
4022#else
4023#define CHALLENGE_LEN 64
b10d96cb
JT
4024
4025/* Return value:
4026 1 - auth succeeded,
4027 0 - failed, try again (network error),
4028 -1 - auth failed, don't try again.
4029*/
4030
b411b363
PR
4031static int drbd_do_auth(struct drbd_conf *mdev)
4032{
4033 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4034 struct scatterlist sg;
4035 char *response = NULL;
4036 char *right_response = NULL;
4037 char *peers_ch = NULL;
89e58e75 4038 unsigned int key_len = strlen(mdev->tconn->net_conf->shared_secret);
b411b363
PR
4039 unsigned int resp_size;
4040 struct hash_desc desc;
d8763023 4041 enum drbd_packet cmd;
02918be2 4042 unsigned int length;
b411b363
PR
4043 int rv;
4044
a0638456 4045 desc.tfm = mdev->tconn->cram_hmac_tfm;
b411b363
PR
4046 desc.flags = 0;
4047
a0638456 4048 rv = crypto_hash_setkey(mdev->tconn->cram_hmac_tfm,
89e58e75 4049 (u8 *)mdev->tconn->net_conf->shared_secret, key_len);
b411b363
PR
4050 if (rv) {
4051 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4052 rv = -1;
b411b363
PR
4053 goto fail;
4054 }
4055
4056 get_random_bytes(my_challenge, CHALLENGE_LEN);
4057
4058 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4059 if (!rv)
4060 goto fail;
4061
02918be2 4062 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4063 if (!rv)
4064 goto fail;
4065
02918be2 4066 if (cmd != P_AUTH_CHALLENGE) {
b411b363 4067 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
02918be2 4068 cmdname(cmd), cmd);
b411b363
PR
4069 rv = 0;
4070 goto fail;
4071 }
4072
02918be2 4073 if (length > CHALLENGE_LEN * 2) {
b411b363 4074 dev_err(DEV, "expected AuthChallenge payload too big.\n");
b10d96cb 4075 rv = -1;
b411b363
PR
4076 goto fail;
4077 }
4078
02918be2 4079 peers_ch = kmalloc(length, GFP_NOIO);
b411b363
PR
4080 if (peers_ch == NULL) {
4081 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4082 rv = -1;
b411b363
PR
4083 goto fail;
4084 }
4085
02918be2 4086 rv = drbd_recv(mdev, peers_ch, length);
b411b363 4087
02918be2 4088 if (rv != length) {
0ddc5549
LE
4089 if (!signal_pending(current))
4090 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4091 rv = 0;
4092 goto fail;
4093 }
4094
a0638456 4095 resp_size = crypto_hash_digestsize(mdev->tconn->cram_hmac_tfm);
b411b363
PR
4096 response = kmalloc(resp_size, GFP_NOIO);
4097 if (response == NULL) {
4098 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4099 rv = -1;
b411b363
PR
4100 goto fail;
4101 }
4102
4103 sg_init_table(&sg, 1);
02918be2 4104 sg_set_buf(&sg, peers_ch, length);
b411b363
PR
4105
4106 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4107 if (rv) {
4108 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4109 rv = -1;
b411b363
PR
4110 goto fail;
4111 }
4112
4113 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4114 if (!rv)
4115 goto fail;
4116
02918be2 4117 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4118 if (!rv)
4119 goto fail;
4120
02918be2 4121 if (cmd != P_AUTH_RESPONSE) {
b411b363 4122 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
02918be2 4123 cmdname(cmd), cmd);
b411b363
PR
4124 rv = 0;
4125 goto fail;
4126 }
4127
02918be2 4128 if (length != resp_size) {
b411b363
PR
4129 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4130 rv = 0;
4131 goto fail;
4132 }
4133
4134 rv = drbd_recv(mdev, response , resp_size);
4135
4136 if (rv != resp_size) {
0ddc5549
LE
4137 if (!signal_pending(current))
4138 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4139 rv = 0;
4140 goto fail;
4141 }
4142
4143 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4144 if (right_response == NULL) {
b411b363 4145 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4146 rv = -1;
b411b363
PR
4147 goto fail;
4148 }
4149
4150 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4151
4152 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4153 if (rv) {
4154 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4155 rv = -1;
b411b363
PR
4156 goto fail;
4157 }
4158
4159 rv = !memcmp(response, right_response, resp_size);
4160
4161 if (rv)
4162 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
89e58e75 4163 resp_size, mdev->tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4164 else
4165 rv = -1;
b411b363
PR
4166
4167 fail:
4168 kfree(peers_ch);
4169 kfree(response);
4170 kfree(right_response);
4171
4172 return rv;
4173}
4174#endif
4175
4176int drbdd_init(struct drbd_thread *thi)
4177{
4178 struct drbd_conf *mdev = thi->mdev;
4179 unsigned int minor = mdev_to_minor(mdev);
4180 int h;
4181
4182 sprintf(current->comm, "drbd%d_receiver", minor);
4183
4184 dev_info(DEV, "receiver (re)started\n");
4185
4186 do {
4187 h = drbd_connect(mdev);
4188 if (h == 0) {
4189 drbd_disconnect(mdev);
20ee6390 4190 schedule_timeout_interruptible(HZ);
b411b363
PR
4191 }
4192 if (h == -1) {
4193 dev_warn(DEV, "Discarding network configuration.\n");
4194 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4195 }
4196 } while (h == 0);
4197
4198 if (h > 0) {
b2fb6dbe 4199 if (get_net_conf(mdev->tconn)) {
b411b363 4200 drbdd(mdev);
b2fb6dbe 4201 put_net_conf(mdev->tconn);
b411b363
PR
4202 }
4203 }
4204
4205 drbd_disconnect(mdev);
4206
4207 dev_info(DEV, "receiver terminated\n");
4208 return 0;
4209}
4210
4211/* ********* acknowledge sender ******** */
4212
d8763023 4213static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4214{
257d0af6 4215 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4216
4217 int retcode = be32_to_cpu(p->retcode);
4218
4219 if (retcode >= SS_SUCCESS) {
4220 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4221 } else {
4222 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4223 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4224 drbd_set_st_err_str(retcode), retcode);
4225 }
4226 wake_up(&mdev->state_wait);
4227
81e84650 4228 return true;
b411b363
PR
4229}
4230
d8763023 4231static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4232{
4233 return drbd_send_ping_ack(mdev);
4234
4235}
4236
d8763023 4237static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4238{
4239 /* restore idle timeout */
e42325a5 4240 mdev->tconn->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
309d1608
PR
4241 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4242 wake_up(&mdev->misc_wait);
b411b363 4243
81e84650 4244 return true;
b411b363
PR
4245}
4246
d8763023 4247static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4248{
257d0af6 4249 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4250 sector_t sector = be64_to_cpu(p->sector);
4251 int blksize = be32_to_cpu(p->blksize);
4252
31890f4a 4253 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4254
4255 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4256
1d53f09e
LE
4257 if (get_ldev(mdev)) {
4258 drbd_rs_complete_io(mdev, sector);
4259 drbd_set_in_sync(mdev, sector, blksize);
4260 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4261 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4262 put_ldev(mdev);
4263 }
b411b363 4264 dec_rs_pending(mdev);
778f271d 4265 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4266
81e84650 4267 return true;
b411b363
PR
4268}
4269
bc9c5c41
AG
4270static int
4271validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4272 struct rb_root *root, const char *func,
4273 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4274{
4275 struct drbd_request *req;
4276 struct bio_and_error m;
4277
87eeee41 4278 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4279 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4280 if (unlikely(!req)) {
87eeee41 4281 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4282 return false;
b411b363
PR
4283 }
4284 __req_mod(req, what, &m);
87eeee41 4285 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4286
4287 if (m.bio)
4288 complete_master_bio(mdev, &m);
81e84650 4289 return true;
b411b363
PR
4290}
4291
d8763023 4292static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4293{
257d0af6 4294 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4295 sector_t sector = be64_to_cpu(p->sector);
4296 int blksize = be32_to_cpu(p->blksize);
4297 enum drbd_req_event what;
4298
4299 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4300
579b57ed 4301 if (p->block_id == ID_SYNCER) {
b411b363
PR
4302 drbd_set_in_sync(mdev, sector, blksize);
4303 dec_rs_pending(mdev);
81e84650 4304 return true;
b411b363 4305 }
257d0af6 4306 switch (cmd) {
b411b363 4307 case P_RS_WRITE_ACK:
89e58e75 4308 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4309 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4310 break;
4311 case P_WRITE_ACK:
89e58e75 4312 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4313 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4314 break;
4315 case P_RECV_ACK:
89e58e75 4316 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4317 what = RECV_ACKED_BY_PEER;
b411b363
PR
4318 break;
4319 case P_DISCARD_ACK:
89e58e75 4320 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4321 what = CONFLICT_DISCARDED_BY_PEER;
b411b363
PR
4322 break;
4323 default:
4324 D_ASSERT(0);
81e84650 4325 return false;
b411b363
PR
4326 }
4327
4328 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4329 &mdev->write_requests, __func__,
4330 what, false);
b411b363
PR
4331}
4332
d8763023 4333static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4334{
257d0af6 4335 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4336 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4337 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4338 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4339 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4340 bool found;
b411b363
PR
4341
4342 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4343
579b57ed 4344 if (p->block_id == ID_SYNCER) {
b411b363
PR
4345 dec_rs_pending(mdev);
4346 drbd_rs_failed_io(mdev, sector, size);
81e84650 4347 return true;
b411b363 4348 }
2deb8336 4349
c3afd8f5 4350 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4351 &mdev->write_requests, __func__,
8554df1c 4352 NEG_ACKED, missing_ok);
c3afd8f5
AG
4353 if (!found) {
4354 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4355 The master bio might already be completed, therefore the
4356 request is no longer in the collision hash. */
4357 /* In Protocol B we might already have got a P_RECV_ACK
4358 but then get a P_NEG_ACK afterwards. */
4359 if (!missing_ok)
2deb8336 4360 return false;
c3afd8f5 4361 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4362 }
2deb8336 4363 return true;
b411b363
PR
4364}
4365
d8763023 4366static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4367{
257d0af6 4368 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4369 sector_t sector = be64_to_cpu(p->sector);
4370
4371 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4372 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4373 (unsigned long long)sector, be32_to_cpu(p->blksize));
4374
4375 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4376 &mdev->read_requests, __func__,
8554df1c 4377 NEG_ACKED, false);
b411b363
PR
4378}
4379
d8763023 4380static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4381{
4382 sector_t sector;
4383 int size;
257d0af6 4384 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4385
4386 sector = be64_to_cpu(p->sector);
4387 size = be32_to_cpu(p->blksize);
b411b363
PR
4388
4389 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4390
4391 dec_rs_pending(mdev);
4392
4393 if (get_ldev_if_state(mdev, D_FAILED)) {
4394 drbd_rs_complete_io(mdev, sector);
257d0af6 4395 switch (cmd) {
d612d309
PR
4396 case P_NEG_RS_DREPLY:
4397 drbd_rs_failed_io(mdev, sector, size);
4398 case P_RS_CANCEL:
4399 break;
4400 default:
4401 D_ASSERT(0);
4402 put_ldev(mdev);
4403 return false;
4404 }
b411b363
PR
4405 put_ldev(mdev);
4406 }
4407
81e84650 4408 return true;
b411b363
PR
4409}
4410
d8763023 4411static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4412{
257d0af6 4413 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363
PR
4414
4415 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4416
c4752ef1
PR
4417 if (mdev->state.conn == C_AHEAD &&
4418 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4419 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4420 mdev->start_resync_timer.expires = jiffies + HZ;
4421 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4422 }
4423
81e84650 4424 return true;
b411b363
PR
4425}
4426
d8763023 4427static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4428{
257d0af6 4429 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4430 struct drbd_work *w;
4431 sector_t sector;
4432 int size;
4433
4434 sector = be64_to_cpu(p->sector);
4435 size = be32_to_cpu(p->blksize);
4436
4437 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4438
4439 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4440 drbd_ov_oos_found(mdev, sector, size);
4441 else
4442 ov_oos_print(mdev);
4443
1d53f09e 4444 if (!get_ldev(mdev))
81e84650 4445 return true;
1d53f09e 4446
b411b363
PR
4447 drbd_rs_complete_io(mdev, sector);
4448 dec_rs_pending(mdev);
4449
ea5442af
LE
4450 --mdev->ov_left;
4451
4452 /* let's advance progress step marks only for every other megabyte */
4453 if ((mdev->ov_left & 0x200) == 0x200)
4454 drbd_advance_rs_marks(mdev, mdev->ov_left);
4455
4456 if (mdev->ov_left == 0) {
b411b363
PR
4457 w = kmalloc(sizeof(*w), GFP_NOIO);
4458 if (w) {
4459 w->cb = w_ov_finished;
e42325a5 4460 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4461 } else {
4462 dev_err(DEV, "kmalloc(w) failed.");
4463 ov_oos_print(mdev);
4464 drbd_resync_finished(mdev);
4465 }
4466 }
1d53f09e 4467 put_ldev(mdev);
81e84650 4468 return true;
b411b363
PR
4469}
4470
d8763023 4471static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4472{
81e84650 4473 return true;
0ced55a3
PR
4474}
4475
b411b363
PR
4476struct asender_cmd {
4477 size_t pkt_size;
d8763023 4478 int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
b411b363
PR
4479};
4480
4481static struct asender_cmd *get_asender_cmd(int cmd)
4482{
4483 static struct asender_cmd asender_tbl[] = {
4484 /* anything missing from this table is in
4485 * the drbd_cmd_handler (drbd_default_handler) table,
4486 * see the beginning of drbdd() */
257d0af6
PR
4487 [P_PING] = { sizeof(struct p_header), got_Ping },
4488 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
b411b363
PR
4489 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4490 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4491 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4492 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4493 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4494 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4495 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4496 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4497 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4498 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4499 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4500 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4501 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
b411b363
PR
4502 [P_MAX_CMD] = { 0, NULL },
4503 };
4504 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4505 return NULL;
4506 return &asender_tbl[cmd];
4507}
4508
4509int drbd_asender(struct drbd_thread *thi)
4510{
4511 struct drbd_conf *mdev = thi->mdev;
257d0af6 4512 struct p_header *h = &mdev->tconn->meta.rbuf.header;
b411b363
PR
4513 struct asender_cmd *cmd = NULL;
4514
257d0af6 4515 int rv;
b411b363
PR
4516 void *buf = h;
4517 int received = 0;
257d0af6 4518 int expect = sizeof(struct p_header);
f36af18c 4519 int ping_timeout_active = 0;
257d0af6 4520 int empty, pkt_size;
d8763023 4521 enum drbd_packet cmd_nr;
b411b363
PR
4522
4523 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4524
4525 current->policy = SCHED_RR; /* Make this a realtime task! */
4526 current->rt_priority = 2; /* more important than all other tasks */
4527
e77a0a5c 4528 while (get_t_state(thi) == RUNNING) {
b411b363
PR
4529 drbd_thread_current_set_cpu(mdev);
4530 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
841ce241
AG
4531 if (!drbd_send_ping(mdev)) {
4532 dev_err(DEV, "drbd_send_ping has failed\n");
4533 goto reconnect;
4534 }
e42325a5 4535 mdev->tconn->meta.socket->sk->sk_rcvtimeo =
89e58e75 4536 mdev->tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4537 ping_timeout_active = 1;
b411b363
PR
4538 }
4539
4540 /* conditionally cork;
4541 * it may hurt latency if we cork without much to send */
89e58e75 4542 if (!mdev->tconn->net_conf->no_cork &&
b411b363 4543 3 < atomic_read(&mdev->unacked_cnt))
e42325a5 4544 drbd_tcp_cork(mdev->tconn->meta.socket);
b411b363
PR
4545 while (1) {
4546 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4547 flush_signals(current);
0f8488e1 4548 if (!drbd_process_done_ee(mdev))
b411b363 4549 goto reconnect;
b411b363
PR
4550 /* to avoid race with newly queued ACKs */
4551 set_bit(SIGNAL_ASENDER, &mdev->flags);
87eeee41 4552 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 4553 empty = list_empty(&mdev->done_ee);
87eeee41 4554 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4555 /* new ack may have been queued right here,
4556 * but then there is also a signal pending,
4557 * and we start over... */
4558 if (empty)
4559 break;
4560 }
4561 /* but unconditionally uncork unless disabled */
89e58e75 4562 if (!mdev->tconn->net_conf->no_cork)
e42325a5 4563 drbd_tcp_uncork(mdev->tconn->meta.socket);
b411b363
PR
4564
4565 /* short circuit, recv_msg would return EINTR anyways. */
4566 if (signal_pending(current))
4567 continue;
4568
e42325a5 4569 rv = drbd_recv_short(mdev, mdev->tconn->meta.socket,
b411b363
PR
4570 buf, expect-received, 0);
4571 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4572
4573 flush_signals(current);
4574
4575 /* Note:
4576 * -EINTR (on meta) we got a signal
4577 * -EAGAIN (on meta) rcvtimeo expired
4578 * -ECONNRESET other side closed the connection
4579 * -ERESTARTSYS (on data) we got a signal
4580 * rv < 0 other than above: unexpected error!
4581 * rv == expected: full header or command
4582 * rv < expected: "woken" by signal during receive
4583 * rv == 0 : "connection shut down by peer"
4584 */
4585 if (likely(rv > 0)) {
4586 received += rv;
4587 buf += rv;
4588 } else if (rv == 0) {
4589 dev_err(DEV, "meta connection shut down by peer.\n");
4590 goto reconnect;
4591 } else if (rv == -EAGAIN) {
cb6518cb
LE
4592 /* If the data socket received something meanwhile,
4593 * that is good enough: peer is still alive. */
31890f4a 4594 if (time_after(mdev->tconn->last_received,
e42325a5 4595 jiffies - mdev->tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4596 continue;
f36af18c 4597 if (ping_timeout_active) {
b411b363
PR
4598 dev_err(DEV, "PingAck did not arrive in time.\n");
4599 goto reconnect;
4600 }
4601 set_bit(SEND_PING, &mdev->flags);
4602 continue;
4603 } else if (rv == -EINTR) {
4604 continue;
4605 } else {
4606 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4607 goto reconnect;
4608 }
4609
4610 if (received == expect && cmd == NULL) {
257d0af6 4611 if (!decode_header(mdev, h, &cmd_nr, &pkt_size))
b411b363 4612 goto reconnect;
257d0af6 4613 cmd = get_asender_cmd(cmd_nr);
b411b363 4614 if (unlikely(cmd == NULL)) {
257d0af6
PR
4615 dev_err(DEV, "unknown command %d on meta (l: %d)\n",
4616 cmd_nr, pkt_size);
b411b363
PR
4617 goto disconnect;
4618 }
4619 expect = cmd->pkt_size;
257d0af6
PR
4620 if (pkt_size != expect - sizeof(struct p_header)) {
4621 dev_err(DEV, "Wrong packet size on meta (c: %d, l: %d)\n",
4622 cmd_nr, pkt_size);
b411b363 4623 goto reconnect;
257d0af6 4624 }
b411b363
PR
4625 }
4626 if (received == expect) {
31890f4a 4627 mdev->tconn->last_received = jiffies;
b411b363 4628 D_ASSERT(cmd != NULL);
257d0af6 4629 if (!cmd->process(mdev, cmd_nr))
b411b363
PR
4630 goto reconnect;
4631
f36af18c
LE
4632 /* the idle_timeout (ping-int)
4633 * has been restored in got_PingAck() */
4634 if (cmd == get_asender_cmd(P_PING_ACK))
4635 ping_timeout_active = 0;
4636
b411b363
PR
4637 buf = h;
4638 received = 0;
257d0af6 4639 expect = sizeof(struct p_header);
b411b363
PR
4640 cmd = NULL;
4641 }
4642 }
4643
4644 if (0) {
4645reconnect:
4646 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
856c50c7 4647 drbd_md_sync(mdev);
b411b363
PR
4648 }
4649 if (0) {
4650disconnect:
4651 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
856c50c7 4652 drbd_md_sync(mdev);
b411b363
PR
4653 }
4654 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4655
4656 D_ASSERT(mdev->state.conn < C_CONNECTED);
4657 dev_info(DEV, "asender terminated\n");
4658
4659 return 0;
4660}