/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the resync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_endio(struct bio *bio, int error)
{
	struct drbd_device *device;

	device = bio->bi_private;
	device->md_io.error = error;

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(device);
	device->md_io.done = 1;
	wake_up(&device->misc_wait);
	bio_put(bio);
	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
		put_ldev(device);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	wake_asender(peer_device->connection);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio, int error)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;
	int is_discard = !!(bio->bi_rw & REQ_DISCARD);

	if (error && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
			is_write ? (is_discard ? "discard" : "write")
				 : "read", error,
			(unsigned long long)peer_req->i.sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
					is_write ? "write" : "read",
					(unsigned long long)peer_req->i.sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		drbd_warn(device, "p %s: setting error to -EIO\n",
			bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}


	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!error)
			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		if (bio->bi_rw & REQ_DISCARD)
			what = (error == -EOPNOTSUPP)
				? DISCARD_COMPLETED_NOTSUPP
				: DISCARD_COMPLETED_WITH_ERROR;
		else
			what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&device->resource->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
	put_ldev(device);

	if (m.bio)
		complete_master_bio(device, &m);
}

void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}
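
/* A note on drbd_csum_ee()'s tail-page handling (illustrative numbers,
 * assuming PAGE_SIZE == 4096): peer_req->i.size is in bytes, so a 6144-byte
 * request hashes one full page and then 6144 & 4095 == 2048 bytes of the last
 * page.  If the size is an exact multiple of PAGE_SIZE the bitwise AND yields
 * 0, and the "len ?: PAGE_SIZE" fallback hashes the whole last page. */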

void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec bvec;
	struct bvec_iter iter;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	bio_for_each_segment(bvec, bio, iter) {
		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
		/* Free peer_req and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
		drbd_free_peer_req(device, peer_req);
		peer_req = NULL;
		inc_rs_pending(device);
		err = drbd_send_drequest_csum(peer_device, sector, size,
					      digest, digest_size,
					      P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		drbd_err(device, "kmalloc() of digest failed.\n");
		err = -ENOMEM;
	}

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
	return err;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	if (!get_ldev(device))
		return -EIO;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
				       size, true /* has real payload */, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
defer:
	put_ldev(device);
	return -EAGAIN;
}
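
/* Rough overview of the checksum-based resync path sketched above, as I read
 * the code: read_for_csum() reads the local block into a peer request and
 * queues w_e_send_csum(), which hashes it with csums_tfm and sends only the
 * digest as P_CSUM_RS_REQUEST.  The peer (see w_e_end_csum_rs_req() below)
 * compares digests and answers either P_RS_IS_IN_SYNC or a full
 * P_RS_DATA_REPLY block, so blocks that are already equal never cross the
 * wire. */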

int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

	switch (device->state.conn) {
	case C_VERIFY_S:
		make_ov_request(device, cancel);
		break;
	case C_SYNC_TARGET:
		make_resync_request(device, cancel);
		break;
	}

	return 0;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;

	drbd_queue_work_if_unqueued(
		&first_peer_device(device)->connection->sender_work,
		&device->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
	if (!fb)
		return NULL;

	fb->head_index = 0;
	fb->size = fifo_size;
	fb->total = 0;

	return fb;
}
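
/* These fifo_* helpers implement the fixed-size plan buffer used by
 * drbd_rs_controller() below: fifo_push() returns the value scheduled for the
 * current step while overwriting that slot with a new entry (a delay line of
 * fb->size steps), and fifo_add_val() adds the same amount to every pending
 * step.  The controller keeps fb->total equal to the sum of all slots, so it
 * can account for corrections that are already planned but not yet applied. */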

static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
	struct disk_conf *dc;
	unsigned int want;	/* The number of sectors we want in-flight */
	int req_sect;		/* Number of sectors to request in this turn */
	int correction;		/* Number of sectors more we need in-flight */
	int cps;		/* correction per invocation of drbd_rs_controller() */
	int steps;		/* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
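
/* Unit sanity check for the controller math above (my reading; I am assuming
 * SLEEP_TIME is the 100 ms polling interval, HZ/10, and that the rate options
 * are configured in KiB/s): multiplying a rate by 2 converts KiB/s to 512-byte
 * sectors/s, and multiplying by SLEEP_TIME / HZ scales that to sectors per
 * step.  E.g. with resync_rate = 10000 KiB/s the start-of-resync "want" is
 * (10000 * 2 / 10) * steps = 2000 * steps sectors in flight, and with
 * c_max_rate = 100000 KiB/s max_sect caps a single step at
 * 100000 * 2 / 10 = 20000 sectors (roughly 10 MB). */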

static int drbd_rs_number_requests(struct drbd_device *device)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;

	rcu_read_lock();
	mxb = drbd_get_max_buffers(device) / 2;
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */

	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
	 * "number of pages" (typically also 4k),
	 * but "rs_in_flight" is in "sectors" (512 Byte). */
	if (mxb - device->rs_in_flight/8 < number)
		number = mxb - device->rs_in_flight/8;

	return number;
}
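
/* Example of the unit conversions above (illustrative figures, assuming
 * SLEEP_TIME == HZ/10 and BM_BLOCK_SIZE == 4 KiB): if the controller asks for
 * 25 4-KiB resync requests per 100 ms step, the rate reported as c_sync_rate
 * is 25 * HZ * 4 / (HZ/10) = 1000 KiB/s.  The fixed-rate branch runs the same
 * formula in reverse, e.g. resync_rate = 1000 KiB/s yields
 * (HZ/10) * 1000 / (4 * HZ) = 25 requests per step.  The rs_in_flight/8 term
 * converts 512-byte sectors back into 4-KiB blocks for the max-buffers cap. */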

static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, requeue = 0;
	int i = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we already have the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(device, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			device->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		if (device->use_csums) {
			switch (read_for_csum(peer_device, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(device);
				return -EIO;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			int err;

			inc_rs_pending(device);
			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
						 sector, size, ID_SYNCER);
			if (err) {
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(device);
				put_ldev(device);
				return err;
			}
		}
	}

	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(device);
		return 0;
	}

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
	return 0;
}
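
/* Worked example for the merge loop above, as I read it (illustrative):
 * starting at a bit whose sector is a multiple of 16 (8 KiB aligned), the
 * request may grow to 8 KiB; once it reaches 8 KiB, align becomes 2 and
 * growth continues only if the start sector is also 16 KiB aligned, and so
 * on.  Merged requests therefore end up sized and aligned to powers of two
 * (up to max_bio_size), which keeps them stripe-friendly for software RAID
 * underneath. */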

static int make_ov_request(struct drbd_device *device, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	bool stop_sector_reached = false;

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(device);

	sector = device->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity)
			return 1;

		/* We check for "finished" only in the reply path:
		 * w_e_end_ov_reply().
		 * We need to send at least one request out. */
		stop_sector_reached = i > 0
			&& verify_can_do_stop_sector(device)
			&& sector >= device->ov_stop_sector;
		if (stop_sector_reached)
			break;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(device, sector)) {
			device->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(device);
		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
			dec_rs_pending(device);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	device->ov_position = sector;

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	if (i == 0 || !stop_sector_reached)
		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}

int w_ov_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
	ov_out_of_sync_print(device);
	drbd_resync_finished(device);

	return 0;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);

	drbd_resync_finished(device);

	return 0;
}

static void ping_peer(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;

	clear_bit(GOT_PING_ACK, &connection->flags);
	request_ping(connection);
	wait_event(connection->ping_wait,
		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}

int drbd_resync_finished(struct drbd_device *device)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_device_work *dw;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
					&dw->w);
			return 1;
		}
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
	}

	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;

	db = device->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;

	dbdt = Bit2KB(db/dt);
	device->rs_paused /= HZ;

	if (!get_ldev(device))
		goto out;

	ping_peer(device);

	spin_lock_irq(&device->resource->req_lock);
	os = drbd_read_state(device);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + device->rs_paused, device->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(device);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (device->use_csums && device->rs_total) {
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
		}
	}

	if (device->rs_failed) {
		drbd_info(device, " %lu failed blocks\n", device->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (device->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
			} else {
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					device->p_uuid[i] = device->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&device->resource->req_lock);
	put_ldev(device);
out:
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;

	drbd_md_sync(device);

	if (khelper_cmd)
		drbd_khelper(device, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	if (drbd_peer_req_has_active_page(peer_req)) {
		/* This might happen if sendpage() has not finished */
		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &device->pp_in_use_by_net);
		atomic_sub(i, &device->pp_in_use);
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->net_ee);
		spin_unlock_irq(&device->resource->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_peer_req(device, peer_req);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @device:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	if (device->state.conn == C_AHEAD) {
		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(device);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				drbd_err(device, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			err = 0;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
			D_ASSERT(device, digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(device);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(device);
	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block/ack() failed\n");
	return err;
}

int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		err = 1;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	peer_req = NULL;
	inc_rs_pending(device);
	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
	if (err)
		dec_rs_pending(device);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);
	dec_unacked(device);
	return err;
}

void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
{
	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
		device->ov_last_oos_size += size>>9;
	} else {
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(device, sector, size);
}

int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;
	bool stop_sector_reached = false;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

			D_ASSERT(device, digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;

	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
	}

	return err;
}

/* FIXME
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
static int drbd_send_barrier(struct drbd_connection *connection)
{
	struct p_barrier *p;
	struct drbd_socket *sock;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	p->barrier = connection->send.current_epoch_nr;
	p->pad = 0;
	connection->send.current_epoch_writes = 0;

	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
}

int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, unplug_work);
	struct drbd_socket *sock;

	if (cancel)
		return 0;
	sock = &first_peer_device(device)->connection->data;
	if (!drbd_prepare_command(first_peer_device(device), sock))
		return -EIO;
	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}

static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
{
	if (!connection->send.seen_any_write_yet) {
		connection->send.seen_any_write_yet = true;
		connection->send.current_epoch_nr = epoch;
		connection->send.current_epoch_writes = 0;
	}
}

static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
{
	/* re-init if first write on this connection */
	if (!connection->send.seen_any_write_yet)
		return;
	if (connection->send.current_epoch_nr != epoch) {
		if (connection->send.current_epoch_writes)
			drbd_send_barrier(connection);
		connection->send.current_epoch_nr = epoch;
	}
}
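
/* Epoch/barrier bookkeeping in a nutshell, as the two helpers above read:
 * each request carries the epoch number it was issued in.  When the sender
 * sees the first request of a new epoch, and the previous epoch actually had
 * writes, it emits a P_BARRIER for the old epoch so the peer knows where the
 * write-ordering boundary lies.  re_init_if_first_write() only seeds the
 * counters for the very first write after a connection was (re)established. */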

int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* this time, no connection->send.current_epoch_writes++;
	 * If it was sent, it was the closing barrier for the last
	 * replicated epoch, before we went into AHEAD mode.
	 * No more barriers will be sent, until we leave AHEAD mode again. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_out_of_sync(peer_device, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return err;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	re_init_if_first_write(connection, req->epoch);
	maybe_send_barrier(connection, req->epoch);
	connection->send.current_epoch_writes++;

	err = drbd_send_dblock(peer_device, req);
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* Even read requests may close a write epoch,
	 * if there was any yet. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
				 (unsigned long)req);

	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(device, &req->i);

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = device->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 0;
}

static int _drbd_may_sync_now(struct drbd_device *device)
{
	struct drbd_device *odev = device;
	int resync_after;

	while (1) {
		if (!odev->ldev || odev->state.disk == D_DISKLESS)
			return 1;
		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		if (resync_after == -1)
			return 1;
		odev = minor_to_device(resync_after);
		if (!odev)
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
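
/* Example of the resync-after chain walked above (hypothetical config): if
 * minor 2 has resync-after = 1 and minor 1 has resync-after = -1, then
 * _drbd_may_sync_now() for minor 2 inspects minor 1 and returns 0 while
 * minor 1 is itself syncing (or paused), so minor 2 stays paused via
 * aftr_isp; once minor 1 finishes, _drbd_resume_next() clears that pause and
 * minor 2 may start its resync. */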
1479
1480/**
1481 * _drbd_pause_after() - Pause resync on all devices that may not resync now
b30ab791 1482 * @device: DRBD device.
b411b363
PR
1483 *
1484 * Called from process context only (admin command and after_state_ch).
1485 */
b30ab791 1486static int _drbd_pause_after(struct drbd_device *device)
b411b363 1487{
54761697 1488 struct drbd_device *odev;
b411b363
PR
1489 int i, rv = 0;
1490
695d08fa 1491 rcu_read_lock();
05a10ec7 1492 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1493 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1494 continue;
1495 if (!_drbd_may_sync_now(odev))
1496 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1497 != SS_NOTHING_TO_DO);
1498 }
695d08fa 1499 rcu_read_unlock();
b411b363
PR
1500
1501 return rv;
1502}
1503
1504/**
1505 * _drbd_resume_next() - Resume resync on all devices that may resync now
b30ab791 1506 * @device: DRBD device.
b411b363
PR
1507 *
1508 * Called from process context only (admin command and worker).
1509 */
b30ab791 1510static int _drbd_resume_next(struct drbd_device *device)
b411b363 1511{
54761697 1512 struct drbd_device *odev;
b411b363
PR
1513 int i, rv = 0;
1514
695d08fa 1515 rcu_read_lock();
05a10ec7 1516 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1517 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1518 continue;
1519 if (odev->state.aftr_isp) {
1520 if (_drbd_may_sync_now(odev))
1521 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1522 CS_HARD, NULL)
 1523 != SS_NOTHING_TO_DO);
1524 }
1525 }
695d08fa 1526 rcu_read_unlock();
b411b363
PR
1527 return rv;
1528}
1529
b30ab791 1530void resume_next_sg(struct drbd_device *device)
b411b363
PR
1531{
1532 write_lock_irq(&global_state_lock);
b30ab791 1533 _drbd_resume_next(device);
b411b363
PR
1534 write_unlock_irq(&global_state_lock);
1535}
1536
b30ab791 1537void suspend_other_sg(struct drbd_device *device)
b411b363
PR
1538{
1539 write_lock_irq(&global_state_lock);
b30ab791 1540 _drbd_pause_after(device);
b411b363
PR
1541 write_unlock_irq(&global_state_lock);
1542}
1543
dc97b708 1544/* caller must hold global_state_lock */
b30ab791 1545enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
b411b363 1546{
54761697 1547 struct drbd_device *odev;
95f8efd0 1548 int resync_after;
b411b363
PR
1549
1550 if (o_minor == -1)
1551 return NO_ERROR;
a3f8f7dc 1552 if (o_minor < -1 || o_minor > MINORMASK)
95f8efd0 1553 return ERR_RESYNC_AFTER;
b411b363
PR
1554
1555 /* check for loops */
b30ab791 1556 odev = minor_to_device(o_minor);
b411b363 1557 while (1) {
b30ab791 1558 if (odev == device)
95f8efd0 1559 return ERR_RESYNC_AFTER_CYCLE;
b411b363 1560
a3f8f7dc
LE
1561 /* You are free to depend on diskless, non-existing,
1562 * or not yet/no longer existing minors.
1563 * We only reject dependency loops.
1564 * We cannot follow the dependency chain beyond a detached or
1565 * missing minor.
1566 */
1567 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1568 return NO_ERROR;
1569
daeda1cc 1570 rcu_read_lock();
95f8efd0 1571 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
daeda1cc 1572 rcu_read_unlock();
b411b363 1573 /* dependency chain ends here, no cycles. */
95f8efd0 1574 if (resync_after == -1)
b411b363
PR
1575 return NO_ERROR;
1576
1577 /* follow the dependency chain */
b30ab791 1578 odev = minor_to_device(resync_after);
b411b363
PR
1579 }
1580}
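
The loop check above only rejects configurations where following the resync-after chain leads back to the device being configured; a missing or diskless minor simply ends the walk. A minimal userspace sketch of the same walk, with a hypothetical depends_on[] table standing in for the per-device disk_conf (names and table are illustrative only, not from DRBD):

#include <stdio.h>

#define N_MINORS 8

/* depends_on[i] == -1 means "minor i resyncs after nobody", like resync_after */
static int depends_on[N_MINORS] = { -1, 0, 1, -1, -1, -1, -1, -1 };

static int resync_after_would_cycle(int device, int o_minor)
{
	int cur = o_minor;

	while (cur != -1) {
		if (cur == device)
			return 1;		/* dependency loop */
		if (cur < 0 || cur >= N_MINORS)
			return 0;		/* chain leaves the known minors */
		cur = depends_on[cur];		/* follow the chain */
	}
	return 0;				/* chain ended, no cycle */
}

int main(void)
{
	/* making minor 0 depend on minor 2 would close the loop 0 -> 2 -> 1 -> 0 */
	printf("cycle: %d\n", resync_after_would_cycle(0, 2));
	return 0;
}
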
1581
dc97b708 1582/* caller must hold global_state_lock */
b30ab791 1583void drbd_resync_after_changed(struct drbd_device *device)
b411b363
PR
1584{
1585 int changes;
b411b363 1586
dc97b708 1587 do {
b30ab791
AG
1588 changes = _drbd_pause_after(device);
1589 changes |= _drbd_resume_next(device);
dc97b708 1590 } while (changes);
b411b363
PR
1591}
1592
b30ab791 1593void drbd_rs_controller_reset(struct drbd_device *device)
9bd28d3c 1594{
813472ce
PR
1595 struct fifo_buffer *plan;
1596
b30ab791
AG
1597 atomic_set(&device->rs_sect_in, 0);
1598 atomic_set(&device->rs_sect_ev, 0);
1599 device->rs_in_flight = 0;
813472ce
PR
1600
1601 /* Updating the RCU protected object in place is necessary since
1602 this function gets called from atomic context.
 1603 It is valid since all other updates also lead to a completely
1604 empty fifo */
1605 rcu_read_lock();
b30ab791 1606 plan = rcu_dereference(device->rs_plan_s);
813472ce
PR
1607 plan->total = 0;
1608 fifo_set(plan, 0);
1609 rcu_read_unlock();
9bd28d3c
LE
1610}
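
For contrast with the in-place reset above: the usual RCU update publishes a fresh copy and frees the old object only after a grace period, which is why that pattern is avoided here in atomic context. A generic kernel-style sketch of the conventional copy-and-publish pattern (not DRBD code; the struct, lock and field names are made up):

#include <linux/errno.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct plan_example {
	struct rcu_head rcu;
	unsigned int total;
};

static struct plan_example __rcu *plan_ptr;
static DEFINE_SPINLOCK(plan_lock);

/* kzalloc(GFP_KERNEL) may sleep, so this is only usable from process context */
static int plan_replace(unsigned int new_total)
{
	struct plan_example *new_plan, *old_plan;

	new_plan = kzalloc(sizeof(*new_plan), GFP_KERNEL);
	if (!new_plan)
		return -ENOMEM;
	new_plan->total = new_total;

	spin_lock(&plan_lock);
	old_plan = rcu_dereference_protected(plan_ptr, lockdep_is_held(&plan_lock));
	rcu_assign_pointer(plan_ptr, new_plan);		/* publish the copy */
	spin_unlock(&plan_lock);

	if (old_plan)
		kfree_rcu(old_plan, rcu);		/* free after a grace period */
	return 0;
}
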
1611
1f04af33
PR
1612void start_resync_timer_fn(unsigned long data)
1613{
b30ab791 1614 struct drbd_device *device = (struct drbd_device *) data;
ac0acb9e 1615 drbd_device_post_work(device, RS_START);
1f04af33
PR
1616}
1617
ac0acb9e 1618static void do_start_resync(struct drbd_device *device)
1f04af33 1619{
b30ab791 1620 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
ac0acb9e 1621 drbd_warn(device, "postponing start_resync ...\n");
b30ab791
AG
1622 device->start_resync_timer.expires = jiffies + HZ/10;
1623 add_timer(&device->start_resync_timer);
ac0acb9e 1624 return;
1f04af33
PR
1625 }
1626
b30ab791
AG
1627 drbd_start_resync(device, C_SYNC_SOURCE);
1628 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1f04af33
PR
1629}
1630
aaaba345
LE
1631static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1632{
1633 bool csums_after_crash_only;
1634 rcu_read_lock();
1635 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1636 rcu_read_unlock();
1637 return connection->agreed_pro_version >= 89 && /* supported? */
1638 connection->csums_tfm && /* configured? */
1639 (csums_after_crash_only == 0 /* use for each resync? */
1640 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1641}
1642
b411b363
PR
1643/**
1644 * drbd_start_resync() - Start the resync process
b30ab791 1645 * @device: DRBD device.
b411b363
PR
1646 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1647 *
1648 * This function might bring you directly into one of the
1649 * C_PAUSED_SYNC_* states.
1650 */
b30ab791 1651void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
b411b363 1652{
44a4d551
LE
1653 struct drbd_peer_device *peer_device = first_peer_device(device);
1654 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
1655 union drbd_state ns;
1656 int r;
1657
b30ab791 1658 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
d0180171 1659 drbd_err(device, "Resync already running!\n");
b411b363
PR
1660 return;
1661 }
1662
b30ab791 1663 if (!test_bit(B_RS_H_DONE, &device->flags)) {
e64a3294
PR
1664 if (side == C_SYNC_TARGET) {
1665 /* Since application IO was locked out during C_WF_BITMAP_T and
1666 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
 1667 we ask the before-resync-target handler, since we are about to make the data inconsistent. */
b30ab791 1668 r = drbd_khelper(device, "before-resync-target");
e64a3294
PR
1669 r = (r >> 8) & 0xff;
1670 if (r > 0) {
d0180171 1671 drbd_info(device, "before-resync-target handler returned %d, "
09b9e797 1672 "dropping connection.\n", r);
44a4d551 1673 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
09b9e797
PR
1674 return;
1675 }
e64a3294 1676 } else /* C_SYNC_SOURCE */ {
b30ab791 1677 r = drbd_khelper(device, "before-resync-source");
e64a3294
PR
1678 r = (r >> 8) & 0xff;
1679 if (r > 0) {
1680 if (r == 3) {
d0180171 1681 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294
PR
1682 "ignoring. Old userland tools?", r);
1683 } else {
d0180171 1684 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294 1685 "dropping connection.\n", r);
44a4d551 1686 conn_request_state(connection,
a6b32bc3 1687 NS(conn, C_DISCONNECTING), CS_HARD);
e64a3294
PR
1688 return;
1689 }
1690 }
09b9e797 1691 }
b411b363
PR
1692 }
1693
44a4d551 1694 if (current == connection->worker.task) {
dad20554 1695 /* The worker should not sleep waiting for state_mutex,
e64a3294 1696 as that can take a long time */
b30ab791
AG
1697 if (!mutex_trylock(device->state_mutex)) {
1698 set_bit(B_RS_H_DONE, &device->flags);
1699 device->start_resync_timer.expires = jiffies + HZ/5;
1700 add_timer(&device->start_resync_timer);
e64a3294
PR
1701 return;
1702 }
1703 } else {
b30ab791 1704 mutex_lock(device->state_mutex);
e64a3294 1705 }
b30ab791 1706 clear_bit(B_RS_H_DONE, &device->flags);
b411b363 1707
074f4afe
LE
1708 /* req_lock: serialize with drbd_send_and_submit() and others
1709 * global_state_lock: for stable sync-after dependencies */
1710 spin_lock_irq(&device->resource->req_lock);
1711 write_lock(&global_state_lock);
a700471b 1712 /* Did some connection breakage or IO error race with us? */
b30ab791
AG
1713 if (device->state.conn < C_CONNECTED
1714 || !get_ldev_if_state(device, D_NEGOTIATING)) {
074f4afe
LE
1715 write_unlock(&global_state_lock);
1716 spin_unlock_irq(&device->resource->req_lock);
b30ab791 1717 mutex_unlock(device->state_mutex);
b411b363
PR
1718 return;
1719 }
1720
b30ab791 1721 ns = drbd_read_state(device);
b411b363 1722
b30ab791 1723 ns.aftr_isp = !_drbd_may_sync_now(device);
b411b363
PR
1724
1725 ns.conn = side;
1726
1727 if (side == C_SYNC_TARGET)
1728 ns.disk = D_INCONSISTENT;
1729 else /* side == C_SYNC_SOURCE */
1730 ns.pdsk = D_INCONSISTENT;
1731
b30ab791
AG
1732 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1733 ns = drbd_read_state(device);
b411b363
PR
1734
1735 if (ns.conn < C_CONNECTED)
1736 r = SS_UNKNOWN_ERROR;
1737
1738 if (r == SS_SUCCESS) {
b30ab791 1739 unsigned long tw = drbd_bm_total_weight(device);
1d7734a0
LE
1740 unsigned long now = jiffies;
1741 int i;
1742
b30ab791
AG
1743 device->rs_failed = 0;
1744 device->rs_paused = 0;
1745 device->rs_same_csum = 0;
1746 device->rs_last_events = 0;
1747 device->rs_last_sect_ev = 0;
1748 device->rs_total = tw;
1749 device->rs_start = now;
1d7734a0 1750 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
b30ab791
AG
1751 device->rs_mark_left[i] = tw;
1752 device->rs_mark_time[i] = now;
1d7734a0 1753 }
b30ab791 1754 _drbd_pause_after(device);
5ab7d2c0
LE
1755 /* Forget potentially stale cached per resync extent bit-counts.
1756 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1757 * disabled, and know the disk state is ok. */
1758 spin_lock(&device->al_lock);
1759 lc_reset(device->resync);
1760 device->resync_locked = 0;
1761 device->resync_wenr = LC_FREE;
1762 spin_unlock(&device->al_lock);
b411b363 1763 }
074f4afe
LE
1764 write_unlock(&global_state_lock);
1765 spin_unlock_irq(&device->resource->req_lock);
5a22db89 1766
b411b363 1767 if (r == SS_SUCCESS) {
5ab7d2c0 1768 wake_up(&device->al_wait); /* for lc_reset() above */
328e0f12
PR
1769 /* reset rs_last_bcast when a resync or verify is started,
1770 * to deal with potential jiffies wrap. */
b30ab791 1771 device->rs_last_bcast = jiffies - HZ;
328e0f12 1772
d0180171 1773 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
b411b363 1774 drbd_conn_str(ns.conn),
b30ab791
AG
1775 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1776 (unsigned long) device->rs_total);
aaaba345 1777 if (side == C_SYNC_TARGET) {
b30ab791 1778 device->bm_resync_fo = 0;
aaaba345
LE
1779 device->use_csums = use_checksum_based_resync(connection, device);
1780 } else {
1781 device->use_csums = 0;
1782 }
6c922ed5
LE
1783
1784 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1785 * with w_send_oos, or the sync target will get confused as to
 1786 * how many bits to resync. We cannot always do that, because for an
1787 * empty resync and protocol < 95, we need to do it here, as we call
1788 * drbd_resync_finished from here in that case.
1789 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1790 * and from after_state_ch otherwise. */
44a4d551
LE
1791 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1792 drbd_gen_and_send_sync_uuid(peer_device);
b411b363 1793
44a4d551 1794 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
af85e8e8
LE
1795 /* This still has a race (about when exactly the peers
1796 * detect connection loss) that can lead to a full sync
1797 * on next handshake. In 8.3.9 we fixed this with explicit
1798 * resync-finished notifications, but the fix
1799 * introduces a protocol change. Sleeping for some
1800 * time longer than the ping interval + timeout on the
1801 * SyncSource, to give the SyncTarget the chance to
1802 * detect connection loss, then waiting for a ping
1803 * response (implicit in drbd_resync_finished) reduces
1804 * the race considerably, but does not solve it. */
44ed167d
PR
1805 if (side == C_SYNC_SOURCE) {
1806 struct net_conf *nc;
1807 int timeo;
1808
1809 rcu_read_lock();
44a4d551 1810 nc = rcu_dereference(connection->net_conf);
44ed167d
PR
1811 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1812 rcu_read_unlock();
1813 schedule_timeout_interruptible(timeo);
1814 }
b30ab791 1815 drbd_resync_finished(device);
b411b363
PR
1816 }
1817
b30ab791
AG
1818 drbd_rs_controller_reset(device);
1819 /* ns.conn may already be != device->state.conn,
b411b363
PR
1820 * we may have been paused in between, or become paused until
1821 * the timer triggers.
1822 * No matter, that is handled in resync_timer_fn() */
1823 if (ns.conn == C_SYNC_TARGET)
b30ab791 1824 mod_timer(&device->resync_timer, jiffies);
b411b363 1825
b30ab791 1826 drbd_md_sync(device);
b411b363 1827 }
b30ab791
AG
1828 put_ldev(device);
1829 mutex_unlock(device->state_mutex);
b411b363
PR
1830}
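
The (r >> 8) & 0xff decoding of the before-resync handler result earlier in this function follows the usual wait-status convention, i.e. what WEXITSTATUS() yields in userspace for a normally exited child. A small standalone illustration (not DRBD code):

#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

int main(void)
{
	int status = system("exit 3");	/* run a helper that exits with code 3 */

	if (status == -1 || !WIFEXITED(status))
		return 1;

	/* on Linux both expressions yield the helper's exit code (3) */
	printf("shifted: %d, WEXITSTATUS: %d\n",
	       (status >> 8) & 0xff, WEXITSTATUS(status));
	return 0;
}
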
1831
e334f550 1832static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
c7a58db4
LE
1833{
1834 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1835 device->rs_last_bcast = jiffies;
1836
1837 if (!get_ldev(device))
1838 return;
1839
1840 drbd_bm_write_lazy(device, 0);
5ab7d2c0 1841 if (resync_done && is_sync_state(device->state.conn))
c7a58db4 1842 drbd_resync_finished(device);
5ab7d2c0 1843
c7a58db4
LE
1844 drbd_bcast_event(device, &sib);
1845 /* update timestamp, in case it took a while to write out stuff */
1846 device->rs_last_bcast = jiffies;
1847 put_ldev(device);
1848}
1849
e334f550
LE
1850static void drbd_ldev_destroy(struct drbd_device *device)
1851{
1852 lc_destroy(device->resync);
1853 device->resync = NULL;
1854 lc_destroy(device->act_log);
1855 device->act_log = NULL;
d1b80853
AG
1856
1857 __acquire(local);
1858 drbd_free_ldev(device->ldev);
1859 device->ldev = NULL;
1860 __release(local);
1861
e334f550
LE
1862 clear_bit(GOING_DISKLESS, &device->flags);
1863 wake_up(&device->misc_wait);
1864}
1865
1866static void go_diskless(struct drbd_device *device)
1867{
1868 D_ASSERT(device, device->state.disk == D_FAILED);
1869 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1870 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1871 * the protected members anymore, though, so once put_ldev reaches zero
1872 * again, it will be safe to free them. */
1873
1874 /* Try to write changed bitmap pages, read errors may have just
1875 * set some bits outside the area covered by the activity log.
1876 *
1877 * If we have an IO error during the bitmap writeout,
1878 * we will want a full sync next time, just in case.
1879 * (Do we want a specific meta data flag for this?)
1880 *
1881 * If that does not make it to stable storage either,
1882 * we cannot do anything about that anymore.
1883 *
1884 * We still need to check if both bitmap and ldev are present, we may
1885 * end up here after a failed attach, before ldev was even assigned.
1886 */
1887 if (device->bitmap && device->ldev) {
 1888 /* An interrupted resync or similar is allowed to recount bits
1889 * while we detach.
1890 * Any modifications would not be expected anymore, though.
1891 */
1892 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1893 "detach", BM_LOCKED_TEST_ALLOWED)) {
1894 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1895 drbd_md_set_flag(device, MDF_FULL_SYNC);
1896 drbd_md_sync(device);
1897 }
1898 }
1899 }
1900
1901 drbd_force_state(device, NS(disk, D_DISKLESS));
1902}
1903
ac0acb9e
LE
1904static int do_md_sync(struct drbd_device *device)
1905{
1906 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1907 drbd_md_sync(device);
1908 return 0;
1909}
1910
944410e9
LE
1911/* only called from drbd_worker thread, no locking */
1912void __update_timing_details(
1913 struct drbd_thread_timing_details *tdp,
1914 unsigned int *cb_nr,
1915 void *cb,
1916 const char *fn, const unsigned int line)
1917{
1918 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1919 struct drbd_thread_timing_details *td = tdp + i;
1920
1921 td->start_jif = jiffies;
1922 td->cb_addr = cb;
1923 td->caller_fn = fn;
1924 td->line = line;
1925 td->cb_nr = *cb_nr;
1926
1927 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1928 td = tdp + i;
1929 memset(td, 0, sizeof(*td));
1930
1931 ++(*cb_nr);
1932}
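
The fn/line pair is presumably filled in by a call-site wrapper; a hypothetical definition of update_worker_timing_details() along these lines (the connection field names here are assumptions, not taken from this source):

/* hypothetical convenience wrapper; field names are illustrative only */
#define update_worker_timing_details(c, cb) \
	__update_timing_details((c)->w_timing_details, &(c)->w_cb_nr, \
				cb, __func__, __LINE__)
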
1933
e334f550
LE
1934static void do_device_work(struct drbd_device *device, const unsigned long todo)
1935{
b47a06d1 1936 if (test_bit(MD_SYNC, &todo))
ac0acb9e 1937 do_md_sync(device);
b47a06d1
AG
1938 if (test_bit(RS_DONE, &todo) ||
1939 test_bit(RS_PROGRESS, &todo))
1940 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1941 if (test_bit(GO_DISKLESS, &todo))
e334f550 1942 go_diskless(device);
b47a06d1 1943 if (test_bit(DESTROY_DISK, &todo))
e334f550 1944 drbd_ldev_destroy(device);
b47a06d1 1945 if (test_bit(RS_START, &todo))
ac0acb9e 1946 do_start_resync(device);
e334f550
LE
1947}
1948
1949#define DRBD_DEVICE_WORK_MASK \
1950 ((1UL << GO_DISKLESS) \
1951 |(1UL << DESTROY_DISK) \
ac0acb9e
LE
1952 |(1UL << MD_SYNC) \
1953 |(1UL << RS_START) \
e334f550
LE
1954 |(1UL << RS_PROGRESS) \
1955 |(1UL << RS_DONE) \
1956 )
1957
1958static unsigned long get_work_bits(unsigned long *flags)
1959{
1960 unsigned long old, new;
1961 do {
1962 old = *flags;
1963 new = old & ~DRBD_DEVICE_WORK_MASK;
1964 } while (cmpxchg(flags, old, new) != old);
1965 return old & DRBD_DEVICE_WORK_MASK;
1966}
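
get_work_bits() above fetches and clears all pending device-work bits in a single atomic step, so a bit set concurrently is either taken now or left for the next pass, never lost. A userspace sketch of the same compare-and-swap loop using GCC/Clang atomic builtins (illustrative only):

#include <stdio.h>

#define WORK_MASK 0x3fUL	/* six low bits stand in for the work flags */

/* atomically fetch the masked bits and clear them in *flags */
static unsigned long take_work_bits(unsigned long *flags)
{
	unsigned long old, new;

	do {
		old = __atomic_load_n(flags, __ATOMIC_RELAXED);
		new = old & ~WORK_MASK;
	} while (!__atomic_compare_exchange_n(flags, &old, new, 0,
					      __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
	return old & WORK_MASK;
}

int main(void)
{
	unsigned long flags = 0x45;	/* bit 6 is not a work bit and must survive */

	printf("taken: 0x%lx, left: 0x%lx\n", take_work_bits(&flags), flags);
	return 0;
}
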
1967
1968static void do_unqueued_work(struct drbd_connection *connection)
c7a58db4
LE
1969{
1970 struct drbd_peer_device *peer_device;
1971 int vnr;
1972
1973 rcu_read_lock();
1974 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1975 struct drbd_device *device = peer_device->device;
e334f550
LE
1976 unsigned long todo = get_work_bits(&device->flags);
1977 if (!todo)
c7a58db4 1978 continue;
5ab7d2c0 1979
c7a58db4
LE
1980 kref_get(&device->kref);
1981 rcu_read_unlock();
e334f550 1982 do_device_work(device, todo);
c7a58db4
LE
1983 kref_put(&device->kref, drbd_destroy_device);
1984 rcu_read_lock();
1985 }
1986 rcu_read_unlock();
1987}
1988
a186e478 1989static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
8c0785a5
LE
1990{
1991 spin_lock_irq(&queue->q_lock);
15e26f6a 1992 list_splice_tail_init(&queue->q, work_list);
8c0785a5
LE
1993 spin_unlock_irq(&queue->q_lock);
1994 return !list_empty(work_list);
1995}
1996
bde89a9e 1997static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
b6dd1a89
LE
1998{
1999 DEFINE_WAIT(wait);
2000 struct net_conf *nc;
2001 int uncork, cork;
2002
abde9cc6 2003 dequeue_work_batch(&connection->sender_work, work_list);
b6dd1a89
LE
2004 if (!list_empty(work_list))
2005 return;
2006
2007 /* Still nothing to do?
2008 * Maybe we still need to close the current epoch,
2009 * even if no new requests are queued yet.
2010 *
2011 * Also, poke TCP, just in case.
2012 * Then wait for new work (or signal). */
2013 rcu_read_lock();
2014 nc = rcu_dereference(connection->net_conf);
2015 uncork = nc ? nc->tcp_cork : 0;
2016 rcu_read_unlock();
2017 if (uncork) {
2018 mutex_lock(&connection->data.mutex);
2019 if (connection->data.socket)
2020 drbd_tcp_uncork(connection->data.socket);
2021 mutex_unlock(&connection->data.mutex);
2022 }
2023
2024 for (;;) {
2025 int send_barrier;
2026 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
0500813f 2027 spin_lock_irq(&connection->resource->req_lock);
b6dd1a89 2028 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
bc317a9e 2029 if (!list_empty(&connection->sender_work.q))
4dd726f0 2030 list_splice_tail_init(&connection->sender_work.q, work_list);
b6dd1a89
LE
2031 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2032 if (!list_empty(work_list) || signal_pending(current)) {
0500813f 2033 spin_unlock_irq(&connection->resource->req_lock);
b6dd1a89
LE
2034 break;
2035 }
f9c78128
LE
2036
2037 /* We found nothing new to do, no to-be-communicated request,
2038 * no other work item. We may still need to close the last
2039 * epoch. Next incoming request epoch will be connection ->
2040 * current transfer log epoch number. If that is different
2041 * from the epoch of the last request we communicated, it is
2042 * safe to send the epoch separating barrier now.
2043 */
2044 send_barrier =
2045 atomic_read(&connection->current_tle_nr) !=
2046 connection->send.current_epoch_nr;
0500813f 2047 spin_unlock_irq(&connection->resource->req_lock);
f9c78128
LE
2048
2049 if (send_barrier)
2050 maybe_send_barrier(connection,
2051 connection->send.current_epoch_nr + 1);
5ab7d2c0 2052
e334f550 2053 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
5ab7d2c0
LE
2054 break;
2055
a80ca1ae
LE
2056 /* drbd_send() may have called flush_signals() */
2057 if (get_t_state(&connection->worker) != RUNNING)
2058 break;
5ab7d2c0 2059
b6dd1a89
LE
2060 schedule();
2061 /* may be woken up for other things but new work, too,
2062 * e.g. if the current epoch got closed.
2063 * In which case we send the barrier above. */
2064 }
2065 finish_wait(&connection->sender_work.q_wait, &wait);
2066
2067 /* someone may have changed the config while we have been waiting above. */
2068 rcu_read_lock();
2069 nc = rcu_dereference(connection->net_conf);
2070 cork = nc ? nc->tcp_cork : 0;
2071 rcu_read_unlock();
2072 mutex_lock(&connection->data.mutex);
2073 if (connection->data.socket) {
2074 if (cork)
2075 drbd_tcp_cork(connection->data.socket);
2076 else if (!uncork)
2077 drbd_tcp_uncork(connection->data.socket);
2078 }
2079 mutex_unlock(&connection->data.mutex);
2080}
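
The cork/uncork handling in wait_for_work() batches small sends (barriers, packet headers) so they leave the socket in as few segments as possible. A userspace sketch of the same Linux TCP_CORK idea (not the drbd_tcp_cork()/drbd_tcp_uncork() helpers themselves; error handling omitted):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

static void tcp_cork(int fd, int on)
{
	setsockopt(fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
}

static void send_batch(int fd, const char *hdr, size_t hlen,
		       const char *payload, size_t plen)
{
	tcp_cork(fd, 1);		/* hold back partial segments */
	write(fd, hdr, hlen);		/* small header write */
	write(fd, payload, plen);	/* payload rides in the same segment(s) */
	tcp_cork(fd, 0);		/* flush everything that is pending */
}
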
2081
b411b363
PR
2082int drbd_worker(struct drbd_thread *thi)
2083{
bde89a9e 2084 struct drbd_connection *connection = thi->connection;
6db7e50a 2085 struct drbd_work *w = NULL;
c06ece6b 2086 struct drbd_peer_device *peer_device;
b411b363 2087 LIST_HEAD(work_list);
8c0785a5 2088 int vnr;
b411b363 2089
e77a0a5c 2090 while (get_t_state(thi) == RUNNING) {
80822284 2091 drbd_thread_current_set_cpu(thi);
b411b363 2092
944410e9
LE
2093 if (list_empty(&work_list)) {
2094 update_worker_timing_details(connection, wait_for_work);
bde89a9e 2095 wait_for_work(connection, &work_list);
944410e9 2096 }
b411b363 2097
944410e9
LE
2098 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2099 update_worker_timing_details(connection, do_unqueued_work);
e334f550 2100 do_unqueued_work(connection);
944410e9 2101 }
5ab7d2c0 2102
8c0785a5 2103 if (signal_pending(current)) {
b411b363 2104 flush_signals(current);
19393e10 2105 if (get_t_state(thi) == RUNNING) {
1ec861eb 2106 drbd_warn(connection, "Worker got an unexpected signal\n");
b411b363 2107 continue;
19393e10 2108 }
b411b363
PR
2109 break;
2110 }
2111
e77a0a5c 2112 if (get_t_state(thi) != RUNNING)
b411b363 2113 break;
b411b363 2114
729e8b87 2115 if (!list_empty(&work_list)) {
6db7e50a
AG
2116 w = list_first_entry(&work_list, struct drbd_work, list);
2117 list_del_init(&w->list);
944410e9 2118 update_worker_timing_details(connection, w->cb);
6db7e50a 2119 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
8c0785a5 2120 continue;
bde89a9e
AG
2121 if (connection->cstate >= C_WF_REPORT_PARAMS)
2122 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
2123 }
2124 }
b411b363 2125
8c0785a5 2126 do {
944410e9
LE
2127 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2128 update_worker_timing_details(connection, do_unqueued_work);
e334f550 2129 do_unqueued_work(connection);
944410e9 2130 }
729e8b87 2131 if (!list_empty(&work_list)) {
6db7e50a
AG
2132 w = list_first_entry(&work_list, struct drbd_work, list);
2133 list_del_init(&w->list);
944410e9 2134 update_worker_timing_details(connection, w->cb);
6db7e50a 2135 w->cb(w, 1);
729e8b87
LE
2136 } else
2137 dequeue_work_batch(&connection->sender_work, &work_list);
e334f550 2138 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
b411b363 2139
c141ebda 2140 rcu_read_lock();
c06ece6b
AG
2141 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2142 struct drbd_device *device = peer_device->device;
0b0ba1ef 2143 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
b30ab791 2144 kref_get(&device->kref);
c141ebda 2145 rcu_read_unlock();
b30ab791 2146 drbd_device_cleanup(device);
05a10ec7 2147 kref_put(&device->kref, drbd_destroy_device);
c141ebda 2148 rcu_read_lock();
0e29d163 2149 }
c141ebda 2150 rcu_read_unlock();
b411b363
PR
2151
2152 return 0;
2153}