]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/drbd/drbd_receiver.c
drbd: fix for possible deadlock on IO error during resync
[mirror_ubuntu-artful-kernel.git] / drivers / block / drbd / drbd_receiver.c
CommitLineData
b411b363
PR
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
b411b363
PR
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
b411b363
PR
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
b411b363
PR
45#include <linux/string.h>
46#include <linux/scatterlist.h>
47#include "drbd_int.h"
b411b363
PR
48#include "drbd_req.h"
49
50#include "drbd_vli.h"
51
52struct flush_work {
53 struct drbd_work w;
54 struct drbd_epoch *epoch;
55};
56
/* Result reported by drbd_may_finish_epoch() for the first epoch it finished. */
enum finish_epoch {
	FE_STILL_LIVE = 0,	/* no epoch was finished */
	FE_DESTROYED  = 1,	/* the epoch was unlinked and freed */
	FE_RECYCLED   = 2,	/* the epoch is current_epoch and was reset for reuse */
};
62
63static int drbd_do_handshake(struct drbd_conf *mdev);
64static int drbd_do_auth(struct drbd_conf *mdev);
65
66static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68
69static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70{
71 struct drbd_epoch *prev;
72 spin_lock(&mdev->epoch_lock);
73 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74 if (prev == epoch || prev == mdev->current_epoch)
75 prev = NULL;
76 spin_unlock(&mdev->epoch_lock);
77 return prev;
78}
79
80#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
81
45bb912b
LE
82/*
83 * some helper functions to deal with single linked page lists,
84 * page->private being our "next" pointer.
85 */
86
87/* If at least n pages are linked at head, get n pages off.
88 * Otherwise, don't modify head, and return NULL.
89 * Locking is the responsibility of the caller.
90 */
91static struct page *page_chain_del(struct page **head, int n)
92{
93 struct page *page;
94 struct page *tmp;
95
96 BUG_ON(!n);
97 BUG_ON(!head);
98
99 page = *head;
23ce4227
PR
100
101 if (!page)
102 return NULL;
103
45bb912b
LE
104 while (page) {
105 tmp = page_chain_next(page);
106 if (--n == 0)
107 break; /* found sufficient pages */
108 if (tmp == NULL)
109 /* insufficient pages, don't use any of them. */
110 return NULL;
111 page = tmp;
112 }
113
114 /* add end of list marker for the returned list */
115 set_page_private(page, 0);
116 /* actual return value, and adjustment of head */
117 page = *head;
118 *head = tmp;
119 return page;
120}
121
122/* may be used outside of locks to find the tail of a (usually short)
123 * "private" page chain, before adding it back to a global chain head
124 * with page_chain_add() under a spinlock. */
125static struct page *page_chain_tail(struct page *page, int *len)
126{
127 struct page *tmp;
128 int i = 1;
129 while ((tmp = page_chain_next(page)))
130 ++i, page = tmp;
131 if (len)
132 *len = i;
133 return page;
134}
135
136static int page_chain_free(struct page *page)
137{
138 struct page *tmp;
139 int i = 0;
140 page_chain_for_each_safe(page, tmp) {
141 put_page(page);
142 ++i;
143 }
144 return i;
145}
146
147static void page_chain_add(struct page **head,
148 struct page *chain_first, struct page *chain_last)
149{
150#if 1
151 struct page *tmp;
152 tmp = page_chain_tail(chain_first, NULL);
153 BUG_ON(tmp != chain_last);
154#endif
155
156 /* add chain to head */
157 set_page_private(chain_last, (unsigned long)*head);
158 *head = chain_first;
159}
160
161static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
b411b363
PR
162{
163 struct page *page = NULL;
45bb912b
LE
164 struct page *tmp = NULL;
165 int i = 0;
b411b363
PR
166
167 /* Yes, testing drbd_pp_vacant outside the lock is racy.
168 * So what. It saves a spin_lock. */
45bb912b 169 if (drbd_pp_vacant >= number) {
b411b363 170 spin_lock(&drbd_pp_lock);
45bb912b
LE
171 page = page_chain_del(&drbd_pp_pool, number);
172 if (page)
173 drbd_pp_vacant -= number;
b411b363 174 spin_unlock(&drbd_pp_lock);
45bb912b
LE
175 if (page)
176 return page;
b411b363 177 }
45bb912b 178
b411b363
PR
179 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180 * "criss-cross" setup, that might cause write-out on some other DRBD,
181 * which in turn might block on the other node at this very place. */
45bb912b
LE
182 for (i = 0; i < number; i++) {
183 tmp = alloc_page(GFP_TRY);
184 if (!tmp)
185 break;
186 set_page_private(tmp, (unsigned long)page);
187 page = tmp;
188 }
189
190 if (i == number)
191 return page;
192
193 /* Not enough pages immediately available this time.
194 * No need to jump around here, drbd_pp_alloc will retry this
195 * function "soon". */
196 if (page) {
197 tmp = page_chain_tail(page, NULL);
198 spin_lock(&drbd_pp_lock);
199 page_chain_add(&drbd_pp_pool, page, tmp);
200 drbd_pp_vacant += i;
201 spin_unlock(&drbd_pp_lock);
202 }
203 return NULL;
b411b363
PR
204}
205
206/* kick lower level device, if we have more than (arbitrary number)
207 * reference counts on it, which typically are locally submitted io
208 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
209static void maybe_kick_lo(struct drbd_conf *mdev)
210{
211 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212 drbd_kick_lo(mdev);
213}
214
215static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216{
217 struct drbd_epoch_entry *e;
218 struct list_head *le, *tle;
219
220 /* The EEs are always appended to the end of the list. Since
221 they are sent in order over the wire, they have to finish
222 in order. As soon as we see the first not finished we can
223 stop to examine the list... */
224
225 list_for_each_safe(le, tle, &mdev->net_ee) {
226 e = list_entry(le, struct drbd_epoch_entry, w.list);
45bb912b 227 if (drbd_ee_has_active_page(e))
b411b363
PR
228 break;
229 list_move(le, to_be_freed);
230 }
231}
232
233static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234{
235 LIST_HEAD(reclaimed);
236 struct drbd_epoch_entry *e, *t;
237
238 maybe_kick_lo(mdev);
239 spin_lock_irq(&mdev->req_lock);
240 reclaim_net_ee(mdev, &reclaimed);
241 spin_unlock_irq(&mdev->req_lock);
242
243 list_for_each_entry_safe(e, t, &reclaimed, w.list)
435f0740 244 drbd_free_net_ee(mdev, e);
b411b363
PR
245}
246
247/**
45bb912b 248 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 249 * @mdev: DRBD device.
45bb912b
LE
250 * @number: number of pages requested
251 * @retry: whether to retry, if not enough pages are available right now
252 *
253 * Tries to allocate number pages, first from our own page pool, then from
254 * the kernel, unless this allocation would exceed the max_buffers setting.
255 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 256 *
45bb912b 257 * Returns a page chain linked via page->private.
b411b363 258 */
45bb912b 259static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
b411b363
PR
260{
261 struct page *page = NULL;
262 DEFINE_WAIT(wait);
263
45bb912b
LE
264 /* Yes, we may run up to @number over max_buffers. If we
265 * follow it strictly, the admin will get it wrong anyways. */
266 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 268
45bb912b 269 while (page == NULL) {
b411b363
PR
270 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271
272 drbd_kick_lo_and_reclaim_net(mdev);
273
274 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
45bb912b 275 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363
PR
276 if (page)
277 break;
278 }
279
280 if (!retry)
281 break;
282
283 if (signal_pending(current)) {
284 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285 break;
286 }
287
288 schedule();
289 }
290 finish_wait(&drbd_pp_wait, &wait);
291
45bb912b
LE
292 if (page)
293 atomic_add(number, &mdev->pp_in_use);
b411b363
PR
294 return page;
295}
296
297/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
45bb912b
LE
298 * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
435f0740 301static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 302{
435f0740 303 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 304 int i;
435f0740 305
45bb912b
LE
306 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
307 i = page_chain_free(page);
308 else {
309 struct page *tmp;
310 tmp = page_chain_tail(page, &i);
311 spin_lock(&drbd_pp_lock);
312 page_chain_add(&drbd_pp_pool, page, tmp);
313 drbd_pp_vacant += i;
314 spin_unlock(&drbd_pp_lock);
b411b363 315 }
435f0740 316 i = atomic_sub_return(i, a);
45bb912b 317 if (i < 0)
435f0740
LE
318 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
319 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
320 wake_up(&drbd_pp_wait);
321}
322
323/*
324You need to hold the req_lock:
325 _drbd_wait_ee_list_empty()
326
327You must not have the req_lock:
328 drbd_free_ee()
329 drbd_alloc_ee()
330 drbd_init_ee()
331 drbd_release_ee()
332 drbd_ee_fix_bhs()
333 drbd_process_done_ee()
334 drbd_clear_done_ee()
335 drbd_wait_ee_list_empty()
336*/
337
338struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
339 u64 id,
340 sector_t sector,
341 unsigned int data_size,
342 gfp_t gfp_mask) __must_hold(local)
343{
b411b363
PR
344 struct drbd_epoch_entry *e;
345 struct page *page;
45bb912b 346 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363
PR
347
348 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
349 return NULL;
350
351 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
352 if (!e) {
353 if (!(gfp_mask & __GFP_NOWARN))
354 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
355 return NULL;
356 }
357
45bb912b
LE
358 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
359 if (!page)
360 goto fail;
b411b363 361
b411b363
PR
362 INIT_HLIST_NODE(&e->colision);
363 e->epoch = NULL;
45bb912b
LE
364 e->mdev = mdev;
365 e->pages = page;
366 atomic_set(&e->pending_bios, 0);
367 e->size = data_size;
b411b363 368 e->flags = 0;
45bb912b 369 e->sector = sector;
45bb912b 370 e->block_id = id;
b411b363 371
b411b363
PR
372 return e;
373
45bb912b 374 fail:
b411b363 375 mempool_free(e, drbd_ee_mempool);
b411b363
PR
376 return NULL;
377}
378
435f0740 379void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
b411b363 380{
c36c3ced
LE
381 if (e->flags & EE_HAS_DIGEST)
382 kfree(e->digest);
435f0740 383 drbd_pp_free(mdev, e->pages, is_net);
45bb912b 384 D_ASSERT(atomic_read(&e->pending_bios) == 0);
b411b363
PR
385 D_ASSERT(hlist_unhashed(&e->colision));
386 mempool_free(e, drbd_ee_mempool);
387}
388
389int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
390{
391 LIST_HEAD(work_list);
392 struct drbd_epoch_entry *e, *t;
393 int count = 0;
435f0740 394 int is_net = list == &mdev->net_ee;
b411b363
PR
395
396 spin_lock_irq(&mdev->req_lock);
397 list_splice_init(list, &work_list);
398 spin_unlock_irq(&mdev->req_lock);
399
400 list_for_each_entry_safe(e, t, &work_list, w.list) {
435f0740 401 drbd_free_some_ee(mdev, e, is_net);
b411b363
PR
402 count++;
403 }
404 return count;
405}
406
407
408/*
409 * This function is called from _asender only_
410 * but see also comments in _req_mod(,barrier_acked)
411 * and receive_Barrier.
412 *
413 * Move entries from net_ee to done_ee, if ready.
414 * Grab done_ee, call all callbacks, free the entries.
415 * The callbacks typically send out ACKs.
416 */
417static int drbd_process_done_ee(struct drbd_conf *mdev)
418{
419 LIST_HEAD(work_list);
420 LIST_HEAD(reclaimed);
421 struct drbd_epoch_entry *e, *t;
422 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
423
424 spin_lock_irq(&mdev->req_lock);
425 reclaim_net_ee(mdev, &reclaimed);
426 list_splice_init(&mdev->done_ee, &work_list);
427 spin_unlock_irq(&mdev->req_lock);
428
429 list_for_each_entry_safe(e, t, &reclaimed, w.list)
435f0740 430 drbd_free_net_ee(mdev, e);
b411b363
PR
431
432 /* possible callbacks here:
433 * e_end_block, and e_end_resync_block, e_send_discard_ack.
434 * all ignore the last argument.
435 */
436 list_for_each_entry_safe(e, t, &work_list, w.list) {
b411b363
PR
437 /* list_del not necessary, next/prev members not touched */
438 ok = e->w.cb(mdev, &e->w, !ok) && ok;
439 drbd_free_ee(mdev, e);
440 }
441 wake_up(&mdev->ee_wait);
442
443 return ok;
444}
445
446void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447{
448 DEFINE_WAIT(wait);
449
450 /* avoids spin_lock/unlock
451 * and calling prepare_to_wait in the fast path */
452 while (!list_empty(head)) {
453 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454 spin_unlock_irq(&mdev->req_lock);
455 drbd_kick_lo(mdev);
456 schedule();
457 finish_wait(&mdev->ee_wait, &wait);
458 spin_lock_irq(&mdev->req_lock);
459 }
460}
461
462void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
463{
464 spin_lock_irq(&mdev->req_lock);
465 _drbd_wait_ee_list_empty(mdev, head);
466 spin_unlock_irq(&mdev->req_lock);
467}
468
469/* see also kernel_accept; which is only present since 2.6.18.
470 * also we want to log which part of it failed, exactly */
471static int drbd_accept(struct drbd_conf *mdev, const char **what,
472 struct socket *sock, struct socket **newsock)
473{
474 struct sock *sk = sock->sk;
475 int err = 0;
476
477 *what = "listen";
478 err = sock->ops->listen(sock, 5);
479 if (err < 0)
480 goto out;
481
482 *what = "sock_create_lite";
483 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
484 newsock);
485 if (err < 0)
486 goto out;
487
488 *what = "accept";
489 err = sock->ops->accept(sock, *newsock, 0);
490 if (err < 0) {
491 sock_release(*newsock);
492 *newsock = NULL;
493 goto out;
494 }
495 (*newsock)->ops = sock->ops;
496
497out:
498 return err;
499}
500
501static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
502 void *buf, size_t size, int flags)
503{
504 mm_segment_t oldfs;
505 struct kvec iov = {
506 .iov_base = buf,
507 .iov_len = size,
508 };
509 struct msghdr msg = {
510 .msg_iovlen = 1,
511 .msg_iov = (struct iovec *)&iov,
512 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513 };
514 int rv;
515
516 oldfs = get_fs();
517 set_fs(KERNEL_DS);
518 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
519 set_fs(oldfs);
520
521 return rv;
522}
523
524static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
525{
526 mm_segment_t oldfs;
527 struct kvec iov = {
528 .iov_base = buf,
529 .iov_len = size,
530 };
531 struct msghdr msg = {
532 .msg_iovlen = 1,
533 .msg_iov = (struct iovec *)&iov,
534 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
535 };
536 int rv;
537
538 oldfs = get_fs();
539 set_fs(KERNEL_DS);
540
541 for (;;) {
542 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
543 if (rv == size)
544 break;
545
546 /* Note:
547 * ECONNRESET other side closed the connection
548 * ERESTARTSYS (on sock) we got a signal
549 */
550
551 if (rv < 0) {
552 if (rv == -ECONNRESET)
553 dev_info(DEV, "sock was reset by peer\n");
554 else if (rv != -ERESTARTSYS)
555 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
556 break;
557 } else if (rv == 0) {
558 dev_info(DEV, "sock was shut down by peer\n");
559 break;
560 } else {
561 /* signal came in, or peer/link went down,
562 * after we read a partial message
563 */
564 /* D_ASSERT(signal_pending(current)); */
565 break;
566 }
567 };
568
569 set_fs(oldfs);
570
571 if (rv != size)
572 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
573
574 return rv;
575}
576
5dbf1673
LE
577/* quoting tcp(7):
578 * On individual connections, the socket buffer size must be set prior to the
579 * listen(2) or connect(2) calls in order to have it take effect.
580 * This is our wrapper to do so.
581 */
582static void drbd_setbufsize(struct socket *sock, unsigned int snd,
583 unsigned int rcv)
584{
585 /* open coded SO_SNDBUF, SO_RCVBUF */
586 if (snd) {
587 sock->sk->sk_sndbuf = snd;
588 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
589 }
590 if (rcv) {
591 sock->sk->sk_rcvbuf = rcv;
592 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
593 }
594}
595
b411b363
PR
/*
 * Make one outgoing connection attempt to the configured peer address.
 *
 * Returns the connected socket, or NULL.  Failures before the connect step
 * force the connection state to C_DISCONNECTING, unless the error is one of
 * the "temporary" ones listed below; a failing connect never does.
 */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	long timeo;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	timeo = mdev->net_conf->try_connect_int*HZ;
	sock->sk->sk_sndtimeo = timeo;
	sock->sk->sk_rcvtimeo = timeo;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, mdev->net_conf->my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err >= 0) {
		put_net_conf(mdev);
		return sock;
	}

	if (sock) {
		sock_release(sock);
		sock = NULL;
	}

	switch (-err) {
		/* timeout, busy, signal pending */
	case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
	case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
	case ECONNREFUSED: case ENETUNREACH:
	case EHOSTDOWN:    case EHOSTUNREACH:
		disconnect_on_error = 0;
		break;
	default:
		dev_err(DEV, "%s failed, err = %d\n", what, err);
	}

	if (disconnect_on_error)
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));

	put_net_conf(mdev);
	return NULL;
}
673
674static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
675{
676 int timeo, err;
677 struct socket *s_estab = NULL, *s_listen;
678 const char *what;
679
680 if (!get_net_conf(mdev))
681 return NULL;
682
683 what = "sock_create_kern";
684 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
685 SOCK_STREAM, IPPROTO_TCP, &s_listen);
686 if (err) {
687 s_listen = NULL;
688 goto out;
689 }
690
691 timeo = mdev->net_conf->try_connect_int * HZ;
692 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
693
694 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
695 s_listen->sk->sk_rcvtimeo = timeo;
696 s_listen->sk->sk_sndtimeo = timeo;
5dbf1673
LE
697 drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
698 mdev->net_conf->rcvbuf_size);
b411b363
PR
699
700 what = "bind before listen";
701 err = s_listen->ops->bind(s_listen,
702 (struct sockaddr *) mdev->net_conf->my_addr,
703 mdev->net_conf->my_addr_len);
704 if (err < 0)
705 goto out;
706
707 err = drbd_accept(mdev, &what, s_listen, &s_estab);
708
709out:
710 if (s_listen)
711 sock_release(s_listen);
712 if (err < 0) {
713 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
714 dev_err(DEV, "%s failed, err = %d\n", what, err);
715 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
716 }
717 }
718 put_net_conf(mdev);
719
720 return s_estab;
721}
722
723static int drbd_send_fp(struct drbd_conf *mdev,
724 struct socket *sock, enum drbd_packets cmd)
725{
02918be2 726 struct p_header80 *h = &mdev->data.sbuf.header.h80;
b411b363
PR
727
728 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
729}
730
731static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
732{
02918be2 733 struct p_header80 *h = &mdev->data.rbuf.header.h80;
b411b363
PR
734 int rr;
735
736 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
737
738 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
739 return be16_to_cpu(h->command);
740
741 return 0xffff;
742}
743
744/**
745 * drbd_socket_okay() - Free the socket if its connection is not okay
746 * @mdev: DRBD device.
747 * @sock: pointer to the pointer to the socket.
748 */
749static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
750{
751 int rr;
752 char tb[4];
753
754 if (!*sock)
755 return FALSE;
756
757 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
758
759 if (rr > 0 || rr == -EAGAIN) {
760 return TRUE;
761 } else {
762 sock_release(*sock);
763 *sock = NULL;
764 return FALSE;
765 }
766}
767
768/*
769 * return values:
770 * 1 yes, we have a valid connection
771 * 0 oops, did not work out, please try again
772 * -1 peer talks different language,
773 * no point in trying again, please go standalone.
774 * -2 We do not have a network config...
775 */
776static int drbd_connect(struct drbd_conf *mdev)
777{
778 struct socket *s, *sock, *msock;
779 int try, h, ok;
780
781 D_ASSERT(!mdev->data.socket);
782
b411b363
PR
783 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
784 return -2;
785
786 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
787
788 sock = NULL;
789 msock = NULL;
790
791 do {
792 for (try = 0;;) {
793 /* 3 tries, this should take less than a second! */
794 s = drbd_try_connect(mdev);
795 if (s || ++try >= 3)
796 break;
797 /* give the other side time to call bind() & listen() */
798 __set_current_state(TASK_INTERRUPTIBLE);
799 schedule_timeout(HZ / 10);
800 }
801
802 if (s) {
803 if (!sock) {
804 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
805 sock = s;
806 s = NULL;
807 } else if (!msock) {
808 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
809 msock = s;
810 s = NULL;
811 } else {
812 dev_err(DEV, "Logic error in drbd_connect()\n");
813 goto out_release_sockets;
814 }
815 }
816
817 if (sock && msock) {
818 __set_current_state(TASK_INTERRUPTIBLE);
819 schedule_timeout(HZ / 10);
820 ok = drbd_socket_okay(mdev, &sock);
821 ok = drbd_socket_okay(mdev, &msock) && ok;
822 if (ok)
823 break;
824 }
825
826retry:
827 s = drbd_wait_for_connect(mdev);
828 if (s) {
829 try = drbd_recv_fp(mdev, s);
830 drbd_socket_okay(mdev, &sock);
831 drbd_socket_okay(mdev, &msock);
832 switch (try) {
833 case P_HAND_SHAKE_S:
834 if (sock) {
835 dev_warn(DEV, "initial packet S crossed\n");
836 sock_release(sock);
837 }
838 sock = s;
839 break;
840 case P_HAND_SHAKE_M:
841 if (msock) {
842 dev_warn(DEV, "initial packet M crossed\n");
843 sock_release(msock);
844 }
845 msock = s;
846 set_bit(DISCARD_CONCURRENT, &mdev->flags);
847 break;
848 default:
849 dev_warn(DEV, "Error receiving initial packet\n");
850 sock_release(s);
851 if (random32() & 1)
852 goto retry;
853 }
854 }
855
856 if (mdev->state.conn <= C_DISCONNECTING)
857 goto out_release_sockets;
858 if (signal_pending(current)) {
859 flush_signals(current);
860 smp_rmb();
861 if (get_t_state(&mdev->receiver) == Exiting)
862 goto out_release_sockets;
863 }
864
865 if (sock && msock) {
866 ok = drbd_socket_okay(mdev, &sock);
867 ok = drbd_socket_okay(mdev, &msock) && ok;
868 if (ok)
869 break;
870 }
871 } while (1);
872
873 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
874 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
875
876 sock->sk->sk_allocation = GFP_NOIO;
877 msock->sk->sk_allocation = GFP_NOIO;
878
879 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
880 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
881
b411b363
PR
882 /* NOT YET ...
883 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
884 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
885 * first set it to the P_HAND_SHAKE timeout,
886 * which we set to 4x the configured ping_timeout. */
887 sock->sk->sk_sndtimeo =
888 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
889
890 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
891 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
892
893 /* we don't want delays.
894 * we use TCP_CORK where apropriate, though */
895 drbd_tcp_nodelay(sock);
896 drbd_tcp_nodelay(msock);
897
898 mdev->data.socket = sock;
899 mdev->meta.socket = msock;
900 mdev->last_received = jiffies;
901
902 D_ASSERT(mdev->asender.task == NULL);
903
904 h = drbd_do_handshake(mdev);
905 if (h <= 0)
906 return h;
907
908 if (mdev->cram_hmac_tfm) {
909 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
b10d96cb
JT
910 switch (drbd_do_auth(mdev)) {
911 case -1:
b411b363
PR
912 dev_err(DEV, "Authentication of peer failed\n");
913 return -1;
b10d96cb
JT
914 case 0:
915 dev_err(DEV, "Authentication of peer failed, trying again.\n");
916 return 0;
b411b363
PR
917 }
918 }
919
920 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
921 return 0;
922
923 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
924 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925
926 atomic_set(&mdev->packet_seq, 0);
927 mdev->peer_seq = 0;
928
929 drbd_thread_start(&mdev->asender);
930
d5373389
PR
931 if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
932 drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
933 put_ldev(mdev);
934 }
935
7e2455c1
PR
936 if (!drbd_send_protocol(mdev))
937 return -1;
b411b363 938 drbd_send_sync_param(mdev, &mdev->sync_conf);
e89b591c 939 drbd_send_sizes(mdev, 0, 0);
b411b363
PR
940 drbd_send_uuids(mdev);
941 drbd_send_state(mdev);
942 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
943 clear_bit(RESIZE_PENDING, &mdev->flags);
944
945 return 1;
946
947out_release_sockets:
948 if (sock)
949 sock_release(sock);
950 if (msock)
951 sock_release(msock);
952 return -1;
953}
954
02918be2 955static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
b411b363 956{
02918be2 957 union p_header *h = &mdev->data.rbuf.header;
b411b363
PR
958 int r;
959
960 r = drbd_recv(mdev, h, sizeof(*h));
b411b363
PR
961 if (unlikely(r != sizeof(*h))) {
962 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
963 return FALSE;
02918be2
PR
964 }
965
966 if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
967 *cmd = be16_to_cpu(h->h80.command);
968 *packet_size = be16_to_cpu(h->h80.length);
969 } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
970 *cmd = be16_to_cpu(h->h95.command);
971 *packet_size = be32_to_cpu(h->h95.length);
972 } else {
004352fa
LE
973 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
974 be32_to_cpu(h->h80.magic),
975 be16_to_cpu(h->h80.command),
976 be16_to_cpu(h->h80.length));
b411b363
PR
977 return FALSE;
978 }
979 mdev->last_received = jiffies;
980
981 return TRUE;
982}
983
984static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
985{
986 int rv;
987
988 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a
DM
989 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
990 NULL, BLKDEV_IFL_WAIT);
b411b363
PR
991 if (rv) {
992 dev_err(DEV, "local disk flush failed with status %d\n", rv);
993 /* would rather check on EOPNOTSUPP, but that is not reliable.
994 * don't try again for ANY return value != 0
995 * if (rv == -EOPNOTSUPP) */
996 drbd_bump_write_ordering(mdev, WO_drain_io);
997 }
998 put_ldev(mdev);
999 }
1000
1001 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1002}
1003
1004static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1005{
1006 struct flush_work *fw = (struct flush_work *)w;
1007 struct drbd_epoch *epoch = fw->epoch;
1008
1009 kfree(w);
1010
1011 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
1012 drbd_flush_after_epoch(mdev, epoch);
1013
1014 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1015 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1016
1017 return 1;
1018}
1019
1020/**
1021 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1022 * @mdev: DRBD device.
1023 * @epoch: Epoch object.
1024 * @ev: Epoch event.
1025 */
1026static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1027 struct drbd_epoch *epoch,
1028 enum epoch_event ev)
1029{
1030 int finish, epoch_size;
1031 struct drbd_epoch *next_epoch;
1032 int schedule_flush = 0;
1033 enum finish_epoch rv = FE_STILL_LIVE;
1034
1035 spin_lock(&mdev->epoch_lock);
1036 do {
1037 next_epoch = NULL;
1038 finish = 0;
1039
1040 epoch_size = atomic_read(&epoch->epoch_size);
1041
1042 switch (ev & ~EV_CLEANUP) {
1043 case EV_PUT:
1044 atomic_dec(&epoch->active);
1045 break;
1046 case EV_GOT_BARRIER_NR:
1047 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1048
1049 /* Special case: If we just switched from WO_bio_barrier to
1050 WO_bdev_flush we should not finish the current epoch */
1051 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1052 mdev->write_ordering != WO_bio_barrier &&
1053 epoch == mdev->current_epoch)
1054 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1055 break;
1056 case EV_BARRIER_DONE:
1057 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1058 break;
1059 case EV_BECAME_LAST:
1060 /* nothing to do*/
1061 break;
1062 }
1063
b411b363
PR
1064 if (epoch_size != 0 &&
1065 atomic_read(&epoch->active) == 0 &&
1066 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1067 epoch->list.prev == &mdev->current_epoch->list &&
1068 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1069 /* Nearly all conditions are met to finish that epoch... */
1070 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1071 mdev->write_ordering == WO_none ||
1072 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1073 ev & EV_CLEANUP) {
1074 finish = 1;
1075 set_bit(DE_IS_FINISHING, &epoch->flags);
1076 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1077 mdev->write_ordering == WO_bio_barrier) {
1078 atomic_inc(&epoch->active);
1079 schedule_flush = 1;
1080 }
1081 }
1082 if (finish) {
1083 if (!(ev & EV_CLEANUP)) {
1084 spin_unlock(&mdev->epoch_lock);
1085 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1086 spin_lock(&mdev->epoch_lock);
1087 }
1088 dec_unacked(mdev);
1089
1090 if (mdev->current_epoch != epoch) {
1091 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1092 list_del(&epoch->list);
1093 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1094 mdev->epochs--;
b411b363
PR
1095 kfree(epoch);
1096
1097 if (rv == FE_STILL_LIVE)
1098 rv = FE_DESTROYED;
1099 } else {
1100 epoch->flags = 0;
1101 atomic_set(&epoch->epoch_size, 0);
698f9315 1102 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1103 if (rv == FE_STILL_LIVE)
1104 rv = FE_RECYCLED;
1105 }
1106 }
1107
1108 if (!next_epoch)
1109 break;
1110
1111 epoch = next_epoch;
1112 } while (1);
1113
1114 spin_unlock(&mdev->epoch_lock);
1115
1116 if (schedule_flush) {
1117 struct flush_work *fw;
1118 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1119 if (fw) {
b411b363
PR
1120 fw->w.cb = w_flush;
1121 fw->epoch = epoch;
1122 drbd_queue_work(&mdev->data.work, &fw->w);
1123 } else {
1124 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1125 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1126 /* That is not a recursion, only one level */
1127 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1128 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1129 }
1130 }
1131
1132 return rv;
1133}
1134
1135/**
1136 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1137 * @mdev: DRBD device.
1138 * @wo: Write ordering method to try.
1139 */
1140void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1141{
1142 enum write_ordering_e pwo;
1143 static char *write_ordering_str[] = {
1144 [WO_none] = "none",
1145 [WO_drain_io] = "drain",
1146 [WO_bdev_flush] = "flush",
1147 [WO_bio_barrier] = "barrier",
1148 };
1149
1150 pwo = mdev->write_ordering;
1151 wo = min(pwo, wo);
1152 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1153 wo = WO_bdev_flush;
1154 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1155 wo = WO_drain_io;
1156 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1157 wo = WO_none;
1158 mdev->write_ordering = wo;
1159 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1160 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1161}
1162
45bb912b
LE
1163/**
1164 * drbd_submit_ee()
1165 * @mdev: DRBD device.
1166 * @e: epoch entry
1167 * @rw: flag field, see bio->bi_rw
1168 */
1169/* TODO allocate from our own bio_set. */
1170int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1171 const unsigned rw, const int fault_type)
1172{
1173 struct bio *bios = NULL;
1174 struct bio *bio;
1175 struct page *page = e->pages;
1176 sector_t sector = e->sector;
1177 unsigned ds = e->size;
1178 unsigned n_bios = 0;
1179 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1180
1181 /* In most cases, we will only need one bio. But in case the lower
1182 * level restrictions happen to be different at this offset on this
1183 * side than those of the sending peer, we may need to submit the
1184 * request in more than one bio. */
1185next_bio:
1186 bio = bio_alloc(GFP_NOIO, nr_pages);
1187 if (!bio) {
1188 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1189 goto fail;
1190 }
1191 /* > e->sector, unless this is the first bio */
1192 bio->bi_sector = sector;
1193 bio->bi_bdev = mdev->ldev->backing_bdev;
1194 /* we special case some flags in the multi-bio case, see below
7b6d91da 1195 * (REQ_UNPLUG, REQ_HARDBARRIER) */
45bb912b
LE
1196 bio->bi_rw = rw;
1197 bio->bi_private = e;
1198 bio->bi_end_io = drbd_endio_sec;
1199
1200 bio->bi_next = bios;
1201 bios = bio;
1202 ++n_bios;
1203
1204 page_chain_for_each(page) {
1205 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1206 if (!bio_add_page(bio, page, len, 0)) {
1207 /* a single page must always be possible! */
1208 BUG_ON(bio->bi_vcnt == 0);
1209 goto next_bio;
1210 }
1211 ds -= len;
1212 sector += len >> 9;
1213 --nr_pages;
1214 }
1215 D_ASSERT(page == NULL);
1216 D_ASSERT(ds == 0);
1217
1218 atomic_set(&e->pending_bios, n_bios);
1219 do {
1220 bio = bios;
1221 bios = bios->bi_next;
1222 bio->bi_next = NULL;
1223
7b6d91da 1224 /* strip off REQ_UNPLUG unless it is the last bio */
45bb912b 1225 if (bios)
7b6d91da 1226 bio->bi_rw &= ~REQ_UNPLUG;
45bb912b
LE
1227
1228 drbd_generic_make_request(mdev, fault_type, bio);
1229
7b6d91da 1230 /* strip off REQ_HARDBARRIER,
45bb912b
LE
1231 * unless it is the first or last bio */
1232 if (bios && bios->bi_next)
7b6d91da 1233 bios->bi_rw &= ~REQ_HARDBARRIER;
45bb912b
LE
1234 } while (bios);
1235 maybe_kick_lo(mdev);
1236 return 0;
1237
1238fail:
1239 while (bios) {
1240 bio = bios;
1241 bios = bios->bi_next;
1242 bio_put(bio);
1243 }
1244 return -ENOMEM;
1245}
1246
b411b363 1247/**
7b6d91da 1248 * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
b411b363
PR
1249 * @mdev: DRBD device.
1250 * @w: work object.
1251 * @cancel: The connection will be closed anyways (unused in this callback)
1252 */
1253int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1254{
1255 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
b411b363
PR
1256 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1257 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1258 so that we can finish that epoch in drbd_may_finish_epoch().
1259 That is necessary if we already have a long chain of Epochs, before
7b6d91da 1260 we realize that REQ_HARDBARRIER is actually not supported */
b411b363
PR
1261
1262 /* As long as the -ENOTSUPP on the barrier is reported immediately
1263 that will never trigger. If it is reported late, we will just
1264 print that warning and continue correctly for all future requests
1265 with WO_bdev_flush */
1266 if (previous_epoch(mdev, e->epoch))
1267 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1268
b411b363
PR
1269 /* we still have a local reference,
1270 * get_ldev was done in receive_Data. */
b411b363
PR
1271
1272 e->w.cb = e_end_block;
45bb912b
LE
1273 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1274 /* drbd_submit_ee fails for one reason only:
1275 * if was not able to allocate sufficient bios.
1276 * requeue, try again later. */
1277 e->w.cb = w_e_reissue;
1278 drbd_queue_work(&mdev->data.work, &e->w);
1279 }
b411b363
PR
1280 return 1;
1281}
1282
02918be2 1283static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
1284{
1285 int rv, issue_flush;
02918be2 1286 struct p_barrier *p = &mdev->data.rbuf.barrier;
b411b363
PR
1287 struct drbd_epoch *epoch;
1288
b411b363
PR
1289 inc_unacked(mdev);
1290
1291 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1292 drbd_kick_lo(mdev);
1293
1294 mdev->current_epoch->barrier_nr = p->barrier;
1295 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1296
1297 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1298 * the activity log, which means it would not be resynced in case the
1299 * R_PRIMARY crashes now.
1300 * Therefore we must send the barrier_ack after the barrier request was
1301 * completed. */
1302 switch (mdev->write_ordering) {
1303 case WO_bio_barrier:
1304 case WO_none:
1305 if (rv == FE_RECYCLED)
1306 return TRUE;
1307 break;
1308
1309 case WO_bdev_flush:
1310 case WO_drain_io:
367a8d73
PR
1311 if (rv == FE_STILL_LIVE) {
1312 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1313 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1314 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1315 }
b411b363
PR
1316 if (rv == FE_RECYCLED)
1317 return TRUE;
1318
1319 /* The asender will send all the ACKs and barrier ACKs out, since
1320 all EEs moved from the active_ee to the done_ee. We need to
1321 provide a new epoch object for the EEs that come in soon */
1322 break;
1323 }
1324
1325 /* receiver context, in the writeout path of the other node.
1326 * avoid potential distributed deadlock */
1327 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1328 if (!epoch) {
1329 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
d3db7b48 1330 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
b411b363
PR
1331 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1332 if (issue_flush) {
1333 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1334 if (rv == FE_RECYCLED)
1335 return TRUE;
1336 }
1337
1338 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1339
1340 return TRUE;
1341 }
1342
1343 epoch->flags = 0;
1344 atomic_set(&epoch->epoch_size, 0);
1345 atomic_set(&epoch->active, 0);
1346
1347 spin_lock(&mdev->epoch_lock);
1348 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1349 list_add(&epoch->list, &mdev->current_epoch->list);
1350 mdev->current_epoch = epoch;
1351 mdev->epochs++;
b411b363
PR
1352 } else {
1353 /* The current_epoch got recycled while we allocated this one... */
1354 kfree(epoch);
1355 }
1356 spin_unlock(&mdev->epoch_lock);
1357
1358 return TRUE;
1359}
1360
1361/* used from receive_RSDataReply (recv_resync_read)
1362 * and from receive_Data */
1363static struct drbd_epoch_entry *
1364read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1365{
6666032a 1366 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
b411b363 1367 struct drbd_epoch_entry *e;
b411b363 1368 struct page *page;
45bb912b 1369 int dgs, ds, rr;
b411b363
PR
1370 void *dig_in = mdev->int_dig_in;
1371 void *dig_vv = mdev->int_dig_vv;
6b4388ac 1372 unsigned long *data;
b411b363
PR
1373
1374 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1375 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1376
1377 if (dgs) {
1378 rr = drbd_recv(mdev, dig_in, dgs);
1379 if (rr != dgs) {
1380 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1381 rr, dgs);
1382 return NULL;
1383 }
1384 }
1385
1386 data_size -= dgs;
1387
1388 ERR_IF(data_size & 0x1ff) return NULL;
1389 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1390
6666032a
LE
1391 /* even though we trust out peer,
1392 * we sometimes have to double check. */
1393 if (sector + (data_size>>9) > capacity) {
1394 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1395 (unsigned long long)capacity,
1396 (unsigned long long)sector, data_size);
1397 return NULL;
1398 }
1399
b411b363
PR
1400 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1401 * "criss-cross" setup, that might cause write-out on some other DRBD,
1402 * which in turn might block on the other node at this very place. */
1403 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1404 if (!e)
1405 return NULL;
45bb912b 1406
b411b363 1407 ds = data_size;
45bb912b
LE
1408 page = e->pages;
1409 page_chain_for_each(page) {
1410 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1411 data = kmap(page);
45bb912b 1412 rr = drbd_recv(mdev, data, len);
6b4388ac
PR
1413 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1414 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1415 data[0] = data[0] ^ (unsigned long)-1;
1416 }
b411b363 1417 kunmap(page);
45bb912b 1418 if (rr != len) {
b411b363
PR
1419 drbd_free_ee(mdev, e);
1420 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
45bb912b 1421 rr, len);
b411b363
PR
1422 return NULL;
1423 }
1424 ds -= rr;
1425 }
1426
1427 if (dgs) {
45bb912b 1428 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
b411b363
PR
1429 if (memcmp(dig_in, dig_vv, dgs)) {
1430 dev_err(DEV, "Digest integrity check FAILED.\n");
1431 drbd_bcast_ee(mdev, "digest failed",
1432 dgs, dig_in, dig_vv, e);
1433 drbd_free_ee(mdev, e);
1434 return NULL;
1435 }
1436 }
1437 mdev->recv_cnt += data_size>>9;
1438 return e;
1439}
1440
1441/* drbd_drain_block() just takes a data block
1442 * out of the socket input buffer, and discards it.
1443 */
1444static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1445{
1446 struct page *page;
1447 int rr, rv = 1;
1448 void *data;
1449
c3470cde
LE
1450 if (!data_size)
1451 return TRUE;
1452
45bb912b 1453 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1454
1455 data = kmap(page);
1456 while (data_size) {
1457 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1458 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1459 rv = 0;
1460 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1461 rr, min_t(int, data_size, PAGE_SIZE));
1462 break;
1463 }
1464 data_size -= rr;
1465 }
1466 kunmap(page);
435f0740 1467 drbd_pp_free(mdev, page, 0);
b411b363
PR
1468 return rv;
1469}
1470
1471static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1472 sector_t sector, int data_size)
1473{
1474 struct bio_vec *bvec;
1475 struct bio *bio;
1476 int dgs, rr, i, expect;
1477 void *dig_in = mdev->int_dig_in;
1478 void *dig_vv = mdev->int_dig_vv;
1479
1480 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1481 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1482
1483 if (dgs) {
1484 rr = drbd_recv(mdev, dig_in, dgs);
1485 if (rr != dgs) {
1486 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1487 rr, dgs);
1488 return 0;
1489 }
1490 }
1491
1492 data_size -= dgs;
1493
1494 /* optimistically update recv_cnt. if receiving fails below,
1495 * we disconnect anyways, and counters will be reset. */
1496 mdev->recv_cnt += data_size>>9;
1497
1498 bio = req->master_bio;
1499 D_ASSERT(sector == bio->bi_sector);
1500
1501 bio_for_each_segment(bvec, bio, i) {
1502 expect = min_t(int, data_size, bvec->bv_len);
1503 rr = drbd_recv(mdev,
1504 kmap(bvec->bv_page)+bvec->bv_offset,
1505 expect);
1506 kunmap(bvec->bv_page);
1507 if (rr != expect) {
1508 dev_warn(DEV, "short read receiving data reply: "
1509 "read %d expected %d\n",
1510 rr, expect);
1511 return 0;
1512 }
1513 data_size -= rr;
1514 }
1515
1516 if (dgs) {
45bb912b 1517 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1518 if (memcmp(dig_in, dig_vv, dgs)) {
1519 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1520 return 0;
1521 }
1522 }
1523
1524 D_ASSERT(data_size == 0);
1525 return 1;
1526}
1527
1528/* e_end_resync_block() is called via
1529 * drbd_process_done_ee() by asender only */
1530static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1531{
1532 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1533 sector_t sector = e->sector;
1534 int ok;
1535
1536 D_ASSERT(hlist_unhashed(&e->colision));
1537
45bb912b 1538 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1539 drbd_set_in_sync(mdev, sector, e->size);
1540 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1541 } else {
1542 /* Record failure to sync */
1543 drbd_rs_failed_io(mdev, sector, e->size);
1544
1545 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1546 }
1547 dec_unacked(mdev);
1548
1549 return ok;
1550}
1551
1552static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1553{
1554 struct drbd_epoch_entry *e;
1555
1556 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
45bb912b
LE
1557 if (!e)
1558 goto fail;
b411b363
PR
1559
1560 dec_rs_pending(mdev);
1561
b411b363
PR
1562 inc_unacked(mdev);
1563 /* corresponding dec_unacked() in e_end_resync_block()
1564 * respective _drbd_clear_done_ee */
1565
45bb912b
LE
1566 e->w.cb = e_end_resync_block;
1567
b411b363
PR
1568 spin_lock_irq(&mdev->req_lock);
1569 list_add(&e->w.list, &mdev->sync_ee);
1570 spin_unlock_irq(&mdev->req_lock);
1571
0f0601f4 1572 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
45bb912b
LE
1573 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1574 return TRUE;
b411b363 1575
22cc37a9
LE
1576 /* drbd_submit_ee currently fails for one reason only:
1577 * not being able to allocate enough bios.
1578 * Is dropping the connection going to help? */
1579 spin_lock_irq(&mdev->req_lock);
1580 list_del(&e->w.list);
1581 spin_unlock_irq(&mdev->req_lock);
1582
45bb912b
LE
1583 drbd_free_ee(mdev, e);
1584fail:
1585 put_ldev(mdev);
1586 return FALSE;
b411b363
PR
1587}
1588
02918be2 1589static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
1590{
1591 struct drbd_request *req;
1592 sector_t sector;
b411b363 1593 int ok;
02918be2 1594 struct p_data *p = &mdev->data.rbuf.data;
b411b363
PR
1595
1596 sector = be64_to_cpu(p->sector);
1597
1598 spin_lock_irq(&mdev->req_lock);
1599 req = _ar_id_to_req(mdev, p->block_id, sector);
1600 spin_unlock_irq(&mdev->req_lock);
1601 if (unlikely(!req)) {
1602 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1603 return FALSE;
1604 }
1605
1606 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1607 * special casing it there for the various failure cases.
1608 * still no race with drbd_fail_pending_reads */
1609 ok = recv_dless_read(mdev, req, sector, data_size);
1610
1611 if (ok)
1612 req_mod(req, data_received);
1613 /* else: nothing. handled from drbd_disconnect...
1614 * I don't think we may complete this just yet
1615 * in case we are "on-disconnect: freeze" */
1616
1617 return ok;
1618}
1619
02918be2 1620static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
1621{
1622 sector_t sector;
b411b363 1623 int ok;
02918be2 1624 struct p_data *p = &mdev->data.rbuf.data;
b411b363
PR
1625
1626 sector = be64_to_cpu(p->sector);
1627 D_ASSERT(p->block_id == ID_SYNCER);
1628
1629 if (get_ldev(mdev)) {
1630 /* data is submitted to disk within recv_resync_read.
1631 * corresponding put_ldev done below on error,
1632 * or in drbd_endio_write_sec. */
1633 ok = recv_resync_read(mdev, sector, data_size);
1634 } else {
1635 if (__ratelimit(&drbd_ratelimit_state))
1636 dev_err(DEV, "Can not write resync data to local disk.\n");
1637
1638 ok = drbd_drain_block(mdev, data_size);
1639
2b2bf214 1640 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1641 }
1642
778f271d
PR
1643 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1644
b411b363
PR
1645 return ok;
1646}
1647
1648/* e_end_block() is called via drbd_process_done_ee().
1649 * this means this function only runs in the asender thread
1650 */
1651static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1652{
1653 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1654 sector_t sector = e->sector;
1655 struct drbd_epoch *epoch;
1656 int ok = 1, pcmd;
1657
1658 if (e->flags & EE_IS_BARRIER) {
1659 epoch = previous_epoch(mdev, e->epoch);
1660 if (epoch)
1661 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1662 }
1663
1664 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
45bb912b 1665 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1666 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1667 mdev->state.conn <= C_PAUSED_SYNC_T &&
1668 e->flags & EE_MAY_SET_IN_SYNC) ?
1669 P_RS_WRITE_ACK : P_WRITE_ACK;
1670 ok &= drbd_send_ack(mdev, pcmd, e);
1671 if (pcmd == P_RS_WRITE_ACK)
1672 drbd_set_in_sync(mdev, sector, e->size);
1673 } else {
1674 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1675 /* we expect it to be marked out of sync anyways...
1676 * maybe assert this? */
1677 }
1678 dec_unacked(mdev);
1679 }
1680 /* we delete from the conflict detection hash _after_ we sent out the
1681 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1682 if (mdev->net_conf->two_primaries) {
1683 spin_lock_irq(&mdev->req_lock);
1684 D_ASSERT(!hlist_unhashed(&e->colision));
1685 hlist_del_init(&e->colision);
1686 spin_unlock_irq(&mdev->req_lock);
1687 } else {
1688 D_ASSERT(hlist_unhashed(&e->colision));
1689 }
1690
1691 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1692
1693 return ok;
1694}
1695
1696static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1697{
1698 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1699 int ok = 1;
1700
1701 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1702 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1703
1704 spin_lock_irq(&mdev->req_lock);
1705 D_ASSERT(!hlist_unhashed(&e->colision));
1706 hlist_del_init(&e->colision);
1707 spin_unlock_irq(&mdev->req_lock);
1708
1709 dec_unacked(mdev);
1710
1711 return ok;
1712}
1713
1714/* Called from receive_Data.
1715 * Synchronize packets on sock with packets on msock.
1716 *
1717 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1718 * packet traveling on msock, they are still processed in the order they have
1719 * been sent.
1720 *
1721 * Note: we don't care for Ack packets overtaking P_DATA packets.
1722 *
1723 * In case packet_seq is larger than mdev->peer_seq number, there are
1724 * outstanding packets on the msock. We wait for them to arrive.
1725 * In case we are the logically next packet, we update mdev->peer_seq
1726 * ourselves. Correctly handles 32bit wrap around.
1727 *
1728 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1729 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1730 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1731 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1732 *
1733 * returns 0 if we may process the packet,
1734 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1735static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1736{
1737 DEFINE_WAIT(wait);
1738 unsigned int p_seq;
1739 long timeout;
1740 int ret = 0;
1741 spin_lock(&mdev->peer_seq_lock);
1742 for (;;) {
1743 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1744 if (seq_le(packet_seq, mdev->peer_seq+1))
1745 break;
1746 if (signal_pending(current)) {
1747 ret = -ERESTARTSYS;
1748 break;
1749 }
1750 p_seq = mdev->peer_seq;
1751 spin_unlock(&mdev->peer_seq_lock);
1752 timeout = schedule_timeout(30*HZ);
1753 spin_lock(&mdev->peer_seq_lock);
1754 if (timeout == 0 && p_seq == mdev->peer_seq) {
1755 ret = -ETIMEDOUT;
1756 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1757 break;
1758 }
1759 }
1760 finish_wait(&mdev->seq_wait, &wait);
1761 if (mdev->peer_seq+1 == packet_seq)
1762 mdev->peer_seq++;
1763 spin_unlock(&mdev->peer_seq_lock);
1764 return ret;
1765}
1766
76d2e7ec
PR
1767static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1768{
1769 if (mdev->agreed_pro_version >= 95)
1770 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1771 (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
1772 (dpf & DP_FUA ? REQ_FUA : 0) |
1773 (dpf & DP_FLUSH ? REQ_FUA : 0) |
1774 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1775 else
1776 return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
1777}
1778
b411b363 1779/* mirrored write */
02918be2 1780static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
1781{
1782 sector_t sector;
1783 struct drbd_epoch_entry *e;
02918be2 1784 struct p_data *p = &mdev->data.rbuf.data;
b411b363
PR
1785 int rw = WRITE;
1786 u32 dp_flags;
1787
b411b363
PR
1788 if (!get_ldev(mdev)) {
1789 if (__ratelimit(&drbd_ratelimit_state))
1790 dev_err(DEV, "Can not write mirrored data block "
1791 "to local disk.\n");
1792 spin_lock(&mdev->peer_seq_lock);
1793 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1794 mdev->peer_seq++;
1795 spin_unlock(&mdev->peer_seq_lock);
1796
2b2bf214 1797 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1798 atomic_inc(&mdev->current_epoch->epoch_size);
1799 return drbd_drain_block(mdev, data_size);
1800 }
1801
1802 /* get_ldev(mdev) successful.
1803 * Corresponding put_ldev done either below (on various errors),
1804 * or in drbd_endio_write_sec, if we successfully submit the data at
1805 * the end of this function. */
1806
1807 sector = be64_to_cpu(p->sector);
1808 e = read_in_block(mdev, p->block_id, sector, data_size);
1809 if (!e) {
1810 put_ldev(mdev);
1811 return FALSE;
1812 }
1813
b411b363
PR
1814 e->w.cb = e_end_block;
1815
1816 spin_lock(&mdev->epoch_lock);
1817 e->epoch = mdev->current_epoch;
1818 atomic_inc(&e->epoch->epoch_size);
1819 atomic_inc(&e->epoch->active);
1820
1821 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1822 struct drbd_epoch *epoch;
1823 /* Issue a barrier if we start a new epoch, and the previous epoch
1824 was not a epoch containing a single request which already was
1825 a Barrier. */
1826 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1827 if (epoch == e->epoch) {
1828 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
7b6d91da 1829 rw |= REQ_HARDBARRIER;
b411b363
PR
1830 e->flags |= EE_IS_BARRIER;
1831 } else {
1832 if (atomic_read(&epoch->epoch_size) > 1 ||
1833 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1834 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
b411b363 1835 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
7b6d91da 1836 rw |= REQ_HARDBARRIER;
b411b363
PR
1837 e->flags |= EE_IS_BARRIER;
1838 }
1839 }
1840 }
1841 spin_unlock(&mdev->epoch_lock);
1842
1843 dp_flags = be32_to_cpu(p->dp_flags);
76d2e7ec
PR
1844 rw |= write_flags_to_bio(mdev, dp_flags);
1845
b411b363
PR
1846 if (dp_flags & DP_MAY_SET_IN_SYNC)
1847 e->flags |= EE_MAY_SET_IN_SYNC;
1848
1849 /* I'm the receiver, I do hold a net_cnt reference. */
1850 if (!mdev->net_conf->two_primaries) {
1851 spin_lock_irq(&mdev->req_lock);
1852 } else {
1853 /* don't get the req_lock yet,
1854 * we may sleep in drbd_wait_peer_seq */
1855 const int size = e->size;
1856 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1857 DEFINE_WAIT(wait);
1858 struct drbd_request *i;
1859 struct hlist_node *n;
1860 struct hlist_head *slot;
1861 int first;
1862
1863 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1864 BUG_ON(mdev->ee_hash == NULL);
1865 BUG_ON(mdev->tl_hash == NULL);
1866
1867 /* conflict detection and handling:
1868 * 1. wait on the sequence number,
1869 * in case this data packet overtook ACK packets.
1870 * 2. check our hash tables for conflicting requests.
1871 * we only need to walk the tl_hash, since an ee can not
1872 * have a conflict with an other ee: on the submitting
1873 * node, the corresponding req had already been conflicting,
1874 * and a conflicting req is never sent.
1875 *
1876 * Note: for two_primaries, we are protocol C,
1877 * so there cannot be any request that is DONE
1878 * but still on the transfer log.
1879 *
1880 * unconditionally add to the ee_hash.
1881 *
1882 * if no conflicting request is found:
1883 * submit.
1884 *
1885 * if any conflicting request is found
1886 * that has not yet been acked,
1887 * AND I have the "discard concurrent writes" flag:
1888 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1889 *
1890 * if any conflicting request is found:
1891 * block the receiver, waiting on misc_wait
1892 * until no more conflicting requests are there,
1893 * or we get interrupted (disconnect).
1894 *
1895 * we do not just write after local io completion of those
1896 * requests, but only after req is done completely, i.e.
1897 * we wait for the P_DISCARD_ACK to arrive!
1898 *
1899 * then proceed normally, i.e. submit.
1900 */
1901 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1902 goto out_interrupted;
1903
1904 spin_lock_irq(&mdev->req_lock);
1905
1906 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1907
1908#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1909 slot = tl_hash_slot(mdev, sector);
1910 first = 1;
1911 for (;;) {
1912 int have_unacked = 0;
1913 int have_conflict = 0;
1914 prepare_to_wait(&mdev->misc_wait, &wait,
1915 TASK_INTERRUPTIBLE);
1916 hlist_for_each_entry(i, n, slot, colision) {
1917 if (OVERLAPS) {
1918 /* only ALERT on first iteration,
1919 * we may be woken up early... */
1920 if (first)
1921 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1922 " new: %llus +%u; pending: %llus +%u\n",
1923 current->comm, current->pid,
1924 (unsigned long long)sector, size,
1925 (unsigned long long)i->sector, i->size);
1926 if (i->rq_state & RQ_NET_PENDING)
1927 ++have_unacked;
1928 ++have_conflict;
1929 }
1930 }
1931#undef OVERLAPS
1932 if (!have_conflict)
1933 break;
1934
1935 /* Discard Ack only for the _first_ iteration */
1936 if (first && discard && have_unacked) {
1937 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1938 (unsigned long long)sector);
1939 inc_unacked(mdev);
1940 e->w.cb = e_send_discard_ack;
1941 list_add_tail(&e->w.list, &mdev->done_ee);
1942
1943 spin_unlock_irq(&mdev->req_lock);
1944
1945 /* we could probably send that P_DISCARD_ACK ourselves,
1946 * but I don't like the receiver using the msock */
1947
1948 put_ldev(mdev);
1949 wake_asender(mdev);
1950 finish_wait(&mdev->misc_wait, &wait);
1951 return TRUE;
1952 }
1953
1954 if (signal_pending(current)) {
1955 hlist_del_init(&e->colision);
1956
1957 spin_unlock_irq(&mdev->req_lock);
1958
1959 finish_wait(&mdev->misc_wait, &wait);
1960 goto out_interrupted;
1961 }
1962
1963 spin_unlock_irq(&mdev->req_lock);
1964 if (first) {
1965 first = 0;
1966 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1967 "sec=%llus\n", (unsigned long long)sector);
1968 } else if (discard) {
1969 /* we had none on the first iteration.
1970 * there must be none now. */
1971 D_ASSERT(have_unacked == 0);
1972 }
1973 schedule();
1974 spin_lock_irq(&mdev->req_lock);
1975 }
1976 finish_wait(&mdev->misc_wait, &wait);
1977 }
1978
1979 list_add(&e->w.list, &mdev->active_ee);
1980 spin_unlock_irq(&mdev->req_lock);
1981
1982 switch (mdev->net_conf->wire_protocol) {
1983 case DRBD_PROT_C:
1984 inc_unacked(mdev);
1985 /* corresponding dec_unacked() in e_end_block()
1986 * respective _drbd_clear_done_ee */
1987 break;
1988 case DRBD_PROT_B:
1989 /* I really don't like it that the receiver thread
1990 * sends on the msock, but anyways */
1991 drbd_send_ack(mdev, P_RECV_ACK, e);
1992 break;
1993 case DRBD_PROT_A:
1994 /* nothing to do */
1995 break;
1996 }
1997
1998 if (mdev->state.pdsk == D_DISKLESS) {
1999 /* In case we have the only disk of the cluster, */
2000 drbd_set_out_of_sync(mdev, e->sector, e->size);
2001 e->flags |= EE_CALL_AL_COMPLETE_IO;
2002 drbd_al_begin_io(mdev, e->sector);
2003 }
2004
45bb912b
LE
2005 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2006 return TRUE;
b411b363 2007
22cc37a9
LE
2008 /* drbd_submit_ee currently fails for one reason only:
2009 * not being able to allocate enough bios.
2010 * Is dropping the connection going to help? */
2011 spin_lock_irq(&mdev->req_lock);
2012 list_del(&e->w.list);
2013 hlist_del_init(&e->colision);
2014 spin_unlock_irq(&mdev->req_lock);
2015 if (e->flags & EE_CALL_AL_COMPLETE_IO)
2016 drbd_al_complete_io(mdev, e->sector);
2017
b411b363
PR
2018out_interrupted:
2019 /* yes, the epoch_size now is imbalanced.
2020 * but we drop the connection anyways, so we don't have a chance to
2021 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2022 put_ldev(mdev);
2023 drbd_free_ee(mdev, e);
2024 return FALSE;
2025}
2026
0f0601f4
LE
2027/* We may throttle resync, if the lower device seems to be busy,
2028 * and current sync rate is above c_min_rate.
2029 *
2030 * To decide whether or not the lower device is busy, we use a scheme similar
2031 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2032 * (more than 64 sectors) of activity we cannot account for with our own resync
2033 * activity, it obviously is "busy".
2034 *
2035 * The current sync rate used here uses only the most recent two step marks,
2036 * to have a short time average so we can react faster.
2037 */
2038int drbd_rs_should_slow_down(struct drbd_conf *mdev)
2039{
2040 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2041 unsigned long db, dt, dbdt;
2042 int curr_events;
2043 int throttle = 0;
2044
2045 /* feature disabled? */
2046 if (mdev->sync_conf.c_min_rate == 0)
2047 return 0;
2048
2049 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2050 (int)part_stat_read(&disk->part0, sectors[1]) -
2051 atomic_read(&mdev->rs_sect_ev);
2052 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2053 unsigned long rs_left;
2054 int i;
2055
2056 mdev->rs_last_events = curr_events;
2057
2058 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2059 * approx. */
2060 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
2061 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2062
2063 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2064 if (!dt)
2065 dt++;
2066 db = mdev->rs_mark_left[i] - rs_left;
2067 dbdt = Bit2KB(db/dt);
2068
2069 if (dbdt > mdev->sync_conf.c_min_rate)
2070 throttle = 1;
2071 }
2072 return throttle;
2073}
2074
2075
02918be2 2076static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
b411b363
PR
2077{
2078 sector_t sector;
2079 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2080 struct drbd_epoch_entry *e;
2081 struct digest_info *di = NULL;
02918be2 2082 int size;
b411b363 2083 unsigned int fault_type;
02918be2 2084 struct p_block_req *p = &mdev->data.rbuf.block_req;
b411b363
PR
2085
2086 sector = be64_to_cpu(p->sector);
2087 size = be32_to_cpu(p->blksize);
2088
2089 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2090 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2091 (unsigned long long)sector, size);
2092 return FALSE;
2093 }
2094 if (sector + (size>>9) > capacity) {
2095 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2096 (unsigned long long)sector, size);
2097 return FALSE;
2098 }
2099
2100 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2101 if (__ratelimit(&drbd_ratelimit_state))
2102 dev_err(DEV, "Can not satisfy peer's read request, "
2103 "no local data.\n");
02918be2 2104 drbd_send_ack_rp(mdev, cmd == P_DATA_REQUEST ? P_NEG_DREPLY :
b411b363 2105 P_NEG_RS_DREPLY , p);
a821cc4a
LE
2106 /* drain possibly payload */
2107 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2108 }
2109
2110 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2111 * "criss-cross" setup, that might cause write-out on some other DRBD,
2112 * which in turn might block on the other node at this very place. */
2113 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2114 if (!e) {
2115 put_ldev(mdev);
2116 return FALSE;
2117 }
2118
02918be2 2119 switch (cmd) {
b411b363
PR
2120 case P_DATA_REQUEST:
2121 e->w.cb = w_e_end_data_req;
2122 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2123 /* application IO, don't drbd_rs_begin_io */
2124 goto submit;
2125
b411b363
PR
2126 case P_RS_DATA_REQUEST:
2127 e->w.cb = w_e_end_rsdata_req;
2128 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2129 break;
2130
2131 case P_OV_REPLY:
2132 case P_CSUM_RS_REQUEST:
2133 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2134 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2135 if (!di)
2136 goto out_free_e;
2137
2138 di->digest_size = digest_size;
2139 di->digest = (((char *)di)+sizeof(struct digest_info));
2140
c36c3ced
LE
2141 e->digest = di;
2142 e->flags |= EE_HAS_DIGEST;
2143
b411b363
PR
2144 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2145 goto out_free_e;
2146
02918be2 2147 if (cmd == P_CSUM_RS_REQUEST) {
b411b363
PR
2148 D_ASSERT(mdev->agreed_pro_version >= 89);
2149 e->w.cb = w_e_end_csum_rs_req;
02918be2 2150 } else if (cmd == P_OV_REPLY) {
b411b363
PR
2151 e->w.cb = w_e_end_ov_reply;
2152 dec_rs_pending(mdev);
0f0601f4
LE
2153 /* drbd_rs_begin_io done when we sent this request,
2154 * but accounting still needs to be done. */
2155 goto submit_for_resync;
b411b363
PR
2156 }
2157 break;
2158
2159 case P_OV_REQUEST:
2160 if (mdev->state.conn >= C_CONNECTED &&
2161 mdev->state.conn != C_VERIFY_T)
2162 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2163 drbd_conn_str(mdev->state.conn));
2164 if (mdev->ov_start_sector == ~(sector_t)0 &&
2165 mdev->agreed_pro_version >= 90) {
2166 mdev->ov_start_sector = sector;
2167 mdev->ov_position = sector;
2168 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2169 dev_info(DEV, "Online Verify start sector: %llu\n",
2170 (unsigned long long)sector);
2171 }
2172 e->w.cb = w_e_end_ov_req;
2173 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2174 break;
2175
b411b363
PR
2176 default:
2177 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2178 cmdname(cmd));
b411b363 2179 fault_type = DRBD_FAULT_MAX;
80a40e43 2180 goto out_free_e;
b411b363
PR
2181 }
2182
0f0601f4
LE
2183 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2184 * wrt the receiver, but it is not as straightforward as it may seem.
2185 * Various places in the resync start and stop logic assume resync
2186 * requests are processed in order, requeuing this on the worker thread
2187 * introduces a bunch of new code for synchronization between threads.
2188 *
2189 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2190 * "forever", throttling after drbd_rs_begin_io will lock that extent
2191 * for application writes for the same time. For now, just throttle
2192 * here, where the rest of the code expects the receiver to sleep for
2193 * a while, anyways.
2194 */
2195
2196 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2197 * this defers syncer requests for some time, before letting at least
2198 * on request through. The resync controller on the receiving side
2199 * will adapt to the incoming rate accordingly.
2200 *
2201 * We cannot throttle here if remote is Primary/SyncTarget:
2202 * we would also throttle its application reads.
2203 * In that case, throttling is done on the SyncTarget only.
2204 */
2205 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
2206 msleep(100);
80a40e43
LE
2207 if (drbd_rs_begin_io(mdev, e->sector))
2208 goto out_free_e;
b411b363 2209
0f0601f4
LE
2210submit_for_resync:
2211 atomic_add(size >> 9, &mdev->rs_sect_ev);
2212
80a40e43 2213submit:
b411b363 2214 inc_unacked(mdev);
80a40e43
LE
2215 spin_lock_irq(&mdev->req_lock);
2216 list_add_tail(&e->w.list, &mdev->read_ee);
2217 spin_unlock_irq(&mdev->req_lock);
b411b363 2218
45bb912b
LE
2219 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2220 return TRUE;
b411b363 2221
22cc37a9
LE
2222 /* drbd_submit_ee currently fails for one reason only:
2223 * not being able to allocate enough bios.
2224 * Is dropping the connection going to help? */
2225 spin_lock_irq(&mdev->req_lock);
2226 list_del(&e->w.list);
2227 spin_unlock_irq(&mdev->req_lock);
2228 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2229
b411b363 2230out_free_e:
b411b363
PR
2231 put_ldev(mdev);
2232 drbd_free_ee(mdev, e);
2233 return FALSE;
2234}
2235
2236static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2237{
2238 int self, peer, rv = -100;
2239 unsigned long ch_self, ch_peer;
2240
2241 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2242 peer = mdev->p_uuid[UI_BITMAP] & 1;
2243
2244 ch_peer = mdev->p_uuid[UI_SIZE];
2245 ch_self = mdev->comm_bm_set;
2246
2247 switch (mdev->net_conf->after_sb_0p) {
2248 case ASB_CONSENSUS:
2249 case ASB_DISCARD_SECONDARY:
2250 case ASB_CALL_HELPER:
2251 dev_err(DEV, "Configuration error.\n");
2252 break;
2253 case ASB_DISCONNECT:
2254 break;
2255 case ASB_DISCARD_YOUNGER_PRI:
2256 if (self == 0 && peer == 1) {
2257 rv = -1;
2258 break;
2259 }
2260 if (self == 1 && peer == 0) {
2261 rv = 1;
2262 break;
2263 }
2264 /* Else fall through to one of the other strategies... */
2265 case ASB_DISCARD_OLDER_PRI:
2266 if (self == 0 && peer == 1) {
2267 rv = 1;
2268 break;
2269 }
2270 if (self == 1 && peer == 0) {
2271 rv = -1;
2272 break;
2273 }
2274 /* Else fall through to one of the other strategies... */
ad19bf6e 2275 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2276 "Using discard-least-changes instead\n");
2277 case ASB_DISCARD_ZERO_CHG:
2278 if (ch_peer == 0 && ch_self == 0) {
2279 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2280 ? -1 : 1;
2281 break;
2282 } else {
2283 if (ch_peer == 0) { rv = 1; break; }
2284 if (ch_self == 0) { rv = -1; break; }
2285 }
2286 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2287 break;
2288 case ASB_DISCARD_LEAST_CHG:
2289 if (ch_self < ch_peer)
2290 rv = -1;
2291 else if (ch_self > ch_peer)
2292 rv = 1;
2293 else /* ( ch_self == ch_peer ) */
2294 /* Well, then use something else. */
2295 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2296 ? -1 : 1;
2297 break;
2298 case ASB_DISCARD_LOCAL:
2299 rv = -1;
2300 break;
2301 case ASB_DISCARD_REMOTE:
2302 rv = 1;
2303 }
2304
2305 return rv;
2306}
2307
2308static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2309{
2310 int self, peer, hg, rv = -100;
2311
2312 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2313 peer = mdev->p_uuid[UI_BITMAP] & 1;
2314
2315 switch (mdev->net_conf->after_sb_1p) {
2316 case ASB_DISCARD_YOUNGER_PRI:
2317 case ASB_DISCARD_OLDER_PRI:
2318 case ASB_DISCARD_LEAST_CHG:
2319 case ASB_DISCARD_LOCAL:
2320 case ASB_DISCARD_REMOTE:
2321 dev_err(DEV, "Configuration error.\n");
2322 break;
2323 case ASB_DISCONNECT:
2324 break;
2325 case ASB_CONSENSUS:
2326 hg = drbd_asb_recover_0p(mdev);
2327 if (hg == -1 && mdev->state.role == R_SECONDARY)
2328 rv = hg;
2329 if (hg == 1 && mdev->state.role == R_PRIMARY)
2330 rv = hg;
2331 break;
2332 case ASB_VIOLENTLY:
2333 rv = drbd_asb_recover_0p(mdev);
2334 break;
2335 case ASB_DISCARD_SECONDARY:
2336 return mdev->state.role == R_PRIMARY ? 1 : -1;
2337 case ASB_CALL_HELPER:
2338 hg = drbd_asb_recover_0p(mdev);
2339 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2340 self = drbd_set_role(mdev, R_SECONDARY, 0);
2341 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2342 * we might be here in C_WF_REPORT_PARAMS which is transient.
2343 * we do not need to wait for the after state change work either. */
2344 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2345 if (self != SS_SUCCESS) {
2346 drbd_khelper(mdev, "pri-lost-after-sb");
2347 } else {
2348 dev_warn(DEV, "Successfully gave up primary role.\n");
2349 rv = hg;
2350 }
2351 } else
2352 rv = hg;
2353 }
2354
2355 return rv;
2356}
2357
2358static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2359{
2360 int self, peer, hg, rv = -100;
2361
2362 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2363 peer = mdev->p_uuid[UI_BITMAP] & 1;
2364
2365 switch (mdev->net_conf->after_sb_2p) {
2366 case ASB_DISCARD_YOUNGER_PRI:
2367 case ASB_DISCARD_OLDER_PRI:
2368 case ASB_DISCARD_LEAST_CHG:
2369 case ASB_DISCARD_LOCAL:
2370 case ASB_DISCARD_REMOTE:
2371 case ASB_CONSENSUS:
2372 case ASB_DISCARD_SECONDARY:
2373 dev_err(DEV, "Configuration error.\n");
2374 break;
2375 case ASB_VIOLENTLY:
2376 rv = drbd_asb_recover_0p(mdev);
2377 break;
2378 case ASB_DISCONNECT:
2379 break;
2380 case ASB_CALL_HELPER:
2381 hg = drbd_asb_recover_0p(mdev);
2382 if (hg == -1) {
2383 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2384 * we might be here in C_WF_REPORT_PARAMS which is transient.
2385 * we do not need to wait for the after state change work either. */
2386 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2387 if (self != SS_SUCCESS) {
2388 drbd_khelper(mdev, "pri-lost-after-sb");
2389 } else {
2390 dev_warn(DEV, "Successfully gave up primary role.\n");
2391 rv = hg;
2392 }
2393 } else
2394 rv = hg;
2395 }
2396
2397 return rv;
2398}
2399
2400static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2401 u64 bits, u64 flags)
2402{
2403 if (!uuid) {
2404 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2405 return;
2406 }
2407 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2408 text,
2409 (unsigned long long)uuid[UI_CURRENT],
2410 (unsigned long long)uuid[UI_BITMAP],
2411 (unsigned long long)uuid[UI_HISTORY_START],
2412 (unsigned long long)uuid[UI_HISTORY_END],
2413 (unsigned long long)bits,
2414 (unsigned long long)flags);
2415}
2416
2417/*
2418 100 after split brain try auto recover
2419 2 C_SYNC_SOURCE set BitMap
2420 1 C_SYNC_SOURCE use BitMap
2421 0 no Sync
2422 -1 C_SYNC_TARGET use BitMap
2423 -2 C_SYNC_TARGET set BitMap
2424 -100 after split brain, disconnect
2425-1000 unrelated data
2426 */
2427static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2428{
2429 u64 self, peer;
2430 int i, j;
2431
2432 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2433 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2434
2435 *rule_nr = 10;
2436 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2437 return 0;
2438
2439 *rule_nr = 20;
2440 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2441 peer != UUID_JUST_CREATED)
2442 return -2;
2443
2444 *rule_nr = 30;
2445 if (self != UUID_JUST_CREATED &&
2446 (peer == UUID_JUST_CREATED || peer == (u64)0))
2447 return 2;
2448
2449 if (self == peer) {
2450 int rct, dc; /* roles at crash time */
2451
2452 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2453
2454 if (mdev->agreed_pro_version < 91)
2455 return -1001;
2456
2457 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2458 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2459 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2460 drbd_uuid_set_bm(mdev, 0UL);
2461
2462 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2463 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2464 *rule_nr = 34;
2465 } else {
2466 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2467 *rule_nr = 36;
2468 }
2469
2470 return 1;
2471 }
2472
2473 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2474
2475 if (mdev->agreed_pro_version < 91)
2476 return -1001;
2477
2478 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2479 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2480 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2481
2482 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2483 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2484 mdev->p_uuid[UI_BITMAP] = 0UL;
2485
2486 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2487 *rule_nr = 35;
2488 } else {
2489 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2490 *rule_nr = 37;
2491 }
2492
2493 return -1;
2494 }
2495
2496 /* Common power [off|failure] */
2497 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2498 (mdev->p_uuid[UI_FLAGS] & 2);
2499 /* lowest bit is set when we were primary,
2500 * next bit (weight 2) is set when peer was primary */
2501 *rule_nr = 40;
2502
2503 switch (rct) {
2504 case 0: /* !self_pri && !peer_pri */ return 0;
2505 case 1: /* self_pri && !peer_pri */ return 1;
2506 case 2: /* !self_pri && peer_pri */ return -1;
2507 case 3: /* self_pri && peer_pri */
2508 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2509 return dc ? -1 : 1;
2510 }
2511 }
2512
2513 *rule_nr = 50;
2514 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2515 if (self == peer)
2516 return -1;
2517
2518 *rule_nr = 51;
2519 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2520 if (self == peer) {
2521 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2522 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2523 if (self == peer) {
2524 /* The last P_SYNC_UUID did not get though. Undo the last start of
2525 resync as sync source modifications of the peer's UUIDs. */
2526
2527 if (mdev->agreed_pro_version < 91)
2528 return -1001;
2529
2530 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2531 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2532 return -1;
2533 }
2534 }
2535
2536 *rule_nr = 60;
2537 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2538 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2539 peer = mdev->p_uuid[i] & ~((u64)1);
2540 if (self == peer)
2541 return -2;
2542 }
2543
2544 *rule_nr = 70;
2545 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2546 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2547 if (self == peer)
2548 return 1;
2549
2550 *rule_nr = 71;
2551 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2552 if (self == peer) {
2553 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2554 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2555 if (self == peer) {
2556 /* The last P_SYNC_UUID did not get though. Undo the last start of
2557 resync as sync source modifications of our UUIDs. */
2558
2559 if (mdev->agreed_pro_version < 91)
2560 return -1001;
2561
2562 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2563 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2564
2565 dev_info(DEV, "Undid last start of resync:\n");
2566
2567 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2568 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2569
2570 return 1;
2571 }
2572 }
2573
2574
2575 *rule_nr = 80;
d8c2a36b 2576 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2577 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2578 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2579 if (self == peer)
2580 return 2;
2581 }
2582
2583 *rule_nr = 90;
2584 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2585 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2586 if (self == peer && self != ((u64)0))
2587 return 100;
2588
2589 *rule_nr = 100;
2590 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2591 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2592 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2593 peer = mdev->p_uuid[j] & ~((u64)1);
2594 if (self == peer)
2595 return -100;
2596 }
2597 }
2598
2599 return -1000;
2600}
2601
2602/* drbd_sync_handshake() returns the new conn state on success, or
2603 CONN_MASK (-1) on failure.
2604 */
2605static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2606 enum drbd_disk_state peer_disk) __must_hold(local)
2607{
2608 int hg, rule_nr;
2609 enum drbd_conns rv = C_MASK;
2610 enum drbd_disk_state mydisk;
2611
2612 mydisk = mdev->state.disk;
2613 if (mydisk == D_NEGOTIATING)
2614 mydisk = mdev->new_state_tmp.disk;
2615
2616 dev_info(DEV, "drbd_sync_handshake:\n");
2617 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2618 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2619 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2620
2621 hg = drbd_uuid_compare(mdev, &rule_nr);
2622
2623 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2624
2625 if (hg == -1000) {
2626 dev_alert(DEV, "Unrelated data, aborting!\n");
2627 return C_MASK;
2628 }
2629 if (hg == -1001) {
2630 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2631 return C_MASK;
2632 }
2633
2634 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2635 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2636 int f = (hg == -100) || abs(hg) == 2;
2637 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2638 if (f)
2639 hg = hg*2;
2640 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2641 hg > 0 ? "source" : "target");
2642 }
2643
3a11a487
AG
2644 if (abs(hg) == 100)
2645 drbd_khelper(mdev, "initial-split-brain");
2646
b411b363
PR
2647 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2648 int pcount = (mdev->state.role == R_PRIMARY)
2649 + (peer_role == R_PRIMARY);
2650 int forced = (hg == -100);
2651
2652 switch (pcount) {
2653 case 0:
2654 hg = drbd_asb_recover_0p(mdev);
2655 break;
2656 case 1:
2657 hg = drbd_asb_recover_1p(mdev);
2658 break;
2659 case 2:
2660 hg = drbd_asb_recover_2p(mdev);
2661 break;
2662 }
2663 if (abs(hg) < 100) {
2664 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2665 "automatically solved. Sync from %s node\n",
2666 pcount, (hg < 0) ? "peer" : "this");
2667 if (forced) {
2668 dev_warn(DEV, "Doing a full sync, since"
2669 " UUIDs where ambiguous.\n");
2670 hg = hg*2;
2671 }
2672 }
2673 }
2674
2675 if (hg == -100) {
2676 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2677 hg = -1;
2678 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2679 hg = 1;
2680
2681 if (abs(hg) < 100)
2682 dev_warn(DEV, "Split-Brain detected, manually solved. "
2683 "Sync from %s node\n",
2684 (hg < 0) ? "peer" : "this");
2685 }
2686
2687 if (hg == -100) {
580b9767
LE
2688 /* FIXME this log message is not correct if we end up here
2689 * after an attempted attach on a diskless node.
2690 * We just refuse to attach -- well, we drop the "connection"
2691 * to that disk, in a way... */
3a11a487 2692 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2693 drbd_khelper(mdev, "split-brain");
2694 return C_MASK;
2695 }
2696
2697 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2698 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2699 return C_MASK;
2700 }
2701
2702 if (hg < 0 && /* by intention we do not use mydisk here. */
2703 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2704 switch (mdev->net_conf->rr_conflict) {
2705 case ASB_CALL_HELPER:
2706 drbd_khelper(mdev, "pri-lost");
2707 /* fall through */
2708 case ASB_DISCONNECT:
2709 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2710 return C_MASK;
2711 case ASB_VIOLENTLY:
2712 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2713 "assumption\n");
2714 }
2715 }
2716
cf14c2e9
PR
2717 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2718 if (hg == 0)
2719 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2720 else
2721 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2722 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2723 abs(hg) >= 2 ? "full" : "bit-map based");
2724 return C_MASK;
2725 }
2726
b411b363
PR
2727 if (abs(hg) >= 2) {
2728 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2729 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2730 return C_MASK;
2731 }
2732
2733 if (hg > 0) { /* become sync source. */
2734 rv = C_WF_BITMAP_S;
2735 } else if (hg < 0) { /* become sync target */
2736 rv = C_WF_BITMAP_T;
2737 } else {
2738 rv = C_CONNECTED;
2739 if (drbd_bm_total_weight(mdev)) {
2740 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2741 drbd_bm_total_weight(mdev));
2742 }
2743 }
2744
2745 return rv;
2746}
2747
2748/* returns 1 if invalid */
2749static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2750{
2751 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2752 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2753 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2754 return 0;
2755
2756 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2757 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2758 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2759 return 1;
2760
2761 /* everything else is valid if they are equal on both sides. */
2762 if (peer == self)
2763 return 0;
2764
2765 /* everything es is invalid. */
2766 return 1;
2767}
2768
02918be2 2769static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 2770{
02918be2 2771 struct p_protocol *p = &mdev->data.rbuf.protocol;
b411b363 2772 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2773 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2774 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2775
b411b363
PR
2776 p_proto = be32_to_cpu(p->protocol);
2777 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2778 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2779 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2780 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2781 cf = be32_to_cpu(p->conn_flags);
2782 p_want_lose = cf & CF_WANT_LOSE;
2783
2784 clear_bit(CONN_DRY_RUN, &mdev->flags);
2785
2786 if (cf & CF_DRY_RUN)
2787 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363
PR
2788
2789 if (p_proto != mdev->net_conf->wire_protocol) {
2790 dev_err(DEV, "incompatible communication protocols\n");
2791 goto disconnect;
2792 }
2793
2794 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2795 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2796 goto disconnect;
2797 }
2798
2799 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2800 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2801 goto disconnect;
2802 }
2803
2804 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2805 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2806 goto disconnect;
2807 }
2808
2809 if (p_want_lose && mdev->net_conf->want_lose) {
2810 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2811 goto disconnect;
2812 }
2813
2814 if (p_two_primaries != mdev->net_conf->two_primaries) {
2815 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2816 goto disconnect;
2817 }
2818
2819 if (mdev->agreed_pro_version >= 87) {
2820 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2821
2822 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2823 return FALSE;
2824
2825 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2826 if (strcmp(p_integrity_alg, my_alg)) {
2827 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2828 goto disconnect;
2829 }
2830 dev_info(DEV, "data-integrity-alg: %s\n",
2831 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2832 }
2833
2834 return TRUE;
2835
2836disconnect:
2837 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2838 return FALSE;
2839}
2840
2841/* helper function
2842 * input: alg name, feature name
2843 * return: NULL (alg name was "")
2844 * ERR_PTR(error) if something goes wrong
2845 * or the crypto hash ptr, if it worked out ok. */
2846struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2847 const char *alg, const char *name)
2848{
2849 struct crypto_hash *tfm;
2850
2851 if (!alg[0])
2852 return NULL;
2853
2854 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2855 if (IS_ERR(tfm)) {
2856 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2857 alg, name, PTR_ERR(tfm));
2858 return tfm;
2859 }
2860 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2861 crypto_free_hash(tfm);
2862 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2863 return ERR_PTR(-EINVAL);
2864 }
2865 return tfm;
2866}
2867
02918be2 2868static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
b411b363
PR
2869{
2870 int ok = TRUE;
02918be2 2871 struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
b411b363
PR
2872 unsigned int header_size, data_size, exp_max_sz;
2873 struct crypto_hash *verify_tfm = NULL;
2874 struct crypto_hash *csums_tfm = NULL;
2875 const int apv = mdev->agreed_pro_version;
778f271d
PR
2876 int *rs_plan_s = NULL;
2877 int fifo_size = 0;
b411b363
PR
2878
2879 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2880 : apv == 88 ? sizeof(struct p_rs_param)
2881 + SHARED_SECRET_MAX
8e26f9cc
PR
2882 : apv <= 94 ? sizeof(struct p_rs_param_89)
2883 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2884
02918be2 2885 if (packet_size > exp_max_sz) {
b411b363 2886 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2887 packet_size, exp_max_sz);
b411b363
PR
2888 return FALSE;
2889 }
2890
2891 if (apv <= 88) {
02918be2
PR
2892 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2893 data_size = packet_size - header_size;
8e26f9cc 2894 } else if (apv <= 94) {
02918be2
PR
2895 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2896 data_size = packet_size - header_size;
b411b363 2897 D_ASSERT(data_size == 0);
8e26f9cc 2898 } else {
02918be2
PR
2899 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2900 data_size = packet_size - header_size;
8e26f9cc 2901 D_ASSERT(data_size == 0);
b411b363
PR
2902 }
2903
2904 /* initialize verify_alg and csums_alg */
2905 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2906
02918be2 2907 if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
b411b363
PR
2908 return FALSE;
2909
2910 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2911
2912 if (apv >= 88) {
2913 if (apv == 88) {
2914 if (data_size > SHARED_SECRET_MAX) {
2915 dev_err(DEV, "verify-alg too long, "
2916 "peer wants %u, accepting only %u byte\n",
2917 data_size, SHARED_SECRET_MAX);
2918 return FALSE;
2919 }
2920
2921 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2922 return FALSE;
2923
2924 /* we expect NUL terminated string */
2925 /* but just in case someone tries to be evil */
2926 D_ASSERT(p->verify_alg[data_size-1] == 0);
2927 p->verify_alg[data_size-1] = 0;
2928
2929 } else /* apv >= 89 */ {
2930 /* we still expect NUL terminated strings */
2931 /* but just in case someone tries to be evil */
2932 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2933 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2934 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2935 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2936 }
2937
2938 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2939 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2940 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2941 mdev->sync_conf.verify_alg, p->verify_alg);
2942 goto disconnect;
2943 }
2944 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2945 p->verify_alg, "verify-alg");
2946 if (IS_ERR(verify_tfm)) {
2947 verify_tfm = NULL;
2948 goto disconnect;
2949 }
2950 }
2951
2952 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2953 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2954 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2955 mdev->sync_conf.csums_alg, p->csums_alg);
2956 goto disconnect;
2957 }
2958 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2959 p->csums_alg, "csums-alg");
2960 if (IS_ERR(csums_tfm)) {
2961 csums_tfm = NULL;
2962 goto disconnect;
2963 }
2964 }
2965
8e26f9cc
PR
2966 if (apv > 94) {
2967 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2968 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2969 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2970 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2971 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2972
2973 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2974 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2975 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2976 if (!rs_plan_s) {
2977 dev_err(DEV, "kmalloc of fifo_buffer failed");
2978 goto disconnect;
2979 }
2980 }
8e26f9cc 2981 }
b411b363
PR
2982
2983 spin_lock(&mdev->peer_seq_lock);
2984 /* lock against drbd_nl_syncer_conf() */
2985 if (verify_tfm) {
2986 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2987 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2988 crypto_free_hash(mdev->verify_tfm);
2989 mdev->verify_tfm = verify_tfm;
2990 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2991 }
2992 if (csums_tfm) {
2993 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2994 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2995 crypto_free_hash(mdev->csums_tfm);
2996 mdev->csums_tfm = csums_tfm;
2997 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2998 }
778f271d
PR
2999 if (fifo_size != mdev->rs_plan_s.size) {
3000 kfree(mdev->rs_plan_s.values);
3001 mdev->rs_plan_s.values = rs_plan_s;
3002 mdev->rs_plan_s.size = fifo_size;
3003 mdev->rs_planed = 0;
3004 }
b411b363
PR
3005 spin_unlock(&mdev->peer_seq_lock);
3006 }
3007
3008 return ok;
3009disconnect:
3010 /* just for completeness: actually not needed,
3011 * as this is not reached if csums_tfm was ok. */
3012 crypto_free_hash(csums_tfm);
3013 /* but free the verify_tfm again, if csums_tfm did not work out */
3014 crypto_free_hash(verify_tfm);
3015 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3016 return FALSE;
3017}
3018
/*
 * Stub for applying the queue order type announced by the peer.
 * Both arguments are accepted and ignored: there is currently no working
 * implementation of distributed TCQ.
 */
