1/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/smp_lock.h>
30#include <linux/wait.h>
31#include <linux/mm.h>
32#include <linux/memcontrol.h>
33#include <linux/mm_inline.h>
34#include <linux/slab.h>
35#include <linux/random.h>
36#include <linux/string.h>
37#include <linux/scatterlist.h>
38
39#include "drbd_int.h"
40#include "drbd_req.h"
41
42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46/* defined here:
47 drbd_md_io_complete
48 drbd_endio_sec
49 drbd_endio_pri
50
51 * more endio handlers:
52 atodb_endio in drbd_actlog.c
53 drbd_bm_async_io_complete in drbd_bitmap.c
54
55 * For all these callbacks, note the following:
56 * The callbacks will be called in irq context by the IDE drivers,
57 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58 * Try to get the locking right :)
59 *
60 */
61
62
63/* About the global_state_lock
64 Each state transition on a device holds a read lock. In case we have
65 to evaluate the sync after dependencies, we grab a write lock, because
66 we need stable states on all devices for that. */
67rwlock_t global_state_lock;
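/* Within this file, the write lock is taken by the resync-after dependency
 * helpers further down (resume_next_sg, suspend_other_sg, drbd_alter_sa,
 * drbd_start_resync), which need a stable view of all device states at once. */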
68
69/* used for synchronous meta data and bitmap IO
70 * submitted by drbd_md_sync_page_io()
71 */
72void drbd_md_io_complete(struct bio *bio, int error)
73{
74 struct drbd_md_io *md_io;
75
76 md_io = (struct drbd_md_io *)bio->bi_private;
77 md_io->error = error;
78
79 complete(&md_io->event);
80}
81
82/* reads on behalf of the partner,
83 * "submitted" by the receiver
84 */
85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86{
87 unsigned long flags = 0;
88 struct drbd_conf *mdev = e->mdev;
89
90 D_ASSERT(e->block_id != ID_VACANT);
91
92 spin_lock_irqsave(&mdev->req_lock, flags);
93 mdev->read_cnt += e->size >> 9;
94 list_del(&e->w.list);
95 if (list_empty(&mdev->read_ee))
96 wake_up(&mdev->ee_wait);
97 if (test_bit(__EE_WAS_ERROR, &e->flags))
98 __drbd_chk_io_error(mdev, FALSE);
99 spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101 drbd_queue_work(&mdev->data.work, &e->w);
102 put_ldev(mdev);
103}
104
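/* A write that was submitted as a barrier (EE_IS_BARRIER), failed
 * (EE_WAS_ERROR), and has not been resubmitted yet is handled specially
 * below: write ordering is degraded to a plain flush and the request is
 * queued for reissue; EE_RESUBMITTED ensures this happens at most once. */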
105static int is_failed_barrier(int ee_flags)
106{
107 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108 == (EE_IS_BARRIER|EE_WAS_ERROR);
109}
110
111/* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage. */
113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114{
115 unsigned long flags = 0;
116 struct drbd_conf *mdev = e->mdev;
117 sector_t e_sector;
118 int do_wake;
119 int is_syncer_req;
120 int do_al_complete_io;
121
122 /* if this is a failed barrier request, disable use of barriers,
123 * and schedule for resubmission */
124 if (is_failed_barrier(e->flags)) {
125 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126 spin_lock_irqsave(&mdev->req_lock, flags);
127 list_del(&e->w.list);
128 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129 e->w.cb = w_e_reissue;
130 /* put_ldev actually happens below, once we come here again. */
131 __release(local);
132 spin_unlock_irqrestore(&mdev->req_lock, flags);
133 drbd_queue_work(&mdev->data.work, &e->w);
134 return;
135 }
136
137 D_ASSERT(e->block_id != ID_VACANT);
138
139 /* after we moved e to done_ee,
140 * we may no longer access it,
141 * it may be freed/reused already!
142 * (as soon as we release the req_lock) */
143 e_sector = e->sector;
144 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145 is_syncer_req = is_syncer_block_id(e->block_id);
146
147 spin_lock_irqsave(&mdev->req_lock, flags);
148 mdev->writ_cnt += e->size >> 9;
149 list_del(&e->w.list); /* has been on active_ee or sync_ee */
150 list_add_tail(&e->w.list, &mdev->done_ee);
151
152 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153 * neither did we wake possibly waiting conflicting requests.
154 * done from "drbd_process_done_ee" within the appropriate w.cb
155 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157 do_wake = is_syncer_req
158 ? list_empty(&mdev->sync_ee)
159 : list_empty(&mdev->active_ee);
160
161 if (test_bit(__EE_WAS_ERROR, &e->flags))
162 __drbd_chk_io_error(mdev, FALSE);
163 spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165 if (is_syncer_req)
166 drbd_rs_complete_io(mdev, e_sector);
167
168 if (do_wake)
169 wake_up(&mdev->ee_wait);
170
171 if (do_al_complete_io)
172 drbd_al_complete_io(mdev, e_sector);
173
174 wake_asender(mdev);
175 put_ldev(mdev);
176}
177
178/* writes on behalf of the partner, or resync writes,
179 * "submitted" by the receiver.
180 */
181void drbd_endio_sec(struct bio *bio, int error)
182{
183 struct drbd_epoch_entry *e = bio->bi_private;
184 struct drbd_conf *mdev = e->mdev;
185 int uptodate = bio_flagged(bio, BIO_UPTODATE);
186 int is_write = bio_data_dir(bio) == WRITE;
187
188 if (error)
189 dev_warn(DEV, "%s: error=%d s=%llus\n",
190 is_write ? "write" : "read", error,
191 (unsigned long long)e->sector);
192 if (!error && !uptodate) {
193 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194 is_write ? "write" : "read",
195 (unsigned long long)e->sector);
196 /* strange behavior of some lower level drivers...
197 * fail the request by clearing the uptodate flag,
198 * but do not return any error?! */
199 error = -EIO;
200 }
201
202 if (error)
203 set_bit(__EE_WAS_ERROR, &e->flags);
204
205 bio_put(bio); /* no need for the bio anymore */
206 if (atomic_dec_and_test(&e->pending_bios)) {
207 if (is_write)
208 drbd_endio_write_sec_final(e);
209 else
210 drbd_endio_read_sec_final(e);
211 }
212}
213
214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215 */
216void drbd_endio_pri(struct bio *bio, int error)
217{
218 struct drbd_request *req = bio->bi_private;
219 struct drbd_conf *mdev = req->mdev;
220 enum drbd_req_event what;
221 int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223 if (!error && !uptodate) {
224 dev_warn(DEV, "p %s: setting error to -EIO\n",
225 bio_data_dir(bio) == WRITE ? "write" : "read");
226 /* strange behavior of some lower level drivers...
227 * fail the request by clearing the uptodate flag,
228 * but do not return any error?! */
229 error = -EIO;
230 }
231
232 /* to avoid recursion in __req_mod */
233 if (unlikely(error)) {
234 what = (bio_data_dir(bio) == WRITE)
235 ? write_completed_with_error
236 : (bio_rw(bio) == READ)
237 ? read_completed_with_error
238 : read_ahead_completed_with_error;
239 } else
240 what = completed_ok;
241
242 bio_put(req->private_bio);
243 req->private_bio = ERR_PTR(error);
244
245 req_mod(req, what);
246}
247
248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249{
250 struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252 /* We should not detach for read io-error,
253 * but try to WRITE the P_DATA_REPLY to the failed location,
254 * to give the disk the chance to relocate that block */
255
256 spin_lock_irq(&mdev->req_lock);
257 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258 _req_mod(req, read_retry_remote_canceled);
259 spin_unlock_irq(&mdev->req_lock);
260 return 1;
261 }
262 spin_unlock_irq(&mdev->req_lock);
263
264 return w_send_read_req(mdev, w, 0);
265}
266
267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268{
269 ERR_IF(cancel) return 1;
270 dev_err(DEV, "resync inactive, but callback triggered??\n");
271 return 1; /* Simply ignore this! */
272}
273
274void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275{
276 struct hash_desc desc;
277 struct scatterlist sg;
278 struct page *page = e->pages;
279 struct page *tmp;
280 unsigned len;
281
282 desc.tfm = tfm;
283 desc.flags = 0;
284
285 sg_init_table(&sg, 1);
286 crypto_hash_init(&desc);
287
288 while ((tmp = page_chain_next(page))) {
289 /* all but the last page will be fully used */
290 sg_set_page(&sg, page, PAGE_SIZE, 0);
291 crypto_hash_update(&desc, &sg, sg.length);
292 page = tmp;
293 }
294 /* and now the last, possibly only partially used page */
295 len = e->size & (PAGE_SIZE - 1);
296 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297 crypto_hash_update(&desc, &sg, sg.length);
298 crypto_hash_final(&desc, digest);
299}
300
301void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302{
303 struct hash_desc desc;
304 struct scatterlist sg;
305 struct bio_vec *bvec;
306 int i;
307
308 desc.tfm = tfm;
309 desc.flags = 0;
310
311 sg_init_table(&sg, 1);
312 crypto_hash_init(&desc);
313
314 __bio_for_each_segment(bvec, bio, i, 0) {
315 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316 crypto_hash_update(&desc, &sg, sg.length);
317 }
318 crypto_hash_final(&desc, digest);
319}
320
321static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322{
323 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324 int digest_size;
325 void *digest;
326 int ok;
327
328 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330 if (unlikely(cancel)) {
331 drbd_free_ee(mdev, e);
332 return 1;
333 }
334
335 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337 digest = kmalloc(digest_size, GFP_NOIO);
338 if (digest) {
339 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341 inc_rs_pending(mdev);
342 ok = drbd_send_drequest_csum(mdev,
343 e->sector,
344 e->size,
345 digest,
346 digest_size,
347 P_CSUM_RS_REQUEST);
348 kfree(digest);
349 } else {
350 dev_err(DEV, "kmalloc() of digest failed.\n");
351 ok = 0;
352 }
353 } else
354 ok = 1;
355
356 drbd_free_ee(mdev, e);
357
358 if (unlikely(!ok))
359 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360 return ok;
361}
362
363#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
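/* Note: no __GFP_WAIT in this mask, so these allocations are expected to
 * fail fast under memory pressure instead of blocking the caller;
 * read_for_csum() below treats such a failure as "defer and retry later". */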
364
365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366{
367 struct drbd_epoch_entry *e;
368
369 if (!get_ldev(mdev))
370 return -EIO;
371
372 if (drbd_rs_should_slow_down(mdev))
373 goto defer;
374
375 /* GFP_TRY, because if there is no memory available right now, this may
376 * be rescheduled for later. It is "only" background resync, after all. */
377 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378 if (!e)
379 goto defer;
380
381 e->w.cb = w_e_send_csum;
382 spin_lock_irq(&mdev->req_lock);
383 list_add(&e->w.list, &mdev->read_ee);
384 spin_unlock_irq(&mdev->req_lock);
385
386 atomic_add(size >> 9, &mdev->rs_sect_ev);
387 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388 return 0;
389
390 drbd_free_ee(mdev, e);
391defer:
392 put_ldev(mdev);
393 return -EAGAIN;
394}
395
396void resync_timer_fn(unsigned long data)
397{
398 struct drbd_conf *mdev = (struct drbd_conf *) data;
399 int queue;
400
401 queue = 1;
402 switch (mdev->state.conn) {
403 case C_VERIFY_S:
404 mdev->resync_work.cb = w_make_ov_request;
405 break;
406 case C_SYNC_TARGET:
407 mdev->resync_work.cb = w_make_resync_request;
408 break;
409 default:
410 queue = 0;
411 mdev->resync_work.cb = w_resync_inactive;
412 }
413
414 /* harmless race: list_empty outside data.work.q_lock */
415 if (list_empty(&mdev->resync_work.list) && queue)
416 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
417}
418
419static void fifo_set(struct fifo_buffer *fb, int value)
420{
421 int i;
422
423 for (i = 0; i < fb->size; i++)
424 fb->values[i] = value;
425}
426
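/* The plan fifo acts as a delay line: a value pushed now is returned again
 * after fb->size further calls, once head_index has wrapped around. */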
427static int fifo_push(struct fifo_buffer *fb, int value)
428{
429 int ov;
430
431 ov = fb->values[fb->head_index];
432 fb->values[fb->head_index++] = value;
433
434 if (fb->head_index >= fb->size)
435 fb->head_index = 0;
436
437 return ov;
438}
439
440static void fifo_add_val(struct fifo_buffer *fb, int value)
441{
442 int i;
443
444 for (i = 0; i < fb->size; i++)
445 fb->values[i] += value;
446}
447
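/* Dynamic resync speed controller, used by w_make_resync_request() when a
 * plan-ahead buffer is configured (rs_plan_s.size != 0).  Roughly: each
 * SLEEP_TIME tick it looks at how many sectors came back from the peer
 * (rs_sect_in), computes how many sectors it wants "in flight"
 * (c_fill_target, or derived from c_delay_target and the measured inflow),
 * and spreads the difference over the next 'steps' ticks via the fifo.
 * The return value is the number of sectors to request in this tick. */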
448int drbd_rs_controller(struct drbd_conf *mdev)
449{
450 unsigned int sect_in; /* Number of sectors that came in since the last turn */
451 unsigned int want; /* The number of sectors we want in the proxy */
452 int req_sect; /* Number of sectors to request in this turn */
453 int correction; /* Number of sectors more we need in the proxy*/
454 int cps; /* correction per invocation of drbd_rs_controller() */
455 int steps; /* Number of time steps to plan ahead */
456 int curr_corr;
457 int max_sect;
458
459 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
460 mdev->rs_in_flight -= sect_in;
461
462 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
463
464 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
465
466 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
467 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
468 } else { /* normal path */
469 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
470 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
471 }
472
473 correction = want - mdev->rs_in_flight - mdev->rs_planed;
474
475 /* Plan ahead */
476 cps = correction / steps;
477 fifo_add_val(&mdev->rs_plan_s, cps);
478 mdev->rs_planed += cps * steps;
479
480 /* What we do in this step */
481 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
482 spin_unlock(&mdev->peer_seq_lock);
483 mdev->rs_planed -= curr_corr;
484
485 req_sect = sect_in + curr_corr;
486 if (req_sect < 0)
487 req_sect = 0;
488
489 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
490 if (req_sect > max_sect)
491 req_sect = max_sect;
492
493 /*
494 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
495 sect_in, mdev->rs_in_flight, want, correction,
496 steps, cps, mdev->rs_planed, curr_corr, req_sect);
497 */
498
499 return req_sect;
500}
501
502int w_make_resync_request(struct drbd_conf *mdev,
503 struct drbd_work *w, int cancel)
504{
505 unsigned long bit;
506 sector_t sector;
507 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
508 int max_segment_size;
509 int number, rollback_i, size, pe, mx;
510 int align, queued, sndbuf;
511 int i = 0;
512
513 if (unlikely(cancel))
514 return 1;
515
516 if (unlikely(mdev->state.conn < C_CONNECTED)) {
517 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
518 return 0;
519 }
520
521 if (mdev->state.conn != C_SYNC_TARGET)
522 dev_err(DEV, "%s in w_make_resync_request\n",
523 drbd_conn_str(mdev->state.conn));
524
525 if (!get_ldev(mdev)) {
526 /* Since we only need to access mdev->rsync a
527 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
528 to continue resync with a broken disk makes no sense at
529 all */
530 dev_err(DEV, "Disk broke down during resync!\n");
531 mdev->resync_work.cb = w_resync_inactive;
532 return 1;
533 }
534
535 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
536 * if it should be necessary */
537 max_segment_size = mdev->agreed_pro_version < 94 ?
538 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
539
540 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
541 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
542 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
543 } else {
544 mdev->c_sync_rate = mdev->sync_conf.rate;
545 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
546 }
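/* drbd_rs_controller() returns 512-byte sectors; the shift converts that
 * to BM_BLOCK_SIZE sized bitmap blocks.  c_sync_rate is kept in KiB/s,
 * the same unit as the configured sync_conf.rate in the other branch. */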
547
548 /* Throttle resync on lower level disk activity, which may also be
549 * caused by application IO on Primary/SyncTarget.
550 * Keep this after the call to drbd_rs_controller, as that assumes
551 * to be called as precisely as possible every SLEEP_TIME,
552 * and would be confused otherwise. */
553 if (drbd_rs_should_slow_down(mdev))
554 goto requeue;
555
556 mutex_lock(&mdev->data.mutex);
557 if (mdev->data.socket)
558 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
559 else
560 mx = 1;
561 mutex_unlock(&mdev->data.mutex);
562
563 /* For resync rates >160MB/sec, allow more pending RS requests */
564 if (number > mx)
565 mx = number;
566
567 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
568 pe = atomic_read(&mdev->rs_pending_cnt);
569 if ((pe + number) > mx) {
570 number = mx - pe;
571 }
572
573 for (i = 0; i < number; i++) {
574 /* Stop generating RS requests, when half of the send buffer is filled */
575 mutex_lock(&mdev->data.mutex);
576 if (mdev->data.socket) {
577 queued = mdev->data.socket->sk->sk_wmem_queued;
578 sndbuf = mdev->data.socket->sk->sk_sndbuf;
579 } else {
580 queued = 1;
581 sndbuf = 0;
582 }
583 mutex_unlock(&mdev->data.mutex);
584 if (queued > sndbuf / 2)
585 goto requeue;
586
587next_sector:
588 size = BM_BLOCK_SIZE;
589 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
590
591 if (bit == -1UL) {
592 mdev->bm_resync_fo = drbd_bm_bits(mdev);
593 mdev->resync_work.cb = w_resync_inactive;
594 put_ldev(mdev);
595 return 1;
596 }
597
598 sector = BM_BIT_TO_SECT(bit);
599
600 if (drbd_try_rs_begin_io(mdev, sector)) {
601 mdev->bm_resync_fo = bit;
602 goto requeue;
603 }
604 mdev->bm_resync_fo = bit + 1;
605
606 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
607 drbd_rs_complete_io(mdev, sector);
608 goto next_sector;
609 }
610
611#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
612 /* try to find some adjacent bits.
613 * we stop if we have already the maximum req size.
614 *
615 * Additionally always align bigger requests, in order to
616 * be prepared for all stripe sizes of software RAIDs.
617 */
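/* (The loop below grows the request one bitmap block at a time, as long
 * as the next bit is also dirty, the result stays within max_segment_size
 * and the current bitmap extent, and the start sector is sufficiently
 * aligned for the size reached so far.) */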
618 align = 1;
619 rollback_i = i;
620 for (;;) {
621 if (size + BM_BLOCK_SIZE > max_segment_size)
622 break;
623
624 /* Be always aligned */
625 if (sector & ((1<<(align+3))-1))
626 break;
627
628 /* do not cross extent boundaries */
629 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
630 break;
631 /* now, is it actually dirty, after all?
632 * caution, drbd_bm_test_bit is tri-state for some
633 * obscure reason; ( b == 0 ) would get the out-of-band
634 * only accidentally right because of the "oddly sized"
635 * adjustment below */
636 if (drbd_bm_test_bit(mdev, bit+1) != 1)
637 break;
638 bit++;
639 size += BM_BLOCK_SIZE;
640 if ((BM_BLOCK_SIZE << align) <= size)
641 align++;
642 i++;
643 }
644 /* if we merged some,
645 * reset the offset to start the next drbd_bm_find_next from */
646 if (size > BM_BLOCK_SIZE)
647 mdev->bm_resync_fo = bit + 1;
648#endif
649
650 /* adjust very last sectors, in case we are oddly sized */
651 if (sector + (size>>9) > capacity)
652 size = (capacity-sector)<<9;
653 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
654 switch (read_for_csum(mdev, sector, size)) {
655 case -EIO: /* Disk failure */
656 put_ldev(mdev);
657 return 0;
658 case -EAGAIN: /* allocation failed, or ldev busy */
659 drbd_rs_complete_io(mdev, sector);
660 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
661 i = rollback_i;
662 goto requeue;
663 case 0:
664 /* everything ok */
665 break;
666 default:
667 BUG();
668 }
669 } else {
670 inc_rs_pending(mdev);
671 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
672 sector, size, ID_SYNCER)) {
673 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
674 dec_rs_pending(mdev);
675 put_ldev(mdev);
676 return 0;
677 }
678 }
679 }
680
681 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
682 /* last syncer _request_ was sent,
683 * but the P_RS_DATA_REPLY not yet received. sync will end (and
684 * next sync group will resume), as soon as we receive the last
685 * resync data block, and the last bit is cleared.
686 * until then resync "work" is "inactive" ...
687 */
688 mdev->resync_work.cb = w_resync_inactive;
689 put_ldev(mdev);
690 return 1;
691 }
692
693 requeue:
694 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
695 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
696 put_ldev(mdev);
697 return 1;
698}
699
700static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
701{
702 int number, i, size;
703 sector_t sector;
704 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
705
706 if (unlikely(cancel))
707 return 1;
708
709 if (unlikely(mdev->state.conn < C_CONNECTED)) {
710 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
711 return 0;
712 }
713
714 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
715 if (atomic_read(&mdev->rs_pending_cnt) > number)
716 goto requeue;
717
718 number -= atomic_read(&mdev->rs_pending_cnt);
719
720 sector = mdev->ov_position;
721 for (i = 0; i < number; i++) {
722 if (sector >= capacity) {
723 mdev->resync_work.cb = w_resync_inactive;
724 return 1;
725 }
726
727 size = BM_BLOCK_SIZE;
728
729 if (drbd_try_rs_begin_io(mdev, sector)) {
730 mdev->ov_position = sector;
731 goto requeue;
732 }
733
734 if (sector + (size>>9) > capacity)
735 size = (capacity-sector)<<9;
736
737 inc_rs_pending(mdev);
738 if (!drbd_send_ov_request(mdev, sector, size)) {
739 dec_rs_pending(mdev);
740 return 0;
741 }
742 sector += BM_SECT_PER_BIT;
743 }
744 mdev->ov_position = sector;
745
746 requeue:
747 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
748 return 1;
749}
750
751
752int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
753{
754 kfree(w);
755 ov_oos_print(mdev);
756 drbd_resync_finished(mdev);
757
758 return 1;
759}
760
761static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
762{
763 kfree(w);
764
765 drbd_resync_finished(mdev);
766
767 return 1;
768}
769
770int drbd_resync_finished(struct drbd_conf *mdev)
771{
772 unsigned long db, dt, dbdt;
773 unsigned long n_oos;
774 union drbd_state os, ns;
775 struct drbd_work *w;
776 char *khelper_cmd = NULL;
777
778 /* Remove all elements from the resync LRU. Since future actions
779 * might set bits in the (main) bitmap, then the entries in the
780 * resync LRU would be wrong. */
781 if (drbd_rs_del_all(mdev)) {
782 /* In case this is not possible now, most probably because
783 * there are P_RS_DATA_REPLY Packets lingering on the worker's
784 * queue (or even the read operations for those packets
785 * is not finished by now). Retry in 100ms. */
786
787 drbd_kick_lo(mdev);
788 __set_current_state(TASK_INTERRUPTIBLE);
789 schedule_timeout(HZ / 10);
790 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
791 if (w) {
792 w->cb = w_resync_finished;
793 drbd_queue_work(&mdev->data.work, w);
794 return 1;
795 }
796 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
797 }
798
799 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
800 if (dt <= 0)
801 dt = 1;
802 db = mdev->rs_total;
803 dbdt = Bit2KB(db/dt);
804 mdev->rs_paused /= HZ;
805
806 if (!get_ldev(mdev))
807 goto out;
808
809 spin_lock_irq(&mdev->req_lock);
810 os = mdev->state;
811
812 /* This protects us against multiple calls (that can happen in the presence
813 of application IO), and against connectivity loss just before we arrive here. */
814 if (os.conn <= C_CONNECTED)
815 goto out_unlock;
816
817 ns = os;
818 ns.conn = C_CONNECTED;
819
820 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
821 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
822 "Online verify " : "Resync",
823 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
824
825 n_oos = drbd_bm_total_weight(mdev);
826
827 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
828 if (n_oos) {
829 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
830 n_oos, Bit2KB(1));
831 khelper_cmd = "out-of-sync";
832 }
833 } else {
834 D_ASSERT((n_oos - mdev->rs_failed) == 0);
835
836 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
837 khelper_cmd = "after-resync-target";
838
839 if (mdev->csums_tfm && mdev->rs_total) {
840 const unsigned long s = mdev->rs_same_csum;
841 const unsigned long t = mdev->rs_total;
842 const int ratio =
843 (t == 0) ? 0 :
844 (t < 100000) ? ((s*100)/t) : (s/(t/100));
845 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
846 "transferred %luK total %luK\n",
847 ratio,
848 Bit2KB(mdev->rs_same_csum),
849 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
850 Bit2KB(mdev->rs_total));
851 }
852 }
853
854 if (mdev->rs_failed) {
855 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
856
857 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
858 ns.disk = D_INCONSISTENT;
859 ns.pdsk = D_UP_TO_DATE;
860 } else {
861 ns.disk = D_UP_TO_DATE;
862 ns.pdsk = D_INCONSISTENT;
863 }
864 } else {
865 ns.disk = D_UP_TO_DATE;
866 ns.pdsk = D_UP_TO_DATE;
867
868 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
869 if (mdev->p_uuid) {
870 int i;
871 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
872 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
873 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
874 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
875 } else {
876 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
877 }
878 }
879
880 drbd_uuid_set_bm(mdev, 0UL);
881
882 if (mdev->p_uuid) {
883 /* Now the two UUID sets are equal, update what we
884 * know of the peer. */
885 int i;
886 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
887 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
888 }
889 }
890
891 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
892out_unlock:
893 spin_unlock_irq(&mdev->req_lock);
894 put_ldev(mdev);
895out:
896 mdev->rs_total = 0;
897 mdev->rs_failed = 0;
898 mdev->rs_paused = 0;
899 mdev->ov_start_sector = 0;
900
901 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
902 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
903 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
904 }
905
906 if (khelper_cmd)
907 drbd_khelper(mdev, khelper_cmd);
908
909 return 1;
910}
911
912/* helper */
913static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
914{
915 if (drbd_ee_has_active_page(e)) {
916 /* This might happen if sendpage() has not finished */
917 spin_lock_irq(&mdev->req_lock);
918 list_add_tail(&e->w.list, &mdev->net_ee);
919 spin_unlock_irq(&mdev->req_lock);
920 } else
921 drbd_free_ee(mdev, e);
922}
923
924/**
925 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
926 * @mdev: DRBD device.
927 * @w: work object.
928 * @cancel: The connection will be closed anyways
929 */
930int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
931{
932 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
933 int ok;
934
935 if (unlikely(cancel)) {
936 drbd_free_ee(mdev, e);
937 dec_unacked(mdev);
938 return 1;
939 }
940
941 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
942 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
943 } else {
944 if (__ratelimit(&drbd_ratelimit_state))
945 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
946 (unsigned long long)e->sector);
947
948 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
949 }
950
951 dec_unacked(mdev);
952
953 move_to_net_ee_or_free(mdev, e);
954
955 if (unlikely(!ok))
956 dev_err(DEV, "drbd_send_block() failed\n");
957 return ok;
958}
959
960/**
961 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
962 * @mdev: DRBD device.
963 * @w: work object.
964 * @cancel: The connection will be closed anyways
965 */
966int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
967{
968 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
969 int ok;
970
971 if (unlikely(cancel)) {
972 drbd_free_ee(mdev, e);
973 dec_unacked(mdev);
974 return 1;
975 }
976
977 if (get_ldev_if_state(mdev, D_FAILED)) {
978 drbd_rs_complete_io(mdev, e->sector);
979 put_ldev(mdev);
980 }
981
982 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
983 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
984 inc_rs_pending(mdev);
985 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
986 } else {
987 if (__ratelimit(&drbd_ratelimit_state))
988 dev_err(DEV, "Not sending RSDataReply, "
989 "partner DISKLESS!\n");
990 ok = 1;
991 }
992 } else {
993 if (__ratelimit(&drbd_ratelimit_state))
994 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
995 (unsigned long long)e->sector);
996
997 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
998
999 /* update resync data with failure */
1000 drbd_rs_failed_io(mdev, e->sector, e->size);
1001 }
1002
1003 dec_unacked(mdev);
1004
1005 move_to_net_ee_or_free(mdev, e);
1006
1007 if (unlikely(!ok))
1008 dev_err(DEV, "drbd_send_block() failed\n");
1009 return ok;
1010}
1011
1012int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013{
1014 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1015 struct digest_info *di;
1016 int digest_size;
1017 void *digest = NULL;
1018 int ok, eq = 0;
1019
1020 if (unlikely(cancel)) {
1021 drbd_free_ee(mdev, e);
1022 dec_unacked(mdev);
1023 return 1;
1024 }
1025
1026 drbd_rs_complete_io(mdev, e->sector);
1027
1028 di = e->digest;
1029
1030 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1031 /* quick hack to try to avoid a race against reconfiguration.
1032 * a real fix would be much more involved,
1033 * introducing more locking mechanisms */
1034 if (mdev->csums_tfm) {
1035 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1036 D_ASSERT(digest_size == di->digest_size);
1037 digest = kmalloc(digest_size, GFP_NOIO);
1038 }
1039 if (digest) {
1040 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1041 eq = !memcmp(digest, di->digest, digest_size);
1042 kfree(digest);
1043 }
1044
1045 if (eq) {
1046 drbd_set_in_sync(mdev, e->sector, e->size);
1047 /* rs_same_csums unit is BM_BLOCK_SIZE */
1048 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1049 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1050 } else {
1051 inc_rs_pending(mdev);
1052 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1053 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1054 kfree(di);
1055 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1056 }
1057 } else {
1058 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1059 if (__ratelimit(&drbd_ratelimit_state))
1060 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1061 }
1062
1063 dec_unacked(mdev);
1064 move_to_net_ee_or_free(mdev, e);
1065
1066 if (unlikely(!ok))
1067 dev_err(DEV, "drbd_send_block/ack() failed\n");
1068 return ok;
1069}
1070
1071int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1072{
1073 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1074 int digest_size;
1075 void *digest;
1076 int ok = 1;
1077
1078 if (unlikely(cancel))
1079 goto out;
1080
1081 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1082 goto out;
1083
1084 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1085 /* FIXME if this allocation fails, online verify will not terminate! */
1086 digest = kmalloc(digest_size, GFP_NOIO);
1087 if (digest) {
1088 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1089 inc_rs_pending(mdev);
1090 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1091 digest, digest_size, P_OV_REPLY);
1092 if (!ok)
1093 dec_rs_pending(mdev);
1094 kfree(digest);
1095 }
1096
1097out:
1098 drbd_free_ee(mdev, e);
1099
1100 dec_unacked(mdev);
1101
1102 return ok;
1103}
1104
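/* Collect contiguous out-of-sync blocks found by online verify into one
 * range (ov_last_oos_start/ov_last_oos_size), presumably so that
 * ov_oos_print() can report whole ranges instead of individual blocks. */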
1105void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1106{
1107 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1108 mdev->ov_last_oos_size += size>>9;
1109 } else {
1110 mdev->ov_last_oos_start = sector;
1111 mdev->ov_last_oos_size = size>>9;
1112 }
1113 drbd_set_out_of_sync(mdev, sector, size);
1114 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1115}
1116
1117int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1118{
1119 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1120 struct digest_info *di;
1121 int digest_size;
1122 void *digest;
1123 int ok, eq = 0;
1124
1125 if (unlikely(cancel)) {
1126 drbd_free_ee(mdev, e);
1127 dec_unacked(mdev);
1128 return 1;
1129 }
1130
1131 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1132 * the resync lru has been cleaned up already */
1133 drbd_rs_complete_io(mdev, e->sector);
1134
1135 di = e->digest;
1136
1137 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1138 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1139 digest = kmalloc(digest_size, GFP_NOIO);
1140 if (digest) {
1141 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1142
1143 D_ASSERT(digest_size == di->digest_size);
1144 eq = !memcmp(digest, di->digest, digest_size);
1145 kfree(digest);
1146 }
1147 } else {
1148 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1149 if (__ratelimit(&drbd_ratelimit_state))
1150 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1151 }
1152
1153 dec_unacked(mdev);
1154 if (!eq)
1155 drbd_ov_oos_found(mdev, e->sector, e->size);
1156 else
1157 ov_oos_print(mdev);
1158
1159 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1160 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1161
1162 drbd_free_ee(mdev, e);
1163
1164 if (--mdev->ov_left == 0) {
1165 ov_oos_print(mdev);
1166 drbd_resync_finished(mdev);
1167 }
1168
1169 return ok;
1170}
1171
1172int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1173{
1174 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1175 complete(&b->done);
1176 return 1;
1177}
1178
1179int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1180{
1181 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1182 struct p_barrier *p = &mdev->data.sbuf.barrier;
1183 int ok = 1;
1184
1185 /* really avoid racing with tl_clear. w.cb may have been referenced
1186 * just before it was reassigned and re-queued, so double check that.
1187 * actually, this race was harmless, since we only try to send the
1188 * barrier packet here, and otherwise do nothing with the object.
1189 * but compare with the head of w_clear_epoch */
1190 spin_lock_irq(&mdev->req_lock);
1191 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1192 cancel = 1;
1193 spin_unlock_irq(&mdev->req_lock);
1194 if (cancel)
1195 return 1;
1196
1197 if (!drbd_get_data_sock(mdev))
1198 return 0;
1199 p->barrier = b->br_number;
1200 /* inc_ap_pending was done where this was queued.
1201 * dec_ap_pending will be done in got_BarrierAck
1202 * or (on connection loss) in w_clear_epoch. */
1203 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1204 (struct p_header80 *)p, sizeof(*p), 0);
1205 drbd_put_data_sock(mdev);
1206
1207 return ok;
1208}
1209
1210int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1211{
1212 if (cancel)
1213 return 1;
1214 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1215}
1216
1217/**
1218 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1219 * @mdev: DRBD device.
1220 * @w: work object.
1221 * @cancel: The connection will be closed anyways
1222 */
1223int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1224{
1225 struct drbd_request *req = container_of(w, struct drbd_request, w);
1226 int ok;
1227
1228 if (unlikely(cancel)) {
1229 req_mod(req, send_canceled);
1230 return 1;
1231 }
1232
1233 ok = drbd_send_dblock(mdev, req);
1234 req_mod(req, ok ? handed_over_to_network : send_failed);
1235
1236 return ok;
1237}
1238
1239/**
1240 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1241 * @mdev: DRBD device.
1242 * @w: work object.
1243 * @cancel: The connection will be closed anyways
1244 */
1245int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246{
1247 struct drbd_request *req = container_of(w, struct drbd_request, w);
1248 int ok;
1249
1250 if (unlikely(cancel)) {
1251 req_mod(req, send_canceled);
1252 return 1;
1253 }
1254
1255 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1256 (unsigned long)req);
1257
1258 if (!ok) {
1259 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1260 * so this is probably redundant */
1261 if (mdev->state.conn >= C_CONNECTED)
1262 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1263 }
1264 req_mod(req, ok ? handed_over_to_network : send_failed);
1265
1266 return ok;
1267}
1268
1269int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1270{
1271 struct drbd_request *req = container_of(w, struct drbd_request, w);
1272
1273 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1274 drbd_al_begin_io(mdev, req->sector);
1275 /* Calling drbd_al_begin_io() out of the worker might deadlock
1276 theoretically. Practically it can not deadlock, since this is
1277 only used when unfreezing IOs. All the extents of the requests
1278 that made it into the TL are already active */
1279
1280 drbd_req_make_private_bio(req, req->master_bio);
1281 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1282 generic_make_request(req->private_bio);
1283
1284 return 1;
1285}
1286
1287static int _drbd_may_sync_now(struct drbd_conf *mdev)
1288{
1289 struct drbd_conf *odev = mdev;
1290
1291 while (1) {
1292 if (odev->sync_conf.after == -1)
1293 return 1;
1294 odev = minor_to_mdev(odev->sync_conf.after);
1295 ERR_IF(!odev) return 1;
1296 if ((odev->state.conn >= C_SYNC_SOURCE &&
1297 odev->state.conn <= C_PAUSED_SYNC_T) ||
1298 odev->state.aftr_isp || odev->state.peer_isp ||
1299 odev->state.user_isp)
1300 return 0;
1301 }
1302}
1303
1304/**
1305 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1306 * @mdev: DRBD device.
1307 *
1308 * Called from process context only (admin command and after_state_ch).
1309 */
1310static int _drbd_pause_after(struct drbd_conf *mdev)
1311{
1312 struct drbd_conf *odev;
1313 int i, rv = 0;
1314
1315 for (i = 0; i < minor_count; i++) {
1316 odev = minor_to_mdev(i);
1317 if (!odev)
1318 continue;
1319 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1320 continue;
1321 if (!_drbd_may_sync_now(odev))
1322 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1323 != SS_NOTHING_TO_DO);
1324 }
1325
1326 return rv;
1327}
1328
1329/**
1330 * _drbd_resume_next() - Resume resync on all devices that may resync now
1331 * @mdev: DRBD device.
1332 *
1333 * Called from process context only (admin command and worker).
1334 */
1335static int _drbd_resume_next(struct drbd_conf *mdev)
1336{
1337 struct drbd_conf *odev;
1338 int i, rv = 0;
1339
1340 for (i = 0; i < minor_count; i++) {
1341 odev = minor_to_mdev(i);
1342 if (!odev)
1343 continue;
1344 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1345 continue;
1346 if (odev->state.aftr_isp) {
1347 if (_drbd_may_sync_now(odev))
1348 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1349 CS_HARD, NULL)
1350 != SS_NOTHING_TO_DO) ;
1351 }
1352 }
1353 return rv;
1354}
1355
1356void resume_next_sg(struct drbd_conf *mdev)
1357{
1358 write_lock_irq(&global_state_lock);
1359 _drbd_resume_next(mdev);
1360 write_unlock_irq(&global_state_lock);
1361}
1362
1363void suspend_other_sg(struct drbd_conf *mdev)
1364{
1365 write_lock_irq(&global_state_lock);
1366 _drbd_pause_after(mdev);
1367 write_unlock_irq(&global_state_lock);
1368}
1369
1370static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1371{
1372 struct drbd_conf *odev;
1373
1374 if (o_minor == -1)
1375 return NO_ERROR;
1376 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1377 return ERR_SYNC_AFTER;
1378
1379 /* check for loops */
1380 odev = minor_to_mdev(o_minor);
1381 while (1) {
1382 if (odev == mdev)
1383 return ERR_SYNC_AFTER_CYCLE;
1384
1385 /* dependency chain ends here, no cycles. */
1386 if (odev->sync_conf.after == -1)
1387 return NO_ERROR;
1388
1389 /* follow the dependency chain */
1390 odev = minor_to_mdev(odev->sync_conf.after);
1391 }
1392}
1393
1394int drbd_alter_sa(struct drbd_conf *mdev, int na)
1395{
1396 int changes;
1397 int retcode;
1398
1399 write_lock_irq(&global_state_lock);
1400 retcode = sync_after_error(mdev, na);
1401 if (retcode == NO_ERROR) {
1402 mdev->sync_conf.after = na;
1403 do {
1404 changes = _drbd_pause_after(mdev);
1405 changes |= _drbd_resume_next(mdev);
1406 } while (changes);
1407 }
1408 write_unlock_irq(&global_state_lock);
1409 return retcode;
1410}
1411
1412static void ping_peer(struct drbd_conf *mdev)
1413{
1414 clear_bit(GOT_PING_ACK, &mdev->flags);
1415 request_ping(mdev);
1416 wait_event(mdev->misc_wait,
1417 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1418}
1419
1420/**
1421 * drbd_start_resync() - Start the resync process
1422 * @mdev: DRBD device.
1423 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1424 *
1425 * This function might bring you directly into one of the
1426 * C_PAUSED_SYNC_* states.
1427 */
1428void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1429{
1430 union drbd_state ns;
1431 int r;
1432
1433 if (mdev->state.conn >= C_SYNC_SOURCE) {
1434 dev_err(DEV, "Resync already running!\n");
1435 return;
1436 }
1437
1438 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1439 drbd_rs_cancel_all(mdev);
1440
1441 if (side == C_SYNC_TARGET) {
1442 /* Since application IO was locked out during C_WF_BITMAP_T and
1443 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1444 we check that we might make the data inconsistent. */
1445 r = drbd_khelper(mdev, "before-resync-target");
1446 r = (r >> 8) & 0xff;
1447 if (r > 0) {
1448 dev_info(DEV, "before-resync-target handler returned %d, "
1449 "dropping connection.\n", r);
1450 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1451 return;
1452 }
1453 }
1454
1455 drbd_state_lock(mdev);
1456
1457 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1458 drbd_state_unlock(mdev);
1459 return;
1460 }
1461
1462 if (side == C_SYNC_TARGET) {
1463 mdev->bm_resync_fo = 0;
1464 } else /* side == C_SYNC_SOURCE */ {
1465 u64 uuid;
1466
1467 get_random_bytes(&uuid, sizeof(u64));
1468 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1469 drbd_send_sync_uuid(mdev, uuid);
1470
1471 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1472 }
1473
1474 write_lock_irq(&global_state_lock);
1475 ns = mdev->state;
1476
1477 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1478
1479 ns.conn = side;
1480
1481 if (side == C_SYNC_TARGET)
1482 ns.disk = D_INCONSISTENT;
1483 else /* side == C_SYNC_SOURCE */
1484 ns.pdsk = D_INCONSISTENT;
1485
1486 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1487 ns = mdev->state;
1488
1489 if (ns.conn < C_CONNECTED)
1490 r = SS_UNKNOWN_ERROR;
1491
1492 if (r == SS_SUCCESS) {
1493 unsigned long tw = drbd_bm_total_weight(mdev);
1494 unsigned long now = jiffies;
1495 int i;
1496
1497 mdev->rs_failed = 0;
1498 mdev->rs_paused = 0;
1499 mdev->rs_same_csum = 0;
1500 mdev->rs_last_events = 0;
1501 mdev->rs_last_sect_ev = 0;
1502 mdev->rs_total = tw;
1503 mdev->rs_start = now;
1504 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1505 mdev->rs_mark_left[i] = tw;
1506 mdev->rs_mark_time[i] = now;
1507 }
1508 _drbd_pause_after(mdev);
1509 }
1510 write_unlock_irq(&global_state_lock);
1511 put_ldev(mdev);
1512
1513 if (r == SS_SUCCESS) {
1514 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1515 drbd_conn_str(ns.conn),
1516 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1517 (unsigned long) mdev->rs_total);
1518
1519 if (mdev->rs_total == 0) {
1520 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1521 ping_peer(mdev);
1522 drbd_resync_finished(mdev);
1523 }
1524
1525 atomic_set(&mdev->rs_sect_in, 0);
1526 atomic_set(&mdev->rs_sect_ev, 0);
1527 mdev->rs_in_flight = 0;
1528 mdev->rs_planed = 0;
1529 spin_lock(&mdev->peer_seq_lock);
1530 fifo_set(&mdev->rs_plan_s, 0);
1531 spin_unlock(&mdev->peer_seq_lock);
1532 /* ns.conn may already be != mdev->state.conn,
1533 * we may have been paused in between, or become paused until
1534 * the timer triggers.
1535 * No matter, that is handled in resync_timer_fn() */
1536 if (ns.conn == C_SYNC_TARGET)
1537 mod_timer(&mdev->resync_timer, jiffies);
1538
1539 drbd_md_sync(mdev);
1540 }
1541 drbd_state_unlock(mdev);
1542}
1543
1544int drbd_worker(struct drbd_thread *thi)
1545{
1546 struct drbd_conf *mdev = thi->mdev;
1547 struct drbd_work *w = NULL;
1548 LIST_HEAD(work_list);
1549 int intr = 0, i;
1550
1551 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1552
1553 while (get_t_state(thi) == Running) {
1554 drbd_thread_current_set_cpu(mdev);
1555
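/* No work queued right now: uncork the data socket so anything already
 * queued gets sent, sleep until new work (or a signal) arrives, then
 * cork again (unless no_cork is set) so subsequent packets can be batched. */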
1556 if (down_trylock(&mdev->data.work.s)) {
1557 mutex_lock(&mdev->data.mutex);
1558 if (mdev->data.socket && !mdev->net_conf->no_cork)
1559 drbd_tcp_uncork(mdev->data.socket);
1560 mutex_unlock(&mdev->data.mutex);
1561
1562 intr = down_interruptible(&mdev->data.work.s);
1563
1564 mutex_lock(&mdev->data.mutex);
1565 if (mdev->data.socket && !mdev->net_conf->no_cork)
1566 drbd_tcp_cork(mdev->data.socket);
1567 mutex_unlock(&mdev->data.mutex);
1568 }
1569
1570 if (intr) {
1571 D_ASSERT(intr == -EINTR);
1572 flush_signals(current);
1573 ERR_IF (get_t_state(thi) == Running)
1574 continue;
1575 break;
1576 }
1577
1578 if (get_t_state(thi) != Running)
1579 break;
1580 /* With this break, we have done a down() but not consumed
1581 the entry from the list. The cleanup code takes care of
1582 this... */
1583
1584 w = NULL;
1585 spin_lock_irq(&mdev->data.work.q_lock);
1586 ERR_IF(list_empty(&mdev->data.work.q)) {
1587 /* something terribly wrong in our logic.
1588 * we were able to down() the semaphore,
1589 * but the list is empty... doh.
1590 *
1591 * what is the best thing to do now?
1592 * try again from scratch, restarting the receiver,
1593 * asender, whatnot? could break even more ugly,
1594 * e.g. when we are primary, but no good local data.
1595 *
1596 * I'll try to get away just starting over this loop.
1597 */
1598 spin_unlock_irq(&mdev->data.work.q_lock);
1599 continue;
1600 }
1601 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1602 list_del_init(&w->list);
1603 spin_unlock_irq(&mdev->data.work.q_lock);
1604
1605 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1606 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1607 if (mdev->state.conn >= C_CONNECTED)
1608 drbd_force_state(mdev,
1609 NS(conn, C_NETWORK_FAILURE));
1610 }
1611 }
1612 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1613 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1614
1615 spin_lock_irq(&mdev->data.work.q_lock);
1616 i = 0;
1617 while (!list_empty(&mdev->data.work.q)) {
1618 list_splice_init(&mdev->data.work.q, &work_list);
1619 spin_unlock_irq(&mdev->data.work.q_lock);
1620
1621 while (!list_empty(&work_list)) {
1622 w = list_entry(work_list.next, struct drbd_work, list);
1623 list_del_init(&w->list);
1624 w->cb(mdev, w, 1);
1625 i++; /* dead debugging code */
1626 }
1627
1628 spin_lock_irq(&mdev->data.work.q_lock);
1629 }
1630 sema_init(&mdev->data.work.s, 0);
1631 /* DANGEROUS race: if someone did queue his work within the spinlock,
1632 * but up() ed outside the spinlock, we could get an up() on the
1633 * semaphore without corresponding list entry.
1634 * So don't do that.
1635 */
1636 spin_unlock_irq(&mdev->data.work.q_lock);
1637
1638 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1639 /* _drbd_set_state only uses stop_nowait.
1640 * wait here for the Exiting receiver. */
1641 drbd_thread_stop(&mdev->receiver);
1642 drbd_mdev_cleanup(mdev);
1643
1644 dev_info(DEV, "worker terminated\n");
1645
1646 clear_bit(DEVICE_DYING, &mdev->flags);
1647 clear_bit(CONFIG_PENDING, &mdev->flags);
1648 wake_up(&mdev->state_wait);
1649
1650 return 0;
1651}