/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


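/* Allocation flags for opportunistic page allocation in the receiver:
 * highmem pages are fine (they are only used for I/O), the allocation
 * must not sleep or trigger write-out, and failure is expected, so do
 * not warn about it. */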
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

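/* Grab @number pages, preferably from the pre-allocated drbd_pp_pool,
 * otherwise from alloc_page(GFP_TRY).  If not all pages are available
 * right away, give any partial chain back to the pool and return NULL. */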
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not finished,
	   we can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	rcu_read_lock();
	nc = rcu_dereference(mdev->tconn->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&mdev->pp_in_use) < mxb)
		page = __drbd_alloc_pages(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock) section.
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);
		return NULL;
	}

	page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(mdev, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(mdev, peer_req);
	}
	wake_up(&mdev->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				    struct list_head *head)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
}

/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;

out:
	return err;
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal
		 */

		if (rv < 0) {
			if (rv == -ECONNRESET)
				conn_info(tconn, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			conn_info(tconn, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	};

	set_fs(oldfs);

	if (rv != size)
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

	return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv(tconn, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &tconn->my_addr, my_addr_len);

	if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
	int timeo, err, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	struct socket *s_estab = NULL, *s_listen;
	struct sockaddr_in6 my_addr;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &tconn->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(&what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(tconn);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(tconn, tconn->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_conf *mdev)
{
	int err;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
	if (!err)
		err = drbd_send_sizes(mdev, 0, 0);
	if (!err)
		err = drbd_send_uuids(mdev);
	if (!err)
		err = drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
	struct socket *sock, *msock;
	struct drbd_conf *mdev;
	struct net_conf *nc;
	int vnr, timeout, try, h, ok;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

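	/* Both nodes alternate between actively connecting and listening
	 * until one data and one meta socket are established.  Crossed
	 * connection attempts are sorted out via the P_INITIAL_DATA /
	 * P_INITIAL_META packets exchanged below. */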
	do {
		struct socket *s;

		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(tconn);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!tconn->data.socket) {
				tconn->data.socket = s;
				send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
			} else if (!tconn->meta.socket) {
				tconn->meta.socket = s;
				send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
			} else {
				conn_err(tconn, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (tconn->data.socket && tconn->meta.socket) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn);
		if (s) {
			try = receive_first_packet(tconn, s);
			drbd_socket_okay(&tconn->data.socket);
			drbd_socket_okay(&tconn->meta.socket);
			switch (try) {
			case P_INITIAL_DATA:
				if (tconn->data.socket) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(tconn->data.socket);
				}
				tconn->data.socket = s;
				break;
			case P_INITIAL_META:
				if (tconn->meta.socket) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(tconn->meta.socket);
				}
				tconn->meta.socket = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}

		if (tconn->data.socket && tconn->meta.socket) {
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
			if (ok)
				break;
		}
	} while (1);

	sock = tconn->data.socket;
	msock = tconn->meta.socket;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);

	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	rcu_read_unlock();

	msock->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = timeout;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
		return -1;

	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
		kref_get(&mdev->kref);
		rcu_read_unlock();
		drbd_connected(mdev);
		kref_put(&mdev->kref, &drbd_minor_destroy);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return h;

out_release_sockets:
	if (tconn->data.socket) {
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
	}
	if (tconn->meta.socket) {
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
	}
	return -1;
}

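/* DRBD speaks three on-the-wire header formats, distinguished by their
 * magic value: struct p_header100 (protocol >= 100, carries a volume
 * number), struct p_header95 ("big" packets with a 32 bit length field),
 * and the original struct p_header80.  decode_header() normalizes all of
 * them into a struct packet_info. */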
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(tconn);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			conn_err(tconn, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 tconn->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	void *buffer = tconn->data.rbuf;
	int err;

	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
	if (err)
		return err;

	err = decode_header(tconn, buffer, pi);
	tconn->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_info(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	struct disk_conf *dc;
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	rcu_read_lock();
	dc = rcu_dereference(mdev->ldev->disk_conf);

	if (wo == WO_bdev_flush && !dc->disk_flushes)
		wo = WO_drain_io;
	if (wo == WO_drain_io && !dc->disk_drain)
		wo = WO_none;
	rcu_read_unlock();
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}

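/* A P_BARRIER from the peer closes the current write epoch.  Depending on
 * the configured write ordering we flush or drain the backing device
 * before starting the next epoch, or simply recycle the current one if it
 * is still empty.  The corresponding P_BARRIER_ACK is only sent once all
 * writes of the epoch have completed, see drbd_may_finish_epoch(). */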
static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return 0;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = 0;
	if (mdev->tconn->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return peer_req;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(mdev, page, 0);
	return err;
}

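/* Receive the payload of a data reply directly into the bio of the
 * original read request, verifying the integrity digest if one is
 * configured. */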
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = 0;
	if (mdev->tconn->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
	return 0;
}

/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}
	dec_unacked(mdev);

	return err;
}

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_peer_req(mdev, peer_req);
fail:
	put_ldev(mdev);
	return -EIO;
}

static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}

static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}

static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);

	return err;
}

static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return -EIO;
	}
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 0;
}

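/* Queue w_restart_write for all of our own postponed requests that
 * overlap the given range, so they get resubmitted now that the
 * conflicting peer write has been dealt with. */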
1710static void restart_conflicting_writes(struct drbd_conf *mdev,
1711 sector_t sector, int size)
1712{
1713 struct drbd_interval *i;
1714 struct drbd_request *req;
1715
1716 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1717 if (!i->local)
1718 continue;
1719 req = container_of(i, struct drbd_request, i);
1720 if (req->rq_state & RQ_LOCAL_PENDING ||
1721 !(req->rq_state & RQ_POSTPONED))
1722 continue;
1723 if (expect(list_empty(&req->w.list))) {
1724 req->w.mdev = mdev;
1725 req->w.cb = w_restart_write;
1726 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1727 }
1728 }
1729}
1730
a990be46
AG
1731/*
1732 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
b411b363 1733 */
99920dc5 1734static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1735{
8050e6d0
AG
1736 struct drbd_peer_request *peer_req =
1737 container_of(w, struct drbd_peer_request, w);
00d56944 1738 struct drbd_conf *mdev = w->mdev;
db830c46 1739 sector_t sector = peer_req->i.sector;
99920dc5 1740 int err = 0, pcmd;
b411b363 1741
303d1448 1742 if (peer_req->flags & EE_SEND_WRITE_ACK) {
db830c46 1743 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1744 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1745 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1746 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1747 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1748 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1749 if (pcmd == P_RS_WRITE_ACK)
db830c46 1750 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1751 } else {
99920dc5 1752 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
b411b363
PR
1753 /* we expect it to be marked out of sync anyways...
1754 * maybe assert this? */
1755 }
1756 dec_unacked(mdev);
1757 }
1758 /* we delete from the conflict detection hash _after_ we sent out the
1759 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
302bdeae 1760 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
87eeee41 1761 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
1762 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1763 drbd_remove_epoch_entry_interval(mdev, peer_req);
7be8da07
AG
1764 if (peer_req->flags & EE_RESTART_REQUESTS)
1765 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1766 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1767 } else
db830c46 1768 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1769
db830c46 1770 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1771
99920dc5 1772 return err;
b411b363
PR
1773}
1774
7be8da07 1775static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1776{
7be8da07 1777 struct drbd_conf *mdev = w->mdev;
8050e6d0
AG
1778 struct drbd_peer_request *peer_req =
1779 container_of(w, struct drbd_peer_request, w);
99920dc5 1780 int err;
b411b363 1781
99920dc5 1782 err = drbd_send_ack(mdev, ack, peer_req);
b411b363
PR
1783 dec_unacked(mdev);
1784
99920dc5 1785 return err;
b411b363
PR
1786}
1787
99920dc5 1788static int e_send_discard_write(struct drbd_work *w, int unused)
7be8da07
AG
1789{
1790 return e_send_ack(w, P_DISCARD_WRITE);
1791}
1792
99920dc5 1793static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07
AG
1794{
1795 struct drbd_tconn *tconn = w->mdev->tconn;
1796
1797 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1798 P_RETRY_WRITE : P_DISCARD_WRITE);
1799}
1800
3e394da1
AG
1801static bool seq_greater(u32 a, u32 b)
1802{
1803 /*
1804 * We assume 32-bit wrap-around here.
1805 * For 24-bit wrap-around, we would have to shift:
1806 * a <<= 8; b <<= 8;
1807 */
1808 return (s32)a - (s32)b > 0;
1809}
1810
1811static u32 seq_max(u32 a, u32 b)
1812{
1813 return seq_greater(a, b) ? a : b;
1814}
1815
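The signed-difference comparison above is what lets seq_greater() survive 32-bit wrap-around. A minimal stand-alone sketch (editorial illustration only, not part of the driver; written with the well-defined (int32_t)(a - b) form of the same trick):

/* Illustrative sketch of the wrap-around-safe comparison used by
 * seq_greater() above; (int32_t)(a - b) avoids signed overflow and
 * yields the same result as the in-tree (s32)a - (s32)b form. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static bool demo_seq_greater(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) > 0;
}

int main(void)
{
	/* 3 is five steps past 0xfffffffe, so it must still count as newer */
	printf("%d\n", demo_seq_greater(3, 0xfffffffeU));	/* prints 1 */
	printf("%d\n", demo_seq_greater(0xfffffffeU, 3));	/* prints 0 */
	return 0;
}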
7be8da07
AG
1816static bool need_peer_seq(struct drbd_conf *mdev)
1817{
1818 struct drbd_tconn *tconn = mdev->tconn;
302bdeae 1819 int tp;
7be8da07
AG
1820
1821 /*
1822 * We only need to keep track of the last packet_seq number of our peer
1823 * if we are in dual-primary mode and we have the discard flag set; see
1824 * handle_write_conflicts().
1825 */
302bdeae
PR
1826
1827 rcu_read_lock();
1828 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1829 rcu_read_unlock();
1830
1831 return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
7be8da07
AG
1832}
1833
43ae077d 1834static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1835{
3c13b680 1836 unsigned int newest_peer_seq;
3e394da1 1837
7be8da07
AG
1838 if (need_peer_seq(mdev)) {
1839 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1840 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1841 mdev->peer_seq = newest_peer_seq;
7be8da07 1842 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1843 /* wake up only if we actually changed mdev->peer_seq */
1844 if (peer_seq == newest_peer_seq)
7be8da07
AG
1845 wake_up(&mdev->seq_wait);
1846 }
3e394da1
AG
1847}
1848
b411b363
PR
1849/* Called from receive_Data.
1850 * Synchronize packets on sock with packets on msock.
1851 *
1852 * This is here so that even when a P_DATA packet traveling via sock has overtaken an Ack
1853 * packet traveling on msock, they are still processed in the order they have
1854 * been sent.
1855 *
1856 * Note: we don't care for Ack packets overtaking P_DATA packets.
1857 *
1858 * If packet_seq is larger than mdev->peer_seq, there are
1859 * outstanding packets on the msock. We wait for them to arrive.
1860 * If this is the logically next packet, we update mdev->peer_seq
1861 * ourselves. Correctly handles 32bit wrap around.
1862 *
1863 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1864 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1865 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1866 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1867 *
1868 * returns 0 if we may process the packet,
1869 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1870static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1871{
1872 DEFINE_WAIT(wait);
b411b363 1873 long timeout;
7be8da07
AG
1874 int ret;
1875
1876 if (!need_peer_seq(mdev))
1877 return 0;
1878
b411b363
PR
1879 spin_lock(&mdev->peer_seq_lock);
1880 for (;;) {
7be8da07
AG
1881 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1882 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1883 ret = 0;
b411b363 1884 break;
7be8da07 1885 }
b411b363
PR
1886 if (signal_pending(current)) {
1887 ret = -ERESTARTSYS;
1888 break;
1889 }
7be8da07 1890 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1891 spin_unlock(&mdev->peer_seq_lock);
44ed167d
PR
1892 rcu_read_lock();
1893 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1894 rcu_read_unlock();
71b1c1eb 1895 timeout = schedule_timeout(timeout);
b411b363 1896 spin_lock(&mdev->peer_seq_lock);
7be8da07 1897 if (!timeout) {
b411b363 1898 ret = -ETIMEDOUT;
71b1c1eb 1899 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1900 break;
1901 }
1902 }
b411b363 1903 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1904 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1905 return ret;
1906}
1907
688593c5
LE
1908/* see also bio_flags_to_wire()
1909 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1910 * flags and back. We may replicate to other kernel versions. */
1911static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1912{
688593c5
LE
1913 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1914 (dpf & DP_FUA ? REQ_FUA : 0) |
1915 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1916 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1917}
1918
7be8da07
AG
1919static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1920 unsigned int size)
1921{
1922 struct drbd_interval *i;
1923
1924 repeat:
1925 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1926 struct drbd_request *req;
1927 struct bio_and_error m;
1928
1929 if (!i->local)
1930 continue;
1931 req = container_of(i, struct drbd_request, i);
1932 if (!(req->rq_state & RQ_POSTPONED))
1933 continue;
1934 req->rq_state &= ~RQ_POSTPONED;
1935 __req_mod(req, NEG_ACKED, &m);
1936 spin_unlock_irq(&mdev->tconn->req_lock);
1937 if (m.bio)
1938 complete_master_bio(mdev, &m);
1939 spin_lock_irq(&mdev->tconn->req_lock);
1940 goto repeat;
1941 }
1942}
1943
1944static int handle_write_conflicts(struct drbd_conf *mdev,
1945 struct drbd_peer_request *peer_req)
1946{
1947 struct drbd_tconn *tconn = mdev->tconn;
1948 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1949 sector_t sector = peer_req->i.sector;
1950 const unsigned int size = peer_req->i.size;
1951 struct drbd_interval *i;
1952 bool equal;
1953 int err;
1954
1955 /*
1956 * Inserting the peer request into the write_requests tree will prevent
1957 * new conflicting local requests from being added.
1958 */
1959 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1960
1961 repeat:
1962 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1963 if (i == &peer_req->i)
1964 continue;
1965
1966 if (!i->local) {
1967 /*
1968 * Our peer has sent a conflicting remote request; this
1969 * should not happen in a two-node setup. Wait for the
1970 * earlier peer request to complete.
1971 */
1972 err = drbd_wait_misc(mdev, i);
1973 if (err)
1974 goto out;
1975 goto repeat;
1976 }
1977
1978 equal = i->sector == sector && i->size == size;
1979 if (resolve_conflicts) {
1980 /*
1981 * If the peer request is fully contained within the
1982 * overlapping request, it can be discarded; otherwise,
1983 * it will be retried once all overlapping requests
1984 * have completed.
1985 */
1986 bool discard = i->sector <= sector && i->sector +
1987 (i->size >> 9) >= sector + (size >> 9);
1988
1989 if (!equal)
1990 dev_alert(DEV, "Concurrent writes detected: "
1991 "local=%llus +%u, remote=%llus +%u, "
1992 "assuming %s came first\n",
1993 (unsigned long long)i->sector, i->size,
1994 (unsigned long long)sector, size,
1995 discard ? "local" : "remote");
1996
1997 inc_unacked(mdev);
1998 peer_req->w.cb = discard ? e_send_discard_write :
1999 e_send_retry_write;
2000 list_add_tail(&peer_req->w.list, &mdev->done_ee);
2001 wake_asender(mdev->tconn);
2002
2003 err = -ENOENT;
2004 goto out;
2005 } else {
2006 struct drbd_request *req =
2007 container_of(i, struct drbd_request, i);
2008
2009 if (!equal)
2010 dev_alert(DEV, "Concurrent writes detected: "
2011 "local=%llus +%u, remote=%llus +%u\n",
2012 (unsigned long long)i->sector, i->size,
2013 (unsigned long long)sector, size);
2014
2015 if (req->rq_state & RQ_LOCAL_PENDING ||
2016 !(req->rq_state & RQ_POSTPONED)) {
2017 /*
2018 * Wait for the node with the discard flag to
2019 * decide if this request will be discarded or
2020 * retried. Requests that are discarded will
2021 * disappear from the write_requests tree.
2022 *
2023 * In addition, wait for the conflicting
2024 * request to finish locally before submitting
2025 * the conflicting peer request.
2026 */
2027 err = drbd_wait_misc(mdev, &req->i);
2028 if (err) {
2029 _conn_request_state(mdev->tconn,
2030 NS(conn, C_TIMEOUT),
2031 CS_HARD);
2032 fail_postponed_requests(mdev, sector, size);
2033 goto out;
2034 }
2035 goto repeat;
2036 }
2037 /*
2038 * Remember to restart the conflicting requests after
2039 * the new peer request has completed.
2040 */
2041 peer_req->flags |= EE_RESTART_REQUESTS;
2042 }
2043 }
2044 err = 0;
2045
2046 out:
2047 if (err)
2048 drbd_remove_epoch_entry_interval(mdev, peer_req);
2049 return err;
2050}
2051
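The discard-versus-retry decision in handle_write_conflicts() above reduces to an interval-containment test in 512-byte sectors. A small stand-alone sketch (editorial illustration; the interval type and values are made up):

/* Illustrative sketch of the containment test behind the "discard" branch
 * of handle_write_conflicts() above, with a stand-in interval type. */
#include <stdbool.h>
#include <stdio.h>

struct demo_interval {
	unsigned long long sector;	/* start, in 512-byte sectors */
	unsigned int size;		/* length, in bytes */
};

/* true if the peer write [sector, sector + size) lies entirely inside i */
static bool peer_req_contained(const struct demo_interval *i,
			       unsigned long long sector, unsigned int size)
{
	return i->sector <= sector &&
	       i->sector + (i->size >> 9) >= sector + (size >> 9);
}

int main(void)
{
	struct demo_interval local = { .sector = 8, .size = 16384 };	/* sectors 8..39 */

	printf("%d\n", peer_req_contained(&local, 16, 4096));	/* fully inside -> 1 */
	printf("%d\n", peer_req_contained(&local, 32, 8192));	/* overhangs end -> 0 */
	return 0;
}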
b411b363 2052/* mirrored write */
4a76b161 2053static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2054{
4a76b161 2055 struct drbd_conf *mdev;
b411b363 2056 sector_t sector;
db830c46 2057 struct drbd_peer_request *peer_req;
e658983a 2058 struct p_data *p = pi->data;
7be8da07 2059 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
2060 int rw = WRITE;
2061 u32 dp_flags;
302bdeae 2062 int err, tp;
b411b363 2063
4a76b161
AG
2064 mdev = vnr_to_mdev(tconn, pi->vnr);
2065 if (!mdev)
2066 return -EIO;
2067
7be8da07 2068 if (!get_ldev(mdev)) {
82bc0194
AG
2069 int err2;
2070
7be8da07 2071 err = wait_for_and_update_peer_seq(mdev, peer_seq);
e2857216 2072 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363 2073 atomic_inc(&mdev->current_epoch->epoch_size);
e2857216 2074 err2 = drbd_drain_block(mdev, pi->size);
82bc0194
AG
2075 if (!err)
2076 err = err2;
2077 return err;
b411b363
PR
2078 }
2079
fcefa62e
AG
2080 /*
2081 * Corresponding put_ldev done either below (on various errors), or in
2082 * drbd_peer_request_endio, if we successfully submit the data at the
2083 * end of this function.
2084 */
b411b363
PR
2085
2086 sector = be64_to_cpu(p->sector);
e2857216 2087 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
db830c46 2088 if (!peer_req) {
b411b363 2089 put_ldev(mdev);
82bc0194 2090 return -EIO;
b411b363
PR
2091 }
2092
db830c46 2093 peer_req->w.cb = e_end_block;
b411b363 2094
688593c5
LE
2095 dp_flags = be32_to_cpu(p->dp_flags);
2096 rw |= wire_flags_to_bio(mdev, dp_flags);
2097
2098 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2099 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2100
b411b363 2101 spin_lock(&mdev->epoch_lock);
db830c46
AG
2102 peer_req->epoch = mdev->current_epoch;
2103 atomic_inc(&peer_req->epoch->epoch_size);
2104 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2105 spin_unlock(&mdev->epoch_lock);
2106
302bdeae
PR
2107 rcu_read_lock();
2108 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2109 rcu_read_unlock();
2110 if (tp) {
2111 peer_req->flags |= EE_IN_INTERVAL_TREE;
7be8da07
AG
2112 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2113 if (err)
b411b363 2114 goto out_interrupted;
87eeee41 2115 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2116 err = handle_write_conflicts(mdev, peer_req);
2117 if (err) {
2118 spin_unlock_irq(&mdev->tconn->req_lock);
2119 if (err == -ENOENT) {
b411b363 2120 put_ldev(mdev);
82bc0194 2121 return 0;
b411b363 2122 }
7be8da07 2123 goto out_interrupted;
b411b363 2124 }
7be8da07
AG
2125 } else
2126 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2127 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2128 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2129
303d1448 2130 if (mdev->tconn->agreed_pro_version < 100) {
44ed167d
PR
2131 rcu_read_lock();
2132 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
303d1448
PR
2133 case DRBD_PROT_C:
2134 dp_flags |= DP_SEND_WRITE_ACK;
2135 break;
2136 case DRBD_PROT_B:
2137 dp_flags |= DP_SEND_RECEIVE_ACK;
2138 break;
2139 }
44ed167d 2140 rcu_read_unlock();
303d1448
PR
2141 }
2142
2143 if (dp_flags & DP_SEND_WRITE_ACK) {
2144 peer_req->flags |= EE_SEND_WRITE_ACK;
b411b363
PR
2145 inc_unacked(mdev);
2146 /* corresponding dec_unacked() in e_end_block()
2147 * respective _drbd_clear_done_ee */
303d1448
PR
2148 }
2149
2150 if (dp_flags & DP_SEND_RECEIVE_ACK) {
b411b363
PR
2151 /* I really don't like it that the receiver thread
2152 * sends on the msock, but anyways */
db830c46 2153 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2154 }
2155
6719fb03 2156 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2157 /* In case we have the only disk of the cluster, */
db830c46
AG
2158 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2159 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2160 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
181286ad 2161 drbd_al_begin_io(mdev, &peer_req->i);
b411b363
PR
2162 }
2163
82bc0194
AG
2164 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2165 if (!err)
2166 return 0;
b411b363 2167
10f6d992
LE
2168 /* don't care for the reason here */
2169 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2170 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2171 list_del(&peer_req->w.list);
2172 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2173 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46 2174 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
181286ad 2175 drbd_al_complete_io(mdev, &peer_req->i);
22cc37a9 2176
b411b363 2177out_interrupted:
db830c46 2178 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2179 put_ldev(mdev);
3967deb1 2180 drbd_free_peer_req(mdev, peer_req);
82bc0194 2181 return err;
b411b363
PR
2182}
2183
0f0601f4
LE
2184/* We may throttle resync, if the lower device seems to be busy,
2185 * and current sync rate is above c_min_rate.
2186 *
2187 * To decide whether or not the lower device is busy, we use a scheme similar
2188 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2189 * amount (more than 64 sectors) of activity that we cannot account for with our
2190 * own resync activity, the device obviously is "busy".
2191 *
2192 * The current sync rate used here uses only the most recent two step marks,
2193 * to have a short time average so we can react faster.
2194 */
e3555d85 2195int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2196{
2197 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2198 unsigned long db, dt, dbdt;
e3555d85 2199 struct lc_element *tmp;
0f0601f4
LE
2200 int curr_events;
2201 int throttle = 0;
daeda1cc
PR
2202 unsigned int c_min_rate;
2203
2204 rcu_read_lock();
2205 c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2206 rcu_read_unlock();
0f0601f4
LE
2207
2208 /* feature disabled? */
daeda1cc 2209 if (c_min_rate == 0)
0f0601f4
LE
2210 return 0;
2211
e3555d85
PR
2212 spin_lock_irq(&mdev->al_lock);
2213 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2214 if (tmp) {
2215 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2216 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2217 spin_unlock_irq(&mdev->al_lock);
2218 return 0;
2219 }
2220 /* Do not slow down if app IO is already waiting for this extent */
2221 }
2222 spin_unlock_irq(&mdev->al_lock);
2223
0f0601f4
LE
2224 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2225 (int)part_stat_read(&disk->part0, sectors[1]) -
2226 atomic_read(&mdev->rs_sect_ev);
e3555d85 2227
0f0601f4
LE
2228 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2229 unsigned long rs_left;
2230 int i;
2231
2232 mdev->rs_last_events = curr_events;
2233
2234 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2235 * approx. */
2649f080
LE
2236 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2237
2238 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2239 rs_left = mdev->ov_left;
2240 else
2241 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2242
2243 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2244 if (!dt)
2245 dt++;
2246 db = mdev->rs_mark_left[i] - rs_left;
2247 dbdt = Bit2KB(db/dt);
2248
daeda1cc 2249 if (dbdt > c_min_rate)
0f0601f4
LE
2250 throttle = 1;
2251 }
2252 return throttle;
2253}
2254
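The throttle decision above boils down to a short-term rate estimate compared against c_min_rate. A stand-alone sketch with made-up numbers (DEMO_BIT2KB is an assumption standing in for Bit2KB(), taken here to convert 4 KiB bitmap bits to KiB, i.e. x << 2):

/* Illustrative sketch of the dbdt computation drbd_rs_should_slow_down()
 * above compares against c_min_rate; the numbers and DEMO_BIT2KB are
 * editorial assumptions, not driver code. */
#include <stdio.h>

#define DEMO_BIT2KB(x)	((x) << 2)	/* one bitmap bit assumed to cover 4 KiB */

int main(void)
{
	unsigned long rs_mark_left = 100000;	/* bits left to sync at the older mark */
	unsigned long rs_left      = 90000;	/* bits left to sync now */
	unsigned long dt = 2;			/* seconds since that mark */
	unsigned int c_min_rate = 4000;		/* configured minimum, in KiB/s */
	unsigned long db, dbdt;

	db = rs_mark_left - rs_left;		/* 10000 bits resynced since the mark */
	dbdt = DEMO_BIT2KB(db / dt);		/* (10000 / 2) << 2 = 20000 KiB/s */

	printf("dbdt = %lu KiB/s -> %s\n", dbdt,
	       dbdt > c_min_rate ? "throttle" : "do not throttle");
	return 0;
}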
2255
4a76b161 2256static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2257{
4a76b161 2258 struct drbd_conf *mdev;
b411b363 2259 sector_t sector;
4a76b161 2260 sector_t capacity;
db830c46 2261 struct drbd_peer_request *peer_req;
b411b363 2262 struct digest_info *di = NULL;
b18b37be 2263 int size, verb;
b411b363 2264 unsigned int fault_type;
e658983a 2265 struct p_block_req *p = pi->data;
4a76b161
AG
2266
2267 mdev = vnr_to_mdev(tconn, pi->vnr);
2268 if (!mdev)
2269 return -EIO;
2270 capacity = drbd_get_capacity(mdev->this_bdev);
b411b363
PR
2271
2272 sector = be64_to_cpu(p->sector);
2273 size = be32_to_cpu(p->blksize);
2274
c670a398 2275 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2276 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2277 (unsigned long long)sector, size);
82bc0194 2278 return -EINVAL;
b411b363
PR
2279 }
2280 if (sector + (size>>9) > capacity) {
2281 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2282 (unsigned long long)sector, size);
82bc0194 2283 return -EINVAL;
b411b363
PR
2284 }
2285
2286 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be 2287 verb = 1;
e2857216 2288 switch (pi->cmd) {
b18b37be
PR
2289 case P_DATA_REQUEST:
2290 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2291 break;
2292 case P_RS_DATA_REQUEST:
2293 case P_CSUM_RS_REQUEST:
2294 case P_OV_REQUEST:
2295 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2296 break;
2297 case P_OV_REPLY:
2298 verb = 0;
2299 dec_rs_pending(mdev);
2300 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2301 break;
2302 default:
49ba9b1b 2303 BUG();
b18b37be
PR
2304 }
2305 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2306 dev_err(DEV, "Can not satisfy peer's read request, "
2307 "no local data.\n");
b18b37be 2308
a821cc4a 2309 /* drain possible payload */
e2857216 2310 return drbd_drain_block(mdev, pi->size);
b411b363
PR
2311 }
2312
2313 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2314 * "criss-cross" setup, that might cause write-out on some other DRBD,
2315 * which in turn might block on the other node at this very place. */
0db55363 2316 peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
db830c46 2317 if (!peer_req) {
b411b363 2318 put_ldev(mdev);
82bc0194 2319 return -ENOMEM;
b411b363
PR
2320 }
2321
e2857216 2322 switch (pi->cmd) {
b411b363 2323 case P_DATA_REQUEST:
db830c46 2324 peer_req->w.cb = w_e_end_data_req;
b411b363 2325 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2326 /* application IO, don't drbd_rs_begin_io */
2327 goto submit;
2328
b411b363 2329 case P_RS_DATA_REQUEST:
db830c46 2330 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2331 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2332 /* used in the sector offset progress display */
2333 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2334 break;
2335
2336 case P_OV_REPLY:
2337 case P_CSUM_RS_REQUEST:
2338 fault_type = DRBD_FAULT_RS_RD;
e2857216 2339 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2340 if (!di)
2341 goto out_free_e;
2342
e2857216 2343 di->digest_size = pi->size;
b411b363
PR
2344 di->digest = (((char *)di)+sizeof(struct digest_info));
2345
db830c46
AG
2346 peer_req->digest = di;
2347 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2348
e2857216 2349 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
b411b363
PR
2350 goto out_free_e;
2351
e2857216 2352 if (pi->cmd == P_CSUM_RS_REQUEST) {
31890f4a 2353 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2354 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2355 /* used in the sector offset progress display */
2356 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2357 } else if (pi->cmd == P_OV_REPLY) {
2649f080
LE
2358 /* track progress, we may need to throttle */
2359 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2360 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2361 dec_rs_pending(mdev);
0f0601f4
LE
2362 /* drbd_rs_begin_io done when we sent this request,
2363 * but accounting still needs to be done. */
2364 goto submit_for_resync;
b411b363
PR
2365 }
2366 break;
2367
2368 case P_OV_REQUEST:
b411b363 2369 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2370 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2371 unsigned long now = jiffies;
2372 int i;
b411b363
PR
2373 mdev->ov_start_sector = sector;
2374 mdev->ov_position = sector;
30b743a2
LE
2375 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2376 mdev->rs_total = mdev->ov_left;
de228bba
LE
2377 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2378 mdev->rs_mark_left[i] = mdev->ov_left;
2379 mdev->rs_mark_time[i] = now;
2380 }
b411b363
PR
2381 dev_info(DEV, "Online Verify start sector: %llu\n",
2382 (unsigned long long)sector);
2383 }
db830c46 2384 peer_req->w.cb = w_e_end_ov_req;
b411b363 2385 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2386 break;
2387
b411b363 2388 default:
49ba9b1b 2389 BUG();
b411b363
PR
2390 }
2391
0f0601f4
LE
2392 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2393 * wrt the receiver, but it is not as straightforward as it may seem.
2394 * Various places in the resync start and stop logic assume resync
2395 * requests are processed in order, requeuing this on the worker thread
2396 * introduces a bunch of new code for synchronization between threads.
2397 *
2398 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2399 * "forever", throttling after drbd_rs_begin_io will lock that extent
2400 * for application writes for the same time. For now, just throttle
2401 * here, where the rest of the code expects the receiver to sleep for
2402 * a while, anyways.
2403 */
2404
2405 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2406 * this defers syncer requests for some time, before letting at least
2407 * one request through. The resync controller on the receiving side
2408 * will adapt to the incoming rate accordingly.
2409 *
2410 * We cannot throttle here if remote is Primary/SyncTarget:
2411 * we would also throttle its application reads.
2412 * In that case, throttling is done on the SyncTarget only.
2413 */
e3555d85
PR
2414 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2415 schedule_timeout_uninterruptible(HZ/10);
2416 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2417 goto out_free_e;
b411b363 2418
0f0601f4
LE
2419submit_for_resync:
2420 atomic_add(size >> 9, &mdev->rs_sect_ev);
2421
80a40e43 2422submit:
b411b363 2423 inc_unacked(mdev);
87eeee41 2424 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2425 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2426 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2427
fbe29dec 2428 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
82bc0194 2429 return 0;
b411b363 2430
10f6d992
LE
2431 /* don't care for the reason here */
2432 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2433 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2434 list_del(&peer_req->w.list);
87eeee41 2435 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2436 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2437
b411b363 2438out_free_e:
b411b363 2439 put_ldev(mdev);
3967deb1 2440 drbd_free_peer_req(mdev, peer_req);
82bc0194 2441 return -EIO;
b411b363
PR
2442}
2443
2444static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2445{
2446 int self, peer, rv = -100;
2447 unsigned long ch_self, ch_peer;
44ed167d 2448 enum drbd_after_sb_p after_sb_0p;
b411b363
PR
2449
2450 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2451 peer = mdev->p_uuid[UI_BITMAP] & 1;
2452
2453 ch_peer = mdev->p_uuid[UI_SIZE];
2454 ch_self = mdev->comm_bm_set;
2455
44ed167d
PR
2456 rcu_read_lock();
2457 after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2458 rcu_read_unlock();
2459 switch (after_sb_0p) {
b411b363
PR
2460 case ASB_CONSENSUS:
2461 case ASB_DISCARD_SECONDARY:
2462 case ASB_CALL_HELPER:
44ed167d 2463 case ASB_VIOLENTLY:
b411b363
PR
2464 dev_err(DEV, "Configuration error.\n");
2465 break;
2466 case ASB_DISCONNECT:
2467 break;
2468 case ASB_DISCARD_YOUNGER_PRI:
2469 if (self == 0 && peer == 1) {
2470 rv = -1;
2471 break;
2472 }
2473 if (self == 1 && peer == 0) {
2474 rv = 1;
2475 break;
2476 }
2477 /* Else fall through to one of the other strategies... */
2478 case ASB_DISCARD_OLDER_PRI:
2479 if (self == 0 && peer == 1) {
2480 rv = 1;
2481 break;
2482 }
2483 if (self == 1 && peer == 0) {
2484 rv = -1;
2485 break;
2486 }
2487 /* Else fall through to one of the other strategies... */
ad19bf6e 2488 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2489 "Using discard-least-changes instead\n");
2490 case ASB_DISCARD_ZERO_CHG:
2491 if (ch_peer == 0 && ch_self == 0) {
25703f83 2492 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2493 ? -1 : 1;
2494 break;
2495 } else {
2496 if (ch_peer == 0) { rv = 1; break; }
2497 if (ch_self == 0) { rv = -1; break; }
2498 }
44ed167d 2499 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2500 break;
2501 case ASB_DISCARD_LEAST_CHG:
2502 if (ch_self < ch_peer)
2503 rv = -1;
2504 else if (ch_self > ch_peer)
2505 rv = 1;
2506 else /* ( ch_self == ch_peer ) */
2507 /* Well, then use something else. */
25703f83 2508 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2509 ? -1 : 1;
2510 break;
2511 case ASB_DISCARD_LOCAL:
2512 rv = -1;
2513 break;
2514 case ASB_DISCARD_REMOTE:
2515 rv = 1;
2516 }
2517
2518 return rv;
2519}
2520
2521static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2522{
6184ea21 2523 int hg, rv = -100;
44ed167d 2524 enum drbd_after_sb_p after_sb_1p;
b411b363 2525
44ed167d
PR
2526 rcu_read_lock();
2527 after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2528 rcu_read_unlock();
2529 switch (after_sb_1p) {
b411b363
PR
2530 case ASB_DISCARD_YOUNGER_PRI:
2531 case ASB_DISCARD_OLDER_PRI:
2532 case ASB_DISCARD_LEAST_CHG:
2533 case ASB_DISCARD_LOCAL:
2534 case ASB_DISCARD_REMOTE:
44ed167d 2535 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2536 dev_err(DEV, "Configuration error.\n");
2537 break;
2538 case ASB_DISCONNECT:
2539 break;
2540 case ASB_CONSENSUS:
2541 hg = drbd_asb_recover_0p(mdev);
2542 if (hg == -1 && mdev->state.role == R_SECONDARY)
2543 rv = hg;
2544 if (hg == 1 && mdev->state.role == R_PRIMARY)
2545 rv = hg;
2546 break;
2547 case ASB_VIOLENTLY:
2548 rv = drbd_asb_recover_0p(mdev);
2549 break;
2550 case ASB_DISCARD_SECONDARY:
2551 return mdev->state.role == R_PRIMARY ? 1 : -1;
2552 case ASB_CALL_HELPER:
2553 hg = drbd_asb_recover_0p(mdev);
2554 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2555 enum drbd_state_rv rv2;
2556
2557 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2558 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2559 * we might be here in C_WF_REPORT_PARAMS which is transient.
2560 * we do not need to wait for the after state change work either. */
bb437946
AG
2561 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2562 if (rv2 != SS_SUCCESS) {
b411b363
PR
2563 drbd_khelper(mdev, "pri-lost-after-sb");
2564 } else {
2565 dev_warn(DEV, "Successfully gave up primary role.\n");
2566 rv = hg;
2567 }
2568 } else
2569 rv = hg;
2570 }
2571
2572 return rv;
2573}
2574
2575static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2576{
6184ea21 2577 int hg, rv = -100;
44ed167d 2578 enum drbd_after_sb_p after_sb_2p;
b411b363 2579
44ed167d
PR
2580 rcu_read_lock();
2581 after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2582 rcu_read_unlock();
2583 switch (after_sb_2p) {
b411b363
PR
2584 case ASB_DISCARD_YOUNGER_PRI:
2585 case ASB_DISCARD_OLDER_PRI:
2586 case ASB_DISCARD_LEAST_CHG:
2587 case ASB_DISCARD_LOCAL:
2588 case ASB_DISCARD_REMOTE:
2589 case ASB_CONSENSUS:
2590 case ASB_DISCARD_SECONDARY:
44ed167d 2591 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2592 dev_err(DEV, "Configuration error.\n");
2593 break;
2594 case ASB_VIOLENTLY:
2595 rv = drbd_asb_recover_0p(mdev);
2596 break;
2597 case ASB_DISCONNECT:
2598 break;
2599 case ASB_CALL_HELPER:
2600 hg = drbd_asb_recover_0p(mdev);
2601 if (hg == -1) {
bb437946
AG
2602 enum drbd_state_rv rv2;
2603
b411b363
PR
2604 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2605 * we might be here in C_WF_REPORT_PARAMS which is transient.
2606 * we do not need to wait for the after state change work either. */
bb437946
AG
2607 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2608 if (rv2 != SS_SUCCESS) {
b411b363
PR
2609 drbd_khelper(mdev, "pri-lost-after-sb");
2610 } else {
2611 dev_warn(DEV, "Successfully gave up primary role.\n");
2612 rv = hg;
2613 }
2614 } else
2615 rv = hg;
2616 }
2617
2618 return rv;
2619}
2620
2621static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2622 u64 bits, u64 flags)
2623{
2624 if (!uuid) {
2625 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2626 return;
2627 }
2628 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2629 text,
2630 (unsigned long long)uuid[UI_CURRENT],
2631 (unsigned long long)uuid[UI_BITMAP],
2632 (unsigned long long)uuid[UI_HISTORY_START],
2633 (unsigned long long)uuid[UI_HISTORY_END],
2634 (unsigned long long)bits,
2635 (unsigned long long)flags);
2636}
2637
2638/*
2639 100 after split brain try auto recover
2640 2 C_SYNC_SOURCE set BitMap
2641 1 C_SYNC_SOURCE use BitMap
2642 0 no Sync
2643 -1 C_SYNC_TARGET use BitMap
2644 -2 C_SYNC_TARGET set BitMap
2645 -100 after split brain, disconnect
2646-1000 unrelated data
4a23f264
PR
2647-1091 requires proto 91
2648-1096 requires proto 96
b411b363
PR
2649 */
2650static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2651{
2652 u64 self, peer;
2653 int i, j;
2654
2655 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2656 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2657
2658 *rule_nr = 10;
2659 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2660 return 0;
2661
2662 *rule_nr = 20;
2663 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2664 peer != UUID_JUST_CREATED)
2665 return -2;
2666
2667 *rule_nr = 30;
2668 if (self != UUID_JUST_CREATED &&
2669 (peer == UUID_JUST_CREATED || peer == (u64)0))
2670 return 2;
2671
2672 if (self == peer) {
2673 int rct, dc; /* roles at crash time */
2674
2675 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2676
31890f4a 2677 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2678 return -1091;
b411b363
PR
2679
2680 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2681 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2682 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2683 drbd_uuid_set_bm(mdev, 0UL);
2684
2685 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2686 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2687 *rule_nr = 34;
2688 } else {
2689 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2690 *rule_nr = 36;
2691 }
2692
2693 return 1;
2694 }
2695
2696 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2697
31890f4a 2698 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2699 return -1091;
b411b363
PR
2700
2701 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2702 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2703 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2704
2705 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2706 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2707 mdev->p_uuid[UI_BITMAP] = 0UL;
2708
2709 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2710 *rule_nr = 35;
2711 } else {
2712 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2713 *rule_nr = 37;
2714 }
2715
2716 return -1;
2717 }
2718
2719 /* Common power [off|failure] */
2720 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2721 (mdev->p_uuid[UI_FLAGS] & 2);
2722 /* lowest bit is set when we were primary,
2723 * next bit (weight 2) is set when peer was primary */
2724 *rule_nr = 40;
2725
2726 switch (rct) {
2727 case 0: /* !self_pri && !peer_pri */ return 0;
2728 case 1: /* self_pri && !peer_pri */ return 1;
2729 case 2: /* !self_pri && peer_pri */ return -1;
2730 case 3: /* self_pri && peer_pri */
25703f83 2731 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2732 return dc ? -1 : 1;
2733 }
2734 }
2735
2736 *rule_nr = 50;
2737 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2738 if (self == peer)
2739 return -1;
2740
2741 *rule_nr = 51;
2742 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2743 if (self == peer) {
31890f4a 2744 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2745 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2746 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2747 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2748 /* The last P_SYNC_UUID did not get through. Undo the last start of
2749 resync as sync source modifications of the peer's UUIDs. */
2750
31890f4a 2751 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2752 return -1091;
b411b363
PR
2753
2754 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2755 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2756
2757 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2758 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2759
b411b363
PR
2760 return -1;
2761 }
2762 }
2763
2764 *rule_nr = 60;
2765 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2767 peer = mdev->p_uuid[i] & ~((u64)1);
2768 if (self == peer)
2769 return -2;
2770 }
2771
2772 *rule_nr = 70;
2773 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2774 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2775 if (self == peer)
2776 return 1;
2777
2778 *rule_nr = 71;
2779 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2780 if (self == peer) {
31890f4a 2781 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2782 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2783 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2784 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2785 /* The last P_SYNC_UUID did not get through. Undo the last start of
2786 resync as sync source modifications of our UUIDs. */
2787
31890f4a 2788 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2789 return -1091;
b411b363
PR
2790
2791 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2792 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2793
4a23f264 2794 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2795 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2796 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2797
2798 return 1;
2799 }
2800 }
2801
2802
2803 *rule_nr = 80;
d8c2a36b 2804 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2805 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2806 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2807 if (self == peer)
2808 return 2;
2809 }
2810
2811 *rule_nr = 90;
2812 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2813 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2814 if (self == peer && self != ((u64)0))
2815 return 100;
2816
2817 *rule_nr = 100;
2818 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2819 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2820 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2821 peer = mdev->p_uuid[j] & ~((u64)1);
2822 if (self == peer)
2823 return -100;
2824 }
2825 }
2826
2827 return -1000;
2828}
2829
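The "requires proto NN" return codes listed above are encoded as -(1000 + NN); drbd_sync_handshake() later recovers the protocol number as -hg - 1000. A tiny stand-alone sketch of that decoding (editorial illustration only):

/* Illustrative sketch: decoding the "requires proto NN" return codes of
 * drbd_uuid_compare() above (-1091 -> 91, -1096 -> 96). */
#include <stdio.h>

int main(void)
{
	int codes[] = { -1091, -1096 };
	int i;

	for (i = 0; i < 2; i++) {
		int hg = codes[i];

		if (hg < -1000)
			printf("hg=%d -> need at least protocol %d\n", hg, -hg - 1000);
	}
	return 0;
}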
2830/* drbd_sync_handshake() returns the new conn state on success, or
2831 CONN_MASK (-1) on failure.
2832 */
2833static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2834 enum drbd_disk_state peer_disk) __must_hold(local)
2835{
b411b363
PR
2836 enum drbd_conns rv = C_MASK;
2837 enum drbd_disk_state mydisk;
44ed167d 2838 struct net_conf *nc;
6dff2902 2839 int hg, rule_nr, rr_conflict, tentative;
b411b363
PR
2840
2841 mydisk = mdev->state.disk;
2842 if (mydisk == D_NEGOTIATING)
2843 mydisk = mdev->new_state_tmp.disk;
2844
2845 dev_info(DEV, "drbd_sync_handshake:\n");
2846 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2847 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2848 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2849
2850 hg = drbd_uuid_compare(mdev, &rule_nr);
2851
2852 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2853
2854 if (hg == -1000) {
2855 dev_alert(DEV, "Unrelated data, aborting!\n");
2856 return C_MASK;
2857 }
4a23f264
PR
2858 if (hg < -1000) {
2859 dev_alert(DEV, "To resolve this, both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2860 return C_MASK;
2861 }
2862
2863 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2864 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2865 int f = (hg == -100) || abs(hg) == 2;
2866 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2867 if (f)
2868 hg = hg*2;
2869 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2870 hg > 0 ? "source" : "target");
2871 }
2872
3a11a487
AG
2873 if (abs(hg) == 100)
2874 drbd_khelper(mdev, "initial-split-brain");
2875
44ed167d
PR
2876 rcu_read_lock();
2877 nc = rcu_dereference(mdev->tconn->net_conf);
2878
2879 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
b411b363
PR
2880 int pcount = (mdev->state.role == R_PRIMARY)
2881 + (peer_role == R_PRIMARY);
2882 int forced = (hg == -100);
2883
2884 switch (pcount) {
2885 case 0:
2886 hg = drbd_asb_recover_0p(mdev);
2887 break;
2888 case 1:
2889 hg = drbd_asb_recover_1p(mdev);
2890 break;
2891 case 2:
2892 hg = drbd_asb_recover_2p(mdev);
2893 break;
2894 }
2895 if (abs(hg) < 100) {
2896 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2897 "automatically solved. Sync from %s node\n",
2898 pcount, (hg < 0) ? "peer" : "this");
2899 if (forced) {
2900 dev_warn(DEV, "Doing a full sync, since"
2901 " UUIDs where ambiguous.\n");
2902 hg = hg*2;
2903 }
2904 }
2905 }
2906
2907 if (hg == -100) {
6139f60d 2908 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2909 hg = -1;
6139f60d 2910 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2911 hg = 1;
2912
2913 if (abs(hg) < 100)
2914 dev_warn(DEV, "Split-Brain detected, manually solved. "
2915 "Sync from %s node\n",
2916 (hg < 0) ? "peer" : "this");
2917 }
44ed167d 2918 rr_conflict = nc->rr_conflict;
6dff2902 2919 tentative = nc->tentative;
44ed167d 2920 rcu_read_unlock();
b411b363
PR
2921
2922 if (hg == -100) {
580b9767
LE
2923 /* FIXME this log message is not correct if we end up here
2924 * after an attempted attach on a diskless node.
2925 * We just refuse to attach -- well, we drop the "connection"
2926 * to that disk, in a way... */
3a11a487 2927 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2928 drbd_khelper(mdev, "split-brain");
2929 return C_MASK;
2930 }
2931
2932 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2933 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2934 return C_MASK;
2935 }
2936
2937 if (hg < 0 && /* by intention we do not use mydisk here. */
2938 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
44ed167d 2939 switch (rr_conflict) {
b411b363
PR
2940 case ASB_CALL_HELPER:
2941 drbd_khelper(mdev, "pri-lost");
2942 /* fall through */
2943 case ASB_DISCONNECT:
2944 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2945 return C_MASK;
2946 case ASB_VIOLENTLY:
2947 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2948 "assumption\n");
2949 }
2950 }
2951
6dff2902 2952 if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2953 if (hg == 0)
2954 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2955 else
2956 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2957 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2958 abs(hg) >= 2 ? "full" : "bit-map based");
2959 return C_MASK;
2960 }
2961
b411b363
PR
2962 if (abs(hg) >= 2) {
2963 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2964 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2965 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2966 return C_MASK;
2967 }
2968
2969 if (hg > 0) { /* become sync source. */
2970 rv = C_WF_BITMAP_S;
2971 } else if (hg < 0) { /* become sync target */
2972 rv = C_WF_BITMAP_T;
2973 } else {
2974 rv = C_CONNECTED;
2975 if (drbd_bm_total_weight(mdev)) {
2976 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2977 drbd_bm_total_weight(mdev));
2978 }
2979 }
2980
2981 return rv;
2982}
2983
f179d76d 2984static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
b411b363
PR
2985{
2986 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
f179d76d
PR
2987 if (peer == ASB_DISCARD_REMOTE)
2988 return ASB_DISCARD_LOCAL;
b411b363
PR
2989
2990 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
f179d76d
PR
2991 if (peer == ASB_DISCARD_LOCAL)
2992 return ASB_DISCARD_REMOTE;
b411b363
PR
2993
2994 /* everything else is valid if they are equal on both sides. */
f179d76d 2995 return peer;
b411b363
PR
2996}
2997
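convert_after_sb() above mirrors the peer's local/remote-relative policies so they can be compared directly against our own in receive_protocol(). A small stand-alone sketch with stand-in enum values (editorial illustration, not the driver's enum):

/* Illustrative sketch: why the after-split-brain policies are mirrored
 * before being compared; the enum values here are stand-ins. */
#include <stdio.h>

enum demo_asb { DEMO_ASB_DISCONNECT, DEMO_ASB_DISCARD_LOCAL, DEMO_ASB_DISCARD_REMOTE };

static enum demo_asb demo_convert_after_sb(enum demo_asb peer)
{
	if (peer == DEMO_ASB_DISCARD_REMOTE)
		return DEMO_ASB_DISCARD_LOCAL;
	if (peer == DEMO_ASB_DISCARD_LOCAL)
		return DEMO_ASB_DISCARD_REMOTE;
	return peer;	/* everything else must simply match */
}

int main(void)
{
	enum demo_asb mine = DEMO_ASB_DISCARD_LOCAL;	/* we discard our own data */
	enum demo_asb peer = DEMO_ASB_DISCARD_REMOTE;	/* peer discards its remote, i.e. us */

	printf("%s\n", demo_convert_after_sb(peer) == mine ?
	       "compatible" : "incompatible");		/* prints "compatible" */
	return 0;
}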
e2857216 2998static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2999{
e658983a 3000 struct p_protocol *p = pi->data;
036b17ea
PR
3001 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3002 int p_proto, p_discard_my_data, p_two_primaries, cf;
3003 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3004 char integrity_alg[SHARED_SECRET_MAX] = "";
b792c35c 3005 struct crypto_hash *peer_integrity_tfm = NULL, *integrity_tfm = NULL;
7aca6c75 3006 void *int_dig_in = NULL, *int_dig_vv = NULL;
b411b363 3007
b411b363
PR
3008 p_proto = be32_to_cpu(p->protocol);
3009 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3010 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3011 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 3012 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9 3013 cf = be32_to_cpu(p->conn_flags);
6139f60d 3014 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
cf14c2e9 3015
86db0618
AG
3016 if (tconn->agreed_pro_version >= 87) {
3017 int err;
3018
88104ca4 3019 if (pi->size > sizeof(integrity_alg))
86db0618 3020 return -EIO;
88104ca4 3021 err = drbd_recv_all(tconn, integrity_alg, pi->size);
86db0618
AG
3022 if (err)
3023 return err;
036b17ea
PR
3024 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3025 }
88104ca4 3026
036b17ea 3027 if (pi->cmd == P_PROTOCOL_UPDATE) {
88104ca4 3028 if (integrity_alg[0]) {
7aca6c75
PR
3029 int hash_size;
3030
b792c35c
AG
3031 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3032 integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3033 if (!(peer_integrity_tfm && integrity_tfm)) {
88104ca4
AG
3034 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3035 integrity_alg);
3036 goto disconnect;
3037 }
7aca6c75 3038
b792c35c 3039 hash_size = crypto_hash_digestsize(integrity_tfm);
7aca6c75
PR
3040 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3041 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3042 if (!(int_dig_in && int_dig_vv)) {
036b17ea 3043 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
7aca6c75
PR
3044 goto disconnect;
3045 }
88104ca4 3046 }
86db0618 3047
036b17ea
PR
3048 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3049 if (!new_net_conf) {
3050 conn_err(tconn, "Allocation of new net_conf failed\n");
3051 goto disconnect;
3052 }
3053
3054 mutex_lock(&tconn->data.mutex);
3055 mutex_lock(&tconn->conf_update);
3056 old_net_conf = tconn->net_conf;
3057 *new_net_conf = *old_net_conf;
3058
3059 new_net_conf->wire_protocol = p_proto;
3060 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3061 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3062 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3063 new_net_conf->two_primaries = p_two_primaries;
3064 strcpy(new_net_conf->integrity_alg, integrity_alg);
3065 new_net_conf->integrity_alg_len = strlen(integrity_alg) + 1;
3066
3067 crypto_free_hash(tconn->integrity_tfm);
b792c35c 3068 tconn->integrity_tfm = integrity_tfm;
036b17ea
PR
3069
3070 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3071 mutex_unlock(&tconn->conf_update);
3072 mutex_unlock(&tconn->data.mutex);
3073
3074 crypto_free_hash(tconn->peer_integrity_tfm);
7aca6c75
PR
3075 kfree(tconn->int_dig_in);
3076 kfree(tconn->int_dig_vv);
b792c35c 3077 tconn->peer_integrity_tfm = peer_integrity_tfm;
7aca6c75
PR
3078 tconn->int_dig_in = int_dig_in;
3079 tconn->int_dig_vv = int_dig_vv;
036b17ea
PR
3080
3081 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3082 conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3083
3084 synchronize_rcu();
3085 kfree(old_net_conf);
fbc12f45
AG
3086 } else {
3087 clear_bit(CONN_DRY_RUN, &tconn->flags);
036b17ea 3088
fbc12f45
AG
3089 if (cf & CF_DRY_RUN)
3090 set_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9 3091
fbc12f45
AG
3092 rcu_read_lock();
3093 nc = rcu_dereference(tconn->net_conf);
b411b363 3094
fbc12f45
AG
3095 if (p_proto != nc->wire_protocol) {
3096 conn_err(tconn, "incompatible communication protocols\n");
3097 goto disconnect_rcu_unlock;
3098 }
44ed167d 3099
fbc12f45
AG
3100 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3101 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3102 goto disconnect_rcu_unlock;
3103 }
b411b363 3104
fbc12f45
AG
3105 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3106 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3107 goto disconnect_rcu_unlock;
3108 }
b411b363 3109
fbc12f45
AG
3110 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3111 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3112 goto disconnect_rcu_unlock;
3113 }
b411b363 3114
fbc12f45
AG
3115 if (p_discard_my_data && nc->discard_my_data) {
3116 conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
3117 goto disconnect_rcu_unlock;
3118 }
b411b363 3119
fbc12f45
AG
3120 if (p_two_primaries != nc->two_primaries) {
3121 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3122 goto disconnect_rcu_unlock;
3123 }
b411b363 3124
fbc12f45
AG
3125 if (strcmp(integrity_alg, nc->integrity_alg)) {
3126 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
3127 goto disconnect_rcu_unlock;
3128 }
b411b363 3129
fbc12f45 3130 rcu_read_unlock();
036b17ea 3131 }
82bc0194 3132 return 0;
b411b363 3133
44ed167d
PR
3134disconnect_rcu_unlock:
3135 rcu_read_unlock();
b411b363 3136disconnect:
b792c35c
AG
3137 crypto_free_hash(peer_integrity_tfm);
3138 crypto_free_hash(integrity_tfm);
036b17ea
PR
3139 kfree(int_dig_in);
3140 kfree(int_dig_vv);
7204624c 3141 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3142 return -EIO;
b411b363
PR
3143}
3144
3145/* helper function
3146 * input: alg name, feature name
3147 * return: NULL (alg name was "")
3148 * ERR_PTR(error) if something goes wrong
3149 * or the crypto hash ptr, if it worked out ok. */
3150struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3151 const char *alg, const char *name)
3152{
3153 struct crypto_hash *tfm;
3154
3155 if (!alg[0])
3156 return NULL;
3157
3158 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3159 if (IS_ERR(tfm)) {
3160 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3161 alg, name, PTR_ERR(tfm));
3162 return tfm;
3163 }
b411b363
PR
3164 return tfm;
3165}
3166
4a76b161
AG
3167static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3168{
3169 void *buffer = tconn->data.rbuf;
3170 int size = pi->size;
3171
3172 while (size) {
3173 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3174 s = drbd_recv(tconn, buffer, s);
3175 if (s <= 0) {
3176 if (s < 0)
3177 return s;
3178 break;
3179 }
3180 size -= s;
3181 }
3182 if (size)
3183 return -EIO;
3184 return 0;
3185}
3186
3187/*
3188 * config_unknown_volume - device configuration command for unknown volume
3189 *
3190 * When a device is added to an existing connection, the node on which the
3191 * device is added first will send configuration commands to its peer but the
3192 * peer will not know about the device yet. It will warn and ignore these
3193 * commands. Once the device is added on the second node, the second node will
3194 * send the same device configuration commands, but in the other direction.
3195 *
3196 * (We can also end up here if drbd is misconfigured.)
3197 */
3198static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3199{
2fcb8f30
AG
3200 conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3201 cmdname(pi->cmd), pi->vnr);
4a76b161
AG
3202 return ignore_remaining_packet(tconn, pi);
3203}
3204
3205static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3206{
4a76b161 3207 struct drbd_conf *mdev;
e658983a 3208 struct p_rs_param_95 *p;
b411b363
PR
3209 unsigned int header_size, data_size, exp_max_sz;
3210 struct crypto_hash *verify_tfm = NULL;
3211 struct crypto_hash *csums_tfm = NULL;
2ec91e0e 3212 struct net_conf *old_net_conf, *new_net_conf = NULL;
813472ce 3213 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
4a76b161 3214 const int apv = tconn->agreed_pro_version;
813472ce 3215 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
778f271d 3216 int fifo_size = 0;
82bc0194 3217 int err;
b411b363 3218
4a76b161
AG
3219 mdev = vnr_to_mdev(tconn, pi->vnr);
3220 if (!mdev)
3221 return config_unknown_volume(tconn, pi);
3222
b411b363
PR
3223 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3224 : apv == 88 ? sizeof(struct p_rs_param)
3225 + SHARED_SECRET_MAX
8e26f9cc
PR
3226 : apv <= 94 ? sizeof(struct p_rs_param_89)
3227 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3228
e2857216 3229 if (pi->size > exp_max_sz) {
b411b363 3230 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3231 pi->size, exp_max_sz);
82bc0194 3232 return -EIO;
b411b363
PR
3233 }
3234
3235 if (apv <= 88) {
e658983a 3236 header_size = sizeof(struct p_rs_param);
e2857216 3237 data_size = pi->size - header_size;
8e26f9cc 3238 } else if (apv <= 94) {
e658983a 3239 header_size = sizeof(struct p_rs_param_89);
e2857216 3240 data_size = pi->size - header_size;
b411b363 3241 D_ASSERT(data_size == 0);
8e26f9cc 3242 } else {
e658983a 3243 header_size = sizeof(struct p_rs_param_95);
e2857216 3244 data_size = pi->size - header_size;
b411b363
PR
3245 D_ASSERT(data_size == 0);
3246 }
3247
3248 /* initialize verify_alg and csums_alg */
e658983a 3249 p = pi->data;
b411b363
PR
3250 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3251
e658983a 3252 err = drbd_recv_all(mdev->tconn, p, header_size);
82bc0194
AG
3253 if (err)
3254 return err;
b411b363 3255
daeda1cc
PR
3256 mutex_lock(&mdev->tconn->conf_update);
3257 old_net_conf = mdev->tconn->net_conf;
813472ce
PR
3258 if (get_ldev(mdev)) {
3259 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3260 if (!new_disk_conf) {
3261 put_ldev(mdev);
3262 mutex_unlock(&mdev->tconn->conf_update);
3263 dev_err(DEV, "Allocation of new disk_conf failed\n");
3264 return -ENOMEM;
3265 }
daeda1cc 3266
813472ce
PR
3267 old_disk_conf = mdev->ldev->disk_conf;
3268 *new_disk_conf = *old_disk_conf;
3269
6394b935 3270 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
813472ce 3271 }
daeda1cc 3272
b411b363
PR
3273 if (apv >= 88) {
3274 if (apv == 88) {
3275 if (data_size > SHARED_SECRET_MAX) {
3276 dev_err(DEV, "verify-alg too long, "
3277 "peer wants %u, accepting only %u byte\n",
3278 data_size, SHARED_SECRET_MAX);
813472ce
PR
3279 err = -EIO;
3280 goto reconnect;
b411b363
PR
3281 }
3282
82bc0194 3283 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
813472ce
PR
3284 if (err)
3285 goto reconnect;
b411b363
PR
3286 /* we expect NUL terminated string */
3287 /* but just in case someone tries to be evil */
3288 D_ASSERT(p->verify_alg[data_size-1] == 0);
3289 p->verify_alg[data_size-1] = 0;
3290
3291 } else /* apv >= 89 */ {
3292 /* we still expect NUL terminated strings */
3293 /* but just in case someone tries to be evil */
3294 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3295 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3296 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3297 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3298 }
3299
2ec91e0e 3300 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3301 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3302 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3303 old_net_conf->verify_alg, p->verify_alg);
b411b363
PR
3304 goto disconnect;
3305 }
3306 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3307 p->verify_alg, "verify-alg");
3308 if (IS_ERR(verify_tfm)) {
3309 verify_tfm = NULL;
3310 goto disconnect;
3311 }
3312 }
3313
2ec91e0e 3314 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3315 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3316 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3317 old_net_conf->csums_alg, p->csums_alg);
b411b363
PR
3318 goto disconnect;
3319 }
3320 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3321 p->csums_alg, "csums-alg");
3322 if (IS_ERR(csums_tfm)) {
3323 csums_tfm = NULL;
3324 goto disconnect;
3325 }
3326 }
3327
813472ce 3328 if (apv > 94 && new_disk_conf) {
daeda1cc
PR
3329 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3330 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3331 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3332 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3333
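 /* size the resync-controller fifo to cover the configured plan-ahead
  * window; reallocate it below if that length changed */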
daeda1cc 3334 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
9958c857 3335 if (fifo_size != mdev->rs_plan_s->size) {
813472ce
PR
3336 new_plan = fifo_alloc(fifo_size);
3337 if (!new_plan) {
778f271d 3338 dev_err(DEV, "kmalloc of fifo_buffer failed\n");
f399002e 3339 put_ldev(mdev);
778f271d
PR
3340 goto disconnect;
3341 }
3342 }
8e26f9cc 3343 }
b411b363 3344
91fd4dad 3345 if (verify_tfm || csums_tfm) {
2ec91e0e
PR
3346 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3347 if (!new_net_conf) {
91fd4dad
PR
3348 dev_err(DEV, "Allocation of new net_conf failed\n");
3349 goto disconnect;
3350 }
3351
2ec91e0e 3352 *new_net_conf = *old_net_conf;
91fd4dad
PR
3353
3354 if (verify_tfm) {
2ec91e0e
PR
3355 strcpy(new_net_conf->verify_alg, p->verify_alg);
3356 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
91fd4dad
PR
3357 crypto_free_hash(mdev->tconn->verify_tfm);
3358 mdev->tconn->verify_tfm = verify_tfm;
3359 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3360 }
3361 if (csums_tfm) {
2ec91e0e
PR
3362 strcpy(new_net_conf->csums_alg, p->csums_alg);
3363 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
91fd4dad
PR
3364 crypto_free_hash(mdev->tconn->csums_tfm);
3365 mdev->tconn->csums_tfm = csums_tfm;
3366 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3367 }
2ec91e0e 3368 rcu_assign_pointer(tconn->net_conf, new_net_conf);
b411b363 3369 }
daeda1cc 3370 }
91fd4dad 3371
813472ce
PR
3372 if (new_disk_conf) {
3373 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3374 put_ldev(mdev);
3375 }
3376
3377 if (new_plan) {
3378 old_plan = mdev->rs_plan_s;
3379 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
b411b363 3380 }
daeda1cc
PR
3381
3382 mutex_unlock(&mdev->tconn->conf_update);
3383 synchronize_rcu();
3384 if (new_net_conf)
3385 kfree(old_net_conf);
3386 kfree(old_disk_conf);
813472ce 3387 kfree(old_plan);
daeda1cc 3388
82bc0194 3389 return 0;
b411b363 3390
813472ce
PR
3391reconnect:
3392 if (new_disk_conf) {
3393 put_ldev(mdev);
3394 kfree(new_disk_conf);
3395 }
3396 mutex_unlock(&mdev->tconn->conf_update);
3397 return -EIO;
3398
b411b363 3399disconnect:
813472ce
PR
3400 kfree(new_plan);
3401 if (new_disk_conf) {
3402 put_ldev(mdev);
3403 kfree(new_disk_conf);
3404 }
a0095508 3405 mutex_unlock(&mdev->tconn->conf_update);
b411b363
PR
3406 /* just for completeness: actually not needed,
3407 * as this is not reached if csums_tfm was ok. */
3408 crypto_free_hash(csums_tfm);
3409 /* but free the verify_tfm again, if csums_tfm did not work out */
3410 crypto_free_hash(verify_tfm);
38fa9988 3411 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3412 return -EIO;
b411b363
PR
3413}
3414
b411b363
PR
3415/* warn if the arguments differ by more than 12.5% */
3416static void warn_if_differ_considerably(struct drbd_conf *mdev,
3417 const char *s, sector_t a, sector_t b)
3418{
3419 sector_t d;
3420 if (a == 0 || b == 0)
3421 return;
3422 d = (a > b) ? (a - b) : (b - a);
3423 if (d > (a>>3) || d > (b>>3))
3424 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3425 (unsigned long long)a, (unsigned long long)b);
3426}
3427
4a76b161 3428static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3429{
4a76b161 3430 struct drbd_conf *mdev;
e658983a 3431 struct p_sizes *p = pi->data;
b411b363 3432 enum determine_dev_size dd = unchanged;
b411b363
PR
3433 sector_t p_size, p_usize, my_usize;
3434 int ldsc = 0; /* local disk size changed */
e89b591c 3435 enum dds_flags ddsf;
b411b363 3436
4a76b161
AG
3437 mdev = vnr_to_mdev(tconn, pi->vnr);
3438 if (!mdev)
3439 return config_unknown_volume(tconn, pi);
3440
b411b363
PR
3441 p_size = be64_to_cpu(p->d_size);
3442 p_usize = be64_to_cpu(p->u_size);
3443
b411b363
PR
3444 /* just store the peer's disk size for now.
3445 * we still need to figure out whether we accept that. */
3446 mdev->p_size = p_size;
3447
b411b363 3448 if (get_ldev(mdev)) {
daeda1cc
PR
3449 rcu_read_lock();
3450 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3451 rcu_read_unlock();
3452
b411b363
PR
3453 warn_if_differ_considerably(mdev, "lower level device sizes",
3454 p_size, drbd_get_max_capacity(mdev->ldev));
3455 warn_if_differ_considerably(mdev, "user requested size",
daeda1cc 3456 p_usize, my_usize);
b411b363
PR
3457
3458 /* if this is the first connect, or an otherwise expected
3459 * param exchange, choose the minimum */
3460 if (mdev->state.conn == C_WF_REPORT_PARAMS)
daeda1cc 3461 p_usize = min_not_zero(my_usize, p_usize);
b411b363
PR
3462
3463 /* Never shrink a device with usable data during connect.
3464 But allow online shrinking if we are connected. */
ef5e44a6 3465 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
daeda1cc
PR
3466 drbd_get_capacity(mdev->this_bdev) &&
3467 mdev->state.disk >= D_OUTDATED &&
3468 mdev->state.conn < C_CONNECTED) {
b411b363 3469 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3470 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 3471 put_ldev(mdev);
82bc0194 3472 return -EIO;
b411b363 3473 }
daeda1cc
PR
3474
3475 if (my_usize != p_usize) {
3476 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3477
3478 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3479 if (!new_disk_conf) {
3480 dev_err(DEV, "Allocation of new disk_conf failed\n");
3481 put_ldev(mdev);
3482 return -ENOMEM;
3483 }
3484
3485 mutex_lock(&mdev->tconn->conf_update);
3486 old_disk_conf = mdev->ldev->disk_conf;
3487 *new_disk_conf = *old_disk_conf;
3488 new_disk_conf->disk_size = p_usize;
3489
3490 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3491 mutex_unlock(&mdev->tconn->conf_update);
3492 synchronize_rcu();
3493 kfree(old_disk_conf);
3494
3495 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3496 (unsigned long)p_usize);
3497 }
3498
b411b363
PR
3499 put_ldev(mdev);
3500 }
b411b363 3501
e89b591c 3502 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3503 if (get_ldev(mdev)) {
24c4830c 3504 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3505 put_ldev(mdev);
3506 if (dd == dev_size_error)
82bc0194 3507 return -EIO;
b411b363
PR
3508 drbd_md_sync(mdev);
3509 } else {
3510 /* I am diskless, need to accept the peer's size. */
3511 drbd_set_my_capacity(mdev, p_size);
3512 }
3513
99432fcc
PR
3514 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3515 drbd_reconsider_max_bio_size(mdev);
3516
b411b363
PR
3517 if (get_ldev(mdev)) {
3518 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3519 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3520 ldsc = 1;
3521 }
3522
b411b363
PR
3523 put_ldev(mdev);
3524 }
3525
3526 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3527 if (be64_to_cpu(p->c_size) !=
3528 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3529 /* we have different sizes, probably peer
3530 * needs to know my new size... */
e89b591c 3531 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3532 }
3533 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3534 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3535 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3536 mdev->state.disk >= D_INCONSISTENT) {
3537 if (ddsf & DDSF_NO_RESYNC)
3538 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3539 else
3540 resync_after_online_grow(mdev);
3541 } else
b411b363
PR
3542 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3543 }
3544 }
3545
82bc0194 3546 return 0;
b411b363
PR
3547}
3548
4a76b161 3549static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3550{
4a76b161 3551 struct drbd_conf *mdev;
e658983a 3552 struct p_uuids *p = pi->data;
b411b363 3553 u64 *p_uuid;
62b0da3a 3554 int i, updated_uuids = 0;
b411b363 3555
4a76b161
AG
3556 mdev = vnr_to_mdev(tconn, pi->vnr);
3557 if (!mdev)
3558 return config_unknown_volume(tconn, pi);
3559
b411b363
PR
3560 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
 if (!p_uuid) {
 dev_err(DEV, "kmalloc of p_uuid failed\n");
 return -ENOMEM;
 }
3561
3562 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3563 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3564
3565 kfree(mdev->p_uuid);
3566 mdev->p_uuid = p_uuid;
3567
3568 if (mdev->state.conn < C_CONNECTED &&
3569 mdev->state.disk < D_INCONSISTENT &&
3570 mdev->state.role == R_PRIMARY &&
3571 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3572 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3573 (unsigned long long)mdev->ed_uuid);
38fa9988 3574 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3575 return -EIO;
b411b363
PR
3576 }
3577
3578 if (get_ldev(mdev)) {
3579 int skip_initial_sync =
3580 mdev->state.conn == C_CONNECTED &&
31890f4a 3581 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3582 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3583 (p_uuid[UI_FLAGS] & 8);
3584 if (skip_initial_sync) {
3585 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3586 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3587 "clear_n_write from receive_uuids",
3588 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3589 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3590 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3591 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3592 CS_VERBOSE, NULL);
3593 drbd_md_sync(mdev);
62b0da3a 3594 updated_uuids = 1;
b411b363
PR
3595 }
3596 put_ldev(mdev);
18a50fa2
PR
3597 } else if (mdev->state.disk < D_INCONSISTENT &&
3598 mdev->state.role == R_PRIMARY) {
3599 /* I am a diskless primary, the peer just created a new current UUID
3600 for me. */
62b0da3a 3601 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3602 }
3603
3604 /* Before we test the disk state, wait until any cluster-wide state
3605 change that may still be in progress has finished. That is important if
3606 we are primary and are detaching from our disk. We need to see the
3607 new disk state... */
8410da8f
PR
3608 mutex_lock(mdev->state_mutex);
3609 mutex_unlock(mdev->state_mutex);
b411b363 3610 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3611 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3612
3613 if (updated_uuids)
3614 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3615
82bc0194 3616 return 0;
b411b363
PR
3617}
3618
3619/**
3620 * convert_state() - Converts the peer's view of the cluster state to our point of view
3621 * @ps: The state as seen by the peer.
3622 */
3623static union drbd_state convert_state(union drbd_state ps)
3624{
3625 union drbd_state ms;
3626
3627 static enum drbd_conns c_tab[] = {
369bea63 3628 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
b411b363
PR
3629 [C_CONNECTED] = C_CONNECTED,
3630
3631 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3632 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3633 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3634 [C_VERIFY_S] = C_VERIFY_T,
3635 [C_MASK] = C_MASK,
3636 };
3637
3638 ms.i = ps.i;
3639
3640 ms.conn = c_tab[ps.conn];
3641 ms.peer = ps.role;
3642 ms.role = ps.peer;
3643 ms.pdsk = ps.disk;
3644 ms.disk = ps.pdsk;
3645 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3646
3647 return ms;
3648}
3649
4a76b161 3650static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3651{
4a76b161 3652 struct drbd_conf *mdev;
e658983a 3653 struct p_req_state *p = pi->data;
b411b363 3654 union drbd_state mask, val;
bf885f8a 3655 enum drbd_state_rv rv;
b411b363 3656
4a76b161
AG
3657 mdev = vnr_to_mdev(tconn, pi->vnr);
3658 if (!mdev)
3659 return -EIO;
3660
b411b363
PR
3661 mask.i = be32_to_cpu(p->mask);
3662 val.i = be32_to_cpu(p->val);
3663
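 /* tie-breaker: if both nodes request a state change at the same time, the
  * node carrying DISCARD_CONCURRENT rejects the peer's request with
  * SS_CONCURRENT_ST_CHG while its own change is still in flight */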
25703f83 3664 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3665 mutex_is_locked(mdev->state_mutex)) {
b411b363 3666 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
82bc0194 3667 return 0;
b411b363
PR
3668 }
3669
3670 mask = convert_state(mask);
3671 val = convert_state(val);
3672
dfafcc8a
PR
3673 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3674 drbd_send_sr_reply(mdev, rv);
b411b363 3675
b411b363
PR
3676 drbd_md_sync(mdev);
3677
82bc0194 3678 return 0;
b411b363
PR
3679}
3680
e2857216 3681static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
dfafcc8a 3682{
e658983a 3683 struct p_req_state *p = pi->data;
dfafcc8a
PR
3684 union drbd_state mask, val;
3685 enum drbd_state_rv rv;
3686
3687 mask.i = be32_to_cpu(p->mask);
3688 val.i = be32_to_cpu(p->val);
3689
3690 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3691 mutex_is_locked(&tconn->cstate_mutex)) {
3692 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3693 return 0;
dfafcc8a
PR
3694 }
3695
3696 mask = convert_state(mask);
3697 val = convert_state(val);
3698
778bcf2e 3699 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
dfafcc8a
PR
3700 conn_send_sr_reply(tconn, rv);
3701
82bc0194 3702 return 0;
dfafcc8a
PR
3703}
3704
4a76b161 3705static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3706{
4a76b161 3707 struct drbd_conf *mdev;
e658983a 3708 struct p_state *p = pi->data;
4ac4aada 3709 union drbd_state os, ns, peer_state;
b411b363 3710 enum drbd_disk_state real_peer_disk;
65d922c3 3711 enum chg_state_flags cs_flags;
b411b363
PR
3712 int rv;
3713
4a76b161
AG
3714 mdev = vnr_to_mdev(tconn, pi->vnr);
3715 if (!mdev)
3716 return config_unknown_volume(tconn, pi);
3717
b411b363
PR
3718 peer_state.i = be32_to_cpu(p->state);
3719
3720 real_peer_disk = peer_state.disk;
3721 if (peer_state.disk == D_NEGOTIATING) {
3722 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3723 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3724 }
3725
87eeee41 3726 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3727 retry:
78bae59b 3728 os = ns = drbd_read_state(mdev);
87eeee41 3729 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3730
e9ef7bb6
LE
3731 /* peer says his disk is uptodate, while we think it is inconsistent,
3732 * and this happens while we think we have a sync going on. */
3733 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3734 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3735 /* If we are (becoming) SyncSource, but peer is still in sync
3736 * preparation, ignore its uptodate-ness to avoid flapping, it
3737 * will change to inconsistent once the peer reaches active
3738 * syncing states.
3739 * It may have changed syncer-paused flags, however, so we
3740 * cannot ignore this completely. */
3741 if (peer_state.conn > C_CONNECTED &&
3742 peer_state.conn < C_SYNC_SOURCE)
3743 real_peer_disk = D_INCONSISTENT;
3744
3745 /* if peer_state changes to connected at the same time,
3746 * it explicitly notifies us that it finished resync.
3747 * Maybe we should finish it up, too? */
3748 else if (os.conn >= C_SYNC_SOURCE &&
3749 peer_state.conn == C_CONNECTED) {
3750 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3751 drbd_resync_finished(mdev);
82bc0194 3752 return 0;
e9ef7bb6
LE
3753 }
3754 }
3755
3756 /* peer says his disk is inconsistent, while we think it is uptodate,
3757 * and this happens while the peer still thinks we have a sync going on,
3758 * but we think we are already done with the sync.
3759 * We ignore this to avoid flapping pdsk.
3760 * This should not happen, if the peer is a recent version of drbd. */
3761 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3762 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3763 real_peer_disk = D_UP_TO_DATE;
3764
4ac4aada
LE
3765 if (ns.conn == C_WF_REPORT_PARAMS)
3766 ns.conn = C_CONNECTED;
b411b363 3767
67531718
PR
3768 if (peer_state.conn == C_AHEAD)
3769 ns.conn = C_BEHIND;
3770
b411b363
PR
3771 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3772 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3773 int cr; /* consider resync */
3774
3775 /* if we established a new connection */
4ac4aada 3776 cr = (os.conn < C_CONNECTED);
b411b363
PR
3777 /* if we had an established connection
3778 * and one of the nodes newly attaches a disk */
4ac4aada 3779 cr |= (os.conn == C_CONNECTED &&
b411b363 3780 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3781 os.disk == D_NEGOTIATING));
b411b363
PR
3782 /* if we have both been inconsistent, and the peer has been
3783 * forced to be UpToDate with --overwrite-data */
3784 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3785 /* if we had been plain connected, and the admin requested to
3786 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3787 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3788 (peer_state.conn >= C_STARTING_SYNC_S &&
3789 peer_state.conn <= C_WF_BITMAP_T));
3790
3791 if (cr)
4ac4aada 3792 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3793
3794 put_ldev(mdev);
4ac4aada
LE
3795 if (ns.conn == C_MASK) {
3796 ns.conn = C_CONNECTED;
b411b363 3797 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3798 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3799 } else if (peer_state.disk == D_NEGOTIATING) {
3800 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3801 peer_state.disk = D_DISKLESS;
580b9767 3802 real_peer_disk = D_DISKLESS;
b411b363 3803 } else {
8169e41b 3804 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
82bc0194 3805 return -EIO;
4ac4aada 3806 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3807 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3808 return -EIO;
b411b363
PR
3809 }
3810 }
3811 }
3812
87eeee41 3813 spin_lock_irq(&mdev->tconn->req_lock);
78bae59b 3814 if (os.i != drbd_read_state(mdev).i)
b411b363
PR
3815 goto retry;
3816 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3817 ns.peer = peer_state.role;
3818 ns.pdsk = real_peer_disk;
3819 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3820 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3821 ns.disk = mdev->new_state_tmp.disk;
4ac4aada 3822 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
2aebfabb 3823 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3824 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3825 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3826 for temporary network outages! */
87eeee41 3827 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3828 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
2f5cdd0b 3829 tl_clear(mdev->tconn);
481c6f50
PR
3830 drbd_uuid_new_current(mdev);
3831 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3832 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3833 return -EIO;
481c6f50 3834 }
65d922c3 3835 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
78bae59b 3836 ns = drbd_read_state(mdev);
87eeee41 3837 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3838
3839 if (rv < SS_SUCCESS) {
38fa9988 3840 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3841 return -EIO;
b411b363
PR
3842 }
3843
4ac4aada
LE
3844 if (os.conn > C_WF_REPORT_PARAMS) {
3845 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3846 peer_state.disk != D_NEGOTIATING ) {
3847 /* we want resync, peer has not yet decided to sync... */
3848 /* Nowadays only used when forcing a node into primary role and
3849 setting its disk to UpToDate with that */
3850 drbd_send_uuids(mdev);
3851 drbd_send_state(mdev);
3852 }
3853 }
3854
a0095508 3855 mutex_lock(&mdev->tconn->conf_update);
6139f60d 3856 mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
a0095508 3857 mutex_unlock(&mdev->tconn->conf_update);
b411b363
PR
3858
3859 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3860
82bc0194 3861 return 0;
b411b363
PR
3862}
3863
4a76b161 3864static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3865{
4a76b161 3866 struct drbd_conf *mdev;
e658983a 3867 struct p_rs_uuid *p = pi->data;
4a76b161
AG
3868
3869 mdev = vnr_to_mdev(tconn, pi->vnr);
3870 if (!mdev)
3871 return -EIO;
b411b363
PR
3872
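 /* the local state change towards WFSyncUUID may still be in flight when
  * this packet arrives; wait until we have reached a state in which it
  * makes sense to act on it (or the connection/disk went away meanwhile) */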
3873 wait_event(mdev->misc_wait,
3874 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3875 mdev->state.conn == C_BEHIND ||
b411b363
PR
3876 mdev->state.conn < C_CONNECTED ||
3877 mdev->state.disk < D_NEGOTIATING);
3878
3879 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3880
b411b363
PR
3881 /* Here the _drbd_uuid_ functions are right, current should
3882 _not_ be rotated into the history */
3883 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3884 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3885 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3886
62b0da3a 3887 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3888 drbd_start_resync(mdev, C_SYNC_TARGET);
3889
3890 put_ldev(mdev);
3891 } else
3892 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3893
82bc0194 3894 return 0;
b411b363
PR
3895}
3896
2c46407d
AG
3897/**
3898 * receive_bitmap_plain
3899 *
3900 * Return 0 when done, 1 when another iteration is needed, and a negative error
3901 * code upon failure.
3902 */
3903static int
50d0b1ad 3904receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
e658983a 3905 unsigned long *p, struct bm_xfer_ctx *c)
b411b363 3906{
50d0b1ad
AG
3907 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3908 drbd_header_size(mdev->tconn);
e658983a 3909 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
50d0b1ad 3910 c->bm_words - c->word_offset);
e658983a 3911 unsigned int want = num_words * sizeof(*p);
2c46407d 3912 int err;
b411b363 3913
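 /* each plain bitmap packet must carry exactly the number of words we
  * expect next (a full packet, or the final remainder); anything else is
  * treated as a protocol error */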
50d0b1ad
AG
3914 if (want != size) {
3915 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
2c46407d 3916 return -EIO;
b411b363
PR
3917 }
3918 if (want == 0)
2c46407d 3919 return 0;
e658983a 3920 err = drbd_recv_all(mdev->tconn, p, want);
82bc0194 3921 if (err)
2c46407d 3922 return err;
b411b363 3923
e658983a 3924 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
b411b363
PR
3925
3926 c->word_offset += num_words;
3927 c->bit_offset = c->word_offset * BITS_PER_LONG;
3928 if (c->bit_offset > c->bm_bits)
3929 c->bit_offset = c->bm_bits;
3930
2c46407d 3931 return 1;
b411b363
PR
3932}
3933
a02d1240
AG
3934static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3935{
3936 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3937}
3938
3939static int dcbp_get_start(struct p_compressed_bm *p)
3940{
3941 return (p->encoding & 0x80) != 0;
3942}
3943
3944static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3945{
3946 return (p->encoding >> 4) & 0x7;
3947}
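 /* layout of the encoding byte: bit 7 = whether the first run describes
  * set bits, bits 6..4 = number of pad bits at the end of the bit stream,
  * bits 3..0 = encoding variant (only RLE_VLI_Bits is decoded below) */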
3948
2c46407d
AG
3949/**
3950 * recv_bm_rle_bits
3951 *
3952 * Return 0 when done, 1 when another iteration is needed, and a negative error
3953 * code upon failure.
3954 */
3955static int
b411b363
PR
3956recv_bm_rle_bits(struct drbd_conf *mdev,
3957 struct p_compressed_bm *p,
c6d25cfe
PR
3958 struct bm_xfer_ctx *c,
3959 unsigned int len)
b411b363
PR
3960{
3961 struct bitstream bs;
3962 u64 look_ahead;
3963 u64 rl;
3964 u64 tmp;
3965 unsigned long s = c->bit_offset;
3966 unsigned long e;
a02d1240 3967 int toggle = dcbp_get_start(p);
b411b363
PR
3968 int have;
3969 int bits;
3970
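 /* the payload is a bit stream of VLI-encoded run lengths; runs alternate
  * between clear and set bits, and only the "set" runs are written into
  * the bitmap, while "clear" runs merely advance the bit offset */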
a02d1240 3971 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
3972
3973 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3974 if (bits < 0)
2c46407d 3975 return -EIO;
b411b363
PR
3976
3977 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3978 bits = vli_decode_bits(&rl, look_ahead);
3979 if (bits <= 0)
2c46407d 3980 return -EIO;
b411b363
PR
3981
3982 if (toggle) {
3983 e = s + rl -1;
3984 if (e >= c->bm_bits) {
3985 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3986 return -EIO;
b411b363
PR
3987 }
3988 _drbd_bm_set_bits(mdev, s, e);
3989 }
3990
3991 if (have < bits) {
3992 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3993 have, bits, look_ahead,
3994 (unsigned int)(bs.cur.b - p->code),
3995 (unsigned int)bs.buf_len);
2c46407d 3996 return -EIO;
b411b363
PR
3997 }
3998 look_ahead >>= bits;
3999 have -= bits;
4000
4001 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4002 if (bits < 0)
2c46407d 4003 return -EIO;
b411b363
PR
4004 look_ahead |= tmp << have;
4005 have += bits;
4006 }
4007
4008 c->bit_offset = s;
4009 bm_xfer_ctx_bit_to_word_offset(c);
4010
2c46407d 4011 return (s != c->bm_bits);
b411b363
PR
4012}
4013
2c46407d
AG
4014/**
4015 * decode_bitmap_c
4016 *
4017 * Return 0 when done, 1 when another iteration is needed, and a negative error
4018 * code upon failure.
4019 */
4020static int
b411b363
PR
4021decode_bitmap_c(struct drbd_conf *mdev,
4022 struct p_compressed_bm *p,
c6d25cfe
PR
4023 struct bm_xfer_ctx *c,
4024 unsigned int len)
b411b363 4025{
a02d1240 4026 if (dcbp_get_code(p) == RLE_VLI_Bits)
e658983a 4027 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
b411b363
PR
4028
4029 /* other variants had been implemented for evaluation,
4030 * but have been dropped as this one turned out to be "best"
4031 * during all our tests. */
4032
4033 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 4034 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 4035 return -EIO;
b411b363
PR
4036}
4037
4038void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4039 const char *direction, struct bm_xfer_ctx *c)
4040{
4041 /* what would it take to transfer it "plaintext" */
50d0b1ad
AG
4042 unsigned int header_size = drbd_header_size(mdev->tconn);
4043 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4044 unsigned int plain =
4045 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4046 c->bm_words * sizeof(unsigned long);
4047 unsigned int total = c->bytes[0] + c->bytes[1];
4048 unsigned int r;
b411b363
PR
4049
4050 /* total can not be zero. but just in case: */
4051 if (total == 0)
4052 return;
4053
4054 /* don't report if not compressed */
4055 if (total >= plain)
4056 return;
4057
4058 /* total < plain. check for overflow, still */
4059 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4060 : (1000 * total / plain);
4061
4062 if (r > 1000)
4063 r = 1000;
4064
4065 r = 1000 - r;
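 /* r now holds the saving in per mille; reported below as a percentage
  * with one decimal place */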
4066 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4067 "total %u; compression: %u.%u%%\n",
4068 direction,
4069 c->bytes[1], c->packets[1],
4070 c->bytes[0], c->packets[0],
4071 total, r/10, r % 10);
4072}
4073
4074/* Since we are processing the bitfield from lower addresses to higher,
4075 it does not matter whether we process it in 32 bit or 64 bit
4076 chunks, as long as it is little endian. (Understand it as a byte stream,
4077 beginning with the lowest byte...) If we used big endian
4078 we would need to process it from the highest address to the lowest,
4079 in order to be agnostic to the 32 vs 64 bit issue.
4080
4081 Returns 0 on success, a negative error code otherwise. */
4a76b161 4082static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4083{
4a76b161 4084 struct drbd_conf *mdev;
b411b363 4085 struct bm_xfer_ctx c;
2c46407d 4086 int err;
4a76b161
AG
4087
4088 mdev = vnr_to_mdev(tconn, pi->vnr);
4089 if (!mdev)
4090 return -EIO;
b411b363 4091
20ceb2b2
LE
4092 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4093 /* you are supposed to send additional out-of-sync information
4094 * if you actually set bits during this phase */
b411b363 4095
b411b363
PR
4096 c = (struct bm_xfer_ctx) {
4097 .bm_bits = drbd_bm_bits(mdev),
4098 .bm_words = drbd_bm_words(mdev),
4099 };
4100
2c46407d 4101 for(;;) {
e658983a
AG
4102 if (pi->cmd == P_BITMAP)
4103 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4104 else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
4105 /* MAYBE: sanity check that we speak proto >= 90,
4106 * and the feature is enabled! */
e658983a 4107 struct p_compressed_bm *p = pi->data;
b411b363 4108
50d0b1ad 4109 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
b411b363 4110 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 4111 err = -EIO;
b411b363
PR
4112 goto out;
4113 }
e658983a 4114 if (pi->size <= sizeof(*p)) {
e2857216 4115 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 4116 err = -EIO;
78fcbdae 4117 goto out;
b411b363 4118 }
e658983a
AG
4119 err = drbd_recv_all(mdev->tconn, p, pi->size);
4120 if (err)
4121 goto out;
e2857216 4122 err = decode_bitmap_c(mdev, p, &c, pi->size);
b411b363 4123 } else {
e2857216 4124 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 4125 err = -EIO;
b411b363
PR
4126 goto out;
4127 }
4128
e2857216 4129 c.packets[pi->cmd == P_BITMAP]++;
50d0b1ad 4130 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
b411b363 4131
2c46407d
AG
4132 if (err <= 0) {
4133 if (err < 0)
4134 goto out;
b411b363 4135 break;
2c46407d 4136 }
e2857216 4137 err = drbd_recv_header(mdev->tconn, pi);
82bc0194 4138 if (err)
b411b363 4139 goto out;
2c46407d 4140 }
b411b363
PR
4141
4142 INFO_bm_xfer_stats(mdev, "receive", &c);
4143
4144 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
4145 enum drbd_state_rv rv;
4146
82bc0194
AG
4147 err = drbd_send_bitmap(mdev);
4148 if (err)
b411b363
PR
4149 goto out;
4150 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
4151 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4152 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
4153 } else if (mdev->state.conn != C_WF_BITMAP_S) {
4154 /* admin may have requested C_DISCONNECTING,
4155 * other threads may have noticed network errors */
4156 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4157 drbd_conn_str(mdev->state.conn));
4158 }
82bc0194 4159 err = 0;
b411b363 4160
b411b363 4161 out:
20ceb2b2 4162 drbd_bm_unlock(mdev);
82bc0194 4163 if (!err && mdev->state.conn == C_WF_BITMAP_S)
b411b363 4164 drbd_start_resync(mdev, C_SYNC_SOURCE);
82bc0194 4165 return err;
b411b363
PR
4166}
4167
4a76b161 4168static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4169{
4a76b161 4170 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 4171 pi->cmd, pi->size);
2de876ef 4172
4a76b161 4173 return ignore_remaining_packet(tconn, pi);
2de876ef
PR
4174}
4175
4a76b161 4176static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4177{
e7f52dfb
LE
4178 /* Make sure we've acked all the TCP data associated
4179 * with the data requests being unplugged */
4a76b161 4180 drbd_tcp_quickack(tconn->data.socket);
0ced55a3 4181
82bc0194 4182 return 0;
0ced55a3
PR
4183}
4184
4a76b161 4185static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
73a01a18 4186{
4a76b161 4187 struct drbd_conf *mdev;
e658983a 4188 struct p_block_desc *p = pi->data;
4a76b161
AG
4189
4190 mdev = vnr_to_mdev(tconn, pi->vnr);
4191 if (!mdev)
4192 return -EIO;
73a01a18 4193
f735e363
LE
4194 switch (mdev->state.conn) {
4195 case C_WF_SYNC_UUID:
4196 case C_WF_BITMAP_T:
4197 case C_BEHIND:
4198 break;
4199 default:
4200 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4201 drbd_conn_str(mdev->state.conn));
4202 }
4203
73a01a18
PR
4204 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4205
82bc0194 4206 return 0;
73a01a18
PR
4207}
4208
02918be2
PR
4209struct data_cmd {
4210 int expect_payload;
4211 size_t pkt_size;
4a76b161 4212 int (*fn)(struct drbd_tconn *, struct packet_info *);
02918be2
PR
4213};
4214
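 /* receiver dispatch table: expect_payload says whether data beyond the
  * fixed sub-header is allowed, pkt_size is the size of that sub-header
  * (read into pi.data before the handler runs), fn is the handler */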
4215static struct data_cmd drbd_cmd_handler[] = {
4a76b161
AG
4216 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4217 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4218 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4219 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
e658983a
AG
4220 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4221 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4222 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4a76b161
AG
4223 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4224 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
e658983a
AG
4225 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4226 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4a76b161
AG
4227 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4228 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4229 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4230 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4231 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4232 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4233 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4234 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4235 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4236 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4237 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4238 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
036b17ea 4239 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
b411b363
PR
4240};
4241
eefc2f7d 4242static void drbdd(struct drbd_tconn *tconn)
b411b363 4243{
77351055 4244 struct packet_info pi;
02918be2 4245 size_t shs; /* sub header size */
82bc0194 4246 int err;
b411b363 4247
eefc2f7d 4248 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195
AG
4249 struct data_cmd *cmd;
4250
eefc2f7d 4251 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 4252 if (drbd_recv_header(tconn, &pi))
02918be2 4253 goto err_out;
b411b363 4254
deebe195 4255 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 4256 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
2fcb8f30
AG
4257 conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4258 cmdname(pi.cmd), pi.cmd);
02918be2 4259 goto err_out;
0b33a916 4260 }
b411b363 4261
e658983a
AG
4262 shs = cmd->pkt_size;
4263 if (pi.size > shs && !cmd->expect_payload) {
2fcb8f30
AG
4264 conn_err(tconn, "No payload expected %s l:%d\n",
4265 cmdname(pi.cmd), pi.size);
02918be2 4266 goto err_out;
b411b363 4267 }
b411b363 4268
c13f7e1a 4269 if (shs) {
e658983a 4270 err = drbd_recv_all_warn(tconn, pi.data, shs);
a5c31904 4271 if (err)
c13f7e1a 4272 goto err_out;
e2857216 4273 pi.size -= shs;
c13f7e1a
LE
4274 }
4275
4a76b161
AG
4276 err = cmd->fn(tconn, &pi);
4277 if (err) {
9f5bdc33
AG
4278 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4279 cmdname(pi.cmd), err, pi.size);
02918be2 4280 goto err_out;
b411b363
PR
4281 }
4282 }
82bc0194 4283 return;
b411b363 4284
82bc0194
AG
4285 err_out:
4286 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4287}
4288
0e29d163 4289void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4290{
4291 struct drbd_wq_barrier barr;
4292
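 /* flush by queueing a barrier work item and waiting for it; everything
  * queued before the barrier has been processed once the completion fires */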
4293 barr.w.cb = w_prev_work_done;
0e29d163 4294 barr.w.tconn = tconn;
b411b363 4295 init_completion(&barr.done);
0e29d163 4296 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
4297 wait_for_completion(&barr.done);
4298}
4299
81fa2e67 4300static void conn_disconnect(struct drbd_tconn *tconn)
b411b363 4301{
c141ebda 4302 struct drbd_conf *mdev;
bbeb641c 4303 enum drbd_conns oc;
c141ebda 4304 int vnr, rv = SS_UNKNOWN_ERROR;
b411b363 4305
bbeb641c 4306 if (tconn->cstate == C_STANDALONE)
b411b363 4307 return;
b411b363
PR
4308
4309 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4310 drbd_thread_stop(&tconn->asender);
4311 drbd_free_sock(tconn);
4312
c141ebda
PR
4313 rcu_read_lock();
4314 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4315 kref_get(&mdev->kref);
4316 rcu_read_unlock();
4317 drbd_disconnected(mdev);
4318 kref_put(&mdev->kref, &drbd_minor_destroy);
4319 rcu_read_lock();
4320 }
4321 rcu_read_unlock();
4322
360cc740
PR
4323 conn_info(tconn, "Connection closed\n");
4324
cb703454
PR
4325 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4326 conn_try_outdate_peer_async(tconn);
4327
360cc740 4328 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4329 oc = tconn->cstate;
4330 if (oc >= C_UNCONNECTED)
4331 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4332
360cc740
PR
4333 spin_unlock_irq(&tconn->req_lock);
4334
f3dfa40a 4335 if (oc == C_DISCONNECTING)
d9cc6e23 4336 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
360cc740
PR
4337}
4338
c141ebda 4339static int drbd_disconnected(struct drbd_conf *mdev)
360cc740 4340{
360cc740
PR
4341 enum drbd_fencing_p fp;
4342 unsigned int i;
b411b363 4343
85719573 4344 /* wait for current activity to cease. */
87eeee41 4345 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4346 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4347 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4348 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4349 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4350
4351 /* We do not have data structures that would allow us to
4352 * get the rs_pending_cnt down to 0 again.
4353 * * On C_SYNC_TARGET we do not have any data structures describing
4354 * the pending RSDataRequest's we have sent.
4355 * * On C_SYNC_SOURCE there is no data structure that tracks
4356 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4357 * And no, it is not the sum of the reference counts in the
4358 * resync_LRU. The resync_LRU tracks the whole operation including
4359 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4360 * on the fly. */
4361 drbd_rs_cancel_all(mdev);
4362 mdev->rs_total = 0;
4363 mdev->rs_failed = 0;
4364 atomic_set(&mdev->rs_pending_cnt, 0);
4365 wake_up(&mdev->misc_wait);
4366
b411b363 4367 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4368 resync_timer_fn((unsigned long)mdev);
4369
b411b363
PR
4370 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4371 * w_make_resync_request etc. which may still be on the worker queue
4372 * to be "canceled" */
a21e9298 4373 drbd_flush_workqueue(mdev);
b411b363 4374
a990be46 4375 drbd_finish_peer_reqs(mdev);
b411b363
PR
4376
4377 kfree(mdev->p_uuid);
4378 mdev->p_uuid = NULL;
4379
2aebfabb 4380 if (!drbd_suspended(mdev))
2f5cdd0b 4381 tl_clear(mdev->tconn);
b411b363 4382
b411b363
PR
4383 drbd_md_sync(mdev);
4384
4385 fp = FP_DONT_CARE;
4386 if (get_ldev(mdev)) {
daeda1cc
PR
4387 rcu_read_lock();
4388 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4389 rcu_read_unlock();
b411b363
PR
4390 put_ldev(mdev);
4391 }
4392
20ceb2b2
LE
4393 /* serialize with bitmap writeout triggered by the state change,
4394 * if any. */
4395 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4396
b411b363
PR
4397 /* tcp_close and release of sendpage pages can be deferred. I don't
4398 * want to use SO_LINGER, because apparently it can be deferred for
4399 * more than 20 seconds (longest time I checked).
4400 *
4401 * Actually we don't care for exactly when the network stack does its
4402 * put_page(), but release our reference on these pages right here.
4403 */
7721f567 4404 i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
b411b363
PR
4405 if (i)
4406 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4407 i = atomic_read(&mdev->pp_in_use_by_net);
4408 if (i)
4409 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4410 i = atomic_read(&mdev->pp_in_use);
4411 if (i)
45bb912b 4412 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4413
4414 D_ASSERT(list_empty(&mdev->read_ee));
4415 D_ASSERT(list_empty(&mdev->active_ee));
4416 D_ASSERT(list_empty(&mdev->sync_ee));
4417 D_ASSERT(list_empty(&mdev->done_ee));
4418
4419 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4420 atomic_set(&mdev->current_epoch->epoch_size, 0);
4421 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4422
4423 return 0;
b411b363
PR
4424}
4425
4426/*
4427 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4428 * we can agree on is stored in agreed_pro_version.
4429 *
4430 * feature flags and the reserved array should be enough room for future
4431 * enhancements of the handshake protocol, and possible plugins...
4432 *
4433 * for now, they are expected to be zero, but ignored.
4434 */
6038178e 4435static int drbd_send_features(struct drbd_tconn *tconn)
b411b363 4436{
9f5bdc33
AG
4437 struct drbd_socket *sock;
4438 struct p_connection_features *p;
b411b363 4439
9f5bdc33
AG
4440 sock = &tconn->data;
4441 p = conn_prepare_command(tconn, sock);
4442 if (!p)
e8d17b01 4443 return -EIO;
b411b363
PR
4444 memset(p, 0, sizeof(*p));
4445 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4446 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
9f5bdc33 4447 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
b411b363
PR
4448}
4449
4450/*
4451 * return values:
4452 * 1 yes, we have a valid connection
4453 * 0 oops, did not work out, please try again
4454 * -1 peer talks different language,
4455 * no point in trying again, please go standalone.
4456 */
6038178e 4457static int drbd_do_features(struct drbd_tconn *tconn)
b411b363 4458{
65d11ed6 4459 /* ASSERT current == tconn->receiver ... */
e658983a
AG
4460 struct p_connection_features *p;
4461 const int expect = sizeof(struct p_connection_features);
77351055 4462 struct packet_info pi;
a5c31904 4463 int err;
b411b363 4464
6038178e 4465 err = drbd_send_features(tconn);
e8d17b01 4466 if (err)
b411b363
PR
4467 return 0;
4468
69bc7bc3
AG
4469 err = drbd_recv_header(tconn, &pi);
4470 if (err)
b411b363
PR
4471 return 0;
4472
6038178e
AG
4473 if (pi.cmd != P_CONNECTION_FEATURES) {
4474 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
2fcb8f30 4475 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4476 return -1;
4477 }
4478
77351055 4479 if (pi.size != expect) {
6038178e 4480 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
77351055 4481 expect, pi.size);
b411b363
PR
4482 return -1;
4483 }
4484
e658983a
AG
4485 p = pi.data;
4486 err = drbd_recv_all_warn(tconn, p, expect);
a5c31904 4487 if (err)
b411b363 4488 return 0;
b411b363 4489
b411b363
PR
4490 p->protocol_min = be32_to_cpu(p->protocol_min);
4491 p->protocol_max = be32_to_cpu(p->protocol_max);
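 /* presumably a peer that predates the protocol_max field sends 0 here;
  * treat that as "max equals min" */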
4492 if (p->protocol_max == 0)
4493 p->protocol_max = p->protocol_min;
4494
4495 if (PRO_VERSION_MAX < p->protocol_min ||
4496 PRO_VERSION_MIN > p->protocol_max)
4497 goto incompat;
4498
65d11ed6 4499 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4500
65d11ed6
PR
4501 conn_info(tconn, "Handshake successful: "
4502 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4503
4504 return 1;
4505
4506 incompat:
65d11ed6 4507 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4508 "I support %d-%d, peer supports %d-%d\n",
4509 PRO_VERSION_MIN, PRO_VERSION_MAX,
4510 p->protocol_min, p->protocol_max);
4511 return -1;
4512}
4513
4514#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4515static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4516{
4517 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4518 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4519 return -1;
b411b363
PR
4520}
4521#else
4522#define CHALLENGE_LEN 64
b10d96cb
JT
4523
4524/* Return value:
4525 1 - auth succeeded,
4526 0 - failed, try again (network error),
4527 -1 - auth failed, don't try again.
4528*/
4529
13e6037d 4530static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363 4531{
9f5bdc33 4532 struct drbd_socket *sock;
b411b363
PR
4533 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4534 struct scatterlist sg;
4535 char *response = NULL;
4536 char *right_response = NULL;
4537 char *peers_ch = NULL;
44ed167d
PR
4538 unsigned int key_len;
4539 char secret[SHARED_SECRET_MAX]; /* 64 byte */
b411b363
PR
4540 unsigned int resp_size;
4541 struct hash_desc desc;
77351055 4542 struct packet_info pi;
44ed167d 4543 struct net_conf *nc;
69bc7bc3 4544 int err, rv;
b411b363 4545
9f5bdc33
AG
4546 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
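 /* rough flow of the CRAM-HMAC exchange (symmetric on both nodes):
  * 1. send our random challenge
  * 2. receive the peer's challenge, answer it with HMAC(secret, challenge)
  * 3. receive the peer's answer to our challenge and compare it with our
  * own HMAC(secret, my_challenge); only a match authenticates the peer */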
4547
44ed167d
PR
4548 rcu_read_lock();
4549 nc = rcu_dereference(tconn->net_conf);
4550 key_len = strlen(nc->shared_secret);
4551 memcpy(secret, nc->shared_secret, key_len);
4552 rcu_read_unlock();
4553
13e6037d 4554 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4555 desc.flags = 0;
4556
44ed167d 4557 rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
b411b363 4558 if (rv) {
13e6037d 4559 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4560 rv = -1;
b411b363
PR
4561 goto fail;
4562 }
4563
4564 get_random_bytes(my_challenge, CHALLENGE_LEN);
4565
9f5bdc33
AG
4566 sock = &tconn->data;
4567 if (!conn_prepare_command(tconn, sock)) {
4568 rv = 0;
4569 goto fail;
4570 }
e658983a 4571 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
9f5bdc33 4572 my_challenge, CHALLENGE_LEN);
b411b363
PR
4573 if (!rv)
4574 goto fail;
4575
69bc7bc3
AG
4576 err = drbd_recv_header(tconn, &pi);
4577 if (err) {
4578 rv = 0;
b411b363 4579 goto fail;
69bc7bc3 4580 }
b411b363 4581
77351055 4582 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4583 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
2fcb8f30 4584 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4585 rv = 0;
4586 goto fail;
4587 }
4588
77351055 4589 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4590 conn_err(tconn, "AuthChallenge payload too big.\n");
b10d96cb 4591 rv = -1;
b411b363
PR
4592 goto fail;
4593 }
4594
77351055 4595 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4596 if (peers_ch == NULL) {
13e6037d 4597 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4598 rv = -1;
b411b363
PR
4599 goto fail;
4600 }
4601
a5c31904
AG
4602 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4603 if (err) {
b411b363
PR
4604 rv = 0;
4605 goto fail;
4606 }
4607
13e6037d 4608 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4609 response = kmalloc(resp_size, GFP_NOIO);
4610 if (response == NULL) {
13e6037d 4611 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4612 rv = -1;
b411b363
PR
4613 goto fail;
4614 }
4615
4616 sg_init_table(&sg, 1);
77351055 4617 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4618
4619 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4620 if (rv) {
13e6037d 4621 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4622 rv = -1;
b411b363
PR
4623 goto fail;
4624 }
4625
9f5bdc33
AG
4626 if (!conn_prepare_command(tconn, sock)) {
4627 rv = 0;
4628 goto fail;
4629 }
e658983a 4630 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
9f5bdc33 4631 response, resp_size);
b411b363
PR
4632 if (!rv)
4633 goto fail;
4634
69bc7bc3
AG
4635 err = drbd_recv_header(tconn, &pi);
4636 if (err) {
4637 rv = 0;
b411b363 4638 goto fail;
69bc7bc3 4639 }
b411b363 4640
77351055 4641 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4642 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
2fcb8f30 4643 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4644 rv = 0;
4645 goto fail;
4646 }
4647
77351055 4648 if (pi.size != resp_size) {
13e6037d 4649 conn_err(tconn, "AuthResponse payload has wrong size\n");
b411b363
PR
4650 rv = 0;
4651 goto fail;
4652 }
4653
a5c31904
AG
4654 err = drbd_recv_all_warn(tconn, response , resp_size);
4655 if (err) {
b411b363
PR
4656 rv = 0;
4657 goto fail;
4658 }
4659
4660 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4661 if (right_response == NULL) {
13e6037d 4662 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4663 rv = -1;
b411b363
PR
4664 goto fail;
4665 }
4666
4667 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4668
4669 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4670 if (rv) {
13e6037d 4671 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4672 rv = -1;
b411b363
PR
4673 goto fail;
4674 }
4675
4676 rv = !memcmp(response, right_response, resp_size);
4677
4678 if (rv)
44ed167d
PR
4679 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4680 resp_size);
b10d96cb
JT
4681 else
4682 rv = -1;
b411b363
PR
4683
4684 fail:
4685 kfree(peers_ch);
4686 kfree(response);
4687 kfree(right_response);
4688
4689 return rv;
4690}
4691#endif
4692
4693int drbdd_init(struct drbd_thread *thi)
4694{
392c8801 4695 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4696 int h;
4697
4d641dd7 4698 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4699
4700 do {
81fa2e67 4701 h = conn_connect(tconn);
b411b363 4702 if (h == 0) {
81fa2e67 4703 conn_disconnect(tconn);
20ee6390 4704 schedule_timeout_interruptible(HZ);
b411b363
PR
4705 }
4706 if (h == -1) {
4d641dd7 4707 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4708 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4709 }
4710 } while (h == 0);
4711
91fd4dad
PR
4712 if (h > 0)
4713 drbdd(tconn);
b411b363 4714
81fa2e67 4715 conn_disconnect(tconn);
b411b363 4716
4d641dd7 4717 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4718 return 0;
4719}
4720
4721/* ********* acknowledge sender ******** */
4722
e05e1e59 4723static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
e4f78ede 4724{
e658983a 4725 struct p_req_state_reply *p = pi->data;
e4f78ede
PR
4726 int retcode = be32_to_cpu(p->retcode);
4727
4728 if (retcode >= SS_SUCCESS) {
4729 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4730 } else {
4731 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4732 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4733 drbd_set_st_err_str(retcode), retcode);
4734 }
4735 wake_up(&tconn->ping_wait);
4736
2735a594 4737 return 0;
e4f78ede
PR
4738}
4739
1952e916 4740static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4741{
1952e916 4742 struct drbd_conf *mdev;
e658983a 4743 struct p_req_state_reply *p = pi->data;
b411b363
PR
4744 int retcode = be32_to_cpu(p->retcode);
4745
1952e916
AG
4746 mdev = vnr_to_mdev(tconn, pi->vnr);
4747 if (!mdev)
2735a594 4748 return -EIO;
1952e916 4749
e4f78ede
PR
4750 if (retcode >= SS_SUCCESS) {
4751 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4752 } else {
4753 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4754 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4755 drbd_set_st_err_str(retcode), retcode);
b411b363 4756 }
e4f78ede
PR
4757 wake_up(&mdev->state_wait);
4758
2735a594 4759 return 0;
b411b363
PR
4760}
4761
e05e1e59 4762static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4763{
2735a594 4764 return drbd_send_ping_ack(tconn);
b411b363
PR
4765
4766}
4767
e05e1e59 4768static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363
PR
4769{
4770 /* restore idle timeout */
2a67d8b9
PR
4771 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4772 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4773 wake_up(&tconn->ping_wait);
b411b363 4774
2735a594 4775 return 0;
b411b363
PR
4776}
4777
1952e916 4778static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4779{
1952e916 4780 struct drbd_conf *mdev;
e658983a 4781 struct p_block_ack *p = pi->data;
b411b363
PR
4782 sector_t sector = be64_to_cpu(p->sector);
4783 int blksize = be32_to_cpu(p->blksize);
4784
1952e916
AG
4785 mdev = vnr_to_mdev(tconn, pi->vnr);
4786 if (!mdev)
2735a594 4787 return -EIO;
1952e916 4788
31890f4a 4789 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4790
4791 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4792
1d53f09e
LE
4793 if (get_ldev(mdev)) {
4794 drbd_rs_complete_io(mdev, sector);
4795 drbd_set_in_sync(mdev, sector, blksize);
4796 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4797 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4798 put_ldev(mdev);
4799 }
b411b363 4800 dec_rs_pending(mdev);
778f271d 4801 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4802
2735a594 4803 return 0;
b411b363
PR
4804}
4805
bc9c5c41
AG
4806static int
4807validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4808 struct rb_root *root, const char *func,
4809 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4810{
4811 struct drbd_request *req;
4812 struct bio_and_error m;
4813
87eeee41 4814 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4815 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4816 if (unlikely(!req)) {
87eeee41 4817 spin_unlock_irq(&mdev->tconn->req_lock);
85997675 4818 return -EIO;
b411b363
PR
4819 }
4820 __req_mod(req, what, &m);
87eeee41 4821 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4822
4823 if (m.bio)
4824 complete_master_bio(mdev, &m);
85997675 4825 return 0;
b411b363
PR
4826}
4827
1952e916 4828static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4829{
1952e916 4830 struct drbd_conf *mdev;
e658983a 4831 struct p_block_ack *p = pi->data;
b411b363
PR
4832 sector_t sector = be64_to_cpu(p->sector);
4833 int blksize = be32_to_cpu(p->blksize);
4834 enum drbd_req_event what;
4835
1952e916
AG
4836 mdev = vnr_to_mdev(tconn, pi->vnr);
4837 if (!mdev)
2735a594 4838 return -EIO;
1952e916 4839
b411b363
PR
4840 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4841
579b57ed 4842 if (p->block_id == ID_SYNCER) {
b411b363
PR
4843 drbd_set_in_sync(mdev, sector, blksize);
4844 dec_rs_pending(mdev);
2735a594 4845 return 0;
b411b363 4846 }
e05e1e59 4847 switch (pi->cmd) {
b411b363 4848 case P_RS_WRITE_ACK:
8554df1c 4849 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4850 break;
4851 case P_WRITE_ACK:
8554df1c 4852 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4853 break;
4854 case P_RECV_ACK:
8554df1c 4855 what = RECV_ACKED_BY_PEER;
b411b363 4856 break;
7be8da07 4857 case P_DISCARD_WRITE:
7be8da07
AG
4858 what = DISCARD_WRITE;
4859 break;
4860 case P_RETRY_WRITE:
7be8da07 4861 what = POSTPONE_WRITE;
b411b363
PR
4862 break;
4863 default:
2735a594 4864 BUG();
b411b363
PR
4865 }
4866
2735a594
AG
4867 return validate_req_change_req_state(mdev, p->block_id, sector,
4868 &mdev->write_requests, __func__,
4869 what, false);
b411b363
PR
4870}
4871
1952e916 4872static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4873{
1952e916 4874 struct drbd_conf *mdev;
e658983a 4875 struct p_block_ack *p = pi->data;
b411b363 4876 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4877 int size = be32_to_cpu(p->blksize);
85997675 4878 int err;
b411b363 4879
1952e916
AG
4880 mdev = vnr_to_mdev(tconn, pi->vnr);
4881 if (!mdev)
2735a594 4882 return -EIO;
1952e916 4883
b411b363
PR
4884 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4885
579b57ed 4886 if (p->block_id == ID_SYNCER) {
b411b363
PR
4887 dec_rs_pending(mdev);
4888 drbd_rs_failed_io(mdev, sector, size);
2735a594 4889 return 0;
b411b363 4890 }
2deb8336 4891
85997675
AG
4892 err = validate_req_change_req_state(mdev, p->block_id, sector,
4893 &mdev->write_requests, __func__,
303d1448 4894 NEG_ACKED, true);
85997675 4895 if (err) {
c3afd8f5
AG
4896 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4897 The master bio might already be completed, therefore the
4898 request is no longer in the collision hash. */
4899 /* In Protocol B we might already have got a P_RECV_ACK
4900 but then get a P_NEG_ACK afterwards. */
c3afd8f5 4901 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4902 }
2735a594 4903 return 0;
b411b363
PR
4904}
4905
1952e916 4906static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4907{
1952e916 4908 struct drbd_conf *mdev;
e658983a 4909 struct p_block_ack *p = pi->data;
b411b363
PR
4910 sector_t sector = be64_to_cpu(p->sector);
4911
1952e916
AG
4912 mdev = vnr_to_mdev(tconn, pi->vnr);
4913 if (!mdev)
2735a594 4914 return -EIO;
1952e916 4915
b411b363 4916 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4917
b411b363
PR
4918 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4919 (unsigned long long)sector, be32_to_cpu(p->blksize));
4920
2735a594
AG
4921 return validate_req_change_req_state(mdev, p->block_id, sector,
4922 &mdev->read_requests, __func__,
4923 NEG_ACKED, false);
b411b363
PR
4924}
4925
1952e916 4926static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4927{
1952e916 4928 struct drbd_conf *mdev;
b411b363
PR
4929 sector_t sector;
4930 int size;
e658983a 4931 struct p_block_ack *p = pi->data;
1952e916
AG
4932
4933 mdev = vnr_to_mdev(tconn, pi->vnr);
4934 if (!mdev)
2735a594 4935 return -EIO;
b411b363
PR
4936
4937 sector = be64_to_cpu(p->sector);
4938 size = be32_to_cpu(p->blksize);
b411b363
PR
4939
4940 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4941
4942 dec_rs_pending(mdev);
4943
4944 if (get_ldev_if_state(mdev, D_FAILED)) {
4945 drbd_rs_complete_io(mdev, sector);
e05e1e59 4946 switch (pi->cmd) {
d612d309
PR
4947 case P_NEG_RS_DREPLY:
4948 drbd_rs_failed_io(mdev, sector, size);
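			/* fall through */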
4949 case P_RS_CANCEL:
4950 break;
4951 default:
2735a594 4952 BUG();
d612d309 4953 }
b411b363
PR
4954 put_ldev(mdev);
4955 }
4956
2735a594 4957 return 0;
b411b363
PR
4958}
4959
1952e916 4960static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4961{
1952e916 4962 struct drbd_conf *mdev;
e658983a 4963 struct p_barrier_ack *p = pi->data;
1952e916
AG
4964
4965 mdev = vnr_to_mdev(tconn, pi->vnr);
4966 if (!mdev)
2735a594 4967 return -EIO;
b411b363 4968
2f5cdd0b 4969 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4970
c4752ef1
PR
4971 if (mdev->state.conn == C_AHEAD &&
4972 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
 4973 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4974 mdev->start_resync_timer.expires = jiffies + HZ;
4975 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4976 }
4977
2735a594 4978 return 0;
b411b363
PR
4979}
4980
1952e916 4981static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4982{
1952e916 4983 struct drbd_conf *mdev;
e658983a 4984 struct p_block_ack *p = pi->data;
b411b363
PR
4985 struct drbd_work *w;
4986 sector_t sector;
4987 int size;
4988
1952e916
AG
4989 mdev = vnr_to_mdev(tconn, pi->vnr);
4990 if (!mdev)
2735a594 4991 return -EIO;
1952e916 4992
b411b363
PR
4993 sector = be64_to_cpu(p->sector);
4994 size = be32_to_cpu(p->blksize);
4995
4996 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4997
4998 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
8f7bed77 4999 drbd_ov_out_of_sync_found(mdev, sector, size);
b411b363 5000 else
8f7bed77 5001 ov_out_of_sync_print(mdev);
b411b363 5002
1d53f09e 5003 if (!get_ldev(mdev))
2735a594 5004 return 0;
1d53f09e 5005
b411b363
PR
5006 drbd_rs_complete_io(mdev, sector);
5007 dec_rs_pending(mdev);
5008
ea5442af
LE
5009 --mdev->ov_left;
5010
5011 /* let's advance progress step marks only for every other megabyte */
5012 if ((mdev->ov_left & 0x200) == 0x200)
5013 drbd_advance_rs_marks(mdev, mdev->ov_left);
5014
5015 if (mdev->ov_left == 0) {
b411b363
PR
5016 w = kmalloc(sizeof(*w), GFP_NOIO);
5017 if (w) {
5018 w->cb = w_ov_finished;
a21e9298 5019 w->mdev = mdev;
e42325a5 5020 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
5021 } else {
 5022 dev_err(DEV, "kmalloc(w) failed.\n");
8f7bed77 5023 ov_out_of_sync_print(mdev);
b411b363
PR
5024 drbd_resync_finished(mdev);
5025 }
5026 }
1d53f09e 5027 put_ldev(mdev);
2735a594 5028 return 0;
b411b363
PR
5029}
5030
1952e916 5031static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 5032{
2735a594 5033 return 0;
0ced55a3
PR
5034}
5035
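/*
 * Process the done_ee list of every volume on this connection, and repeat
 * until one pass finds all done_ee lists empty (new completions may have
 * arrived in the meantime).  Returns 0 on success, 1 if
 * drbd_finish_peer_reqs() failed for any volume.
 */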
a990be46 5036static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
32862ec7 5037{
082a3439 5038 struct drbd_conf *mdev;
c141ebda 5039 int vnr, not_empty = 0;
32862ec7
PR
5040
5041 do {
5042 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5043 flush_signals(current);
c141ebda
PR
5044
5045 rcu_read_lock();
5046 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5047 kref_get(&mdev->kref);
5048 rcu_read_unlock();
d3fcb490 5049 if (drbd_finish_peer_reqs(mdev)) {
c141ebda
PR
5050 kref_put(&mdev->kref, &drbd_minor_destroy);
5051 return 1;
d3fcb490 5052 }
c141ebda
PR
5053 kref_put(&mdev->kref, &drbd_minor_destroy);
5054 rcu_read_lock();
082a3439 5055 }
32862ec7 5056 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
5057
5058 spin_lock_irq(&tconn->req_lock);
c141ebda 5059 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
082a3439
PR
5060 not_empty = !list_empty(&mdev->done_ee);
5061 if (not_empty)
5062 break;
5063 }
5064 spin_unlock_irq(&tconn->req_lock);
c141ebda 5065 rcu_read_unlock();
32862ec7
PR
5066 } while (not_empty);
5067
5068 return 0;
5069}
5070
7201b972
AG
5071struct asender_cmd {
5072 size_t pkt_size;
1952e916 5073 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
7201b972
AG
5074};
5075
5076static struct asender_cmd asender_tbl[] = {
e658983a
AG
5077 [P_PING] = { 0, got_Ping },
5078 [P_PING_ACK] = { 0, got_PingAck },
1952e916
AG
5079 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5080 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5081 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5082 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5083 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5084 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5085 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5086 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5087 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5088 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5089 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5090 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5091 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5092 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5093 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972
AG
5094};
5095
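/*
 * Dispatch table for the meta-data ("asender") socket, indexed by packet
 * command.  pkt_size is the payload size expected after the header (0 for
 * P_PING and P_PING_ACK, which carry no payload).  drbd_asender() below
 * rejects commands without a handler as well as packets whose advertised
 * size does not match pkt_size.
 */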
b411b363
PR
5096int drbd_asender(struct drbd_thread *thi)
5097{
392c8801 5098 struct drbd_tconn *tconn = thi->tconn;
b411b363 5099 struct asender_cmd *cmd = NULL;
77351055 5100 struct packet_info pi;
257d0af6 5101 int rv;
e658983a 5102 void *buf = tconn->meta.rbuf;
b411b363 5103 int received = 0;
52b061a4
AG
5104 unsigned int header_size = drbd_header_size(tconn);
5105 int expect = header_size;
44ed167d
PR
5106 bool ping_timeout_active = false;
5107 struct net_conf *nc;
bb77d34e 5108 int ping_timeo, tcp_cork, ping_int;
b411b363 5109
b411b363
PR
5110 current->policy = SCHED_RR; /* Make this a realtime task! */
5111 current->rt_priority = 2; /* more important than all other tasks */
5112
e77a0a5c 5113 while (get_t_state(thi) == RUNNING) {
80822284 5114 drbd_thread_current_set_cpu(thi);
44ed167d
PR
5115
5116 rcu_read_lock();
5117 nc = rcu_dereference(tconn->net_conf);
5118 ping_timeo = nc->ping_timeo;
bb77d34e 5119 tcp_cork = nc->tcp_cork;
44ed167d
PR
5120 ping_int = nc->ping_int;
5121 rcu_read_unlock();
5122
32862ec7 5123 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
a17647aa 5124 if (drbd_send_ping(tconn)) {
32862ec7 5125 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
5126 goto reconnect;
5127 }
44ed167d
PR
5128 tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5129 ping_timeout_active = true;
b411b363
PR
5130 }
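		/* While a ping is outstanding, the (much shorter) ping_timeo
		 * is used as receive timeout; it is configured in tenths of a
		 * second, hence the HZ / 10 conversion above.  An -EAGAIN
		 * while ping_timeout_active is set means the PingAck is
		 * overdue and forces a reconnect (handled further down). */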
5131
32862ec7
PR
5132 /* TODO: conditionally cork; it may hurt latency if we cork without
5133 much to send */
bb77d34e 5134 if (tcp_cork)
32862ec7 5135 drbd_tcp_cork(tconn->meta.socket);
a990be46
AG
5136 if (tconn_finish_peer_reqs(tconn)) {
5137 conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
32862ec7 5138 goto reconnect;
082a3439 5139 }
b411b363 5140 /* but unconditionally uncork unless disabled */
bb77d34e 5141 if (tcp_cork)
32862ec7 5142 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
5143
5144 /* short circuit, recv_msg would return EINTR anyways. */
5145 if (signal_pending(current))
5146 continue;
5147
32862ec7
PR
5148 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5149 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
5150
5151 flush_signals(current);
5152
5153 /* Note:
5154 * -EINTR (on meta) we got a signal
5155 * -EAGAIN (on meta) rcvtimeo expired
5156 * -ECONNRESET other side closed the connection
5157 * -ERESTARTSYS (on data) we got a signal
5158 * rv < 0 other than above: unexpected error!
5159 * rv == expected: full header or command
5160 * rv < expected: "woken" by signal during receive
5161 * rv == 0 : "connection shut down by peer"
5162 */
5163 if (likely(rv > 0)) {
5164 received += rv;
5165 buf += rv;
5166 } else if (rv == 0) {
32862ec7 5167 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
5168 goto reconnect;
5169 } else if (rv == -EAGAIN) {
cb6518cb
LE
5170 /* If the data socket received something meanwhile,
5171 * that is good enough: peer is still alive. */
32862ec7
PR
5172 if (time_after(tconn->last_received,
5173 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 5174 continue;
f36af18c 5175 if (ping_timeout_active) {
32862ec7 5176 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
5177 goto reconnect;
5178 }
32862ec7 5179 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
5180 continue;
5181 } else if (rv == -EINTR) {
5182 continue;
5183 } else {
32862ec7 5184 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
5185 goto reconnect;
5186 }
5187
5188 if (received == expect && cmd == NULL) {
e658983a 5189 if (decode_header(tconn, tconn->meta.rbuf, &pi))
b411b363 5190 goto reconnect;
7201b972 5191 cmd = &asender_tbl[pi.cmd];
1952e916 5192 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
2fcb8f30
AG
5193 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5194 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5195 goto disconnect;
5196 }
e658983a 5197 expect = header_size + cmd->pkt_size;
52b061a4 5198 if (pi.size != expect - header_size) {
32862ec7 5199 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 5200 pi.cmd, pi.size);
b411b363 5201 goto reconnect;
257d0af6 5202 }
b411b363
PR
5203 }
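		/* The header has been decoded and size-checked; expect now
		 * covers header plus payload.  Commands without payload
		 * (pkt_size == 0) are dispatched right away below, all others
		 * loop back to receive their payload first. */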
5204 if (received == expect) {
2735a594 5205 bool err;
a4fbda8e 5206
2735a594
AG
5207 err = cmd->fn(tconn, &pi);
5208 if (err) {
1952e916 5209 conn_err(tconn, "%pf failed\n", cmd->fn);
b411b363 5210 goto reconnect;
1952e916 5211 }
b411b363 5212
a4fbda8e
PR
5213 tconn->last_received = jiffies;
5214
44ed167d
PR
5215 if (cmd == &asender_tbl[P_PING_ACK]) {
5216 /* restore idle timeout */
5217 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5218 ping_timeout_active = false;
5219 }
f36af18c 5220
e658983a 5221 buf = tconn->meta.rbuf;
b411b363 5222 received = 0;
52b061a4 5223 expect = header_size;
b411b363
PR
5224 cmd = NULL;
5225 }
5226 }
5227
5228 if (0) {
5229reconnect:
bbeb641c 5230 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
5231 }
5232 if (0) {
5233disconnect:
bbeb641c 5234 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 5235 }
32862ec7 5236 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 5237
32862ec7 5238 conn_info(tconn, "asender terminated\n");
b411b363
PR
5239
5240 return 0;
5241}