]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/drbd/drbd_worker.c
drbd: improve throttling decisions of background resynchronisation
[mirror_ubuntu-artful-kernel.git] / drivers / block / drbd / drbd_worker.c
CommitLineData
b411b363
PR
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
84b8c06b 24*/
b411b363 25
b411b363 26#include <linux/module.h>
b411b363
PR
27#include <linux/drbd.h>
28#include <linux/sched.h>
b411b363
PR
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
b411b363
PR
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
a3603a6e 39#include "drbd_protocol.h"
b411b363 40#include "drbd_req.h"
b411b363 41
d448a2e1
AG
42static int make_ov_request(struct drbd_device *, int);
43static int make_resync_request(struct drbd_device *, int);
b411b363 44
c5a91619
AG
45/* endio handlers:
46 * drbd_md_io_complete (defined here)
fcefa62e
AG
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
c5a91619
AG
49 * bm_async_io_complete (defined in drbd_bitmap.c)
50 *
b411b363
PR
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58
59/* About the global_state_lock
60 Each state transition on an device holds a read lock. In case we have
95f8efd0 61 to evaluate the resync after dependencies, we grab a write lock, because
b411b363
PR
62 we need stable states on all devices for that. */
63rwlock_t global_state_lock;
64
65/* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
67 */
68void drbd_md_io_complete(struct bio *bio, int error)
69{
b30ab791 70 struct drbd_device *device;
b411b363 71
e37d2438
LE
72 device = bio->bi_private;
73 device->md_io.error = error;
b411b363 74
0cfac5dd
PR
75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76 * to timeout on the lower level device, and eventually detach from it.
77 * If this io completion runs after that timeout expired, this
78 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 * During normal operation, this only puts that extra reference
80 * down to 1 again.
81 * Make sure we first drop the reference, and only then signal
82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 * next drbd_md_sync_page_io(), that we trigger the
b30ab791 84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
0cfac5dd 85 */
b30ab791 86 drbd_md_put_buffer(device);
e37d2438 87 device->md_io.done = 1;
b30ab791 88 wake_up(&device->misc_wait);
cdfda633 89 bio_put(bio);
b30ab791
AG
90 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91 put_ldev(device);
b411b363
PR
92}
93
94/* reads on behalf of the partner,
95 * "submitted" by the receiver
96 */
a186e478 97static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
b411b363
PR
98{
99 unsigned long flags = 0;
6780139c
AG
100 struct drbd_peer_device *peer_device = peer_req->peer_device;
101 struct drbd_device *device = peer_device->device;
b411b363 102
0500813f 103 spin_lock_irqsave(&device->resource->req_lock, flags);
b30ab791 104 device->read_cnt += peer_req->i.size >> 9;
a8cd15ba 105 list_del(&peer_req->w.list);
b30ab791
AG
106 if (list_empty(&device->read_ee))
107 wake_up(&device->ee_wait);
db830c46 108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
b30ab791 109 __drbd_chk_io_error(device, DRBD_READ_ERROR);
0500813f 110 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b411b363 111
6780139c 112 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
b30ab791 113 put_ldev(device);
b411b363
PR
114}
115
116/* writes on behalf of the partner, or resync writes,
45bb912b 117 * "submitted" by the receiver, final stage. */
a0fb3c47 118void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
b411b363
PR
119{
120 unsigned long flags = 0;
6780139c
AG
121 struct drbd_peer_device *peer_device = peer_req->peer_device;
122 struct drbd_device *device = peer_device->device;
181286ad 123 struct drbd_interval i;
b411b363 124 int do_wake;
579b57ed 125 u64 block_id;
b411b363 126 int do_al_complete_io;
b411b363 127
db830c46 128 /* after we moved peer_req to done_ee,
b411b363
PR
129 * we may no longer access it,
130 * it may be freed/reused already!
131 * (as soon as we release the req_lock) */
181286ad 132 i = peer_req->i;
db830c46
AG
133 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 block_id = peer_req->block_id;
b411b363 135
0500813f 136 spin_lock_irqsave(&device->resource->req_lock, flags);
b30ab791 137 device->writ_cnt += peer_req->i.size >> 9;
a8cd15ba 138 list_move_tail(&peer_req->w.list, &device->done_ee);
b411b363 139
bb3bfe96 140 /*
5e472264 141 * Do not remove from the write_requests tree here: we did not send the
bb3bfe96
AG
142 * Ack yet and did not wake possibly waiting conflicting requests.
143 * Removed from the tree from "drbd_process_done_ee" within the
84b8c06b 144 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
bb3bfe96
AG
145 * _drbd_clear_done_ee.
146 */
b411b363 147
b30ab791 148 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
b411b363 149
a0fb3c47
LE
150 /* FIXME do we want to detach for failed REQ_DISCARD?
151 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
152 if (peer_req->flags & EE_WAS_ERROR)
b30ab791 153 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
0500813f 154 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b411b363 155
579b57ed 156 if (block_id == ID_SYNCER)
b30ab791 157 drbd_rs_complete_io(device, i.sector);
b411b363
PR
158
159 if (do_wake)
b30ab791 160 wake_up(&device->ee_wait);
b411b363
PR
161
162 if (do_al_complete_io)
b30ab791 163 drbd_al_complete_io(device, &i);
b411b363 164
6780139c 165 wake_asender(peer_device->connection);
b30ab791 166 put_ldev(device);
45bb912b 167}
b411b363 168
45bb912b
LE
169/* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
171 */
fcefa62e 172void drbd_peer_request_endio(struct bio *bio, int error)
45bb912b 173{
db830c46 174 struct drbd_peer_request *peer_req = bio->bi_private;
a8cd15ba 175 struct drbd_device *device = peer_req->peer_device->device;
45bb912b
LE
176 int uptodate = bio_flagged(bio, BIO_UPTODATE);
177 int is_write = bio_data_dir(bio) == WRITE;
a0fb3c47 178 int is_discard = !!(bio->bi_rw & REQ_DISCARD);
45bb912b 179
07194272 180 if (error && __ratelimit(&drbd_ratelimit_state))
d0180171 181 drbd_warn(device, "%s: error=%d s=%llus\n",
a0fb3c47
LE
182 is_write ? (is_discard ? "discard" : "write")
183 : "read", error,
db830c46 184 (unsigned long long)peer_req->i.sector);
45bb912b 185 if (!error && !uptodate) {
07194272 186 if (__ratelimit(&drbd_ratelimit_state))
d0180171 187 drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
07194272 188 is_write ? "write" : "read",
db830c46 189 (unsigned long long)peer_req->i.sector);
45bb912b
LE
190 /* strange behavior of some lower level drivers...
191 * fail the request by clearing the uptodate flag,
192 * but do not return any error?! */
193 error = -EIO;
194 }
195
196 if (error)
db830c46 197 set_bit(__EE_WAS_ERROR, &peer_req->flags);
45bb912b
LE
198
199 bio_put(bio); /* no need for the bio anymore */
db830c46 200 if (atomic_dec_and_test(&peer_req->pending_bios)) {
45bb912b 201 if (is_write)
db830c46 202 drbd_endio_write_sec_final(peer_req);
45bb912b 203 else
db830c46 204 drbd_endio_read_sec_final(peer_req);
45bb912b 205 }
b411b363
PR
206}
207
208/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
209 */
fcefa62e 210void drbd_request_endio(struct bio *bio, int error)
b411b363 211{
a115413d 212 unsigned long flags;
b411b363 213 struct drbd_request *req = bio->bi_private;
84b8c06b 214 struct drbd_device *device = req->device;
a115413d 215 struct bio_and_error m;
b411b363
PR
216 enum drbd_req_event what;
217 int uptodate = bio_flagged(bio, BIO_UPTODATE);
218
b411b363 219 if (!error && !uptodate) {
d0180171 220 drbd_warn(device, "p %s: setting error to -EIO\n",
b411b363
PR
221 bio_data_dir(bio) == WRITE ? "write" : "read");
222 /* strange behavior of some lower level drivers...
223 * fail the request by clearing the uptodate flag,
224 * but do not return any error?! */
225 error = -EIO;
226 }
227
1b6dd252
PR
228
229 /* If this request was aborted locally before,
230 * but now was completed "successfully",
231 * chances are that this caused arbitrary data corruption.
232 *
233 * "aborting" requests, or force-detaching the disk, is intended for
234 * completely blocked/hung local backing devices which do no longer
235 * complete requests at all, not even do error completions. In this
236 * situation, usually a hard-reset and failover is the only way out.
237 *
238 * By "aborting", basically faking a local error-completion,
239 * we allow for a more graceful swichover by cleanly migrating services.
240 * Still the affected node has to be rebooted "soon".
241 *
242 * By completing these requests, we allow the upper layers to re-use
243 * the associated data pages.
244 *
245 * If later the local backing device "recovers", and now DMAs some data
246 * from disk into the original request pages, in the best case it will
247 * just put random data into unused pages; but typically it will corrupt
248 * meanwhile completely unrelated data, causing all sorts of damage.
249 *
250 * Which means delayed successful completion,
251 * especially for READ requests,
252 * is a reason to panic().
253 *
254 * We assume that a delayed *error* completion is OK,
255 * though we still will complain noisily about it.
256 */
257 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
258 if (__ratelimit(&drbd_ratelimit_state))
d0180171 259 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
1b6dd252
PR
260
261 if (!error)
262 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
263 }
264
b411b363
PR
265 /* to avoid recursion in __req_mod */
266 if (unlikely(error)) {
2f632aeb
LE
267 if (bio->bi_rw & REQ_DISCARD)
268 what = (error == -EOPNOTSUPP)
269 ? DISCARD_COMPLETED_NOTSUPP
270 : DISCARD_COMPLETED_WITH_ERROR;
271 else
272 what = (bio_data_dir(bio) == WRITE)
8554df1c 273 ? WRITE_COMPLETED_WITH_ERROR
5c3c7e64 274 : (bio_rw(bio) == READ)
8554df1c
AG
275 ? READ_COMPLETED_WITH_ERROR
276 : READ_AHEAD_COMPLETED_WITH_ERROR;
b411b363 277 } else
8554df1c 278 what = COMPLETED_OK;
b411b363
PR
279
280 bio_put(req->private_bio);
281 req->private_bio = ERR_PTR(error);
282
a115413d 283 /* not req_mod(), we need irqsave here! */
0500813f 284 spin_lock_irqsave(&device->resource->req_lock, flags);
a115413d 285 __req_mod(req, what, &m);
0500813f 286 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b30ab791 287 put_ldev(device);
a115413d
LE
288
289 if (m.bio)
b30ab791 290 complete_master_bio(device, &m);
b411b363
PR
291}
292
79a3c8d3 293void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
45bb912b
LE
294{
295 struct hash_desc desc;
296 struct scatterlist sg;
db830c46 297 struct page *page = peer_req->pages;
45bb912b
LE
298 struct page *tmp;
299 unsigned len;
300
301 desc.tfm = tfm;
302 desc.flags = 0;
303
304 sg_init_table(&sg, 1);
305 crypto_hash_init(&desc);
306
307 while ((tmp = page_chain_next(page))) {
308 /* all but the last page will be fully used */
309 sg_set_page(&sg, page, PAGE_SIZE, 0);
310 crypto_hash_update(&desc, &sg, sg.length);
311 page = tmp;
312 }
313 /* and now the last, possibly only partially used page */
db830c46 314 len = peer_req->i.size & (PAGE_SIZE - 1);
45bb912b
LE
315 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
316 crypto_hash_update(&desc, &sg, sg.length);
317 crypto_hash_final(&desc, digest);
318}
319
79a3c8d3 320void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
b411b363
PR
321{
322 struct hash_desc desc;
323 struct scatterlist sg;
7988613b
KO
324 struct bio_vec bvec;
325 struct bvec_iter iter;
b411b363
PR
326
327 desc.tfm = tfm;
328 desc.flags = 0;
329
330 sg_init_table(&sg, 1);
331 crypto_hash_init(&desc);
332
7988613b
KO
333 bio_for_each_segment(bvec, bio, iter) {
334 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
b411b363
PR
335 crypto_hash_update(&desc, &sg, sg.length);
336 }
337 crypto_hash_final(&desc, digest);
338}
339
9676c760 340/* MAYBE merge common code with w_e_end_ov_req */
99920dc5 341static int w_e_send_csum(struct drbd_work *w, int cancel)
b411b363 342{
a8cd15ba 343 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
344 struct drbd_peer_device *peer_device = peer_req->peer_device;
345 struct drbd_device *device = peer_device->device;
b411b363
PR
346 int digest_size;
347 void *digest;
99920dc5 348 int err = 0;
b411b363 349
53ea4331
LE
350 if (unlikely(cancel))
351 goto out;
b411b363 352
9676c760 353 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
53ea4331 354 goto out;
b411b363 355
6780139c 356 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
53ea4331
LE
357 digest = kmalloc(digest_size, GFP_NOIO);
358 if (digest) {
db830c46
AG
359 sector_t sector = peer_req->i.sector;
360 unsigned int size = peer_req->i.size;
6780139c 361 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
9676c760 362 /* Free peer_req and pages before send.
53ea4331
LE
363 * In case we block on congestion, we could otherwise run into
364 * some distributed deadlock, if the other side blocks on
365 * congestion as well, because our receiver blocks in
c37c8ecf 366 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 367 drbd_free_peer_req(device, peer_req);
db830c46 368 peer_req = NULL;
b30ab791 369 inc_rs_pending(device);
6780139c 370 err = drbd_send_drequest_csum(peer_device, sector, size,
db1b0b72
AG
371 digest, digest_size,
372 P_CSUM_RS_REQUEST);
53ea4331
LE
373 kfree(digest);
374 } else {
d0180171 375 drbd_err(device, "kmalloc() of digest failed.\n");
99920dc5 376 err = -ENOMEM;
53ea4331 377 }
b411b363 378
53ea4331 379out:
db830c46 380 if (peer_req)
b30ab791 381 drbd_free_peer_req(device, peer_req);
b411b363 382
99920dc5 383 if (unlikely(err))
d0180171 384 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
99920dc5 385 return err;
b411b363
PR
386}
387
388#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
389
69a22773 390static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
b411b363 391{
69a22773 392 struct drbd_device *device = peer_device->device;
db830c46 393 struct drbd_peer_request *peer_req;
b411b363 394
b30ab791 395 if (!get_ldev(device))
80a40e43 396 return -EIO;
b411b363
PR
397
398 /* GFP_TRY, because if there is no memory available right now, this may
399 * be rescheduled for later. It is "only" background resync, after all. */
69a22773 400 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
a0fb3c47 401 size, true /* has real payload */, GFP_TRY);
db830c46 402 if (!peer_req)
80a40e43 403 goto defer;
b411b363 404
a8cd15ba 405 peer_req->w.cb = w_e_send_csum;
0500813f 406 spin_lock_irq(&device->resource->req_lock);
b9ed7080 407 list_add_tail(&peer_req->w.list, &device->read_ee);
0500813f 408 spin_unlock_irq(&device->resource->req_lock);
b411b363 409
b30ab791
AG
410 atomic_add(size >> 9, &device->rs_sect_ev);
411 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
80a40e43 412 return 0;
b411b363 413
10f6d992
LE
414 /* If it failed because of ENOMEM, retry should help. If it failed
415 * because bio_add_page failed (probably broken lower level driver),
416 * retry may or may not help.
417 * If it does not, you may need to force disconnect. */
0500813f 418 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 419 list_del(&peer_req->w.list);
0500813f 420 spin_unlock_irq(&device->resource->req_lock);
22cc37a9 421
b30ab791 422 drbd_free_peer_req(device, peer_req);
80a40e43 423defer:
b30ab791 424 put_ldev(device);
80a40e43 425 return -EAGAIN;
b411b363
PR
426}
427
99920dc5 428int w_resync_timer(struct drbd_work *w, int cancel)
b411b363 429{
84b8c06b
AG
430 struct drbd_device *device =
431 container_of(w, struct drbd_device, resync_work);
432
b30ab791 433 switch (device->state.conn) {
63106d3c 434 case C_VERIFY_S:
d448a2e1 435 make_ov_request(device, cancel);
63106d3c
PR
436 break;
437 case C_SYNC_TARGET:
d448a2e1 438 make_resync_request(device, cancel);
63106d3c 439 break;
b411b363
PR
440 }
441
99920dc5 442 return 0;
794abb75
PR
443}
444
445void resync_timer_fn(unsigned long data)
446{
b30ab791 447 struct drbd_device *device = (struct drbd_device *) data;
794abb75 448
15e26f6a
LE
449 drbd_queue_work_if_unqueued(
450 &first_peer_device(device)->connection->sender_work,
451 &device->resync_work);
b411b363
PR
452}
453
778f271d
PR
454static void fifo_set(struct fifo_buffer *fb, int value)
455{
456 int i;
457
458 for (i = 0; i < fb->size; i++)
f10f2623 459 fb->values[i] = value;
778f271d
PR
460}
461
462static int fifo_push(struct fifo_buffer *fb, int value)
463{
464 int ov;
465
466 ov = fb->values[fb->head_index];
467 fb->values[fb->head_index++] = value;
468
469 if (fb->head_index >= fb->size)
470 fb->head_index = 0;
471
472 return ov;
473}
474
475static void fifo_add_val(struct fifo_buffer *fb, int value)
476{
477 int i;
478
479 for (i = 0; i < fb->size; i++)
480 fb->values[i] += value;
481}
482
9958c857
PR
483struct fifo_buffer *fifo_alloc(int fifo_size)
484{
485 struct fifo_buffer *fb;
486
8747d30a 487 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
9958c857
PR
488 if (!fb)
489 return NULL;
490
491 fb->head_index = 0;
492 fb->size = fifo_size;
493 fb->total = 0;
494
495 return fb;
496}
497
0e49d7b0 498static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
778f271d 499{
daeda1cc 500 struct disk_conf *dc;
7f34f614 501 unsigned int want; /* The number of sectors we want in-flight */
778f271d 502 int req_sect; /* Number of sectors to request in this turn */
7f34f614 503 int correction; /* Number of sectors more we need in-flight */
778f271d
PR
504 int cps; /* correction per invocation of drbd_rs_controller() */
505 int steps; /* Number of time steps to plan ahead */
506 int curr_corr;
507 int max_sect;
813472ce 508 struct fifo_buffer *plan;
778f271d 509
b30ab791
AG
510 dc = rcu_dereference(device->ldev->disk_conf);
511 plan = rcu_dereference(device->rs_plan_s);
778f271d 512
813472ce 513 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
778f271d 514
b30ab791 515 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
daeda1cc 516 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
778f271d 517 } else { /* normal path */
daeda1cc
PR
518 want = dc->c_fill_target ? dc->c_fill_target :
519 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
778f271d
PR
520 }
521
b30ab791 522 correction = want - device->rs_in_flight - plan->total;
778f271d
PR
523
524 /* Plan ahead */
525 cps = correction / steps;
813472ce
PR
526 fifo_add_val(plan, cps);
527 plan->total += cps * steps;
778f271d
PR
528
529 /* What we do in this step */
813472ce
PR
530 curr_corr = fifo_push(plan, 0);
531 plan->total -= curr_corr;
778f271d
PR
532
533 req_sect = sect_in + curr_corr;
534 if (req_sect < 0)
535 req_sect = 0;
536
daeda1cc 537 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
778f271d
PR
538 if (req_sect > max_sect)
539 req_sect = max_sect;
540
541 /*
d0180171 542 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
b30ab791
AG
543 sect_in, device->rs_in_flight, want, correction,
544 steps, cps, device->rs_planed, curr_corr, req_sect);
778f271d
PR
545 */
546
547 return req_sect;
548}
549
b30ab791 550static int drbd_rs_number_requests(struct drbd_device *device)
e65f440d 551{
0e49d7b0
LE
552 unsigned int sect_in; /* Number of sectors that came in since the last turn */
553 int number, mxb;
554
555 sect_in = atomic_xchg(&device->rs_sect_in, 0);
556 device->rs_in_flight -= sect_in;
813472ce
PR
557
558 rcu_read_lock();
0e49d7b0 559 mxb = drbd_get_max_buffers(device) / 2;
b30ab791 560 if (rcu_dereference(device->rs_plan_s)->size) {
0e49d7b0 561 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
b30ab791 562 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
e65f440d 563 } else {
b30ab791
AG
564 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
565 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
e65f440d 566 }
813472ce 567 rcu_read_unlock();
e65f440d 568
0e49d7b0
LE
569 /* Don't have more than "max-buffers"/2 in-flight.
570 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
571 * potentially causing a distributed deadlock on congestion during
572 * online-verify or (checksum-based) resync, if max-buffers,
573 * socket buffer sizes and resync rate settings are mis-configured. */
7f34f614
LE
574
575 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
576 * mxb (as used here, and in drbd_alloc_pages on the peer) is
577 * "number of pages" (typically also 4k),
578 * but "rs_in_flight" is in "sectors" (512 Byte). */
579 if (mxb - device->rs_in_flight/8 < number)
580 number = mxb - device->rs_in_flight/8;
0e49d7b0 581
e65f440d
LE
582 return number;
583}
584
44a4d551 585static int make_resync_request(struct drbd_device *const device, int cancel)
b411b363 586{
44a4d551
LE
587 struct drbd_peer_device *const peer_device = first_peer_device(device);
588 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
589 unsigned long bit;
590 sector_t sector;
b30ab791 591 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1816a2b4 592 int max_bio_size;
e65f440d 593 int number, rollback_i, size;
506afb62 594 int align, requeue = 0;
0f0601f4 595 int i = 0;
b411b363
PR
596
597 if (unlikely(cancel))
99920dc5 598 return 0;
b411b363 599
b30ab791 600 if (device->rs_total == 0) {
af85e8e8 601 /* empty resync? */
b30ab791 602 drbd_resync_finished(device);
99920dc5 603 return 0;
af85e8e8
LE
604 }
605
b30ab791
AG
606 if (!get_ldev(device)) {
607 /* Since we only need to access device->rsync a
608 get_ldev_if_state(device,D_FAILED) would be sufficient, but
b411b363
PR
609 to continue resync with a broken disk makes no sense at
610 all */
d0180171 611 drbd_err(device, "Disk broke down during resync!\n");
99920dc5 612 return 0;
b411b363
PR
613 }
614
b30ab791
AG
615 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
616 number = drbd_rs_number_requests(device);
0e49d7b0 617 if (number <= 0)
0f0601f4 618 goto requeue;
b411b363 619
b411b363 620 for (i = 0; i < number; i++) {
506afb62
LE
621 /* Stop generating RS requests when half of the send buffer is filled,
622 * but notify TCP that we'd like to have more space. */
44a4d551
LE
623 mutex_lock(&connection->data.mutex);
624 if (connection->data.socket) {
506afb62
LE
625 struct sock *sk = connection->data.socket->sk;
626 int queued = sk->sk_wmem_queued;
627 int sndbuf = sk->sk_sndbuf;
628 if (queued > sndbuf / 2) {
629 requeue = 1;
630 if (sk->sk_socket)
631 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
632 }
633 } else
634 requeue = 1;
44a4d551 635 mutex_unlock(&connection->data.mutex);
506afb62 636 if (requeue)
b411b363
PR
637 goto requeue;
638
639next_sector:
640 size = BM_BLOCK_SIZE;
b30ab791 641 bit = drbd_bm_find_next(device, device->bm_resync_fo);
b411b363 642
4b0715f0 643 if (bit == DRBD_END_OF_BITMAP) {
b30ab791
AG
644 device->bm_resync_fo = drbd_bm_bits(device);
645 put_ldev(device);
99920dc5 646 return 0;
b411b363
PR
647 }
648
649 sector = BM_BIT_TO_SECT(bit);
650
ad3fee79 651 if (drbd_try_rs_begin_io(device, sector)) {
b30ab791 652 device->bm_resync_fo = bit;
b411b363
PR
653 goto requeue;
654 }
b30ab791 655 device->bm_resync_fo = bit + 1;
b411b363 656
b30ab791
AG
657 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
658 drbd_rs_complete_io(device, sector);
b411b363
PR
659 goto next_sector;
660 }
661
1816a2b4 662#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
b411b363
PR
663 /* try to find some adjacent bits.
664 * we stop if we have already the maximum req size.
665 *
666 * Additionally always align bigger requests, in order to
667 * be prepared for all stripe sizes of software RAIDs.
b411b363
PR
668 */
669 align = 1;
d207450c 670 rollback_i = i;
6377b923 671 while (i < number) {
1816a2b4 672 if (size + BM_BLOCK_SIZE > max_bio_size)
b411b363
PR
673 break;
674
675 /* Be always aligned */
676 if (sector & ((1<<(align+3))-1))
677 break;
678
679 /* do not cross extent boundaries */
680 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
681 break;
682 /* now, is it actually dirty, after all?
683 * caution, drbd_bm_test_bit is tri-state for some
684 * obscure reason; ( b == 0 ) would get the out-of-band
685 * only accidentally right because of the "oddly sized"
686 * adjustment below */
b30ab791 687 if (drbd_bm_test_bit(device, bit+1) != 1)
b411b363
PR
688 break;
689 bit++;
690 size += BM_BLOCK_SIZE;
691 if ((BM_BLOCK_SIZE << align) <= size)
692 align++;
693 i++;
694 }
695 /* if we merged some,
696 * reset the offset to start the next drbd_bm_find_next from */
697 if (size > BM_BLOCK_SIZE)
b30ab791 698 device->bm_resync_fo = bit + 1;
b411b363
PR
699#endif
700
701 /* adjust very last sectors, in case we are oddly sized */
702 if (sector + (size>>9) > capacity)
703 size = (capacity-sector)<<9;
aaaba345
LE
704
705 if (device->use_csums) {
44a4d551 706 switch (read_for_csum(peer_device, sector, size)) {
80a40e43 707 case -EIO: /* Disk failure */
b30ab791 708 put_ldev(device);
99920dc5 709 return -EIO;
80a40e43 710 case -EAGAIN: /* allocation failed, or ldev busy */
b30ab791
AG
711 drbd_rs_complete_io(device, sector);
712 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
d207450c 713 i = rollback_i;
b411b363 714 goto requeue;
80a40e43
LE
715 case 0:
716 /* everything ok */
717 break;
718 default:
719 BUG();
b411b363
PR
720 }
721 } else {
99920dc5
AG
722 int err;
723
b30ab791 724 inc_rs_pending(device);
44a4d551 725 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
99920dc5
AG
726 sector, size, ID_SYNCER);
727 if (err) {
d0180171 728 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
b30ab791
AG
729 dec_rs_pending(device);
730 put_ldev(device);
99920dc5 731 return err;
b411b363
PR
732 }
733 }
734 }
735
b30ab791 736 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
b411b363
PR
737 /* last syncer _request_ was sent,
738 * but the P_RS_DATA_REPLY not yet received. sync will end (and
739 * next sync group will resume), as soon as we receive the last
740 * resync data block, and the last bit is cleared.
741 * until then resync "work" is "inactive" ...
742 */
b30ab791 743 put_ldev(device);
99920dc5 744 return 0;
b411b363
PR
745 }
746
747 requeue:
b30ab791
AG
748 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
749 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
750 put_ldev(device);
99920dc5 751 return 0;
b411b363
PR
752}
753
d448a2e1 754static int make_ov_request(struct drbd_device *device, int cancel)
b411b363
PR
755{
756 int number, i, size;
757 sector_t sector;
b30ab791 758 const sector_t capacity = drbd_get_capacity(device->this_bdev);
58ffa580 759 bool stop_sector_reached = false;
b411b363
PR
760
761 if (unlikely(cancel))
762 return 1;
763
b30ab791 764 number = drbd_rs_number_requests(device);
b411b363 765
b30ab791 766 sector = device->ov_position;
b411b363 767 for (i = 0; i < number; i++) {
58ffa580 768 if (sector >= capacity)
b411b363 769 return 1;
58ffa580
LE
770
771 /* We check for "finished" only in the reply path:
772 * w_e_end_ov_reply().
773 * We need to send at least one request out. */
774 stop_sector_reached = i > 0
b30ab791
AG
775 && verify_can_do_stop_sector(device)
776 && sector >= device->ov_stop_sector;
58ffa580
LE
777 if (stop_sector_reached)
778 break;
b411b363
PR
779
780 size = BM_BLOCK_SIZE;
781
ad3fee79 782 if (drbd_try_rs_begin_io(device, sector)) {
b30ab791 783 device->ov_position = sector;
b411b363
PR
784 goto requeue;
785 }
786
787 if (sector + (size>>9) > capacity)
788 size = (capacity-sector)<<9;
789
b30ab791 790 inc_rs_pending(device);
69a22773 791 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
b30ab791 792 dec_rs_pending(device);
b411b363
PR
793 return 0;
794 }
795 sector += BM_SECT_PER_BIT;
796 }
b30ab791 797 device->ov_position = sector;
b411b363
PR
798
799 requeue:
b30ab791 800 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
58ffa580 801 if (i == 0 || !stop_sector_reached)
b30ab791 802 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
b411b363
PR
803 return 1;
804}
805
99920dc5 806int w_ov_finished(struct drbd_work *w, int cancel)
b411b363 807{
84b8c06b
AG
808 struct drbd_device_work *dw =
809 container_of(w, struct drbd_device_work, w);
810 struct drbd_device *device = dw->device;
811 kfree(dw);
b30ab791
AG
812 ov_out_of_sync_print(device);
813 drbd_resync_finished(device);
b411b363 814
99920dc5 815 return 0;
b411b363
PR
816}
817
99920dc5 818static int w_resync_finished(struct drbd_work *w, int cancel)
b411b363 819{
84b8c06b
AG
820 struct drbd_device_work *dw =
821 container_of(w, struct drbd_device_work, w);
822 struct drbd_device *device = dw->device;
823 kfree(dw);
b411b363 824
b30ab791 825 drbd_resync_finished(device);
b411b363 826
99920dc5 827 return 0;
b411b363
PR
828}
829
b30ab791 830static void ping_peer(struct drbd_device *device)
af85e8e8 831{
a6b32bc3 832 struct drbd_connection *connection = first_peer_device(device)->connection;
2a67d8b9 833
bde89a9e
AG
834 clear_bit(GOT_PING_ACK, &connection->flags);
835 request_ping(connection);
836 wait_event(connection->ping_wait,
837 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
af85e8e8
LE
838}
839
b30ab791 840int drbd_resync_finished(struct drbd_device *device)
b411b363
PR
841{
842 unsigned long db, dt, dbdt;
843 unsigned long n_oos;
844 union drbd_state os, ns;
84b8c06b 845 struct drbd_device_work *dw;
b411b363 846 char *khelper_cmd = NULL;
26525618 847 int verify_done = 0;
b411b363
PR
848
849 /* Remove all elements from the resync LRU. Since future actions
850 * might set bits in the (main) bitmap, then the entries in the
851 * resync LRU would be wrong. */
b30ab791 852 if (drbd_rs_del_all(device)) {
b411b363
PR
853 /* In case this is not possible now, most probably because
854 * there are P_RS_DATA_REPLY Packets lingering on the worker's
855 * queue (or even the read operations for those packets
856 * is not finished by now). Retry in 100ms. */
857
20ee6390 858 schedule_timeout_interruptible(HZ / 10);
84b8c06b
AG
859 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
860 if (dw) {
861 dw->w.cb = w_resync_finished;
862 dw->device = device;
863 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
864 &dw->w);
b411b363
PR
865 return 1;
866 }
84b8c06b 867 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
b411b363
PR
868 }
869
b30ab791 870 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
b411b363
PR
871 if (dt <= 0)
872 dt = 1;
84b8c06b 873
b30ab791 874 db = device->rs_total;
58ffa580 875 /* adjust for verify start and stop sectors, respective reached position */
b30ab791
AG
876 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
877 db -= device->ov_left;
58ffa580 878
b411b363 879 dbdt = Bit2KB(db/dt);
b30ab791 880 device->rs_paused /= HZ;
b411b363 881
b30ab791 882 if (!get_ldev(device))
b411b363
PR
883 goto out;
884
b30ab791 885 ping_peer(device);
af85e8e8 886
0500813f 887 spin_lock_irq(&device->resource->req_lock);
b30ab791 888 os = drbd_read_state(device);
b411b363 889
26525618
LE
890 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
891
b411b363
PR
892 /* This protects us against multiple calls (that can happen in the presence
893 of application IO), and against connectivity loss just before we arrive here. */
894 if (os.conn <= C_CONNECTED)
895 goto out_unlock;
896
897 ns = os;
898 ns.conn = C_CONNECTED;
899
d0180171 900 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
58ffa580 901 verify_done ? "Online verify" : "Resync",
b30ab791 902 dt + device->rs_paused, device->rs_paused, dbdt);
b411b363 903
b30ab791 904 n_oos = drbd_bm_total_weight(device);
b411b363
PR
905
906 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
907 if (n_oos) {
d0180171 908 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
b411b363
PR
909 n_oos, Bit2KB(1));
910 khelper_cmd = "out-of-sync";
911 }
912 } else {
0b0ba1ef 913 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
b411b363
PR
914
915 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
916 khelper_cmd = "after-resync-target";
917
aaaba345 918 if (device->use_csums && device->rs_total) {
b30ab791
AG
919 const unsigned long s = device->rs_same_csum;
920 const unsigned long t = device->rs_total;
b411b363
PR
921 const int ratio =
922 (t == 0) ? 0 :
923 (t < 100000) ? ((s*100)/t) : (s/(t/100));
d0180171 924 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
b411b363
PR
925 "transferred %luK total %luK\n",
926 ratio,
b30ab791
AG
927 Bit2KB(device->rs_same_csum),
928 Bit2KB(device->rs_total - device->rs_same_csum),
929 Bit2KB(device->rs_total));
b411b363
PR
930 }
931 }
932
b30ab791 933 if (device->rs_failed) {
d0180171 934 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
b411b363
PR
935
936 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
937 ns.disk = D_INCONSISTENT;
938 ns.pdsk = D_UP_TO_DATE;
939 } else {
940 ns.disk = D_UP_TO_DATE;
941 ns.pdsk = D_INCONSISTENT;
942 }
943 } else {
944 ns.disk = D_UP_TO_DATE;
945 ns.pdsk = D_UP_TO_DATE;
946
947 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
b30ab791 948 if (device->p_uuid) {
b411b363
PR
949 int i;
950 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
b30ab791
AG
951 _drbd_uuid_set(device, i, device->p_uuid[i]);
952 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
953 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
b411b363 954 } else {
d0180171 955 drbd_err(device, "device->p_uuid is NULL! BUG\n");
b411b363
PR
956 }
957 }
958
62b0da3a
LE
959 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
960 /* for verify runs, we don't update uuids here,
961 * so there would be nothing to report. */
b30ab791
AG
962 drbd_uuid_set_bm(device, 0UL);
963 drbd_print_uuids(device, "updated UUIDs");
964 if (device->p_uuid) {
62b0da3a
LE
965 /* Now the two UUID sets are equal, update what we
966 * know of the peer. */
967 int i;
968 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
b30ab791 969 device->p_uuid[i] = device->ldev->md.uuid[i];
62b0da3a 970 }
b411b363
PR
971 }
972 }
973
b30ab791 974 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
b411b363 975out_unlock:
0500813f 976 spin_unlock_irq(&device->resource->req_lock);
b30ab791 977 put_ldev(device);
b411b363 978out:
b30ab791
AG
979 device->rs_total = 0;
980 device->rs_failed = 0;
981 device->rs_paused = 0;
58ffa580
LE
982
983 /* reset start sector, if we reached end of device */
b30ab791
AG
984 if (verify_done && device->ov_left == 0)
985 device->ov_start_sector = 0;
b411b363 986
b30ab791 987 drbd_md_sync(device);
13d42685 988
b411b363 989 if (khelper_cmd)
b30ab791 990 drbd_khelper(device, khelper_cmd);
b411b363
PR
991
992 return 1;
993}
994
995/* helper */
b30ab791 996static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
b411b363 997{
045417f7 998 if (drbd_peer_req_has_active_page(peer_req)) {
b411b363 999 /* This might happen if sendpage() has not finished */
db830c46 1000 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
b30ab791
AG
1001 atomic_add(i, &device->pp_in_use_by_net);
1002 atomic_sub(i, &device->pp_in_use);
0500813f 1003 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 1004 list_add_tail(&peer_req->w.list, &device->net_ee);
0500813f 1005 spin_unlock_irq(&device->resource->req_lock);
435f0740 1006 wake_up(&drbd_pp_wait);
b411b363 1007 } else
b30ab791 1008 drbd_free_peer_req(device, peer_req);
b411b363
PR
1009}
1010
1011/**
1012 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
b30ab791 1013 * @device: DRBD device.
b411b363
PR
1014 * @w: work object.
1015 * @cancel: The connection will be closed anyways
1016 */
99920dc5 1017int w_e_end_data_req(struct drbd_work *w, int cancel)
b411b363 1018{
a8cd15ba 1019 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1020 struct drbd_peer_device *peer_device = peer_req->peer_device;
1021 struct drbd_device *device = peer_device->device;
99920dc5 1022 int err;
b411b363
PR
1023
1024 if (unlikely(cancel)) {
b30ab791
AG
1025 drbd_free_peer_req(device, peer_req);
1026 dec_unacked(device);
99920dc5 1027 return 0;
b411b363
PR
1028 }
1029
db830c46 1030 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
6780139c 1031 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
b411b363
PR
1032 } else {
1033 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1034 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
db830c46 1035 (unsigned long long)peer_req->i.sector);
b411b363 1036
6780139c 1037 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
b411b363
PR
1038 }
1039
b30ab791 1040 dec_unacked(device);
b411b363 1041
b30ab791 1042 move_to_net_ee_or_free(device, peer_req);
b411b363 1043
99920dc5 1044 if (unlikely(err))
d0180171 1045 drbd_err(device, "drbd_send_block() failed\n");
99920dc5 1046 return err;
b411b363
PR
1047}
1048
1049/**
a209b4ae 1050 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
b411b363
PR
1051 * @w: work object.
1052 * @cancel: The connection will be closed anyways
1053 */
99920dc5 1054int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
b411b363 1055{
a8cd15ba 1056 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1057 struct drbd_peer_device *peer_device = peer_req->peer_device;
1058 struct drbd_device *device = peer_device->device;
99920dc5 1059 int err;
b411b363
PR
1060
1061 if (unlikely(cancel)) {
b30ab791
AG
1062 drbd_free_peer_req(device, peer_req);
1063 dec_unacked(device);
99920dc5 1064 return 0;
b411b363
PR
1065 }
1066
b30ab791
AG
1067 if (get_ldev_if_state(device, D_FAILED)) {
1068 drbd_rs_complete_io(device, peer_req->i.sector);
1069 put_ldev(device);
b411b363
PR
1070 }
1071
b30ab791 1072 if (device->state.conn == C_AHEAD) {
6780139c 1073 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
db830c46 1074 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791
AG
1075 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1076 inc_rs_pending(device);
6780139c 1077 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
b411b363
PR
1078 } else {
1079 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1080 drbd_err(device, "Not sending RSDataReply, "
b411b363 1081 "partner DISKLESS!\n");
99920dc5 1082 err = 0;
b411b363
PR
1083 }
1084 } else {
1085 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1086 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
db830c46 1087 (unsigned long long)peer_req->i.sector);
b411b363 1088
6780139c 1089 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
b411b363
PR
1090
1091 /* update resync data with failure */
b30ab791 1092 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
b411b363
PR
1093 }
1094
b30ab791 1095 dec_unacked(device);
b411b363 1096
b30ab791 1097 move_to_net_ee_or_free(device, peer_req);
b411b363 1098
99920dc5 1099 if (unlikely(err))
d0180171 1100 drbd_err(device, "drbd_send_block() failed\n");
99920dc5 1101 return err;
b411b363
PR
1102}
1103
99920dc5 1104int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
b411b363 1105{
a8cd15ba 1106 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1107 struct drbd_peer_device *peer_device = peer_req->peer_device;
1108 struct drbd_device *device = peer_device->device;
b411b363
PR
1109 struct digest_info *di;
1110 int digest_size;
1111 void *digest = NULL;
99920dc5 1112 int err, eq = 0;
b411b363
PR
1113
1114 if (unlikely(cancel)) {
b30ab791
AG
1115 drbd_free_peer_req(device, peer_req);
1116 dec_unacked(device);
99920dc5 1117 return 0;
b411b363
PR
1118 }
1119
b30ab791
AG
1120 if (get_ldev(device)) {
1121 drbd_rs_complete_io(device, peer_req->i.sector);
1122 put_ldev(device);
1d53f09e 1123 }
b411b363 1124
db830c46 1125 di = peer_req->digest;
b411b363 1126
db830c46 1127 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1128 /* quick hack to try to avoid a race against reconfiguration.
1129 * a real fix would be much more involved,
1130 * introducing more locking mechanisms */
6780139c
AG
1131 if (peer_device->connection->csums_tfm) {
1132 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
0b0ba1ef 1133 D_ASSERT(device, digest_size == di->digest_size);
b411b363
PR
1134 digest = kmalloc(digest_size, GFP_NOIO);
1135 }
1136 if (digest) {
6780139c 1137 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
b411b363
PR
1138 eq = !memcmp(digest, di->digest, digest_size);
1139 kfree(digest);
1140 }
1141
1142 if (eq) {
b30ab791 1143 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
676396d5 1144 /* rs_same_csums unit is BM_BLOCK_SIZE */
b30ab791 1145 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
6780139c 1146 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
b411b363 1147 } else {
b30ab791 1148 inc_rs_pending(device);
db830c46
AG
1149 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1150 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
204bba99 1151 kfree(di);
6780139c 1152 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
b411b363
PR
1153 }
1154 } else {
6780139c 1155 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
b411b363 1156 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1157 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
b411b363
PR
1158 }
1159
b30ab791
AG
1160 dec_unacked(device);
1161 move_to_net_ee_or_free(device, peer_req);
b411b363 1162
99920dc5 1163 if (unlikely(err))
d0180171 1164 drbd_err(device, "drbd_send_block/ack() failed\n");
99920dc5 1165 return err;
b411b363
PR
1166}
1167
99920dc5 1168int w_e_end_ov_req(struct drbd_work *w, int cancel)
b411b363 1169{
a8cd15ba 1170 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1171 struct drbd_peer_device *peer_device = peer_req->peer_device;
1172 struct drbd_device *device = peer_device->device;
db830c46
AG
1173 sector_t sector = peer_req->i.sector;
1174 unsigned int size = peer_req->i.size;
b411b363
PR
1175 int digest_size;
1176 void *digest;
99920dc5 1177 int err = 0;
b411b363
PR
1178
1179 if (unlikely(cancel))
1180 goto out;
1181
6780139c 1182 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
b411b363 1183 digest = kmalloc(digest_size, GFP_NOIO);
8f21420e 1184 if (!digest) {
99920dc5 1185 err = 1; /* terminate the connection in case the allocation failed */
8f21420e 1186 goto out;
b411b363
PR
1187 }
1188
db830c46 1189 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
6780139c 1190 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
8f21420e
PR
1191 else
1192 memset(digest, 0, digest_size);
1193
53ea4331
LE
1194 /* Free e and pages before send.
1195 * In case we block on congestion, we could otherwise run into
1196 * some distributed deadlock, if the other side blocks on
1197 * congestion as well, because our receiver blocks in
c37c8ecf 1198 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 1199 drbd_free_peer_req(device, peer_req);
db830c46 1200 peer_req = NULL;
b30ab791 1201 inc_rs_pending(device);
6780139c 1202 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
99920dc5 1203 if (err)
b30ab791 1204 dec_rs_pending(device);
8f21420e
PR
1205 kfree(digest);
1206
b411b363 1207out:
db830c46 1208 if (peer_req)
b30ab791
AG
1209 drbd_free_peer_req(device, peer_req);
1210 dec_unacked(device);
99920dc5 1211 return err;
b411b363
PR
1212}
1213
b30ab791 1214void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
b411b363 1215{
b30ab791
AG
1216 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1217 device->ov_last_oos_size += size>>9;
b411b363 1218 } else {
b30ab791
AG
1219 device->ov_last_oos_start = sector;
1220 device->ov_last_oos_size = size>>9;
b411b363 1221 }
b30ab791 1222 drbd_set_out_of_sync(device, sector, size);
b411b363
PR
1223}
1224
99920dc5 1225int w_e_end_ov_reply(struct drbd_work *w, int cancel)
b411b363 1226{
a8cd15ba 1227 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1228 struct drbd_peer_device *peer_device = peer_req->peer_device;
1229 struct drbd_device *device = peer_device->device;
b411b363 1230 struct digest_info *di;
b411b363 1231 void *digest;
db830c46
AG
1232 sector_t sector = peer_req->i.sector;
1233 unsigned int size = peer_req->i.size;
53ea4331 1234 int digest_size;
99920dc5 1235 int err, eq = 0;
58ffa580 1236 bool stop_sector_reached = false;
b411b363
PR
1237
1238 if (unlikely(cancel)) {
b30ab791
AG
1239 drbd_free_peer_req(device, peer_req);
1240 dec_unacked(device);
99920dc5 1241 return 0;
b411b363
PR
1242 }
1243
1244 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1245 * the resync lru has been cleaned up already */
b30ab791
AG
1246 if (get_ldev(device)) {
1247 drbd_rs_complete_io(device, peer_req->i.sector);
1248 put_ldev(device);
1d53f09e 1249 }
b411b363 1250
db830c46 1251 di = peer_req->digest;
b411b363 1252
db830c46 1253 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
6780139c 1254 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
b411b363
PR
1255 digest = kmalloc(digest_size, GFP_NOIO);
1256 if (digest) {
6780139c 1257 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
b411b363 1258
0b0ba1ef 1259 D_ASSERT(device, digest_size == di->digest_size);
b411b363
PR
1260 eq = !memcmp(digest, di->digest, digest_size);
1261 kfree(digest);
1262 }
b411b363
PR
1263 }
1264
9676c760
LE
1265 /* Free peer_req and pages before send.
1266 * In case we block on congestion, we could otherwise run into
1267 * some distributed deadlock, if the other side blocks on
1268 * congestion as well, because our receiver blocks in
c37c8ecf 1269 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 1270 drbd_free_peer_req(device, peer_req);
b411b363 1271 if (!eq)
b30ab791 1272 drbd_ov_out_of_sync_found(device, sector, size);
b411b363 1273 else
b30ab791 1274 ov_out_of_sync_print(device);
b411b363 1275
6780139c 1276 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
fa79abd8 1277 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
b411b363 1278
b30ab791 1279 dec_unacked(device);
b411b363 1280
b30ab791 1281 --device->ov_left;
ea5442af
LE
1282
1283 /* let's advance progress step marks only for every other megabyte */
b30ab791
AG
1284 if ((device->ov_left & 0x200) == 0x200)
1285 drbd_advance_rs_marks(device, device->ov_left);
ea5442af 1286
b30ab791
AG
1287 stop_sector_reached = verify_can_do_stop_sector(device) &&
1288 (sector + (size>>9)) >= device->ov_stop_sector;
58ffa580 1289
b30ab791
AG
1290 if (device->ov_left == 0 || stop_sector_reached) {
1291 ov_out_of_sync_print(device);
1292 drbd_resync_finished(device);
b411b363
PR
1293 }
1294
99920dc5 1295 return err;
b411b363
PR
1296}
1297
b6dd1a89
LE
1298/* FIXME
1299 * We need to track the number of pending barrier acks,
1300 * and to be able to wait for them.
1301 * See also comment in drbd_adm_attach before drbd_suspend_io.
1302 */
bde89a9e 1303static int drbd_send_barrier(struct drbd_connection *connection)
b411b363 1304{
9f5bdc33 1305 struct p_barrier *p;
b6dd1a89 1306 struct drbd_socket *sock;
b411b363 1307
bde89a9e
AG
1308 sock = &connection->data;
1309 p = conn_prepare_command(connection, sock);
9f5bdc33
AG
1310 if (!p)
1311 return -EIO;
bde89a9e 1312 p->barrier = connection->send.current_epoch_nr;
b6dd1a89 1313 p->pad = 0;
bde89a9e 1314 connection->send.current_epoch_writes = 0;
b6dd1a89 1315
bde89a9e 1316 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
b411b363
PR
1317}
1318
99920dc5 1319int w_send_write_hint(struct drbd_work *w, int cancel)
b411b363 1320{
84b8c06b
AG
1321 struct drbd_device *device =
1322 container_of(w, struct drbd_device, unplug_work);
9f5bdc33
AG
1323 struct drbd_socket *sock;
1324
b411b363 1325 if (cancel)
99920dc5 1326 return 0;
a6b32bc3 1327 sock = &first_peer_device(device)->connection->data;
69a22773 1328 if (!drbd_prepare_command(first_peer_device(device), sock))
9f5bdc33 1329 return -EIO;
69a22773 1330 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
b411b363
PR
1331}
1332
bde89a9e 1333static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
4eb9b3cb 1334{
bde89a9e
AG
1335 if (!connection->send.seen_any_write_yet) {
1336 connection->send.seen_any_write_yet = true;
1337 connection->send.current_epoch_nr = epoch;
1338 connection->send.current_epoch_writes = 0;
4eb9b3cb
LE
1339 }
1340}
1341
bde89a9e 1342static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
4eb9b3cb
LE
1343{
1344 /* re-init if first write on this connection */
bde89a9e 1345 if (!connection->send.seen_any_write_yet)
4eb9b3cb 1346 return;
bde89a9e
AG
1347 if (connection->send.current_epoch_nr != epoch) {
1348 if (connection->send.current_epoch_writes)
1349 drbd_send_barrier(connection);
1350 connection->send.current_epoch_nr = epoch;
4eb9b3cb
LE
1351 }
1352}
1353
8f7bed77 1354int w_send_out_of_sync(struct drbd_work *w, int cancel)
73a01a18
PR
1355{
1356 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1357 struct drbd_device *device = req->device;
44a4d551
LE
1358 struct drbd_peer_device *const peer_device = first_peer_device(device);
1359 struct drbd_connection *const connection = peer_device->connection;
99920dc5 1360 int err;
73a01a18
PR
1361
1362 if (unlikely(cancel)) {
8554df1c 1363 req_mod(req, SEND_CANCELED);
99920dc5 1364 return 0;
73a01a18 1365 }
e5f891b2 1366 req->pre_send_jif = jiffies;
73a01a18 1367
bde89a9e 1368 /* this time, no connection->send.current_epoch_writes++;
b6dd1a89
LE
1369 * If it was sent, it was the closing barrier for the last
1370 * replicated epoch, before we went into AHEAD mode.
1371 * No more barriers will be sent, until we leave AHEAD mode again. */
bde89a9e 1372 maybe_send_barrier(connection, req->epoch);
b6dd1a89 1373
44a4d551 1374 err = drbd_send_out_of_sync(peer_device, req);
8554df1c 1375 req_mod(req, OOS_HANDED_TO_NETWORK);
73a01a18 1376
99920dc5 1377 return err;
73a01a18
PR
1378}
1379
b411b363
PR
1380/**
1381 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
b411b363
PR
1382 * @w: work object.
1383 * @cancel: The connection will be closed anyways
1384 */
99920dc5 1385int w_send_dblock(struct drbd_work *w, int cancel)
b411b363
PR
1386{
1387 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1388 struct drbd_device *device = req->device;
44a4d551
LE
1389 struct drbd_peer_device *const peer_device = first_peer_device(device);
1390 struct drbd_connection *connection = peer_device->connection;
99920dc5 1391 int err;
b411b363
PR
1392
1393 if (unlikely(cancel)) {
8554df1c 1394 req_mod(req, SEND_CANCELED);
99920dc5 1395 return 0;
b411b363 1396 }
e5f891b2 1397 req->pre_send_jif = jiffies;
b411b363 1398
bde89a9e
AG
1399 re_init_if_first_write(connection, req->epoch);
1400 maybe_send_barrier(connection, req->epoch);
1401 connection->send.current_epoch_writes++;
b6dd1a89 1402
44a4d551 1403 err = drbd_send_dblock(peer_device, req);
99920dc5 1404 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
b411b363 1405
99920dc5 1406 return err;
b411b363
PR
1407}
1408
1409/**
1410 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
b411b363
PR
1411 * @w: work object.
1412 * @cancel: The connection will be closed anyways
1413 */
99920dc5 1414int w_send_read_req(struct drbd_work *w, int cancel)
b411b363
PR
1415{
1416 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1417 struct drbd_device *device = req->device;
44a4d551
LE
1418 struct drbd_peer_device *const peer_device = first_peer_device(device);
1419 struct drbd_connection *connection = peer_device->connection;
99920dc5 1420 int err;
b411b363
PR
1421
1422 if (unlikely(cancel)) {
8554df1c 1423 req_mod(req, SEND_CANCELED);
99920dc5 1424 return 0;
b411b363 1425 }
e5f891b2 1426 req->pre_send_jif = jiffies;
b411b363 1427
b6dd1a89
LE
1428 /* Even read requests may close a write epoch,
1429 * if there was any yet. */
bde89a9e 1430 maybe_send_barrier(connection, req->epoch);
b6dd1a89 1431
44a4d551 1432 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
6c1005e7 1433 (unsigned long)req);
b411b363 1434
99920dc5 1435 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
b411b363 1436
99920dc5 1437 return err;
b411b363
PR
1438}
1439
99920dc5 1440int w_restart_disk_io(struct drbd_work *w, int cancel)
265be2d0
PR
1441{
1442 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1443 struct drbd_device *device = req->device;
265be2d0 1444
0778286a 1445 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
4dd726f0 1446 drbd_al_begin_io(device, &req->i);
265be2d0
PR
1447
1448 drbd_req_make_private_bio(req, req->master_bio);
b30ab791 1449 req->private_bio->bi_bdev = device->ldev->backing_bdev;
265be2d0
PR
1450 generic_make_request(req->private_bio);
1451
99920dc5 1452 return 0;
265be2d0
PR
1453}
1454
b30ab791 1455static int _drbd_may_sync_now(struct drbd_device *device)
b411b363 1456{
b30ab791 1457 struct drbd_device *odev = device;
95f8efd0 1458 int resync_after;
b411b363
PR
1459
1460 while (1) {
a3f8f7dc 1461 if (!odev->ldev || odev->state.disk == D_DISKLESS)
438c8374 1462 return 1;
daeda1cc 1463 rcu_read_lock();
95f8efd0 1464 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
daeda1cc 1465 rcu_read_unlock();
95f8efd0 1466 if (resync_after == -1)
b411b363 1467 return 1;
b30ab791 1468 odev = minor_to_device(resync_after);
a3f8f7dc 1469 if (!odev)
841ce241 1470 return 1;
b411b363
PR
1471 if ((odev->state.conn >= C_SYNC_SOURCE &&
1472 odev->state.conn <= C_PAUSED_SYNC_T) ||
1473 odev->state.aftr_isp || odev->state.peer_isp ||
1474 odev->state.user_isp)
1475 return 0;
1476 }
1477}
1478
1479/**
1480 * _drbd_pause_after() - Pause resync on all devices that may not resync now
b30ab791 1481 * @device: DRBD device.
b411b363
PR
1482 *
1483 * Called from process context only (admin command and after_state_ch).
1484 */
b30ab791 1485static int _drbd_pause_after(struct drbd_device *device)
b411b363 1486{
54761697 1487 struct drbd_device *odev;
b411b363
PR
1488 int i, rv = 0;
1489
695d08fa 1490 rcu_read_lock();
05a10ec7 1491 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1492 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1493 continue;
1494 if (!_drbd_may_sync_now(odev))
1495 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1496 != SS_NOTHING_TO_DO);
1497 }
695d08fa 1498 rcu_read_unlock();
b411b363
PR
1499
1500 return rv;
1501}
1502
1503/**
1504 * _drbd_resume_next() - Resume resync on all devices that may resync now
b30ab791 1505 * @device: DRBD device.
b411b363
PR
1506 *
1507 * Called from process context only (admin command and worker).
1508 */
b30ab791 1509static int _drbd_resume_next(struct drbd_device *device)
b411b363 1510{
54761697 1511 struct drbd_device *odev;
b411b363
PR
1512 int i, rv = 0;
1513
695d08fa 1514 rcu_read_lock();
05a10ec7 1515 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1516 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1517 continue;
1518 if (odev->state.aftr_isp) {
1519 if (_drbd_may_sync_now(odev))
1520 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1521 CS_HARD, NULL)
 1522 != SS_NOTHING_TO_DO);
1523 }
1524 }
695d08fa 1525 rcu_read_unlock();
b411b363
PR
1526 return rv;
1527}
1528
b30ab791 1529void resume_next_sg(struct drbd_device *device)
b411b363
PR
1530{
1531 write_lock_irq(&global_state_lock);
b30ab791 1532 _drbd_resume_next(device);
b411b363
PR
1533 write_unlock_irq(&global_state_lock);
1534}
1535
b30ab791 1536void suspend_other_sg(struct drbd_device *device)
b411b363
PR
1537{
1538 write_lock_irq(&global_state_lock);
b30ab791 1539 _drbd_pause_after(device);
b411b363
PR
1540 write_unlock_irq(&global_state_lock);
1541}
1542
dc97b708 1543/* caller must hold global_state_lock */
b30ab791 1544enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
b411b363 1545{
54761697 1546 struct drbd_device *odev;
95f8efd0 1547 int resync_after;
b411b363
PR
1548
1549 if (o_minor == -1)
1550 return NO_ERROR;
a3f8f7dc 1551 if (o_minor < -1 || o_minor > MINORMASK)
95f8efd0 1552 return ERR_RESYNC_AFTER;
b411b363
PR
1553
1554 /* check for loops */
b30ab791 1555 odev = minor_to_device(o_minor);
b411b363 1556 while (1) {
b30ab791 1557 if (odev == device)
95f8efd0 1558 return ERR_RESYNC_AFTER_CYCLE;
b411b363 1559
a3f8f7dc
LE
1560 /* You are free to depend on diskless, non-existing,
1561 * or not yet/no longer existing minors.
1562 * We only reject dependency loops.
1563 * We cannot follow the dependency chain beyond a detached or
1564 * missing minor.
1565 */
1566 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1567 return NO_ERROR;
1568
daeda1cc 1569 rcu_read_lock();
95f8efd0 1570 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
daeda1cc 1571 rcu_read_unlock();
b411b363 1572 /* dependency chain ends here, no cycles. */
95f8efd0 1573 if (resync_after == -1)
b411b363
PR
1574 return NO_ERROR;
1575
1576 /* follow the dependency chain */
b30ab791 1577 odev = minor_to_device(resync_after);
b411b363
PR
1578 }
1579}
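/* Illustrative example (not part of the source file): if minor 1 is already
 * configured with resync-after 0, then validating resync-after 1 for minor 0
 * walks 1 -> 0, hits "odev == device" and returns ERR_RESYNC_AFTER_CYCLE;
 * a value of -1 ("no dependency") or a reference to a missing or diskless
 * minor is accepted with NO_ERROR. */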
1580
dc97b708 1581/* caller must hold global_state_lock */
b30ab791 1582void drbd_resync_after_changed(struct drbd_device *device)
b411b363
PR
1583{
1584 int changes;
b411b363 1585
dc97b708 1586 do {
b30ab791
AG
1587 changes = _drbd_pause_after(device);
1588 changes |= _drbd_resume_next(device);
dc97b708 1589 } while (changes);
b411b363
PR
1590}
1591
b30ab791 1592void drbd_rs_controller_reset(struct drbd_device *device)
9bd28d3c 1593{
813472ce
PR
1594 struct fifo_buffer *plan;
1595
b30ab791
AG
1596 atomic_set(&device->rs_sect_in, 0);
1597 atomic_set(&device->rs_sect_ev, 0);
1598 device->rs_in_flight = 0;
813472ce
PR
1599
 1600 /* Updating the RCU-protected object in place is necessary since
 1601 this function gets called from atomic context.
 1602 It is valid since all other updates also lead to a completely
 1603 empty fifo */
1604 rcu_read_lock();
b30ab791 1605 plan = rcu_dereference(device->rs_plan_s);
813472ce
PR
1606 plan->total = 0;
1607 fifo_set(plan, 0);
1608 rcu_read_unlock();
9bd28d3c
LE
1609}
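/* For contrast, an illustrative sketch (not part of the source file) of the
 * usual copy-and-replace RCU update, which cannot be used above because both
 * the allocation and synchronize_rcu() may sleep while this function runs in
 * atomic context; fifo_alloc() and the conf_update locking are assumed to
 * match what the drbd netlink code uses when it resizes the plan:
 *
 *	struct fifo_buffer *new_plan, *old_plan;
 *
 *	new_plan = fifo_alloc(fifo_size);	// GFP_KERNEL allocation, may sleep
 *	old_plan = rcu_dereference_protected(device->rs_plan_s,
 *			lockdep_is_held(&device->resource->conf_update));
 *	rcu_assign_pointer(device->rs_plan_s, new_plan);
 *	synchronize_rcu();			// may sleep
 *	kfree(old_plan);
 *
 * Resetting the existing buffer in place is safe here precisely because, as
 * the comment above notes, every other updater also leaves the fifo
 * completely empty, so concurrent readers never observe partial state. */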
1610
1f04af33
PR
1611void start_resync_timer_fn(unsigned long data)
1612{
b30ab791 1613 struct drbd_device *device = (struct drbd_device *) data;
ac0acb9e 1614 drbd_device_post_work(device, RS_START);
1f04af33
PR
1615}
1616
ac0acb9e 1617static void do_start_resync(struct drbd_device *device)
1f04af33 1618{
b30ab791 1619 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
ac0acb9e 1620 drbd_warn(device, "postponing start_resync ...\n");
b30ab791
AG
1621 device->start_resync_timer.expires = jiffies + HZ/10;
1622 add_timer(&device->start_resync_timer);
ac0acb9e 1623 return;
1f04af33
PR
1624 }
1625
b30ab791
AG
1626 drbd_start_resync(device, C_SYNC_SOURCE);
1627 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1f04af33
PR
1628}
1629
aaaba345
LE
1630static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1631{
1632 bool csums_after_crash_only;
1633 rcu_read_lock();
1634 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1635 rcu_read_unlock();
1636 return connection->agreed_pro_version >= 89 && /* supported? */
1637 connection->csums_tfm && /* configured? */
1638 (csums_after_crash_only == 0 /* use for each resync? */
1639 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1640}
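/* Illustrative decisions (not part of the source file): with protocol >= 89
 * and csums_tfm configured, csums_after_crash_only == 0 selects checksum
 * based resync for every resync, while csums_after_crash_only != 0 selects
 * it only if CRASHED_PRIMARY is set and otherwise falls back to a plain
 * bitmap based resync; with protocol < 89 or no csums_tfm the result is
 * always false. */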
1641
b411b363
PR
1642/**
1643 * drbd_start_resync() - Start the resync process
b30ab791 1644 * @device: DRBD device.
b411b363
PR
1645 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1646 *
1647 * This function might bring you directly into one of the
1648 * C_PAUSED_SYNC_* states.
1649 */
b30ab791 1650void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
b411b363 1651{
44a4d551
LE
1652 struct drbd_peer_device *peer_device = first_peer_device(device);
1653 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
1654 union drbd_state ns;
1655 int r;
1656
b30ab791 1657 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
d0180171 1658 drbd_err(device, "Resync already running!\n");
b411b363
PR
1659 return;
1660 }
1661
b30ab791 1662 if (!test_bit(B_RS_H_DONE, &device->flags)) {
e64a3294
PR
1663 if (side == C_SYNC_TARGET) {
 1664 /* Since application IO was locked out during C_WF_BITMAP_T and
 1665 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
 1666 let the before-resync-target handler veto making the data inconsistent. */
b30ab791 1667 r = drbd_khelper(device, "before-resync-target");
e64a3294
PR
1668 r = (r >> 8) & 0xff;
1669 if (r > 0) {
d0180171 1670 drbd_info(device, "before-resync-target handler returned %d, "
09b9e797 1671 "dropping connection.\n", r);
44a4d551 1672 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
09b9e797
PR
1673 return;
1674 }
e64a3294 1675 } else /* C_SYNC_SOURCE */ {
b30ab791 1676 r = drbd_khelper(device, "before-resync-source");
e64a3294
PR
1677 r = (r >> 8) & 0xff;
1678 if (r > 0) {
1679 if (r == 3) {
d0180171 1680 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294
PR
1681 "ignoring. Old userland tools?", r);
1682 } else {
d0180171 1683 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294 1684 "dropping connection.\n", r);
44a4d551 1685 conn_request_state(connection,
a6b32bc3 1686 NS(conn, C_DISCONNECTING), CS_HARD);
e64a3294
PR
1687 return;
1688 }
1689 }
09b9e797 1690 }
b411b363
PR
1691 }
1692
44a4d551 1693 if (current == connection->worker.task) {
dad20554 1694 /* The worker should not sleep waiting for state_mutex,
e64a3294 1695 that can take a long time */
b30ab791
AG
1696 if (!mutex_trylock(device->state_mutex)) {
1697 set_bit(B_RS_H_DONE, &device->flags);
1698 device->start_resync_timer.expires = jiffies + HZ/5;
1699 add_timer(&device->start_resync_timer);
e64a3294
PR
1700 return;
1701 }
1702 } else {
b30ab791 1703 mutex_lock(device->state_mutex);
e64a3294 1704 }
b30ab791 1705 clear_bit(B_RS_H_DONE, &device->flags);
b411b363 1706
074f4afe
LE
1707 /* req_lock: serialize with drbd_send_and_submit() and others
1708 * global_state_lock: for stable sync-after dependencies */
1709 spin_lock_irq(&device->resource->req_lock);
1710 write_lock(&global_state_lock);
a700471b 1711 /* Did some connection breakage or IO error race with us? */
b30ab791
AG
1712 if (device->state.conn < C_CONNECTED
1713 || !get_ldev_if_state(device, D_NEGOTIATING)) {
074f4afe
LE
1714 write_unlock(&global_state_lock);
1715 spin_unlock_irq(&device->resource->req_lock);
b30ab791 1716 mutex_unlock(device->state_mutex);
b411b363
PR
1717 return;
1718 }
1719
b30ab791 1720 ns = drbd_read_state(device);
b411b363 1721
b30ab791 1722 ns.aftr_isp = !_drbd_may_sync_now(device);
b411b363
PR
1723
1724 ns.conn = side;
1725
1726 if (side == C_SYNC_TARGET)
1727 ns.disk = D_INCONSISTENT;
1728 else /* side == C_SYNC_SOURCE */
1729 ns.pdsk = D_INCONSISTENT;
1730
b30ab791
AG
1731 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1732 ns = drbd_read_state(device);
b411b363
PR
1733
1734 if (ns.conn < C_CONNECTED)
1735 r = SS_UNKNOWN_ERROR;
1736
1737 if (r == SS_SUCCESS) {
b30ab791 1738 unsigned long tw = drbd_bm_total_weight(device);
1d7734a0
LE
1739 unsigned long now = jiffies;
1740 int i;
1741
b30ab791
AG
1742 device->rs_failed = 0;
1743 device->rs_paused = 0;
1744 device->rs_same_csum = 0;
1745 device->rs_last_events = 0;
1746 device->rs_last_sect_ev = 0;
1747 device->rs_total = tw;
1748 device->rs_start = now;
1d7734a0 1749 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
b30ab791
AG
1750 device->rs_mark_left[i] = tw;
1751 device->rs_mark_time[i] = now;
1d7734a0 1752 }
b30ab791 1753 _drbd_pause_after(device);
5ab7d2c0
LE
 1754 /* Forget potentially stale cached per-resync-extent bit counts.
 1755 * Open-coded drbd_rs_cancel_all(device); we already have IRQs
 1756 * disabled, and know the disk state is ok. */
1757 spin_lock(&device->al_lock);
1758 lc_reset(device->resync);
1759 device->resync_locked = 0;
1760 device->resync_wenr = LC_FREE;
1761 spin_unlock(&device->al_lock);
b411b363 1762 }
074f4afe
LE
1763 write_unlock(&global_state_lock);
1764 spin_unlock_irq(&device->resource->req_lock);
5a22db89 1765
b411b363 1766 if (r == SS_SUCCESS) {
5ab7d2c0 1767 wake_up(&device->al_wait); /* for lc_reset() above */
328e0f12
PR
1768 /* reset rs_last_bcast when a resync or verify is started,
1769 * to deal with potential jiffies wrap. */
b30ab791 1770 device->rs_last_bcast = jiffies - HZ;
328e0f12 1771
d0180171 1772 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
b411b363 1773 drbd_conn_str(ns.conn),
b30ab791
AG
1774 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1775 (unsigned long) device->rs_total);
aaaba345 1776 if (side == C_SYNC_TARGET) {
b30ab791 1777 device->bm_resync_fo = 0;
aaaba345
LE
1778 device->use_csums = use_checksum_based_resync(connection, device);
1779 } else {
1780 device->use_csums = 0;
1781 }
6c922ed5
LE
1782
1783 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
 1784 * with w_send_oos, or the sync target will get confused as to
 1785 * how many bits to resync. We cannot always do that, because for an
 1786 * empty resync and protocol < 95, we need to do it here, as we call
 1787 * drbd_resync_finished from here in that case.
 1788 * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1789 * and from after_state_ch otherwise. */
44a4d551
LE
1790 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1791 drbd_gen_and_send_sync_uuid(peer_device);
b411b363 1792
44a4d551 1793 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
af85e8e8
LE
1794 /* This still has a race (about when exactly the peers
1795 * detect connection loss) that can lead to a full sync
1796 * on next handshake. In 8.3.9 we fixed this with explicit
1797 * resync-finished notifications, but the fix
1798 * introduces a protocol change. Sleeping for some
1799 * time longer than the ping interval + timeout on the
1800 * SyncSource, to give the SyncTarget the chance to
1801 * detect connection loss, then waiting for a ping
1802 * response (implicit in drbd_resync_finished) reduces
1803 * the race considerably, but does not solve it. */
44ed167d
PR
1804 if (side == C_SYNC_SOURCE) {
1805 struct net_conf *nc;
1806 int timeo;
1807
1808 rcu_read_lock();
44a4d551 1809 nc = rcu_dereference(connection->net_conf);
44ed167d
PR
1810 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
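				/* Illustrative arithmetic (not from the source),
				 * assuming the defaults ping_int = 10 (seconds)
				 * and ping_timeo = 5 (tenths of a second):
				 *   timeo = 10*HZ + 5*HZ/9, roughly 10.56 seconds,
				 * slightly more than ping interval + ping timeout
				 * (10.5 s), as the comment above asks for. */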
1811 rcu_read_unlock();
1812 schedule_timeout_interruptible(timeo);
1813 }
b30ab791 1814 drbd_resync_finished(device);
b411b363
PR
1815 }
1816
b30ab791
AG
1817 drbd_rs_controller_reset(device);
1818 /* ns.conn may already be != device->state.conn,
b411b363
PR
1819 * we may have been paused in between, or become paused until
1820 * the timer triggers.
1821 * No matter, that is handled in resync_timer_fn() */
1822 if (ns.conn == C_SYNC_TARGET)
b30ab791 1823 mod_timer(&device->resync_timer, jiffies);
b411b363 1824
b30ab791 1825 drbd_md_sync(device);
b411b363 1826 }
b30ab791
AG
1827 put_ldev(device);
1828 mutex_unlock(device->state_mutex);
b411b363
PR
1829}
1830
e334f550 1831static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
c7a58db4
LE
1832{
1833 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1834 device->rs_last_bcast = jiffies;
1835
1836 if (!get_ldev(device))
1837 return;
1838
1839 drbd_bm_write_lazy(device, 0);
5ab7d2c0 1840 if (resync_done && is_sync_state(device->state.conn))
c7a58db4 1841 drbd_resync_finished(device);
5ab7d2c0 1842
c7a58db4
LE
1843 drbd_bcast_event(device, &sib);
1844 /* update timestamp, in case it took a while to write out stuff */
1845 device->rs_last_bcast = jiffies;
1846 put_ldev(device);
1847}
1848
e334f550
LE
1849static void drbd_ldev_destroy(struct drbd_device *device)
1850{
1851 lc_destroy(device->resync);
1852 device->resync = NULL;
1853 lc_destroy(device->act_log);
1854 device->act_log = NULL;
1855 __no_warn(local,
1856 drbd_free_ldev(device->ldev);
1857 device->ldev = NULL;);
1858 clear_bit(GOING_DISKLESS, &device->flags);
1859 wake_up(&device->misc_wait);
1860}
1861
1862static void go_diskless(struct drbd_device *device)
1863{
1864 D_ASSERT(device, device->state.disk == D_FAILED);
1865 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1866 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1867 * the protected members anymore, though, so once put_ldev reaches zero
1868 * again, it will be safe to free them. */
1869
1870 /* Try to write changed bitmap pages, read errors may have just
1871 * set some bits outside the area covered by the activity log.
1872 *
1873 * If we have an IO error during the bitmap writeout,
1874 * we will want a full sync next time, just in case.
1875 * (Do we want a specific meta data flag for this?)
1876 *
1877 * If that does not make it to stable storage either,
1878 * we cannot do anything about that anymore.
1879 *
1880 * We still need to check if both bitmap and ldev are present, we may
1881 * end up here after a failed attach, before ldev was even assigned.
1882 */
1883 if (device->bitmap && device->ldev) {
 1884 /* An interrupted resync or similar is allowed to recount bits
1885 * while we detach.
1886 * Any modifications would not be expected anymore, though.
1887 */
1888 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1889 "detach", BM_LOCKED_TEST_ALLOWED)) {
1890 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1891 drbd_md_set_flag(device, MDF_FULL_SYNC);
1892 drbd_md_sync(device);
1893 }
1894 }
1895 }
1896
1897 drbd_force_state(device, NS(disk, D_DISKLESS));
1898}
1899
ac0acb9e
LE
1900static int do_md_sync(struct drbd_device *device)
1901{
1902 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1903 drbd_md_sync(device);
1904 return 0;
1905}
1906
e334f550
LE
1907#define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit))
1908static void do_device_work(struct drbd_device *device, const unsigned long todo)
1909{
ac0acb9e
LE
1910 if (WORK_PENDING(MD_SYNC, todo))
1911 do_md_sync(device);
e334f550
LE
1912 if (WORK_PENDING(RS_DONE, todo) ||
1913 WORK_PENDING(RS_PROGRESS, todo))
1914 update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
1915 if (WORK_PENDING(GO_DISKLESS, todo))
1916 go_diskless(device);
1917 if (WORK_PENDING(DESTROY_DISK, todo))
1918 drbd_ldev_destroy(device);
ac0acb9e
LE
1919 if (WORK_PENDING(RS_START, todo))
1920 do_start_resync(device);
e334f550
LE
1921}
1922
1923#define DRBD_DEVICE_WORK_MASK \
1924 ((1UL << GO_DISKLESS) \
1925 |(1UL << DESTROY_DISK) \
ac0acb9e
LE
1926 |(1UL << MD_SYNC) \
1927 |(1UL << RS_START) \
e334f550
LE
1928 |(1UL << RS_PROGRESS) \
1929 |(1UL << RS_DONE) \
1930 )
1931
1932static unsigned long get_work_bits(unsigned long *flags)
1933{
1934 unsigned long old, new;
1935 do {
1936 old = *flags;
1937 new = old & ~DRBD_DEVICE_WORK_MASK;
1938 } while (cmpxchg(flags, old, new) != old);
1939 return old & DRBD_DEVICE_WORK_MASK;
1940}
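/* Illustrative equivalence (not part of the source file): get_work_bits()
 * is an atomic "fetch and clear" of the device work bits, roughly
 *
 *	unsigned long todo = *flags & DRBD_DEVICE_WORK_MASK;
 *	*flags &= ~DRBD_DEVICE_WORK_MASK;
 *	return todo;
 *
 * except that the cmpxchg() retry loop makes the read-modify-write atomic
 * with respect to concurrent set_bit()/clear_bit() on the other bits of
 * device->flags, so no work bit set in between can be lost. */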
1941
1942static void do_unqueued_work(struct drbd_connection *connection)
c7a58db4
LE
1943{
1944 struct drbd_peer_device *peer_device;
1945 int vnr;
1946
1947 rcu_read_lock();
1948 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1949 struct drbd_device *device = peer_device->device;
e334f550
LE
1950 unsigned long todo = get_work_bits(&device->flags);
1951 if (!todo)
c7a58db4 1952 continue;
5ab7d2c0 1953
c7a58db4
LE
1954 kref_get(&device->kref);
1955 rcu_read_unlock();
e334f550 1956 do_device_work(device, todo);
c7a58db4
LE
1957 kref_put(&device->kref, drbd_destroy_device);
1958 rcu_read_lock();
1959 }
1960 rcu_read_unlock();
1961}
1962
a186e478 1963static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
8c0785a5
LE
1964{
1965 spin_lock_irq(&queue->q_lock);
15e26f6a 1966 list_splice_tail_init(&queue->q, work_list);
8c0785a5
LE
1967 spin_unlock_irq(&queue->q_lock);
1968 return !list_empty(work_list);
1969}
1970
a186e478 1971static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
8c0785a5
LE
1972{
1973 spin_lock_irq(&queue->q_lock);
1974 if (!list_empty(&queue->q))
1975 list_move(queue->q.next, work_list);
1976 spin_unlock_irq(&queue->q_lock);
1977 return !list_empty(work_list);
1978}
1979
bde89a9e 1980static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
b6dd1a89
LE
1981{
1982 DEFINE_WAIT(wait);
1983 struct net_conf *nc;
1984 int uncork, cork;
1985
1986 dequeue_work_item(&connection->sender_work, work_list);
1987 if (!list_empty(work_list))
1988 return;
1989
1990 /* Still nothing to do?
1991 * Maybe we still need to close the current epoch,
1992 * even if no new requests are queued yet.
1993 *
1994 * Also, poke TCP, just in case.
1995 * Then wait for new work (or signal). */
1996 rcu_read_lock();
1997 nc = rcu_dereference(connection->net_conf);
1998 uncork = nc ? nc->tcp_cork : 0;
1999 rcu_read_unlock();
2000 if (uncork) {
2001 mutex_lock(&connection->data.mutex);
2002 if (connection->data.socket)
2003 drbd_tcp_uncork(connection->data.socket);
2004 mutex_unlock(&connection->data.mutex);
2005 }
2006
2007 for (;;) {
2008 int send_barrier;
2009 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
0500813f 2010 spin_lock_irq(&connection->resource->req_lock);
b6dd1a89 2011 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
bc317a9e
LE
 2012 /* dequeue everything that is queued right now; splicing keeps the
 2013 * ordering, and we still use drbd_queue_work_front() in some places */
2014 if (!list_empty(&connection->sender_work.q))
4dd726f0 2015 list_splice_tail_init(&connection->sender_work.q, work_list);
b6dd1a89
LE
2016 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2017 if (!list_empty(work_list) || signal_pending(current)) {
0500813f 2018 spin_unlock_irq(&connection->resource->req_lock);
b6dd1a89
LE
2019 break;
2020 }
f9c78128
LE
2021
2022 /* We found nothing new to do, no to-be-communicated request,
2023 * no other work item. We may still need to close the last
2024 * epoch. Next incoming request epoch will be connection ->
2025 * current transfer log epoch number. If that is different
2026 * from the epoch of the last request we communicated, it is
 2027 * safe to send the epoch-separating barrier now.
2028 */
2029 send_barrier =
2030 atomic_read(&connection->current_tle_nr) !=
2031 connection->send.current_epoch_nr;
0500813f 2032 spin_unlock_irq(&connection->resource->req_lock);
f9c78128
LE
2033
2034 if (send_barrier)
2035 maybe_send_barrier(connection,
2036 connection->send.current_epoch_nr + 1);
5ab7d2c0 2037
e334f550 2038 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
5ab7d2c0
LE
2039 break;
2040
a80ca1ae
LE
2041 /* drbd_send() may have called flush_signals() */
2042 if (get_t_state(&connection->worker) != RUNNING)
2043 break;
5ab7d2c0 2044
b6dd1a89
LE
2045 schedule();
 2046 /* may be woken up for things other than new work, too,
 2047 * e.g. if the current epoch got closed.
 2048 * In that case we send the barrier above. */
2049 }
2050 finish_wait(&connection->sender_work.q_wait, &wait);
2051
2052 /* someone may have changed the config while we have been waiting above. */
2053 rcu_read_lock();
2054 nc = rcu_dereference(connection->net_conf);
2055 cork = nc ? nc->tcp_cork : 0;
2056 rcu_read_unlock();
2057 mutex_lock(&connection->data.mutex);
2058 if (connection->data.socket) {
2059 if (cork)
2060 drbd_tcp_cork(connection->data.socket);
2061 else if (!uncork)
2062 drbd_tcp_uncork(connection->data.socket);
2063 }
2064 mutex_unlock(&connection->data.mutex);
2065}
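/* Illustrative sketch (not part of the source file) of the cork/uncork
 * batching above; "sock" stands in for connection->data.socket:
 *
 *	drbd_tcp_uncork(sock);	// flush whatever is pending before sleeping
 *	... sleep until sender_work has entries or a signal arrives ...
 *	drbd_tcp_cork(sock);	// re-enable batching for the dequeued items
 *
 * The next call uncorks again before sleeping, so small sender packets are
 * coalesced while there is work to send and flushed once the queue runs
 * dry. */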
2066
b411b363
PR
2067int drbd_worker(struct drbd_thread *thi)
2068{
bde89a9e 2069 struct drbd_connection *connection = thi->connection;
6db7e50a 2070 struct drbd_work *w = NULL;
c06ece6b 2071 struct drbd_peer_device *peer_device;
b411b363 2072 LIST_HEAD(work_list);
8c0785a5 2073 int vnr;
b411b363 2074
e77a0a5c 2075 while (get_t_state(thi) == RUNNING) {
80822284 2076 drbd_thread_current_set_cpu(thi);
b411b363 2077
8c0785a5 2078 if (list_empty(&work_list))
bde89a9e 2079 wait_for_work(connection, &work_list);
b411b363 2080
e334f550
LE
2081 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2082 do_unqueued_work(connection);
5ab7d2c0 2083
8c0785a5 2084 if (signal_pending(current)) {
b411b363 2085 flush_signals(current);
19393e10 2086 if (get_t_state(thi) == RUNNING) {
1ec861eb 2087 drbd_warn(connection, "Worker got an unexpected signal\n");
b411b363 2088 continue;
19393e10 2089 }
b411b363
PR
2090 break;
2091 }
2092
e77a0a5c 2093 if (get_t_state(thi) != RUNNING)
b411b363 2094 break;
b411b363 2095
8c0785a5 2096 while (!list_empty(&work_list)) {
6db7e50a
AG
2097 w = list_first_entry(&work_list, struct drbd_work, list);
2098 list_del_init(&w->list);
2099 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
8c0785a5 2100 continue;
bde89a9e
AG
2101 if (connection->cstate >= C_WF_REPORT_PARAMS)
2102 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
2103 }
2104 }
b411b363 2105
8c0785a5 2106 do {
e334f550
LE
2107 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2108 do_unqueued_work(connection);
b411b363 2109 while (!list_empty(&work_list)) {
6db7e50a
AG
2110 w = list_first_entry(&work_list, struct drbd_work, list);
2111 list_del_init(&w->list);
2112 w->cb(w, 1);
b411b363 2113 }
bde89a9e 2114 dequeue_work_batch(&connection->sender_work, &work_list);
e334f550 2115 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
b411b363 2116
c141ebda 2117 rcu_read_lock();
c06ece6b
AG
2118 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2119 struct drbd_device *device = peer_device->device;
0b0ba1ef 2120 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
b30ab791 2121 kref_get(&device->kref);
c141ebda 2122 rcu_read_unlock();
b30ab791 2123 drbd_device_cleanup(device);
05a10ec7 2124 kref_put(&device->kref, drbd_destroy_device);
c141ebda 2125 rcu_read_lock();
0e29d163 2126 }
c141ebda 2127 rcu_read_unlock();
b411b363
PR
2128
2129 return 0;
2130}