static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
{
}
3024
3025/* warn if the arguments differ by more than 12.5% */
3026static void warn_if_differ_considerably(struct drbd_conf *mdev,
3027 const char *s, sector_t a, sector_t b)
3028{
3029 sector_t d;
3030 if (a == 0 || b == 0)
3031 return;
3032 d = (a > b) ? (a - b) : (b - a);
3033 if (d > (a>>3) || d > (b>>3))
3034 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3035 (unsigned long long)a, (unsigned long long)b);
3036}
3037
02918be2 3038static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3039{
02918be2 3040 struct p_sizes *p = &mdev->data.rbuf.sizes;
b411b363
PR
3041 enum determine_dev_size dd = unchanged;
3042 unsigned int max_seg_s;
3043 sector_t p_size, p_usize, my_usize;
3044 int ldsc = 0; /* local disk size changed */
e89b591c 3045 enum dds_flags ddsf;
b411b363 3046
b411b363
PR
3047 p_size = be64_to_cpu(p->d_size);
3048 p_usize = be64_to_cpu(p->u_size);
3049
3050 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3051 dev_err(DEV, "some backing storage is needed\n");
3052 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3053 return FALSE;
3054 }
3055
3056 /* just store the peer's disk size for now.
3057 * we still need to figure out whether we accept that. */
3058 mdev->p_size = p_size;
3059
3060#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
3061 if (get_ldev(mdev)) {
3062 warn_if_differ_considerably(mdev, "lower level device sizes",
3063 p_size, drbd_get_max_capacity(mdev->ldev));
3064 warn_if_differ_considerably(mdev, "user requested size",
3065 p_usize, mdev->ldev->dc.disk_size);
3066
3067 /* if this is the first connect, or an otherwise expected
3068 * param exchange, choose the minimum */
3069 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3070 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3071 p_usize);
3072
3073 my_usize = mdev->ldev->dc.disk_size;
3074
3075 if (mdev->ldev->dc.disk_size != p_usize) {
3076 mdev->ldev->dc.disk_size = p_usize;
3077 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3078 (unsigned long)mdev->ldev->dc.disk_size);
3079 }
3080
3081 /* Never shrink a device with usable data during connect.
3082 But allow online shrinking if we are connected. */
a393db6f 3083 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3084 drbd_get_capacity(mdev->this_bdev) &&
3085 mdev->state.disk >= D_OUTDATED &&
3086 mdev->state.conn < C_CONNECTED) {
3087 dev_err(DEV, "The peer's disk size is too small!\n");
3088 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3089 mdev->ldev->dc.disk_size = my_usize;
3090 put_ldev(mdev);
3091 return FALSE;
3092 }
3093 put_ldev(mdev);
3094 }
3095#undef min_not_zero
3096
e89b591c 3097 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3098 if (get_ldev(mdev)) {
e89b591c 3099 dd = drbd_determin_dev_size(mdev, ddsf);
b411b363
PR
3100 put_ldev(mdev);
3101 if (dd == dev_size_error)
3102 return FALSE;
3103 drbd_md_sync(mdev);
3104 } else {
3105 /* I am diskless, need to accept the peer's size. */
3106 drbd_set_my_capacity(mdev, p_size);
3107 }
3108
b411b363
PR
3109 if (get_ldev(mdev)) {
3110 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3111 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3112 ldsc = 1;
3113 }
3114
a1c88d0d
LE
3115 if (mdev->agreed_pro_version < 94)
3116 max_seg_s = be32_to_cpu(p->max_segment_size);
8979d9c9
LE
3117 else if (mdev->agreed_pro_version == 94)
3118 max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
a1c88d0d
LE
3119 else /* drbd 8.3.8 onwards */
3120 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3121
b411b363
PR
3122 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3123 drbd_setup_queue_param(mdev, max_seg_s);
3124
e89b591c 3125 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
b411b363
PR
3126 put_ldev(mdev);
3127 }
3128
3129 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3130 if (be64_to_cpu(p->c_size) !=
3131 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3132 /* we have different sizes, probably peer
3133 * needs to know my new size... */
e89b591c 3134 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3135 }
3136 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3137 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3138 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3139 mdev->state.disk >= D_INCONSISTENT) {
3140 if (ddsf & DDSF_NO_RESYNC)
3141 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3142 else
3143 resync_after_online_grow(mdev);
3144 } else
b411b363
PR
3145 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3146 }
3147 }
3148
3149 return TRUE;
3150}
3151
02918be2 3152static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3153{
02918be2 3154 struct p_uuids *p = &mdev->data.rbuf.uuids;
b411b363
PR
3155 u64 *p_uuid;
3156 int i;
3157
b411b363
PR
3158 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3159
3160 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3161 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3162
3163 kfree(mdev->p_uuid);
3164 mdev->p_uuid = p_uuid;
3165
3166 if (mdev->state.conn < C_CONNECTED &&
3167 mdev->state.disk < D_INCONSISTENT &&
3168 mdev->state.role == R_PRIMARY &&
3169 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3170 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3171 (unsigned long long)mdev->ed_uuid);
3172 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3173 return FALSE;
3174 }
3175
3176 if (get_ldev(mdev)) {
3177 int skip_initial_sync =
3178 mdev->state.conn == C_CONNECTED &&
3179 mdev->agreed_pro_version >= 90 &&
3180 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3181 (p_uuid[UI_FLAGS] & 8);
3182 if (skip_initial_sync) {
3183 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3184 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3185 "clear_n_write from receive_uuids");
3186 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3187 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3188 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3189 CS_VERBOSE, NULL);
3190 drbd_md_sync(mdev);
3191 }
3192 put_ldev(mdev);
18a50fa2
PR
3193 } else if (mdev->state.disk < D_INCONSISTENT &&
3194 mdev->state.role == R_PRIMARY) {
3195 /* I am a diskless primary, the peer just created a new current UUID
3196 for me. */
3197 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3198 }
3199
3200 /* Before we test for the disk state, we should wait until an eventually
3201 ongoing cluster wide state change is finished. That is important if
3202 we are primary and are detaching from our disk. We need to see the
3203 new disk state... */
3204 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3205 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3206 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3207
3208 return TRUE;
3209}
3210
3211/**
3212 * convert_state() - Converts the peer's view of the cluster state to our point of view
3213 * @ps: The state as seen by the peer.
3214 */
3215static union drbd_state convert_state(union drbd_state ps)
3216{
3217 union drbd_state ms;
3218
3219 static enum drbd_conns c_tab[] = {
3220 [C_CONNECTED] = C_CONNECTED,
3221
3222 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3223 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3224 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3225 [C_VERIFY_S] = C_VERIFY_T,
3226 [C_MASK] = C_MASK,
3227 };
3228
3229 ms.i = ps.i;
3230
3231 ms.conn = c_tab[ps.conn];
3232 ms.peer = ps.role;
3233 ms.role = ps.peer;
3234 ms.pdsk = ps.disk;
3235 ms.disk = ps.pdsk;
3236 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3237
3238 return ms;
3239}
3240
02918be2 3241static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3242{
02918be2 3243 struct p_req_state *p = &mdev->data.rbuf.req_state;
b411b363
PR
3244 union drbd_state mask, val;
3245 int rv;
3246
b411b363
PR
3247 mask.i = be32_to_cpu(p->mask);
3248 val.i = be32_to_cpu(p->val);
3249
3250 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3251 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3252 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3253 return TRUE;
3254 }
3255
3256 mask = convert_state(mask);
3257 val = convert_state(val);
3258
3259 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3260
3261 drbd_send_sr_reply(mdev, rv);
3262 drbd_md_sync(mdev);
3263
3264 return TRUE;
3265}
3266
/*
 * receive_state() - handle the peer's state packet (struct p_state)
 *
 * Reads the peer's state word, derives the new local state from it (peer
 * role, peer disk state, peer sync-pause flag, and possibly a new connection
 * state chosen by drbd_sync_handshake()), and applies it with
 * _drbd_set_state().
 *
 * The local state is sampled under req_lock, the (possibly sleeping)
 * evaluation runs without the lock, and the whole evaluation is restarted
 * via the retry label if mdev->state changed in the meantime.
 *
 * Returns TRUE on success.  Returns FALSE after a failed sync handshake on a
 * fresh connection, when IO cannot be thawed with an only Consistent peer,
 * or when the state change is refused.
 */
