]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/drbd/drbd_worker.c
drbd: bump current uuid when resuming IO with diskless peer
[mirror_ubuntu-artful-kernel.git] / drivers / block / drbd / drbd_worker.c
CommitLineData
b411b363
PR
1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
84b8c06b 24*/
b411b363 25
b411b363 26#include <linux/module.h>
b411b363
PR
27#include <linux/drbd.h>
28#include <linux/sched.h>
b411b363
PR
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
b411b363
PR
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
a3603a6e 39#include "drbd_protocol.h"
b411b363 40#include "drbd_req.h"
b411b363 41
d448a2e1
AG
42static int make_ov_request(struct drbd_device *, int);
43static int make_resync_request(struct drbd_device *, int);
b411b363 44
c5a91619 45/* endio handlers:
ed15b795 46 * drbd_md_endio (defined here)
fcefa62e
AG
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
ed15b795 49 * drbd_bm_endio (defined in drbd_bitmap.c)
c5a91619 50 *
b411b363
PR
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
b411b363
PR
58/* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
60 */
4246a0b6 61void drbd_md_endio(struct bio *bio)
b411b363 62{
b30ab791 63 struct drbd_device *device;
b411b363 64
e37d2438 65 device = bio->bi_private;
4246a0b6 66 device->md_io.error = bio->bi_error;
b411b363 67
0cfac5dd
PR
68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 * to timeout on the lower level device, and eventually detach from it.
70 * If this io completion runs after that timeout expired, this
71 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 * During normal operation, this only puts that extra reference
73 * down to 1 again.
74 * Make sure we first drop the reference, and only then signal
75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 * next drbd_md_sync_page_io(), that we trigger the
b30ab791 77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
0cfac5dd 78 */
b30ab791 79 drbd_md_put_buffer(device);
e37d2438 80 device->md_io.done = 1;
b30ab791 81 wake_up(&device->misc_wait);
cdfda633 82 bio_put(bio);
b30ab791
AG
83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84 put_ldev(device);
b411b363
PR
85}
86
87/* reads on behalf of the partner,
88 * "submitted" by the receiver
89 */
a186e478 90static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
b411b363
PR
91{
92 unsigned long flags = 0;
6780139c
AG
93 struct drbd_peer_device *peer_device = peer_req->peer_device;
94 struct drbd_device *device = peer_device->device;
b411b363 95
0500813f 96 spin_lock_irqsave(&device->resource->req_lock, flags);
b30ab791 97 device->read_cnt += peer_req->i.size >> 9;
a8cd15ba 98 list_del(&peer_req->w.list);
b30ab791
AG
99 if (list_empty(&device->read_ee))
100 wake_up(&device->ee_wait);
db830c46 101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
b30ab791 102 __drbd_chk_io_error(device, DRBD_READ_ERROR);
0500813f 103 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b411b363 104
6780139c 105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
b30ab791 106 put_ldev(device);
b411b363
PR
107}
108
109/* writes on behalf of the partner, or resync writes,
45bb912b 110 * "submitted" by the receiver, final stage. */
a0fb3c47 111void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
b411b363
PR
112{
113 unsigned long flags = 0;
6780139c
AG
114 struct drbd_peer_device *peer_device = peer_req->peer_device;
115 struct drbd_device *device = peer_device->device;
668700b4 116 struct drbd_connection *connection = peer_device->connection;
181286ad 117 struct drbd_interval i;
b411b363 118 int do_wake;
579b57ed 119 u64 block_id;
b411b363 120 int do_al_complete_io;
b411b363 121
db830c46 122 /* after we moved peer_req to done_ee,
b411b363
PR
123 * we may no longer access it,
124 * it may be freed/reused already!
125 * (as soon as we release the req_lock) */
181286ad 126 i = peer_req->i;
db830c46
AG
127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 block_id = peer_req->block_id;
21ae5d7f 129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
b411b363 130
0500813f 131 spin_lock_irqsave(&device->resource->req_lock, flags);
b30ab791 132 device->writ_cnt += peer_req->i.size >> 9;
a8cd15ba 133 list_move_tail(&peer_req->w.list, &device->done_ee);
b411b363 134
bb3bfe96 135 /*
5e472264 136 * Do not remove from the write_requests tree here: we did not send the
bb3bfe96
AG
137 * Ack yet and did not wake possibly waiting conflicting requests.
138 * Removed from the tree from "drbd_process_done_ee" within the
84b8c06b 139 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
bb3bfe96
AG
140 * _drbd_clear_done_ee.
141 */
b411b363 142
b30ab791 143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
b411b363 144
a0fb3c47
LE
145 /* FIXME do we want to detach for failed REQ_DISCARD?
146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 if (peer_req->flags & EE_WAS_ERROR)
b30ab791 148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
668700b4
PR
149
150 if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 kref_put(&device->kref, drbd_destroy_device);
154 }
0500813f 155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b411b363 156
579b57ed 157 if (block_id == ID_SYNCER)
b30ab791 158 drbd_rs_complete_io(device, i.sector);
b411b363
PR
159
160 if (do_wake)
b30ab791 161 wake_up(&device->ee_wait);
b411b363
PR
162
163 if (do_al_complete_io)
b30ab791 164 drbd_al_complete_io(device, &i);
b411b363 165
b30ab791 166 put_ldev(device);
45bb912b 167}
b411b363 168
45bb912b
LE
169/* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
171 */
4246a0b6 172void drbd_peer_request_endio(struct bio *bio)
45bb912b 173{
db830c46 174 struct drbd_peer_request *peer_req = bio->bi_private;
a8cd15ba 175 struct drbd_device *device = peer_req->peer_device->device;
45bb912b 176 int is_write = bio_data_dir(bio) == WRITE;
bb3cc85e 177 int is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
45bb912b 178
4246a0b6 179 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
d0180171 180 drbd_warn(device, "%s: error=%d s=%llus\n",
a0fb3c47 181 is_write ? (is_discard ? "discard" : "write")
4246a0b6 182 : "read", bio->bi_error,
db830c46 183 (unsigned long long)peer_req->i.sector);
45bb912b 184
4246a0b6 185 if (bio->bi_error)
db830c46 186 set_bit(__EE_WAS_ERROR, &peer_req->flags);
45bb912b
LE
187
188 bio_put(bio); /* no need for the bio anymore */
db830c46 189 if (atomic_dec_and_test(&peer_req->pending_bios)) {
45bb912b 190 if (is_write)
db830c46 191 drbd_endio_write_sec_final(peer_req);
45bb912b 192 else
db830c46 193 drbd_endio_read_sec_final(peer_req);
45bb912b 194 }
b411b363
PR
195}
196
142207f7
LE
197void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198{
199 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 device->minor, device->resource->name, device->vnr);
201}
202
b411b363
PR
203/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204 */
4246a0b6 205void drbd_request_endio(struct bio *bio)
b411b363 206{
a115413d 207 unsigned long flags;
b411b363 208 struct drbd_request *req = bio->bi_private;
84b8c06b 209 struct drbd_device *device = req->device;
a115413d 210 struct bio_and_error m;
b411b363 211 enum drbd_req_event what;
1b6dd252
PR
212
213 /* If this request was aborted locally before,
214 * but now was completed "successfully",
215 * chances are that this caused arbitrary data corruption.
216 *
217 * "aborting" requests, or force-detaching the disk, is intended for
218 * completely blocked/hung local backing devices which do no longer
219 * complete requests at all, not even do error completions. In this
220 * situation, usually a hard-reset and failover is the only way out.
221 *
222 * By "aborting", basically faking a local error-completion,
223 * we allow for a more graceful swichover by cleanly migrating services.
224 * Still the affected node has to be rebooted "soon".
225 *
226 * By completing these requests, we allow the upper layers to re-use
227 * the associated data pages.
228 *
229 * If later the local backing device "recovers", and now DMAs some data
230 * from disk into the original request pages, in the best case it will
231 * just put random data into unused pages; but typically it will corrupt
232 * meanwhile completely unrelated data, causing all sorts of damage.
233 *
234 * Which means delayed successful completion,
235 * especially for READ requests,
236 * is a reason to panic().
237 *
238 * We assume that a delayed *error* completion is OK,
239 * though we still will complain noisily about it.
240 */
241 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 if (__ratelimit(&drbd_ratelimit_state))
d0180171 243 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
1b6dd252 244
4246a0b6 245 if (!bio->bi_error)
142207f7 246 drbd_panic_after_delayed_completion_of_aborted_request(device);
1b6dd252
PR
247 }
248
b411b363 249 /* to avoid recursion in __req_mod */
4246a0b6 250 if (unlikely(bio->bi_error)) {
bb3cc85e 251 if (bio_op(bio) == REQ_OP_DISCARD)
4246a0b6 252 what = (bio->bi_error == -EOPNOTSUPP)
2f632aeb
LE
253 ? DISCARD_COMPLETED_NOTSUPP
254 : DISCARD_COMPLETED_WITH_ERROR;
255 else
256 what = (bio_data_dir(bio) == WRITE)
8554df1c 257 ? WRITE_COMPLETED_WITH_ERROR
5c3c7e64 258 : (bio_rw(bio) == READ)
8554df1c
AG
259 ? READ_COMPLETED_WITH_ERROR
260 : READ_AHEAD_COMPLETED_WITH_ERROR;
b411b363 261 } else
8554df1c 262 what = COMPLETED_OK;
b411b363
PR
263
264 bio_put(req->private_bio);
4246a0b6 265 req->private_bio = ERR_PTR(bio->bi_error);
b411b363 266
a115413d 267 /* not req_mod(), we need irqsave here! */
0500813f 268 spin_lock_irqsave(&device->resource->req_lock, flags);
a115413d 269 __req_mod(req, what, &m);
0500813f 270 spin_unlock_irqrestore(&device->resource->req_lock, flags);
b30ab791 271 put_ldev(device);
a115413d
LE
272
273 if (m.bio)
b30ab791 274 complete_master_bio(device, &m);
b411b363
PR
275}
276
9534d671 277void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
45bb912b 278{
9534d671 279 AHASH_REQUEST_ON_STACK(req, tfm);
45bb912b 280 struct scatterlist sg;
db830c46 281 struct page *page = peer_req->pages;
45bb912b
LE
282 struct page *tmp;
283 unsigned len;
284
9534d671
HX
285 ahash_request_set_tfm(req, tfm);
286 ahash_request_set_callback(req, 0, NULL, NULL);
45bb912b
LE
287
288 sg_init_table(&sg, 1);
9534d671 289 crypto_ahash_init(req);
45bb912b
LE
290
291 while ((tmp = page_chain_next(page))) {
292 /* all but the last page will be fully used */
293 sg_set_page(&sg, page, PAGE_SIZE, 0);
9534d671
HX
294 ahash_request_set_crypt(req, &sg, NULL, sg.length);
295 crypto_ahash_update(req);
45bb912b
LE
296 page = tmp;
297 }
298 /* and now the last, possibly only partially used page */
db830c46 299 len = peer_req->i.size & (PAGE_SIZE - 1);
45bb912b 300 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
9534d671
HX
301 ahash_request_set_crypt(req, &sg, digest, sg.length);
302 crypto_ahash_finup(req);
303 ahash_request_zero(req);
45bb912b
LE
304}
305
9534d671 306void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
b411b363 307{
9534d671 308 AHASH_REQUEST_ON_STACK(req, tfm);
b411b363 309 struct scatterlist sg;
7988613b
KO
310 struct bio_vec bvec;
311 struct bvec_iter iter;
b411b363 312
9534d671
HX
313 ahash_request_set_tfm(req, tfm);
314 ahash_request_set_callback(req, 0, NULL, NULL);
b411b363
PR
315
316 sg_init_table(&sg, 1);
9534d671 317 crypto_ahash_init(req);
b411b363 318
7988613b
KO
319 bio_for_each_segment(bvec, bio, iter) {
320 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
9534d671
HX
321 ahash_request_set_crypt(req, &sg, NULL, sg.length);
322 crypto_ahash_update(req);
9104d31a
LE
323 /* REQ_OP_WRITE_SAME has only one segment,
324 * checksum the payload only once. */
325 if (bio_op(bio) == REQ_OP_WRITE_SAME)
326 break;
b411b363 327 }
9534d671
HX
328 ahash_request_set_crypt(req, NULL, digest, 0);
329 crypto_ahash_final(req);
330 ahash_request_zero(req);
b411b363
PR
331}
332
9676c760 333/* MAYBE merge common code with w_e_end_ov_req */
99920dc5 334static int w_e_send_csum(struct drbd_work *w, int cancel)
b411b363 335{
a8cd15ba 336 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
337 struct drbd_peer_device *peer_device = peer_req->peer_device;
338 struct drbd_device *device = peer_device->device;
b411b363
PR
339 int digest_size;
340 void *digest;
99920dc5 341 int err = 0;
b411b363 342
53ea4331
LE
343 if (unlikely(cancel))
344 goto out;
b411b363 345
9676c760 346 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
53ea4331 347 goto out;
b411b363 348
9534d671 349 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
53ea4331
LE
350 digest = kmalloc(digest_size, GFP_NOIO);
351 if (digest) {
db830c46
AG
352 sector_t sector = peer_req->i.sector;
353 unsigned int size = peer_req->i.size;
6780139c 354 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
9676c760 355 /* Free peer_req and pages before send.
53ea4331
LE
356 * In case we block on congestion, we could otherwise run into
357 * some distributed deadlock, if the other side blocks on
358 * congestion as well, because our receiver blocks in
c37c8ecf 359 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 360 drbd_free_peer_req(device, peer_req);
db830c46 361 peer_req = NULL;
b30ab791 362 inc_rs_pending(device);
6780139c 363 err = drbd_send_drequest_csum(peer_device, sector, size,
db1b0b72
AG
364 digest, digest_size,
365 P_CSUM_RS_REQUEST);
53ea4331
LE
366 kfree(digest);
367 } else {
d0180171 368 drbd_err(device, "kmalloc() of digest failed.\n");
99920dc5 369 err = -ENOMEM;
53ea4331 370 }
b411b363 371
53ea4331 372out:
db830c46 373 if (peer_req)
b30ab791 374 drbd_free_peer_req(device, peer_req);
b411b363 375
99920dc5 376 if (unlikely(err))
d0180171 377 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
99920dc5 378 return err;
b411b363
PR
379}
380
381#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
382
69a22773 383static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
b411b363 384{
69a22773 385 struct drbd_device *device = peer_device->device;
db830c46 386 struct drbd_peer_request *peer_req;
b411b363 387
b30ab791 388 if (!get_ldev(device))
80a40e43 389 return -EIO;
b411b363
PR
390
391 /* GFP_TRY, because if there is no memory available right now, this may
392 * be rescheduled for later. It is "only" background resync, after all. */
69a22773 393 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
9104d31a 394 size, size, GFP_TRY);
db830c46 395 if (!peer_req)
80a40e43 396 goto defer;
b411b363 397
a8cd15ba 398 peer_req->w.cb = w_e_send_csum;
0500813f 399 spin_lock_irq(&device->resource->req_lock);
b9ed7080 400 list_add_tail(&peer_req->w.list, &device->read_ee);
0500813f 401 spin_unlock_irq(&device->resource->req_lock);
b411b363 402
b30ab791 403 atomic_add(size >> 9, &device->rs_sect_ev);
bb3cc85e
MC
404 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
405 DRBD_FAULT_RS_RD) == 0)
80a40e43 406 return 0;
b411b363 407
10f6d992
LE
408 /* If it failed because of ENOMEM, retry should help. If it failed
409 * because bio_add_page failed (probably broken lower level driver),
410 * retry may or may not help.
411 * If it does not, you may need to force disconnect. */
0500813f 412 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 413 list_del(&peer_req->w.list);
0500813f 414 spin_unlock_irq(&device->resource->req_lock);
22cc37a9 415
b30ab791 416 drbd_free_peer_req(device, peer_req);
80a40e43 417defer:
b30ab791 418 put_ldev(device);
80a40e43 419 return -EAGAIN;
b411b363
PR
420}
421
99920dc5 422int w_resync_timer(struct drbd_work *w, int cancel)
b411b363 423{
84b8c06b
AG
424 struct drbd_device *device =
425 container_of(w, struct drbd_device, resync_work);
426
b30ab791 427 switch (device->state.conn) {
63106d3c 428 case C_VERIFY_S:
d448a2e1 429 make_ov_request(device, cancel);
63106d3c
PR
430 break;
431 case C_SYNC_TARGET:
d448a2e1 432 make_resync_request(device, cancel);
63106d3c 433 break;
b411b363
PR
434 }
435
99920dc5 436 return 0;
794abb75
PR
437}
438
439void resync_timer_fn(unsigned long data)
440{
b30ab791 441 struct drbd_device *device = (struct drbd_device *) data;
794abb75 442
15e26f6a
LE
443 drbd_queue_work_if_unqueued(
444 &first_peer_device(device)->connection->sender_work,
445 &device->resync_work);
b411b363
PR
446}
447
778f271d
PR
448static void fifo_set(struct fifo_buffer *fb, int value)
449{
450 int i;
451
452 for (i = 0; i < fb->size; i++)
f10f2623 453 fb->values[i] = value;
778f271d
PR
454}
455
456static int fifo_push(struct fifo_buffer *fb, int value)
457{
458 int ov;
459
460 ov = fb->values[fb->head_index];
461 fb->values[fb->head_index++] = value;
462
463 if (fb->head_index >= fb->size)
464 fb->head_index = 0;
465
466 return ov;
467}
468
469static void fifo_add_val(struct fifo_buffer *fb, int value)
470{
471 int i;
472
473 for (i = 0; i < fb->size; i++)
474 fb->values[i] += value;
475}
476
9958c857
PR
477struct fifo_buffer *fifo_alloc(int fifo_size)
478{
479 struct fifo_buffer *fb;
480
8747d30a 481 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
9958c857
PR
482 if (!fb)
483 return NULL;
484
485 fb->head_index = 0;
486 fb->size = fifo_size;
487 fb->total = 0;
488
489 return fb;
490}
491
0e49d7b0 492static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
778f271d 493{
daeda1cc 494 struct disk_conf *dc;
7f34f614 495 unsigned int want; /* The number of sectors we want in-flight */
778f271d 496 int req_sect; /* Number of sectors to request in this turn */
7f34f614 497 int correction; /* Number of sectors more we need in-flight */
778f271d
PR
498 int cps; /* correction per invocation of drbd_rs_controller() */
499 int steps; /* Number of time steps to plan ahead */
500 int curr_corr;
501 int max_sect;
813472ce 502 struct fifo_buffer *plan;
778f271d 503
b30ab791
AG
504 dc = rcu_dereference(device->ldev->disk_conf);
505 plan = rcu_dereference(device->rs_plan_s);
778f271d 506
813472ce 507 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
778f271d 508
b30ab791 509 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
daeda1cc 510 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
778f271d 511 } else { /* normal path */
daeda1cc
PR
512 want = dc->c_fill_target ? dc->c_fill_target :
513 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
778f271d
PR
514 }
515
b30ab791 516 correction = want - device->rs_in_flight - plan->total;
778f271d
PR
517
518 /* Plan ahead */
519 cps = correction / steps;
813472ce
PR
520 fifo_add_val(plan, cps);
521 plan->total += cps * steps;
778f271d
PR
522
523 /* What we do in this step */
813472ce
PR
524 curr_corr = fifo_push(plan, 0);
525 plan->total -= curr_corr;
778f271d
PR
526
527 req_sect = sect_in + curr_corr;
528 if (req_sect < 0)
529 req_sect = 0;
530
daeda1cc 531 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
778f271d
PR
532 if (req_sect > max_sect)
533 req_sect = max_sect;
534
535 /*
d0180171 536 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
b30ab791
AG
537 sect_in, device->rs_in_flight, want, correction,
538 steps, cps, device->rs_planed, curr_corr, req_sect);
778f271d
PR
539 */
540
541 return req_sect;
542}
543
b30ab791 544static int drbd_rs_number_requests(struct drbd_device *device)
e65f440d 545{
0e49d7b0
LE
546 unsigned int sect_in; /* Number of sectors that came in since the last turn */
547 int number, mxb;
548
549 sect_in = atomic_xchg(&device->rs_sect_in, 0);
550 device->rs_in_flight -= sect_in;
813472ce
PR
551
552 rcu_read_lock();
0e49d7b0 553 mxb = drbd_get_max_buffers(device) / 2;
b30ab791 554 if (rcu_dereference(device->rs_plan_s)->size) {
0e49d7b0 555 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
b30ab791 556 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
e65f440d 557 } else {
b30ab791
AG
558 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
559 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
e65f440d 560 }
813472ce 561 rcu_read_unlock();
e65f440d 562
0e49d7b0
LE
563 /* Don't have more than "max-buffers"/2 in-flight.
564 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
565 * potentially causing a distributed deadlock on congestion during
566 * online-verify or (checksum-based) resync, if max-buffers,
567 * socket buffer sizes and resync rate settings are mis-configured. */
7f34f614
LE
568
569 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
570 * mxb (as used here, and in drbd_alloc_pages on the peer) is
571 * "number of pages" (typically also 4k),
572 * but "rs_in_flight" is in "sectors" (512 Byte). */
573 if (mxb - device->rs_in_flight/8 < number)
574 number = mxb - device->rs_in_flight/8;
0e49d7b0 575
e65f440d
LE
576 return number;
577}
578
44a4d551 579static int make_resync_request(struct drbd_device *const device, int cancel)
b411b363 580{
44a4d551
LE
581 struct drbd_peer_device *const peer_device = first_peer_device(device);
582 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
583 unsigned long bit;
584 sector_t sector;
b30ab791 585 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1816a2b4 586 int max_bio_size;
e65f440d 587 int number, rollback_i, size;
506afb62 588 int align, requeue = 0;
0f0601f4 589 int i = 0;
92d94ae6 590 int discard_granularity = 0;
b411b363
PR
591
592 if (unlikely(cancel))
99920dc5 593 return 0;
b411b363 594
b30ab791 595 if (device->rs_total == 0) {
af85e8e8 596 /* empty resync? */
b30ab791 597 drbd_resync_finished(device);
99920dc5 598 return 0;
af85e8e8
LE
599 }
600
b30ab791
AG
601 if (!get_ldev(device)) {
602 /* Since we only need to access device->rsync a
603 get_ldev_if_state(device,D_FAILED) would be sufficient, but
b411b363
PR
604 to continue resync with a broken disk makes no sense at
605 all */
d0180171 606 drbd_err(device, "Disk broke down during resync!\n");
99920dc5 607 return 0;
b411b363
PR
608 }
609
9104d31a 610 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
92d94ae6
PR
611 rcu_read_lock();
612 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
613 rcu_read_unlock();
614 }
615
b30ab791
AG
616 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617 number = drbd_rs_number_requests(device);
0e49d7b0 618 if (number <= 0)
0f0601f4 619 goto requeue;
b411b363 620
b411b363 621 for (i = 0; i < number; i++) {
506afb62
LE
622 /* Stop generating RS requests when half of the send buffer is filled,
623 * but notify TCP that we'd like to have more space. */
44a4d551
LE
624 mutex_lock(&connection->data.mutex);
625 if (connection->data.socket) {
506afb62
LE
626 struct sock *sk = connection->data.socket->sk;
627 int queued = sk->sk_wmem_queued;
628 int sndbuf = sk->sk_sndbuf;
629 if (queued > sndbuf / 2) {
630 requeue = 1;
631 if (sk->sk_socket)
632 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
633 }
634 } else
635 requeue = 1;
44a4d551 636 mutex_unlock(&connection->data.mutex);
506afb62 637 if (requeue)
b411b363
PR
638 goto requeue;
639
640next_sector:
641 size = BM_BLOCK_SIZE;
b30ab791 642 bit = drbd_bm_find_next(device, device->bm_resync_fo);
b411b363 643
4b0715f0 644 if (bit == DRBD_END_OF_BITMAP) {
b30ab791
AG
645 device->bm_resync_fo = drbd_bm_bits(device);
646 put_ldev(device);
99920dc5 647 return 0;
b411b363
PR
648 }
649
650 sector = BM_BIT_TO_SECT(bit);
651
ad3fee79 652 if (drbd_try_rs_begin_io(device, sector)) {
b30ab791 653 device->bm_resync_fo = bit;
b411b363
PR
654 goto requeue;
655 }
b30ab791 656 device->bm_resync_fo = bit + 1;
b411b363 657
b30ab791
AG
658 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
659 drbd_rs_complete_io(device, sector);
b411b363
PR
660 goto next_sector;
661 }
662
1816a2b4 663#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
b411b363
PR
664 /* try to find some adjacent bits.
665 * we stop if we have already the maximum req size.
666 *
667 * Additionally always align bigger requests, in order to
668 * be prepared for all stripe sizes of software RAIDs.
b411b363
PR
669 */
670 align = 1;
d207450c 671 rollback_i = i;
6377b923 672 while (i < number) {
1816a2b4 673 if (size + BM_BLOCK_SIZE > max_bio_size)
b411b363
PR
674 break;
675
676 /* Be always aligned */
677 if (sector & ((1<<(align+3))-1))
678 break;
679
92d94ae6
PR
680 if (discard_granularity && size == discard_granularity)
681 break;
682
b411b363
PR
683 /* do not cross extent boundaries */
684 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
685 break;
686 /* now, is it actually dirty, after all?
687 * caution, drbd_bm_test_bit is tri-state for some
688 * obscure reason; ( b == 0 ) would get the out-of-band
689 * only accidentally right because of the "oddly sized"
690 * adjustment below */
b30ab791 691 if (drbd_bm_test_bit(device, bit+1) != 1)
b411b363
PR
692 break;
693 bit++;
694 size += BM_BLOCK_SIZE;
695 if ((BM_BLOCK_SIZE << align) <= size)
696 align++;
697 i++;
698 }
699 /* if we merged some,
700 * reset the offset to start the next drbd_bm_find_next from */
701 if (size > BM_BLOCK_SIZE)
b30ab791 702 device->bm_resync_fo = bit + 1;
b411b363
PR
703#endif
704
705 /* adjust very last sectors, in case we are oddly sized */
706 if (sector + (size>>9) > capacity)
707 size = (capacity-sector)<<9;
aaaba345
LE
708
709 if (device->use_csums) {
44a4d551 710 switch (read_for_csum(peer_device, sector, size)) {
80a40e43 711 case -EIO: /* Disk failure */
b30ab791 712 put_ldev(device);
99920dc5 713 return -EIO;
80a40e43 714 case -EAGAIN: /* allocation failed, or ldev busy */
b30ab791
AG
715 drbd_rs_complete_io(device, sector);
716 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
d207450c 717 i = rollback_i;
b411b363 718 goto requeue;
80a40e43
LE
719 case 0:
720 /* everything ok */
721 break;
722 default:
723 BUG();
b411b363
PR
724 }
725 } else {
99920dc5
AG
726 int err;
727
b30ab791 728 inc_rs_pending(device);
92d94ae6
PR
729 err = drbd_send_drequest(peer_device,
730 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
99920dc5
AG
731 sector, size, ID_SYNCER);
732 if (err) {
d0180171 733 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
b30ab791
AG
734 dec_rs_pending(device);
735 put_ldev(device);
99920dc5 736 return err;
b411b363
PR
737 }
738 }
739 }
740
b30ab791 741 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
b411b363
PR
742 /* last syncer _request_ was sent,
743 * but the P_RS_DATA_REPLY not yet received. sync will end (and
744 * next sync group will resume), as soon as we receive the last
745 * resync data block, and the last bit is cleared.
746 * until then resync "work" is "inactive" ...
747 */
b30ab791 748 put_ldev(device);
99920dc5 749 return 0;
b411b363
PR
750 }
751
752 requeue:
b30ab791
AG
753 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
754 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
755 put_ldev(device);
99920dc5 756 return 0;
b411b363
PR
757}
758
d448a2e1 759static int make_ov_request(struct drbd_device *device, int cancel)
b411b363
PR
760{
761 int number, i, size;
762 sector_t sector;
b30ab791 763 const sector_t capacity = drbd_get_capacity(device->this_bdev);
58ffa580 764 bool stop_sector_reached = false;
b411b363
PR
765
766 if (unlikely(cancel))
767 return 1;
768
b30ab791 769 number = drbd_rs_number_requests(device);
b411b363 770
b30ab791 771 sector = device->ov_position;
b411b363 772 for (i = 0; i < number; i++) {
58ffa580 773 if (sector >= capacity)
b411b363 774 return 1;
58ffa580
LE
775
776 /* We check for "finished" only in the reply path:
777 * w_e_end_ov_reply().
778 * We need to send at least one request out. */
779 stop_sector_reached = i > 0
b30ab791
AG
780 && verify_can_do_stop_sector(device)
781 && sector >= device->ov_stop_sector;
58ffa580
LE
782 if (stop_sector_reached)
783 break;
b411b363
PR
784
785 size = BM_BLOCK_SIZE;
786
ad3fee79 787 if (drbd_try_rs_begin_io(device, sector)) {
b30ab791 788 device->ov_position = sector;
b411b363
PR
789 goto requeue;
790 }
791
792 if (sector + (size>>9) > capacity)
793 size = (capacity-sector)<<9;
794
b30ab791 795 inc_rs_pending(device);
69a22773 796 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
b30ab791 797 dec_rs_pending(device);
b411b363
PR
798 return 0;
799 }
800 sector += BM_SECT_PER_BIT;
801 }
b30ab791 802 device->ov_position = sector;
b411b363
PR
803
804 requeue:
b30ab791 805 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
58ffa580 806 if (i == 0 || !stop_sector_reached)
b30ab791 807 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
b411b363
PR
808 return 1;
809}
810
99920dc5 811int w_ov_finished(struct drbd_work *w, int cancel)
b411b363 812{
84b8c06b
AG
813 struct drbd_device_work *dw =
814 container_of(w, struct drbd_device_work, w);
815 struct drbd_device *device = dw->device;
816 kfree(dw);
b30ab791
AG
817 ov_out_of_sync_print(device);
818 drbd_resync_finished(device);
b411b363 819
99920dc5 820 return 0;
b411b363
PR
821}
822
99920dc5 823static int w_resync_finished(struct drbd_work *w, int cancel)
b411b363 824{
84b8c06b
AG
825 struct drbd_device_work *dw =
826 container_of(w, struct drbd_device_work, w);
827 struct drbd_device *device = dw->device;
828 kfree(dw);
b411b363 829
b30ab791 830 drbd_resync_finished(device);
b411b363 831
99920dc5 832 return 0;
b411b363
PR
833}
834
b30ab791 835static void ping_peer(struct drbd_device *device)
af85e8e8 836{
a6b32bc3 837 struct drbd_connection *connection = first_peer_device(device)->connection;
2a67d8b9 838
bde89a9e
AG
839 clear_bit(GOT_PING_ACK, &connection->flags);
840 request_ping(connection);
841 wait_event(connection->ping_wait,
842 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
af85e8e8
LE
843}
844
b30ab791 845int drbd_resync_finished(struct drbd_device *device)
b411b363 846{
26a96110 847 struct drbd_connection *connection = first_peer_device(device)->connection;
b411b363
PR
848 unsigned long db, dt, dbdt;
849 unsigned long n_oos;
850 union drbd_state os, ns;
84b8c06b 851 struct drbd_device_work *dw;
b411b363 852 char *khelper_cmd = NULL;
26525618 853 int verify_done = 0;
b411b363
PR
854
855 /* Remove all elements from the resync LRU. Since future actions
856 * might set bits in the (main) bitmap, then the entries in the
857 * resync LRU would be wrong. */
b30ab791 858 if (drbd_rs_del_all(device)) {
b411b363
PR
859 /* In case this is not possible now, most probably because
860 * there are P_RS_DATA_REPLY Packets lingering on the worker's
861 * queue (or even the read operations for those packets
862 * is not finished by now). Retry in 100ms. */
863
20ee6390 864 schedule_timeout_interruptible(HZ / 10);
84b8c06b
AG
865 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
866 if (dw) {
867 dw->w.cb = w_resync_finished;
868 dw->device = device;
26a96110 869 drbd_queue_work(&connection->sender_work, &dw->w);
b411b363
PR
870 return 1;
871 }
84b8c06b 872 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
b411b363
PR
873 }
874
b30ab791 875 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
b411b363
PR
876 if (dt <= 0)
877 dt = 1;
84b8c06b 878
b30ab791 879 db = device->rs_total;
58ffa580 880 /* adjust for verify start and stop sectors, respective reached position */
b30ab791
AG
881 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
882 db -= device->ov_left;
58ffa580 883
b411b363 884 dbdt = Bit2KB(db/dt);
b30ab791 885 device->rs_paused /= HZ;
b411b363 886
b30ab791 887 if (!get_ldev(device))
b411b363
PR
888 goto out;
889
b30ab791 890 ping_peer(device);
af85e8e8 891
0500813f 892 spin_lock_irq(&device->resource->req_lock);
b30ab791 893 os = drbd_read_state(device);
b411b363 894
26525618
LE
895 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
896
b411b363
PR
897 /* This protects us against multiple calls (that can happen in the presence
898 of application IO), and against connectivity loss just before we arrive here. */
899 if (os.conn <= C_CONNECTED)
900 goto out_unlock;
901
902 ns = os;
903 ns.conn = C_CONNECTED;
904
d0180171 905 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
58ffa580 906 verify_done ? "Online verify" : "Resync",
b30ab791 907 dt + device->rs_paused, device->rs_paused, dbdt);
b411b363 908
b30ab791 909 n_oos = drbd_bm_total_weight(device);
b411b363
PR
910
911 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
912 if (n_oos) {
d0180171 913 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
b411b363
PR
914 n_oos, Bit2KB(1));
915 khelper_cmd = "out-of-sync";
916 }
917 } else {
0b0ba1ef 918 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
b411b363
PR
919
920 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
921 khelper_cmd = "after-resync-target";
922
aaaba345 923 if (device->use_csums && device->rs_total) {
b30ab791
AG
924 const unsigned long s = device->rs_same_csum;
925 const unsigned long t = device->rs_total;
b411b363
PR
926 const int ratio =
927 (t == 0) ? 0 :
928 (t < 100000) ? ((s*100)/t) : (s/(t/100));
d0180171 929 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
b411b363
PR
930 "transferred %luK total %luK\n",
931 ratio,
b30ab791
AG
932 Bit2KB(device->rs_same_csum),
933 Bit2KB(device->rs_total - device->rs_same_csum),
934 Bit2KB(device->rs_total));
b411b363
PR
935 }
936 }
937
b30ab791 938 if (device->rs_failed) {
d0180171 939 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
b411b363
PR
940
941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
942 ns.disk = D_INCONSISTENT;
943 ns.pdsk = D_UP_TO_DATE;
944 } else {
945 ns.disk = D_UP_TO_DATE;
946 ns.pdsk = D_INCONSISTENT;
947 }
948 } else {
949 ns.disk = D_UP_TO_DATE;
950 ns.pdsk = D_UP_TO_DATE;
951
952 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
b30ab791 953 if (device->p_uuid) {
b411b363
PR
954 int i;
955 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
b30ab791
AG
956 _drbd_uuid_set(device, i, device->p_uuid[i]);
957 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
958 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
b411b363 959 } else {
d0180171 960 drbd_err(device, "device->p_uuid is NULL! BUG\n");
b411b363
PR
961 }
962 }
963
62b0da3a
LE
964 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
965 /* for verify runs, we don't update uuids here,
966 * so there would be nothing to report. */
b30ab791
AG
967 drbd_uuid_set_bm(device, 0UL);
968 drbd_print_uuids(device, "updated UUIDs");
969 if (device->p_uuid) {
62b0da3a
LE
970 /* Now the two UUID sets are equal, update what we
971 * know of the peer. */
972 int i;
973 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
b30ab791 974 device->p_uuid[i] = device->ldev->md.uuid[i];
62b0da3a 975 }
b411b363
PR
976 }
977 }
978
b30ab791 979 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
b411b363 980out_unlock:
0500813f 981 spin_unlock_irq(&device->resource->req_lock);
26a96110
LE
982
983 /* If we have been sync source, and have an effective fencing-policy,
984 * once *all* volumes are back in sync, call "unfence". */
985 if (os.conn == C_SYNC_SOURCE) {
986 enum drbd_disk_state disk_state = D_MASK;
987 enum drbd_disk_state pdsk_state = D_MASK;
988 enum drbd_fencing_p fp = FP_DONT_CARE;
989
990 rcu_read_lock();
991 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
992 if (fp != FP_DONT_CARE) {
993 struct drbd_peer_device *peer_device;
994 int vnr;
995 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
996 struct drbd_device *device = peer_device->device;
997 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
998 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
999 }
1000 }
1001 rcu_read_unlock();
1002 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1003 conn_khelper(connection, "unfence-peer");
1004 }
1005
b30ab791 1006 put_ldev(device);
b411b363 1007out:
b30ab791
AG
1008 device->rs_total = 0;
1009 device->rs_failed = 0;
1010 device->rs_paused = 0;
58ffa580
LE
1011
1012 /* reset start sector, if we reached end of device */
b30ab791
AG
1013 if (verify_done && device->ov_left == 0)
1014 device->ov_start_sector = 0;
b411b363 1015
b30ab791 1016 drbd_md_sync(device);
13d42685 1017
b411b363 1018 if (khelper_cmd)
b30ab791 1019 drbd_khelper(device, khelper_cmd);
b411b363
PR
1020
1021 return 1;
1022}
1023
1024/* helper */
b30ab791 1025static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
b411b363 1026{
045417f7 1027 if (drbd_peer_req_has_active_page(peer_req)) {
b411b363 1028 /* This might happen if sendpage() has not finished */
db830c46 1029 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
b30ab791
AG
1030 atomic_add(i, &device->pp_in_use_by_net);
1031 atomic_sub(i, &device->pp_in_use);
0500813f 1032 spin_lock_irq(&device->resource->req_lock);
a8cd15ba 1033 list_add_tail(&peer_req->w.list, &device->net_ee);
0500813f 1034 spin_unlock_irq(&device->resource->req_lock);
435f0740 1035 wake_up(&drbd_pp_wait);
b411b363 1036 } else
b30ab791 1037 drbd_free_peer_req(device, peer_req);
b411b363
PR
1038}
1039
1040/**
1041 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
b30ab791 1042 * @device: DRBD device.
b411b363
PR
1043 * @w: work object.
1044 * @cancel: The connection will be closed anyways
1045 */
99920dc5 1046int w_e_end_data_req(struct drbd_work *w, int cancel)
b411b363 1047{
a8cd15ba 1048 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1049 struct drbd_peer_device *peer_device = peer_req->peer_device;
1050 struct drbd_device *device = peer_device->device;
99920dc5 1051 int err;
b411b363
PR
1052
1053 if (unlikely(cancel)) {
b30ab791
AG
1054 drbd_free_peer_req(device, peer_req);
1055 dec_unacked(device);
99920dc5 1056 return 0;
b411b363
PR
1057 }
1058
db830c46 1059 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
6780139c 1060 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
b411b363
PR
1061 } else {
1062 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1063 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
db830c46 1064 (unsigned long long)peer_req->i.sector);
b411b363 1065
6780139c 1066 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
b411b363
PR
1067 }
1068
b30ab791 1069 dec_unacked(device);
b411b363 1070
b30ab791 1071 move_to_net_ee_or_free(device, peer_req);
b411b363 1072
99920dc5 1073 if (unlikely(err))
d0180171 1074 drbd_err(device, "drbd_send_block() failed\n");
99920dc5 1075 return err;
b411b363
PR
1076}
1077
700ca8c0
PR
1078static bool all_zero(struct drbd_peer_request *peer_req)
1079{
1080 struct page *page = peer_req->pages;
1081 unsigned int len = peer_req->i.size;
1082
1083 page_chain_for_each(page) {
1084 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1085 unsigned int i, words = l / sizeof(long);
1086 unsigned long *d;
1087
1088 d = kmap_atomic(page);
1089 for (i = 0; i < words; i++) {
1090 if (d[i]) {
1091 kunmap_atomic(d);
1092 return false;
1093 }
1094 }
1095 kunmap_atomic(d);
1096 len -= l;
1097 }
1098
1099 return true;
1100}
1101
b411b363 1102/**
a209b4ae 1103 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
b411b363
PR
1104 * @w: work object.
1105 * @cancel: The connection will be closed anyways
1106 */
99920dc5 1107int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
b411b363 1108{
a8cd15ba 1109 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1110 struct drbd_peer_device *peer_device = peer_req->peer_device;
1111 struct drbd_device *device = peer_device->device;
99920dc5 1112 int err;
b411b363
PR
1113
1114 if (unlikely(cancel)) {
b30ab791
AG
1115 drbd_free_peer_req(device, peer_req);
1116 dec_unacked(device);
99920dc5 1117 return 0;
b411b363
PR
1118 }
1119
b30ab791
AG
1120 if (get_ldev_if_state(device, D_FAILED)) {
1121 drbd_rs_complete_io(device, peer_req->i.sector);
1122 put_ldev(device);
b411b363
PR
1123 }
1124
b30ab791 1125 if (device->state.conn == C_AHEAD) {
6780139c 1126 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
db830c46 1127 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791
AG
1128 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1129 inc_rs_pending(device);
700ca8c0
PR
1130 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1131 err = drbd_send_rs_deallocated(peer_device, peer_req);
1132 else
1133 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
b411b363
PR
1134 } else {
1135 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1136 drbd_err(device, "Not sending RSDataReply, "
b411b363 1137 "partner DISKLESS!\n");
99920dc5 1138 err = 0;
b411b363
PR
1139 }
1140 } else {
1141 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1142 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
db830c46 1143 (unsigned long long)peer_req->i.sector);
b411b363 1144
6780139c 1145 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
b411b363
PR
1146
1147 /* update resync data with failure */
b30ab791 1148 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
b411b363
PR
1149 }
1150
b30ab791 1151 dec_unacked(device);
b411b363 1152
b30ab791 1153 move_to_net_ee_or_free(device, peer_req);
b411b363 1154
99920dc5 1155 if (unlikely(err))
d0180171 1156 drbd_err(device, "drbd_send_block() failed\n");
99920dc5 1157 return err;
b411b363
PR
1158}
1159
99920dc5 1160int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
b411b363 1161{
a8cd15ba 1162 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1163 struct drbd_peer_device *peer_device = peer_req->peer_device;
1164 struct drbd_device *device = peer_device->device;
b411b363
PR
1165 struct digest_info *di;
1166 int digest_size;
1167 void *digest = NULL;
99920dc5 1168 int err, eq = 0;
b411b363
PR
1169
1170 if (unlikely(cancel)) {
b30ab791
AG
1171 drbd_free_peer_req(device, peer_req);
1172 dec_unacked(device);
99920dc5 1173 return 0;
b411b363
PR
1174 }
1175
b30ab791
AG
1176 if (get_ldev(device)) {
1177 drbd_rs_complete_io(device, peer_req->i.sector);
1178 put_ldev(device);
1d53f09e 1179 }
b411b363 1180
db830c46 1181 di = peer_req->digest;
b411b363 1182
db830c46 1183 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b411b363
PR
1184 /* quick hack to try to avoid a race against reconfiguration.
1185 * a real fix would be much more involved,
1186 * introducing more locking mechanisms */
6780139c 1187 if (peer_device->connection->csums_tfm) {
9534d671 1188 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
0b0ba1ef 1189 D_ASSERT(device, digest_size == di->digest_size);
b411b363
PR
1190 digest = kmalloc(digest_size, GFP_NOIO);
1191 }
1192 if (digest) {
6780139c 1193 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
b411b363
PR
1194 eq = !memcmp(digest, di->digest, digest_size);
1195 kfree(digest);
1196 }
1197
1198 if (eq) {
b30ab791 1199 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
676396d5 1200 /* rs_same_csums unit is BM_BLOCK_SIZE */
b30ab791 1201 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
6780139c 1202 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
b411b363 1203 } else {
b30ab791 1204 inc_rs_pending(device);
db830c46
AG
1205 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1206 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
204bba99 1207 kfree(di);
6780139c 1208 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
b411b363
PR
1209 }
1210 } else {
6780139c 1211 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
b411b363 1212 if (__ratelimit(&drbd_ratelimit_state))
d0180171 1213 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
b411b363
PR
1214 }
1215
b30ab791
AG
1216 dec_unacked(device);
1217 move_to_net_ee_or_free(device, peer_req);
b411b363 1218
99920dc5 1219 if (unlikely(err))
d0180171 1220 drbd_err(device, "drbd_send_block/ack() failed\n");
99920dc5 1221 return err;
b411b363
PR
1222}
1223
99920dc5 1224int w_e_end_ov_req(struct drbd_work *w, int cancel)
b411b363 1225{
a8cd15ba 1226 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1227 struct drbd_peer_device *peer_device = peer_req->peer_device;
1228 struct drbd_device *device = peer_device->device;
db830c46
AG
1229 sector_t sector = peer_req->i.sector;
1230 unsigned int size = peer_req->i.size;
b411b363
PR
1231 int digest_size;
1232 void *digest;
99920dc5 1233 int err = 0;
b411b363
PR
1234
1235 if (unlikely(cancel))
1236 goto out;
1237
9534d671 1238 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
b411b363 1239 digest = kmalloc(digest_size, GFP_NOIO);
8f21420e 1240 if (!digest) {
99920dc5 1241 err = 1; /* terminate the connection in case the allocation failed */
8f21420e 1242 goto out;
b411b363
PR
1243 }
1244
db830c46 1245 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
6780139c 1246 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
8f21420e
PR
1247 else
1248 memset(digest, 0, digest_size);
1249
53ea4331
LE
1250 /* Free e and pages before send.
1251 * In case we block on congestion, we could otherwise run into
1252 * some distributed deadlock, if the other side blocks on
1253 * congestion as well, because our receiver blocks in
c37c8ecf 1254 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 1255 drbd_free_peer_req(device, peer_req);
db830c46 1256 peer_req = NULL;
b30ab791 1257 inc_rs_pending(device);
6780139c 1258 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
99920dc5 1259 if (err)
b30ab791 1260 dec_rs_pending(device);
8f21420e
PR
1261 kfree(digest);
1262
b411b363 1263out:
db830c46 1264 if (peer_req)
b30ab791
AG
1265 drbd_free_peer_req(device, peer_req);
1266 dec_unacked(device);
99920dc5 1267 return err;
b411b363
PR
1268}
1269
b30ab791 1270void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
b411b363 1271{
b30ab791
AG
1272 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1273 device->ov_last_oos_size += size>>9;
b411b363 1274 } else {
b30ab791
AG
1275 device->ov_last_oos_start = sector;
1276 device->ov_last_oos_size = size>>9;
b411b363 1277 }
b30ab791 1278 drbd_set_out_of_sync(device, sector, size);
b411b363
PR
1279}
1280
99920dc5 1281int w_e_end_ov_reply(struct drbd_work *w, int cancel)
b411b363 1282{
a8cd15ba 1283 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
6780139c
AG
1284 struct drbd_peer_device *peer_device = peer_req->peer_device;
1285 struct drbd_device *device = peer_device->device;
b411b363 1286 struct digest_info *di;
b411b363 1287 void *digest;
db830c46
AG
1288 sector_t sector = peer_req->i.sector;
1289 unsigned int size = peer_req->i.size;
53ea4331 1290 int digest_size;
99920dc5 1291 int err, eq = 0;
58ffa580 1292 bool stop_sector_reached = false;
b411b363
PR
1293
1294 if (unlikely(cancel)) {
b30ab791
AG
1295 drbd_free_peer_req(device, peer_req);
1296 dec_unacked(device);
99920dc5 1297 return 0;
b411b363
PR
1298 }
1299
1300 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1301 * the resync lru has been cleaned up already */
b30ab791
AG
1302 if (get_ldev(device)) {
1303 drbd_rs_complete_io(device, peer_req->i.sector);
1304 put_ldev(device);
1d53f09e 1305 }
b411b363 1306
db830c46 1307 di = peer_req->digest;
b411b363 1308
db830c46 1309 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
9534d671 1310 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
b411b363
PR
1311 digest = kmalloc(digest_size, GFP_NOIO);
1312 if (digest) {
6780139c 1313 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
b411b363 1314
0b0ba1ef 1315 D_ASSERT(device, digest_size == di->digest_size);
b411b363
PR
1316 eq = !memcmp(digest, di->digest, digest_size);
1317 kfree(digest);
1318 }
b411b363
PR
1319 }
1320
9676c760
LE
1321 /* Free peer_req and pages before send.
1322 * In case we block on congestion, we could otherwise run into
1323 * some distributed deadlock, if the other side blocks on
1324 * congestion as well, because our receiver blocks in
c37c8ecf 1325 * drbd_alloc_pages due to pp_in_use > max_buffers. */
b30ab791 1326 drbd_free_peer_req(device, peer_req);
b411b363 1327 if (!eq)
b30ab791 1328 drbd_ov_out_of_sync_found(device, sector, size);
b411b363 1329 else
b30ab791 1330 ov_out_of_sync_print(device);
b411b363 1331
6780139c 1332 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
fa79abd8 1333 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
b411b363 1334
b30ab791 1335 dec_unacked(device);
b411b363 1336
b30ab791 1337 --device->ov_left;
ea5442af
LE
1338
1339 /* let's advance progress step marks only for every other megabyte */
b30ab791
AG
1340 if ((device->ov_left & 0x200) == 0x200)
1341 drbd_advance_rs_marks(device, device->ov_left);
ea5442af 1342
b30ab791
AG
1343 stop_sector_reached = verify_can_do_stop_sector(device) &&
1344 (sector + (size>>9)) >= device->ov_stop_sector;
58ffa580 1345
b30ab791
AG
1346 if (device->ov_left == 0 || stop_sector_reached) {
1347 ov_out_of_sync_print(device);
1348 drbd_resync_finished(device);
b411b363
PR
1349 }
1350
99920dc5 1351 return err;
b411b363
PR
1352}
1353
b6dd1a89
LE
1354/* FIXME
1355 * We need to track the number of pending barrier acks,
1356 * and to be able to wait for them.
1357 * See also comment in drbd_adm_attach before drbd_suspend_io.
1358 */
bde89a9e 1359static int drbd_send_barrier(struct drbd_connection *connection)
b411b363 1360{
9f5bdc33 1361 struct p_barrier *p;
b6dd1a89 1362 struct drbd_socket *sock;
b411b363 1363
bde89a9e
AG
1364 sock = &connection->data;
1365 p = conn_prepare_command(connection, sock);
9f5bdc33
AG
1366 if (!p)
1367 return -EIO;
bde89a9e 1368 p->barrier = connection->send.current_epoch_nr;
b6dd1a89 1369 p->pad = 0;
bde89a9e 1370 connection->send.current_epoch_writes = 0;
84d34f2f 1371 connection->send.last_sent_barrier_jif = jiffies;
b6dd1a89 1372
bde89a9e 1373 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
b411b363
PR
1374}
1375
99920dc5 1376int w_send_write_hint(struct drbd_work *w, int cancel)
b411b363 1377{
84b8c06b
AG
1378 struct drbd_device *device =
1379 container_of(w, struct drbd_device, unplug_work);
9f5bdc33
AG
1380 struct drbd_socket *sock;
1381
b411b363 1382 if (cancel)
99920dc5 1383 return 0;
a6b32bc3 1384 sock = &first_peer_device(device)->connection->data;
69a22773 1385 if (!drbd_prepare_command(first_peer_device(device), sock))
9f5bdc33 1386 return -EIO;
69a22773 1387 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
b411b363
PR
1388}
1389
bde89a9e 1390static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
4eb9b3cb 1391{
bde89a9e
AG
1392 if (!connection->send.seen_any_write_yet) {
1393 connection->send.seen_any_write_yet = true;
1394 connection->send.current_epoch_nr = epoch;
1395 connection->send.current_epoch_writes = 0;
84d34f2f 1396 connection->send.last_sent_barrier_jif = jiffies;
4eb9b3cb
LE
1397 }
1398}
1399
bde89a9e 1400static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
4eb9b3cb
LE
1401{
1402 /* re-init if first write on this connection */
bde89a9e 1403 if (!connection->send.seen_any_write_yet)
4eb9b3cb 1404 return;
bde89a9e
AG
1405 if (connection->send.current_epoch_nr != epoch) {
1406 if (connection->send.current_epoch_writes)
1407 drbd_send_barrier(connection);
1408 connection->send.current_epoch_nr = epoch;
4eb9b3cb
LE
1409 }
1410}
1411
8f7bed77 1412int w_send_out_of_sync(struct drbd_work *w, int cancel)
73a01a18
PR
1413{
1414 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1415 struct drbd_device *device = req->device;
44a4d551
LE
1416 struct drbd_peer_device *const peer_device = first_peer_device(device);
1417 struct drbd_connection *const connection = peer_device->connection;
99920dc5 1418 int err;
73a01a18
PR
1419
1420 if (unlikely(cancel)) {
8554df1c 1421 req_mod(req, SEND_CANCELED);
99920dc5 1422 return 0;
73a01a18 1423 }
e5f891b2 1424 req->pre_send_jif = jiffies;
73a01a18 1425
bde89a9e 1426 /* this time, no connection->send.current_epoch_writes++;
b6dd1a89
LE
1427 * If it was sent, it was the closing barrier for the last
1428 * replicated epoch, before we went into AHEAD mode.
1429 * No more barriers will be sent, until we leave AHEAD mode again. */
bde89a9e 1430 maybe_send_barrier(connection, req->epoch);
b6dd1a89 1431
44a4d551 1432 err = drbd_send_out_of_sync(peer_device, req);
8554df1c 1433 req_mod(req, OOS_HANDED_TO_NETWORK);
73a01a18 1434
99920dc5 1435 return err;
73a01a18
PR
1436}
1437
b411b363
PR
1438/**
1439 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
b411b363
PR
1440 * @w: work object.
1441 * @cancel: The connection will be closed anyways
1442 */
99920dc5 1443int w_send_dblock(struct drbd_work *w, int cancel)
b411b363
PR
1444{
1445 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1446 struct drbd_device *device = req->device;
44a4d551
LE
1447 struct drbd_peer_device *const peer_device = first_peer_device(device);
1448 struct drbd_connection *connection = peer_device->connection;
99920dc5 1449 int err;
b411b363
PR
1450
1451 if (unlikely(cancel)) {
8554df1c 1452 req_mod(req, SEND_CANCELED);
99920dc5 1453 return 0;
b411b363 1454 }
e5f891b2 1455 req->pre_send_jif = jiffies;
b411b363 1456
bde89a9e
AG
1457 re_init_if_first_write(connection, req->epoch);
1458 maybe_send_barrier(connection, req->epoch);
1459 connection->send.current_epoch_writes++;
b6dd1a89 1460
44a4d551 1461 err = drbd_send_dblock(peer_device, req);
99920dc5 1462 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
b411b363 1463
99920dc5 1464 return err;
b411b363
PR
1465}
1466
1467/**
1468 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
b411b363
PR
1469 * @w: work object.
1470 * @cancel: The connection will be closed anyways
1471 */
99920dc5 1472int w_send_read_req(struct drbd_work *w, int cancel)
b411b363
PR
1473{
1474 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1475 struct drbd_device *device = req->device;
44a4d551
LE
1476 struct drbd_peer_device *const peer_device = first_peer_device(device);
1477 struct drbd_connection *connection = peer_device->connection;
99920dc5 1478 int err;
b411b363
PR
1479
1480 if (unlikely(cancel)) {
8554df1c 1481 req_mod(req, SEND_CANCELED);
99920dc5 1482 return 0;
b411b363 1483 }
e5f891b2 1484 req->pre_send_jif = jiffies;
b411b363 1485
b6dd1a89
LE
1486 /* Even read requests may close a write epoch,
1487 * if there was any yet. */
bde89a9e 1488 maybe_send_barrier(connection, req->epoch);
b6dd1a89 1489
44a4d551 1490 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
6c1005e7 1491 (unsigned long)req);
b411b363 1492
99920dc5 1493 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
b411b363 1494
99920dc5 1495 return err;
b411b363
PR
1496}
1497
99920dc5 1498int w_restart_disk_io(struct drbd_work *w, int cancel)
265be2d0
PR
1499{
1500 struct drbd_request *req = container_of(w, struct drbd_request, w);
84b8c06b 1501 struct drbd_device *device = req->device;
265be2d0 1502
0778286a 1503 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
4dd726f0 1504 drbd_al_begin_io(device, &req->i);
265be2d0
PR
1505
1506 drbd_req_make_private_bio(req, req->master_bio);
b30ab791 1507 req->private_bio->bi_bdev = device->ldev->backing_bdev;
265be2d0
PR
1508 generic_make_request(req->private_bio);
1509
99920dc5 1510 return 0;
265be2d0
PR
1511}
1512
b30ab791 1513static int _drbd_may_sync_now(struct drbd_device *device)
b411b363 1514{
b30ab791 1515 struct drbd_device *odev = device;
95f8efd0 1516 int resync_after;
b411b363
PR
1517
1518 while (1) {
a3f8f7dc 1519 if (!odev->ldev || odev->state.disk == D_DISKLESS)
438c8374 1520 return 1;
daeda1cc 1521 rcu_read_lock();
95f8efd0 1522 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
daeda1cc 1523 rcu_read_unlock();
95f8efd0 1524 if (resync_after == -1)
b411b363 1525 return 1;
b30ab791 1526 odev = minor_to_device(resync_after);
a3f8f7dc 1527 if (!odev)
841ce241 1528 return 1;
b411b363
PR
1529 if ((odev->state.conn >= C_SYNC_SOURCE &&
1530 odev->state.conn <= C_PAUSED_SYNC_T) ||
1531 odev->state.aftr_isp || odev->state.peer_isp ||
1532 odev->state.user_isp)
1533 return 0;
1534 }
1535}
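_drbd_may_sync_now() walks the resync-after dependency chain: a device may start (or continue) resync only if no device it transitively depends on is itself syncing or paused. A minimal user-space sketch of that walk, with a made-up toy_dev table standing in for minor_to_device() and the state checks:

#include <stdio.h>

#define MAX_MINORS 8

struct toy_dev {
	int resync_after;	/* minor this device syncs after, or -1 */
	int syncing_or_paused;	/* stand-in for the conn/isp state checks */
};

static struct toy_dev dev[MAX_MINORS] = {
	[0] = { .resync_after = -1, .syncing_or_paused = 1 },
	[1] = { .resync_after =  0, .syncing_or_paused = 0 },
	[2] = { .resync_after =  1, .syncing_or_paused = 0 },
};

static int toy_may_sync_now(int minor)
{
	for (;;) {
		int after = dev[minor].resync_after;

		if (after < 0 || after >= MAX_MINORS)
			return 1;		/* end of the dependency chain */
		minor = after;
		if (dev[minor].syncing_or_paused)
			return 0;		/* must wait for the dependency */
	}
}

int main(void)
{
	printf("minor 2 may sync now: %d\n", toy_may_sync_now(2));	/* 0 */
	dev[0].syncing_or_paused = 0;
	printf("minor 2 may sync now: %d\n", toy_may_sync_now(2));	/* 1 */
	return 0;
}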
1536
1537/**
28bc3b8c 1538 * drbd_pause_after() - Pause resync on all devices that may not resync now
b30ab791 1539 * @device: DRBD device.
b411b363
PR
1540 *
1541 * Called from process context only (admin command and after_state_ch).
1542 */
28bc3b8c 1543static bool drbd_pause_after(struct drbd_device *device)
b411b363 1544{
28bc3b8c 1545 bool changed = false;
54761697 1546 struct drbd_device *odev;
28bc3b8c 1547 int i;
b411b363 1548
695d08fa 1549 rcu_read_lock();
05a10ec7 1550 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1551 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1552 continue;
28bc3b8c
AG
1553 if (!_drbd_may_sync_now(odev) &&
1554 _drbd_set_state(_NS(odev, aftr_isp, 1),
1555 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1556 changed = true;
b411b363 1557 }
695d08fa 1558 rcu_read_unlock();
b411b363 1559
28bc3b8c 1560 return changed;
b411b363
PR
1561}
1562
1563/**
28bc3b8c 1564 * drbd_resume_next() - Resume resync on all devices that may resync now
b30ab791 1565 * @device: DRBD device.
b411b363
PR
1566 *
1567 * Called from process context only (admin command and worker).
1568 */
28bc3b8c 1569static bool drbd_resume_next(struct drbd_device *device)
b411b363 1570{
28bc3b8c 1571 bool changed = false;
54761697 1572 struct drbd_device *odev;
28bc3b8c 1573 int i;
b411b363 1574
695d08fa 1575 rcu_read_lock();
05a10ec7 1576 idr_for_each_entry(&drbd_devices, odev, i) {
b411b363
PR
1577 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1578 continue;
1579 if (odev->state.aftr_isp) {
28bc3b8c
AG
1580 if (_drbd_may_sync_now(odev) &&
1581 _drbd_set_state(_NS(odev, aftr_isp, 0),
1582 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1583 changed = true;
b411b363
PR
1584 }
1585 }
695d08fa 1586 rcu_read_unlock();
28bc3b8c 1587 return changed;
b411b363
PR
1588}
1589
b30ab791 1590void resume_next_sg(struct drbd_device *device)
b411b363 1591{
28bc3b8c
AG
1592 lock_all_resources();
1593 drbd_resume_next(device);
1594 unlock_all_resources();
b411b363
PR
1595}
1596
b30ab791 1597void suspend_other_sg(struct drbd_device *device)
b411b363 1598{
28bc3b8c
AG
1599 lock_all_resources();
1600 drbd_pause_after(device);
1601 unlock_all_resources();
b411b363
PR
1602}
1603
28bc3b8c 1604/* caller must lock_all_resources() */
b30ab791 1605enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
b411b363 1606{
54761697 1607 struct drbd_device *odev;
95f8efd0 1608 int resync_after;
b411b363
PR
1609
1610 if (o_minor == -1)
1611 return NO_ERROR;
a3f8f7dc 1612 if (o_minor < -1 || o_minor > MINORMASK)
95f8efd0 1613 return ERR_RESYNC_AFTER;
b411b363
PR
1614
1615 /* check for loops */
b30ab791 1616 odev = minor_to_device(o_minor);
b411b363 1617 while (1) {
b30ab791 1618 if (odev == device)
95f8efd0 1619 return ERR_RESYNC_AFTER_CYCLE;
b411b363 1620
a3f8f7dc
LE
1621 /* You are free to depend on diskless, non-existing,
1622 * or not yet/no longer existing minors.
1623 * We only reject dependency loops.
1624 * We cannot follow the dependency chain beyond a detached or
1625 * missing minor.
1626 */
1627 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1628 return NO_ERROR;
1629
daeda1cc 1630 rcu_read_lock();
95f8efd0 1631 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
daeda1cc 1632 rcu_read_unlock();
b411b363 1633 /* dependency chain ends here, no cycles. */
95f8efd0 1634 if (resync_after == -1)
b411b363
PR
1635 return NO_ERROR;
1636
1637 /* follow the dependency chain */
b30ab791 1638 odev = minor_to_device(resync_after);
b411b363
PR
1639 }
1640}
1641
28bc3b8c 1642/* caller must lock_all_resources() */
b30ab791 1643void drbd_resync_after_changed(struct drbd_device *device)
b411b363 1644{
28bc3b8c 1645 int changed;
b411b363 1646
dc97b708 1647 do {
28bc3b8c
AG
1648 changed = drbd_pause_after(device);
1649 changed |= drbd_resume_next(device);
1650 } while (changed);
b411b363
PR
1651}
1652
b30ab791 1653void drbd_rs_controller_reset(struct drbd_device *device)
9bd28d3c 1654{
ff8bd88b 1655 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
813472ce
PR
1656 struct fifo_buffer *plan;
1657
b30ab791
AG
1658 atomic_set(&device->rs_sect_in, 0);
1659 atomic_set(&device->rs_sect_ev, 0);
1660 device->rs_in_flight = 0;
ff8bd88b
LE
1661 device->rs_last_events =
1662 (int)part_stat_read(&disk->part0, sectors[0]) +
1663 (int)part_stat_read(&disk->part0, sectors[1]);
813472ce
PR
1664
1665 /* Updating the RCU protected object in place is necessary since
1666 this function gets called from atomic context.
 1667	 It is valid since all other updates also lead to a completely
1668 empty fifo */
1669 rcu_read_lock();
b30ab791 1670 plan = rcu_dereference(device->rs_plan_s);
813472ce
PR
1671 plan->total = 0;
1672 fifo_set(plan, 0);
1673 rcu_read_unlock();
9bd28d3c
LE
1674}
1675
1f04af33
PR
1676void start_resync_timer_fn(unsigned long data)
1677{
b30ab791 1678 struct drbd_device *device = (struct drbd_device *) data;
ac0acb9e 1679 drbd_device_post_work(device, RS_START);
1f04af33
PR
1680}
1681
ac0acb9e 1682static void do_start_resync(struct drbd_device *device)
1f04af33 1683{
b30ab791 1684 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
ac0acb9e 1685 drbd_warn(device, "postponing start_resync ...\n");
b30ab791
AG
1686 device->start_resync_timer.expires = jiffies + HZ/10;
1687 add_timer(&device->start_resync_timer);
ac0acb9e 1688 return;
1f04af33
PR
1689 }
1690
b30ab791
AG
1691 drbd_start_resync(device, C_SYNC_SOURCE);
1692 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1f04af33
PR
1693}
1694
aaaba345
LE
1695static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1696{
1697 bool csums_after_crash_only;
1698 rcu_read_lock();
1699 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1700 rcu_read_unlock();
1701 return connection->agreed_pro_version >= 89 && /* supported? */
1702 connection->csums_tfm && /* configured? */
1703 (csums_after_crash_only == 0 /* use for each resync? */
1704 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1705}
1706
b411b363
PR
1707/**
1708 * drbd_start_resync() - Start the resync process
b30ab791 1709 * @device: DRBD device.
b411b363
PR
1710 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1711 *
1712 * This function might bring you directly into one of the
1713 * C_PAUSED_SYNC_* states.
1714 */
b30ab791 1715void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
b411b363 1716{
44a4d551
LE
1717 struct drbd_peer_device *peer_device = first_peer_device(device);
1718 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
b411b363
PR
1719 union drbd_state ns;
1720 int r;
1721
b30ab791 1722 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
d0180171 1723 drbd_err(device, "Resync already running!\n");
b411b363
PR
1724 return;
1725 }
1726
b30ab791 1727 if (!test_bit(B_RS_H_DONE, &device->flags)) {
e64a3294
PR
1728 if (side == C_SYNC_TARGET) {
1729 /* Since application IO was locked out during C_WF_BITMAP_T and
1730 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
 1731		   we check whether we might make the data inconsistent. */
b30ab791 1732 r = drbd_khelper(device, "before-resync-target");
e64a3294
PR
1733 r = (r >> 8) & 0xff;
1734 if (r > 0) {
d0180171 1735 drbd_info(device, "before-resync-target handler returned %d, "
09b9e797 1736 "dropping connection.\n", r);
44a4d551 1737 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
09b9e797
PR
1738 return;
1739 }
e64a3294 1740 } else /* C_SYNC_SOURCE */ {
b30ab791 1741 r = drbd_khelper(device, "before-resync-source");
e64a3294
PR
1742 r = (r >> 8) & 0xff;
1743 if (r > 0) {
1744 if (r == 3) {
d0180171 1745 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294
PR
1746 "ignoring. Old userland tools?", r);
1747 } else {
d0180171 1748 drbd_info(device, "before-resync-source handler returned %d, "
e64a3294 1749 "dropping connection.\n", r);
44a4d551 1750 conn_request_state(connection,
a6b32bc3 1751 NS(conn, C_DISCONNECTING), CS_HARD);
e64a3294
PR
1752 return;
1753 }
1754 }
09b9e797 1755 }
b411b363
PR
1756 }
1757
44a4d551 1758 if (current == connection->worker.task) {
dad20554 1759 /* The worker should not sleep waiting for state_mutex,
e64a3294 1760	   that can take a long time */
b30ab791
AG
1761 if (!mutex_trylock(device->state_mutex)) {
1762 set_bit(B_RS_H_DONE, &device->flags);
1763 device->start_resync_timer.expires = jiffies + HZ/5;
1764 add_timer(&device->start_resync_timer);
e64a3294
PR
1765 return;
1766 }
1767 } else {
b30ab791 1768 mutex_lock(device->state_mutex);
e64a3294 1769 }
b411b363 1770
28bc3b8c
AG
1771 lock_all_resources();
1772 clear_bit(B_RS_H_DONE, &device->flags);
a700471b 1773 /* Did some connection breakage or IO error race with us? */
b30ab791
AG
1774 if (device->state.conn < C_CONNECTED
1775 || !get_ldev_if_state(device, D_NEGOTIATING)) {
28bc3b8c
AG
1776 unlock_all_resources();
1777 goto out;
b411b363
PR
1778 }
1779
b30ab791 1780 ns = drbd_read_state(device);
b411b363 1781
b30ab791 1782 ns.aftr_isp = !_drbd_may_sync_now(device);
b411b363
PR
1783
1784 ns.conn = side;
1785
1786 if (side == C_SYNC_TARGET)
1787 ns.disk = D_INCONSISTENT;
1788 else /* side == C_SYNC_SOURCE */
1789 ns.pdsk = D_INCONSISTENT;
1790
28bc3b8c 1791 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
b30ab791 1792 ns = drbd_read_state(device);
b411b363
PR
1793
1794 if (ns.conn < C_CONNECTED)
1795 r = SS_UNKNOWN_ERROR;
1796
1797 if (r == SS_SUCCESS) {
b30ab791 1798 unsigned long tw = drbd_bm_total_weight(device);
1d7734a0
LE
1799 unsigned long now = jiffies;
1800 int i;
1801
b30ab791
AG
1802 device->rs_failed = 0;
1803 device->rs_paused = 0;
1804 device->rs_same_csum = 0;
b30ab791
AG
1805 device->rs_last_sect_ev = 0;
1806 device->rs_total = tw;
1807 device->rs_start = now;
1d7734a0 1808 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
b30ab791
AG
1809 device->rs_mark_left[i] = tw;
1810 device->rs_mark_time[i] = now;
1d7734a0 1811 }
28bc3b8c 1812 drbd_pause_after(device);
5ab7d2c0
LE
1813 /* Forget potentially stale cached per resync extent bit-counts.
1814 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1815 * disabled, and know the disk state is ok. */
1816 spin_lock(&device->al_lock);
1817 lc_reset(device->resync);
1818 device->resync_locked = 0;
1819 device->resync_wenr = LC_FREE;
1820 spin_unlock(&device->al_lock);
b411b363 1821 }
28bc3b8c 1822 unlock_all_resources();
5a22db89 1823
b411b363 1824 if (r == SS_SUCCESS) {
5ab7d2c0 1825 wake_up(&device->al_wait); /* for lc_reset() above */
328e0f12
PR
1826 /* reset rs_last_bcast when a resync or verify is started,
1827 * to deal with potential jiffies wrap. */
b30ab791 1828 device->rs_last_bcast = jiffies - HZ;
328e0f12 1829
d0180171 1830 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
b411b363 1831 drbd_conn_str(ns.conn),
b30ab791
AG
1832 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1833 (unsigned long) device->rs_total);
aaaba345 1834 if (side == C_SYNC_TARGET) {
b30ab791 1835 device->bm_resync_fo = 0;
aaaba345
LE
1836 device->use_csums = use_checksum_based_resync(connection, device);
1837 } else {
1838 device->use_csums = 0;
1839 }
6c922ed5
LE
1840
1841 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1842 * with w_send_oos, or the sync target will get confused as to
 1843		 * how many bits to resync. We cannot do that always, because for an
1844 * empty resync and protocol < 95, we need to do it here, as we call
1845 * drbd_resync_finished from here in that case.
1846 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1847 * and from after_state_ch otherwise. */
44a4d551
LE
1848 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1849 drbd_gen_and_send_sync_uuid(peer_device);
b411b363 1850
44a4d551 1851 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
af85e8e8
LE
1852 /* This still has a race (about when exactly the peers
1853 * detect connection loss) that can lead to a full sync
1854 * on next handshake. In 8.3.9 we fixed this with explicit
1855 * resync-finished notifications, but the fix
1856 * introduces a protocol change. Sleeping for some
1857 * time longer than the ping interval + timeout on the
1858 * SyncSource, to give the SyncTarget the chance to
1859 * detect connection loss, then waiting for a ping
1860 * response (implicit in drbd_resync_finished) reduces
1861 * the race considerably, but does not solve it. */
44ed167d
PR
1862 if (side == C_SYNC_SOURCE) {
1863 struct net_conf *nc;
1864 int timeo;
1865
1866 rcu_read_lock();
44a4d551 1867 nc = rcu_dereference(connection->net_conf);
44ed167d
PR
1868 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1869 rcu_read_unlock();
1870 schedule_timeout_interruptible(timeo);
1871 }
b30ab791 1872 drbd_resync_finished(device);
b411b363
PR
1873 }
1874
b30ab791
AG
1875 drbd_rs_controller_reset(device);
1876 /* ns.conn may already be != device->state.conn,
b411b363
PR
1877 * we may have been paused in between, or become paused until
1878 * the timer triggers.
1879 * No matter, that is handled in resync_timer_fn() */
1880 if (ns.conn == C_SYNC_TARGET)
b30ab791 1881 mod_timer(&device->resync_timer, jiffies);
b411b363 1882
b30ab791 1883 drbd_md_sync(device);
b411b363 1884 }
b30ab791 1885 put_ldev(device);
28bc3b8c 1886out:
b30ab791 1887 mutex_unlock(device->state_mutex);
b411b363
PR
1888}
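To put the sleep in the empty-resync workaround above into numbers: assuming the usual configuration units (ping-int in seconds, ping-timeout in tenths of a second) and, for example, HZ = 250, ping-int = 10 and ping-timeout = 5, the computed delay is 10*250 + 5*250/9 = 2500 + 138 = 2638 jiffies, roughly 10.55 seconds. That is a little more than the ping interval plus the ping timeout, which is exactly the "some time longer" the comment asks for.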
1889
e334f550 1890static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
c7a58db4
LE
1891{
1892 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1893 device->rs_last_bcast = jiffies;
1894
1895 if (!get_ldev(device))
1896 return;
1897
1898 drbd_bm_write_lazy(device, 0);
5ab7d2c0 1899 if (resync_done && is_sync_state(device->state.conn))
c7a58db4 1900 drbd_resync_finished(device);
5ab7d2c0 1901
c7a58db4
LE
1902 drbd_bcast_event(device, &sib);
1903 /* update timestamp, in case it took a while to write out stuff */
1904 device->rs_last_bcast = jiffies;
1905 put_ldev(device);
1906}
1907
e334f550
LE
1908static void drbd_ldev_destroy(struct drbd_device *device)
1909{
1910 lc_destroy(device->resync);
1911 device->resync = NULL;
1912 lc_destroy(device->act_log);
1913 device->act_log = NULL;
d1b80853
AG
1914
1915 __acquire(local);
63a7c8ad 1916 drbd_backing_dev_free(device, device->ldev);
d1b80853
AG
1917 device->ldev = NULL;
1918 __release(local);
1919
e334f550
LE
1920 clear_bit(GOING_DISKLESS, &device->flags);
1921 wake_up(&device->misc_wait);
1922}
1923
1924static void go_diskless(struct drbd_device *device)
1925{
1926 D_ASSERT(device, device->state.disk == D_FAILED);
1927 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1928 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1929 * the protected members anymore, though, so once put_ldev reaches zero
1930 * again, it will be safe to free them. */
1931
1932 /* Try to write changed bitmap pages, read errors may have just
1933 * set some bits outside the area covered by the activity log.
1934 *
1935 * If we have an IO error during the bitmap writeout,
1936 * we will want a full sync next time, just in case.
1937 * (Do we want a specific meta data flag for this?)
1938 *
1939 * If that does not make it to stable storage either,
1940 * we cannot do anything about that anymore.
1941 *
1942 * We still need to check if both bitmap and ldev are present, we may
1943 * end up here after a failed attach, before ldev was even assigned.
1944 */
1945 if (device->bitmap && device->ldev) {
 1946		/* An interrupted resync or similar is allowed to recount bits
1947 * while we detach.
1948 * Any modifications would not be expected anymore, though.
1949 */
1950 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1951 "detach", BM_LOCKED_TEST_ALLOWED)) {
1952 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1953 drbd_md_set_flag(device, MDF_FULL_SYNC);
1954 drbd_md_sync(device);
1955 }
1956 }
1957 }
1958
1959 drbd_force_state(device, NS(disk, D_DISKLESS));
1960}
1961
ac0acb9e
LE
1962static int do_md_sync(struct drbd_device *device)
1963{
1964 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1965 drbd_md_sync(device);
1966 return 0;
1967}
1968
944410e9
LE
1969/* only called from drbd_worker thread, no locking */
1970void __update_timing_details(
1971 struct drbd_thread_timing_details *tdp,
1972 unsigned int *cb_nr,
1973 void *cb,
1974 const char *fn, const unsigned int line)
1975{
1976 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1977 struct drbd_thread_timing_details *td = tdp + i;
1978
1979 td->start_jif = jiffies;
1980 td->cb_addr = cb;
1981 td->caller_fn = fn;
1982 td->line = line;
1983 td->cb_nr = *cb_nr;
1984
1985 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1986 td = tdp + i;
1987 memset(td, 0, sizeof(*td));
1988
1989 ++(*cb_nr);
1990}
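__update_timing_details() keeps a fixed-size history ring and zeroes the slot after the newest entry so a reader can tell where the history wraps. A small stand-alone sketch of that ring (HIST, struct sample and record_sample() are illustrative names, not driver code):

#include <string.h>
#include <stdio.h>

#define HIST 4

struct sample {
	unsigned int nr;	/* monotonically increasing sample number */
	const char *what;	/* what was recorded */
};

static struct sample hist[HIST];
static unsigned int sample_nr;

static void record_sample(const char *what)
{
	unsigned int i = sample_nr % HIST;

	hist[i].nr = sample_nr;
	hist[i].what = what;

	/* clear the slot after the newest entry: it marks "end of history" */
	memset(&hist[(i + 1) % HIST], 0, sizeof(hist[0]));

	sample_nr++;
}

int main(void)
{
	const char *events[] = { "a", "b", "c", "d", "e", "f" };
	int i;

	for (i = 0; i < 6; i++)
		record_sample(events[i]);
	for (i = 0; i < HIST; i++)
		printf("slot %d: nr=%u what=%s\n", i, hist[i].nr,
		       hist[i].what ? hist[i].what : "(empty)");
	return 0;
}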
1991
e334f550
LE
1992static void do_device_work(struct drbd_device *device, const unsigned long todo)
1993{
b47a06d1 1994 if (test_bit(MD_SYNC, &todo))
ac0acb9e 1995 do_md_sync(device);
b47a06d1
AG
1996 if (test_bit(RS_DONE, &todo) ||
1997 test_bit(RS_PROGRESS, &todo))
1998 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1999 if (test_bit(GO_DISKLESS, &todo))
e334f550 2000 go_diskless(device);
b47a06d1 2001 if (test_bit(DESTROY_DISK, &todo))
e334f550 2002 drbd_ldev_destroy(device);
b47a06d1 2003 if (test_bit(RS_START, &todo))
ac0acb9e 2004 do_start_resync(device);
e334f550
LE
2005}
2006
2007#define DRBD_DEVICE_WORK_MASK \
2008 ((1UL << GO_DISKLESS) \
2009 |(1UL << DESTROY_DISK) \
ac0acb9e
LE
2010 |(1UL << MD_SYNC) \
2011 |(1UL << RS_START) \
e334f550
LE
2012 |(1UL << RS_PROGRESS) \
2013 |(1UL << RS_DONE) \
2014 )
2015
2016static unsigned long get_work_bits(unsigned long *flags)
2017{
2018 unsigned long old, new;
2019 do {
2020 old = *flags;
2021 new = old & ~DRBD_DEVICE_WORK_MASK;
2022 } while (cmpxchg(flags, old, new) != old);
2023 return old & DRBD_DEVICE_WORK_MASK;
2024}
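get_work_bits() claims and clears the queued device-work bits in one atomic step, so concurrent setters cannot be lost and no bit is processed twice. A hedged user-space sketch of the same fetch-and-clear idiom, using C11 atomics in place of the kernel's cmpxchg() (WORK_MASK and the bit layout are invented for the example):

#include <stdatomic.h>
#include <stdio.h>

#define WORK_MASK 0x0fUL

static unsigned long take_work_bits(_Atomic unsigned long *flags)
{
	unsigned long old = atomic_load(flags);
	unsigned long new;

	do {
		new = old & ~WORK_MASK;
		/* on failure, 'old' is reloaded with the current value */
	} while (!atomic_compare_exchange_weak(flags, &old, new));

	return old & WORK_MASK;
}

int main(void)
{
	_Atomic unsigned long flags = 0x35;	/* work bits 0x05, other bits 0x30 */

	printf("claimed work: %#lx\n", take_work_bits(&flags));	/* 0x5 */
	printf("flags left:   %#lx\n", atomic_load(&flags));	/* 0x30 */
	return 0;
}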
2025
2026static void do_unqueued_work(struct drbd_connection *connection)
c7a58db4
LE
2027{
2028 struct drbd_peer_device *peer_device;
2029 int vnr;
2030
2031 rcu_read_lock();
2032 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2033 struct drbd_device *device = peer_device->device;
e334f550
LE
2034 unsigned long todo = get_work_bits(&device->flags);
2035 if (!todo)
c7a58db4 2036 continue;
5ab7d2c0 2037
c7a58db4
LE
2038 kref_get(&device->kref);
2039 rcu_read_unlock();
e334f550 2040 do_device_work(device, todo);
c7a58db4
LE
2041 kref_put(&device->kref, drbd_destroy_device);
2042 rcu_read_lock();
2043 }
2044 rcu_read_unlock();
2045}
2046
a186e478 2047static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
8c0785a5
LE
2048{
2049 spin_lock_irq(&queue->q_lock);
15e26f6a 2050 list_splice_tail_init(&queue->q, work_list);
8c0785a5
LE
2051 spin_unlock_irq(&queue->q_lock);
2052 return !list_empty(work_list);
2053}
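dequeue_work_batch() empties the whole shared queue in O(1) while holding the lock and lets the caller process the batch without it. A minimal pthread-based sketch of that pattern (struct work, dequeue_batch() and say_hi() are illustrative, not the driver's types):

#include <pthread.h>
#include <stddef.h>
#include <stdio.h>

struct work {
	struct work *next;
	void (*fn)(struct work *);
};

struct work_queue {
	pthread_mutex_t lock;
	struct work *head;
};

/* Move every queued item onto a private list; return that list. */
static struct work *dequeue_batch(struct work_queue *q)
{
	struct work *batch;

	pthread_mutex_lock(&q->lock);
	batch = q->head;
	q->head = NULL;
	pthread_mutex_unlock(&q->lock);

	return batch;
}

static void process_batch(struct work *batch)
{
	while (batch) {
		struct work *w = batch;

		batch = w->next;
		w->fn(w);	/* run the callback outside the queue lock */
	}
}

static void say_hi(struct work *w)
{
	(void)w;
	puts("work item ran");
}

int main(void)
{
	struct work a = { NULL, say_hi }, b = { &a, say_hi };
	struct work_queue q = { PTHREAD_MUTEX_INITIALIZER, &b };

	process_batch(dequeue_batch(&q));
	return 0;
}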
2054
bde89a9e 2055static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
b6dd1a89
LE
2056{
2057 DEFINE_WAIT(wait);
2058 struct net_conf *nc;
2059 int uncork, cork;
2060
abde9cc6 2061 dequeue_work_batch(&connection->sender_work, work_list);
b6dd1a89
LE
2062 if (!list_empty(work_list))
2063 return;
2064
2065 /* Still nothing to do?
2066 * Maybe we still need to close the current epoch,
2067 * even if no new requests are queued yet.
2068 *
2069 * Also, poke TCP, just in case.
2070 * Then wait for new work (or signal). */
2071 rcu_read_lock();
2072 nc = rcu_dereference(connection->net_conf);
2073 uncork = nc ? nc->tcp_cork : 0;
2074 rcu_read_unlock();
2075 if (uncork) {
2076 mutex_lock(&connection->data.mutex);
2077 if (connection->data.socket)
2078 drbd_tcp_uncork(connection->data.socket);
2079 mutex_unlock(&connection->data.mutex);
2080 }
2081
2082 for (;;) {
2083 int send_barrier;
2084 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
0500813f 2085 spin_lock_irq(&connection->resource->req_lock);
b6dd1a89 2086 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
bc317a9e 2087 if (!list_empty(&connection->sender_work.q))
4dd726f0 2088 list_splice_tail_init(&connection->sender_work.q, work_list);
b6dd1a89
LE
2089 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2090 if (!list_empty(work_list) || signal_pending(current)) {
0500813f 2091 spin_unlock_irq(&connection->resource->req_lock);
b6dd1a89
LE
2092 break;
2093 }
f9c78128
LE
2094
2095 /* We found nothing new to do, no to-be-communicated request,
2096 * no other work item. We may still need to close the last
2097 * epoch. Next incoming request epoch will be connection ->
2098 * current transfer log epoch number. If that is different
2099 * from the epoch of the last request we communicated, it is
2100 * safe to send the epoch separating barrier now.
2101 */
2102 send_barrier =
2103 atomic_read(&connection->current_tle_nr) !=
2104 connection->send.current_epoch_nr;
0500813f 2105 spin_unlock_irq(&connection->resource->req_lock);
f9c78128
LE
2106
2107 if (send_barrier)
2108 maybe_send_barrier(connection,
2109 connection->send.current_epoch_nr + 1);
5ab7d2c0 2110
e334f550 2111 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
5ab7d2c0
LE
2112 break;
2113
a80ca1ae
LE
2114 /* drbd_send() may have called flush_signals() */
2115 if (get_t_state(&connection->worker) != RUNNING)
2116 break;
5ab7d2c0 2117
b6dd1a89
LE
2118 schedule();
 2119		/* may be woken up for things other than new work, too,
 2120		 * e.g. if the current epoch got closed,
 2121		 * in which case we send the barrier above. */
2122 }
2123 finish_wait(&connection->sender_work.q_wait, &wait);
2124
2125 /* someone may have changed the config while we have been waiting above. */
2126 rcu_read_lock();
2127 nc = rcu_dereference(connection->net_conf);
2128 cork = nc ? nc->tcp_cork : 0;
2129 rcu_read_unlock();
2130 mutex_lock(&connection->data.mutex);
2131 if (connection->data.socket) {
2132 if (cork)
2133 drbd_tcp_cork(connection->data.socket);
2134 else if (!uncork)
2135 drbd_tcp_uncork(connection->data.socket);
2136 }
2137 mutex_unlock(&connection->data.mutex);
2138}
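The cork/uncork calls in wait_for_work() batch many small sends into full TCP frames while the sender is busy and flush them before it goes idle. In user space the same knob is the Linux-specific TCP_CORK socket option; a small hedged sketch follows (the fd handling here is illustrative and not how the driver drives its in-kernel socket):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

static int tcp_cork(int fd, int on)
{
	return setsockopt(fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
}

int main(void)
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	if (fd < 0)
		return 1;
	tcp_cork(fd, 1);	/* cork: batch small writes into full frames */
	/* ... queue several small sends here ... */
	tcp_cork(fd, 0);	/* uncork: push out anything still buffered */
	close(fd);
	return 0;
}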
2139
b411b363
PR
2140int drbd_worker(struct drbd_thread *thi)
2141{
bde89a9e 2142 struct drbd_connection *connection = thi->connection;
6db7e50a 2143 struct drbd_work *w = NULL;
c06ece6b 2144 struct drbd_peer_device *peer_device;
b411b363 2145 LIST_HEAD(work_list);
8c0785a5 2146 int vnr;
b411b363 2147
e77a0a5c 2148 while (get_t_state(thi) == RUNNING) {
80822284 2149 drbd_thread_current_set_cpu(thi);
b411b363 2150
944410e9
LE
2151 if (list_empty(&work_list)) {
2152 update_worker_timing_details(connection, wait_for_work);
bde89a9e 2153 wait_for_work(connection, &work_list);
944410e9 2154 }
b411b363 2155
944410e9
LE
2156 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2157 update_worker_timing_details(connection, do_unqueued_work);
e334f550 2158 do_unqueued_work(connection);
944410e9 2159 }
5ab7d2c0 2160
8c0785a5 2161 if (signal_pending(current)) {
b411b363 2162 flush_signals(current);
19393e10 2163 if (get_t_state(thi) == RUNNING) {
1ec861eb 2164 drbd_warn(connection, "Worker got an unexpected signal\n");
b411b363 2165 continue;
19393e10 2166 }
b411b363
PR
2167 break;
2168 }
2169
e77a0a5c 2170 if (get_t_state(thi) != RUNNING)
b411b363 2171 break;
b411b363 2172
729e8b87 2173 if (!list_empty(&work_list)) {
6db7e50a
AG
2174 w = list_first_entry(&work_list, struct drbd_work, list);
2175 list_del_init(&w->list);
944410e9 2176 update_worker_timing_details(connection, w->cb);
6db7e50a 2177 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
8c0785a5 2178 continue;
bde89a9e
AG
2179 if (connection->cstate >= C_WF_REPORT_PARAMS)
2180 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
2181 }
2182 }
b411b363 2183
8c0785a5 2184 do {
944410e9
LE
2185 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2186 update_worker_timing_details(connection, do_unqueued_work);
e334f550 2187 do_unqueued_work(connection);
944410e9 2188 }
729e8b87 2189 if (!list_empty(&work_list)) {
6db7e50a
AG
2190 w = list_first_entry(&work_list, struct drbd_work, list);
2191 list_del_init(&w->list);
944410e9 2192 update_worker_timing_details(connection, w->cb);
6db7e50a 2193 w->cb(w, 1);
729e8b87
LE
2194 } else
2195 dequeue_work_batch(&connection->sender_work, &work_list);
e334f550 2196 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
b411b363 2197
c141ebda 2198 rcu_read_lock();
c06ece6b
AG
2199 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2200 struct drbd_device *device = peer_device->device;
0b0ba1ef 2201 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
b30ab791 2202 kref_get(&device->kref);
c141ebda 2203 rcu_read_unlock();
b30ab791 2204 drbd_device_cleanup(device);
05a10ec7 2205 kref_put(&device->kref, drbd_destroy_device);
c141ebda 2206 rcu_read_lock();
0e29d163 2207 }
c141ebda 2208 rcu_read_unlock();
b411b363
PR
2209
2210 return 0;
2211}