1 /*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
50 *
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
60 */
61 void drbd_md_endio(struct bio *bio)
62 {
63 struct drbd_device *device;
64
65 device = bio->bi_private;
66 device->md_io.error = blk_status_to_errno(bio->bi_status);
67
68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 * to timeout on the lower level device, and eventually detach from it.
70 * If this io completion runs after that timeout expired, this
71 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 * During normal operation, this only puts that extra reference
73 * down to 1 again.
74 * Make sure we first drop the reference, and only then signal
75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 * next drbd_md_sync_page_io(), that we trigger the
77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78 */
79 drbd_md_put_buffer(device);
80 device->md_io.done = 1;
81 wake_up(&device->misc_wait);
82 bio_put(bio);
83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88 * "submitted" by the receiver
89 */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92 unsigned long flags = 0;
93 struct drbd_peer_device *peer_device = peer_req->peer_device;
94 struct drbd_device *device = peer_device->device;
95
96 spin_lock_irqsave(&device->resource->req_lock, flags);
97 device->read_cnt += peer_req->i.size >> 9;
98 list_del(&peer_req->w.list);
99 if (list_empty(&device->read_ee))
100 wake_up(&device->ee_wait);
101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103 spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106 put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110 * "submitted" by the receiver, final stage. */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113 unsigned long flags = 0;
114 struct drbd_peer_device *peer_device = peer_req->peer_device;
115 struct drbd_device *device = peer_device->device;
116 struct drbd_connection *connection = peer_device->connection;
117 struct drbd_interval i;
118 int do_wake;
119 u64 block_id;
120 int do_al_complete_io;
121
122 /* after we moved peer_req to done_ee,
123 * we may no longer access it,
124 * it may be freed/reused already!
125 * (as soon as we release the req_lock) */
126 i = peer_req->i;
127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 block_id = peer_req->block_id;
129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131 spin_lock_irqsave(&device->resource->req_lock, flags);
132 device->writ_cnt += peer_req->i.size >> 9;
133 list_move_tail(&peer_req->w.list, &device->done_ee);
134
135 /*
136 * Do not remove from the write_requests tree here: we did not send the
137 * Ack yet and did not wake possibly waiting conflicting requests.
138 * It is removed from the tree in "drbd_process_done_ee" within the
139 * appropriate dw.cb (e_end_block/e_end_resync_block) or in
140 * _drbd_clear_done_ee.
141 */
142
143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144
145 /* FIXME do we want to detach for failed REQ_DISCARD?
146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 if (peer_req->flags & EE_WAS_ERROR)
148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150 if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 kref_put(&device->kref, drbd_destroy_device);
154 }
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
159
160 if (do_wake)
161 wake_up(&device->ee_wait);
162
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
165
166 put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
171 */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174 struct drbd_peer_request *peer_req = bio->bi_private;
175 struct drbd_device *device = peer_req->peer_device->device;
176 bool is_write = bio_data_dir(bio) == WRITE;
177 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
178 bio_op(bio) == REQ_OP_DISCARD;
179
180 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
181 drbd_warn(device, "%s: error=%d s=%llus\n",
182 is_write ? (is_discard ? "discard" : "write")
183 : "read", bio->bi_status,
184 (unsigned long long)peer_req->i.sector);
185
186 if (bio->bi_status)
187 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188
189 bio_put(bio); /* no need for the bio anymore */
190 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 if (is_write)
192 drbd_endio_write_sec_final(peer_req);
193 else
194 drbd_endio_read_sec_final(peer_req);
195 }
196 }
197
198 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
205 */
206 void drbd_request_endio(struct bio *bio)
207 {
208 unsigned long flags;
209 struct drbd_request *req = bio->bi_private;
210 struct drbd_device *device = req->device;
211 struct bio_and_error m;
212 enum drbd_req_event what;
213
214 /* If this request was aborted locally before,
215 * but now was completed "successfully",
216 * chances are that this caused arbitrary data corruption.
217 *
218 * "aborting" requests, or force-detaching the disk, is intended for
219 * completely blocked/hung local backing devices which no longer
220 * complete requests at all, not even with error completions. In this
221 * situation, usually a hard-reset and failover is the only way out.
222 *
223 * By "aborting", basically faking a local error-completion,
224 * we allow for a more graceful switchover by cleanly migrating services.
225 * Still the affected node has to be rebooted "soon".
226 *
227 * By completing these requests, we allow the upper layers to re-use
228 * the associated data pages.
229 *
230 * If later the local backing device "recovers", and now DMAs some data
231 * from disk into the original request pages, in the best case it will
232 * just put random data into unused pages; but typically it will corrupt
233 * meanwhile completely unrelated data, causing all sorts of damage.
234 *
235 * Which means delayed successful completion,
236 * especially for READ requests,
237 * is a reason to panic().
238 *
239 * We assume that a delayed *error* completion is OK,
240 * though we still will complain noisily about it.
241 */
242 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243 if (__ratelimit(&drbd_ratelimit_state))
244 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246 if (!bio->bi_status)
247 drbd_panic_after_delayed_completion_of_aborted_request(device);
248 }
249
250 /* to avoid recursion in __req_mod */
251 if (unlikely(bio->bi_status)) {
252 switch (bio_op(bio)) {
253 case REQ_OP_WRITE_ZEROES:
254 case REQ_OP_DISCARD:
255 if (bio->bi_status == BLK_STS_NOTSUPP)
256 what = DISCARD_COMPLETED_NOTSUPP;
257 else
258 what = DISCARD_COMPLETED_WITH_ERROR;
259 break;
260 case REQ_OP_READ:
261 if (bio->bi_opf & REQ_RAHEAD)
262 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263 else
264 what = READ_COMPLETED_WITH_ERROR;
265 break;
266 default:
267 what = WRITE_COMPLETED_WITH_ERROR;
268 break;
269 }
270 } else {
271 what = COMPLETED_OK;
272 }
273
274 bio_put(req->private_bio);
275 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
276
277 /* not req_mod(), we need irqsave here! */
278 spin_lock_irqsave(&device->resource->req_lock, flags);
279 __req_mod(req, what, &m);
280 spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 put_ldev(device);
282
283 if (m.bio)
284 complete_master_bio(device, &m);
285 }
286
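/* Hash the payload of a peer request: all pages of the page chain are
 * hashed in full, except for the last one, which may be used only
 * partially (peer_req->i.size modulo PAGE_SIZE). */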
287 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 AHASH_REQUEST_ON_STACK(req, tfm);
290 struct scatterlist sg;
291 struct page *page = peer_req->pages;
292 struct page *tmp;
293 unsigned len;
294
295 ahash_request_set_tfm(req, tfm);
296 ahash_request_set_callback(req, 0, NULL, NULL);
297
298 sg_init_table(&sg, 1);
299 crypto_ahash_init(req);
300
301 while ((tmp = page_chain_next(page))) {
302 /* all but the last page will be fully used */
303 sg_set_page(&sg, page, PAGE_SIZE, 0);
304 ahash_request_set_crypt(req, &sg, NULL, sg.length);
305 crypto_ahash_update(req);
306 page = tmp;
307 }
308 /* and now the last, possibly only partially used page */
309 len = peer_req->i.size & (PAGE_SIZE - 1);
310 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
311 ahash_request_set_crypt(req, &sg, digest, sg.length);
312 crypto_ahash_finup(req);
313 ahash_request_zero(req);
314 }
315
316 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
317 {
318 AHASH_REQUEST_ON_STACK(req, tfm);
319 struct scatterlist sg;
320 struct bio_vec bvec;
321 struct bvec_iter iter;
322
323 ahash_request_set_tfm(req, tfm);
324 ahash_request_set_callback(req, 0, NULL, NULL);
325
326 sg_init_table(&sg, 1);
327 crypto_ahash_init(req);
328
329 bio_for_each_segment(bvec, bio, iter) {
330 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
331 ahash_request_set_crypt(req, &sg, NULL, sg.length);
332 crypto_ahash_update(req);
333 /* REQ_OP_WRITE_SAME has only one segment,
334 * checksum the payload only once. */
335 if (bio_op(bio) == REQ_OP_WRITE_SAME)
336 break;
337 }
338 ahash_request_set_crypt(req, NULL, digest, 0);
339 crypto_ahash_final(req);
340 ahash_request_zero(req);
341 }
342
343 /* MAYBE merge common code with w_e_end_ov_req */
344 static int w_e_send_csum(struct drbd_work *w, int cancel)
345 {
346 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
347 struct drbd_peer_device *peer_device = peer_req->peer_device;
348 struct drbd_device *device = peer_device->device;
349 int digest_size;
350 void *digest;
351 int err = 0;
352
353 if (unlikely(cancel))
354 goto out;
355
356 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
357 goto out;
358
359 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
360 digest = kmalloc(digest_size, GFP_NOIO);
361 if (digest) {
362 sector_t sector = peer_req->i.sector;
363 unsigned int size = peer_req->i.size;
364 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
365 /* Free peer_req and pages before send.
366 * In case we block on congestion, we could otherwise run into
367 * some distributed deadlock, if the other side blocks on
368 * congestion as well, because our receiver blocks in
369 * drbd_alloc_pages due to pp_in_use > max_buffers. */
370 drbd_free_peer_req(device, peer_req);
371 peer_req = NULL;
372 inc_rs_pending(device);
373 err = drbd_send_drequest_csum(peer_device, sector, size,
374 digest, digest_size,
375 P_CSUM_RS_REQUEST);
376 kfree(digest);
377 } else {
378 drbd_err(device, "kmalloc() of digest failed.\n");
379 err = -ENOMEM;
380 }
381
382 out:
383 if (peer_req)
384 drbd_free_peer_req(device, peer_req);
385
386 if (unlikely(err))
387 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
388 return err;
389 }
390
391 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
392
393 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
394 {
395 struct drbd_device *device = peer_device->device;
396 struct drbd_peer_request *peer_req;
397
398 if (!get_ldev(device))
399 return -EIO;
400
401 /* GFP_TRY, because if there is no memory available right now, this may
402 * be rescheduled for later. It is "only" background resync, after all. */
403 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
404 size, size, GFP_TRY);
405 if (!peer_req)
406 goto defer;
407
408 peer_req->w.cb = w_e_send_csum;
409 spin_lock_irq(&device->resource->req_lock);
410 list_add_tail(&peer_req->w.list, &device->read_ee);
411 spin_unlock_irq(&device->resource->req_lock);
412
413 atomic_add(size >> 9, &device->rs_sect_ev);
414 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
415 DRBD_FAULT_RS_RD) == 0)
416 return 0;
417
418 /* If it failed because of ENOMEM, retry should help. If it failed
419 * because bio_add_page failed (probably broken lower level driver),
420 * retry may or may not help.
421 * If it does not, you may need to force disconnect. */
422 spin_lock_irq(&device->resource->req_lock);
423 list_del(&peer_req->w.list);
424 spin_unlock_irq(&device->resource->req_lock);
425
426 drbd_free_peer_req(device, peer_req);
427 defer:
428 put_ldev(device);
429 return -EAGAIN;
430 }
431
432 int w_resync_timer(struct drbd_work *w, int cancel)
433 {
434 struct drbd_device *device =
435 container_of(w, struct drbd_device, resync_work);
436
437 switch (device->state.conn) {
438 case C_VERIFY_S:
439 make_ov_request(device, cancel);
440 break;
441 case C_SYNC_TARGET:
442 make_resync_request(device, cancel);
443 break;
444 }
445
446 return 0;
447 }
448
449 void resync_timer_fn(unsigned long data)
450 {
451 struct drbd_device *device = (struct drbd_device *) data;
452
453 drbd_queue_work_if_unqueued(
454 &first_peer_device(device)->connection->sender_work,
455 &device->resync_work);
456 }
457
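/* Helpers for the plan-ahead FIFO (struct fifo_buffer) used by the resync
 * rate controller below: fifo_set() fills all slots with one value,
 * fifo_push() overwrites the oldest slot (returning its previous value)
 * and advances the head, fifo_add_val() adds a value to every slot. */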
458 static void fifo_set(struct fifo_buffer *fb, int value)
459 {
460 int i;
461
462 for (i = 0; i < fb->size; i++)
463 fb->values[i] = value;
464 }
465
466 static int fifo_push(struct fifo_buffer *fb, int value)
467 {
468 int ov;
469
470 ov = fb->values[fb->head_index];
471 fb->values[fb->head_index++] = value;
472
473 if (fb->head_index >= fb->size)
474 fb->head_index = 0;
475
476 return ov;
477 }
478
479 static void fifo_add_val(struct fifo_buffer *fb, int value)
480 {
481 int i;
482
483 for (i = 0; i < fb->size; i++)
484 fb->values[i] += value;
485 }
486
487 struct fifo_buffer *fifo_alloc(int fifo_size)
488 {
489 struct fifo_buffer *fb;
490
491 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
492 if (!fb)
493 return NULL;
494
495 fb->head_index = 0;
496 fb->size = fifo_size;
497 fb->total = 0;
498
499 return fb;
500 }
501
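/* Dynamic resync rate controller, called once per SLEEP_TIME tick.
 * @sect_in is the number of sectors that completed since the last call.
 * "want" is the desired number of sectors in flight: either the configured
 * c-fill-target, or derived from c-delay-target and the observed throughput.
 * The difference to what is actually in flight (plus corrections already
 * planned) is spread over "steps" future invocations through the plan FIFO;
 * the correction due in this step is added to the observed throughput, and
 * the result is clamped to [0, c-max-rate] before being returned as the
 * number of sectors to request now.
 * E.g. with resync-rate 40000 (KiB/s), 20 plan steps and SLEEP_TIME == HZ/10,
 * the initial want is (40000 * 2 / 10) * 20 = 160000 sectors. */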
502 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
503 {
504 struct disk_conf *dc;
505 unsigned int want; /* The number of sectors we want in-flight */
506 int req_sect; /* Number of sectors to request in this turn */
507 int correction; /* Number of sectors more we need in-flight */
508 int cps; /* correction per invocation of drbd_rs_controller() */
509 int steps; /* Number of time steps to plan ahead */
510 int curr_corr;
511 int max_sect;
512 struct fifo_buffer *plan;
513
514 dc = rcu_dereference(device->ldev->disk_conf);
515 plan = rcu_dereference(device->rs_plan_s);
516
517 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
518
519 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
520 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
521 } else { /* normal path */
522 want = dc->c_fill_target ? dc->c_fill_target :
523 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
524 }
525
526 correction = want - device->rs_in_flight - plan->total;
527
528 /* Plan ahead */
529 cps = correction / steps;
530 fifo_add_val(plan, cps);
531 plan->total += cps * steps;
532
533 /* What we do in this step */
534 curr_corr = fifo_push(plan, 0);
535 plan->total -= curr_corr;
536
537 req_sect = sect_in + curr_corr;
538 if (req_sect < 0)
539 req_sect = 0;
540
541 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
542 if (req_sect > max_sect)
543 req_sect = max_sect;
544
545 /*
546 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
547 sect_in, device->rs_in_flight, want, correction,
548 steps, cps, device->rs_planed, curr_corr, req_sect);
549 */
550
551 return req_sect;
552 }
553
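/* Translate the rate decision into the number of BM_BLOCK_SIZE (4 KiB)
 * resync requests to issue in this SLEEP_TIME tick: from the dynamic
 * controller above if the plan FIFO has a non-zero size, otherwise from
 * the fixed resync-rate; device->c_sync_rate (in KiB/s) is kept in sync
 * with that decision. */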
554 static int drbd_rs_number_requests(struct drbd_device *device)
555 {
556 unsigned int sect_in; /* Number of sectors that came in since the last turn */
557 int number, mxb;
558
559 sect_in = atomic_xchg(&device->rs_sect_in, 0);
560 device->rs_in_flight -= sect_in;
561
562 rcu_read_lock();
563 mxb = drbd_get_max_buffers(device) / 2;
564 if (rcu_dereference(device->rs_plan_s)->size) {
565 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
566 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
567 } else {
568 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
569 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
570 }
571 rcu_read_unlock();
572
573 /* Don't have more than "max-buffers"/2 in-flight.
574 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
575 * potentially causing a distributed deadlock on congestion during
576 * online-verify or (checksum-based) resync, if max-buffers,
577 * socket buffer sizes and resync rate settings are mis-configured. */
578
579 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
580 * mxb (as used here, and in drbd_alloc_pages on the peer) is
581 * "number of pages" (typically also 4k),
582 * but "rs_in_flight" is in "sectors" (512 Byte). */
583 if (mxb - device->rs_in_flight/8 < number)
584 number = mxb - device->rs_in_flight/8;
585
586 return number;
587 }
588
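/* Resync target side: scan the bitmap from bm_resync_fo for out-of-sync
 * bits, merge adjacent bits into one aligned request of up to max_bio_size
 * (or up to rs-discard-granularity for thin resync), and either read the
 * local block first to send a checksum request (read_for_csum() /
 * w_e_send_csum()) or send a P_RS_DATA_REQUEST / P_RS_THIN_REQ right away.
 * Backs off once the send buffer is more than half full and re-arms the
 * resync timer whenever it has to requeue. */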
589 static int make_resync_request(struct drbd_device *const device, int cancel)
590 {
591 struct drbd_peer_device *const peer_device = first_peer_device(device);
592 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
593 unsigned long bit;
594 sector_t sector;
595 const sector_t capacity = drbd_get_capacity(device->this_bdev);
596 int max_bio_size;
597 int number, rollback_i, size;
598 int align, requeue = 0;
599 int i = 0;
600 int discard_granularity = 0;
601
602 if (unlikely(cancel))
603 return 0;
604
605 if (device->rs_total == 0) {
606 /* empty resync? */
607 drbd_resync_finished(device);
608 return 0;
609 }
610
611 if (!get_ldev(device)) {
612 /* Since we only need to access device->resync, a
613 get_ldev_if_state(device, D_FAILED) would be sufficient, but
614 continuing resync with a broken disk makes no sense at
615 all */
616 drbd_err(device, "Disk broke down during resync!\n");
617 return 0;
618 }
619
620 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
621 rcu_read_lock();
622 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
623 rcu_read_unlock();
624 }
625
626 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
627 number = drbd_rs_number_requests(device);
628 if (number <= 0)
629 goto requeue;
630
631 for (i = 0; i < number; i++) {
632 /* Stop generating RS requests when half of the send buffer is filled,
633 * but notify TCP that we'd like to have more space. */
634 mutex_lock(&connection->data.mutex);
635 if (connection->data.socket) {
636 struct sock *sk = connection->data.socket->sk;
637 int queued = sk->sk_wmem_queued;
638 int sndbuf = sk->sk_sndbuf;
639 if (queued > sndbuf / 2) {
640 requeue = 1;
641 if (sk->sk_socket)
642 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
643 }
644 } else
645 requeue = 1;
646 mutex_unlock(&connection->data.mutex);
647 if (requeue)
648 goto requeue;
649
650 next_sector:
651 size = BM_BLOCK_SIZE;
652 bit = drbd_bm_find_next(device, device->bm_resync_fo);
653
654 if (bit == DRBD_END_OF_BITMAP) {
655 device->bm_resync_fo = drbd_bm_bits(device);
656 put_ldev(device);
657 return 0;
658 }
659
660 sector = BM_BIT_TO_SECT(bit);
661
662 if (drbd_try_rs_begin_io(device, sector)) {
663 device->bm_resync_fo = bit;
664 goto requeue;
665 }
666 device->bm_resync_fo = bit + 1;
667
668 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
669 drbd_rs_complete_io(device, sector);
670 goto next_sector;
671 }
672
673 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
674 /* try to find some adjacent bits.
675 * we stop if we already have the maximum req size.
676 *
677 * Additionally always align bigger requests, in order to
678 * be prepared for all stripe sizes of software RAIDs.
679 */
680 align = 1;
681 rollback_i = i;
682 while (i < number) {
683 if (size + BM_BLOCK_SIZE > max_bio_size)
684 break;
685
686 /* Always stay aligned */
687 if (sector & ((1<<(align+3))-1))
688 break;
689
690 if (discard_granularity && size == discard_granularity)
691 break;
692
693 /* do not cross extent boundaries */
694 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
695 break;
696 /* now, is it actually dirty, after all?
697 * caution, drbd_bm_test_bit is tri-state for some
698 * obscure reason; testing ( b == 0 ) would get the out-of-band
699 * return value right only by accident, because of the "oddly sized"
700 * adjustment below */
701 if (drbd_bm_test_bit(device, bit+1) != 1)
702 break;
703 bit++;
704 size += BM_BLOCK_SIZE;
705 if ((BM_BLOCK_SIZE << align) <= size)
706 align++;
707 i++;
708 }
709 /* if we merged some,
710 * reset the offset to start the next drbd_bm_find_next from */
711 if (size > BM_BLOCK_SIZE)
712 device->bm_resync_fo = bit + 1;
713 #endif
714
715 /* adjust very last sectors, in case we are oddly sized */
716 if (sector + (size>>9) > capacity)
717 size = (capacity-sector)<<9;
718
719 if (device->use_csums) {
720 switch (read_for_csum(peer_device, sector, size)) {
721 case -EIO: /* Disk failure */
722 put_ldev(device);
723 return -EIO;
724 case -EAGAIN: /* allocation failed, or ldev busy */
725 drbd_rs_complete_io(device, sector);
726 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
727 i = rollback_i;
728 goto requeue;
729 case 0:
730 /* everything ok */
731 break;
732 default:
733 BUG();
734 }
735 } else {
736 int err;
737
738 inc_rs_pending(device);
739 err = drbd_send_drequest(peer_device,
740 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
741 sector, size, ID_SYNCER);
742 if (err) {
743 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
744 dec_rs_pending(device);
745 put_ldev(device);
746 return err;
747 }
748 }
749 }
750
751 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
752 /* last syncer _request_ was sent,
753 * but the P_RS_DATA_REPLY not yet received. sync will end (and
754 * next sync group will resume), as soon as we receive the last
755 * resync data block, and the last bit is cleared.
756 * until then resync "work" is "inactive" ...
757 */
758 put_ldev(device);
759 return 0;
760 }
761
762 requeue:
763 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
764 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
765 put_ldev(device);
766 return 0;
767 }
768
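/* Online verify (C_VERIFY_S) counterpart of make_resync_request(): walk
 * linearly from ov_position and send one P_OV_REQUEST per BM_BLOCK_SIZE
 * block, until the rate controller budget, the device capacity, or the
 * configured verify stop sector is reached. */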
769 static int make_ov_request(struct drbd_device *device, int cancel)
770 {
771 int number, i, size;
772 sector_t sector;
773 const sector_t capacity = drbd_get_capacity(device->this_bdev);
774 bool stop_sector_reached = false;
775
776 if (unlikely(cancel))
777 return 1;
778
779 number = drbd_rs_number_requests(device);
780
781 sector = device->ov_position;
782 for (i = 0; i < number; i++) {
783 if (sector >= capacity)
784 return 1;
785
786 /* We check for "finished" only in the reply path:
787 * w_e_end_ov_reply().
788 * We need to send at least one request out. */
789 stop_sector_reached = i > 0
790 && verify_can_do_stop_sector(device)
791 && sector >= device->ov_stop_sector;
792 if (stop_sector_reached)
793 break;
794
795 size = BM_BLOCK_SIZE;
796
797 if (drbd_try_rs_begin_io(device, sector)) {
798 device->ov_position = sector;
799 goto requeue;
800 }
801
802 if (sector + (size>>9) > capacity)
803 size = (capacity-sector)<<9;
804
805 inc_rs_pending(device);
806 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
807 dec_rs_pending(device);
808 return 0;
809 }
810 sector += BM_SECT_PER_BIT;
811 }
812 device->ov_position = sector;
813
814 requeue:
815 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
816 if (i == 0 || !stop_sector_reached)
817 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
818 return 1;
819 }
820
821 int w_ov_finished(struct drbd_work *w, int cancel)
822 {
823 struct drbd_device_work *dw =
824 container_of(w, struct drbd_device_work, w);
825 struct drbd_device *device = dw->device;
826 kfree(dw);
827 ov_out_of_sync_print(device);
828 drbd_resync_finished(device);
829
830 return 0;
831 }
832
833 static int w_resync_finished(struct drbd_work *w, int cancel)
834 {
835 struct drbd_device_work *dw =
836 container_of(w, struct drbd_device_work, w);
837 struct drbd_device *device = dw->device;
838 kfree(dw);
839
840 drbd_resync_finished(device);
841
842 return 0;
843 }
844
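/* Send a ping to the peer and wait until the corresponding ack arrives,
 * or until the connection is lost. */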
845 static void ping_peer(struct drbd_device *device)
846 {
847 struct drbd_connection *connection = first_peer_device(device)->connection;
848
849 clear_bit(GOT_PING_ACK, &connection->flags);
850 request_ping(connection);
851 wait_event(connection->ping_wait,
852 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
853 }
854
855 int drbd_resync_finished(struct drbd_device *device)
856 {
857 struct drbd_connection *connection = first_peer_device(device)->connection;
858 unsigned long db, dt, dbdt;
859 unsigned long n_oos;
860 union drbd_state os, ns;
861 struct drbd_device_work *dw;
862 char *khelper_cmd = NULL;
863 int verify_done = 0;
864
865 /* Remove all elements from the resync LRU. If future actions
866 * set bits in the (main) bitmap, the entries in the
867 * resync LRU would be wrong. */
868 if (drbd_rs_del_all(device)) {
869 /* In case this is not possible now, most probably because
870 * there are P_RS_DATA_REPLY packets lingering on the worker's
871 * queue (or even the read operations for those packets
872 * are not finished by now). Retry in 100ms. */
873
874 schedule_timeout_interruptible(HZ / 10);
875 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
876 if (dw) {
877 dw->w.cb = w_resync_finished;
878 dw->device = device;
879 drbd_queue_work(&connection->sender_work, &dw->w);
880 return 1;
881 }
882 drbd_err(device, "Warn: failed to drbd_rs_del_all() and to kmalloc(dw).\n");
883 }
884
885 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
886 if (dt <= 0)
887 dt = 1;
888
889 db = device->rs_total;
890 /* adjust for verify start and stop sectors, respectively the reached position */
891 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
892 db -= device->ov_left;
893
894 dbdt = Bit2KB(db/dt);
895 device->rs_paused /= HZ;
896
897 if (!get_ldev(device))
898 goto out;
899
900 ping_peer(device);
901
902 spin_lock_irq(&device->resource->req_lock);
903 os = drbd_read_state(device);
904
905 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
906
907 /* This protects us against multiple calls (that can happen in the presence
908 of application IO), and against connectivity loss just before we arrive here. */
909 if (os.conn <= C_CONNECTED)
910 goto out_unlock;
911
912 ns = os;
913 ns.conn = C_CONNECTED;
914
915 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
916 verify_done ? "Online verify" : "Resync",
917 dt + device->rs_paused, device->rs_paused, dbdt);
918
919 n_oos = drbd_bm_total_weight(device);
920
921 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
922 if (n_oos) {
923 drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
924 n_oos, Bit2KB(1));
925 khelper_cmd = "out-of-sync";
926 }
927 } else {
928 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
929
930 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
931 khelper_cmd = "after-resync-target";
932
933 if (device->use_csums && device->rs_total) {
934 const unsigned long s = device->rs_same_csum;
935 const unsigned long t = device->rs_total;
936 const int ratio =
937 (t == 0) ? 0 :
938 (t < 100000) ? ((s*100)/t) : (s/(t/100));
939 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
940 "transferred %luK total %luK\n",
941 ratio,
942 Bit2KB(device->rs_same_csum),
943 Bit2KB(device->rs_total - device->rs_same_csum),
944 Bit2KB(device->rs_total));
945 }
946 }
947
948 if (device->rs_failed) {
949 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
950
951 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
952 ns.disk = D_INCONSISTENT;
953 ns.pdsk = D_UP_TO_DATE;
954 } else {
955 ns.disk = D_UP_TO_DATE;
956 ns.pdsk = D_INCONSISTENT;
957 }
958 } else {
959 ns.disk = D_UP_TO_DATE;
960 ns.pdsk = D_UP_TO_DATE;
961
962 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963 if (device->p_uuid) {
964 int i;
965 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
966 _drbd_uuid_set(device, i, device->p_uuid[i]);
967 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
968 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
969 } else {
970 drbd_err(device, "device->p_uuid is NULL! BUG\n");
971 }
972 }
973
974 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
975 /* for verify runs, we don't update uuids here,
976 * so there would be nothing to report. */
977 drbd_uuid_set_bm(device, 0UL);
978 drbd_print_uuids(device, "updated UUIDs");
979 if (device->p_uuid) {
980 /* Now the two UUID sets are equal, update what we
981 * know of the peer. */
982 int i;
983 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
984 device->p_uuid[i] = device->ldev->md.uuid[i];
985 }
986 }
987 }
988
989 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
990 out_unlock:
991 spin_unlock_irq(&device->resource->req_lock);
992
993 /* If we have been sync source, and have an effective fencing-policy,
994 * once *all* volumes are back in sync, call "unfence". */
995 if (os.conn == C_SYNC_SOURCE) {
996 enum drbd_disk_state disk_state = D_MASK;
997 enum drbd_disk_state pdsk_state = D_MASK;
998 enum drbd_fencing_p fp = FP_DONT_CARE;
999
1000 rcu_read_lock();
1001 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1002 if (fp != FP_DONT_CARE) {
1003 struct drbd_peer_device *peer_device;
1004 int vnr;
1005 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1006 struct drbd_device *device = peer_device->device;
1007 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1008 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1009 }
1010 }
1011 rcu_read_unlock();
1012 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1013 conn_khelper(connection, "unfence-peer");
1014 }
1015
1016 put_ldev(device);
1017 out:
1018 device->rs_total = 0;
1019 device->rs_failed = 0;
1020 device->rs_paused = 0;
1021
1022 /* reset start sector, if we reached end of device */
1023 if (verify_done && device->ov_left == 0)
1024 device->ov_start_sector = 0;
1025
1026 drbd_md_sync(device);
1027
1028 if (khelper_cmd)
1029 drbd_khelper(device, khelper_cmd);
1030
1031 return 1;
1032 }
1033
1034 /* helper */
1035 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1036 {
1037 if (drbd_peer_req_has_active_page(peer_req)) {
1038 /* This might happen if sendpage() has not finished */
1039 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1040 atomic_add(i, &device->pp_in_use_by_net);
1041 atomic_sub(i, &device->pp_in_use);
1042 spin_lock_irq(&device->resource->req_lock);
1043 list_add_tail(&peer_req->w.list, &device->net_ee);
1044 spin_unlock_irq(&device->resource->req_lock);
1045 wake_up(&drbd_pp_wait);
1046 } else
1047 drbd_free_peer_req(device, peer_req);
1048 }
1049
1050 /**
1051 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1052 * @w: work object.
1053 * @cancel: The connection will be closed anyway
1054 */
1055 int w_e_end_data_req(struct drbd_work *w, int cancel)
1056 {
1057 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058 struct drbd_peer_device *peer_device = peer_req->peer_device;
1059 struct drbd_device *device = peer_device->device;
1060 int err;
1061
1062 if (unlikely(cancel)) {
1063 drbd_free_peer_req(device, peer_req);
1064 dec_unacked(device);
1065 return 0;
1066 }
1067
1068 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1069 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1070 } else {
1071 if (__ratelimit(&drbd_ratelimit_state))
1072 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1073 (unsigned long long)peer_req->i.sector);
1074
1075 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1076 }
1077
1078 dec_unacked(device);
1079
1080 move_to_net_ee_or_free(device, peer_req);
1081
1082 if (unlikely(err))
1083 drbd_err(device, "drbd_send_block() failed\n");
1084 return err;
1085 }
1086
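/* Return true if the payload of @peer_req consists only of zero bytes;
 * used in w_e_end_rsdata_req() to answer a thin resync request
 * (EE_RS_THIN_REQ) with P_RS_DEALLOCATED instead of shipping all-zero data. */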
1087 static bool all_zero(struct drbd_peer_request *peer_req)
1088 {
1089 struct page *page = peer_req->pages;
1090 unsigned int len = peer_req->i.size;
1091
1092 page_chain_for_each(page) {
1093 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1094 unsigned int i, words = l / sizeof(long);
1095 unsigned long *d;
1096
1097 d = kmap_atomic(page);
1098 for (i = 0; i < words; i++) {
1099 if (d[i]) {
1100 kunmap_atomic(d);
1101 return false;
1102 }
1103 }
1104 kunmap_atomic(d);
1105 len -= l;
1106 }
1107
1108 return true;
1109 }
1110
1111 /**
1112 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1113 * @w: work object.
1114 * @cancel: The connection will be closed anyway
1115 */
1116 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1117 {
1118 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119 struct drbd_peer_device *peer_device = peer_req->peer_device;
1120 struct drbd_device *device = peer_device->device;
1121 int err;
1122
1123 if (unlikely(cancel)) {
1124 drbd_free_peer_req(device, peer_req);
1125 dec_unacked(device);
1126 return 0;
1127 }
1128
1129 if (get_ldev_if_state(device, D_FAILED)) {
1130 drbd_rs_complete_io(device, peer_req->i.sector);
1131 put_ldev(device);
1132 }
1133
1134 if (device->state.conn == C_AHEAD) {
1135 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1136 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1137 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1138 inc_rs_pending(device);
1139 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1140 err = drbd_send_rs_deallocated(peer_device, peer_req);
1141 else
1142 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1143 } else {
1144 if (__ratelimit(&drbd_ratelimit_state))
1145 drbd_err(device, "Not sending RSDataReply, "
1146 "partner DISKLESS!\n");
1147 err = 0;
1148 }
1149 } else {
1150 if (__ratelimit(&drbd_ratelimit_state))
1151 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1152 (unsigned long long)peer_req->i.sector);
1153
1154 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1155
1156 /* update resync data with failure */
1157 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1158 }
1159
1160 dec_unacked(device);
1161
1162 move_to_net_ee_or_free(device, peer_req);
1163
1164 if (unlikely(err))
1165 drbd_err(device, "drbd_send_block() failed\n");
1166 return err;
1167 }
1168
1169 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1170 {
1171 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172 struct drbd_peer_device *peer_device = peer_req->peer_device;
1173 struct drbd_device *device = peer_device->device;
1174 struct digest_info *di;
1175 int digest_size;
1176 void *digest = NULL;
1177 int err, eq = 0;
1178
1179 if (unlikely(cancel)) {
1180 drbd_free_peer_req(device, peer_req);
1181 dec_unacked(device);
1182 return 0;
1183 }
1184
1185 if (get_ldev(device)) {
1186 drbd_rs_complete_io(device, peer_req->i.sector);
1187 put_ldev(device);
1188 }
1189
1190 di = peer_req->digest;
1191
1192 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1193 /* quick hack to try to avoid a race against reconfiguration.
1194 * a real fix would be much more involved,
1195 * introducing more locking mechanisms */
1196 if (peer_device->connection->csums_tfm) {
1197 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1198 D_ASSERT(device, digest_size == di->digest_size);
1199 digest = kmalloc(digest_size, GFP_NOIO);
1200 }
1201 if (digest) {
1202 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1203 eq = !memcmp(digest, di->digest, digest_size);
1204 kfree(digest);
1205 }
1206
1207 if (eq) {
1208 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1209 /* rs_same_csums unit is BM_BLOCK_SIZE */
1210 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1211 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1212 } else {
1213 inc_rs_pending(device);
1214 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1215 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1216 kfree(di);
1217 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1218 }
1219 } else {
1220 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1221 if (__ratelimit(&drbd_ratelimit_state))
1222 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1223 }
1224
1225 dec_unacked(device);
1226 move_to_net_ee_or_free(device, peer_req);
1227
1228 if (unlikely(err))
1229 drbd_err(device, "drbd_send_block/ack() failed\n");
1230 return err;
1231 }
1232
1233 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1234 {
1235 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1236 struct drbd_peer_device *peer_device = peer_req->peer_device;
1237 struct drbd_device *device = peer_device->device;
1238 sector_t sector = peer_req->i.sector;
1239 unsigned int size = peer_req->i.size;
1240 int digest_size;
1241 void *digest;
1242 int err = 0;
1243
1244 if (unlikely(cancel))
1245 goto out;
1246
1247 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1248 digest = kmalloc(digest_size, GFP_NOIO);
1249 if (!digest) {
1250 err = 1; /* terminate the connection in case the allocation failed */
1251 goto out;
1252 }
1253
1254 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1255 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1256 else
1257 memset(digest, 0, digest_size);
1258
1259 /* Free peer_req and pages before send.
1260 * In case we block on congestion, we could otherwise run into
1261 * some distributed deadlock, if the other side blocks on
1262 * congestion as well, because our receiver blocks in
1263 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1264 drbd_free_peer_req(device, peer_req);
1265 peer_req = NULL;
1266 inc_rs_pending(device);
1267 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1268 if (err)
1269 dec_rs_pending(device);
1270 kfree(digest);
1271
1272 out:
1273 if (peer_req)
1274 drbd_free_peer_req(device, peer_req);
1275 dec_unacked(device);
1276 return err;
1277 }
1278
1279 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1280 {
1281 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1282 device->ov_last_oos_size += size>>9;
1283 } else {
1284 device->ov_last_oos_start = sector;
1285 device->ov_last_oos_size = size>>9;
1286 }
1287 drbd_set_out_of_sync(device, sector, size);
1288 }
1289
1290 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1291 {
1292 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1293 struct drbd_peer_device *peer_device = peer_req->peer_device;
1294 struct drbd_device *device = peer_device->device;
1295 struct digest_info *di;
1296 void *digest;
1297 sector_t sector = peer_req->i.sector;
1298 unsigned int size = peer_req->i.size;
1299 int digest_size;
1300 int err, eq = 0;
1301 bool stop_sector_reached = false;
1302
1303 if (unlikely(cancel)) {
1304 drbd_free_peer_req(device, peer_req);
1305 dec_unacked(device);
1306 return 0;
1307 }
1308
1309 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1310 * the resync lru has been cleaned up already */
1311 if (get_ldev(device)) {
1312 drbd_rs_complete_io(device, peer_req->i.sector);
1313 put_ldev(device);
1314 }
1315
1316 di = peer_req->digest;
1317
1318 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1319 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1320 digest = kmalloc(digest_size, GFP_NOIO);
1321 if (digest) {
1322 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1323
1324 D_ASSERT(device, digest_size == di->digest_size);
1325 eq = !memcmp(digest, di->digest, digest_size);
1326 kfree(digest);
1327 }
1328 }
1329
1330 /* Free peer_req and pages before send.
1331 * In case we block on congestion, we could otherwise run into
1332 * some distributed deadlock, if the other side blocks on
1333 * congestion as well, because our receiver blocks in
1334 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1335 drbd_free_peer_req(device, peer_req);
1336 if (!eq)
1337 drbd_ov_out_of_sync_found(device, sector, size);
1338 else
1339 ov_out_of_sync_print(device);
1340
1341 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1342 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1343
1344 dec_unacked(device);
1345
1346 --device->ov_left;
1347
1348 /* let's advance progress step marks only for every other megabyte */
1349 if ((device->ov_left & 0x200) == 0x200)
1350 drbd_advance_rs_marks(device, device->ov_left);
1351
1352 stop_sector_reached = verify_can_do_stop_sector(device) &&
1353 (sector + (size>>9)) >= device->ov_stop_sector;
1354
1355 if (device->ov_left == 0 || stop_sector_reached) {
1356 ov_out_of_sync_print(device);
1357 drbd_resync_finished(device);
1358 }
1359
1360 return err;
1361 }
1362
1363 /* FIXME
1364 * We need to track the number of pending barrier acks,
1365 * and to be able to wait for them.
1366 * See also comment in drbd_adm_attach before drbd_suspend_io.
1367 */
1368 static int drbd_send_barrier(struct drbd_connection *connection)
1369 {
1370 struct p_barrier *p;
1371 struct drbd_socket *sock;
1372
1373 sock = &connection->data;
1374 p = conn_prepare_command(connection, sock);
1375 if (!p)
1376 return -EIO;
1377 p->barrier = connection->send.current_epoch_nr;
1378 p->pad = 0;
1379 connection->send.current_epoch_writes = 0;
1380 connection->send.last_sent_barrier_jif = jiffies;
1381
1382 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1383 }
1384
1385 int w_send_write_hint(struct drbd_work *w, int cancel)
1386 {
1387 struct drbd_device *device =
1388 container_of(w, struct drbd_device, unplug_work);
1389 struct drbd_socket *sock;
1390
1391 if (cancel)
1392 return 0;
1393 sock = &first_peer_device(device)->connection->data;
1394 if (!drbd_prepare_command(first_peer_device(device), sock))
1395 return -EIO;
1396 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1397 }
1398
1399 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1400 {
1401 if (!connection->send.seen_any_write_yet) {
1402 connection->send.seen_any_write_yet = true;
1403 connection->send.current_epoch_nr = epoch;
1404 connection->send.current_epoch_writes = 0;
1405 connection->send.last_sent_barrier_jif = jiffies;
1406 }
1407 }
1408
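/* If @epoch differs from the epoch we sent last, first send the P_BARRIER
 * that closes the previous epoch, but only if that epoch actually contained
 * any writes. */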
1409 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1410 {
1411 /* before the first write on this connection there is no epoch to close */
1412 if (!connection->send.seen_any_write_yet)
1413 return;
1414 if (connection->send.current_epoch_nr != epoch) {
1415 if (connection->send.current_epoch_writes)
1416 drbd_send_barrier(connection);
1417 connection->send.current_epoch_nr = epoch;
1418 }
1419 }
1420
1421 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1422 {
1423 struct drbd_request *req = container_of(w, struct drbd_request, w);
1424 struct drbd_device *device = req->device;
1425 struct drbd_peer_device *const peer_device = first_peer_device(device);
1426 struct drbd_connection *const connection = peer_device->connection;
1427 int err;
1428
1429 if (unlikely(cancel)) {
1430 req_mod(req, SEND_CANCELED);
1431 return 0;
1432 }
1433 req->pre_send_jif = jiffies;
1434
1435 /* this time, no connection->send.current_epoch_writes++;
1436 * If it was sent, it was the closing barrier for the last
1437 * replicated epoch, before we went into AHEAD mode.
1438 * No more barriers will be sent, until we leave AHEAD mode again. */
1439 maybe_send_barrier(connection, req->epoch);
1440
1441 err = drbd_send_out_of_sync(peer_device, req);
1442 req_mod(req, OOS_HANDED_TO_NETWORK);
1443
1444 return err;
1445 }
1446
1447 /**
1448 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1449 * @w: work object.
1450 * @cancel: The connection will be closed anyway
1451 */
1452 int w_send_dblock(struct drbd_work *w, int cancel)
1453 {
1454 struct drbd_request *req = container_of(w, struct drbd_request, w);
1455 struct drbd_device *device = req->device;
1456 struct drbd_peer_device *const peer_device = first_peer_device(device);
1457 struct drbd_connection *connection = peer_device->connection;
1458 int err;
1459
1460 if (unlikely(cancel)) {
1461 req_mod(req, SEND_CANCELED);
1462 return 0;
1463 }
1464 req->pre_send_jif = jiffies;
1465
1466 re_init_if_first_write(connection, req->epoch);
1467 maybe_send_barrier(connection, req->epoch);
1468 connection->send.current_epoch_writes++;
1469
1470 err = drbd_send_dblock(peer_device, req);
1471 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1472
1473 return err;
1474 }
1475
1476 /**
1477 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1478 * @w: work object.
1479 * @cancel: The connection will be closed anyway
1480 */
1481 int w_send_read_req(struct drbd_work *w, int cancel)
1482 {
1483 struct drbd_request *req = container_of(w, struct drbd_request, w);
1484 struct drbd_device *device = req->device;
1485 struct drbd_peer_device *const peer_device = first_peer_device(device);
1486 struct drbd_connection *connection = peer_device->connection;
1487 int err;
1488
1489 if (unlikely(cancel)) {
1490 req_mod(req, SEND_CANCELED);
1491 return 0;
1492 }
1493 req->pre_send_jif = jiffies;
1494
1495 /* Even read requests may close a write epoch,
1496 * if there was one. */
1497 maybe_send_barrier(connection, req->epoch);
1498
1499 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1500 (unsigned long)req);
1501
1502 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1503
1504 return err;
1505 }
1506
1507 int w_restart_disk_io(struct drbd_work *w, int cancel)
1508 {
1509 struct drbd_request *req = container_of(w, struct drbd_request, w);
1510 struct drbd_device *device = req->device;
1511
1512 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1513 drbd_al_begin_io(device, &req->i);
1514
1515 drbd_req_make_private_bio(req, req->master_bio);
1516 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1517 generic_make_request(req->private_bio);
1518
1519 return 0;
1520 }
1521
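/* Follow the resync-after dependency chain starting at @device. Returns 0
 * if some device we (transitively) depend on is currently resyncing or has
 * one of the sync-pause flags set, 1 if this device may resync now. */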
1522 static int _drbd_may_sync_now(struct drbd_device *device)
1523 {
1524 struct drbd_device *odev = device;
1525 int resync_after;
1526
1527 while (1) {
1528 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1529 return 1;
1530 rcu_read_lock();
1531 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1532 rcu_read_unlock();
1533 if (resync_after == -1)
1534 return 1;
1535 odev = minor_to_device(resync_after);
1536 if (!odev)
1537 return 1;
1538 if ((odev->state.conn >= C_SYNC_SOURCE &&
1539 odev->state.conn <= C_PAUSED_SYNC_T) ||
1540 odev->state.aftr_isp || odev->state.peer_isp ||
1541 odev->state.user_isp)
1542 return 0;
1543 }
1544 }
1545
1546 /**
1547 * drbd_pause_after() - Pause resync on all devices that may not resync now
1548 * @device: DRBD device.
1549 *
1550 * Called from process context only (admin command and after_state_ch).
1551 */
1552 static bool drbd_pause_after(struct drbd_device *device)
1553 {
1554 bool changed = false;
1555 struct drbd_device *odev;
1556 int i;
1557
1558 rcu_read_lock();
1559 idr_for_each_entry(&drbd_devices, odev, i) {
1560 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1561 continue;
1562 if (!_drbd_may_sync_now(odev) &&
1563 _drbd_set_state(_NS(odev, aftr_isp, 1),
1564 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1565 changed = true;
1566 }
1567 rcu_read_unlock();
1568
1569 return changed;
1570 }
1571
1572 /**
1573 * drbd_resume_next() - Resume resync on all devices that may resync now
1574 * @device: DRBD device.
1575 *
1576 * Called from process context only (admin command and worker).
1577 */
1578 static bool drbd_resume_next(struct drbd_device *device)
1579 {
1580 bool changed = false;
1581 struct drbd_device *odev;
1582 int i;
1583
1584 rcu_read_lock();
1585 idr_for_each_entry(&drbd_devices, odev, i) {
1586 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1587 continue;
1588 if (odev->state.aftr_isp) {
1589 if (_drbd_may_sync_now(odev) &&
1590 _drbd_set_state(_NS(odev, aftr_isp, 0),
1591 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1592 changed = true;
1593 }
1594 }
1595 rcu_read_unlock();
1596 return changed;
1597 }
1598
1599 void resume_next_sg(struct drbd_device *device)
1600 {
1601 lock_all_resources();
1602 drbd_resume_next(device);
1603 unlock_all_resources();
1604 }
1605
1606 void suspend_other_sg(struct drbd_device *device)
1607 {
1608 lock_all_resources();
1609 drbd_pause_after(device);
1610 unlock_all_resources();
1611 }
1612
1613 /* caller must lock_all_resources() */
1614 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1615 {
1616 struct drbd_device *odev;
1617 int resync_after;
1618
1619 if (o_minor == -1)
1620 return NO_ERROR;
1621 if (o_minor < -1 || o_minor > MINORMASK)
1622 return ERR_RESYNC_AFTER;
1623
1624 /* check for loops */
1625 odev = minor_to_device(o_minor);
1626 while (1) {
1627 if (odev == device)
1628 return ERR_RESYNC_AFTER_CYCLE;
1629
1630 /* You are free to depend on diskless, non-existing,
1631 * or not yet/no longer existing minors.
1632 * We only reject dependency loops.
1633 * We cannot follow the dependency chain beyond a detached or
1634 * missing minor.
1635 */
1636 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1637 return NO_ERROR;
1638
1639 rcu_read_lock();
1640 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1641 rcu_read_unlock();
1642 /* dependency chain ends here, no cycles. */
1643 if (resync_after == -1)
1644 return NO_ERROR;
1645
1646 /* follow the dependency chain */
1647 odev = minor_to_device(resync_after);
1648 }
1649 }
1650
1651 /* caller must lock_all_resources() */
1652 void drbd_resync_after_changed(struct drbd_device *device)
1653 {
1654 int changed;
1655
1656 do {
1657 changed = drbd_pause_after(device);
1658 changed |= drbd_resume_next(device);
1659 } while (changed);
1660 }
1661
1662 void drbd_rs_controller_reset(struct drbd_device *device)
1663 {
1664 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1665 struct fifo_buffer *plan;
1666
1667 atomic_set(&device->rs_sect_in, 0);
1668 atomic_set(&device->rs_sect_ev, 0);
1669 device->rs_in_flight = 0;
1670 device->rs_last_events =
1671 (int)part_stat_read(&disk->part0, sectors[0]) +
1672 (int)part_stat_read(&disk->part0, sectors[1]);
1673
1674 /* Updating the RCU protected object in place is necessary since
1675 this function gets called from atomic context.
1676 It is valid since all other updates also lead to a completely
1677 empty fifo */
1678 rcu_read_lock();
1679 plan = rcu_dereference(device->rs_plan_s);
1680 plan->total = 0;
1681 fifo_set(plan, 0);
1682 rcu_read_unlock();
1683 }
1684
1685 void start_resync_timer_fn(unsigned long data)
1686 {
1687 struct drbd_device *device = (struct drbd_device *) data;
1688 drbd_device_post_work(device, RS_START);
1689 }
1690
1691 static void do_start_resync(struct drbd_device *device)
1692 {
1693 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1694 drbd_warn(device, "postponing start_resync ...\n");
1695 device->start_resync_timer.expires = jiffies + HZ/10;
1696 add_timer(&device->start_resync_timer);
1697 return;
1698 }
1699
1700 drbd_start_resync(device, C_SYNC_SOURCE);
1701 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1702 }
1703
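/* Checksum based resync is used if the peer speaks protocol 89 or newer,
 * a csums algorithm is configured, and it is either not restricted to
 * after-crash resyncs or this node is a crashed primary. */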
1704 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1705 {
1706 bool csums_after_crash_only;
1707 rcu_read_lock();
1708 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1709 rcu_read_unlock();
1710 return connection->agreed_pro_version >= 89 && /* supported? */
1711 connection->csums_tfm && /* configured? */
1712 (csums_after_crash_only == false /* use for each resync? */
1713 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1714 }
1715
1716 /**
1717 * drbd_start_resync() - Start the resync process
1718 * @device: DRBD device.
1719 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1720 *
1721 * This function might bring you directly into one of the
1722 * C_PAUSED_SYNC_* states.
1723 */
1724 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1725 {
1726 struct drbd_peer_device *peer_device = first_peer_device(device);
1727 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1728 union drbd_state ns;
1729 int r;
1730
1731 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1732 drbd_err(device, "Resync already running!\n");
1733 return;
1734 }
1735
1736 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1737 if (side == C_SYNC_TARGET) {
1738 /* Since application IO was locked out during C_WF_BITMAP_T and
1739 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1740 we check, via the handler below, whether we may make the data inconsistent. */
1741 r = drbd_khelper(device, "before-resync-target");
1742 r = (r >> 8) & 0xff;
1743 if (r > 0) {
1744 drbd_info(device, "before-resync-target handler returned %d, "
1745 "dropping connection.\n", r);
1746 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1747 return;
1748 }
1749 } else /* C_SYNC_SOURCE */ {
1750 r = drbd_khelper(device, "before-resync-source");
1751 r = (r >> 8) & 0xff;
1752 if (r > 0) {
1753 if (r == 3) {
1754 drbd_info(device, "before-resync-source handler returned %d, "
1755 "ignoring. Old userland tools?", r);
1756 } else {
1757 drbd_info(device, "before-resync-source handler returned %d, "
1758 "dropping connection.\n", r);
1759 conn_request_state(connection,
1760 NS(conn, C_DISCONNECTING), CS_HARD);
1761 return;
1762 }
1763 }
1764 }
1765 }
1766
1767 if (current == connection->worker.task) {
1768 /* The worker should not sleep waiting for state_mutex,
1769 as that can take a long time. */
1770 if (!mutex_trylock(device->state_mutex)) {
1771 set_bit(B_RS_H_DONE, &device->flags);
1772 device->start_resync_timer.expires = jiffies + HZ/5;
1773 add_timer(&device->start_resync_timer);
1774 return;
1775 }
1776 } else {
1777 mutex_lock(device->state_mutex);
1778 }
1779
1780 lock_all_resources();
1781 clear_bit(B_RS_H_DONE, &device->flags);
1782 /* Did some connection breakage or IO error race with us? */
1783 if (device->state.conn < C_CONNECTED
1784 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1785 unlock_all_resources();
1786 goto out;
1787 }
1788
1789 ns = drbd_read_state(device);
1790
1791 ns.aftr_isp = !_drbd_may_sync_now(device);
1792
1793 ns.conn = side;
1794
1795 if (side == C_SYNC_TARGET)
1796 ns.disk = D_INCONSISTENT;
1797 else /* side == C_SYNC_SOURCE */
1798 ns.pdsk = D_INCONSISTENT;
1799
1800 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1801 ns = drbd_read_state(device);
1802
1803 if (ns.conn < C_CONNECTED)
1804 r = SS_UNKNOWN_ERROR;
1805
1806 if (r == SS_SUCCESS) {
1807 unsigned long tw = drbd_bm_total_weight(device);
1808 unsigned long now = jiffies;
1809 int i;
1810
1811 device->rs_failed = 0;
1812 device->rs_paused = 0;
1813 device->rs_same_csum = 0;
1814 device->rs_last_sect_ev = 0;
1815 device->rs_total = tw;
1816 device->rs_start = now;
1817 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1818 device->rs_mark_left[i] = tw;
1819 device->rs_mark_time[i] = now;
1820 }
1821 drbd_pause_after(device);
1822 /* Forget potentially stale cached per-resync-extent bit-counts.
1823 * This open-codes drbd_rs_cancel_all(device); we already have IRQs
1824 * disabled, and know the disk state is ok. */
1825 spin_lock(&device->al_lock);
1826 lc_reset(device->resync);
1827 device->resync_locked = 0;
1828 device->resync_wenr = LC_FREE;
1829 spin_unlock(&device->al_lock);
1830 }
1831 unlock_all_resources();
1832
1833 if (r == SS_SUCCESS) {
1834 wake_up(&device->al_wait); /* for lc_reset() above */
1835 /* reset rs_last_bcast when a resync or verify is started,
1836 * to deal with potential jiffies wrap. */
1837 device->rs_last_bcast = jiffies - HZ;
1838
1839 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1840 drbd_conn_str(ns.conn),
1841 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1842 (unsigned long) device->rs_total);
1843 if (side == C_SYNC_TARGET) {
1844 device->bm_resync_fo = 0;
1845 device->use_csums = use_checksum_based_resync(connection, device);
1846 } else {
1847 device->use_csums = false;
1848 }
1849
1850 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1851 * with w_send_oos, or the sync target will get confused as to
1852 * how many bits to resync. We cannot do that always, because for an
1853 * empty resync and protocol < 95, we need to do it here, as we call
1854 * drbd_resync_finished from here in that case.
1855 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1856 * and from after_state_ch otherwise. */
1857 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1858 drbd_gen_and_send_sync_uuid(peer_device);
1859
1860 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1861 /* This still has a race (about when exactly the peers
1862 * detect connection loss) that can lead to a full sync
1863 * on next handshake. In 8.3.9 we fixed this with explicit
1864 * resync-finished notifications, but the fix
1865 * introduces a protocol change. Sleeping for some
1866 * time longer than the ping interval + timeout on the
1867 * SyncSource, to give the SyncTarget the chance to
1868 * detect connection loss, then waiting for a ping
1869 * response (implicit in drbd_resync_finished) reduces
1870 * the race considerably, but does not solve it. */
1871 if (side == C_SYNC_SOURCE) {
1872 struct net_conf *nc;
1873 int timeo;
1874
1875 rcu_read_lock();
1876 nc = rcu_dereference(connection->net_conf);
1877 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1878 rcu_read_unlock();
1879 schedule_timeout_interruptible(timeo);
1880 }
1881 drbd_resync_finished(device);
1882 }
1883
1884 drbd_rs_controller_reset(device);
1885 /* ns.conn may already be != device->state.conn,
1886 * we may have been paused in between, or become paused until
1887 * the timer triggers.
1888 * No matter, that is handled in resync_timer_fn() */
1889 if (ns.conn == C_SYNC_TARGET)
1890 mod_timer(&device->resync_timer, jiffies);
1891
1892 drbd_md_sync(device);
1893 }
1894 put_ldev(device);
1895 out:
1896 mutex_unlock(device->state_mutex);
1897 }
1898
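/* Handle RS_PROGRESS / RS_DONE device work from the worker: lazily write
 * out dirty bitmap pages, finish the resync when RS_DONE was set and we
 * are still in a sync state, and broadcast a SIB_SYNC_PROGRESS event;
 * rs_last_bcast is refreshed again afterwards in case the writeout took
 * a while. */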
1899 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1900 {
1901 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1902 device->rs_last_bcast = jiffies;
1903
1904 if (!get_ldev(device))
1905 return;
1906
1907 drbd_bm_write_lazy(device, 0);
1908 if (resync_done && is_sync_state(device->state.conn))
1909 drbd_resync_finished(device);
1910
1911 drbd_bcast_event(device, &sib);
1912 /* update timestamp, in case it took a while to write out stuff */
1913 device->rs_last_bcast = jiffies;
1914 put_ldev(device);
1915 }
1916
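/* Final teardown of the local backing device state, run as DESTROY_DISK
 * device work.  The __acquire(local)/__release(local) pair are sparse
 * context annotations only (no-ops at runtime); they presumably keep the
 * static checker's get_ldev()/put_ldev() bookkeeping balanced around
 * drbd_backing_dev_free(). */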
1917 static void drbd_ldev_destroy(struct drbd_device *device)
1918 {
1919 lc_destroy(device->resync);
1920 device->resync = NULL;
1921 lc_destroy(device->act_log);
1922 device->act_log = NULL;
1923
1924 __acquire(local);
1925 drbd_backing_dev_free(device, device->ldev);
1926 device->ldev = NULL;
1927 __release(local);
1928
1929 clear_bit(GOING_DISKLESS, &device->flags);
1930 wake_up(&device->misc_wait);
1931 }
1932
1933 static void go_diskless(struct drbd_device *device)
1934 {
1935 D_ASSERT(device, device->state.disk == D_FAILED);
1936 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1937 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1938 * the protected members anymore, though, so once put_ldev reaches zero
1939 * again, it will be safe to free them. */
1940
1941 /* Try to write changed bitmap pages, read errors may have just
1942 * set some bits outside the area covered by the activity log.
1943 *
1944 * If we have an IO error during the bitmap writeout,
1945 * we will want a full sync next time, just in case.
1946 * (Do we want a specific meta data flag for this?)
1947 *
1948 * If that does not make it to stable storage either,
1949 * we cannot do anything about that anymore.
1950 *
1951 * We still need to check that both bitmap and ldev are present, as we may
1952 * end up here after a failed attach, before ldev was even assigned.
1953 */
1954 if (device->bitmap && device->ldev) {
1955 /* An interrupted resync or similar is allowed to recount bits
1956 * while we detach.
1957 * Any modifications would not be expected anymore, though.
1958 */
1959 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1960 "detach", BM_LOCKED_TEST_ALLOWED)) {
1961 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1962 drbd_md_set_flag(device, MDF_FULL_SYNC);
1963 drbd_md_sync(device);
1964 }
1965 }
1966 }
1967
1968 drbd_force_state(device, NS(disk, D_DISKLESS));
1969 }
1970
1971 static int do_md_sync(struct drbd_device *device)
1972 {
1973 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1974 drbd_md_sync(device);
1975 return 0;
1976 }
1977
1978 /* only called from drbd_worker thread, no locking */
1979 void __update_timing_details(
1980 struct drbd_thread_timing_details *tdp,
1981 unsigned int *cb_nr,
1982 void *cb,
1983 const char *fn, const unsigned int line)
1984 {
1985 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1986 struct drbd_thread_timing_details *td = tdp + i;
1987
1988 td->start_jif = jiffies;
1989 td->cb_addr = cb;
1990 td->caller_fn = fn;
1991 td->line = line;
1992 td->cb_nr = *cb_nr;
1993
1994 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1995 td = tdp + i;
1996 memset(td, 0, sizeof(*td));
1997
1998 ++(*cb_nr);
1999 }
2000
2001 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2002 {
2003 if (test_bit(MD_SYNC, &todo))
2004 do_md_sync(device);
2005 if (test_bit(RS_DONE, &todo) ||
2006 test_bit(RS_PROGRESS, &todo))
2007 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2008 if (test_bit(GO_DISKLESS, &todo))
2009 go_diskless(device);
2010 if (test_bit(DESTROY_DISK, &todo))
2011 drbd_ldev_destroy(device);
2012 if (test_bit(RS_START, &todo))
2013 do_start_resync(device);
2014 }
2015
2016 #define DRBD_DEVICE_WORK_MASK \
2017 ((1UL << GO_DISKLESS) \
2018 |(1UL << DESTROY_DISK) \
2019 |(1UL << MD_SYNC) \
2020 |(1UL << RS_START) \
2021 |(1UL << RS_PROGRESS) \
2022 |(1UL << RS_DONE) \
2023 )
2024
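/* Atomically fetch and clear the device-work bits in *flags using a
 * cmpxchg() loop; all other flag bits are left untouched, and the bits
 * that were set are returned to the caller. */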
2025 static unsigned long get_work_bits(unsigned long *flags)
2026 {
2027 unsigned long old, new;
2028 do {
2029 old = *flags;
2030 new = old & ~DRBD_DEVICE_WORK_MASK;
2031 } while (cmpxchg(flags, old, new) != old);
2032 return old & DRBD_DEVICE_WORK_MASK;
2033 }
2034
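/* Run the per-device work posted via drbd_device_post_work().  Each device
 * is pinned with a kref and the RCU read lock is dropped around
 * do_device_work(), since the handlers (md sync, bitmap writeout, ...)
 * may block. */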
2035 static void do_unqueued_work(struct drbd_connection *connection)
2036 {
2037 struct drbd_peer_device *peer_device;
2038 int vnr;
2039
2040 rcu_read_lock();
2041 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2042 struct drbd_device *device = peer_device->device;
2043 unsigned long todo = get_work_bits(&device->flags);
2044 if (!todo)
2045 continue;
2046
2047 kref_get(&device->kref);
2048 rcu_read_unlock();
2049 do_device_work(device, todo);
2050 kref_put(&device->kref, drbd_destroy_device);
2051 rcu_read_lock();
2052 }
2053 rcu_read_unlock();
2054 }
2055
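/* Move everything currently queued on @queue onto @work_list in one go;
 * returns true if @work_list is non-empty afterwards. */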
2056 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2057 {
2058 spin_lock_irq(&queue->q_lock);
2059 list_splice_tail_init(&queue->q, work_list);
2060 spin_unlock_irq(&queue->q_lock);
2061 return !list_empty(work_list);
2062 }
2063
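/* Wait until there is sender work to do.  If nothing is immediately
 * queued, uncork the data socket (when tcp_cork is in use) so pending
 * packets get pushed out, then sleep on sender_work.q_wait.  While
 * waiting, send a transfer-log barrier if the current epoch number
 * differs from the last epoch we communicated, and also wake up for
 * pending device work or a stopping worker.  On exit, re-cork (or leave
 * uncorked) according to the current configuration. */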
2064 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2065 {
2066 DEFINE_WAIT(wait);
2067 struct net_conf *nc;
2068 int uncork, cork;
2069
2070 dequeue_work_batch(&connection->sender_work, work_list);
2071 if (!list_empty(work_list))
2072 return;
2073
2074 /* Still nothing to do?
2075 * Maybe we still need to close the current epoch,
2076 * even if no new requests are queued yet.
2077 *
2078 * Also, poke TCP, just in case.
2079 * Then wait for new work (or signal). */
2080 rcu_read_lock();
2081 nc = rcu_dereference(connection->net_conf);
2082 uncork = nc ? nc->tcp_cork : 0;
2083 rcu_read_unlock();
2084 if (uncork) {
2085 mutex_lock(&connection->data.mutex);
2086 if (connection->data.socket)
2087 drbd_tcp_uncork(connection->data.socket);
2088 mutex_unlock(&connection->data.mutex);
2089 }
2090
2091 for (;;) {
2092 int send_barrier;
2093 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2094 spin_lock_irq(&connection->resource->req_lock);
2095 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2096 if (!list_empty(&connection->sender_work.q))
2097 list_splice_tail_init(&connection->sender_work.q, work_list);
2098 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2099 if (!list_empty(work_list) || signal_pending(current)) {
2100 spin_unlock_irq(&connection->resource->req_lock);
2101 break;
2102 }
2103
2104 /* We found nothing new to do, no to-be-communicated request,
2105 * no other work item. We may still need to close the last
2106 * epoch. The next incoming request will be assigned the connection's
2107 * current transfer log epoch number. If that is different
2108 * from the epoch of the last request we communicated, it is
2109 * safe to send the epoch separating barrier now.
2110 */
2111 send_barrier =
2112 atomic_read(&connection->current_tle_nr) !=
2113 connection->send.current_epoch_nr;
2114 spin_unlock_irq(&connection->resource->req_lock);
2115
2116 if (send_barrier)
2117 maybe_send_barrier(connection,
2118 connection->send.current_epoch_nr + 1);
2119
2120 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2121 break;
2122
2123 /* drbd_send() may have called flush_signals() */
2124 if (get_t_state(&connection->worker) != RUNNING)
2125 break;
2126
2127 schedule();
2128 /* We may be woken up for things other than new work, too,
2129 * e.g. if the current epoch got closed,
2130 * in which case we send the barrier above. */
2131 }
2132 finish_wait(&connection->sender_work.q_wait, &wait);
2133
2134 /* someone may have changed the config while we have been waiting above. */
2135 rcu_read_lock();
2136 nc = rcu_dereference(connection->net_conf);
2137 cork = nc ? nc->tcp_cork : 0;
2138 rcu_read_unlock();
2139 mutex_lock(&connection->data.mutex);
2140 if (connection->data.socket) {
2141 if (cork)
2142 drbd_tcp_cork(connection->data.socket);
2143 else if (!uncork)
2144 drbd_tcp_uncork(connection->data.socket);
2145 }
2146 mutex_unlock(&connection->data.mutex);
2147 }
2148
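/* Main loop of the per-connection worker thread: wait for work on
 * connection->sender_work, handle per-device housekeeping flagged through
 * DEVICE_WORK_PENDING, and run the queued work callbacks.  A callback is
 * invoked with its cancel argument set once the connection state has
 * dropped below C_WF_REPORT_PARAMS; a failing callback (non-zero return)
 * while still at C_WF_REPORT_PARAMS or above escalates to
 * C_NETWORK_FAILURE.  After being told to stop, remaining work is drained
 * with cancel=1 and all devices are cleaned up. */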
2149 int drbd_worker(struct drbd_thread *thi)
2150 {
2151 struct drbd_connection *connection = thi->connection;
2152 struct drbd_work *w = NULL;
2153 struct drbd_peer_device *peer_device;
2154 LIST_HEAD(work_list);
2155 int vnr;
2156
2157 while (get_t_state(thi) == RUNNING) {
2158 drbd_thread_current_set_cpu(thi);
2159
2160 if (list_empty(&work_list)) {
2161 update_worker_timing_details(connection, wait_for_work);
2162 wait_for_work(connection, &work_list);
2163 }
2164
2165 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2166 update_worker_timing_details(connection, do_unqueued_work);
2167 do_unqueued_work(connection);
2168 }
2169
2170 if (signal_pending(current)) {
2171 flush_signals(current);
2172 if (get_t_state(thi) == RUNNING) {
2173 drbd_warn(connection, "Worker got an unexpected signal\n");
2174 continue;
2175 }
2176 break;
2177 }
2178
2179 if (get_t_state(thi) != RUNNING)
2180 break;
2181
2182 if (!list_empty(&work_list)) {
2183 w = list_first_entry(&work_list, struct drbd_work, list);
2184 list_del_init(&w->list);
2185 update_worker_timing_details(connection, w->cb);
2186 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2187 continue;
2188 if (connection->cstate >= C_WF_REPORT_PARAMS)
2189 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2190 }
2191 }
2192
2193 do {
2194 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2195 update_worker_timing_details(connection, do_unqueued_work);
2196 do_unqueued_work(connection);
2197 }
2198 if (!list_empty(&work_list)) {
2199 w = list_first_entry(&work_list, struct drbd_work, list);
2200 list_del_init(&w->list);
2201 update_worker_timing_details(connection, w->cb);
2202 w->cb(w, 1);
2203 } else
2204 dequeue_work_batch(&connection->sender_work, &work_list);
2205 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2206
2207 rcu_read_lock();
2208 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2209 struct drbd_device *device = peer_device->device;
2210 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2211 kref_get(&device->kref);
2212 rcu_read_unlock();
2213 drbd_device_cleanup(device);
2214 kref_put(&device->kref, drbd_destroy_device);
2215 rcu_read_lock();
2216 }
2217 rcu_read_unlock();
2218
2219 return 0;
2220 }