02918be2 3267static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3268{
02918be2 3269 struct p_state *p = &mdev->data.rbuf.state;
4ac4aada 3270 union drbd_state os, ns, peer_state;
b411b363 3271 enum drbd_disk_state real_peer_disk;
65d922c3 3272 enum chg_state_flags cs_flags;
b411b363
PR
3273 int rv;
3274
b411b363
PR
3275 peer_state.i = be32_to_cpu(p->state);
3276
3277 real_peer_disk = peer_state.disk;
/* NOTE(review): mdev->p_uuid is dereferenced here without a NULL check,
 * while the handshake condition further down does test it for NULL.
 * Presumably the UUID packet always arrives first - confirm. */
3278 if (peer_state.disk == D_NEGOTIATING) {
3279 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3280 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3281 }
3282
/* take a snapshot of our state under the lock; the lock is already held
 * when we get here through "goto retry" */
3283 spin_lock_irq(&mdev->req_lock);
3284 retry:
4ac4aada 3285 os = ns = mdev->state;
b411b363
PR
3286 spin_unlock_irq(&mdev->req_lock);
3287
e9ef7bb6
LE
3288 /* peer says his disk is uptodate, while we think it is inconsistent,
3289 * and this happens while we think we have a sync going on. */
3290 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3291 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3292 /* If we are (becoming) SyncSource, but peer is still in sync
3293 * preparation, ignore its uptodate-ness to avoid flapping, it
3294 * will change to inconsistent once the peer reaches active
3295 * syncing states.
3296 * It may have changed syncer-paused flags, however, so we
3297 * cannot ignore this completely. */
3298 if (peer_state.conn > C_CONNECTED &&
3299 peer_state.conn < C_SYNC_SOURCE)
3300 real_peer_disk = D_INCONSISTENT;
3301
3302 /* if peer_state changes to connected at the same time,
3303 * it explicitly notifies us that it finished resync.
3304 * Maybe we should finish it up, too? */
3305 else if (os.conn >= C_SYNC_SOURCE &&
3306 peer_state.conn == C_CONNECTED) {
3307 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3308 drbd_resync_finished(mdev);
3309 return TRUE;
3310 }
3311 }
3312
3313 /* peer says his disk is inconsistent, while we think it is uptodate,
3314 * and this happens while the peer still thinks we have a sync going on,
3315 * but we think we are already done with the sync.
3316 * We ignore this to avoid flapping pdsk.
3317 * This should not happen, if the peer is a recent version of drbd. */
3318 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3319 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3320 real_peer_disk = D_UP_TO_DATE;
3321
4ac4aada
LE
3322 if (ns.conn == C_WF_REPORT_PARAMS)
3323 ns.conn = C_CONNECTED;
b411b363
PR
3324
3325 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3326 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3327 int cr; /* consider resync */
3328
3329 /* if we established a new connection */
4ac4aada 3330 cr = (os.conn < C_CONNECTED);
b411b363
PR
3331 /* if we had an established connection
3332 * and one of the nodes newly attaches a disk */
4ac4aada 3333 cr |= (os.conn == C_CONNECTED &&
b411b363 3334 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3335 os.disk == D_NEGOTIATING));
b411b363
PR
3336 /* if we have both been inconsistent, and the peer has been
3337 * forced to be UpToDate with --overwrite-data */
3338 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3339 /* if we had been plain connected, and the admin requested to
3340 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3341 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3342 (peer_state.conn >= C_STARTING_SYNC_S &&
3343 peer_state.conn <= C_WF_BITMAP_T));
3344
3345 if (cr)
4ac4aada 3346 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3347
3348 put_ldev(mdev);
/* C_MASK is treated as "the handshake failed" here */
4ac4aada
LE
3349 if (ns.conn == C_MASK) {
3350 ns.conn = C_CONNECTED;
b411b363
PR
3351 if (mdev->state.disk == D_NEGOTIATING) {
3352 drbd_force_state(mdev, NS(disk, D_DISKLESS));
b411b363
PR
3353 } else if (peer_state.disk == D_NEGOTIATING) {
3354 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3355 peer_state.disk = D_DISKLESS;
580b9767 3356 real_peer_disk = D_DISKLESS;
b411b363 3357 } else {
cf14c2e9
PR
3358 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3359 return FALSE;
4ac4aada 3360 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363
PR
3361 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3362 return FALSE;
3363 }
3364 }
3365 }
3366
/* our state changed while we were evaluating without the lock:
 * start over with a fresh snapshot (the lock stays held) */
