/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	int size;
	int vnr;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
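/*
 * Illustration (editor's addition, not in the upstream file): with three
 * pages chained through page->private as
 *
 *	head -> [A](private=B) -> [B](private=C) -> [C](private=0)
 *
 * page_chain_del(&head, 2) marks B as the new tail (set_page_private(B, 0)),
 * returns A (now the two-page chain A->B) and leaves *head pointing at C;
 * page_chain_add(&head, A, B) later pushes that chain back onto the front.
 */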
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
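/*
 * Usage sketch (editor's addition, derived from the two functions above):
 * every chain handed out by drbd_pp_alloc(mdev, n, retry) is accounted in
 * mdev->pp_in_use and must eventually be returned via drbd_pp_free(), which
 * either re-links the chain onto the global drbd_pp_pool or, once the pool
 * already holds more than (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count spare
 * pages, releases the pages back to the kernel with put_page().
 */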
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
		return NULL;
	}

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
		count++;
	}
	return count;
}


/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = peer_req->w.cb(mdev, &peer_req->w, !ok) && ok;
		drbd_free_ee(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return ok;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}

/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;

out:
	return err;
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on  sock) we got a signal
		 */

		if (rv < 0) {
			if (rv == -ECONNRESET)
				conn_info(tconn, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			conn_info(tconn, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	};

	set_fs(oldfs);

	if (rv != size)
		drbd_force_state(tconn->volume0, NS(conn, C_BROKEN_PIPE));

	return rv;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
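/*
 * Editor's note (assumption, not stated in the code): this is roughly the
 * in-kernel counterpart of a userspace setsockopt(SOL_SOCKET, SO_SNDBUF /
 * SO_RCVBUF) issued before listen(2)/connect(2).  Setting SOCK_SNDBUF_LOCK /
 * SOCK_RCVBUF_LOCK keeps TCP auto-tuning from later overriding the
 * administrator's configured sizes; unlike setsockopt(), the values are
 * applied verbatim here rather than doubled for bookkeeping overhead.
 */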
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
	}
	put_net_conf(tconn);
	return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(tconn))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(&what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
		}
	}
	put_net_conf(tconn);

	return s_estab;
}

static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
{
	struct p_header *h = &tconn->data.sbuf.header;

	return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
{
	struct p_header80 *h = &tconn->data.rbuf.header.h80;
	int rr;

	rr = drbd_recv_short(sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h->command);

	return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
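/*
 * Editor's note: the MSG_DONTWAIT | MSG_PEEK probe above consumes no data;
 * it only asks whether the socket is still alive.  A positive return or
 * -EAGAIN ("nothing to read right now") counts as healthy; 0 (orderly
 * shutdown by the peer) or any other error releases the socket so the
 * connect loop in drbd_connect() can start over.
 */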
static int drbd_connected(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	int ok = 1;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
	ok &= drbd_send_sizes(mdev, 0, 0);
	ok &= drbd_send_uuids(mdev);
	ok &= drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

	return !ok;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
{
	struct socket *s, *sock, *msock;
	int try, h, ok;

	if (drbd_request_state(tconn->volume0, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);
	tconn->agreed_pro_version = 99;
	/* agreed_pro_version must be smaller than 100 so we send the old
	   header (h80) in the first packet and in the handshake packet. */

	sock  = NULL;
	msock = NULL;

	do {
		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!sock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
				sock = s;
				s = NULL;
			} else if (!msock) {
				drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
				msock = s;
				s = NULL;
			} else {
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock && msock) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
			switch (try) {
			case P_HAND_SHAKE_S:
				if (sock) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(sock);
				}
				sock = s;
				break;
			case P_HAND_SHAKE_M:
				if (msock) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(msock);
				}
				msock = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->volume0->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}

		if (sock && msock) {
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;
			if (ok)
				break;
		}
	} while (1);

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->data.socket = sock;
	tconn->meta.socket = msock;
	tconn->last_received = jiffies;

	h = drbd_do_handshake(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (drbd_request_state(tconn->volume0, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -1)
		return -1;

	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}
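/*
 * Editor's summary of the loop above (follows the code, not authoritative):
 * each node alternates between actively connecting (drbd_try_connect) and
 * accepting (drbd_wait_for_connect) until it holds two live sockets, "sock"
 * for bulk data and "msock" for meta data/ACKs.  The first packet sent on a
 * freshly connected socket (P_HAND_SHAKE_S or P_HAND_SHAKE_M) tells the
 * accepting side which role it plays; if both peers raced and a role is
 * already taken ("initial packet crossed"), the previously held socket is
 * dropped in favour of the accepted one.  The side that accepted the peer's
 * msock sets DISCARD_CONCURRENT, which later breaks ties when concurrent
 * writes are detected in receive_Data().
 */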
static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
{
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
		pi->vnr = 0;
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
		pi->vnr = 0;
	} else {
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
		    be32_to_cpu(h->h80.magic),
		    be16_to_cpu(h->h80.command),
		    be16_to_cpu(h->h80.length));
		return false;
	}
	return true;
}
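/*
 * Editor's note on the two wire formats dispatched above: the "h80" layout
 * carries a 32-bit DRBD_MAGIC, a 16-bit command and a 16-bit length, while
 * the newer "h95" layout carries a 16-bit DRBD_MAGIC_BIG, a 16-bit command
 * and a 32-bit length field of which only the low 24 bits are the payload
 * size.  Both are normalized into the same struct packet_info.
 */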
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct p_header *h = &tconn->data.rbuf.header;
	int r;

	r = drbd_recv(tconn, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
		return false;
	}

	r = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;

	return r;
}

static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_ee()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		   const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_endio_sec;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
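/*
 * Editor's sketch (illustrative, assumes 4 KiB pages): a 16 KiB peer request
 * backed by four pool pages normally ends up as one bio with four bio_vecs.
 * Only when bio_add_page() refuses a page (local queue limits differing from
 * the sender's) is a fresh bio started at the current sector, so the chain
 * of bios submitted above always covers exactly
 * [peer_req->i.sector, peer_req->i.sector + (peer_req->i.size >> 9)).
 */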
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete.  */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}

static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return true;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return true;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return false;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return true;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
					rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, peer_req);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}
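/*
 * Editor's note on the wire layout consumed above (as implied by the code):
 * after the p_data header the peer sends an optional integrity digest of
 * "dgs" bytes (only when an integrity transform was agreed on, protocol 87
 * or newer), followed by the payload itself.  The incoming data_size covers
 * digest plus payload, which is why it is reduced by dgs before the sanity
 * checks and the page-chain receive loop.
 */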
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, rv = 1;
	void *data;

	if (!data_size)
		return true;

	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			rv = 0;
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, min_t(int, data_size, PAGE_SIZE));
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page, 0);
	return rv;
}

static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev->tconn, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}

/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	sector_t sector = peer_req->i.sector;
	int ok;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return ok;
}

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
fail:
	put_ldev(mdev);
	return false;
}

static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
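/*
 * Editor's note: the 64-bit id passed in here is the block_id from the wire,
 * which DRBD defines as an opaque request pointer of the node that issued
 * the original request, echoed back unchanged by the peer (see the comment
 * in drbd_alloc_ee).  Since it arrives from the network, the cast above is
 * only trusted after the interval tree confirms that a request with exactly
 * this object and sector is really pending locally.
 */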
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
{
	struct drbd_request *req;
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return false;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

	if (ok)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return ok;
}

static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
{
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_sec. */
		ok = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
	}

	atomic_add(data_size >> 9, &mdev->rs_sect_in);

	return ok;
}

/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	sector_t sector = peer_req->i.sector;
	int ok = 1, pcmd;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return ok;
}

static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	int ok = 1;

	D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);

	spin_lock_irq(&mdev->tconn->req_lock);
	D_ASSERT(!drbd_interval_empty(&peer_req->i));
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);

	dec_unacked(mdev);

	return ok;
}

static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 * a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}
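/*
 * Worked example (editor's addition): the signed subtraction makes the
 * comparison wrap-safe.  With a = 2 (just after the 32-bit wrap) and
 * b = 0xfffffffe (just before it), (s32)a - (s32)b = 2 - (-2) = 4 > 0,
 * so seq_greater(2, 0xfffffffe) is true and seq_max() keeps the logically
 * newer value 2.
 */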
1656
43ae077d 1657static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1658{
43ae077d 1659 unsigned int old_peer_seq;
3e394da1
AG
1660
1661 spin_lock(&mdev->peer_seq_lock);
43ae077d
AG
1662 old_peer_seq = mdev->peer_seq;
1663 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
3e394da1 1664 spin_unlock(&mdev->peer_seq_lock);
43ae077d 1665 if (old_peer_seq != peer_seq)
3e394da1
AG
1666 wake_up(&mdev->seq_wait);
1667}
1668
b411b363
PR
1669/* Called from receive_Data.
1670 * Synchronize packets on sock with packets on msock.
1671 *
1672 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1673 * packet traveling on msock, they are still processed in the order they have
1674 * been sent.
1675 *
1676 * Note: we don't care for Ack packets overtaking P_DATA packets.
1677 *
1678 * In case packet_seq is larger than mdev->peer_seq number, there are
1679 * outstanding packets on the msock. We wait for them to arrive.
1680 * In case we are the logically next packet, we update mdev->peer_seq
1681 * ourselves. Correctly handles 32bit wrap around.
1682 *
1683 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1684 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1685 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1686 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1687 *
1688 * returns 0 if we may process the packet,
1689 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1690static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1691{
1692 DEFINE_WAIT(wait);
1693 unsigned int p_seq;
1694 long timeout;
1695 int ret = 0;
1696 spin_lock(&mdev->peer_seq_lock);
1697 for (;;) {
1698 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
3e394da1 1699 if (!seq_greater(packet_seq, mdev->peer_seq + 1))
b411b363
PR
1700 break;
1701 if (signal_pending(current)) {
1702 ret = -ERESTARTSYS;
1703 break;
1704 }
1705 p_seq = mdev->peer_seq;
1706 spin_unlock(&mdev->peer_seq_lock);
1707 timeout = schedule_timeout(30*HZ);
1708 spin_lock(&mdev->peer_seq_lock);
1709 if (timeout == 0 && p_seq == mdev->peer_seq) {
1710 ret = -ETIMEDOUT;
1711 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1712 break;
1713 }
1714 }
1715 finish_wait(&mdev->seq_wait, &wait);
1716 if (mdev->peer_seq+1 == packet_seq)
1717 mdev->peer_seq++;
1718 spin_unlock(&mdev->peer_seq_lock);
1719 return ret;
1720}
1721
688593c5
LE
1722/* see also bio_flags_to_wire()
1723 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1724 * flags and back. We may replicate to other kernel versions. */
1725static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1726{
688593c5
LE
1727 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1728 (dpf & DP_FUA ? REQ_FUA : 0) |
1729 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1730 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1731}
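/*
 * Example (illustration only): a P_DATA packet whose dp_flags contain
 * DP_RW_SYNC | DP_FUA is resubmitted locally as a WRITE with
 * REQ_SYNC | REQ_FUA set, so the receiving side preserves the ordering and
 * durability semantics that the original submitter requested.
 */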
1732
b411b363 1733/* mirrored write */
d8763023
AG
1734static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1735 unsigned int data_size)
b411b363
PR
1736{
1737 sector_t sector;
db830c46 1738 struct drbd_peer_request *peer_req;
e42325a5 1739 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1740 int rw = WRITE;
1741 u32 dp_flags;
1742
b411b363 1743 if (!get_ldev(mdev)) {
b411b363
PR
1744 spin_lock(&mdev->peer_seq_lock);
1745 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1746 mdev->peer_seq++;
1747 spin_unlock(&mdev->peer_seq_lock);
1748
2b2bf214 1749 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1750 atomic_inc(&mdev->current_epoch->epoch_size);
1751 return drbd_drain_block(mdev, data_size);
1752 }
1753
1754 /* get_ldev(mdev) successful.
1755 * Corresponding put_ldev done either below (on various errors),
9c50842a 1756 * or in drbd_endio_sec, if we successfully submit the data at
b411b363
PR
1757 * the end of this function. */
1758
1759 sector = be64_to_cpu(p->sector);
db830c46
AG
1760 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1761 if (!peer_req) {
b411b363 1762 put_ldev(mdev);
81e84650 1763 return false;
b411b363
PR
1764 }
1765
db830c46 1766 peer_req->w.cb = e_end_block;
b411b363 1767
688593c5
LE
1768 dp_flags = be32_to_cpu(p->dp_flags);
1769 rw |= wire_flags_to_bio(mdev, dp_flags);
1770
1771 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 1772 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 1773
b411b363 1774 spin_lock(&mdev->epoch_lock);
db830c46
AG
1775 peer_req->epoch = mdev->current_epoch;
1776 atomic_inc(&peer_req->epoch->epoch_size);
1777 atomic_inc(&peer_req->epoch->active);
b411b363
PR
1778 spin_unlock(&mdev->epoch_lock);
1779
b411b363 1780 /* I'm the receiver, I do hold a net_cnt reference. */
89e58e75 1781 if (!mdev->tconn->net_conf->two_primaries) {
87eeee41 1782 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1783 } else {
1784 /* don't get the req_lock yet,
1785 * we may sleep in drbd_wait_peer_seq */
db830c46 1786 const int size = peer_req->i.size;
25703f83 1787 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363 1788 DEFINE_WAIT(wait);
b411b363
PR
1789 int first;
1790
89e58e75 1791 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
b411b363
PR
1792
1793 /* conflict detection and handling:
1794 * 1. wait on the sequence number,
1795 * in case this data packet overtook ACK packets.
5e472264 1796 * 2. check for conflicting write requests.
b411b363
PR
1797 *
1798 * Note: for two_primaries, we are protocol C,
1799 * so there cannot be any request that is DONE
1800 * but still on the transfer log.
1801 *
b411b363
PR
1802 * if no conflicting request is found:
1803 * submit.
1804 *
1805 * if any conflicting request is found
1806 * that has not yet been acked,
1807 * AND I have the "discard concurrent writes" flag:
1808 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1809 *
1810 * if any conflicting request is found:
1811 * block the receiver, waiting on misc_wait
1812 * until no more conflicting requests are there,
1813 * or we get interrupted (disconnect).
1814 *
1815 * we do not just write after local io completion of those
1816 * requests, but only after req is done completely, i.e.
1817 * we wait for the P_DISCARD_ACK to arrive!
1818 *
1819 * then proceed normally, i.e. submit.
1820 */
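/*
 * Conflict check, sketched (illustration only -- the driver does this via
 * drbd_find_overlap() on the write_requests interval tree): with i->size
 * in bytes and sectors of 512 bytes, the new write [sector, sector+(size>>9))
 * conflicts with a pending interval [i->sector, i->sector+(i->size>>9)) iff
 *   sector < i->sector + (i->size >> 9)  &&  i->sector < sector + (size >> 9)
 */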
1821 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1822 goto out_interrupted;
1823
87eeee41 1824 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 1825
b411b363
PR
1826 first = 1;
1827 for (;;) {
de696716 1828 struct drbd_interval *i;
b411b363
PR
1829 int have_unacked = 0;
1830 int have_conflict = 0;
1831 prepare_to_wait(&mdev->misc_wait, &wait,
1832 TASK_INTERRUPTIBLE);
de696716
AG
1833
1834 i = drbd_find_overlap(&mdev->write_requests, sector, size);
1835 if (i) {
de696716
AG
1836 /* only ALERT on first iteration,
1837 * we may be woken up early... */
1838 if (first)
5e472264 1839 dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
de696716
AG
1840 " new: %llus +%u; pending: %llus +%u\n",
1841 current->comm, current->pid,
5e472264 1842 i->local ? "local" : "remote",
de696716 1843 (unsigned long long)sector, size,
5e472264
AG
1844 (unsigned long long)i->sector, i->size);
1845
1846 if (i->local) {
1847 struct drbd_request *req2;
1848
1849 req2 = container_of(i, struct drbd_request, i);
1850 if (req2->rq_state & RQ_NET_PENDING)
1851 ++have_unacked;
1852 }
de696716 1853 ++have_conflict;
b411b363 1854 }
b411b363
PR
1855 if (!have_conflict)
1856 break;
1857
1858 /* Discard Ack only for the _first_ iteration */
1859 if (first && discard && have_unacked) {
1860 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1861 (unsigned long long)sector);
1862 inc_unacked(mdev);
db830c46
AG
1863 peer_req->w.cb = e_send_discard_ack;
1864 list_add_tail(&peer_req->w.list, &mdev->done_ee);
b411b363 1865
87eeee41 1866 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1867
1868 /* we could probably send that P_DISCARD_ACK ourselves,
1869 * but I don't like the receiver using the msock */
1870
1871 put_ldev(mdev);
0625ac19 1872 wake_asender(mdev->tconn);
b411b363 1873 finish_wait(&mdev->misc_wait, &wait);
81e84650 1874 return true;
b411b363
PR
1875 }
1876
1877 if (signal_pending(current)) {
87eeee41 1878 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1879 finish_wait(&mdev->misc_wait, &wait);
1880 goto out_interrupted;
1881 }
1882
a500c2ef 1883 /* Indicate to wake up mdev->misc_wait upon completion. */
53840641 1884 i->waiting = true;
a500c2ef 1885
87eeee41 1886 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
1887 if (first) {
1888 first = 0;
1889 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1890 "sec=%llus\n", (unsigned long long)sector);
1891 } else if (discard) {
1892 /* we had none on the first iteration.
1893 * there must be none now. */
1894 D_ASSERT(have_unacked == 0);
1895 }
1896 schedule();
87eeee41 1897 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
1898 }
1899 finish_wait(&mdev->misc_wait, &wait);
5e472264 1900
db830c46 1901 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
b411b363
PR
1902 }
1903
db830c46 1904 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 1905 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1906
89e58e75 1907 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
1908 case DRBD_PROT_C:
1909 inc_unacked(mdev);
1910 /* corresponding dec_unacked() in e_end_block()
1911 * respective _drbd_clear_done_ee */
1912 break;
1913 case DRBD_PROT_B:
1914 /* I really don't like it that the receiver thread
1915 * sends on the msock, but anyways */
db830c46 1916 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
1917 break;
1918 case DRBD_PROT_A:
1919 /* nothing to do */
1920 break;
1921 }
1922
6719fb03 1923 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 1924 /* In case we have the only disk of the cluster, mark these blocks out of sync for the peer. */
db830c46
AG
1925 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
1926 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
1927 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
1928 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
1929 }
1930
db830c46 1931 if (drbd_submit_ee(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1932 return true;
b411b363 1933
10f6d992
LE
1934 /* don't care for the reason here */
1935 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1936 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1937 list_del(&peer_req->w.list);
1938 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 1939 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
1940 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
1941 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 1942
b411b363 1943out_interrupted:
db830c46 1944 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 1945 put_ldev(mdev);
db830c46 1946 drbd_free_ee(mdev, peer_req);
81e84650 1947 return false;
b411b363
PR
1948}
1949
0f0601f4
LE
1950/* We may throttle resync, if the lower device seems to be busy,
1951 * and current sync rate is above c_min_rate.
1952 *
1953 * To decide whether or not the lower device is busy, we use a scheme similar
 1954 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 1955 * activity (more than 64 sectors) that we cannot account for with our own
 1956 * resync activity, it obviously is "busy".
1957 *
1958 * The current sync rate used here uses only the most recent two step marks,
1959 * to have a short time average so we can react faster.
1960 */
e3555d85 1961int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
1962{
1963 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1964 unsigned long db, dt, dbdt;
e3555d85 1965 struct lc_element *tmp;
0f0601f4
LE
1966 int curr_events;
1967 int throttle = 0;
1968
1969 /* feature disabled? */
1970 if (mdev->sync_conf.c_min_rate == 0)
1971 return 0;
1972
e3555d85
PR
1973 spin_lock_irq(&mdev->al_lock);
1974 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1975 if (tmp) {
1976 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1977 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1978 spin_unlock_irq(&mdev->al_lock);
1979 return 0;
1980 }
1981 /* Do not slow down if app IO is already waiting for this extent */
1982 }
1983 spin_unlock_irq(&mdev->al_lock);
1984
0f0601f4
LE
1985 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1986 (int)part_stat_read(&disk->part0, sectors[1]) -
1987 atomic_read(&mdev->rs_sect_ev);
e3555d85 1988
0f0601f4
LE
1989 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1990 unsigned long rs_left;
1991 int i;
1992
1993 mdev->rs_last_events = curr_events;
1994
1995 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1996 * approx. */
2649f080
LE
1997 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1998
1999 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2000 rs_left = mdev->ov_left;
2001 else
2002 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2003
2004 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2005 if (!dt)
2006 dt++;
2007 db = mdev->rs_mark_left[i] - rs_left;
2008 dbdt = Bit2KB(db/dt);
2009
2010 if (dbdt > mdev->sync_conf.c_min_rate)
2011 throttle = 1;
2012 }
2013 return throttle;
2014}
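/*
 * Worked example of the rate estimate above (illustration only, assuming
 * DRBD's usual 4 KiB of data per bitmap bit behind Bit2KB()): if the chosen
 * sync mark is dt = 6 seconds old and db = 6000 bits have been cleared since
 * then, dbdt = Bit2KB(6000 / 6) = 1000 * 4 = 4000 KiB/s. With a configured
 * c_min_rate of, say, 1000 KiB/s and unaccounted backing-device activity
 * (the "> 64 sectors" check above), throttle becomes 1 and the resync
 * request is deferred in favour of application IO.
 */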
2015
2016
d8763023
AG
2017static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2018 unsigned int digest_size)
b411b363
PR
2019{
2020 sector_t sector;
2021 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2022 struct drbd_peer_request *peer_req;
b411b363 2023 struct digest_info *di = NULL;
b18b37be 2024 int size, verb;
b411b363 2025 unsigned int fault_type;
e42325a5 2026 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2027
2028 sector = be64_to_cpu(p->sector);
2029 size = be32_to_cpu(p->blksize);
2030
1816a2b4 2031 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2032 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2033 (unsigned long long)sector, size);
81e84650 2034 return false;
b411b363
PR
2035 }
2036 if (sector + (size>>9) > capacity) {
2037 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2038 (unsigned long long)sector, size);
81e84650 2039 return false;
b411b363
PR
2040 }
2041
2042 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2043 verb = 1;
2044 switch (cmd) {
2045 case P_DATA_REQUEST:
2046 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2047 break;
2048 case P_RS_DATA_REQUEST:
2049 case P_CSUM_RS_REQUEST:
2050 case P_OV_REQUEST:
2051 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2052 break;
2053 case P_OV_REPLY:
2054 verb = 0;
2055 dec_rs_pending(mdev);
2056 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2057 break;
2058 default:
2059 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2060 cmdname(cmd));
2061 }
2062 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2063 dev_err(DEV, "Can not satisfy peer's read request, "
2064 "no local data.\n");
b18b37be 2065
a821cc4a
LE
 2066 /* drain the possibly present payload */
2067 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2068 }
2069
2070 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2071 * "criss-cross" setup, that might cause write-out on some other DRBD,
2072 * which in turn might block on the other node at this very place. */
db830c46
AG
2073 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2074 if (!peer_req) {
b411b363 2075 put_ldev(mdev);
81e84650 2076 return false;
b411b363
PR
2077 }
2078
02918be2 2079 switch (cmd) {
b411b363 2080 case P_DATA_REQUEST:
db830c46 2081 peer_req->w.cb = w_e_end_data_req;
b411b363 2082 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2083 /* application IO, don't drbd_rs_begin_io */
2084 goto submit;
2085
b411b363 2086 case P_RS_DATA_REQUEST:
db830c46 2087 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2088 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2089 /* used in the sector offset progress display */
2090 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2091 break;
2092
2093 case P_OV_REPLY:
2094 case P_CSUM_RS_REQUEST:
2095 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2096 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2097 if (!di)
2098 goto out_free_e;
2099
2100 di->digest_size = digest_size;
2101 di->digest = (((char *)di)+sizeof(struct digest_info));
2102
db830c46
AG
2103 peer_req->digest = di;
2104 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2105
de0ff338 2106 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2107 goto out_free_e;
2108
02918be2 2109 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2110 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2111 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2112 /* used in the sector offset progress display */
2113 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2114 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2115 /* track progress, we may need to throttle */
2116 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2117 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2118 dec_rs_pending(mdev);
0f0601f4
LE
2119 /* drbd_rs_begin_io done when we sent this request,
2120 * but accounting still needs to be done. */
2121 goto submit_for_resync;
b411b363
PR
2122 }
2123 break;
2124
2125 case P_OV_REQUEST:
b411b363 2126 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2127 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2128 unsigned long now = jiffies;
2129 int i;
b411b363
PR
2130 mdev->ov_start_sector = sector;
2131 mdev->ov_position = sector;
30b743a2
LE
2132 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2133 mdev->rs_total = mdev->ov_left;
de228bba
LE
2134 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2135 mdev->rs_mark_left[i] = mdev->ov_left;
2136 mdev->rs_mark_time[i] = now;
2137 }
b411b363
PR
2138 dev_info(DEV, "Online Verify start sector: %llu\n",
2139 (unsigned long long)sector);
2140 }
db830c46 2141 peer_req->w.cb = w_e_end_ov_req;
b411b363 2142 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2143 break;
2144
b411b363
PR
2145 default:
2146 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2147 cmdname(cmd));
b411b363 2148 fault_type = DRBD_FAULT_MAX;
80a40e43 2149 goto out_free_e;
b411b363
PR
2150 }
2151
0f0601f4
LE
2152 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2153 * wrt the receiver, but it is not as straightforward as it may seem.
2154 * Various places in the resync start and stop logic assume resync
2155 * requests are processed in order, requeuing this on the worker thread
2156 * introduces a bunch of new code for synchronization between threads.
2157 *
2158 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2159 * "forever", throttling after drbd_rs_begin_io will lock that extent
2160 * for application writes for the same time. For now, just throttle
2161 * here, where the rest of the code expects the receiver to sleep for
2162 * a while, anyways.
2163 */
2164
2165 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2166 * this defers syncer requests for some time, before letting at least
 2167 * one request through. The resync controller on the receiving side
2168 * will adapt to the incoming rate accordingly.
2169 *
2170 * We cannot throttle here if remote is Primary/SyncTarget:
2171 * we would also throttle its application reads.
2172 * In that case, throttling is done on the SyncTarget only.
2173 */
e3555d85
PR
2174 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2175 schedule_timeout_uninterruptible(HZ/10);
2176 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2177 goto out_free_e;
b411b363 2178
0f0601f4
LE
2179submit_for_resync:
2180 atomic_add(size >> 9, &mdev->rs_sect_ev);
2181
80a40e43 2182submit:
b411b363 2183 inc_unacked(mdev);
87eeee41 2184 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2185 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2186 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2187
db830c46 2188 if (drbd_submit_ee(mdev, peer_req, READ, fault_type) == 0)
81e84650 2189 return true;
b411b363 2190
10f6d992
LE
2191 /* don't care for the reason here */
2192 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2193 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2194 list_del(&peer_req->w.list);
87eeee41 2195 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2196 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2197
b411b363 2198out_free_e:
b411b363 2199 put_ldev(mdev);
db830c46 2200 drbd_free_ee(mdev, peer_req);
81e84650 2201 return false;
b411b363
PR
2202}
2203
2204static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2205{
2206 int self, peer, rv = -100;
2207 unsigned long ch_self, ch_peer;
2208
2209 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2210 peer = mdev->p_uuid[UI_BITMAP] & 1;
2211
2212 ch_peer = mdev->p_uuid[UI_SIZE];
2213 ch_self = mdev->comm_bm_set;
2214
89e58e75 2215 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2216 case ASB_CONSENSUS:
2217 case ASB_DISCARD_SECONDARY:
2218 case ASB_CALL_HELPER:
2219 dev_err(DEV, "Configuration error.\n");
2220 break;
2221 case ASB_DISCONNECT:
2222 break;
2223 case ASB_DISCARD_YOUNGER_PRI:
2224 if (self == 0 && peer == 1) {
2225 rv = -1;
2226 break;
2227 }
2228 if (self == 1 && peer == 0) {
2229 rv = 1;
2230 break;
2231 }
2232 /* Else fall through to one of the other strategies... */
2233 case ASB_DISCARD_OLDER_PRI:
2234 if (self == 0 && peer == 1) {
2235 rv = 1;
2236 break;
2237 }
2238 if (self == 1 && peer == 0) {
2239 rv = -1;
2240 break;
2241 }
2242 /* Else fall through to one of the other strategies... */
ad19bf6e 2243 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2244 "Using discard-least-changes instead\n");
2245 case ASB_DISCARD_ZERO_CHG:
2246 if (ch_peer == 0 && ch_self == 0) {
25703f83 2247 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2248 ? -1 : 1;
2249 break;
2250 } else {
2251 if (ch_peer == 0) { rv = 1; break; }
2252 if (ch_self == 0) { rv = -1; break; }
2253 }
89e58e75 2254 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2255 break;
2256 case ASB_DISCARD_LEAST_CHG:
2257 if (ch_self < ch_peer)
2258 rv = -1;
2259 else if (ch_self > ch_peer)
2260 rv = 1;
2261 else /* ( ch_self == ch_peer ) */
2262 /* Well, then use something else. */
25703f83 2263 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2264 ? -1 : 1;
2265 break;
2266 case ASB_DISCARD_LOCAL:
2267 rv = -1;
2268 break;
2269 case ASB_DISCARD_REMOTE:
2270 rv = 1;
2271 }
2272
2273 return rv;
2274}
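/*
 * Example (illustration only): with after-sb-0pri set to
 * discard-least-changes, a node that changed ch_self = 10 blocks while the
 * peer changed ch_peer = 1000 gets rv = -1 here; drbd_sync_handshake()
 * below maps a negative result to "become sync target", i.e. the side with
 * fewer changes discards them and resyncs from the peer.
 */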
2275
2276static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2277{
6184ea21 2278 int hg, rv = -100;
b411b363 2279
89e58e75 2280 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2281 case ASB_DISCARD_YOUNGER_PRI:
2282 case ASB_DISCARD_OLDER_PRI:
2283 case ASB_DISCARD_LEAST_CHG:
2284 case ASB_DISCARD_LOCAL:
2285 case ASB_DISCARD_REMOTE:
2286 dev_err(DEV, "Configuration error.\n");
2287 break;
2288 case ASB_DISCONNECT:
2289 break;
2290 case ASB_CONSENSUS:
2291 hg = drbd_asb_recover_0p(mdev);
2292 if (hg == -1 && mdev->state.role == R_SECONDARY)
2293 rv = hg;
2294 if (hg == 1 && mdev->state.role == R_PRIMARY)
2295 rv = hg;
2296 break;
2297 case ASB_VIOLENTLY:
2298 rv = drbd_asb_recover_0p(mdev);
2299 break;
2300 case ASB_DISCARD_SECONDARY:
2301 return mdev->state.role == R_PRIMARY ? 1 : -1;
2302 case ASB_CALL_HELPER:
2303 hg = drbd_asb_recover_0p(mdev);
2304 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2305 enum drbd_state_rv rv2;
2306
2307 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2308 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2309 * we might be here in C_WF_REPORT_PARAMS which is transient.
2310 * we do not need to wait for the after state change work either. */
bb437946
AG
2311 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2312 if (rv2 != SS_SUCCESS) {
b411b363
PR
2313 drbd_khelper(mdev, "pri-lost-after-sb");
2314 } else {
2315 dev_warn(DEV, "Successfully gave up primary role.\n");
2316 rv = hg;
2317 }
2318 } else
2319 rv = hg;
2320 }
2321
2322 return rv;
2323}
2324
2325static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2326{
6184ea21 2327 int hg, rv = -100;
b411b363 2328
89e58e75 2329 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2330 case ASB_DISCARD_YOUNGER_PRI:
2331 case ASB_DISCARD_OLDER_PRI:
2332 case ASB_DISCARD_LEAST_CHG:
2333 case ASB_DISCARD_LOCAL:
2334 case ASB_DISCARD_REMOTE:
2335 case ASB_CONSENSUS:
2336 case ASB_DISCARD_SECONDARY:
2337 dev_err(DEV, "Configuration error.\n");
2338 break;
2339 case ASB_VIOLENTLY:
2340 rv = drbd_asb_recover_0p(mdev);
2341 break;
2342 case ASB_DISCONNECT:
2343 break;
2344 case ASB_CALL_HELPER:
2345 hg = drbd_asb_recover_0p(mdev);
2346 if (hg == -1) {
bb437946
AG
2347 enum drbd_state_rv rv2;
2348
b411b363
PR
2349 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2350 * we might be here in C_WF_REPORT_PARAMS which is transient.
2351 * we do not need to wait for the after state change work either. */
bb437946
AG
2352 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2353 if (rv2 != SS_SUCCESS) {
b411b363
PR
2354 drbd_khelper(mdev, "pri-lost-after-sb");
2355 } else {
2356 dev_warn(DEV, "Successfully gave up primary role.\n");
2357 rv = hg;
2358 }
2359 } else
2360 rv = hg;
2361 }
2362
2363 return rv;
2364}
2365
2366static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2367 u64 bits, u64 flags)
2368{
2369 if (!uuid) {
2370 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2371 return;
2372 }
2373 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2374 text,
2375 (unsigned long long)uuid[UI_CURRENT],
2376 (unsigned long long)uuid[UI_BITMAP],
2377 (unsigned long long)uuid[UI_HISTORY_START],
2378 (unsigned long long)uuid[UI_HISTORY_END],
2379 (unsigned long long)bits,
2380 (unsigned long long)flags);
2381}
2382
2383/*
2384 100 after split brain try auto recover
2385 2 C_SYNC_SOURCE set BitMap
2386 1 C_SYNC_SOURCE use BitMap
2387 0 no Sync
2388 -1 C_SYNC_TARGET use BitMap
2389 -2 C_SYNC_TARGET set BitMap
2390 -100 after split brain, disconnect
2391-1000 unrelated data
4a23f264
PR
2392-1091 requires proto 91
2393-1096 requires proto 96
b411b363
PR
2394 */
2395static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2396{
2397 u64 self, peer;
2398 int i, j;
2399
2400 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2401 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2402
2403 *rule_nr = 10;
2404 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2405 return 0;
2406
2407 *rule_nr = 20;
2408 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2409 peer != UUID_JUST_CREATED)
2410 return -2;
2411
2412 *rule_nr = 30;
2413 if (self != UUID_JUST_CREATED &&
2414 (peer == UUID_JUST_CREATED || peer == (u64)0))
2415 return 2;
2416
2417 if (self == peer) {
2418 int rct, dc; /* roles at crash time */
2419
2420 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2421
31890f4a 2422 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2423 return -1091;
b411b363
PR
2424
2425 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2426 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2427 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2428 drbd_uuid_set_bm(mdev, 0UL);
2429
2430 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2431 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2432 *rule_nr = 34;
2433 } else {
2434 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2435 *rule_nr = 36;
2436 }
2437
2438 return 1;
2439 }
2440
2441 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2442
31890f4a 2443 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2444 return -1091;
b411b363
PR
2445
2446 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2447 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2448 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2449
2450 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2451 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2452 mdev->p_uuid[UI_BITMAP] = 0UL;
2453
2454 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2455 *rule_nr = 35;
2456 } else {
2457 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2458 *rule_nr = 37;
2459 }
2460
2461 return -1;
2462 }
2463
2464 /* Common power [off|failure] */
2465 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2466 (mdev->p_uuid[UI_FLAGS] & 2);
2467 /* lowest bit is set when we were primary,
2468 * next bit (weight 2) is set when peer was primary */
2469 *rule_nr = 40;
2470
2471 switch (rct) {
2472 case 0: /* !self_pri && !peer_pri */ return 0;
2473 case 1: /* self_pri && !peer_pri */ return 1;
2474 case 2: /* !self_pri && peer_pri */ return -1;
2475 case 3: /* self_pri && peer_pri */
25703f83 2476 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2477 return dc ? -1 : 1;
2478 }
2479 }
2480
2481 *rule_nr = 50;
2482 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2483 if (self == peer)
2484 return -1;
2485
2486 *rule_nr = 51;
2487 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2488 if (self == peer) {
31890f4a 2489 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2490 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2491 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2492 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2493 /* The last P_SYNC_UUID did not get through. Undo the UUID modifications
 2494 the peer made when it last started a resync as sync source. */
2495
31890f4a 2496 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2497 return -1091;
b411b363
PR
2498
2499 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2500 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2501
2502 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2503 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2504
b411b363
PR
2505 return -1;
2506 }
2507 }
2508
2509 *rule_nr = 60;
2510 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2511 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2512 peer = mdev->p_uuid[i] & ~((u64)1);
2513 if (self == peer)
2514 return -2;
2515 }
2516
2517 *rule_nr = 70;
2518 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2519 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2520 if (self == peer)
2521 return 1;
2522
2523 *rule_nr = 71;
2524 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2525 if (self == peer) {
31890f4a 2526 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2527 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2528 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2529 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2530 /* The last P_SYNC_UUID did not get through. Undo the UUID modifications
 2531 we made when we last started a resync as sync source. */
2532
31890f4a 2533 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2534 return -1091;
b411b363
PR
2535
2536 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2537 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2538
4a23f264 2539 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2540 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2541 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2542
2543 return 1;
2544 }
2545 }
2546
2547
2548 *rule_nr = 80;
d8c2a36b 2549 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2550 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2551 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2552 if (self == peer)
2553 return 2;
2554 }
2555
2556 *rule_nr = 90;
2557 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2558 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2559 if (self == peer && self != ((u64)0))
2560 return 100;
2561
2562 *rule_nr = 100;
2563 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2564 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2565 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2566 peer = mdev->p_uuid[j] & ~((u64)1);
2567 if (self == peer)
2568 return -100;
2569 }
2570 }
2571
2572 return -1000;
2573}
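/*
 * Example (illustration only, simplified): if the peer's current UUID still
 * matches our bitmap UUID -- i.e. the data generation our on-disk bitmap
 * has been tracking changes against -- rule 70 above returns 1: we become
 * C_SYNC_SOURCE using the bitmap, and only the blocks marked there need to
 * be shipped to the peer.
 */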
2574
2575/* drbd_sync_handshake() returns the new conn state on success, or
2576 CONN_MASK (-1) on failure.
2577 */
2578static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2579 enum drbd_disk_state peer_disk) __must_hold(local)
2580{
2581 int hg, rule_nr;
2582 enum drbd_conns rv = C_MASK;
2583 enum drbd_disk_state mydisk;
2584
2585 mydisk = mdev->state.disk;
2586 if (mydisk == D_NEGOTIATING)
2587 mydisk = mdev->new_state_tmp.disk;
2588
2589 dev_info(DEV, "drbd_sync_handshake:\n");
2590 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2591 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2592 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2593
2594 hg = drbd_uuid_compare(mdev, &rule_nr);
2595
2596 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2597
2598 if (hg == -1000) {
2599 dev_alert(DEV, "Unrelated data, aborting!\n");
2600 return C_MASK;
2601 }
4a23f264
PR
2602 if (hg < -1000) {
2603 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2604 return C_MASK;
2605 }
2606
2607 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2608 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2609 int f = (hg == -100) || abs(hg) == 2;
2610 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2611 if (f)
2612 hg = hg*2;
2613 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2614 hg > 0 ? "source" : "target");
2615 }
2616
3a11a487
AG
2617 if (abs(hg) == 100)
2618 drbd_khelper(mdev, "initial-split-brain");
2619
89e58e75 2620 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2621 int pcount = (mdev->state.role == R_PRIMARY)
2622 + (peer_role == R_PRIMARY);
2623 int forced = (hg == -100);
2624
2625 switch (pcount) {
2626 case 0:
2627 hg = drbd_asb_recover_0p(mdev);
2628 break;
2629 case 1:
2630 hg = drbd_asb_recover_1p(mdev);
2631 break;
2632 case 2:
2633 hg = drbd_asb_recover_2p(mdev);
2634 break;
2635 }
2636 if (abs(hg) < 100) {
2637 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2638 "automatically solved. Sync from %s node\n",
2639 pcount, (hg < 0) ? "peer" : "this");
2640 if (forced) {
2641 dev_warn(DEV, "Doing a full sync, since"
2642 " UUIDs where ambiguous.\n");
2643 hg = hg*2;
2644 }
2645 }
2646 }
2647
2648 if (hg == -100) {
89e58e75 2649 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2650 hg = -1;
89e58e75 2651 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2652 hg = 1;
2653
2654 if (abs(hg) < 100)
2655 dev_warn(DEV, "Split-Brain detected, manually solved. "
2656 "Sync from %s node\n",
2657 (hg < 0) ? "peer" : "this");
2658 }
2659
2660 if (hg == -100) {
580b9767
LE
2661 /* FIXME this log message is not correct if we end up here
2662 * after an attempted attach on a diskless node.
2663 * We just refuse to attach -- well, we drop the "connection"
2664 * to that disk, in a way... */
3a11a487 2665 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2666 drbd_khelper(mdev, "split-brain");
2667 return C_MASK;
2668 }
2669
2670 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2671 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2672 return C_MASK;
2673 }
2674
2675 if (hg < 0 && /* by intention we do not use mydisk here. */
2676 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2677 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2678 case ASB_CALL_HELPER:
2679 drbd_khelper(mdev, "pri-lost");
2680 /* fall through */
2681 case ASB_DISCONNECT:
2682 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2683 return C_MASK;
2684 case ASB_VIOLENTLY:
2685 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2686 "assumption\n");
2687 }
2688 }
2689
89e58e75 2690 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
cf14c2e9
PR
2691 if (hg == 0)
2692 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2693 else
2694 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2695 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2696 abs(hg) >= 2 ? "full" : "bit-map based");
2697 return C_MASK;
2698 }
2699
b411b363
PR
2700 if (abs(hg) >= 2) {
2701 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2702 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2703 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2704 return C_MASK;
2705 }
2706
2707 if (hg > 0) { /* become sync source. */
2708 rv = C_WF_BITMAP_S;
2709 } else if (hg < 0) { /* become sync target */
2710 rv = C_WF_BITMAP_T;
2711 } else {
2712 rv = C_CONNECTED;
2713 if (drbd_bm_total_weight(mdev)) {
2714 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2715 drbd_bm_total_weight(mdev));
2716 }
2717 }
2718
2719 return rv;
2720}
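/*
 * Example (illustration only): a device with a just-created meta-data UUID
 * connecting to a peer that already has data gets hg == -2 from rule 20
 * above; abs(hg) >= 2 therefore sets the whole local bitmap and the
 * handshake returns C_WF_BITMAP_T, i.e. a full resync from the peer.
 */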
2721
2722/* returns 1 if invalid */
2723static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2724{
2725 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2726 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2727 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2728 return 0;
2729
2730 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2731 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2732 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2733 return 1;
2734
2735 /* everything else is valid if they are equal on both sides. */
2736 if (peer == self)
2737 return 0;
2738
 2739 /* everything else is invalid. */
2740 return 1;
2741}
2742
d8763023
AG
2743static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2744 unsigned int data_size)
b411b363 2745{
e42325a5 2746 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
b411b363 2747 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2748 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2749 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2750
b411b363
PR
2751 p_proto = be32_to_cpu(p->protocol);
2752 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2753 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2754 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2755 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2756 cf = be32_to_cpu(p->conn_flags);
2757 p_want_lose = cf & CF_WANT_LOSE;
2758
2759 clear_bit(CONN_DRY_RUN, &mdev->flags);
2760
2761 if (cf & CF_DRY_RUN)
2762 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363 2763
89e58e75 2764 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2765 dev_err(DEV, "incompatible communication protocols\n");
2766 goto disconnect;
2767 }
2768
89e58e75 2769 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
b411b363
PR
2770 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2771 goto disconnect;
2772 }
2773
89e58e75 2774 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
b411b363
PR
2775 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2776 goto disconnect;
2777 }
2778
89e58e75 2779 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
b411b363
PR
2780 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2781 goto disconnect;
2782 }
2783
89e58e75 2784 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
b411b363
PR
2785 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2786 goto disconnect;
2787 }
2788
89e58e75 2789 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
b411b363
PR
2790 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2791 goto disconnect;
2792 }
2793
31890f4a 2794 if (mdev->tconn->agreed_pro_version >= 87) {
89e58e75 2795 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
b411b363 2796
de0ff338 2797 if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
81e84650 2798 return false;
b411b363
PR
2799
2800 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2801 if (strcmp(p_integrity_alg, my_alg)) {
2802 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2803 goto disconnect;
2804 }
2805 dev_info(DEV, "data-integrity-alg: %s\n",
2806 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2807 }
2808
81e84650 2809 return true;
b411b363
PR
2810
2811disconnect:
2812 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2813 return false;
b411b363
PR
2814}
2815
2816/* helper function
2817 * input: alg name, feature name
2818 * return: NULL (alg name was "")
2819 * ERR_PTR(error) if something goes wrong
2820 * or the crypto hash ptr, if it worked out ok. */
2821struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2822 const char *alg, const char *name)
2823{
2824 struct crypto_hash *tfm;
2825
2826 if (!alg[0])
2827 return NULL;
2828
2829 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2830 if (IS_ERR(tfm)) {
2831 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2832 alg, name, PTR_ERR(tfm));
2833 return tfm;
2834 }
2835 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2836 crypto_free_hash(tfm);
2837 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2838 return ERR_PTR(-EINVAL);
2839 }
2840 return tfm;
2841}
2842
d8763023
AG
2843static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2844 unsigned int packet_size)
b411b363 2845{
81e84650 2846 int ok = true;
e42325a5 2847 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2848 unsigned int header_size, data_size, exp_max_sz;
2849 struct crypto_hash *verify_tfm = NULL;
2850 struct crypto_hash *csums_tfm = NULL;
31890f4a 2851 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2852 int *rs_plan_s = NULL;
2853 int fifo_size = 0;
b411b363
PR
2854
2855 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2856 : apv == 88 ? sizeof(struct p_rs_param)
2857 + SHARED_SECRET_MAX
8e26f9cc
PR
2858 : apv <= 94 ? sizeof(struct p_rs_param_89)
2859 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2860
02918be2 2861 if (packet_size > exp_max_sz) {
b411b363 2862 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2863 packet_size, exp_max_sz);
81e84650 2864 return false;
b411b363
PR
2865 }
2866
2867 if (apv <= 88) {
257d0af6 2868 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2869 data_size = packet_size - header_size;
8e26f9cc 2870 } else if (apv <= 94) {
257d0af6 2871 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2872 data_size = packet_size - header_size;
b411b363 2873 D_ASSERT(data_size == 0);
8e26f9cc 2874 } else {
257d0af6 2875 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 2876 data_size = packet_size - header_size;
b411b363
PR
2877 D_ASSERT(data_size == 0);
2878 }
2879
2880 /* initialize verify_alg and csums_alg */
2881 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2882
de0ff338 2883 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 2884 return false;
b411b363
PR
2885
2886 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2887
2888 if (apv >= 88) {
2889 if (apv == 88) {
2890 if (data_size > SHARED_SECRET_MAX) {
2891 dev_err(DEV, "verify-alg too long, "
2892 "peer wants %u, accepting only %u byte\n",
2893 data_size, SHARED_SECRET_MAX);
81e84650 2894 return false;
b411b363
PR
2895 }
2896
de0ff338 2897 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 2898 return false;
b411b363
PR
2899
2900 /* we expect NUL terminated string */
2901 /* but just in case someone tries to be evil */
2902 D_ASSERT(p->verify_alg[data_size-1] == 0);
2903 p->verify_alg[data_size-1] = 0;
2904
2905 } else /* apv >= 89 */ {
2906 /* we still expect NUL terminated strings */
2907 /* but just in case someone tries to be evil */
2908 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2909 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2910 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2911 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2912 }
2913
2914 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2915 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2916 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2917 mdev->sync_conf.verify_alg, p->verify_alg);
2918 goto disconnect;
2919 }
2920 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2921 p->verify_alg, "verify-alg");
2922 if (IS_ERR(verify_tfm)) {
2923 verify_tfm = NULL;
2924 goto disconnect;
2925 }
2926 }
2927
2928 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2929 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2930 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2931 mdev->sync_conf.csums_alg, p->csums_alg);
2932 goto disconnect;
2933 }
2934 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2935 p->csums_alg, "csums-alg");
2936 if (IS_ERR(csums_tfm)) {
2937 csums_tfm = NULL;
2938 goto disconnect;
2939 }
2940 }
2941
8e26f9cc
PR
2942 if (apv > 94) {
2943 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2944 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2945 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2946 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2947 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2948
2949 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2950 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2951 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2952 if (!rs_plan_s) {
2953 dev_err(DEV, "kmalloc of fifo_buffer failed");
2954 goto disconnect;
2955 }
2956 }
8e26f9cc 2957 }
b411b363
PR
2958
2959 spin_lock(&mdev->peer_seq_lock);
2960 /* lock against drbd_nl_syncer_conf() */
2961 if (verify_tfm) {
2962 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2963 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2964 crypto_free_hash(mdev->verify_tfm);
2965 mdev->verify_tfm = verify_tfm;
2966 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2967 }
2968 if (csums_tfm) {
2969 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2970 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2971 crypto_free_hash(mdev->csums_tfm);
2972 mdev->csums_tfm = csums_tfm;
2973 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2974 }
778f271d
PR
2975 if (fifo_size != mdev->rs_plan_s.size) {
2976 kfree(mdev->rs_plan_s.values);
2977 mdev->rs_plan_s.values = rs_plan_s;
2978 mdev->rs_plan_s.size = fifo_size;
2979 mdev->rs_planed = 0;
2980 }
b411b363
PR
2981 spin_unlock(&mdev->peer_seq_lock);
2982 }
2983
2984 return ok;
2985disconnect:
2986 /* just for completeness: actually not needed,
2987 * as this is not reached if csums_tfm was ok. */
2988 crypto_free_hash(csums_tfm);
2989 /* but free the verify_tfm again, if csums_tfm did not work out */
2990 crypto_free_hash(verify_tfm);
2991 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2992 return false;
b411b363
PR
2993}
2994
b411b363
PR
2995/* warn if the arguments differ by more than 12.5% */
2996static void warn_if_differ_considerably(struct drbd_conf *mdev,
2997 const char *s, sector_t a, sector_t b)
2998{
2999 sector_t d;
3000 if (a == 0 || b == 0)
3001 return;
3002 d = (a > b) ? (a - b) : (b - a);
3003 if (d > (a>>3) || d > (b>>3))
3004 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3005 (unsigned long long)a, (unsigned long long)b);
3006}
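/*
 * Equivalent predicate, as a sketch (illustration only, not used by the
 * driver): ">> 3" is a division by eight, so "differ considerably" simply
 * means the difference exceeds 12.5% of either value. E.g. a = 1000 and
 * b = 1100 sectors stay silent (d = 100 <= 125), while b = 1200 warns
 * (d = 200 > 125).
 */
#if 0
static bool differs_considerably(sector_t a, sector_t b)
{
	sector_t d = (a > b) ? (a - b) : (b - a);

	return a != 0 && b != 0 && (d > (a >> 3) || d > (b >> 3));
}
#endif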
3007
d8763023
AG
3008static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3009 unsigned int data_size)
b411b363 3010{
e42325a5 3011 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3012 enum determine_dev_size dd = unchanged;
b411b363
PR
3013 sector_t p_size, p_usize, my_usize;
3014 int ldsc = 0; /* local disk size changed */
e89b591c 3015 enum dds_flags ddsf;
b411b363 3016
b411b363
PR
3017 p_size = be64_to_cpu(p->d_size);
3018 p_usize = be64_to_cpu(p->u_size);
3019
3020 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3021 dev_err(DEV, "some backing storage is needed\n");
3022 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3023 return false;
b411b363
PR
3024 }
3025
3026 /* just store the peer's disk size for now.
3027 * we still need to figure out whether we accept that. */
3028 mdev->p_size = p_size;
3029
b411b363
PR
3030 if (get_ldev(mdev)) {
3031 warn_if_differ_considerably(mdev, "lower level device sizes",
3032 p_size, drbd_get_max_capacity(mdev->ldev));
3033 warn_if_differ_considerably(mdev, "user requested size",
3034 p_usize, mdev->ldev->dc.disk_size);
3035
3036 /* if this is the first connect, or an otherwise expected
3037 * param exchange, choose the minimum */
3038 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3039 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3040 p_usize);
3041
3042 my_usize = mdev->ldev->dc.disk_size;
3043
3044 if (mdev->ldev->dc.disk_size != p_usize) {
3045 mdev->ldev->dc.disk_size = p_usize;
3046 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3047 (unsigned long)mdev->ldev->dc.disk_size);
3048 }
3049
3050 /* Never shrink a device with usable data during connect.
3051 But allow online shrinking if we are connected. */
a393db6f 3052 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3053 drbd_get_capacity(mdev->this_bdev) &&
3054 mdev->state.disk >= D_OUTDATED &&
3055 mdev->state.conn < C_CONNECTED) {
3056 dev_err(DEV, "The peer's disk size is too small!\n");
3057 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3058 mdev->ldev->dc.disk_size = my_usize;
3059 put_ldev(mdev);
81e84650 3060 return false;
b411b363
PR
3061 }
3062 put_ldev(mdev);
3063 }
b411b363 3064
e89b591c 3065 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3066 if (get_ldev(mdev)) {
24c4830c 3067 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3068 put_ldev(mdev);
3069 if (dd == dev_size_error)
81e84650 3070 return false;
b411b363
PR
3071 drbd_md_sync(mdev);
3072 } else {
3073 /* I am diskless, need to accept the peer's size. */
3074 drbd_set_my_capacity(mdev, p_size);
3075 }
3076
99432fcc
PR
3077 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3078 drbd_reconsider_max_bio_size(mdev);
3079
b411b363
PR
3080 if (get_ldev(mdev)) {
3081 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3082 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3083 ldsc = 1;
3084 }
3085
b411b363
PR
3086 put_ldev(mdev);
3087 }
3088
3089 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3090 if (be64_to_cpu(p->c_size) !=
3091 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3092 /* we have different sizes, probably peer
3093 * needs to know my new size... */
e89b591c 3094 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3095 }
3096 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3097 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3098 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3099 mdev->state.disk >= D_INCONSISTENT) {
3100 if (ddsf & DDSF_NO_RESYNC)
3101 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3102 else
3103 resync_after_online_grow(mdev);
3104 } else
b411b363
PR
3105 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3106 }
3107 }
3108
81e84650 3109 return true;
b411b363
PR
3110}
3111
d8763023
AG
3112static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3113 unsigned int data_size)
b411b363 3114{
e42325a5 3115 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3116 u64 *p_uuid;
62b0da3a 3117 int i, updated_uuids = 0;
b411b363 3118
b411b363
PR
3119 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3120
3121 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3122 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3123
3124 kfree(mdev->p_uuid);
3125 mdev->p_uuid = p_uuid;
3126
3127 if (mdev->state.conn < C_CONNECTED &&
3128 mdev->state.disk < D_INCONSISTENT &&
3129 mdev->state.role == R_PRIMARY &&
3130 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3131 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3132 (unsigned long long)mdev->ed_uuid);
3133 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3134 return false;
b411b363
PR
3135 }
3136
3137 if (get_ldev(mdev)) {
3138 int skip_initial_sync =
3139 mdev->state.conn == C_CONNECTED &&
31890f4a 3140 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3141 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3142 (p_uuid[UI_FLAGS] & 8);
3143 if (skip_initial_sync) {
3144 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3145 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3146 "clear_n_write from receive_uuids",
3147 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3148 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3149 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3150 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3151 CS_VERBOSE, NULL);
3152 drbd_md_sync(mdev);
62b0da3a 3153 updated_uuids = 1;
b411b363
PR
3154 }
3155 put_ldev(mdev);
18a50fa2
PR
3156 } else if (mdev->state.disk < D_INCONSISTENT &&
3157 mdev->state.role == R_PRIMARY) {
3158 /* I am a diskless primary, the peer just created a new current UUID
3159 for me. */
62b0da3a 3160 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3161 }
3162
 3163 /* Before we test for the disk state, we should wait until a possibly
 3164 ongoing cluster-wide state change has finished. That is important if
3165 we are primary and are detaching from our disk. We need to see the
3166 new disk state... */
3167 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3168 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3169 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3170
3171 if (updated_uuids)
3172 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3173
81e84650 3174 return true;
b411b363
PR
3175}
3176
3177/**
3178 * convert_state() - Converts the peer's view of the cluster state to our point of view
3179 * @ps: The state as seen by the peer.
3180 */
3181static union drbd_state convert_state(union drbd_state ps)
3182{
3183 union drbd_state ms;
3184
3185 static enum drbd_conns c_tab[] = {
3186 [C_CONNECTED] = C_CONNECTED,
3187
3188 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3189 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3190 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3191 [C_VERIFY_S] = C_VERIFY_T,
3192 [C_MASK] = C_MASK,
3193 };
3194
3195 ms.i = ps.i;
3196
3197 ms.conn = c_tab[ps.conn];
3198 ms.peer = ps.role;
3199 ms.role = ps.peer;
3200 ms.pdsk = ps.disk;
3201 ms.disk = ps.pdsk;
3202 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3203
3204 return ms;
3205}
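/*
 * Example (illustration only): if the peer reports itself as
 * Primary/Secondary, Connected, UpToDate/Inconsistent, convert_state()
 * yields our point of view Secondary/Primary, Connected,
 * Inconsistent/UpToDate: role and peer are swapped, disk and pdsk are
 * swapped, and asymmetric connection states such as C_STARTING_SYNC_S or
 * C_VERIFY_S map to their _T counterparts through c_tab[].
 */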
3206
d8763023
AG
3207static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3208 unsigned int data_size)
b411b363 3209{
e42325a5 3210 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3211 union drbd_state mask, val;
bf885f8a 3212 enum drbd_state_rv rv;
b411b363 3213
b411b363
PR
3214 mask.i = be32_to_cpu(p->mask);
3215 val.i = be32_to_cpu(p->val);
3216
25703f83 3217 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
b411b363
PR
3218 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3219 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3220 return true;
b411b363
PR
3221 }
3222
3223 mask = convert_state(mask);
3224 val = convert_state(val);
3225
3226 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3227
3228 drbd_send_sr_reply(mdev, rv);
3229 drbd_md_sync(mdev);
3230
81e84650 3231 return true;
b411b363
PR
3232}
3233
d8763023
AG
3234static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3235 unsigned int data_size)
b411b363 3236{
e42325a5 3237 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3238 union drbd_state os, ns, peer_state;
b411b363 3239 enum drbd_disk_state real_peer_disk;
65d922c3 3240 enum chg_state_flags cs_flags;
b411b363
PR
3241 int rv;
3242
b411b363
PR
3243 peer_state.i = be32_to_cpu(p->state);
3244
3245 real_peer_disk = peer_state.disk;
3246 if (peer_state.disk == D_NEGOTIATING) {
3247 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3248 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3249 }
3250
87eeee41 3251 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3252 retry:
4ac4aada 3253 os = ns = mdev->state;
87eeee41 3254 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3255
e9ef7bb6
LE
3256 /* peer says his disk is uptodate, while we think it is inconsistent,
3257 * and this happens while we think we have a sync going on. */
3258 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3259 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3260 /* If we are (becoming) SyncSource, but peer is still in sync
3261 * preparation, ignore its uptodate-ness to avoid flapping, it
3262 * will change to inconsistent once the peer reaches active
3263 * syncing states.
3264 * It may have changed syncer-paused flags, however, so we
3265 * cannot ignore this completely. */
3266 if (peer_state.conn > C_CONNECTED &&
3267 peer_state.conn < C_SYNC_SOURCE)
3268 real_peer_disk = D_INCONSISTENT;
3269
3270 /* if peer_state changes to connected at the same time,
3271 * it explicitly notifies us that it finished resync.
3272 * Maybe we should finish it up, too? */
3273 else if (os.conn >= C_SYNC_SOURCE &&
3274 peer_state.conn == C_CONNECTED) {
3275 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3276 drbd_resync_finished(mdev);
81e84650 3277 return true;
e9ef7bb6
LE
3278 }
3279 }
3280
3281 /* peer says his disk is inconsistent, while we think it is uptodate,
3282 * and this happens while the peer still thinks we have a sync going on,
3283 * but we think we are already done with the sync.
3284 * We ignore this to avoid flapping pdsk.
3285 * This should not happen, if the peer is a recent version of drbd. */
3286 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3287 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3288 real_peer_disk = D_UP_TO_DATE;
3289
4ac4aada
LE
3290 if (ns.conn == C_WF_REPORT_PARAMS)
3291 ns.conn = C_CONNECTED;
b411b363 3292
67531718
PR
3293 if (peer_state.conn == C_AHEAD)
3294 ns.conn = C_BEHIND;
3295
b411b363
PR
3296 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3297 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3298 int cr; /* consider resync */
3299
3300 /* if we established a new connection */
4ac4aada 3301 cr = (os.conn < C_CONNECTED);
b411b363
PR
3302 /* if we had an established connection
3303 * and one of the nodes newly attaches a disk */
4ac4aada 3304 cr |= (os.conn == C_CONNECTED &&
b411b363 3305 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3306 os.disk == D_NEGOTIATING));
b411b363
PR
3307 /* if we have both been inconsistent, and the peer has been
3308 * forced to be UpToDate with --overwrite-data */
3309 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3310 /* if we had been plain connected, and the admin requested to
3311 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3312 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3313 (peer_state.conn >= C_STARTING_SYNC_S &&
3314 peer_state.conn <= C_WF_BITMAP_T));
3315
3316 if (cr)
4ac4aada 3317 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3318
3319 put_ldev(mdev);
4ac4aada
LE
3320 if (ns.conn == C_MASK) {
3321 ns.conn = C_CONNECTED;
b411b363 3322 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3323 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3324 } else if (peer_state.disk == D_NEGOTIATING) {
3325 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3326 peer_state.disk = D_DISKLESS;
580b9767 3327 real_peer_disk = D_DISKLESS;
b411b363 3328 } else {
cf14c2e9 3329 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
81e84650 3330 return false;
4ac4aada 3331 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363 3332 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3333 return false;
b411b363
PR
3334 }
3335 }
3336 }
3337
87eeee41 3338 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3339 if (mdev->state.i != os.i)
b411b363
PR
3340 goto retry;
3341 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3342 ns.peer = peer_state.role;
3343 ns.pdsk = real_peer_disk;
3344 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3345 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3346 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3347 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3348 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3349 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3350 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3351 for temporary network outages! */
87eeee41 3352 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50
PR
3353 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3354 tl_clear(mdev);
3355 drbd_uuid_new_current(mdev);
3356 clear_bit(NEW_CUR_UUID, &mdev->flags);
3357 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
81e84650 3358 return false;
481c6f50 3359 }
65d922c3 3360 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3361 ns = mdev->state;
87eeee41 3362 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3363
3364 if (rv < SS_SUCCESS) {
3365 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3366 return false;
b411b363
PR
3367 }
3368
4ac4aada
LE
3369 if (os.conn > C_WF_REPORT_PARAMS) {
3370 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3371 peer_state.disk != D_NEGOTIATING ) {
3372 /* we want resync, peer has not yet decided to sync... */
3373 /* Nowadays only used when forcing a node into primary role and
3374 setting its disk to UpToDate with that */
3375 drbd_send_uuids(mdev);
3376 drbd_send_state(mdev);
3377 }
3378 }
3379
89e58e75 3380 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3381
3382 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3383
81e84650 3384 return true;
b411b363
PR
3385}
3386
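receive_state() above snapshots mdev->state under req_lock, drops the lock to work out the next state, and re-takes it before committing; if the state changed in the meantime it jumps back to retry. A minimal userspace sketch of that snapshot-and-retry pattern, with a pthread mutex standing in for the kernel spinlock and all names invented for the illustration:

#include <pthread.h>

struct dev { pthread_mutex_t lock; unsigned int state; };

/* stand-in for the expensive work done while the lock is dropped */
static unsigned int compute_next(unsigned int old_state) { return old_state | 1u; }

static void update_state(struct dev *d)
{
	unsigned int os, ns;
retry:
	pthread_mutex_lock(&d->lock);
	os = d->state;			/* snapshot under the lock */
	pthread_mutex_unlock(&d->lock);

	ns = compute_next(os);		/* long-running work, lock not held */

	pthread_mutex_lock(&d->lock);
	if (d->state != os) {		/* state moved underneath us: start over */
		pthread_mutex_unlock(&d->lock);
		goto retry;
	}
	d->state = ns;			/* commit while nothing changed */
	pthread_mutex_unlock(&d->lock);
}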
d8763023
AG
3387static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3388 unsigned int data_size)
b411b363 3389{
e42325a5 3390 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3391
3392 wait_event(mdev->misc_wait,
3393 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3394 mdev->state.conn == C_BEHIND ||
b411b363
PR
3395 mdev->state.conn < C_CONNECTED ||
3396 mdev->state.disk < D_NEGOTIATING);
3397
3398 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3399
b411b363
PR
3400 /* Here the _drbd_uuid_ functions are right, current should
3401 _not_ be rotated into the history */
3402 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3403 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3404 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3405
62b0da3a 3406 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3407 drbd_start_resync(mdev, C_SYNC_TARGET);
3408
3409 put_ldev(mdev);
3410 } else
3411 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3412
81e84650 3413 return true;
b411b363
PR
3414}
3415
2c46407d
AG
3416/**
3417 * receive_bitmap_plain
3418 *
3419 * Return 0 when done, 1 when another iteration is needed, and a negative error
3420 * code upon failure.
3421 */
3422static int
02918be2
PR
3423receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3424 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3425{
3426 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3427 unsigned want = num_words * sizeof(long);
2c46407d 3428 int err;
b411b363 3429
02918be2
PR
3430 if (want != data_size) {
3431 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3432 return -EIO;
b411b363
PR
3433 }
3434 if (want == 0)
2c46407d 3435 return 0;
de0ff338 3436 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3437 if (err != want) {
3438 if (err >= 0)
3439 err = -EIO;
3440 return err;
3441 }
b411b363
PR
3442
3443 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3444
3445 c->word_offset += num_words;
3446 c->bit_offset = c->word_offset * BITS_PER_LONG;
3447 if (c->bit_offset > c->bm_bits)
3448 c->bit_offset = c->bm_bits;
3449
2c46407d 3450 return 1;
b411b363
PR
3451}
3452
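receive_bitmap_plain() transfers the bitmap in packets of at most BM_PACKET_WORDS longs and keeps the running word and bit offsets in the bm_xfer_ctx. A rough standalone illustration of that bookkeeping; the constant and the helper below are placeholders, not the DRBD ones:

#include <stddef.h>

#define WORDS_PER_PACKET 128		/* stand-in for BM_PACKET_WORDS */

struct xfer_ctx { size_t bm_words, bm_bits, word_offset, bit_offset; };

/* Returns 1 while more packets are needed, 0 once the bitmap is complete. */
static int consume_packet(struct xfer_ctx *c)
{
	size_t num_words = c->bm_words - c->word_offset;

	if (num_words > WORDS_PER_PACKET)
		num_words = WORDS_PER_PACKET;
	if (num_words == 0)
		return 0;			/* nothing left to transfer */

	/* ...receive num_words * sizeof(long) bytes and merge them here... */

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * 8 * sizeof(long);
	if (c->bit_offset > c->bm_bits)	/* last word may carry padding bits */
		c->bit_offset = c->bm_bits;
	return c->word_offset < c->bm_words;
}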
2c46407d
AG
3453/**
3454 * recv_bm_rle_bits
3455 *
3456 * Return 0 when done, 1 when another iteration is needed, and a negative error
3457 * code upon failure.
3458 */
3459static int
b411b363
PR
3460recv_bm_rle_bits(struct drbd_conf *mdev,
3461 struct p_compressed_bm *p,
c6d25cfe
PR
3462 struct bm_xfer_ctx *c,
3463 unsigned int len)
b411b363
PR
3464{
3465 struct bitstream bs;
3466 u64 look_ahead;
3467 u64 rl;
3468 u64 tmp;
3469 unsigned long s = c->bit_offset;
3470 unsigned long e;
b411b363
PR
3471 int toggle = DCBP_get_start(p);
3472 int have;
3473 int bits;
3474
3475 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3476
3477 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3478 if (bits < 0)
2c46407d 3479 return -EIO;
b411b363
PR
3480
3481 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3482 bits = vli_decode_bits(&rl, look_ahead);
3483 if (bits <= 0)
2c46407d 3484 return -EIO;
b411b363
PR
3485
3486 if (toggle) {
3487 e = s + rl -1;
3488 if (e >= c->bm_bits) {
3489 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3490 return -EIO;
b411b363
PR
3491 }
3492 _drbd_bm_set_bits(mdev, s, e);
3493 }
3494
3495 if (have < bits) {
3496 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3497 have, bits, look_ahead,
3498 (unsigned int)(bs.cur.b - p->code),
3499 (unsigned int)bs.buf_len);
2c46407d 3500 return -EIO;
b411b363
PR
3501 }
3502 look_ahead >>= bits;
3503 have -= bits;
3504
3505 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3506 if (bits < 0)
2c46407d 3507 return -EIO;
b411b363
PR
3508 look_ahead |= tmp << have;
3509 have += bits;
3510 }
3511
3512 c->bit_offset = s;
3513 bm_xfer_ctx_bit_to_word_offset(c);
3514
2c46407d 3515 return (s != c->bm_bits);
b411b363
PR
3516}
3517
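recv_bm_rle_bits() walks a stream of variable-length-encoded run lengths; a toggle alternates between runs of set and runs of clear bits, and only the set runs are written into the bitmap. A simplified sketch of the same decoding over a plain array of run lengths (no VLI bitstream); set_bits() here is an invented helper:

#include <stdint.h>
#include <stddef.h>

static void set_bits(uint8_t *bm, size_t s, size_t e)
{
	for (size_t i = s; i <= e; i++)
		bm[i / 8] |= 1u << (i % 8);
}

/* Apply alternating runs starting at bit 0; 'toggle' says whether the
 * first run consists of set bits. Returns 0 when the whole bitmap is
 * covered, 1 when more runs are expected, -1 on a malformed encoding. */
static int apply_rle(uint8_t *bm, size_t bm_bits,
		     const uint64_t *rl, size_t nruns, int toggle)
{
	size_t s = 0;

	for (size_t i = 0; i < nruns; i++) {
		uint64_t len = rl[i];

		if (len == 0 || s + len > bm_bits)
			return -1;	/* zero-length run or bitmap overflow */
		if (toggle)
			set_bits(bm, s, s + len - 1);
		s += len;
		toggle = !toggle;
	}
	return s == bm_bits ? 0 : 1;
}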
2c46407d
AG
3518/**
3519 * decode_bitmap_c
3520 *
3521 * Return 0 when done, 1 when another iteration is needed, and a negative error
3522 * code upon failure.
3523 */
3524static int
b411b363
PR
3525decode_bitmap_c(struct drbd_conf *mdev,
3526 struct p_compressed_bm *p,
c6d25cfe
PR
3527 struct bm_xfer_ctx *c,
3528 unsigned int len)
b411b363
PR
3529{
3530 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3531 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3532
3533 /* other variants had been implemented for evaluation,
3534 * but have been dropped as this one turned out to be "best"
3535 * during all our tests. */
3536
3537 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3538 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3539 return -EIO;
b411b363
PR
3540}
3541
3542void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3543 const char *direction, struct bm_xfer_ctx *c)
3544{
3545 /* what would it take to transfer it "plaintext" */
c012949a 3546 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3547 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3548 + c->bm_words * sizeof(long);
3549 unsigned total = c->bytes[0] + c->bytes[1];
3550 unsigned r;
3551
3552 /* total cannot be zero, but just in case: */
3553 if (total == 0)
3554 return;
3555
3556 /* don't report if not compressed */
3557 if (total >= plain)
3558 return;
3559
3560 /* total < plain. check for overflow, still */
3561 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3562 : (1000 * total / plain);
3563
3564 if (r > 1000)
3565 r = 1000;
3566
3567 r = 1000 - r;
3568 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3569 "total %u; compression: %u.%u%%\n",
3570 direction,
3571 c->bytes[1], c->packets[1],
3572 c->bytes[0], c->packets[0],
3573 total, r/10, r % 10);
3574}
3575
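INFO_bm_xfer_stats() reports the savings in tenths of a percent and divides plain by 1000 first when total is large, so the 1000 * total multiplication cannot overflow a 32-bit unsigned. A small standalone version of that per-mille arithmetic:

#include <limits.h>
#include <stdio.h>

/* Savings in per mille (0..1000), guarding against unsigned overflow. */
static unsigned int savings_per_mille(unsigned int total, unsigned int plain)
{
	unsigned int r;

	if (total == 0 || total >= plain)
		return 0;			/* not compressed (or bogus input) */

	r = (total > UINT_MAX / 1000) ? total / (plain / 1000)
				      : 1000 * total / plain;
	if (r > 1000)
		r = 1000;
	return 1000 - r;
}

int main(void)
{
	unsigned int r = savings_per_mille(1200, 65536);

	printf("compression: %u.%u%%\n", r / 10, r % 10);	/* prints 98.2% */
	return 0;
}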
3576/* Since we are processing the bitfield from lower addresses to higher,
3577 it does not matter whether we process it in 32 bit chunks or 64 bit
3578 chunks, as long as it is little endian. (Understand it as a byte stream,
3579 beginning with the lowest byte...) If we used big endian
3580 we would need to process it from the highest address to the lowest,
3581 in order to be agnostic to the 32 vs 64 bits issue.
3582
3583 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3584static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3585 unsigned int data_size)
b411b363
PR
3586{
3587 struct bm_xfer_ctx c;
3588 void *buffer;
2c46407d 3589 int err;
81e84650 3590 int ok = false;
257d0af6 3591 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3592 struct packet_info pi;
b411b363 3593
20ceb2b2
LE
3594 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3595 /* you are supposed to send additional out-of-sync information
3596 * if you actually set bits during this phase */
b411b363
PR
3597
3598 /* maybe we should use some per thread scratch page,
3599 * and allocate that during initial device creation? */
3600 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3601 if (!buffer) {
3602 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3603 goto out;
3604 }
3605
3606 c = (struct bm_xfer_ctx) {
3607 .bm_bits = drbd_bm_bits(mdev),
3608 .bm_words = drbd_bm_words(mdev),
3609 };
3610
2c46407d 3611 for(;;) {
02918be2 3612 if (cmd == P_BITMAP) {
2c46407d 3613 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3614 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3615 /* MAYBE: sanity check that we speak proto >= 90,
3616 * and the feature is enabled! */
3617 struct p_compressed_bm *p;
3618
02918be2 3619 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3620 dev_err(DEV, "ReportCBitmap packet too large\n");
3621 goto out;
3622 }
3623 /* use the page buff */
3624 p = buffer;
3625 memcpy(p, h, sizeof(*h));
de0ff338 3626 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3627 goto out;
004352fa
LE
3628 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3629 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3630 goto out;
b411b363 3631 }
c6d25cfe 3632 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3633 } else {
02918be2 3634 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3635 goto out;
3636 }
3637
02918be2 3638 c.packets[cmd == P_BITMAP]++;
257d0af6 3639 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3640
2c46407d
AG
3641 if (err <= 0) {
3642 if (err < 0)
3643 goto out;
b411b363 3644 break;
2c46407d 3645 }
9ba7aa00 3646 if (!drbd_recv_header(mdev->tconn, &pi))
b411b363 3647 goto out;
77351055
PR
3648 cmd = pi.cmd;
3649 data_size = pi.size;
2c46407d 3650 }
b411b363
PR
3651
3652 INFO_bm_xfer_stats(mdev, "receive", &c);
3653
3654 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3655 enum drbd_state_rv rv;
3656
b411b363
PR
3657 ok = !drbd_send_bitmap(mdev);
3658 if (!ok)
3659 goto out;
3660 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3661 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3662 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3663 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3664 /* admin may have requested C_DISCONNECTING,
3665 * other threads may have noticed network errors */
3666 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3667 drbd_conn_str(mdev->state.conn));
3668 }
3669
81e84650 3670 ok = true;
b411b363 3671 out:
20ceb2b2 3672 drbd_bm_unlock(mdev);
b411b363
PR
3673 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3674 drbd_start_resync(mdev, C_SYNC_SOURCE);
3675 free_page((unsigned long) buffer);
3676 return ok;
3677}
3678
d8763023
AG
3679static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3680 unsigned int data_size)
b411b363
PR
3681{
3682 /* TODO zero copy sink :) */
3683 static char sink[128];
3684 int size, want, r;
3685
02918be2
PR
3686 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3687 cmd, data_size);
b411b363 3688
02918be2 3689 size = data_size;
b411b363
PR
3690 while (size > 0) {
3691 want = min_t(int, size, sizeof(sink));
de0ff338 3692 r = drbd_recv(mdev->tconn, sink, want);
841ce241
AG
3693 if (!expect(r > 0))
3694 break;
b411b363
PR
3695 size -= r;
3696 }
3697 return size == 0;
3698}
3699
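receive_skip() drains the payload of an unknown optional packet through a small scratch buffer. A standalone sketch of the same chunked drain over a plain socket; recv() stands in for drbd_recv():

#include <sys/socket.h>
#include <sys/types.h>

/* Discard exactly 'size' bytes from a socket; 1 on success, 0 otherwise. */
static int drain(int fd, int size)
{
	static char sink[128];

	while (size > 0) {
		int want = size < (int)sizeof(sink) ? size : (int)sizeof(sink);
		ssize_t r = recv(fd, sink, want, 0);

		if (r <= 0)
			break;			/* error or peer closed the connection */
		size -= (int)r;
	}
	return size == 0;
}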
d8763023
AG
3700static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3701 unsigned int data_size)
0ced55a3 3702{
e7f52dfb
LE
3703 /* Make sure we've acked all the TCP data associated
3704 * with the data requests being unplugged */
e42325a5 3705 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3706
81e84650 3707 return true;
0ced55a3
PR
3708}
3709
d8763023
AG
3710static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3711 unsigned int data_size)
73a01a18 3712{
e42325a5 3713 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3714
f735e363
LE
3715 switch (mdev->state.conn) {
3716 case C_WF_SYNC_UUID:
3717 case C_WF_BITMAP_T:
3718 case C_BEHIND:
3719 break;
3720 default:
3721 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3722 drbd_conn_str(mdev->state.conn));
3723 }
3724
73a01a18
PR
3725 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3726
81e84650 3727 return true;
73a01a18
PR
3728}
3729
d8763023
AG
3730typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3731 unsigned int to_receive);
02918be2
PR
3732
3733struct data_cmd {
3734 int expect_payload;
3735 size_t pkt_size;
3736 drbd_cmd_handler_f function;
3737};
3738
3739static struct data_cmd drbd_cmd_handler[] = {
3740 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3741 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3742 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3743 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
257d0af6
PR
3744 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3745 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3746 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
02918be2
PR
3747 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3748 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
257d0af6
PR
3749 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3750 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
02918be2
PR
3751 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3752 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3753 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3754 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3755 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3756 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3757 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3758 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3759 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3760 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 3761 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
b411b363
PR
3762 /* anything missing from this table is in
3763 * the asender_tbl, see get_asender_cmd */
02918be2 3764 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3765};
3766
02918be2 3767/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3768 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3769
e42325a5 3770 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3771 p_header, but they may not rely on that. Since there is also p_header95 !
3772 */
b411b363 3773
eefc2f7d 3774static void drbdd(struct drbd_tconn *tconn)
b411b363 3775{
eefc2f7d 3776 struct p_header *header = &tconn->data.rbuf.header;
77351055 3777 struct packet_info pi;
02918be2
PR
3778 size_t shs; /* sub header size */
3779 int rv;
b411b363 3780
eefc2f7d
PR
3781 while (get_t_state(&tconn->receiver) == RUNNING) {
3782 drbd_thread_current_set_cpu(&tconn->receiver);
3783 if (!drbd_recv_header(tconn, &pi))
02918be2 3784 goto err_out;
b411b363 3785
77351055 3786 if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
eefc2f7d 3787 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3788 goto err_out;
0b33a916 3789 }
b411b363 3790
77351055
PR
3791 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3792 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3793 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3794 goto err_out;
b411b363 3795 }
b411b363 3796
c13f7e1a 3797 if (shs) {
eefc2f7d 3798 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3799 if (unlikely(rv != shs)) {
0ddc5549 3800 if (!signal_pending(current))
eefc2f7d 3801 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3802 goto err_out;
3803 }
3804 }
3805
eefc2f7d 3806 rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);
b411b363 3807
02918be2 3808 if (unlikely(!rv)) {
eefc2f7d 3809 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3810 cmdname(pi.cmd), pi.size);
02918be2 3811 goto err_out;
b411b363
PR
3812 }
3813 }
b411b363 3814
02918be2
PR
3815 if (0) {
3816 err_out:
eefc2f7d 3817 drbd_force_state(tconn->volume0, NS(conn, C_PROTOCOL_ERROR));
02918be2 3818 }
b411b363
PR
3819}
3820
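drbdd() is a table-driven receive loop: read a header, validate the command against drbd_cmd_handler[], reject payload the handler does not expect, pull in the fixed sub-header, then dispatch. A condensed standalone sketch of that structure; every type and helper below is a stand-in, not the DRBD one:

#include <stddef.h>

#define HDR_SIZE 8
#define MAX_CMD  4

struct pkt_info { int cmd; int size; };

struct handler {
	int expect_payload;			/* may this packet carry extra data? */
	size_t pkt_size;			/* fixed part: header + sub-header */
	int (*fn)(void *ctx, int cmd, int data_size);
};

/* Stubs so the sketch compiles standalone; a real receiver reads a socket. */
static struct handler handlers[MAX_CMD];
static int recv_header(void *ctx, struct pkt_info *pi) { (void)ctx; (void)pi; return 0; }
static int recv_exact(void *ctx, void *buf, size_t len) { (void)ctx; (void)buf; (void)len; return 1; }

static int receive_loop(void *ctx, char *subhdr_buf)
{
	struct pkt_info pi;

	while (recv_header(ctx, &pi)) {
		size_t shs;

		if (pi.cmd < 0 || pi.cmd >= MAX_CMD || !handlers[pi.cmd].fn)
			return -1;		/* unknown packet type */
		shs = handlers[pi.cmd].pkt_size - HDR_SIZE;
		if ((size_t)pi.size > shs && !handlers[pi.cmd].expect_payload)
			return -1;		/* payload where none is allowed */
		if (shs && !recv_exact(ctx, subhdr_buf, shs))
			return -1;		/* short read on sub-header */
		if (!handlers[pi.cmd].fn(ctx, pi.cmd, pi.size - (int)shs))
			return -1;		/* handler reported failure */
	}
	return 0;
}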
a21e9298 3821void drbd_flush_workqueue(struct drbd_conf *mdev)
b411b363
PR
3822{
3823 struct drbd_wq_barrier barr;
3824
3825 barr.w.cb = w_prev_work_done;
a21e9298 3826 barr.w.mdev = mdev;
b411b363 3827 init_completion(&barr.done);
a21e9298 3828 drbd_queue_work(&mdev->tconn->data.work, &barr.w);
b411b363
PR
3829 wait_for_completion(&barr.done);
3830}
3831
360cc740 3832static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 3833{
b411b363
PR
3834 union drbd_state os, ns;
3835 int rv = SS_UNKNOWN_ERROR;
b411b363 3836
360cc740 3837 if (tconn->volume0->state.conn == C_STANDALONE)
b411b363 3838 return;
b411b363
PR
3839
3840 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
3841 drbd_thread_stop(&tconn->asender);
3842 drbd_free_sock(tconn);
3843
3844 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
3845
3846 conn_info(tconn, "Connection closed\n");
3847
3848 spin_lock_irq(&tconn->req_lock);
3849 os = tconn->volume0->state;
3850 if (os.conn >= C_UNCONNECTED) {
3851 /* Do not restart in case we are C_DISCONNECTING */
3852 ns.i = os.i;
3853 ns.conn = C_UNCONNECTED;
3854 rv = _drbd_set_state(tconn->volume0, ns, CS_VERBOSE, NULL);
3855 }
3856 spin_unlock_irq(&tconn->req_lock);
3857
3858 if (os.conn == C_DISCONNECTING) {
3859 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
3860
3861 crypto_free_hash(tconn->cram_hmac_tfm);
3862 tconn->cram_hmac_tfm = NULL;
3863
3864 kfree(tconn->net_conf);
3865 tconn->net_conf = NULL;
3866 drbd_request_state(tconn->volume0, NS(conn, C_STANDALONE));
3867 }
3868}
3869
3870static int drbd_disconnected(int vnr, void *p, void *data)
3871{
3872 struct drbd_conf *mdev = (struct drbd_conf *)p;
3873 enum drbd_fencing_p fp;
3874 unsigned int i;
b411b363 3875
85719573 3876 /* wait for current activity to cease. */
87eeee41 3877 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
3878 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3879 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3880 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 3881 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3882
3883 /* We do not have data structures that would allow us to
3884 * get the rs_pending_cnt down to 0 again.
3885 * * On C_SYNC_TARGET we do not have any data structures describing
3886 * the pending RSDataRequest's we have sent.
3887 * * On C_SYNC_SOURCE there is no data structure that tracks
3888 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3889 * And no, it is not the sum of the reference counts in the
3890 * resync_LRU. The resync_LRU tracks the whole operation including
3891 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3892 * on the fly. */
3893 drbd_rs_cancel_all(mdev);
3894 mdev->rs_total = 0;
3895 mdev->rs_failed = 0;
3896 atomic_set(&mdev->rs_pending_cnt, 0);
3897 wake_up(&mdev->misc_wait);
3898
7fde2be9
PR
3899 del_timer(&mdev->request_timer);
3900
b411b363
PR
3901 /* make sure syncer is stopped and w_resume_next_sg queued */
3902 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3903 resync_timer_fn((unsigned long)mdev);
3904
b411b363
PR
3905 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3906 * w_make_resync_request etc. which may still be on the worker queue
3907 * to be "canceled" */
a21e9298 3908 drbd_flush_workqueue(mdev);
b411b363
PR
3909
3910 /* This also does reclaim_net_ee(). If we do this too early, we might
3911 * miss some resync ee and pages.*/
3912 drbd_process_done_ee(mdev);
3913
3914 kfree(mdev->p_uuid);
3915 mdev->p_uuid = NULL;
3916
fb22c402 3917 if (!is_susp(mdev->state))
b411b363
PR
3918 tl_clear(mdev);
3919
b411b363
PR
3920 drbd_md_sync(mdev);
3921
3922 fp = FP_DONT_CARE;
3923 if (get_ldev(mdev)) {
3924 fp = mdev->ldev->dc.fencing;
3925 put_ldev(mdev);
3926 }
3927
87f7be4c
PR
3928 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3929 drbd_try_outdate_peer_async(mdev);
b411b363 3930
20ceb2b2
LE
3931 /* serialize with bitmap writeout triggered by the state change,
3932 * if any. */
3933 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3934
b411b363
PR
3935 /* tcp_close and release of sendpage pages can be deferred. I don't
3936 * want to use SO_LINGER, because apparently it can be deferred for
3937 * more than 20 seconds (longest time I checked).
3938 *
3939 * Actually we don't care for exactly when the network stack does its
3940 * put_page(), but release our reference on these pages right here.
3941 */
3942 i = drbd_release_ee(mdev, &mdev->net_ee);
3943 if (i)
3944 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3945 i = atomic_read(&mdev->pp_in_use_by_net);
3946 if (i)
3947 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3948 i = atomic_read(&mdev->pp_in_use);
3949 if (i)
45bb912b 3950 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3951
3952 D_ASSERT(list_empty(&mdev->read_ee));
3953 D_ASSERT(list_empty(&mdev->active_ee));
3954 D_ASSERT(list_empty(&mdev->sync_ee));
3955 D_ASSERT(list_empty(&mdev->done_ee));
3956
3957 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3958 atomic_set(&mdev->current_epoch->epoch_size, 0);
3959 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
3960
3961 return 0;
b411b363
PR
3962}
3963
3964/*
3965 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3966 * we can agree on is stored in agreed_pro_version.
3967 *
3968 * feature flags and the reserved array should be enough room for future
3969 * enhancements of the handshake protocol, and possible plugins...
3970 *
3971 * for now, they are expected to be zero, but ignored.
3972 */
8a22cccc 3973static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 3974{
e6b3ea83 3975 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 3976 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
3977 int ok;
3978
8a22cccc
PR
3979 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3980 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
3981 return 0; /* interrupted. not ok. */
3982 }
3983
8a22cccc
PR
3984 if (tconn->data.socket == NULL) {
3985 mutex_unlock(&tconn->data.mutex);
b411b363
PR
3986 return 0;
3987 }
3988
3989 memset(p, 0, sizeof(*p));
3990 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3991 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
3992 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
3993 &p->head, sizeof(*p), 0);
3994 mutex_unlock(&tconn->data.mutex);
b411b363
PR
3995 return ok;
3996}
3997
3998/*
3999 * return values:
4000 * 1 yes, we have a valid connection
4001 * 0 oops, did not work out, please try again
4002 * -1 peer talks different language,
4003 * no point in trying again, please go standalone.
4004 */
65d11ed6 4005static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4006{
65d11ed6
PR
4007 /* ASSERT current == tconn->receiver ... */
4008 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4009 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4010 struct packet_info pi;
b411b363
PR
4011 int rv;
4012
65d11ed6 4013 rv = drbd_send_handshake(tconn);
b411b363
PR
4014 if (!rv)
4015 return 0;
4016
65d11ed6 4017 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4018 if (!rv)
4019 return 0;
4020
77351055 4021 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4022 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4023 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4024 return -1;
4025 }
4026
77351055 4027 if (pi.size != expect) {
65d11ed6 4028 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4029 expect, pi.size);
b411b363
PR
4030 return -1;
4031 }
4032
65d11ed6 4033 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4034
4035 if (rv != expect) {
0ddc5549 4036 if (!signal_pending(current))
65d11ed6 4037 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4038 return 0;
4039 }
4040
b411b363
PR
4041 p->protocol_min = be32_to_cpu(p->protocol_min);
4042 p->protocol_max = be32_to_cpu(p->protocol_max);
4043 if (p->protocol_max == 0)
4044 p->protocol_max = p->protocol_min;
4045
4046 if (PRO_VERSION_MAX < p->protocol_min ||
4047 PRO_VERSION_MIN > p->protocol_max)
4048 goto incompat;
4049
65d11ed6 4050 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4051
65d11ed6
PR
4052 conn_info(tconn, "Handshake successful: "
4053 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4054
4055 return 1;
4056
4057 incompat:
65d11ed6 4058 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4059 "I support %d-%d, peer supports %d-%d\n",
4060 PRO_VERSION_MIN, PRO_VERSION_MAX,
4061 p->protocol_min, p->protocol_max);
4062 return -1;
4063}
4064
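The handshake advertises the [PRO_VERSION_MIN, PRO_VERSION_MAX] range, and both sides settle on the minimum of the two maxima, giving up if the ranges do not overlap. A tiny sketch of that intersection; the LOCAL_* constants are placeholders, not the real protocol numbers:

/* LOCAL_MIN/LOCAL_MAX are placeholders for PRO_VERSION_MIN/PRO_VERSION_MAX. */
#define LOCAL_MIN 86
#define LOCAL_MAX 96

/* Returns the agreed protocol version, or -1 if the ranges do not overlap. */
static int negotiate(int peer_min, int peer_max)
{
	if (peer_max == 0)		/* very old peers report only one version */
		peer_max = peer_min;
	if (LOCAL_MAX < peer_min || LOCAL_MIN > peer_max)
		return -1;		/* incompatible dialects */
	return LOCAL_MAX < peer_max ? LOCAL_MAX : peer_max;
}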
4065#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4066static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4067{
4068 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4069 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4070 return -1;
b411b363
PR
4071}
4072#else
4073#define CHALLENGE_LEN 64
b10d96cb
JT
4074
4075/* Return value:
4076 1 - auth succeeded,
4077 0 - failed, try again (network error),
4078 -1 - auth failed, don't try again.
4079*/
4080
13e6037d 4081static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4082{
4083 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4084 struct scatterlist sg;
4085 char *response = NULL;
4086 char *right_response = NULL;
4087 char *peers_ch = NULL;
13e6037d 4088 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4089 unsigned int resp_size;
4090 struct hash_desc desc;
77351055 4091 struct packet_info pi;
b411b363
PR
4092 int rv;
4093
13e6037d 4094 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4095 desc.flags = 0;
4096
13e6037d
PR
4097 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4098 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4099 if (rv) {
13e6037d 4100 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4101 rv = -1;
b411b363
PR
4102 goto fail;
4103 }
4104
4105 get_random_bytes(my_challenge, CHALLENGE_LEN);
4106
13e6037d 4107 rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4108 if (!rv)
4109 goto fail;
4110
13e6037d 4111 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4112 if (!rv)
4113 goto fail;
4114
77351055 4115 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4116 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4117 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4118 rv = 0;
4119 goto fail;
4120 }
4121
77351055 4122 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4123 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4124 rv = -1;
b411b363
PR
4125 goto fail;
4126 }
4127
77351055 4128 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4129 if (peers_ch == NULL) {
13e6037d 4130 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4131 rv = -1;
b411b363
PR
4132 goto fail;
4133 }
4134
13e6037d 4135 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4136
77351055 4137 if (rv != pi.size) {
0ddc5549 4138 if (!signal_pending(current))
13e6037d 4139 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4140 rv = 0;
4141 goto fail;
4142 }
4143
13e6037d 4144 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4145 response = kmalloc(resp_size, GFP_NOIO);
4146 if (response == NULL) {
13e6037d 4147 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4148 rv = -1;
b411b363
PR
4149 goto fail;
4150 }
4151
4152 sg_init_table(&sg, 1);
77351055 4153 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4154
4155 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4156 if (rv) {
13e6037d 4157 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4158 rv = -1;
b411b363
PR
4159 goto fail;
4160 }
4161
13e6037d 4162 rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4163 if (!rv)
4164 goto fail;
4165
13e6037d 4166 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4167 if (!rv)
4168 goto fail;
4169
77351055 4170 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4171 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4172 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4173 rv = 0;
4174 goto fail;
4175 }
4176
77351055 4177 if (pi.size != resp_size) {
13e6037d 4178 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4179 rv = 0;
4180 goto fail;
4181 }
4182
13e6037d 4183 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4184
4185 if (rv != resp_size) {
0ddc5549 4186 if (!signal_pending(current))
13e6037d 4187 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4188 rv = 0;
4189 goto fail;
4190 }
4191
4192 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4193 if (right_response == NULL) {
13e6037d 4194 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4195 rv = -1;
b411b363
PR
4196 goto fail;
4197 }
4198
4199 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4200
4201 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4202 if (rv) {
13e6037d 4203 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4204 rv = -1;
b411b363
PR
4205 goto fail;
4206 }
4207
4208 rv = !memcmp(response, right_response, resp_size);
4209
4210 if (rv)
13e6037d
PR
4211 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4212 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4213 else
4214 rv = -1;
b411b363
PR
4215
4216 fail:
4217 kfree(peers_ch);
4218 kfree(response);
4219 kfree(right_response);
4220
4221 return rv;
4222}
4223#endif
4224
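drbd_do_auth() runs a symmetric challenge/response: each side sends a random challenge and expects back HMAC(shared secret, challenge), which it compares against a locally computed digest. A schematic sketch of one direction of that exchange; hmac(), get_random() and the xchg_* transport helpers are hypothetical placeholders, not kernel APIs:

#include <string.h>
#include <stddef.h>

#define CHALLENGE_LEN 64
#define DIGEST_LEN 20

/* Hypothetical helpers: a keyed MAC, a CSPRNG, and blocking send/receive. */
extern void hmac(const void *key, size_t key_len,
		 const void *msg, size_t msg_len, unsigned char out[DIGEST_LEN]);
extern void get_random(void *buf, size_t len);
extern int xchg_send(const void *buf, size_t len);
extern int xchg_recv(void *buf, size_t len);

/* Verify that the peer knows 'secret'; returns 1 on success, 0 on failure. */
static int verify_peer(const char *secret)
{
	unsigned char my_challenge[CHALLENGE_LEN];
	unsigned char response[DIGEST_LEN], expected[DIGEST_LEN];

	get_random(my_challenge, sizeof(my_challenge));
	if (!xchg_send(my_challenge, sizeof(my_challenge)))
		return 0;
	if (!xchg_recv(response, sizeof(response)))
		return 0;

	/* the peer must have produced HMAC(secret, my_challenge) */
	hmac(secret, strlen(secret), my_challenge, sizeof(my_challenge), expected);
	return memcmp(response, expected, sizeof(expected)) == 0;
}

In the real exchange the peer challenges us the same way in the other direction, so both nodes prove knowledge of the shared secret.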
4225int drbdd_init(struct drbd_thread *thi)
4226{
4d641dd7 4227 struct drbd_tconn *tconn = thi->mdev->tconn;
b411b363
PR
4228 int h;
4229
4d641dd7 4230 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4231
4232 do {
4d641dd7 4233 h = drbd_connect(tconn);
b411b363 4234 if (h == 0) {
4d641dd7 4235 drbd_disconnect(tconn);
20ee6390 4236 schedule_timeout_interruptible(HZ);
b411b363
PR
4237 }
4238 if (h == -1) {
4d641dd7
PR
4239 conn_warn(tconn, "Discarding network configuration.\n");
4240 drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
b411b363
PR
4241 }
4242 } while (h == 0);
4243
4244 if (h > 0) {
4d641dd7
PR
4245 if (get_net_conf(tconn)) {
4246 drbdd(tconn);
4247 put_net_conf(tconn);
b411b363
PR
4248 }
4249 }
4250
4d641dd7 4251 drbd_disconnect(tconn);
b411b363 4252
4d641dd7 4253 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4254 return 0;
4255}
4256
4257/* ********* acknowledge sender ******** */
4258
d8763023 4259static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4260{
257d0af6 4261 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
b411b363
PR
4262
4263 int retcode = be32_to_cpu(p->retcode);
4264
4265 if (retcode >= SS_SUCCESS) {
4266 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4267 } else {
4268 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4269 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4270 drbd_set_st_err_str(retcode), retcode);
4271 }
4272 wake_up(&mdev->state_wait);
4273
81e84650 4274 return true;
b411b363
PR
4275}
4276
d8763023 4277static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4278{
4279 return drbd_send_ping_ack(mdev);
4280
4281}
4282
d8763023 4283static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4284{
4285 /* restore idle timeout */
e42325a5 4286 mdev->tconn->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
309d1608
PR
4287 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4288 wake_up(&mdev->misc_wait);
b411b363 4289
81e84650 4290 return true;
b411b363
PR
4291}
4292
d8763023 4293static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4294{
257d0af6 4295 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4296 sector_t sector = be64_to_cpu(p->sector);
4297 int blksize = be32_to_cpu(p->blksize);
4298
31890f4a 4299 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4300
4301 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4302
1d53f09e
LE
4303 if (get_ldev(mdev)) {
4304 drbd_rs_complete_io(mdev, sector);
4305 drbd_set_in_sync(mdev, sector, blksize);
4306 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4307 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4308 put_ldev(mdev);
4309 }
b411b363 4310 dec_rs_pending(mdev);
778f271d 4311 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4312
81e84650 4313 return true;
b411b363
PR
4314}
4315
bc9c5c41
AG
4316static int
4317validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4318 struct rb_root *root, const char *func,
4319 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4320{
4321 struct drbd_request *req;
4322 struct bio_and_error m;
4323
87eeee41 4324 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4325 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4326 if (unlikely(!req)) {
87eeee41 4327 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4328 return false;
b411b363
PR
4329 }
4330 __req_mod(req, what, &m);
87eeee41 4331 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4332
4333 if (m.bio)
4334 complete_master_bio(mdev, &m);
81e84650 4335 return true;
b411b363
PR
4336}
4337
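Note the lock discipline in validate_req_change_req_state(): the lookup and the __req_mod() transition happen under req_lock, but the captured master bio is completed only after the lock is dropped. A minimal sketch of that collect-under-lock, act-after-unlock pattern with a pthread mutex; find_request() here is a hypothetical lookup:

#include <pthread.h>
#include <stddef.h>

struct req { int state; void (*complete)(struct req *); };

static pthread_mutex_t req_lock = PTHREAD_MUTEX_INITIALIZER;

extern struct req *find_request(unsigned long id);	/* hypothetical lookup */

static int ack_request(unsigned long id, int new_state)
{
	struct req *r;
	void (*deferred)(struct req *) = NULL;

	pthread_mutex_lock(&req_lock);
	r = find_request(id);
	if (!r) {
		pthread_mutex_unlock(&req_lock);
		return 0;			/* unknown request */
	}
	r->state = new_state;
	deferred = r->complete;			/* remember the callback... */
	pthread_mutex_unlock(&req_lock);

	if (deferred)
		deferred(r);			/* ...and run it outside the lock */
	return 1;
}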
d8763023 4338static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4339{
257d0af6 4340 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4341 sector_t sector = be64_to_cpu(p->sector);
4342 int blksize = be32_to_cpu(p->blksize);
4343 enum drbd_req_event what;
4344
4345 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4346
579b57ed 4347 if (p->block_id == ID_SYNCER) {
b411b363
PR
4348 drbd_set_in_sync(mdev, sector, blksize);
4349 dec_rs_pending(mdev);
81e84650 4350 return true;
b411b363 4351 }
257d0af6 4352 switch (cmd) {
b411b363 4353 case P_RS_WRITE_ACK:
89e58e75 4354 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4355 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4356 break;
4357 case P_WRITE_ACK:
89e58e75 4358 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4359 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4360 break;
4361 case P_RECV_ACK:
89e58e75 4362 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4363 what = RECV_ACKED_BY_PEER;
b411b363
PR
4364 break;
4365 case P_DISCARD_ACK:
89e58e75 4366 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4367 what = CONFLICT_DISCARDED_BY_PEER;
b411b363
PR
4368 break;
4369 default:
4370 D_ASSERT(0);
81e84650 4371 return false;
b411b363
PR
4372 }
4373
4374 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4375 &mdev->write_requests, __func__,
4376 what, false);
b411b363
PR
4377}
4378
d8763023 4379static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4380{
257d0af6 4381 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4382 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4383 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4384 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4385 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4386 bool found;
b411b363
PR
4387
4388 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4389
579b57ed 4390 if (p->block_id == ID_SYNCER) {
b411b363
PR
4391 dec_rs_pending(mdev);
4392 drbd_rs_failed_io(mdev, sector, size);
81e84650 4393 return true;
b411b363 4394 }
2deb8336 4395
c3afd8f5 4396 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4397 &mdev->write_requests, __func__,
8554df1c 4398 NEG_ACKED, missing_ok);
c3afd8f5
AG
4399 if (!found) {
4400 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4401 The master bio might already be completed, therefore the
4402 request is no longer in the collision hash. */
4403 /* In Protocol B we might already have got a P_RECV_ACK
4404 but then get a P_NEG_ACK afterwards. */
4405 if (!missing_ok)
2deb8336 4406 return false;
c3afd8f5 4407 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4408 }
2deb8336 4409 return true;
b411b363
PR
4410}
4411
d8763023 4412static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4413{
257d0af6 4414 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4415 sector_t sector = be64_to_cpu(p->sector);
4416
4417 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4419 (unsigned long long)sector, be32_to_cpu(p->blksize));
4420
4421 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4422 &mdev->read_requests, __func__,
8554df1c 4423 NEG_ACKED, false);
b411b363
PR
4424}
4425
d8763023 4426static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4427{
4428 sector_t sector;
4429 int size;
257d0af6 4430 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4431
4432 sector = be64_to_cpu(p->sector);
4433 size = be32_to_cpu(p->blksize);
b411b363
PR
4434
4435 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4436
4437 dec_rs_pending(mdev);
4438
4439 if (get_ldev_if_state(mdev, D_FAILED)) {
4440 drbd_rs_complete_io(mdev, sector);
257d0af6 4441 switch (cmd) {
d612d309
PR
4442 case P_NEG_RS_DREPLY:
4443 drbd_rs_failed_io(mdev, sector, size);
4444 case P_RS_CANCEL:
4445 break;
4446 default:
4447 D_ASSERT(0);
4448 put_ldev(mdev);
4449 return false;
4450 }
b411b363
PR
4451 put_ldev(mdev);
4452 }
4453
81e84650 4454 return true;
b411b363
PR
4455}
4456
d8763023 4457static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4458{
257d0af6 4459 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363
PR
4460
4461 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4462
c4752ef1
PR
4463 if (mdev->state.conn == C_AHEAD &&
4464 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4465 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4466 mdev->start_resync_timer.expires = jiffies + HZ;
4467 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4468 }
4469
81e84650 4470 return true;
b411b363
PR
4471}
4472
d8763023 4473static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4474{
257d0af6 4475 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4476 struct drbd_work *w;
4477 sector_t sector;
4478 int size;
4479
4480 sector = be64_to_cpu(p->sector);
4481 size = be32_to_cpu(p->blksize);
4482
4483 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4484
4485 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4486 drbd_ov_oos_found(mdev, sector, size);
4487 else
4488 ov_oos_print(mdev);
4489
1d53f09e 4490 if (!get_ldev(mdev))
81e84650 4491 return true;
1d53f09e 4492
b411b363
PR
4493 drbd_rs_complete_io(mdev, sector);
4494 dec_rs_pending(mdev);
4495
ea5442af
LE
4496 --mdev->ov_left;
4497
4498 /* let's advance progress step marks only for every other megabyte */
4499 if ((mdev->ov_left & 0x200) == 0x200)
4500 drbd_advance_rs_marks(mdev, mdev->ov_left);
4501
4502 if (mdev->ov_left == 0) {
b411b363
PR
4503 w = kmalloc(sizeof(*w), GFP_NOIO);
4504 if (w) {
4505 w->cb = w_ov_finished;
a21e9298 4506 w->mdev = mdev;
e42325a5 4507 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4508 } else {
4509 dev_err(DEV, "kmalloc(w) failed.");
4510 ov_oos_print(mdev);
4511 drbd_resync_finished(mdev);
4512 }
4513 }
1d53f09e 4514 put_ldev(mdev);
81e84650 4515 return true;
b411b363
PR
4516}
4517
d8763023 4518static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4519{
81e84650 4520 return true;
0ced55a3
PR
4521}
4522
b411b363
PR
4523struct asender_cmd {
4524 size_t pkt_size;
d8763023 4525 int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
b411b363
PR
4526};
4527
4528static struct asender_cmd *get_asender_cmd(int cmd)
4529{
4530 static struct asender_cmd asender_tbl[] = {
4531 /* anything missing from this table is in
4532 * the drbd_cmd_handler (drbd_default_handler) table,
4533 * see the beginning of drbdd() */
257d0af6
PR
4534 [P_PING] = { sizeof(struct p_header), got_Ping },
4535 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
b411b363
PR
4536 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4537 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4538 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4539 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4540 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4541 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4542 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4543 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4544 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4545 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4546 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4547 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4548 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
b411b363
PR
4549 [P_MAX_CMD] = { 0, NULL },
4550 };
4551 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4552 return NULL;
4553 return &asender_tbl[cmd];
4554}
4555
32862ec7
PR
4556static int _drbd_process_done_ee(int vnr, void *p, void *data)
4557{
4558 struct drbd_conf *mdev = (struct drbd_conf *)p;
4559 return !drbd_process_done_ee(mdev);
4560}
4561
4562static int _check_ee_empty(int vnr, void *p, void *data)
4563{
4564 struct drbd_conf *mdev = (struct drbd_conf *)p;
4565 struct drbd_tconn *tconn = mdev->tconn;
4566 int not_empty;
4567
4568 spin_lock_irq(&tconn->req_lock);
4569 not_empty = !list_empty(&mdev->done_ee);
4570 spin_unlock_irq(&tconn->req_lock);
4571
4572 return not_empty;
4573}
4574
4575static int tconn_process_done_ee(struct drbd_tconn *tconn)
4576{
4577 int not_empty, err;
4578
4579 do {
4580 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4581 flush_signals(current);
4582 err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
4583 if (err)
4584 return err;
4585 set_bit(SIGNAL_ASENDER, &tconn->flags);
4586 not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
4587 } while (not_empty);
4588
4589 return 0;
4590}
4591
b411b363
PR
4592int drbd_asender(struct drbd_thread *thi)
4593{
32862ec7
PR
4594 struct drbd_tconn *tconn = thi->mdev->tconn;
4595 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4596 struct asender_cmd *cmd = NULL;
77351055 4597 struct packet_info pi;
257d0af6 4598 int rv;
b411b363
PR
4599 void *buf = h;
4600 int received = 0;
257d0af6 4601 int expect = sizeof(struct p_header);
f36af18c 4602 int ping_timeout_active = 0;
b411b363 4603
b411b363
PR
4604 current->policy = SCHED_RR; /* Make this a realtime task! */
4605 current->rt_priority = 2; /* more important than all other tasks */
4606
e77a0a5c 4607 while (get_t_state(thi) == RUNNING) {
80822284 4608 drbd_thread_current_set_cpu(thi);
32862ec7
PR
4609 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4610 if (!drbd_send_ping(tconn->volume0)) {
4611 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4612 goto reconnect;
4613 }
32862ec7
PR
4614 tconn->meta.socket->sk->sk_rcvtimeo =
4615 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4616 ping_timeout_active = 1;
b411b363
PR
4617 }
4618
32862ec7
PR
4619 /* TODO: conditionally cork; it may hurt latency if we cork without
4620 much to send */
4621 if (!tconn->net_conf->no_cork)
4622 drbd_tcp_cork(tconn->meta.socket);
4623 if (tconn_process_done_ee(tconn))
4624 goto reconnect;
b411b363 4625 /* but unconditionally uncork unless disabled */
32862ec7
PR
4626 if (!tconn->net_conf->no_cork)
4627 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4628
4629 /* short circuit, recv_msg would return EINTR anyways. */
4630 if (signal_pending(current))
4631 continue;
4632
32862ec7
PR
4633 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4634 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4635
4636 flush_signals(current);
4637
4638 /* Note:
4639 * -EINTR (on meta) we got a signal
4640 * -EAGAIN (on meta) rcvtimeo expired
4641 * -ECONNRESET other side closed the connection
4642 * -ERESTARTSYS (on data) we got a signal
4643 * rv < 0 other than above: unexpected error!
4644 * rv == expected: full header or command
4645 * rv < expected: "woken" by signal during receive
4646 * rv == 0 : "connection shut down by peer"
4647 */
4648 if (likely(rv > 0)) {
4649 received += rv;
4650 buf += rv;
4651 } else if (rv == 0) {
32862ec7 4652 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4653 goto reconnect;
4654 } else if (rv == -EAGAIN) {
cb6518cb
LE
4655 /* If the data socket received something meanwhile,
4656 * that is good enough: peer is still alive. */
32862ec7
PR
4657 if (time_after(tconn->last_received,
4658 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4659 continue;
f36af18c 4660 if (ping_timeout_active) {
32862ec7 4661 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4662 goto reconnect;
4663 }
32862ec7 4664 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4665 continue;
4666 } else if (rv == -EINTR) {
4667 continue;
4668 } else {
32862ec7 4669 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4670 goto reconnect;
4671 }
4672
4673 if (received == expect && cmd == NULL) {
32862ec7 4674 if (!decode_header(tconn, h, &pi))
b411b363 4675 goto reconnect;
77351055 4676 cmd = get_asender_cmd(pi.cmd);
b411b363 4677 if (unlikely(cmd == NULL)) {
32862ec7 4678 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4679 pi.cmd, pi.size);
b411b363
PR
4680 goto disconnect;
4681 }
4682 expect = cmd->pkt_size;
77351055 4683 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4684 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4685 pi.cmd, pi.size);
b411b363 4686 goto reconnect;
257d0af6 4687 }
b411b363
PR
4688 }
4689 if (received == expect) {
32862ec7
PR
4690 tconn->last_received = jiffies;
4691 if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
b411b363
PR
4692 goto reconnect;
4693
f36af18c
LE
4694 /* the idle_timeout (ping-int)
4695 * has been restored in got_PingAck() */
4696 if (cmd == get_asender_cmd(P_PING_ACK))
4697 ping_timeout_active = 0;
4698
b411b363
PR
4699 buf = h;
4700 received = 0;
257d0af6 4701 expect = sizeof(struct p_header);
b411b363
PR
4702 cmd = NULL;
4703 }
4704 }
4705
4706 if (0) {
4707reconnect:
32862ec7 4708 drbd_force_state(tconn->volume0, NS(conn, C_NETWORK_FAILURE));
b411b363
PR
4709 }
4710 if (0) {
4711disconnect:
32862ec7 4712 drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
b411b363 4713 }
32862ec7 4714 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4715
32862ec7 4716 conn_info(tconn, "asender terminated\n");
b411b363
PR
4717
4718 return 0;
4719}
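The asender loop above tolerates partial reads: it accumulates bytes until a full header has arrived, decodes it to learn the expected packet size, keeps accumulating until the whole packet is in, and only then dispatches. A compact standalone sketch of that accumulation state machine; decode() and dispatch() are placeholders, and recv() stands in for drbd_recv_short():

#include <sys/socket.h>
#include <sys/types.h>

#define HDR_SIZE 8

extern int decode(const char *hdr, int *pkt_size);	/* placeholder */
extern int dispatch(const char *pkt, int pkt_size);	/* placeholder */

static int ack_receiver(int fd, char *buf, int bufsize)
{
	int received = 0;
	int expect = HDR_SIZE;		/* first wait for a complete header */
	int have_header = 0;

	for (;;) {
		ssize_t rv = recv(fd, buf + received, expect - received, 0);

		if (rv <= 0)
			return -1;	/* error, signal, or peer closed */
		received += (int)rv;
		if (received < expect)
			continue;	/* short read: keep accumulating */

		if (!have_header) {
			if (!decode(buf, &expect) || expect < HDR_SIZE || expect > bufsize)
				return -1;
			have_header = 1;
			if (received < expect)
				continue;	/* now wait for the payload */
		}
		if (!dispatch(buf, expect))
			return -1;
		received = 0;		/* start over with the next header */
		expect = HDR_SIZE;
		have_header = 0;
	}
}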