]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blob - drivers/block/drbd/drbd_worker.c
block: add a bi_error field to struct bio
[mirror_ubuntu-bionic-kernel.git] / drivers / block / drbd / drbd_worker.c
1 /*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
50 *
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58
59 /* About the global_state_lock
60 Each state transition on an device holds a read lock. In case we have
61 to evaluate the resync after dependencies, we grab a write lock, because
62 we need stable states on all devices for that. */
63 rwlock_t global_state_lock;
64
65 /* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
67 */
68 void drbd_md_endio(struct bio *bio)
69 {
70 struct drbd_device *device;
71
72 device = bio->bi_private;
73 device->md_io.error = bio->bi_error;
74
75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76 * to timeout on the lower level device, and eventually detach from it.
77 * If this io completion runs after that timeout expired, this
78 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 * During normal operation, this only puts that extra reference
80 * down to 1 again.
81 * Make sure we first drop the reference, and only then signal
82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 * next drbd_md_sync_page_io(), that we trigger the
84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85 */
86 drbd_md_put_buffer(device);
87 device->md_io.done = 1;
88 wake_up(&device->misc_wait);
89 bio_put(bio);
90 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91 put_ldev(device);
92 }
93
94 /* reads on behalf of the partner,
95 * "submitted" by the receiver
96 */
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98 {
99 unsigned long flags = 0;
100 struct drbd_peer_device *peer_device = peer_req->peer_device;
101 struct drbd_device *device = peer_device->device;
102
103 spin_lock_irqsave(&device->resource->req_lock, flags);
104 device->read_cnt += peer_req->i.size >> 9;
105 list_del(&peer_req->w.list);
106 if (list_empty(&device->read_ee))
107 wake_up(&device->ee_wait);
108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110 spin_unlock_irqrestore(&device->resource->req_lock, flags);
111
112 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113 put_ldev(device);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage. */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120 unsigned long flags = 0;
121 struct drbd_peer_device *peer_device = peer_req->peer_device;
122 struct drbd_device *device = peer_device->device;
123 struct drbd_interval i;
124 int do_wake;
125 u64 block_id;
126 int do_al_complete_io;
127
128 /* after we moved peer_req to done_ee,
129 * we may no longer access it,
130 * it may be freed/reused already!
131 * (as soon as we release the req_lock) */
132 i = peer_req->i;
133 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 block_id = peer_req->block_id;
135 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
136
137 spin_lock_irqsave(&device->resource->req_lock, flags);
138 device->writ_cnt += peer_req->i.size >> 9;
139 list_move_tail(&peer_req->w.list, &device->done_ee);
140
141 /*
142 * Do not remove from the write_requests tree here: we did not send the
143 * Ack yet and did not wake possibly waiting conflicting requests.
144 * Removed from the tree from "drbd_process_done_ee" within the
145 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
146 * _drbd_clear_done_ee.
147 */
148
149 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
150
151 /* FIXME do we want to detach for failed REQ_DISCARD?
152 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153 if (peer_req->flags & EE_WAS_ERROR)
154 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
159
160 if (do_wake)
161 wake_up(&device->ee_wait);
162
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
165
166 wake_asender(peer_device->connection);
167 put_ldev(device);
168 }
169
170 /* writes on behalf of the partner, or resync writes,
171 * "submitted" by the receiver.
172 */
173 void drbd_peer_request_endio(struct bio *bio)
174 {
175 struct drbd_peer_request *peer_req = bio->bi_private;
176 struct drbd_device *device = peer_req->peer_device->device;
177 int is_write = bio_data_dir(bio) == WRITE;
178 int is_discard = !!(bio->bi_rw & REQ_DISCARD);
179
180 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
181 drbd_warn(device, "%s: error=%d s=%llus\n",
182 is_write ? (is_discard ? "discard" : "write")
183 : "read", bio->bi_error,
184 (unsigned long long)peer_req->i.sector);
185
186 if (bio->bi_error)
187 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188
189 bio_put(bio); /* no need for the bio anymore */
190 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 if (is_write)
192 drbd_endio_write_sec_final(peer_req);
193 else
194 drbd_endio_read_sec_final(peer_req);
195 }
196 }
197
198 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
199 */
200 void drbd_request_endio(struct bio *bio)
201 {
202 unsigned long flags;
203 struct drbd_request *req = bio->bi_private;
204 struct drbd_device *device = req->device;
205 struct bio_and_error m;
206 enum drbd_req_event what;
207
208 /* If this request was aborted locally before,
209 * but now was completed "successfully",
210 * chances are that this caused arbitrary data corruption.
211 *
212 * "aborting" requests, or force-detaching the disk, is intended for
213 * completely blocked/hung local backing devices which do no longer
214 * complete requests at all, not even do error completions. In this
215 * situation, usually a hard-reset and failover is the only way out.
216 *
217 * By "aborting", basically faking a local error-completion,
218 * we allow for a more graceful swichover by cleanly migrating services.
219 * Still the affected node has to be rebooted "soon".
220 *
221 * By completing these requests, we allow the upper layers to re-use
222 * the associated data pages.
223 *
224 * If later the local backing device "recovers", and now DMAs some data
225 * from disk into the original request pages, in the best case it will
226 * just put random data into unused pages; but typically it will corrupt
227 * meanwhile completely unrelated data, causing all sorts of damage.
228 *
229 * Which means delayed successful completion,
230 * especially for READ requests,
231 * is a reason to panic().
232 *
233 * We assume that a delayed *error* completion is OK,
234 * though we still will complain noisily about it.
235 */
236 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
237 if (__ratelimit(&drbd_ratelimit_state))
238 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
239
240 if (!bio->bi_error)
241 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
242 }
243
244 /* to avoid recursion in __req_mod */
245 if (unlikely(bio->bi_error)) {
246 if (bio->bi_rw & REQ_DISCARD)
247 what = (bio->bi_error == -EOPNOTSUPP)
248 ? DISCARD_COMPLETED_NOTSUPP
249 : DISCARD_COMPLETED_WITH_ERROR;
250 else
251 what = (bio_data_dir(bio) == WRITE)
252 ? WRITE_COMPLETED_WITH_ERROR
253 : (bio_rw(bio) == READ)
254 ? READ_COMPLETED_WITH_ERROR
255 : READ_AHEAD_COMPLETED_WITH_ERROR;
256 } else
257 what = COMPLETED_OK;
258
259 bio_put(req->private_bio);
260 req->private_bio = ERR_PTR(bio->bi_error);
261
262 /* not req_mod(), we need irqsave here! */
263 spin_lock_irqsave(&device->resource->req_lock, flags);
264 __req_mod(req, what, &m);
265 spin_unlock_irqrestore(&device->resource->req_lock, flags);
266 put_ldev(device);
267
268 if (m.bio)
269 complete_master_bio(device, &m);
270 }
271
272 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
273 {
274 struct hash_desc desc;
275 struct scatterlist sg;
276 struct page *page = peer_req->pages;
277 struct page *tmp;
278 unsigned len;
279
280 desc.tfm = tfm;
281 desc.flags = 0;
282
283 sg_init_table(&sg, 1);
284 crypto_hash_init(&desc);
285
286 while ((tmp = page_chain_next(page))) {
287 /* all but the last page will be fully used */
288 sg_set_page(&sg, page, PAGE_SIZE, 0);
289 crypto_hash_update(&desc, &sg, sg.length);
290 page = tmp;
291 }
292 /* and now the last, possibly only partially used page */
293 len = peer_req->i.size & (PAGE_SIZE - 1);
294 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
295 crypto_hash_update(&desc, &sg, sg.length);
296 crypto_hash_final(&desc, digest);
297 }
298
299 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
300 {
301 struct hash_desc desc;
302 struct scatterlist sg;
303 struct bio_vec bvec;
304 struct bvec_iter iter;
305
306 desc.tfm = tfm;
307 desc.flags = 0;
308
309 sg_init_table(&sg, 1);
310 crypto_hash_init(&desc);
311
312 bio_for_each_segment(bvec, bio, iter) {
313 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
314 crypto_hash_update(&desc, &sg, sg.length);
315 }
316 crypto_hash_final(&desc, digest);
317 }
318
319 /* MAYBE merge common code with w_e_end_ov_req */
320 static int w_e_send_csum(struct drbd_work *w, int cancel)
321 {
322 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
323 struct drbd_peer_device *peer_device = peer_req->peer_device;
324 struct drbd_device *device = peer_device->device;
325 int digest_size;
326 void *digest;
327 int err = 0;
328
329 if (unlikely(cancel))
330 goto out;
331
332 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
333 goto out;
334
335 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
336 digest = kmalloc(digest_size, GFP_NOIO);
337 if (digest) {
338 sector_t sector = peer_req->i.sector;
339 unsigned int size = peer_req->i.size;
340 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
341 /* Free peer_req and pages before send.
342 * In case we block on congestion, we could otherwise run into
343 * some distributed deadlock, if the other side blocks on
344 * congestion as well, because our receiver blocks in
345 * drbd_alloc_pages due to pp_in_use > max_buffers. */
346 drbd_free_peer_req(device, peer_req);
347 peer_req = NULL;
348 inc_rs_pending(device);
349 err = drbd_send_drequest_csum(peer_device, sector, size,
350 digest, digest_size,
351 P_CSUM_RS_REQUEST);
352 kfree(digest);
353 } else {
354 drbd_err(device, "kmalloc() of digest failed.\n");
355 err = -ENOMEM;
356 }
357
358 out:
359 if (peer_req)
360 drbd_free_peer_req(device, peer_req);
361
362 if (unlikely(err))
363 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
364 return err;
365 }
366
367 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
368
369 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
370 {
371 struct drbd_device *device = peer_device->device;
372 struct drbd_peer_request *peer_req;
373
374 if (!get_ldev(device))
375 return -EIO;
376
377 /* GFP_TRY, because if there is no memory available right now, this may
378 * be rescheduled for later. It is "only" background resync, after all. */
379 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
380 size, true /* has real payload */, GFP_TRY);
381 if (!peer_req)
382 goto defer;
383
384 peer_req->w.cb = w_e_send_csum;
385 spin_lock_irq(&device->resource->req_lock);
386 list_add_tail(&peer_req->w.list, &device->read_ee);
387 spin_unlock_irq(&device->resource->req_lock);
388
389 atomic_add(size >> 9, &device->rs_sect_ev);
390 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
391 return 0;
392
393 /* If it failed because of ENOMEM, retry should help. If it failed
394 * because bio_add_page failed (probably broken lower level driver),
395 * retry may or may not help.
396 * If it does not, you may need to force disconnect. */
397 spin_lock_irq(&device->resource->req_lock);
398 list_del(&peer_req->w.list);
399 spin_unlock_irq(&device->resource->req_lock);
400
401 drbd_free_peer_req(device, peer_req);
402 defer:
403 put_ldev(device);
404 return -EAGAIN;
405 }
406
407 int w_resync_timer(struct drbd_work *w, int cancel)
408 {
409 struct drbd_device *device =
410 container_of(w, struct drbd_device, resync_work);
411
412 switch (device->state.conn) {
413 case C_VERIFY_S:
414 make_ov_request(device, cancel);
415 break;
416 case C_SYNC_TARGET:
417 make_resync_request(device, cancel);
418 break;
419 }
420
421 return 0;
422 }
423
424 void resync_timer_fn(unsigned long data)
425 {
426 struct drbd_device *device = (struct drbd_device *) data;
427
428 drbd_queue_work_if_unqueued(
429 &first_peer_device(device)->connection->sender_work,
430 &device->resync_work);
431 }
432
433 static void fifo_set(struct fifo_buffer *fb, int value)
434 {
435 int i;
436
437 for (i = 0; i < fb->size; i++)
438 fb->values[i] = value;
439 }
440
441 static int fifo_push(struct fifo_buffer *fb, int value)
442 {
443 int ov;
444
445 ov = fb->values[fb->head_index];
446 fb->values[fb->head_index++] = value;
447
448 if (fb->head_index >= fb->size)
449 fb->head_index = 0;
450
451 return ov;
452 }
453
454 static void fifo_add_val(struct fifo_buffer *fb, int value)
455 {
456 int i;
457
458 for (i = 0; i < fb->size; i++)
459 fb->values[i] += value;
460 }
461
462 struct fifo_buffer *fifo_alloc(int fifo_size)
463 {
464 struct fifo_buffer *fb;
465
466 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
467 if (!fb)
468 return NULL;
469
470 fb->head_index = 0;
471 fb->size = fifo_size;
472 fb->total = 0;
473
474 return fb;
475 }
476
477 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
478 {
479 struct disk_conf *dc;
480 unsigned int want; /* The number of sectors we want in-flight */
481 int req_sect; /* Number of sectors to request in this turn */
482 int correction; /* Number of sectors more we need in-flight */
483 int cps; /* correction per invocation of drbd_rs_controller() */
484 int steps; /* Number of time steps to plan ahead */
485 int curr_corr;
486 int max_sect;
487 struct fifo_buffer *plan;
488
489 dc = rcu_dereference(device->ldev->disk_conf);
490 plan = rcu_dereference(device->rs_plan_s);
491
492 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
493
494 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
495 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
496 } else { /* normal path */
497 want = dc->c_fill_target ? dc->c_fill_target :
498 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
499 }
500
501 correction = want - device->rs_in_flight - plan->total;
502
503 /* Plan ahead */
504 cps = correction / steps;
505 fifo_add_val(plan, cps);
506 plan->total += cps * steps;
507
508 /* What we do in this step */
509 curr_corr = fifo_push(plan, 0);
510 plan->total -= curr_corr;
511
512 req_sect = sect_in + curr_corr;
513 if (req_sect < 0)
514 req_sect = 0;
515
516 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
517 if (req_sect > max_sect)
518 req_sect = max_sect;
519
520 /*
521 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
522 sect_in, device->rs_in_flight, want, correction,
523 steps, cps, device->rs_planed, curr_corr, req_sect);
524 */
525
526 return req_sect;
527 }
528
529 static int drbd_rs_number_requests(struct drbd_device *device)
530 {
531 unsigned int sect_in; /* Number of sectors that came in since the last turn */
532 int number, mxb;
533
534 sect_in = atomic_xchg(&device->rs_sect_in, 0);
535 device->rs_in_flight -= sect_in;
536
537 rcu_read_lock();
538 mxb = drbd_get_max_buffers(device) / 2;
539 if (rcu_dereference(device->rs_plan_s)->size) {
540 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
541 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
542 } else {
543 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
544 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
545 }
546 rcu_read_unlock();
547
548 /* Don't have more than "max-buffers"/2 in-flight.
549 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
550 * potentially causing a distributed deadlock on congestion during
551 * online-verify or (checksum-based) resync, if max-buffers,
552 * socket buffer sizes and resync rate settings are mis-configured. */
553
554 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
555 * mxb (as used here, and in drbd_alloc_pages on the peer) is
556 * "number of pages" (typically also 4k),
557 * but "rs_in_flight" is in "sectors" (512 Byte). */
558 if (mxb - device->rs_in_flight/8 < number)
559 number = mxb - device->rs_in_flight/8;
560
561 return number;
562 }
563
564 static int make_resync_request(struct drbd_device *const device, int cancel)
565 {
566 struct drbd_peer_device *const peer_device = first_peer_device(device);
567 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
568 unsigned long bit;
569 sector_t sector;
570 const sector_t capacity = drbd_get_capacity(device->this_bdev);
571 int max_bio_size;
572 int number, rollback_i, size;
573 int align, requeue = 0;
574 int i = 0;
575
576 if (unlikely(cancel))
577 return 0;
578
579 if (device->rs_total == 0) {
580 /* empty resync? */
581 drbd_resync_finished(device);
582 return 0;
583 }
584
585 if (!get_ldev(device)) {
586 /* Since we only need to access device->rsync a
587 get_ldev_if_state(device,D_FAILED) would be sufficient, but
588 to continue resync with a broken disk makes no sense at
589 all */
590 drbd_err(device, "Disk broke down during resync!\n");
591 return 0;
592 }
593
594 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
595 number = drbd_rs_number_requests(device);
596 if (number <= 0)
597 goto requeue;
598
599 for (i = 0; i < number; i++) {
600 /* Stop generating RS requests when half of the send buffer is filled,
601 * but notify TCP that we'd like to have more space. */
602 mutex_lock(&connection->data.mutex);
603 if (connection->data.socket) {
604 struct sock *sk = connection->data.socket->sk;
605 int queued = sk->sk_wmem_queued;
606 int sndbuf = sk->sk_sndbuf;
607 if (queued > sndbuf / 2) {
608 requeue = 1;
609 if (sk->sk_socket)
610 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
611 }
612 } else
613 requeue = 1;
614 mutex_unlock(&connection->data.mutex);
615 if (requeue)
616 goto requeue;
617
618 next_sector:
619 size = BM_BLOCK_SIZE;
620 bit = drbd_bm_find_next(device, device->bm_resync_fo);
621
622 if (bit == DRBD_END_OF_BITMAP) {
623 device->bm_resync_fo = drbd_bm_bits(device);
624 put_ldev(device);
625 return 0;
626 }
627
628 sector = BM_BIT_TO_SECT(bit);
629
630 if (drbd_try_rs_begin_io(device, sector)) {
631 device->bm_resync_fo = bit;
632 goto requeue;
633 }
634 device->bm_resync_fo = bit + 1;
635
636 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
637 drbd_rs_complete_io(device, sector);
638 goto next_sector;
639 }
640
641 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642 /* try to find some adjacent bits.
643 * we stop if we have already the maximum req size.
644 *
645 * Additionally always align bigger requests, in order to
646 * be prepared for all stripe sizes of software RAIDs.
647 */
648 align = 1;
649 rollback_i = i;
650 while (i < number) {
651 if (size + BM_BLOCK_SIZE > max_bio_size)
652 break;
653
654 /* Be always aligned */
655 if (sector & ((1<<(align+3))-1))
656 break;
657
658 /* do not cross extent boundaries */
659 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
660 break;
661 /* now, is it actually dirty, after all?
662 * caution, drbd_bm_test_bit is tri-state for some
663 * obscure reason; ( b == 0 ) would get the out-of-band
664 * only accidentally right because of the "oddly sized"
665 * adjustment below */
666 if (drbd_bm_test_bit(device, bit+1) != 1)
667 break;
668 bit++;
669 size += BM_BLOCK_SIZE;
670 if ((BM_BLOCK_SIZE << align) <= size)
671 align++;
672 i++;
673 }
674 /* if we merged some,
675 * reset the offset to start the next drbd_bm_find_next from */
676 if (size > BM_BLOCK_SIZE)
677 device->bm_resync_fo = bit + 1;
678 #endif
679
680 /* adjust very last sectors, in case we are oddly sized */
681 if (sector + (size>>9) > capacity)
682 size = (capacity-sector)<<9;
683
684 if (device->use_csums) {
685 switch (read_for_csum(peer_device, sector, size)) {
686 case -EIO: /* Disk failure */
687 put_ldev(device);
688 return -EIO;
689 case -EAGAIN: /* allocation failed, or ldev busy */
690 drbd_rs_complete_io(device, sector);
691 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
692 i = rollback_i;
693 goto requeue;
694 case 0:
695 /* everything ok */
696 break;
697 default:
698 BUG();
699 }
700 } else {
701 int err;
702
703 inc_rs_pending(device);
704 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
705 sector, size, ID_SYNCER);
706 if (err) {
707 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
708 dec_rs_pending(device);
709 put_ldev(device);
710 return err;
711 }
712 }
713 }
714
715 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
716 /* last syncer _request_ was sent,
717 * but the P_RS_DATA_REPLY not yet received. sync will end (and
718 * next sync group will resume), as soon as we receive the last
719 * resync data block, and the last bit is cleared.
720 * until then resync "work" is "inactive" ...
721 */
722 put_ldev(device);
723 return 0;
724 }
725
726 requeue:
727 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
729 put_ldev(device);
730 return 0;
731 }
732
733 static int make_ov_request(struct drbd_device *device, int cancel)
734 {
735 int number, i, size;
736 sector_t sector;
737 const sector_t capacity = drbd_get_capacity(device->this_bdev);
738 bool stop_sector_reached = false;
739
740 if (unlikely(cancel))
741 return 1;
742
743 number = drbd_rs_number_requests(device);
744
745 sector = device->ov_position;
746 for (i = 0; i < number; i++) {
747 if (sector >= capacity)
748 return 1;
749
750 /* We check for "finished" only in the reply path:
751 * w_e_end_ov_reply().
752 * We need to send at least one request out. */
753 stop_sector_reached = i > 0
754 && verify_can_do_stop_sector(device)
755 && sector >= device->ov_stop_sector;
756 if (stop_sector_reached)
757 break;
758
759 size = BM_BLOCK_SIZE;
760
761 if (drbd_try_rs_begin_io(device, sector)) {
762 device->ov_position = sector;
763 goto requeue;
764 }
765
766 if (sector + (size>>9) > capacity)
767 size = (capacity-sector)<<9;
768
769 inc_rs_pending(device);
770 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
771 dec_rs_pending(device);
772 return 0;
773 }
774 sector += BM_SECT_PER_BIT;
775 }
776 device->ov_position = sector;
777
778 requeue:
779 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
780 if (i == 0 || !stop_sector_reached)
781 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
782 return 1;
783 }
784
785 int w_ov_finished(struct drbd_work *w, int cancel)
786 {
787 struct drbd_device_work *dw =
788 container_of(w, struct drbd_device_work, w);
789 struct drbd_device *device = dw->device;
790 kfree(dw);
791 ov_out_of_sync_print(device);
792 drbd_resync_finished(device);
793
794 return 0;
795 }
796
797 static int w_resync_finished(struct drbd_work *w, int cancel)
798 {
799 struct drbd_device_work *dw =
800 container_of(w, struct drbd_device_work, w);
801 struct drbd_device *device = dw->device;
802 kfree(dw);
803
804 drbd_resync_finished(device);
805
806 return 0;
807 }
808
809 static void ping_peer(struct drbd_device *device)
810 {
811 struct drbd_connection *connection = first_peer_device(device)->connection;
812
813 clear_bit(GOT_PING_ACK, &connection->flags);
814 request_ping(connection);
815 wait_event(connection->ping_wait,
816 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
817 }
818
819 int drbd_resync_finished(struct drbd_device *device)
820 {
821 unsigned long db, dt, dbdt;
822 unsigned long n_oos;
823 union drbd_state os, ns;
824 struct drbd_device_work *dw;
825 char *khelper_cmd = NULL;
826 int verify_done = 0;
827
828 /* Remove all elements from the resync LRU. Since future actions
829 * might set bits in the (main) bitmap, then the entries in the
830 * resync LRU would be wrong. */
831 if (drbd_rs_del_all(device)) {
832 /* In case this is not possible now, most probably because
833 * there are P_RS_DATA_REPLY Packets lingering on the worker's
834 * queue (or even the read operations for those packets
835 * is not finished by now). Retry in 100ms. */
836
837 schedule_timeout_interruptible(HZ / 10);
838 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
839 if (dw) {
840 dw->w.cb = w_resync_finished;
841 dw->device = device;
842 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
843 &dw->w);
844 return 1;
845 }
846 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
847 }
848
849 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
850 if (dt <= 0)
851 dt = 1;
852
853 db = device->rs_total;
854 /* adjust for verify start and stop sectors, respective reached position */
855 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
856 db -= device->ov_left;
857
858 dbdt = Bit2KB(db/dt);
859 device->rs_paused /= HZ;
860
861 if (!get_ldev(device))
862 goto out;
863
864 ping_peer(device);
865
866 spin_lock_irq(&device->resource->req_lock);
867 os = drbd_read_state(device);
868
869 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
870
871 /* This protects us against multiple calls (that can happen in the presence
872 of application IO), and against connectivity loss just before we arrive here. */
873 if (os.conn <= C_CONNECTED)
874 goto out_unlock;
875
876 ns = os;
877 ns.conn = C_CONNECTED;
878
879 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
880 verify_done ? "Online verify" : "Resync",
881 dt + device->rs_paused, device->rs_paused, dbdt);
882
883 n_oos = drbd_bm_total_weight(device);
884
885 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
886 if (n_oos) {
887 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
888 n_oos, Bit2KB(1));
889 khelper_cmd = "out-of-sync";
890 }
891 } else {
892 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
893
894 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
895 khelper_cmd = "after-resync-target";
896
897 if (device->use_csums && device->rs_total) {
898 const unsigned long s = device->rs_same_csum;
899 const unsigned long t = device->rs_total;
900 const int ratio =
901 (t == 0) ? 0 :
902 (t < 100000) ? ((s*100)/t) : (s/(t/100));
903 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
904 "transferred %luK total %luK\n",
905 ratio,
906 Bit2KB(device->rs_same_csum),
907 Bit2KB(device->rs_total - device->rs_same_csum),
908 Bit2KB(device->rs_total));
909 }
910 }
911
912 if (device->rs_failed) {
913 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
914
915 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916 ns.disk = D_INCONSISTENT;
917 ns.pdsk = D_UP_TO_DATE;
918 } else {
919 ns.disk = D_UP_TO_DATE;
920 ns.pdsk = D_INCONSISTENT;
921 }
922 } else {
923 ns.disk = D_UP_TO_DATE;
924 ns.pdsk = D_UP_TO_DATE;
925
926 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
927 if (device->p_uuid) {
928 int i;
929 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
930 _drbd_uuid_set(device, i, device->p_uuid[i]);
931 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
932 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
933 } else {
934 drbd_err(device, "device->p_uuid is NULL! BUG\n");
935 }
936 }
937
938 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
939 /* for verify runs, we don't update uuids here,
940 * so there would be nothing to report. */
941 drbd_uuid_set_bm(device, 0UL);
942 drbd_print_uuids(device, "updated UUIDs");
943 if (device->p_uuid) {
944 /* Now the two UUID sets are equal, update what we
945 * know of the peer. */
946 int i;
947 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
948 device->p_uuid[i] = device->ldev->md.uuid[i];
949 }
950 }
951 }
952
953 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
954 out_unlock:
955 spin_unlock_irq(&device->resource->req_lock);
956 put_ldev(device);
957 out:
958 device->rs_total = 0;
959 device->rs_failed = 0;
960 device->rs_paused = 0;
961
962 /* reset start sector, if we reached end of device */
963 if (verify_done && device->ov_left == 0)
964 device->ov_start_sector = 0;
965
966 drbd_md_sync(device);
967
968 if (khelper_cmd)
969 drbd_khelper(device, khelper_cmd);
970
971 return 1;
972 }
973
974 /* helper */
975 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
976 {
977 if (drbd_peer_req_has_active_page(peer_req)) {
978 /* This might happen if sendpage() has not finished */
979 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
980 atomic_add(i, &device->pp_in_use_by_net);
981 atomic_sub(i, &device->pp_in_use);
982 spin_lock_irq(&device->resource->req_lock);
983 list_add_tail(&peer_req->w.list, &device->net_ee);
984 spin_unlock_irq(&device->resource->req_lock);
985 wake_up(&drbd_pp_wait);
986 } else
987 drbd_free_peer_req(device, peer_req);
988 }
989
990 /**
991 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
992 * @device: DRBD device.
993 * @w: work object.
994 * @cancel: The connection will be closed anyways
995 */
996 int w_e_end_data_req(struct drbd_work *w, int cancel)
997 {
998 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999 struct drbd_peer_device *peer_device = peer_req->peer_device;
1000 struct drbd_device *device = peer_device->device;
1001 int err;
1002
1003 if (unlikely(cancel)) {
1004 drbd_free_peer_req(device, peer_req);
1005 dec_unacked(device);
1006 return 0;
1007 }
1008
1009 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1011 } else {
1012 if (__ratelimit(&drbd_ratelimit_state))
1013 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1014 (unsigned long long)peer_req->i.sector);
1015
1016 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1017 }
1018
1019 dec_unacked(device);
1020
1021 move_to_net_ee_or_free(device, peer_req);
1022
1023 if (unlikely(err))
1024 drbd_err(device, "drbd_send_block() failed\n");
1025 return err;
1026 }
1027
1028 /**
1029 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1030 * @w: work object.
1031 * @cancel: The connection will be closed anyways
1032 */
1033 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1034 {
1035 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1036 struct drbd_peer_device *peer_device = peer_req->peer_device;
1037 struct drbd_device *device = peer_device->device;
1038 int err;
1039
1040 if (unlikely(cancel)) {
1041 drbd_free_peer_req(device, peer_req);
1042 dec_unacked(device);
1043 return 0;
1044 }
1045
1046 if (get_ldev_if_state(device, D_FAILED)) {
1047 drbd_rs_complete_io(device, peer_req->i.sector);
1048 put_ldev(device);
1049 }
1050
1051 if (device->state.conn == C_AHEAD) {
1052 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1053 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1054 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1055 inc_rs_pending(device);
1056 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1057 } else {
1058 if (__ratelimit(&drbd_ratelimit_state))
1059 drbd_err(device, "Not sending RSDataReply, "
1060 "partner DISKLESS!\n");
1061 err = 0;
1062 }
1063 } else {
1064 if (__ratelimit(&drbd_ratelimit_state))
1065 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1066 (unsigned long long)peer_req->i.sector);
1067
1068 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1069
1070 /* update resync data with failure */
1071 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1072 }
1073
1074 dec_unacked(device);
1075
1076 move_to_net_ee_or_free(device, peer_req);
1077
1078 if (unlikely(err))
1079 drbd_err(device, "drbd_send_block() failed\n");
1080 return err;
1081 }
1082
1083 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1084 {
1085 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1086 struct drbd_peer_device *peer_device = peer_req->peer_device;
1087 struct drbd_device *device = peer_device->device;
1088 struct digest_info *di;
1089 int digest_size;
1090 void *digest = NULL;
1091 int err, eq = 0;
1092
1093 if (unlikely(cancel)) {
1094 drbd_free_peer_req(device, peer_req);
1095 dec_unacked(device);
1096 return 0;
1097 }
1098
1099 if (get_ldev(device)) {
1100 drbd_rs_complete_io(device, peer_req->i.sector);
1101 put_ldev(device);
1102 }
1103
1104 di = peer_req->digest;
1105
1106 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1107 /* quick hack to try to avoid a race against reconfiguration.
1108 * a real fix would be much more involved,
1109 * introducing more locking mechanisms */
1110 if (peer_device->connection->csums_tfm) {
1111 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1112 D_ASSERT(device, digest_size == di->digest_size);
1113 digest = kmalloc(digest_size, GFP_NOIO);
1114 }
1115 if (digest) {
1116 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1117 eq = !memcmp(digest, di->digest, digest_size);
1118 kfree(digest);
1119 }
1120
1121 if (eq) {
1122 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1123 /* rs_same_csums unit is BM_BLOCK_SIZE */
1124 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1125 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1126 } else {
1127 inc_rs_pending(device);
1128 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1129 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1130 kfree(di);
1131 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1132 }
1133 } else {
1134 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1135 if (__ratelimit(&drbd_ratelimit_state))
1136 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1137 }
1138
1139 dec_unacked(device);
1140 move_to_net_ee_or_free(device, peer_req);
1141
1142 if (unlikely(err))
1143 drbd_err(device, "drbd_send_block/ack() failed\n");
1144 return err;
1145 }
1146
1147 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1148 {
1149 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1150 struct drbd_peer_device *peer_device = peer_req->peer_device;
1151 struct drbd_device *device = peer_device->device;
1152 sector_t sector = peer_req->i.sector;
1153 unsigned int size = peer_req->i.size;
1154 int digest_size;
1155 void *digest;
1156 int err = 0;
1157
1158 if (unlikely(cancel))
1159 goto out;
1160
1161 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1162 digest = kmalloc(digest_size, GFP_NOIO);
1163 if (!digest) {
1164 err = 1; /* terminate the connection in case the allocation failed */
1165 goto out;
1166 }
1167
1168 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1169 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1170 else
1171 memset(digest, 0, digest_size);
1172
1173 /* Free e and pages before send.
1174 * In case we block on congestion, we could otherwise run into
1175 * some distributed deadlock, if the other side blocks on
1176 * congestion as well, because our receiver blocks in
1177 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1178 drbd_free_peer_req(device, peer_req);
1179 peer_req = NULL;
1180 inc_rs_pending(device);
1181 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1182 if (err)
1183 dec_rs_pending(device);
1184 kfree(digest);
1185
1186 out:
1187 if (peer_req)
1188 drbd_free_peer_req(device, peer_req);
1189 dec_unacked(device);
1190 return err;
1191 }
1192
1193 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1194 {
1195 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1196 device->ov_last_oos_size += size>>9;
1197 } else {
1198 device->ov_last_oos_start = sector;
1199 device->ov_last_oos_size = size>>9;
1200 }
1201 drbd_set_out_of_sync(device, sector, size);
1202 }
1203
1204 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1205 {
1206 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1207 struct drbd_peer_device *peer_device = peer_req->peer_device;
1208 struct drbd_device *device = peer_device->device;
1209 struct digest_info *di;
1210 void *digest;
1211 sector_t sector = peer_req->i.sector;
1212 unsigned int size = peer_req->i.size;
1213 int digest_size;
1214 int err, eq = 0;
1215 bool stop_sector_reached = false;
1216
1217 if (unlikely(cancel)) {
1218 drbd_free_peer_req(device, peer_req);
1219 dec_unacked(device);
1220 return 0;
1221 }
1222
1223 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1224 * the resync lru has been cleaned up already */
1225 if (get_ldev(device)) {
1226 drbd_rs_complete_io(device, peer_req->i.sector);
1227 put_ldev(device);
1228 }
1229
1230 di = peer_req->digest;
1231
1232 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1233 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1234 digest = kmalloc(digest_size, GFP_NOIO);
1235 if (digest) {
1236 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1237
1238 D_ASSERT(device, digest_size == di->digest_size);
1239 eq = !memcmp(digest, di->digest, digest_size);
1240 kfree(digest);
1241 }
1242 }
1243
1244 /* Free peer_req and pages before send.
1245 * In case we block on congestion, we could otherwise run into
1246 * some distributed deadlock, if the other side blocks on
1247 * congestion as well, because our receiver blocks in
1248 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1249 drbd_free_peer_req(device, peer_req);
1250 if (!eq)
1251 drbd_ov_out_of_sync_found(device, sector, size);
1252 else
1253 ov_out_of_sync_print(device);
1254
1255 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1256 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1257
1258 dec_unacked(device);
1259
1260 --device->ov_left;
1261
1262 /* let's advance progress step marks only for every other megabyte */
1263 if ((device->ov_left & 0x200) == 0x200)
1264 drbd_advance_rs_marks(device, device->ov_left);
1265
1266 stop_sector_reached = verify_can_do_stop_sector(device) &&
1267 (sector + (size>>9)) >= device->ov_stop_sector;
1268
1269 if (device->ov_left == 0 || stop_sector_reached) {
1270 ov_out_of_sync_print(device);
1271 drbd_resync_finished(device);
1272 }
1273
1274 return err;
1275 }
1276
1277 /* FIXME
1278 * We need to track the number of pending barrier acks,
1279 * and to be able to wait for them.
1280 * See also comment in drbd_adm_attach before drbd_suspend_io.
1281 */
1282 static int drbd_send_barrier(struct drbd_connection *connection)
1283 {
1284 struct p_barrier *p;
1285 struct drbd_socket *sock;
1286
1287 sock = &connection->data;
1288 p = conn_prepare_command(connection, sock);
1289 if (!p)
1290 return -EIO;
1291 p->barrier = connection->send.current_epoch_nr;
1292 p->pad = 0;
1293 connection->send.current_epoch_writes = 0;
1294
1295 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1296 }
1297
1298 int w_send_write_hint(struct drbd_work *w, int cancel)
1299 {
1300 struct drbd_device *device =
1301 container_of(w, struct drbd_device, unplug_work);
1302 struct drbd_socket *sock;
1303
1304 if (cancel)
1305 return 0;
1306 sock = &first_peer_device(device)->connection->data;
1307 if (!drbd_prepare_command(first_peer_device(device), sock))
1308 return -EIO;
1309 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1310 }
1311
1312 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1313 {
1314 if (!connection->send.seen_any_write_yet) {
1315 connection->send.seen_any_write_yet = true;
1316 connection->send.current_epoch_nr = epoch;
1317 connection->send.current_epoch_writes = 0;
1318 }
1319 }
1320
1321 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1322 {
1323 /* re-init if first write on this connection */
1324 if (!connection->send.seen_any_write_yet)
1325 return;
1326 if (connection->send.current_epoch_nr != epoch) {
1327 if (connection->send.current_epoch_writes)
1328 drbd_send_barrier(connection);
1329 connection->send.current_epoch_nr = epoch;
1330 }
1331 }
1332
1333 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1334 {
1335 struct drbd_request *req = container_of(w, struct drbd_request, w);
1336 struct drbd_device *device = req->device;
1337 struct drbd_peer_device *const peer_device = first_peer_device(device);
1338 struct drbd_connection *const connection = peer_device->connection;
1339 int err;
1340
1341 if (unlikely(cancel)) {
1342 req_mod(req, SEND_CANCELED);
1343 return 0;
1344 }
1345 req->pre_send_jif = jiffies;
1346
1347 /* this time, no connection->send.current_epoch_writes++;
1348 * If it was sent, it was the closing barrier for the last
1349 * replicated epoch, before we went into AHEAD mode.
1350 * No more barriers will be sent, until we leave AHEAD mode again. */
1351 maybe_send_barrier(connection, req->epoch);
1352
1353 err = drbd_send_out_of_sync(peer_device, req);
1354 req_mod(req, OOS_HANDED_TO_NETWORK);
1355
1356 return err;
1357 }
1358
1359 /**
1360 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1361 * @w: work object.
1362 * @cancel: The connection will be closed anyways
1363 */
1364 int w_send_dblock(struct drbd_work *w, int cancel)
1365 {
1366 struct drbd_request *req = container_of(w, struct drbd_request, w);
1367 struct drbd_device *device = req->device;
1368 struct drbd_peer_device *const peer_device = first_peer_device(device);
1369 struct drbd_connection *connection = peer_device->connection;
1370 int err;
1371
1372 if (unlikely(cancel)) {
1373 req_mod(req, SEND_CANCELED);
1374 return 0;
1375 }
1376 req->pre_send_jif = jiffies;
1377
1378 re_init_if_first_write(connection, req->epoch);
1379 maybe_send_barrier(connection, req->epoch);
1380 connection->send.current_epoch_writes++;
1381
1382 err = drbd_send_dblock(peer_device, req);
1383 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1384
1385 return err;
1386 }
1387
1388 /**
1389 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1390 * @w: work object.
1391 * @cancel: The connection will be closed anyways
1392 */
1393 int w_send_read_req(struct drbd_work *w, int cancel)
1394 {
1395 struct drbd_request *req = container_of(w, struct drbd_request, w);
1396 struct drbd_device *device = req->device;
1397 struct drbd_peer_device *const peer_device = first_peer_device(device);
1398 struct drbd_connection *connection = peer_device->connection;
1399 int err;
1400
1401 if (unlikely(cancel)) {
1402 req_mod(req, SEND_CANCELED);
1403 return 0;
1404 }
1405 req->pre_send_jif = jiffies;
1406
1407 /* Even read requests may close a write epoch,
1408 * if there was any yet. */
1409 maybe_send_barrier(connection, req->epoch);
1410
1411 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1412 (unsigned long)req);
1413
1414 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1415
1416 return err;
1417 }
1418
1419 int w_restart_disk_io(struct drbd_work *w, int cancel)
1420 {
1421 struct drbd_request *req = container_of(w, struct drbd_request, w);
1422 struct drbd_device *device = req->device;
1423
1424 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1425 drbd_al_begin_io(device, &req->i);
1426
1427 drbd_req_make_private_bio(req, req->master_bio);
1428 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1429 generic_make_request(req->private_bio);
1430
1431 return 0;
1432 }
1433
1434 static int _drbd_may_sync_now(struct drbd_device *device)
1435 {
1436 struct drbd_device *odev = device;
1437 int resync_after;
1438
1439 while (1) {
1440 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1441 return 1;
1442 rcu_read_lock();
1443 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1444 rcu_read_unlock();
1445 if (resync_after == -1)
1446 return 1;
1447 odev = minor_to_device(resync_after);
1448 if (!odev)
1449 return 1;
1450 if ((odev->state.conn >= C_SYNC_SOURCE &&
1451 odev->state.conn <= C_PAUSED_SYNC_T) ||
1452 odev->state.aftr_isp || odev->state.peer_isp ||
1453 odev->state.user_isp)
1454 return 0;
1455 }
1456 }
1457
1458 /**
1459 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1460 * @device: DRBD device.
1461 *
1462 * Called from process context only (admin command and after_state_ch).
1463 */
1464 static int _drbd_pause_after(struct drbd_device *device)
1465 {
1466 struct drbd_device *odev;
1467 int i, rv = 0;
1468
1469 rcu_read_lock();
1470 idr_for_each_entry(&drbd_devices, odev, i) {
1471 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1472 continue;
1473 if (!_drbd_may_sync_now(odev))
1474 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1475 != SS_NOTHING_TO_DO);
1476 }
1477 rcu_read_unlock();
1478
1479 return rv;
1480 }
1481
1482 /**
1483 * _drbd_resume_next() - Resume resync on all devices that may resync now
1484 * @device: DRBD device.
1485 *
1486 * Called from process context only (admin command and worker).
1487 */
1488 static int _drbd_resume_next(struct drbd_device *device)
1489 {
1490 struct drbd_device *odev;
1491 int i, rv = 0;
1492
1493 rcu_read_lock();
1494 idr_for_each_entry(&drbd_devices, odev, i) {
1495 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1496 continue;
1497 if (odev->state.aftr_isp) {
1498 if (_drbd_may_sync_now(odev))
1499 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1500 CS_HARD, NULL)
1501 != SS_NOTHING_TO_DO) ;
1502 }
1503 }
1504 rcu_read_unlock();
1505 return rv;
1506 }
1507
1508 void resume_next_sg(struct drbd_device *device)
1509 {
1510 write_lock_irq(&global_state_lock);
1511 _drbd_resume_next(device);
1512 write_unlock_irq(&global_state_lock);
1513 }
1514
1515 void suspend_other_sg(struct drbd_device *device)
1516 {
1517 write_lock_irq(&global_state_lock);
1518 _drbd_pause_after(device);
1519 write_unlock_irq(&global_state_lock);
1520 }
1521
1522 /* caller must hold global_state_lock */
1523 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1524 {
1525 struct drbd_device *odev;
1526 int resync_after;
1527
1528 if (o_minor == -1)
1529 return NO_ERROR;
1530 if (o_minor < -1 || o_minor > MINORMASK)
1531 return ERR_RESYNC_AFTER;
1532
1533 /* check for loops */
1534 odev = minor_to_device(o_minor);
1535 while (1) {
1536 if (odev == device)
1537 return ERR_RESYNC_AFTER_CYCLE;
1538
1539 /* You are free to depend on diskless, non-existing,
1540 * or not yet/no longer existing minors.
1541 * We only reject dependency loops.
1542 * We cannot follow the dependency chain beyond a detached or
1543 * missing minor.
1544 */
1545 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1546 return NO_ERROR;
1547
1548 rcu_read_lock();
1549 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1550 rcu_read_unlock();
1551 /* dependency chain ends here, no cycles. */
1552 if (resync_after == -1)
1553 return NO_ERROR;
1554
1555 /* follow the dependency chain */
1556 odev = minor_to_device(resync_after);
1557 }
1558 }
1559
1560 /* caller must hold global_state_lock */
1561 void drbd_resync_after_changed(struct drbd_device *device)
1562 {
1563 int changes;
1564
1565 do {
1566 changes = _drbd_pause_after(device);
1567 changes |= _drbd_resume_next(device);
1568 } while (changes);
1569 }
1570
1571 void drbd_rs_controller_reset(struct drbd_device *device)
1572 {
1573 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1574 struct fifo_buffer *plan;
1575
1576 atomic_set(&device->rs_sect_in, 0);
1577 atomic_set(&device->rs_sect_ev, 0);
1578 device->rs_in_flight = 0;
1579 device->rs_last_events =
1580 (int)part_stat_read(&disk->part0, sectors[0]) +
1581 (int)part_stat_read(&disk->part0, sectors[1]);
1582
1583 /* Updating the RCU protected object in place is necessary since
1584 this function gets called from atomic context.
1585 It is valid since all other updates also lead to an completely
1586 empty fifo */
1587 rcu_read_lock();
1588 plan = rcu_dereference(device->rs_plan_s);
1589 plan->total = 0;
1590 fifo_set(plan, 0);
1591 rcu_read_unlock();
1592 }
1593
1594 void start_resync_timer_fn(unsigned long data)
1595 {
1596 struct drbd_device *device = (struct drbd_device *) data;
1597 drbd_device_post_work(device, RS_START);
1598 }
1599
1600 static void do_start_resync(struct drbd_device *device)
1601 {
1602 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1603 drbd_warn(device, "postponing start_resync ...\n");
1604 device->start_resync_timer.expires = jiffies + HZ/10;
1605 add_timer(&device->start_resync_timer);
1606 return;
1607 }
1608
1609 drbd_start_resync(device, C_SYNC_SOURCE);
1610 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1611 }
1612
1613 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1614 {
1615 bool csums_after_crash_only;
1616 rcu_read_lock();
1617 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1618 rcu_read_unlock();
1619 return connection->agreed_pro_version >= 89 && /* supported? */
1620 connection->csums_tfm && /* configured? */
1621 (csums_after_crash_only == 0 /* use for each resync? */
1622 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1623 }
1624
1625 /**
1626 * drbd_start_resync() - Start the resync process
1627 * @device: DRBD device.
1628 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1629 *
1630 * This function might bring you directly into one of the
1631 * C_PAUSED_SYNC_* states.
1632 */
1633 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1634 {
1635 struct drbd_peer_device *peer_device = first_peer_device(device);
1636 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1637 union drbd_state ns;
1638 int r;
1639
1640 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1641 drbd_err(device, "Resync already running!\n");
1642 return;
1643 }
1644
1645 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1646 if (side == C_SYNC_TARGET) {
1647 /* Since application IO was locked out during C_WF_BITMAP_T and
1648 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1649 we check that we might make the data inconsistent. */
1650 r = drbd_khelper(device, "before-resync-target");
1651 r = (r >> 8) & 0xff;
1652 if (r > 0) {
1653 drbd_info(device, "before-resync-target handler returned %d, "
1654 "dropping connection.\n", r);
1655 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1656 return;
1657 }
1658 } else /* C_SYNC_SOURCE */ {
1659 r = drbd_khelper(device, "before-resync-source");
1660 r = (r >> 8) & 0xff;
1661 if (r > 0) {
1662 if (r == 3) {
1663 drbd_info(device, "before-resync-source handler returned %d, "
1664 "ignoring. Old userland tools?", r);
1665 } else {
1666 drbd_info(device, "before-resync-source handler returned %d, "
1667 "dropping connection.\n", r);
1668 conn_request_state(connection,
1669 NS(conn, C_DISCONNECTING), CS_HARD);
1670 return;
1671 }
1672 }
1673 }
1674 }
1675
1676 if (current == connection->worker.task) {
1677 /* The worker should not sleep waiting for state_mutex,
1678 that can take long */
1679 if (!mutex_trylock(device->state_mutex)) {
1680 set_bit(B_RS_H_DONE, &device->flags);
1681 device->start_resync_timer.expires = jiffies + HZ/5;
1682 add_timer(&device->start_resync_timer);
1683 return;
1684 }
1685 } else {
1686 mutex_lock(device->state_mutex);
1687 }
1688 clear_bit(B_RS_H_DONE, &device->flags);
1689
1690 /* req_lock: serialize with drbd_send_and_submit() and others
1691 * global_state_lock: for stable sync-after dependencies */
1692 spin_lock_irq(&device->resource->req_lock);
1693 write_lock(&global_state_lock);
1694 /* Did some connection breakage or IO error race with us? */
1695 if (device->state.conn < C_CONNECTED
1696 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1697 write_unlock(&global_state_lock);
1698 spin_unlock_irq(&device->resource->req_lock);
1699 mutex_unlock(device->state_mutex);
1700 return;
1701 }
1702
1703 ns = drbd_read_state(device);
1704
1705 ns.aftr_isp = !_drbd_may_sync_now(device);
1706
1707 ns.conn = side;
1708
1709 if (side == C_SYNC_TARGET)
1710 ns.disk = D_INCONSISTENT;
1711 else /* side == C_SYNC_SOURCE */
1712 ns.pdsk = D_INCONSISTENT;
1713
1714 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1715 ns = drbd_read_state(device);
1716
1717 if (ns.conn < C_CONNECTED)
1718 r = SS_UNKNOWN_ERROR;
1719
1720 if (r == SS_SUCCESS) {
1721 unsigned long tw = drbd_bm_total_weight(device);
1722 unsigned long now = jiffies;
1723 int i;
1724
1725 device->rs_failed = 0;
1726 device->rs_paused = 0;
1727 device->rs_same_csum = 0;
1728 device->rs_last_sect_ev = 0;
1729 device->rs_total = tw;
1730 device->rs_start = now;
1731 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1732 device->rs_mark_left[i] = tw;
1733 device->rs_mark_time[i] = now;
1734 }
1735 _drbd_pause_after(device);
1736 /* Forget potentially stale cached per resync extent bit-counts.
1737 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1738 * disabled, and know the disk state is ok. */
1739 spin_lock(&device->al_lock);
1740 lc_reset(device->resync);
1741 device->resync_locked = 0;
1742 device->resync_wenr = LC_FREE;
1743 spin_unlock(&device->al_lock);
1744 }
1745 write_unlock(&global_state_lock);
1746 spin_unlock_irq(&device->resource->req_lock);
1747
1748 if (r == SS_SUCCESS) {
1749 wake_up(&device->al_wait); /* for lc_reset() above */
1750 /* reset rs_last_bcast when a resync or verify is started,
1751 * to deal with potential jiffies wrap. */
1752 device->rs_last_bcast = jiffies - HZ;
1753
1754 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1755 drbd_conn_str(ns.conn),
1756 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1757 (unsigned long) device->rs_total);
1758 if (side == C_SYNC_TARGET) {
1759 device->bm_resync_fo = 0;
1760 device->use_csums = use_checksum_based_resync(connection, device);
1761 } else {
1762 device->use_csums = 0;
1763 }
1764
1765 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1766 * with w_send_oos, or the sync target will get confused as to
1767 * how much bits to resync. We cannot do that always, because for an
1768 * empty resync and protocol < 95, we need to do it here, as we call
1769 * drbd_resync_finished from here in that case.
1770 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1771 * and from after_state_ch otherwise. */
1772 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1773 drbd_gen_and_send_sync_uuid(peer_device);
1774
1775 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1776 /* This still has a race (about when exactly the peers
1777 * detect connection loss) that can lead to a full sync
1778 * on next handshake. In 8.3.9 we fixed this with explicit
1779 * resync-finished notifications, but the fix
1780 * introduces a protocol change. Sleeping for some
1781 * time longer than the ping interval + timeout on the
1782 * SyncSource, to give the SyncTarget the chance to
1783 * detect connection loss, then waiting for a ping
1784 * response (implicit in drbd_resync_finished) reduces
1785 * the race considerably, but does not solve it. */
1786 if (side == C_SYNC_SOURCE) {
1787 struct net_conf *nc;
1788 int timeo;
1789
1790 rcu_read_lock();
1791 nc = rcu_dereference(connection->net_conf);
1792 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1793 rcu_read_unlock();
1794 schedule_timeout_interruptible(timeo);
1795 }
1796 drbd_resync_finished(device);
1797 }
1798
1799 drbd_rs_controller_reset(device);
1800 /* ns.conn may already be != device->state.conn,
1801 * we may have been paused in between, or become paused until
1802 * the timer triggers.
1803 * No matter, that is handled in resync_timer_fn() */
1804 if (ns.conn == C_SYNC_TARGET)
1805 mod_timer(&device->resync_timer, jiffies);
1806
1807 drbd_md_sync(device);
1808 }
1809 put_ldev(device);
1810 mutex_unlock(device->state_mutex);
1811 }
1812
1813 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1814 {
1815 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1816 device->rs_last_bcast = jiffies;
1817
1818 if (!get_ldev(device))
1819 return;
1820
1821 drbd_bm_write_lazy(device, 0);
1822 if (resync_done && is_sync_state(device->state.conn))
1823 drbd_resync_finished(device);
1824
1825 drbd_bcast_event(device, &sib);
1826 /* update timestamp, in case it took a while to write out stuff */
1827 device->rs_last_bcast = jiffies;
1828 put_ldev(device);
1829 }
1830
1831 static void drbd_ldev_destroy(struct drbd_device *device)
1832 {
1833 lc_destroy(device->resync);
1834 device->resync = NULL;
1835 lc_destroy(device->act_log);
1836 device->act_log = NULL;
1837
1838 __acquire(local);
1839 drbd_free_ldev(device->ldev);
1840 device->ldev = NULL;
1841 __release(local);
1842
1843 clear_bit(GOING_DISKLESS, &device->flags);
1844 wake_up(&device->misc_wait);
1845 }
1846
1847 static void go_diskless(struct drbd_device *device)
1848 {
1849 D_ASSERT(device, device->state.disk == D_FAILED);
1850 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1851 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1852 * the protected members anymore, though, so once put_ldev reaches zero
1853 * again, it will be safe to free them. */
1854
1855 /* Try to write changed bitmap pages, read errors may have just
1856 * set some bits outside the area covered by the activity log.
1857 *
1858 * If we have an IO error during the bitmap writeout,
1859 * we will want a full sync next time, just in case.
1860 * (Do we want a specific meta data flag for this?)
1861 *
1862 * If that does not make it to stable storage either,
1863 * we cannot do anything about that anymore.
1864 *
1865 * We still need to check if both bitmap and ldev are present, we may
1866 * end up here after a failed attach, before ldev was even assigned.
1867 */
1868 if (device->bitmap && device->ldev) {
1869 /* An interrupted resync or similar is allowed to recounts bits
1870 * while we detach.
1871 * Any modifications would not be expected anymore, though.
1872 */
1873 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1874 "detach", BM_LOCKED_TEST_ALLOWED)) {
1875 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1876 drbd_md_set_flag(device, MDF_FULL_SYNC);
1877 drbd_md_sync(device);
1878 }
1879 }
1880 }
1881
1882 drbd_force_state(device, NS(disk, D_DISKLESS));
1883 }
1884
1885 static int do_md_sync(struct drbd_device *device)
1886 {
1887 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1888 drbd_md_sync(device);
1889 return 0;
1890 }
1891
1892 /* only called from drbd_worker thread, no locking */
1893 void __update_timing_details(
1894 struct drbd_thread_timing_details *tdp,
1895 unsigned int *cb_nr,
1896 void *cb,
1897 const char *fn, const unsigned int line)
1898 {
1899 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1900 struct drbd_thread_timing_details *td = tdp + i;
1901
1902 td->start_jif = jiffies;
1903 td->cb_addr = cb;
1904 td->caller_fn = fn;
1905 td->line = line;
1906 td->cb_nr = *cb_nr;
1907
1908 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1909 td = tdp + i;
1910 memset(td, 0, sizeof(*td));
1911
1912 ++(*cb_nr);
1913 }
1914
1915 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1916 {
1917 if (test_bit(MD_SYNC, &todo))
1918 do_md_sync(device);
1919 if (test_bit(RS_DONE, &todo) ||
1920 test_bit(RS_PROGRESS, &todo))
1921 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1922 if (test_bit(GO_DISKLESS, &todo))
1923 go_diskless(device);
1924 if (test_bit(DESTROY_DISK, &todo))
1925 drbd_ldev_destroy(device);
1926 if (test_bit(RS_START, &todo))
1927 do_start_resync(device);
1928 }
1929
1930 #define DRBD_DEVICE_WORK_MASK \
1931 ((1UL << GO_DISKLESS) \
1932 |(1UL << DESTROY_DISK) \
1933 |(1UL << MD_SYNC) \
1934 |(1UL << RS_START) \
1935 |(1UL << RS_PROGRESS) \
1936 |(1UL << RS_DONE) \
1937 )
1938
1939 static unsigned long get_work_bits(unsigned long *flags)
1940 {
1941 unsigned long old, new;
1942 do {
1943 old = *flags;
1944 new = old & ~DRBD_DEVICE_WORK_MASK;
1945 } while (cmpxchg(flags, old, new) != old);
1946 return old & DRBD_DEVICE_WORK_MASK;
1947 }
1948
1949 static void do_unqueued_work(struct drbd_connection *connection)
1950 {
1951 struct drbd_peer_device *peer_device;
1952 int vnr;
1953
1954 rcu_read_lock();
1955 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1956 struct drbd_device *device = peer_device->device;
1957 unsigned long todo = get_work_bits(&device->flags);
1958 if (!todo)
1959 continue;
1960
1961 kref_get(&device->kref);
1962 rcu_read_unlock();
1963 do_device_work(device, todo);
1964 kref_put(&device->kref, drbd_destroy_device);
1965 rcu_read_lock();
1966 }
1967 rcu_read_unlock();
1968 }
1969
1970 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1971 {
1972 spin_lock_irq(&queue->q_lock);
1973 list_splice_tail_init(&queue->q, work_list);
1974 spin_unlock_irq(&queue->q_lock);
1975 return !list_empty(work_list);
1976 }
1977
1978 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1979 {
1980 DEFINE_WAIT(wait);
1981 struct net_conf *nc;
1982 int uncork, cork;
1983
1984 dequeue_work_batch(&connection->sender_work, work_list);
1985 if (!list_empty(work_list))
1986 return;
1987
1988 /* Still nothing to do?
1989 * Maybe we still need to close the current epoch,
1990 * even if no new requests are queued yet.
1991 *
1992 * Also, poke TCP, just in case.
1993 * Then wait for new work (or signal). */
1994 rcu_read_lock();
1995 nc = rcu_dereference(connection->net_conf);
1996 uncork = nc ? nc->tcp_cork : 0;
1997 rcu_read_unlock();
1998 if (uncork) {
1999 mutex_lock(&connection->data.mutex);
2000 if (connection->data.socket)
2001 drbd_tcp_uncork(connection->data.socket);
2002 mutex_unlock(&connection->data.mutex);
2003 }
2004
2005 for (;;) {
2006 int send_barrier;
2007 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2008 spin_lock_irq(&connection->resource->req_lock);
2009 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2010 if (!list_empty(&connection->sender_work.q))
2011 list_splice_tail_init(&connection->sender_work.q, work_list);
2012 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2013 if (!list_empty(work_list) || signal_pending(current)) {
2014 spin_unlock_irq(&connection->resource->req_lock);
2015 break;
2016 }
2017
2018 /* We found nothing new to do, no to-be-communicated request,
2019 * no other work item. We may still need to close the last
2020 * epoch. Next incoming request epoch will be connection ->
2021 * current transfer log epoch number. If that is different
2022 * from the epoch of the last request we communicated, it is
2023 * safe to send the epoch separating barrier now.
2024 */
2025 send_barrier =
2026 atomic_read(&connection->current_tle_nr) !=
2027 connection->send.current_epoch_nr;
2028 spin_unlock_irq(&connection->resource->req_lock);
2029
2030 if (send_barrier)
2031 maybe_send_barrier(connection,
2032 connection->send.current_epoch_nr + 1);
2033
2034 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2035 break;
2036
2037 /* drbd_send() may have called flush_signals() */
2038 if (get_t_state(&connection->worker) != RUNNING)
2039 break;
2040
2041 schedule();
2042 /* may be woken up for other things but new work, too,
2043 * e.g. if the current epoch got closed.
2044 * In which case we send the barrier above. */
2045 }
2046 finish_wait(&connection->sender_work.q_wait, &wait);
2047
2048 /* someone may have changed the config while we have been waiting above. */
2049 rcu_read_lock();
2050 nc = rcu_dereference(connection->net_conf);
2051 cork = nc ? nc->tcp_cork : 0;
2052 rcu_read_unlock();
2053 mutex_lock(&connection->data.mutex);
2054 if (connection->data.socket) {
2055 if (cork)
2056 drbd_tcp_cork(connection->data.socket);
2057 else if (!uncork)
2058 drbd_tcp_uncork(connection->data.socket);
2059 }
2060 mutex_unlock(&connection->data.mutex);
2061 }
2062
2063 int drbd_worker(struct drbd_thread *thi)
2064 {
2065 struct drbd_connection *connection = thi->connection;
2066 struct drbd_work *w = NULL;
2067 struct drbd_peer_device *peer_device;
2068 LIST_HEAD(work_list);
2069 int vnr;
2070
2071 while (get_t_state(thi) == RUNNING) {
2072 drbd_thread_current_set_cpu(thi);
2073
2074 if (list_empty(&work_list)) {
2075 update_worker_timing_details(connection, wait_for_work);
2076 wait_for_work(connection, &work_list);
2077 }
2078
2079 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2080 update_worker_timing_details(connection, do_unqueued_work);
2081 do_unqueued_work(connection);
2082 }
2083
2084 if (signal_pending(current)) {
2085 flush_signals(current);
2086 if (get_t_state(thi) == RUNNING) {
2087 drbd_warn(connection, "Worker got an unexpected signal\n");
2088 continue;
2089 }
2090 break;
2091 }
2092
2093 if (get_t_state(thi) != RUNNING)
2094 break;
2095
2096 if (!list_empty(&work_list)) {
2097 w = list_first_entry(&work_list, struct drbd_work, list);
2098 list_del_init(&w->list);
2099 update_worker_timing_details(connection, w->cb);
2100 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2101 continue;
2102 if (connection->cstate >= C_WF_REPORT_PARAMS)
2103 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2104 }
2105 }
2106
2107 do {
2108 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2109 update_worker_timing_details(connection, do_unqueued_work);
2110 do_unqueued_work(connection);
2111 }
2112 if (!list_empty(&work_list)) {
2113 w = list_first_entry(&work_list, struct drbd_work, list);
2114 list_del_init(&w->list);
2115 update_worker_timing_details(connection, w->cb);
2116 w->cb(w, 1);
2117 } else
2118 dequeue_work_batch(&connection->sender_work, &work_list);
2119 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2120
2121 rcu_read_lock();
2122 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2123 struct drbd_device *device = peer_device->device;
2124 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2125 kref_get(&device->kref);
2126 rcu_read_unlock();
2127 drbd_device_cleanup(device);
2128 kref_put(&device->kref, drbd_destroy_device);
2129 rcu_read_lock();
2130 }
2131 rcu_read_unlock();
2132
2133 return 0;
2134 }