3367 spin_lock_irq(&mdev->req_lock);
4ac4aada 3368 if (mdev->state.i != os.i)
b411b363
PR
3369 goto retry;
3370 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3371 ns.peer = peer_state.role;
3372 ns.pdsk = real_peer_disk;
3373 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3374 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3375 ns.disk = mdev->new_state_tmp.disk;
/* CS_HARD is added unless this change takes the connection from below
 * C_CONNECTED to C_CONNECTED or above */
4ac4aada
LE
3376 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3377 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50
PR
3378 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3379 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3380 for temporal network outages! */
3381 spin_unlock_irq(&mdev->req_lock);
3382 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3383 tl_clear(mdev);
3384 drbd_uuid_new_current(mdev);
3385 clear_bit(NEW_CUR_UUID, &mdev->flags);
3386 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3387 return FALSE;
3388 }
65d922c3 3389 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363
PR
/* re-read: the state actually in effect after the change attempt */
3390 ns = mdev->state;
3391 spin_unlock_irq(&mdev->req_lock);
3392
3393 if (rv < SS_SUCCESS) {
3394 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3395 return FALSE;
3396 }
3397
4ac4aada
LE
3398 if (os.conn > C_WF_REPORT_PARAMS) {
3399 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3400 peer_state.disk != D_NEGOTIATING ) {
3401 /* we want resync, peer has not yet decided to sync... */
3402 /* Nowadays only used when forcing a node into primary role and
3403 setting its disk to UpToDate with that */
3404 drbd_send_uuids(mdev);
3405 drbd_send_state(mdev);
3406 }
3407 }
3408
3409 mdev->net_conf->want_lose = 0;
3410
3411 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3412
3413 return TRUE;
3414}
3415
02918be2 3416static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3417{
02918be2 3418 struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
b411b363
PR
3419
3420 wait_event(mdev->misc_wait,
3421 mdev->state.conn == C_WF_SYNC_UUID ||
3422 mdev->state.conn < C_CONNECTED ||
3423 mdev->state.disk < D_NEGOTIATING);
3424
3425 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3426
b411b363
PR
3427 /* Here the _drbd_uuid_ functions are right, current should
3428 _not_ be rotated into the history */
3429 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3430 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3431 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3432
3433 drbd_start_resync(mdev, C_SYNC_TARGET);
3434
3435 put_ldev(mdev);
3436 } else
3437 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3438
3439 return TRUE;
3440}
3441
/* Result of receiving/decoding one bitmap packet. */
enum receive_bitmap_ret {
	OK,	/* packet processed, continue with the next packet */
	DONE,	/* the end of the bitmap has been reached */
	FAILED,	/* receive or decode error */
};
3443
3444static enum receive_bitmap_ret
02918be2
PR
3445receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3446 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3447{
3448 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3449 unsigned want = num_words * sizeof(long);
3450
02918be2
PR
3451 if (want != data_size) {
3452 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
b411b363
PR
3453 return FAILED;
3454 }
3455 if (want == 0)
3456 return DONE;
3457 if (drbd_recv(mdev, buffer, want) != want)
3458 return FAILED;
3459
3460 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3461
3462 c->word_offset += num_words;
3463 c->bit_offset = c->word_offset * BITS_PER_LONG;
3464 if (c->bit_offset > c->bm_bits)
3465 c->bit_offset = c->bm_bits;
3466
3467 return OK;
3468}
3469
3470static enum receive_bitmap_ret
3471recv_bm_rle_bits(struct drbd_conf *mdev,
3472 struct p_compressed_bm *p,
3473 struct bm_xfer_ctx *c)
3474{
3475 struct bitstream bs;
3476 u64 look_ahead;
3477 u64 rl;
3478 u64 tmp;
3479 unsigned long s = c->bit_offset;
3480 unsigned long e;
004352fa 3481 int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
b411b363
PR
3482 int toggle = DCBP_get_start(p);
3483 int have;
3484 int bits;
3485
3486 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3487
3488 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3489 if (bits < 0)
3490 return FAILED;
3491
3492 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3493 bits = vli_decode_bits(&rl, look_ahead);
3494 if (bits <= 0)
3495 return FAILED;
3496
3497 if (toggle) {
3498 e = s + rl -1;
3499 if (e >= c->bm_bits) {
3500 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3501 return FAILED;
3502 }
3503 _drbd_bm_set_bits(mdev, s, e);
3504 }
3505
3506 if (have < bits) {
3507 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3508 have, bits, look_ahead,
3509 (unsigned int)(bs.cur.b - p->code),
3510 (unsigned int)bs.buf_len);
3511 return FAILED;
3512 }
3513 look_ahead >>= bits;
3514 have -= bits;
3515
3516 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3517 if (bits < 0)
3518 return FAILED;
3519 look_ahead |= tmp << have;
3520 have += bits;
3521 }
3522
3523 c->bit_offset = s;
3524 bm_xfer_ctx_bit_to_word_offset(c);
3525
3526 return (s == c->bm_bits) ? DONE : OK;
3527}
3528
3529static enum receive_bitmap_ret
3530decode_bitmap_c(struct drbd_conf *mdev,
3531 struct p_compressed_bm *p,
3532 struct bm_xfer_ctx *c)
3533{
3534 if (DCBP_get_code(p) == RLE_VLI_Bits)
3535 return recv_bm_rle_bits(mdev, p, c);
3536
3537 /* other variants had been implemented for evaluation,
3538 * but have been dropped as this one turned out to be "best"
3539 * during all our tests. */
3540
3541 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3542 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3543 return FAILED;
3544}
3545
3546void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3547 const char *direction, struct bm_xfer_ctx *c)
3548{
3549 /* what would it take to transfer it "plaintext" */
0b70a13d 3550 unsigned plain = sizeof(struct p_header80) *
b411b363
PR
3551 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3552 + c->bm_words * sizeof(long);
3553 unsigned total = c->bytes[0] + c->bytes[1];
3554 unsigned r;
3555
3556 /* total can not be zero. but just in case: */
3557 if (total == 0)
3558 return;
3559
3560 /* don't report if not compressed */
3561 if (total >= plain)
3562 return;
3563
3564 /* total < plain. check for overflow, still */
3565 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3566 : (1000 * total / plain);
3567
3568 if (r > 1000)
3569 r = 1000;
3570
3571 r = 1000 - r;
3572 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3573 "total %u; compression: %u.%u%%\n",
3574 direction,
3575 c->bytes[1], c->packets[1],
3576 c->bytes[0], c->packets[0],
3577 total, r/10, r % 10);
3578}
3579
3580/* Since we are processing the bitfield from lower addresses to higher,
3581 it does not matter if the process it in 32 bit chunks or 64 bit
3582 chunks as long as it is little endian. (Understand it as byte stream,
3583 beginning with the lowest byte...) If we would use big endian
3584 we would need to process it from the highest address to the lowest,
3585 in order to be agnostic to the 32 vs 64 bits issue.
3586
3587 returns 0 on failure, 1 if we successfully received it. */
02918be2 3588static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
3589{
3590 struct bm_xfer_ctx c;
3591 void *buffer;
3592 enum receive_bitmap_ret ret;
3593 int ok = FALSE;
02918be2 3594 struct p_header80 *h = &mdev->data.rbuf.header.h80;
b411b363
PR
3595
3596 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3597
3598 drbd_bm_lock(mdev, "receive bitmap");
3599
3600 /* maybe we should use some per thread scratch page,
3601 * and allocate that during initial device creation? */
3602 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3603 if (!buffer) {
3604 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3605 goto out;
3606 }
3607
3608 c = (struct bm_xfer_ctx) {
3609 .bm_bits = drbd_bm_bits(mdev),
3610 .bm_words = drbd_bm_words(mdev),
3611 };
3612
3613 do {
02918be2
PR
3614 if (cmd == P_BITMAP) {
3615 ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
3616 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3617 /* MAYBE: sanity check that we speak proto >= 90,
3618 * and the feature is enabled! */
3619 struct p_compressed_bm *p;
3620
02918be2 3621 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3622 dev_err(DEV, "ReportCBitmap packet too large\n");
3623 goto out;
3624 }
3625 /* use the page buff */
3626 p = buffer;
3627 memcpy(p, h, sizeof(*h));
02918be2 3628 if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
b411b363 3629 goto out;
004352fa
LE
3630 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3631 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
b411b363
PR
3632 return FAILED;
3633 }
3634 ret = decode_bitmap_c(mdev, p, &c);
3635 } else {
02918be2 3636 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3637 goto out;
3638 }
3639
02918be2
PR
3640 c.packets[cmd == P_BITMAP]++;
3641 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
b411b363
PR
3642
3643 if (ret != OK)
3644 break;
3645
02918be2 3646 if (!drbd_recv_header(mdev, &cmd, &data_size))
b411b363
PR
3647 goto out;
3648 } while (ret == OK);
3649 if (ret == FAILED)
3650 goto out;
3651
3652 INFO_bm_xfer_stats(mdev, "receive", &c);
3653
3654 if (mdev->state.conn == C_WF_BITMAP_T) {
3655 ok = !drbd_send_bitmap(mdev);
3656 if (!ok)
3657 goto out;
3658 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3659 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3660 D_ASSERT(ok == SS_SUCCESS);
3661 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3662 /* admin may have requested C_DISCONNECTING,
3663 * other threads may have noticed network errors */
3664 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3665 drbd_conn_str(mdev->state.conn));
3666 }
3667
3668 ok = TRUE;
3669 out:
3670 drbd_bm_unlock(mdev);
3671 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3672 drbd_start_resync(mdev, C_SYNC_SOURCE);
3673 free_page((unsigned long) buffer);
3674 return ok;
3675}
3676
02918be2 3677static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
3678{
3679 /* TODO zero copy sink :) */
3680 static char sink[128];
3681 int size, want, r;
3682
02918be2
PR
3683 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3684 cmd, data_size);
b411b363 3685
02918be2 3686 size = data_size;
b411b363
PR
3687 while (size > 0) {
3688 want = min_t(int, size, sizeof(sink));
3689 r = drbd_recv(mdev, sink, want);
3690 ERR_IF(r <= 0) break;
3691 size -= r;
3692 }
3693 return size == 0;
3694}
3695
02918be2 3696static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
0ced55a3 3697{
e7f52dfb
LE
3698 if (mdev->state.disk >= D_INCONSISTENT)
3699 drbd_kick_lo(mdev);
0ced55a3 3700
e7f52dfb
LE
3701 /* Make sure we've acked all the TCP data associated
3702 * with the data requests being unplugged */
3703 drbd_tcp_quickack(mdev->data.socket);
0ced55a3 3704
0ced55a3
PR
3705 return TRUE;
3706}
3707
02918be2
PR
/* Signature of a packet handler: called by drbdd() with the device, the
 * packet command and the number of bytes that follow the sub header.
 * A zero return makes drbdd() treat the packet as failed. */
