drivers/block/drbd/drbd_receiver.c
1 /*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
53 struct packet_info {
54 enum drbd_packet cmd;
55 unsigned int size;
56 unsigned int vnr;
57 void *data;
58 };
59
60 enum finish_epoch {
61 FE_STILL_LIVE,
62 FE_DESTROYED,
63 FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
79 */
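/*
 * Layout sketch (derived from the helpers below): a chain of three pages
 * A -> B -> C is encoded entirely in page->private:
 *   page_private(A) == (unsigned long)B
 *   page_private(B) == (unsigned long)C
 *   page_private(C) == 0    end-of-chain marker
 * page_chain_next() (from drbd_int.h) follows that pointer, and
 * set_page_private(page, 0) terminates a chain.
 */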
80
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 struct page *page;
88 struct page *tmp;
89
90 BUG_ON(!n);
91 BUG_ON(!head);
92
93 page = *head;
94
95 if (!page)
96 return NULL;
97
98 while (page) {
99 tmp = page_chain_next(page);
100 if (--n == 0)
101 break; /* found sufficient pages */
102 if (tmp == NULL)
103 /* insufficient pages, don't use any of them. */
104 return NULL;
105 page = tmp;
106 }
107
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
111 page = *head;
112 *head = tmp;
113 return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 struct page *tmp;
122 int i = 1;
123 while ((tmp = page_chain_next(page)))
124 ++i, page = tmp;
125 if (len)
126 *len = i;
127 return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132 struct page *tmp;
133 int i = 0;
134 page_chain_for_each_safe(page, tmp) {
135 put_page(page);
136 ++i;
137 }
138 return i;
139 }
140
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 struct page *tmp;
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
148 #endif
149
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
152 *head = chain_first;
153 }
154
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 unsigned int number)
157 {
158 struct page *page = NULL;
159 struct page *tmp = NULL;
160 unsigned int i = 0;
161
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
167 if (page)
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
170 if (page)
171 return page;
172 }
173
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
179 if (!tmp)
180 break;
181 set_page_private(tmp, (unsigned long)page);
182 page = tmp;
183 }
184
185 if (i == number)
186 return page;
187
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
191 if (page) {
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
195 drbd_pp_vacant += i;
196 spin_unlock(&drbd_pp_lock);
197 }
198 return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
203 {
204 struct drbd_peer_request *peer_req, *tmp;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first one that has not finished, we can
209 stop examining the list... */
210
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
213 break;
214 list_move(&peer_req->w.list, to_be_freed);
215 }
216 }
217
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
222
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 drbd_free_net_peer_req(device, peer_req);
228 }
229
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232 struct drbd_peer_device *peer_device;
233 int vnr;
234
235 rcu_read_lock();
236 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 struct drbd_device *device = peer_device->device;
238 if (!atomic_read(&device->pp_in_use_by_net))
239 continue;
240
241 kref_get(&device->kref);
242 rcu_read_unlock();
243 drbd_reclaim_net_peer_reqs(device);
244 kref_put(&device->kref, drbd_destroy_device);
245 rcu_read_lock();
246 }
247 rcu_read_unlock();
248 }
249
250 /**
251 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252 * @device: DRBD device.
253 * @number: number of pages requested
254 * @retry: whether to retry if not enough pages are available right now
255 *
256 * Tries to allocate @number pages, first from our own page pool, then from
257 * the kernel.
258 * Possibly retries until DRBD frees sufficient pages somewhere else.
259 *
260 * If this allocation would exceed the max_buffers setting, we throttle
261 * allocation (schedule_timeout) to give the system some room to breathe.
262 *
263 * We do not use max-buffers as a hard limit, because it could lead to
264 * congestion and further to a distributed deadlock during online-verify or
265 * (checksum based) resync, if the max-buffers, socket buffer sizes and
266 * resync-rate settings are mis-configured.
267 *
268 * Returns a page chain linked via page->private.
269 */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 bool retry)
272 {
273 struct drbd_device *device = peer_device->device;
274 struct page *page = NULL;
275 struct net_conf *nc;
276 DEFINE_WAIT(wait);
277 unsigned int mxb;
278
279 rcu_read_lock();
280 nc = rcu_dereference(peer_device->connection->net_conf);
281 mxb = nc ? nc->max_buffers : 1000000;
282 rcu_read_unlock();
283
284 if (atomic_read(&device->pp_in_use) < mxb)
285 page = __drbd_alloc_pages(device, number);
286
287 /* Try to keep the fast path fast, but occasionally we need
288 * to reclaim the pages we lent to the network stack. */
289 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 drbd_reclaim_net_peer_reqs(device);
291
292 while (page == NULL) {
293 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295 drbd_reclaim_net_peer_reqs(device);
296
297 if (atomic_read(&device->pp_in_use) < mxb) {
298 page = __drbd_alloc_pages(device, number);
299 if (page)
300 break;
301 }
302
303 if (!retry)
304 break;
305
306 if (signal_pending(current)) {
307 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 break;
309 }
310
311 if (schedule_timeout(HZ/10) == 0)
312 mxb = UINT_MAX;
313 }
314 finish_wait(&drbd_pp_wait, &wait);
315
316 if (page)
317 atomic_add(number, &device->pp_in_use);
318 return page;
319 }
320
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322 * Is also used from inside another spin_lock_irq(&resource->req_lock);
323 * Either links the page chain back to the global pool,
324 * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 int i;
329
330 if (page == NULL)
331 return;
332
333 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 i = page_chain_free(page);
335 else {
336 struct page *tmp;
337 tmp = page_chain_tail(page, &i);
338 spin_lock(&drbd_pp_lock);
339 page_chain_add(&drbd_pp_pool, page, tmp);
340 drbd_pp_vacant += i;
341 spin_unlock(&drbd_pp_lock);
342 }
343 i = atomic_sub_return(i, a);
344 if (i < 0)
345 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 wake_up(&drbd_pp_wait);
348 }
349
350 /*
351 You need to hold the req_lock:
352 _drbd_wait_ee_list_empty()
353
354 You must not have the req_lock:
355 drbd_free_peer_req()
356 drbd_alloc_peer_req()
357 drbd_free_peer_reqs()
358 drbd_ee_fix_bhs()
359 drbd_finish_peer_reqs()
360 drbd_clear_done_ee()
361 drbd_wait_ee_list_empty()
362 */
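/*
 * The peer request lists involved here (active_ee, sync_ee, done_ee, net_ee)
 * hang off struct drbd_device and are protected by resource->req_lock, which
 * is why the rules above distinguish callers that already hold the lock.
 */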
363
364 struct drbd_peer_request *
365 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
367 {
368 struct drbd_device *device = peer_device->device;
369 struct drbd_peer_request *peer_req;
370 struct page *page = NULL;
371 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
372
373 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374 return NULL;
375
376 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377 if (!peer_req) {
378 if (!(gfp_mask & __GFP_NOWARN))
379 drbd_err(device, "%s: allocation failed\n", __func__);
380 return NULL;
381 }
382
383 if (has_payload && data_size) {
384 page = drbd_alloc_pages(peer_device, nr_pages,
385 gfpflags_allow_blocking(gfp_mask));
386 if (!page)
387 goto fail;
388 }
389
390 memset(peer_req, 0, sizeof(*peer_req));
391 INIT_LIST_HEAD(&peer_req->w.list);
392 drbd_clear_interval(&peer_req->i);
393 peer_req->i.size = data_size;
394 peer_req->i.sector = sector;
395 peer_req->submit_jif = jiffies;
396 peer_req->peer_device = peer_device;
397 peer_req->pages = page;
398 /*
399 * The block_id is opaque to the receiver. It is not endianness
400 * converted, and sent back to the sender unchanged.
401 */
402 peer_req->block_id = id;
403
404 return peer_req;
405
406 fail:
407 mempool_free(peer_req, drbd_ee_mempool);
408 return NULL;
409 }
410
411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412 int is_net)
413 {
414 might_sleep();
415 if (peer_req->flags & EE_HAS_DIGEST)
416 kfree(peer_req->digest);
417 drbd_free_pages(device, peer_req->pages, is_net);
418 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422 drbd_al_complete_io(device, &peer_req->i);
423 }
424 mempool_free(peer_req, drbd_ee_mempool);
425 }
426
427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
428 {
429 LIST_HEAD(work_list);
430 struct drbd_peer_request *peer_req, *t;
431 int count = 0;
432 int is_net = list == &device->net_ee;
433
434 spin_lock_irq(&device->resource->req_lock);
435 list_splice_init(list, &work_list);
436 spin_unlock_irq(&device->resource->req_lock);
437
438 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439 __drbd_free_peer_req(device, peer_req, is_net);
440 count++;
441 }
442 return count;
443 }
444
445 /*
446 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
447 */
448 static int drbd_finish_peer_reqs(struct drbd_device *device)
449 {
450 LIST_HEAD(work_list);
451 LIST_HEAD(reclaimed);
452 struct drbd_peer_request *peer_req, *t;
453 int err = 0;
454
455 spin_lock_irq(&device->resource->req_lock);
456 reclaim_finished_net_peer_reqs(device, &reclaimed);
457 list_splice_init(&device->done_ee, &work_list);
458 spin_unlock_irq(&device->resource->req_lock);
459
460 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461 drbd_free_net_peer_req(device, peer_req);
462
463 /* possible callbacks here:
464 * e_end_block, and e_end_resync_block, e_send_superseded.
465 * all ignore the last argument.
466 */
467 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468 int err2;
469
470 /* list_del not necessary, next/prev members not touched */
471 err2 = peer_req->w.cb(&peer_req->w, !!err);
472 if (!err)
473 err = err2;
474 drbd_free_peer_req(device, peer_req);
475 }
476 wake_up(&device->ee_wait);
477
478 return err;
479 }
480
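/* Note: req_lock must be held on entry; it is dropped around io_schedule()
 * and re-acquired before returning, so on return the lock is held again and
 * the list has been observed empty under it. */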
481 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482 struct list_head *head)
483 {
484 DEFINE_WAIT(wait);
485
486 /* avoids spin_lock/unlock
487 * and calling prepare_to_wait in the fast path */
488 while (!list_empty(head)) {
489 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490 spin_unlock_irq(&device->resource->req_lock);
491 io_schedule();
492 finish_wait(&device->ee_wait, &wait);
493 spin_lock_irq(&device->resource->req_lock);
494 }
495 }
496
497 static void drbd_wait_ee_list_empty(struct drbd_device *device,
498 struct list_head *head)
499 {
500 spin_lock_irq(&device->resource->req_lock);
501 _drbd_wait_ee_list_empty(device, head);
502 spin_unlock_irq(&device->resource->req_lock);
503 }
504
505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
506 {
507 struct kvec iov = {
508 .iov_base = buf,
509 .iov_len = size,
510 };
511 struct msghdr msg = {
512 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513 };
514 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
515 }
516
517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
518 {
519 int rv;
520
521 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
522
523 if (rv < 0) {
524 if (rv == -ECONNRESET)
525 drbd_info(connection, "sock was reset by peer\n");
526 else if (rv != -ERESTARTSYS)
527 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528 } else if (rv == 0) {
529 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530 long t;
531 rcu_read_lock();
532 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533 rcu_read_unlock();
534
535 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
536
537 if (t)
538 goto out;
539 }
540 drbd_info(connection, "sock was shut down by peer\n");
541 }
542
543 if (rv != size)
544 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
545
546 out:
547 return rv;
548 }
549
550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
551 {
552 int err;
553
554 err = drbd_recv(connection, buf, size);
555 if (err != size) {
556 if (err >= 0)
557 err = -EIO;
558 } else
559 err = 0;
560 return err;
561 }
562
563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
564 {
565 int err;
566
567 err = drbd_recv_all(connection, buf, size);
568 if (err && !signal_pending(current))
569 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570 return err;
571 }
572
573 /* quoting tcp(7):
574 * On individual connections, the socket buffer size must be set prior to the
575 * listen(2) or connect(2) calls in order to have it take effect.
576 * This is our wrapper to do so.
577 */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 unsigned int rcv)
580 {
581 /* open coded SO_SNDBUF, SO_RCVBUF */
582 if (snd) {
583 sock->sk->sk_sndbuf = snd;
584 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 }
586 if (rcv) {
587 sock->sk->sk_rcvbuf = rcv;
588 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 }
590 }
591
592 static struct socket *drbd_try_connect(struct drbd_connection *connection)
593 {
594 const char *what;
595 struct socket *sock;
596 struct sockaddr_in6 src_in6;
597 struct sockaddr_in6 peer_in6;
598 struct net_conf *nc;
599 int err, peer_addr_len, my_addr_len;
600 int sndbuf_size, rcvbuf_size, connect_int;
601 int disconnect_on_error = 1;
602
603 rcu_read_lock();
604 nc = rcu_dereference(connection->net_conf);
605 if (!nc) {
606 rcu_read_unlock();
607 return NULL;
608 }
609 sndbuf_size = nc->sndbuf_size;
610 rcvbuf_size = nc->rcvbuf_size;
611 connect_int = nc->connect_int;
612 rcu_read_unlock();
613
614 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
615 memcpy(&src_in6, &connection->my_addr, my_addr_len);
616
617 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
618 src_in6.sin6_port = 0;
619 else
620 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
621
622 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
623 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
624
625 what = "sock_create_kern";
626 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
627 SOCK_STREAM, IPPROTO_TCP, &sock);
628 if (err < 0) {
629 sock = NULL;
630 goto out;
631 }
632
633 sock->sk->sk_rcvtimeo =
634 sock->sk->sk_sndtimeo = connect_int * HZ;
635 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
636
637 /* explicitly bind to the configured IP as source IP
638 * for the outgoing connections.
639 * This is needed for multihomed hosts and to be
640 * able to use lo: interfaces for drbd.
641 * Make sure to use 0 as port number, so linux selects
642 * a free one dynamically.
643 */
644 what = "bind before connect";
645 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
646 if (err < 0)
647 goto out;
648
649 /* connect may fail, peer not yet available.
650 * stay C_WF_CONNECTION, don't go Disconnecting! */
651 disconnect_on_error = 0;
652 what = "connect";
653 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
654
655 out:
656 if (err < 0) {
657 if (sock) {
658 sock_release(sock);
659 sock = NULL;
660 }
661 switch (-err) {
662 /* timeout, busy, signal pending */
663 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
664 case EINTR: case ERESTARTSYS:
665 /* peer not (yet) available, network problem */
666 case ECONNREFUSED: case ENETUNREACH:
667 case EHOSTDOWN: case EHOSTUNREACH:
668 disconnect_on_error = 0;
669 break;
670 default:
671 drbd_err(connection, "%s failed, err = %d\n", what, err);
672 }
673 if (disconnect_on_error)
674 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
675 }
676
677 return sock;
678 }
679
680 struct accept_wait_data {
681 struct drbd_connection *connection;
682 struct socket *s_listen;
683 struct completion door_bell;
684 void (*original_sk_state_change)(struct sock *sk);
685
686 };
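/*
 * Passive side of the connection handshake: prepare_listen_socket() replaces
 * the listen socket's sk_state_change callback with drbd_incoming_connection(),
 * which completes ->door_bell as soon as an incoming connection reaches
 * TCP_ESTABLISHED.  drbd_wait_for_connect() waits on that completion (with a
 * jittered timeout) and then picks the connection up via kernel_accept().
 */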
687
688 static void drbd_incoming_connection(struct sock *sk)
689 {
690 struct accept_wait_data *ad = sk->sk_user_data;
691 void (*state_change)(struct sock *sk);
692
693 state_change = ad->original_sk_state_change;
694 if (sk->sk_state == TCP_ESTABLISHED)
695 complete(&ad->door_bell);
696 state_change(sk);
697 }
698
699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
700 {
701 int err, sndbuf_size, rcvbuf_size, my_addr_len;
702 struct sockaddr_in6 my_addr;
703 struct socket *s_listen;
704 struct net_conf *nc;
705 const char *what;
706
707 rcu_read_lock();
708 nc = rcu_dereference(connection->net_conf);
709 if (!nc) {
710 rcu_read_unlock();
711 return -EIO;
712 }
713 sndbuf_size = nc->sndbuf_size;
714 rcvbuf_size = nc->rcvbuf_size;
715 rcu_read_unlock();
716
717 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718 memcpy(&my_addr, &connection->my_addr, my_addr_len);
719
720 what = "sock_create_kern";
721 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722 SOCK_STREAM, IPPROTO_TCP, &s_listen);
723 if (err) {
724 s_listen = NULL;
725 goto out;
726 }
727
728 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
730
731 what = "bind before listen";
732 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733 if (err < 0)
734 goto out;
735
736 ad->s_listen = s_listen;
737 write_lock_bh(&s_listen->sk->sk_callback_lock);
738 ad->original_sk_state_change = s_listen->sk->sk_state_change;
739 s_listen->sk->sk_state_change = drbd_incoming_connection;
740 s_listen->sk->sk_user_data = ad;
741 write_unlock_bh(&s_listen->sk->sk_callback_lock);
742
743 what = "listen";
744 err = s_listen->ops->listen(s_listen, 5);
745 if (err < 0)
746 goto out;
747
748 return 0;
749 out:
750 if (s_listen)
751 sock_release(s_listen);
752 if (err < 0) {
753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754 drbd_err(connection, "%s failed, err = %d\n", what, err);
755 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
756 }
757 }
758
759 return -EIO;
760 }
761
762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
763 {
764 write_lock_bh(&sk->sk_callback_lock);
765 sk->sk_state_change = ad->original_sk_state_change;
766 sk->sk_user_data = NULL;
767 write_unlock_bh(&sk->sk_callback_lock);
768 }
769
770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
771 {
772 int timeo, connect_int, err = 0;
773 struct socket *s_estab = NULL;
774 struct net_conf *nc;
775
776 rcu_read_lock();
777 nc = rcu_dereference(connection->net_conf);
778 if (!nc) {
779 rcu_read_unlock();
780 return NULL;
781 }
782 connect_int = nc->connect_int;
783 rcu_read_unlock();
784
785 timeo = connect_int * HZ;
786 /* 28.5% random jitter */
787 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
788
789 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790 if (err <= 0)
791 return NULL;
792
793 err = kernel_accept(ad->s_listen, &s_estab, 0);
794 if (err < 0) {
795 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796 drbd_err(connection, "accept failed, err = %d\n", err);
797 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
798 }
799 }
800
801 if (s_estab)
802 unregister_state_change(s_estab->sk, ad);
803
804 return s_estab;
805 }
806
807 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
808
809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810 enum drbd_packet cmd)
811 {
812 if (!conn_prepare_command(connection, sock))
813 return -EIO;
814 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
815 }
816
817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
818 {
819 unsigned int header_size = drbd_header_size(connection);
820 struct packet_info pi;
821 struct net_conf *nc;
822 int err;
823
824 rcu_read_lock();
825 nc = rcu_dereference(connection->net_conf);
826 if (!nc) {
827 rcu_read_unlock();
828 return -EIO;
829 }
830 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831 rcu_read_unlock();
832
833 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834 if (err != header_size) {
835 if (err >= 0)
836 err = -EIO;
837 return err;
838 }
839 err = decode_header(connection, connection->data.rbuf, &pi);
840 if (err)
841 return err;
842 return pi.cmd;
843 }
844
845 /**
846 * drbd_socket_okay() - Free the socket if its connection is not okay
847 * @sock: pointer to the pointer to the socket.
848 */
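/*
 * Probes the socket with a non-blocking MSG_PEEK read: readable data or
 * -EAGAIN means the connection is still usable; a return of 0 (orderly
 * shutdown by the peer) or any other error releases the socket and clears
 * *sock.
 */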
849 static bool drbd_socket_okay(struct socket **sock)
850 {
851 int rr;
852 char tb[4];
853
854 if (!*sock)
855 return false;
856
857 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
858
859 if (rr > 0 || rr == -EAGAIN) {
860 return true;
861 } else {
862 sock_release(*sock);
863 *sock = NULL;
864 return false;
865 }
866 }
867
868 static bool connection_established(struct drbd_connection *connection,
869 struct socket **sock1,
870 struct socket **sock2)
871 {
872 struct net_conf *nc;
873 int timeout;
874 bool ok;
875
876 if (!*sock1 || !*sock2)
877 return false;
878
879 rcu_read_lock();
880 nc = rcu_dereference(connection->net_conf);
881 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882 rcu_read_unlock();
883 schedule_timeout_interruptible(timeout);
884
885 ok = drbd_socket_okay(sock1);
886 ok = drbd_socket_okay(sock2) && ok;
887
888 return ok;
889 }
890
891 /* Gets called if a connection is established, or if a new minor gets created
892 in a connection */
893 int drbd_connected(struct drbd_peer_device *peer_device)
894 {
895 struct drbd_device *device = peer_device->device;
896 int err;
897
898 atomic_set(&device->packet_seq, 0);
899 device->peer_seq = 0;
900
901 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902 &peer_device->connection->cstate_mutex :
903 &device->own_state_mutex;
904
905 err = drbd_send_sync_param(peer_device);
906 if (!err)
907 err = drbd_send_sizes(peer_device, 0, 0);
908 if (!err)
909 err = drbd_send_uuids(peer_device);
910 if (!err)
911 err = drbd_send_current_state(peer_device);
912 clear_bit(USE_DEGR_WFC_T, &device->flags);
913 clear_bit(RESIZE_PENDING, &device->flags);
914 atomic_set(&device->ap_in_flight, 0);
915 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916 return err;
917 }
918
919 /*
920 * return values:
921 * 1 yes, we have a valid connection
922 * 0 oops, did not work out, please try again
923 * -1 peer talks different language,
924 * no point in trying again, please go standalone.
925 * -2 We do not have a network config...
926 */
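/*
 * Overview: conn_connect() establishes two TCP connections per peer, one for
 * bulk data (sock) and one for meta data/acks (msock).  Each side both
 * connects actively (drbd_try_connect) and accepts passively
 * (drbd_wait_for_connect); the first packet sent on a fresh socket
 * (P_INITIAL_DATA or P_INITIAL_META) tells the peer which role that socket
 * takes.  If both sides connect simultaneously, the "crossed" socket is
 * released and a coin flip (prandom_u32() & 1) decides whether to retry.
 */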
927 static int conn_connect(struct drbd_connection *connection)
928 {
929 struct drbd_socket sock, msock;
930 struct drbd_peer_device *peer_device;
931 struct net_conf *nc;
932 int vnr, timeout, h;
933 bool discard_my_data, ok;
934 enum drbd_state_rv rv;
935 struct accept_wait_data ad = {
936 .connection = connection,
937 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
938 };
939
940 clear_bit(DISCONNECT_SENT, &connection->flags);
941 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942 return -2;
943
944 mutex_init(&sock.mutex);
945 sock.sbuf = connection->data.sbuf;
946 sock.rbuf = connection->data.rbuf;
947 sock.socket = NULL;
948 mutex_init(&msock.mutex);
949 msock.sbuf = connection->meta.sbuf;
950 msock.rbuf = connection->meta.rbuf;
951 msock.socket = NULL;
952
953 /* Assume that the peer only understands protocol 80 until we know better. */
954 connection->agreed_pro_version = 80;
955
956 if (prepare_listen_socket(connection, &ad))
957 return 0;
958
959 do {
960 struct socket *s;
961
962 s = drbd_try_connect(connection);
963 if (s) {
964 if (!sock.socket) {
965 sock.socket = s;
966 send_first_packet(connection, &sock, P_INITIAL_DATA);
967 } else if (!msock.socket) {
968 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969 msock.socket = s;
970 send_first_packet(connection, &msock, P_INITIAL_META);
971 } else {
972 drbd_err(connection, "Logic error in conn_connect()\n");
973 goto out_release_sockets;
974 }
975 }
976
977 if (connection_established(connection, &sock.socket, &msock.socket))
978 break;
979
980 retry:
981 s = drbd_wait_for_connect(connection, &ad);
982 if (s) {
983 int fp = receive_first_packet(connection, s);
984 drbd_socket_okay(&sock.socket);
985 drbd_socket_okay(&msock.socket);
986 switch (fp) {
987 case P_INITIAL_DATA:
988 if (sock.socket) {
989 drbd_warn(connection, "initial packet S crossed\n");
990 sock_release(sock.socket);
991 sock.socket = s;
992 goto randomize;
993 }
994 sock.socket = s;
995 break;
996 case P_INITIAL_META:
997 set_bit(RESOLVE_CONFLICTS, &connection->flags);
998 if (msock.socket) {
999 drbd_warn(connection, "initial packet M crossed\n");
1000 sock_release(msock.socket);
1001 msock.socket = s;
1002 goto randomize;
1003 }
1004 msock.socket = s;
1005 break;
1006 default:
1007 drbd_warn(connection, "Error receiving initial packet\n");
1008 sock_release(s);
1009 randomize:
1010 if (prandom_u32() & 1)
1011 goto retry;
1012 }
1013 }
1014
1015 if (connection->cstate <= C_DISCONNECTING)
1016 goto out_release_sockets;
1017 if (signal_pending(current)) {
1018 flush_signals(current);
1019 smp_rmb();
1020 if (get_t_state(&connection->receiver) == EXITING)
1021 goto out_release_sockets;
1022 }
1023
1024 ok = connection_established(connection, &sock.socket, &msock.socket);
1025 } while (!ok);
1026
1027 if (ad.s_listen)
1028 sock_release(ad.s_listen);
1029
1030 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1032
1033 sock.socket->sk->sk_allocation = GFP_NOIO;
1034 msock.socket->sk->sk_allocation = GFP_NOIO;
1035
1036 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1038
1039 /* NOT YET ...
1040 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042 * first set it to the P_CONNECTION_FEATURES timeout,
1043 * which we set to 4x the configured ping_timeout. */
1044 rcu_read_lock();
1045 nc = rcu_dereference(connection->net_conf);
1046
1047 sock.socket->sk->sk_sndtimeo =
1048 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1049
1050 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051 timeout = nc->timeout * HZ / 10;
1052 discard_my_data = nc->discard_my_data;
1053 rcu_read_unlock();
1054
1055 msock.socket->sk->sk_sndtimeo = timeout;
1056
1057 /* we don't want delays.
1058 * we use TCP_CORK where appropriate, though */
1059 drbd_tcp_nodelay(sock.socket);
1060 drbd_tcp_nodelay(msock.socket);
1061
1062 connection->data.socket = sock.socket;
1063 connection->meta.socket = msock.socket;
1064 connection->last_received = jiffies;
1065
1066 h = drbd_do_features(connection);
1067 if (h <= 0)
1068 return h;
1069
1070 if (connection->cram_hmac_tfm) {
1071 /* drbd_request_state(device, NS(conn, WFAuth)); */
1072 switch (drbd_do_auth(connection)) {
1073 case -1:
1074 drbd_err(connection, "Authentication of peer failed\n");
1075 return -1;
1076 case 0:
1077 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078 return 0;
1079 }
1080 }
1081
1082 connection->data.socket->sk->sk_sndtimeo = timeout;
1083 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1084
1085 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086 return -1;
1087
1088 /* Prevent a race between resync-handshake and
1089 * being promoted to Primary.
1090 *
1091 * Grab and release the state mutex, so we know that any current
1092 * drbd_set_role() is finished, and any incoming drbd_set_role
1093 * will see the STATE_SENT flag, and wait for it to be cleared.
1094 */
1095 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096 mutex_lock(peer_device->device->state_mutex);
1097
1098 set_bit(STATE_SENT, &connection->flags);
1099
1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 mutex_unlock(peer_device->device->state_mutex);
1102
1103 rcu_read_lock();
1104 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 struct drbd_device *device = peer_device->device;
1106 kref_get(&device->kref);
1107 rcu_read_unlock();
1108
1109 if (discard_my_data)
1110 set_bit(DISCARD_MY_DATA, &device->flags);
1111 else
1112 clear_bit(DISCARD_MY_DATA, &device->flags);
1113
1114 drbd_connected(peer_device);
1115 kref_put(&device->kref, drbd_destroy_device);
1116 rcu_read_lock();
1117 }
1118 rcu_read_unlock();
1119
1120 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 clear_bit(STATE_SENT, &connection->flags);
1123 return 0;
1124 }
1125
1126 drbd_thread_start(&connection->ack_receiver);
1127 /* opencoded create_singlethread_workqueue(),
1128 * to be able to use format string arguments */
1129 connection->ack_sender =
1130 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 if (!connection->ack_sender) {
1132 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 return 0;
1134 }
1135
1136 mutex_lock(&connection->resource->conf_update);
1137 /* The discard_my_data flag is a single-shot modifier to the next
1138 * connection attempt, the handshake of which is now well underway.
1139 * No need for rcu style copying of the whole struct
1140 * just to clear a single value. */
1141 connection->net_conf->discard_my_data = 0;
1142 mutex_unlock(&connection->resource->conf_update);
1143
1144 return h;
1145
1146 out_release_sockets:
1147 if (ad.s_listen)
1148 sock_release(ad.s_listen);
1149 if (sock.socket)
1150 sock_release(sock.socket);
1151 if (msock.socket)
1152 sock_release(msock.socket);
1153 return -1;
1154 }
1155
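/*
 * On-the-wire header formats, distinguished by their magic value:
 *   struct p_header100 (DRBD_MAGIC_100): protocol 100 and later, carries a
 *                                        per-packet volume number (vnr)
 *   struct p_header95  (DRBD_MAGIC_BIG): older protocols, 32 bit length field
 *   struct p_header80  (DRBD_MAGIC):     older protocols, 16 bit length field
 * decode_header() matches the received magic against the header size agreed
 * during the feature handshake and fills in struct packet_info accordingly.
 */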
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158 unsigned int header_size = drbd_header_size(connection);
1159
1160 if (header_size == sizeof(struct p_header100) &&
1161 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 struct p_header100 *h = header;
1163 if (h->pad != 0) {
1164 drbd_err(connection, "Header padding is not zero\n");
1165 return -EINVAL;
1166 }
1167 pi->vnr = be16_to_cpu(h->volume);
1168 pi->cmd = be16_to_cpu(h->command);
1169 pi->size = be32_to_cpu(h->length);
1170 } else if (header_size == sizeof(struct p_header95) &&
1171 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 struct p_header95 *h = header;
1173 pi->cmd = be16_to_cpu(h->command);
1174 pi->size = be32_to_cpu(h->length);
1175 pi->vnr = 0;
1176 } else if (header_size == sizeof(struct p_header80) &&
1177 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 struct p_header80 *h = header;
1179 pi->cmd = be16_to_cpu(h->command);
1180 pi->size = be16_to_cpu(h->length);
1181 pi->vnr = 0;
1182 } else {
1183 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 be32_to_cpu(*(__be32 *)header),
1185 connection->agreed_pro_version);
1186 return -EINVAL;
1187 }
1188 pi->data = header + header_size;
1189 return 0;
1190 }
1191
1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1193 {
1194 void *buffer = connection->data.rbuf;
1195 int err;
1196
1197 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198 if (err)
1199 return err;
1200
1201 err = decode_header(connection, buffer, pi);
1202 connection->last_received = jiffies;
1203
1204 return err;
1205 }
1206
1207 static void drbd_flush(struct drbd_connection *connection)
1208 {
1209 int rv;
1210 struct drbd_peer_device *peer_device;
1211 int vnr;
1212
1213 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1214 rcu_read_lock();
1215 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1216 struct drbd_device *device = peer_device->device;
1217
1218 if (!get_ldev(device))
1219 continue;
1220 kref_get(&device->kref);
1221 rcu_read_unlock();
1222
1223 /* Right now, we have only this one synchronous code path
1224 * for flushes between request epochs.
1225 * We may want to make those asynchronous,
1226 * or at least parallelize the flushes to the volume devices.
1227 */
1228 device->flush_jif = jiffies;
1229 set_bit(FLUSH_PENDING, &device->flags);
1230 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1231 GFP_NOIO, NULL);
1232 clear_bit(FLUSH_PENDING, &device->flags);
1233 if (rv) {
1234 drbd_info(device, "local disk flush failed with status %d\n", rv);
1235 /* would rather check on EOPNOTSUPP, but that is not reliable.
1236 * don't try again for ANY return value != 0
1237 * if (rv == -EOPNOTSUPP) */
1238 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1239 }
1240 put_ldev(device);
1241 kref_put(&device->kref, drbd_destroy_device);
1242
1243 rcu_read_lock();
1244 if (rv)
1245 break;
1246 }
1247 rcu_read_unlock();
1248 }
1249 }
1250
1251 /**
1252 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1253 * @connection: DRBD connection.
1254 * @epoch: Epoch object.
1255 * @ev: Epoch event.
1256 */
1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1258 struct drbd_epoch *epoch,
1259 enum epoch_event ev)
1260 {
1261 int epoch_size;
1262 struct drbd_epoch *next_epoch;
1263 enum finish_epoch rv = FE_STILL_LIVE;
1264
1265 spin_lock(&connection->epoch_lock);
1266 do {
1267 next_epoch = NULL;
1268
1269 epoch_size = atomic_read(&epoch->epoch_size);
1270
1271 switch (ev & ~EV_CLEANUP) {
1272 case EV_PUT:
1273 atomic_dec(&epoch->active);
1274 break;
1275 case EV_GOT_BARRIER_NR:
1276 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1277 break;
1278 case EV_BECAME_LAST:
1279 /* nothing to do*/
1280 break;
1281 }
1282
1283 if (epoch_size != 0 &&
1284 atomic_read(&epoch->active) == 0 &&
1285 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1286 if (!(ev & EV_CLEANUP)) {
1287 spin_unlock(&connection->epoch_lock);
1288 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1289 spin_lock(&connection->epoch_lock);
1290 }
1291 #if 0
1292 /* FIXME: dec unacked on connection, once we have
1293 * something to count pending connection packets in. */
1294 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1295 dec_unacked(epoch->connection);
1296 #endif
1297
1298 if (connection->current_epoch != epoch) {
1299 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1300 list_del(&epoch->list);
1301 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1302 connection->epochs--;
1303 kfree(epoch);
1304
1305 if (rv == FE_STILL_LIVE)
1306 rv = FE_DESTROYED;
1307 } else {
1308 epoch->flags = 0;
1309 atomic_set(&epoch->epoch_size, 0);
1310 /* atomic_set(&epoch->active, 0); is already zero */
1311 if (rv == FE_STILL_LIVE)
1312 rv = FE_RECYCLED;
1313 }
1314 }
1315
1316 if (!next_epoch)
1317 break;
1318
1319 epoch = next_epoch;
1320 } while (1);
1321
1322 spin_unlock(&connection->epoch_lock);
1323
1324 return rv;
1325 }
1326
1327 static enum write_ordering_e
1328 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1329 {
1330 struct disk_conf *dc;
1331
1332 dc = rcu_dereference(bdev->disk_conf);
1333
1334 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1335 wo = WO_DRAIN_IO;
1336 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1337 wo = WO_NONE;
1338
1339 return wo;
1340 }
1341
1342 /**
1343 * drbd_bump_write_ordering() - Fall back to another write ordering method
1344 * @resource: DRBD resource.
1345 * @wo: Write ordering method to try.
1346 */
1347 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1348 enum write_ordering_e wo)
1349 {
1350 struct drbd_device *device;
1351 enum write_ordering_e pwo;
1352 int vnr;
1353 static char *write_ordering_str[] = {
1354 [WO_NONE] = "none",
1355 [WO_DRAIN_IO] = "drain",
1356 [WO_BDEV_FLUSH] = "flush",
1357 };
1358
1359 pwo = resource->write_ordering;
1360 if (wo != WO_BDEV_FLUSH)
1361 wo = min(pwo, wo);
1362 rcu_read_lock();
1363 idr_for_each_entry(&resource->devices, device, vnr) {
1364 if (get_ldev(device)) {
1365 wo = max_allowed_wo(device->ldev, wo);
1366 if (device->ldev == bdev)
1367 bdev = NULL;
1368 put_ldev(device);
1369 }
1370 }
1371
1372 if (bdev)
1373 wo = max_allowed_wo(bdev, wo);
1374
1375 rcu_read_unlock();
1376
1377 resource->write_ordering = wo;
1378 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1379 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1380 }
1381
1382 /**
1383 * drbd_submit_peer_request()
1384 * @device: DRBD device.
1385 * @peer_req: peer request
1386 * @rw: flag field, see bio->bi_rw
1387 *
1388 * May spread the pages to multiple bios,
1389 * depending on bio_add_page restrictions.
1390 *
1391 * Returns 0 if all bios have been submitted,
1392 * -ENOMEM if we could not allocate enough bios,
1393 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394 * single page to an empty bio (which should never happen and likely indicates
1395 * that the lower level IO stack is in some way broken). This has been observed
1396 * on certain Xen deployments.
1397 */
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400 struct drbd_peer_request *peer_req,
1401 const unsigned rw, const int fault_type)
1402 {
1403 struct bio *bios = NULL;
1404 struct bio *bio;
1405 struct page *page = peer_req->pages;
1406 sector_t sector = peer_req->i.sector;
1407 unsigned data_size = peer_req->i.size;
1408 unsigned n_bios = 0;
1409 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1410 int err = -ENOMEM;
1411
1412 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1413 /* wait for all pending IO completions, before we start
1414 * zeroing things out. */
1415 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1416 /* add it to the active list now,
1417 * so we can find it to present it in debugfs */
1418 peer_req->submit_jif = jiffies;
1419 peer_req->flags |= EE_SUBMITTED;
1420 spin_lock_irq(&device->resource->req_lock);
1421 list_add_tail(&peer_req->w.list, &device->active_ee);
1422 spin_unlock_irq(&device->resource->req_lock);
1423 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1424 sector, data_size >> 9, GFP_NOIO, false))
1425 peer_req->flags |= EE_WAS_ERROR;
1426 drbd_endio_write_sec_final(peer_req);
1427 return 0;
1428 }
1429
1430 /* Discards don't have any payload.
1431 * But the scsi layer still expects a bio_vec it can use internally,
1432 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1433 if (peer_req->flags & EE_IS_TRIM)
1434 nr_pages = 1;
1435
1436 /* In most cases, we will only need one bio. But in case the lower
1437 * level restrictions happen to be different at this offset on this
1438 * side than those of the sending peer, we may need to submit the
1439 * request in more than one bio.
1440 *
1441 * Plain bio_alloc is good enough here, this is no DRBD internally
1442 * generated bio, but a bio allocated on behalf of the peer.
1443 */
1444 next_bio:
1445 bio = bio_alloc(GFP_NOIO, nr_pages);
1446 if (!bio) {
1447 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1448 goto fail;
1449 }
1450 /* > peer_req->i.sector, unless this is the first bio */
1451 bio->bi_iter.bi_sector = sector;
1452 bio->bi_bdev = device->ldev->backing_bdev;
1453 bio->bi_rw = rw;
1454 bio->bi_private = peer_req;
1455 bio->bi_end_io = drbd_peer_request_endio;
1456
1457 bio->bi_next = bios;
1458 bios = bio;
1459 ++n_bios;
1460
1461 if (rw & REQ_DISCARD) {
1462 bio->bi_iter.bi_size = data_size;
1463 goto submit;
1464 }
1465
1466 page_chain_for_each(page) {
1467 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1468 if (!bio_add_page(bio, page, len, 0)) {
1469 /* A single page must always be possible!
1470 * But in case it fails anyways,
1471 * we deal with it, and complain (below). */
1472 if (bio->bi_vcnt == 0) {
1473 drbd_err(device,
1474 "bio_add_page failed for len=%u, "
1475 "bi_vcnt=0 (bi_sector=%llu)\n",
1476 len, (uint64_t)bio->bi_iter.bi_sector);
1477 err = -ENOSPC;
1478 goto fail;
1479 }
1480 goto next_bio;
1481 }
1482 data_size -= len;
1483 sector += len >> 9;
1484 --nr_pages;
1485 }
1486 D_ASSERT(device, data_size == 0);
1487 submit:
1488 D_ASSERT(device, page == NULL);
1489
1490 atomic_set(&peer_req->pending_bios, n_bios);
1491 /* for debugfs: update timestamp, mark as submitted */
1492 peer_req->submit_jif = jiffies;
1493 peer_req->flags |= EE_SUBMITTED;
1494 do {
1495 bio = bios;
1496 bios = bios->bi_next;
1497 bio->bi_next = NULL;
1498
1499 drbd_generic_make_request(device, fault_type, bio);
1500 } while (bios);
1501 return 0;
1502
1503 fail:
1504 while (bios) {
1505 bio = bios;
1506 bios = bios->bi_next;
1507 bio_put(bio);
1508 }
1509 return err;
1510 }
1511
1512 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1513 struct drbd_peer_request *peer_req)
1514 {
1515 struct drbd_interval *i = &peer_req->i;
1516
1517 drbd_remove_interval(&device->write_requests, i);
1518 drbd_clear_interval(i);
1519
1520 /* Wake up any processes waiting for this peer request to complete. */
1521 if (i->waiting)
1522 wake_up(&device->misc_wait);
1523 }
1524
1525 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1526 {
1527 struct drbd_peer_device *peer_device;
1528 int vnr;
1529
1530 rcu_read_lock();
1531 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1532 struct drbd_device *device = peer_device->device;
1533
1534 kref_get(&device->kref);
1535 rcu_read_unlock();
1536 drbd_wait_ee_list_empty(device, &device->active_ee);
1537 kref_put(&device->kref, drbd_destroy_device);
1538 rcu_read_lock();
1539 }
1540 rcu_read_unlock();
1541 }
1542
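/*
 * P_BARRIER closes the peer's current write epoch.  Depending on the
 * configured write ordering we either start a new epoch object right away
 * (WO_NONE), or first wait for all active writes and flush the backing
 * devices (WO_DRAIN_IO / WO_BDEV_FLUSH) before the next epoch begins;
 * only then may the corresponding P_BARRIER_ACK be sent (see the comment
 * about the activity log below).
 */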
1543 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1544 {
1545 int rv;
1546 struct p_barrier *p = pi->data;
1547 struct drbd_epoch *epoch;
1548
1549 /* FIXME these are unacked on connection,
1550 * not a specific (peer)device.
1551 */
1552 connection->current_epoch->barrier_nr = p->barrier;
1553 connection->current_epoch->connection = connection;
1554 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1555
1556 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1557 * the activity log, which means it would not be resynced in case the
1558 * R_PRIMARY crashes now.
1559 * Therefore we must send the barrier_ack after the barrier request was
1560 * completed. */
1561 switch (connection->resource->write_ordering) {
1562 case WO_NONE:
1563 if (rv == FE_RECYCLED)
1564 return 0;
1565
1566 /* receiver context, in the writeout path of the other node.
1567 * avoid potential distributed deadlock */
1568 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1569 if (epoch)
1570 break;
1571 else
1572 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1573 /* Fall through */
1574
1575 case WO_BDEV_FLUSH:
1576 case WO_DRAIN_IO:
1577 conn_wait_active_ee_empty(connection);
1578 drbd_flush(connection);
1579
1580 if (atomic_read(&connection->current_epoch->epoch_size)) {
1581 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1582 if (epoch)
1583 break;
1584 }
1585
1586 return 0;
1587 default:
1588 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1589 connection->resource->write_ordering);
1590 return -EIO;
1591 }
1592
1593 epoch->flags = 0;
1594 atomic_set(&epoch->epoch_size, 0);
1595 atomic_set(&epoch->active, 0);
1596
1597 spin_lock(&connection->epoch_lock);
1598 if (atomic_read(&connection->current_epoch->epoch_size)) {
1599 list_add(&epoch->list, &connection->current_epoch->list);
1600 connection->current_epoch = epoch;
1601 connection->epochs++;
1602 } else {
1603 /* The current_epoch got recycled while we allocated this one... */
1604 kfree(epoch);
1605 }
1606 spin_unlock(&connection->epoch_lock);
1607
1608 return 0;
1609 }
1610
1611 /* used from receive_RSDataReply (recv_resync_read)
1612 * and from receive_Data */
1613 static struct drbd_peer_request *
1614 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1615 struct packet_info *pi) __must_hold(local)
1616 {
1617 struct drbd_device *device = peer_device->device;
1618 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1619 struct drbd_peer_request *peer_req;
1620 struct page *page;
1621 int digest_size, err;
1622 unsigned int data_size = pi->size, ds;
1623 void *dig_in = peer_device->connection->int_dig_in;
1624 void *dig_vv = peer_device->connection->int_dig_vv;
1625 unsigned long *data;
1626 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1627
1628 digest_size = 0;
1629 if (!trim && peer_device->connection->peer_integrity_tfm) {
1630 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1631 /*
1632 * FIXME: Receive the incoming digest into the receive buffer
1633 * here, together with its struct p_data?
1634 */
1635 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1636 if (err)
1637 return NULL;
1638 data_size -= digest_size;
1639 }
1640
1641 if (trim) {
1642 D_ASSERT(peer_device, data_size == 0);
1643 data_size = be32_to_cpu(trim->size);
1644 }
1645
1646 if (!expect(IS_ALIGNED(data_size, 512)))
1647 return NULL;
1648 /* prepare for larger trim requests. */
1649 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1650 return NULL;
1651
1652 /* even though we trust our peer,
1653 * we sometimes have to double check. */
1654 if (sector + (data_size>>9) > capacity) {
1655 drbd_err(device, "request from peer beyond end of local disk: "
1656 "capacity: %llus < sector: %llus + size: %u\n",
1657 (unsigned long long)capacity,
1658 (unsigned long long)sector, data_size);
1659 return NULL;
1660 }
1661
1662 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1663 * "criss-cross" setup, that might cause write-out on some other DRBD,
1664 * which in turn might block on the other node at this very place. */
1665 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1666 if (!peer_req)
1667 return NULL;
1668
1669 peer_req->flags |= EE_WRITE;
1670 if (trim)
1671 return peer_req;
1672
1673 ds = data_size;
1674 page = peer_req->pages;
1675 page_chain_for_each(page) {
1676 unsigned len = min_t(int, ds, PAGE_SIZE);
1677 data = kmap(page);
1678 err = drbd_recv_all_warn(peer_device->connection, data, len);
1679 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1680 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1681 data[0] = data[0] ^ (unsigned long)-1;
1682 }
1683 kunmap(page);
1684 if (err) {
1685 drbd_free_peer_req(device, peer_req);
1686 return NULL;
1687 }
1688 ds -= len;
1689 }
1690
1691 if (digest_size) {
1692 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1693 if (memcmp(dig_in, dig_vv, digest_size)) {
1694 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1695 (unsigned long long)sector, data_size);
1696 drbd_free_peer_req(device, peer_req);
1697 return NULL;
1698 }
1699 }
1700 device->recv_cnt += data_size >> 9;
1701 return peer_req;
1702 }
1703
1704 /* drbd_drain_block() just takes a data block
1705 * out of the socket input buffer, and discards it.
1706 */
1707 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1708 {
1709 struct page *page;
1710 int err = 0;
1711 void *data;
1712
1713 if (!data_size)
1714 return 0;
1715
1716 page = drbd_alloc_pages(peer_device, 1, 1);
1717
1718 data = kmap(page);
1719 while (data_size) {
1720 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1721
1722 err = drbd_recv_all_warn(peer_device->connection, data, len);
1723 if (err)
1724 break;
1725 data_size -= len;
1726 }
1727 kunmap(page);
1728 drbd_free_pages(peer_device->device, page, 0);
1729 return err;
1730 }
1731
1732 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1733 sector_t sector, int data_size)
1734 {
1735 struct bio_vec bvec;
1736 struct bvec_iter iter;
1737 struct bio *bio;
1738 int digest_size, err, expect;
1739 void *dig_in = peer_device->connection->int_dig_in;
1740 void *dig_vv = peer_device->connection->int_dig_vv;
1741
1742 digest_size = 0;
1743 if (peer_device->connection->peer_integrity_tfm) {
1744 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1745 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1746 if (err)
1747 return err;
1748 data_size -= digest_size;
1749 }
1750
1751 /* optimistically update recv_cnt. if receiving fails below,
1752 * we disconnect anyways, and counters will be reset. */
1753 peer_device->device->recv_cnt += data_size>>9;
1754
1755 bio = req->master_bio;
1756 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1757
1758 bio_for_each_segment(bvec, bio, iter) {
1759 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1760 expect = min_t(int, data_size, bvec.bv_len);
1761 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1762 kunmap(bvec.bv_page);
1763 if (err)
1764 return err;
1765 data_size -= expect;
1766 }
1767
1768 if (digest_size) {
1769 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1770 if (memcmp(dig_in, dig_vv, digest_size)) {
1771 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1772 return -EINVAL;
1773 }
1774 }
1775
1776 D_ASSERT(peer_device->device, data_size == 0);
1777 return 0;
1778 }
1779
1780 /*
1781 * e_end_resync_block() is called in ack_sender context via
1782 * drbd_finish_peer_reqs().
1783 */
1784 static int e_end_resync_block(struct drbd_work *w, int unused)
1785 {
1786 struct drbd_peer_request *peer_req =
1787 container_of(w, struct drbd_peer_request, w);
1788 struct drbd_peer_device *peer_device = peer_req->peer_device;
1789 struct drbd_device *device = peer_device->device;
1790 sector_t sector = peer_req->i.sector;
1791 int err;
1792
1793 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1794
1795 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1796 drbd_set_in_sync(device, sector, peer_req->i.size);
1797 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1798 } else {
1799 /* Record failure to sync */
1800 drbd_rs_failed_io(device, sector, peer_req->i.size);
1801
1802 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1803 }
1804 dec_unacked(device);
1805
1806 return err;
1807 }
1808
1809 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1810 struct packet_info *pi) __releases(local)
1811 {
1812 struct drbd_device *device = peer_device->device;
1813 struct drbd_peer_request *peer_req;
1814
1815 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1816 if (!peer_req)
1817 goto fail;
1818
1819 dec_rs_pending(device);
1820
1821 inc_unacked(device);
1822 /* corresponding dec_unacked() in e_end_resync_block()
1823 * respective _drbd_clear_done_ee */
1824
1825 peer_req->w.cb = e_end_resync_block;
1826 peer_req->submit_jif = jiffies;
1827
1828 spin_lock_irq(&device->resource->req_lock);
1829 list_add_tail(&peer_req->w.list, &device->sync_ee);
1830 spin_unlock_irq(&device->resource->req_lock);
1831
1832 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1833 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1834 return 0;
1835
1836 /* don't care for the reason here */
1837 drbd_err(device, "submit failed, triggering re-connect\n");
1838 spin_lock_irq(&device->resource->req_lock);
1839 list_del(&peer_req->w.list);
1840 spin_unlock_irq(&device->resource->req_lock);
1841
1842 drbd_free_peer_req(device, peer_req);
1843 fail:
1844 put_ldev(device);
1845 return -EIO;
1846 }
1847
1848 static struct drbd_request *
1849 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1850 sector_t sector, bool missing_ok, const char *func)
1851 {
1852 struct drbd_request *req;
1853
1854 /* Request object according to our peer */
1855 req = (struct drbd_request *)(unsigned long)id;
1856 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1857 return req;
1858 if (!missing_ok) {
1859 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1860 (unsigned long)id, (unsigned long long)sector);
1861 }
1862 return NULL;
1863 }
1864
1865 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1866 {
1867 struct drbd_peer_device *peer_device;
1868 struct drbd_device *device;
1869 struct drbd_request *req;
1870 sector_t sector;
1871 int err;
1872 struct p_data *p = pi->data;
1873
1874 peer_device = conn_peer_device(connection, pi->vnr);
1875 if (!peer_device)
1876 return -EIO;
1877 device = peer_device->device;
1878
1879 sector = be64_to_cpu(p->sector);
1880
1881 spin_lock_irq(&device->resource->req_lock);
1882 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1883 spin_unlock_irq(&device->resource->req_lock);
1884 if (unlikely(!req))
1885 return -EIO;
1886
1887 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1888 * special casing it there for the various failure cases.
1889 * still no race with drbd_fail_pending_reads */
1890 err = recv_dless_read(peer_device, req, sector, pi->size);
1891 if (!err)
1892 req_mod(req, DATA_RECEIVED);
1893 /* else: nothing. handled from drbd_disconnect...
1894 * I don't think we may complete this just yet
1895 * in case we are "on-disconnect: freeze" */
1896
1897 return err;
1898 }
1899
1900 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1901 {
1902 struct drbd_peer_device *peer_device;
1903 struct drbd_device *device;
1904 sector_t sector;
1905 int err;
1906 struct p_data *p = pi->data;
1907
1908 peer_device = conn_peer_device(connection, pi->vnr);
1909 if (!peer_device)
1910 return -EIO;
1911 device = peer_device->device;
1912
1913 sector = be64_to_cpu(p->sector);
1914 D_ASSERT(device, p->block_id == ID_SYNCER);
1915
1916 if (get_ldev(device)) {
1917 /* data is submitted to disk within recv_resync_read.
1918 * corresponding put_ldev done below on error,
1919 * or in drbd_peer_request_endio. */
1920 err = recv_resync_read(peer_device, sector, pi);
1921 } else {
1922 if (__ratelimit(&drbd_ratelimit_state))
1923 drbd_err(device, "Can not write resync data to local disk.\n");
1924
1925 err = drbd_drain_block(peer_device, pi->size);
1926
1927 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1928 }
1929
1930 atomic_add(pi->size >> 9, &device->rs_sect_in);
1931
1932 return err;
1933 }
1934
1935 static void restart_conflicting_writes(struct drbd_device *device,
1936 sector_t sector, int size)
1937 {
1938 struct drbd_interval *i;
1939 struct drbd_request *req;
1940
1941 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1942 if (!i->local)
1943 continue;
1944 req = container_of(i, struct drbd_request, i);
1945 if (req->rq_state & RQ_LOCAL_PENDING ||
1946 !(req->rq_state & RQ_POSTPONED))
1947 continue;
1948 /* as it is RQ_POSTPONED, this will cause it to
1949 * be queued on the retry workqueue. */
1950 __req_mod(req, CONFLICT_RESOLVED, NULL);
1951 }
1952 }
1953
1954 /*
1955 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1956 */
1957 static int e_end_block(struct drbd_work *w, int cancel)
1958 {
1959 struct drbd_peer_request *peer_req =
1960 container_of(w, struct drbd_peer_request, w);
1961 struct drbd_peer_device *peer_device = peer_req->peer_device;
1962 struct drbd_device *device = peer_device->device;
1963 sector_t sector = peer_req->i.sector;
1964 int err = 0, pcmd;
1965
1966 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1967 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1968 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1969 device->state.conn <= C_PAUSED_SYNC_T &&
1970 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1971 P_RS_WRITE_ACK : P_WRITE_ACK;
1972 err = drbd_send_ack(peer_device, pcmd, peer_req);
1973 if (pcmd == P_RS_WRITE_ACK)
1974 drbd_set_in_sync(device, sector, peer_req->i.size);
1975 } else {
1976 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1977 /* we expect it to be marked out of sync anyway...
1978 * maybe assert this? */
1979 }
1980 dec_unacked(device);
1981 }
1982
1983 /* we delete from the conflict detection hash _after_ we sent out the
1984 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1985 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1986 spin_lock_irq(&device->resource->req_lock);
1987 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1988 drbd_remove_epoch_entry_interval(device, peer_req);
1989 if (peer_req->flags & EE_RESTART_REQUESTS)
1990 restart_conflicting_writes(device, sector, peer_req->i.size);
1991 spin_unlock_irq(&device->resource->req_lock);
1992 } else
1993 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1994
1995 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1996
1997 return err;
1998 }
1999
2000 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2001 {
2002 struct drbd_peer_request *peer_req =
2003 container_of(w, struct drbd_peer_request, w);
2004 struct drbd_peer_device *peer_device = peer_req->peer_device;
2005 int err;
2006
2007 err = drbd_send_ack(peer_device, ack, peer_req);
2008 dec_unacked(peer_device->device);
2009
2010 return err;
2011 }
2012
2013 static int e_send_superseded(struct drbd_work *w, int unused)
2014 {
2015 return e_send_ack(w, P_SUPERSEDED);
2016 }
2017
2018 static int e_send_retry_write(struct drbd_work *w, int unused)
2019 {
2020 struct drbd_peer_request *peer_req =
2021 container_of(w, struct drbd_peer_request, w);
2022 struct drbd_connection *connection = peer_req->peer_device->connection;
2023
2024 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2025 P_RETRY_WRITE : P_SUPERSEDED);
2026 }
2027
2028 static bool seq_greater(u32 a, u32 b)
2029 {
2030 /*
2031 * We assume 32-bit wrap-around here.
2032 * For 24-bit wrap-around, we would have to shift:
2033 * a <<= 8; b <<= 8;
2034 */
2035 return (s32)a - (s32)b > 0;
2036 }
2037
2038 static u32 seq_max(u32 a, u32 b)
2039 {
2040 return seq_greater(a, b) ? a : b;
2041 }
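
/*
 * Editorial example (illustration only, not used by the driver): with the
 * 32-bit wrap-around semantics above, seq_greater(5, 0xfffffffeU) is true,
 * because (s32)5 - (s32)0xfffffffeU == 5 - (-2) == 7 > 0, i.e. sequence
 * number 5 is "7 ahead" of 0xfffffffe. Conversely, seq_greater(0xfffffffeU, 5)
 * is false, so seq_max(5, 0xfffffffeU) yields 5.
 */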
2042
2043 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2044 {
2045 struct drbd_device *device = peer_device->device;
2046 unsigned int newest_peer_seq;
2047
2048 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2049 spin_lock(&device->peer_seq_lock);
2050 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2051 device->peer_seq = newest_peer_seq;
2052 spin_unlock(&device->peer_seq_lock);
2053 /* wake up only if we actually changed device->peer_seq */
2054 if (peer_seq == newest_peer_seq)
2055 wake_up(&device->seq_wait);
2056 }
2057 }
2058
2059 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2060 {
2061 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2062 }
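
/*
 * Editorial example (sketch, values are made up): with two 4096-byte
 * requests, l1 >> 9 == l2 >> 9 == 8 sectors. overlaps(100, 4096, 104, 4096)
 * is true, since sectors [100, 108) and [104, 112) share 104..107, while
 * overlaps(100, 4096, 108, 4096) is false because [100, 108) and [108, 116)
 * only touch.
 */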
2063
2064 /* maybe change sync_ee into interval trees as well? */
2065 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2066 {
2067 struct drbd_peer_request *rs_req;
2068 bool rv = false;
2069
2070 spin_lock_irq(&device->resource->req_lock);
2071 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2072 if (overlaps(peer_req->i.sector, peer_req->i.size,
2073 rs_req->i.sector, rs_req->i.size)) {
2074 rv = true;
2075 break;
2076 }
2077 }
2078 spin_unlock_irq(&device->resource->req_lock);
2079
2080 return rv;
2081 }
2082
2083 /* Called from receive_Data.
2084 * Synchronize packets on sock with packets on msock.
2085 *
2086 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2087 * packet traveling on msock, they are still processed in the order they have
2088 * been sent.
2089 *
2090 * Note: we don't care for Ack packets overtaking P_DATA packets.
2091 *
2092 * In case packet_seq is larger than device->peer_seq, there are
2093 * outstanding packets on the msock. We wait for them to arrive.
2094 * In case we are the logically next packet, we update device->peer_seq
2095 * ourselves. Correctly handles 32bit wrap around.
2096 *
2097 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2098 * about 1<<21 sectors per second. So in the "worst" case, we have 1<<3 == 8
2099 * seconds for the 24bit wrap (historical atomic_t guarantee on some archs),
2100 * and 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2101 *
2102 * Returns 0 if we may process the packet, -ERESTARTSYS if we were
2103 * interrupted (by disconnect signal), or -ETIMEDOUT on ack timeout. */
2104 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2105 {
2106 struct drbd_device *device = peer_device->device;
2107 DEFINE_WAIT(wait);
2108 long timeout;
2109 int ret = 0, tp;
2110
2111 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2112 return 0;
2113
2114 spin_lock(&device->peer_seq_lock);
2115 for (;;) {
2116 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2117 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2118 break;
2119 }
2120
2121 if (signal_pending(current)) {
2122 ret = -ERESTARTSYS;
2123 break;
2124 }
2125
2126 rcu_read_lock();
2127 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2128 rcu_read_unlock();
2129
2130 if (!tp)
2131 break;
2132
2133 /* Only need to wait if two_primaries is enabled */
2134 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2135 spin_unlock(&device->peer_seq_lock);
2136 rcu_read_lock();
2137 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2138 rcu_read_unlock();
2139 timeout = schedule_timeout(timeout);
2140 spin_lock(&device->peer_seq_lock);
2141 if (!timeout) {
2142 ret = -ETIMEDOUT;
2143 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2144 break;
2145 }
2146 }
2147 spin_unlock(&device->peer_seq_lock);
2148 finish_wait(&device->seq_wait, &wait);
2149 return ret;
2150 }
2151
2152 /* see also bio_flags_to_wire():
2153 * we need to semantically map DRBD_REQ_* flags to data packet flags and
2154 * back, because we may replicate to peers running other kernel versions. */
2155 static unsigned long wire_flags_to_bio(u32 dpf)
2156 {
2157 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2158 (dpf & DP_FUA ? REQ_FUA : 0) |
2159 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2160 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2161 }
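
/*
 * Illustration (editorial, not part of the protocol definition): a peer
 * that set DP_FUA | DP_FLUSH in p_data->dp_flags gets its write submitted
 * locally as
 *	rw = WRITE | wire_flags_to_bio(DP_FUA | DP_FLUSH);
 * i.e. with REQ_FUA | REQ_FLUSH set, as done in receive_Data() below.
 */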
2162
2163 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2164 unsigned int size)
2165 {
2166 struct drbd_interval *i;
2167
2168 repeat:
2169 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2170 struct drbd_request *req;
2171 struct bio_and_error m;
2172
2173 if (!i->local)
2174 continue;
2175 req = container_of(i, struct drbd_request, i);
2176 if (!(req->rq_state & RQ_POSTPONED))
2177 continue;
2178 req->rq_state &= ~RQ_POSTPONED;
2179 __req_mod(req, NEG_ACKED, &m);
2180 spin_unlock_irq(&device->resource->req_lock);
2181 if (m.bio)
2182 complete_master_bio(device, &m);
2183 spin_lock_irq(&device->resource->req_lock);
2184 goto repeat;
2185 }
2186 }
2187
2188 static int handle_write_conflicts(struct drbd_device *device,
2189 struct drbd_peer_request *peer_req)
2190 {
2191 struct drbd_connection *connection = peer_req->peer_device->connection;
2192 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2193 sector_t sector = peer_req->i.sector;
2194 const unsigned int size = peer_req->i.size;
2195 struct drbd_interval *i;
2196 bool equal;
2197 int err;
2198
2199 /*
2200 * Inserting the peer request into the write_requests tree will prevent
2201 * new conflicting local requests from being added.
2202 */
2203 drbd_insert_interval(&device->write_requests, &peer_req->i);
2204
2205 repeat:
2206 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2207 if (i == &peer_req->i)
2208 continue;
2209 if (i->completed)
2210 continue;
2211
2212 if (!i->local) {
2213 /*
2214 * Our peer has sent a conflicting remote request; this
2215 * should not happen in a two-node setup. Wait for the
2216 * earlier peer request to complete.
2217 */
2218 err = drbd_wait_misc(device, i);
2219 if (err)
2220 goto out;
2221 goto repeat;
2222 }
2223
2224 equal = i->sector == sector && i->size == size;
2225 if (resolve_conflicts) {
2226 /*
2227 * If the peer request is fully contained within the
2228 * overlapping request, it can be considered overwritten
2229 * and thus superseded; otherwise, it will be retried
2230 * once all overlapping requests have completed.
2231 */
2232 bool superseded = i->sector <= sector && i->sector +
2233 (i->size >> 9) >= sector + (size >> 9);
2234
2235 if (!equal)
2236 drbd_alert(device, "Concurrent writes detected: "
2237 "local=%llus +%u, remote=%llus +%u, "
2238 "assuming %s came first\n",
2239 (unsigned long long)i->sector, i->size,
2240 (unsigned long long)sector, size,
2241 superseded ? "local" : "remote");
2242
2243 peer_req->w.cb = superseded ? e_send_superseded :
2244 e_send_retry_write;
2245 list_add_tail(&peer_req->w.list, &device->done_ee);
2246 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2247
2248 err = -ENOENT;
2249 goto out;
2250 } else {
2251 struct drbd_request *req =
2252 container_of(i, struct drbd_request, i);
2253
2254 if (!equal)
2255 drbd_alert(device, "Concurrent writes detected: "
2256 "local=%llus +%u, remote=%llus +%u\n",
2257 (unsigned long long)i->sector, i->size,
2258 (unsigned long long)sector, size);
2259
2260 if (req->rq_state & RQ_LOCAL_PENDING ||
2261 !(req->rq_state & RQ_POSTPONED)) {
2262 /*
2263 * Wait for the node with the discard flag to
2264 * decide if this request has been superseded
2265 * or needs to be retried.
2266 * Requests that have been superseded will
2267 * disappear from the write_requests tree.
2268 *
2269 * In addition, wait for the conflicting
2270 * request to finish locally before submitting
2271 * the conflicting peer request.
2272 */
2273 err = drbd_wait_misc(device, &req->i);
2274 if (err) {
2275 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2276 fail_postponed_requests(device, sector, size);
2277 goto out;
2278 }
2279 goto repeat;
2280 }
2281 /*
2282 * Remember to restart the conflicting requests after
2283 * the new peer request has completed.
2284 */
2285 peer_req->flags |= EE_RESTART_REQUESTS;
2286 }
2287 }
2288 err = 0;
2289
2290 out:
2291 if (err)
2292 drbd_remove_epoch_entry_interval(device, peer_req);
2293 return err;
2294 }
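
/*
 * Editorial sketch of the "superseded" test in handle_write_conflicts()
 * (numbers invented for illustration): a peer write at sector 8 with
 * size 4096 (sectors [8, 16)) conflicting with a local request at sector 0
 * with size 8192 (sectors [0, 16)) is fully contained, since
 * 0 <= 8 && 0 + 16 >= 8 + 8, so the peer request is answered via
 * e_send_superseded. A peer write at sector 12 with size 4096
 * (sectors [12, 20)) is only partially covered and is therefore answered
 * via e_send_retry_write instead.
 */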
2295
2296 /* mirrored write */
2297 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2298 {
2299 struct drbd_peer_device *peer_device;
2300 struct drbd_device *device;
2301 struct net_conf *nc;
2302 sector_t sector;
2303 struct drbd_peer_request *peer_req;
2304 struct p_data *p = pi->data;
2305 u32 peer_seq = be32_to_cpu(p->seq_num);
2306 int rw = WRITE;
2307 u32 dp_flags;
2308 int err, tp;
2309
2310 peer_device = conn_peer_device(connection, pi->vnr);
2311 if (!peer_device)
2312 return -EIO;
2313 device = peer_device->device;
2314
2315 if (!get_ldev(device)) {
2316 int err2;
2317
2318 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2319 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2320 atomic_inc(&connection->current_epoch->epoch_size);
2321 err2 = drbd_drain_block(peer_device, pi->size);
2322 if (!err)
2323 err = err2;
2324 return err;
2325 }
2326
2327 /*
2328 * Corresponding put_ldev done either below (on various errors), or in
2329 * drbd_peer_request_endio, if we successfully submit the data at the
2330 * end of this function.
2331 */
2332
2333 sector = be64_to_cpu(p->sector);
2334 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2335 if (!peer_req) {
2336 put_ldev(device);
2337 return -EIO;
2338 }
2339
2340 peer_req->w.cb = e_end_block;
2341 peer_req->submit_jif = jiffies;
2342 peer_req->flags |= EE_APPLICATION;
2343
2344 dp_flags = be32_to_cpu(p->dp_flags);
2345 rw |= wire_flags_to_bio(dp_flags);
2346 if (pi->cmd == P_TRIM) {
2347 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2348 peer_req->flags |= EE_IS_TRIM;
2349 if (!blk_queue_discard(q))
2350 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2351 D_ASSERT(peer_device, peer_req->i.size > 0);
2352 D_ASSERT(peer_device, rw & REQ_DISCARD);
2353 D_ASSERT(peer_device, peer_req->pages == NULL);
2354 } else if (peer_req->pages == NULL) {
2355 D_ASSERT(device, peer_req->i.size == 0);
2356 D_ASSERT(device, dp_flags & DP_FLUSH);
2357 }
2358
2359 if (dp_flags & DP_MAY_SET_IN_SYNC)
2360 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2361
2362 spin_lock(&connection->epoch_lock);
2363 peer_req->epoch = connection->current_epoch;
2364 atomic_inc(&peer_req->epoch->epoch_size);
2365 atomic_inc(&peer_req->epoch->active);
2366 spin_unlock(&connection->epoch_lock);
2367
2368 rcu_read_lock();
2369 nc = rcu_dereference(peer_device->connection->net_conf);
2370 tp = nc->two_primaries;
2371 if (peer_device->connection->agreed_pro_version < 100) {
2372 switch (nc->wire_protocol) {
2373 case DRBD_PROT_C:
2374 dp_flags |= DP_SEND_WRITE_ACK;
2375 break;
2376 case DRBD_PROT_B:
2377 dp_flags |= DP_SEND_RECEIVE_ACK;
2378 break;
2379 }
2380 }
2381 rcu_read_unlock();
2382
2383 if (dp_flags & DP_SEND_WRITE_ACK) {
2384 peer_req->flags |= EE_SEND_WRITE_ACK;
2385 inc_unacked(device);
2386 /* corresponding dec_unacked() in e_end_block(),
2387 * respectively in _drbd_clear_done_ee */
2388 }
2389
2390 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2391 /* I really don't like it that the receiver thread
2392 * sends on the msock, but anyway */
2393 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2394 }
2395
2396 if (tp) {
2397 /* two primaries implies protocol C */
2398 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2399 peer_req->flags |= EE_IN_INTERVAL_TREE;
2400 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2401 if (err)
2402 goto out_interrupted;
2403 spin_lock_irq(&device->resource->req_lock);
2404 err = handle_write_conflicts(device, peer_req);
2405 if (err) {
2406 spin_unlock_irq(&device->resource->req_lock);
2407 if (err == -ENOENT) {
2408 put_ldev(device);
2409 return 0;
2410 }
2411 goto out_interrupted;
2412 }
2413 } else {
2414 update_peer_seq(peer_device, peer_seq);
2415 spin_lock_irq(&device->resource->req_lock);
2416 }
2417 /* if we use the zeroout fallback code, we process synchronously
2418 * and wait for all pending requests, i.e. for active_ee to become
2419 * empty, in drbd_submit_peer_request();
2420 * better not add ourselves here. */
2421 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2422 list_add_tail(&peer_req->w.list, &device->active_ee);
2423 spin_unlock_irq(&device->resource->req_lock);
2424
2425 if (device->state.conn == C_SYNC_TARGET)
2426 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2427
2428 if (device->state.pdsk < D_INCONSISTENT) {
2429 /* In case we have the only disk of the cluster: the peer has no usable disk, so mark this range out of sync. */
2430 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2431 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2432 drbd_al_begin_io(device, &peer_req->i);
2433 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2434 }
2435
2436 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2437 if (!err)
2438 return 0;
2439
2440 /* don't care for the reason here */
2441 drbd_err(device, "submit failed, triggering re-connect\n");
2442 spin_lock_irq(&device->resource->req_lock);
2443 list_del(&peer_req->w.list);
2444 drbd_remove_epoch_entry_interval(device, peer_req);
2445 spin_unlock_irq(&device->resource->req_lock);
2446 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2447 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2448 drbd_al_complete_io(device, &peer_req->i);
2449 }
2450
2451 out_interrupted:
2452 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2453 put_ldev(device);
2454 drbd_free_peer_req(device, peer_req);
2455 return err;
2456 }
2457
2458 /* We may throttle resync if the lower device seems to be busy
2459 * and the current sync rate is above c_min_rate.
2460 *
2461 * To decide whether the lower device is busy, we use a scheme similar
2462 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2463 * activity (more than 64 sectors) that we cannot account for with our own
2464 * resync activity, it obviously is "busy".
2465 *
2466 * The current sync rate used here uses only the most recent two step marks,
2467 * to have a short time average so we can react faster.
2468 */
2469 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2470 bool throttle_if_app_is_waiting)
2471 {
2472 struct lc_element *tmp;
2473 bool throttle = drbd_rs_c_min_rate_throttle(device);
2474
2475 if (!throttle || throttle_if_app_is_waiting)
2476 return throttle;
2477
2478 spin_lock_irq(&device->al_lock);
2479 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2480 if (tmp) {
2481 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2482 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2483 throttle = false;
2484 /* Do not slow down if app IO is already waiting for this extent,
2485 * and our progress is necessary for application IO to complete. */
2486 }
2487 spin_unlock_irq(&device->al_lock);
2488
2489 return throttle;
2490 }
2491
2492 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2493 {
2494 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2495 unsigned long db, dt, dbdt;
2496 unsigned int c_min_rate;
2497 int curr_events;
2498
2499 rcu_read_lock();
2500 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2501 rcu_read_unlock();
2502
2503 /* feature disabled? */
2504 if (c_min_rate == 0)
2505 return false;
2506
2507 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2508 (int)part_stat_read(&disk->part0, sectors[1]) -
2509 atomic_read(&device->rs_sect_ev);
2510
2511 if (atomic_read(&device->ap_actlog_cnt)
2512 || curr_events - device->rs_last_events > 64) {
2513 unsigned long rs_left;
2514 int i;
2515
2516 device->rs_last_events = curr_events;
2517
2518 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2519 * approx. */
2520 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2521
2522 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2523 rs_left = device->ov_left;
2524 else
2525 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2526
2527 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2528 if (!dt)
2529 dt++;
2530 db = device->rs_mark_left[i] - rs_left;
2531 dbdt = Bit2KB(db/dt);
2532
2533 if (dbdt > c_min_rate)
2534 return true;
2535 }
2536 return false;
2537 }
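
/*
 * Worked example for the rate estimate above (editorial; all numbers are
 * made up): if db == rs_mark_left[i] - rs_left == 3000 bitmap bits were
 * cleared within dt == 3 seconds, then db/dt == 1000 bits/s. Each bit
 * covers a 4 KiB block, so dbdt == Bit2KB(1000) == 4000 KB/s. If
 * c_min_rate were configured to 250 KB/s, this would exceed the minimum
 * rate and drbd_rs_c_min_rate_throttle() would return true.
 */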
2538
2539 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2540 {
2541 struct drbd_peer_device *peer_device;
2542 struct drbd_device *device;
2543 sector_t sector;
2544 sector_t capacity;
2545 struct drbd_peer_request *peer_req;
2546 struct digest_info *di = NULL;
2547 int size, verb;
2548 unsigned int fault_type;
2549 struct p_block_req *p = pi->data;
2550
2551 peer_device = conn_peer_device(connection, pi->vnr);
2552 if (!peer_device)
2553 return -EIO;
2554 device = peer_device->device;
2555 capacity = drbd_get_capacity(device->this_bdev);
2556
2557 sector = be64_to_cpu(p->sector);
2558 size = be32_to_cpu(p->blksize);
2559
2560 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2561 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2562 (unsigned long long)sector, size);
2563 return -EINVAL;
2564 }
2565 if (sector + (size>>9) > capacity) {
2566 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2567 (unsigned long long)sector, size);
2568 return -EINVAL;
2569 }
2570
2571 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2572 verb = 1;
2573 switch (pi->cmd) {
2574 case P_DATA_REQUEST:
2575 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2576 break;
2577 case P_RS_DATA_REQUEST:
2578 case P_CSUM_RS_REQUEST:
2579 case P_OV_REQUEST:
2580 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2581 break;
2582 case P_OV_REPLY:
2583 verb = 0;
2584 dec_rs_pending(device);
2585 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2586 break;
2587 default:
2588 BUG();
2589 }
2590 if (verb && __ratelimit(&drbd_ratelimit_state))
2591 drbd_err(device, "Can not satisfy peer's read request, "
2592 "no local data.\n");
2593
2594 /* drain possible payload */
2595 return drbd_drain_block(peer_device, pi->size);
2596 }
2597
2598 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2599 * "criss-cross" setup, that might cause write-out on some other DRBD,
2600 * which in turn might block on the other node at this very place. */
2601 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2602 true /* has real payload */, GFP_NOIO);
2603 if (!peer_req) {
2604 put_ldev(device);
2605 return -ENOMEM;
2606 }
2607
2608 switch (pi->cmd) {
2609 case P_DATA_REQUEST:
2610 peer_req->w.cb = w_e_end_data_req;
2611 fault_type = DRBD_FAULT_DT_RD;
2612 /* application IO, don't drbd_rs_begin_io */
2613 peer_req->flags |= EE_APPLICATION;
2614 goto submit;
2615
2616 case P_RS_DATA_REQUEST:
2617 peer_req->w.cb = w_e_end_rsdata_req;
2618 fault_type = DRBD_FAULT_RS_RD;
2619 /* used in the sector offset progress display */
2620 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2621 break;
2622
2623 case P_OV_REPLY:
2624 case P_CSUM_RS_REQUEST:
2625 fault_type = DRBD_FAULT_RS_RD;
2626 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2627 if (!di)
2628 goto out_free_e;
2629
2630 di->digest_size = pi->size;
2631 di->digest = (((char *)di)+sizeof(struct digest_info));
2632
2633 peer_req->digest = di;
2634 peer_req->flags |= EE_HAS_DIGEST;
2635
2636 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2637 goto out_free_e;
2638
2639 if (pi->cmd == P_CSUM_RS_REQUEST) {
2640 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2641 peer_req->w.cb = w_e_end_csum_rs_req;
2642 /* used in the sector offset progress display */
2643 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2644 /* remember to report stats in drbd_resync_finished */
2645 device->use_csums = true;
2646 } else if (pi->cmd == P_OV_REPLY) {
2647 /* track progress, we may need to throttle */
2648 atomic_add(size >> 9, &device->rs_sect_in);
2649 peer_req->w.cb = w_e_end_ov_reply;
2650 dec_rs_pending(device);
2651 /* drbd_rs_begin_io done when we sent this request,
2652 * but accounting still needs to be done. */
2653 goto submit_for_resync;
2654 }
2655 break;
2656
2657 case P_OV_REQUEST:
2658 if (device->ov_start_sector == ~(sector_t)0 &&
2659 peer_device->connection->agreed_pro_version >= 90) {
2660 unsigned long now = jiffies;
2661 int i;
2662 device->ov_start_sector = sector;
2663 device->ov_position = sector;
2664 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2665 device->rs_total = device->ov_left;
2666 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2667 device->rs_mark_left[i] = device->ov_left;
2668 device->rs_mark_time[i] = now;
2669 }
2670 drbd_info(device, "Online Verify start sector: %llu\n",
2671 (unsigned long long)sector);
2672 }
2673 peer_req->w.cb = w_e_end_ov_req;
2674 fault_type = DRBD_FAULT_RS_RD;
2675 break;
2676
2677 default:
2678 BUG();
2679 }
2680
2681 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2682 * wrt the receiver, but it is not as straightforward as it may seem.
2683 * Various places in the resync start and stop logic assume resync
2684 * requests are processed in order; requeuing this on the worker thread
2685 * would introduce a bunch of new code for synchronization between threads.
2686 *
2687 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2688 * "forever", throttling after drbd_rs_begin_io will lock that extent
2689 * for application writes for the same time. For now, just throttle
2690 * here, where the rest of the code expects the receiver to sleep for
2691 * a while anyway.
2692 */
2693
2694 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2695 * this defers syncer requests for some time, before letting at least
2696 * one request through. The resync controller on the receiving side
2697 * will adapt to the incoming rate accordingly.
2698 *
2699 * We cannot throttle here if remote is Primary/SyncTarget:
2700 * we would also throttle its application reads.
2701 * In that case, throttling is done on the SyncTarget only.
2702 */
2703
2704 /* Even though this may be a resync request, we do add to "read_ee";
2705 * "sync_ee" is only used for resync WRITEs.
2706 * Add to list early, so debugfs can find this request
2707 * even if we have to sleep below. */
2708 spin_lock_irq(&device->resource->req_lock);
2709 list_add_tail(&peer_req->w.list, &device->read_ee);
2710 spin_unlock_irq(&device->resource->req_lock);
2711
2712 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2713 if (device->state.peer != R_PRIMARY
2714 && drbd_rs_should_slow_down(device, sector, false))
2715 schedule_timeout_uninterruptible(HZ/10);
2716 update_receiver_timing_details(connection, drbd_rs_begin_io);
2717 if (drbd_rs_begin_io(device, sector))
2718 goto out_free_e;
2719
2720 submit_for_resync:
2721 atomic_add(size >> 9, &device->rs_sect_ev);
2722
2723 submit:
2724 update_receiver_timing_details(connection, drbd_submit_peer_request);
2725 inc_unacked(device);
2726 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2727 return 0;
2728
2729 /* don't care for the reason here */
2730 drbd_err(device, "submit failed, triggering re-connect\n");
2731
2732 out_free_e:
2733 spin_lock_irq(&device->resource->req_lock);
2734 list_del(&peer_req->w.list);
2735 spin_unlock_irq(&device->resource->req_lock);
2736 /* no drbd_rs_complete_io(), we are dropping the connection anyway */
2737
2738 put_ldev(device);
2739 drbd_free_peer_req(device, peer_req);
2740 return -EIO;
2741 }
2742
2743 /**
2744 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2745 */
2746 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2747 {
2748 struct drbd_device *device = peer_device->device;
2749 int self, peer, rv = -100;
2750 unsigned long ch_self, ch_peer;
2751 enum drbd_after_sb_p after_sb_0p;
2752
2753 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2754 peer = device->p_uuid[UI_BITMAP] & 1;
2755
2756 ch_peer = device->p_uuid[UI_SIZE];
2757 ch_self = device->comm_bm_set;
2758
2759 rcu_read_lock();
2760 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2761 rcu_read_unlock();
2762 switch (after_sb_0p) {
2763 case ASB_CONSENSUS:
2764 case ASB_DISCARD_SECONDARY:
2765 case ASB_CALL_HELPER:
2766 case ASB_VIOLENTLY:
2767 drbd_err(device, "Configuration error.\n");
2768 break;
2769 case ASB_DISCONNECT:
2770 break;
2771 case ASB_DISCARD_YOUNGER_PRI:
2772 if (self == 0 && peer == 1) {
2773 rv = -1;
2774 break;
2775 }
2776 if (self == 1 && peer == 0) {
2777 rv = 1;
2778 break;
2779 }
2780 /* Else fall through to one of the other strategies... */
2781 case ASB_DISCARD_OLDER_PRI:
2782 if (self == 0 && peer == 1) {
2783 rv = 1;
2784 break;
2785 }
2786 if (self == 1 && peer == 0) {
2787 rv = -1;
2788 break;
2789 }
2790 /* Else fall through to one of the other strategies... */
2791 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2792 "Using discard-least-changes instead\n");
2793 case ASB_DISCARD_ZERO_CHG:
2794 if (ch_peer == 0 && ch_self == 0) {
2795 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2796 ? -1 : 1;
2797 break;
2798 } else {
2799 if (ch_peer == 0) { rv = 1; break; }
2800 if (ch_self == 0) { rv = -1; break; }
2801 }
2802 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2803 break;
2804 case ASB_DISCARD_LEAST_CHG:
2805 if (ch_self < ch_peer)
2806 rv = -1;
2807 else if (ch_self > ch_peer)
2808 rv = 1;
2809 else /* ( ch_self == ch_peer ) */
2810 /* Well, then use something else. */
2811 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2812 ? -1 : 1;
2813 break;
2814 case ASB_DISCARD_LOCAL:
2815 rv = -1;
2816 break;
2817 case ASB_DISCARD_REMOTE:
2818 rv = 1;
2819 }
2820
2821 return rv;
2822 }
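
/*
 * Editorial example for ASB_DISCARD_LEAST_CHG (values invented): with
 * ch_self == 12 out-of-sync blocks locally and ch_peer == 700 on the peer,
 * ch_self < ch_peer yields rv == -1, i.e. this node discards its few
 * changes and becomes the sync target of the peer.
 */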
2823
2824 /**
2825 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2826 */
2827 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2828 {
2829 struct drbd_device *device = peer_device->device;
2830 int hg, rv = -100;
2831 enum drbd_after_sb_p after_sb_1p;
2832
2833 rcu_read_lock();
2834 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2835 rcu_read_unlock();
2836 switch (after_sb_1p) {
2837 case ASB_DISCARD_YOUNGER_PRI:
2838 case ASB_DISCARD_OLDER_PRI:
2839 case ASB_DISCARD_LEAST_CHG:
2840 case ASB_DISCARD_LOCAL:
2841 case ASB_DISCARD_REMOTE:
2842 case ASB_DISCARD_ZERO_CHG:
2843 drbd_err(device, "Configuration error.\n");
2844 break;
2845 case ASB_DISCONNECT:
2846 break;
2847 case ASB_CONSENSUS:
2848 hg = drbd_asb_recover_0p(peer_device);
2849 if (hg == -1 && device->state.role == R_SECONDARY)
2850 rv = hg;
2851 if (hg == 1 && device->state.role == R_PRIMARY)
2852 rv = hg;
2853 break;
2854 case ASB_VIOLENTLY:
2855 rv = drbd_asb_recover_0p(peer_device);
2856 break;
2857 case ASB_DISCARD_SECONDARY:
2858 return device->state.role == R_PRIMARY ? 1 : -1;
2859 case ASB_CALL_HELPER:
2860 hg = drbd_asb_recover_0p(peer_device);
2861 if (hg == -1 && device->state.role == R_PRIMARY) {
2862 enum drbd_state_rv rv2;
2863
2864 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2865 * we might be here in C_WF_REPORT_PARAMS which is transient.
2866 * we do not need to wait for the after state change work either. */
2867 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2868 if (rv2 != SS_SUCCESS) {
2869 drbd_khelper(device, "pri-lost-after-sb");
2870 } else {
2871 drbd_warn(device, "Successfully gave up primary role.\n");
2872 rv = hg;
2873 }
2874 } else
2875 rv = hg;
2876 }
2877
2878 return rv;
2879 }
2880
2881 /**
2882 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2883 */
2884 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2885 {
2886 struct drbd_device *device = peer_device->device;
2887 int hg, rv = -100;
2888 enum drbd_after_sb_p after_sb_2p;
2889
2890 rcu_read_lock();
2891 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2892 rcu_read_unlock();
2893 switch (after_sb_2p) {
2894 case ASB_DISCARD_YOUNGER_PRI:
2895 case ASB_DISCARD_OLDER_PRI:
2896 case ASB_DISCARD_LEAST_CHG:
2897 case ASB_DISCARD_LOCAL:
2898 case ASB_DISCARD_REMOTE:
2899 case ASB_CONSENSUS:
2900 case ASB_DISCARD_SECONDARY:
2901 case ASB_DISCARD_ZERO_CHG:
2902 drbd_err(device, "Configuration error.\n");
2903 break;
2904 case ASB_VIOLENTLY:
2905 rv = drbd_asb_recover_0p(peer_device);
2906 break;
2907 case ASB_DISCONNECT:
2908 break;
2909 case ASB_CALL_HELPER:
2910 hg = drbd_asb_recover_0p(peer_device);
2911 if (hg == -1) {
2912 enum drbd_state_rv rv2;
2913
2914 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2915 * we might be here in C_WF_REPORT_PARAMS which is transient.
2916 * we do not need to wait for the after state change work either. */
2917 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2918 if (rv2 != SS_SUCCESS) {
2919 drbd_khelper(device, "pri-lost-after-sb");
2920 } else {
2921 drbd_warn(device, "Successfully gave up primary role.\n");
2922 rv = hg;
2923 }
2924 } else
2925 rv = hg;
2926 }
2927
2928 return rv;
2929 }
2930
2931 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2932 u64 bits, u64 flags)
2933 {
2934 if (!uuid) {
2935 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2936 return;
2937 }
2938 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2939 text,
2940 (unsigned long long)uuid[UI_CURRENT],
2941 (unsigned long long)uuid[UI_BITMAP],
2942 (unsigned long long)uuid[UI_HISTORY_START],
2943 (unsigned long long)uuid[UI_HISTORY_END],
2944 (unsigned long long)bits,
2945 (unsigned long long)flags);
2946 }
2947
2948 /*
2949 100 after split brain try auto recover
2950 2 C_SYNC_SOURCE set BitMap
2951 1 C_SYNC_SOURCE use BitMap
2952 0 no Sync
2953 -1 C_SYNC_TARGET use BitMap
2954 -2 C_SYNC_TARGET set BitMap
2955 -100 after split brain, disconnect
2956 -1000 unrelated data
2957 -1091 requires proto 91
2958 -1096 requires proto 96
2959 */
2960 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2961 {
2962 struct drbd_peer_device *const peer_device = first_peer_device(device);
2963 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2964 u64 self, peer;
2965 int i, j;
2966
2967 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2968 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969
2970 *rule_nr = 10;
2971 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2972 return 0;
2973
2974 *rule_nr = 20;
2975 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2976 peer != UUID_JUST_CREATED)
2977 return -2;
2978
2979 *rule_nr = 30;
2980 if (self != UUID_JUST_CREATED &&
2981 (peer == UUID_JUST_CREATED || peer == (u64)0))
2982 return 2;
2983
2984 if (self == peer) {
2985 int rct, dc; /* roles at crash time */
2986
2987 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2988
2989 if (connection->agreed_pro_version < 91)
2990 return -1091;
2991
2992 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2993 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2994 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2995 drbd_uuid_move_history(device);
2996 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2997 device->ldev->md.uuid[UI_BITMAP] = 0;
2998
2999 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3000 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3001 *rule_nr = 34;
3002 } else {
3003 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3004 *rule_nr = 36;
3005 }
3006
3007 return 1;
3008 }
3009
3010 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3011
3012 if (connection->agreed_pro_version < 91)
3013 return -1091;
3014
3015 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3016 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3017 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3018
3019 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3020 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3021 device->p_uuid[UI_BITMAP] = 0UL;
3022
3023 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3024 *rule_nr = 35;
3025 } else {
3026 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3027 *rule_nr = 37;
3028 }
3029
3030 return -1;
3031 }
3032
3033 /* Common power [off|failure] */
3034 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3035 (device->p_uuid[UI_FLAGS] & 2);
3036 /* lowest bit is set when we were primary,
3037 * next bit (weight 2) is set when peer was primary */
3038 *rule_nr = 40;
3039
3040 switch (rct) {
3041 case 0: /* !self_pri && !peer_pri */ return 0;
3042 case 1: /* self_pri && !peer_pri */ return 1;
3043 case 2: /* !self_pri && peer_pri */ return -1;
3044 case 3: /* self_pri && peer_pri */
3045 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3046 return dc ? -1 : 1;
3047 }
3048 }
3049
3050 *rule_nr = 50;
3051 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3052 if (self == peer)
3053 return -1;
3054
3055 *rule_nr = 51;
3056 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3057 if (self == peer) {
3058 if (connection->agreed_pro_version < 96 ?
3059 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3060 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3061 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3062 /* The last P_SYNC_UUID did not get through. Undo the modifications
3063 made to the peer's UUIDs at the last start of resync as sync source. */
3064
3065 if (connection->agreed_pro_version < 91)
3066 return -1091;
3067
3068 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3069 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3070
3071 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3072 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3073
3074 return -1;
3075 }
3076 }
3077
3078 *rule_nr = 60;
3079 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3080 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3081 peer = device->p_uuid[i] & ~((u64)1);
3082 if (self == peer)
3083 return -2;
3084 }
3085
3086 *rule_nr = 70;
3087 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3088 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3089 if (self == peer)
3090 return 1;
3091
3092 *rule_nr = 71;
3093 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3094 if (self == peer) {
3095 if (connection->agreed_pro_version < 96 ?
3096 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3097 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3098 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3099 /* The last P_SYNC_UUID did not get through. Undo the modifications
3100 made to our UUIDs at the last start of resync as sync source. */
3101
3102 if (connection->agreed_pro_version < 91)
3103 return -1091;
3104
3105 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3106 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3107
3108 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3109 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3110 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3111
3112 return 1;
3113 }
3114 }
3115
3116
3117 *rule_nr = 80;
3118 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3119 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3120 self = device->ldev->md.uuid[i] & ~((u64)1);
3121 if (self == peer)
3122 return 2;
3123 }
3124
3125 *rule_nr = 90;
3126 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3127 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3128 if (self == peer && self != ((u64)0))
3129 return 100;
3130
3131 *rule_nr = 100;
3132 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3133 self = device->ldev->md.uuid[i] & ~((u64)1);
3134 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3135 peer = device->p_uuid[j] & ~((u64)1);
3136 if (self == peer)
3137 return -100;
3138 }
3139 }
3140
3141 return -1000;
3142 }
3143
3144 /* drbd_sync_handshake() returns the new conn state on success, or
3145 CONN_MASK (-1) on failure.
3146 */
3147 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148 enum drbd_role peer_role,
3149 enum drbd_disk_state peer_disk) __must_hold(local)
3150 {
3151 struct drbd_device *device = peer_device->device;
3152 enum drbd_conns rv = C_MASK;
3153 enum drbd_disk_state mydisk;
3154 struct net_conf *nc;
3155 int hg, rule_nr, rr_conflict, tentative;
3156
3157 mydisk = device->state.disk;
3158 if (mydisk == D_NEGOTIATING)
3159 mydisk = device->new_state_tmp.disk;
3160
3161 drbd_info(device, "drbd_sync_handshake:\n");
3162
3163 spin_lock_irq(&device->ldev->md.uuid_lock);
3164 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165 drbd_uuid_dump(device, "peer", device->p_uuid,
3166 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3167
3168 hg = drbd_uuid_compare(device, &rule_nr);
3169 spin_unlock_irq(&device->ldev->md.uuid_lock);
3170
3171 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3172
3173 if (hg == -1000) {
3174 drbd_alert(device, "Unrelated data, aborting!\n");
3175 return C_MASK;
3176 }
3177 if (hg < -1000) {
3178 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179 return C_MASK;
3180 }
3181
3182 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3184 int f = (hg == -100) || abs(hg) == 2;
3185 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186 if (f)
3187 hg = hg*2;
3188 drbd_info(device, "Becoming sync %s due to disk states.\n",
3189 hg > 0 ? "source" : "target");
3190 }
3191
3192 if (abs(hg) == 100)
3193 drbd_khelper(device, "initial-split-brain");
3194
3195 rcu_read_lock();
3196 nc = rcu_dereference(peer_device->connection->net_conf);
3197
3198 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199 int pcount = (device->state.role == R_PRIMARY)
3200 + (peer_role == R_PRIMARY);
3201 int forced = (hg == -100);
3202
3203 switch (pcount) {
3204 case 0:
3205 hg = drbd_asb_recover_0p(peer_device);
3206 break;
3207 case 1:
3208 hg = drbd_asb_recover_1p(peer_device);
3209 break;
3210 case 2:
3211 hg = drbd_asb_recover_2p(peer_device);
3212 break;
3213 }
3214 if (abs(hg) < 100) {
3215 drbd_warn(device, "Split-Brain detected, %d primaries, "
3216 "automatically solved. Sync from %s node\n",
3217 pcount, (hg < 0) ? "peer" : "this");
3218 if (forced) {
3219 drbd_warn(device, "Doing a full sync, since"
3220 " UUIDs where ambiguous.\n");
3221 hg = hg*2;
3222 }
3223 }
3224 }
3225
3226 if (hg == -100) {
3227 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228 hg = -1;
3229 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230 hg = 1;
3231
3232 if (abs(hg) < 100)
3233 drbd_warn(device, "Split-Brain detected, manually solved. "
3234 "Sync from %s node\n",
3235 (hg < 0) ? "peer" : "this");
3236 }
3237 rr_conflict = nc->rr_conflict;
3238 tentative = nc->tentative;
3239 rcu_read_unlock();
3240
3241 if (hg == -100) {
3242 /* FIXME this log message is not correct if we end up here
3243 * after an attempted attach on a diskless node.
3244 * We just refuse to attach -- well, we drop the "connection"
3245 * to that disk, in a way... */
3246 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247 drbd_khelper(device, "split-brain");
3248 return C_MASK;
3249 }
3250
3251 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253 return C_MASK;
3254 }
3255
3256 if (hg < 0 && /* by intention we do not use mydisk here. */
3257 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258 switch (rr_conflict) {
3259 case ASB_CALL_HELPER:
3260 drbd_khelper(device, "pri-lost");
3261 /* fall through */
3262 case ASB_DISCONNECT:
3263 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264 return C_MASK;
3265 case ASB_VIOLENTLY:
3266 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3267 "assumption\n");
3268 }
3269 }
3270
3271 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272 if (hg == 0)
3273 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274 else
3275 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3276 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277 abs(hg) >= 2 ? "full" : "bit-map based");
3278 return C_MASK;
3279 }
3280
3281 if (abs(hg) >= 2) {
3282 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284 BM_LOCKED_SET_ALLOWED))
3285 return C_MASK;
3286 }
3287
3288 if (hg > 0) { /* become sync source. */
3289 rv = C_WF_BITMAP_S;
3290 } else if (hg < 0) { /* become sync target */
3291 rv = C_WF_BITMAP_T;
3292 } else {
3293 rv = C_CONNECTED;
3294 if (drbd_bm_total_weight(device)) {
3295 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296 drbd_bm_total_weight(device));
3297 }
3298 }
3299
3300 return rv;
3301 }
3302
3303 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3304 {
3305 /* ASB_DISCARD_REMOTE on this side paired with ASB_DISCARD_LOCAL on the peer is valid */
3306 if (peer == ASB_DISCARD_REMOTE)
3307 return ASB_DISCARD_LOCAL;
3308
3309 /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3310 if (peer == ASB_DISCARD_LOCAL)
3311 return ASB_DISCARD_REMOTE;
3312
3313 /* everything else is valid if they are equal on both sides. */
3314 return peer;
3315 }
3316
3317 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3318 {
3319 struct p_protocol *p = pi->data;
3320 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3321 int p_proto, p_discard_my_data, p_two_primaries, cf;
3322 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3323 char integrity_alg[SHARED_SECRET_MAX] = "";
3324 struct crypto_hash *peer_integrity_tfm = NULL;
3325 void *int_dig_in = NULL, *int_dig_vv = NULL;
3326
3327 p_proto = be32_to_cpu(p->protocol);
3328 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3329 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3330 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3331 p_two_primaries = be32_to_cpu(p->two_primaries);
3332 cf = be32_to_cpu(p->conn_flags);
3333 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3334
3335 if (connection->agreed_pro_version >= 87) {
3336 int err;
3337
3338 if (pi->size > sizeof(integrity_alg))
3339 return -EIO;
3340 err = drbd_recv_all(connection, integrity_alg, pi->size);
3341 if (err)
3342 return err;
3343 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3344 }
3345
3346 if (pi->cmd != P_PROTOCOL_UPDATE) {
3347 clear_bit(CONN_DRY_RUN, &connection->flags);
3348
3349 if (cf & CF_DRY_RUN)
3350 set_bit(CONN_DRY_RUN, &connection->flags);
3351
3352 rcu_read_lock();
3353 nc = rcu_dereference(connection->net_conf);
3354
3355 if (p_proto != nc->wire_protocol) {
3356 drbd_err(connection, "incompatible %s settings\n", "protocol");
3357 goto disconnect_rcu_unlock;
3358 }
3359
3360 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3361 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3362 goto disconnect_rcu_unlock;
3363 }
3364
3365 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3366 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3367 goto disconnect_rcu_unlock;
3368 }
3369
3370 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3371 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3372 goto disconnect_rcu_unlock;
3373 }
3374
3375 if (p_discard_my_data && nc->discard_my_data) {
3376 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3377 goto disconnect_rcu_unlock;
3378 }
3379
3380 if (p_two_primaries != nc->two_primaries) {
3381 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3382 goto disconnect_rcu_unlock;
3383 }
3384
3385 if (strcmp(integrity_alg, nc->integrity_alg)) {
3386 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3387 goto disconnect_rcu_unlock;
3388 }
3389
3390 rcu_read_unlock();
3391 }
3392
3393 if (integrity_alg[0]) {
3394 int hash_size;
3395
3396 /*
3397 * We can only change the peer data integrity algorithm
3398 * here. Changing our own data integrity algorithm
3399 * requires that we send a P_PROTOCOL_UPDATE packet at
3400 * the same time; otherwise, the peer has no way to
3401 * tell at which packet boundary the algorithm should
3402 * change.
3403 */
3404
3405 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3406 if (!peer_integrity_tfm) {
3407 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3408 integrity_alg);
3409 goto disconnect;
3410 }
3411
3412 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3413 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3414 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3415 if (!(int_dig_in && int_dig_vv)) {
3416 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3417 goto disconnect;
3418 }
3419 }
3420
3421 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3422 if (!new_net_conf) {
3423 drbd_err(connection, "Allocation of new net_conf failed\n");
3424 goto disconnect;
3425 }
3426
3427 mutex_lock(&connection->data.mutex);
3428 mutex_lock(&connection->resource->conf_update);
3429 old_net_conf = connection->net_conf;
3430 *new_net_conf = *old_net_conf;
3431
3432 new_net_conf->wire_protocol = p_proto;
3433 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3434 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3435 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3436 new_net_conf->two_primaries = p_two_primaries;
3437
3438 rcu_assign_pointer(connection->net_conf, new_net_conf);
3439 mutex_unlock(&connection->resource->conf_update);
3440 mutex_unlock(&connection->data.mutex);
3441
3442 crypto_free_hash(connection->peer_integrity_tfm);
3443 kfree(connection->int_dig_in);
3444 kfree(connection->int_dig_vv);
3445 connection->peer_integrity_tfm = peer_integrity_tfm;
3446 connection->int_dig_in = int_dig_in;
3447 connection->int_dig_vv = int_dig_vv;
3448
3449 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3450 drbd_info(connection, "peer data-integrity-alg: %s\n",
3451 integrity_alg[0] ? integrity_alg : "(none)");
3452
3453 synchronize_rcu();
3454 kfree(old_net_conf);
3455 return 0;
3456
3457 disconnect_rcu_unlock:
3458 rcu_read_unlock();
3459 disconnect:
3460 crypto_free_hash(peer_integrity_tfm);
3461 kfree(int_dig_in);
3462 kfree(int_dig_vv);
3463 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3464 return -EIO;
3465 }
3466
3467 /* helper function
3468 * input: alg name, feature name
3469 * return: NULL (alg name was "")
3470 * ERR_PTR(error) if something goes wrong
3471 * or the crypto hash ptr, if it worked out ok. */
3472 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3473 const char *alg, const char *name)
3474 {
3475 struct crypto_hash *tfm;
3476
3477 if (!alg[0])
3478 return NULL;
3479
3480 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3481 if (IS_ERR(tfm)) {
3482 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3483 alg, name, PTR_ERR(tfm));
3484 return tfm;
3485 }
3486 return tfm;
3487 }
3488
3489 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3490 {
3491 void *buffer = connection->data.rbuf;
3492 int size = pi->size;
3493
3494 while (size) {
3495 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3496 s = drbd_recv(connection, buffer, s);
3497 if (s <= 0) {
3498 if (s < 0)
3499 return s;
3500 break;
3501 }
3502 size -= s;
3503 }
3504 if (size)
3505 return -EIO;
3506 return 0;
3507 }
3508
3509 /*
3510 * config_unknown_volume - device configuration command for unknown volume
3511 *
3512 * When a device is added to an existing connection, the node on which the
3513 * device is added first will send configuration commands to its peer but the
3514 * peer will not know about the device yet. It will warn and ignore these
3515 * commands. Once the device is added on the second node, the second node will
3516 * send the same device configuration commands, but in the other direction.
3517 *
3518 * (We can also end up here if drbd is misconfigured.)
3519 */
3520 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3521 {
3522 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3523 cmdname(pi->cmd), pi->vnr);
3524 return ignore_remaining_packet(connection, pi);
3525 }
3526
3527 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3528 {
3529 struct drbd_peer_device *peer_device;
3530 struct drbd_device *device;
3531 struct p_rs_param_95 *p;
3532 unsigned int header_size, data_size, exp_max_sz;
3533 struct crypto_hash *verify_tfm = NULL;
3534 struct crypto_hash *csums_tfm = NULL;
3535 struct net_conf *old_net_conf, *new_net_conf = NULL;
3536 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3537 const int apv = connection->agreed_pro_version;
3538 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3539 int fifo_size = 0;
3540 int err;
3541
3542 peer_device = conn_peer_device(connection, pi->vnr);
3543 if (!peer_device)
3544 return config_unknown_volume(connection, pi);
3545 device = peer_device->device;
3546
3547 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3548 : apv == 88 ? sizeof(struct p_rs_param)
3549 + SHARED_SECRET_MAX
3550 : apv <= 94 ? sizeof(struct p_rs_param_89)
3551 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3552
3553 if (pi->size > exp_max_sz) {
3554 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3555 pi->size, exp_max_sz);
3556 return -EIO;
3557 }
3558
3559 if (apv <= 88) {
3560 header_size = sizeof(struct p_rs_param);
3561 data_size = pi->size - header_size;
3562 } else if (apv <= 94) {
3563 header_size = sizeof(struct p_rs_param_89);
3564 data_size = pi->size - header_size;
3565 D_ASSERT(device, data_size == 0);
3566 } else {
3567 header_size = sizeof(struct p_rs_param_95);
3568 data_size = pi->size - header_size;
3569 D_ASSERT(device, data_size == 0);
3570 }
3571
3572 /* initialize verify_alg and csums_alg */
3573 p = pi->data;
3574 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3575
3576 err = drbd_recv_all(peer_device->connection, p, header_size);
3577 if (err)
3578 return err;
3579
3580 mutex_lock(&connection->resource->conf_update);
3581 old_net_conf = peer_device->connection->net_conf;
3582 if (get_ldev(device)) {
3583 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584 if (!new_disk_conf) {
3585 put_ldev(device);
3586 mutex_unlock(&connection->resource->conf_update);
3587 drbd_err(device, "Allocation of new disk_conf failed\n");
3588 return -ENOMEM;
3589 }
3590
3591 old_disk_conf = device->ldev->disk_conf;
3592 *new_disk_conf = *old_disk_conf;
3593
3594 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3595 }
3596
3597 if (apv >= 88) {
3598 if (apv == 88) {
3599 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3600 drbd_err(device, "verify-alg of wrong size, "
3601 "peer wants %u, accepting only up to %u byte\n",
3602 data_size, SHARED_SECRET_MAX);
3603 err = -EIO;
3604 goto reconnect;
3605 }
3606
3607 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3608 if (err)
3609 goto reconnect;
3610 /* we expect NUL terminated string */
3611 /* but just in case someone tries to be evil */
3612 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3613 p->verify_alg[data_size-1] = 0;
3614
3615 } else /* apv >= 89 */ {
3616 /* we still expect NUL terminated strings */
3617 /* but just in case someone tries to be evil */
3618 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3619 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3620 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3621 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3622 }
3623
3624 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3625 if (device->state.conn == C_WF_REPORT_PARAMS) {
3626 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3627 old_net_conf->verify_alg, p->verify_alg);
3628 goto disconnect;
3629 }
3630 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3631 p->verify_alg, "verify-alg");
3632 if (IS_ERR(verify_tfm)) {
3633 verify_tfm = NULL;
3634 goto disconnect;
3635 }
3636 }
3637
3638 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3639 if (device->state.conn == C_WF_REPORT_PARAMS) {
3640 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3641 old_net_conf->csums_alg, p->csums_alg);
3642 goto disconnect;
3643 }
3644 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3645 p->csums_alg, "csums-alg");
3646 if (IS_ERR(csums_tfm)) {
3647 csums_tfm = NULL;
3648 goto disconnect;
3649 }
3650 }
3651
3652 if (apv > 94 && new_disk_conf) {
3653 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3654 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3655 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3656 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3657
3658 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3659 if (fifo_size != device->rs_plan_s->size) {
3660 new_plan = fifo_alloc(fifo_size);
3661 if (!new_plan) {
3662 drbd_err(device, "kmalloc of fifo_buffer failed");
3663 put_ldev(device);
3664 goto disconnect;
3665 }
3666 }
3667 }
3668
3669 if (verify_tfm || csums_tfm) {
3670 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3671 if (!new_net_conf) {
3672 drbd_err(device, "Allocation of new net_conf failed\n");
3673 goto disconnect;
3674 }
3675
3676 *new_net_conf = *old_net_conf;
3677
3678 if (verify_tfm) {
3679 strcpy(new_net_conf->verify_alg, p->verify_alg);
3680 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3681 crypto_free_hash(peer_device->connection->verify_tfm);
3682 peer_device->connection->verify_tfm = verify_tfm;
3683 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3684 }
3685 if (csums_tfm) {
3686 strcpy(new_net_conf->csums_alg, p->csums_alg);
3687 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3688 crypto_free_hash(peer_device->connection->csums_tfm);
3689 peer_device->connection->csums_tfm = csums_tfm;
3690 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3691 }
3692 rcu_assign_pointer(connection->net_conf, new_net_conf);
3693 }
3694 }
3695
3696 if (new_disk_conf) {
3697 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698 put_ldev(device);
3699 }
3700
3701 if (new_plan) {
3702 old_plan = device->rs_plan_s;
3703 rcu_assign_pointer(device->rs_plan_s, new_plan);
3704 }
3705
3706 mutex_unlock(&connection->resource->conf_update);
3707 synchronize_rcu();
3708 if (new_net_conf)
3709 kfree(old_net_conf);
3710 kfree(old_disk_conf);
3711 kfree(old_plan);
3712
3713 return 0;
3714
3715 reconnect:
3716 if (new_disk_conf) {
3717 put_ldev(device);
3718 kfree(new_disk_conf);
3719 }
3720 mutex_unlock(&connection->resource->conf_update);
3721 return -EIO;
3722
3723 disconnect:
3724 kfree(new_plan);
3725 if (new_disk_conf) {
3726 put_ldev(device);
3727 kfree(new_disk_conf);
3728 }
3729 mutex_unlock(&connection->resource->conf_update);
3730 /* just for completeness: actually not needed,
3731 * as this is not reached if csums_tfm was ok. */
3732 crypto_free_hash(csums_tfm);
3733 /* but free the verify_tfm again, if csums_tfm did not work out */
3734 crypto_free_hash(verify_tfm);
3735 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3736 return -EIO;
3737 }
3738
3739 /* warn if the arguments differ by more than 12.5% */
3740 static void warn_if_differ_considerably(struct drbd_device *device,
3741 const char *s, sector_t a, sector_t b)
3742 {
3743 sector_t d;
3744 if (a == 0 || b == 0)
3745 return;
3746 d = (a > b) ? (a - b) : (b - a);
3747 if (d > (a>>3) || d > (b>>3))
3748 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3749 (unsigned long long)a, (unsigned long long)b);
3750 }
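/*
 * Illustrative note (not part of the driver): the 12.5% threshold above
 * follows from the shift, since d > (a >> 3) means d > a/8.  A hypothetical
 * example, with made-up sizes in sectors:
 *
 *	a = 1000, b = 800  ->  d = 200, a>>3 = 125, b>>3 = 100;
 *	200 > 125, so the "Considerable difference" warning is logged.
 *
 *	a = 1000, b = 950  ->  d = 50, which is below both thresholds,
 *	so nothing is reported.
 */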
3751
3752 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3753 {
3754 struct drbd_peer_device *peer_device;
3755 struct drbd_device *device;
3756 struct p_sizes *p = pi->data;
3757 enum determine_dev_size dd = DS_UNCHANGED;
3758 sector_t p_size, p_usize, p_csize, my_usize;
3759 int ldsc = 0; /* local disk size changed */
3760 enum dds_flags ddsf;
3761
3762 peer_device = conn_peer_device(connection, pi->vnr);
3763 if (!peer_device)
3764 return config_unknown_volume(connection, pi);
3765 device = peer_device->device;
3766
3767 p_size = be64_to_cpu(p->d_size);
3768 p_usize = be64_to_cpu(p->u_size);
3769 p_csize = be64_to_cpu(p->c_size);
3770
3771 /* just store the peer's disk size for now.
3772 * we still need to figure out whether we accept that. */
3773 device->p_size = p_size;
3774
3775 if (get_ldev(device)) {
3776 rcu_read_lock();
3777 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3778 rcu_read_unlock();
3779
3780 warn_if_differ_considerably(device, "lower level device sizes",
3781 p_size, drbd_get_max_capacity(device->ldev));
3782 warn_if_differ_considerably(device, "user requested size",
3783 p_usize, my_usize);
3784
3785 /* if this is the first connect, or an otherwise expected
3786 * param exchange, choose the minimum */
3787 if (device->state.conn == C_WF_REPORT_PARAMS)
3788 p_usize = min_not_zero(my_usize, p_usize);
3789
3790 /* Never shrink a device with usable data during connect.
3791 But allow online shrinking if we are connected. */
3792 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3793 drbd_get_capacity(device->this_bdev) &&
3794 device->state.disk >= D_OUTDATED &&
3795 device->state.conn < C_CONNECTED) {
3796 drbd_err(device, "The peer's disk size is too small!\n");
3797 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798 put_ldev(device);
3799 return -EIO;
3800 }
3801
3802 if (my_usize != p_usize) {
3803 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3804
3805 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3806 if (!new_disk_conf) {
3807 drbd_err(device, "Allocation of new disk_conf failed\n");
3808 put_ldev(device);
3809 return -ENOMEM;
3810 }
3811
3812 mutex_lock(&connection->resource->conf_update);
3813 old_disk_conf = device->ldev->disk_conf;
3814 *new_disk_conf = *old_disk_conf;
3815 new_disk_conf->disk_size = p_usize;
3816
3817 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3818 mutex_unlock(&connection->resource->conf_update);
3819 synchronize_rcu();
3820 kfree(old_disk_conf);
3821
3822 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3823 (unsigned long)my_usize);
3824 }
3825
3826 put_ldev(device);
3827 }
3828
3829 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3830 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3831 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3832 drbd_reconsider_max_bio_size(), we can be sure that after
3833 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3834
3835 ddsf = be16_to_cpu(p->dds_flags);
3836 if (get_ldev(device)) {
3837 drbd_reconsider_max_bio_size(device, device->ldev);
3838 dd = drbd_determine_dev_size(device, ddsf, NULL);
3839 put_ldev(device);
3840 if (dd == DS_ERROR)
3841 return -EIO;
3842 drbd_md_sync(device);
3843 } else {
3844 /*
3845 * I am diskless, need to accept the peer's *current* size.
3846 * I must NOT accept the peers backing disk size,
3847 * it may have been larger than mine all along...
3848 *
3849 * At this point, the peer knows more about my disk, or at
3850 * least about what we last agreed upon, than I do.
3851 * So if his c_size is less than his d_size, the most likely
3852 * reason is that *my* d_size was smaller last time we checked.
3853 *
3854 * However, if he sends a zero current size,
3855 * take his (user-capped or) backing disk size anyways.
3856 */
3857 drbd_reconsider_max_bio_size(device, NULL);
3858 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3859 }
3860
3861 if (get_ldev(device)) {
3862 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3863 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3864 ldsc = 1;
3865 }
3866
3867 put_ldev(device);
3868 }
3869
3870 if (device->state.conn > C_WF_REPORT_PARAMS) {
3871 if (be64_to_cpu(p->c_size) !=
3872 drbd_get_capacity(device->this_bdev) || ldsc) {
3873 /* we have different sizes, probably peer
3874 * needs to know my new size... */
3875 drbd_send_sizes(peer_device, 0, ddsf);
3876 }
3877 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3878 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3879 if (device->state.pdsk >= D_INCONSISTENT &&
3880 device->state.disk >= D_INCONSISTENT) {
3881 if (ddsf & DDSF_NO_RESYNC)
3882 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3883 else
3884 resync_after_online_grow(device);
3885 } else
3886 set_bit(RESYNC_AFTER_NEG, &device->flags);
3887 }
3888 }
3889
3890 return 0;
3891 }
3892
3893 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3894 {
3895 struct drbd_peer_device *peer_device;
3896 struct drbd_device *device;
3897 struct p_uuids *p = pi->data;
3898 u64 *p_uuid;
3899 int i, updated_uuids = 0;
3900
3901 peer_device = conn_peer_device(connection, pi->vnr);
3902 if (!peer_device)
3903 return config_unknown_volume(connection, pi);
3904 device = peer_device->device;
3905
3906 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3907 if (!p_uuid) {
3908 drbd_err(device, "kmalloc of p_uuid failed\n");
3909 return -ENOMEM;
3910 }
3911
3912 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3913 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3914
3915 kfree(device->p_uuid);
3916 device->p_uuid = p_uuid;
3917
3918 if (device->state.conn < C_CONNECTED &&
3919 device->state.disk < D_INCONSISTENT &&
3920 device->state.role == R_PRIMARY &&
3921 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3922 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3923 (unsigned long long)device->ed_uuid);
3924 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3925 return -EIO;
3926 }
3927
3928 if (get_ldev(device)) {
3929 int skip_initial_sync =
3930 device->state.conn == C_CONNECTED &&
3931 peer_device->connection->agreed_pro_version >= 90 &&
3932 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3933 (p_uuid[UI_FLAGS] & 8);
3934 if (skip_initial_sync) {
3935 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3936 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3937 "clear_n_write from receive_uuids",
3938 BM_LOCKED_TEST_ALLOWED);
3939 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3940 _drbd_uuid_set(device, UI_BITMAP, 0);
3941 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3942 CS_VERBOSE, NULL);
3943 drbd_md_sync(device);
3944 updated_uuids = 1;
3945 }
3946 put_ldev(device);
3947 } else if (device->state.disk < D_INCONSISTENT &&
3948 device->state.role == R_PRIMARY) {
3949 /* I am a diskless primary, the peer just created a new current UUID
3950 for me. */
3951 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3952 }
3953
3954 /* Before we test the disk state, we should wait until any cluster wide
3955 state change that may be in progress has finished. That is important if
3956 we are primary and are detaching from our disk. We need to see the
3957 new disk state... */
3958 mutex_lock(device->state_mutex);
3959 mutex_unlock(device->state_mutex);
3960 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3961 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3962
3963 if (updated_uuids)
3964 drbd_print_uuids(device, "receiver updated UUIDs to");
3965
3966 return 0;
3967 }
3968
3969 /**
3970 * convert_state() - Converts the peer's view of the cluster state to our point of view
3971 * @ps: The state as seen by the peer.
3972 */
3973 static union drbd_state convert_state(union drbd_state ps)
3974 {
3975 union drbd_state ms;
3976
3977 static enum drbd_conns c_tab[] = {
3978 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3979 [C_CONNECTED] = C_CONNECTED,
3980
3981 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3982 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3983 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3984 [C_VERIFY_S] = C_VERIFY_T,
3985 [C_MASK] = C_MASK,
3986 };
3987
3988 ms.i = ps.i;
3989
3990 ms.conn = c_tab[ps.conn];
3991 ms.peer = ps.role;
3992 ms.role = ps.peer;
3993 ms.pdsk = ps.disk;
3994 ms.disk = ps.pdsk;
3995 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3996
3997 return ms;
3998 }
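/*
 * Illustrative example (not part of the driver): convert_state() mirrors
 * the peer's perspective.  If the peer reports, in its own view,
 *
 *	role = R_PRIMARY, peer = R_SECONDARY,
 *	disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT,
 *	conn = C_STARTING_SYNC_S
 *
 * then from our point of view this becomes
 *
 *	role = R_SECONDARY, peer = R_PRIMARY,
 *	disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE,
 *	conn = C_STARTING_SYNC_T
 *
 * i.e. role/peer and disk/pdsk are swapped, and source/target style
 * connection states are flipped via c_tab[].
 */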
3999
4000 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4001 {
4002 struct drbd_peer_device *peer_device;
4003 struct drbd_device *device;
4004 struct p_req_state *p = pi->data;
4005 union drbd_state mask, val;
4006 enum drbd_state_rv rv;
4007
4008 peer_device = conn_peer_device(connection, pi->vnr);
4009 if (!peer_device)
4010 return -EIO;
4011 device = peer_device->device;
4012
4013 mask.i = be32_to_cpu(p->mask);
4014 val.i = be32_to_cpu(p->val);
4015
4016 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4017 mutex_is_locked(device->state_mutex)) {
4018 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4019 return 0;
4020 }
4021
4022 mask = convert_state(mask);
4023 val = convert_state(val);
4024
4025 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4026 drbd_send_sr_reply(peer_device, rv);
4027
4028 drbd_md_sync(device);
4029
4030 return 0;
4031 }
4032
4033 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4034 {
4035 struct p_req_state *p = pi->data;
4036 union drbd_state mask, val;
4037 enum drbd_state_rv rv;
4038
4039 mask.i = be32_to_cpu(p->mask);
4040 val.i = be32_to_cpu(p->val);
4041
4042 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4043 mutex_is_locked(&connection->cstate_mutex)) {
4044 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4045 return 0;
4046 }
4047
4048 mask = convert_state(mask);
4049 val = convert_state(val);
4050
4051 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4052 conn_send_sr_reply(connection, rv);
4053
4054 return 0;
4055 }
4056
4057 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4058 {
4059 struct drbd_peer_device *peer_device;
4060 struct drbd_device *device;
4061 struct p_state *p = pi->data;
4062 union drbd_state os, ns, peer_state;
4063 enum drbd_disk_state real_peer_disk;
4064 enum chg_state_flags cs_flags;
4065 int rv;
4066
4067 peer_device = conn_peer_device(connection, pi->vnr);
4068 if (!peer_device)
4069 return config_unknown_volume(connection, pi);
4070 device = peer_device->device;
4071
4072 peer_state.i = be32_to_cpu(p->state);
4073
4074 real_peer_disk = peer_state.disk;
4075 if (peer_state.disk == D_NEGOTIATING) {
4076 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4077 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4078 }
4079
4080 spin_lock_irq(&device->resource->req_lock);
4081 retry:
4082 os = ns = drbd_read_state(device);
4083 spin_unlock_irq(&device->resource->req_lock);
4084
4085 /* If some other part of the code (ack_receiver thread, timeout)
4086 * already decided to close the connection again,
4087 * we must not "re-establish" it here. */
4088 if (os.conn <= C_TEAR_DOWN)
4089 return -ECONNRESET;
4090
4091 /* If this is the "end of sync" confirmation, usually the peer disk
4092 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4093 * set) resync started in PausedSyncT, or if the timing of pause-/
4094 * unpause-sync events has been "just right", the peer disk may
4095 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4096 */
4097 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4098 real_peer_disk == D_UP_TO_DATE &&
4099 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4100 /* If we are (becoming) SyncSource, but peer is still in sync
4101 * preparation, ignore its uptodate-ness to avoid flapping, it
4102 * will change to inconsistent once the peer reaches active
4103 * syncing states.
4104 * It may have changed syncer-paused flags, however, so we
4105 * cannot ignore this completely. */
4106 if (peer_state.conn > C_CONNECTED &&
4107 peer_state.conn < C_SYNC_SOURCE)
4108 real_peer_disk = D_INCONSISTENT;
4109
4110 /* if peer_state changes to connected at the same time,
4111 * it explicitly notifies us that it finished resync.
4112 * Maybe we should finish it up, too? */
4113 else if (os.conn >= C_SYNC_SOURCE &&
4114 peer_state.conn == C_CONNECTED) {
4115 if (drbd_bm_total_weight(device) <= device->rs_failed)
4116 drbd_resync_finished(device);
4117 return 0;
4118 }
4119 }
4120
4121 /* explicit verify finished notification, stop sector reached. */
4122 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4123 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4124 ov_out_of_sync_print(device);
4125 drbd_resync_finished(device);
4126 return 0;
4127 }
4128
4129 /* peer says his disk is inconsistent, while we think it is uptodate,
4130 * and this happens while the peer still thinks we have a sync going on,
4131 * but we think we are already done with the sync.
4132 * We ignore this to avoid flapping pdsk.
4133 * This should not happen, if the peer is a recent version of drbd. */
4134 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4135 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4136 real_peer_disk = D_UP_TO_DATE;
4137
4138 if (ns.conn == C_WF_REPORT_PARAMS)
4139 ns.conn = C_CONNECTED;
4140
4141 if (peer_state.conn == C_AHEAD)
4142 ns.conn = C_BEHIND;
4143
4144 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4145 get_ldev_if_state(device, D_NEGOTIATING)) {
4146 int cr; /* consider resync */
4147
4148 /* if we established a new connection */
4149 cr = (os.conn < C_CONNECTED);
4150 /* if we had an established connection
4151 * and one of the nodes newly attaches a disk */
4152 cr |= (os.conn == C_CONNECTED &&
4153 (peer_state.disk == D_NEGOTIATING ||
4154 os.disk == D_NEGOTIATING));
4155 /* if we have both been inconsistent, and the peer has been
4156 * forced to be UpToDate with --overwrite-data */
4157 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4158 /* if we had been plain connected, and the admin requested to
4159 * start a sync by "invalidate" or "invalidate-remote" */
4160 cr |= (os.conn == C_CONNECTED &&
4161 (peer_state.conn >= C_STARTING_SYNC_S &&
4162 peer_state.conn <= C_WF_BITMAP_T));
4163
4164 if (cr)
4165 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4166
4167 put_ldev(device);
4168 if (ns.conn == C_MASK) {
4169 ns.conn = C_CONNECTED;
4170 if (device->state.disk == D_NEGOTIATING) {
4171 drbd_force_state(device, NS(disk, D_FAILED));
4172 } else if (peer_state.disk == D_NEGOTIATING) {
4173 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4174 peer_state.disk = D_DISKLESS;
4175 real_peer_disk = D_DISKLESS;
4176 } else {
4177 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4178 return -EIO;
4179 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4180 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4181 return -EIO;
4182 }
4183 }
4184 }
4185
4186 spin_lock_irq(&device->resource->req_lock);
4187 if (os.i != drbd_read_state(device).i)
4188 goto retry;
4189 clear_bit(CONSIDER_RESYNC, &device->flags);
4190 ns.peer = peer_state.role;
4191 ns.pdsk = real_peer_disk;
4192 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4193 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4194 ns.disk = device->new_state_tmp.disk;
4195 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4196 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4197 test_bit(NEW_CUR_UUID, &device->flags)) {
4198 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4199 for temporary network outages! */
4200 spin_unlock_irq(&device->resource->req_lock);
4201 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4202 tl_clear(peer_device->connection);
4203 drbd_uuid_new_current(device);
4204 clear_bit(NEW_CUR_UUID, &device->flags);
4205 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4206 return -EIO;
4207 }
4208 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4209 ns = drbd_read_state(device);
4210 spin_unlock_irq(&device->resource->req_lock);
4211
4212 if (rv < SS_SUCCESS) {
4213 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4214 return -EIO;
4215 }
4216
4217 if (os.conn > C_WF_REPORT_PARAMS) {
4218 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4219 peer_state.disk != D_NEGOTIATING) {
4220 /* we want resync, peer has not yet decided to sync... */
4221 /* Nowadays only used when forcing a node into primary role and
4222 setting its disk to UpToDate with that */
4223 drbd_send_uuids(peer_device);
4224 drbd_send_current_state(peer_device);
4225 }
4226 }
4227
4228 clear_bit(DISCARD_MY_DATA, &device->flags);
4229
4230 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4231
4232 return 0;
4233 }
4234
4235 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4236 {
4237 struct drbd_peer_device *peer_device;
4238 struct drbd_device *device;
4239 struct p_rs_uuid *p = pi->data;
4240
4241 peer_device = conn_peer_device(connection, pi->vnr);
4242 if (!peer_device)
4243 return -EIO;
4244 device = peer_device->device;
4245
4246 wait_event(device->misc_wait,
4247 device->state.conn == C_WF_SYNC_UUID ||
4248 device->state.conn == C_BEHIND ||
4249 device->state.conn < C_CONNECTED ||
4250 device->state.disk < D_NEGOTIATING);
4251
4252 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4253
4254 /* Here the _drbd_uuid_ functions are right, current should
4255 _not_ be rotated into the history */
4256 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4257 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4258 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4259
4260 drbd_print_uuids(device, "updated sync uuid");
4261 drbd_start_resync(device, C_SYNC_TARGET);
4262
4263 put_ldev(device);
4264 } else
4265 drbd_err(device, "Ignoring SyncUUID packet!\n");
4266
4267 return 0;
4268 }
4269
4270 /**
4271 * receive_bitmap_plain
4272 *
4273 * Return 0 when done, 1 when another iteration is needed, and a negative error
4274 * code upon failure.
4275 */
4276 static int
4277 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4278 unsigned long *p, struct bm_xfer_ctx *c)
4279 {
4280 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4281 drbd_header_size(peer_device->connection);
4282 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4283 c->bm_words - c->word_offset);
4284 unsigned int want = num_words * sizeof(*p);
4285 int err;
4286
4287 if (want != size) {
4288 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4289 return -EIO;
4290 }
4291 if (want == 0)
4292 return 0;
4293 err = drbd_recv_all(peer_device->connection, p, want);
4294 if (err)
4295 return err;
4296
4297 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4298
4299 c->word_offset += num_words;
4300 c->bit_offset = c->word_offset * BITS_PER_LONG;
4301 if (c->bit_offset > c->bm_bits)
4302 c->bit_offset = c->bm_bits;
4303
4304 return 1;
4305 }
4306
4307 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4308 {
4309 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4310 }
4311
4312 static int dcbp_get_start(struct p_compressed_bm *p)
4313 {
4314 return (p->encoding & 0x80) != 0;
4315 }
4316
4317 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4318 {
4319 return (p->encoding >> 4) & 0x7;
4320 }
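/*
 * Illustrative note (not part of the driver): the three helpers above
 * unpack the single "encoding" byte of a compressed bitmap packet:
 *
 *	bit  7     : start value of the RLE toggle (dcbp_get_start)
 *	bits 4..6  : number of padding bits at the end (dcbp_get_pad_bits)
 *	bits 0..3  : bitmap encoding code (dcbp_get_code)
 *
 * For a hypothetical encoding byte of 0xa3:
 *	start    = (0xa3 & 0x80) != 0	-> 1
 *	pad bits = (0xa3 >> 4) & 0x7	-> 2
 *	code     = 0xa3 & 0x0f		-> 3
 */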
4321
4322 /**
4323 * recv_bm_rle_bits
4324 *
4325 * Return 0 when done, 1 when another iteration is needed, and a negative error
4326 * code upon failure.
4327 */
4328 static int
4329 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4330 struct p_compressed_bm *p,
4331 struct bm_xfer_ctx *c,
4332 unsigned int len)
4333 {
4334 struct bitstream bs;
4335 u64 look_ahead;
4336 u64 rl;
4337 u64 tmp;
4338 unsigned long s = c->bit_offset;
4339 unsigned long e;
4340 int toggle = dcbp_get_start(p);
4341 int have;
4342 int bits;
4343
4344 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4345
4346 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4347 if (bits < 0)
4348 return -EIO;
4349
4350 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4351 bits = vli_decode_bits(&rl, look_ahead);
4352 if (bits <= 0)
4353 return -EIO;
4354
4355 if (toggle) {
4356 e = s + rl -1;
4357 if (e >= c->bm_bits) {
4358 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4359 return -EIO;
4360 }
4361 _drbd_bm_set_bits(peer_device->device, s, e);
4362 }
4363
4364 if (have < bits) {
4365 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4366 have, bits, look_ahead,
4367 (unsigned int)(bs.cur.b - p->code),
4368 (unsigned int)bs.buf_len);
4369 return -EIO;
4370 }
4371 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4372 if (likely(bits < 64))
4373 look_ahead >>= bits;
4374 else
4375 look_ahead = 0;
4376 have -= bits;
4377
4378 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4379 if (bits < 0)
4380 return -EIO;
4381 look_ahead |= tmp << have;
4382 have += bits;
4383 }
4384
4385 c->bit_offset = s;
4386 bm_xfer_ctx_bit_to_word_offset(c);
4387
4388 return (s != c->bm_bits);
4389 }
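/*
 * Illustrative example (not part of the driver): the VLI stream above is a
 * sequence of run lengths for alternating "clear" and "set" runs, starting
 * with the value of dcbp_get_start().  With a hypothetical bit_offset of 0,
 * start = 0 and decoded run lengths 5, 3, 10, 2, the loop walks:
 *
 *	rl = 5,  toggle = 0  ->  bits  0..4   stay clear
 *	rl = 3,  toggle = 1  ->  bits  5..7   are set
 *	rl = 10, toggle = 0  ->  bits  8..17  stay clear
 *	rl = 2,  toggle = 1  ->  bits 18..19  are set
 *
 * leaving c->bit_offset at 20 for the next packet.
 */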
4390
4391 /**
4392 * decode_bitmap_c
4393 *
4394 * Return 0 when done, 1 when another iteration is needed, and a negative error
4395 * code upon failure.
4396 */
4397 static int
4398 decode_bitmap_c(struct drbd_peer_device *peer_device,
4399 struct p_compressed_bm *p,
4400 struct bm_xfer_ctx *c,
4401 unsigned int len)
4402 {
4403 if (dcbp_get_code(p) == RLE_VLI_Bits)
4404 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4405
4406 /* other variants had been implemented for evaluation,
4407 * but have been dropped as this one turned out to be "best"
4408 * during all our tests. */
4409
4410 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4411 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412 return -EIO;
4413 }
4414
4415 void INFO_bm_xfer_stats(struct drbd_device *device,
4416 const char *direction, struct bm_xfer_ctx *c)
4417 {
4418 /* what would it take to transfer it "plaintext" */
4419 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4420 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4421 unsigned int plain =
4422 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4423 c->bm_words * sizeof(unsigned long);
4424 unsigned int total = c->bytes[0] + c->bytes[1];
4425 unsigned int r;
4426
4427 /* total can not be zero. but just in case: */
4428 if (total == 0)
4429 return;
4430
4431 /* don't report if not compressed */
4432 if (total >= plain)
4433 return;
4434
4435 /* total < plain. check for overflow, still */
4436 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4437 : (1000 * total / plain);
4438
4439 if (r > 1000)
4440 r = 1000;
4441
4442 r = 1000 - r;
4443 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4444 "total %u; compression: %u.%u%%\n",
4445 direction,
4446 c->bytes[1], c->packets[1],
4447 c->bytes[0], c->packets[0],
4448 total, r/10, r % 10);
4449 }
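/*
 * Illustrative arithmetic (not part of the driver), with made-up numbers:
 * if the plain-text transfer would have taken plain = 131072 bytes and the
 * actual transfer took total = 8192 bytes, then
 *
 *	r = 1000 * 8192 / 131072 = 62	(per mille actually transferred)
 *	r = 1000 - 62 = 938
 *
 * and the log line reports "compression: 93.8%".
 */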
4450
4451 /* Since we are processing the bitfield from lower addresses to higher,
4452 it does not matter whether we process it in 32 bit chunks or 64 bit
4453 chunks, as long as it is little endian. (Understand it as a byte stream,
4454 beginning with the lowest byte...) If we used big endian,
4455 we would need to process it from the highest address to the lowest
4456 in order to be agnostic to the 32 vs 64 bit issue.
4457
4458 Returns 0 on success, and a negative error code otherwise. */
4459 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4460 {
4461 struct drbd_peer_device *peer_device;
4462 struct drbd_device *device;
4463 struct bm_xfer_ctx c;
4464 int err;
4465
4466 peer_device = conn_peer_device(connection, pi->vnr);
4467 if (!peer_device)
4468 return -EIO;
4469 device = peer_device->device;
4470
4471 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4472 /* you are supposed to send additional out-of-sync information
4473 * if you actually set bits during this phase */
4474
4475 c = (struct bm_xfer_ctx) {
4476 .bm_bits = drbd_bm_bits(device),
4477 .bm_words = drbd_bm_words(device),
4478 };
4479
4480 for(;;) {
4481 if (pi->cmd == P_BITMAP)
4482 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4483 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4484 /* MAYBE: sanity check that we speak proto >= 90,
4485 * and the feature is enabled! */
4486 struct p_compressed_bm *p = pi->data;
4487
4488 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4489 drbd_err(device, "ReportCBitmap packet too large\n");
4490 err = -EIO;
4491 goto out;
4492 }
4493 if (pi->size <= sizeof(*p)) {
4494 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4495 err = -EIO;
4496 goto out;
4497 }
4498 err = drbd_recv_all(peer_device->connection, p, pi->size);
4499 if (err)
4500 goto out;
4501 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4502 } else {
4503 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4504 err = -EIO;
4505 goto out;
4506 }
4507
4508 c.packets[pi->cmd == P_BITMAP]++;
4509 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4510
4511 if (err <= 0) {
4512 if (err < 0)
4513 goto out;
4514 break;
4515 }
4516 err = drbd_recv_header(peer_device->connection, pi);
4517 if (err)
4518 goto out;
4519 }
4520
4521 INFO_bm_xfer_stats(device, "receive", &c);
4522
4523 if (device->state.conn == C_WF_BITMAP_T) {
4524 enum drbd_state_rv rv;
4525
4526 err = drbd_send_bitmap(device);
4527 if (err)
4528 goto out;
4529 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4530 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4531 D_ASSERT(device, rv == SS_SUCCESS);
4532 } else if (device->state.conn != C_WF_BITMAP_S) {
4533 /* admin may have requested C_DISCONNECTING,
4534 * other threads may have noticed network errors */
4535 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4536 drbd_conn_str(device->state.conn));
4537 }
4538 err = 0;
4539
4540 out:
4541 drbd_bm_unlock(device);
4542 if (!err && device->state.conn == C_WF_BITMAP_S)
4543 drbd_start_resync(device, C_SYNC_SOURCE);
4544 return err;
4545 }
4546
4547 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4548 {
4549 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4550 pi->cmd, pi->size);
4551
4552 return ignore_remaining_packet(connection, pi);
4553 }
4554
4555 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4556 {
4557 /* Make sure we've acked all the TCP data associated
4558 * with the data requests being unplugged */
4559 drbd_tcp_quickack(connection->data.socket);
4560
4561 return 0;
4562 }
4563
4564 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4565 {
4566 struct drbd_peer_device *peer_device;
4567 struct drbd_device *device;
4568 struct p_block_desc *p = pi->data;
4569
4570 peer_device = conn_peer_device(connection, pi->vnr);
4571 if (!peer_device)
4572 return -EIO;
4573 device = peer_device->device;
4574
4575 switch (device->state.conn) {
4576 case C_WF_SYNC_UUID:
4577 case C_WF_BITMAP_T:
4578 case C_BEHIND:
4579 break;
4580 default:
4581 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4582 drbd_conn_str(device->state.conn));
4583 }
4584
4585 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4586
4587 return 0;
4588 }
4589
4590 struct data_cmd {
4591 int expect_payload;
4592 size_t pkt_size;
4593 int (*fn)(struct drbd_connection *, struct packet_info *);
4594 };
4595
4596 static struct data_cmd drbd_cmd_handler[] = {
4597 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4598 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4599 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4600 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4601 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4602 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4603 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4604 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4605 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4606 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4607 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4608 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4609 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4610 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4611 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4612 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4613 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4614 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4615 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4616 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4617 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4618 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4619 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4620 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4621 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4622 };
4623
4624 static void drbdd(struct drbd_connection *connection)
4625 {
4626 struct packet_info pi;
4627 size_t shs; /* sub header size */
4628 int err;
4629
4630 while (get_t_state(&connection->receiver) == RUNNING) {
4631 struct data_cmd *cmd;
4632
4633 drbd_thread_current_set_cpu(&connection->receiver);
4634 update_receiver_timing_details(connection, drbd_recv_header);
4635 if (drbd_recv_header(connection, &pi))
4636 goto err_out;
4637
4638 cmd = &drbd_cmd_handler[pi.cmd];
4639 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4640 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4641 cmdname(pi.cmd), pi.cmd);
4642 goto err_out;
4643 }
4644
4645 shs = cmd->pkt_size;
4646 if (pi.size > shs && !cmd->expect_payload) {
4647 drbd_err(connection, "No payload expected %s l:%d\n",
4648 cmdname(pi.cmd), pi.size);
4649 goto err_out;
4650 }
4651
4652 if (shs) {
4653 update_receiver_timing_details(connection, drbd_recv_all_warn);
4654 err = drbd_recv_all_warn(connection, pi.data, shs);
4655 if (err)
4656 goto err_out;
4657 pi.size -= shs;
4658 }
4659
4660 update_receiver_timing_details(connection, cmd->fn);
4661 err = cmd->fn(connection, &pi);
4662 if (err) {
4663 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4664 cmdname(pi.cmd), err, pi.size);
4665 goto err_out;
4666 }
4667 }
4668 return;
4669
4670 err_out:
4671 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4672 }
4673
4674 static void conn_disconnect(struct drbd_connection *connection)
4675 {
4676 struct drbd_peer_device *peer_device;
4677 enum drbd_conns oc;
4678 int vnr;
4679
4680 if (connection->cstate == C_STANDALONE)
4681 return;
4682
4683 /* We are about to start the cleanup after connection loss.
4684 * Make sure drbd_make_request knows about that.
4685 * Usually we should be in some network failure state already,
4686 * but just in case we are not, we fix it up here.
4687 */
4688 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4689
4690 /* ack_receiver does not clean up anything. it must not interfere, either */
4691 drbd_thread_stop(&connection->ack_receiver);
4692 if (connection->ack_sender) {
4693 destroy_workqueue(connection->ack_sender);
4694 connection->ack_sender = NULL;
4695 }
4696 drbd_free_sock(connection);
4697
4698 rcu_read_lock();
4699 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4700 struct drbd_device *device = peer_device->device;
4701 kref_get(&device->kref);
4702 rcu_read_unlock();
4703 drbd_disconnected(peer_device);
4704 kref_put(&device->kref, drbd_destroy_device);
4705 rcu_read_lock();
4706 }
4707 rcu_read_unlock();
4708
4709 if (!list_empty(&connection->current_epoch->list))
4710 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4711 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4712 atomic_set(&connection->current_epoch->epoch_size, 0);
4713 connection->send.seen_any_write_yet = false;
4714
4715 drbd_info(connection, "Connection closed\n");
4716
4717 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4718 conn_try_outdate_peer_async(connection);
4719
4720 spin_lock_irq(&connection->resource->req_lock);
4721 oc = connection->cstate;
4722 if (oc >= C_UNCONNECTED)
4723 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4724
4725 spin_unlock_irq(&connection->resource->req_lock);
4726
4727 if (oc == C_DISCONNECTING)
4728 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4729 }
4730
4731 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4732 {
4733 struct drbd_device *device = peer_device->device;
4734 unsigned int i;
4735
4736 /* wait for current activity to cease. */
4737 spin_lock_irq(&device->resource->req_lock);
4738 _drbd_wait_ee_list_empty(device, &device->active_ee);
4739 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4740 _drbd_wait_ee_list_empty(device, &device->read_ee);
4741 spin_unlock_irq(&device->resource->req_lock);
4742
4743 /* We do not have data structures that would allow us to
4744 * get the rs_pending_cnt down to 0 again.
4745 * * On C_SYNC_TARGET we do not have any data structures describing
4746 * the pending RSDataRequest's we have sent.
4747 * * On C_SYNC_SOURCE there is no data structure that tracks
4748 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4749 * And no, it is not the sum of the reference counts in the
4750 * resync_LRU. The resync_LRU tracks the whole operation including
4751 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4752 * on the fly. */
4753 drbd_rs_cancel_all(device);
4754 device->rs_total = 0;
4755 device->rs_failed = 0;
4756 atomic_set(&device->rs_pending_cnt, 0);
4757 wake_up(&device->misc_wait);
4758
4759 del_timer_sync(&device->resync_timer);
4760 resync_timer_fn((unsigned long)device);
4761
4762 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4763 * w_make_resync_request etc. which may still be on the worker queue
4764 * to be "canceled" */
4765 drbd_flush_workqueue(&peer_device->connection->sender_work);
4766
4767 drbd_finish_peer_reqs(device);
4768
4769 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4770 might have queued work again. The flush before drbd_finish_peer_reqs() is
4771 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4772 drbd_flush_workqueue(&peer_device->connection->sender_work);
4773
4774 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4775 * again via drbd_try_clear_on_disk_bm(). */
4776 drbd_rs_cancel_all(device);
4777
4778 kfree(device->p_uuid);
4779 device->p_uuid = NULL;
4780
4781 if (!drbd_suspended(device))
4782 tl_clear(peer_device->connection);
4783
4784 drbd_md_sync(device);
4785
4786 /* serialize with bitmap writeout triggered by the state change,
4787 * if any. */
4788 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4789
4790 /* tcp_close and release of sendpage pages can be deferred. I don't
4791 * want to use SO_LINGER, because apparently it can be deferred for
4792 * more than 20 seconds (longest time I checked).
4793 *
4794 * Actually we don't care for exactly when the network stack does its
4795 * put_page(), but release our reference on these pages right here.
4796 */
4797 i = drbd_free_peer_reqs(device, &device->net_ee);
4798 if (i)
4799 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4800 i = atomic_read(&device->pp_in_use_by_net);
4801 if (i)
4802 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4803 i = atomic_read(&device->pp_in_use);
4804 if (i)
4805 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4806
4807 D_ASSERT(device, list_empty(&device->read_ee));
4808 D_ASSERT(device, list_empty(&device->active_ee));
4809 D_ASSERT(device, list_empty(&device->sync_ee));
4810 D_ASSERT(device, list_empty(&device->done_ee));
4811
4812 return 0;
4813 }
4814
4815 /*
4816 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4817 * we can agree on is stored in agreed_pro_version.
4818 *
4819 * feature flags and the reserved array should be enough room for future
4820 * enhancements of the handshake protocol, and possible plugins...
4821 *
4822 * for now, they are expected to be zero, but ignored.
4823 */
4824 static int drbd_send_features(struct drbd_connection *connection)
4825 {
4826 struct drbd_socket *sock;
4827 struct p_connection_features *p;
4828
4829 sock = &connection->data;
4830 p = conn_prepare_command(connection, sock);
4831 if (!p)
4832 return -EIO;
4833 memset(p, 0, sizeof(*p));
4834 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4835 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4836 p->feature_flags = cpu_to_be32(PRO_FEATURES);
4837 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4838 }
4839
4840 /*
4841 * return values:
4842 * 1 yes, we have a valid connection
4843 * 0 oops, did not work out, please try again
4844 * -1 peer talks different language,
4845 * no point in trying again, please go standalone.
4846 */
4847 static int drbd_do_features(struct drbd_connection *connection)
4848 {
4849 /* ASSERT current == connection->receiver ... */
4850 struct p_connection_features *p;
4851 const int expect = sizeof(struct p_connection_features);
4852 struct packet_info pi;
4853 int err;
4854
4855 err = drbd_send_features(connection);
4856 if (err)
4857 return 0;
4858
4859 err = drbd_recv_header(connection, &pi);
4860 if (err)
4861 return 0;
4862
4863 if (pi.cmd != P_CONNECTION_FEATURES) {
4864 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4865 cmdname(pi.cmd), pi.cmd);
4866 return -1;
4867 }
4868
4869 if (pi.size != expect) {
4870 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4871 expect, pi.size);
4872 return -1;
4873 }
4874
4875 p = pi.data;
4876 err = drbd_recv_all_warn(connection, p, expect);
4877 if (err)
4878 return 0;
4879
4880 p->protocol_min = be32_to_cpu(p->protocol_min);
4881 p->protocol_max = be32_to_cpu(p->protocol_max);
4882 if (p->protocol_max == 0)
4883 p->protocol_max = p->protocol_min;
4884
4885 if (PRO_VERSION_MAX < p->protocol_min ||
4886 PRO_VERSION_MIN > p->protocol_max)
4887 goto incompat;
4888
4889 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4890 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4891
4892 drbd_info(connection, "Handshake successful: "
4893 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4894
4895 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4896 connection->agreed_features & FF_TRIM ? " " : " not ");
4897
4898 return 1;
4899
4900 incompat:
4901 drbd_err(connection, "incompatible DRBD dialects: "
4902 "I support %d-%d, peer supports %d-%d\n",
4903 PRO_VERSION_MIN, PRO_VERSION_MAX,
4904 p->protocol_min, p->protocol_max);
4905 return -1;
4906 }
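/*
 * Illustrative example (not part of the driver), with hypothetical version
 * numbers: if we were built with PRO_VERSION_MIN..PRO_VERSION_MAX = 86..101
 * and the peer announces protocol_min..protocol_max = 86..96, the ranges
 * overlap and we settle on
 *
 *	agreed_pro_version = min(PRO_VERSION_MAX, protocol_max) = 96
 *
 * while agreed_features is the bitwise AND of both sides' feature flags,
 * so FF_TRIM is only used if both peers advertise it.
 */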
4907
4908 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4909 static int drbd_do_auth(struct drbd_connection *connection)
4910 {
4911 drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4912 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4913 return -1;
4914 }
4915 #else
4916 #define CHALLENGE_LEN 64
4917
4918 /* Return value:
4919 1 - auth succeeded,
4920 0 - failed, try again (network error),
4921 -1 - auth failed, don't try again.
4922 */
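/*
 * Illustrative summary (not part of the driver) of the CRAM-HMAC style
 * exchange implemented below, with both sides holding the same shared secret:
 *
 *	us   -> peer : P_AUTH_CHALLENGE, my_challenge (64 random bytes)
 *	peer -> us   : P_AUTH_CHALLENGE, peers_ch
 *	us   -> peer : P_AUTH_RESPONSE,  HMAC(secret, peers_ch)
 *	peer -> us   : P_AUTH_RESPONSE,  response
 *
 * We then compute right_response = HMAC(secret, my_challenge) locally and
 * accept the peer only if it matches the response that was received.
 */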
4923
4924 static int drbd_do_auth(struct drbd_connection *connection)
4925 {
4926 struct drbd_socket *sock;
4927 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4928 struct scatterlist sg;
4929 char *response = NULL;
4930 char *right_response = NULL;
4931 char *peers_ch = NULL;
4932 unsigned int key_len;
4933 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4934 unsigned int resp_size;
4935 struct hash_desc desc;
4936 struct packet_info pi;
4937 struct net_conf *nc;
4938 int err, rv;
4939
4940 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4941
4942 rcu_read_lock();
4943 nc = rcu_dereference(connection->net_conf);
4944 key_len = strlen(nc->shared_secret);
4945 memcpy(secret, nc->shared_secret, key_len);
4946 rcu_read_unlock();
4947
4948 desc.tfm = connection->cram_hmac_tfm;
4949 desc.flags = 0;
4950
4951 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4952 if (rv) {
4953 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4954 rv = -1;
4955 goto fail;
4956 }
4957
4958 get_random_bytes(my_challenge, CHALLENGE_LEN);
4959
4960 sock = &connection->data;
4961 if (!conn_prepare_command(connection, sock)) {
4962 rv = 0;
4963 goto fail;
4964 }
4965 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4966 my_challenge, CHALLENGE_LEN);
4967 if (!rv)
4968 goto fail;
4969
4970 err = drbd_recv_header(connection, &pi);
4971 if (err) {
4972 rv = 0;
4973 goto fail;
4974 }
4975
4976 if (pi.cmd != P_AUTH_CHALLENGE) {
4977 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4978 cmdname(pi.cmd), pi.cmd);
4979 rv = 0;
4980 goto fail;
4981 }
4982
4983 if (pi.size > CHALLENGE_LEN * 2) {
4984 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4985 rv = -1;
4986 goto fail;
4987 }
4988
4989 if (pi.size < CHALLENGE_LEN) {
4990 drbd_err(connection, "AuthChallenge payload too small.\n");
4991 rv = -1;
4992 goto fail;
4993 }
4994
4995 peers_ch = kmalloc(pi.size, GFP_NOIO);
4996 if (peers_ch == NULL) {
4997 drbd_err(connection, "kmalloc of peers_ch failed\n");
4998 rv = -1;
4999 goto fail;
5000 }
5001
5002 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5003 if (err) {
5004 rv = 0;
5005 goto fail;
5006 }
5007
5008 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5009 drbd_err(connection, "Peer presented the same challenge!\n");
5010 rv = -1;
5011 goto fail;
5012 }
5013
5014 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
5015 response = kmalloc(resp_size, GFP_NOIO);
5016 if (response == NULL) {
5017 drbd_err(connection, "kmalloc of response failed\n");
5018 rv = -1;
5019 goto fail;
5020 }
5021
5022 sg_init_table(&sg, 1);
5023 sg_set_buf(&sg, peers_ch, pi.size);
5024
5025 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
5026 if (rv) {
5027 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5028 rv = -1;
5029 goto fail;
5030 }
5031
5032 if (!conn_prepare_command(connection, sock)) {
5033 rv = 0;
5034 goto fail;
5035 }
5036 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5037 response, resp_size);
5038 if (!rv)
5039 goto fail;
5040
5041 err = drbd_recv_header(connection, &pi);
5042 if (err) {
5043 rv = 0;
5044 goto fail;
5045 }
5046
5047 if (pi.cmd != P_AUTH_RESPONSE) {
5048 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5049 cmdname(pi.cmd), pi.cmd);
5050 rv = 0;
5051 goto fail;
5052 }
5053
5054 if (pi.size != resp_size) {
5055 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5056 rv = 0;
5057 goto fail;
5058 }
5059
5060 err = drbd_recv_all_warn(connection, response, resp_size);
5061 if (err) {
5062 rv = 0;
5063 goto fail;
5064 }
5065
5066 right_response = kmalloc(resp_size, GFP_NOIO);
5067 if (right_response == NULL) {
5068 drbd_err(connection, "kmalloc of right_response failed\n");
5069 rv = -1;
5070 goto fail;
5071 }
5072
5073 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5074
5075 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5076 if (rv) {
5077 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5078 rv = -1;
5079 goto fail;
5080 }
5081
5082 rv = !memcmp(response, right_response, resp_size);
5083
5084 if (rv)
5085 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5086 resp_size);
5087 else
5088 rv = -1;
5089
5090 fail:
5091 kfree(peers_ch);
5092 kfree(response);
5093 kfree(right_response);
5094
5095 return rv;
5096 }
5097 #endif
5098
5099 int drbd_receiver(struct drbd_thread *thi)
5100 {
5101 struct drbd_connection *connection = thi->connection;
5102 int h;
5103
5104 drbd_info(connection, "receiver (re)started\n");
5105
5106 do {
5107 h = conn_connect(connection);
5108 if (h == 0) {
5109 conn_disconnect(connection);
5110 schedule_timeout_interruptible(HZ);
5111 }
5112 if (h == -1) {
5113 drbd_warn(connection, "Discarding network configuration.\n");
5114 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5115 }
5116 } while (h == 0);
5117
5118 if (h > 0)
5119 drbdd(connection);
5120
5121 conn_disconnect(connection);
5122
5123 drbd_info(connection, "receiver terminated\n");
5124 return 0;
5125 }
5126
5127 /* ********* acknowledge sender ******** */
5128
5129 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5130 {
5131 struct p_req_state_reply *p = pi->data;
5132 int retcode = be32_to_cpu(p->retcode);
5133
5134 if (retcode >= SS_SUCCESS) {
5135 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5136 } else {
5137 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5138 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5139 drbd_set_st_err_str(retcode), retcode);
5140 }
5141 wake_up(&connection->ping_wait);
5142
5143 return 0;
5144 }
5145
5146 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5147 {
5148 struct drbd_peer_device *peer_device;
5149 struct drbd_device *device;
5150 struct p_req_state_reply *p = pi->data;
5151 int retcode = be32_to_cpu(p->retcode);
5152
5153 peer_device = conn_peer_device(connection, pi->vnr);
5154 if (!peer_device)
5155 return -EIO;
5156 device = peer_device->device;
5157
5158 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5159 D_ASSERT(device, connection->agreed_pro_version < 100);
5160 return got_conn_RqSReply(connection, pi);
5161 }
5162
5163 if (retcode >= SS_SUCCESS) {
5164 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5165 } else {
5166 set_bit(CL_ST_CHG_FAIL, &device->flags);
5167 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5168 drbd_set_st_err_str(retcode), retcode);
5169 }
5170 wake_up(&device->state_wait);
5171
5172 return 0;
5173 }
5174
5175 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5176 {
5177 return drbd_send_ping_ack(connection);
5178
5179 }
5180
5181 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5182 {
5183 /* restore idle timeout */
5184 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5185 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5186 wake_up(&connection->ping_wait);
5187
5188 return 0;
5189 }
5190
5191 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5192 {
5193 struct drbd_peer_device *peer_device;
5194 struct drbd_device *device;
5195 struct p_block_ack *p = pi->data;
5196 sector_t sector = be64_to_cpu(p->sector);
5197 int blksize = be32_to_cpu(p->blksize);
5198
5199 peer_device = conn_peer_device(connection, pi->vnr);
5200 if (!peer_device)
5201 return -EIO;
5202 device = peer_device->device;
5203
5204 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5205
5206 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5207
5208 if (get_ldev(device)) {
5209 drbd_rs_complete_io(device, sector);
5210 drbd_set_in_sync(device, sector, blksize);
5211 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5212 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5213 put_ldev(device);
5214 }
5215 dec_rs_pending(device);
5216 atomic_add(blksize >> 9, &device->rs_sect_in);
5217
5218 return 0;
5219 }
5220
5221 static int
5222 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5223 struct rb_root *root, const char *func,
5224 enum drbd_req_event what, bool missing_ok)
5225 {
5226 struct drbd_request *req;
5227 struct bio_and_error m;
5228
5229 spin_lock_irq(&device->resource->req_lock);
5230 req = find_request(device, root, id, sector, missing_ok, func);
5231 if (unlikely(!req)) {
5232 spin_unlock_irq(&device->resource->req_lock);
5233 return -EIO;
5234 }
5235 __req_mod(req, what, &m);
5236 spin_unlock_irq(&device->resource->req_lock);
5237
5238 if (m.bio)
5239 complete_master_bio(device, &m);
5240 return 0;
5241 }
5242
5243 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5244 {
5245 struct drbd_peer_device *peer_device;
5246 struct drbd_device *device;
5247 struct p_block_ack *p = pi->data;
5248 sector_t sector = be64_to_cpu(p->sector);
5249 int blksize = be32_to_cpu(p->blksize);
5250 enum drbd_req_event what;
5251
5252 peer_device = conn_peer_device(connection, pi->vnr);
5253 if (!peer_device)
5254 return -EIO;
5255 device = peer_device->device;
5256
5257 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5258
5259 if (p->block_id == ID_SYNCER) {
5260 drbd_set_in_sync(device, sector, blksize);
5261 dec_rs_pending(device);
5262 return 0;
5263 }
5264 switch (pi->cmd) {
5265 case P_RS_WRITE_ACK:
5266 what = WRITE_ACKED_BY_PEER_AND_SIS;
5267 break;
5268 case P_WRITE_ACK:
5269 what = WRITE_ACKED_BY_PEER;
5270 break;
5271 case P_RECV_ACK:
5272 what = RECV_ACKED_BY_PEER;
5273 break;
5274 case P_SUPERSEDED:
5275 what = CONFLICT_RESOLVED;
5276 break;
5277 case P_RETRY_WRITE:
5278 what = POSTPONE_WRITE;
5279 break;
5280 default:
5281 BUG();
5282 }
5283
5284 return validate_req_change_req_state(device, p->block_id, sector,
5285 &device->write_requests, __func__,
5286 what, false);
5287 }
5288
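/* Negative acknowledgement: the peer failed to write the block.  For resync
 * writes (ID_SYNCER) account the failed I/O; application writes get a
 * NEG_ACKED event, or are simply marked out of sync if the request is no
 * longer around (protocol A/B, see the comments below). */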
5289 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5290 {
5291 struct drbd_peer_device *peer_device;
5292 struct drbd_device *device;
5293 struct p_block_ack *p = pi->data;
5294 sector_t sector = be64_to_cpu(p->sector);
5295 int size = be32_to_cpu(p->blksize);
5296 int err;
5297
5298 peer_device = conn_peer_device(connection, pi->vnr);
5299 if (!peer_device)
5300 return -EIO;
5301 device = peer_device->device;
5302
5303 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5304
5305 if (p->block_id == ID_SYNCER) {
5306 dec_rs_pending(device);
5307 drbd_rs_failed_io(device, sector, size);
5308 return 0;
5309 }
5310
5311 err = validate_req_change_req_state(device, p->block_id, sector,
5312 &device->write_requests, __func__,
5313 NEG_ACKED, true);
5314 if (err) {
5315 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5316 The master bio might already be completed, therefore the
5317 request is no longer in the collision hash. */
5318 /* In Protocol B we might already have got a P_RECV_ACK
5319 but then get a P_NEG_ACK afterwards. */
5320 drbd_set_out_of_sync(device, sector, size);
5321 }
5322 return 0;
5323 }
5324
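/* Negative reply to a data (read) request: fail the corresponding request
 * in the read_requests tree. */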
5325 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5326 {
5327 struct drbd_peer_device *peer_device;
5328 struct drbd_device *device;
5329 struct p_block_ack *p = pi->data;
5330 sector_t sector = be64_to_cpu(p->sector);
5331
5332 peer_device = conn_peer_device(connection, pi->vnr);
5333 if (!peer_device)
5334 return -EIO;
5335 device = peer_device->device;
5336
5337 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5338
5339 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5340 (unsigned long long)sector, be32_to_cpu(p->blksize));
5341
5342 return validate_req_change_req_state(device, p->block_id, sector,
5343 &device->read_requests, __func__,
5344 NEG_ACKED, false);
5345 }
5346
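/* Negative reply to a resync data request (or its cancellation): the resync
 * request for this sector is done; P_NEG_RS_DREPLY additionally accounts it
 * as failed resync I/O. */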
5347 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5348 {
5349 struct drbd_peer_device *peer_device;
5350 struct drbd_device *device;
5351 sector_t sector;
5352 int size;
5353 struct p_block_ack *p = pi->data;
5354
5355 peer_device = conn_peer_device(connection, pi->vnr);
5356 if (!peer_device)
5357 return -EIO;
5358 device = peer_device->device;
5359
5360 sector = be64_to_cpu(p->sector);
5361 size = be32_to_cpu(p->blksize);
5362
5363 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5364
5365 dec_rs_pending(device);
5366
5367 if (get_ldev_if_state(device, D_FAILED)) {
5368 drbd_rs_complete_io(device, sector);
5369 switch (pi->cmd) {
5370 case P_NEG_RS_DREPLY:
5371 			drbd_rs_failed_io(device, sector, size);	/* fall through */
5372 case P_RS_CANCEL:
5373 break;
5374 default:
5375 BUG();
5376 }
5377 put_ldev(device);
5378 }
5379
5380 return 0;
5381 }
5382
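/* P_BARRIER_ACK: the peer has processed all writes up to this barrier, so
 * the corresponding transfer log epoch can be released.  Devices that are
 * Ahead and have no application I/O in flight may now schedule the switch
 * back to SyncSource. */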
5383 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5384 {
5385 struct p_barrier_ack *p = pi->data;
5386 struct drbd_peer_device *peer_device;
5387 int vnr;
5388
5389 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5390
5391 rcu_read_lock();
5392 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5393 struct drbd_device *device = peer_device->device;
5394
5395 if (device->state.conn == C_AHEAD &&
5396 atomic_read(&device->ap_in_flight) == 0 &&
5397 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5398 device->start_resync_timer.expires = jiffies + HZ;
5399 add_timer(&device->start_resync_timer);
5400 }
5401 }
5402 rcu_read_unlock();
5403
5404 return 0;
5405 }
5406
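/* Result of one online verify request: account an out-of-sync block if the
 * peer reported a mismatch, update progress, and queue w_ov_finished once
 * the last reply came in. */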
5407 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5408 {
5409 struct drbd_peer_device *peer_device;
5410 struct drbd_device *device;
5411 struct p_block_ack *p = pi->data;
5412 struct drbd_device_work *dw;
5413 sector_t sector;
5414 int size;
5415
5416 peer_device = conn_peer_device(connection, pi->vnr);
5417 if (!peer_device)
5418 return -EIO;
5419 device = peer_device->device;
5420
5421 sector = be64_to_cpu(p->sector);
5422 size = be32_to_cpu(p->blksize);
5423
5424 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5425
5426 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5427 drbd_ov_out_of_sync_found(device, sector, size);
5428 else
5429 ov_out_of_sync_print(device);
5430
5431 if (!get_ldev(device))
5432 return 0;
5433
5434 drbd_rs_complete_io(device, sector);
5435 dec_rs_pending(device);
5436
5437 --device->ov_left;
5438
5439 /* let's advance progress step marks only for every other megabyte */
5440 if ((device->ov_left & 0x200) == 0x200)
5441 drbd_advance_rs_marks(device, device->ov_left);
5442
5443 if (device->ov_left == 0) {
5444 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5445 if (dw) {
5446 dw->w.cb = w_ov_finished;
5447 dw->device = device;
5448 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5449 } else {
5450 			drbd_err(device, "kmalloc(dw) failed.\n");
5451 ov_out_of_sync_print(device);
5452 drbd_resync_finished(device);
5453 }
5454 }
5455 put_ldev(device);
5456 return 0;
5457 }
5458
5459 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5460 {
5461 return 0;
5462 }
5463
5464 struct meta_sock_cmd {
5465 size_t pkt_size;
5466 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5467 };
5468
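/* Receive timeout on the meta socket: while waiting for a P_PING_ACK use
 * ping_timeo (configured in tenths of a second, hence the /10), otherwise
 * the idle timeout ping_int (in seconds). */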
5469 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5470 {
5471 long t;
5472 struct net_conf *nc;
5473
5474 rcu_read_lock();
5475 nc = rcu_dereference(connection->net_conf);
5476 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5477 rcu_read_unlock();
5478
5479 t *= HZ;
5480 if (ping_timeout)
5481 t /= 10;
5482
5483 connection->meta.socket->sk->sk_rcvtimeo = t;
5484 }
5485
5486 static void set_ping_timeout(struct drbd_connection *connection)
5487 {
5488 set_rcvtimeo(connection, 1);
5489 }
5490
5491 static void set_idle_timeout(struct drbd_connection *connection)
5492 {
5493 set_rcvtimeo(connection, 0);
5494 }
5495
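/* Dispatch table for the meta ("ack") socket, indexed by packet type.
 * pkt_size is the payload size expected after the header; the receive loop
 * below reads header_size + pkt_size bytes before calling fn(). */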
5496 static struct meta_sock_cmd ack_receiver_tbl[] = {
5497 [P_PING] = { 0, got_Ping },
5498 [P_PING_ACK] = { 0, got_PingAck },
5499 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5500 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5501 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5502 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5503 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5504 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5505 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5506 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5507 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5508 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5509 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5510 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5511 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5512 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5513 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5514 };
5515
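/* The ack receiver thread: runs with realtime priority and loops on the meta
 * socket, sending pings when asked to, enforcing the ping timeout, and
 * dispatching received packets through ack_receiver_tbl[]. */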
5516 int drbd_ack_receiver(struct drbd_thread *thi)
5517 {
5518 struct drbd_connection *connection = thi->connection;
5519 struct meta_sock_cmd *cmd = NULL;
5520 struct packet_info pi;
5521 unsigned long pre_recv_jif;
5522 int rv;
5523 void *buf = connection->meta.rbuf;
5524 int received = 0;
5525 unsigned int header_size = drbd_header_size(connection);
5526 int expect = header_size;
5527 bool ping_timeout_active = false;
5528 struct sched_param param = { .sched_priority = 2 };
5529
5530 rv = sched_setscheduler(current, SCHED_RR, &param);
5531 if (rv < 0)
5532 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5533
5534 while (get_t_state(thi) == RUNNING) {
5535 drbd_thread_current_set_cpu(thi);
5536
5537 conn_reclaim_net_peer_reqs(connection);
5538
5539 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5540 if (drbd_send_ping(connection)) {
5541 drbd_err(connection, "drbd_send_ping has failed\n");
5542 goto reconnect;
5543 }
5544 set_ping_timeout(connection);
5545 ping_timeout_active = true;
5546 }
5547
5548 pre_recv_jif = jiffies;
5549 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550
5551 /* Note:
5552 * -EINTR (on meta) we got a signal
5553 * -EAGAIN (on meta) rcvtimeo expired
5554 * -ECONNRESET other side closed the connection
5555 * -ERESTARTSYS (on data) we got a signal
5556 * rv < 0 other than above: unexpected error!
5557 * rv == expected: full header or command
5558 * rv < expected: "woken" by signal during receive
5559 * rv == 0 : "connection shut down by peer"
5560 */
5561 if (likely(rv > 0)) {
5562 received += rv;
5563 buf += rv;
5564 } else if (rv == 0) {
5565 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5566 long t;
5567 rcu_read_lock();
5568 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5569 rcu_read_unlock();
5570
5571 t = wait_event_timeout(connection->ping_wait,
5572 connection->cstate < C_WF_REPORT_PARAMS,
5573 t);
5574 if (t)
5575 break;
5576 }
5577 drbd_err(connection, "meta connection shut down by peer.\n");
5578 goto reconnect;
5579 } else if (rv == -EAGAIN) {
5580 /* If the data socket received something meanwhile,
5581 * that is good enough: peer is still alive. */
5582 if (time_after(connection->last_received, pre_recv_jif))
5583 continue;
5584 if (ping_timeout_active) {
5585 drbd_err(connection, "PingAck did not arrive in time.\n");
5586 goto reconnect;
5587 }
5588 set_bit(SEND_PING, &connection->flags);
5589 continue;
5590 } else if (rv == -EINTR) {
5591 /* maybe drbd_thread_stop(): the while condition will notice.
5592 * maybe woken for send_ping: we'll send a ping above,
5593 * and change the rcvtimeo */
5594 flush_signals(current);
5595 continue;
5596 } else {
5597 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5598 goto reconnect;
5599 }
5600
5601 if (received == expect && cmd == NULL) {
5602 if (decode_header(connection, connection->meta.rbuf, &pi))
5603 goto reconnect;
5604 cmd = &ack_receiver_tbl[pi.cmd];
5605 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5606 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5607 cmdname(pi.cmd), pi.cmd);
5608 goto disconnect;
5609 }
5610 expect = header_size + cmd->pkt_size;
5611 if (pi.size != expect - header_size) {
5612 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5613 pi.cmd, pi.size);
5614 goto reconnect;
5615 }
5616 }
5617 if (received == expect) {
5618 bool err;
5619
5620 err = cmd->fn(connection, &pi);
5621 if (err) {
5622 drbd_err(connection, "%pf failed\n", cmd->fn);
5623 goto reconnect;
5624 }
5625
5626 connection->last_received = jiffies;
5627
5628 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5629 set_idle_timeout(connection);
5630 ping_timeout_active = false;
5631 }
5632
5633 buf = connection->meta.rbuf;
5634 received = 0;
5635 expect = header_size;
5636 cmd = NULL;
5637 }
5638 }
5639
5640 if (0) {
5641 reconnect:
5642 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5643 conn_md_sync(connection);
5644 }
5645 if (0) {
5646 disconnect:
5647 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5648 }
5649
5650 drbd_info(connection, "ack_receiver terminated\n");
5651
5652 return 0;
5653 }
5654
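/* Worker callback: flush out pending peer request acknowledgements for one
 * device, optionally corking the meta socket around the burst so the acks
 * go out in as few segments as possible. */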
5655 void drbd_send_acks_wf(struct work_struct *ws)
5656 {
5657 struct drbd_peer_device *peer_device =
5658 container_of(ws, struct drbd_peer_device, send_acks_work);
5659 struct drbd_connection *connection = peer_device->connection;
5660 struct drbd_device *device = peer_device->device;
5661 struct net_conf *nc;
5662 int tcp_cork, err;
5663
5664 rcu_read_lock();
5665 nc = rcu_dereference(connection->net_conf);
5666 tcp_cork = nc->tcp_cork;
5667 rcu_read_unlock();
5668
5669 if (tcp_cork)
5670 drbd_tcp_cork(connection->meta.socket);
5671
5672 err = drbd_finish_peer_reqs(device);
5673 kref_put(&device->kref, drbd_destroy_device);
5674 	/* The matching kref_get() is in drbd_endio_write_sec_final(). It is needed to keep
5675 	   the struct work_struct send_acks_work alive, which lives in the peer_device object. */
5676
5677 if (err) {
5678 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5679 return;
5680 }
5681
5682 if (tcp_cork)
5683 drbd_tcp_uncork(connection->meta.socket);
5684
5685 return;
5686 }