/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	int size;
	int vnr;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

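/*
 * Illustrative sketch, not part of the driver: how the chain helpers above
 * combine.  "example_pool" is a hypothetical chain head; the real pool head
 * (drbd_pp_pool) is additionally guarded by drbd_pp_lock, which is omitted
 * here.  page_chain_next()/page_chain_for_each() are presumably provided by
 * drbd_int.h, as they are used throughout this file; GFP_KERNEL is used only
 * for the sake of the example.
 *
 *	static struct page *example_pool;
 *
 *	static void example_build_and_splice(void)
 *	{
 *		struct page *chain = NULL, *p, *tail;
 *		int i, n;
 *
 *		for (i = 0; i < 4; i++) {	// build a private 4-page chain
 *			p = alloc_page(GFP_KERNEL);
 *			if (!p)
 *				break;
 *			set_page_private(p, (unsigned long)chain);
 *			chain = p;
 *		}
 *		if (!chain)
 *			return;
 *		tail = page_chain_tail(chain, &n);	// n == pages in the chain
 *		page_chain_add(&example_pool, chain, tail);
 *	}
 */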
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

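/*
 * Illustrative sketch, not part of the driver: the usual pool round trip as
 * seen in the receive path.  "mdev" and "nr_pages" are placeholders;
 * drbd_pp_free() is defined just below.
 *
 *	struct page *page = drbd_pp_alloc(mdev, nr_pages, true);
 *	if (!page)
 *		return NULL;	// with retry == true this only happens on a signal
 *	// ... hand the chain to a peer request, submit, wait for completion ...
 *	drbd_pp_free(mdev, page, 0);	// return the chain, wakes drbd_pp_wait
 */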
278/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
87eeee41 279 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
45bb912b
LE
280 * Either links the page chain back to the global pool,
281 * or returns all pages to the system. */
435f0740 282static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 283{
435f0740 284 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 285 int i;
435f0740 286
81a5d60e 287 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
45bb912b
LE
288 i = page_chain_free(page);
289 else {
290 struct page *tmp;
291 tmp = page_chain_tail(page, &i);
292 spin_lock(&drbd_pp_lock);
293 page_chain_add(&drbd_pp_pool, page, tmp);
294 drbd_pp_vacant += i;
295 spin_unlock(&drbd_pp_lock);
b411b363 296 }
435f0740 297 i = atomic_sub_return(i, a);
45bb912b 298 if (i < 0)
435f0740
LE
299 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
b411b363
PR
301 wake_up(&drbd_pp_wait);
302}
303
304/*
305You need to hold the req_lock:
306 _drbd_wait_ee_list_empty()
307
308You must not have the req_lock:
309 drbd_free_ee()
310 drbd_alloc_ee()
311 drbd_init_ee()
312 drbd_release_ee()
313 drbd_ee_fix_bhs()
314 drbd_process_done_ee()
315 drbd_clear_done_ee()
316 drbd_wait_ee_list_empty()
317*/
318
f6ffca9f
AG
319struct drbd_peer_request *
320drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 322{
db830c46 323 struct drbd_peer_request *peer_req;
b411b363 324 struct page *page;
45bb912b 325 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 326
0cf9d27e 327 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
b411b363
PR
328 return NULL;
329
db830c46
AG
330 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331 if (!peer_req) {
b411b363
PR
332 if (!(gfp_mask & __GFP_NOWARN))
333 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334 return NULL;
335 }
336
45bb912b
LE
337 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338 if (!page)
339 goto fail;
b411b363 340
db830c46
AG
341 drbd_clear_interval(&peer_req->i);
342 peer_req->i.size = data_size;
343 peer_req->i.sector = sector;
344 peer_req->i.local = false;
345 peer_req->i.waiting = false;
346
347 peer_req->epoch = NULL;
a21e9298 348 peer_req->w.mdev = mdev;
db830c46
AG
349 peer_req->pages = page;
350 atomic_set(&peer_req->pending_bios, 0);
351 peer_req->flags = 0;
9a8e7753
AG
352 /*
353 * The block_id is opaque to the receiver. It is not endianness
354 * converted, and sent back to the sender unchanged.
355 */
db830c46 356 peer_req->block_id = id;
b411b363 357
db830c46 358 return peer_req;
b411b363 359
45bb912b 360 fail:
db830c46 361 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
362 return NULL;
363}
364
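/*
 * Illustrative sketch, not part of the driver: the lifecycle of a peer
 * request, condensed from recv_resync_read() further below.  The identifiers
 * are taken from this file; error handling after submission is shown in
 * recv_resync_read() itself.
 *
 *	peer_req = drbd_alloc_ee(mdev, ID_SYNCER, sector, data_size, GFP_NOIO);
 *	if (!peer_req)
 *		return false;
 *	peer_req->w.cb = e_end_resync_block;	// ack callback, run by the asender
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	list_add(&peer_req->w.list, &mdev->sync_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *	drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR);
 */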
db830c46 365void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 366 int is_net)
b411b363 367{
db830c46
AG
368 if (peer_req->flags & EE_HAS_DIGEST)
369 kfree(peer_req->digest);
370 drbd_pp_free(mdev, peer_req->pages, is_net);
371 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372 D_ASSERT(drbd_interval_empty(&peer_req->i));
373 mempool_free(peer_req, drbd_ee_mempool);
b411b363
PR
374}
375
376int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377{
378 LIST_HEAD(work_list);
db830c46 379 struct drbd_peer_request *peer_req, *t;
b411b363 380 int count = 0;
435f0740 381 int is_net = list == &mdev->net_ee;
b411b363 382
87eeee41 383 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 384 list_splice_init(list, &work_list);
87eeee41 385 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 386
db830c46
AG
387 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388 drbd_free_some_ee(mdev, peer_req, is_net);
b411b363
PR
389 count++;
390 }
391 return count;
392}
393
394
32862ec7 395/* See also comments in _req_mod(,BARRIER_ACKED)
b411b363
PR
396 * and receive_Barrier.
397 *
398 * Move entries from net_ee to done_ee, if ready.
399 * Grab done_ee, call all callbacks, free the entries.
400 * The callbacks typically send out ACKs.
401 */
402static int drbd_process_done_ee(struct drbd_conf *mdev)
403{
404 LIST_HEAD(work_list);
405 LIST_HEAD(reclaimed);
db830c46 406 struct drbd_peer_request *peer_req, *t;
082a3439 407 int ok = 1;
b411b363 408
87eeee41 409 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
410 reclaim_net_ee(mdev, &reclaimed);
411 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 412 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 413
db830c46
AG
414 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415 drbd_free_net_ee(mdev, peer_req);
b411b363
PR
416
417 /* possible callbacks here:
7be8da07 418 * e_end_block, and e_end_resync_block, e_send_discard_write.
b411b363
PR
419 * all ignore the last argument.
420 */
db830c46 421 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
b411b363 422 /* list_del not necessary, next/prev members not touched */
00d56944 423 ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
db830c46 424 drbd_free_ee(mdev, peer_req);
b411b363
PR
425 }
426 wake_up(&mdev->ee_wait);
427
428 return ok;
429}
430
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}

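/*
 * Illustrative sketch, not part of the driver: the locking contract of the
 * two variants above.  Callers of the underscore variant already hold
 * req_lock; the helper drops it around io_schedule() and re-takes it before
 * re-checking the list.  "head" stands for whichever ee list is drained.
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	// ... manipulate the lists ...
 *	_drbd_wait_ee_list_empty(mdev, head);	// returns with req_lock still held
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * Without the lock held, use drbd_wait_ee_list_empty() instead.
 */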
453/* see also kernel_accept; which is only present since 2.6.18.
454 * also we want to log which part of it failed, exactly */
7653620d 455static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
b411b363
PR
456{
457 struct sock *sk = sock->sk;
458 int err = 0;
459
460 *what = "listen";
461 err = sock->ops->listen(sock, 5);
462 if (err < 0)
463 goto out;
464
465 *what = "sock_create_lite";
466 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
467 newsock);
468 if (err < 0)
469 goto out;
470
471 *what = "accept";
472 err = sock->ops->accept(sock, *newsock, 0);
473 if (err < 0) {
474 sock_release(*newsock);
475 *newsock = NULL;
476 goto out;
477 }
478 (*newsock)->ops = sock->ops;
479
480out:
481 return err;
482}
483
dbd9eea0 484static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
b411b363
PR
485{
486 mm_segment_t oldfs;
487 struct kvec iov = {
488 .iov_base = buf,
489 .iov_len = size,
490 };
491 struct msghdr msg = {
492 .msg_iovlen = 1,
493 .msg_iov = (struct iovec *)&iov,
494 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495 };
496 int rv;
497
498 oldfs = get_fs();
499 set_fs(KERNEL_DS);
500 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501 set_fs(oldfs);
502
503 return rv;
504}
505
de0ff338 506static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
b411b363
PR
507{
508 mm_segment_t oldfs;
509 struct kvec iov = {
510 .iov_base = buf,
511 .iov_len = size,
512 };
513 struct msghdr msg = {
514 .msg_iovlen = 1,
515 .msg_iov = (struct iovec *)&iov,
516 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517 };
518 int rv;
519
520 oldfs = get_fs();
521 set_fs(KERNEL_DS);
522
523 for (;;) {
de0ff338 524 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
b411b363
PR
525 if (rv == size)
526 break;
527
528 /* Note:
529 * ECONNRESET other side closed the connection
530 * ERESTARTSYS (on sock) we got a signal
531 */
532
533 if (rv < 0) {
534 if (rv == -ECONNRESET)
de0ff338 535 conn_info(tconn, "sock was reset by peer\n");
b411b363 536 else if (rv != -ERESTARTSYS)
de0ff338 537 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
538 break;
539 } else if (rv == 0) {
de0ff338 540 conn_info(tconn, "sock was shut down by peer\n");
b411b363
PR
541 break;
542 } else {
543 /* signal came in, or peer/link went down,
544 * after we read a partial message
545 */
546 /* D_ASSERT(signal_pending(current)); */
547 break;
548 }
549 };
550
551 set_fs(oldfs);
552
553 if (rv != size)
bbeb641c 554 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
b411b363
PR
555
556 return rv;
557}
558
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

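/*
 * Illustrative sketch, not part of the driver: per the tcp(7) quote above,
 * the buffer sizes have to be in place before connect()/listen().  That is
 * why drbd_try_connect() and drbd_wait_for_connect() below call this helper
 * right after sock_create_kern() and before bind/connect or bind/listen.
 * The address arguments here are placeholders:
 *
 *	err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
 *	if (err < 0)
 *		return NULL;
 *	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
 *			tconn->net_conf->rcvbuf_size);
 *	err = sock->ops->connect(sock, peer_addr, peer_addr_len, 0);
 */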
eac3e990 578static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
b411b363
PR
579{
580 const char *what;
581 struct socket *sock;
582 struct sockaddr_in6 src_in6;
583 int err;
584 int disconnect_on_error = 1;
585
eac3e990 586 if (!get_net_conf(tconn))
b411b363
PR
587 return NULL;
588
589 what = "sock_create_kern";
eac3e990 590 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
591 SOCK_STREAM, IPPROTO_TCP, &sock);
592 if (err < 0) {
593 sock = NULL;
594 goto out;
595 }
596
597 sock->sk->sk_rcvtimeo =
eac3e990
PR
598 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
599 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
600 tconn->net_conf->rcvbuf_size);
b411b363
PR
601
602 /* explicitly bind to the configured IP as source IP
603 * for the outgoing connections.
604 * This is needed for multihomed hosts and to be
605 * able to use lo: interfaces for drbd.
606 * Make sure to use 0 as port number, so linux selects
607 * a free one dynamically.
608 */
eac3e990
PR
609 memcpy(&src_in6, tconn->net_conf->my_addr,
610 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
611 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
b411b363
PR
612 src_in6.sin6_port = 0;
613 else
614 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
615
616 what = "bind before connect";
617 err = sock->ops->bind(sock,
618 (struct sockaddr *) &src_in6,
eac3e990 619 tconn->net_conf->my_addr_len);
b411b363
PR
620 if (err < 0)
621 goto out;
622
623 /* connect may fail, peer not yet available.
624 * stay C_WF_CONNECTION, don't go Disconnecting! */
625 disconnect_on_error = 0;
626 what = "connect";
627 err = sock->ops->connect(sock,
eac3e990
PR
628 (struct sockaddr *)tconn->net_conf->peer_addr,
629 tconn->net_conf->peer_addr_len, 0);
b411b363
PR
630
631out:
632 if (err < 0) {
633 if (sock) {
634 sock_release(sock);
635 sock = NULL;
636 }
637 switch (-err) {
638 /* timeout, busy, signal pending */
639 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
640 case EINTR: case ERESTARTSYS:
641 /* peer not (yet) available, network problem */
642 case ECONNREFUSED: case ENETUNREACH:
643 case EHOSTDOWN: case EHOSTUNREACH:
644 disconnect_on_error = 0;
645 break;
646 default:
eac3e990 647 conn_err(tconn, "%s failed, err = %d\n", what, err);
b411b363
PR
648 }
649 if (disconnect_on_error)
bbeb641c 650 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 651 }
eac3e990 652 put_net_conf(tconn);
b411b363
PR
653 return sock;
654}
655
7653620d 656static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
b411b363
PR
657{
658 int timeo, err;
659 struct socket *s_estab = NULL, *s_listen;
660 const char *what;
661
7653620d 662 if (!get_net_conf(tconn))
b411b363
PR
663 return NULL;
664
665 what = "sock_create_kern";
7653620d 666 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
b411b363
PR
667 SOCK_STREAM, IPPROTO_TCP, &s_listen);
668 if (err) {
669 s_listen = NULL;
670 goto out;
671 }
672
7653620d 673 timeo = tconn->net_conf->try_connect_int * HZ;
b411b363
PR
674 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
675
676 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
677 s_listen->sk->sk_rcvtimeo = timeo;
678 s_listen->sk->sk_sndtimeo = timeo;
7653620d
PR
679 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
680 tconn->net_conf->rcvbuf_size);
b411b363
PR
681
682 what = "bind before listen";
683 err = s_listen->ops->bind(s_listen,
7653620d
PR
684 (struct sockaddr *) tconn->net_conf->my_addr,
685 tconn->net_conf->my_addr_len);
b411b363
PR
686 if (err < 0)
687 goto out;
688
7653620d 689 err = drbd_accept(&what, s_listen, &s_estab);
b411b363
PR
690
691out:
692 if (s_listen)
693 sock_release(s_listen);
694 if (err < 0) {
695 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 696 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 697 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
698 }
699 }
7653620d 700 put_net_conf(tconn);
b411b363
PR
701
702 return s_estab;
703}
704
d38e787e 705static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 706{
d38e787e 707 struct p_header *h = &tconn->data.sbuf.header;
b411b363 708
d38e787e 709 return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
b411b363
PR
710}
711
a25b63f1 712static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 713{
a25b63f1 714 struct p_header80 *h = &tconn->data.rbuf.header.h80;
b411b363
PR
715 int rr;
716
dbd9eea0 717 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 718
ca9bc12b 719 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
b411b363
PR
720 return be16_to_cpu(h->command);
721
722 return 0xffff;
723}
724
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
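/*
 * Illustrative note, not part of the driver: the probe above is a
 * non-blocking MSG_PEEK read.  rr > 0 means data is already queued,
 * -EAGAIN means the connection is idle but alive; 0 or any other error
 * means the peer closed or broke the connection, so the socket is released.
 * Typical use, as in drbd_connect() below:
 *
 *	ok = drbd_socket_okay(&sock);
 *	ok = drbd_socket_okay(&msock) && ok;	// check both, even if the first failed
 *	if (ok)
 *		break;				// both channels usable
 */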
2325eb66
PR
747/* Gets called if a connection is established, or if a new minor gets created
748 in a connection */
749int drbd_connected(int vnr, void *p, void *data)
907599e0
PR
750{
751 struct drbd_conf *mdev = (struct drbd_conf *)p;
752 int ok = 1;
753
754 atomic_set(&mdev->packet_seq, 0);
755 mdev->peer_seq = 0;
756
8410da8f
PR
757 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
758 &mdev->tconn->cstate_mutex :
759 &mdev->own_state_mutex;
760
907599e0
PR
761 ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
762 ok &= drbd_send_sizes(mdev, 0, 0);
763 ok &= drbd_send_uuids(mdev);
764 ok &= drbd_send_state(mdev);
765 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
766 clear_bit(RESIZE_PENDING, &mdev->flags);
767
8410da8f 768
907599e0
PR
769 return !ok;
770}
771
b411b363
PR
772/*
773 * return values:
774 * 1 yes, we have a valid connection
775 * 0 oops, did not work out, please try again
776 * -1 peer talks different language,
777 * no point in trying again, please go standalone.
778 * -2 We do not have a network config...
779 */
907599e0 780static int drbd_connect(struct drbd_tconn *tconn)
b411b363
PR
781{
782 struct socket *s, *sock, *msock;
783 int try, h, ok;
784
bbeb641c 785 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
786 return -2;
787
907599e0
PR
788 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
789 tconn->agreed_pro_version = 99;
fd340c12
PR
790 /* agreed_pro_version must be smaller than 100 so we send the old
791 header (h80) in the first packet and in the handshake packet. */
b411b363
PR
792
793 sock = NULL;
794 msock = NULL;
795
796 do {
797 for (try = 0;;) {
798 /* 3 tries, this should take less than a second! */
907599e0 799 s = drbd_try_connect(tconn);
b411b363
PR
800 if (s || ++try >= 3)
801 break;
802 /* give the other side time to call bind() & listen() */
20ee6390 803 schedule_timeout_interruptible(HZ / 10);
b411b363
PR
804 }
805
806 if (s) {
807 if (!sock) {
907599e0 808 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
b411b363
PR
809 sock = s;
810 s = NULL;
811 } else if (!msock) {
907599e0 812 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
b411b363
PR
813 msock = s;
814 s = NULL;
815 } else {
907599e0 816 conn_err(tconn, "Logic error in drbd_connect()\n");
b411b363
PR
817 goto out_release_sockets;
818 }
819 }
820
821 if (sock && msock) {
907599e0 822 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
dbd9eea0
PR
823 ok = drbd_socket_okay(&sock);
824 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
825 if (ok)
826 break;
827 }
828
829retry:
907599e0 830 s = drbd_wait_for_connect(tconn);
b411b363 831 if (s) {
907599e0 832 try = drbd_recv_fp(tconn, s);
dbd9eea0
PR
833 drbd_socket_okay(&sock);
834 drbd_socket_okay(&msock);
b411b363
PR
835 switch (try) {
836 case P_HAND_SHAKE_S:
837 if (sock) {
907599e0 838 conn_warn(tconn, "initial packet S crossed\n");
b411b363
PR
839 sock_release(sock);
840 }
841 sock = s;
842 break;
843 case P_HAND_SHAKE_M:
844 if (msock) {
907599e0 845 conn_warn(tconn, "initial packet M crossed\n");
b411b363
PR
846 sock_release(msock);
847 }
848 msock = s;
907599e0 849 set_bit(DISCARD_CONCURRENT, &tconn->flags);
b411b363
PR
850 break;
851 default:
907599e0 852 conn_warn(tconn, "Error receiving initial packet\n");
b411b363
PR
853 sock_release(s);
854 if (random32() & 1)
855 goto retry;
856 }
857 }
858
bbeb641c 859 if (tconn->cstate <= C_DISCONNECTING)
b411b363
PR
860 goto out_release_sockets;
861 if (signal_pending(current)) {
862 flush_signals(current);
863 smp_rmb();
907599e0 864 if (get_t_state(&tconn->receiver) == EXITING)
b411b363
PR
865 goto out_release_sockets;
866 }
867
868 if (sock && msock) {
dbd9eea0
PR
869 ok = drbd_socket_okay(&sock);
870 ok = drbd_socket_okay(&msock) && ok;
b411b363
PR
871 if (ok)
872 break;
873 }
874 } while (1);
875
876 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
877 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
878
879 sock->sk->sk_allocation = GFP_NOIO;
880 msock->sk->sk_allocation = GFP_NOIO;
881
882 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
883 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
884
b411b363 885 /* NOT YET ...
907599e0 886 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
887 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
888 * first set it to the P_HAND_SHAKE timeout,
889 * which we set to 4x the configured ping_timeout. */
890 sock->sk->sk_sndtimeo =
907599e0 891 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 892
907599e0
PR
893 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
894 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
b411b363
PR
895
896 /* we don't want delays.
25985edc 897 * we use TCP_CORK where appropriate, though */
b411b363
PR
898 drbd_tcp_nodelay(sock);
899 drbd_tcp_nodelay(msock);
900
907599e0
PR
901 tconn->data.socket = sock;
902 tconn->meta.socket = msock;
903 tconn->last_received = jiffies;
b411b363 904
907599e0 905 h = drbd_do_handshake(tconn);
b411b363
PR
906 if (h <= 0)
907 return h;
908
907599e0 909 if (tconn->cram_hmac_tfm) {
b411b363 910 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 911 switch (drbd_do_auth(tconn)) {
b10d96cb 912 case -1:
907599e0 913 conn_err(tconn, "Authentication of peer failed\n");
b411b363 914 return -1;
b10d96cb 915 case 0:
907599e0 916 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 917 return 0;
b411b363
PR
918 }
919 }
920
bbeb641c 921 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
b411b363
PR
922 return 0;
923
907599e0 924 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
b411b363
PR
925 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
926
907599e0 927 drbd_thread_start(&tconn->asender);
b411b363 928
907599e0 929 if (drbd_send_protocol(tconn) == -1)
7e2455c1 930 return -1;
b411b363 931
907599e0 932 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
b411b363
PR
933
934out_release_sockets:
935 if (sock)
936 sock_release(sock);
937 if (msock)
938 sock_release(msock);
939 return -1;
940}
941
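/*
 * Illustrative sketch, not part of the driver: how a caller is expected to
 * act on drbd_connect()'s return value (see the comment above the function).
 * The real retry loop lives in the receiver thread; give_up() and
 * receive_packets() are placeholders for this sketch only.
 *
 *	int h;
 *
 *	do {
 *		h = drbd_connect(tconn);
 *	} while (h == 0);		// 0: did not work out, try again
 *
 *	if (h < 0)			// -1: peer talks another dialect, -2: no net config
 *		give_up(tconn);		// go standalone instead of retrying
 *	else
 *		receive_packets(tconn);	// h == 1: connection established
 */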
static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
{
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
		pi->vnr = 0;
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
		pi->vnr = 0;
	} else {
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
		    be32_to_cpu(h->h80.magic),
		    be16_to_cpu(h->h80.command),
		    be16_to_cpu(h->h80.length));
		return false;
	}
	return true;
}

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct p_header *h = &tconn->data.rbuf.header;
	int r;

	r = drbd_recv(tconn, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);
		return false;
	}

	r = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;

	return r;
}

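/*
 * Illustrative note, not part of the driver: decode_header() accepts two
 * on-wire header layouts.  The h80 header is recognized by the 32-bit
 * DRBD_MAGIC and carries a 16-bit length; the h95 header is recognized by
 * the 16-bit DRBD_MAGIC_BIG and stores a 24-bit length, which is why the
 * size is masked:
 *
 *	pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
 *
 * so a decoded length word of 0x01234567 yields a size of 0x234567; the top
 * byte is ignored here.
 */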
2451fc3b 980static void drbd_flush(struct drbd_conf *mdev)
b411b363
PR
981{
982 int rv;
983
984 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 985 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 986 NULL);
b411b363
PR
987 if (rv) {
988 dev_err(DEV, "local disk flush failed with status %d\n", rv);
989 /* would rather check on EOPNOTSUPP, but that is not reliable.
990 * don't try again for ANY return value != 0
991 * if (rv == -EOPNOTSUPP) */
992 drbd_bump_write_ordering(mdev, WO_drain_io);
993 }
994 put_ldev(mdev);
995 }
b411b363
PR
996}
997
998/**
999 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1000 * @mdev: DRBD device.
1001 * @epoch: Epoch object.
1002 * @ev: Epoch event.
1003 */
1004static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1005 struct drbd_epoch *epoch,
1006 enum epoch_event ev)
1007{
2451fc3b 1008 int epoch_size;
b411b363 1009 struct drbd_epoch *next_epoch;
b411b363
PR
1010 enum finish_epoch rv = FE_STILL_LIVE;
1011
1012 spin_lock(&mdev->epoch_lock);
1013 do {
1014 next_epoch = NULL;
b411b363
PR
1015
1016 epoch_size = atomic_read(&epoch->epoch_size);
1017
1018 switch (ev & ~EV_CLEANUP) {
1019 case EV_PUT:
1020 atomic_dec(&epoch->active);
1021 break;
1022 case EV_GOT_BARRIER_NR:
1023 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
b411b363
PR
1024 break;
1025 case EV_BECAME_LAST:
1026 /* nothing to do*/
1027 break;
1028 }
1029
b411b363
PR
1030 if (epoch_size != 0 &&
1031 atomic_read(&epoch->active) == 0 &&
2451fc3b 1032 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
b411b363
PR
1033 if (!(ev & EV_CLEANUP)) {
1034 spin_unlock(&mdev->epoch_lock);
1035 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1036 spin_lock(&mdev->epoch_lock);
1037 }
1038 dec_unacked(mdev);
1039
1040 if (mdev->current_epoch != epoch) {
1041 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1042 list_del(&epoch->list);
1043 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1044 mdev->epochs--;
b411b363
PR
1045 kfree(epoch);
1046
1047 if (rv == FE_STILL_LIVE)
1048 rv = FE_DESTROYED;
1049 } else {
1050 epoch->flags = 0;
1051 atomic_set(&epoch->epoch_size, 0);
698f9315 1052 /* atomic_set(&epoch->active, 0); is already zero */
b411b363
PR
1053 if (rv == FE_STILL_LIVE)
1054 rv = FE_RECYCLED;
2451fc3b 1055 wake_up(&mdev->ee_wait);
b411b363
PR
1056 }
1057 }
1058
1059 if (!next_epoch)
1060 break;
1061
1062 epoch = next_epoch;
1063 } while (1);
1064
1065 spin_unlock(&mdev->epoch_lock);
1066
b411b363
PR
1067 return rv;
1068}
1069
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

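/*
 * Illustrative note, not part of the driver: the ordering method only ever
 * degrades (wo = min(pwo, wo)), from "flush" to "drain" to "none", further
 * limited by the no_disk_flush/no_disk_drain settings of the backing device.
 * drbd_flush() above relies on exactly that to fall back after a failed
 * flush:
 *
 *	if (blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL, NULL))
 *		drbd_bump_write_ordering(mdev, WO_drain_io);	// never try flush again
 */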
45bb912b 1095/**
fbe29dec 1096 * drbd_submit_peer_request()
45bb912b 1097 * @mdev: DRBD device.
db830c46 1098 * @peer_req: peer request
45bb912b 1099 * @rw: flag field, see bio->bi_rw
10f6d992
LE
1100 *
1101 * May spread the pages to multiple bios,
1102 * depending on bio_add_page restrictions.
1103 *
1104 * Returns 0 if all bios have been submitted,
1105 * -ENOMEM if we could not allocate enough bios,
1106 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1107 * single page to an empty bio (which should never happen and likely indicates
1108 * that the lower level IO stack is in some way broken). This has been observed
1109 * on certain Xen deployments.
45bb912b
LE
1110 */
1111/* TODO allocate from our own bio_set. */
fbe29dec
AG
1112int drbd_submit_peer_request(struct drbd_conf *mdev,
1113 struct drbd_peer_request *peer_req,
1114 const unsigned rw, const int fault_type)
45bb912b
LE
1115{
1116 struct bio *bios = NULL;
1117 struct bio *bio;
db830c46
AG
1118 struct page *page = peer_req->pages;
1119 sector_t sector = peer_req->i.sector;
1120 unsigned ds = peer_req->i.size;
45bb912b
LE
1121 unsigned n_bios = 0;
1122 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1123 int err = -ENOMEM;
45bb912b
LE
1124
1125 /* In most cases, we will only need one bio. But in case the lower
1126 * level restrictions happen to be different at this offset on this
1127 * side than those of the sending peer, we may need to submit the
da4a75d2
LE
1128 * request in more than one bio.
1129 *
1130 * Plain bio_alloc is good enough here, this is no DRBD internally
1131 * generated bio, but a bio allocated on behalf of the peer.
1132 */
45bb912b
LE
1133next_bio:
1134 bio = bio_alloc(GFP_NOIO, nr_pages);
1135 if (!bio) {
1136 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1137 goto fail;
1138 }
db830c46 1139 /* > peer_req->i.sector, unless this is the first bio */
45bb912b
LE
1140 bio->bi_sector = sector;
1141 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1142 bio->bi_rw = rw;
db830c46 1143 bio->bi_private = peer_req;
fcefa62e 1144 bio->bi_end_io = drbd_peer_request_endio;
45bb912b
LE
1145
1146 bio->bi_next = bios;
1147 bios = bio;
1148 ++n_bios;
1149
1150 page_chain_for_each(page) {
1151 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1152 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1153 /* A single page must always be possible!
1154 * But in case it fails anyways,
1155 * we deal with it, and complain (below). */
1156 if (bio->bi_vcnt == 0) {
1157 dev_err(DEV,
1158 "bio_add_page failed for len=%u, "
1159 "bi_vcnt=0 (bi_sector=%llu)\n",
1160 len, (unsigned long long)bio->bi_sector);
1161 err = -ENOSPC;
1162 goto fail;
1163 }
45bb912b
LE
1164 goto next_bio;
1165 }
1166 ds -= len;
1167 sector += len >> 9;
1168 --nr_pages;
1169 }
1170 D_ASSERT(page == NULL);
1171 D_ASSERT(ds == 0);
1172
db830c46 1173 atomic_set(&peer_req->pending_bios, n_bios);
45bb912b
LE
1174 do {
1175 bio = bios;
1176 bios = bios->bi_next;
1177 bio->bi_next = NULL;
1178
45bb912b 1179 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1180 } while (bios);
45bb912b
LE
1181 return 0;
1182
1183fail:
1184 while (bios) {
1185 bio = bios;
1186 bios = bios->bi_next;
1187 bio_put(bio);
1188 }
10f6d992 1189 return err;
45bb912b
LE
1190}
1191
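/*
 * Illustrative sketch, not part of the driver: callers such as
 * recv_resync_read() below treat any non-zero return (-ENOMEM, -ENOSPC) as
 * fatal for the connection, take the peer request back off its list and
 * free it:
 *
 *	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
 *		return true;
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	list_del(&peer_req->w.list);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *	drbd_free_ee(mdev, peer_req);
 */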
53840641 1192static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1193 struct drbd_peer_request *peer_req)
53840641 1194{
db830c46 1195 struct drbd_interval *i = &peer_req->i;
53840641
AG
1196
1197 drbd_remove_interval(&mdev->write_requests, i);
1198 drbd_clear_interval(i);
1199
6c852bec 1200 /* Wake up any processes waiting for this peer request to complete. */
53840641
AG
1201 if (i->waiting)
1202 wake_up(&mdev->misc_wait);
1203}
1204
d8763023
AG
1205static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1206 unsigned int data_size)
b411b363 1207{
2451fc3b 1208 int rv;
e42325a5 1209 struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
b411b363
PR
1210 struct drbd_epoch *epoch;
1211
b411b363
PR
1212 inc_unacked(mdev);
1213
b411b363
PR
1214 mdev->current_epoch->barrier_nr = p->barrier;
1215 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1216
1217 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1218 * the activity log, which means it would not be resynced in case the
1219 * R_PRIMARY crashes now.
1220 * Therefore we must send the barrier_ack after the barrier request was
1221 * completed. */
1222 switch (mdev->write_ordering) {
b411b363
PR
1223 case WO_none:
1224 if (rv == FE_RECYCLED)
81e84650 1225 return true;
2451fc3b
PR
1226
1227 /* receiver context, in the writeout path of the other node.
1228 * avoid potential distributed deadlock */
1229 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1230 if (epoch)
1231 break;
1232 else
1233 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1234 /* Fall through */
b411b363
PR
1235
1236 case WO_bdev_flush:
1237 case WO_drain_io:
b411b363 1238 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
2451fc3b
PR
1239 drbd_flush(mdev);
1240
1241 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1242 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1243 if (epoch)
1244 break;
b411b363
PR
1245 }
1246
2451fc3b
PR
1247 epoch = mdev->current_epoch;
1248 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1249
1250 D_ASSERT(atomic_read(&epoch->active) == 0);
1251 D_ASSERT(epoch->flags == 0);
b411b363 1252
81e84650 1253 return true;
2451fc3b
PR
1254 default:
1255 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
81e84650 1256 return false;
b411b363
PR
1257 }
1258
1259 epoch->flags = 0;
1260 atomic_set(&epoch->epoch_size, 0);
1261 atomic_set(&epoch->active, 0);
1262
1263 spin_lock(&mdev->epoch_lock);
1264 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1265 list_add(&epoch->list, &mdev->current_epoch->list);
1266 mdev->current_epoch = epoch;
1267 mdev->epochs++;
b411b363
PR
1268 } else {
1269 /* The current_epoch got recycled while we allocated this one... */
1270 kfree(epoch);
1271 }
1272 spin_unlock(&mdev->epoch_lock);
1273
81e84650 1274 return true;
b411b363
PR
1275}
1276
1277/* used from receive_RSDataReply (recv_resync_read)
1278 * and from receive_Data */
f6ffca9f
AG
1279static struct drbd_peer_request *
1280read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1281 int data_size) __must_hold(local)
b411b363 1282{
6666032a 1283 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1284 struct drbd_peer_request *peer_req;
b411b363 1285 struct page *page;
45bb912b 1286 int dgs, ds, rr;
a0638456
PR
1287 void *dig_in = mdev->tconn->int_dig_in;
1288 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1289 unsigned long *data;
b411b363 1290
a0638456
PR
1291 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1292 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1293
1294 if (dgs) {
de0ff338 1295 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1296 if (rr != dgs) {
0ddc5549
LE
1297 if (!signal_pending(current))
1298 dev_warn(DEV,
1299 "short read receiving data digest: read %d expected %d\n",
1300 rr, dgs);
b411b363
PR
1301 return NULL;
1302 }
1303 }
1304
1305 data_size -= dgs;
1306
841ce241
AG
1307 if (!expect(data_size != 0))
1308 return NULL;
1309 if (!expect(IS_ALIGNED(data_size, 512)))
1310 return NULL;
1311 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1312 return NULL;
b411b363 1313
6666032a
LE
 1314	/* even though we trust our peer,
1315 * we sometimes have to double check. */
1316 if (sector + (data_size>>9) > capacity) {
fdda6544
LE
1317 dev_err(DEV, "request from peer beyond end of local disk: "
1318 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1319 (unsigned long long)capacity,
1320 (unsigned long long)sector, data_size);
1321 return NULL;
1322 }
1323
b411b363
PR
1324 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1325 * "criss-cross" setup, that might cause write-out on some other DRBD,
1326 * which in turn might block on the other node at this very place. */
db830c46
AG
1327 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1328 if (!peer_req)
b411b363 1329 return NULL;
45bb912b 1330
b411b363 1331 ds = data_size;
db830c46 1332 page = peer_req->pages;
45bb912b
LE
1333 page_chain_for_each(page) {
1334 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1335 data = kmap(page);
de0ff338 1336 rr = drbd_recv(mdev->tconn, data, len);
0cf9d27e 1337 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
6b4388ac
PR
1338 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1339 data[0] = data[0] ^ (unsigned long)-1;
1340 }
b411b363 1341 kunmap(page);
45bb912b 1342 if (rr != len) {
db830c46 1343 drbd_free_ee(mdev, peer_req);
0ddc5549
LE
1344 if (!signal_pending(current))
1345 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1346 rr, len);
b411b363
PR
1347 return NULL;
1348 }
1349 ds -= rr;
1350 }
1351
1352 if (dgs) {
db830c46 1353 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1354 if (memcmp(dig_in, dig_vv, dgs)) {
470be44a
LE
1355 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1356 (unsigned long long)sector, data_size);
db830c46 1357 drbd_free_ee(mdev, peer_req);
b411b363
PR
1358 return NULL;
1359 }
1360 }
1361 mdev->recv_cnt += data_size>>9;
db830c46 1362 return peer_req;
b411b363
PR
1363}
1364
1365/* drbd_drain_block() just takes a data block
1366 * out of the socket input buffer, and discards it.
1367 */
1368static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1369{
1370 struct page *page;
1371 int rr, rv = 1;
1372 void *data;
1373
c3470cde 1374 if (!data_size)
81e84650 1375 return true;
c3470cde 1376
45bb912b 1377 page = drbd_pp_alloc(mdev, 1, 1);
b411b363
PR
1378
1379 data = kmap(page);
1380 while (data_size) {
de0ff338 1381 rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1382 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1383 rv = 0;
0ddc5549
LE
1384 if (!signal_pending(current))
1385 dev_warn(DEV,
1386 "short read receiving data: read %d expected %d\n",
1387 rr, min_t(int, data_size, PAGE_SIZE));
b411b363
PR
1388 break;
1389 }
1390 data_size -= rr;
1391 }
1392 kunmap(page);
435f0740 1393 drbd_pp_free(mdev, page, 0);
b411b363
PR
1394 return rv;
1395}
1396
1397static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1398 sector_t sector, int data_size)
1399{
1400 struct bio_vec *bvec;
1401 struct bio *bio;
1402 int dgs, rr, i, expect;
a0638456
PR
1403 void *dig_in = mdev->tconn->int_dig_in;
1404 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1405
a0638456
PR
1406 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1407 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
b411b363
PR
1408
1409 if (dgs) {
de0ff338 1410 rr = drbd_recv(mdev->tconn, dig_in, dgs);
b411b363 1411 if (rr != dgs) {
0ddc5549
LE
1412 if (!signal_pending(current))
1413 dev_warn(DEV,
1414 "short read receiving data reply digest: read %d expected %d\n",
1415 rr, dgs);
b411b363
PR
1416 return 0;
1417 }
1418 }
1419
1420 data_size -= dgs;
1421
1422 /* optimistically update recv_cnt. if receiving fails below,
1423 * we disconnect anyways, and counters will be reset. */
1424 mdev->recv_cnt += data_size>>9;
1425
1426 bio = req->master_bio;
1427 D_ASSERT(sector == bio->bi_sector);
1428
1429 bio_for_each_segment(bvec, bio, i) {
1430 expect = min_t(int, data_size, bvec->bv_len);
de0ff338 1431 rr = drbd_recv(mdev->tconn,
b411b363
PR
1432 kmap(bvec->bv_page)+bvec->bv_offset,
1433 expect);
1434 kunmap(bvec->bv_page);
1435 if (rr != expect) {
0ddc5549
LE
1436 if (!signal_pending(current))
1437 dev_warn(DEV, "short read receiving data reply: "
1438 "read %d expected %d\n",
1439 rr, expect);
b411b363
PR
1440 return 0;
1441 }
1442 data_size -= rr;
1443 }
1444
1445 if (dgs) {
a0638456 1446 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
b411b363
PR
1447 if (memcmp(dig_in, dig_vv, dgs)) {
1448 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1449 return 0;
1450 }
1451 }
1452
1453 D_ASSERT(data_size == 0);
1454 return 1;
1455}
1456
1457/* e_end_resync_block() is called via
1458 * drbd_process_done_ee() by asender only */
00d56944 1459static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1460{
8050e6d0
AG
1461 struct drbd_peer_request *peer_req =
1462 container_of(w, struct drbd_peer_request, w);
00d56944 1463 struct drbd_conf *mdev = w->mdev;
db830c46 1464 sector_t sector = peer_req->i.sector;
b411b363
PR
1465 int ok;
1466
db830c46 1467 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1468
db830c46
AG
1469 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1470 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1471 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1472 } else {
1473 /* Record failure to sync */
db830c46 1474 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1475
db830c46 1476 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1477 }
1478 dec_unacked(mdev);
1479
1480 return ok;
1481}
1482
1483static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1484{
db830c46 1485 struct drbd_peer_request *peer_req;
b411b363 1486
db830c46
AG
1487 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1488 if (!peer_req)
45bb912b 1489 goto fail;
b411b363
PR
1490
1491 dec_rs_pending(mdev);
1492
b411b363
PR
1493 inc_unacked(mdev);
1494 /* corresponding dec_unacked() in e_end_resync_block()
1495 * respective _drbd_clear_done_ee */
1496
db830c46 1497 peer_req->w.cb = e_end_resync_block;
45bb912b 1498
87eeee41 1499 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1500 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1501 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1502
0f0601f4 1503 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1504 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
81e84650 1505 return true;
b411b363 1506
10f6d992
LE
1507 /* don't care for the reason here */
1508 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1509 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1510 list_del(&peer_req->w.list);
87eeee41 1511 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1512
db830c46 1513 drbd_free_ee(mdev, peer_req);
45bb912b
LE
1514fail:
1515 put_ldev(mdev);
81e84650 1516 return false;
b411b363
PR
1517}
1518
668eebc6 1519static struct drbd_request *
bc9c5c41
AG
1520find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1521 sector_t sector, bool missing_ok, const char *func)
51624585 1522{
51624585
AG
1523 struct drbd_request *req;
1524
bc9c5c41
AG
1525 /* Request object according to our peer */
1526 req = (struct drbd_request *)(unsigned long)id;
5e472264 1527 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1528 return req;
c3afd8f5
AG
1529 if (!missing_ok) {
1530 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1531 (unsigned long)id, (unsigned long long)sector);
1532 }
51624585
AG
1533 return NULL;
1534}
1535
d8763023
AG
1536static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1537 unsigned int data_size)
b411b363
PR
1538{
1539 struct drbd_request *req;
1540 sector_t sector;
b411b363 1541 int ok;
e42325a5 1542 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1543
1544 sector = be64_to_cpu(p->sector);
1545
87eeee41 1546 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1547 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1548 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1549 if (unlikely(!req))
81e84650 1550 return false;
b411b363 1551
24c4830c 1552 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1553 * special casing it there for the various failure cases.
1554 * still no race with drbd_fail_pending_reads */
1555 ok = recv_dless_read(mdev, req, sector, data_size);
1556
1557 if (ok)
8554df1c 1558 req_mod(req, DATA_RECEIVED);
b411b363
PR
1559 /* else: nothing. handled from drbd_disconnect...
1560 * I don't think we may complete this just yet
1561 * in case we are "on-disconnect: freeze" */
1562
1563 return ok;
1564}
1565
d8763023
AG
1566static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
1567 unsigned int data_size)
b411b363
PR
1568{
1569 sector_t sector;
b411b363 1570 int ok;
e42325a5 1571 struct p_data *p = &mdev->tconn->data.rbuf.data;
b411b363
PR
1572
1573 sector = be64_to_cpu(p->sector);
1574 D_ASSERT(p->block_id == ID_SYNCER);
1575
1576 if (get_ldev(mdev)) {
1577 /* data is submitted to disk within recv_resync_read.
1578 * corresponding put_ldev done below on error,
fcefa62e 1579 * or in drbd_peer_request_endio. */
b411b363
PR
1580 ok = recv_resync_read(mdev, sector, data_size);
1581 } else {
1582 if (__ratelimit(&drbd_ratelimit_state))
1583 dev_err(DEV, "Can not write resync data to local disk.\n");
1584
1585 ok = drbd_drain_block(mdev, data_size);
1586
2b2bf214 1587 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363
PR
1588 }
1589
778f271d
PR
1590 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1591
b411b363
PR
1592 return ok;
1593}
1594
7be8da07
AG
1595static int w_restart_write(struct drbd_work *w, int cancel)
1596{
1597 struct drbd_request *req = container_of(w, struct drbd_request, w);
1598 struct drbd_conf *mdev = w->mdev;
1599 struct bio *bio;
1600 unsigned long start_time;
1601 unsigned long flags;
1602
1603 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1604 if (!expect(req->rq_state & RQ_POSTPONED)) {
1605 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1606 return 0;
1607 }
1608 bio = req->master_bio;
1609 start_time = req->start_time;
1610 /* Postponed requests will not have their master_bio completed! */
1611 __req_mod(req, DISCARD_WRITE, NULL);
1612 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1613
1614 while (__drbd_make_request(mdev, bio, start_time))
1615 /* retry */ ;
1616 return 1;
1617}
1618
1619static void restart_conflicting_writes(struct drbd_conf *mdev,
1620 sector_t sector, int size)
1621{
1622 struct drbd_interval *i;
1623 struct drbd_request *req;
1624
1625 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1626 if (!i->local)
1627 continue;
1628 req = container_of(i, struct drbd_request, i);
1629 if (req->rq_state & RQ_LOCAL_PENDING ||
1630 !(req->rq_state & RQ_POSTPONED))
1631 continue;
1632 if (expect(list_empty(&req->w.list))) {
1633 req->w.mdev = mdev;
1634 req->w.cb = w_restart_write;
1635 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1636 }
1637 }
1638}
1639
b411b363
PR
1640/* e_end_block() is called via drbd_process_done_ee().
1641 * this means this function only runs in the asender thread
1642 */
00d56944 1643static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1644{
8050e6d0
AG
1645 struct drbd_peer_request *peer_req =
1646 container_of(w, struct drbd_peer_request, w);
00d56944 1647 struct drbd_conf *mdev = w->mdev;
db830c46 1648 sector_t sector = peer_req->i.sector;
b411b363
PR
1649 int ok = 1, pcmd;
1650
89e58e75 1651 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1652 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1653 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1654 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1655 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1656 P_RS_WRITE_ACK : P_WRITE_ACK;
db830c46 1657 ok &= drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1658 if (pcmd == P_RS_WRITE_ACK)
db830c46 1659 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1660 } else {
db830c46 1661 ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1662 /* we expect it to be marked out of sync anyways...
1663 * maybe assert this? */
1664 }
1665 dec_unacked(mdev);
1666 }
1667 /* we delete from the conflict detection hash _after_ we sent out the
1668 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1669 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1670 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1671 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1672 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1673 if (peer_req->flags & EE_RESTART_REQUESTS)
1674 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1675 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1676 } else
db830c46 1677 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1678
db830c46 1679 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363
PR
1680
1681 return ok;
1682}
1683
7be8da07 1684static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1685{
7be8da07 1686 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1687 struct drbd_peer_request *peer_req =
1688 container_of(w, struct drbd_peer_request, w);
206d3589 1689 int ok;
b411b363 1690
7be8da07 1691 ok = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1692 dec_unacked(mdev);
1693
1694 return ok;
1695}
1696
7be8da07
AG
1697static int e_send_discard_write(struct drbd_work *w, int unused)
1698{
1699 return e_send_ack(w, P_DISCARD_WRITE);
1700}
1701
1702static int e_send_retry_write(struct drbd_work *w, int unused)
1703{
1704 struct drbd_tconn *tconn = w->mdev->tconn;
1705
1706 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1707 P_RETRY_WRITE : P_DISCARD_WRITE);
1708}
1709
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 * a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

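/*
 * Illustrative note, not part of the driver: the signed subtraction makes
 * the comparison wrap-safe for sequence numbers that differ by less than
 * 2^31, e.g.:
 *
 *	seq_greater(1, 0xffffffff)	== true		// 1 comes "after" the wrap
 *	seq_greater(0xffffffff, 1)	== false
 *	seq_max(0xffffffff, 1)		== 1
 */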
7be8da07
AG
1725static bool need_peer_seq(struct drbd_conf *mdev)
1726{
1727 struct drbd_tconn *tconn = mdev->tconn;
1728
1729 /*
1730 * We only need to keep track of the last packet_seq number of our peer
1731 * if we are in dual-primary mode and we have the discard flag set; see
1732 * handle_write_conflicts().
1733 */
1734 return tconn->net_conf->two_primaries &&
1735 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1736}
1737
43ae077d 1738static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1739{
3c13b680 1740 unsigned int newest_peer_seq;
3e394da1 1741
7be8da07
AG
1742 if (need_peer_seq(mdev)) {
1743 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1744 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1745 mdev->peer_seq = newest_peer_seq;
7be8da07 1746 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1747 /* wake up only if we actually changed mdev->peer_seq */
1748 if (peer_seq == newest_peer_seq)
7be8da07
AG
1749 wake_up(&mdev->seq_wait);
1750 }
3e394da1
AG
1751}
1752
b411b363
PR
1753/* Called from receive_Data.
1754 * Synchronize packets on sock with packets on msock.
1755 *
1756 * This is here so that even when a P_DATA packet traveling via sock overtakes
1757 * an Ack packet traveling on msock, they are still processed in the order in
1758 * which they were sent.
1759 *
1760 * Note: we don't care for Ack packets overtaking P_DATA packets.
1761 *
1762 * In case packet_seq is larger than mdev->peer_seq number, there are
1763 * outstanding packets on the msock. We wait for them to arrive.
1764 * In case we are the logically next packet, we update mdev->peer_seq
1765 * ourselves. Correctly handles 32bit wrap around.
1766 *
1767 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1768 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1769 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1770 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1771 *
1772 * returns 0 if we may process the packet,
1773 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1774static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1775{
1776 DEFINE_WAIT(wait);
b411b363 1777 long timeout;
7be8da07
AG
1778 int ret;
1779
1780 if (!need_peer_seq(mdev))
1781 return 0;
1782
b411b363
PR
1783 spin_lock(&mdev->peer_seq_lock);
1784 for (;;) {
7be8da07
AG
1785 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1786 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1787 ret = 0;
b411b363 1788 break;
7be8da07 1789 }
b411b363
PR
1790 if (signal_pending(current)) {
1791 ret = -ERESTARTSYS;
1792 break;
1793 }
7be8da07 1794 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1795 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1796 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1797 timeout = schedule_timeout(timeout);
b411b363 1798 spin_lock(&mdev->peer_seq_lock);
7be8da07 1799 if (!timeout) {
b411b363 1800 ret = -ETIMEDOUT;
71b1c1eb 1801 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1802 break;
1803 }
1804 }
b411b363 1805 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1806 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1807 return ret;
1808}
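
/*
 * Worked example for the wait condition above (hypothetical values): with
 * mdev->peer_seq == 7, a P_DATA packet carrying seq_num == 10 must wait,
 * because the packets with seq 8 and 9 are still in flight on the msock;
 * a packet with seq_num == 8 is the logically next one and may proceed.
 * The admission test reduces to the small sketch below (hypothetical helper):
 */
static bool __maybe_unused may_process_data_packet(u32 packet_seq, u32 current_peer_seq)
{
	/* same check as in the loop above: at most one step ahead of peer_seq */
	return !seq_greater(packet_seq - 1, current_peer_seq);
}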
1809
688593c5
LE
1810/* see also bio_flags_to_wire()
1811 * We need to map the REQ_* bio flags semantically to DP_* data packet flags
1812 * and back, because we may replicate to peers running other kernel versions. */
1813static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1814{
688593c5
LE
1815 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1816 (dpf & DP_FUA ? REQ_FUA : 0) |
1817 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1818 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1819}
1820
7be8da07
AG
1821static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1822 unsigned int size)
1823{
1824 struct drbd_interval *i;
1825
1826 repeat:
1827 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1828 struct drbd_request *req;
1829 struct bio_and_error m;
1830
1831 if (!i->local)
1832 continue;
1833 req = container_of(i, struct drbd_request, i);
1834 if (!(req->rq_state & RQ_POSTPONED))
1835 continue;
1836 req->rq_state &= ~RQ_POSTPONED;
1837 __req_mod(req, NEG_ACKED, &m);
1838 spin_unlock_irq(&mdev->tconn->req_lock);
1839 if (m.bio)
1840 complete_master_bio(mdev, &m);
1841 spin_lock_irq(&mdev->tconn->req_lock);
1842 goto repeat;
1843 }
1844}
1845
1846static int handle_write_conflicts(struct drbd_conf *mdev,
1847 struct drbd_peer_request *peer_req)
1848{
1849 struct drbd_tconn *tconn = mdev->tconn;
1850 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1851 sector_t sector = peer_req->i.sector;
1852 const unsigned int size = peer_req->i.size;
1853 struct drbd_interval *i;
1854 bool equal;
1855 int err;
1856
1857 /*
1858 * Inserting the peer request into the write_requests tree will prevent
1859 * new conflicting local requests from being added.
1860 */
1861 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1862
1863 repeat:
1864 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1865 if (i == &peer_req->i)
1866 continue;
1867
1868 if (!i->local) {
1869 /*
1870 * Our peer has sent a conflicting remote request; this
1871 * should not happen in a two-node setup. Wait for the
1872 * earlier peer request to complete.
1873 */
1874 err = drbd_wait_misc(mdev, i);
1875 if (err)
1876 goto out;
1877 goto repeat;
1878 }
1879
1880 equal = i->sector == sector && i->size == size;
1881 if (resolve_conflicts) {
1882 /*
1883 * If the peer request is fully contained within the
1884 * overlapping request, it can be discarded; otherwise,
1885 * it will be retried once all overlapping requests
1886 * have completed.
1887 */
1888 bool discard = i->sector <= sector && i->sector +
1889 (i->size >> 9) >= sector + (size >> 9);
1890
1891 if (!equal)
1892 dev_alert(DEV, "Concurrent writes detected: "
1893 "local=%llus +%u, remote=%llus +%u, "
1894 "assuming %s came first\n",
1895 (unsigned long long)i->sector, i->size,
1896 (unsigned long long)sector, size,
1897 discard ? "local" : "remote");
1898
1899 inc_unacked(mdev);
1900 peer_req->w.cb = discard ? e_send_discard_write :
1901 e_send_retry_write;
1902 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1903 wake_asender(mdev->tconn);
1904
1905 err = -ENOENT;
1906 goto out;
1907 } else {
1908 struct drbd_request *req =
1909 container_of(i, struct drbd_request, i);
1910
1911 if (!equal)
1912 dev_alert(DEV, "Concurrent writes detected: "
1913 "local=%llus +%u, remote=%llus +%u\n",
1914 (unsigned long long)i->sector, i->size,
1915 (unsigned long long)sector, size);
1916
1917 if (req->rq_state & RQ_LOCAL_PENDING ||
1918 !(req->rq_state & RQ_POSTPONED)) {
1919 /*
1920 * Wait for the node with the discard flag to
1921 * decide if this request will be discarded or
1922 * retried. Requests that are discarded will
1923 * disappear from the write_requests tree.
1924 *
1925 * In addition, wait for the conflicting
1926 * request to finish locally before submitting
1927 * the conflicting peer request.
1928 */
1929 err = drbd_wait_misc(mdev, &req->i);
1930 if (err) {
1931 _conn_request_state(mdev->tconn,
1932 NS(conn, C_TIMEOUT),
1933 CS_HARD);
1934 fail_postponed_requests(mdev, sector, size);
1935 goto out;
1936 }
1937 goto repeat;
1938 }
1939 /*
1940 * Remember to restart the conflicting requests after
1941 * the new peer request has completed.
1942 */
1943 peer_req->flags |= EE_RESTART_REQUESTS;
1944 }
1945 }
1946 err = 0;
1947
1948 out:
1949 if (err)
1950 drbd_remove_epoch_entry_interval(mdev, peer_req);
1951 return err;
1952}
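
/*
 * Containment sketch for the "discard" rule above (hypothetical numbers):
 * a local request covering sectors 64..127 fully contains a peer request
 * covering sectors 80..95, so the peer request would be discarded; a peer
 * request covering sectors 120..135 only partially overlaps and would be
 * retried instead. Hypothetical helper, sizes in bytes as in struct drbd_interval:
 */
static bool __maybe_unused peer_req_fully_contained(sector_t local_sector, unsigned int local_size,
						    sector_t peer_sector, unsigned int peer_size)
{
	/* same test as the "discard" computation in handle_write_conflicts() */
	return local_sector <= peer_sector &&
	       local_sector + (local_size >> 9) >= peer_sector + (peer_size >> 9);
}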
1953
b411b363 1954/* mirrored write */
d8763023
AG
1955static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
1956 unsigned int data_size)
b411b363
PR
1957{
1958 sector_t sector;
db830c46 1959 struct drbd_peer_request *peer_req;
e42325a5 1960 struct p_data *p = &mdev->tconn->data.rbuf.data;
7be8da07 1961 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1962 int rw = WRITE;
1963 u32 dp_flags;
7be8da07 1964 int err;
b411b363 1965
b411b363 1966
7be8da07
AG
1967 if (!get_ldev(mdev)) {
1968 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2b2bf214 1969 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
b411b363 1970 atomic_inc(&mdev->current_epoch->epoch_size);
7be8da07 1971 return drbd_drain_block(mdev, data_size) && err == 0;
b411b363
PR
1972 }
1973
fcefa62e
AG
1974 /*
1975 * Corresponding put_ldev done either below (on various errors), or in
1976 * drbd_peer_request_endio, if we successfully submit the data at the
1977 * end of this function.
1978 */
b411b363
PR
1979
1980 sector = be64_to_cpu(p->sector);
db830c46
AG
1981 peer_req = read_in_block(mdev, p->block_id, sector, data_size);
1982 if (!peer_req) {
b411b363 1983 put_ldev(mdev);
81e84650 1984 return false;
b411b363
PR
1985 }
1986
db830c46 1987 peer_req->w.cb = e_end_block;
b411b363 1988
688593c5
LE
1989 dp_flags = be32_to_cpu(p->dp_flags);
1990 rw |= wire_flags_to_bio(mdev, dp_flags);
1991
1992 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 1993 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 1994
b411b363 1995 spin_lock(&mdev->epoch_lock);
db830c46
AG
1996 peer_req->epoch = mdev->current_epoch;
1997 atomic_inc(&peer_req->epoch->epoch_size);
1998 atomic_inc(&peer_req->epoch->active);
b411b363
PR
1999 spin_unlock(&mdev->epoch_lock);
2000
7be8da07
AG
2001 if (mdev->tconn->net_conf->two_primaries) {
2002 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2003 if (err)
b411b363 2004 goto out_interrupted;
87eeee41 2005 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2006 err = handle_write_conflicts(mdev, peer_req);
2007 if (err) {
2008 spin_unlock_irq(&mdev->tconn->req_lock);
2009 if (err == -ENOENT) {
b411b363 2010 put_ldev(mdev);
81e84650 2011 return true;
b411b363 2012 }
7be8da07 2013 goto out_interrupted;
b411b363 2014 }
7be8da07
AG
2015 } else
2016 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2017 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2018 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2019
89e58e75 2020 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2021 case DRBD_PROT_C:
2022 inc_unacked(mdev);
2023 /* corresponding dec_unacked() in e_end_block()
2024 * respective _drbd_clear_done_ee */
2025 break;
2026 case DRBD_PROT_B:
2027 /* I really don't like it that the receiver thread
2028 * sends on the msock, but anyways */
db830c46 2029 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2030 break;
2031 case DRBD_PROT_A:
2032 /* nothing to do */
2033 break;
2034 }
2035
6719fb03 2036 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2037	 /* In case we have the only disk of the cluster: mark the range
		  * out of sync and cover it in the activity log. */
db830c46
AG
2038 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2039 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2040 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2041 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2042 }
2043
fbe29dec 2044 if (drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 2045 return true;
b411b363 2046
10f6d992
LE
2047 /* don't care for the reason here */
2048 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2049 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2050 list_del(&peer_req->w.list);
2051 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2052 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2053 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2054 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2055
b411b363 2056out_interrupted:
db830c46 2057 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2058 put_ldev(mdev);
db830c46 2059 drbd_free_ee(mdev, peer_req);
81e84650 2060 return false;
b411b363
PR
2061}
2062
0f0601f4
LE
2063/* We may throttle resync if the lower device seems to be busy
2064 * and the current sync rate is above c_min_rate.
2065 *
2066 * To decide whether or not the lower device is busy, we use a scheme similar
2067 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2068 * amount (more than 64 sectors) of activity we cannot account for with our
2069 * own resync activity, it obviously is "busy".
2070 *
2071 * The current sync rate used here is based only on the most recent two step
2072 * marks, to give a short-time average so we can react faster.
2073 */
e3555d85 2074int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2075{
2076 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2077 unsigned long db, dt, dbdt;
e3555d85 2078 struct lc_element *tmp;
0f0601f4
LE
2079 int curr_events;
2080 int throttle = 0;
2081
2082 /* feature disabled? */
2083 if (mdev->sync_conf.c_min_rate == 0)
2084 return 0;
2085
e3555d85
PR
2086 spin_lock_irq(&mdev->al_lock);
2087 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2088 if (tmp) {
2089 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2090 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2091 spin_unlock_irq(&mdev->al_lock);
2092 return 0;
2093 }
2094 /* Do not slow down if app IO is already waiting for this extent */
2095 }
2096 spin_unlock_irq(&mdev->al_lock);
2097
0f0601f4
LE
2098 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2099 (int)part_stat_read(&disk->part0, sectors[1]) -
2100 atomic_read(&mdev->rs_sect_ev);
e3555d85 2101
0f0601f4
LE
2102 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2103 unsigned long rs_left;
2104 int i;
2105
2106 mdev->rs_last_events = curr_events;
2107
2108 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2109 * approx. */
2649f080
LE
2110 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2111
2112 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2113 rs_left = mdev->ov_left;
2114 else
2115 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2116
2117 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2118 if (!dt)
2119 dt++;
2120 db = mdev->rs_mark_left[i] - rs_left;
2121 dbdt = Bit2KB(db/dt);
2122
2123 if (dbdt > mdev->sync_conf.c_min_rate)
2124 throttle = 1;
2125 }
2126 return throttle;
2127}
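
/*
 * Worked example for the rate check above (hypothetical numbers): assume the
 * unaccounted backing-device activity exceeded 64 sectors, the chosen sync
 * mark is dt = 4 seconds old, and db = 8192 bitmap bits (4 KiB of data each)
 * were cleared since then. Then dbdt = Bit2KB(8192 / 4) = 8192 KiB/s; with
 * c_min_rate = 4000 KiB/s this is above the configured minimum, so
 * drbd_rs_should_slow_down() returns 1 and the caller throttles briefly.
 */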
2128
2129
d8763023
AG
2130static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
2131 unsigned int digest_size)
b411b363
PR
2132{
2133 sector_t sector;
2134 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2135 struct drbd_peer_request *peer_req;
b411b363 2136 struct digest_info *di = NULL;
b18b37be 2137 int size, verb;
b411b363 2138 unsigned int fault_type;
e42325a5 2139 struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;
b411b363
PR
2140
2141 sector = be64_to_cpu(p->sector);
2142 size = be32_to_cpu(p->blksize);
2143
c670a398 2144 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2145 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2146 (unsigned long long)sector, size);
81e84650 2147 return false;
b411b363
PR
2148 }
2149 if (sector + (size>>9) > capacity) {
2150 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2151 (unsigned long long)sector, size);
81e84650 2152 return false;
b411b363
PR
2153 }
2154
2155 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2156 verb = 1;
2157 switch (cmd) {
2158 case P_DATA_REQUEST:
2159 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2160 break;
2161 case P_RS_DATA_REQUEST:
2162 case P_CSUM_RS_REQUEST:
2163 case P_OV_REQUEST:
2164 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2165 break;
2166 case P_OV_REPLY:
2167 verb = 0;
2168 dec_rs_pending(mdev);
2169 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2170 break;
2171 default:
2172 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2173 cmdname(cmd));
2174 }
2175 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2176 dev_err(DEV, "Can not satisfy peer's read request, "
2177 "no local data.\n");
b18b37be 2178
a821cc4a
LE
2179	 /* drain the possibly present payload */
2180 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2181 }
2182
2183 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2184 * "criss-cross" setup, that might cause write-out on some other DRBD,
2185 * which in turn might block on the other node at this very place. */
db830c46
AG
2186 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2187 if (!peer_req) {
b411b363 2188 put_ldev(mdev);
81e84650 2189 return false;
b411b363
PR
2190 }
2191
02918be2 2192 switch (cmd) {
b411b363 2193 case P_DATA_REQUEST:
db830c46 2194 peer_req->w.cb = w_e_end_data_req;
b411b363 2195 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2196 /* application IO, don't drbd_rs_begin_io */
2197 goto submit;
2198
b411b363 2199 case P_RS_DATA_REQUEST:
db830c46 2200 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2201 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2202 /* used in the sector offset progress display */
2203 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2204 break;
2205
2206 case P_OV_REPLY:
2207 case P_CSUM_RS_REQUEST:
2208 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2209 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2210 if (!di)
2211 goto out_free_e;
2212
2213 di->digest_size = digest_size;
2214 di->digest = (((char *)di)+sizeof(struct digest_info));
2215
db830c46
AG
2216 peer_req->digest = di;
2217 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2218
de0ff338 2219 if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)
b411b363
PR
2220 goto out_free_e;
2221
02918be2 2222 if (cmd == P_CSUM_RS_REQUEST) {
31890f4a 2223 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2224 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2225 /* used in the sector offset progress display */
2226 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2227 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2228 /* track progress, we may need to throttle */
2229 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2230 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2231 dec_rs_pending(mdev);
0f0601f4
LE
2232 /* drbd_rs_begin_io done when we sent this request,
2233 * but accounting still needs to be done. */
2234 goto submit_for_resync;
b411b363
PR
2235 }
2236 break;
2237
2238 case P_OV_REQUEST:
b411b363 2239 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2240 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2241 unsigned long now = jiffies;
2242 int i;
b411b363
PR
2243 mdev->ov_start_sector = sector;
2244 mdev->ov_position = sector;
30b743a2
LE
2245 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2246 mdev->rs_total = mdev->ov_left;
de228bba
LE
2247 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2248 mdev->rs_mark_left[i] = mdev->ov_left;
2249 mdev->rs_mark_time[i] = now;
2250 }
b411b363
PR
2251 dev_info(DEV, "Online Verify start sector: %llu\n",
2252 (unsigned long long)sector);
2253 }
db830c46 2254 peer_req->w.cb = w_e_end_ov_req;
b411b363 2255 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2256 break;
2257
b411b363
PR
2258 default:
2259 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2260 cmdname(cmd));
b411b363 2261 fault_type = DRBD_FAULT_MAX;
80a40e43 2262 goto out_free_e;
b411b363
PR
2263 }
2264
0f0601f4
LE
2265 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2266 * wrt the receiver, but it is not as straightforward as it may seem.
2267 * Various places in the resync start and stop logic assume resync
2268 * requests are processed in order, requeuing this on the worker thread
2269 * introduces a bunch of new code for synchronization between threads.
2270 *
2271 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2272 * "forever", throttling after drbd_rs_begin_io will lock that extent
2273 * for application writes for the same time. For now, just throttle
2274 * here, where the rest of the code expects the receiver to sleep for
2275 * a while, anyways.
2276 */
2277
2278 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2279	 * this defers syncer requests for some time, before letting at least
2280	 * one request through. The resync controller on the receiving side
2281 * will adapt to the incoming rate accordingly.
2282 *
2283 * We cannot throttle here if remote is Primary/SyncTarget:
2284 * we would also throttle its application reads.
2285 * In that case, throttling is done on the SyncTarget only.
2286 */
e3555d85
PR
2287 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2288 schedule_timeout_uninterruptible(HZ/10);
2289 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2290 goto out_free_e;
b411b363 2291
0f0601f4
LE
2292submit_for_resync:
2293 atomic_add(size >> 9, &mdev->rs_sect_ev);
2294
80a40e43 2295submit:
b411b363 2296 inc_unacked(mdev);
87eeee41 2297 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2298 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2299 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2300
fbe29dec 2301 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
81e84650 2302 return true;
b411b363 2303
10f6d992
LE
2304 /* don't care for the reason here */
2305 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2306 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2307 list_del(&peer_req->w.list);
87eeee41 2308 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2309 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2310
b411b363 2311out_free_e:
b411b363 2312 put_ldev(mdev);
db830c46 2313 drbd_free_ee(mdev, peer_req);
81e84650 2314 return false;
b411b363
PR
2315}
2316
2317static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2318{
2319 int self, peer, rv = -100;
2320 unsigned long ch_self, ch_peer;
2321
2322 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2323 peer = mdev->p_uuid[UI_BITMAP] & 1;
2324
2325 ch_peer = mdev->p_uuid[UI_SIZE];
2326 ch_self = mdev->comm_bm_set;
2327
89e58e75 2328 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2329 case ASB_CONSENSUS:
2330 case ASB_DISCARD_SECONDARY:
2331 case ASB_CALL_HELPER:
2332 dev_err(DEV, "Configuration error.\n");
2333 break;
2334 case ASB_DISCONNECT:
2335 break;
2336 case ASB_DISCARD_YOUNGER_PRI:
2337 if (self == 0 && peer == 1) {
2338 rv = -1;
2339 break;
2340 }
2341 if (self == 1 && peer == 0) {
2342 rv = 1;
2343 break;
2344 }
2345 /* Else fall through to one of the other strategies... */
2346 case ASB_DISCARD_OLDER_PRI:
2347 if (self == 0 && peer == 1) {
2348 rv = 1;
2349 break;
2350 }
2351 if (self == 1 && peer == 0) {
2352 rv = -1;
2353 break;
2354 }
2355 /* Else fall through to one of the other strategies... */
ad19bf6e 2356 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2357 "Using discard-least-changes instead\n");
2358 case ASB_DISCARD_ZERO_CHG:
2359 if (ch_peer == 0 && ch_self == 0) {
25703f83 2360 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2361 ? -1 : 1;
2362 break;
2363 } else {
2364 if (ch_peer == 0) { rv = 1; break; }
2365 if (ch_self == 0) { rv = -1; break; }
2366 }
89e58e75 2367 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2368 break;
2369 case ASB_DISCARD_LEAST_CHG:
2370 if (ch_self < ch_peer)
2371 rv = -1;
2372 else if (ch_self > ch_peer)
2373 rv = 1;
2374 else /* ( ch_self == ch_peer ) */
2375 /* Well, then use something else. */
25703f83 2376 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2377 ? -1 : 1;
2378 break;
2379 case ASB_DISCARD_LOCAL:
2380 rv = -1;
2381 break;
2382 case ASB_DISCARD_REMOTE:
2383 rv = 1;
2384 }
2385
2386 return rv;
2387}
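
/*
 * Example for the zero-primaries policies above (hypothetical setup): with
 * after-sb-0pri set to discard-least-changes, a node that reports
 * ch_self = 100 changed blocks while its peer reports ch_peer = 5000
 * resolves to rv = -1, i.e. the local node discards its changes and becomes
 * sync target; with equal counts the DISCARD_CONCURRENT bit breaks the tie.
 */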
2388
2389static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2390{
6184ea21 2391 int hg, rv = -100;
b411b363 2392
89e58e75 2393 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2394 case ASB_DISCARD_YOUNGER_PRI:
2395 case ASB_DISCARD_OLDER_PRI:
2396 case ASB_DISCARD_LEAST_CHG:
2397 case ASB_DISCARD_LOCAL:
2398 case ASB_DISCARD_REMOTE:
2399 dev_err(DEV, "Configuration error.\n");
2400 break;
2401 case ASB_DISCONNECT:
2402 break;
2403 case ASB_CONSENSUS:
2404 hg = drbd_asb_recover_0p(mdev);
2405 if (hg == -1 && mdev->state.role == R_SECONDARY)
2406 rv = hg;
2407 if (hg == 1 && mdev->state.role == R_PRIMARY)
2408 rv = hg;
2409 break;
2410 case ASB_VIOLENTLY:
2411 rv = drbd_asb_recover_0p(mdev);
2412 break;
2413 case ASB_DISCARD_SECONDARY:
2414 return mdev->state.role == R_PRIMARY ? 1 : -1;
2415 case ASB_CALL_HELPER:
2416 hg = drbd_asb_recover_0p(mdev);
2417 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2418 enum drbd_state_rv rv2;
2419
2420 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2421 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2422 * we might be here in C_WF_REPORT_PARAMS which is transient.
2423 * we do not need to wait for the after state change work either. */
bb437946
AG
2424 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2425 if (rv2 != SS_SUCCESS) {
b411b363
PR
2426 drbd_khelper(mdev, "pri-lost-after-sb");
2427 } else {
2428 dev_warn(DEV, "Successfully gave up primary role.\n");
2429 rv = hg;
2430 }
2431 } else
2432 rv = hg;
2433 }
2434
2435 return rv;
2436}
2437
2438static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2439{
6184ea21 2440 int hg, rv = -100;
b411b363 2441
89e58e75 2442 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2443 case ASB_DISCARD_YOUNGER_PRI:
2444 case ASB_DISCARD_OLDER_PRI:
2445 case ASB_DISCARD_LEAST_CHG:
2446 case ASB_DISCARD_LOCAL:
2447 case ASB_DISCARD_REMOTE:
2448 case ASB_CONSENSUS:
2449 case ASB_DISCARD_SECONDARY:
2450 dev_err(DEV, "Configuration error.\n");
2451 break;
2452 case ASB_VIOLENTLY:
2453 rv = drbd_asb_recover_0p(mdev);
2454 break;
2455 case ASB_DISCONNECT:
2456 break;
2457 case ASB_CALL_HELPER:
2458 hg = drbd_asb_recover_0p(mdev);
2459 if (hg == -1) {
bb437946
AG
2460 enum drbd_state_rv rv2;
2461
b411b363
PR
2462 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2463 * we might be here in C_WF_REPORT_PARAMS which is transient.
2464 * we do not need to wait for the after state change work either. */
bb437946
AG
2465 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2466 if (rv2 != SS_SUCCESS) {
b411b363
PR
2467 drbd_khelper(mdev, "pri-lost-after-sb");
2468 } else {
2469 dev_warn(DEV, "Successfully gave up primary role.\n");
2470 rv = hg;
2471 }
2472 } else
2473 rv = hg;
2474 }
2475
2476 return rv;
2477}
2478
2479static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2480 u64 bits, u64 flags)
2481{
2482 if (!uuid) {
2483 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2484 return;
2485 }
2486 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2487 text,
2488 (unsigned long long)uuid[UI_CURRENT],
2489 (unsigned long long)uuid[UI_BITMAP],
2490 (unsigned long long)uuid[UI_HISTORY_START],
2491 (unsigned long long)uuid[UI_HISTORY_END],
2492 (unsigned long long)bits,
2493 (unsigned long long)flags);
2494}
2495
2496/*
2497 100 after split brain try auto recover
2498 2 C_SYNC_SOURCE set BitMap
2499 1 C_SYNC_SOURCE use BitMap
2500 0 no Sync
2501 -1 C_SYNC_TARGET use BitMap
2502 -2 C_SYNC_TARGET set BitMap
2503 -100 after split brain, disconnect
2504-1000 unrelated data
4a23f264
PR
2505-1091 requires proto 91
2506-1096 requires proto 96
b411b363
PR
2507 */
2508static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2509{
2510 u64 self, peer;
2511 int i, j;
2512
2513 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2514 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2515
2516 *rule_nr = 10;
2517 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2518 return 0;
2519
2520 *rule_nr = 20;
2521 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2522 peer != UUID_JUST_CREATED)
2523 return -2;
2524
2525 *rule_nr = 30;
2526 if (self != UUID_JUST_CREATED &&
2527 (peer == UUID_JUST_CREATED || peer == (u64)0))
2528 return 2;
2529
2530 if (self == peer) {
2531 int rct, dc; /* roles at crash time */
2532
2533 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2534
31890f4a 2535 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2536 return -1091;
b411b363
PR
2537
2538 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2539 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2540 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2541 drbd_uuid_set_bm(mdev, 0UL);
2542
2543 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2544 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2545 *rule_nr = 34;
2546 } else {
2547 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2548 *rule_nr = 36;
2549 }
2550
2551 return 1;
2552 }
2553
2554 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2555
31890f4a 2556 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2557 return -1091;
b411b363
PR
2558
2559 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2560 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2561 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2562
2563 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2564 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2565 mdev->p_uuid[UI_BITMAP] = 0UL;
2566
2567 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2568 *rule_nr = 35;
2569 } else {
2570 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2571 *rule_nr = 37;
2572 }
2573
2574 return -1;
2575 }
2576
2577 /* Common power [off|failure] */
2578 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2579 (mdev->p_uuid[UI_FLAGS] & 2);
2580 /* lowest bit is set when we were primary,
2581 * next bit (weight 2) is set when peer was primary */
2582 *rule_nr = 40;
2583
2584 switch (rct) {
2585 case 0: /* !self_pri && !peer_pri */ return 0;
2586 case 1: /* self_pri && !peer_pri */ return 1;
2587 case 2: /* !self_pri && peer_pri */ return -1;
2588 case 3: /* self_pri && peer_pri */
25703f83 2589 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2590 return dc ? -1 : 1;
2591 }
2592 }
2593
2594 *rule_nr = 50;
2595 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2596 if (self == peer)
2597 return -1;
2598
2599 *rule_nr = 51;
2600 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2601 if (self == peer) {
31890f4a 2602 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2603 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2604 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2605 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2606			 /* The last P_SYNC_UUID did not get through. Undo the modifications
2607			    the peer made to its UUIDs when it last started a resync as sync source. */
2608
31890f4a 2609 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2610 return -1091;
b411b363
PR
2611
2612 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2613 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2614
2615			dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2616 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2617
b411b363
PR
2618 return -1;
2619 }
2620 }
2621
2622 *rule_nr = 60;
2623 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2624 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2625 peer = mdev->p_uuid[i] & ~((u64)1);
2626 if (self == peer)
2627 return -2;
2628 }
2629
2630 *rule_nr = 70;
2631 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2632 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2633 if (self == peer)
2634 return 1;
2635
2636 *rule_nr = 71;
2637 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2638 if (self == peer) {
31890f4a 2639 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2640 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2641 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2642 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2643		 /* The last P_SYNC_UUID did not get through. Undo the modifications
2644		    we made to our UUIDs when we last started a resync as sync source. */
2645
31890f4a 2646 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2647 return -1091;
b411b363
PR
2648
2649 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2650 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2651
4a23f264 2652 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2653 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2654 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2655
2656 return 1;
2657 }
2658 }
2659
2660
2661 *rule_nr = 80;
d8c2a36b 2662 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2663 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2664 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2665 if (self == peer)
2666 return 2;
2667 }
2668
2669 *rule_nr = 90;
2670 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2671 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2672 if (self == peer && self != ((u64)0))
2673 return 100;
2674
2675 *rule_nr = 100;
2676 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2677 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2678 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2679 peer = mdev->p_uuid[j] & ~((u64)1);
2680 if (self == peer)
2681 return -100;
2682 }
2683 }
2684
2685 return -1000;
2686}
2687
2688/* drbd_sync_handshake() returns the new conn state on success, or
2689 C_MASK on failure.
2690 */
2691static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2692 enum drbd_disk_state peer_disk) __must_hold(local)
2693{
2694 int hg, rule_nr;
2695 enum drbd_conns rv = C_MASK;
2696 enum drbd_disk_state mydisk;
2697
2698 mydisk = mdev->state.disk;
2699 if (mydisk == D_NEGOTIATING)
2700 mydisk = mdev->new_state_tmp.disk;
2701
2702 dev_info(DEV, "drbd_sync_handshake:\n");
2703 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2704 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2705 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2706
2707 hg = drbd_uuid_compare(mdev, &rule_nr);
2708
2709 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2710
2711 if (hg == -1000) {
2712 dev_alert(DEV, "Unrelated data, aborting!\n");
2713 return C_MASK;
2714 }
4a23f264
PR
2715 if (hg < -1000) {
2716 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2717 return C_MASK;
2718 }
2719
2720 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2721 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2722 int f = (hg == -100) || abs(hg) == 2;
2723 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2724 if (f)
2725 hg = hg*2;
2726 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2727 hg > 0 ? "source" : "target");
2728 }
2729
3a11a487
AG
2730 if (abs(hg) == 100)
2731 drbd_khelper(mdev, "initial-split-brain");
2732
89e58e75 2733 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2734 int pcount = (mdev->state.role == R_PRIMARY)
2735 + (peer_role == R_PRIMARY);
2736 int forced = (hg == -100);
2737
2738 switch (pcount) {
2739 case 0:
2740 hg = drbd_asb_recover_0p(mdev);
2741 break;
2742 case 1:
2743 hg = drbd_asb_recover_1p(mdev);
2744 break;
2745 case 2:
2746 hg = drbd_asb_recover_2p(mdev);
2747 break;
2748 }
2749 if (abs(hg) < 100) {
2750 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2751 "automatically solved. Sync from %s node\n",
2752 pcount, (hg < 0) ? "peer" : "this");
2753 if (forced) {
2754 dev_warn(DEV, "Doing a full sync, since"
2755				 " UUIDs were ambiguous.\n");
2756 hg = hg*2;
2757 }
2758 }
2759 }
2760
2761 if (hg == -100) {
89e58e75 2762 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2763 hg = -1;
89e58e75 2764 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2765 hg = 1;
2766
2767 if (abs(hg) < 100)
2768 dev_warn(DEV, "Split-Brain detected, manually solved. "
2769 "Sync from %s node\n",
2770 (hg < 0) ? "peer" : "this");
2771 }
2772
2773 if (hg == -100) {
580b9767
LE
2774 /* FIXME this log message is not correct if we end up here
2775 * after an attempted attach on a diskless node.
2776 * We just refuse to attach -- well, we drop the "connection"
2777 * to that disk, in a way... */
3a11a487 2778 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2779 drbd_khelper(mdev, "split-brain");
2780 return C_MASK;
2781 }
2782
2783 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2784 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2785 return C_MASK;
2786 }
2787
2788 if (hg < 0 && /* by intention we do not use mydisk here. */
2789 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2790 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2791 case ASB_CALL_HELPER:
2792 drbd_khelper(mdev, "pri-lost");
2793 /* fall through */
2794 case ASB_DISCONNECT:
2795 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2796 return C_MASK;
2797 case ASB_VIOLENTLY:
2798 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2799			 " assumption\n");
2800 }
2801 }
2802
8169e41b 2803 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2804 if (hg == 0)
2805 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2806 else
2807 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2808 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2809 abs(hg) >= 2 ? "full" : "bit-map based");
2810 return C_MASK;
2811 }
2812
b411b363
PR
2813 if (abs(hg) >= 2) {
2814 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2815 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2816 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2817 return C_MASK;
2818 }
2819
2820 if (hg > 0) { /* become sync source. */
2821 rv = C_WF_BITMAP_S;
2822 } else if (hg < 0) { /* become sync target */
2823 rv = C_WF_BITMAP_T;
2824 } else {
2825 rv = C_CONNECTED;
2826 if (drbd_bm_total_weight(mdev)) {
2827 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2828 drbd_bm_total_weight(mdev));
2829 }
2830 }
2831
2832 return rv;
2833}
2834
2835/* returns 1 if invalid */
2836static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2837{
2838 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2839 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2840 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2841 return 0;
2842
2843 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2844 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2845 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2846 return 1;
2847
2848 /* everything else is valid if they are equal on both sides. */
2849 if (peer == self)
2850 return 0;
2851
2852	/* everything else is invalid. */
2853 return 1;
2854}
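
/*
 * Examples for the validity rules above (hypothetical configurations):
 *
 *	peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_LOCAL   -> 0 (valid)
 *	peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_REMOTE  -> 1 (invalid)
 *	peer = ASB_DISCONNECT,     self = ASB_DISCONNECT      -> 0 (valid)
 *	peer = ASB_DISCONNECT,     self = ASB_CALL_HELPER     -> 1 (invalid)
 */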
2855
7204624c 2856static int receive_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd,
d8763023 2857 unsigned int data_size)
b411b363 2858{
7204624c 2859 struct p_protocol *p = &tconn->data.rbuf.protocol;
b411b363 2860 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2861 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2862 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2863
b411b363
PR
2864 p_proto = be32_to_cpu(p->protocol);
2865 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2866 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2867 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2868 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2869 cf = be32_to_cpu(p->conn_flags);
2870 p_want_lose = cf & CF_WANT_LOSE;
2871
7204624c 2872 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2873
2874 if (cf & CF_DRY_RUN)
7204624c 2875 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2876
7204624c
PR
2877 if (p_proto != tconn->net_conf->wire_protocol) {
2878 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2879 goto disconnect;
2880 }
2881
7204624c
PR
2882 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2883 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2884 goto disconnect;
2885 }
2886
7204624c
PR
2887 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2888 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2889 goto disconnect;
2890 }
2891
7204624c
PR
2892 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2893 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2894 goto disconnect;
2895 }
2896
7204624c
PR
2897 if (p_want_lose && tconn->net_conf->want_lose) {
2898 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2899 goto disconnect;
2900 }
2901
7204624c
PR
2902 if (p_two_primaries != tconn->net_conf->two_primaries) {
2903 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2904 goto disconnect;
2905 }
2906
7204624c
PR
2907 if (tconn->agreed_pro_version >= 87) {
2908 unsigned char *my_alg = tconn->net_conf->integrity_alg;
b411b363 2909
7204624c 2910 if (drbd_recv(tconn, p_integrity_alg, data_size) != data_size)
81e84650 2911 return false;
b411b363
PR
2912
2913 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2914 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2915 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2916 goto disconnect;
2917 }
7204624c 2918 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2919 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2920 }
2921
81e84650 2922 return true;
b411b363
PR
2923
2924disconnect:
7204624c 2925 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 2926 return false;
b411b363
PR
2927}
2928
2929/* helper function
2930 * input: alg name, feature name
2931 * return: NULL (alg name was "")
2932 * ERR_PTR(error) if something goes wrong
2933 * or the crypto hash ptr, if it worked out ok. */
2934struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2935 const char *alg, const char *name)
2936{
2937 struct crypto_hash *tfm;
2938
2939 if (!alg[0])
2940 return NULL;
2941
2942 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2943 if (IS_ERR(tfm)) {
2944 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2945 alg, name, PTR_ERR(tfm));
2946 return tfm;
2947 }
2948 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2949 crypto_free_hash(tfm);
2950 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2951 return ERR_PTR(-EINVAL);
2952 }
2953 return tfm;
2954}
2955
d8763023
AG
2956static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2957 unsigned int packet_size)
b411b363 2958{
81e84650 2959 int ok = true;
e42325a5 2960 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
b411b363
PR
2961 unsigned int header_size, data_size, exp_max_sz;
2962 struct crypto_hash *verify_tfm = NULL;
2963 struct crypto_hash *csums_tfm = NULL;
31890f4a 2964 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2965 int *rs_plan_s = NULL;
2966 int fifo_size = 0;
b411b363
PR
2967
2968 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2969 : apv == 88 ? sizeof(struct p_rs_param)
2970 + SHARED_SECRET_MAX
8e26f9cc
PR
2971 : apv <= 94 ? sizeof(struct p_rs_param_89)
2972 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2973
02918be2 2974 if (packet_size > exp_max_sz) {
b411b363 2975 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2976 packet_size, exp_max_sz);
81e84650 2977 return false;
b411b363
PR
2978 }
2979
2980 if (apv <= 88) {
257d0af6 2981 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
02918be2 2982 data_size = packet_size - header_size;
8e26f9cc 2983 } else if (apv <= 94) {
257d0af6 2984 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
02918be2 2985 data_size = packet_size - header_size;
b411b363 2986 D_ASSERT(data_size == 0);
8e26f9cc 2987 } else {
257d0af6 2988 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
02918be2 2989 data_size = packet_size - header_size;
b411b363
PR
2990 D_ASSERT(data_size == 0);
2991 }
2992
2993 /* initialize verify_alg and csums_alg */
2994 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2995
de0ff338 2996 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
81e84650 2997 return false;
b411b363
PR
2998
2999 mdev->sync_conf.rate = be32_to_cpu(p->rate);
3000
3001 if (apv >= 88) {
3002 if (apv == 88) {
3003 if (data_size > SHARED_SECRET_MAX) {
3004 dev_err(DEV, "verify-alg too long, "
3005 "peer wants %u, accepting only %u byte\n",
3006 data_size, SHARED_SECRET_MAX);
81e84650 3007 return false;
b411b363
PR
3008 }
3009
de0ff338 3010 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
81e84650 3011 return false;
b411b363
PR
3012
3013 /* we expect NUL terminated string */
3014 /* but just in case someone tries to be evil */
3015 D_ASSERT(p->verify_alg[data_size-1] == 0);
3016 p->verify_alg[data_size-1] = 0;
3017
3018 } else /* apv >= 89 */ {
3019 /* we still expect NUL terminated strings */
3020 /* but just in case someone tries to be evil */
3021 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3022 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3023 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3024 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3025 }
3026
3027 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
3028 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3029 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3030 mdev->sync_conf.verify_alg, p->verify_alg);
3031 goto disconnect;
3032 }
3033 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3034 p->verify_alg, "verify-alg");
3035 if (IS_ERR(verify_tfm)) {
3036 verify_tfm = NULL;
3037 goto disconnect;
3038 }
3039 }
3040
3041 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
3042 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3043 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3044 mdev->sync_conf.csums_alg, p->csums_alg);
3045 goto disconnect;
3046 }
3047 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3048 p->csums_alg, "csums-alg");
3049 if (IS_ERR(csums_tfm)) {
3050 csums_tfm = NULL;
3051 goto disconnect;
3052 }
3053 }
3054
8e26f9cc
PR
3055 if (apv > 94) {
3056 mdev->sync_conf.rate = be32_to_cpu(p->rate);
3057 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3058 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
3059 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
3060 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
3061
3062 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3063 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3064 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3065 if (!rs_plan_s) {
3066				dev_err(DEV, "kzalloc of fifo_buffer failed");
3067 goto disconnect;
3068 }
3069 }
8e26f9cc 3070 }
b411b363
PR
3071
3072 spin_lock(&mdev->peer_seq_lock);
3073 /* lock against drbd_nl_syncer_conf() */
3074 if (verify_tfm) {
3075 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
3076 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
3077 crypto_free_hash(mdev->verify_tfm);
3078 mdev->verify_tfm = verify_tfm;
3079 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3080 }
3081 if (csums_tfm) {
3082 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
3083 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
3084 crypto_free_hash(mdev->csums_tfm);
3085 mdev->csums_tfm = csums_tfm;
3086 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3087 }
778f271d
PR
3088 if (fifo_size != mdev->rs_plan_s.size) {
3089 kfree(mdev->rs_plan_s.values);
3090 mdev->rs_plan_s.values = rs_plan_s;
3091 mdev->rs_plan_s.size = fifo_size;
3092 mdev->rs_planed = 0;
3093 }
b411b363
PR
3094 spin_unlock(&mdev->peer_seq_lock);
3095 }
3096
3097 return ok;
3098disconnect:
3099 /* just for completeness: actually not needed,
3100 * as this is not reached if csums_tfm was ok. */
3101 crypto_free_hash(csums_tfm);
3102 /* but free the verify_tfm again, if csums_tfm did not work out */
3103 crypto_free_hash(verify_tfm);
38fa9988 3104 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3105 return false;
b411b363
PR
3106}
3107
b411b363
PR
3108/* warn if the arguments differ by more than 12.5% */
3109static void warn_if_differ_considerably(struct drbd_conf *mdev,
3110 const char *s, sector_t a, sector_t b)
3111{
3112 sector_t d;
3113 if (a == 0 || b == 0)
3114 return;
3115 d = (a > b) ? (a - b) : (b - a);
3116 if (d > (a>>3) || d > (b>>3))
3117 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3118 (unsigned long long)a, (unsigned long long)b);
3119}
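
/*
 * Example (hypothetical sizes): a = 1000 and b = 860 sectors differ by
 * d = 140 > (a >> 3) == 125, so the difference is reported; a = 1000 and
 * b = 900 (d = 100) stays below both 12.5% thresholds and is silent.
 */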
3120
d8763023
AG
3121static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3122 unsigned int data_size)
b411b363 3123{
e42325a5 3124 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
b411b363 3125 enum determine_dev_size dd = unchanged;
b411b363
PR
3126 sector_t p_size, p_usize, my_usize;
3127 int ldsc = 0; /* local disk size changed */
e89b591c 3128 enum dds_flags ddsf;
b411b363 3129
b411b363
PR
3130 p_size = be64_to_cpu(p->d_size);
3131 p_usize = be64_to_cpu(p->u_size);
3132
b411b363
PR
3133 /* just store the peer's disk size for now.
3134 * we still need to figure out whether we accept that. */
3135 mdev->p_size = p_size;
3136
b411b363
PR
3137 if (get_ldev(mdev)) {
3138 warn_if_differ_considerably(mdev, "lower level device sizes",
3139 p_size, drbd_get_max_capacity(mdev->ldev));
3140 warn_if_differ_considerably(mdev, "user requested size",
3141 p_usize, mdev->ldev->dc.disk_size);
3142
3143 /* if this is the first connect, or an otherwise expected
3144 * param exchange, choose the minimum */
3145 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3146 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3147 p_usize);
3148
3149 my_usize = mdev->ldev->dc.disk_size;
3150
3151 if (mdev->ldev->dc.disk_size != p_usize) {
3152 mdev->ldev->dc.disk_size = p_usize;
3153 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3154 (unsigned long)mdev->ldev->dc.disk_size);
3155 }
3156
3157 /* Never shrink a device with usable data during connect.
3158 But allow online shrinking if we are connected. */
a393db6f 3159 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3160 drbd_get_capacity(mdev->this_bdev) &&
3161 mdev->state.disk >= D_OUTDATED &&
3162 mdev->state.conn < C_CONNECTED) {
3163 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3164 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3165 mdev->ldev->dc.disk_size = my_usize;
3166 put_ldev(mdev);
81e84650 3167 return false;
b411b363
PR
3168 }
3169 put_ldev(mdev);
3170 }
b411b363 3171
e89b591c 3172 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3173 if (get_ldev(mdev)) {
24c4830c 3174 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3175 put_ldev(mdev);
3176 if (dd == dev_size_error)
81e84650 3177 return false;
b411b363
PR
3178 drbd_md_sync(mdev);
3179 } else {
3180 /* I am diskless, need to accept the peer's size. */
3181 drbd_set_my_capacity(mdev, p_size);
3182 }
3183
99432fcc
PR
3184 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3185 drbd_reconsider_max_bio_size(mdev);
3186
b411b363
PR
3187 if (get_ldev(mdev)) {
3188 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3189 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3190 ldsc = 1;
3191 }
3192
b411b363
PR
3193 put_ldev(mdev);
3194 }
3195
3196 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3197 if (be64_to_cpu(p->c_size) !=
3198 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3199 /* we have different sizes, probably peer
3200 * needs to know my new size... */
e89b591c 3201 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3202 }
3203 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3204 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3205 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3206 mdev->state.disk >= D_INCONSISTENT) {
3207 if (ddsf & DDSF_NO_RESYNC)
3208 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3209 else
3210 resync_after_online_grow(mdev);
3211 } else
b411b363
PR
3212 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3213 }
3214 }
3215
81e84650 3216 return true;
b411b363
PR
3217}
3218
d8763023
AG
3219static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3220 unsigned int data_size)
b411b363 3221{
e42325a5 3222 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
b411b363 3223 u64 *p_uuid;
62b0da3a 3224 int i, updated_uuids = 0;
b411b363 3225
b411b363
PR
3226 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3227
3228 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3229 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3230
3231 kfree(mdev->p_uuid);
3232 mdev->p_uuid = p_uuid;
3233
3234 if (mdev->state.conn < C_CONNECTED &&
3235 mdev->state.disk < D_INCONSISTENT &&
3236 mdev->state.role == R_PRIMARY &&
3237 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3238 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3239 (unsigned long long)mdev->ed_uuid);
38fa9988 3240 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3241 return false;
b411b363
PR
3242 }
3243
3244 if (get_ldev(mdev)) {
3245 int skip_initial_sync =
3246 mdev->state.conn == C_CONNECTED &&
31890f4a 3247 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3248 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3249 (p_uuid[UI_FLAGS] & 8);
3250 if (skip_initial_sync) {
3251 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3252 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3253 "clear_n_write from receive_uuids",
3254 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3255 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3256 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3257 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3258 CS_VERBOSE, NULL);
3259 drbd_md_sync(mdev);
62b0da3a 3260 updated_uuids = 1;
b411b363
PR
3261 }
3262 put_ldev(mdev);
18a50fa2
PR
3263 } else if (mdev->state.disk < D_INCONSISTENT &&
3264 mdev->state.role == R_PRIMARY) {
3265 /* I am a diskless primary, the peer just created a new current UUID
3266 for me. */
62b0da3a 3267 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3268 }
3269
3270	/* Before we test for the disk state, we should wait until a possibly
3271	   ongoing cluster-wide state change is finished. That is important if
3272 we are primary and are detaching from our disk. We need to see the
3273 new disk state... */
8410da8f
PR
3274 mutex_lock(mdev->state_mutex);
3275 mutex_unlock(mdev->state_mutex);
b411b363 3276 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3277 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3278
3279 if (updated_uuids)
3280 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3281
81e84650 3282 return true;
b411b363
PR
3283}
3284
3285/**
3286 * convert_state() - Converts the peer's view of the cluster state to our point of view
3287 * @ps: The state as seen by the peer.
3288 */
3289static union drbd_state convert_state(union drbd_state ps)
3290{
3291 union drbd_state ms;
3292
3293 static enum drbd_conns c_tab[] = {
3294 [C_CONNECTED] = C_CONNECTED,
3295
3296 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3297 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3298 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3299 [C_VERIFY_S] = C_VERIFY_T,
3300 [C_MASK] = C_MASK,
3301 };
3302
3303 ms.i = ps.i;
3304
3305 ms.conn = c_tab[ps.conn];
3306 ms.peer = ps.role;
3307 ms.role = ps.peer;
3308 ms.pdsk = ps.disk;
3309 ms.disk = ps.pdsk;
3310 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3311
3312 return ms;
3313}
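
/*
 * Example (hypothetical peer state): if the peer reports
 * { role = Primary, peer = Secondary, disk = UpToDate, pdsk = Inconsistent,
 *   conn = C_STARTING_SYNC_S }, our converted view is
 * { role = Secondary, peer = Primary, disk = Inconsistent, pdsk = UpToDate,
 *   conn = C_STARTING_SYNC_T }: role/peer and disk/pdsk swap sides, and the
 * connection state is mirrored via c_tab[].
 */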
3314
d8763023
AG
3315static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3316 unsigned int data_size)
b411b363 3317{
e42325a5 3318 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
b411b363 3319 union drbd_state mask, val;
bf885f8a 3320 enum drbd_state_rv rv;
b411b363 3321
b411b363
PR
3322 mask.i = be32_to_cpu(p->mask);
3323 val.i = be32_to_cpu(p->val);
3324
25703f83 3325 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3326 mutex_is_locked(mdev->state_mutex)) {
b411b363 3327 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3328 return true;
b411b363
PR
3329 }
3330
3331 mask = convert_state(mask);
3332 val = convert_state(val);
3333
047cd4a6
PR
3334 if (cmd == P_CONN_ST_CHG_REQ) {
3335 rv = conn_request_state(mdev->tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3336 conn_send_sr_reply(mdev->tconn, rv);
3337 } else {
3338 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3339 drbd_send_sr_reply(mdev, rv);
3340 }
b411b363 3341
b411b363
PR
3342 drbd_md_sync(mdev);
3343
81e84650 3344 return true;
b411b363
PR
3345}
3346
d8763023
AG
3347static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3348 unsigned int data_size)
b411b363 3349{
e42325a5 3350 struct p_state *p = &mdev->tconn->data.rbuf.state;
4ac4aada 3351 union drbd_state os, ns, peer_state;
b411b363 3352 enum drbd_disk_state real_peer_disk;
65d922c3 3353 enum chg_state_flags cs_flags;
b411b363
PR
3354 int rv;
3355
b411b363
PR
3356 peer_state.i = be32_to_cpu(p->state);
3357
3358 real_peer_disk = peer_state.disk;
3359 if (peer_state.disk == D_NEGOTIATING) {
3360 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3361 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3362 }
3363
87eeee41 3364 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3365 retry:
4ac4aada 3366 os = ns = mdev->state;
87eeee41 3367 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3368
e9ef7bb6
LE
3369 /* peer says his disk is uptodate, while we think it is inconsistent,
3370 * and this happens while we think we have a sync going on. */
3371 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3372 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3373 /* If we are (becoming) SyncSource, but peer is still in sync
3374 * preparation, ignore its uptodate-ness to avoid flapping, it
3375 * will change to inconsistent once the peer reaches active
3376 * syncing states.
3377 * It may have changed syncer-paused flags, however, so we
3378 * cannot ignore this completely. */
3379 if (peer_state.conn > C_CONNECTED &&
3380 peer_state.conn < C_SYNC_SOURCE)
3381 real_peer_disk = D_INCONSISTENT;
3382
3383 /* if peer_state changes to connected at the same time,
3384 * it explicitly notifies us that it finished resync.
3385 * Maybe we should finish it up, too? */
3386 else if (os.conn >= C_SYNC_SOURCE &&
3387 peer_state.conn == C_CONNECTED) {
3388 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3389 drbd_resync_finished(mdev);
81e84650 3390 return true;
e9ef7bb6
LE
3391 }
3392 }
3393
3394 /* peer says his disk is inconsistent, while we think it is uptodate,
3395 * and this happens while the peer still thinks we have a sync going on,
3396 * but we think we are already done with the sync.
3397 * We ignore this to avoid flapping pdsk.
3398 * This should not happen, if the peer is a recent version of drbd. */
3399 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3400 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3401 real_peer_disk = D_UP_TO_DATE;
3402
4ac4aada
LE
3403 if (ns.conn == C_WF_REPORT_PARAMS)
3404 ns.conn = C_CONNECTED;
b411b363 3405
67531718
PR
3406 if (peer_state.conn == C_AHEAD)
3407 ns.conn = C_BEHIND;
3408
b411b363
PR
3409 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3410 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3411 int cr; /* consider resync */
3412
3413 /* if we established a new connection */
4ac4aada 3414 cr = (os.conn < C_CONNECTED);
b411b363
PR
3415 /* if we had an established connection
3416 * and one of the nodes newly attaches a disk */
4ac4aada 3417 cr |= (os.conn == C_CONNECTED &&
b411b363 3418 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3419 os.disk == D_NEGOTIATING));
b411b363
PR
3420 /* if we have both been inconsistent, and the peer has been
3421 * forced to be UpToDate with --overwrite-data */
3422 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3423 /* if we had been plain connected, and the admin requested to
3424 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3425 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3426 (peer_state.conn >= C_STARTING_SYNC_S &&
3427 peer_state.conn <= C_WF_BITMAP_T));
3428
3429 if (cr)
4ac4aada 3430 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3431
3432 put_ldev(mdev);
4ac4aada
LE
3433 if (ns.conn == C_MASK) {
3434 ns.conn = C_CONNECTED;
b411b363 3435 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3436 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3437 } else if (peer_state.disk == D_NEGOTIATING) {
3438 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3439 peer_state.disk = D_DISKLESS;
580b9767 3440 real_peer_disk = D_DISKLESS;
b411b363 3441 } else {
8169e41b 3442 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
81e84650 3443 return false;
4ac4aada 3444 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3445 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3446 return false;
b411b363
PR
3447 }
3448 }
3449 }
3450
87eeee41 3451 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3452 if (mdev->state.i != os.i)
b411b363
PR
3453 goto retry;
3454 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3455 ns.peer = peer_state.role;
3456 ns.pdsk = real_peer_disk;
3457 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3458 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3459 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3460 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3461 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3462 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3463 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3464 for temporary network outages! */
87eeee41 3465 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3466 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
2f5cdd0b 3467 tl_clear(mdev->tconn);
481c6f50
PR
3468 drbd_uuid_new_current(mdev);
3469 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3470 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
81e84650 3471 return false;
481c6f50 3472 }
65d922c3 3473 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3474 ns = mdev->state;
87eeee41 3475 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3476
3477 if (rv < SS_SUCCESS) {
38fa9988 3478 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
81e84650 3479 return false;
b411b363
PR
3480 }
3481
4ac4aada
LE
3482 if (os.conn > C_WF_REPORT_PARAMS) {
3483 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3484 peer_state.disk != D_NEGOTIATING ) {
3485 /* we want resync, peer has not yet decided to sync... */
3486 /* Nowadays only used when forcing a node into primary role and
3487 setting its disk to UpToDate with that */
3488 drbd_send_uuids(mdev);
3489 drbd_send_state(mdev);
3490 }
3491 }
3492
89e58e75 3493 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3494
3495 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3496
81e84650 3497 return true;
b411b363
PR
3498}
3499
d8763023
AG
3500static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3501 unsigned int data_size)
b411b363 3502{
e42325a5 3503 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
b411b363
PR
3504
3505 wait_event(mdev->misc_wait,
3506 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3507 mdev->state.conn == C_BEHIND ||
b411b363
PR
3508 mdev->state.conn < C_CONNECTED ||
3509 mdev->state.disk < D_NEGOTIATING);
3510
3511 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3512
b411b363
PR
3513 /* Here the _drbd_uuid_ functions are right, current should
3514 _not_ be rotated into the history */
3515 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3516 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3517 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3518
62b0da3a 3519 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3520 drbd_start_resync(mdev, C_SYNC_TARGET);
3521
3522 put_ldev(mdev);
3523 } else
3524 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3525
81e84650 3526 return true;
b411b363
PR
3527}
3528
2c46407d
AG
3529/**
3530 * receive_bitmap_plain
3531 *
3532 * Return 0 when done, 1 when another iteration is needed, and a negative error
3533 * code upon failure.
3534 */
3535static int
02918be2
PR
3536receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3537 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3538{
3539 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3540 unsigned want = num_words * sizeof(long);
2c46407d 3541 int err;
b411b363 3542
02918be2
PR
3543 if (want != data_size) {
3544 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3545 return -EIO;
b411b363
PR
3546 }
3547 if (want == 0)
2c46407d 3548 return 0;
de0ff338 3549 err = drbd_recv(mdev->tconn, buffer, want);
2c46407d
AG
3550 if (err != want) {
3551 if (err >= 0)
3552 err = -EIO;
3553 return err;
3554 }
b411b363
PR
3555
3556 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3557
3558 c->word_offset += num_words;
3559 c->bit_offset = c->word_offset * BITS_PER_LONG;
3560 if (c->bit_offset > c->bm_bits)
3561 c->bit_offset = c->bm_bits;
3562
2c46407d 3563 return 1;
b411b363
PR
3564}
3565
2c46407d
AG
3566/**
3567 * recv_bm_rle_bits
3568 *
3569 * Return 0 when done, 1 when another iteration is needed, and a negative error
3570 * code upon failure.
3571 */
3572static int
b411b363
PR
3573recv_bm_rle_bits(struct drbd_conf *mdev,
3574 struct p_compressed_bm *p,
c6d25cfe
PR
3575 struct bm_xfer_ctx *c,
3576 unsigned int len)
b411b363
PR
3577{
3578 struct bitstream bs;
3579 u64 look_ahead;
3580 u64 rl;
3581 u64 tmp;
3582 unsigned long s = c->bit_offset;
3583 unsigned long e;
b411b363
PR
3584 int toggle = DCBP_get_start(p);
3585 int have;
3586 int bits;
3587
3588 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3589
3590 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3591 if (bits < 0)
2c46407d 3592 return -EIO;
b411b363
PR
3593
3594 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3595 bits = vli_decode_bits(&rl, look_ahead);
3596 if (bits <= 0)
2c46407d 3597 return -EIO;
b411b363
PR
3598
3599 if (toggle) {
3600 e = s + rl -1;
3601 if (e >= c->bm_bits) {
3602 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3603 return -EIO;
b411b363
PR
3604 }
3605 _drbd_bm_set_bits(mdev, s, e);
3606 }
3607
3608 if (have < bits) {
3609 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3610 have, bits, look_ahead,
3611 (unsigned int)(bs.cur.b - p->code),
3612 (unsigned int)bs.buf_len);
2c46407d 3613 return -EIO;
b411b363
PR
3614 }
3615 look_ahead >>= bits;
3616 have -= bits;
3617
3618 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3619 if (bits < 0)
2c46407d 3620 return -EIO;
b411b363
PR
3621 look_ahead |= tmp << have;
3622 have += bits;
3623 }
3624
3625 c->bit_offset = s;
3626 bm_xfer_ctx_bit_to_word_offset(c);
3627
2c46407d 3628 return (s != c->bm_bits);
b411b363
PR
3629}
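/* Decoding sketch (made-up run lengths, assuming c->bit_offset is 0 on entry):
 * with DCBP_get_start(p) returning 0 and the VLI stream decoding to run
 * lengths 1000, 50, 2000, the first 1000 bits stay untouched, bits 1000..1049
 * are set out-of-sync via _drbd_bm_set_bits(), and the following 2000 bits
 * stay untouched again; the toggle flips after every run, so only every other
 * run actually sets bits. */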
3630
2c46407d
AG
3631/**
3632 * decode_bitmap_c
3633 *
3634 * Return 0 when done, 1 when another iteration is needed, and a negative error
3635 * code upon failure.
3636 */
3637static int
b411b363
PR
3638decode_bitmap_c(struct drbd_conf *mdev,
3639 struct p_compressed_bm *p,
c6d25cfe
PR
3640 struct bm_xfer_ctx *c,
3641 unsigned int len)
b411b363
PR
3642{
3643 if (DCBP_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3644 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3645
3646 /* other variants had been implemented for evaluation,
3647 * but have been dropped as this one turned out to be "best"
3648 * during all our tests. */
3649
3650 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3651 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3652 return -EIO;
b411b363
PR
3653}
3654
3655void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3656 const char *direction, struct bm_xfer_ctx *c)
3657{
3658 /* what would it take to transfer it "plaintext" */
c012949a 3659 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3660 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3661 + c->bm_words * sizeof(long);
3662 unsigned total = c->bytes[0] + c->bytes[1];
3663 unsigned r;
3664
3665 /* total cannot be zero, but just in case: */
3666 if (total == 0)
3667 return;
3668
3669 /* don't report if not compressed */
3670 if (total >= plain)
3671 return;
3672
3673 /* total < plain. check for overflow, still */
3674 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3675 : (1000 * total / plain);
3676
3677 if (r > 1000)
3678 r = 1000;
3679
3680 r = 1000 - r;
3681 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3682 "total %u; compression: %u.%u%%\n",
3683 direction,
3684 c->bytes[1], c->packets[1],
3685 c->bytes[0], c->packets[0],
3686 total, r/10, r % 10);
3687}
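/* Worked example with illustrative numbers (not taken from a real transfer):
 * for plain = 4096 bytes and total = 1024 bytes, r = 1000 * 1024 / 4096 = 250,
 * then r = 1000 - 250 = 750, which is printed as "compression: 75.0%". */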
3688
3689/* Since we are processing the bitfield from lower addresses to higher,
3690 it does not matter if we process it in 32 bit chunks or 64 bit
3691 chunks as long as it is little endian. (Understand it as a byte stream,
3692 beginning with the lowest byte...) If we used big endian
3693 we would need to process it from the highest address to the lowest,
3694 in order to be agnostic to the 32 vs 64 bits issue.
3695
3696 returns 0 on failure, 1 if we successfully received it. */
d8763023
AG
3697static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3698 unsigned int data_size)
b411b363
PR
3699{
3700 struct bm_xfer_ctx c;
3701 void *buffer;
2c46407d 3702 int err;
81e84650 3703 int ok = false;
257d0af6 3704 struct p_header *h = &mdev->tconn->data.rbuf.header;
77351055 3705 struct packet_info pi;
b411b363 3706
20ceb2b2
LE
3707 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3708 /* you are supposed to send additional out-of-sync information
3709 * if you actually set bits during this phase */
b411b363
PR
3710
3711 /* maybe we should use some per thread scratch page,
3712 * and allocate that during initial device creation? */
3713 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3714 if (!buffer) {
3715 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3716 goto out;
3717 }
3718
3719 c = (struct bm_xfer_ctx) {
3720 .bm_bits = drbd_bm_bits(mdev),
3721 .bm_words = drbd_bm_words(mdev),
3722 };
3723
2c46407d 3724 for(;;) {
02918be2 3725 if (cmd == P_BITMAP) {
2c46407d 3726 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3727 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3728 /* MAYBE: sanity check that we speak proto >= 90,
3729 * and the feature is enabled! */
3730 struct p_compressed_bm *p;
3731
02918be2 3732 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3733 dev_err(DEV, "ReportCBitmap packet too large\n");
3734 goto out;
3735 }
3736 /* use the page buff */
3737 p = buffer;
3738 memcpy(p, h, sizeof(*h));
de0ff338 3739 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
b411b363 3740 goto out;
004352fa
LE
3741 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3742 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3743 goto out;
b411b363 3744 }
c6d25cfe 3745 err = decode_bitmap_c(mdev, p, &c, data_size);
b411b363 3746 } else {
02918be2 3747 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3748 goto out;
3749 }
3750
02918be2 3751 c.packets[cmd == P_BITMAP]++;
257d0af6 3752 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
b411b363 3753
2c46407d
AG
3754 if (err <= 0) {
3755 if (err < 0)
3756 goto out;
b411b363 3757 break;
2c46407d 3758 }
9ba7aa00 3759 if (!drbd_recv_header(mdev->tconn, &pi))
b411b363 3760 goto out;
77351055
PR
3761 cmd = pi.cmd;
3762 data_size = pi.size;
2c46407d 3763 }
b411b363
PR
3764
3765 INFO_bm_xfer_stats(mdev, "receive", &c);
3766
3767 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3768 enum drbd_state_rv rv;
3769
b411b363
PR
3770 ok = !drbd_send_bitmap(mdev);
3771 if (!ok)
3772 goto out;
3773 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3774 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3775 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3776 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3777 /* admin may have requested C_DISCONNECTING,
3778 * other threads may have noticed network errors */
3779 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3780 drbd_conn_str(mdev->state.conn));
3781 }
3782
81e84650 3783 ok = true;
b411b363 3784 out:
20ceb2b2 3785 drbd_bm_unlock(mdev);
b411b363
PR
3786 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3787 drbd_start_resync(mdev, C_SYNC_SOURCE);
3788 free_page((unsigned long) buffer);
3789 return ok;
3790}
3791
2de876ef 3792static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3793{
3794 /* TODO zero copy sink :) */
3795 static char sink[128];
3796 int size, want, r;
3797
02918be2 3798 size = data_size;
b411b363
PR
3799 while (size > 0) {
3800 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3801 r = drbd_recv(tconn, sink, want);
3802 if (r <= 0)
841ce241 3803 break;
b411b363
PR
3804 size -= r;
3805 }
3806 return size == 0;
3807}
3808
2de876ef
PR
3809static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3810 unsigned int data_size)
3811{
3812 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3813 cmd, data_size);
3814
3815 return _tconn_receive_skip(mdev->tconn, data_size);
3816}
3817
3818static int tconn_receive_skip(struct drbd_tconn *tconn, enum drbd_packet cmd, unsigned int data_size)
3819{
3820 conn_warn(tconn, "skipping packet for non existing volume type %d, l: %d!\n",
3821 cmd, data_size);
3822
3823 return _tconn_receive_skip(tconn, data_size);
3824}
3825
d8763023
AG
3826static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3827 unsigned int data_size)
0ced55a3 3828{
e7f52dfb
LE
3829 /* Make sure we've acked all the TCP data associated
3830 * with the data requests being unplugged */
e42325a5 3831 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3832
81e84650 3833 return true;
0ced55a3
PR
3834}
3835
d8763023
AG
3836static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3837 unsigned int data_size)
73a01a18 3838{
e42325a5 3839 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
73a01a18 3840
f735e363
LE
3841 switch (mdev->state.conn) {
3842 case C_WF_SYNC_UUID:
3843 case C_WF_BITMAP_T:
3844 case C_BEHIND:
3845 break;
3846 default:
3847 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3848 drbd_conn_str(mdev->state.conn));
3849 }
3850
73a01a18
PR
3851 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3852
81e84650 3853 return true;
73a01a18
PR
3854}
3855
02918be2
PR
3856struct data_cmd {
3857 int expect_payload;
3858 size_t pkt_size;
d9ae84e7
PR
3859 enum {
3860 MDEV,
3861 CONN,
3862 } type;
3863 union {
3864 int (*mdev_fn)(struct drbd_conf *, enum drbd_packet cmd,
3865 unsigned int to_receive);
3866 int (*conn_fn)(struct drbd_tconn *, enum drbd_packet cmd,
3867 unsigned int to_receive);
3868 };
02918be2
PR
3869};
3870
3871static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3872 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3873 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3874 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3875 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3876 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3877 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3878 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3879 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3880 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3881 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3882 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3883 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3884 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3885 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3886 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3887 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3888 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3889 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3890 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3891 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3892 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3893 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
3894 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
b411b363
PR
3895};
3896
02918be2 3897/* All handler functions that expect a sub-header get that sub-header in
e42325a5 3898 mdev->tconn->data.rbuf.header.head.payload.
02918be2 3899
e42325a5 3900 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
02918be2
PR
3901 p_header, but they may not rely on that, since there is also p_header95!
3902 */
b411b363 3903
eefc2f7d 3904static void drbdd(struct drbd_tconn *tconn)
b411b363 3905{
eefc2f7d 3906 struct p_header *header = &tconn->data.rbuf.header;
77351055 3907 struct packet_info pi;
02918be2
PR
3908 size_t shs; /* sub header size */
3909 int rv;
b411b363 3910
eefc2f7d
PR
3911 while (get_t_state(&tconn->receiver) == RUNNING) {
3912 drbd_thread_current_set_cpu(&tconn->receiver);
3913 if (!drbd_recv_header(tconn, &pi))
02918be2 3914 goto err_out;
b411b363 3915
6e849ce8 3916 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
d9ae84e7 3917 !drbd_cmd_handler[pi.cmd].mdev_fn)) {
eefc2f7d 3918 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3919 goto err_out;
0b33a916 3920 }
b411b363 3921
77351055
PR
3922 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3923 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
eefc2f7d 3924 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3925 goto err_out;
b411b363 3926 }
b411b363 3927
c13f7e1a 3928 if (shs) {
eefc2f7d 3929 rv = drbd_recv(tconn, &header->payload, shs);
c13f7e1a 3930 if (unlikely(rv != shs)) {
0ddc5549 3931 if (!signal_pending(current))
eefc2f7d 3932 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3933 goto err_out;
3934 }
3935 }
3936
d9ae84e7
PR
3937 if (drbd_cmd_handler[pi.cmd].type == CONN) {
3938 rv = drbd_cmd_handler[pi.cmd].conn_fn(tconn, pi.cmd, pi.size - shs);
3939 } else {
3940 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
3941 rv = mdev ?
3942 drbd_cmd_handler[pi.cmd].mdev_fn(mdev, pi.cmd, pi.size - shs) :
3943 tconn_receive_skip(tconn, pi.cmd, pi.size - shs);
3944 }
b411b363 3945
02918be2 3946 if (unlikely(!rv)) {
eefc2f7d 3947 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3948 cmdname(pi.cmd), pi.size);
02918be2 3949 goto err_out;
b411b363
PR
3950 }
3951 }
b411b363 3952
02918be2
PR
3953 if (0) {
3954 err_out:
bbeb641c 3955 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
02918be2 3956 }
b411b363
PR
3957}
3958
0e29d163 3959void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3960{
3961 struct drbd_wq_barrier barr;
3962
3963 barr.w.cb = w_prev_work_done;
0e29d163 3964 barr.w.tconn = tconn;
b411b363 3965 init_completion(&barr.done);
0e29d163 3966 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
3967 wait_for_completion(&barr.done);
3968}
3969
360cc740 3970static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 3971{
bbeb641c 3972 enum drbd_conns oc;
b411b363 3973 int rv = SS_UNKNOWN_ERROR;
b411b363 3974
bbeb641c 3975 if (tconn->cstate == C_STANDALONE)
b411b363 3976 return;
b411b363
PR
3977
3978 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
3979 drbd_thread_stop(&tconn->asender);
3980 drbd_free_sock(tconn);
3981
3982 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
3983
3984 conn_info(tconn, "Connection closed\n");
3985
3986 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
3987 oc = tconn->cstate;
3988 if (oc >= C_UNCONNECTED)
3989 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
3990
360cc740
PR
3991 spin_unlock_irq(&tconn->req_lock);
3992
bbeb641c 3993 if (oc == C_DISCONNECTING) {
360cc740
PR
3994 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
3995
3996 crypto_free_hash(tconn->cram_hmac_tfm);
3997 tconn->cram_hmac_tfm = NULL;
3998
3999 kfree(tconn->net_conf);
4000 tconn->net_conf = NULL;
bbeb641c 4001 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4002 }
4003}
4004
4005static int drbd_disconnected(int vnr, void *p, void *data)
4006{
4007 struct drbd_conf *mdev = (struct drbd_conf *)p;
4008 enum drbd_fencing_p fp;
4009 unsigned int i;
b411b363 4010
85719573 4011 /* wait for current activity to cease. */
87eeee41 4012 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4013 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4014 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4015 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4016 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4017
4018 /* We do not have data structures that would allow us to
4019 * get the rs_pending_cnt down to 0 again.
4020 * * On C_SYNC_TARGET we do not have any data structures describing
4021 * the pending RSDataRequest's we have sent.
4022 * * On C_SYNC_SOURCE there is no data structure that tracks
4023 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4024 * And no, it is not the sum of the reference counts in the
4025 * resync_LRU. The resync_LRU tracks the whole operation including
4026 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4027 * on the fly. */
4028 drbd_rs_cancel_all(mdev);
4029 mdev->rs_total = 0;
4030 mdev->rs_failed = 0;
4031 atomic_set(&mdev->rs_pending_cnt, 0);
4032 wake_up(&mdev->misc_wait);
4033
7fde2be9
PR
4034 del_timer(&mdev->request_timer);
4035
b411b363 4036 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4037 resync_timer_fn((unsigned long)mdev);
4038
b411b363
PR
4039 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4040 * w_make_resync_request etc. which may still be on the worker queue
4041 * to be "canceled" */
a21e9298 4042 drbd_flush_workqueue(mdev);
b411b363
PR
4043
4044 /* This also does reclaim_net_ee(). If we do this too early, we might
4045 * miss some resync ee and pages.*/
4046 drbd_process_done_ee(mdev);
4047
4048 kfree(mdev->p_uuid);
4049 mdev->p_uuid = NULL;
4050
fb22c402 4051 if (!is_susp(mdev->state))
2f5cdd0b 4052 tl_clear(mdev->tconn);
b411b363 4053
b411b363
PR
4054 drbd_md_sync(mdev);
4055
4056 fp = FP_DONT_CARE;
4057 if (get_ldev(mdev)) {
4058 fp = mdev->ldev->dc.fencing;
4059 put_ldev(mdev);
4060 }
4061
87f7be4c
PR
4062 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
4063 drbd_try_outdate_peer_async(mdev);
b411b363 4064
20ceb2b2
LE
4065 /* serialize with bitmap writeout triggered by the state change,
4066 * if any. */
4067 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4068
b411b363
PR
4069 /* tcp_close and release of sendpage pages can be deferred. I don't
4070 * want to use SO_LINGER, because apparently it can be deferred for
4071 * more than 20 seconds (longest time I checked).
4072 *
4073 * Actually we don't care for exactly when the network stack does its
4074 * put_page(), but release our reference on these pages right here.
4075 */
4076 i = drbd_release_ee(mdev, &mdev->net_ee);
4077 if (i)
4078 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4079 i = atomic_read(&mdev->pp_in_use_by_net);
4080 if (i)
4081 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4082 i = atomic_read(&mdev->pp_in_use);
4083 if (i)
45bb912b 4084 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4085
4086 D_ASSERT(list_empty(&mdev->read_ee));
4087 D_ASSERT(list_empty(&mdev->active_ee));
4088 D_ASSERT(list_empty(&mdev->sync_ee));
4089 D_ASSERT(list_empty(&mdev->done_ee));
4090
4091 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4092 atomic_set(&mdev->current_epoch->epoch_size, 0);
4093 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4094
4095 return 0;
b411b363
PR
4096}
4097
4098/*
4099 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4100 * we can agree on is stored in agreed_pro_version.
4101 *
4102 * feature flags and the reserved array should be enough room for future
4103 * enhancements of the handshake protocol, and possible plugins...
4104 *
4105 * for now, they are expected to be zero, but ignored.
4106 */
8a22cccc 4107static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4108{
e6b3ea83 4109 /* ASSERT current == mdev->tconn->receiver ... */
8a22cccc 4110 struct p_handshake *p = &tconn->data.sbuf.handshake;
b411b363
PR
4111 int ok;
4112
8a22cccc
PR
4113 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4114 conn_err(tconn, "interrupted during initial handshake\n");
b411b363
PR
4115 return 0; /* interrupted. not ok. */
4116 }
4117
8a22cccc
PR
4118 if (tconn->data.socket == NULL) {
4119 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4120 return 0;
4121 }
4122
4123 memset(p, 0, sizeof(*p));
4124 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4125 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
8a22cccc
PR
4126 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4127 &p->head, sizeof(*p), 0);
4128 mutex_unlock(&tconn->data.mutex);
b411b363
PR
4129 return ok;
4130}
4131
4132/*
4133 * return values:
4134 * 1 yes, we have a valid connection
4135 * 0 oops, did not work out, please try again
4136 * -1 peer talks different language,
4137 * no point in trying again, please go standalone.
4138 */
65d11ed6 4139static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4140{
65d11ed6
PR
4141 /* ASSERT current == tconn->receiver ... */
4142 struct p_handshake *p = &tconn->data.rbuf.handshake;
02918be2 4143 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4144 struct packet_info pi;
b411b363
PR
4145 int rv;
4146
65d11ed6 4147 rv = drbd_send_handshake(tconn);
b411b363
PR
4148 if (!rv)
4149 return 0;
4150
65d11ed6 4151 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4152 if (!rv)
4153 return 0;
4154
77351055 4155 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4156 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4157 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4158 return -1;
4159 }
4160
77351055 4161 if (pi.size != expect) {
65d11ed6 4162 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4163 expect, pi.size);
b411b363
PR
4164 return -1;
4165 }
4166
65d11ed6 4167 rv = drbd_recv(tconn, &p->head.payload, expect);
b411b363
PR
4168
4169 if (rv != expect) {
0ddc5549 4170 if (!signal_pending(current))
65d11ed6 4171 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4172 return 0;
4173 }
4174
b411b363
PR
4175 p->protocol_min = be32_to_cpu(p->protocol_min);
4176 p->protocol_max = be32_to_cpu(p->protocol_max);
4177 if (p->protocol_max == 0)
4178 p->protocol_max = p->protocol_min;
4179
4180 if (PRO_VERSION_MAX < p->protocol_min ||
4181 PRO_VERSION_MIN > p->protocol_max)
4182 goto incompat;
4183
65d11ed6 4184 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4185
65d11ed6
PR
4186 conn_info(tconn, "Handshake successful: "
4187 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4188
4189 return 1;
4190
4191 incompat:
65d11ed6 4192 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4193 "I support %d-%d, peer supports %d-%d\n",
4194 PRO_VERSION_MIN, PRO_VERSION_MAX,
4195 p->protocol_min, p->protocol_max);
4196 return -1;
4197}
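/* Version negotiation sketch (the numbers are illustrative assumptions, not
 * the actual PRO_VERSION_MIN/MAX of this tree): if we support 86..96 and the
 * peer advertises protocol_min = 90, protocol_max = 101, the ranges overlap,
 * so we agree on min(96, 101) = 96; a peer advertising 60..85 would fail the
 * overlap check above and end up in the "incompatible DRBD dialects" branch. */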
4198
4199#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4200static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4201{
4202 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4203 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4204 return -1;
b411b363
PR
4205}
4206#else
4207#define CHALLENGE_LEN 64
b10d96cb
JT
4208
4209/* Return value:
4210 1 - auth succeeded,
4211 0 - failed, try again (network error),
4212 -1 - auth failed, don't try again.
4213*/
4214
13e6037d 4215static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4216{
4217 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4218 struct scatterlist sg;
4219 char *response = NULL;
4220 char *right_response = NULL;
4221 char *peers_ch = NULL;
13e6037d 4222 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4223 unsigned int resp_size;
4224 struct hash_desc desc;
77351055 4225 struct packet_info pi;
b411b363
PR
4226 int rv;
4227
13e6037d 4228 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4229 desc.flags = 0;
4230
13e6037d
PR
4231 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4232 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4233 if (rv) {
13e6037d 4234 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4235 rv = -1;
b411b363
PR
4236 goto fail;
4237 }
4238
4239 get_random_bytes(my_challenge, CHALLENGE_LEN);
4240
13e6037d 4241 rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4242 if (!rv)
4243 goto fail;
4244
13e6037d 4245 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4246 if (!rv)
4247 goto fail;
4248
77351055 4249 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4250 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4251 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4252 rv = 0;
4253 goto fail;
4254 }
4255
77351055 4256 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4257 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4258 rv = -1;
b411b363
PR
4259 goto fail;
4260 }
4261
77351055 4262 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4263 if (peers_ch == NULL) {
13e6037d 4264 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4265 rv = -1;
b411b363
PR
4266 goto fail;
4267 }
4268
13e6037d 4269 rv = drbd_recv(tconn, peers_ch, pi.size);
b411b363 4270
77351055 4271 if (rv != pi.size) {
0ddc5549 4272 if (!signal_pending(current))
13e6037d 4273 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4274 rv = 0;
4275 goto fail;
4276 }
4277
13e6037d 4278 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4279 response = kmalloc(resp_size, GFP_NOIO);
4280 if (response == NULL) {
13e6037d 4281 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4282 rv = -1;
b411b363
PR
4283 goto fail;
4284 }
4285
4286 sg_init_table(&sg, 1);
77351055 4287 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4288
4289 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4290 if (rv) {
13e6037d 4291 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4292 rv = -1;
b411b363
PR
4293 goto fail;
4294 }
4295
13e6037d 4296 rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4297 if (!rv)
4298 goto fail;
4299
13e6037d 4300 rv = drbd_recv_header(tconn, &pi);
b411b363
PR
4301 if (!rv)
4302 goto fail;
4303
77351055 4304 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4305 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4306 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4307 rv = 0;
4308 goto fail;
4309 }
4310
77351055 4311 if (pi.size != resp_size) {
13e6037d 4312 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4313 rv = 0;
4314 goto fail;
4315 }
4316
13e6037d 4317 rv = drbd_recv(tconn, response , resp_size);
b411b363
PR
4318
4319 if (rv != resp_size) {
0ddc5549 4320 if (!signal_pending(current))
13e6037d 4321 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4322 rv = 0;
4323 goto fail;
4324 }
4325
4326 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4327 if (right_response == NULL) {
13e6037d 4328 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4329 rv = -1;
b411b363
PR
4330 goto fail;
4331 }
4332
4333 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4334
4335 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4336 if (rv) {
13e6037d 4337 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4338 rv = -1;
b411b363
PR
4339 goto fail;
4340 }
4341
4342 rv = !memcmp(response, right_response, resp_size);
4343
4344 if (rv)
13e6037d
PR
4345 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4346 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4347 else
4348 rv = -1;
b411b363
PR
4349
4350 fail:
4351 kfree(peers_ch);
4352 kfree(response);
4353 kfree(right_response);
4354
4355 return rv;
4356}
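/* Exchange summary, as implemented above: each side sends a CHALLENGE_LEN byte
 * random challenge (P_AUTH_CHALLENGE), answers the peer's challenge with
 * HMAC(shared_secret, peers_ch) in P_AUTH_RESPONSE, and verifies the peer's
 * reply by recomputing HMAC(shared_secret, my_challenge) locally and comparing
 * with memcmp(); the shared secret itself never crosses the wire. */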
4357#endif
4358
4359int drbdd_init(struct drbd_thread *thi)
4360{
392c8801 4361 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4362 int h;
4363
4d641dd7 4364 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4365
4366 do {
4d641dd7 4367 h = drbd_connect(tconn);
b411b363 4368 if (h == 0) {
4d641dd7 4369 drbd_disconnect(tconn);
20ee6390 4370 schedule_timeout_interruptible(HZ);
b411b363
PR
4371 }
4372 if (h == -1) {
4d641dd7 4373 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4374 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4375 }
4376 } while (h == 0);
4377
4378 if (h > 0) {
4d641dd7
PR
4379 if (get_net_conf(tconn)) {
4380 drbdd(tconn);
4381 put_net_conf(tconn);
b411b363
PR
4382 }
4383 }
4384
4d641dd7 4385 drbd_disconnect(tconn);
b411b363 4386
4d641dd7 4387 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4388 return 0;
4389}
4390
4391/* ********* acknowledge sender ******** */
4392
d8763023 4393static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4394{
257d0af6 4395 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
fc3b10a4 4396 struct drbd_tconn *tconn = mdev->tconn;
b411b363
PR
4397
4398 int retcode = be32_to_cpu(p->retcode);
4399
fc3b10a4
PR
4400 if (cmd == P_STATE_CHG_REPLY) {
4401 if (retcode >= SS_SUCCESS) {
4402 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4403 } else {
4404 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4405 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4406 drbd_set_st_err_str(retcode), retcode);
4407 }
4408 wake_up(&mdev->state_wait);
4409 } else /* conn == P_CONN_ST_CHG_REPLY */ {
4410 if (retcode >= SS_SUCCESS) {
4411 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4412 } else {
4413 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4414 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4415 drbd_set_st_err_str(retcode), retcode);
4416 }
4417 wake_up(&tconn->ping_wait);
b411b363 4418 }
81e84650 4419 return true;
b411b363
PR
4420}
4421
d8763023 4422static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4423{
2a67d8b9 4424 return drbd_send_ping_ack(mdev->tconn);
b411b363
PR
4425
4426}
4427
d8763023 4428static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4429{
2a67d8b9 4430 struct drbd_tconn *tconn = mdev->tconn;
b411b363 4431 /* restore idle timeout */
2a67d8b9
PR
4432 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4433 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4434 wake_up(&tconn->ping_wait);
b411b363 4435
81e84650 4436 return true;
b411b363
PR
4437}
4438
d8763023 4439static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4440{
257d0af6 4441 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4442 sector_t sector = be64_to_cpu(p->sector);
4443 int blksize = be32_to_cpu(p->blksize);
4444
31890f4a 4445 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4446
4447 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4448
1d53f09e
LE
4449 if (get_ldev(mdev)) {
4450 drbd_rs_complete_io(mdev, sector);
4451 drbd_set_in_sync(mdev, sector, blksize);
4452 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4453 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4454 put_ldev(mdev);
4455 }
b411b363 4456 dec_rs_pending(mdev);
778f271d 4457 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4458
81e84650 4459 return true;
b411b363
PR
4460}
4461
bc9c5c41
AG
4462static int
4463validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4464 struct rb_root *root, const char *func,
4465 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4466{
4467 struct drbd_request *req;
4468 struct bio_and_error m;
4469
87eeee41 4470 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4471 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4472 if (unlikely(!req)) {
87eeee41 4473 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4474 return false;
b411b363
PR
4475 }
4476 __req_mod(req, what, &m);
87eeee41 4477 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4478
4479 if (m.bio)
4480 complete_master_bio(mdev, &m);
81e84650 4481 return true;
b411b363
PR
4482}
4483
d8763023 4484static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4485{
257d0af6 4486 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4487 sector_t sector = be64_to_cpu(p->sector);
4488 int blksize = be32_to_cpu(p->blksize);
4489 enum drbd_req_event what;
4490
4491 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4492
579b57ed 4493 if (p->block_id == ID_SYNCER) {
b411b363
PR
4494 drbd_set_in_sync(mdev, sector, blksize);
4495 dec_rs_pending(mdev);
81e84650 4496 return true;
b411b363 4497 }
257d0af6 4498 switch (cmd) {
b411b363 4499 case P_RS_WRITE_ACK:
89e58e75 4500 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4501 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4502 break;
4503 case P_WRITE_ACK:
89e58e75 4504 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4505 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4506 break;
4507 case P_RECV_ACK:
89e58e75 4508 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4509 what = RECV_ACKED_BY_PEER;
b411b363 4510 break;
7be8da07 4511 case P_DISCARD_WRITE:
89e58e75 4512 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4513 what = DISCARD_WRITE;
4514 break;
4515 case P_RETRY_WRITE:
4516 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4517 what = POSTPONE_WRITE;
b411b363
PR
4518 break;
4519 default:
4520 D_ASSERT(0);
81e84650 4521 return false;
b411b363
PR
4522 }
4523
4524 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4525 &mdev->write_requests, __func__,
4526 what, false);
b411b363
PR
4527}
4528
d8763023 4529static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4530{
257d0af6 4531 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363 4532 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4533 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4534 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4535 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4536 bool found;
b411b363
PR
4537
4538 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4539
579b57ed 4540 if (p->block_id == ID_SYNCER) {
b411b363
PR
4541 dec_rs_pending(mdev);
4542 drbd_rs_failed_io(mdev, sector, size);
81e84650 4543 return true;
b411b363 4544 }
2deb8336 4545
c3afd8f5 4546 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4547 &mdev->write_requests, __func__,
8554df1c 4548 NEG_ACKED, missing_ok);
c3afd8f5
AG
4549 if (!found) {
4550 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4551 The master bio might already be completed, therefore the
4552 request is no longer in the collision hash. */
4553 /* In Protocol B we might already have got a P_RECV_ACK
4554 but then get a P_NEG_ACK afterwards. */
4555 if (!missing_ok)
2deb8336 4556 return false;
c3afd8f5 4557 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4558 }
2deb8336 4559 return true;
b411b363
PR
4560}
4561
d8763023 4562static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4563{
257d0af6 4564 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4565 sector_t sector = be64_to_cpu(p->sector);
4566
4567 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4568
b411b363
PR
4569 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4570 (unsigned long long)sector, be32_to_cpu(p->blksize));
4571
4572 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4573 &mdev->read_requests, __func__,
8554df1c 4574 NEG_ACKED, false);
b411b363
PR
4575}
4576
d8763023 4577static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4578{
4579 sector_t sector;
4580 int size;
257d0af6 4581 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4582
4583 sector = be64_to_cpu(p->sector);
4584 size = be32_to_cpu(p->blksize);
b411b363
PR
4585
4586 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4587
4588 dec_rs_pending(mdev);
4589
4590 if (get_ldev_if_state(mdev, D_FAILED)) {
4591 drbd_rs_complete_io(mdev, sector);
257d0af6 4592 switch (cmd) {
d612d309
PR
4593 case P_NEG_RS_DREPLY:
4594 drbd_rs_failed_io(mdev, sector, size);
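/* fall through */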
4595 case P_RS_CANCEL:
4596 break;
4597 default:
4598 D_ASSERT(0);
4599 put_ldev(mdev);
4600 return false;
4601 }
b411b363
PR
4602 put_ldev(mdev);
4603 }
4604
81e84650 4605 return true;
b411b363
PR
4606}
4607
d8763023 4608static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4609{
257d0af6 4610 struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;
b411b363 4611
2f5cdd0b 4612 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4613
c4752ef1
PR
4614 if (mdev->state.conn == C_AHEAD &&
4615 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4616 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4617 mdev->start_resync_timer.expires = jiffies + HZ;
4618 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4619 }
4620
81e84650 4621 return true;
b411b363
PR
4622}
4623
d8763023 4624static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4625{
257d0af6 4626 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
b411b363
PR
4627 struct drbd_work *w;
4628 sector_t sector;
4629 int size;
4630
4631 sector = be64_to_cpu(p->sector);
4632 size = be32_to_cpu(p->blksize);
4633
4634 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4635
4636 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4637 drbd_ov_oos_found(mdev, sector, size);
4638 else
4639 ov_oos_print(mdev);
4640
1d53f09e 4641 if (!get_ldev(mdev))
81e84650 4642 return true;
1d53f09e 4643
b411b363
PR
4644 drbd_rs_complete_io(mdev, sector);
4645 dec_rs_pending(mdev);
4646
ea5442af
LE
4647 --mdev->ov_left;
4648
4649 /* let's advance progress step marks only for every other megabyte */
4650 if ((mdev->ov_left & 0x200) == 0x200)
4651 drbd_advance_rs_marks(mdev, mdev->ov_left);
4652
4653 if (mdev->ov_left == 0) {
b411b363
PR
4654 w = kmalloc(sizeof(*w), GFP_NOIO);
4655 if (w) {
4656 w->cb = w_ov_finished;
a21e9298 4657 w->mdev = mdev;
e42325a5 4658 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4659 } else {
4660 dev_err(DEV, "kmalloc(w) failed.");
4661 ov_oos_print(mdev);
4662 drbd_resync_finished(mdev);
4663 }
4664 }
1d53f09e 4665 put_ldev(mdev);
81e84650 4666 return true;
b411b363
PR
4667}
4668
d8763023 4669static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4670{
81e84650 4671 return true;
0ced55a3
PR
4672}
4673
32862ec7
PR
4674static int tconn_process_done_ee(struct drbd_tconn *tconn)
4675{
082a3439
PR
4676 struct drbd_conf *mdev;
4677 int i, not_empty = 0;
32862ec7
PR
4678
4679 do {
4680 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4681 flush_signals(current);
082a3439
PR
4682 idr_for_each_entry(&tconn->volumes, mdev, i) {
4683 if (!drbd_process_done_ee(mdev))
4684 return 1; /* error */
4685 }
32862ec7 4686 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4687
4688 spin_lock_irq(&tconn->req_lock);
4689 idr_for_each_entry(&tconn->volumes, mdev, i) {
4690 not_empty = !list_empty(&mdev->done_ee);
4691 if (not_empty)
4692 break;
4693 }
4694 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4695 } while (not_empty);
4696
4697 return 0;
4698}
4699
7201b972
AG
4700struct asender_cmd {
4701 size_t pkt_size;
4702 int (*process)(struct drbd_conf *, enum drbd_packet);
4703};
4704
4705static struct asender_cmd asender_tbl[] = {
4706 [P_PING] = { sizeof(struct p_header), got_Ping },
4707 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4708 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4709 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4710 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4711 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4712 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4713 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4714 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4715 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4716 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4717 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4718 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4719 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4720 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
4721 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_RqSReply },
4722 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
4723};
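/* Size checking example (illustrative): for P_PING_ACK the table entry above
 * gives pkt_size = sizeof(struct p_header), so the asender loop below expects
 * pi.size == expect - sizeof(struct p_header) == 0; a block-ack style packet
 * such as P_WRITE_ACK must instead carry exactly
 * sizeof(struct p_block_ack) - sizeof(struct p_header) payload bytes. */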
4724
b411b363
PR
4725int drbd_asender(struct drbd_thread *thi)
4726{
392c8801 4727 struct drbd_tconn *tconn = thi->tconn;
32862ec7 4728 struct p_header *h = &tconn->meta.rbuf.header;
b411b363 4729 struct asender_cmd *cmd = NULL;
77351055 4730 struct packet_info pi;
257d0af6 4731 int rv;
b411b363
PR
4732 void *buf = h;
4733 int received = 0;
257d0af6 4734 int expect = sizeof(struct p_header);
f36af18c 4735 int ping_timeout_active = 0;
b411b363 4736
b411b363
PR
4737 current->policy = SCHED_RR; /* Make this a realtime task! */
4738 current->rt_priority = 2; /* more important than all other tasks */
4739
e77a0a5c 4740 while (get_t_state(thi) == RUNNING) {
80822284 4741 drbd_thread_current_set_cpu(thi);
32862ec7 4742 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4743 if (!drbd_send_ping(tconn)) {
32862ec7 4744 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4745 goto reconnect;
4746 }
32862ec7
PR
4747 tconn->meta.socket->sk->sk_rcvtimeo =
4748 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4749 ping_timeout_active = 1;
b411b363
PR
4750 }
4751
32862ec7
PR
4752 /* TODO: conditionally cork; it may hurt latency if we cork without
4753 much to send */
4754 if (!tconn->net_conf->no_cork)
4755 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4756 if (tconn_process_done_ee(tconn)) {
4757 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4758 goto reconnect;
082a3439 4759 }
b411b363 4760 /* but unconditionally uncork unless disabled */
32862ec7
PR
4761 if (!tconn->net_conf->no_cork)
4762 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4763
4764 /* short circuit, recv_msg would return EINTR anyways. */
4765 if (signal_pending(current))
4766 continue;
4767
32862ec7
PR
4768 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4769 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4770
4771 flush_signals(current);
4772
4773 /* Note:
4774 * -EINTR (on meta) we got a signal
4775 * -EAGAIN (on meta) rcvtimeo expired
4776 * -ECONNRESET other side closed the connection
4777 * -ERESTARTSYS (on data) we got a signal
4778 * rv < 0 other than above: unexpected error!
4779 * rv == expected: full header or command
4780 * rv < expected: "woken" by signal during receive
4781 * rv == 0 : "connection shut down by peer"
4782 */
4783 if (likely(rv > 0)) {
4784 received += rv;
4785 buf += rv;
4786 } else if (rv == 0) {
32862ec7 4787 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4788 goto reconnect;
4789 } else if (rv == -EAGAIN) {
cb6518cb
LE
4790 /* If the data socket received something meanwhile,
4791 * that is good enough: peer is still alive. */
32862ec7
PR
4792 if (time_after(tconn->last_received,
4793 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4794 continue;
f36af18c 4795 if (ping_timeout_active) {
32862ec7 4796 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4797 goto reconnect;
4798 }
32862ec7 4799 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4800 continue;
4801 } else if (rv == -EINTR) {
4802 continue;
4803 } else {
32862ec7 4804 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4805 goto reconnect;
4806 }
4807
4808 if (received == expect && cmd == NULL) {
32862ec7 4809 if (!decode_header(tconn, h, &pi))
b411b363 4810 goto reconnect;
7201b972
AG
4811 cmd = &asender_tbl[pi.cmd];
4812 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4813 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4814 pi.cmd, pi.size);
b411b363
PR
4815 goto disconnect;
4816 }
4817 expect = cmd->pkt_size;
77351055 4818 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4819 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4820 pi.cmd, pi.size);
b411b363 4821 goto reconnect;
257d0af6 4822 }
b411b363
PR
4823 }
4824 if (received == expect) {
32862ec7
PR
4825 tconn->last_received = jiffies;
4826 if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
b411b363
PR
4827 goto reconnect;
4828
f36af18c
LE
4829 /* the idle_timeout (ping-int)
4830 * has been restored in got_PingAck() */
7201b972 4831 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4832 ping_timeout_active = 0;
4833
b411b363
PR
4834 buf = h;
4835 received = 0;
257d0af6 4836 expect = sizeof(struct p_header);
b411b363
PR
4837 cmd = NULL;
4838 }
4839 }
4840
4841 if (0) {
4842reconnect:
bbeb641c 4843 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4844 }
4845 if (0) {
4846disconnect:
bbeb641c 4847 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4848 }
32862ec7 4849 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4850
32862ec7 4851 conn_info(tconn, "asender terminated\n");
b411b363
PR
4852
4853 return 0;
4854}