3708typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3709
/* One entry of the receiver's dispatch table. */
3710struct data_cmd {
/* non-zero if bytes beyond the sub header are acceptable; drbdd() rejects
 * a packet carrying such bytes when this is 0 */
3711 int expect_payload;
/* size of the packet structure; drbdd() subtracts sizeof(union p_header)
 * from it to get the size of the sub header it has to receive */
3712 size_t pkt_size;
/* handler for this command; NULL marks the command as unknown to drbdd() */
3713 drbd_cmd_handler_f function;
3714};
3715
/* Dispatch table of the receiver, indexed by packet command.
 * Each entry is { expect_payload, pkt_size, function }.
 * drbdd() refuses commands >= P_MAX_CMD and commands whose function
 * is NULL (i.e. every index not listed here). */
3716static struct data_cmd drbd_cmd_handler[] = {
3717 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3718 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3719 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3720 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3721 [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3722 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3723 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3724 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3725 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3726 [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
3727 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
3728 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3729 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3730 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3731 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3732 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3733 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3734 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3735 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3736 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3737 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
b411b363
PR
3738 /* anything missing from this table is in
3739 * the asender_tbl, see get_asender_cmd */
02918be2 3740 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3741};
3742
02918be2
PR
3743/* All handler functions that expect a sub-header get that sub-header in
3744 mdev->data.rbuf.header.h80.payload.
3745
3746 Usually the callback can find the usual p_header80 in
3747 mdev->data.rbuf.header.h80, but it must not rely on that, since there is also p_header95!
3748 */
b411b363
PR
3749
3750static void drbdd(struct drbd_conf *mdev)
3751{
02918be2
PR
3752 union p_header *header = &mdev->data.rbuf.header;
3753 unsigned int packet_size;
3754 enum drbd_packets cmd;
3755 size_t shs; /* sub header size */
3756 int rv;
b411b363
PR
3757
3758 while (get_t_state(&mdev->receiver) == Running) {
3759 drbd_thread_current_set_cpu(mdev);
02918be2
PR
3760 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3761 goto err_out;
3762
3763 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3764 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3765 goto err_out;
0b33a916 3766 }
b411b363 3767
02918be2
PR
3768 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3769 rv = drbd_recv(mdev, &header->h80.payload, shs);
3770 if (unlikely(rv != shs)) {
3771 dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
3772 goto err_out;
3773 }
b411b363 3774
02918be2
PR
3775 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3776 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3777 goto err_out;
b411b363 3778 }
02918be2
PR
3779
3780 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3781
3782 if (unlikely(!rv)) {
b411b363 3783 dev_err(DEV, "error receiving %s, l: %d!\n",
02918be2
PR
3784 cmdname(cmd), packet_size);
3785 goto err_out;
b411b363 3786 }
b411b363 3787 }
02918be2
PR
3788
3789 if (0) {
3790 err_out:
3791 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3792 }
b411b363
PR
3793}
3794
b411b363
PR
3795void drbd_flush_workqueue(struct drbd_conf *mdev)
3796{
3797 struct drbd_wq_barrier barr;
3798
3799 barr.w.cb = w_prev_work_done;
3800 init_completion(&barr.done);
3801 drbd_queue_work(&mdev->data.work, &barr.w);
3802 wait_for_completion(&barr.done);
3803}
3804
f70b3511
PR
3805void drbd_free_tl_hash(struct drbd_conf *mdev)
3806{
3807 struct hlist_head *h;
3808
3809 spin_lock_irq(&mdev->req_lock);
3810
3811 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3812 spin_unlock_irq(&mdev->req_lock);
3813 return;
3814 }
3815 /* paranoia code */
3816 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3817 if (h->first)
3818 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3819 (int)(h - mdev->ee_hash), h->first);
3820 kfree(mdev->ee_hash);
3821 mdev->ee_hash = NULL;
3822 mdev->ee_hash_s = 0;
3823
3824 /* paranoia code */
3825 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3826 if (h->first)
3827 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3828 (int)(h - mdev->tl_hash), h->first);
3829 kfree(mdev->tl_hash);
3830 mdev->tl_hash = NULL;
3831 mdev->tl_hash_s = 0;
3832 spin_unlock_irq(&mdev->req_lock);
3833}
3834
b411b363
PR
3835static void drbd_disconnect(struct drbd_conf *mdev)
3836{
3837 enum drbd_fencing_p fp;
3838 union drbd_state os, ns;
3839 int rv = SS_UNKNOWN_ERROR;
3840 unsigned int i;
3841
3842 if (mdev->state.conn == C_STANDALONE)
3843 return;
3844 if (mdev->state.conn >= C_WF_CONNECTION)
3845 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3846 drbd_conn_str(mdev->state.conn));
3847
3848 /* asender does not clean up anything. it must not interfere, either */
3849 drbd_thread_stop(&mdev->asender);
b411b363 3850 drbd_free_sock(mdev);
b411b363 3851
85719573 3852 /* wait for current activity to cease. */
b411b363
PR
3853 spin_lock_irq(&mdev->req_lock);
3854 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3855 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3856 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3857 spin_unlock_irq(&mdev->req_lock);
3858
3859 /* We do not have data structures that would allow us to
3860 * get the rs_pending_cnt down to 0 again.
3861 * * On C_SYNC_TARGET we do not have any data structures describing
3862 * the pending RSDataRequest's we have sent.
3863 * * On C_SYNC_SOURCE there is no data structure that tracks
3864 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3865 * And no, it is not the sum of the reference counts in the
3866 * resync_LRU. The resync_LRU tracks the whole operation including
3867 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3868 * on the fly. */
3869 drbd_rs_cancel_all(mdev);
3870 mdev->rs_total = 0;
3871 mdev->rs_failed = 0;
3872 atomic_set(&mdev->rs_pending_cnt, 0);
3873 wake_up(&mdev->misc_wait);
3874
3875 /* make sure syncer is stopped and w_resume_next_sg queued */
3876 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3877 resync_timer_fn((unsigned long)mdev);
3878
b411b363
PR
3879 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3880 * w_make_resync_request etc. which may still be on the worker queue
3881 * to be "canceled" */
3882 drbd_flush_workqueue(mdev);
3883
3884 /* This also does reclaim_net_ee(). If we do this too early, we might
3885 * miss some resync ee and pages.*/
3886 drbd_process_done_ee(mdev);
3887
3888 kfree(mdev->p_uuid);
3889 mdev->p_uuid = NULL;
3890
fb22c402 3891 if (!is_susp(mdev->state))
b411b363
PR
3892 tl_clear(mdev);
3893
b411b363
PR
3894 dev_info(DEV, "Connection closed\n");
3895
3896 drbd_md_sync(mdev);
3897
3898 fp = FP_DONT_CARE;
3899 if (get_ldev(mdev)) {
3900 fp = mdev->ldev->dc.fencing;
3901 put_ldev(mdev);
3902 }
3903
87f7be4c
PR
3904 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3905 drbd_try_outdate_peer_async(mdev);
b411b363
PR
3906
3907 spin_lock_irq(&mdev->req_lock);
3908 os = mdev->state;
3909 if (os.conn >= C_UNCONNECTED) {
3910 /* Do not restart in case we are C_DISCONNECTING */
3911 ns = os;
3912 ns.conn = C_UNCONNECTED;
3913 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3914 }
3915 spin_unlock_irq(&mdev->req_lock);
3916
3917 if (os.conn == C_DISCONNECTING) {
84dfb9f5 3918 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
b411b363 3919
fb22c402 3920 if (!is_susp(mdev->state)) {
f70b3511
PR
3921 /* we must not free the tl_hash
3922 * while application io is still on the fly */
3923 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3924 drbd_free_tl_hash(mdev);
3925 }
b411b363
PR
3926
3927 crypto_free_hash(mdev->cram_hmac_tfm);
3928 mdev->cram_hmac_tfm = NULL;
3929
3930 kfree(mdev->net_conf);
3931 mdev->net_conf = NULL;
3932 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3933 }
3934
3935 /* tcp_close and release of sendpage pages can be deferred. I don't
3936 * want to use SO_LINGER, because apparently it can be deferred for
3937 * more than 20 seconds (longest time I checked).
3938 *
3939 * Actually we don't care for exactly when the network stack does its
3940 * put_page(), but release our reference on these pages right here.
3941 */
3942 i = drbd_release_ee(mdev, &mdev->net_ee);
3943 if (i)
3944 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3945 i = atomic_read(&mdev->pp_in_use_by_net);
3946 if (i)
3947 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3948 i = atomic_read(&mdev->pp_in_use);
3949 if (i)
45bb912b 3950 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3951
3952 D_ASSERT(list_empty(&mdev->read_ee));
3953 D_ASSERT(list_empty(&mdev->active_ee));
3954 D_ASSERT(list_empty(&mdev->sync_ee));
3955 D_ASSERT(list_empty(&mdev->done_ee));
3956
3957 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3958 atomic_set(&mdev->current_epoch->epoch_size, 0);
3959 D_ASSERT(list_empty(&mdev->current_epoch->list));
3960}
3961
3962/*
3963 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3964 * we can agree on is stored in agreed_pro_version.
3965 *
3966 * feature flags and the reserved array should be enough room for future
3967 * enhancements of the handshake protocol, and possible plugins...
3968 *
3969 * for now, they are expected to be zero, but ignored.
3970 */
3971static int drbd_send_handshake(struct drbd_conf *mdev)
3972{
3973 /* ASSERT current == mdev->receiver ... */
3974 struct p_handshake *p = &mdev->data.sbuf.handshake;
3975 int ok;
3976
3977 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3978 dev_err(DEV, "interrupted during initial handshake\n");
3979 return 0; /* interrupted. not ok. */
3980 }
3981
3982 if (mdev->data.socket == NULL) {
3983 mutex_unlock(&mdev->data.mutex);
3984 return 0;
3985 }
3986
3987 memset(p, 0, sizeof(*p));
3988 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3989 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3990 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
0b70a13d 3991 (struct p_header80 *)p, sizeof(*p), 0 );
b411b363
PR
3992 mutex_unlock(&mdev->data.mutex);
3993 return ok;
3994}
3995
3996/*
3997 * return values:
3998 * 1 yes, we have a valid connection
3999 * 0 oops, did not work out, please try again
4000 * -1 peer talks different language,
4001 * no point in trying again, please go standalone.
4002 */
4003static int drbd_do_handshake(struct drbd_conf *mdev)
4004{
4005 /* ASSERT current == mdev->receiver ... */
4006 struct p_handshake *p = &mdev->data.rbuf.handshake;
02918be2
PR
4007 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4008 unsigned int length;
4009 enum drbd_packets cmd;
b411b363
PR
4010 int rv;
4011
4012 rv = drbd_send_handshake(mdev);
4013 if (!rv)
4014 return 0;
4015
02918be2 4016 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4017 if (!rv)
4018 return 0;
4019
02918be2 4020 if (cmd != P_HAND_SHAKE) {
b411b363 4021 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
02918be2 4022 cmdname(cmd), cmd);
b411b363
PR
4023 return -1;
4024 }
4025
02918be2 4026 if (length != expect) {
b411b363 4027 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
02918be2 4028 expect, length);
b411b363
PR
4029 return -1;
4030 }
4031
4032 rv = drbd_recv(mdev, &p->head.payload, expect);
4033
4034 if (rv != expect) {
4035 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4036 return 0;
4037 }
4038
b411b363
PR
4039 p->protocol_min = be32_to_cpu(p->protocol_min);
4040 p->protocol_max = be32_to_cpu(p->protocol_max);
4041 if (p->protocol_max == 0)
4042 p->protocol_max = p->protocol_min;
4043
4044 if (PRO_VERSION_MAX < p->protocol_min ||
4045 PRO_VERSION_MIN > p->protocol_max)
4046 goto incompat;
4047
4048 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049
4050 dev_info(DEV, "Handshake successful: "
4051 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052
4053 return 1;
4054
4055 incompat:
4056 dev_err(DEV, "incompatible DRBD dialects: "
4057 "I support %d-%d, peer supports %d-%d\n",
4058 PRO_VERSION_MIN, PRO_VERSION_MAX,
4059 p->protocol_min, p->protocol_max);
4060 return -1;
4061}
4062
4063#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064static int drbd_do_auth(struct drbd_conf *mdev)
4065{
4066 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4067 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4068 return -1;
b411b363
PR
4069}
4070#else
4071#define CHALLENGE_LEN 64
b10d96cb
JT
4072
4073/* Return value:
4074 1 - auth succeeded,
4075 0 - failed, try again (network error),
4076 -1 - auth failed, don't try again.
4077*/
4078
b411b363
PR
4079static int drbd_do_auth(struct drbd_conf *mdev)
4080{
4081 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4082 struct scatterlist sg;
4083 char *response = NULL;
4084 char *right_response = NULL;
4085 char *peers_ch = NULL;
b411b363
PR
4086 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087 unsigned int resp_size;
4088 struct hash_desc desc;
02918be2
PR
4089 enum drbd_packets cmd;
4090 unsigned int length;
b411b363
PR
4091 int rv;
4092
4093 desc.tfm = mdev->cram_hmac_tfm;
4094 desc.flags = 0;
4095
4096 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097 (u8 *)mdev->net_conf->shared_secret, key_len);
4098 if (rv) {
4099 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4100 rv = -1;
b411b363
PR
4101 goto fail;
4102 }
4103
4104 get_random_bytes(my_challenge, CHALLENGE_LEN);
4105
4106 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107 if (!rv)
4108 goto fail;
4109
02918be2 4110 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4111 if (!rv)
4112 goto fail;
4113
02918be2 4114 if (cmd != P_AUTH_CHALLENGE) {
b411b363 4115 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
02918be2 4116 cmdname(cmd), cmd);
b411b363
PR
4117 rv = 0;
4118 goto fail;
4119 }
4120
02918be2 4121 if (length > CHALLENGE_LEN * 2) {
b411b363 4122 dev_err(DEV, "expected AuthChallenge payload too big.\n");
b10d96cb 4123 rv = -1;
b411b363
PR
4124 goto fail;
4125 }
4126
02918be2 4127 peers_ch = kmalloc(length, GFP_NOIO);
b411b363
PR
4128 if (peers_ch == NULL) {
4129 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4130 rv = -1;
b411b363
PR
4131 goto fail;
4132 }
4133
02918be2 4134 rv = drbd_recv(mdev, peers_ch, length);
b411b363 4135
02918be2 4136 if (rv != length) {
b411b363
PR
4137 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4138 rv = 0;
4139 goto fail;
4140 }
4141
4142 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4143 response = kmalloc(resp_size, GFP_NOIO);
4144 if (response == NULL) {
4145 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4146 rv = -1;
b411b363
PR
4147 goto fail;
4148 }
4149
4150 sg_init_table(&sg, 1);
02918be2 4151 sg_set_buf(&sg, peers_ch, length);
b411b363
PR
4152
4153 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4154 if (rv) {
4155 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4156 rv = -1;
b411b363
PR
4157 goto fail;
4158 }
4159
4160 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4161 if (!rv)
4162 goto fail;
4163
02918be2 4164 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4165 if (!rv)
4166 goto fail;
4167
02918be2 4168 if (cmd != P_AUTH_RESPONSE) {
b411b363 4169 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
02918be2 4170 cmdname(cmd), cmd);
b411b363
PR
4171 rv = 0;
4172 goto fail;
4173 }
4174
02918be2 4175 if (length != resp_size) {
b411b363
PR
4176 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4177 rv = 0;
4178 goto fail;
4179 }
4180
4181 rv = drbd_recv(mdev, response , resp_size);
4182
4183 if (rv != resp_size) {
4184 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4185 rv = 0;
4186 goto fail;
4187 }
4188
4189 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4190 if (right_response == NULL) {
b411b363 4191 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4192 rv = -1;
b411b363
PR
4193 goto fail;
4194 }
4195
4196 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4197
4198 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4199 if (rv) {
4200 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4201 rv = -1;
b411b363
PR
4202 goto fail;
4203 }
4204
4205 rv = !memcmp(response, right_response, resp_size);
4206
4207 if (rv)
4208 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4209 resp_size, mdev->net_conf->cram_hmac_alg);
b10d96cb
JT
4210 else
4211 rv = -1;
b411b363
PR
4212
4213 fail:
4214 kfree(peers_ch);
4215 kfree(response);
4216 kfree(right_response);
4217
4218 return rv;
4219}
4220#endif
4221
4222int drbdd_init(struct drbd_thread *thi)
4223{
4224 struct drbd_conf *mdev = thi->mdev;
4225 unsigned int minor = mdev_to_minor(mdev);
4226 int h;
4227
4228 sprintf(current->comm, "drbd%d_receiver", minor);
4229
4230 dev_info(DEV, "receiver (re)started\n");
4231
4232 do {
4233 h = drbd_connect(mdev);
4234 if (h == 0) {
4235 drbd_disconnect(mdev);
4236 __set_current_state(TASK_INTERRUPTIBLE);
4237 schedule_timeout(HZ);
4238 }
4239 if (h == -1) {
4240 dev_warn(DEV, "Discarding network configuration.\n");
4241 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4242 }
4243 } while (h == 0);
4244
4245 if (h > 0) {
4246 if (get_net_conf(mdev)) {
4247 drbdd(mdev);
4248 put_net_conf(mdev);
4249 }
4250 }
4251
4252 drbd_disconnect(mdev);
4253
4254 dev_info(DEV, "receiver terminated\n");
4255 return 0;
4256}
4257
4258/* ********* acknowledge sender ******** */
4259
0b70a13d 4260static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4261{
4262 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4263
4264 int retcode = be32_to_cpu(p->retcode);
4265
4266 if (retcode >= SS_SUCCESS) {
4267 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4268 } else {
4269 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4270 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4271 drbd_set_st_err_str(retcode), retcode);
4272 }
4273 wake_up(&mdev->state_wait);
4274
4275 return TRUE;
4276}
4277
/*
 * Answer a ping from the peer with a ping ack.
 * The result of drbd_send_ping_ack() is passed straight through.
 */
