1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/string.h>
46#include <linux/scatterlist.h>
47#include "drbd_int.h"
48#include "drbd_req.h"
49
50#include "drbd_vli.h"
51
52struct flush_work {
53 struct drbd_work w;
54 struct drbd_epoch *epoch;
55};
56
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
63static int drbd_do_handshake(struct drbd_conf *mdev);
64static int drbd_do_auth(struct drbd_conf *mdev);
65
66static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68
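/*
 * Note on previous_epoch() below: it returns the epoch linked immediately
 * before @epoch on the epoch list, or NULL if that predecessor is @epoch
 * itself or mdev->current_epoch; the lookup is done under mdev->epoch_lock.
 */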
69static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70{
71 struct drbd_epoch *prev;
72 spin_lock(&mdev->epoch_lock);
73 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74 if (prev == epoch || prev == mdev->current_epoch)
75 prev = NULL;
76 spin_unlock(&mdev->epoch_lock);
77 return prev;
78}
79
80#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
81
82/*
83 * some helper functions to deal with singly linked page lists,
84 * page->private being our "next" pointer.
85 */
86
87/* If at least n pages are linked at head, get n pages off.
88 * Otherwise, don't modify head, and return NULL.
89 * Locking is the responsibility of the caller.
90 */
91static struct page *page_chain_del(struct page **head, int n)
92{
93 struct page *page;
94 struct page *tmp;
95
96 BUG_ON(!n);
97 BUG_ON(!head);
98
99 page = *head;
100
101 if (!page)
102 return NULL;
103
104 while (page) {
105 tmp = page_chain_next(page);
106 if (--n == 0)
107 break; /* found sufficient pages */
108 if (tmp == NULL)
109 /* insufficient pages, don't use any of them. */
110 return NULL;
111 page = tmp;
112 }
113
114 /* add end of list marker for the returned list */
115 set_page_private(page, 0);
116 /* actual return value, and adjustment of head */
117 page = *head;
118 *head = tmp;
119 return page;
120}
121
122/* may be used outside of locks to find the tail of a (usually short)
123 * "private" page chain, before adding it back to a global chain head
124 * with page_chain_add() under a spinlock. */
125static struct page *page_chain_tail(struct page *page, int *len)
126{
127 struct page *tmp;
128 int i = 1;
129 while ((tmp = page_chain_next(page)))
130 ++i, page = tmp;
131 if (len)
132 *len = i;
133 return page;
134}
135
136static int page_chain_free(struct page *page)
137{
138 struct page *tmp;
139 int i = 0;
140 page_chain_for_each_safe(page, tmp) {
141 put_page(page);
142 ++i;
143 }
144 return i;
145}
146
147static void page_chain_add(struct page **head,
148 struct page *chain_first, struct page *chain_last)
149{
150#if 1
151 struct page *tmp;
152 tmp = page_chain_tail(chain_first, NULL);
153 BUG_ON(tmp != chain_last);
154#endif
155
156 /* add chain to head */
157 set_page_private(chain_last, (unsigned long)*head);
158 *head = chain_first;
159}
160
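/*
 * The chain helpers above are combined by the pool code below:
 * drbd_pp_first_pages_or_try_alloc() takes a chain off drbd_pp_pool with
 * page_chain_del(), and drbd_pp_free() hands a chain back with
 * page_chain_tail() + page_chain_add(); both hold drbd_pp_lock while
 * touching the global pool.
 */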
161static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
162{
163 struct page *page = NULL;
164 struct page *tmp = NULL;
165 int i = 0;
166
167 /* Yes, testing drbd_pp_vacant outside the lock is racy.
168 * So what. It saves a spin_lock. */
169 if (drbd_pp_vacant >= number) {
170 spin_lock(&drbd_pp_lock);
171 page = page_chain_del(&drbd_pp_pool, number);
172 if (page)
173 drbd_pp_vacant -= number;
174 spin_unlock(&drbd_pp_lock);
175 if (page)
176 return page;
177 }
178
179 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180 * "criss-cross" setup, that might cause write-out on some other DRBD,
181 * which in turn might block on the other node at this very place. */
182 for (i = 0; i < number; i++) {
183 tmp = alloc_page(GFP_TRY);
184 if (!tmp)
185 break;
186 set_page_private(tmp, (unsigned long)page);
187 page = tmp;
188 }
189
190 if (i == number)
191 return page;
192
193 /* Not enough pages immediately available this time.
194 * No need to jump around here, drbd_pp_alloc will retry this
195 * function "soon". */
196 if (page) {
197 tmp = page_chain_tail(page, NULL);
198 spin_lock(&drbd_pp_lock);
199 page_chain_add(&drbd_pp_pool, page, tmp);
200 drbd_pp_vacant += i;
201 spin_unlock(&drbd_pp_lock);
202 }
203 return NULL;
204}
205
206/* kick lower level device, if we have more than (arbitrary number)
207 * reference counts on it, which typically are locally submitted io
208 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
209static void maybe_kick_lo(struct drbd_conf *mdev)
210{
211 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212 drbd_kick_lo(mdev);
213}
214
215static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216{
217 struct drbd_epoch_entry *e;
218 struct list_head *le, *tle;
219
220 /* The EEs are always appended to the end of the list. Since
221 they are sent in order over the wire, they have to finish
222 in order. As soon as we see the first not finished we can
223 stop to examine the list... */
224
225 list_for_each_safe(le, tle, &mdev->net_ee) {
226 e = list_entry(le, struct drbd_epoch_entry, w.list);
227 if (drbd_ee_has_active_page(e))
228 break;
229 list_move(le, to_be_freed);
230 }
231}
232
233static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234{
235 LIST_HEAD(reclaimed);
236 struct drbd_epoch_entry *e, *t;
237
238 maybe_kick_lo(mdev);
239 spin_lock_irq(&mdev->req_lock);
240 reclaim_net_ee(mdev, &reclaimed);
241 spin_unlock_irq(&mdev->req_lock);
242
243 list_for_each_entry_safe(e, t, &reclaimed, w.list)
244 drbd_free_ee(mdev, e);
245}
246
247/**
248 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
249 * @mdev: DRBD device.
250 * @number: number of pages requested
251 * @retry: whether to retry, if not enough pages are available right now
252 *
253 * Tries to allocate number pages, first from our own page pool, then from
254 * the kernel, unless this allocation would exceed the max_buffers setting.
255 * Possibly retry until DRBD frees sufficient pages somewhere else.
256 *
257 * Returns a page chain linked via page->private.
258 */
259static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
260{
261 struct page *page = NULL;
262 DEFINE_WAIT(wait);
263
264 /* Yes, we may run up to @number over max_buffers. If we
265 * follow it strictly, the admin will get it wrong anyways. */
266 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
268
269 while (page == NULL) {
270 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271
272 drbd_kick_lo_and_reclaim_net(mdev);
273
274 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
275 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
276 if (page)
277 break;
278 }
279
280 if (!retry)
281 break;
282
283 if (signal_pending(current)) {
284 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285 break;
286 }
287
288 schedule();
289 }
290 finish_wait(&drbd_pp_wait, &wait);
291
292 if (page)
293 atomic_add(number, &mdev->pp_in_use);
294 return page;
295}
296
297/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
298 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
301static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
302{
303 int i;
304 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
305 i = page_chain_free(page);
306 else {
307 struct page *tmp;
308 tmp = page_chain_tail(page, &i);
309 spin_lock(&drbd_pp_lock);
310 page_chain_add(&drbd_pp_pool, page, tmp);
311 drbd_pp_vacant += i;
312 spin_unlock(&drbd_pp_lock);
313 }
314 atomic_sub(i, &mdev->pp_in_use);
315 i = atomic_read(&mdev->pp_in_use);
316 if (i < 0)
317 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
318 wake_up(&drbd_pp_wait);
319}
320
321/*
322You need to hold the req_lock:
323 _drbd_wait_ee_list_empty()
324
325You must not have the req_lock:
326 drbd_free_ee()
327 drbd_alloc_ee()
328 drbd_init_ee()
329 drbd_release_ee()
330 drbd_ee_fix_bhs()
331 drbd_process_done_ee()
332 drbd_clear_done_ee()
333 drbd_wait_ee_list_empty()
334*/
335
336struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
337 u64 id,
338 sector_t sector,
339 unsigned int data_size,
340 gfp_t gfp_mask) __must_hold(local)
341{
342 struct drbd_epoch_entry *e;
343 struct page *page;
344 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
345
346 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
347 return NULL;
348
349 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
350 if (!e) {
351 if (!(gfp_mask & __GFP_NOWARN))
352 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
353 return NULL;
354 }
355
356 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
357 if (!page)
358 goto fail;
359
360 INIT_HLIST_NODE(&e->colision);
361 e->epoch = NULL;
362 e->mdev = mdev;
363 e->pages = page;
364 atomic_set(&e->pending_bios, 0);
365 e->size = data_size;
366 e->flags = 0;
367 e->sector = sector;
368 e->sector = sector;
369 e->block_id = id;
370
371 return e;
372
373 fail:
374 mempool_free(e, drbd_ee_mempool);
375 return NULL;
376}
377
378void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
379{
380 if (e->flags & EE_HAS_DIGEST)
381 kfree(e->digest);
382 drbd_pp_free(mdev, e->pages);
383 D_ASSERT(atomic_read(&e->pending_bios) == 0);
384 D_ASSERT(hlist_unhashed(&e->colision));
385 mempool_free(e, drbd_ee_mempool);
386}
387
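/*
 * drbd_release_ee(): splice the given list off under req_lock, then free
 * every epoch entry outside the lock; returns the number of entries freed.
 */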
388int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
389{
390 LIST_HEAD(work_list);
391 struct drbd_epoch_entry *e, *t;
392 int count = 0;
393
394 spin_lock_irq(&mdev->req_lock);
395 list_splice_init(list, &work_list);
396 spin_unlock_irq(&mdev->req_lock);
397
398 list_for_each_entry_safe(e, t, &work_list, w.list) {
399 drbd_free_ee(mdev, e);
400 count++;
401 }
402 return count;
403}
404
405
406/*
407 * This function is called from _asender only_
408 * but see also comments in _req_mod(,barrier_acked)
409 * and receive_Barrier.
410 *
411 * Move entries from net_ee to done_ee, if ready.
412 * Grab done_ee, call all callbacks, free the entries.
413 * The callbacks typically send out ACKs.
414 */
415static int drbd_process_done_ee(struct drbd_conf *mdev)
416{
417 LIST_HEAD(work_list);
418 LIST_HEAD(reclaimed);
419 struct drbd_epoch_entry *e, *t;
420 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
421
422 spin_lock_irq(&mdev->req_lock);
423 reclaim_net_ee(mdev, &reclaimed);
424 list_splice_init(&mdev->done_ee, &work_list);
425 spin_unlock_irq(&mdev->req_lock);
426
427 list_for_each_entry_safe(e, t, &reclaimed, w.list)
428 drbd_free_ee(mdev, e);
429
430 /* possible callbacks here:
431 * e_end_block, and e_end_resync_block, e_send_discard_ack.
432 * all ignore the last argument.
433 */
434 list_for_each_entry_safe(e, t, &work_list, w.list) {
435 /* list_del not necessary, next/prev members not touched */
436 ok = e->w.cb(mdev, &e->w, !ok) && ok;
437 drbd_free_ee(mdev, e);
438 }
439 wake_up(&mdev->ee_wait);
440
441 return ok;
442}
443
444void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
445{
446 DEFINE_WAIT(wait);
447
448 /* avoids spin_lock/unlock
449 * and calling prepare_to_wait in the fast path */
450 while (!list_empty(head)) {
451 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
452 spin_unlock_irq(&mdev->req_lock);
453 drbd_kick_lo(mdev);
454 schedule();
455 finish_wait(&mdev->ee_wait, &wait);
456 spin_lock_irq(&mdev->req_lock);
457 }
458}
459
460void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
461{
462 spin_lock_irq(&mdev->req_lock);
463 _drbd_wait_ee_list_empty(mdev, head);
464 spin_unlock_irq(&mdev->req_lock);
465}
466
467/* see also kernel_accept; which is only present since 2.6.18.
468 * also we want to log which part of it failed, exactly */
469static int drbd_accept(struct drbd_conf *mdev, const char **what,
470 struct socket *sock, struct socket **newsock)
471{
472 struct sock *sk = sock->sk;
473 int err = 0;
474
475 *what = "listen";
476 err = sock->ops->listen(sock, 5);
477 if (err < 0)
478 goto out;
479
480 *what = "sock_create_lite";
481 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
482 newsock);
483 if (err < 0)
484 goto out;
485
486 *what = "accept";
487 err = sock->ops->accept(sock, *newsock, 0);
488 if (err < 0) {
489 sock_release(*newsock);
490 *newsock = NULL;
491 goto out;
492 }
493 (*newsock)->ops = sock->ops;
494
495out:
496 return err;
497}
498
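/*
 * drbd_recv_short() and drbd_recv() below are thin wrappers around
 * sock_recvmsg() using a kernel-space kvec (set_fs(KERNEL_DS)).
 * drbd_recv() additionally logs the usual error cases and forces the
 * connection state to C_BROKEN_PIPE on a short read.
 */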
499static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
500 void *buf, size_t size, int flags)
501{
502 mm_segment_t oldfs;
503 struct kvec iov = {
504 .iov_base = buf,
505 .iov_len = size,
506 };
507 struct msghdr msg = {
508 .msg_iovlen = 1,
509 .msg_iov = (struct iovec *)&iov,
510 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
511 };
512 int rv;
513
514 oldfs = get_fs();
515 set_fs(KERNEL_DS);
516 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
517 set_fs(oldfs);
518
519 return rv;
520}
521
522static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
523{
524 mm_segment_t oldfs;
525 struct kvec iov = {
526 .iov_base = buf,
527 .iov_len = size,
528 };
529 struct msghdr msg = {
530 .msg_iovlen = 1,
531 .msg_iov = (struct iovec *)&iov,
532 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
533 };
534 int rv;
535
536 oldfs = get_fs();
537 set_fs(KERNEL_DS);
538
539 for (;;) {
540 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
541 if (rv == size)
542 break;
543
544 /* Note:
545 * ECONNRESET other side closed the connection
546 * ERESTARTSYS (on sock) we got a signal
547 */
548
549 if (rv < 0) {
550 if (rv == -ECONNRESET)
551 dev_info(DEV, "sock was reset by peer\n");
552 else if (rv != -ERESTARTSYS)
553 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
554 break;
555 } else if (rv == 0) {
556 dev_info(DEV, "sock was shut down by peer\n");
557 break;
558 } else {
559 /* signal came in, or peer/link went down,
560 * after we read a partial message
561 */
562 /* D_ASSERT(signal_pending(current)); */
563 break;
564 }
565 };
566
567 set_fs(oldfs);
568
569 if (rv != size)
570 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
571
572 return rv;
573}
574
575/* quoting tcp(7):
576 * On individual connections, the socket buffer size must be set prior to the
577 * listen(2) or connect(2) calls in order to have it take effect.
578 * This is our wrapper to do so.
579 */
580static void drbd_setbufsize(struct socket *sock, unsigned int snd,
581 unsigned int rcv)
582{
583 /* open coded SO_SNDBUF, SO_RCVBUF */
584 if (snd) {
585 sock->sk->sk_sndbuf = snd;
586 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
587 }
588 if (rcv) {
589 sock->sk->sk_rcvbuf = rcv;
590 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
591 }
592}
593
594static struct socket *drbd_try_connect(struct drbd_conf *mdev)
595{
596 const char *what;
597 struct socket *sock;
598 struct sockaddr_in6 src_in6;
599 int err;
600 int disconnect_on_error = 1;
601
602 if (!get_net_conf(mdev))
603 return NULL;
604
605 what = "sock_create_kern";
606 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
607 SOCK_STREAM, IPPROTO_TCP, &sock);
608 if (err < 0) {
609 sock = NULL;
610 goto out;
611 }
612
613 sock->sk->sk_rcvtimeo =
614 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
615 drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
616 mdev->net_conf->rcvbuf_size);
617
618 /* explicitly bind to the configured IP as source IP
619 * for the outgoing connections.
620 * This is needed for multihomed hosts and to be
621 * able to use lo: interfaces for drbd.
622 * Make sure to use 0 as port number, so linux selects
623 * a free one dynamically.
624 */
625 memcpy(&src_in6, mdev->net_conf->my_addr,
626 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
627 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
628 src_in6.sin6_port = 0;
629 else
630 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
631
632 what = "bind before connect";
633 err = sock->ops->bind(sock,
634 (struct sockaddr *) &src_in6,
635 mdev->net_conf->my_addr_len);
636 if (err < 0)
637 goto out;
638
639 /* connect may fail, peer not yet available.
640 * stay C_WF_CONNECTION, don't go Disconnecting! */
641 disconnect_on_error = 0;
642 what = "connect";
643 err = sock->ops->connect(sock,
644 (struct sockaddr *)mdev->net_conf->peer_addr,
645 mdev->net_conf->peer_addr_len, 0);
646
647out:
648 if (err < 0) {
649 if (sock) {
650 sock_release(sock);
651 sock = NULL;
652 }
653 switch (-err) {
654 /* timeout, busy, signal pending */
655 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
656 case EINTR: case ERESTARTSYS:
657 /* peer not (yet) available, network problem */
658 case ECONNREFUSED: case ENETUNREACH:
659 case EHOSTDOWN: case EHOSTUNREACH:
660 disconnect_on_error = 0;
661 break;
662 default:
663 dev_err(DEV, "%s failed, err = %d\n", what, err);
664 }
665 if (disconnect_on_error)
666 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
667 }
668 put_net_conf(mdev);
669 return sock;
670}
671
672static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
673{
674 int timeo, err;
675 struct socket *s_estab = NULL, *s_listen;
676 const char *what;
677
678 if (!get_net_conf(mdev))
679 return NULL;
680
681 what = "sock_create_kern";
682 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
683 SOCK_STREAM, IPPROTO_TCP, &s_listen);
684 if (err) {
685 s_listen = NULL;
686 goto out;
687 }
688
689 timeo = mdev->net_conf->try_connect_int * HZ;
690 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
691
692 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
693 s_listen->sk->sk_rcvtimeo = timeo;
694 s_listen->sk->sk_sndtimeo = timeo;
695 drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
696 mdev->net_conf->rcvbuf_size);
697
698 what = "bind before listen";
699 err = s_listen->ops->bind(s_listen,
700 (struct sockaddr *) mdev->net_conf->my_addr,
701 mdev->net_conf->my_addr_len);
702 if (err < 0)
703 goto out;
704
705 err = drbd_accept(mdev, &what, s_listen, &s_estab);
706
707out:
708 if (s_listen)
709 sock_release(s_listen);
710 if (err < 0) {
711 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
712 dev_err(DEV, "%s failed, err = %d\n", what, err);
713 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
714 }
715 }
716 put_net_conf(mdev);
717
718 return s_estab;
719}
720
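/*
 * drbd_send_fp()/drbd_recv_fp(): exchange the "first packet" on a freshly
 * established TCP connection. drbd_connect() uses the command carried in it
 * (P_HAND_SHAKE_S or P_HAND_SHAKE_M) to tell the data socket and the meta
 * socket apart.
 */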
721static int drbd_send_fp(struct drbd_conf *mdev,
722 struct socket *sock, enum drbd_packets cmd)
723{
724 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
725
726 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
727}
728
729static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
730{
731 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
732 int rr;
733
734 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
735
736 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
737 return be16_to_cpu(h->command);
738
739 return 0xffff;
740}
741
742/**
743 * drbd_socket_okay() - Free the socket if its connection is not okay
744 * @mdev: DRBD device.
745 * @sock: pointer to the pointer to the socket.
746 */
747static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
748{
749 int rr;
750 char tb[4];
751
752 if (!*sock)
753 return FALSE;
754
755 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
756
757 if (rr > 0 || rr == -EAGAIN) {
758 return TRUE;
759 } else {
760 sock_release(*sock);
761 *sock = NULL;
762 return FALSE;
763 }
764}
765
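/*
 * drbd_connect() below establishes two TCP connections to the peer: one
 * becomes mdev->data.socket (bulk data), the other mdev->meta.socket
 * (ACKs/pings, serviced by the asender). Both sides connect and listen at
 * the same time, so crossed connection attempts are detected and one of
 * the duplicates is dropped.
 */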
766/*
767 * return values:
768 * 1 yes, we have a valid connection
769 * 0 oops, did not work out, please try again
770 * -1 peer talks different language,
771 * no point in trying again, please go standalone.
772 * -2 We do not have a network config...
773 */
774static int drbd_connect(struct drbd_conf *mdev)
775{
776 struct socket *s, *sock, *msock;
777 int try, h, ok;
778
779 D_ASSERT(!mdev->data.socket);
780
781 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
782 return -2;
783
784 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
785
786 sock = NULL;
787 msock = NULL;
788
789 do {
790 for (try = 0;;) {
791 /* 3 tries, this should take less than a second! */
792 s = drbd_try_connect(mdev);
793 if (s || ++try >= 3)
794 break;
795 /* give the other side time to call bind() & listen() */
796 __set_current_state(TASK_INTERRUPTIBLE);
797 schedule_timeout(HZ / 10);
798 }
799
800 if (s) {
801 if (!sock) {
802 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
803 sock = s;
804 s = NULL;
805 } else if (!msock) {
806 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
807 msock = s;
808 s = NULL;
809 } else {
810 dev_err(DEV, "Logic error in drbd_connect()\n");
811 goto out_release_sockets;
812 }
813 }
814
815 if (sock && msock) {
816 __set_current_state(TASK_INTERRUPTIBLE);
817 schedule_timeout(HZ / 10);
818 ok = drbd_socket_okay(mdev, &sock);
819 ok = drbd_socket_okay(mdev, &msock) && ok;
820 if (ok)
821 break;
822 }
823
824retry:
825 s = drbd_wait_for_connect(mdev);
826 if (s) {
827 try = drbd_recv_fp(mdev, s);
828 drbd_socket_okay(mdev, &sock);
829 drbd_socket_okay(mdev, &msock);
830 switch (try) {
831 case P_HAND_SHAKE_S:
832 if (sock) {
833 dev_warn(DEV, "initial packet S crossed\n");
834 sock_release(sock);
835 }
836 sock = s;
837 break;
838 case P_HAND_SHAKE_M:
839 if (msock) {
840 dev_warn(DEV, "initial packet M crossed\n");
841 sock_release(msock);
842 }
843 msock = s;
844 set_bit(DISCARD_CONCURRENT, &mdev->flags);
845 break;
846 default:
847 dev_warn(DEV, "Error receiving initial packet\n");
848 sock_release(s);
849 if (random32() & 1)
850 goto retry;
851 }
852 }
853
854 if (mdev->state.conn <= C_DISCONNECTING)
855 goto out_release_sockets;
856 if (signal_pending(current)) {
857 flush_signals(current);
858 smp_rmb();
859 if (get_t_state(&mdev->receiver) == Exiting)
860 goto out_release_sockets;
861 }
862
863 if (sock && msock) {
864 ok = drbd_socket_okay(mdev, &sock);
865 ok = drbd_socket_okay(mdev, &msock) && ok;
866 if (ok)
867 break;
868 }
869 } while (1);
870
871 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
872 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
873
874 sock->sk->sk_allocation = GFP_NOIO;
875 msock->sk->sk_allocation = GFP_NOIO;
876
877 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
878 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
879
880 /* NOT YET ...
881 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
882 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
883 * first set it to the P_HAND_SHAKE timeout,
884 * which we set to 4x the configured ping_timeout. */
885 sock->sk->sk_sndtimeo =
886 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
887
888 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
889 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
890
891 /* we don't want delays.
892 * we use TCP_CORK where appropriate, though */
893 drbd_tcp_nodelay(sock);
894 drbd_tcp_nodelay(msock);
895
896 mdev->data.socket = sock;
897 mdev->meta.socket = msock;
898 mdev->last_received = jiffies;
899
900 D_ASSERT(mdev->asender.task == NULL);
901
902 h = drbd_do_handshake(mdev);
903 if (h <= 0)
904 return h;
905
906 if (mdev->cram_hmac_tfm) {
907 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
908 switch (drbd_do_auth(mdev)) {
909 case -1:
910 dev_err(DEV, "Authentication of peer failed\n");
911 return -1;
912 case 0:
913 dev_err(DEV, "Authentication of peer failed, trying again.\n");
914 return 0;
915 }
916 }
917
918 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
919 return 0;
920
921 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
922 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
923
924 atomic_set(&mdev->packet_seq, 0);
925 mdev->peer_seq = 0;
926
927 drbd_thread_start(&mdev->asender);
928
929 if (!drbd_send_protocol(mdev))
930 return -1;
931 drbd_send_sync_param(mdev, &mdev->sync_conf);
932 drbd_send_sizes(mdev, 0, 0);
933 drbd_send_uuids(mdev);
934 drbd_send_state(mdev);
935 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
936 clear_bit(RESIZE_PENDING, &mdev->flags);
937
938 return 1;
939
940out_release_sockets:
941 if (sock)
942 sock_release(sock);
943 if (msock)
944 sock_release(msock);
945 return -1;
946}
947
948static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
949{
950 int r;
951
952 r = drbd_recv(mdev, h, sizeof(*h));
953
954 if (unlikely(r != sizeof(*h))) {
955 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
956 return FALSE;
957 };
958 h->command = be16_to_cpu(h->command);
959 h->length = be16_to_cpu(h->length);
960 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
961 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
962 (long)be32_to_cpu(h->magic),
963 h->command, h->length);
964 return FALSE;
965 }
966 mdev->last_received = jiffies;
967
968 return TRUE;
969}
970
971static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
972{
973 int rv;
974
975 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
976 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
977 NULL, BLKDEV_IFL_WAIT);
978 if (rv) {
979 dev_err(DEV, "local disk flush failed with status %d\n", rv);
980 /* would rather check on EOPNOTSUPP, but that is not reliable.
981 * don't try again for ANY return value != 0
982 * if (rv == -EOPNOTSUPP) */
983 drbd_bump_write_ordering(mdev, WO_drain_io);
984 }
985 put_ldev(mdev);
986 }
987
988 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
989}
990
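/*
 * w_flush() runs on the worker: it issues the after-epoch flush at most
 * once per epoch (guarded by DE_BARRIER_IN_NEXT_EPOCH_ISSUED) and then
 * drops the reference drbd_may_finish_epoch() took when it queued this
 * work (EV_PUT, plus EV_CLEANUP if the connection is already down).
 */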
991static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
992{
993 struct flush_work *fw = (struct flush_work *)w;
994 struct drbd_epoch *epoch = fw->epoch;
995
996 kfree(w);
997
998 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
999 drbd_flush_after_epoch(mdev, epoch);
1000
1001 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1002 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1003
1004 return 1;
1005}
1006
1007/**
1008 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1009 * @mdev: DRBD device.
1010 * @epoch: Epoch object.
1011 * @ev: Epoch event.
1012 */
1013static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1014 struct drbd_epoch *epoch,
1015 enum epoch_event ev)
1016{
1017 int finish, epoch_size;
1018 struct drbd_epoch *next_epoch;
1019 int schedule_flush = 0;
1020 enum finish_epoch rv = FE_STILL_LIVE;
1021
1022 spin_lock(&mdev->epoch_lock);
1023 do {
1024 next_epoch = NULL;
1025 finish = 0;
1026
1027 epoch_size = atomic_read(&epoch->epoch_size);
1028
1029 switch (ev & ~EV_CLEANUP) {
1030 case EV_PUT:
1031 atomic_dec(&epoch->active);
1032 break;
1033 case EV_GOT_BARRIER_NR:
1034 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1035
1036 /* Special case: If we just switched from WO_bio_barrier to
1037 WO_bdev_flush we should not finish the current epoch */
1038 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1039 mdev->write_ordering != WO_bio_barrier &&
1040 epoch == mdev->current_epoch)
1041 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1042 break;
1043 case EV_BARRIER_DONE:
1044 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1045 break;
1046 case EV_BECAME_LAST:
1047 /* nothing to do*/
1048 break;
1049 }
1050
1051 if (epoch_size != 0 &&
1052 atomic_read(&epoch->active) == 0 &&
1053 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1054 epoch->list.prev == &mdev->current_epoch->list &&
1055 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1056 /* Nearly all conditions are met to finish that epoch... */
1057 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1058 mdev->write_ordering == WO_none ||
1059 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1060 ev & EV_CLEANUP) {
1061 finish = 1;
1062 set_bit(DE_IS_FINISHING, &epoch->flags);
1063 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1064 mdev->write_ordering == WO_bio_barrier) {
1065 atomic_inc(&epoch->active);
1066 schedule_flush = 1;
1067 }
1068 }
1069 if (finish) {
1070 if (!(ev & EV_CLEANUP)) {
1071 spin_unlock(&mdev->epoch_lock);
1072 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1073 spin_lock(&mdev->epoch_lock);
1074 }
1075 dec_unacked(mdev);
1076
1077 if (mdev->current_epoch != epoch) {
1078 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1079 list_del(&epoch->list);
1080 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1081 mdev->epochs--;
1082 kfree(epoch);
1083
1084 if (rv == FE_STILL_LIVE)
1085 rv = FE_DESTROYED;
1086 } else {
1087 epoch->flags = 0;
1088 atomic_set(&epoch->epoch_size, 0);
1089 /* atomic_set(&epoch->active, 0); is already zero */
1090 if (rv == FE_STILL_LIVE)
1091 rv = FE_RECYCLED;
1092 }
1093 }
1094
1095 if (!next_epoch)
1096 break;
1097
1098 epoch = next_epoch;
1099 } while (1);
1100
1101 spin_unlock(&mdev->epoch_lock);
1102
1103 if (schedule_flush) {
1104 struct flush_work *fw;
1105 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1106 if (fw) {
1107 fw->w.cb = w_flush;
1108 fw->epoch = epoch;
1109 drbd_queue_work(&mdev->data.work, &fw->w);
1110 } else {
1111 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1112 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1113 /* That is not a recursion, only one level */
1114 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1115 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1116 }
1117 }
1118
1119 return rv;
1120}
1121
1122/**
1123 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1124 * @mdev: DRBD device.
1125 * @wo: Write ordering method to try.
1126 */
1127void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1128{
1129 enum write_ordering_e pwo;
1130 static char *write_ordering_str[] = {
1131 [WO_none] = "none",
1132 [WO_drain_io] = "drain",
1133 [WO_bdev_flush] = "flush",
1134 [WO_bio_barrier] = "barrier",
1135 };
1136
1137 pwo = mdev->write_ordering;
1138 wo = min(pwo, wo);
1139 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1140 wo = WO_bdev_flush;
1141 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1142 wo = WO_drain_io;
1143 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1144 wo = WO_none;
1145 mdev->write_ordering = wo;
1146 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1147 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1148}
1149
1150/**
1151 * drbd_submit_ee()
1152 * @mdev: DRBD device.
1153 * @e: epoch entry
1154 * @rw: flag field, see bio->bi_rw
1155 */
1156/* TODO allocate from our own bio_set. */
1157int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1158 const unsigned rw, const int fault_type)
1159{
1160 struct bio *bios = NULL;
1161 struct bio *bio;
1162 struct page *page = e->pages;
1163 sector_t sector = e->sector;
1164 unsigned ds = e->size;
1165 unsigned n_bios = 0;
1166 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1167
1168 /* In most cases, we will only need one bio. But in case the lower
1169 * level restrictions happen to be different at this offset on this
1170 * side than those of the sending peer, we may need to submit the
1171 * request in more than one bio. */
1172next_bio:
1173 bio = bio_alloc(GFP_NOIO, nr_pages);
1174 if (!bio) {
1175 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1176 goto fail;
1177 }
1178 /* > e->sector, unless this is the first bio */
1179 bio->bi_sector = sector;
1180 bio->bi_bdev = mdev->ldev->backing_bdev;
1181 /* we special case some flags in the multi-bio case, see below
1182 * (REQ_UNPLUG, REQ_HARDBARRIER) */
1183 bio->bi_rw = rw;
1184 bio->bi_private = e;
1185 bio->bi_end_io = drbd_endio_sec;
1186
1187 bio->bi_next = bios;
1188 bios = bio;
1189 ++n_bios;
1190
1191 page_chain_for_each(page) {
1192 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1193 if (!bio_add_page(bio, page, len, 0)) {
1194 /* a single page must always be possible! */
1195 BUG_ON(bio->bi_vcnt == 0);
1196 goto next_bio;
1197 }
1198 ds -= len;
1199 sector += len >> 9;
1200 --nr_pages;
1201 }
1202 D_ASSERT(page == NULL);
1203 D_ASSERT(ds == 0);
1204
1205 atomic_set(&e->pending_bios, n_bios);
1206 do {
1207 bio = bios;
1208 bios = bios->bi_next;
1209 bio->bi_next = NULL;
1210
1211 /* strip off REQ_UNPLUG unless it is the last bio */
1212 if (bios)
1213 bio->bi_rw &= ~REQ_UNPLUG;
1214
1215 drbd_generic_make_request(mdev, fault_type, bio);
1216
1217 /* strip off REQ_HARDBARRIER,
1218 * unless it is the first or last bio */
1219 if (bios && bios->bi_next)
1220 bios->bi_rw &= ~REQ_HARDBARRIER;
1221 } while (bios);
1222 maybe_kick_lo(mdev);
1223 return 0;
1224
1225fail:
1226 while (bios) {
1227 bio = bios;
1228 bios = bios->bi_next;
1229 bio_put(bio);
1230 }
1231 return -ENOMEM;
1232}
1233
1234/**
1235 * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
1236 * @mdev: DRBD device.
1237 * @w: work object.
1238 * @cancel: The connection will be closed anyways (unused in this callback)
1239 */
1240int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1241{
1242 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1243 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1244 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1245 so that we can finish that epoch in drbd_may_finish_epoch().
1246 That is necessary if we already have a long chain of Epochs, before
1247 we realize that REQ_HARDBARRIER is actually not supported */
1248
1249 /* As long as the -ENOTSUPP on the barrier is reported immediately
1250 that will never trigger. If it is reported late, we will just
1251 print that warning and continue correctly for all future requests
1252 with WO_bdev_flush */
1253 if (previous_epoch(mdev, e->epoch))
1254 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1255
1256 /* we still have a local reference,
1257 * get_ldev was done in receive_Data. */
1258
1259 e->w.cb = e_end_block;
1260 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1261 /* drbd_submit_ee fails for one reason only:
1262 * it was not able to allocate sufficient bios.
1263 * requeue, try again later. */
1264 e->w.cb = w_e_reissue;
1265 drbd_queue_work(&mdev->data.work, &e->w);
1266 }
1267 return 1;
1268}
1269
1270static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1271{
1272 int rv, issue_flush;
1273 struct p_barrier *p = (struct p_barrier *)h;
1274 struct drbd_epoch *epoch;
1275
1276 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1277
1278 rv = drbd_recv(mdev, h->payload, h->length);
1279 ERR_IF(rv != h->length) return FALSE;
1280
1281 inc_unacked(mdev);
1282
1283 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1284 drbd_kick_lo(mdev);
1285
1286 mdev->current_epoch->barrier_nr = p->barrier;
1287 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1288
1289 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1290 * the activity log, which means it would not be resynced in case the
1291 * R_PRIMARY crashes now.
1292 * Therefore we must send the barrier_ack after the barrier request was
1293 * completed. */
1294 switch (mdev->write_ordering) {
1295 case WO_bio_barrier:
1296 case WO_none:
1297 if (rv == FE_RECYCLED)
1298 return TRUE;
1299 break;
1300
1301 case WO_bdev_flush:
1302 case WO_drain_io:
1303 if (rv == FE_STILL_LIVE) {
1304 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1305 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1306 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1307 }
1308 if (rv == FE_RECYCLED)
1309 return TRUE;
1310
1311 /* The asender will send all the ACKs and barrier ACKs out, since
1312 all EEs moved from the active_ee to the done_ee. We need to
1313 provide a new epoch object for the EEs that come in soon */
1314 break;
1315 }
1316
1317 /* receiver context, in the writeout path of the other node.
1318 * avoid potential distributed deadlock */
1319 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1320 if (!epoch) {
1321 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1322 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1323 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1324 if (issue_flush) {
1325 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1326 if (rv == FE_RECYCLED)
1327 return TRUE;
1328 }
1329
1330 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1331
1332 return TRUE;
1333 }
1334
1335 epoch->flags = 0;
1336 atomic_set(&epoch->epoch_size, 0);
1337 atomic_set(&epoch->active, 0);
1338
1339 spin_lock(&mdev->epoch_lock);
1340 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1341 list_add(&epoch->list, &mdev->current_epoch->list);
1342 mdev->current_epoch = epoch;
1343 mdev->epochs++;
1344 } else {
1345 /* The current_epoch got recycled while we allocated this one... */
1346 kfree(epoch);
1347 }
1348 spin_unlock(&mdev->epoch_lock);
1349
1350 return TRUE;
1351}
1352
1353/* used from receive_RSDataReply (recv_resync_read)
1354 * and from receive_Data */
1355static struct drbd_epoch_entry *
1356read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1357{
1358 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1359 struct drbd_epoch_entry *e;
1360 struct page *page;
1361 int dgs, ds, rr;
1362 void *dig_in = mdev->int_dig_in;
1363 void *dig_vv = mdev->int_dig_vv;
1364 unsigned long *data;
1365
1366 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1367 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1368
1369 if (dgs) {
1370 rr = drbd_recv(mdev, dig_in, dgs);
1371 if (rr != dgs) {
1372 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1373 rr, dgs);
1374 return NULL;
1375 }
1376 }
1377
1378 data_size -= dgs;
1379
1380 ERR_IF(data_size & 0x1ff) return NULL;
1381 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1382
1383 /* even though we trust our peer,
1384 * we sometimes have to double check. */
1385 if (sector + (data_size>>9) > capacity) {
1386 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1387 (unsigned long long)capacity,
1388 (unsigned long long)sector, data_size);
1389 return NULL;
1390 }
1391
1392 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1393 * "criss-cross" setup, that might cause write-out on some other DRBD,
1394 * which in turn might block on the other node at this very place. */
1395 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1396 if (!e)
1397 return NULL;
1398
1399 ds = data_size;
1400 page = e->pages;
1401 page_chain_for_each(page) {
1402 unsigned len = min_t(int, ds, PAGE_SIZE);
1403 data = kmap(page);
1404 rr = drbd_recv(mdev, data, len);
1405 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1406 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1407 data[0] = data[0] ^ (unsigned long)-1;
1408 }
1409 kunmap(page);
1410 if (rr != len) {
1411 drbd_free_ee(mdev, e);
1412 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1413 rr, len);
1414 return NULL;
1415 }
1416 ds -= rr;
1417 }
1418
1419 if (dgs) {
1420 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1421 if (memcmp(dig_in, dig_vv, dgs)) {
1422 dev_err(DEV, "Digest integrity check FAILED.\n");
1423 drbd_bcast_ee(mdev, "digest failed",
1424 dgs, dig_in, dig_vv, e);
1425 drbd_free_ee(mdev, e);
1426 return NULL;
1427 }
1428 }
1429 mdev->recv_cnt += data_size>>9;
1430 return e;
1431}
1432
1433/* drbd_drain_block() just takes a data block
1434 * out of the socket input buffer, and discards it.
1435 */
1436static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1437{
1438 struct page *page;
1439 int rr, rv = 1;
1440 void *data;
1441
1442 if (!data_size)
1443 return TRUE;
1444
1445 page = drbd_pp_alloc(mdev, 1, 1);
1446
1447 data = kmap(page);
1448 while (data_size) {
1449 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1450 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1451 rv = 0;
1452 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1453 rr, min_t(int, data_size, PAGE_SIZE));
1454 break;
1455 }
1456 data_size -= rr;
1457 }
1458 kunmap(page);
1459 drbd_pp_free(mdev, page);
1460 return rv;
1461}
1462
1463static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1464 sector_t sector, int data_size)
1465{
1466 struct bio_vec *bvec;
1467 struct bio *bio;
1468 int dgs, rr, i, expect;
1469 void *dig_in = mdev->int_dig_in;
1470 void *dig_vv = mdev->int_dig_vv;
1471
1472 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1473 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1474
1475 if (dgs) {
1476 rr = drbd_recv(mdev, dig_in, dgs);
1477 if (rr != dgs) {
1478 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1479 rr, dgs);
1480 return 0;
1481 }
1482 }
1483
1484 data_size -= dgs;
1485
1486 /* optimistically update recv_cnt. if receiving fails below,
1487 * we disconnect anyways, and counters will be reset. */
1488 mdev->recv_cnt += data_size>>9;
1489
1490 bio = req->master_bio;
1491 D_ASSERT(sector == bio->bi_sector);
1492
1493 bio_for_each_segment(bvec, bio, i) {
1494 expect = min_t(int, data_size, bvec->bv_len);
1495 rr = drbd_recv(mdev,
1496 kmap(bvec->bv_page)+bvec->bv_offset,
1497 expect);
1498 kunmap(bvec->bv_page);
1499 if (rr != expect) {
1500 dev_warn(DEV, "short read receiving data reply: "
1501 "read %d expected %d\n",
1502 rr, expect);
1503 return 0;
1504 }
1505 data_size -= rr;
1506 }
1507
1508 if (dgs) {
1509 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1510 if (memcmp(dig_in, dig_vv, dgs)) {
1511 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1512 return 0;
1513 }
1514 }
1515
1516 D_ASSERT(data_size == 0);
1517 return 1;
1518}
1519
1520/* e_end_resync_block() is called via
1521 * drbd_process_done_ee() by asender only */
1522static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1523{
1524 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1525 sector_t sector = e->sector;
1526 int ok;
1527
1528 D_ASSERT(hlist_unhashed(&e->colision));
1529
1530 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1531 drbd_set_in_sync(mdev, sector, e->size);
1532 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1533 } else {
1534 /* Record failure to sync */
1535 drbd_rs_failed_io(mdev, sector, e->size);
1536
1537 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1538 }
1539 dec_unacked(mdev);
1540
1541 return ok;
1542}
1543
1544static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1545{
1546 struct drbd_epoch_entry *e;
1547
1548 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1549 if (!e)
1550 goto fail;
1551
1552 dec_rs_pending(mdev);
1553
1554 inc_unacked(mdev);
1555 /* corresponding dec_unacked() in e_end_resync_block()
1556 * respective _drbd_clear_done_ee */
1557
1558 e->w.cb = e_end_resync_block;
1559
1560 spin_lock_irq(&mdev->req_lock);
1561 list_add(&e->w.list, &mdev->sync_ee);
1562 spin_unlock_irq(&mdev->req_lock);
1563
1564 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1565 return TRUE;
1566
1567 drbd_free_ee(mdev, e);
1568fail:
1569 put_ldev(mdev);
1570 return FALSE;
1571}
1572
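/*
 * receive_DataReply(): payload answering a read request we sent earlier.
 * The original drbd_request is looked up by block_id/sector and the data
 * is copied straight into its master bio by recv_dless_read(); on success
 * the request is completed via req_mod(req, data_received).
 */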
1573static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1574{
1575 struct drbd_request *req;
1576 sector_t sector;
1577 unsigned int header_size, data_size;
1578 int ok;
1579 struct p_data *p = (struct p_data *)h;
1580
1581 header_size = sizeof(*p) - sizeof(*h);
1582 data_size = h->length - header_size;
1583
1584 ERR_IF(data_size == 0) return FALSE;
1585
1586 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1587 return FALSE;
1588
1589 sector = be64_to_cpu(p->sector);
1590
1591 spin_lock_irq(&mdev->req_lock);
1592 req = _ar_id_to_req(mdev, p->block_id, sector);
1593 spin_unlock_irq(&mdev->req_lock);
1594 if (unlikely(!req)) {
1595 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1596 return FALSE;
1597 }
1598
1599 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1600 * special casing it there for the various failure cases.
1601 * still no race with drbd_fail_pending_reads */
1602 ok = recv_dless_read(mdev, req, sector, data_size);
1603
1604 if (ok)
1605 req_mod(req, data_received);
1606 /* else: nothing. handled from drbd_disconnect...
1607 * I don't think we may complete this just yet
1608 * in case we are "on-disconnect: freeze" */
1609
1610 return ok;
1611}
1612
1613static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1614{
1615 sector_t sector;
1616 unsigned int header_size, data_size;
1617 int ok;
1618 struct p_data *p = (struct p_data *)h;
1619
1620 header_size = sizeof(*p) - sizeof(*h);
1621 data_size = h->length - header_size;
1622
1623 ERR_IF(data_size == 0) return FALSE;
1624
1625 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1626 return FALSE;
1627
1628 sector = be64_to_cpu(p->sector);
1629 D_ASSERT(p->block_id == ID_SYNCER);
1630
1631 if (get_ldev(mdev)) {
1632 /* data is submitted to disk within recv_resync_read.
1633 * corresponding put_ldev done below on error,
1634 * or in drbd_endio_write_sec. */
1635 ok = recv_resync_read(mdev, sector, data_size);
1636 } else {
1637 if (__ratelimit(&drbd_ratelimit_state))
1638 dev_err(DEV, "Can not write resync data to local disk.\n");
1639
1640 ok = drbd_drain_block(mdev, data_size);
1641
1642 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1643 }
1644
1645 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1646
1647 return ok;
1648}
1649
1650/* e_end_block() is called via drbd_process_done_ee().
1651 * this means this function only runs in the asender thread
1652 */
1653static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1654{
1655 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1656 sector_t sector = e->sector;
1657 struct drbd_epoch *epoch;
1658 int ok = 1, pcmd;
1659
1660 if (e->flags & EE_IS_BARRIER) {
1661 epoch = previous_epoch(mdev, e->epoch);
1662 if (epoch)
1663 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1664 }
1665
1666 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1667 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1668 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1669 mdev->state.conn <= C_PAUSED_SYNC_T &&
1670 e->flags & EE_MAY_SET_IN_SYNC) ?
1671 P_RS_WRITE_ACK : P_WRITE_ACK;
1672 ok &= drbd_send_ack(mdev, pcmd, e);
1673 if (pcmd == P_RS_WRITE_ACK)
1674 drbd_set_in_sync(mdev, sector, e->size);
1675 } else {
1676 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1677 /* we expect it to be marked out of sync anyways...
1678 * maybe assert this? */
1679 }
1680 dec_unacked(mdev);
1681 }
1682 /* we delete from the conflict detection hash _after_ we sent out the
1683 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1684 if (mdev->net_conf->two_primaries) {
1685 spin_lock_irq(&mdev->req_lock);
1686 D_ASSERT(!hlist_unhashed(&e->colision));
1687 hlist_del_init(&e->colision);
1688 spin_unlock_irq(&mdev->req_lock);
1689 } else {
1690 D_ASSERT(hlist_unhashed(&e->colision));
1691 }
1692
1693 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1694
1695 return ok;
1696}
1697
1698static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1699{
1700 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1701 int ok = 1;
1702
1703 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1704 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1705
1706 spin_lock_irq(&mdev->req_lock);
1707 D_ASSERT(!hlist_unhashed(&e->colision));
1708 hlist_del_init(&e->colision);
1709 spin_unlock_irq(&mdev->req_lock);
1710
1711 dec_unacked(mdev);
1712
1713 return ok;
1714}
1715
1716/* Called from receive_Data.
1717 * Synchronize packets on sock with packets on msock.
1718 *
1719 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1720 * packet traveling on msock, they are still processed in the order they have
1721 * been sent.
1722 *
1723 * Note: we don't care for Ack packets overtaking P_DATA packets.
1724 *
1725 * In case packet_seq is larger than mdev->peer_seq number, there are
1726 * outstanding packets on the msock. We wait for them to arrive.
1727 * In case we are the logically next packet, we update mdev->peer_seq
1728 * ourselves. Correctly handles 32bit wrap around.
1729 *
1730 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1731 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1732 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1733 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1734 *
1735 * returns 0 if we may process the packet,
1736 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1737static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1738{
1739 DEFINE_WAIT(wait);
1740 unsigned int p_seq;
1741 long timeout;
1742 int ret = 0;
1743 spin_lock(&mdev->peer_seq_lock);
1744 for (;;) {
1745 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1746 if (seq_le(packet_seq, mdev->peer_seq+1))
1747 break;
1748 if (signal_pending(current)) {
1749 ret = -ERESTARTSYS;
1750 break;
1751 }
1752 p_seq = mdev->peer_seq;
1753 spin_unlock(&mdev->peer_seq_lock);
1754 timeout = schedule_timeout(30*HZ);
1755 spin_lock(&mdev->peer_seq_lock);
1756 if (timeout == 0 && p_seq == mdev->peer_seq) {
1757 ret = -ETIMEDOUT;
1758 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1759 break;
1760 }
1761 }
1762 finish_wait(&mdev->seq_wait, &wait);
1763 if (mdev->peer_seq+1 == packet_seq)
1764 mdev->peer_seq++;
1765 spin_unlock(&mdev->peer_seq_lock);
1766 return ret;
1767}
1768
1769/* mirrored write */
1770static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1771{
1772 sector_t sector;
1773 struct drbd_epoch_entry *e;
1774 struct p_data *p = (struct p_data *)h;
1775 int header_size, data_size;
1776 int rw = WRITE;
1777 u32 dp_flags;
1778
1779 header_size = sizeof(*p) - sizeof(*h);
1780 data_size = h->length - header_size;
1781
1782 ERR_IF(data_size == 0) return FALSE;
1783
1784 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1785 return FALSE;
1786
1787 if (!get_ldev(mdev)) {
1788 if (__ratelimit(&drbd_ratelimit_state))
1789 dev_err(DEV, "Can not write mirrored data block "
1790 "to local disk.\n");
1791 spin_lock(&mdev->peer_seq_lock);
1792 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1793 mdev->peer_seq++;
1794 spin_unlock(&mdev->peer_seq_lock);
1795
1796 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1797 atomic_inc(&mdev->current_epoch->epoch_size);
1798 return drbd_drain_block(mdev, data_size);
1799 }
1800
1801 /* get_ldev(mdev) successful.
1802 * Corresponding put_ldev done either below (on various errors),
1803 * or in drbd_endio_write_sec, if we successfully submit the data at
1804 * the end of this function. */
1805
1806 sector = be64_to_cpu(p->sector);
1807 e = read_in_block(mdev, p->block_id, sector, data_size);
1808 if (!e) {
1809 put_ldev(mdev);
1810 return FALSE;
1811 }
1812
1813 e->w.cb = e_end_block;
1814
1815 spin_lock(&mdev->epoch_lock);
1816 e->epoch = mdev->current_epoch;
1817 atomic_inc(&e->epoch->epoch_size);
1818 atomic_inc(&e->epoch->active);
1819
1820 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1821 struct drbd_epoch *epoch;
1822 /* Issue a barrier if we start a new epoch, and the previous epoch
1823 was not an epoch containing a single request which already was
1824 a Barrier. */
1825 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1826 if (epoch == e->epoch) {
1827 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1828 rw |= REQ_HARDBARRIER;
1829 e->flags |= EE_IS_BARRIER;
1830 } else {
1831 if (atomic_read(&epoch->epoch_size) > 1 ||
1832 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1833 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1834 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1835 rw |= REQ_HARDBARRIER;
1836 e->flags |= EE_IS_BARRIER;
1837 }
1838 }
1839 }
1840 spin_unlock(&mdev->epoch_lock);
1841
1842 dp_flags = be32_to_cpu(p->dp_flags);
1843 if (dp_flags & DP_HARDBARRIER) {
1844 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1845 /* rw |= REQ_HARDBARRIER; */
1846 }
1847 if (dp_flags & DP_RW_SYNC)
1848 rw |= REQ_SYNC | REQ_UNPLUG;
1849 if (dp_flags & DP_MAY_SET_IN_SYNC)
1850 e->flags |= EE_MAY_SET_IN_SYNC;
1851
1852 /* I'm the receiver, I do hold a net_cnt reference. */
1853 if (!mdev->net_conf->two_primaries) {
1854 spin_lock_irq(&mdev->req_lock);
1855 } else {
1856 /* don't get the req_lock yet,
1857 * we may sleep in drbd_wait_peer_seq */
1858 const int size = e->size;
1859 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1860 DEFINE_WAIT(wait);
1861 struct drbd_request *i;
1862 struct hlist_node *n;
1863 struct hlist_head *slot;
1864 int first;
1865
1866 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1867 BUG_ON(mdev->ee_hash == NULL);
1868 BUG_ON(mdev->tl_hash == NULL);
1869
1870 /* conflict detection and handling:
1871 * 1. wait on the sequence number,
1872 * in case this data packet overtook ACK packets.
1873 * 2. check our hash tables for conflicting requests.
1874 * we only need to walk the tl_hash, since an ee can not
 1875 * have a conflict with another ee: on the submitting
1876 * node, the corresponding req had already been conflicting,
1877 * and a conflicting req is never sent.
1878 *
1879 * Note: for two_primaries, we are protocol C,
1880 * so there cannot be any request that is DONE
1881 * but still on the transfer log.
1882 *
1883 * unconditionally add to the ee_hash.
1884 *
1885 * if no conflicting request is found:
1886 * submit.
1887 *
1888 * if any conflicting request is found
1889 * that has not yet been acked,
1890 * AND I have the "discard concurrent writes" flag:
1891 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1892 *
1893 * if any conflicting request is found:
1894 * block the receiver, waiting on misc_wait
1895 * until no more conflicting requests are there,
1896 * or we get interrupted (disconnect).
1897 *
1898 * we do not just write after local io completion of those
1899 * requests, but only after req is done completely, i.e.
1900 * we wait for the P_DISCARD_ACK to arrive!
1901 *
1902 * then proceed normally, i.e. submit.
1903 */
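	/* Illustration (assuming overlaps() compares sector ranges with sizes
	 * given in bytes): a 4096 byte write at sector 8 covers sectors 8..15,
	 * so it conflicts with a pending request at sector 12 of the same
	 * size (sectors 12..19), but not with one starting at sector 16. */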
1904 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1905 goto out_interrupted;
1906
1907 spin_lock_irq(&mdev->req_lock);
1908
1909 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1910
1911#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1912 slot = tl_hash_slot(mdev, sector);
1913 first = 1;
1914 for (;;) {
1915 int have_unacked = 0;
1916 int have_conflict = 0;
1917 prepare_to_wait(&mdev->misc_wait, &wait,
1918 TASK_INTERRUPTIBLE);
1919 hlist_for_each_entry(i, n, slot, colision) {
1920 if (OVERLAPS) {
1921 /* only ALERT on first iteration,
1922 * we may be woken up early... */
1923 if (first)
1924 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1925 " new: %llus +%u; pending: %llus +%u\n",
1926 current->comm, current->pid,
1927 (unsigned long long)sector, size,
1928 (unsigned long long)i->sector, i->size);
1929 if (i->rq_state & RQ_NET_PENDING)
1930 ++have_unacked;
1931 ++have_conflict;
1932 }
1933 }
1934#undef OVERLAPS
1935 if (!have_conflict)
1936 break;
1937
1938 /* Discard Ack only for the _first_ iteration */
1939 if (first && discard && have_unacked) {
1940 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1941 (unsigned long long)sector);
1942 inc_unacked(mdev);
1943 e->w.cb = e_send_discard_ack;
1944 list_add_tail(&e->w.list, &mdev->done_ee);
1945
1946 spin_unlock_irq(&mdev->req_lock);
1947
1948 /* we could probably send that P_DISCARD_ACK ourselves,
1949 * but I don't like the receiver using the msock */
1950
1951 put_ldev(mdev);
1952 wake_asender(mdev);
1953 finish_wait(&mdev->misc_wait, &wait);
1954 return TRUE;
1955 }
1956
1957 if (signal_pending(current)) {
1958 hlist_del_init(&e->colision);
1959
1960 spin_unlock_irq(&mdev->req_lock);
1961
1962 finish_wait(&mdev->misc_wait, &wait);
1963 goto out_interrupted;
1964 }
1965
1966 spin_unlock_irq(&mdev->req_lock);
1967 if (first) {
1968 first = 0;
1969 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1970 "sec=%llus\n", (unsigned long long)sector);
1971 } else if (discard) {
1972 /* we had none on the first iteration.
1973 * there must be none now. */
1974 D_ASSERT(have_unacked == 0);
1975 }
1976 schedule();
1977 spin_lock_irq(&mdev->req_lock);
1978 }
1979 finish_wait(&mdev->misc_wait, &wait);
1980 }
1981
1982 list_add(&e->w.list, &mdev->active_ee);
1983 spin_unlock_irq(&mdev->req_lock);
1984
1985 switch (mdev->net_conf->wire_protocol) {
1986 case DRBD_PROT_C:
1987 inc_unacked(mdev);
1988 /* corresponding dec_unacked() in e_end_block()
 1989 * or in _drbd_clear_done_ee, respectively */
1990 break;
1991 case DRBD_PROT_B:
1992 /* I really don't like it that the receiver thread
 1993 * sends on the msock, but anyway */
1994 drbd_send_ack(mdev, P_RECV_ACK, e);
1995 break;
1996 case DRBD_PROT_A:
1997 /* nothing to do */
1998 break;
1999 }
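	/* Note: this switch reflects the three replication modes: protocol A
	 * sends no per-write ack, protocol B acks on receipt (P_RECV_ACK),
	 * and protocol C acks only once the local write has completed (the
	 * inc_unacked() above is balanced in e_end_block). */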
2000
2001 if (mdev->state.pdsk == D_DISKLESS) {
2002 /* In case we have the only disk of the cluster, */
2003 drbd_set_out_of_sync(mdev, e->sector, e->size);
2004 e->flags |= EE_CALL_AL_COMPLETE_IO;
2005 drbd_al_begin_io(mdev, e->sector);
2006 }
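	/* Note: with a diskless peer we hold the only copy, so the block is
	 * marked out of sync in our bitmap and covered by our activity log;
	 * EE_CALL_AL_COMPLETE_IO lets the completion path drop that activity
	 * log reference again. */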
2007
2008 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2009 return TRUE;
2010
2011out_interrupted:
2012 /* yes, the epoch_size now is imbalanced.
 2013 * but we drop the connection anyway, so we don't have a chance to
2014 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2015 put_ldev(mdev);
2016 drbd_free_ee(mdev, e);
2017 return FALSE;
2018}
2019
2020static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2021{
2022 sector_t sector;
2023 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2024 struct drbd_epoch_entry *e;
2025 struct digest_info *di = NULL;
2026 int size, digest_size;
2027 unsigned int fault_type;
2028 struct p_block_req *p =
2029 (struct p_block_req *)h;
2030 const int brps = sizeof(*p)-sizeof(*h);
2031
2032 if (drbd_recv(mdev, h->payload, brps) != brps)
2033 return FALSE;
2034
2035 sector = be64_to_cpu(p->sector);
2036 size = be32_to_cpu(p->blksize);
2037
2038 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2039 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2040 (unsigned long long)sector, size);
2041 return FALSE;
2042 }
2043 if (sector + (size>>9) > capacity) {
2044 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2045 (unsigned long long)sector, size);
2046 return FALSE;
2047 }
2048
2049 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2050 if (__ratelimit(&drbd_ratelimit_state))
2051 dev_err(DEV, "Can not satisfy peer's read request, "
2052 "no local data.\n");
2053 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2054 P_NEG_RS_DREPLY , p);
c3470cde 2055 return drbd_drain_block(mdev, h->length - brps);
2056 }
2057
2058 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2059 * "criss-cross" setup, that might cause write-out on some other DRBD,
2060 * which in turn might block on the other node at this very place. */
2061 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2062 if (!e) {
2063 put_ldev(mdev);
2064 return FALSE;
2065 }
2066
2067 switch (h->command) {
2068 case P_DATA_REQUEST:
2069 e->w.cb = w_e_end_data_req;
2070 fault_type = DRBD_FAULT_DT_RD;
2071 /* application IO, don't drbd_rs_begin_io */
2072 goto submit;
2073
2074 case P_RS_DATA_REQUEST:
2075 e->w.cb = w_e_end_rsdata_req;
2076 fault_type = DRBD_FAULT_RS_RD;
2077 break;
2078
2079 case P_OV_REPLY:
2080 case P_CSUM_RS_REQUEST:
2081 fault_type = DRBD_FAULT_RS_RD;
2082 digest_size = h->length - brps ;
2083 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2084 if (!di)
2085 goto out_free_e;
2086
2087 di->digest_size = digest_size;
2088 di->digest = (((char *)di)+sizeof(struct digest_info));
2089
2090 e->digest = di;
2091 e->flags |= EE_HAS_DIGEST;
2092
2093 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2094 goto out_free_e;
2095
2096 if (h->command == P_CSUM_RS_REQUEST) {
2097 D_ASSERT(mdev->agreed_pro_version >= 89);
2098 e->w.cb = w_e_end_csum_rs_req;
2099 } else if (h->command == P_OV_REPLY) {
2100 e->w.cb = w_e_end_ov_reply;
2101 dec_rs_pending(mdev);
2102 /* drbd_rs_begin_io done when we sent this request */
2103 goto submit;
2104 }
2105 break;
2106
2107 case P_OV_REQUEST:
2108 if (mdev->state.conn >= C_CONNECTED &&
2109 mdev->state.conn != C_VERIFY_T)
2110 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2111 drbd_conn_str(mdev->state.conn));
2112 if (mdev->ov_start_sector == ~(sector_t)0 &&
2113 mdev->agreed_pro_version >= 90) {
2114 mdev->ov_start_sector = sector;
2115 mdev->ov_position = sector;
2116 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2117 dev_info(DEV, "Online Verify start sector: %llu\n",
2118 (unsigned long long)sector);
2119 }
2120 e->w.cb = w_e_end_ov_req;
2121 fault_type = DRBD_FAULT_RS_RD;
2122 break;
2123
2124 default:
2125 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2126 cmdname(h->command));
2127 fault_type = DRBD_FAULT_MAX;
80a40e43 2128 goto out_free_e;
2129 }
2130
2131 if (drbd_rs_begin_io(mdev, e->sector))
2132 goto out_free_e;
b411b363 2133
80a40e43 2134submit:
b411b363 2135 inc_unacked(mdev);
2136 spin_lock_irq(&mdev->req_lock);
2137 list_add_tail(&e->w.list, &mdev->read_ee);
2138 spin_unlock_irq(&mdev->req_lock);
b411b363 2139
2140 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2141 return TRUE;
2142
2143out_free_e:
2144 put_ldev(mdev);
2145 drbd_free_ee(mdev, e);
2146 return FALSE;
2147}
2148
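/* Note on the drbd_asb_recover_*() helpers below: they resolve a detected
 * split brain according to the configured after-sb policy. A negative
 * return value means this node's data is discarded (it becomes sync
 * target), a positive value means the peer's data is discarded, and
 * -100 means the policy could not reach a decision. */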
2149static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2150{
2151 int self, peer, rv = -100;
2152 unsigned long ch_self, ch_peer;
2153
2154 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2155 peer = mdev->p_uuid[UI_BITMAP] & 1;
2156
2157 ch_peer = mdev->p_uuid[UI_SIZE];
2158 ch_self = mdev->comm_bm_set;
2159
2160 switch (mdev->net_conf->after_sb_0p) {
2161 case ASB_CONSENSUS:
2162 case ASB_DISCARD_SECONDARY:
2163 case ASB_CALL_HELPER:
2164 dev_err(DEV, "Configuration error.\n");
2165 break;
2166 case ASB_DISCONNECT:
2167 break;
2168 case ASB_DISCARD_YOUNGER_PRI:
2169 if (self == 0 && peer == 1) {
2170 rv = -1;
2171 break;
2172 }
2173 if (self == 1 && peer == 0) {
2174 rv = 1;
2175 break;
2176 }
2177 /* Else fall through to one of the other strategies... */
2178 case ASB_DISCARD_OLDER_PRI:
2179 if (self == 0 && peer == 1) {
2180 rv = 1;
2181 break;
2182 }
2183 if (self == 1 && peer == 0) {
2184 rv = -1;
2185 break;
2186 }
2187 /* Else fall through to one of the other strategies... */
ad19bf6e 2188 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2189 "Using discard-least-changes instead\n");
2190 case ASB_DISCARD_ZERO_CHG:
2191 if (ch_peer == 0 && ch_self == 0) {
2192 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2193 ? -1 : 1;
2194 break;
2195 } else {
2196 if (ch_peer == 0) { rv = 1; break; }
2197 if (ch_self == 0) { rv = -1; break; }
2198 }
2199 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2200 break;
2201 case ASB_DISCARD_LEAST_CHG:
2202 if (ch_self < ch_peer)
2203 rv = -1;
2204 else if (ch_self > ch_peer)
2205 rv = 1;
2206 else /* ( ch_self == ch_peer ) */
2207 /* Well, then use something else. */
2208 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2209 ? -1 : 1;
2210 break;
2211 case ASB_DISCARD_LOCAL:
2212 rv = -1;
2213 break;
2214 case ASB_DISCARD_REMOTE:
2215 rv = 1;
2216 }
2217
2218 return rv;
2219}
2220
2221static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2222{
2223 int self, peer, hg, rv = -100;
2224
2225 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2226 peer = mdev->p_uuid[UI_BITMAP] & 1;
2227
2228 switch (mdev->net_conf->after_sb_1p) {
2229 case ASB_DISCARD_YOUNGER_PRI:
2230 case ASB_DISCARD_OLDER_PRI:
2231 case ASB_DISCARD_LEAST_CHG:
2232 case ASB_DISCARD_LOCAL:
2233 case ASB_DISCARD_REMOTE:
2234 dev_err(DEV, "Configuration error.\n");
2235 break;
2236 case ASB_DISCONNECT:
2237 break;
2238 case ASB_CONSENSUS:
2239 hg = drbd_asb_recover_0p(mdev);
2240 if (hg == -1 && mdev->state.role == R_SECONDARY)
2241 rv = hg;
2242 if (hg == 1 && mdev->state.role == R_PRIMARY)
2243 rv = hg;
2244 break;
2245 case ASB_VIOLENTLY:
2246 rv = drbd_asb_recover_0p(mdev);
2247 break;
2248 case ASB_DISCARD_SECONDARY:
2249 return mdev->state.role == R_PRIMARY ? 1 : -1;
2250 case ASB_CALL_HELPER:
2251 hg = drbd_asb_recover_0p(mdev);
2252 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2253 self = drbd_set_role(mdev, R_SECONDARY, 0);
2254 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2255 * we might be here in C_WF_REPORT_PARAMS which is transient.
2256 * we do not need to wait for the after state change work either. */
2257 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2258 if (self != SS_SUCCESS) {
2259 drbd_khelper(mdev, "pri-lost-after-sb");
2260 } else {
2261 dev_warn(DEV, "Successfully gave up primary role.\n");
2262 rv = hg;
2263 }
2264 } else
2265 rv = hg;
2266 }
2267
2268 return rv;
2269}
2270
2271static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2272{
2273 int self, peer, hg, rv = -100;
2274
2275 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2276 peer = mdev->p_uuid[UI_BITMAP] & 1;
2277
2278 switch (mdev->net_conf->after_sb_2p) {
2279 case ASB_DISCARD_YOUNGER_PRI:
2280 case ASB_DISCARD_OLDER_PRI:
2281 case ASB_DISCARD_LEAST_CHG:
2282 case ASB_DISCARD_LOCAL:
2283 case ASB_DISCARD_REMOTE:
2284 case ASB_CONSENSUS:
2285 case ASB_DISCARD_SECONDARY:
2286 dev_err(DEV, "Configuration error.\n");
2287 break;
2288 case ASB_VIOLENTLY:
2289 rv = drbd_asb_recover_0p(mdev);
2290 break;
2291 case ASB_DISCONNECT:
2292 break;
2293 case ASB_CALL_HELPER:
2294 hg = drbd_asb_recover_0p(mdev);
2295 if (hg == -1) {
2296 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2297 * we might be here in C_WF_REPORT_PARAMS which is transient.
2298 * we do not need to wait for the after state change work either. */
2299 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2300 if (self != SS_SUCCESS) {
2301 drbd_khelper(mdev, "pri-lost-after-sb");
2302 } else {
2303 dev_warn(DEV, "Successfully gave up primary role.\n");
2304 rv = hg;
2305 }
2306 } else
2307 rv = hg;
2308 }
2309
2310 return rv;
2311}
2312
2313static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2314 u64 bits, u64 flags)
2315{
2316 if (!uuid) {
2317 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2318 return;
2319 }
2320 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2321 text,
2322 (unsigned long long)uuid[UI_CURRENT],
2323 (unsigned long long)uuid[UI_BITMAP],
2324 (unsigned long long)uuid[UI_HISTORY_START],
2325 (unsigned long long)uuid[UI_HISTORY_END],
2326 (unsigned long long)bits,
2327 (unsigned long long)flags);
2328}
2329
2330/*
2331 100 after split brain try auto recover
2332 2 C_SYNC_SOURCE set BitMap
2333 1 C_SYNC_SOURCE use BitMap
2334 0 no Sync
2335 -1 C_SYNC_TARGET use BitMap
2336 -2 C_SYNC_TARGET set BitMap
2337 -100 after split brain, disconnect
2338-1000 unrelated data
2339 */
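/* Rough example: rule 50 below fires when the peer's bitmap UUID equals our
 * current UUID, i.e. the peer has been tracking its changes against the
 * data generation we still carry; we then become C_SYNC_TARGET and only
 * the blocks marked in that bitmap are transferred (return -1). */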
2340static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2341{
2342 u64 self, peer;
2343 int i, j;
2344
2345 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2346 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2347
2348 *rule_nr = 10;
2349 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2350 return 0;
2351
2352 *rule_nr = 20;
2353 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2354 peer != UUID_JUST_CREATED)
2355 return -2;
2356
2357 *rule_nr = 30;
2358 if (self != UUID_JUST_CREATED &&
2359 (peer == UUID_JUST_CREATED || peer == (u64)0))
2360 return 2;
2361
2362 if (self == peer) {
2363 int rct, dc; /* roles at crash time */
2364
2365 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2366
2367 if (mdev->agreed_pro_version < 91)
2368 return -1001;
2369
2370 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2371 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2372 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2373 drbd_uuid_set_bm(mdev, 0UL);
2374
2375 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2376 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2377 *rule_nr = 34;
2378 } else {
2379 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2380 *rule_nr = 36;
2381 }
2382
2383 return 1;
2384 }
2385
2386 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2387
2388 if (mdev->agreed_pro_version < 91)
2389 return -1001;
2390
2391 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2392 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2393 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2394
2395 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2396 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2397 mdev->p_uuid[UI_BITMAP] = 0UL;
2398
2399 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2400 *rule_nr = 35;
2401 } else {
2402 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2403 *rule_nr = 37;
2404 }
2405
2406 return -1;
2407 }
2408
2409 /* Common power [off|failure] */
2410 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2411 (mdev->p_uuid[UI_FLAGS] & 2);
2412 /* lowest bit is set when we were primary,
2413 * next bit (weight 2) is set when peer was primary */
2414 *rule_nr = 40;
2415
2416 switch (rct) {
2417 case 0: /* !self_pri && !peer_pri */ return 0;
2418 case 1: /* self_pri && !peer_pri */ return 1;
2419 case 2: /* !self_pri && peer_pri */ return -1;
2420 case 3: /* self_pri && peer_pri */
2421 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2422 return dc ? -1 : 1;
2423 }
2424 }
2425
2426 *rule_nr = 50;
2427 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2428 if (self == peer)
2429 return -1;
2430
2431 *rule_nr = 51;
2432 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2433 if (self == peer) {
2434 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2435 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2436 if (self == peer) {
 2437 /* The last P_SYNC_UUID did not get through. Undo the last start of
 2438 resync as sync source: revert the modifications of the peer's UUIDs. */
2439
2440 if (mdev->agreed_pro_version < 91)
2441 return -1001;
2442
2443 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2444 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2445 return -1;
2446 }
2447 }
2448
2449 *rule_nr = 60;
2450 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2451 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2452 peer = mdev->p_uuid[i] & ~((u64)1);
2453 if (self == peer)
2454 return -2;
2455 }
2456
2457 *rule_nr = 70;
2458 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2459 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2460 if (self == peer)
2461 return 1;
2462
2463 *rule_nr = 71;
2464 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2465 if (self == peer) {
2466 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2467 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2468 if (self == peer) {
 2469 /* The last P_SYNC_UUID did not get through. Undo the last start of
 2470 resync as sync source: revert the modifications of our UUIDs. */
2471
2472 if (mdev->agreed_pro_version < 91)
2473 return -1001;
2474
2475 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2476 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2477
2478 dev_info(DEV, "Undid last start of resync:\n");
2479
2480 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2481 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2482
2483 return 1;
2484 }
2485 }
2486
2487
2488 *rule_nr = 80;
d8c2a36b 2489 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2490 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2491 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2492 if (self == peer)
2493 return 2;
2494 }
2495
2496 *rule_nr = 90;
2497 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2498 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2499 if (self == peer && self != ((u64)0))
2500 return 100;
2501
2502 *rule_nr = 100;
2503 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2504 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2505 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2506 peer = mdev->p_uuid[j] & ~((u64)1);
2507 if (self == peer)
2508 return -100;
2509 }
2510 }
2511
2512 return -1000;
2513}
2514
2515/* drbd_sync_handshake() returns the new conn state on success, or
2516 CONN_MASK (-1) on failure.
2517 */
2518static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2519 enum drbd_disk_state peer_disk) __must_hold(local)
2520{
2521 int hg, rule_nr;
2522 enum drbd_conns rv = C_MASK;
2523 enum drbd_disk_state mydisk;
2524
2525 mydisk = mdev->state.disk;
2526 if (mydisk == D_NEGOTIATING)
2527 mydisk = mdev->new_state_tmp.disk;
2528
2529 dev_info(DEV, "drbd_sync_handshake:\n");
2530 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2531 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2532 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2533
2534 hg = drbd_uuid_compare(mdev, &rule_nr);
2535
2536 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2537
2538 if (hg == -1000) {
2539 dev_alert(DEV, "Unrelated data, aborting!\n");
2540 return C_MASK;
2541 }
2542 if (hg == -1001) {
 2543 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2544 return C_MASK;
2545 }
2546
2547 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2548 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2549 int f = (hg == -100) || abs(hg) == 2;
2550 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2551 if (f)
2552 hg = hg*2;
2553 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2554 hg > 0 ? "source" : "target");
2555 }
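	/* Note: per the legend above drbd_uuid_compare(), abs(hg) == 1 asks
	 * for a bitmap based resync while abs(hg) == 2 asks for a full sync,
	 * so doubling hg here (and below in the "forced" case) upgrades the
	 * decision to a full sync. */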
2556
2557 if (abs(hg) == 100)
2558 drbd_khelper(mdev, "initial-split-brain");
2559
2560 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2561 int pcount = (mdev->state.role == R_PRIMARY)
2562 + (peer_role == R_PRIMARY);
2563 int forced = (hg == -100);
2564
2565 switch (pcount) {
2566 case 0:
2567 hg = drbd_asb_recover_0p(mdev);
2568 break;
2569 case 1:
2570 hg = drbd_asb_recover_1p(mdev);
2571 break;
2572 case 2:
2573 hg = drbd_asb_recover_2p(mdev);
2574 break;
2575 }
2576 if (abs(hg) < 100) {
2577 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2578 "automatically solved. Sync from %s node\n",
2579 pcount, (hg < 0) ? "peer" : "this");
2580 if (forced) {
2581 dev_warn(DEV, "Doing a full sync, since"
2582 " UUIDs where ambiguous.\n");
2583 hg = hg*2;
2584 }
2585 }
2586 }
2587
2588 if (hg == -100) {
2589 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2590 hg = -1;
2591 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2592 hg = 1;
2593
2594 if (abs(hg) < 100)
2595 dev_warn(DEV, "Split-Brain detected, manually solved. "
2596 "Sync from %s node\n",
2597 (hg < 0) ? "peer" : "this");
2598 }
2599
2600 if (hg == -100) {
2601 /* FIXME this log message is not correct if we end up here
2602 * after an attempted attach on a diskless node.
2603 * We just refuse to attach -- well, we drop the "connection"
2604 * to that disk, in a way... */
3a11a487 2605 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2606 drbd_khelper(mdev, "split-brain");
2607 return C_MASK;
2608 }
2609
2610 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2611 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2612 return C_MASK;
2613 }
2614
2615 if (hg < 0 && /* by intention we do not use mydisk here. */
2616 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2617 switch (mdev->net_conf->rr_conflict) {
2618 case ASB_CALL_HELPER:
2619 drbd_khelper(mdev, "pri-lost");
2620 /* fall through */
2621 case ASB_DISCONNECT:
2622 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2623 return C_MASK;
2624 case ASB_VIOLENTLY:
 2625 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
 2626 "assumption\n");
2627 }
2628 }
2629
2630 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2631 if (hg == 0)
2632 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2633 else
2634 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2635 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2636 abs(hg) >= 2 ? "full" : "bit-map based");
2637 return C_MASK;
2638 }
2639
2640 if (abs(hg) >= 2) {
2641 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2642 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2643 return C_MASK;
2644 }
2645
2646 if (hg > 0) { /* become sync source. */
2647 rv = C_WF_BITMAP_S;
2648 } else if (hg < 0) { /* become sync target */
2649 rv = C_WF_BITMAP_T;
2650 } else {
2651 rv = C_CONNECTED;
2652 if (drbd_bm_total_weight(mdev)) {
2653 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2654 drbd_bm_total_weight(mdev));
2655 }
2656 }
2657
2658 return rv;
2659}
2660
2661/* returns 1 if invalid */
2662static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2663{
2664 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2665 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2666 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2667 return 0;
2668
2669 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2670 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2671 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2672 return 1;
2673
2674 /* everything else is valid if they are equal on both sides. */
2675 if (peer == self)
2676 return 0;
2677
 2678 /* everything else is invalid. */
2679 return 1;
2680}
2681
2682static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2683{
2684 struct p_protocol *p = (struct p_protocol *)h;
2685 int header_size, data_size;
2686 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2687 int p_want_lose, p_two_primaries, cf;
2688 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2689
2690 header_size = sizeof(*p) - sizeof(*h);
2691 data_size = h->length - header_size;
2692
2693 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2694 return FALSE;
2695
2696 p_proto = be32_to_cpu(p->protocol);
2697 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2698 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2699 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2700 p_two_primaries = be32_to_cpu(p->two_primaries);
2701 cf = be32_to_cpu(p->conn_flags);
2702 p_want_lose = cf & CF_WANT_LOSE;
2703
2704 clear_bit(CONN_DRY_RUN, &mdev->flags);
2705
2706 if (cf & CF_DRY_RUN)
2707 set_bit(CONN_DRY_RUN, &mdev->flags);
2708
2709 if (p_proto != mdev->net_conf->wire_protocol) {
2710 dev_err(DEV, "incompatible communication protocols\n");
2711 goto disconnect;
2712 }
2713
2714 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2715 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2716 goto disconnect;
2717 }
2718
2719 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2720 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2721 goto disconnect;
2722 }
2723
2724 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2725 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2726 goto disconnect;
2727 }
2728
2729 if (p_want_lose && mdev->net_conf->want_lose) {
2730 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2731 goto disconnect;
2732 }
2733
2734 if (p_two_primaries != mdev->net_conf->two_primaries) {
2735 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2736 goto disconnect;
2737 }
2738
2739 if (mdev->agreed_pro_version >= 87) {
2740 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2741
2742 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2743 return FALSE;
2744
2745 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2746 if (strcmp(p_integrity_alg, my_alg)) {
2747 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2748 goto disconnect;
2749 }
2750 dev_info(DEV, "data-integrity-alg: %s\n",
2751 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2752 }
2753
2754 return TRUE;
2755
2756disconnect:
2757 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2758 return FALSE;
2759}
2760
2761/* helper function
2762 * input: alg name, feature name
2763 * return: NULL (alg name was "")
2764 * ERR_PTR(error) if something goes wrong
2765 * or the crypto hash ptr, if it worked out ok. */
2766struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2767 const char *alg, const char *name)
2768{
2769 struct crypto_hash *tfm;
2770
2771 if (!alg[0])
2772 return NULL;
2773
2774 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2775 if (IS_ERR(tfm)) {
2776 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2777 alg, name, PTR_ERR(tfm));
2778 return tfm;
2779 }
2780 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2781 crypto_free_hash(tfm);
2782 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2783 return ERR_PTR(-EINVAL);
2784 }
2785 return tfm;
2786}
2787
2788static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2789{
2790 int ok = TRUE;
8e26f9cc 2791 struct p_rs_param_95 *p = (struct p_rs_param_95 *)h;
2792 unsigned int header_size, data_size, exp_max_sz;
2793 struct crypto_hash *verify_tfm = NULL;
2794 struct crypto_hash *csums_tfm = NULL;
2795 const int apv = mdev->agreed_pro_version;
2796 int *rs_plan_s = NULL;
2797 int fifo_size = 0;
2798
2799 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2800 : apv == 88 ? sizeof(struct p_rs_param)
2801 + SHARED_SECRET_MAX
2802 : apv <= 94 ? sizeof(struct p_rs_param_89)
2803 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2804
2805 if (h->length > exp_max_sz) {
2806 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2807 h->length, exp_max_sz);
2808 return FALSE;
2809 }
2810
2811 if (apv <= 88) {
2812 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2813 data_size = h->length - header_size;
8e26f9cc 2814 } else if (apv <= 94) {
2815 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2816 data_size = h->length - header_size;
2817 D_ASSERT(data_size == 0);
2818 } else {
2819 header_size = sizeof(struct p_rs_param_95) - sizeof(*h);
2820 data_size = h->length - header_size;
2821 D_ASSERT(data_size == 0);
2822 }
2823
2824 /* initialize verify_alg and csums_alg */
2825 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2826
2827 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2828 return FALSE;
2829
2830 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2831
2832 if (apv >= 88) {
2833 if (apv == 88) {
2834 if (data_size > SHARED_SECRET_MAX) {
2835 dev_err(DEV, "verify-alg too long, "
2836 "peer wants %u, accepting only %u byte\n",
2837 data_size, SHARED_SECRET_MAX);
2838 return FALSE;
2839 }
2840
2841 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2842 return FALSE;
2843
2844 /* we expect NUL terminated string */
2845 /* but just in case someone tries to be evil */
2846 D_ASSERT(p->verify_alg[data_size-1] == 0);
2847 p->verify_alg[data_size-1] = 0;
2848
2849 } else /* apv >= 89 */ {
2850 /* we still expect NUL terminated strings */
2851 /* but just in case someone tries to be evil */
2852 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2853 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2854 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2855 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2856 }
2857
2858 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2859 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2860 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2861 mdev->sync_conf.verify_alg, p->verify_alg);
2862 goto disconnect;
2863 }
2864 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2865 p->verify_alg, "verify-alg");
2866 if (IS_ERR(verify_tfm)) {
2867 verify_tfm = NULL;
2868 goto disconnect;
2869 }
2870 }
2871
2872 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2873 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2874 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2875 mdev->sync_conf.csums_alg, p->csums_alg);
2876 goto disconnect;
2877 }
2878 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2879 p->csums_alg, "csums-alg");
2880 if (IS_ERR(csums_tfm)) {
2881 csums_tfm = NULL;
2882 goto disconnect;
2883 }
2884 }
2885
2886 if (apv > 94) {
2887 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2888 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2889 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2890 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2891 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2892
2893 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
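			/* Note: c_plan_ahead is configured in tenths of a second, so
			 * (assuming SLEEP_TIME is the resync controller tick) this sizes
			 * the fifo to one entry per controller step over the whole
			 * plan-ahead window. */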
2894 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2895 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2896 if (!rs_plan_s) {
 2897 dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2898 goto disconnect;
2899 }
2900 }
8e26f9cc 2901 }
2902
2903 spin_lock(&mdev->peer_seq_lock);
2904 /* lock against drbd_nl_syncer_conf() */
2905 if (verify_tfm) {
2906 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2907 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2908 crypto_free_hash(mdev->verify_tfm);
2909 mdev->verify_tfm = verify_tfm;
2910 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2911 }
2912 if (csums_tfm) {
2913 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2914 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2915 crypto_free_hash(mdev->csums_tfm);
2916 mdev->csums_tfm = csums_tfm;
2917 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2918 }
2919 if (fifo_size != mdev->rs_plan_s.size) {
2920 kfree(mdev->rs_plan_s.values);
2921 mdev->rs_plan_s.values = rs_plan_s;
2922 mdev->rs_plan_s.size = fifo_size;
2923 mdev->rs_planed = 0;
2924 }
2925 spin_unlock(&mdev->peer_seq_lock);
2926 }
2927
2928 return ok;
2929disconnect:
2930 /* just for completeness: actually not needed,
2931 * as this is not reached if csums_tfm was ok. */
2932 crypto_free_hash(csums_tfm);
2933 /* but free the verify_tfm again, if csums_tfm did not work out */
2934 crypto_free_hash(verify_tfm);
2935 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2936 return FALSE;
2937}
2938
2939static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2940{
2941 /* sorry, we currently have no working implementation
2942 * of distributed TCQ */
2943}
2944
2945/* warn if the arguments differ by more than 12.5% */
2946static void warn_if_differ_considerably(struct drbd_conf *mdev,
2947 const char *s, sector_t a, sector_t b)
2948{
2949 sector_t d;
2950 if (a == 0 || b == 0)
2951 return;
2952 d = (a > b) ? (a - b) : (b - a);
2953 if (d > (a>>3) || d > (b>>3))
2954 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2955 (unsigned long long)a, (unsigned long long)b);
2956}
2957
2958static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2959{
2960 struct p_sizes *p = (struct p_sizes *)h;
2961 enum determine_dev_size dd = unchanged;
2962 unsigned int max_seg_s;
2963 sector_t p_size, p_usize, my_usize;
2964 int ldsc = 0; /* local disk size changed */
e89b591c 2965 enum dds_flags ddsf;
2966
2967 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2968 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2969 return FALSE;
2970
2971 p_size = be64_to_cpu(p->d_size);
2972 p_usize = be64_to_cpu(p->u_size);
2973
2974 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2975 dev_err(DEV, "some backing storage is needed\n");
2976 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2977 return FALSE;
2978 }
2979
2980 /* just store the peer's disk size for now.
2981 * we still need to figure out whether we accept that. */
2982 mdev->p_size = p_size;
2983
2984#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
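/* Note: min_not_zero() picks the smaller of the two sizes while treating 0
 * as "no limit requested", so a zero on either side never shrinks the
 * result. */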
2985 if (get_ldev(mdev)) {
2986 warn_if_differ_considerably(mdev, "lower level device sizes",
2987 p_size, drbd_get_max_capacity(mdev->ldev));
2988 warn_if_differ_considerably(mdev, "user requested size",
2989 p_usize, mdev->ldev->dc.disk_size);
2990
2991 /* if this is the first connect, or an otherwise expected
2992 * param exchange, choose the minimum */
2993 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2994 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2995 p_usize);
2996
2997 my_usize = mdev->ldev->dc.disk_size;
2998
2999 if (mdev->ldev->dc.disk_size != p_usize) {
3000 mdev->ldev->dc.disk_size = p_usize;
3001 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3002 (unsigned long)mdev->ldev->dc.disk_size);
3003 }
3004
3005 /* Never shrink a device with usable data during connect.
3006 But allow online shrinking if we are connected. */
a393db6f 3007 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3008 drbd_get_capacity(mdev->this_bdev) &&
3009 mdev->state.disk >= D_OUTDATED &&
3010 mdev->state.conn < C_CONNECTED) {
3011 dev_err(DEV, "The peer's disk size is too small!\n");
3012 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3013 mdev->ldev->dc.disk_size = my_usize;
3014 put_ldev(mdev);
3015 return FALSE;
3016 }
3017 put_ldev(mdev);
3018 }
3019#undef min_not_zero
3020
e89b591c 3021 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3022 if (get_ldev(mdev)) {
e89b591c 3023 dd = drbd_determin_dev_size(mdev, ddsf);
3024 put_ldev(mdev);
3025 if (dd == dev_size_error)
3026 return FALSE;
3027 drbd_md_sync(mdev);
3028 } else {
3029 /* I am diskless, need to accept the peer's size. */
3030 drbd_set_my_capacity(mdev, p_size);
3031 }
3032
3033 if (get_ldev(mdev)) {
3034 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3035 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3036 ldsc = 1;
3037 }
3038
3039 if (mdev->agreed_pro_version < 94)
3040 max_seg_s = be32_to_cpu(p->max_segment_size);
3041 else /* drbd 8.3.8 onwards */
3042 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3043
3044 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3045 drbd_setup_queue_param(mdev, max_seg_s);
3046
e89b591c 3047 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3048 put_ldev(mdev);
3049 }
3050
3051 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3052 if (be64_to_cpu(p->c_size) !=
3053 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3054 /* we have different sizes, probably peer
3055 * needs to know my new size... */
e89b591c 3056 drbd_send_sizes(mdev, 0, ddsf);
3057 }
3058 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3059 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3060 if (mdev->state.pdsk >= D_INCONSISTENT &&
3061 mdev->state.disk >= D_INCONSISTENT) {
3062 if (ddsf & DDSF_NO_RESYNC)
3063 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3064 else
3065 resync_after_online_grow(mdev);
3066 } else
3067 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3068 }
3069 }
3070
3071 return TRUE;
3072}
3073
3074static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3075{
3076 struct p_uuids *p = (struct p_uuids *)h;
3077 u64 *p_uuid;
3078 int i;
3079
3080 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3081 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3082 return FALSE;
3083
3084 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3085
3086 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3087 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3088
3089 kfree(mdev->p_uuid);
3090 mdev->p_uuid = p_uuid;
3091
3092 if (mdev->state.conn < C_CONNECTED &&
3093 mdev->state.disk < D_INCONSISTENT &&
3094 mdev->state.role == R_PRIMARY &&
3095 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3096 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3097 (unsigned long long)mdev->ed_uuid);
3098 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3099 return FALSE;
3100 }
3101
3102 if (get_ldev(mdev)) {
3103 int skip_initial_sync =
3104 mdev->state.conn == C_CONNECTED &&
3105 mdev->agreed_pro_version >= 90 &&
3106 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3107 (p_uuid[UI_FLAGS] & 8);
3108 if (skip_initial_sync) {
3109 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3110 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3111 "clear_n_write from receive_uuids");
3112 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3113 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3114 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3115 CS_VERBOSE, NULL);
3116 drbd_md_sync(mdev);
3117 }
3118 put_ldev(mdev);
3119 } else if (mdev->state.disk < D_INCONSISTENT &&
3120 mdev->state.role == R_PRIMARY) {
3121 /* I am a diskless primary, the peer just created a new current UUID
3122 for me. */
3123 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3124 }
3125
 3126 /* Before we test for the disk state, we should wait until any possibly
 3127 ongoing cluster wide state change is finished. That is important if
3128 we are primary and are detaching from our disk. We need to see the
3129 new disk state... */
3130 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3131 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3132 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3133
3134 return TRUE;
3135}
3136
3137/**
3138 * convert_state() - Converts the peer's view of the cluster state to our point of view
3139 * @ps: The state as seen by the peer.
3140 */
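/* Note: the conversion simply swaps the local and peer fields -- the peer's
 * "role" becomes our "peer" role, its "disk" becomes our "pdsk" -- and maps
 * the asymmetric connection states onto their counterparts via c_tab[]
 * (e.g. C_STARTING_SYNC_S <-> C_STARTING_SYNC_T, C_VERIFY_S -> C_VERIFY_T). */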
3141static union drbd_state convert_state(union drbd_state ps)
3142{
3143 union drbd_state ms;
3144
3145 static enum drbd_conns c_tab[] = {
3146 [C_CONNECTED] = C_CONNECTED,
3147
3148 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3149 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3150 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3151 [C_VERIFY_S] = C_VERIFY_T,
3152 [C_MASK] = C_MASK,
3153 };
3154
3155 ms.i = ps.i;
3156
3157 ms.conn = c_tab[ps.conn];
3158 ms.peer = ps.role;
3159 ms.role = ps.peer;
3160 ms.pdsk = ps.disk;
3161 ms.disk = ps.pdsk;
3162 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3163
3164 return ms;
3165}
3166
3167static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3168{
3169 struct p_req_state *p = (struct p_req_state *)h;
3170 union drbd_state mask, val;
3171 int rv;
3172
3173 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3174 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3175 return FALSE;
3176
3177 mask.i = be32_to_cpu(p->mask);
3178 val.i = be32_to_cpu(p->val);
3179
3180 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3181 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3182 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3183 return TRUE;
3184 }
3185
3186 mask = convert_state(mask);
3187 val = convert_state(val);
3188
3189 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3190
3191 drbd_send_sr_reply(mdev, rv);
3192 drbd_md_sync(mdev);
3193
3194 return TRUE;
3195}
3196
3197static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3198{
3199 struct p_state *p = (struct p_state *)h;
3200 enum drbd_conns nconn, oconn;
3201 union drbd_state ns, peer_state;
3202 enum drbd_disk_state real_peer_disk;
65d922c3 3203 enum chg_state_flags cs_flags;
3204 int rv;
3205
3206 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3207 return FALSE;
3208
3209 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3210 return FALSE;
3211
3212 peer_state.i = be32_to_cpu(p->state);
3213
3214 real_peer_disk = peer_state.disk;
3215 if (peer_state.disk == D_NEGOTIATING) {
3216 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3217 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3218 }
3219
3220 spin_lock_irq(&mdev->req_lock);
3221 retry:
3222 oconn = nconn = mdev->state.conn;
3223 spin_unlock_irq(&mdev->req_lock);
3224
3225 if (nconn == C_WF_REPORT_PARAMS)
3226 nconn = C_CONNECTED;
3227
3228 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3229 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3230 int cr; /* consider resync */
3231
3232 /* if we established a new connection */
3233 cr = (oconn < C_CONNECTED);
3234 /* if we had an established connection
3235 * and one of the nodes newly attaches a disk */
3236 cr |= (oconn == C_CONNECTED &&
3237 (peer_state.disk == D_NEGOTIATING ||
3238 mdev->state.disk == D_NEGOTIATING));
3239 /* if we have both been inconsistent, and the peer has been
3240 * forced to be UpToDate with --overwrite-data */
3241 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3242 /* if we had been plain connected, and the admin requested to
3243 * start a sync by "invalidate" or "invalidate-remote" */
3244 cr |= (oconn == C_CONNECTED &&
3245 (peer_state.conn >= C_STARTING_SYNC_S &&
3246 peer_state.conn <= C_WF_BITMAP_T));
3247
3248 if (cr)
3249 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3250
3251 put_ldev(mdev);
3252 if (nconn == C_MASK) {
580b9767 3253 nconn = C_CONNECTED;
3254 if (mdev->state.disk == D_NEGOTIATING) {
3255 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3256 } else if (peer_state.disk == D_NEGOTIATING) {
3257 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3258 peer_state.disk = D_DISKLESS;
580b9767 3259 real_peer_disk = D_DISKLESS;
b411b363 3260 } else {
3261 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3262 return FALSE;
3263 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3264 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3265 return FALSE;
3266 }
3267 }
3268 }
3269
3270 spin_lock_irq(&mdev->req_lock);
3271 if (mdev->state.conn != oconn)
3272 goto retry;
3273 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3274 ns.i = mdev->state.i;
3275 ns.conn = nconn;
3276 ns.peer = peer_state.role;
3277 ns.pdsk = real_peer_disk;
3278 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3279 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3280 ns.disk = mdev->new_state_tmp.disk;
65d922c3 3281 cs_flags = CS_VERBOSE + (oconn < C_CONNECTED && nconn >= C_CONNECTED ? 0 : CS_HARD);
3282 if (ns.pdsk == D_CONSISTENT && ns.susp && nconn == C_CONNECTED && oconn < C_CONNECTED &&
3283 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3284 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
 3285 for temporary network outages! */
3286 spin_unlock_irq(&mdev->req_lock);
3287 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3288 tl_clear(mdev);
3289 drbd_uuid_new_current(mdev);
3290 clear_bit(NEW_CUR_UUID, &mdev->flags);
3291 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3292 return FALSE;
3293 }
65d922c3 3294 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3295 ns = mdev->state;
3296 spin_unlock_irq(&mdev->req_lock);
3297
3298 if (rv < SS_SUCCESS) {
3299 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3300 return FALSE;
3301 }
3302
3303 if (oconn > C_WF_REPORT_PARAMS) {
3304 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3305 peer_state.disk != D_NEGOTIATING ) {
3306 /* we want resync, peer has not yet decided to sync... */
3307 /* Nowadays only used when forcing a node into primary role and
3308 setting its disk to UpToDate with that */
3309 drbd_send_uuids(mdev);
3310 drbd_send_state(mdev);
3311 }
3312 }
3313
3314 mdev->net_conf->want_lose = 0;
3315
3316 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3317
3318 return TRUE;
3319}
3320
3321static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3322{
3323 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3324
3325 wait_event(mdev->misc_wait,
3326 mdev->state.conn == C_WF_SYNC_UUID ||
3327 mdev->state.conn < C_CONNECTED ||
3328 mdev->state.disk < D_NEGOTIATING);
3329
3330 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3331
3332 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3333 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3334 return FALSE;
3335
3336 /* Here the _drbd_uuid_ functions are right, current should
3337 _not_ be rotated into the history */
3338 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3339 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3340 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3341
3342 drbd_start_resync(mdev, C_SYNC_TARGET);
3343
3344 put_ldev(mdev);
3345 } else
3346 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3347
3348 return TRUE;
3349}
3350
3351enum receive_bitmap_ret { OK, DONE, FAILED };
3352
3353static enum receive_bitmap_ret
3354receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3355 unsigned long *buffer, struct bm_xfer_ctx *c)
3356{
3357 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3358 unsigned want = num_words * sizeof(long);
3359
3360 if (want != h->length) {
3361 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3362 return FAILED;
3363 }
3364 if (want == 0)
3365 return DONE;
3366 if (drbd_recv(mdev, buffer, want) != want)
3367 return FAILED;
3368
3369 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3370
3371 c->word_offset += num_words;
3372 c->bit_offset = c->word_offset * BITS_PER_LONG;
3373 if (c->bit_offset > c->bm_bits)
3374 c->bit_offset = c->bm_bits;
3375
3376 return OK;
3377}
3378
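/* Note: the compressed bitmap transfer is a run length encoding: alternating
 * runs of clear and set bits, each run length stored as a variable length
 * integer in the packet's bit stream. DCBP_get_start() tells whether the
 * first run consists of set bits; only the "set" runs are applied below. */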
3379static enum receive_bitmap_ret
3380recv_bm_rle_bits(struct drbd_conf *mdev,
3381 struct p_compressed_bm *p,
3382 struct bm_xfer_ctx *c)
3383{
3384 struct bitstream bs;
3385 u64 look_ahead;
3386 u64 rl;
3387 u64 tmp;
3388 unsigned long s = c->bit_offset;
3389 unsigned long e;
3390 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3391 int toggle = DCBP_get_start(p);
3392 int have;
3393 int bits;
3394
3395 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3396
3397 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3398 if (bits < 0)
3399 return FAILED;
3400
3401 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3402 bits = vli_decode_bits(&rl, look_ahead);
3403 if (bits <= 0)
3404 return FAILED;
3405
3406 if (toggle) {
3407 e = s + rl -1;
3408 if (e >= c->bm_bits) {
3409 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3410 return FAILED;
3411 }
3412 _drbd_bm_set_bits(mdev, s, e);
3413 }
3414
3415 if (have < bits) {
3416 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3417 have, bits, look_ahead,
3418 (unsigned int)(bs.cur.b - p->code),
3419 (unsigned int)bs.buf_len);
3420 return FAILED;
3421 }
3422 look_ahead >>= bits;
3423 have -= bits;
3424
3425 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3426 if (bits < 0)
3427 return FAILED;
3428 look_ahead |= tmp << have;
3429 have += bits;
3430 }
3431
3432 c->bit_offset = s;
3433 bm_xfer_ctx_bit_to_word_offset(c);
3434
3435 return (s == c->bm_bits) ? DONE : OK;
3436}
3437
3438static enum receive_bitmap_ret
3439decode_bitmap_c(struct drbd_conf *mdev,
3440 struct p_compressed_bm *p,
3441 struct bm_xfer_ctx *c)
3442{
3443 if (DCBP_get_code(p) == RLE_VLI_Bits)
3444 return recv_bm_rle_bits(mdev, p, c);
3445
3446 /* other variants had been implemented for evaluation,
3447 * but have been dropped as this one turned out to be "best"
3448 * during all our tests. */
3449
3450 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3451 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3452 return FAILED;
3453}
3454
3455void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3456 const char *direction, struct bm_xfer_ctx *c)
3457{
3458 /* what would it take to transfer it "plaintext" */
3459 unsigned plain = sizeof(struct p_header) *
3460 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3461 + c->bm_words * sizeof(long);
3462 unsigned total = c->bytes[0] + c->bytes[1];
3463 unsigned r;
3464
3465 /* total can not be zero. but just in case: */
3466 if (total == 0)
3467 return;
3468
3469 /* don't report if not compressed */
3470 if (total >= plain)
3471 return;
3472
3473 /* total < plain. check for overflow, still */
3474 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3475 : (1000 * total / plain);
3476
3477 if (r > 1000)
3478 r = 1000;
3479
3480 r = 1000 - r;
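	/* Worked example: with plain = 100000 bytes and total = 25000 bytes,
	 * r ends up as 750 and the line below prints "compression: 75.0%". */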
3481 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3482 "total %u; compression: %u.%u%%\n",
3483 direction,
3484 c->bytes[1], c->packets[1],
3485 c->bytes[0], c->packets[0],
3486 total, r/10, r % 10);
3487}
3488
3489/* Since we are processing the bitfield from lower addresses to higher,
 3490 it does not matter if we process it in 32 bit chunks or 64 bit
 3491 chunks, as long as it is little endian. (Understand it as a byte stream,
 3492 beginning with the lowest byte...) If we used big endian,
3493 we would need to process it from the highest address to the lowest,
3494 in order to be agnostic to the 32 vs 64 bits issue.
3495
3496 returns 0 on failure, 1 if we successfully received it. */
3497static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3498{
3499 struct bm_xfer_ctx c;
3500 void *buffer;
3501 enum receive_bitmap_ret ret;
3502 int ok = FALSE;
3503
3504 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3505
3506 drbd_bm_lock(mdev, "receive bitmap");
3507
3508 /* maybe we should use some per thread scratch page,
3509 * and allocate that during initial device creation? */
3510 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3511 if (!buffer) {
3512 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3513 goto out;
3514 }
3515
3516 c = (struct bm_xfer_ctx) {
3517 .bm_bits = drbd_bm_bits(mdev),
3518 .bm_words = drbd_bm_words(mdev),
3519 };
3520
3521 do {
3522 if (h->command == P_BITMAP) {
3523 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3524 } else if (h->command == P_COMPRESSED_BITMAP) {
3525 /* MAYBE: sanity check that we speak proto >= 90,
3526 * and the feature is enabled! */
3527 struct p_compressed_bm *p;
3528
3529 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3530 dev_err(DEV, "ReportCBitmap packet too large\n");
3531 goto out;
3532 }
3533 /* use the page buff */
3534 p = buffer;
3535 memcpy(p, h, sizeof(*h));
3536 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3537 goto out;
3538 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3539 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3540 return FAILED;
3541 }
3542 ret = decode_bitmap_c(mdev, p, &c);
3543 } else {
3544 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3545 goto out;
3546 }
3547
3548 c.packets[h->command == P_BITMAP]++;
3549 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3550
3551 if (ret != OK)
3552 break;
3553
3554 if (!drbd_recv_header(mdev, h))
3555 goto out;
3556 } while (ret == OK);
3557 if (ret == FAILED)
3558 goto out;
3559
3560 INFO_bm_xfer_stats(mdev, "receive", &c);
3561
3562 if (mdev->state.conn == C_WF_BITMAP_T) {
3563 ok = !drbd_send_bitmap(mdev);
3564 if (!ok)
3565 goto out;
3566 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3567 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3568 D_ASSERT(ok == SS_SUCCESS);
3569 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3570 /* admin may have requested C_DISCONNECTING,
3571 * other threads may have noticed network errors */
3572 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3573 drbd_conn_str(mdev->state.conn));
3574 }
3575
3576 ok = TRUE;
3577 out:
3578 drbd_bm_unlock(mdev);
3579 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3580 drbd_start_resync(mdev, C_SYNC_SOURCE);
3581 free_page((unsigned long) buffer);
3582 return ok;
3583}
3584
e7f52dfb 3585static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
3586{
3587 /* TODO zero copy sink :) */
3588 static char sink[128];
3589 int size, want, r;
3590
3591 if (!silent)
3592 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3593 h->command, h->length);
3594
3595 size = h->length;
3596 while (size > 0) {
3597 want = min_t(int, size, sizeof(sink));
3598 r = drbd_recv(mdev, sink, want);
3599 ERR_IF(r <= 0) break;
3600 size -= r;
3601 }
3602 return size == 0;
3603}
3604
e7f52dfb 3605static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
0ced55a3 3606{
e7f52dfb 3607 return receive_skip_(mdev, h, 0);
3608}
3609
e7f52dfb 3610static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
0ced55a3 3611{
e7f52dfb 3612 return receive_skip_(mdev, h, 1);
0ced55a3
PR
3613}
3614
e7f52dfb 3615static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
0ced55a3 3616{
e7f52dfb
LE
3617 if (mdev->state.disk >= D_INCONSISTENT)
3618 drbd_kick_lo(mdev);
0ced55a3 3619
e7f52dfb
LE
3620 /* Make sure we've acked all the TCP data associated
3621 * with the data requests being unplugged */
3622 drbd_tcp_quickack(mdev->data.socket);
0ced55a3 3623
0ced55a3
PR
3624 return TRUE;
3625}
3626
b411b363
PR
3627typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3628
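/* Dispatch table for packets arriving on the data socket;
 * ack-style packets arriving on the meta socket are handled by
 * asender_tbl, see get_asender_cmd() below. */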
3629static drbd_cmd_handler_f drbd_default_handler[] = {
3630 [P_DATA] = receive_Data,
3631 [P_DATA_REPLY] = receive_DataReply,
3632 [P_RS_DATA_REPLY] = receive_RSDataReply,
3633 [P_BARRIER] = receive_Barrier,
3634 [P_BITMAP] = receive_bitmap,
3635 [P_COMPRESSED_BITMAP] = receive_bitmap,
3636 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3637 [P_DATA_REQUEST] = receive_DataRequest,
3638 [P_RS_DATA_REQUEST] = receive_DataRequest,
3639 [P_SYNC_PARAM] = receive_SyncParam,
3640 [P_SYNC_PARAM89] = receive_SyncParam,
3641 [P_PROTOCOL] = receive_protocol,
3642 [P_UUIDS] = receive_uuids,
3643 [P_SIZES] = receive_sizes,
3644 [P_STATE] = receive_state,
3645 [P_STATE_CHG_REQ] = receive_req_state,
3646 [P_SYNC_UUID] = receive_sync_uuid,
3647 [P_OV_REQUEST] = receive_DataRequest,
3648 [P_OV_REPLY] = receive_DataRequest,
3649 [P_CSUM_RS_REQUEST] = receive_DataRequest,
e7f52dfb 3650 [P_DELAY_PROBE] = receive_skip_silent,
b411b363
PR
3651 /* anything missing from this table is in
3652 * the asender_tbl, see get_asender_cmd */
3653 [P_MAX_CMD] = NULL,
3654};
3655
3656static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3657static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3658
3659static void drbdd(struct drbd_conf *mdev)
3660{
3661 drbd_cmd_handler_f handler;
3662 struct p_header *header = &mdev->data.rbuf.header;
3663
3664 while (get_t_state(&mdev->receiver) == Running) {
3665 drbd_thread_current_set_cpu(mdev);
0b33a916
LE
3666 if (!drbd_recv_header(mdev, header)) {
3667 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
b411b363 3668 break;
0b33a916 3669 }
b411b363
PR
3670
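		/* look up the handler: known commands come from the default
		 * table, commands in the optional range above P_MAY_IGNORE
		 * from the optional table, and anything newer than
		 * P_MAX_OPT_CMD is drained by receive_skip() */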
3671 if (header->command < P_MAX_CMD)
3672 handler = drbd_cmd_handler[header->command];
3673 else if (P_MAY_IGNORE < header->command
3674 && header->command < P_MAX_OPT_CMD)
3675 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3676 else if (header->command > P_MAX_OPT_CMD)
3677 handler = receive_skip;
3678 else
3679 handler = NULL;
3680
3681 if (unlikely(!handler)) {
3682 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3683 header->command, header->length);
3684 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3685 break;
3686 }
3687 if (unlikely(!handler(mdev, header))) {
3688 dev_err(DEV, "error receiving %s, l: %d!\n",
3689 cmdname(header->command), header->length);
3690 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3691 break;
3692 }
b411b363
PR
3693 }
3694}
3695
b411b363
PR
3696void drbd_flush_workqueue(struct drbd_conf *mdev)
3697{
3698 struct drbd_wq_barrier barr;
3699
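	/* queue a barrier work item and wait for it; once it completes,
	 * everything that was queued before it has been processed */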
3700 barr.w.cb = w_prev_work_done;
3701 init_completion(&barr.done);
3702 drbd_queue_work(&mdev->data.work, &barr.w);
3703 wait_for_completion(&barr.done);
3704}
3705
f70b3511
PR
3706void drbd_free_tl_hash(struct drbd_conf *mdev)
3707{
3708 struct hlist_head *h;
3709
3710 spin_lock_irq(&mdev->req_lock);
3711
3712 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3713 spin_unlock_irq(&mdev->req_lock);
3714 return;
3715 }
3716 /* paranoia code */
3717 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3718 if (h->first)
3719 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3720 (int)(h - mdev->ee_hash), h->first);
3721 kfree(mdev->ee_hash);
3722 mdev->ee_hash = NULL;
3723 mdev->ee_hash_s = 0;
3724
3725 /* paranoia code */
3726 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3727 if (h->first)
3728 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3729 (int)(h - mdev->tl_hash), h->first);
3730 kfree(mdev->tl_hash);
3731 mdev->tl_hash = NULL;
3732 mdev->tl_hash_s = 0;
3733 spin_unlock_irq(&mdev->req_lock);
3734}
3735
b411b363
PR
3736static void drbd_disconnect(struct drbd_conf *mdev)
3737{
3738 enum drbd_fencing_p fp;
3739 union drbd_state os, ns;
3740 int rv = SS_UNKNOWN_ERROR;
3741 unsigned int i;
3742
3743 if (mdev->state.conn == C_STANDALONE)
3744 return;
3745 if (mdev->state.conn >= C_WF_CONNECTION)
3746 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3747 drbd_conn_str(mdev->state.conn));
3748
3749 /* asender does not clean up anything. it must not interfere, either */
3750 drbd_thread_stop(&mdev->asender);
b411b363 3751 drbd_free_sock(mdev);
b411b363 3752
85719573 3753 /* wait for current activity to cease. */
b411b363
PR
3754 spin_lock_irq(&mdev->req_lock);
3755 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3756 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3757 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3758 spin_unlock_irq(&mdev->req_lock);
3759
3760 /* We do not have data structures that would allow us to
3761 * get the rs_pending_cnt down to 0 again.
3762 * * On C_SYNC_TARGET we do not have any data structures describing
3763 * the pending RSDataRequests we have sent.
3764 * * On C_SYNC_SOURCE there is no data structure that tracks
3765 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3766 * And no, it is not the sum of the reference counts in the
3767 * resync_LRU. The resync_LRU tracks the whole operation including
3768 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3769 * on the fly. */
3770 drbd_rs_cancel_all(mdev);
3771 mdev->rs_total = 0;
3772 mdev->rs_failed = 0;
3773 atomic_set(&mdev->rs_pending_cnt, 0);
3774 wake_up(&mdev->misc_wait);
3775
3776 /* make sure syncer is stopped and w_resume_next_sg queued */
3777 del_timer_sync(&mdev->resync_timer);
3778 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3779 resync_timer_fn((unsigned long)mdev);
3780
b411b363
PR
3781 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3782 * w_make_resync_request etc. which may still be on the worker queue
3783 * to be "canceled" */
3784 drbd_flush_workqueue(mdev);
3785
3786 /* This also does reclaim_net_ee(). If we do this too early, we might
3787 * miss some resync ee and pages.*/
3788 drbd_process_done_ee(mdev);
3789
3790 kfree(mdev->p_uuid);
3791 mdev->p_uuid = NULL;
3792
3793 if (!mdev->state.susp)
3794 tl_clear(mdev);
3795
b411b363
PR
3796 dev_info(DEV, "Connection closed\n");
3797
3798 drbd_md_sync(mdev);
3799
3800 fp = FP_DONT_CARE;
3801 if (get_ldev(mdev)) {
3802 fp = mdev->ldev->dc.fencing;
3803 put_ldev(mdev);
3804 }
3805
87f7be4c
PR
3806 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3807 drbd_try_outdate_peer_async(mdev);
b411b363
PR
3808
3809 spin_lock_irq(&mdev->req_lock);
3810 os = mdev->state;
3811 if (os.conn >= C_UNCONNECTED) {
3812 /* Do not restart in case we are C_DISCONNECTING */
3813 ns = os;
3814 ns.conn = C_UNCONNECTED;
3815 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3816 }
3817 spin_unlock_irq(&mdev->req_lock);
3818
3819 if (os.conn == C_DISCONNECTING) {
84dfb9f5 3820 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
b411b363 3821
f70b3511
PR
3822 if (!mdev->state.susp) {
3823 /* we must not free the tl_hash
3824 * while application io is still on the fly */
3825 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3826 drbd_free_tl_hash(mdev);
3827 }
b411b363
PR
3828
3829 crypto_free_hash(mdev->cram_hmac_tfm);
3830 mdev->cram_hmac_tfm = NULL;
3831
3832 kfree(mdev->net_conf);
3833 mdev->net_conf = NULL;
3834 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3835 }
3836
3837 /* tcp_close and release of sendpage pages can be deferred. I don't
3838 * want to use SO_LINGER, because apparently it can be deferred for
3839 * more than 20 seconds (longest time I checked).
3840 *
3841 * Actually we don't care exactly when the network stack does its
3842 * put_page(); we just release our reference on these pages right here.
3843 */
3844 i = drbd_release_ee(mdev, &mdev->net_ee);
3845 if (i)
3846 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3847 i = atomic_read(&mdev->pp_in_use);
3848 if (i)
45bb912b 3849 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3850
3851 D_ASSERT(list_empty(&mdev->read_ee));
3852 D_ASSERT(list_empty(&mdev->active_ee));
3853 D_ASSERT(list_empty(&mdev->sync_ee));
3854 D_ASSERT(list_empty(&mdev->done_ee));
3855
3856 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3857 atomic_set(&mdev->current_epoch->epoch_size, 0);
3858 D_ASSERT(list_empty(&mdev->current_epoch->list));
3859}
3860
3861/*
3862 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3863 * we can agree on is stored in agreed_pro_version.
3864 *
3865 * feature flags and the reserved array should be enough room for future
3866 * enhancements of the handshake protocol, and possible plugins...
3867 *
3868 * for now, they are expected to be zero, but ignored.
3869 */
3870static int drbd_send_handshake(struct drbd_conf *mdev)
3871{
3872 /* ASSERT current == mdev->receiver ... */
3873 struct p_handshake *p = &mdev->data.sbuf.handshake;
3874 int ok;
3875
3876 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3877 dev_err(DEV, "interrupted during initial handshake\n");
3878 return 0; /* interrupted. not ok. */
3879 }
3880
3881 if (mdev->data.socket == NULL) {
3882 mutex_unlock(&mdev->data.mutex);
3883 return 0;
3884 }
3885
3886 memset(p, 0, sizeof(*p));
3887 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3888 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3889 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3890 (struct p_header *)p, sizeof(*p), 0);
3891 mutex_unlock(&mdev->data.mutex);
3892 return ok;
3893}
3894
3895/*
3896 * return values:
3897 * 1 yes, we have a valid connection
3898 * 0 oops, did not work out, please try again
3899 * -1 peer talks different language,
3900 * no point in trying again, please go standalone.
3901 */
3902static int drbd_do_handshake(struct drbd_conf *mdev)
3903{
3904 /* ASSERT current == mdev->receiver ... */
3905 struct p_handshake *p = &mdev->data.rbuf.handshake;
3906 const int expect = sizeof(struct p_handshake)
3907 -sizeof(struct p_header);
3908 int rv;
3909
3910 rv = drbd_send_handshake(mdev);
3911 if (!rv)
3912 return 0;
3913
3914 rv = drbd_recv_header(mdev, &p->head);
3915 if (!rv)
3916 return 0;
3917
3918 if (p->head.command != P_HAND_SHAKE) {
3919 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3920 cmdname(p->head.command), p->head.command);
3921 return -1;
3922 }
3923
3924 if (p->head.length != expect) {
3925 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3926 expect, p->head.length);
3927 return -1;
3928 }
3929
3930 rv = drbd_recv(mdev, &p->head.payload, expect);
3931
3932 if (rv != expect) {
3933 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3934 return 0;
3935 }
3936
b411b363
PR
3937 p->protocol_min = be32_to_cpu(p->protocol_min);
3938 p->protocol_max = be32_to_cpu(p->protocol_max);
3939 if (p->protocol_max == 0)
3940 p->protocol_max = p->protocol_min;
3941
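	/* the two advertised version ranges must overlap; we then agree
	 * on the highest protocol version both sides support */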
3942 if (PRO_VERSION_MAX < p->protocol_min ||
3943 PRO_VERSION_MIN > p->protocol_max)
3944 goto incompat;
3945
3946 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3947
3948 dev_info(DEV, "Handshake successful: "
3949 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3950
3951 return 1;
3952
3953 incompat:
3954 dev_err(DEV, "incompatible DRBD dialects: "
3955 "I support %d-%d, peer supports %d-%d\n",
3956 PRO_VERSION_MIN, PRO_VERSION_MAX,
3957 p->protocol_min, p->protocol_max);
3958 return -1;
3959}
3960
3961#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3962static int drbd_do_auth(struct drbd_conf *mdev)
3963{
3964 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3965 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 3966 return -1;
b411b363
PR
3967}
3968#else
3969#define CHALLENGE_LEN 64
b10d96cb
JT
3970
3971/* Return value:
3972 1 - auth succeeded,
3973 0 - failed, try again (network error),
3974 -1 - auth failed, don't try again.
3975*/
3976
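/* Challenge/response authentication over the shared secret: each side
 * sends a random challenge, answers the peer's challenge with an HMAC
 * keyed by the secret, and verifies the peer's answer against a locally
 * computed HMAC of its own challenge. */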
b411b363
PR
3977static int drbd_do_auth(struct drbd_conf *mdev)
3978{
3979 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
3980 struct scatterlist sg;
3981 char *response = NULL;
3982 char *right_response = NULL;
3983 char *peers_ch = NULL;
3984 struct p_header p;
3985 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3986 unsigned int resp_size;
3987 struct hash_desc desc;
3988 int rv;
3989
3990 desc.tfm = mdev->cram_hmac_tfm;
3991 desc.flags = 0;
3992
3993 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3994 (u8 *)mdev->net_conf->shared_secret, key_len);
3995 if (rv) {
3996 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 3997 rv = -1;
b411b363
PR
3998 goto fail;
3999 }
4000
4001 get_random_bytes(my_challenge, CHALLENGE_LEN);
4002
4003 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4004 if (!rv)
4005 goto fail;
4006
4007 rv = drbd_recv_header(mdev, &p);
4008 if (!rv)
4009 goto fail;
4010
4011 if (p.command != P_AUTH_CHALLENGE) {
4012 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4013 cmdname(p.command), p.command);
4014 rv = 0;
4015 goto fail;
4016 }
4017
4018 if (p.length > CHALLENGE_LEN*2) {
4019 dev_err(DEV, "expected AuthChallenge payload too big.\n");
b10d96cb 4020 rv = -1;
b411b363
PR
4021 goto fail;
4022 }
4023
4024 peers_ch = kmalloc(p.length, GFP_NOIO);
4025 if (peers_ch == NULL) {
4026 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4027 rv = -1;
b411b363
PR
4028 goto fail;
4029 }
4030
4031 rv = drbd_recv(mdev, peers_ch, p.length);
4032
4033 if (rv != p.length) {
4034 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4035 rv = 0;
4036 goto fail;
4037 }
4038
4039 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4040 response = kmalloc(resp_size, GFP_NOIO);
4041 if (response == NULL) {
4042 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4043 rv = -1;
b411b363
PR
4044 goto fail;
4045 }
4046
4047 sg_init_table(&sg, 1);
4048 sg_set_buf(&sg, peers_ch, p.length);
4049
4050 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4051 if (rv) {
4052 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4053 rv = -1;
b411b363
PR
4054 goto fail;
4055 }
4056
4057 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4058 if (!rv)
4059 goto fail;
4060
4061 rv = drbd_recv_header(mdev, &p);
4062 if (!rv)
4063 goto fail;
4064
4065 if (p.command != P_AUTH_RESPONSE) {
4066 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4067 cmdname(p.command), p.command);
4068 rv = 0;
4069 goto fail;
4070 }
4071
4072 if (p.length != resp_size) {
4073 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4074 rv = 0;
4075 goto fail;
4076 }
4077
4078 rv = drbd_recv(mdev, response, resp_size);
4079
4080 if (rv != resp_size) {
4081 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4082 rv = 0;
4083 goto fail;
4084 }
4085
4086 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4087 if (right_response == NULL) {
b411b363 4088 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4089 rv = -1;
b411b363
PR
4090 goto fail;
4091 }
4092
4093 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4094
4095 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4096 if (rv) {
4097 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4098 rv = -1;
b411b363
PR
4099 goto fail;
4100 }
4101
4102 rv = !memcmp(response, right_response, resp_size);
4103
4104 if (rv)
4105 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4106 resp_size, mdev->net_conf->cram_hmac_alg);
b10d96cb
JT
4107 else
4108 rv = -1;
b411b363
PR
4109
4110 fail:
4111 kfree(peers_ch);
4112 kfree(response);
4113 kfree(right_response);
4114
4115 return rv;
4116}
4117#endif
4118
4119int drbdd_init(struct drbd_thread *thi)
4120{
4121 struct drbd_conf *mdev = thi->mdev;
4122 unsigned int minor = mdev_to_minor(mdev);
4123 int h;
4124
4125 sprintf(current->comm, "drbd%d_receiver", minor);
4126
4127 dev_info(DEV, "receiver (re)started\n");
4128
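	/* retry while drbd_connect() reports a transient failure (h == 0);
	 * h == -1 means the peer is incompatible, so drop the network config */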
4129 do {
4130 h = drbd_connect(mdev);
4131 if (h == 0) {
4132 drbd_disconnect(mdev);
4133 __set_current_state(TASK_INTERRUPTIBLE);
4134 schedule_timeout(HZ);
4135 }
4136 if (h == -1) {
4137 dev_warn(DEV, "Discarding network configuration.\n");
4138 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4139 }
4140 } while (h == 0);
4141
4142 if (h > 0) {
4143 if (get_net_conf(mdev)) {
4144 drbdd(mdev);
4145 put_net_conf(mdev);
4146 }
4147 }
4148
4149 drbd_disconnect(mdev);
4150
4151 dev_info(DEV, "receiver terminated\n");
4152 return 0;
4153}
4154
4155/* ********* acknowledge sender ******** */
4156
4157static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4158{
4159 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4160
4161 int retcode = be32_to_cpu(p->retcode);
4162
4163 if (retcode >= SS_SUCCESS) {
4164 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4165 } else {
4166 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4167 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4168 drbd_set_st_err_str(retcode), retcode);
4169 }
4170 wake_up(&mdev->state_wait);
4171
4172 return TRUE;
4173}
4174
4175static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4176{
4177 return drbd_send_ping_ack(mdev);
4178
4179}
4180
4181static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4182{
4183 /* restore idle timeout */
4184 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
309d1608
PR
4185 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4186 wake_up(&mdev->misc_wait);
b411b363
PR
4187
4188 return TRUE;
4189}
4190
4191static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4192{
4193 struct p_block_ack *p = (struct p_block_ack *)h;
4194 sector_t sector = be64_to_cpu(p->sector);
4195 int blksize = be32_to_cpu(p->blksize);
4196
4197 D_ASSERT(mdev->agreed_pro_version >= 89);
4198
4199 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4200
4201 drbd_rs_complete_io(mdev, sector);
4202 drbd_set_in_sync(mdev, sector, blksize);
4203 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4204 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4205 dec_rs_pending(mdev);
778f271d 4206 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363
PR
4207
4208 return TRUE;
4209}
4210
4211/* when we receive the ACK for a write request,
4212 * verify that we actually know about it */
4213static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4214 u64 id, sector_t sector)
4215{
4216 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4217 struct hlist_node *n;
4218 struct drbd_request *req;
4219
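	/* the block_id we sent with the write is the kernel address of the
	 * original request; the sector check guards against a stale or
	 * corrupted id */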
4220 hlist_for_each_entry(req, n, slot, colision) {
4221 if ((unsigned long)req == (unsigned long)id) {
4222 if (req->sector != sector) {
4223 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4224 "wrong sector (%llus versus %llus)\n", req,
4225 (unsigned long long)req->sector,
4226 (unsigned long long)sector);
4227 break;
4228 }
4229 return req;
4230 }
4231 }
4232 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4233 (void *)(unsigned long)id, (unsigned long long)sector);
4234 return NULL;
4235}
4236
4237typedef struct drbd_request *(req_validator_fn)
4238 (struct drbd_conf *mdev, u64 id, sector_t sector);
4239
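/* look up the request the peer refers to, apply the request state
 * transition under req_lock, and complete the master bio (if any)
 * outside the lock */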
4240static int validate_req_change_req_state(struct drbd_conf *mdev,
4241 u64 id, sector_t sector, req_validator_fn validator,
4242 const char *func, enum drbd_req_event what)
4243{
4244 struct drbd_request *req;
4245 struct bio_and_error m;
4246
4247 spin_lock_irq(&mdev->req_lock);
4248 req = validator(mdev, id, sector);
4249 if (unlikely(!req)) {
4250 spin_unlock_irq(&mdev->req_lock);
4251 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4252 return FALSE;
4253 }
4254 __req_mod(req, what, &m);
4255 spin_unlock_irq(&mdev->req_lock);
4256
4257 if (m.bio)
4258 complete_master_bio(mdev, &m);
4259 return TRUE;
4260}
4261
4262static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4263{
4264 struct p_block_ack *p = (struct p_block_ack *)h;
4265 sector_t sector = be64_to_cpu(p->sector);
4266 int blksize = be32_to_cpu(p->blksize);
4267 enum drbd_req_event what;
4268
4269 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4270
4271 if (is_syncer_block_id(p->block_id)) {
4272 drbd_set_in_sync(mdev, sector, blksize);
4273 dec_rs_pending(mdev);
4274 return TRUE;
4275 }
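	/* a regular application write: translate the ack type into the
	 * matching event for the request state machine */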
4276 switch (be16_to_cpu(h->command)) {
4277 case P_RS_WRITE_ACK:
4278 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4279 what = write_acked_by_peer_and_sis;
4280 break;
4281 case P_WRITE_ACK:
4282 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4283 what = write_acked_by_peer;
4284 break;
4285 case P_RECV_ACK:
4286 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4287 what = recv_acked_by_peer;
4288 break;
4289 case P_DISCARD_ACK:
4290 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4291 what = conflict_discarded_by_peer;
4292 break;
4293 default:
4294 D_ASSERT(0);
4295 return FALSE;
4296 }
4297
4298 return validate_req_change_req_state(mdev, p->block_id, sector,
4299 _ack_id_to_req, __func__ , what);
4300}
4301
4302static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4303{
4304 struct p_block_ack *p = (struct p_block_ack *)h;
4305 sector_t sector = be64_to_cpu(p->sector);
4306
4307 if (__ratelimit(&drbd_ratelimit_state))
4308 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4309
4310 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4311
4312 if (is_syncer_block_id(p->block_id)) {
4313 int size = be32_to_cpu(p->blksize);
4314 dec_rs_pending(mdev);
4315 drbd_rs_failed_io(mdev, sector, size);
4316 return TRUE;
4317 }
4318 return validate_req_change_req_state(mdev, p->block_id, sector,
4319 _ack_id_to_req, __func__ , neg_acked);
4320}
4321
4322static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4323{
4324 struct p_block_ack *p = (struct p_block_ack *)h;
4325 sector_t sector = be64_to_cpu(p->sector);
4326
4327 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4328 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4329 (unsigned long long)sector, be32_to_cpu(p->blksize));
4330
4331 return validate_req_change_req_state(mdev, p->block_id, sector,
4332 _ar_id_to_req, __func__ , neg_acked);
4333}
4334
4335static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4336{
4337 sector_t sector;
4338 int size;
4339 struct p_block_ack *p = (struct p_block_ack *)h;
4340
4341 sector = be64_to_cpu(p->sector);
4342 size = be32_to_cpu(p->blksize);
b411b363
PR
4343
4344 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4345
4346 dec_rs_pending(mdev);
4347
4348 if (get_ldev_if_state(mdev, D_FAILED)) {
4349 drbd_rs_complete_io(mdev, sector);
4350 drbd_rs_failed_io(mdev, sector, size);
4351 put_ldev(mdev);
4352 }
4353
4354 return TRUE;
4355}
4356
4357static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4358{
4359 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4360
4361 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4362
4363 return TRUE;
4364}
4365
4366static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4367{
4368 struct p_block_ack *p = (struct p_block_ack *)h;
4369 struct drbd_work *w;
4370 sector_t sector;
4371 int size;
4372
4373 sector = be64_to_cpu(p->sector);
4374 size = be32_to_cpu(p->blksize);
4375
4376 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
4378 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4379 drbd_ov_oos_found(mdev, sector, size);
4380 else
4381 ov_oos_print(mdev);
4382
4383 drbd_rs_complete_io(mdev, sector);
4384 dec_rs_pending(mdev);
4385
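	/* if this was the last online-verify reply, hand the final
	 * bookkeeping to the worker, or fall back to finishing it
	 * synchronously if the allocation fails */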
4386 if (--mdev->ov_left == 0) {
4387 w = kmalloc(sizeof(*w), GFP_NOIO);
4388 if (w) {
4389 w->cb = w_ov_finished;
4390 drbd_queue_work_front(&mdev->data.work, w);
4391 } else {
4392 dev_err(DEV, "kmalloc(w) failed.");
4393 ov_oos_print(mdev);
4394 drbd_resync_finished(mdev);
4395 }
4396 }
4397 return TRUE;
4398}
4399
e7f52dfb 4400static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
0ced55a3 4401{
e7f52dfb 4402 /* IGNORE */
0ced55a3
PR
4403 return TRUE;
4404}
4405
b411b363
PR
4406struct asender_cmd {
4407 size_t pkt_size;
4408 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4409};
4410
4411static struct asender_cmd *get_asender_cmd(int cmd)
4412{
4413 static struct asender_cmd asender_tbl[] = {
4414 /* anything missing from this table is in
4415 * the drbd_cmd_handler (drbd_default_handler) table,
4416 * see the beginning of drbdd() */
4417 [P_PING] = { sizeof(struct p_header), got_Ping },
4418 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4419 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4420 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4421 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4422 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4423 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4424 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4425 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4426 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4427 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4428 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4429 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
e7f52dfb 4430 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
b411b363
PR
4431 [P_MAX_CMD] = { 0, NULL },
4432 };
4433 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4434 return NULL;
4435 return &asender_tbl[cmd];
4436}
4437
4438int drbd_asender(struct drbd_thread *thi)
4439{
4440 struct drbd_conf *mdev = thi->mdev;
4441 struct p_header *h = &mdev->meta.rbuf.header;
4442 struct asender_cmd *cmd = NULL;
4443
4444 int rv, len;
4445 void *buf = h;
4446 int received = 0;
4447 int expect = sizeof(struct p_header);
4448 int empty;
4449
4450 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4451
4452 current->policy = SCHED_RR; /* Make this a realtime task! */
4453 current->rt_priority = 2; /* more important than all other tasks */
4454
4455 while (get_t_state(thi) == Running) {
4456 drbd_thread_current_set_cpu(mdev);
4457 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4458 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4459 mdev->meta.socket->sk->sk_rcvtimeo =
4460 mdev->net_conf->ping_timeo*HZ/10;
4461 }
4462
4463 /* conditionally cork;
4464 * it may hurt latency if we cork without much to send */
4465 if (!mdev->net_conf->no_cork &&
4466 3 < atomic_read(&mdev->unacked_cnt))
4467 drbd_tcp_cork(mdev->meta.socket);
4468 while (1) {
4469 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4470 flush_signals(current);
4471 if (!drbd_process_done_ee(mdev)) {
4472 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4473 goto reconnect;
4474 }
4475 /* to avoid race with newly queued ACKs */
4476 set_bit(SIGNAL_ASENDER, &mdev->flags);
4477 spin_lock_irq(&mdev->req_lock);
4478 empty = list_empty(&mdev->done_ee);
4479 spin_unlock_irq(&mdev->req_lock);
4480 /* new ack may have been queued right here,
4481 * but then there is also a signal pending,
4482 * and we start over... */
4483 if (empty)
4484 break;
4485 }
4486 /* but unconditionally uncork unless disabled */
4487 if (!mdev->net_conf->no_cork)
4488 drbd_tcp_uncork(mdev->meta.socket);
4489
4490 /* short circuit, recv_msg would return EINTR anyways. */
4491 if (signal_pending(current))
4492 continue;
4493
4494 rv = drbd_recv_short(mdev, mdev->meta.socket,
4495 buf, expect-received, 0);
4496 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4497
4498 flush_signals(current);
4499
4500 /* Note:
4501 * -EINTR (on meta) we got a signal
4502 * -EAGAIN (on meta) rcvtimeo expired
4503 * -ECONNRESET other side closed the connection
4504 * -ERESTARTSYS (on data) we got a signal
4505 * rv < 0 other than above: unexpected error!
4506 * rv == expected: full header or command
4507 * rv < expected: "woken" by signal during receive
4508 * rv == 0 : "connection shut down by peer"
4509 */
4510 if (likely(rv > 0)) {
4511 received += rv;
4512 buf += rv;
4513 } else if (rv == 0) {
4514 dev_err(DEV, "meta connection shut down by peer.\n");
4515 goto reconnect;
4516 } else if (rv == -EAGAIN) {
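			/* receive timeout: if we were still waiting for a
			 * PingAck (short ping_timeo), the peer is unresponsive
			 * and we reconnect; on the normal idle timeout we just
			 * request another ping */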
4517 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4518 mdev->net_conf->ping_timeo*HZ/10) {
4519 dev_err(DEV, "PingAck did not arrive in time.\n");
4520 goto reconnect;
4521 }
4522 set_bit(SEND_PING, &mdev->flags);
4523 continue;
4524 } else if (rv == -EINTR) {
4525 continue;
4526 } else {
4527 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4528 goto reconnect;
4529 }
4530
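		/* a complete header has been received: verify the magic,
		 * look up the command and switch to expecting its payload */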
4531 if (received == expect && cmd == NULL) {
4532 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4533 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4534 (long)be32_to_cpu(h->magic),
4535 h->command, h->length);
4536 goto reconnect;
4537 }
4538 cmd = get_asender_cmd(be16_to_cpu(h->command));
4539 len = be16_to_cpu(h->length);
4540 if (unlikely(cmd == NULL)) {
4541 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4542 (long)be32_to_cpu(h->magic),
4543 h->command, h->length);
4544 goto disconnect;
4545 }
4546 expect = cmd->pkt_size;
6a0afdf5 4547 ERR_IF(len != expect-sizeof(struct p_header))
b411b363 4548 goto reconnect;
b411b363
PR
4549 }
4550 if (received == expect) {
4551 D_ASSERT(cmd != NULL);
b411b363
PR
4552 if (!cmd->process(mdev, h))
4553 goto reconnect;
4554
4555 buf = h;
4556 received = 0;
4557 expect = sizeof(struct p_header);
4558 cmd = NULL;
4559 }
4560 }
4561
4562 if (0) {
4563reconnect:
4564 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4565 }
4566 if (0) {
4567disconnect:
4568 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4569 }
4570 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4571
4572 D_ASSERT(mdev->state.conn < C_CONNECTED);
4573 dev_info(DEV, "asender terminated\n");
4574
4575 return 0;
4576}