1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_protocol.h"
48#include "drbd_req.h"
49#include "drbd_vli.h"
50
51#define PRO_FEATURES (FF_TRIM | FF_THIN_RESYNC)
52
53struct packet_info {
54 enum drbd_packet cmd;
55 unsigned int size;
56 unsigned int vnr;
57 void *data;
58};
59
60enum finish_epoch {
61 FE_STILL_LIVE,
62 FE_DESTROYED,
63 FE_RECYCLED,
64};
65
66static int drbd_do_features(struct drbd_connection *connection);
67static int drbd_do_auth(struct drbd_connection *connection);
68static int drbd_disconnected(struct drbd_peer_device *);
69static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71static int e_end_block(struct drbd_work *, int);
72
73
74#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76/*
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
79 */
80
81/* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85static struct page *page_chain_del(struct page **head, int n)
86{
87 struct page *page;
88 struct page *tmp;
89
90 BUG_ON(!n);
91 BUG_ON(!head);
92
93 page = *head;
94
95 if (!page)
96 return NULL;
97
98 while (page) {
99 tmp = page_chain_next(page);
100 if (--n == 0)
101 break; /* found sufficient pages */
102 if (tmp == NULL)
103 /* insufficient pages, don't use any of them. */
104 return NULL;
105 page = tmp;
106 }
107
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
111 page = *head;
112 *head = tmp;
113 return page;
114}
115
116/* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119static struct page *page_chain_tail(struct page *page, int *len)
120{
121 struct page *tmp;
122 int i = 1;
123 while ((tmp = page_chain_next(page)))
124 ++i, page = tmp;
125 if (len)
126 *len = i;
127 return page;
128}
129
130static int page_chain_free(struct page *page)
131{
132 struct page *tmp;
133 int i = 0;
134 page_chain_for_each_safe(page, tmp) {
135 put_page(page);
136 ++i;
137 }
138 return i;
139}
140
141static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
143{
144#if 1
145 struct page *tmp;
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
148#endif
149
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
152 *head = chain_first;
153}
154
155static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 unsigned int number)
157{
158 struct page *page = NULL;
159 struct page *tmp = NULL;
160 unsigned int i = 0;
161
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
167 if (page)
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
170 if (page)
171 return page;
172 }
173
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
179 if (!tmp)
180 break;
181 set_page_private(tmp, (unsigned long)page);
182 page = tmp;
183 }
184
185 if (i == number)
186 return page;
187
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
191 if (page) {
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
195 drbd_pp_vacant += i;
196 spin_unlock(&drbd_pp_lock);
197 }
198 return NULL;
199}
200
201static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
203{
204 struct drbd_peer_request *peer_req, *tmp;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
210
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
213 break;
214 list_move(&peer_req->w.list, to_be_freed);
215 }
216}
217
218static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219{
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
222
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 drbd_free_net_peer_req(device, peer_req);
228}
229
230static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231{
232 struct drbd_peer_device *peer_device;
233 int vnr;
234
235 rcu_read_lock();
236 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 struct drbd_device *device = peer_device->device;
238 if (!atomic_read(&device->pp_in_use_by_net))
239 continue;
240
241 kref_get(&device->kref);
242 rcu_read_unlock();
243 drbd_reclaim_net_peer_reqs(device);
244 kref_put(&device->kref, drbd_destroy_device);
245 rcu_read_lock();
246 }
247 rcu_read_unlock();
248}
249
250/**
251 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252 * @device: DRBD device.
253 * @number: number of pages requested
254 * @retry: whether to retry, if not enough pages are available right now
255 *
256 * Tries to allocate number pages, first from our own page pool, then from
257 * the kernel.
258 * Possibly retry until DRBD frees sufficient pages somewhere else.
259 *
260 * If this allocation would exceed the max_buffers setting, we throttle
261 * allocation (schedule_timeout) to give the system some room to breathe.
262 *
263 * We do not use max-buffers as hard limit, because it could lead to
264 * congestion and further to a distributed deadlock during online-verify or
265 * (checksum based) resync, if the max-buffers, socket buffer sizes and
266 * resync-rate settings are mis-configured.
267 *
268 * Returns a page chain linked via page->private.
269 */
270struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 bool retry)
272{
273 struct drbd_device *device = peer_device->device;
274 struct page *page = NULL;
275 struct net_conf *nc;
276 DEFINE_WAIT(wait);
277 unsigned int mxb;
278
279 rcu_read_lock();
280 nc = rcu_dereference(peer_device->connection->net_conf);
281 mxb = nc ? nc->max_buffers : 1000000;
282 rcu_read_unlock();
283
284 if (atomic_read(&device->pp_in_use) < mxb)
285 page = __drbd_alloc_pages(device, number);
286
287 /* Try to keep the fast path fast, but occasionally we need
288 * to reclaim the pages we lended to the network stack. */
289 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 drbd_reclaim_net_peer_reqs(device);
291
292 while (page == NULL) {
293 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295 drbd_reclaim_net_peer_reqs(device);
296
297 if (atomic_read(&device->pp_in_use) < mxb) {
298 page = __drbd_alloc_pages(device, number);
299 if (page)
300 break;
301 }
302
303 if (!retry)
304 break;
305
306 if (signal_pending(current)) {
307 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 break;
309 }
310
311 if (schedule_timeout(HZ/10) == 0)
312 mxb = UINT_MAX;
313 }
314 finish_wait(&drbd_pp_wait, &wait);
315
316 if (page)
317 atomic_add(number, &device->pp_in_use);
318 return page;
319}
320
321/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
323 * Either links the page chain back to the global pool,
324 * or returns all pages to the system. */
325static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326{
327 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 int i;
329
330 if (page == NULL)
331 return;
332
333 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 i = page_chain_free(page);
335 else {
336 struct page *tmp;
337 tmp = page_chain_tail(page, &i);
338 spin_lock(&drbd_pp_lock);
339 page_chain_add(&drbd_pp_pool, page, tmp);
340 drbd_pp_vacant += i;
341 spin_unlock(&drbd_pp_lock);
b411b363 342 }
343 i = atomic_sub_return(i, a);
344 if (i < 0)
345 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 wake_up(&drbd_pp_wait);
348}
349
350/*
351You need to hold the req_lock:
352 _drbd_wait_ee_list_empty()
353
354You must not have the req_lock:
355 drbd_free_peer_req()
356 drbd_alloc_peer_req()
357 drbd_free_peer_reqs()
358 drbd_ee_fix_bhs()
359 drbd_finish_peer_reqs()
360 drbd_clear_done_ee()
361 drbd_wait_ee_list_empty()
362*/
363
364struct drbd_peer_request *
365drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
367{
368 struct drbd_device *device = peer_device->device;
369 struct drbd_peer_request *peer_req;
370 struct page *page = NULL;
371 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
372
373 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374 return NULL;
375
376 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377 if (!peer_req) {
378 if (!(gfp_mask & __GFP_NOWARN))
379 drbd_err(device, "%s: allocation failed\n", __func__);
380 return NULL;
381 }
382
383 if (has_payload && data_size) {
384 page = drbd_alloc_pages(peer_device, nr_pages,
385 gfpflags_allow_blocking(gfp_mask));
386 if (!page)
387 goto fail;
388 }
389
390 memset(peer_req, 0, sizeof(*peer_req));
391 INIT_LIST_HEAD(&peer_req->w.list);
392 drbd_clear_interval(&peer_req->i);
393 peer_req->i.size = data_size;
394 peer_req->i.sector = sector;
395 peer_req->submit_jif = jiffies;
396 peer_req->peer_device = peer_device;
397 peer_req->pages = page;
398 /*
399 * The block_id is opaque to the receiver. It is not endianness
400 * converted, and sent back to the sender unchanged.
401 */
402 peer_req->block_id = id;
403
404 return peer_req;
405
406 fail:
407 mempool_free(peer_req, drbd_ee_mempool);
408 return NULL;
409}
410
411void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412 int is_net)
413{
414 might_sleep();
415 if (peer_req->flags & EE_HAS_DIGEST)
416 kfree(peer_req->digest);
417 drbd_free_pages(device, peer_req->pages, is_net);
418 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422 drbd_al_complete_io(device, &peer_req->i);
423 }
424 mempool_free(peer_req, drbd_ee_mempool);
425}
426
427int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
428{
429 LIST_HEAD(work_list);
430 struct drbd_peer_request *peer_req, *t;
431 int count = 0;
432 int is_net = list == &device->net_ee;
433
434 spin_lock_irq(&device->resource->req_lock);
435 list_splice_init(list, &work_list);
436 spin_unlock_irq(&device->resource->req_lock);
437
438 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439 __drbd_free_peer_req(device, peer_req, is_net);
440 count++;
441 }
442 return count;
443}
444
445/*
446 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
447 */
448static int drbd_finish_peer_reqs(struct drbd_device *device)
449{
450 LIST_HEAD(work_list);
451 LIST_HEAD(reclaimed);
452 struct drbd_peer_request *peer_req, *t;
453 int err = 0;
454
455 spin_lock_irq(&device->resource->req_lock);
456 reclaim_finished_net_peer_reqs(device, &reclaimed);
457 list_splice_init(&device->done_ee, &work_list);
458 spin_unlock_irq(&device->resource->req_lock);
459
460 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461 drbd_free_net_peer_req(device, peer_req);
462
463 /* possible callbacks here:
464 * e_end_block, and e_end_resync_block, e_send_superseded.
465 * all ignore the last argument.
466 */
467 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468 int err2;
469
470 /* list_del not necessary, next/prev members not touched */
471 err2 = peer_req->w.cb(&peer_req->w, !!err);
472 if (!err)
473 err = err2;
474 drbd_free_peer_req(device, peer_req);
475 }
476 wake_up(&device->ee_wait);
477
478 return err;
479}
480
481static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482 struct list_head *head)
483{
484 DEFINE_WAIT(wait);
485
486 /* avoids spin_lock/unlock
487 * and calling prepare_to_wait in the fast path */
488 while (!list_empty(head)) {
489 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490 spin_unlock_irq(&device->resource->req_lock);
491 io_schedule();
492 finish_wait(&device->ee_wait, &wait);
493 spin_lock_irq(&device->resource->req_lock);
494 }
495}
496
497static void drbd_wait_ee_list_empty(struct drbd_device *device,
498 struct list_head *head)
499{
500 spin_lock_irq(&device->resource->req_lock);
501 _drbd_wait_ee_list_empty(device, head);
502 spin_unlock_irq(&device->resource->req_lock);
503}
504
505static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
506{
507 struct kvec iov = {
508 .iov_base = buf,
509 .iov_len = size,
510 };
511 struct msghdr msg = {
512 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513 };
514 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
515}
516
517static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
518{
519 int rv;
520
521 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
522
523 if (rv < 0) {
524 if (rv == -ECONNRESET)
525 drbd_info(connection, "sock was reset by peer\n");
526 else if (rv != -ERESTARTSYS)
527 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528 } else if (rv == 0) {
529 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530 long t;
531 rcu_read_lock();
532 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533 rcu_read_unlock();
534
535 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
536
537 if (t)
538 goto out;
539 }
540 drbd_info(connection, "sock was shut down by peer\n");
541 }
542
543 if (rv != size)
544 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
545
546out:
547 return rv;
548}
549
550static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
551{
552 int err;
553
554 err = drbd_recv(connection, buf, size);
555 if (err != size) {
556 if (err >= 0)
557 err = -EIO;
558 } else
559 err = 0;
560 return err;
561}
562
563static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
564{
565 int err;
566
567 err = drbd_recv_all(connection, buf, size);
568 if (err && !signal_pending(current))
569 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570 return err;
571}
572
573/* quoting tcp(7):
574 * On individual connections, the socket buffer size must be set prior to the
575 * listen(2) or connect(2) calls in order to have it take effect.
576 * This is our wrapper to do so.
577 */
578static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 unsigned int rcv)
580{
581 /* open coded SO_SNDBUF, SO_RCVBUF */
582 if (snd) {
583 sock->sk->sk_sndbuf = snd;
584 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 }
586 if (rcv) {
587 sock->sk->sk_rcvbuf = rcv;
588 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 }
590}
591
592static struct socket *drbd_try_connect(struct drbd_connection *connection)
593{
594 const char *what;
595 struct socket *sock;
596 struct sockaddr_in6 src_in6;
597 struct sockaddr_in6 peer_in6;
598 struct net_conf *nc;
599 int err, peer_addr_len, my_addr_len;
600 int sndbuf_size, rcvbuf_size, connect_int;
601 int disconnect_on_error = 1;
602
603 rcu_read_lock();
604 nc = rcu_dereference(connection->net_conf);
605 if (!nc) {
606 rcu_read_unlock();
607 return NULL;
608 }
609 sndbuf_size = nc->sndbuf_size;
610 rcvbuf_size = nc->rcvbuf_size;
611 connect_int = nc->connect_int;
612 rcu_read_unlock();
613
614 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
615 memcpy(&src_in6, &connection->my_addr, my_addr_len);
616
617 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
618 src_in6.sin6_port = 0;
619 else
620 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
621
622 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
623 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
624
625 what = "sock_create_kern";
626 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
627 SOCK_STREAM, IPPROTO_TCP, &sock);
628 if (err < 0) {
629 sock = NULL;
630 goto out;
631 }
632
633 sock->sk->sk_rcvtimeo =
634 sock->sk->sk_sndtimeo = connect_int * HZ;
635 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
636
637 /* explicitly bind to the configured IP as source IP
638 * for the outgoing connections.
639 * This is needed for multihomed hosts and to be
640 * able to use lo: interfaces for drbd.
641 * Make sure to use 0 as port number, so linux selects
642 * a free one dynamically.
643 */
644 what = "bind before connect";
645 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
646 if (err < 0)
647 goto out;
648
649 /* connect may fail, peer not yet available.
650 * stay C_WF_CONNECTION, don't go Disconnecting! */
651 disconnect_on_error = 0;
652 what = "connect";
653 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
654
655out:
656 if (err < 0) {
657 if (sock) {
658 sock_release(sock);
659 sock = NULL;
660 }
661 switch (-err) {
662 /* timeout, busy, signal pending */
663 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
664 case EINTR: case ERESTARTSYS:
665 /* peer not (yet) available, network problem */
666 case ECONNREFUSED: case ENETUNREACH:
667 case EHOSTDOWN: case EHOSTUNREACH:
668 disconnect_on_error = 0;
669 break;
670 default:
671 drbd_err(connection, "%s failed, err = %d\n", what, err);
672 }
673 if (disconnect_on_error)
674 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
675 }
676
677 return sock;
678}
679
680struct accept_wait_data {
681 struct drbd_connection *connection;
682 struct socket *s_listen;
683 struct completion door_bell;
684 void (*original_sk_state_change)(struct sock *sk);
685
686};
687
688static void drbd_incoming_connection(struct sock *sk)
689{
690 struct accept_wait_data *ad = sk->sk_user_data;
691 void (*state_change)(struct sock *sk);
692
693 state_change = ad->original_sk_state_change;
694 if (sk->sk_state == TCP_ESTABLISHED)
695 complete(&ad->door_bell);
696 state_change(sk);
697}
698
699static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
700{
701 int err, sndbuf_size, rcvbuf_size, my_addr_len;
702 struct sockaddr_in6 my_addr;
703 struct socket *s_listen;
704 struct net_conf *nc;
705 const char *what;
706
707 rcu_read_lock();
708 nc = rcu_dereference(connection->net_conf);
709 if (!nc) {
710 rcu_read_unlock();
711 return -EIO;
712 }
713 sndbuf_size = nc->sndbuf_size;
714 rcvbuf_size = nc->rcvbuf_size;
715 rcu_read_unlock();
716
717 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718 memcpy(&my_addr, &connection->my_addr, my_addr_len);
719
720 what = "sock_create_kern";
721 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722 SOCK_STREAM, IPPROTO_TCP, &s_listen);
723 if (err) {
724 s_listen = NULL;
725 goto out;
726 }
727
728 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
730
731 what = "bind before listen";
732 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733 if (err < 0)
734 goto out;
735
736 ad->s_listen = s_listen;
737 write_lock_bh(&s_listen->sk->sk_callback_lock);
738 ad->original_sk_state_change = s_listen->sk->sk_state_change;
739 s_listen->sk->sk_state_change = drbd_incoming_connection;
740 s_listen->sk->sk_user_data = ad;
741 write_unlock_bh(&s_listen->sk->sk_callback_lock);
742
743 what = "listen";
744 err = s_listen->ops->listen(s_listen, 5);
745 if (err < 0)
746 goto out;
747
748 return 0;
749out:
750 if (s_listen)
751 sock_release(s_listen);
752 if (err < 0) {
753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754 drbd_err(connection, "%s failed, err = %d\n", what, err);
755 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
756 }
757 }
758
759 return -EIO;
760}
761
762static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
763{
764 write_lock_bh(&sk->sk_callback_lock);
765 sk->sk_state_change = ad->original_sk_state_change;
766 sk->sk_user_data = NULL;
767 write_unlock_bh(&sk->sk_callback_lock);
768}
769
770static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
771{
772 int timeo, connect_int, err = 0;
773 struct socket *s_estab = NULL;
774 struct net_conf *nc;
775
776 rcu_read_lock();
777 nc = rcu_dereference(connection->net_conf);
778 if (!nc) {
779 rcu_read_unlock();
780 return NULL;
781 }
782 connect_int = nc->connect_int;
783 rcu_read_unlock();
784
785 timeo = connect_int * HZ;
786 /* 28.5% random jitter */
787 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
788
789 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790 if (err <= 0)
791 return NULL;
792
793 err = kernel_accept(ad->s_listen, &s_estab, 0);
794 if (err < 0) {
795 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796 drbd_err(connection, "accept failed, err = %d\n", err);
797 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
798 }
799 }
800
801 if (s_estab)
802 unregister_state_change(s_estab->sk, ad);
803
804 return s_estab;
805}
806
807static int decode_header(struct drbd_connection *, void *, struct packet_info *);
808
809static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810 enum drbd_packet cmd)
811{
812 if (!conn_prepare_command(connection, sock))
813 return -EIO;
814 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
815}
816
817static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
818{
819 unsigned int header_size = drbd_header_size(connection);
820 struct packet_info pi;
821 struct net_conf *nc;
822 int err;
823
824 rcu_read_lock();
825 nc = rcu_dereference(connection->net_conf);
826 if (!nc) {
827 rcu_read_unlock();
828 return -EIO;
829 }
830 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831 rcu_read_unlock();
832
833 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834 if (err != header_size) {
835 if (err >= 0)
836 err = -EIO;
837 return err;
838 }
839 err = decode_header(connection, connection->data.rbuf, &pi);
840 if (err)
841 return err;
842 return pi.cmd;
843}
844
845/**
846 * drbd_socket_okay() - Free the socket if its connection is not okay
847 * @sock: pointer to the pointer to the socket.
848 */
849static bool drbd_socket_okay(struct socket **sock)
850{
851 int rr;
852 char tb[4];
853
854 if (!*sock)
855 return false;
856
857 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
858
859 if (rr > 0 || rr == -EAGAIN) {
860 return true;
861 } else {
862 sock_release(*sock);
863 *sock = NULL;
864 return false;
865 }
866}
867
868static bool connection_established(struct drbd_connection *connection,
869 struct socket **sock1,
870 struct socket **sock2)
871{
872 struct net_conf *nc;
873 int timeout;
874 bool ok;
875
876 if (!*sock1 || !*sock2)
877 return false;
878
879 rcu_read_lock();
880 nc = rcu_dereference(connection->net_conf);
881 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882 rcu_read_unlock();
883 schedule_timeout_interruptible(timeout);
884
885 ok = drbd_socket_okay(sock1);
886 ok = drbd_socket_okay(sock2) && ok;
887
888 return ok;
889}
890
891/* Gets called if a connection is established, or if a new minor gets created
892 in a connection */
893int drbd_connected(struct drbd_peer_device *peer_device)
894{
895 struct drbd_device *device = peer_device->device;
896 int err;
897
898 atomic_set(&device->packet_seq, 0);
899 device->peer_seq = 0;
900
901 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902 &peer_device->connection->cstate_mutex :
903 &device->own_state_mutex;
904
905 err = drbd_send_sync_param(peer_device);
906 if (!err)
907 err = drbd_send_sizes(peer_device, 0, 0);
908 if (!err)
909 err = drbd_send_uuids(peer_device);
910 if (!err)
911 err = drbd_send_current_state(peer_device);
912 clear_bit(USE_DEGR_WFC_T, &device->flags);
913 clear_bit(RESIZE_PENDING, &device->flags);
914 atomic_set(&device->ap_in_flight, 0);
915 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916 return err;
917}
918
919/*
920 * return values:
921 * 1 yes, we have a valid connection
922 * 0 oops, did not work out, please try again
923 * -1 peer talks different language,
924 * no point in trying again, please go standalone.
925 * -2 We do not have a network config...
926 */
927static int conn_connect(struct drbd_connection *connection)
928{
929 struct drbd_socket sock, msock;
930 struct drbd_peer_device *peer_device;
931 struct net_conf *nc;
932 int vnr, timeout, h;
933 bool discard_my_data, ok;
934 enum drbd_state_rv rv;
935 struct accept_wait_data ad = {
936 .connection = connection,
937 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
938 };
939
940 clear_bit(DISCONNECT_SENT, &connection->flags);
941 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942 return -2;
943
944 mutex_init(&sock.mutex);
945 sock.sbuf = connection->data.sbuf;
946 sock.rbuf = connection->data.rbuf;
947 sock.socket = NULL;
948 mutex_init(&msock.mutex);
949 msock.sbuf = connection->meta.sbuf;
950 msock.rbuf = connection->meta.rbuf;
951 msock.socket = NULL;
952
953 /* Assume that the peer only understands protocol 80 until we know better. */
954 connection->agreed_pro_version = 80;
955
956 if (prepare_listen_socket(connection, &ad))
957 return 0;
958
959 do {
960 struct socket *s;
961
962 s = drbd_try_connect(connection);
963 if (s) {
964 if (!sock.socket) {
965 sock.socket = s;
966 send_first_packet(connection, &sock, P_INITIAL_DATA);
967 } else if (!msock.socket) {
968 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969 msock.socket = s;
970 send_first_packet(connection, &msock, P_INITIAL_META);
971 } else {
972 drbd_err(connection, "Logic error in conn_connect()\n");
973 goto out_release_sockets;
974 }
975 }
976
977 if (connection_established(connection, &sock.socket, &msock.socket))
978 break;
979
980retry:
981 s = drbd_wait_for_connect(connection, &ad);
982 if (s) {
983 int fp = receive_first_packet(connection, s);
984 drbd_socket_okay(&sock.socket);
985 drbd_socket_okay(&msock.socket);
986 switch (fp) {
987 case P_INITIAL_DATA:
988 if (sock.socket) {
989 drbd_warn(connection, "initial packet S crossed\n");
990 sock_release(sock.socket);
991 sock.socket = s;
992 goto randomize;
993 }
994 sock.socket = s;
995 break;
996 case P_INITIAL_META:
997 set_bit(RESOLVE_CONFLICTS, &connection->flags);
998 if (msock.socket) {
999 drbd_warn(connection, "initial packet M crossed\n");
1000 sock_release(msock.socket);
1001 msock.socket = s;
1002 goto randomize;
1003 }
1004 msock.socket = s;
1005 break;
1006 default:
1007 drbd_warn(connection, "Error receiving initial packet\n");
1008 sock_release(s);
1009randomize:
1010 if (prandom_u32() & 1)
1011 goto retry;
1012 }
1013 }
1014
1015 if (connection->cstate <= C_DISCONNECTING)
1016 goto out_release_sockets;
1017 if (signal_pending(current)) {
1018 flush_signals(current);
1019 smp_rmb();
1020 if (get_t_state(&connection->receiver) == EXITING)
1021 goto out_release_sockets;
1022 }
1023
1024 ok = connection_established(connection, &sock.socket, &msock.socket);
1025 } while (!ok);
1026
1027 if (ad.s_listen)
1028 sock_release(ad.s_listen);
1029
1030 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1032
1033 sock.socket->sk->sk_allocation = GFP_NOIO;
1034 msock.socket->sk->sk_allocation = GFP_NOIO;
1035
1036 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1038
1039 /* NOT YET ...
1040 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042 * first set it to the P_CONNECTION_FEATURES timeout,
1043 * which we set to 4x the configured ping_timeout. */
1044 rcu_read_lock();
1045 nc = rcu_dereference(connection->net_conf);
1046
1047 sock.socket->sk->sk_sndtimeo =
1048 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1049
1050 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051 timeout = nc->timeout * HZ / 10;
1052 discard_my_data = nc->discard_my_data;
1053 rcu_read_unlock();
1054
1055 msock.socket->sk->sk_sndtimeo = timeout;
1056
1057 /* we don't want delays.
1058 * we use TCP_CORK where appropriate, though */
1059 drbd_tcp_nodelay(sock.socket);
1060 drbd_tcp_nodelay(msock.socket);
1061
1062 connection->data.socket = sock.socket;
1063 connection->meta.socket = msock.socket;
1064 connection->last_received = jiffies;
1065
1066 h = drbd_do_features(connection);
1067 if (h <= 0)
1068 return h;
1069
1070 if (connection->cram_hmac_tfm) {
1071 /* drbd_request_state(device, NS(conn, WFAuth)); */
1072 switch (drbd_do_auth(connection)) {
1073 case -1:
1074 drbd_err(connection, "Authentication of peer failed\n");
1075 return -1;
1076 case 0:
1077 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078 return 0;
1079 }
1080 }
1081
1082 connection->data.socket->sk->sk_sndtimeo = timeout;
1083 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1084
1085 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086 return -1;
1087
1088 /* Prevent a race between resync-handshake and
1089 * being promoted to Primary.
1090 *
1091 * Grab and release the state mutex, so we know that any current
1092 * drbd_set_role() is finished, and any incoming drbd_set_role
1093 * will see the STATE_SENT flag, and wait for it to be cleared.
1094 */
1095 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096 mutex_lock(peer_device->device->state_mutex);
1097
1098 set_bit(STATE_SENT, &connection->flags);
1099
1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 mutex_unlock(peer_device->device->state_mutex);
1102
1103 rcu_read_lock();
1104 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 struct drbd_device *device = peer_device->device;
1106 kref_get(&device->kref);
1107 rcu_read_unlock();
1108
1109 if (discard_my_data)
1110 set_bit(DISCARD_MY_DATA, &device->flags);
1111 else
1112 clear_bit(DISCARD_MY_DATA, &device->flags);
1113
1114 drbd_connected(peer_device);
1115 kref_put(&device->kref, drbd_destroy_device);
1116 rcu_read_lock();
1117 }
1118 rcu_read_unlock();
1119
1120 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 clear_bit(STATE_SENT, &connection->flags);
1123 return 0;
1124 }
1125
1126 drbd_thread_start(&connection->ack_receiver);
1127 /* opencoded create_singlethread_workqueue(),
1128 * to be able to use format string arguments */
1129 connection->ack_sender =
1130 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 if (!connection->ack_sender) {
1132 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 return 0;
1134 }
1135
1136 mutex_lock(&connection->resource->conf_update);
1137 /* The discard_my_data flag is a single-shot modifier to the next
1138 * connection attempt, the handshake of which is now well underway.
1139 * No need for rcu style copying of the whole struct
1140 * just to clear a single value. */
1141 connection->net_conf->discard_my_data = 0;
1142 mutex_unlock(&connection->resource->conf_update);
1143
1144 return h;
1145
1146out_release_sockets:
1147 if (ad.s_listen)
1148 sock_release(ad.s_listen);
1149 if (sock.socket)
1150 sock_release(sock.socket);
1151 if (msock.socket)
1152 sock_release(msock.socket);
1153 return -1;
1154}
1155
1156static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157{
1158 unsigned int header_size = drbd_header_size(connection);
1159
1160 if (header_size == sizeof(struct p_header100) &&
1161 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 struct p_header100 *h = header;
1163 if (h->pad != 0) {
1164 drbd_err(connection, "Header padding is not zero\n");
1165 return -EINVAL;
1166 }
1167 pi->vnr = be16_to_cpu(h->volume);
1168 pi->cmd = be16_to_cpu(h->command);
1169 pi->size = be32_to_cpu(h->length);
1170 } else if (header_size == sizeof(struct p_header95) &&
1171 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 struct p_header95 *h = header;
1173 pi->cmd = be16_to_cpu(h->command);
1174 pi->size = be32_to_cpu(h->length);
1175 pi->vnr = 0;
1176 } else if (header_size == sizeof(struct p_header80) &&
1177 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 struct p_header80 *h = header;
1179 pi->cmd = be16_to_cpu(h->command);
1180 pi->size = be16_to_cpu(h->length);
1181 pi->vnr = 0;
1182 } else {
1183 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 be32_to_cpu(*(__be32 *)header),
1185 connection->agreed_pro_version);
1186 return -EINVAL;
1187 }
1188 pi->data = header + header_size;
1189 return 0;
1190}
1191
1192static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1193{
1194 void *buffer = connection->data.rbuf;
1195 int err;
1196
1197 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198 if (err)
1199 return err;
1200
1201 err = decode_header(connection, buffer, pi);
1202 connection->last_received = jiffies;
1203
1204 return err;
1205}
1206
1207/* This is blkdev_issue_flush, but asynchronous.
1208 * We want to submit to all component volumes in parallel,
1209 * then wait for all completions.
1210 */
1211struct issue_flush_context {
1212 atomic_t pending;
1213 int error;
1214 struct completion done;
1215};
1216struct one_flush_context {
1217 struct drbd_device *device;
1218 struct issue_flush_context *ctx;
1219};
1220
1221void one_flush_endio(struct bio *bio)
1222{
1223 struct one_flush_context *octx = bio->bi_private;
1224 struct drbd_device *device = octx->device;
1225 struct issue_flush_context *ctx = octx->ctx;
1226
1227 if (bio->bi_error) {
1228 ctx->error = bio->bi_error;
1229 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1230 }
1231 kfree(octx);
1232 bio_put(bio);
1233
1234 clear_bit(FLUSH_PENDING, &device->flags);
1235 put_ldev(device);
1236 kref_put(&device->kref, drbd_destroy_device);
1237
1238 if (atomic_dec_and_test(&ctx->pending))
1239 complete(&ctx->done);
1240}
1241
1242static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1243{
1244 struct bio *bio = bio_alloc(GFP_NOIO, 0);
1245 struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1246 if (!bio || !octx) {
1247 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1248 /* FIXME: what else can I do now? disconnecting or detaching
1249 * really does not help to improve the state of the world, either.
1250 */
1251 kfree(octx);
1252 if (bio)
1253 bio_put(bio);
1254
1255 ctx->error = -ENOMEM;
1256 put_ldev(device);
1257 kref_put(&device->kref, drbd_destroy_device);
1258 return;
1259 }
1260
1261 octx->device = device;
1262 octx->ctx = ctx;
1263 bio->bi_bdev = device->ldev->backing_bdev;
1264 bio->bi_private = octx;
1265 bio->bi_end_io = one_flush_endio;
1266 bio_set_op_attrs(bio, REQ_OP_FLUSH, WRITE_FLUSH);
1267
1268 device->flush_jif = jiffies;
1269 set_bit(FLUSH_PENDING, &device->flags);
1270 atomic_inc(&ctx->pending);
1271 submit_bio(bio);
1272}
1273
1274static void drbd_flush(struct drbd_connection *connection)
1275{
1276 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1277 struct drbd_peer_device *peer_device;
1278 struct issue_flush_context ctx;
1279 int vnr;
1280
1281 atomic_set(&ctx.pending, 1);
1282 ctx.error = 0;
1283 init_completion(&ctx.done);
1284
1285 rcu_read_lock();
1286 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1287 struct drbd_device *device = peer_device->device;
1288
1289 if (!get_ldev(device))
1290 continue;
1291 kref_get(&device->kref);
1292 rcu_read_unlock();
1293
1294 submit_one_flush(device, &ctx);
1295
1296 rcu_read_lock();
1297 }
1298 rcu_read_unlock();
1299
1300 /* Do we want to add a timeout,
1301 * if disk-timeout is set? */
1302 if (!atomic_dec_and_test(&ctx.pending))
1303 wait_for_completion(&ctx.done);
1304
1305 if (ctx.error) {
1306 /* would rather check on EOPNOTSUPP, but that is not reliable.
1307 * don't try again for ANY return value != 0
1308 * if (rv == -EOPNOTSUPP) */
1309 /* Any error is already reported by bio_endio callback. */
1310 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1311 }
1312 }
1313}
1314
1315/**
1316 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1317 * @device: DRBD device.
1318 * @epoch: Epoch object.
1319 * @ev: Epoch event.
1320 */
1321static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1322 struct drbd_epoch *epoch,
1323 enum epoch_event ev)
1324{
1325 int epoch_size;
1326 struct drbd_epoch *next_epoch;
1327 enum finish_epoch rv = FE_STILL_LIVE;
1328
1329 spin_lock(&connection->epoch_lock);
1330 do {
1331 next_epoch = NULL;
1332
1333 epoch_size = atomic_read(&epoch->epoch_size);
1334
1335 switch (ev & ~EV_CLEANUP) {
1336 case EV_PUT:
1337 atomic_dec(&epoch->active);
1338 break;
1339 case EV_GOT_BARRIER_NR:
1340 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1341 break;
1342 case EV_BECAME_LAST:
1343 /* nothing to do*/
1344 break;
1345 }
1346
1347 if (epoch_size != 0 &&
1348 atomic_read(&epoch->active) == 0 &&
1349 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1350 if (!(ev & EV_CLEANUP)) {
1351 spin_unlock(&connection->epoch_lock);
1352 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1353 spin_lock(&connection->epoch_lock);
1354 }
1355#if 0
1356 /* FIXME: dec unacked on connection, once we have
1357 * something to count pending connection packets in. */
1358 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1359 dec_unacked(epoch->connection);
1360#endif
1361
1362 if (connection->current_epoch != epoch) {
1363 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1364 list_del(&epoch->list);
1365 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1366 connection->epochs--;
1367 kfree(epoch);
1368
1369 if (rv == FE_STILL_LIVE)
1370 rv = FE_DESTROYED;
1371 } else {
1372 epoch->flags = 0;
1373 atomic_set(&epoch->epoch_size, 0);
1374 /* atomic_set(&epoch->active, 0); is already zero */
1375 if (rv == FE_STILL_LIVE)
1376 rv = FE_RECYCLED;
1377 }
1378 }
1379
1380 if (!next_epoch)
1381 break;
1382
1383 epoch = next_epoch;
1384 } while (1);
1385
1386 spin_unlock(&connection->epoch_lock);
1387
1388 return rv;
1389}
1390
1391static enum write_ordering_e
1392max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1393{
1394 struct disk_conf *dc;
1395
1396 dc = rcu_dereference(bdev->disk_conf);
1397
1398 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1399 wo = WO_DRAIN_IO;
1400 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1401 wo = WO_NONE;
1402
1403 return wo;
1404}
1405
1406/**
1407 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1408 * @connection: DRBD connection.
1409 * @wo: Write ordering method to try.
1410 */
1411void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1412 enum write_ordering_e wo)
1413{
1414 struct drbd_device *device;
1415 enum write_ordering_e pwo;
1416 int vnr;
1417 static char *write_ordering_str[] = {
1418 [WO_NONE] = "none",
1419 [WO_DRAIN_IO] = "drain",
1420 [WO_BDEV_FLUSH] = "flush",
1421 };
1422
1423 pwo = resource->write_ordering;
1424 if (wo != WO_BDEV_FLUSH)
1425 wo = min(pwo, wo);
1426 rcu_read_lock();
1427 idr_for_each_entry(&resource->devices, device, vnr) {
1428 if (get_ldev(device)) {
1429 wo = max_allowed_wo(device->ldev, wo);
1430 if (device->ldev == bdev)
1431 bdev = NULL;
1432 put_ldev(device);
1433 }
1434 }
1435
1436 if (bdev)
1437 wo = max_allowed_wo(bdev, wo);
1438
1439 rcu_read_unlock();
1440
1441 resource->write_ordering = wo;
1442 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1443 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1444}
1445
1446/*
1447 * We *may* ignore the discard-zeroes-data setting, if so configured.
1448 *
1449 * Assumption is that it "discard_zeroes_data=0" is only because the backend
1450 * may ignore partial unaligned discards.
1451 *
1452 * LVM/DM thin as of at least
1453 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28)
1454 * Library version: 1.02.93-RHEL7 (2015-01-28)
1455 * Driver version: 4.29.0
1456 * still behaves this way.
1457 *
1458 * For unaligned (wrt. alignment and granularity) or too small discards,
1459 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1460 * but discard all the aligned full chunks.
1461 *
1462 * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1463 */
1464int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1465{
1466 struct block_device *bdev = device->ldev->backing_bdev;
1467 struct request_queue *q = bdev_get_queue(bdev);
1468 sector_t tmp, nr;
1469 unsigned int max_discard_sectors, granularity;
1470 int alignment;
1471 int err = 0;
1472
1473 if (!discard)
1474 goto zero_out;
1475
1476 /* Zero-sector (unknown) and one-sector granularities are the same. */
1477 granularity = max(q->limits.discard_granularity >> 9, 1U);
1478 alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1479
1480 max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1481 max_discard_sectors -= max_discard_sectors % granularity;
1482 if (unlikely(!max_discard_sectors))
1483 goto zero_out;
1484
1485 if (nr_sectors < granularity)
1486 goto zero_out;
1487
1488 tmp = start;
1489 if (sector_div(tmp, granularity) != alignment) {
1490 if (nr_sectors < 2*granularity)
1491 goto zero_out;
1492 /* start + gran - (start + gran - align) % gran */
1493 tmp = start + granularity - alignment;
1494 tmp = start + granularity - sector_div(tmp, granularity);
1495
1496 nr = tmp - start;
1497 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1498 nr_sectors -= nr;
1499 start = tmp;
1500 }
1501 while (nr_sectors >= granularity) {
1502 nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1503 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1504 nr_sectors -= nr;
1505 start += nr;
1506 }
1507 zero_out:
1508 if (nr_sectors) {
1509 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1510 }
1511 return err != 0;
1512}
1513
1514static bool can_do_reliable_discards(struct drbd_device *device)
1515{
1516 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1517 struct disk_conf *dc;
1518 bool can_do;
1519
1520 if (!blk_queue_discard(q))
1521 return false;
1522
1523 if (q->limits.discard_zeroes_data)
1524 return true;
1525
1526 rcu_read_lock();
1527 dc = rcu_dereference(device->ldev->disk_conf);
1528 can_do = dc->discard_zeroes_if_aligned;
1529 rcu_read_unlock();
1530 return can_do;
1531}
1532
1533void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1534{
1535 /* If the backend cannot discard, or does not guarantee
1536 * read-back zeroes in discarded ranges, we fall back to
1537 * zero-out. Unless configuration specifically requested
1538 * otherwise. */
1539 if (!can_do_reliable_discards(device))
1540 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1541
1542 if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1543 peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1544 peer_req->flags |= EE_WAS_ERROR;
1545 drbd_endio_write_sec_final(peer_req);
1546}
1547
1548/**
1549 * drbd_submit_peer_request()
1550 * @device: DRBD device.
1551 * @peer_req: peer request
1552 * @rw: flag field, see bio->bi_rw
1553 *
1554 * May spread the pages to multiple bios,
1555 * depending on bio_add_page restrictions.
1556 *
1557 * Returns 0 if all bios have been submitted,
1558 * -ENOMEM if we could not allocate enough bios,
1559 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1560 * single page to an empty bio (which should never happen and likely indicates
1561 * that the lower level IO stack is in some way broken). This has been observed
1562 * on certain Xen deployments.
1563 */
1564/* TODO allocate from our own bio_set. */
1565int drbd_submit_peer_request(struct drbd_device *device,
1566 struct drbd_peer_request *peer_req,
1567 const unsigned op, const unsigned op_flags,
1568 const int fault_type)
1569{
1570 struct bio *bios = NULL;
1571 struct bio *bio;
1572 struct page *page = peer_req->pages;
1573 sector_t sector = peer_req->i.sector;
1574 unsigned data_size = peer_req->i.size;
1575 unsigned n_bios = 0;
1576 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1577 int err = -ENOMEM;
1578
1579 /* TRIM/DISCARD: for now, always use the helper function
1580 * blkdev_issue_zeroout(..., discard=true).
1581 * It's synchronous, but it does the right thing wrt. bio splitting.
1582 * Correctness first, performance later. Next step is to code an
1583 * asynchronous variant of the same.
1584 */
1585 if (peer_req->flags & EE_IS_TRIM) {
1586 /* wait for all pending IO completions, before we start
1587 * zeroing things out. */
1588 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1589 /* add it to the active list now,
1590 * so we can find it to present it in debugfs */
1591 peer_req->submit_jif = jiffies;
1592 peer_req->flags |= EE_SUBMITTED;
1593
1594 /* If this was a resync request from receive_rs_deallocated(),
1595 * it is already on the sync_ee list */
1596 if (list_empty(&peer_req->w.list)) {
1597 spin_lock_irq(&device->resource->req_lock);
1598 list_add_tail(&peer_req->w.list, &device->active_ee);
1599 spin_unlock_irq(&device->resource->req_lock);
1600 }
1601
1602 drbd_issue_peer_discard(device, peer_req);
1603 return 0;
1604 }
1605
1606 /* In most cases, we will only need one bio. But in case the lower
1607 * level restrictions happen to be different at this offset on this
1608 * side than those of the sending peer, we may need to submit the
1609 * request in more than one bio.
1610 *
1611 * Plain bio_alloc is good enough here, this is no DRBD internally
1612 * generated bio, but a bio allocated on behalf of the peer.
1613 */
1614next_bio:
1615 bio = bio_alloc(GFP_NOIO, nr_pages);
1616 if (!bio) {
1617 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1618 goto fail;
1619 }
1620 /* > peer_req->i.sector, unless this is the first bio */
1621 bio->bi_iter.bi_sector = sector;
1622 bio->bi_bdev = device->ldev->backing_bdev;
1623 bio_set_op_attrs(bio, op, op_flags);
1624 bio->bi_private = peer_req;
1625 bio->bi_end_io = drbd_peer_request_endio;
1626
1627 bio->bi_next = bios;
1628 bios = bio;
1629 ++n_bios;
1630
1631 page_chain_for_each(page) {
11f8b2b6 1632 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
45bb912b 1633 if (!bio_add_page(bio, page, len, 0)) {
10f6d992
LE
1634 /* A single page must always be possible!
1635 * But in case it fails anyways,
1636 * we deal with it, and complain (below). */
1637 if (bio->bi_vcnt == 0) {
d0180171 1638 drbd_err(device,
10f6d992
LE
1639 "bio_add_page failed for len=%u, "
1640 "bi_vcnt=0 (bi_sector=%llu)\n",
4f024f37 1641 len, (uint64_t)bio->bi_iter.bi_sector);
10f6d992
LE
1642 err = -ENOSPC;
1643 goto fail;
1644 }
45bb912b
LE
1645 goto next_bio;
1646 }
11f8b2b6 1647 data_size -= len;
45bb912b
LE
1648 sector += len >> 9;
1649 --nr_pages;
1650 }
11f8b2b6 1651 D_ASSERT(device, data_size == 0);
a0fb3c47 1652 D_ASSERT(device, page == NULL);
45bb912b 1653
db830c46 1654 atomic_set(&peer_req->pending_bios, n_bios);
21ae5d7f
LE
1655 /* for debugfs: update timestamp, mark as submitted */
1656 peer_req->submit_jif = jiffies;
1657 peer_req->flags |= EE_SUBMITTED;
45bb912b
LE
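 /* submit every bio collected above on the singly-linked list built via bi_next */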
1658 do {
1659 bio = bios;
1660 bios = bios->bi_next;
1661 bio->bi_next = NULL;
1662
b30ab791 1663 drbd_generic_make_request(device, fault_type, bio);
45bb912b 1664 } while (bios);
45bb912b
LE
1665 return 0;
1666
1667fail:
1668 while (bios) {
1669 bio = bios;
1670 bios = bios->bi_next;
1671 bio_put(bio);
1672 }
10f6d992 1673 return err;
45bb912b
LE
1674}
1675
b30ab791 1676static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
db830c46 1677 struct drbd_peer_request *peer_req)
53840641 1678{
db830c46 1679 struct drbd_interval *i = &peer_req->i;
53840641 1680
b30ab791 1681 drbd_remove_interval(&device->write_requests, i);
53840641
AG
1682 drbd_clear_interval(i);
1683
6c852bec 1684 /* Wake up any processes waiting for this peer request to complete. */
53840641 1685 if (i->waiting)
b30ab791 1686 wake_up(&device->misc_wait);
53840641
AG
1687}
1688
bde89a9e 1689static void conn_wait_active_ee_empty(struct drbd_connection *connection)
77fede51 1690{
c06ece6b 1691 struct drbd_peer_device *peer_device;
77fede51
PR
1692 int vnr;
1693
1694 rcu_read_lock();
c06ece6b
AG
1695 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1696 struct drbd_device *device = peer_device->device;
1697
b30ab791 1698 kref_get(&device->kref);
77fede51 1699 rcu_read_unlock();
b30ab791 1700 drbd_wait_ee_list_empty(device, &device->active_ee);
05a10ec7 1701 kref_put(&device->kref, drbd_destroy_device);
77fede51
PR
1702 rcu_read_lock();
1703 }
1704 rcu_read_unlock();
1705}
1706
bde89a9e 1707static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
b411b363 1708{
2451fc3b 1709 int rv;
e658983a 1710 struct p_barrier *p = pi->data;
b411b363
PR
1711 struct drbd_epoch *epoch;
1712
9ed57dcb
LE
1713 /* FIXME these are unacked on connection,
1714 * not a specific (peer)device.
1715 */
bde89a9e
AG
1716 connection->current_epoch->barrier_nr = p->barrier;
1717 connection->current_epoch->connection = connection;
1718 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
b411b363
PR
1719
1720 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1721 * the activity log, which means it would not be resynced in case the
1722 * R_PRIMARY crashes now.
1723 * Therefore we must send the barrier_ack after the barrier request was
1724 * completed. */
e9526580 1725 switch (connection->resource->write_ordering) {
f6ba8636 1726 case WO_NONE:
b411b363 1727 if (rv == FE_RECYCLED)
82bc0194 1728 return 0;
2451fc3b
PR
1729
1730 /* receiver context, in the writeout path of the other node.
1731 * avoid potential distributed deadlock */
1732 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1733 if (epoch)
1734 break;
1735 else
1ec861eb 1736 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
2451fc3b 1737 /* Fall through */
b411b363 1738
f6ba8636
AG
1739 case WO_BDEV_FLUSH:
1740 case WO_DRAIN_IO:
bde89a9e
AG
1741 conn_wait_active_ee_empty(connection);
1742 drbd_flush(connection);
2451fc3b 1743
bde89a9e 1744 if (atomic_read(&connection->current_epoch->epoch_size)) {
2451fc3b
PR
1745 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1746 if (epoch)
1747 break;
b411b363
PR
1748 }
1749
82bc0194 1750 return 0;
2451fc3b 1751 default:
e9526580
PR
1752 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1753 connection->resource->write_ordering);
82bc0194 1754 return -EIO;
b411b363
PR
1755 }
1756
1757 epoch->flags = 0;
1758 atomic_set(&epoch->epoch_size, 0);
1759 atomic_set(&epoch->active, 0);
1760
bde89a9e
AG
1761 spin_lock(&connection->epoch_lock);
1762 if (atomic_read(&connection->current_epoch->epoch_size)) {
1763 list_add(&epoch->list, &connection->current_epoch->list);
1764 connection->current_epoch = epoch;
1765 connection->epochs++;
b411b363
PR
1766 } else {
1767 /* The current_epoch got recycled while we allocated this one... */
1768 kfree(epoch);
1769 }
bde89a9e 1770 spin_unlock(&connection->epoch_lock);
b411b363 1771
82bc0194 1772 return 0;
b411b363
PR
1773}
1774
1775/* used from receive_RSDataReply (recv_resync_read)
1776 * and from receive_Data */
f6ffca9f 1777static struct drbd_peer_request *
69a22773 1778read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
a0fb3c47 1779 struct packet_info *pi) __must_hold(local)
b411b363 1780{
69a22773 1781 struct drbd_device *device = peer_device->device;
b30ab791 1782 const sector_t capacity = drbd_get_capacity(device->this_bdev);
db830c46 1783 struct drbd_peer_request *peer_req;
b411b363 1784 struct page *page;
11f8b2b6
AG
1785 int digest_size, err;
1786 unsigned int data_size = pi->size, ds;
69a22773
AG
1787 void *dig_in = peer_device->connection->int_dig_in;
1788 void *dig_vv = peer_device->connection->int_dig_vv;
6b4388ac 1789 unsigned long *data;
a0fb3c47 1790 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
b411b363 1791
11f8b2b6 1792 digest_size = 0;
a0fb3c47 1793 if (!trim && peer_device->connection->peer_integrity_tfm) {
9534d671 1794 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
9f5bdc33
AG
1795 /*
1796 * FIXME: Receive the incoming digest into the receive buffer
1797 * here, together with its struct p_data?
1798 */
11f8b2b6 1799 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
a5c31904 1800 if (err)
b411b363 1801 return NULL;
11f8b2b6 1802 data_size -= digest_size;
b411b363
PR
1803 }
1804
a0fb3c47
LE
1805 if (trim) {
1806 D_ASSERT(peer_device, data_size == 0);
1807 data_size = be32_to_cpu(trim->size);
1808 }
1809
841ce241
AG
1810 if (!expect(IS_ALIGNED(data_size, 512)))
1811 return NULL;
a0fb3c47
LE
1812 /* prepare for larger trim requests. */
1813 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
841ce241 1814 return NULL;
b411b363 1815
6666032a
LE
 1816 /* even though we trust our peer,
1817 * we sometimes have to double check. */
1818 if (sector + (data_size>>9) > capacity) {
d0180171 1819 drbd_err(device, "request from peer beyond end of local disk: "
fdda6544 1820 "capacity: %llus < sector: %llus + size: %u\n",
6666032a
LE
1821 (unsigned long long)capacity,
1822 (unsigned long long)sector, data_size);
1823 return NULL;
1824 }
1825
b411b363
PR
1826 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1827 * "criss-cross" setup, that might cause write-out on some other DRBD,
1828 * which in turn might block on the other node at this very place. */
a0fb3c47 1829 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
db830c46 1830 if (!peer_req)
b411b363 1831 return NULL;
45bb912b 1832
21ae5d7f 1833 peer_req->flags |= EE_WRITE;
a0fb3c47 1834 if (trim)
81a3537a 1835 return peer_req;
a73ff323 1836
b411b363 1837 ds = data_size;
db830c46 1838 page = peer_req->pages;
45bb912b
LE
1839 page_chain_for_each(page) {
1840 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1841 data = kmap(page);
69a22773 1842 err = drbd_recv_all_warn(peer_device->connection, data, len);
b30ab791 1843 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
d0180171 1844 drbd_err(device, "Fault injection: Corrupting data on receive\n");
6b4388ac
PR
1845 data[0] = data[0] ^ (unsigned long)-1;
1846 }
b411b363 1847 kunmap(page);
a5c31904 1848 if (err) {
b30ab791 1849 drbd_free_peer_req(device, peer_req);
b411b363
PR
1850 return NULL;
1851 }
a5c31904 1852 ds -= len;
b411b363
PR
1853 }
1854
11f8b2b6 1855 if (digest_size) {
69a22773 1856 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
11f8b2b6 1857 if (memcmp(dig_in, dig_vv, digest_size)) {
d0180171 1858 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
470be44a 1859 (unsigned long long)sector, data_size);
b30ab791 1860 drbd_free_peer_req(device, peer_req);
b411b363
PR
1861 return NULL;
1862 }
1863 }
11f8b2b6 1864 device->recv_cnt += data_size >> 9;
db830c46 1865 return peer_req;
b411b363
PR
1866}
1867
1868/* drbd_drain_block() just takes a data block
1869 * out of the socket input buffer, and discards it.
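 * (Used when we cannot apply the payload locally, e.g. without a usable
 * local disk, but still need to keep the receive stream in sync.)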
1870 */
69a22773 1871static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
b411b363
PR
1872{
1873 struct page *page;
a5c31904 1874 int err = 0;
b411b363
PR
1875 void *data;
1876
c3470cde 1877 if (!data_size)
fc5be839 1878 return 0;
c3470cde 1879
69a22773 1880 page = drbd_alloc_pages(peer_device, 1, 1);
b411b363
PR
1881
1882 data = kmap(page);
1883 while (data_size) {
fc5be839
AG
1884 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1885
69a22773 1886 err = drbd_recv_all_warn(peer_device->connection, data, len);
a5c31904 1887 if (err)
b411b363 1888 break;
a5c31904 1889 data_size -= len;
b411b363
PR
1890 }
1891 kunmap(page);
69a22773 1892 drbd_free_pages(peer_device->device, page, 0);
fc5be839 1893 return err;
b411b363
PR
1894}
1895
69a22773 1896static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
b411b363
PR
1897 sector_t sector, int data_size)
1898{
7988613b
KO
1899 struct bio_vec bvec;
1900 struct bvec_iter iter;
b411b363 1901 struct bio *bio;
11f8b2b6 1902 int digest_size, err, expect;
69a22773
AG
1903 void *dig_in = peer_device->connection->int_dig_in;
1904 void *dig_vv = peer_device->connection->int_dig_vv;
b411b363 1905
11f8b2b6 1906 digest_size = 0;
69a22773 1907 if (peer_device->connection->peer_integrity_tfm) {
9534d671 1908 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
11f8b2b6 1909 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
a5c31904
AG
1910 if (err)
1911 return err;
11f8b2b6 1912 data_size -= digest_size;
b411b363
PR
1913 }
1914
b411b363
PR
1915 /* optimistically update recv_cnt. if receiving fails below,
1916 * we disconnect anyways, and counters will be reset. */
69a22773 1917 peer_device->device->recv_cnt += data_size>>9;
b411b363
PR
1918
1919 bio = req->master_bio;
69a22773 1920 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
b411b363 1921
7988613b
KO
1922 bio_for_each_segment(bvec, bio, iter) {
1923 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1924 expect = min_t(int, data_size, bvec.bv_len);
69a22773 1925 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
7988613b 1926 kunmap(bvec.bv_page);
a5c31904
AG
1927 if (err)
1928 return err;
1929 data_size -= expect;
b411b363
PR
1930 }
1931
11f8b2b6 1932 if (digest_size) {
69a22773 1933 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
11f8b2b6 1934 if (memcmp(dig_in, dig_vv, digest_size)) {
69a22773 1935 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1936 return -EINVAL;
b411b363
PR
1937 }
1938 }
1939
69a22773 1940 D_ASSERT(peer_device->device, data_size == 0);
28284cef 1941 return 0;
b411b363
PR
1942}
1943
a990be46 1944/*
668700b4 1945 * e_end_resync_block() is called in ack_sender context via
a990be46
AG
1946 * drbd_finish_peer_reqs().
1947 */
99920dc5 1948static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1949{
8050e6d0 1950 struct drbd_peer_request *peer_req =
a8cd15ba
AG
1951 container_of(w, struct drbd_peer_request, w);
1952 struct drbd_peer_device *peer_device = peer_req->peer_device;
1953 struct drbd_device *device = peer_device->device;
db830c46 1954 sector_t sector = peer_req->i.sector;
99920dc5 1955 int err;
b411b363 1956
0b0ba1ef 1957 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
b411b363 1958
db830c46 1959 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791 1960 drbd_set_in_sync(device, sector, peer_req->i.size);
a8cd15ba 1961 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1962 } else {
1963 /* Record failure to sync */
b30ab791 1964 drbd_rs_failed_io(device, sector, peer_req->i.size);
b411b363 1965
a8cd15ba 1966 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
b411b363 1967 }
b30ab791 1968 dec_unacked(device);
b411b363 1969
99920dc5 1970 return err;
b411b363
PR
1971}
1972
69a22773 1973static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
a0fb3c47 1974 struct packet_info *pi) __releases(local)
b411b363 1975{
69a22773 1976 struct drbd_device *device = peer_device->device;
db830c46 1977 struct drbd_peer_request *peer_req;
b411b363 1978
a0fb3c47 1979 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
db830c46 1980 if (!peer_req)
45bb912b 1981 goto fail;
b411b363 1982
b30ab791 1983 dec_rs_pending(device);
b411b363 1984
b30ab791 1985 inc_unacked(device);
b411b363
PR
1986 /* corresponding dec_unacked() in e_end_resync_block()
 1987 * or in _drbd_clear_done_ee */
1988
a8cd15ba 1989 peer_req->w.cb = e_end_resync_block;
21ae5d7f 1990 peer_req->submit_jif = jiffies;
45bb912b 1991
0500813f 1992 spin_lock_irq(&device->resource->req_lock);
b9ed7080 1993 list_add_tail(&peer_req->w.list, &device->sync_ee);
0500813f 1994 spin_unlock_irq(&device->resource->req_lock);
b411b363 1995
a0fb3c47 1996 atomic_add(pi->size >> 9, &device->rs_sect_ev);
bb3cc85e
MC
1997 if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1998 DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1999 return 0;
b411b363 2000
10f6d992 2001 /* don't care for the reason here */
d0180171 2002 drbd_err(device, "submit failed, triggering re-connect\n");
0500813f 2003 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 2004 list_del(&peer_req->w.list);
0500813f 2005 spin_unlock_irq(&device->resource->req_lock);
22cc37a9 2006
b30ab791 2007 drbd_free_peer_req(device, peer_req);
45bb912b 2008fail:
b30ab791 2009 put_ldev(device);
e1c1b0fc 2010 return -EIO;
b411b363
PR
2011}
2012
668eebc6 2013static struct drbd_request *
b30ab791 2014find_request(struct drbd_device *device, struct rb_root *root, u64 id,
bc9c5c41 2015 sector_t sector, bool missing_ok, const char *func)
51624585 2016{
51624585
AG
2017 struct drbd_request *req;
2018
bc9c5c41
AG
2019 /* Request object according to our peer */
2020 req = (struct drbd_request *)(unsigned long)id;
5e472264 2021 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 2022 return req;
c3afd8f5 2023 if (!missing_ok) {
d0180171 2024 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
c3afd8f5
AG
2025 (unsigned long)id, (unsigned long long)sector);
2026 }
51624585 2027 return NULL;
b411b363
PR
2028}
2029
bde89a9e 2030static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2031{
9f4fe9ad 2032 struct drbd_peer_device *peer_device;
b30ab791 2033 struct drbd_device *device;
b411b363
PR
2034 struct drbd_request *req;
2035 sector_t sector;
82bc0194 2036 int err;
e658983a 2037 struct p_data *p = pi->data;
4a76b161 2038
9f4fe9ad
AG
2039 peer_device = conn_peer_device(connection, pi->vnr);
2040 if (!peer_device)
4a76b161 2041 return -EIO;
9f4fe9ad 2042 device = peer_device->device;
b411b363
PR
2043
2044 sector = be64_to_cpu(p->sector);
2045
0500813f 2046 spin_lock_irq(&device->resource->req_lock);
b30ab791 2047 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
0500813f 2048 spin_unlock_irq(&device->resource->req_lock);
c3afd8f5 2049 if (unlikely(!req))
82bc0194 2050 return -EIO;
b411b363 2051
24c4830c 2052 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
2053 * special casing it there for the various failure cases.
2054 * still no race with drbd_fail_pending_reads */
69a22773 2055 err = recv_dless_read(peer_device, req, sector, pi->size);
82bc0194 2056 if (!err)
8554df1c 2057 req_mod(req, DATA_RECEIVED);
b411b363
PR
2058 /* else: nothing. handled from drbd_disconnect...
2059 * I don't think we may complete this just yet
2060 * in case we are "on-disconnect: freeze" */
2061
82bc0194 2062 return err;
b411b363
PR
2063}
2064
bde89a9e 2065static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2066{
9f4fe9ad 2067 struct drbd_peer_device *peer_device;
b30ab791 2068 struct drbd_device *device;
b411b363 2069 sector_t sector;
82bc0194 2070 int err;
e658983a 2071 struct p_data *p = pi->data;
4a76b161 2072
9f4fe9ad
AG
2073 peer_device = conn_peer_device(connection, pi->vnr);
2074 if (!peer_device)
4a76b161 2075 return -EIO;
9f4fe9ad 2076 device = peer_device->device;
b411b363
PR
2077
2078 sector = be64_to_cpu(p->sector);
0b0ba1ef 2079 D_ASSERT(device, p->block_id == ID_SYNCER);
b411b363 2080
b30ab791 2081 if (get_ldev(device)) {
b411b363
PR
2082 /* data is submitted to disk within recv_resync_read.
2083 * corresponding put_ldev done below on error,
fcefa62e 2084 * or in drbd_peer_request_endio. */
a0fb3c47 2085 err = recv_resync_read(peer_device, sector, pi);
b411b363
PR
2086 } else {
2087 if (__ratelimit(&drbd_ratelimit_state))
d0180171 2088 drbd_err(device, "Can not write resync data to local disk.\n");
b411b363 2089
69a22773 2090 err = drbd_drain_block(peer_device, pi->size);
b411b363 2091
69a22773 2092 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
b411b363
PR
2093 }
2094
b30ab791 2095 atomic_add(pi->size >> 9, &device->rs_sect_in);
778f271d 2096
82bc0194 2097 return err;
b411b363
PR
2098}
2099
b30ab791 2100static void restart_conflicting_writes(struct drbd_device *device,
7be8da07 2101 sector_t sector, int size)
b411b363 2102{
7be8da07
AG
2103 struct drbd_interval *i;
2104 struct drbd_request *req;
2105
b30ab791 2106 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
2107 if (!i->local)
2108 continue;
2109 req = container_of(i, struct drbd_request, i);
2110 if (req->rq_state & RQ_LOCAL_PENDING ||
2111 !(req->rq_state & RQ_POSTPONED))
2112 continue;
2312f0b3
LE
2113 /* as it is RQ_POSTPONED, this will cause it to
2114 * be queued on the retry workqueue. */
d4dabbe2 2115 __req_mod(req, CONFLICT_RESOLVED, NULL);
7be8da07
AG
2116 }
2117}
b411b363 2118
a990be46 2119/*
668700b4 2120 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
b411b363 2121 */
99920dc5 2122static int e_end_block(struct drbd_work *w, int cancel)
b411b363 2123{
8050e6d0 2124 struct drbd_peer_request *peer_req =
a8cd15ba
AG
2125 container_of(w, struct drbd_peer_request, w);
2126 struct drbd_peer_device *peer_device = peer_req->peer_device;
2127 struct drbd_device *device = peer_device->device;
db830c46 2128 sector_t sector = peer_req->i.sector;
99920dc5 2129 int err = 0, pcmd;
b411b363 2130
303d1448 2131 if (peer_req->flags & EE_SEND_WRITE_ACK) {
db830c46 2132 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791
AG
2133 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2134 device->state.conn <= C_PAUSED_SYNC_T &&
db830c46 2135 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 2136 P_RS_WRITE_ACK : P_WRITE_ACK;
a8cd15ba 2137 err = drbd_send_ack(peer_device, pcmd, peer_req);
b411b363 2138 if (pcmd == P_RS_WRITE_ACK)
b30ab791 2139 drbd_set_in_sync(device, sector, peer_req->i.size);
b411b363 2140 } else {
a8cd15ba 2141 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
b411b363
PR
2142 /* we expect it to be marked out of sync anyways...
2143 * maybe assert this? */
2144 }
b30ab791 2145 dec_unacked(device);
b411b363 2146 }
08d0dabf 2147
b411b363
PR
2148 /* we delete from the conflict detection hash _after_ we sent out the
2149 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
302bdeae 2150 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
0500813f 2151 spin_lock_irq(&device->resource->req_lock);
0b0ba1ef 2152 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
b30ab791 2153 drbd_remove_epoch_entry_interval(device, peer_req);
7be8da07 2154 if (peer_req->flags & EE_RESTART_REQUESTS)
b30ab791 2155 restart_conflicting_writes(device, sector, peer_req->i.size);
0500813f 2156 spin_unlock_irq(&device->resource->req_lock);
bb3bfe96 2157 } else
0b0ba1ef 2158 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
b411b363 2159
5dd2ca19 2160 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 2161
99920dc5 2162 return err;
b411b363
PR
2163}
2164
a8cd15ba 2165static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 2166{
8050e6d0 2167 struct drbd_peer_request *peer_req =
a8cd15ba
AG
2168 container_of(w, struct drbd_peer_request, w);
2169 struct drbd_peer_device *peer_device = peer_req->peer_device;
99920dc5 2170 int err;
b411b363 2171
a8cd15ba
AG
2172 err = drbd_send_ack(peer_device, ack, peer_req);
2173 dec_unacked(peer_device->device);
b411b363 2174
99920dc5 2175 return err;
b411b363
PR
2176}
2177
d4dabbe2 2178static int e_send_superseded(struct drbd_work *w, int unused)
7be8da07 2179{
a8cd15ba 2180 return e_send_ack(w, P_SUPERSEDED);
7be8da07
AG
2181}
2182
99920dc5 2183static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07 2184{
a8cd15ba
AG
2185 struct drbd_peer_request *peer_req =
2186 container_of(w, struct drbd_peer_request, w);
2187 struct drbd_connection *connection = peer_req->peer_device->connection;
7be8da07 2188
a8cd15ba 2189 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
d4dabbe2 2190 P_RETRY_WRITE : P_SUPERSEDED);
7be8da07 2191}
b411b363 2192
3e394da1
AG
2193static bool seq_greater(u32 a, u32 b)
2194{
2195 /*
2196 * We assume 32-bit wrap-around here.
2197 * For 24-bit wrap-around, we would have to shift:
2198 * a <<= 8; b <<= 8;
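 * Example: seq_greater(1, 0xffffffff) is true, since
 * (s32)(1 - 0xffffffff) == 2 > 0, i.e. 1 is "newer" across the wrap.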
2199 */
2200 return (s32)a - (s32)b > 0;
2201}
b411b363 2202
3e394da1
AG
2203static u32 seq_max(u32 a, u32 b)
2204{
2205 return seq_greater(a, b) ? a : b;
b411b363
PR
2206}
2207
69a22773 2208static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
3e394da1 2209{
69a22773 2210 struct drbd_device *device = peer_device->device;
3c13b680 2211 unsigned int newest_peer_seq;
3e394da1 2212
69a22773 2213 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
b30ab791
AG
2214 spin_lock(&device->peer_seq_lock);
2215 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2216 device->peer_seq = newest_peer_seq;
2217 spin_unlock(&device->peer_seq_lock);
2218 /* wake up only if we actually changed device->peer_seq */
3c13b680 2219 if (peer_seq == newest_peer_seq)
b30ab791 2220 wake_up(&device->seq_wait);
7be8da07 2221 }
b411b363
PR
2222}
2223
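/* s1/s2 are sector numbers, l1/l2 lengths in bytes;
 * true if the two ranges overlap on disk. */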
d93f6302 2224static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
b6a370ba 2225{
d93f6302
LE
2226 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2227}
b6a370ba 2228
d93f6302 2229/* maybe change sync_ee into interval trees as well? */
b30ab791 2230static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
d93f6302
LE
2231{
2232 struct drbd_peer_request *rs_req;
b6a370ba
PR
 2233 bool rv = false;
2234
0500813f 2235 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 2236 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
d93f6302
LE
2237 if (overlaps(peer_req->i.sector, peer_req->i.size,
2238 rs_req->i.sector, rs_req->i.size)) {
b6a370ba
PR
 2239 rv = true;
2240 break;
2241 }
2242 }
0500813f 2243 spin_unlock_irq(&device->resource->req_lock);
b6a370ba
PR
2244
2245 return rv;
2246}
2247
b411b363
PR
2248/* Called from receive_Data.
2249 * Synchronize packets on sock with packets on msock.
2250 *
2251 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2252 * packet traveling on msock, they are still processed in the order they have
2253 * been sent.
2254 *
2255 * Note: we don't care for Ack packets overtaking P_DATA packets.
2256 *
b30ab791 2257 * In case packet_seq is larger than device->peer_seq number, there are
b411b363 2258 * outstanding packets on the msock. We wait for them to arrive.
b30ab791 2259 * In case we are the logically next packet, we update device->peer_seq
b411b363
PR
2260 * ourselves. Correctly handles 32bit wrap around.
2261 *
2262 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2263 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2264 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 2265 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
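 * Example: with device->peer_seq == 5, an incoming packet carrying
 * seq_num 7 waits here until peer_seq has reached 6 (i.e. the packet
 * with seq_num 6 was processed); seq_num 6 itself would be processed
 * immediately and advance peer_seq to 6.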
2266 *
2267 * returns 0 if we may process the packet,
2268 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
69a22773 2269static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
b411b363 2270{
69a22773 2271 struct drbd_device *device = peer_device->device;
b411b363 2272 DEFINE_WAIT(wait);
b411b363 2273 long timeout;
b874d231 2274 int ret = 0, tp;
7be8da07 2275
69a22773 2276 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
7be8da07
AG
2277 return 0;
2278
b30ab791 2279 spin_lock(&device->peer_seq_lock);
b411b363 2280 for (;;) {
b30ab791
AG
2281 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2282 device->peer_seq = seq_max(device->peer_seq, peer_seq);
b411b363 2283 break;
7be8da07 2284 }
b874d231 2285
b411b363
PR
2286 if (signal_pending(current)) {
2287 ret = -ERESTARTSYS;
2288 break;
2289 }
b874d231
PR
2290
2291 rcu_read_lock();
5dd2ca19 2292 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
b874d231
PR
2293 rcu_read_unlock();
2294
2295 if (!tp)
2296 break;
2297
2298 /* Only need to wait if two_primaries is enabled */
b30ab791
AG
2299 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2300 spin_unlock(&device->peer_seq_lock);
44ed167d 2301 rcu_read_lock();
69a22773 2302 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
44ed167d 2303 rcu_read_unlock();
71b1c1eb 2304 timeout = schedule_timeout(timeout);
b30ab791 2305 spin_lock(&device->peer_seq_lock);
7be8da07 2306 if (!timeout) {
b411b363 2307 ret = -ETIMEDOUT;
d0180171 2308 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
2309 break;
2310 }
2311 }
b30ab791
AG
2312 spin_unlock(&device->peer_seq_lock);
2313 finish_wait(&device->seq_wait, &wait);
b411b363
PR
2314 return ret;
2315}
2316
688593c5
LE
2317/* see also bio_flags_to_wire()
2318 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2319 * flags and back. We may replicate to other kernel versions. */
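/* e.g. a peer write received with DP_RW_SYNC|DP_FUA is resubmitted locally
 * as REQ_OP_WRITE with REQ_SYNC|REQ_FUA set; DP_DISCARD selects REQ_OP_DISCARD. */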
bb3cc85e 2320static unsigned long wire_flags_to_bio_flags(u32 dpf)
76d2e7ec 2321{
688593c5
LE
2322 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2323 (dpf & DP_FUA ? REQ_FUA : 0) |
28a8f0d3 2324 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
bb3cc85e
MC
2325}
2326
2327static unsigned long wire_flags_to_bio_op(u32 dpf)
2328{
2329 if (dpf & DP_DISCARD)
2330 return REQ_OP_DISCARD;
2331 else
2332 return REQ_OP_WRITE;
76d2e7ec
PR
2333}
2334
b30ab791 2335static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
7be8da07
AG
2336 unsigned int size)
2337{
2338 struct drbd_interval *i;
2339
2340 repeat:
b30ab791 2341 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
2342 struct drbd_request *req;
2343 struct bio_and_error m;
2344
2345 if (!i->local)
2346 continue;
2347 req = container_of(i, struct drbd_request, i);
2348 if (!(req->rq_state & RQ_POSTPONED))
2349 continue;
2350 req->rq_state &= ~RQ_POSTPONED;
2351 __req_mod(req, NEG_ACKED, &m);
0500813f 2352 spin_unlock_irq(&device->resource->req_lock);
7be8da07 2353 if (m.bio)
b30ab791 2354 complete_master_bio(device, &m);
0500813f 2355 spin_lock_irq(&device->resource->req_lock);
7be8da07
AG
2356 goto repeat;
2357 }
2358}
2359
b30ab791 2360static int handle_write_conflicts(struct drbd_device *device,
7be8da07
AG
2361 struct drbd_peer_request *peer_req)
2362{
e33b32de 2363 struct drbd_connection *connection = peer_req->peer_device->connection;
bde89a9e 2364 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
7be8da07
AG
2365 sector_t sector = peer_req->i.sector;
2366 const unsigned int size = peer_req->i.size;
2367 struct drbd_interval *i;
2368 bool equal;
2369 int err;
2370
2371 /*
2372 * Inserting the peer request into the write_requests tree will prevent
2373 * new conflicting local requests from being added.
2374 */
b30ab791 2375 drbd_insert_interval(&device->write_requests, &peer_req->i);
7be8da07
AG
2376
2377 repeat:
b30ab791 2378 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
2379 if (i == &peer_req->i)
2380 continue;
08d0dabf
LE
2381 if (i->completed)
2382 continue;
7be8da07
AG
2383
2384 if (!i->local) {
2385 /*
2386 * Our peer has sent a conflicting remote request; this
2387 * should not happen in a two-node setup. Wait for the
2388 * earlier peer request to complete.
2389 */
b30ab791 2390 err = drbd_wait_misc(device, i);
7be8da07
AG
2391 if (err)
2392 goto out;
2393 goto repeat;
2394 }
2395
2396 equal = i->sector == sector && i->size == size;
2397 if (resolve_conflicts) {
2398 /*
2399 * If the peer request is fully contained within the
d4dabbe2
LE
2400 * overlapping request, it can be considered overwritten
2401 * and thus superseded; otherwise, it will be retried
2402 * once all overlapping requests have completed.
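 * (Example: a local 16 KiB write starting at sector 0 fully covers a
 * peer 4 KiB write at sector 8, so that peer write is superseded.)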
7be8da07 2403 */
d4dabbe2 2404 bool superseded = i->sector <= sector && i->sector +
7be8da07
AG
2405 (i->size >> 9) >= sector + (size >> 9);
2406
2407 if (!equal)
d0180171 2408 drbd_alert(device, "Concurrent writes detected: "
7be8da07
AG
2409 "local=%llus +%u, remote=%llus +%u, "
2410 "assuming %s came first\n",
2411 (unsigned long long)i->sector, i->size,
2412 (unsigned long long)sector, size,
d4dabbe2 2413 superseded ? "local" : "remote");
7be8da07 2414
a8cd15ba 2415 peer_req->w.cb = superseded ? e_send_superseded :
7be8da07 2416 e_send_retry_write;
a8cd15ba 2417 list_add_tail(&peer_req->w.list, &device->done_ee);
668700b4 2418 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
7be8da07
AG
2419
2420 err = -ENOENT;
2421 goto out;
2422 } else {
2423 struct drbd_request *req =
2424 container_of(i, struct drbd_request, i);
2425
2426 if (!equal)
d0180171 2427 drbd_alert(device, "Concurrent writes detected: "
7be8da07
AG
2428 "local=%llus +%u, remote=%llus +%u\n",
2429 (unsigned long long)i->sector, i->size,
2430 (unsigned long long)sector, size);
2431
2432 if (req->rq_state & RQ_LOCAL_PENDING ||
2433 !(req->rq_state & RQ_POSTPONED)) {
2434 /*
2435 * Wait for the node with the discard flag to
d4dabbe2
LE
2436 * decide if this request has been superseded
2437 * or needs to be retried.
2438 * Requests that have been superseded will
7be8da07
AG
2439 * disappear from the write_requests tree.
2440 *
2441 * In addition, wait for the conflicting
2442 * request to finish locally before submitting
2443 * the conflicting peer request.
2444 */
b30ab791 2445 err = drbd_wait_misc(device, &req->i);
7be8da07 2446 if (err) {
e33b32de 2447 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
b30ab791 2448 fail_postponed_requests(device, sector, size);
7be8da07
AG
2449 goto out;
2450 }
2451 goto repeat;
2452 }
2453 /*
2454 * Remember to restart the conflicting requests after
2455 * the new peer request has completed.
2456 */
2457 peer_req->flags |= EE_RESTART_REQUESTS;
2458 }
2459 }
2460 err = 0;
2461
2462 out:
2463 if (err)
b30ab791 2464 drbd_remove_epoch_entry_interval(device, peer_req);
7be8da07
AG
2465 return err;
2466}
2467
b411b363 2468/* mirrored write */
bde89a9e 2469static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2470{
9f4fe9ad 2471 struct drbd_peer_device *peer_device;
b30ab791 2472 struct drbd_device *device;
21ae5d7f 2473 struct net_conf *nc;
b411b363 2474 sector_t sector;
db830c46 2475 struct drbd_peer_request *peer_req;
e658983a 2476 struct p_data *p = pi->data;
7be8da07 2477 u32 peer_seq = be32_to_cpu(p->seq_num);
bb3cc85e 2478 int op, op_flags;
b411b363 2479 u32 dp_flags;
302bdeae 2480 int err, tp;
b411b363 2481
9f4fe9ad
AG
2482 peer_device = conn_peer_device(connection, pi->vnr);
2483 if (!peer_device)
4a76b161 2484 return -EIO;
9f4fe9ad 2485 device = peer_device->device;
b411b363 2486
b30ab791 2487 if (!get_ldev(device)) {
82bc0194
AG
2488 int err2;
2489
69a22773
AG
2490 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2491 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
bde89a9e 2492 atomic_inc(&connection->current_epoch->epoch_size);
69a22773 2493 err2 = drbd_drain_block(peer_device, pi->size);
82bc0194
AG
2494 if (!err)
2495 err = err2;
2496 return err;
b411b363
PR
2497 }
2498
fcefa62e
AG
2499 /*
2500 * Corresponding put_ldev done either below (on various errors), or in
2501 * drbd_peer_request_endio, if we successfully submit the data at the
2502 * end of this function.
2503 */
b411b363
PR
2504
2505 sector = be64_to_cpu(p->sector);
a0fb3c47 2506 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
db830c46 2507 if (!peer_req) {
b30ab791 2508 put_ldev(device);
82bc0194 2509 return -EIO;
b411b363
PR
2510 }
2511
a8cd15ba 2512 peer_req->w.cb = e_end_block;
21ae5d7f
LE
2513 peer_req->submit_jif = jiffies;
2514 peer_req->flags |= EE_APPLICATION;
b411b363 2515
688593c5 2516 dp_flags = be32_to_cpu(p->dp_flags);
bb3cc85e
MC
2517 op = wire_flags_to_bio_op(dp_flags);
2518 op_flags = wire_flags_to_bio_flags(dp_flags);
a0fb3c47 2519 if (pi->cmd == P_TRIM) {
a0fb3c47 2520 peer_req->flags |= EE_IS_TRIM;
a0fb3c47 2521 D_ASSERT(peer_device, peer_req->i.size > 0);
bb3cc85e 2522 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
a0fb3c47
LE
2523 D_ASSERT(peer_device, peer_req->pages == NULL);
2524 } else if (peer_req->pages == NULL) {
0b0ba1ef
AG
2525 D_ASSERT(device, peer_req->i.size == 0);
2526 D_ASSERT(device, dp_flags & DP_FLUSH);
a73ff323 2527 }
688593c5
LE
2528
2529 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2530 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2531
bde89a9e
AG
2532 spin_lock(&connection->epoch_lock);
2533 peer_req->epoch = connection->current_epoch;
db830c46
AG
2534 atomic_inc(&peer_req->epoch->epoch_size);
2535 atomic_inc(&peer_req->epoch->active);
bde89a9e 2536 spin_unlock(&connection->epoch_lock);
b411b363 2537
302bdeae 2538 rcu_read_lock();
21ae5d7f
LE
2539 nc = rcu_dereference(peer_device->connection->net_conf);
2540 tp = nc->two_primaries;
2541 if (peer_device->connection->agreed_pro_version < 100) {
2542 switch (nc->wire_protocol) {
2543 case DRBD_PROT_C:
2544 dp_flags |= DP_SEND_WRITE_ACK;
2545 break;
2546 case DRBD_PROT_B:
2547 dp_flags |= DP_SEND_RECEIVE_ACK;
2548 break;
2549 }
2550 }
302bdeae 2551 rcu_read_unlock();
21ae5d7f
LE
2552
2553 if (dp_flags & DP_SEND_WRITE_ACK) {
2554 peer_req->flags |= EE_SEND_WRITE_ACK;
2555 inc_unacked(device);
2556 /* corresponding dec_unacked() in e_end_block()
 2557 * or in _drbd_clear_done_ee */
2558 }
2559
2560 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2561 /* I really don't like it that the receiver thread
2562 * sends on the msock, but anyways */
5dd2ca19 2563 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
21ae5d7f
LE
2564 }
2565
302bdeae 2566 if (tp) {
21ae5d7f
LE
2567 /* two primaries implies protocol C */
2568 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
302bdeae 2569 peer_req->flags |= EE_IN_INTERVAL_TREE;
69a22773 2570 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
7be8da07 2571 if (err)
b411b363 2572 goto out_interrupted;
0500813f 2573 spin_lock_irq(&device->resource->req_lock);
b30ab791 2574 err = handle_write_conflicts(device, peer_req);
7be8da07 2575 if (err) {
0500813f 2576 spin_unlock_irq(&device->resource->req_lock);
7be8da07 2577 if (err == -ENOENT) {
b30ab791 2578 put_ldev(device);
82bc0194 2579 return 0;
b411b363 2580 }
7be8da07 2581 goto out_interrupted;
b411b363 2582 }
b874d231 2583 } else {
69a22773 2584 update_peer_seq(peer_device, peer_seq);
0500813f 2585 spin_lock_irq(&device->resource->req_lock);
b874d231 2586 }
a0fb3c47
LE
2587 /* if we use the zeroout fallback code, we process synchronously
 2588 * and we wait for all pending requests, i.e. we wait for
2589 * active_ee to become empty in drbd_submit_peer_request();
2590 * better not add ourselves here. */
dd4f699d 2591 if ((peer_req->flags & EE_IS_TRIM) == 0)
b9ed7080 2592 list_add_tail(&peer_req->w.list, &device->active_ee);
0500813f 2593 spin_unlock_irq(&device->resource->req_lock);
b411b363 2594
b30ab791
AG
2595 if (device->state.conn == C_SYNC_TARGET)
2596 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
b411b363 2597
b30ab791 2598 if (device->state.pdsk < D_INCONSISTENT) {
b411b363 2599 /* In case we have the only disk of the cluster, */
b30ab791 2600 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
db830c46 2601 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
4dd726f0 2602 drbd_al_begin_io(device, &peer_req->i);
21ae5d7f 2603 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
b411b363
PR
2604 }
2605
bb3cc85e
MC
2606 err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2607 DRBD_FAULT_DT_WR);
82bc0194
AG
2608 if (!err)
2609 return 0;
b411b363 2610
10f6d992 2611 /* don't care for the reason here */
d0180171 2612 drbd_err(device, "submit failed, triggering re-connect\n");
0500813f 2613 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 2614 list_del(&peer_req->w.list);
b30ab791 2615 drbd_remove_epoch_entry_interval(device, peer_req);
0500813f 2616 spin_unlock_irq(&device->resource->req_lock);
21ae5d7f
LE
2617 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2618 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
b30ab791 2619 drbd_al_complete_io(device, &peer_req->i);
21ae5d7f 2620 }
22cc37a9 2621
b411b363 2622out_interrupted:
bde89a9e 2623 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
b30ab791
AG
2624 put_ldev(device);
2625 drbd_free_peer_req(device, peer_req);
82bc0194 2626 return err;
b411b363
PR
2627}
2628
0f0601f4
LE
2629/* We may throttle resync, if the lower device seems to be busy,
2630 * and current sync rate is above c_min_rate.
2631 *
2632 * To decide whether or not the lower device is busy, we use a scheme similar
2633 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 2634 * activity (more than 64 sectors) that we cannot account for with our own resync
2635 * activity, it obviously is "busy".
2636 *
2637 * The current sync rate used here uses only the most recent two step marks,
2638 * to have a short time average so we can react faster.
2639 */
ad3fee79
LE
2640bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2641 bool throttle_if_app_is_waiting)
0f0601f4 2642{
e3555d85 2643 struct lc_element *tmp;
ad3fee79 2644 bool throttle = drbd_rs_c_min_rate_throttle(device);
daeda1cc 2645
ad3fee79
LE
2646 if (!throttle || throttle_if_app_is_waiting)
2647 return throttle;
0f0601f4 2648
b30ab791
AG
2649 spin_lock_irq(&device->al_lock);
2650 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
e3555d85
PR
2651 if (tmp) {
2652 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
e8299874
LE
2653 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2654 throttle = false;
ad3fee79
LE
2655 /* Do not slow down if app IO is already waiting for this extent,
2656 * and our progress is necessary for application IO to complete. */
e3555d85 2657 }
b30ab791 2658 spin_unlock_irq(&device->al_lock);
e3555d85 2659
e8299874
LE
2660 return throttle;
2661}
2662
2663bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2664{
2665 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2666 unsigned long db, dt, dbdt;
2667 unsigned int c_min_rate;
2668 int curr_events;
2669
2670 rcu_read_lock();
2671 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2672 rcu_read_unlock();
2673
2674 /* feature disabled? */
2675 if (c_min_rate == 0)
2676 return false;
2677
0f0601f4
LE
2678 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2679 (int)part_stat_read(&disk->part0, sectors[1]) -
b30ab791 2680 atomic_read(&device->rs_sect_ev);
ad3fee79
LE
2681
2682 if (atomic_read(&device->ap_actlog_cnt)
ff8bd88b 2683 || curr_events - device->rs_last_events > 64) {
0f0601f4
LE
2684 unsigned long rs_left;
2685 int i;
2686
b30ab791 2687 device->rs_last_events = curr_events;
0f0601f4
LE
2688
2689 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2690 * approx. */
b30ab791 2691 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2649f080 2692
b30ab791
AG
2693 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2694 rs_left = device->ov_left;
2649f080 2695 else
b30ab791 2696 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
0f0601f4 2697
b30ab791 2698 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
0f0601f4
LE
2699 if (!dt)
2700 dt++;
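 /* db: out-of-sync bitmap bits resolved since mark i; Bit2KB() scales that
  * to KiB, so dbdt approximates the recent resync throughput in KiB/s. */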
b30ab791 2701 db = device->rs_mark_left[i] - rs_left;
0f0601f4
LE
2702 dbdt = Bit2KB(db/dt);
2703
daeda1cc 2704 if (dbdt > c_min_rate)
e8299874 2705 return true;
0f0601f4 2706 }
e8299874 2707 return false;
0f0601f4
LE
2708}
2709
bde89a9e 2710static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
b411b363 2711{
9f4fe9ad 2712 struct drbd_peer_device *peer_device;
b30ab791 2713 struct drbd_device *device;
b411b363 2714 sector_t sector;
4a76b161 2715 sector_t capacity;
db830c46 2716 struct drbd_peer_request *peer_req;
b411b363 2717 struct digest_info *di = NULL;
b18b37be 2718 int size, verb;
b411b363 2719 unsigned int fault_type;
e658983a 2720 struct p_block_req *p = pi->data;
4a76b161 2721
9f4fe9ad
AG
2722 peer_device = conn_peer_device(connection, pi->vnr);
2723 if (!peer_device)
4a76b161 2724 return -EIO;
9f4fe9ad 2725 device = peer_device->device;
b30ab791 2726 capacity = drbd_get_capacity(device->this_bdev);
b411b363
PR
2727
2728 sector = be64_to_cpu(p->sector);
2729 size = be32_to_cpu(p->blksize);
2730
c670a398 2731 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
d0180171 2732 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
b411b363 2733 (unsigned long long)sector, size);
82bc0194 2734 return -EINVAL;
b411b363
PR
2735 }
2736 if (sector + (size>>9) > capacity) {
d0180171 2737 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
b411b363 2738 (unsigned long long)sector, size);
82bc0194 2739 return -EINVAL;
b411b363
PR
2740 }
2741
b30ab791 2742 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
b18b37be 2743 verb = 1;
e2857216 2744 switch (pi->cmd) {
b18b37be 2745 case P_DATA_REQUEST:
69a22773 2746 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
b18b37be 2747 break;
700ca8c0 2748 case P_RS_THIN_REQ:
b18b37be
PR
2749 case P_RS_DATA_REQUEST:
2750 case P_CSUM_RS_REQUEST:
2751 case P_OV_REQUEST:
69a22773 2752 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
b18b37be
PR
2753 break;
2754 case P_OV_REPLY:
2755 verb = 0;
b30ab791 2756 dec_rs_pending(device);
69a22773 2757 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
b18b37be
PR
2758 break;
2759 default:
49ba9b1b 2760 BUG();
b18b37be
PR
2761 }
2762 if (verb && __ratelimit(&drbd_ratelimit_state))
d0180171 2763 drbd_err(device, "Can not satisfy peer's read request, "
b411b363 2764 "no local data.\n");
b18b37be 2765
a821cc4a 2766 /* drain any payload */
69a22773 2767 return drbd_drain_block(peer_device, pi->size);
b411b363
PR
2768 }
2769
2770 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2771 * "criss-cross" setup, that might cause write-out on some other DRBD,
2772 * which in turn might block on the other node at this very place. */
a0fb3c47
LE
2773 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2774 true /* has real payload */, GFP_NOIO);
db830c46 2775 if (!peer_req) {
b30ab791 2776 put_ldev(device);
82bc0194 2777 return -ENOMEM;
b411b363
PR
2778 }
2779
e2857216 2780 switch (pi->cmd) {
b411b363 2781 case P_DATA_REQUEST:
a8cd15ba 2782 peer_req->w.cb = w_e_end_data_req;
b411b363 2783 fault_type = DRBD_FAULT_DT_RD;
80a40e43 2784 /* application IO, don't drbd_rs_begin_io */
21ae5d7f 2785 peer_req->flags |= EE_APPLICATION;
80a40e43
LE
2786 goto submit;
2787
700ca8c0
PR
2788 case P_RS_THIN_REQ:
2789 /* If at some point in the future we have a smart way to
2790 find out if this data block is completely deallocated,
2791 then we would do something smarter here than reading
2792 the block... */
2793 peer_req->flags |= EE_RS_THIN_REQ;
b411b363 2794 case P_RS_DATA_REQUEST:
a8cd15ba 2795 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2796 fault_type = DRBD_FAULT_RS_RD;
5f9915bb 2797 /* used in the sector offset progress display */
b30ab791 2798 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2799 break;
2800
2801 case P_OV_REPLY:
2802 case P_CSUM_RS_REQUEST:
2803 fault_type = DRBD_FAULT_RS_RD;
e2857216 2804 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2805 if (!di)
2806 goto out_free_e;
2807
e2857216 2808 di->digest_size = pi->size;
b411b363
PR
2809 di->digest = (((char *)di)+sizeof(struct digest_info));
2810
db830c46
AG
2811 peer_req->digest = di;
2812 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2813
9f4fe9ad 2814 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
b411b363
PR
2815 goto out_free_e;
2816
e2857216 2817 if (pi->cmd == P_CSUM_RS_REQUEST) {
9f4fe9ad 2818 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
a8cd15ba 2819 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb 2820 /* used in the sector offset progress display */
b30ab791 2821 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
aaaba345
LE
2822 /* remember to report stats in drbd_resync_finished */
2823 device->use_csums = true;
e2857216 2824 } else if (pi->cmd == P_OV_REPLY) {
2649f080 2825 /* track progress, we may need to throttle */
b30ab791 2826 atomic_add(size >> 9, &device->rs_sect_in);
a8cd15ba 2827 peer_req->w.cb = w_e_end_ov_reply;
b30ab791 2828 dec_rs_pending(device);
0f0601f4
LE
2829 /* drbd_rs_begin_io done when we sent this request,
2830 * but accounting still needs to be done. */
2831 goto submit_for_resync;
b411b363
PR
2832 }
2833 break;
2834
2835 case P_OV_REQUEST:
b30ab791 2836 if (device->ov_start_sector == ~(sector_t)0 &&
9f4fe9ad 2837 peer_device->connection->agreed_pro_version >= 90) {
de228bba
LE
2838 unsigned long now = jiffies;
2839 int i;
b30ab791
AG
2840 device->ov_start_sector = sector;
2841 device->ov_position = sector;
2842 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2843 device->rs_total = device->ov_left;
de228bba 2844 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
b30ab791
AG
2845 device->rs_mark_left[i] = device->ov_left;
2846 device->rs_mark_time[i] = now;
de228bba 2847 }
d0180171 2848 drbd_info(device, "Online Verify start sector: %llu\n",
b411b363
PR
2849 (unsigned long long)sector);
2850 }
a8cd15ba 2851 peer_req->w.cb = w_e_end_ov_req;
b411b363 2852 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2853 break;
2854
b411b363 2855 default:
49ba9b1b 2856 BUG();
b411b363
PR
2857 }
2858
0f0601f4
LE
2859 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2860 * wrt the receiver, but it is not as straightforward as it may seem.
2861 * Various places in the resync start and stop logic assume resync
2862 * requests are processed in order, requeuing this on the worker thread
2863 * introduces a bunch of new code for synchronization between threads.
2864 *
2865 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2866 * "forever", throttling after drbd_rs_begin_io will lock that extent
2867 * for application writes for the same time. For now, just throttle
2868 * here, where the rest of the code expects the receiver to sleep for
2869 * a while, anyways.
2870 */
2871
2872 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2873 * this defers syncer requests for some time, before letting at least
 2874 * one request through. The resync controller on the receiving side
2875 * will adapt to the incoming rate accordingly.
2876 *
2877 * We cannot throttle here if remote is Primary/SyncTarget:
2878 * we would also throttle its application reads.
2879 * In that case, throttling is done on the SyncTarget only.
2880 */
c5a2c150
LE
2881
2882 /* Even though this may be a resync request, we do add to "read_ee";
2883 * "sync_ee" is only used for resync WRITEs.
2884 * Add to list early, so debugfs can find this request
2885 * even if we have to sleep below. */
2886 spin_lock_irq(&device->resource->req_lock);
2887 list_add_tail(&peer_req->w.list, &device->read_ee);
2888 spin_unlock_irq(&device->resource->req_lock);
2889
944410e9 2890 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
ad3fee79
LE
2891 if (device->state.peer != R_PRIMARY
2892 && drbd_rs_should_slow_down(device, sector, false))
e3555d85 2893 schedule_timeout_uninterruptible(HZ/10);
944410e9 2894 update_receiver_timing_details(connection, drbd_rs_begin_io);
b30ab791 2895 if (drbd_rs_begin_io(device, sector))
80a40e43 2896 goto out_free_e;
b411b363 2897
0f0601f4 2898submit_for_resync:
b30ab791 2899 atomic_add(size >> 9, &device->rs_sect_ev);
0f0601f4 2900
80a40e43 2901submit:
944410e9 2902 update_receiver_timing_details(connection, drbd_submit_peer_request);
b30ab791 2903 inc_unacked(device);
bb3cc85e
MC
2904 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2905 fault_type) == 0)
82bc0194 2906 return 0;
b411b363 2907
10f6d992 2908 /* don't care for the reason here */
d0180171 2909 drbd_err(device, "submit failed, triggering re-connect\n");
c5a2c150
LE
2910
2911out_free_e:
0500813f 2912 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 2913 list_del(&peer_req->w.list);
0500813f 2914 spin_unlock_irq(&device->resource->req_lock);
22cc37a9
LE
2915 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2916
b30ab791
AG
2917 put_ldev(device);
2918 drbd_free_peer_req(device, peer_req);
82bc0194 2919 return -EIO;
b411b363
PR
2920}
2921
69a22773
AG
2922/**
2923 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2924 */
2925static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 2926{
69a22773 2927 struct drbd_device *device = peer_device->device;
b411b363
PR
2928 int self, peer, rv = -100;
2929 unsigned long ch_self, ch_peer;
44ed167d 2930 enum drbd_after_sb_p after_sb_0p;
b411b363 2931
b30ab791
AG
2932 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2933 peer = device->p_uuid[UI_BITMAP] & 1;
b411b363 2934
b30ab791
AG
2935 ch_peer = device->p_uuid[UI_SIZE];
2936 ch_self = device->comm_bm_set;
b411b363 2937
44ed167d 2938 rcu_read_lock();
69a22773 2939 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
44ed167d
PR
2940 rcu_read_unlock();
2941 switch (after_sb_0p) {
b411b363
PR
2942 case ASB_CONSENSUS:
2943 case ASB_DISCARD_SECONDARY:
2944 case ASB_CALL_HELPER:
44ed167d 2945 case ASB_VIOLENTLY:
d0180171 2946 drbd_err(device, "Configuration error.\n");
b411b363
PR
2947 break;
2948 case ASB_DISCONNECT:
2949 break;
2950 case ASB_DISCARD_YOUNGER_PRI:
2951 if (self == 0 && peer == 1) {
2952 rv = -1;
2953 break;
2954 }
2955 if (self == 1 && peer == 0) {
2956 rv = 1;
2957 break;
2958 }
2959 /* Else fall through to one of the other strategies... */
2960 case ASB_DISCARD_OLDER_PRI:
2961 if (self == 0 && peer == 1) {
2962 rv = 1;
2963 break;
2964 }
2965 if (self == 1 && peer == 0) {
2966 rv = -1;
2967 break;
2968 }
2969 /* Else fall through to one of the other strategies... */
d0180171 2970 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2971 "Using discard-least-changes instead\n");
2972 case ASB_DISCARD_ZERO_CHG:
2973 if (ch_peer == 0 && ch_self == 0) {
69a22773 2974 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
b411b363
PR
2975 ? -1 : 1;
2976 break;
2977 } else {
2978 if (ch_peer == 0) { rv = 1; break; }
2979 if (ch_self == 0) { rv = -1; break; }
2980 }
44ed167d 2981 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2982 break;
2983 case ASB_DISCARD_LEAST_CHG:
2984 if (ch_self < ch_peer)
2985 rv = -1;
2986 else if (ch_self > ch_peer)
2987 rv = 1;
2988 else /* ( ch_self == ch_peer ) */
2989 /* Well, then use something else. */
69a22773 2990 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
b411b363
PR
2991 ? -1 : 1;
2992 break;
2993 case ASB_DISCARD_LOCAL:
2994 rv = -1;
2995 break;
2996 case ASB_DISCARD_REMOTE:
2997 rv = 1;
2998 }
2999
3000 return rv;
3001}
3002
69a22773
AG
3003/**
3004 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
3005 */
3006static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 3007{
69a22773 3008 struct drbd_device *device = peer_device->device;
6184ea21 3009 int hg, rv = -100;
44ed167d 3010 enum drbd_after_sb_p after_sb_1p;
b411b363 3011
44ed167d 3012 rcu_read_lock();
69a22773 3013 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
44ed167d
PR
3014 rcu_read_unlock();
3015 switch (after_sb_1p) {
b411b363
PR
3016 case ASB_DISCARD_YOUNGER_PRI:
3017 case ASB_DISCARD_OLDER_PRI:
3018 case ASB_DISCARD_LEAST_CHG:
3019 case ASB_DISCARD_LOCAL:
3020 case ASB_DISCARD_REMOTE:
44ed167d 3021 case ASB_DISCARD_ZERO_CHG:
d0180171 3022 drbd_err(device, "Configuration error.\n");
b411b363
PR
3023 break;
3024 case ASB_DISCONNECT:
3025 break;
3026 case ASB_CONSENSUS:
69a22773 3027 hg = drbd_asb_recover_0p(peer_device);
b30ab791 3028 if (hg == -1 && device->state.role == R_SECONDARY)
b411b363 3029 rv = hg;
b30ab791 3030 if (hg == 1 && device->state.role == R_PRIMARY)
b411b363
PR
3031 rv = hg;
3032 break;
3033 case ASB_VIOLENTLY:
69a22773 3034 rv = drbd_asb_recover_0p(peer_device);
b411b363
PR
3035 break;
3036 case ASB_DISCARD_SECONDARY:
b30ab791 3037 return device->state.role == R_PRIMARY ? 1 : -1;
b411b363 3038 case ASB_CALL_HELPER:
69a22773 3039 hg = drbd_asb_recover_0p(peer_device);
b30ab791 3040 if (hg == -1 && device->state.role == R_PRIMARY) {
bb437946
AG
3041 enum drbd_state_rv rv2;
3042
b411b363
PR
3043 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3044 * we might be here in C_WF_REPORT_PARAMS which is transient.
3045 * we do not need to wait for the after state change work either. */
b30ab791 3046 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 3047 if (rv2 != SS_SUCCESS) {
b30ab791 3048 drbd_khelper(device, "pri-lost-after-sb");
b411b363 3049 } else {
d0180171 3050 drbd_warn(device, "Successfully gave up primary role.\n");
b411b363
PR
3051 rv = hg;
3052 }
3053 } else
3054 rv = hg;
3055 }
3056
3057 return rv;
3058}
3059
69a22773
AG
3060/**
3061 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
3062 */
3063static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
b411b363 3064{
69a22773 3065 struct drbd_device *device = peer_device->device;
6184ea21 3066 int hg, rv = -100;
44ed167d 3067 enum drbd_after_sb_p after_sb_2p;
b411b363 3068
44ed167d 3069 rcu_read_lock();
69a22773 3070 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
44ed167d
PR
3071 rcu_read_unlock();
3072 switch (after_sb_2p) {
b411b363
PR
3073 case ASB_DISCARD_YOUNGER_PRI:
3074 case ASB_DISCARD_OLDER_PRI:
3075 case ASB_DISCARD_LEAST_CHG:
3076 case ASB_DISCARD_LOCAL:
3077 case ASB_DISCARD_REMOTE:
3078 case ASB_CONSENSUS:
3079 case ASB_DISCARD_SECONDARY:
44ed167d 3080 case ASB_DISCARD_ZERO_CHG:
d0180171 3081 drbd_err(device, "Configuration error.\n");
b411b363
PR
3082 break;
3083 case ASB_VIOLENTLY:
69a22773 3084 rv = drbd_asb_recover_0p(peer_device);
b411b363
PR
3085 break;
3086 case ASB_DISCONNECT:
3087 break;
3088 case ASB_CALL_HELPER:
69a22773 3089 hg = drbd_asb_recover_0p(peer_device);
b411b363 3090 if (hg == -1) {
bb437946
AG
3091 enum drbd_state_rv rv2;
3092
b411b363
PR
3093 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3094 * we might be here in C_WF_REPORT_PARAMS which is transient.
3095 * we do not need to wait for the after state change work either. */
b30ab791 3096 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 3097 if (rv2 != SS_SUCCESS) {
b30ab791 3098 drbd_khelper(device, "pri-lost-after-sb");
b411b363 3099 } else {
d0180171 3100 drbd_warn(device, "Successfully gave up primary role.\n");
b411b363
PR
3101 rv = hg;
3102 }
3103 } else
3104 rv = hg;
3105 }
3106
3107 return rv;
3108}
3109
b30ab791 3110static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
b411b363
PR
3111 u64 bits, u64 flags)
3112{
3113 if (!uuid) {
d0180171 3114 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
b411b363
PR
3115 return;
3116 }
d0180171 3117 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
b411b363
PR
3118 text,
3119 (unsigned long long)uuid[UI_CURRENT],
3120 (unsigned long long)uuid[UI_BITMAP],
3121 (unsigned long long)uuid[UI_HISTORY_START],
3122 (unsigned long long)uuid[UI_HISTORY_END],
3123 (unsigned long long)bits,
3124 (unsigned long long)flags);
3125}
3126
3127/*
3128 100 after split brain try auto recover
3129 2 C_SYNC_SOURCE set BitMap
3130 1 C_SYNC_SOURCE use BitMap
3131 0 no Sync
3132 -1 C_SYNC_TARGET use BitMap
3133 -2 C_SYNC_TARGET set BitMap
3134 -100 after split brain, disconnect
3135-1000 unrelated data
4a23f264
PR
3136-1091 requires proto 91
3137-1096 requires proto 96
b411b363 3138 */
44a4d551 3139static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
b411b363 3140{
44a4d551
LE
3141 struct drbd_peer_device *const peer_device = first_peer_device(device);
3142 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
3143 u64 self, peer;
3144 int i, j;
3145
b30ab791
AG
3146 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3147 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
3148
3149 *rule_nr = 10;
3150 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3151 return 0;
3152
3153 *rule_nr = 20;
3154 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3155 peer != UUID_JUST_CREATED)
3156 return -2;
3157
3158 *rule_nr = 30;
3159 if (self != UUID_JUST_CREATED &&
3160 (peer == UUID_JUST_CREATED || peer == (u64)0))
3161 return 2;
3162
3163 if (self == peer) {
3164 int rct, dc; /* roles at crash time */
3165
b30ab791 3166 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
b411b363 3167
44a4d551 3168 if (connection->agreed_pro_version < 91)
4a23f264 3169 return -1091;
b411b363 3170
b30ab791
AG
3171 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3172 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
d0180171 3173 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
b30ab791
AG
3174 drbd_uuid_move_history(device);
3175 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3176 device->ldev->md.uuid[UI_BITMAP] = 0;
b411b363 3177
b30ab791
AG
3178 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3179 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
b411b363
PR
3180 *rule_nr = 34;
3181 } else {
d0180171 3182 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
b411b363
PR
3183 *rule_nr = 36;
3184 }
3185
3186 return 1;
3187 }
3188
b30ab791 3189 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
b411b363 3190
44a4d551 3191 if (connection->agreed_pro_version < 91)
4a23f264 3192 return -1091;
b411b363 3193
b30ab791
AG
3194 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3195 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
d0180171 3196 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
b411b363 3197
b30ab791
AG
3198 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3199 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3200 device->p_uuid[UI_BITMAP] = 0UL;
b411b363 3201
b30ab791 3202 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
b411b363
PR
3203 *rule_nr = 35;
3204 } else {
d0180171 3205 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
b411b363
PR
3206 *rule_nr = 37;
3207 }
3208
3209 return -1;
3210 }
3211
3212 /* Common power [off|failure] */
b30ab791
AG
3213 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3214 (device->p_uuid[UI_FLAGS] & 2);
b411b363
PR
3215 /* lowest bit is set when we were primary,
3216 * next bit (weight 2) is set when peer was primary */
3217 *rule_nr = 40;
3218
3219 switch (rct) {
3220 case 0: /* !self_pri && !peer_pri */ return 0;
3221 case 1: /* self_pri && !peer_pri */ return 1;
3222 case 2: /* !self_pri && peer_pri */ return -1;
3223 case 3: /* self_pri && peer_pri */
44a4d551 3224 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
b411b363
PR
3225 return dc ? -1 : 1;
3226 }
3227 }
3228
3229 *rule_nr = 50;
b30ab791 3230 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
b411b363
PR
3231 if (self == peer)
3232 return -1;
3233
3234 *rule_nr = 51;
b30ab791 3235 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 3236 if (self == peer) {
44a4d551 3237 if (connection->agreed_pro_version < 96 ?
b30ab791
AG
3238 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3239 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3240 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
3241 /* The last P_SYNC_UUID did not get through. Undo the modifications the
3242 peer made to its UUIDs when it last started a resync as sync source. */
3243
44a4d551 3244 if (connection->agreed_pro_version < 91)
4a23f264 3245 return -1091;
b411b363 3246
b30ab791
AG
3247 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3248 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
4a23f264 3249
d0180171 3250 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
b30ab791 3251 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
4a23f264 3252
b411b363
PR
3253 return -1;
3254 }
3255 }
3256
3257 *rule_nr = 60;
b30ab791 3258 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
b411b363 3259 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 3260 peer = device->p_uuid[i] & ~((u64)1);
b411b363
PR
3261 if (self == peer)
3262 return -2;
3263 }
3264
3265 *rule_nr = 70;
b30ab791
AG
3266 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3267 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
3268 if (self == peer)
3269 return 1;
3270
3271 *rule_nr = 71;
b30ab791 3272 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 3273 if (self == peer) {
44a4d551 3274 if (connection->agreed_pro_version < 96 ?
b30ab791
AG
3275 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3276 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3277 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
3278 /* The last P_SYNC_UUID did not get through. Undo the modifications we
3279 made to our UUIDs when we last started a resync as sync source. */
3280
44a4d551 3281 if (connection->agreed_pro_version < 91)
4a23f264 3282 return -1091;
b411b363 3283
b30ab791
AG
3284 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3285 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
b411b363 3286
d0180171 3287 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
b30ab791
AG
3288 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3289 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
b411b363
PR
3290
3291 return 1;
3292 }
3293 }
3294
3295
3296 *rule_nr = 80;
b30ab791 3297 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363 3298 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 3299 self = device->ldev->md.uuid[i] & ~((u64)1);
b411b363
PR
3300 if (self == peer)
3301 return 2;
3302 }
3303
3304 *rule_nr = 90;
b30ab791
AG
3305 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3306 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
b411b363
PR
3307 if (self == peer && self != ((u64)0))
3308 return 100;
3309
3310 *rule_nr = 100;
3311 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 3312 self = device->ldev->md.uuid[i] & ~((u64)1);
b411b363 3313 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
b30ab791 3314 peer = device->p_uuid[j] & ~((u64)1);
b411b363
PR
3315 if (self == peer)
3316 return -100;
3317 }
3318 }
3319
3320 return -1000;
3321}
3322
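/*
 * Illustrative sketch, not part of DRBD: one way a caller could classify
 * the result codes documented in the legend above.  The enum and helper
 * name are hypothetical; drbd_sync_handshake() below is the real consumer
 * of these values.
 */
enum uuid_compare_class {
	UC_NO_SYNC,		/*     0                                    */
	UC_SYNC_SOURCE,		/*  1, 2: bitmap-based or full sync source  */
	UC_SYNC_TARGET,		/* -1,-2: bitmap-based or full sync target  */
	UC_SPLIT_BRAIN,		/* 100, -100: try auto recover / disconnect */
	UC_UNRELATED,		/* -1000: unrelated data                    */
	UC_NEEDS_PROTO,		/* < -1000: peer protocol too old           */
};

static enum uuid_compare_class classify_uuid_compare(int hg)
{
	if (hg < -1000)
		return UC_NEEDS_PROTO;	/* required version is -hg - 1000 */
	if (hg == -1000)
		return UC_UNRELATED;
	if (hg == 100 || hg == -100)
		return UC_SPLIT_BRAIN;
	if (hg > 0)
		return UC_SYNC_SOURCE;
	return hg < 0 ? UC_SYNC_TARGET : UC_NO_SYNC;
}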
3323/* drbd_sync_handshake() returns the new conn state on success, or
3324 CONN_MASK (-1) on failure.
3325 */
69a22773
AG
3326static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3327 enum drbd_role peer_role,
b411b363
PR
3328 enum drbd_disk_state peer_disk) __must_hold(local)
3329{
69a22773 3330 struct drbd_device *device = peer_device->device;
b411b363
PR
3331 enum drbd_conns rv = C_MASK;
3332 enum drbd_disk_state mydisk;
44ed167d 3333 struct net_conf *nc;
6dff2902 3334 int hg, rule_nr, rr_conflict, tentative;
b411b363 3335
b30ab791 3336 mydisk = device->state.disk;
b411b363 3337 if (mydisk == D_NEGOTIATING)
b30ab791 3338 mydisk = device->new_state_tmp.disk;
b411b363 3339
d0180171 3340 drbd_info(device, "drbd_sync_handshake:\n");
9f2247bb 3341
b30ab791
AG
3342 spin_lock_irq(&device->ldev->md.uuid_lock);
3343 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3344 drbd_uuid_dump(device, "peer", device->p_uuid,
3345 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
b411b363 3346
b30ab791
AG
3347 hg = drbd_uuid_compare(device, &rule_nr);
3348 spin_unlock_irq(&device->ldev->md.uuid_lock);
b411b363 3349
d0180171 3350 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
b411b363
PR
3351
3352 if (hg == -1000) {
d0180171 3353 drbd_alert(device, "Unrelated data, aborting!\n");
b411b363
PR
3354 return C_MASK;
3355 }
4a23f264 3356 if (hg < -1000) {
d0180171 3357 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
3358 return C_MASK;
3359 }
3360
3361 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3362 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3363 int f = (hg == -100) || abs(hg) == 2;
3364 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3365 if (f)
3366 hg = hg*2;
d0180171 3367 drbd_info(device, "Becoming sync %s due to disk states.\n",
b411b363
PR
3368 hg > 0 ? "source" : "target");
3369 }
3370
3a11a487 3371 if (abs(hg) == 100)
b30ab791 3372 drbd_khelper(device, "initial-split-brain");
3a11a487 3373
44ed167d 3374 rcu_read_lock();
69a22773 3375 nc = rcu_dereference(peer_device->connection->net_conf);
44ed167d
PR
3376
3377 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
b30ab791 3378 int pcount = (device->state.role == R_PRIMARY)
b411b363
PR
3379 + (peer_role == R_PRIMARY);
3380 int forced = (hg == -100);
3381
3382 switch (pcount) {
3383 case 0:
69a22773 3384 hg = drbd_asb_recover_0p(peer_device);
b411b363
PR
3385 break;
3386 case 1:
69a22773 3387 hg = drbd_asb_recover_1p(peer_device);
b411b363
PR
3388 break;
3389 case 2:
69a22773 3390 hg = drbd_asb_recover_2p(peer_device);
b411b363
PR
3391 break;
3392 }
3393 if (abs(hg) < 100) {
d0180171 3394 drbd_warn(device, "Split-Brain detected, %d primaries, "
b411b363
PR
3395 "automatically solved. Sync from %s node\n",
3396 pcount, (hg < 0) ? "peer" : "this");
3397 if (forced) {
d0180171 3398 drbd_warn(device, "Doing a full sync, since"
b411b363
PR
3399 " UUIDs where ambiguous.\n");
3400 hg = hg*2;
3401 }
3402 }
3403 }
3404
3405 if (hg == -100) {
b30ab791 3406 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
b411b363 3407 hg = -1;
b30ab791 3408 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
b411b363
PR
3409 hg = 1;
3410
3411 if (abs(hg) < 100)
d0180171 3412 drbd_warn(device, "Split-Brain detected, manually solved. "
b411b363
PR
3413 "Sync from %s node\n",
3414 (hg < 0) ? "peer" : "this");
3415 }
44ed167d 3416 rr_conflict = nc->rr_conflict;
6dff2902 3417 tentative = nc->tentative;
44ed167d 3418 rcu_read_unlock();
b411b363
PR
3419
3420 if (hg == -100) {
580b9767
LE
3421 /* FIXME this log message is not correct if we end up here
3422 * after an attempted attach on a diskless node.
3423 * We just refuse to attach -- well, we drop the "connection"
3424 * to that disk, in a way... */
d0180171 3425 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
b30ab791 3426 drbd_khelper(device, "split-brain");
b411b363
PR
3427 return C_MASK;
3428 }
3429
3430 if (hg > 0 && mydisk <= D_INCONSISTENT) {
d0180171 3431 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
b411b363
PR
3432 return C_MASK;
3433 }
3434
3435 if (hg < 0 && /* by intention we do not use mydisk here. */
b30ab791 3436 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
44ed167d 3437 switch (rr_conflict) {
b411b363 3438 case ASB_CALL_HELPER:
b30ab791 3439 drbd_khelper(device, "pri-lost");
b411b363
PR
3440 /* fall through */
3441 case ASB_DISCONNECT:
d0180171 3442 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
b411b363
PR
3443 return C_MASK;
3444 case ASB_VIOLENTLY:
d0180171 3445 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
b411b363
PR
3446 "assumption\n");
3447 }
3448 }
3449
69a22773 3450 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
cf14c2e9 3451 if (hg == 0)
d0180171 3452 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
cf14c2e9 3453 else
d0180171 3454 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
cf14c2e9
PR
3455 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3456 abs(hg) >= 2 ? "full" : "bit-map based");
3457 return C_MASK;
3458 }
3459
b411b363 3460 if (abs(hg) >= 2) {
d0180171 3461 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
b30ab791 3462 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
20ceb2b2 3463 BM_LOCKED_SET_ALLOWED))
b411b363
PR
3464 return C_MASK;
3465 }
3466
3467 if (hg > 0) { /* become sync source. */
3468 rv = C_WF_BITMAP_S;
3469 } else if (hg < 0) { /* become sync target */
3470 rv = C_WF_BITMAP_T;
3471 } else {
3472 rv = C_CONNECTED;
b30ab791 3473 if (drbd_bm_total_weight(device)) {
d0180171 3474 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
b30ab791 3475 drbd_bm_total_weight(device));
b411b363
PR
3476 }
3477 }
3478
3479 return rv;
3480}
3481
f179d76d 3482static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
b411b363
PR
3483{
3484 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
f179d76d
PR
3485 if (peer == ASB_DISCARD_REMOTE)
3486 return ASB_DISCARD_LOCAL;
b411b363
PR
3487
3488 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
f179d76d
PR
3489 if (peer == ASB_DISCARD_LOCAL)
3490 return ASB_DISCARD_REMOTE;
b411b363
PR
3491
3492 /* everything else is valid if they are equal on both sides. */
f179d76d 3493 return peer;
b411b363
PR
3494}
3495
bde89a9e 3496static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
b411b363 3497{
e658983a 3498 struct p_protocol *p = pi->data;
036b17ea
PR
3499 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3500 int p_proto, p_discard_my_data, p_two_primaries, cf;
3501 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3502 char integrity_alg[SHARED_SECRET_MAX] = "";
9534d671 3503 struct crypto_ahash *peer_integrity_tfm = NULL;
7aca6c75 3504 void *int_dig_in = NULL, *int_dig_vv = NULL;
b411b363 3505
b411b363
PR
3506 p_proto = be32_to_cpu(p->protocol);
3507 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3508 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3509 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 3510 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9 3511 cf = be32_to_cpu(p->conn_flags);
6139f60d 3512 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
cf14c2e9 3513
bde89a9e 3514 if (connection->agreed_pro_version >= 87) {
86db0618 3515 int err;
cf14c2e9 3516
88104ca4 3517 if (pi->size > sizeof(integrity_alg))
86db0618 3518 return -EIO;
bde89a9e 3519 err = drbd_recv_all(connection, integrity_alg, pi->size);
86db0618
AG
3520 if (err)
3521 return err;
036b17ea 3522 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
b411b363
PR
3523 }
3524
7d4c782c 3525 if (pi->cmd != P_PROTOCOL_UPDATE) {
bde89a9e 3526 clear_bit(CONN_DRY_RUN, &connection->flags);
b411b363 3527
fbc12f45 3528 if (cf & CF_DRY_RUN)
bde89a9e 3529 set_bit(CONN_DRY_RUN, &connection->flags);
b411b363 3530
fbc12f45 3531 rcu_read_lock();
bde89a9e 3532 nc = rcu_dereference(connection->net_conf);
b411b363 3533
fbc12f45 3534 if (p_proto != nc->wire_protocol) {
1ec861eb 3535 drbd_err(connection, "incompatible %s settings\n", "protocol");
fbc12f45
AG
3536 goto disconnect_rcu_unlock;
3537 }
b411b363 3538
fbc12f45 3539 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
1ec861eb 3540 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
fbc12f45
AG
3541 goto disconnect_rcu_unlock;
3542 }
b411b363 3543
fbc12f45 3544 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
1ec861eb 3545 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
fbc12f45
AG
3546 goto disconnect_rcu_unlock;
3547 }
b411b363 3548
fbc12f45 3549 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
1ec861eb 3550 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
fbc12f45
AG
3551 goto disconnect_rcu_unlock;
3552 }
b411b363 3553
fbc12f45 3554 if (p_discard_my_data && nc->discard_my_data) {
1ec861eb 3555 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
fbc12f45
AG
3556 goto disconnect_rcu_unlock;
3557 }
b411b363 3558
fbc12f45 3559 if (p_two_primaries != nc->two_primaries) {
1ec861eb 3560 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
fbc12f45
AG
3561 goto disconnect_rcu_unlock;
3562 }
b411b363 3563
fbc12f45 3564 if (strcmp(integrity_alg, nc->integrity_alg)) {
1ec861eb 3565 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
fbc12f45
AG
3566 goto disconnect_rcu_unlock;
3567 }
b411b363 3568
fbc12f45 3569 rcu_read_unlock();
b411b363
PR
3570 }
3571
7d4c782c
AG
3572 if (integrity_alg[0]) {
3573 int hash_size;
3574
3575 /*
3576 * We can only change the peer data integrity algorithm
3577 * here. Changing our own data integrity algorithm
3578 * requires that we send a P_PROTOCOL_UPDATE packet at
3579 * the same time; otherwise, the peer has no way to
3580 * tell between which packets the algorithm should
3581 * change.
3582 */
b411b363 3583
9534d671 3584 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
7d4c782c 3585 if (!peer_integrity_tfm) {
1ec861eb 3586 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
7d4c782c
AG
3587 integrity_alg);
3588 goto disconnect;
3589 }
b411b363 3590
9534d671 3591 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
7d4c782c
AG
3592 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3593 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3594 if (!(int_dig_in && int_dig_vv)) {
1ec861eb 3595 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
b411b363
PR
3596 goto disconnect;
3597 }
b411b363
PR
3598 }
3599
7d4c782c
AG
3600 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3601 if (!new_net_conf) {
1ec861eb 3602 drbd_err(connection, "Allocation of new net_conf failed\n");
7d4c782c
AG
3603 goto disconnect;
3604 }
3605
bde89a9e 3606 mutex_lock(&connection->data.mutex);
0500813f 3607 mutex_lock(&connection->resource->conf_update);
bde89a9e 3608 old_net_conf = connection->net_conf;
7d4c782c
AG
3609 *new_net_conf = *old_net_conf;
3610
3611 new_net_conf->wire_protocol = p_proto;
3612 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3613 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3614 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3615 new_net_conf->two_primaries = p_two_primaries;
3616
bde89a9e 3617 rcu_assign_pointer(connection->net_conf, new_net_conf);
0500813f 3618 mutex_unlock(&connection->resource->conf_update);
bde89a9e 3619 mutex_unlock(&connection->data.mutex);
7d4c782c 3620
9534d671 3621 crypto_free_ahash(connection->peer_integrity_tfm);
bde89a9e
AG
3622 kfree(connection->int_dig_in);
3623 kfree(connection->int_dig_vv);
3624 connection->peer_integrity_tfm = peer_integrity_tfm;
3625 connection->int_dig_in = int_dig_in;
3626 connection->int_dig_vv = int_dig_vv;
7d4c782c
AG
3627
3628 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
1ec861eb 3629 drbd_info(connection, "peer data-integrity-alg: %s\n",
7d4c782c
AG
3630 integrity_alg[0] ? integrity_alg : "(none)");
3631
3632 synchronize_rcu();
3633 kfree(old_net_conf);
82bc0194 3634 return 0;
b411b363 3635
44ed167d
PR
3636disconnect_rcu_unlock:
3637 rcu_read_unlock();
b411b363 3638disconnect:
9534d671 3639 crypto_free_ahash(peer_integrity_tfm);
036b17ea
PR
3640 kfree(int_dig_in);
3641 kfree(int_dig_vv);
bde89a9e 3642 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3643 return -EIO;
b411b363
PR
3644}
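/*
 * Note (added for illustration, not part of DRBD): the configuration
 * update above follows the usual RCU publish-then-free pattern, which
 * receive_SyncParam() below repeats for net_conf and disk_conf.
 * Readers dereference the pointer under rcu_read_lock(); the writer
 * copies, modifies the copy, publishes it, and frees the old version
 * only after a grace period:
 *
 *	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
 *	*new_net_conf = *old_net_conf;                   // copy
 *	new_net_conf->some_field = new_value;            // modify the copy (hypothetical field)
 *	rcu_assign_pointer(connection->net_conf, new_net_conf);  // publish
 *	synchronize_rcu();               // wait out readers of old_net_conf
 *	kfree(old_net_conf);             // now safe to free
 */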
3645
3646/* helper function
3647 * input: alg name, feature name
3648 * return: NULL (alg name was "")
3649 * ERR_PTR(error) if something goes wrong
3650 * or the crypto hash ptr, if it worked out ok. */
9534d671 3651static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
b411b363
PR
3652 const char *alg, const char *name)
3653{
9534d671 3654 struct crypto_ahash *tfm;
b411b363
PR
3655
3656 if (!alg[0])
3657 return NULL;
3658
9534d671 3659 tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
b411b363 3660 if (IS_ERR(tfm)) {
d0180171 3661 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
b411b363
PR
3662 alg, name, PTR_ERR(tfm));
3663 return tfm;
3664 }
b411b363
PR
3665 return tfm;
3666}
3667
bde89a9e 3668static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
4a76b161 3669{
bde89a9e 3670 void *buffer = connection->data.rbuf;
4a76b161
AG
3671 int size = pi->size;
3672
3673 while (size) {
3674 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
bde89a9e 3675 s = drbd_recv(connection, buffer, s);
4a76b161
AG
3676 if (s <= 0) {
3677 if (s < 0)
3678 return s;
3679 break;
3680 }
3681 size -= s;
3682 }
3683 if (size)
3684 return -EIO;
3685 return 0;
3686}
3687
3688/*
3689 * config_unknown_volume - device configuration command for unknown volume
3690 *
3691 * When a device is added to an existing connection, the node on which the
3692 * device is added first will send configuration commands to its peer but the
3693 * peer will not know about the device yet. It will warn and ignore these
3694 * commands. Once the device is added on the second node, the second node will
3695 * send the same device configuration commands, but in the other direction.
3696 *
3697 * (We can also end up here if drbd is misconfigured.)
3698 */
bde89a9e 3699static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
4a76b161 3700{
1ec861eb 3701 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
2fcb8f30 3702 cmdname(pi->cmd), pi->vnr);
bde89a9e 3703 return ignore_remaining_packet(connection, pi);
4a76b161
AG
3704}
3705
bde89a9e 3706static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
b411b363 3707{
9f4fe9ad 3708 struct drbd_peer_device *peer_device;
b30ab791 3709 struct drbd_device *device;
e658983a 3710 struct p_rs_param_95 *p;
b411b363 3711 unsigned int header_size, data_size, exp_max_sz;
9534d671
HX
3712 struct crypto_ahash *verify_tfm = NULL;
3713 struct crypto_ahash *csums_tfm = NULL;
2ec91e0e 3714 struct net_conf *old_net_conf, *new_net_conf = NULL;
813472ce 3715 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
bde89a9e 3716 const int apv = connection->agreed_pro_version;
813472ce 3717 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
778f271d 3718 int fifo_size = 0;
82bc0194 3719 int err;
b411b363 3720
9f4fe9ad
AG
3721 peer_device = conn_peer_device(connection, pi->vnr);
3722 if (!peer_device)
bde89a9e 3723 return config_unknown_volume(connection, pi);
9f4fe9ad 3724 device = peer_device->device;
b411b363
PR
3725
3726 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3727 : apv == 88 ? sizeof(struct p_rs_param)
3728 + SHARED_SECRET_MAX
8e26f9cc
PR
3729 : apv <= 94 ? sizeof(struct p_rs_param_89)
3730 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3731
e2857216 3732 if (pi->size > exp_max_sz) {
d0180171 3733 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3734 pi->size, exp_max_sz);
82bc0194 3735 return -EIO;
b411b363
PR
3736 }
3737
3738 if (apv <= 88) {
e658983a 3739 header_size = sizeof(struct p_rs_param);
e2857216 3740 data_size = pi->size - header_size;
8e26f9cc 3741 } else if (apv <= 94) {
e658983a 3742 header_size = sizeof(struct p_rs_param_89);
e2857216 3743 data_size = pi->size - header_size;
0b0ba1ef 3744 D_ASSERT(device, data_size == 0);
8e26f9cc 3745 } else {
e658983a 3746 header_size = sizeof(struct p_rs_param_95);
e2857216 3747 data_size = pi->size - header_size;
0b0ba1ef 3748 D_ASSERT(device, data_size == 0);
b411b363
PR
3749 }
3750
3751 /* initialize verify_alg and csums_alg */
e658983a 3752 p = pi->data;
b411b363
PR
3753 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3754
9f4fe9ad 3755 err = drbd_recv_all(peer_device->connection, p, header_size);
82bc0194
AG
3756 if (err)
3757 return err;
b411b363 3758
0500813f 3759 mutex_lock(&connection->resource->conf_update);
9f4fe9ad 3760 old_net_conf = peer_device->connection->net_conf;
b30ab791 3761 if (get_ldev(device)) {
813472ce
PR
3762 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3763 if (!new_disk_conf) {
b30ab791 3764 put_ldev(device);
0500813f 3765 mutex_unlock(&connection->resource->conf_update);
d0180171 3766 drbd_err(device, "Allocation of new disk_conf failed\n");
813472ce
PR
3767 return -ENOMEM;
3768 }
daeda1cc 3769
b30ab791 3770 old_disk_conf = device->ldev->disk_conf;
813472ce 3771 *new_disk_conf = *old_disk_conf;
b411b363 3772
6394b935 3773 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
813472ce 3774 }
b411b363
PR
3775
3776 if (apv >= 88) {
3777 if (apv == 88) {
5de73827 3778 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
d0180171 3779 drbd_err(device, "verify-alg of wrong size, "
5de73827
PR
3780 "peer wants %u, accepting only up to %u byte\n",
3781 data_size, SHARED_SECRET_MAX);
813472ce
PR
3782 err = -EIO;
3783 goto reconnect;
b411b363
PR
3784 }
3785
9f4fe9ad 3786 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
813472ce
PR
3787 if (err)
3788 goto reconnect;
b411b363
PR
3789 /* we expect NUL terminated string */
3790 /* but just in case someone tries to be evil */
0b0ba1ef 3791 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
b411b363
PR
3792 p->verify_alg[data_size-1] = 0;
3793
3794 } else /* apv >= 89 */ {
3795 /* we still expect NUL terminated strings */
3796 /* but just in case someone tries to be evil */
0b0ba1ef
AG
3797 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3798 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
b411b363
PR
3799 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3800 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3801 }
3802
2ec91e0e 3803 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
b30ab791 3804 if (device->state.conn == C_WF_REPORT_PARAMS) {
d0180171 3805 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3806 old_net_conf->verify_alg, p->verify_alg);
b411b363
PR
3807 goto disconnect;
3808 }
b30ab791 3809 verify_tfm = drbd_crypto_alloc_digest_safe(device,
b411b363
PR
3810 p->verify_alg, "verify-alg");
3811 if (IS_ERR(verify_tfm)) {
3812 verify_tfm = NULL;
3813 goto disconnect;
3814 }
3815 }
3816
2ec91e0e 3817 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
b30ab791 3818 if (device->state.conn == C_WF_REPORT_PARAMS) {
d0180171 3819 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3820 old_net_conf->csums_alg, p->csums_alg);
b411b363
PR
3821 goto disconnect;
3822 }
b30ab791 3823 csums_tfm = drbd_crypto_alloc_digest_safe(device,
b411b363
PR
3824 p->csums_alg, "csums-alg");
3825 if (IS_ERR(csums_tfm)) {
3826 csums_tfm = NULL;
3827 goto disconnect;
3828 }
3829 }
3830
813472ce 3831 if (apv > 94 && new_disk_conf) {
daeda1cc
PR
3832 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3833 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3834 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3835 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3836
daeda1cc 3837 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
b30ab791 3838 if (fifo_size != device->rs_plan_s->size) {
813472ce
PR
3839 new_plan = fifo_alloc(fifo_size);
3840 if (!new_plan) {
d0180171 3841 drbd_err(device, "kmalloc of fifo_buffer failed");
b30ab791 3842 put_ldev(device);
778f271d
PR
3843 goto disconnect;
3844 }
3845 }
8e26f9cc 3846 }
b411b363 3847
91fd4dad 3848 if (verify_tfm || csums_tfm) {
2ec91e0e
PR
3849 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3850 if (!new_net_conf) {
d0180171 3851 drbd_err(device, "Allocation of new net_conf failed\n");
91fd4dad
PR
3852 goto disconnect;
3853 }
3854
2ec91e0e 3855 *new_net_conf = *old_net_conf;
91fd4dad
PR
3856
3857 if (verify_tfm) {
2ec91e0e
PR
3858 strcpy(new_net_conf->verify_alg, p->verify_alg);
3859 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
9534d671 3860 crypto_free_ahash(peer_device->connection->verify_tfm);
9f4fe9ad 3861 peer_device->connection->verify_tfm = verify_tfm;
d0180171 3862 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
91fd4dad
PR
3863 }
3864 if (csums_tfm) {
2ec91e0e
PR
3865 strcpy(new_net_conf->csums_alg, p->csums_alg);
3866 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
9534d671 3867 crypto_free_ahash(peer_device->connection->csums_tfm);
9f4fe9ad 3868 peer_device->connection->csums_tfm = csums_tfm;
d0180171 3869 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
91fd4dad 3870 }
bde89a9e 3871 rcu_assign_pointer(connection->net_conf, new_net_conf);
778f271d 3872 }
b411b363
PR
3873 }
3874
813472ce 3875 if (new_disk_conf) {
b30ab791
AG
3876 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3877 put_ldev(device);
813472ce
PR
3878 }
3879
3880 if (new_plan) {
b30ab791
AG
3881 old_plan = device->rs_plan_s;
3882 rcu_assign_pointer(device->rs_plan_s, new_plan);
b411b363 3883 }
daeda1cc 3884
0500813f 3885 mutex_unlock(&connection->resource->conf_update);
daeda1cc
PR
3886 synchronize_rcu();
3887 if (new_net_conf)
3888 kfree(old_net_conf);
3889 kfree(old_disk_conf);
813472ce 3890 kfree(old_plan);
daeda1cc 3891
82bc0194 3892 return 0;
b411b363 3893
813472ce
PR
3894reconnect:
3895 if (new_disk_conf) {
b30ab791 3896 put_ldev(device);
813472ce
PR
3897 kfree(new_disk_conf);
3898 }
0500813f 3899 mutex_unlock(&connection->resource->conf_update);
813472ce
PR
3900 return -EIO;
3901
b411b363 3902disconnect:
813472ce
PR
3903 kfree(new_plan);
3904 if (new_disk_conf) {
b30ab791 3905 put_ldev(device);
813472ce
PR
3906 kfree(new_disk_conf);
3907 }
0500813f 3908 mutex_unlock(&connection->resource->conf_update);
b411b363
PR
3909 /* just for completeness: actually not needed,
3910 * as this is not reached if csums_tfm was ok. */
9534d671 3911 crypto_free_ahash(csums_tfm);
b411b363 3912 /* but free the verify_tfm again, if csums_tfm did not work out */
9534d671 3913 crypto_free_ahash(verify_tfm);
9f4fe9ad 3914 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3915 return -EIO;
b411b363
PR
3916}
3917
b411b363 3918/* warn if the arguments differ by more than 12.5% */
b30ab791 3919static void warn_if_differ_considerably(struct drbd_device *device,
b411b363
PR
3920 const char *s, sector_t a, sector_t b)
3921{
3922 sector_t d;
3923 if (a == 0 || b == 0)
3924 return;
3925 d = (a > b) ? (a - b) : (b - a);
3926 if (d > (a>>3) || d > (b>>3))
d0180171 3927 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
b411b363
PR
3928 (unsigned long long)a, (unsigned long long)b);
3929}
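/*
 * Worked example for the 12.5% check above (illustrative only, not part
 * of DRBD): a >> 3 is a / 8, i.e. 12.5% of a.  With a = 1000 and
 * b = 1120 sectors, d = 120 while a >> 3 = 125, so no warning; at
 * b = 1130, d = 130 > 125 and the difference is reported.
 */
static int differs_considerably(unsigned long long a, unsigned long long b)
{
	unsigned long long d;

	if (a == 0 || b == 0)
		return 0;
	d = (a > b) ? (a - b) : (b - a);
	/* same predicate as above: more than 1/8 of either value */
	return d > (a >> 3) || d > (b >> 3);
}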
3930
bde89a9e 3931static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
b411b363 3932{
9f4fe9ad 3933 struct drbd_peer_device *peer_device;
b30ab791 3934 struct drbd_device *device;
e658983a 3935 struct p_sizes *p = pi->data;
e96c9633 3936 enum determine_dev_size dd = DS_UNCHANGED;
6a8d68b1 3937 sector_t p_size, p_usize, p_csize, my_usize;
b411b363 3938 int ldsc = 0; /* local disk size changed */
e89b591c 3939 enum dds_flags ddsf;
b411b363 3940
9f4fe9ad
AG
3941 peer_device = conn_peer_device(connection, pi->vnr);
3942 if (!peer_device)
bde89a9e 3943 return config_unknown_volume(connection, pi);
9f4fe9ad 3944 device = peer_device->device;
4a76b161 3945
b411b363
PR
3946 p_size = be64_to_cpu(p->d_size);
3947 p_usize = be64_to_cpu(p->u_size);
6a8d68b1 3948 p_csize = be64_to_cpu(p->c_size);
b411b363 3949
b411b363
PR
3950 /* just store the peer's disk size for now.
3951 * we still need to figure out whether we accept that. */
b30ab791 3952 device->p_size = p_size;
b411b363 3953
b30ab791 3954 if (get_ldev(device)) {
60bac040 3955 sector_t new_size, cur_size;
daeda1cc 3956 rcu_read_lock();
b30ab791 3957 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
daeda1cc
PR
3958 rcu_read_unlock();
3959
b30ab791
AG
3960 warn_if_differ_considerably(device, "lower level device sizes",
3961 p_size, drbd_get_max_capacity(device->ldev));
3962 warn_if_differ_considerably(device, "user requested size",
daeda1cc 3963 p_usize, my_usize);
b411b363
PR
3964
3965 /* if this is the first connect, or an otherwise expected
3966 * param exchange, choose the minimum */
b30ab791 3967 if (device->state.conn == C_WF_REPORT_PARAMS)
daeda1cc 3968 p_usize = min_not_zero(my_usize, p_usize);
b411b363
PR
3969
3970 /* Never shrink a device with usable data during connect.
3971 But allow online shrinking if we are connected. */
60bac040
LE
3972 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3973 cur_size = drbd_get_capacity(device->this_bdev);
3974 if (new_size < cur_size &&
b30ab791
AG
3975 device->state.disk >= D_OUTDATED &&
3976 device->state.conn < C_CONNECTED) {
60bac040
LE
3977 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3978 (unsigned long long)new_size, (unsigned long long)cur_size);
9f4fe9ad 3979 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
b30ab791 3980 put_ldev(device);
82bc0194 3981 return -EIO;
b411b363 3982 }
daeda1cc
PR
3983
3984 if (my_usize != p_usize) {
3985 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3986
3987 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3988 if (!new_disk_conf) {
d0180171 3989 drbd_err(device, "Allocation of new disk_conf failed\n");
b30ab791 3990 put_ldev(device);
daeda1cc
PR
3991 return -ENOMEM;
3992 }
3993
0500813f 3994 mutex_lock(&connection->resource->conf_update);
b30ab791 3995 old_disk_conf = device->ldev->disk_conf;
daeda1cc
PR
3996 *new_disk_conf = *old_disk_conf;
3997 new_disk_conf->disk_size = p_usize;
3998
b30ab791 3999 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
0500813f 4000 mutex_unlock(&connection->resource->conf_update);
daeda1cc
PR
4001 synchronize_rcu();
4002 kfree(old_disk_conf);
4003
d0180171 4004 drbd_info(device, "Peer sets u_size to %lu sectors\n",
daeda1cc 4005 (unsigned long)my_usize);
b411b363 4006 }
daeda1cc 4007
b30ab791 4008 put_ldev(device);
b411b363 4009 }
b411b363 4010
20c68fde 4011 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
dd4f699d 4012 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
20c68fde 4013 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
dd4f699d 4014 drbd_reconsider_queue_parameters(), we can be sure that after
20c68fde
LE
4015 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4016
e89b591c 4017 ddsf = be16_to_cpu(p->dds_flags);
b30ab791 4018 if (get_ldev(device)) {
dd4f699d 4019 drbd_reconsider_queue_parameters(device, device->ldev);
b30ab791
AG
4020 dd = drbd_determine_dev_size(device, ddsf, NULL);
4021 put_ldev(device);
e96c9633 4022 if (dd == DS_ERROR)
82bc0194 4023 return -EIO;
b30ab791 4024 drbd_md_sync(device);
b411b363 4025 } else {
6a8d68b1
LE
4026 /*
4027 * I am diskless, need to accept the peer's *current* size.
4028 * I must NOT accept the peer's backing disk size,
4029 * it may have been larger than mine all along...
4030 *
4031 * At this point, the peer knows more about my disk, or at
4032 * least about what we last agreed upon, than myself.
4033 * So if his c_size is less than his d_size, the most likely
4034 * reason is that *my* d_size was smaller last time we checked.
4035 *
4036 * However, if he sends a zero current size,
4037 * take his (user-capped or) backing disk size anyway.
4038 */
dd4f699d 4039 drbd_reconsider_queue_parameters(device, NULL);
6a8d68b1 4040 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
b411b363
PR
4041 }
4042
b30ab791
AG
4043 if (get_ldev(device)) {
4044 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4045 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
b411b363
PR
4046 ldsc = 1;
4047 }
4048
b30ab791 4049 put_ldev(device);
b411b363
PR
4050 }
4051
b30ab791 4052 if (device->state.conn > C_WF_REPORT_PARAMS) {
b411b363 4053 if (be64_to_cpu(p->c_size) !=
b30ab791 4054 drbd_get_capacity(device->this_bdev) || ldsc) {
b411b363
PR
4055 /* we have different sizes, probably peer
4056 * needs to know my new size... */
69a22773 4057 drbd_send_sizes(peer_device, 0, ddsf);
b411b363 4058 }
b30ab791
AG
4059 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4060 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4061 if (device->state.pdsk >= D_INCONSISTENT &&
4062 device->state.disk >= D_INCONSISTENT) {
e89b591c 4063 if (ddsf & DDSF_NO_RESYNC)
d0180171 4064 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
e89b591c 4065 else
b30ab791 4066 resync_after_online_grow(device);
e89b591c 4067 } else
b30ab791 4068 set_bit(RESYNC_AFTER_NEG, &device->flags);
b411b363
PR
4069 }
4070 }
4071
82bc0194 4072 return 0;
b411b363
PR
4073}
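/*
 * Note (illustrative, not part of DRBD): the gcc "?:" extension in
 * drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size) above picks
 * the first non-zero of the three sizes.  In portable C it reads:
 */
static unsigned long long first_nonzero_sector(unsigned long long c_size,
					       unsigned long long u_size,
					       unsigned long long d_size)
{
	/* prefer the agreed current size, then the user limit, then the
	 * peer's backing device size */
	return c_size ? c_size : (u_size ? u_size : d_size);
}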
4074
bde89a9e 4075static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4076{
9f4fe9ad 4077 struct drbd_peer_device *peer_device;
b30ab791 4078 struct drbd_device *device;
e658983a 4079 struct p_uuids *p = pi->data;
b411b363 4080 u64 *p_uuid;
62b0da3a 4081 int i, updated_uuids = 0;
b411b363 4082
9f4fe9ad
AG
4083 peer_device = conn_peer_device(connection, pi->vnr);
4084 if (!peer_device)
bde89a9e 4085 return config_unknown_volume(connection, pi);
9f4fe9ad 4086 device = peer_device->device;
4a76b161 4087
b411b363 4088 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
063eacf8 4089 if (!p_uuid) {
d0180171 4090 drbd_err(device, "kmalloc of p_uuid failed\n");
063eacf8
JW
4091 return false;
4092 }
b411b363
PR
4093
4094 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4095 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4096
b30ab791
AG
4097 kfree(device->p_uuid);
4098 device->p_uuid = p_uuid;
b411b363 4099
b30ab791
AG
4100 if (device->state.conn < C_CONNECTED &&
4101 device->state.disk < D_INCONSISTENT &&
4102 device->state.role == R_PRIMARY &&
4103 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
d0180171 4104 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
b30ab791 4105 (unsigned long long)device->ed_uuid);
9f4fe9ad 4106 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 4107 return -EIO;
b411b363
PR
4108 }
4109
b30ab791 4110 if (get_ldev(device)) {
b411b363 4111 int skip_initial_sync =
b30ab791 4112 device->state.conn == C_CONNECTED &&
9f4fe9ad 4113 peer_device->connection->agreed_pro_version >= 90 &&
b30ab791 4114 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
b411b363
PR
4115 (p_uuid[UI_FLAGS] & 8);
4116 if (skip_initial_sync) {
d0180171 4117 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
b30ab791 4118 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
20ceb2b2
LE
4119 "clear_n_write from receive_uuids",
4120 BM_LOCKED_TEST_ALLOWED);
b30ab791
AG
4121 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4122 _drbd_uuid_set(device, UI_BITMAP, 0);
4123 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
b411b363 4124 CS_VERBOSE, NULL);
b30ab791 4125 drbd_md_sync(device);
62b0da3a 4126 updated_uuids = 1;
b411b363 4127 }
b30ab791
AG
4128 put_ldev(device);
4129 } else if (device->state.disk < D_INCONSISTENT &&
4130 device->state.role == R_PRIMARY) {
18a50fa2
PR
4131 /* I am a diskless primary, the peer just created a new current UUID
4132 for me. */
b30ab791 4133 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
b411b363
PR
4134 }
4135
4136 /* Before we test for the disk state, we should wait until a possibly
4137 ongoing cluster-wide state change has finished. That is important if
4138 we are primary and are detaching from our disk. We need to see the
4139 new disk state... */
b30ab791
AG
4140 mutex_lock(device->state_mutex);
4141 mutex_unlock(device->state_mutex);
4142 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4143 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
62b0da3a
LE
4144
4145 if (updated_uuids)
b30ab791 4146 drbd_print_uuids(device, "receiver updated UUIDs to");
b411b363 4147
82bc0194 4148 return 0;
b411b363
PR
4149}
4150
4151/**
4152 * convert_state() - Converts the peer's view of the cluster state to our point of view
4153 * @ps: The state as seen by the peer.
4154 */
4155static union drbd_state convert_state(union drbd_state ps)
4156{
4157 union drbd_state ms;
4158
4159 static enum drbd_conns c_tab[] = {
369bea63 4160 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
b411b363
PR
4161 [C_CONNECTED] = C_CONNECTED,
4162
4163 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4164 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4165 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4166 [C_VERIFY_S] = C_VERIFY_T,
4167 [C_MASK] = C_MASK,
4168 };
4169
4170 ms.i = ps.i;
4171
4172 ms.conn = c_tab[ps.conn];
4173 ms.peer = ps.role;
4174 ms.role = ps.peer;
4175 ms.pdsk = ps.disk;
4176 ms.disk = ps.pdsk;
4177 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4178
4179 return ms;
4180}
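/*
 * Illustrative sketch, not part of DRBD: convert_state() above is
 * essentially a mirror operation: what the peer calls "role" is our
 * "peer", and its "disk" is our "pdsk".  The simplified struct below is
 * hypothetical.
 */
struct simple_state {
	int role, peer;		/* my role / the other node's role */
	int disk, pdsk;		/* my disk state / the other node's disk state */
};

static struct simple_state mirror_state(struct simple_state ps)
{
	struct simple_state ms = ps;	/* keep the remaining fields as-is */

	ms.role = ps.peer;
	ms.peer = ps.role;
	ms.disk = ps.pdsk;
	ms.pdsk = ps.disk;
	return ms;
}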
4181
bde89a9e 4182static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4183{
9f4fe9ad 4184 struct drbd_peer_device *peer_device;
b30ab791 4185 struct drbd_device *device;
e658983a 4186 struct p_req_state *p = pi->data;
b411b363 4187 union drbd_state mask, val;
bf885f8a 4188 enum drbd_state_rv rv;
b411b363 4189
9f4fe9ad
AG
4190 peer_device = conn_peer_device(connection, pi->vnr);
4191 if (!peer_device)
4a76b161 4192 return -EIO;
9f4fe9ad 4193 device = peer_device->device;
4a76b161 4194
b411b363
PR
4195 mask.i = be32_to_cpu(p->mask);
4196 val.i = be32_to_cpu(p->val);
4197
9f4fe9ad 4198 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
b30ab791 4199 mutex_is_locked(device->state_mutex)) {
69a22773 4200 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
82bc0194 4201 return 0;
b411b363
PR
4202 }
4203
4204 mask = convert_state(mask);
4205 val = convert_state(val);
4206
b30ab791 4207 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
69a22773 4208 drbd_send_sr_reply(peer_device, rv);
b411b363 4209
b30ab791 4210 drbd_md_sync(device);
b411b363 4211
82bc0194 4212 return 0;
b411b363
PR
4213}
4214
bde89a9e 4215static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4216{
e658983a 4217 struct p_req_state *p = pi->data;
b411b363 4218 union drbd_state mask, val;
bf885f8a 4219 enum drbd_state_rv rv;
b411b363 4220
b411b363
PR
4221 mask.i = be32_to_cpu(p->mask);
4222 val.i = be32_to_cpu(p->val);
4223
bde89a9e
AG
4224 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4225 mutex_is_locked(&connection->cstate_mutex)) {
4226 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
82bc0194 4227 return 0;
b411b363
PR
4228 }
4229
4230 mask = convert_state(mask);
4231 val = convert_state(val);
4232
bde89a9e
AG
4233 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4234 conn_send_sr_reply(connection, rv);
b411b363 4235
82bc0194 4236 return 0;
b411b363
PR
4237}
4238
bde89a9e 4239static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4240{
9f4fe9ad 4241 struct drbd_peer_device *peer_device;
b30ab791 4242 struct drbd_device *device;
e658983a 4243 struct p_state *p = pi->data;
4ac4aada 4244 union drbd_state os, ns, peer_state;
b411b363 4245 enum drbd_disk_state real_peer_disk;
65d922c3 4246 enum chg_state_flags cs_flags;
b411b363
PR
4247 int rv;
4248
9f4fe9ad
AG
4249 peer_device = conn_peer_device(connection, pi->vnr);
4250 if (!peer_device)
bde89a9e 4251 return config_unknown_volume(connection, pi);
9f4fe9ad 4252 device = peer_device->device;
4a76b161 4253
b411b363
PR
4254 peer_state.i = be32_to_cpu(p->state);
4255
4256 real_peer_disk = peer_state.disk;
4257 if (peer_state.disk == D_NEGOTIATING) {
b30ab791 4258 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
d0180171 4259 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
b411b363
PR
4260 }
4261
0500813f 4262 spin_lock_irq(&device->resource->req_lock);
b411b363 4263 retry:
b30ab791 4264 os = ns = drbd_read_state(device);
0500813f 4265 spin_unlock_irq(&device->resource->req_lock);
b411b363 4266
668700b4 4267 /* If some other part of the code (ack_receiver thread, timeout)
545752d5
LE
4268 * already decided to close the connection again,
4269 * we must not "re-establish" it here. */
4270 if (os.conn <= C_TEAR_DOWN)
58ffa580 4271 return -ECONNRESET;
545752d5 4272
40424e4a
LE
4273 /* If this is the "end of sync" confirmation, usually the peer disk
4274 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4275 * set) resync started in PausedSyncT, or if the timing of pause-/
4276 * unpause-sync events has been "just right", the peer disk may
4277 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4278 */
4279 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4280 real_peer_disk == D_UP_TO_DATE &&
e9ef7bb6
LE
4281 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4282 /* If we are (becoming) SyncSource, but peer is still in sync
4283 * preparation, ignore its uptodate-ness to avoid flapping, it
4284 * will change to inconsistent once the peer reaches active
4285 * syncing states.
4286 * It may have changed syncer-paused flags, however, so we
4287 * cannot ignore this completely. */
4288 if (peer_state.conn > C_CONNECTED &&
4289 peer_state.conn < C_SYNC_SOURCE)
4290 real_peer_disk = D_INCONSISTENT;
4291
4292 /* if peer_state changes to connected at the same time,
4293 * it explicitly notifies us that it finished resync.
4294 * Maybe we should finish it up, too? */
4295 else if (os.conn >= C_SYNC_SOURCE &&
4296 peer_state.conn == C_CONNECTED) {
b30ab791
AG
4297 if (drbd_bm_total_weight(device) <= device->rs_failed)
4298 drbd_resync_finished(device);
82bc0194 4299 return 0;
e9ef7bb6
LE
4300 }
4301 }
4302
02b91b55
LE
4303 /* explicit verify finished notification, stop sector reached. */
4304 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4305 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
b30ab791
AG
4306 ov_out_of_sync_print(device);
4307 drbd_resync_finished(device);
58ffa580 4308 return 0;
02b91b55
LE
4309 }
4310
e9ef7bb6
LE
4311 /* peer says his disk is inconsistent, while we think it is uptodate,
4312 * and this happens while the peer still thinks we have a sync going on,
4313 * but we think we are already done with the sync.
4314 * We ignore this to avoid flapping pdsk.
4315 * This should not happen, if the peer is a recent version of drbd. */
4316 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4317 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4318 real_peer_disk = D_UP_TO_DATE;
4319
4ac4aada
LE
4320 if (ns.conn == C_WF_REPORT_PARAMS)
4321 ns.conn = C_CONNECTED;
b411b363 4322
67531718
PR
4323 if (peer_state.conn == C_AHEAD)
4324 ns.conn = C_BEHIND;
4325
b30ab791
AG
4326 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4327 get_ldev_if_state(device, D_NEGOTIATING)) {
b411b363
PR
4328 int cr; /* consider resync */
4329
4330 /* if we established a new connection */
4ac4aada 4331 cr = (os.conn < C_CONNECTED);
b411b363
PR
4332 /* if we had an established connection
4333 * and one of the nodes newly attaches a disk */
4ac4aada 4334 cr |= (os.conn == C_CONNECTED &&
b411b363 4335 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 4336 os.disk == D_NEGOTIATING));
b411b363
PR
4337 /* if we have both been inconsistent, and the peer has been
4338 * forced to be UpToDate with --overwrite-data */
b30ab791 4339 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
b411b363
PR
4340 /* if we had been plain connected, and the admin requested to
4341 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 4342 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
4343 (peer_state.conn >= C_STARTING_SYNC_S &&
4344 peer_state.conn <= C_WF_BITMAP_T));
4345
4346 if (cr)
69a22773 4347 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
b411b363 4348
b30ab791 4349 put_ldev(device);
4ac4aada
LE
4350 if (ns.conn == C_MASK) {
4351 ns.conn = C_CONNECTED;
b30ab791
AG
4352 if (device->state.disk == D_NEGOTIATING) {
4353 drbd_force_state(device, NS(disk, D_FAILED));
b411b363 4354 } else if (peer_state.disk == D_NEGOTIATING) {
d0180171 4355 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
b411b363 4356 peer_state.disk = D_DISKLESS;
580b9767 4357 real_peer_disk = D_DISKLESS;
b411b363 4358 } else {
9f4fe9ad 4359 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
82bc0194 4360 return -EIO;
0b0ba1ef 4361 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
9f4fe9ad 4362 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 4363 return -EIO;
b411b363
PR
4364 }
4365 }
4366 }
4367
0500813f 4368 spin_lock_irq(&device->resource->req_lock);
b30ab791 4369 if (os.i != drbd_read_state(device).i)
b411b363 4370 goto retry;
b30ab791 4371 clear_bit(CONSIDER_RESYNC, &device->flags);
b411b363
PR
4372 ns.peer = peer_state.role;
4373 ns.pdsk = real_peer_disk;
4374 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 4375 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b30ab791 4376 ns.disk = device->new_state_tmp.disk;
4ac4aada 4377 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
b30ab791
AG
4378 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4379 test_bit(NEW_CUR_UUID, &device->flags)) {
8554df1c 4380 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 4381 for temporary network outages! */
0500813f 4382 spin_unlock_irq(&device->resource->req_lock);
d0180171 4383 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
9f4fe9ad 4384 tl_clear(peer_device->connection);
b30ab791
AG
4385 drbd_uuid_new_current(device);
4386 clear_bit(NEW_CUR_UUID, &device->flags);
9f4fe9ad 4387 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 4388 return -EIO;
481c6f50 4389 }
b30ab791
AG
4390 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4391 ns = drbd_read_state(device);
0500813f 4392 spin_unlock_irq(&device->resource->req_lock);
b411b363
PR
4393
4394 if (rv < SS_SUCCESS) {
9f4fe9ad 4395 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 4396 return -EIO;
b411b363
PR
4397 }
4398
4ac4aada
LE
4399 if (os.conn > C_WF_REPORT_PARAMS) {
4400 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
4401 peer_state.disk != D_NEGOTIATING ) {
4402 /* we want resync, peer has not yet decided to sync... */
4403 /* Nowadays only used when forcing a node into primary role and
4404 setting its disk to UpToDate with that */
69a22773
AG
4405 drbd_send_uuids(peer_device);
4406 drbd_send_current_state(peer_device);
b411b363
PR
4407 }
4408 }
4409
b30ab791 4410 clear_bit(DISCARD_MY_DATA, &device->flags);
b411b363 4411
b30ab791 4412 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
b411b363 4413
82bc0194 4414 return 0;
b411b363
PR
4415}
4416
bde89a9e 4417static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4418{
9f4fe9ad 4419 struct drbd_peer_device *peer_device;
b30ab791 4420 struct drbd_device *device;
e658983a 4421 struct p_rs_uuid *p = pi->data;
4a76b161 4422
9f4fe9ad
AG
4423 peer_device = conn_peer_device(connection, pi->vnr);
4424 if (!peer_device)
4a76b161 4425 return -EIO;
9f4fe9ad 4426 device = peer_device->device;
b411b363 4427
b30ab791
AG
4428 wait_event(device->misc_wait,
4429 device->state.conn == C_WF_SYNC_UUID ||
4430 device->state.conn == C_BEHIND ||
4431 device->state.conn < C_CONNECTED ||
4432 device->state.disk < D_NEGOTIATING);
b411b363 4433
0b0ba1ef 4434 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
b411b363 4435
b411b363
PR
4436 /* Here the _drbd_uuid_ functions are right, current should
4437 _not_ be rotated into the history */
b30ab791
AG
4438 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4439 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4440 _drbd_uuid_set(device, UI_BITMAP, 0UL);
b411b363 4441
b30ab791
AG
4442 drbd_print_uuids(device, "updated sync uuid");
4443 drbd_start_resync(device, C_SYNC_TARGET);
b411b363 4444
b30ab791 4445 put_ldev(device);
b411b363 4446 } else
d0180171 4447 drbd_err(device, "Ignoring SyncUUID packet!\n");
b411b363 4448
82bc0194 4449 return 0;
b411b363
PR
4450}
4451
2c46407d
AG
4452/**
4453 * receive_bitmap_plain
4454 *
4455 * Return 0 when done, 1 when another iteration is needed, and a negative error
4456 * code upon failure.
4457 */
4458static int
69a22773 4459receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
e658983a 4460 unsigned long *p, struct bm_xfer_ctx *c)
b411b363 4461{
50d0b1ad 4462 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
69a22773 4463 drbd_header_size(peer_device->connection);
e658983a 4464 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
50d0b1ad 4465 c->bm_words - c->word_offset);
e658983a 4466 unsigned int want = num_words * sizeof(*p);
2c46407d 4467 int err;
b411b363 4468
50d0b1ad 4469 if (want != size) {
69a22773 4470 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
2c46407d 4471 return -EIO;
b411b363
PR
4472 }
4473 if (want == 0)
2c46407d 4474 return 0;
69a22773 4475 err = drbd_recv_all(peer_device->connection, p, want);
82bc0194 4476 if (err)
2c46407d 4477 return err;
b411b363 4478
69a22773 4479 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
b411b363
PR
4480
4481 c->word_offset += num_words;
4482 c->bit_offset = c->word_offset * BITS_PER_LONG;
4483 if (c->bit_offset > c->bm_bits)
4484 c->bit_offset = c->bm_bits;
4485
2c46407d 4486 return 1;
b411b363
PR
4487}
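/*
 * Illustrative caller sketch, not part of DRBD, for the return-value
 * convention documented above (0 done, 1 more to do, negative error):
 * keep calling the step function while it returns 1.  "step" is a
 * hypothetical stand-in for receive_bitmap_plain()/decode_bitmap_c().
 */
static int drain_bitmap_stream(int (*step)(void *ctx), void *ctx)
{
	int err;

	do {
		err = step(ctx);
	} while (err > 0);

	return err;	/* 0 on success, negative error code otherwise */
}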
4488
a02d1240
AG
4489static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4490{
4491 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4492}
4493
4494static int dcbp_get_start(struct p_compressed_bm *p)
4495{
4496 return (p->encoding & 0x80) != 0;
4497}
4498
4499static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4500{
4501 return (p->encoding >> 4) & 0x7;
4502}
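/*
 * Layout of the p->encoding byte decoded by the three helpers above
 * (added for illustration, not part of DRBD):
 *
 *	bit  7     start value of the toggle (first run describes set bits)
 *	bits 6..4  number of padding bits at the end of the bit stream
 *	bits 3..0  bitmap encoding, e.g. RLE_VLI_Bits
 *
 * A hypothetical encoder would assemble the byte like this:
 */
static unsigned char dcbp_pack(int code, int pad_bits, int start)
{
	return (code & 0x0f) | ((pad_bits & 0x7) << 4) | (start ? 0x80 : 0);
}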
4503
2c46407d
AG
4504/**
4505 * recv_bm_rle_bits
4506 *
4507 * Return 0 when done, 1 when another iteration is needed, and a negative error
4508 * code upon failure.
4509 */
4510static int
69a22773 4511recv_bm_rle_bits(struct drbd_peer_device *peer_device,
b411b363 4512 struct p_compressed_bm *p,
c6d25cfe
PR
4513 struct bm_xfer_ctx *c,
4514 unsigned int len)
b411b363
PR
4515{
4516 struct bitstream bs;
4517 u64 look_ahead;
4518 u64 rl;
4519 u64 tmp;
4520 unsigned long s = c->bit_offset;
4521 unsigned long e;
a02d1240 4522 int toggle = dcbp_get_start(p);
b411b363
PR
4523 int have;
4524 int bits;
4525
a02d1240 4526 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
4527
4528 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4529 if (bits < 0)
2c46407d 4530 return -EIO;
b411b363
PR
4531
4532 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4533 bits = vli_decode_bits(&rl, look_ahead);
4534 if (bits <= 0)
2c46407d 4535 return -EIO;
b411b363
PR
4536
4537 if (toggle) {
4538 e = s + rl -1;
4539 if (e >= c->bm_bits) {
69a22773 4540 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 4541 return -EIO;
b411b363 4542 }
69a22773 4543 _drbd_bm_set_bits(peer_device->device, s, e);
b411b363
PR
4544 }
4545
4546 if (have < bits) {
69a22773 4547 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
b411b363
PR
4548 have, bits, look_ahead,
4549 (unsigned int)(bs.cur.b - p->code),
4550 (unsigned int)bs.buf_len);
2c46407d 4551 return -EIO;
b411b363 4552 }
d2da5b0c
LE
4553 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4554 if (likely(bits < 64))
4555 look_ahead >>= bits;
4556 else
4557 look_ahead = 0;
b411b363
PR
4558 have -= bits;
4559
4560 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4561 if (bits < 0)
2c46407d 4562 return -EIO;
b411b363
PR
4563 look_ahead |= tmp << have;
4564 have += bits;
4565 }
4566
4567 c->bit_offset = s;
4568 bm_xfer_ctx_bit_to_word_offset(c);
4569
2c46407d 4570 return (s != c->bm_bits);
b411b363
PR
4571}
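/* Editorial decoding example (not part of the original file): with
 * dcbp_get_start() == 0 and the VLI stream yielding run lengths 5, 3 and 10,
 * the loop above leaves bits 0..4 untouched, sets bits 5..7 via
 * _drbd_bm_set_bits(), and leaves bits 8..17 untouched; "toggle" flips on
 * every run, so the stream alternates between clear runs and set runs. */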
4572
2c46407d
AG
4573/**
4574 * decode_bitmap_c
4575 *
4576 * Return 0 when done, 1 when another iteration is needed, and a negative error
4577 * code upon failure.
4578 */
4579static int
69a22773 4580decode_bitmap_c(struct drbd_peer_device *peer_device,
b411b363 4581 struct p_compressed_bm *p,
c6d25cfe
PR
4582 struct bm_xfer_ctx *c,
4583 unsigned int len)
b411b363 4584{
a02d1240 4585 if (dcbp_get_code(p) == RLE_VLI_Bits)
69a22773 4586 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
b411b363
PR
4587
4588 /* other variants had been implemented for evaluation,
4589 * but have been dropped as this one turned out to be "best"
4590 * during all our tests. */
4591
69a22773
AG
4592 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4593 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 4594 return -EIO;
b411b363
PR
4595}
4596
b30ab791 4597void INFO_bm_xfer_stats(struct drbd_device *device,
b411b363
PR
4598 const char *direction, struct bm_xfer_ctx *c)
4599{
4600 /* what would it take to transfer it "plaintext" */
a6b32bc3 4601 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
50d0b1ad
AG
4602 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4603 unsigned int plain =
4604 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4605 c->bm_words * sizeof(unsigned long);
4606 unsigned int total = c->bytes[0] + c->bytes[1];
4607 unsigned int r;
b411b363
PR
4608
 4609 /* total cannot be zero, but just in case: */
4610 if (total == 0)
4611 return;
4612
4613 /* don't report if not compressed */
4614 if (total >= plain)
4615 return;
4616
4617 /* total < plain. check for overflow, still */
4618 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4619 : (1000 * total / plain);
4620
4621 if (r > 1000)
4622 r = 1000;
4623
4624 r = 1000 - r;
d0180171 4625 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
b411b363
PR
4626 "total %u; compression: %u.%u%%\n",
4627 direction,
4628 c->bytes[1], c->packets[1],
4629 c->bytes[0], c->packets[0],
4630 total, r/10, r % 10);
4631}
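/* Editorial arithmetic example (not part of the original file): with
 * total == 6000 compressed bytes against plain == 48000 plaintext bytes,
 * r = 1000 * 6000 / 48000 = 125, and the function reports 1000 - 125 = 875,
 * printed as "compression: 87.5%". The (total > UINT_MAX/1000) branch only
 * avoids 32-bit overflow of 1000 * total for very large bitmaps. */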
4632
4633/* Since we are processing the bitfield from lower addresses to higher,
 4634 it does not matter whether we process it in 32 bit chunks or 64 bit
 4635 chunks, as long as it is little endian. (Understand it as a byte stream,
 4636 beginning with the lowest byte...) If we used big endian,
4637 we would need to process it from the highest address to the lowest,
4638 in order to be agnostic to the 32 vs 64 bits issue.
4639
4640 returns 0 on failure, 1 if we successfully received it. */
bde89a9e 4641static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4642{
9f4fe9ad 4643 struct drbd_peer_device *peer_device;
b30ab791 4644 struct drbd_device *device;
b411b363 4645 struct bm_xfer_ctx c;
2c46407d 4646 int err;
4a76b161 4647
9f4fe9ad
AG
4648 peer_device = conn_peer_device(connection, pi->vnr);
4649 if (!peer_device)
4a76b161 4650 return -EIO;
9f4fe9ad 4651 device = peer_device->device;
b411b363 4652
b30ab791 4653 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
20ceb2b2
LE
4654 /* you are supposed to send additional out-of-sync information
4655 * if you actually set bits during this phase */
b411b363 4656
b411b363 4657 c = (struct bm_xfer_ctx) {
b30ab791
AG
4658 .bm_bits = drbd_bm_bits(device),
4659 .bm_words = drbd_bm_words(device),
b411b363
PR
4660 };
4661
2c46407d 4662 for(;;) {
e658983a 4663 if (pi->cmd == P_BITMAP)
69a22773 4664 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
e658983a 4665 else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
4666 /* MAYBE: sanity check that we speak proto >= 90,
4667 * and the feature is enabled! */
e658983a 4668 struct p_compressed_bm *p = pi->data;
b411b363 4669
bde89a9e 4670 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
d0180171 4671 drbd_err(device, "ReportCBitmap packet too large\n");
82bc0194 4672 err = -EIO;
b411b363
PR
4673 goto out;
4674 }
e658983a 4675 if (pi->size <= sizeof(*p)) {
d0180171 4676 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 4677 err = -EIO;
78fcbdae 4678 goto out;
b411b363 4679 }
9f4fe9ad 4680 err = drbd_recv_all(peer_device->connection, p, pi->size);
e658983a
AG
4681 if (err)
4682 goto out;
69a22773 4683 err = decode_bitmap_c(peer_device, p, &c, pi->size);
b411b363 4684 } else {
d0180171 4685 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 4686 err = -EIO;
b411b363
PR
4687 goto out;
4688 }
4689
e2857216 4690 c.packets[pi->cmd == P_BITMAP]++;
bde89a9e 4691 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
b411b363 4692
2c46407d
AG
4693 if (err <= 0) {
4694 if (err < 0)
4695 goto out;
b411b363 4696 break;
2c46407d 4697 }
9f4fe9ad 4698 err = drbd_recv_header(peer_device->connection, pi);
82bc0194 4699 if (err)
b411b363 4700 goto out;
2c46407d 4701 }
b411b363 4702
b30ab791 4703 INFO_bm_xfer_stats(device, "receive", &c);
b411b363 4704
b30ab791 4705 if (device->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
4706 enum drbd_state_rv rv;
4707
b30ab791 4708 err = drbd_send_bitmap(device);
82bc0194 4709 if (err)
b411b363
PR
4710 goto out;
4711 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
b30ab791 4712 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
0b0ba1ef 4713 D_ASSERT(device, rv == SS_SUCCESS);
b30ab791 4714 } else if (device->state.conn != C_WF_BITMAP_S) {
b411b363
PR
4715 /* admin may have requested C_DISCONNECTING,
4716 * other threads may have noticed network errors */
d0180171 4717 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
b30ab791 4718 drbd_conn_str(device->state.conn));
b411b363 4719 }
82bc0194 4720 err = 0;
b411b363 4721
b411b363 4722 out:
b30ab791
AG
4723 drbd_bm_unlock(device);
4724 if (!err && device->state.conn == C_WF_BITMAP_S)
4725 drbd_start_resync(device, C_SYNC_SOURCE);
82bc0194 4726 return err;
b411b363
PR
4727}
4728
bde89a9e 4729static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
b411b363 4730{
1ec861eb 4731 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 4732 pi->cmd, pi->size);
b411b363 4733
bde89a9e 4734 return ignore_remaining_packet(connection, pi);
b411b363
PR
4735}
4736
bde89a9e 4737static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
0ced55a3 4738{
e7f52dfb
LE
4739 /* Make sure we've acked all the TCP data associated
4740 * with the data requests being unplugged */
bde89a9e 4741 drbd_tcp_quickack(connection->data.socket);
0ced55a3 4742
82bc0194 4743 return 0;
0ced55a3
PR
4744}
4745
bde89a9e 4746static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
73a01a18 4747{
9f4fe9ad 4748 struct drbd_peer_device *peer_device;
b30ab791 4749 struct drbd_device *device;
e658983a 4750 struct p_block_desc *p = pi->data;
4a76b161 4751
9f4fe9ad
AG
4752 peer_device = conn_peer_device(connection, pi->vnr);
4753 if (!peer_device)
4a76b161 4754 return -EIO;
9f4fe9ad 4755 device = peer_device->device;
73a01a18 4756
b30ab791 4757 switch (device->state.conn) {
f735e363
LE
4758 case C_WF_SYNC_UUID:
4759 case C_WF_BITMAP_T:
4760 case C_BEHIND:
4761 break;
4762 default:
d0180171 4763 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
b30ab791 4764 drbd_conn_str(device->state.conn));
f735e363
LE
4765 }
4766
b30ab791 4767 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
73a01a18 4768
82bc0194 4769 return 0;
73a01a18
PR
4770}
4771
700ca8c0
PR
4772static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4773{
4774 struct drbd_peer_device *peer_device;
4775 struct p_block_desc *p = pi->data;
4776 struct drbd_device *device;
4777 sector_t sector;
4778 int size, err = 0;
4779
4780 peer_device = conn_peer_device(connection, pi->vnr);
4781 if (!peer_device)
4782 return -EIO;
4783 device = peer_device->device;
4784
4785 sector = be64_to_cpu(p->sector);
4786 size = be32_to_cpu(p->blksize);
4787
4788 dec_rs_pending(device);
4789
4790 if (get_ldev(device)) {
4791 struct drbd_peer_request *peer_req;
4792 const int op = REQ_OP_DISCARD;
4793
4794 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4795 size, false, GFP_NOIO);
4796 if (!peer_req) {
4797 put_ldev(device);
4798 return -ENOMEM;
4799 }
4800
4801 peer_req->w.cb = e_end_resync_block;
4802 peer_req->submit_jif = jiffies;
4803 peer_req->flags |= EE_IS_TRIM;
4804
4805 spin_lock_irq(&device->resource->req_lock);
4806 list_add_tail(&peer_req->w.list, &device->sync_ee);
4807 spin_unlock_irq(&device->resource->req_lock);
4808
4809 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4810 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4811
4812 if (err) {
4813 spin_lock_irq(&device->resource->req_lock);
4814 list_del(&peer_req->w.list);
4815 spin_unlock_irq(&device->resource->req_lock);
4816
4817 drbd_free_peer_req(device, peer_req);
4818 put_ldev(device);
4819 err = 0;
4820 goto fail;
4821 }
4822
4823 inc_unacked(device);
4824
4825 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4826 as well as drbd_rs_complete_io() */
4827 } else {
4828 fail:
4829 drbd_rs_complete_io(device, sector);
4830 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4831 }
4832
4833 atomic_add(size >> 9, &device->rs_sect_in);
4834
4835 return err;
4836}
4837
02918be2
PR
4838struct data_cmd {
4839 int expect_payload;
4840 size_t pkt_size;
bde89a9e 4841 int (*fn)(struct drbd_connection *, struct packet_info *);
02918be2
PR
4842};
4843
4844static struct data_cmd drbd_cmd_handler[] = {
4845 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4846 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4847 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4848 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
e658983a
AG
4849 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4850 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4851 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
02918be2
PR
4852 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4853 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
e658983a
AG
4854 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4855 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
02918be2
PR
4856 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4857 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4858 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4859 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4860 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4861 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4862 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4863 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4864 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
700ca8c0 4865 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest },
02918be2 4866 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 4867 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4a76b161 4868 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
036b17ea 4869 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
a0fb3c47 4870 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
700ca8c0
PR
4871 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4872
b411b363
PR
4873};
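/* Editorial dispatch example (not part of the original file): for an incoming
 * P_SIZES packet, drbdd() below looks up drbd_cmd_handler[P_SIZES], reads
 * exactly sizeof(struct p_sizes) bytes of sub-header (expect_payload == 0, so
 * any extra payload is a protocol error) and then calls receive_sizes().
 * For P_DATA, expect_payload == 1: only sizeof(struct p_data) is read here,
 * and receive_Data() itself consumes the remaining pi.size bytes of data. */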
4874
bde89a9e 4875static void drbdd(struct drbd_connection *connection)
b411b363 4876{
77351055 4877 struct packet_info pi;
02918be2 4878 size_t shs; /* sub header size */
82bc0194 4879 int err;
b411b363 4880
bde89a9e 4881 while (get_t_state(&connection->receiver) == RUNNING) {
deebe195 4882 struct data_cmd *cmd;
b411b363 4883
bde89a9e 4884 drbd_thread_current_set_cpu(&connection->receiver);
944410e9 4885 update_receiver_timing_details(connection, drbd_recv_header);
bde89a9e 4886 if (drbd_recv_header(connection, &pi))
02918be2 4887 goto err_out;
b411b363 4888
deebe195 4889 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 4890 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
1ec861eb 4891 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
2fcb8f30 4892 cmdname(pi.cmd), pi.cmd);
02918be2 4893 goto err_out;
0b33a916 4894 }
b411b363 4895
e658983a
AG
4896 shs = cmd->pkt_size;
4897 if (pi.size > shs && !cmd->expect_payload) {
1ec861eb 4898 drbd_err(connection, "No payload expected %s l:%d\n",
2fcb8f30 4899 cmdname(pi.cmd), pi.size);
02918be2 4900 goto err_out;
b411b363 4901 }
b411b363 4902
c13f7e1a 4903 if (shs) {
944410e9 4904 update_receiver_timing_details(connection, drbd_recv_all_warn);
bde89a9e 4905 err = drbd_recv_all_warn(connection, pi.data, shs);
a5c31904 4906 if (err)
c13f7e1a 4907 goto err_out;
e2857216 4908 pi.size -= shs;
c13f7e1a
LE
4909 }
4910
944410e9 4911 update_receiver_timing_details(connection, cmd->fn);
bde89a9e 4912 err = cmd->fn(connection, &pi);
4a76b161 4913 if (err) {
1ec861eb 4914 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
9f5bdc33 4915 cmdname(pi.cmd), err, pi.size);
02918be2 4916 goto err_out;
b411b363
PR
4917 }
4918 }
82bc0194 4919 return;
b411b363 4920
82bc0194 4921 err_out:
bde89a9e 4922 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4923}
4924
bde89a9e 4925static void conn_disconnect(struct drbd_connection *connection)
b411b363 4926{
c06ece6b 4927 struct drbd_peer_device *peer_device;
bbeb641c 4928 enum drbd_conns oc;
376694a0 4929 int vnr;
b411b363 4930
bde89a9e 4931 if (connection->cstate == C_STANDALONE)
b411b363 4932 return;
b411b363 4933
545752d5
LE
4934 /* We are about to start the cleanup after connection loss.
4935 * Make sure drbd_make_request knows about that.
4936 * Usually we should be in some network failure state already,
4937 * but just in case we are not, we fix it up here.
4938 */
bde89a9e 4939 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
545752d5 4940
668700b4 4941 /* ack_receiver does not clean up anything. It must not interfere, either. */
1c03e520 4942 drbd_thread_stop(&connection->ack_receiver);
668700b4
PR
4943 if (connection->ack_sender) {
4944 destroy_workqueue(connection->ack_sender);
4945 connection->ack_sender = NULL;
4946 }
bde89a9e 4947 drbd_free_sock(connection);
360cc740 4948
c141ebda 4949 rcu_read_lock();
c06ece6b
AG
4950 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4951 struct drbd_device *device = peer_device->device;
b30ab791 4952 kref_get(&device->kref);
c141ebda 4953 rcu_read_unlock();
69a22773 4954 drbd_disconnected(peer_device);
c06ece6b 4955 kref_put(&device->kref, drbd_destroy_device);
c141ebda
PR
4956 rcu_read_lock();
4957 }
4958 rcu_read_unlock();
4959
bde89a9e 4960 if (!list_empty(&connection->current_epoch->list))
1ec861eb 4961 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
12038a3a 4962 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
bde89a9e
AG
4963 atomic_set(&connection->current_epoch->epoch_size, 0);
4964 connection->send.seen_any_write_yet = false;
12038a3a 4965
1ec861eb 4966 drbd_info(connection, "Connection closed\n");
360cc740 4967
bde89a9e
AG
4968 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4969 conn_try_outdate_peer_async(connection);
cb703454 4970
0500813f 4971 spin_lock_irq(&connection->resource->req_lock);
bde89a9e 4972 oc = connection->cstate;
bbeb641c 4973 if (oc >= C_UNCONNECTED)
bde89a9e 4974 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
bbeb641c 4975
0500813f 4976 spin_unlock_irq(&connection->resource->req_lock);
360cc740 4977
f3dfa40a 4978 if (oc == C_DISCONNECTING)
bde89a9e 4979 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
360cc740
PR
4980}
4981
69a22773 4982static int drbd_disconnected(struct drbd_peer_device *peer_device)
360cc740 4983{
69a22773 4984 struct drbd_device *device = peer_device->device;
360cc740 4985 unsigned int i;
b411b363 4986
85719573 4987 /* wait for current activity to cease. */
0500813f 4988 spin_lock_irq(&device->resource->req_lock);
b30ab791
AG
4989 _drbd_wait_ee_list_empty(device, &device->active_ee);
4990 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4991 _drbd_wait_ee_list_empty(device, &device->read_ee);
0500813f 4992 spin_unlock_irq(&device->resource->req_lock);
b411b363
PR
4993
4994 /* We do not have data structures that would allow us to
4995 * get the rs_pending_cnt down to 0 again.
4996 * * On C_SYNC_TARGET we do not have any data structures describing
4997 * the pending RSDataRequest's we have sent.
4998 * * On C_SYNC_SOURCE there is no data structure that tracks
4999 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5000 * And no, it is not the sum of the reference counts in the
5001 * resync_LRU. The resync_LRU tracks the whole operation including
5002 * the disk-IO, while the rs_pending_cnt only tracks the blocks
5003 * on the fly. */
b30ab791
AG
5004 drbd_rs_cancel_all(device);
5005 device->rs_total = 0;
5006 device->rs_failed = 0;
5007 atomic_set(&device->rs_pending_cnt, 0);
5008 wake_up(&device->misc_wait);
b411b363 5009
b30ab791
AG
5010 del_timer_sync(&device->resync_timer);
5011 resync_timer_fn((unsigned long)device);
b411b363 5012
b411b363
PR
5013 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5014 * w_make_resync_request etc. which may still be on the worker queue
5015 * to be "canceled" */
b5043c5e 5016 drbd_flush_workqueue(&peer_device->connection->sender_work);
b411b363 5017
b30ab791 5018 drbd_finish_peer_reqs(device);
b411b363 5019
d10b4ea3
PR
5020 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
 5021 might have queued work again. The one before drbd_finish_peer_reqs() is
 5022 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
b5043c5e 5023 drbd_flush_workqueue(&peer_device->connection->sender_work);
d10b4ea3 5024
08332d73
LE
5025 /* need to do it again, drbd_finish_peer_reqs() may have populated it
5026 * again via drbd_try_clear_on_disk_bm(). */
b30ab791 5027 drbd_rs_cancel_all(device);
b411b363 5028
b30ab791
AG
5029 kfree(device->p_uuid);
5030 device->p_uuid = NULL;
b411b363 5031
b30ab791 5032 if (!drbd_suspended(device))
69a22773 5033 tl_clear(peer_device->connection);
b411b363 5034
b30ab791 5035 drbd_md_sync(device);
b411b363 5036
be115b69
LE
5037 if (get_ldev(device)) {
5038 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5039 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5040 put_ldev(device);
5041 }
20ceb2b2 5042
b411b363
PR
5043 /* tcp_close and release of sendpage pages can be deferred. I don't
5044 * want to use SO_LINGER, because apparently it can be deferred for
5045 * more than 20 seconds (longest time I checked).
5046 *
5047 * Actually we don't care for exactly when the network stack does its
5048 * put_page(), but release our reference on these pages right here.
5049 */
b30ab791 5050 i = drbd_free_peer_reqs(device, &device->net_ee);
b411b363 5051 if (i)
d0180171 5052 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
b30ab791 5053 i = atomic_read(&device->pp_in_use_by_net);
435f0740 5054 if (i)
d0180171 5055 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
b30ab791 5056 i = atomic_read(&device->pp_in_use);
b411b363 5057 if (i)
d0180171 5058 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
b411b363 5059
0b0ba1ef
AG
5060 D_ASSERT(device, list_empty(&device->read_ee));
5061 D_ASSERT(device, list_empty(&device->active_ee));
5062 D_ASSERT(device, list_empty(&device->sync_ee));
5063 D_ASSERT(device, list_empty(&device->done_ee));
b411b363 5064
360cc740 5065 return 0;
b411b363
PR
5066}
5067
5068/*
5069 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5070 * we can agree on is stored in agreed_pro_version.
5071 *
5072 * feature flags and the reserved array should be enough room for future
5073 * enhancements of the handshake protocol, and possible plugins...
5074 *
5075 * for now, they are expected to be zero, but ignored.
5076 */
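/* Editorial worked example (not part of the original file): if this side
 * supported, say, protocol versions 86..101 and the peer announced
 * protocol_min == 90 and protocol_max == 96, the ranges overlap and
 * drbd_do_features() below settles on agreed_pro_version = min(101, 96) = 96.
 * A peer announcing protocol_max == 80 would fail the overlap check and the
 * handshake would end with -1 ("peer talks different language"). */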
bde89a9e 5077static int drbd_send_features(struct drbd_connection *connection)
b411b363 5078{
9f5bdc33
AG
5079 struct drbd_socket *sock;
5080 struct p_connection_features *p;
b411b363 5081
bde89a9e
AG
5082 sock = &connection->data;
5083 p = conn_prepare_command(connection, sock);
9f5bdc33 5084 if (!p)
e8d17b01 5085 return -EIO;
b411b363
PR
5086 memset(p, 0, sizeof(*p));
5087 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5088 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
20c68fde 5089 p->feature_flags = cpu_to_be32(PRO_FEATURES);
bde89a9e 5090 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
b411b363
PR
5091}
5092
5093/*
5094 * return values:
5095 * 1 yes, we have a valid connection
5096 * 0 oops, did not work out, please try again
5097 * -1 peer talks different language,
5098 * no point in trying again, please go standalone.
5099 */
bde89a9e 5100static int drbd_do_features(struct drbd_connection *connection)
b411b363 5101{
bde89a9e 5102 /* ASSERT current == connection->receiver ... */
e658983a
AG
5103 struct p_connection_features *p;
5104 const int expect = sizeof(struct p_connection_features);
77351055 5105 struct packet_info pi;
a5c31904 5106 int err;
b411b363 5107
bde89a9e 5108 err = drbd_send_features(connection);
e8d17b01 5109 if (err)
b411b363
PR
5110 return 0;
5111
bde89a9e 5112 err = drbd_recv_header(connection, &pi);
69bc7bc3 5113 if (err)
b411b363
PR
5114 return 0;
5115
6038178e 5116 if (pi.cmd != P_CONNECTION_FEATURES) {
1ec861eb 5117 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
2fcb8f30 5118 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5119 return -1;
5120 }
5121
77351055 5122 if (pi.size != expect) {
1ec861eb 5123 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
77351055 5124 expect, pi.size);
b411b363
PR
5125 return -1;
5126 }
5127
e658983a 5128 p = pi.data;
bde89a9e 5129 err = drbd_recv_all_warn(connection, p, expect);
a5c31904 5130 if (err)
b411b363 5131 return 0;
b411b363 5132
b411b363
PR
5133 p->protocol_min = be32_to_cpu(p->protocol_min);
5134 p->protocol_max = be32_to_cpu(p->protocol_max);
5135 if (p->protocol_max == 0)
5136 p->protocol_max = p->protocol_min;
5137
5138 if (PRO_VERSION_MAX < p->protocol_min ||
5139 PRO_VERSION_MIN > p->protocol_max)
5140 goto incompat;
5141
bde89a9e 5142 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
20c68fde 5143 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
b411b363 5144
1ec861eb 5145 drbd_info(connection, "Handshake successful: "
bde89a9e 5146 "Agreed network protocol version %d\n", connection->agreed_pro_version);
b411b363 5147
20c68fde
LE
5148 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
5149 connection->agreed_features & FF_TRIM ? " " : " not ");
5150
92d94ae6
PR
5151 drbd_info(connection, "Agreed to%ssupport THIN_RESYNC on protocol level\n",
5152 connection->agreed_features & FF_THIN_RESYNC ? " " : " not ");
5153
b411b363
PR
5154 return 1;
5155
5156 incompat:
1ec861eb 5157 drbd_err(connection, "incompatible DRBD dialects: "
b411b363
PR
5158 "I support %d-%d, peer supports %d-%d\n",
5159 PRO_VERSION_MIN, PRO_VERSION_MAX,
5160 p->protocol_min, p->protocol_max);
5161 return -1;
5162}
5163
5164#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
bde89a9e 5165static int drbd_do_auth(struct drbd_connection *connection)
b411b363 5166{
1ec861eb
AG
 5167 drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5168 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 5169 return -1;
b411b363
PR
5170}
5171#else
5172#define CHALLENGE_LEN 64
b10d96cb
JT
5173
5174/* Return value:
5175 1 - auth succeeded,
5176 0 - failed, try again (network error),
5177 -1 - auth failed, don't try again.
5178*/
5179
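/* Editorial summary of the exchange implemented below (not part of the
 * original file); both peers run the same code, so from this node's view:
 *   send P_AUTH_CHALLENGE with 64 random bytes (my_challenge)
 *   recv P_AUTH_CHALLENGE with the peer's challenge (peers_ch)
 *   send P_AUTH_RESPONSE  with HMAC(shared_secret, peers_ch)
 *   recv P_AUTH_RESPONSE  and memcmp() it against the locally computed
 *                         HMAC(shared_secret, my_challenge)
 * Each side thus proves knowledge of the secret without transmitting it. */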
bde89a9e 5180static int drbd_do_auth(struct drbd_connection *connection)
b411b363 5181{
9f5bdc33 5182 struct drbd_socket *sock;
b411b363 5183 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
b411b363
PR
5184 char *response = NULL;
5185 char *right_response = NULL;
5186 char *peers_ch = NULL;
44ed167d
PR
5187 unsigned int key_len;
5188 char secret[SHARED_SECRET_MAX]; /* 64 byte */
b411b363 5189 unsigned int resp_size;
9534d671 5190 SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
77351055 5191 struct packet_info pi;
44ed167d 5192 struct net_conf *nc;
69bc7bc3 5193 int err, rv;
b411b363 5194
9f5bdc33 5195 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
b411b363 5196
44ed167d 5197 rcu_read_lock();
bde89a9e 5198 nc = rcu_dereference(connection->net_conf);
44ed167d
PR
5199 key_len = strlen(nc->shared_secret);
5200 memcpy(secret, nc->shared_secret, key_len);
5201 rcu_read_unlock();
5202
9534d671
HX
5203 desc->tfm = connection->cram_hmac_tfm;
5204 desc->flags = 0;
b411b363 5205
9534d671 5206 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
b411b363 5207 if (rv) {
9534d671 5208 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
b10d96cb 5209 rv = -1;
b411b363
PR
5210 goto fail;
5211 }
5212
5213 get_random_bytes(my_challenge, CHALLENGE_LEN);
5214
bde89a9e
AG
5215 sock = &connection->data;
5216 if (!conn_prepare_command(connection, sock)) {
9f5bdc33
AG
5217 rv = 0;
5218 goto fail;
5219 }
bde89a9e 5220 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
9f5bdc33 5221 my_challenge, CHALLENGE_LEN);
b411b363
PR
5222 if (!rv)
5223 goto fail;
5224
bde89a9e 5225 err = drbd_recv_header(connection, &pi);
69bc7bc3
AG
5226 if (err) {
5227 rv = 0;
b411b363 5228 goto fail;
69bc7bc3 5229 }
b411b363 5230
77351055 5231 if (pi.cmd != P_AUTH_CHALLENGE) {
1ec861eb 5232 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
2fcb8f30 5233 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5234 rv = 0;
5235 goto fail;
5236 }
5237
77351055 5238 if (pi.size > CHALLENGE_LEN * 2) {
1ec861eb 5239 drbd_err(connection, "expected AuthChallenge payload too big.\n");
b10d96cb 5240 rv = -1;
b411b363
PR
5241 goto fail;
5242 }
5243
67cca286
PR
5244 if (pi.size < CHALLENGE_LEN) {
5245 drbd_err(connection, "AuthChallenge payload too small.\n");
5246 rv = -1;
5247 goto fail;
5248 }
5249
77351055 5250 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 5251 if (peers_ch == NULL) {
1ec861eb 5252 drbd_err(connection, "kmalloc of peers_ch failed\n");
b10d96cb 5253 rv = -1;
b411b363
PR
5254 goto fail;
5255 }
5256
bde89a9e 5257 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
a5c31904 5258 if (err) {
b411b363
PR
5259 rv = 0;
5260 goto fail;
5261 }
5262
67cca286
PR
5263 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5264 drbd_err(connection, "Peer presented the same challenge!\n");
5265 rv = -1;
5266 goto fail;
5267 }
5268
9534d671 5269 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
b411b363
PR
5270 response = kmalloc(resp_size, GFP_NOIO);
5271 if (response == NULL) {
1ec861eb 5272 drbd_err(connection, "kmalloc of response failed\n");
b10d96cb 5273 rv = -1;
b411b363
PR
5274 goto fail;
5275 }
5276
9534d671 5277 rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
b411b363 5278 if (rv) {
1ec861eb 5279 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 5280 rv = -1;
b411b363
PR
5281 goto fail;
5282 }
5283
bde89a9e 5284 if (!conn_prepare_command(connection, sock)) {
9f5bdc33 5285 rv = 0;
b411b363 5286 goto fail;
9f5bdc33 5287 }
bde89a9e 5288 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
9f5bdc33 5289 response, resp_size);
b411b363
PR
5290 if (!rv)
5291 goto fail;
5292
bde89a9e 5293 err = drbd_recv_header(connection, &pi);
69bc7bc3 5294 if (err) {
b411b363
PR
5295 rv = 0;
5296 goto fail;
5297 }
5298
77351055 5299 if (pi.cmd != P_AUTH_RESPONSE) {
1ec861eb 5300 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
2fcb8f30 5301 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5302 rv = 0;
5303 goto fail;
5304 }
5305
77351055 5306 if (pi.size != resp_size) {
1ec861eb 5307 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
b411b363
PR
5308 rv = 0;
5309 goto fail;
5310 }
b411b363 5311
bde89a9e 5312 err = drbd_recv_all_warn(connection, response , resp_size);
a5c31904 5313 if (err) {
b411b363
PR
5314 rv = 0;
5315 goto fail;
5316 }
5317
5318 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 5319 if (right_response == NULL) {
1ec861eb 5320 drbd_err(connection, "kmalloc of right_response failed\n");
b10d96cb 5321 rv = -1;
b411b363
PR
5322 goto fail;
5323 }
5324
9534d671
HX
5325 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5326 right_response);
b411b363 5327 if (rv) {
1ec861eb 5328 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 5329 rv = -1;
b411b363
PR
5330 goto fail;
5331 }
5332
5333 rv = !memcmp(response, right_response, resp_size);
5334
5335 if (rv)
1ec861eb 5336 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
44ed167d 5337 resp_size);
b10d96cb
JT
5338 else
5339 rv = -1;
b411b363
PR
5340
5341 fail:
5342 kfree(peers_ch);
5343 kfree(response);
5344 kfree(right_response);
9534d671 5345 shash_desc_zero(desc);
b411b363
PR
5346
5347 return rv;
5348}
5349#endif
5350
8fe60551 5351int drbd_receiver(struct drbd_thread *thi)
b411b363 5352{
bde89a9e 5353 struct drbd_connection *connection = thi->connection;
b411b363
PR
5354 int h;
5355
1ec861eb 5356 drbd_info(connection, "receiver (re)started\n");
b411b363
PR
5357
5358 do {
bde89a9e 5359 h = conn_connect(connection);
b411b363 5360 if (h == 0) {
bde89a9e 5361 conn_disconnect(connection);
20ee6390 5362 schedule_timeout_interruptible(HZ);
b411b363
PR
5363 }
5364 if (h == -1) {
1ec861eb 5365 drbd_warn(connection, "Discarding network configuration.\n");
bde89a9e 5366 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
5367 }
5368 } while (h == 0);
5369
91fd4dad 5370 if (h > 0)
bde89a9e 5371 drbdd(connection);
b411b363 5372
bde89a9e 5373 conn_disconnect(connection);
b411b363 5374
1ec861eb 5375 drbd_info(connection, "receiver terminated\n");
b411b363
PR
5376 return 0;
5377}
5378
5379/* ********* acknowledge sender ******** */
5380
bde89a9e 5381static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5382{
e658983a 5383 struct p_req_state_reply *p = pi->data;
e4f78ede
PR
5384 int retcode = be32_to_cpu(p->retcode);
5385
5386 if (retcode >= SS_SUCCESS) {
bde89a9e 5387 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
e4f78ede 5388 } else {
bde89a9e 5389 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
1ec861eb 5390 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
e4f78ede
PR
5391 drbd_set_st_err_str(retcode), retcode);
5392 }
bde89a9e 5393 wake_up(&connection->ping_wait);
e4f78ede 5394
2735a594 5395 return 0;
e4f78ede 5396}
b411b363 5397
bde89a9e 5398static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5399{
9f4fe9ad 5400 struct drbd_peer_device *peer_device;
b30ab791 5401 struct drbd_device *device;
e658983a 5402 struct p_req_state_reply *p = pi->data;
b411b363
PR
5403 int retcode = be32_to_cpu(p->retcode);
5404
9f4fe9ad
AG
5405 peer_device = conn_peer_device(connection, pi->vnr);
5406 if (!peer_device)
2735a594 5407 return -EIO;
9f4fe9ad 5408 device = peer_device->device;
1952e916 5409
bde89a9e 5410 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
0b0ba1ef 5411 D_ASSERT(device, connection->agreed_pro_version < 100);
bde89a9e 5412 return got_conn_RqSReply(connection, pi);
4d0fc3fd
PR
5413 }
5414
b411b363 5415 if (retcode >= SS_SUCCESS) {
b30ab791 5416 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
b411b363 5417 } else {
b30ab791 5418 set_bit(CL_ST_CHG_FAIL, &device->flags);
d0180171 5419 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
e4f78ede 5420 drbd_set_st_err_str(retcode), retcode);
b411b363 5421 }
b30ab791 5422 wake_up(&device->state_wait);
b411b363 5423
2735a594 5424 return 0;
b411b363
PR
5425}
5426
bde89a9e 5427static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5428{
bde89a9e 5429 return drbd_send_ping_ack(connection);
b411b363
PR
5430
5431}
5432
bde89a9e 5433static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
b411b363
PR
5434{
5435 /* restore idle timeout */
bde89a9e
AG
5436 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5437 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5438 wake_up(&connection->ping_wait);
b411b363 5439
2735a594 5440 return 0;
b411b363
PR
5441}
5442
bde89a9e 5443static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5444{
9f4fe9ad 5445 struct drbd_peer_device *peer_device;
b30ab791 5446 struct drbd_device *device;
e658983a 5447 struct p_block_ack *p = pi->data;
b411b363
PR
5448 sector_t sector = be64_to_cpu(p->sector);
5449 int blksize = be32_to_cpu(p->blksize);
5450
9f4fe9ad
AG
5451 peer_device = conn_peer_device(connection, pi->vnr);
5452 if (!peer_device)
2735a594 5453 return -EIO;
9f4fe9ad 5454 device = peer_device->device;
1952e916 5455
9f4fe9ad 5456 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
b411b363 5457
69a22773 5458 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
b411b363 5459
b30ab791
AG
5460 if (get_ldev(device)) {
5461 drbd_rs_complete_io(device, sector);
5462 drbd_set_in_sync(device, sector, blksize);
1d53f09e 5463 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
b30ab791
AG
5464 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5465 put_ldev(device);
1d53f09e 5466 }
b30ab791
AG
5467 dec_rs_pending(device);
5468 atomic_add(blksize >> 9, &device->rs_sect_in);
b411b363 5469
2735a594 5470 return 0;
b411b363
PR
5471}
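/* Editorial note (not part of the original file): p->blksize is in bytes, so
 * blksize >> 9 above converts it to 512-byte sectors for the resync
 * statistics, while blksize >> BM_BLOCK_SHIFT converts it to bitmap blocks
 * (4 KiB each, assuming the usual BM_BLOCK_SHIFT of 12). */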
5472
bc9c5c41 5473static int
b30ab791 5474validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
bc9c5c41
AG
5475 struct rb_root *root, const char *func,
5476 enum drbd_req_event what, bool missing_ok)
b411b363
PR
5477{
5478 struct drbd_request *req;
5479 struct bio_and_error m;
5480
0500813f 5481 spin_lock_irq(&device->resource->req_lock);
b30ab791 5482 req = find_request(device, root, id, sector, missing_ok, func);
b411b363 5483 if (unlikely(!req)) {
0500813f 5484 spin_unlock_irq(&device->resource->req_lock);
85997675 5485 return -EIO;
b411b363
PR
5486 }
5487 __req_mod(req, what, &m);
0500813f 5488 spin_unlock_irq(&device->resource->req_lock);
b411b363
PR
5489
5490 if (m.bio)
b30ab791 5491 complete_master_bio(device, &m);
85997675 5492 return 0;
b411b363
PR
5493}
5494
bde89a9e 5495static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5496{
9f4fe9ad 5497 struct drbd_peer_device *peer_device;
b30ab791 5498 struct drbd_device *device;
e658983a 5499 struct p_block_ack *p = pi->data;
b411b363
PR
5500 sector_t sector = be64_to_cpu(p->sector);
5501 int blksize = be32_to_cpu(p->blksize);
5502 enum drbd_req_event what;
5503
9f4fe9ad
AG
5504 peer_device = conn_peer_device(connection, pi->vnr);
5505 if (!peer_device)
2735a594 5506 return -EIO;
9f4fe9ad 5507 device = peer_device->device;
1952e916 5508
69a22773 5509 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
b411b363 5510
579b57ed 5511 if (p->block_id == ID_SYNCER) {
b30ab791
AG
5512 drbd_set_in_sync(device, sector, blksize);
5513 dec_rs_pending(device);
2735a594 5514 return 0;
b411b363 5515 }
e05e1e59 5516 switch (pi->cmd) {
b411b363 5517 case P_RS_WRITE_ACK:
8554df1c 5518 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
5519 break;
5520 case P_WRITE_ACK:
8554df1c 5521 what = WRITE_ACKED_BY_PEER;
b411b363
PR
5522 break;
5523 case P_RECV_ACK:
8554df1c 5524 what = RECV_ACKED_BY_PEER;
b411b363 5525 break;
d4dabbe2
LE
5526 case P_SUPERSEDED:
5527 what = CONFLICT_RESOLVED;
b411b363 5528 break;
7be8da07 5529 case P_RETRY_WRITE:
7be8da07 5530 what = POSTPONE_WRITE;
b411b363
PR
5531 break;
5532 default:
2735a594 5533 BUG();
b411b363
PR
5534 }
5535
b30ab791
AG
5536 return validate_req_change_req_state(device, p->block_id, sector,
5537 &device->write_requests, __func__,
2735a594 5538 what, false);
b411b363
PR
5539}
5540
bde89a9e 5541static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5542{
9f4fe9ad 5543 struct drbd_peer_device *peer_device;
b30ab791 5544 struct drbd_device *device;
e658983a 5545 struct p_block_ack *p = pi->data;
b411b363 5546 sector_t sector = be64_to_cpu(p->sector);
2deb8336 5547 int size = be32_to_cpu(p->blksize);
85997675 5548 int err;
b411b363 5549
9f4fe9ad
AG
5550 peer_device = conn_peer_device(connection, pi->vnr);
5551 if (!peer_device)
2735a594 5552 return -EIO;
9f4fe9ad 5553 device = peer_device->device;
b411b363 5554
69a22773 5555 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
b411b363 5556
579b57ed 5557 if (p->block_id == ID_SYNCER) {
b30ab791
AG
5558 dec_rs_pending(device);
5559 drbd_rs_failed_io(device, sector, size);
2735a594 5560 return 0;
b411b363 5561 }
2deb8336 5562
b30ab791
AG
5563 err = validate_req_change_req_state(device, p->block_id, sector,
5564 &device->write_requests, __func__,
303d1448 5565 NEG_ACKED, true);
85997675 5566 if (err) {
c3afd8f5
AG
5567 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5568 The master bio might already be completed, therefore the
5569 request is no longer in the collision hash. */
5570 /* In Protocol B we might already have got a P_RECV_ACK
5571 but then get a P_NEG_ACK afterwards. */
b30ab791 5572 drbd_set_out_of_sync(device, sector, size);
2deb8336 5573 }
2735a594 5574 return 0;
b411b363
PR
5575}
5576
bde89a9e 5577static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5578{
9f4fe9ad 5579 struct drbd_peer_device *peer_device;
b30ab791 5580 struct drbd_device *device;
e658983a 5581 struct p_block_ack *p = pi->data;
b411b363
PR
5582 sector_t sector = be64_to_cpu(p->sector);
5583
9f4fe9ad
AG
5584 peer_device = conn_peer_device(connection, pi->vnr);
5585 if (!peer_device)
2735a594 5586 return -EIO;
9f4fe9ad 5587 device = peer_device->device;
1952e916 5588
69a22773 5589 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
7be8da07 5590
d0180171 5591 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
b411b363
PR
5592 (unsigned long long)sector, be32_to_cpu(p->blksize));
5593
b30ab791
AG
5594 return validate_req_change_req_state(device, p->block_id, sector,
5595 &device->read_requests, __func__,
2735a594 5596 NEG_ACKED, false);
b411b363
PR
5597}
5598
bde89a9e 5599static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5600{
9f4fe9ad 5601 struct drbd_peer_device *peer_device;
b30ab791 5602 struct drbd_device *device;
b411b363
PR
5603 sector_t sector;
5604 int size;
e658983a 5605 struct p_block_ack *p = pi->data;
1952e916 5606
9f4fe9ad
AG
5607 peer_device = conn_peer_device(connection, pi->vnr);
5608 if (!peer_device)
2735a594 5609 return -EIO;
9f4fe9ad 5610 device = peer_device->device;
b411b363
PR
5611
5612 sector = be64_to_cpu(p->sector);
5613 size = be32_to_cpu(p->blksize);
b411b363 5614
69a22773 5615 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
b411b363 5616
b30ab791 5617 dec_rs_pending(device);
b411b363 5618
b30ab791
AG
5619 if (get_ldev_if_state(device, D_FAILED)) {
5620 drbd_rs_complete_io(device, sector);
e05e1e59 5621 switch (pi->cmd) {
d612d309 5622 case P_NEG_RS_DREPLY:
b30ab791 5623 drbd_rs_failed_io(device, sector, size);
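			/* fall through */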
d612d309
PR
5624 case P_RS_CANCEL:
5625 break;
5626 default:
2735a594 5627 BUG();
d612d309 5628 }
b30ab791 5629 put_ldev(device);
b411b363
PR
5630 }
5631
2735a594 5632 return 0;
b411b363
PR
5633}
5634
bde89a9e 5635static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5636{
e658983a 5637 struct p_barrier_ack *p = pi->data;
c06ece6b 5638 struct drbd_peer_device *peer_device;
9ed57dcb 5639 int vnr;
1952e916 5640
bde89a9e 5641 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
b411b363 5642
9ed57dcb 5643 rcu_read_lock();
c06ece6b
AG
5644 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5645 struct drbd_device *device = peer_device->device;
5646
b30ab791
AG
5647 if (device->state.conn == C_AHEAD &&
5648 atomic_read(&device->ap_in_flight) == 0 &&
5649 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5650 device->start_resync_timer.expires = jiffies + HZ;
5651 add_timer(&device->start_resync_timer);
9ed57dcb 5652 }
c4752ef1 5653 }
9ed57dcb 5654 rcu_read_unlock();
c4752ef1 5655
2735a594 5656 return 0;
b411b363
PR
5657}
5658
bde89a9e 5659static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
b411b363 5660{
9f4fe9ad 5661 struct drbd_peer_device *peer_device;
b30ab791 5662 struct drbd_device *device;
e658983a 5663 struct p_block_ack *p = pi->data;
84b8c06b 5664 struct drbd_device_work *dw;
b411b363
PR
5665 sector_t sector;
5666 int size;
5667
9f4fe9ad
AG
5668 peer_device = conn_peer_device(connection, pi->vnr);
5669 if (!peer_device)
2735a594 5670 return -EIO;
9f4fe9ad 5671 device = peer_device->device;
1952e916 5672
b411b363
PR
5673 sector = be64_to_cpu(p->sector);
5674 size = be32_to_cpu(p->blksize);
5675
69a22773 5676 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
b411b363
PR
5677
5678 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
b30ab791 5679 drbd_ov_out_of_sync_found(device, sector, size);
b411b363 5680 else
b30ab791 5681 ov_out_of_sync_print(device);
b411b363 5682
b30ab791 5683 if (!get_ldev(device))
2735a594 5684 return 0;
1d53f09e 5685
b30ab791
AG
5686 drbd_rs_complete_io(device, sector);
5687 dec_rs_pending(device);
b411b363 5688
b30ab791 5689 --device->ov_left;
ea5442af
LE
5690
5691 /* let's advance progress step marks only for every other megabyte */
b30ab791
AG
5692 if ((device->ov_left & 0x200) == 0x200)
5693 drbd_advance_rs_marks(device, device->ov_left);
ea5442af 5694
b30ab791 5695 if (device->ov_left == 0) {
84b8c06b
AG
5696 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5697 if (dw) {
5698 dw->w.cb = w_ov_finished;
5699 dw->device = device;
5700 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
b411b363 5701 } else {
84b8c06b 5702 drbd_err(device, "kmalloc(dw) failed.");
b30ab791
AG
5703 ov_out_of_sync_print(device);
5704 drbd_resync_finished(device);
b411b363
PR
5705 }
5706 }
b30ab791 5707 put_ldev(device);
2735a594 5708 return 0;
b411b363
PR
5709}
5710
bde89a9e 5711static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
0ced55a3 5712{
2735a594 5713 return 0;
b411b363
PR
5714}
5715
668700b4
PR
5716struct meta_sock_cmd {
5717 size_t pkt_size;
5718 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5719};
5720
5721static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
0ced55a3 5722{
668700b4
PR
5723 long t;
5724 struct net_conf *nc;
32862ec7 5725
668700b4
PR
5726 rcu_read_lock();
5727 nc = rcu_dereference(connection->net_conf);
5728 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5729 rcu_read_unlock();
c141ebda 5730
668700b4
PR
5731 t *= HZ;
5732 if (ping_timeout)
5733 t /= 10;
082a3439 5734
668700b4
PR
5735 connection->meta.socket->sk->sk_rcvtimeo = t;
5736}
32862ec7 5737
668700b4
PR
5738static void set_ping_timeout(struct drbd_connection *connection)
5739{
5740 set_rcvtimeo(connection, 1);
0ced55a3
PR
5741}
5742
668700b4
PR
5743static void set_idle_timeout(struct drbd_connection *connection)
5744{
5745 set_rcvtimeo(connection, 0);
5746}
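/* Editorial worked example (not part of the original file): ping_timeo is
 * configured in tenths of a second and ping_int in seconds, so with e.g.
 * ping_timeo == 5 set_ping_timeout() yields 5 * HZ / 10 (500 ms worth of
 * jiffies), while ping_int == 10 makes set_idle_timeout() yield 10 * HZ. */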
b411b363 5747
668700b4 5748static struct meta_sock_cmd ack_receiver_tbl[] = {
e658983a
AG
5749 [P_PING] = { 0, got_Ping },
5750 [P_PING_ACK] = { 0, got_PingAck },
b411b363
PR
5751 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5752 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5753 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
d4dabbe2 5754 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
b411b363
PR
5755 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5756 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
1952e916 5757 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
b411b363
PR
5758 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5759 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5760 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5761 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 5762 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
1952e916
AG
5763 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5764 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5765 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972 5766};
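/* Editorial example (not part of the original file): drbd_ack_receiver()
 * below sizes its reads from this table. A P_PING_ACK has no payload, so
 * "expect" stays at header_size; a P_WRITE_ACK must deliver exactly
 * header_size + sizeof(struct p_block_ack) bytes before got_BlockAck() runs;
 * any other length is treated as a protocol error and forces a reconnect. */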
b411b363 5767
1c03e520 5768int drbd_ack_receiver(struct drbd_thread *thi)
b411b363 5769{
bde89a9e 5770 struct drbd_connection *connection = thi->connection;
668700b4 5771 struct meta_sock_cmd *cmd = NULL;
77351055 5772 struct packet_info pi;
668700b4 5773 unsigned long pre_recv_jif;
257d0af6 5774 int rv;
bde89a9e 5775 void *buf = connection->meta.rbuf;
b411b363 5776 int received = 0;
bde89a9e 5777 unsigned int header_size = drbd_header_size(connection);
52b061a4 5778 int expect = header_size;
44ed167d 5779 bool ping_timeout_active = false;
3990e04d 5780 struct sched_param param = { .sched_priority = 2 };
b411b363 5781
3990e04d
PR
5782 rv = sched_setscheduler(current, SCHED_RR, &param);
5783 if (rv < 0)
668700b4 5784 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
b411b363 5785
e77a0a5c 5786 while (get_t_state(thi) == RUNNING) {
80822284 5787 drbd_thread_current_set_cpu(thi);
b411b363 5788
668700b4 5789 conn_reclaim_net_peer_reqs(connection);
44ed167d 5790
bde89a9e
AG
5791 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5792 if (drbd_send_ping(connection)) {
1ec861eb 5793 drbd_err(connection, "drbd_send_ping has failed\n");
b411b363 5794 goto reconnect;
841ce241 5795 }
668700b4 5796 set_ping_timeout(connection);
44ed167d 5797 ping_timeout_active = true;
b411b363
PR
5798 }
5799
668700b4 5800 pre_recv_jif = jiffies;
bde89a9e 5801 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
b411b363
PR
5802
5803 /* Note:
5804 * -EINTR (on meta) we got a signal
5805 * -EAGAIN (on meta) rcvtimeo expired
5806 * -ECONNRESET other side closed the connection
5807 * -ERESTARTSYS (on data) we got a signal
5808 * rv < 0 other than above: unexpected error!
5809 * rv == expected: full header or command
5810 * rv < expected: "woken" by signal during receive
5811 * rv == 0 : "connection shut down by peer"
5812 */
5813 if (likely(rv > 0)) {
5814 received += rv;
5815 buf += rv;
5816 } else if (rv == 0) {
bde89a9e 5817 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
b66623e3
PR
5818 long t;
5819 rcu_read_lock();
bde89a9e 5820 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
b66623e3
PR
5821 rcu_read_unlock();
5822
bde89a9e
AG
5823 t = wait_event_timeout(connection->ping_wait,
5824 connection->cstate < C_WF_REPORT_PARAMS,
b66623e3 5825 t);
599377ac
PR
5826 if (t)
5827 break;
5828 }
1ec861eb 5829 drbd_err(connection, "meta connection shut down by peer.\n");
b411b363
PR
5830 goto reconnect;
5831 } else if (rv == -EAGAIN) {
cb6518cb
LE
5832 /* If the data socket received something meanwhile,
5833 * that is good enough: peer is still alive. */
668700b4 5834 if (time_after(connection->last_received, pre_recv_jif))
cb6518cb 5835 continue;
f36af18c 5836 if (ping_timeout_active) {
1ec861eb 5837 drbd_err(connection, "PingAck did not arrive in time.\n");
b411b363
PR
5838 goto reconnect;
5839 }
bde89a9e 5840 set_bit(SEND_PING, &connection->flags);
b411b363
PR
5841 continue;
5842 } else if (rv == -EINTR) {
668700b4
PR
5843 /* maybe drbd_thread_stop(): the while condition will notice.
5844 * maybe woken for send_ping: we'll send a ping above,
5845 * and change the rcvtimeo */
5846 flush_signals(current);
b411b363
PR
5847 continue;
5848 } else {
1ec861eb 5849 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
b411b363
PR
5850 goto reconnect;
5851 }
5852
5853 if (received == expect && cmd == NULL) {
bde89a9e 5854 if (decode_header(connection, connection->meta.rbuf, &pi))
b411b363 5855 goto reconnect;
668700b4
PR
5856 cmd = &ack_receiver_tbl[pi.cmd];
5857 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
1ec861eb 5858 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
2fcb8f30 5859 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5860 goto disconnect;
5861 }
e658983a 5862 expect = header_size + cmd->pkt_size;
52b061a4 5863 if (pi.size != expect - header_size) {
1ec861eb 5864 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 5865 pi.cmd, pi.size);
b411b363 5866 goto reconnect;
257d0af6 5867 }
b411b363
PR
5868 }
5869 if (received == expect) {
2735a594 5870 bool err;
a4fbda8e 5871
bde89a9e 5872 err = cmd->fn(connection, &pi);
2735a594 5873 if (err) {
1ec861eb 5874 drbd_err(connection, "%pf failed\n", cmd->fn);
b411b363 5875 goto reconnect;
1952e916 5876 }
b411b363 5877
bde89a9e 5878 connection->last_received = jiffies;
f36af18c 5879
668700b4
PR
5880 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5881 set_idle_timeout(connection);
44ed167d
PR
5882 ping_timeout_active = false;
5883 }
f36af18c 5884
bde89a9e 5885 buf = connection->meta.rbuf;
b411b363 5886 received = 0;
52b061a4 5887 expect = header_size;
b411b363
PR
5888 cmd = NULL;
5889 }
5890 }
5891
5892 if (0) {
5893reconnect:
bde89a9e
AG
5894 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5895 conn_md_sync(connection);
b411b363
PR
5896 }
5897 if (0) {
5898disconnect:
bde89a9e 5899 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 5900 }
b411b363 5901
668700b4 5902 drbd_info(connection, "ack_receiver terminated\n");
b411b363
PR
5903
5904 return 0;
5905}
668700b4
PR
5906
5907void drbd_send_acks_wf(struct work_struct *ws)
5908{
5909 struct drbd_peer_device *peer_device =
5910 container_of(ws, struct drbd_peer_device, send_acks_work);
5911 struct drbd_connection *connection = peer_device->connection;
5912 struct drbd_device *device = peer_device->device;
5913 struct net_conf *nc;
5914 int tcp_cork, err;
5915
5916 rcu_read_lock();
5917 nc = rcu_dereference(connection->net_conf);
5918 tcp_cork = nc->tcp_cork;
5919 rcu_read_unlock();
5920
5921 if (tcp_cork)
5922 drbd_tcp_cork(connection->meta.socket);
5923
5924 err = drbd_finish_peer_reqs(device);
5925 kref_put(&device->kref, drbd_destroy_device);
 5926 /* The matching kref_get() is in drbd_endio_write_sec_final(). It is necessary
 5927 to keep the struct work_struct send_acks_work alive, which is embedded in the peer_device object */
5928
5929 if (err) {
5930 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5931 return;
5932 }
5933
5934 if (tcp_cork)
5935 drbd_tcp_uncork(connection->meta.socket);
5936
5937 return;
5938}