static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	return drbd_send_ping_ack(mdev);
}
4283
0b70a13d 4284static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4285{
4286 /* restore idle timeout */
4287 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
309d1608
PR
4288 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4289 wake_up(&mdev->misc_wait);
b411b363
PR
4290
4291 return TRUE;
4292}
4293
0b70a13d 4294static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4295{
4296 struct p_block_ack *p = (struct p_block_ack *)h;
4297 sector_t sector = be64_to_cpu(p->sector);
4298 int blksize = be32_to_cpu(p->blksize);
4299
4300 D_ASSERT(mdev->agreed_pro_version >= 89);
4301
4302 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4303
1d53f09e
LE
4304 if (get_ldev(mdev)) {
4305 drbd_rs_complete_io(mdev, sector);
4306 drbd_set_in_sync(mdev, sector, blksize);
4307 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4308 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4309 put_ldev(mdev);
4310 }
b411b363 4311 dec_rs_pending(mdev);
778f271d 4312 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363
PR
4313
4314 return TRUE;
4315}
4316
4317/* when we receive the ACK for a write request,
4318 * verify that we actually know about it */
4319static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4320 u64 id, sector_t sector)
4321{
4322 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4323 struct hlist_node *n;
4324 struct drbd_request *req;
4325
4326 hlist_for_each_entry(req, n, slot, colision) {
4327 if ((unsigned long)req == (unsigned long)id) {
4328 if (req->sector != sector) {
4329 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4330 "wrong sector (%llus versus %llus)\n", req,
4331 (unsigned long long)req->sector,
4332 (unsigned long long)sector);
4333 break;
4334 }
4335 return req;
4336 }
4337 }
4338 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4339 (void *)(unsigned long)id, (unsigned long long)sector);
4340 return NULL;
4341}
4342
4343typedef struct drbd_request *(req_validator_fn)
4344 (struct drbd_conf *mdev, u64 id, sector_t sector);
4345
4346static int validate_req_change_req_state(struct drbd_conf *mdev,
4347 u64 id, sector_t sector, req_validator_fn validator,
4348 const char *func, enum drbd_req_event what)
4349{
4350 struct drbd_request *req;
4351 struct bio_and_error m;
4352
4353 spin_lock_irq(&mdev->req_lock);
4354 req = validator(mdev, id, sector);
4355 if (unlikely(!req)) {
4356 spin_unlock_irq(&mdev->req_lock);
4357 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4358 return FALSE;
4359 }
4360 __req_mod(req, what, &m);
4361 spin_unlock_irq(&mdev->req_lock);
4362
4363 if (m.bio)
4364 complete_master_bio(mdev, &m);
4365 return TRUE;
4366}
4367
0b70a13d 4368static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4369{
4370 struct p_block_ack *p = (struct p_block_ack *)h;
4371 sector_t sector = be64_to_cpu(p->sector);
4372 int blksize = be32_to_cpu(p->blksize);
4373 enum drbd_req_event what;
4374
4375 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4376
4377 if (is_syncer_block_id(p->block_id)) {
4378 drbd_set_in_sync(mdev, sector, blksize);
4379 dec_rs_pending(mdev);
4380 return TRUE;
4381 }
4382 switch (be16_to_cpu(h->command)) {
4383 case P_RS_WRITE_ACK:
4384 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4385 what = write_acked_by_peer_and_sis;
4386 break;
4387 case P_WRITE_ACK:
4388 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4389 what = write_acked_by_peer;
4390 break;
4391 case P_RECV_ACK:
4392 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4393 what = recv_acked_by_peer;
4394 break;
4395 case P_DISCARD_ACK:
4396 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4397 what = conflict_discarded_by_peer;
4398 break;
4399 default:
4400 D_ASSERT(0);
4401 return FALSE;
4402 }
4403
4404 return validate_req_change_req_state(mdev, p->block_id, sector,
4405 _ack_id_to_req, __func__ , what);
4406}
4407
0b70a13d 4408static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4409{
4410 struct p_block_ack *p = (struct p_block_ack *)h;
4411 sector_t sector = be64_to_cpu(p->sector);
4412
4413 if (__ratelimit(&drbd_ratelimit_state))
4414 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4415
4416 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4417
4418 if (is_syncer_block_id(p->block_id)) {
4419 int size = be32_to_cpu(p->blksize);
4420 dec_rs_pending(mdev);
4421 drbd_rs_failed_io(mdev, sector, size);
4422 return TRUE;
4423 }
4424 return validate_req_change_req_state(mdev, p->block_id, sector,
4425 _ack_id_to_req, __func__ , neg_acked);
4426}
4427
0b70a13d 4428static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4429{
4430 struct p_block_ack *p = (struct p_block_ack *)h;
4431 sector_t sector = be64_to_cpu(p->sector);
4432
4433 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4434 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4435 (unsigned long long)sector, be32_to_cpu(p->blksize));
4436
4437 return validate_req_change_req_state(mdev, p->block_id, sector,
4438 _ar_id_to_req, __func__ , neg_acked);
4439}
4440
0b70a13d 4441static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4442{
4443 sector_t sector;
4444 int size;
4445 struct p_block_ack *p = (struct p_block_ack *)h;
4446
4447 sector = be64_to_cpu(p->sector);
4448 size = be32_to_cpu(p->blksize);
b411b363
PR
4449
4450 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4451
4452 dec_rs_pending(mdev);
4453
4454 if (get_ldev_if_state(mdev, D_FAILED)) {
4455 drbd_rs_complete_io(mdev, sector);
4456 drbd_rs_failed_io(mdev, sector, size);
4457 put_ldev(mdev);
4458 }
4459
4460 return TRUE;
4461}
4462
0b70a13d 4463static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4464{
4465 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4466
4467 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4468
4469 return TRUE;
4470}
4471
0b70a13d 4472static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4473{
4474 struct p_block_ack *p = (struct p_block_ack *)h;
4475 struct drbd_work *w;
4476 sector_t sector;
4477 int size;
4478
4479 sector = be64_to_cpu(p->sector);
4480 size = be32_to_cpu(p->blksize);
4481
4482 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4483
4484 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4485 drbd_ov_oos_found(mdev, sector, size);
4486 else
4487 ov_oos_print(mdev);
4488
1d53f09e
LE
4489 if (!get_ldev(mdev))
4490 return TRUE;
4491
b411b363
PR
4492 drbd_rs_complete_io(mdev, sector);
4493 dec_rs_pending(mdev);
4494
4495 if (--mdev->ov_left == 0) {
4496 w = kmalloc(sizeof(*w), GFP_NOIO);
4497 if (w) {
4498 w->cb = w_ov_finished;
4499 drbd_queue_work_front(&mdev->data.work, w);
4500 } else {
4501 dev_err(DEV, "kmalloc(w) failed.");
4502 ov_oos_print(mdev);
4503 drbd_resync_finished(mdev);
4504 }
4505 }
1d53f09e 4506 put_ldev(mdev);
b411b363
PR
4507 return TRUE;
4508}
4509
02918be2 4510static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
0ced55a3 4511{
0ced55a3
PR
4512 return TRUE;
4513}
4514
b411b363
PR
4515struct asender_cmd {
4516 size_t pkt_size;
0b70a13d 4517 int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
b411b363
PR
4518};
4519
4520static struct asender_cmd *get_asender_cmd(int cmd)
4521{
4522 static struct asender_cmd asender_tbl[] = {
4523 /* anything missing from this table is in
4524 * the drbd_cmd_handler (drbd_default_handler) table,
4525 * see the beginning of drbdd() */
0b70a13d
PR
4526 [P_PING] = { sizeof(struct p_header80), got_Ping },
4527 [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck },
b411b363
PR
4528 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4529 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4530 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4531 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4532 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4533 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4534 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4535 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4536 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4537 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4538 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4539 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
b411b363
PR
4540 [P_MAX_CMD] = { 0, NULL },
4541 };
4542 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4543 return NULL;
4544 return &asender_tbl[cmd];
4545}
4546
4547int drbd_asender(struct drbd_thread *thi)
4548{
4549 struct drbd_conf *mdev = thi->mdev;
02918be2 4550 struct p_header80 *h = &mdev->meta.rbuf.header.h80;
b411b363
PR
4551 struct asender_cmd *cmd = NULL;
4552
4553 int rv, len;
4554 void *buf = h;
4555 int received = 0;
0b70a13d 4556 int expect = sizeof(struct p_header80);
b411b363
PR
4557 int empty;
4558
4559 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4560
4561 current->policy = SCHED_RR; /* Make this a realtime task! */
4562 current->rt_priority = 2; /* more important than all other tasks */
4563
4564 while (get_t_state(thi) == Running) {
4565 drbd_thread_current_set_cpu(mdev);
4566 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4567 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4568 mdev->meta.socket->sk->sk_rcvtimeo =
4569 mdev->net_conf->ping_timeo*HZ/10;
4570 }
4571
4572 /* conditionally cork;
4573 * it may hurt latency if we cork without much to send */
4574 if (!mdev->net_conf->no_cork &&
4575 3 < atomic_read(&mdev->unacked_cnt))
4576 drbd_tcp_cork(mdev->meta.socket);
4577 while (1) {
4578 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4579 flush_signals(current);
4580 if (!drbd_process_done_ee(mdev)) {
4581 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4582 goto reconnect;
4583 }
4584 /* to avoid race with newly queued ACKs */
4585 set_bit(SIGNAL_ASENDER, &mdev->flags);
4586 spin_lock_irq(&mdev->req_lock);
4587 empty = list_empty(&mdev->done_ee);
4588 spin_unlock_irq(&mdev->req_lock);
4589 /* new ack may have been queued right here,
4590 * but then there is also a signal pending,
4591 * and we start over... */
4592 if (empty)
4593 break;
4594 }
4595 /* but unconditionally uncork unless disabled */
4596 if (!mdev->net_conf->no_cork)
4597 drbd_tcp_uncork(mdev->meta.socket);
4598
4599 /* short circuit, recv_msg would return EINTR anyways. */
4600 if (signal_pending(current))
4601 continue;
4602
4603 rv = drbd_recv_short(mdev, mdev->meta.socket,
4604 buf, expect-received, 0);
4605 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4606
4607 flush_signals(current);
4608
4609 /* Note:
4610 * -EINTR (on meta) we got a signal
4611 * -EAGAIN (on meta) rcvtimeo expired
4612 * -ECONNRESET other side closed the connection
4613 * -ERESTARTSYS (on data) we got a signal
4614 * rv < 0 other than above: unexpected error!
4615 * rv == expected: full header or command
4616 * rv < expected: "woken" by signal during receive
4617 * rv == 0 : "connection shut down by peer"
4618 */
4619 if (likely(rv > 0)) {
4620 received += rv;
4621 buf += rv;
4622 } else if (rv == 0) {
4623 dev_err(DEV, "meta connection shut down by peer.\n");
4624 goto reconnect;
4625 } else if (rv == -EAGAIN) {
4626 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4627 mdev->net_conf->ping_timeo*HZ/10) {
4628 dev_err(DEV, "PingAck did not arrive in time.\n");
4629 goto reconnect;
4630 }
4631 set_bit(SEND_PING, &mdev->flags);
4632 continue;
4633 } else if (rv == -EINTR) {
4634 continue;
4635 } else {
4636 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4637 goto reconnect;
4638 }
4639
4640 if (received == expect && cmd == NULL) {
4641 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
004352fa
LE
4642 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4643 be32_to_cpu(h->magic),
4644 be16_to_cpu(h->command),
4645 be16_to_cpu(h->length));
b411b363
PR
4646 goto reconnect;
4647 }
4648 cmd = get_asender_cmd(be16_to_cpu(h->command));
4649 len = be16_to_cpu(h->length);
4650 if (unlikely(cmd == NULL)) {
004352fa
LE
4651 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4652 be32_to_cpu(h->magic),
4653 be16_to_cpu(h->command),
4654 be16_to_cpu(h->length));
b411b363
PR
4655 goto disconnect;
4656 }
4657 expect = cmd->pkt_size;
0b70a13d 4658 ERR_IF(len != expect-sizeof(struct p_header80))
b411b363 4659 goto reconnect;
b411b363
PR
4660 }
4661 if (received == expect) {
4662 D_ASSERT(cmd != NULL);
b411b363
PR
4663 if (!cmd->process(mdev, h))
4664 goto reconnect;
4665
4666 buf = h;
4667 received = 0;
0b70a13d 4668 expect = sizeof(struct p_header80);
b411b363
PR
4669 cmd = NULL;
4670 }
4671 }
4672
4673 if (0) {
4674reconnect:
4675 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4676 }
4677 if (0) {
4678disconnect:
4679 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4680 }
4681 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4682
4683 D_ASSERT(mdev->state.conn < C_CONNECTED);
4684 dev_info(DEV, "asender terminated\n");
4685
4686 return 0;
4687}