/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <linux/module.h>

#include <linux/slab.h>
#include <linux/drbd.h>
#include "drbd_int.h"
#include "drbd_req.h"

static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);

/* Update disk stats at start of I/O request */
static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
{
	const int rw = bio_data_dir(bio);
	int cpu;
	cpu = part_stat_lock();
	part_round_stats(cpu, &mdev->vdisk->part0);
	part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
	part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
	(void) cpu; /* The macro invocations above want the cpu argument, I do not like
		       the compiler warning about cpu only assigned but never used... */
	part_inc_in_flight(&mdev->vdisk->part0, rw);
	part_stat_unlock();
}

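/* Note: both accounting helpers run under the req_lock; the part_stat_*
 * macros account I/O against DRBD's virtual disk (vdisk) in units of
 * 512-byte sectors and jiffies. */
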
/* Update disk stats when completing request upwards */
static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
{
	int rw = bio_data_dir(req->master_bio);
	unsigned long duration = jiffies - req->start_time;
	int cpu;
	cpu = part_stat_lock();
	part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
	part_round_stats(cpu, &mdev->vdisk->part0);
	part_dec_in_flight(&mdev->vdisk->part0, rw);
	part_stat_unlock();
}

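/* Allocate a drbd_request for an incoming master bio and clone that bio into
 * req->private_bio for submission to the local backing device.  Returns NULL
 * if the mempool allocation fails. */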
static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
					 struct bio *bio_src)
{
	struct drbd_request *req;

	req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
	if (!req)
		return NULL;

	drbd_req_make_private_bio(req, bio_src);
	req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
	req->w.mdev      = mdev;
	req->master_bio  = bio_src;
	req->epoch       = 0;

	drbd_clear_interval(&req->i);
	req->i.sector     = bio_src->bi_sector;
	req->i.size      = bio_src->bi_size;
	req->i.local = true;
	req->i.waiting = false;

	INIT_LIST_HEAD(&req->tl_requests);
	INIT_LIST_HEAD(&req->w.list);

	atomic_set(&req->completion_ref, 1);
	kref_init(&req->kref);
	return req;
}

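/* kref release function: invoked once the last reference on the request is
 * dropped.  Takes the request off the transfer log, updates bitmap/activity
 * log state as needed, and finally frees (or, for RQ_POSTPONED requests,
 * restarts) the request. */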
static void drbd_req_destroy(struct kref *kref)
{
	struct drbd_request *req = container_of(kref, struct drbd_request, kref);
	struct drbd_conf *mdev = req->w.mdev;
	const unsigned long s = req->rq_state;

	/* remove it from the transfer log.
	 * well, only if it had been there in the first
	 * place... if it had not (local only or conflicting
	 * and never sent), it should still be "empty" as
	 * initialized in drbd_req_new(), so we can list_del() it
	 * here unconditionally */
	list_del_init(&req->tl_requests);

	/* if it was a write, we may have to set the corresponding
	 * bit(s) out-of-sync first. If it had a local part, we need to
	 * release the reference to the activity log. */
	if (s & RQ_WRITE) {
		/* Set out-of-sync unless both OK flags are set
		 * (local only or remote failed).
		 * Other places where we set out-of-sync:
		 * READ with local io-error */
		if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
			drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);

		if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
			drbd_set_in_sync(mdev, req->i.sector, req->i.size);

		/* one might be tempted to move the drbd_al_complete_io
		 * to the local io completion callback drbd_request_endio.
		 * but, if this was a mirror write, we may only
		 * drbd_al_complete_io after this is RQ_NET_DONE,
		 * otherwise the extent could be dropped from the al
		 * before it has actually been written on the peer.
		 * if we crash before our peer knows about the request,
		 * but after the extent has been dropped from the al,
		 * we would forget to resync the corresponding extent.
		 */
		if (s & RQ_LOCAL_MASK) {
			if (get_ldev_if_state(mdev, D_FAILED)) {
				if (s & RQ_IN_ACT_LOG)
					drbd_al_complete_io(mdev, &req->i);
				put_ldev(mdev);
			} else if (__ratelimit(&drbd_ratelimit_state)) {
				dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
					 "but my Disk seems to have failed :(\n",
					 (unsigned long long) req->i.sector, req->i.size);
			}
		}
	}

	if (s & RQ_POSTPONED)
		drbd_restart_request(req);
	else
		mempool_free(req, drbd_request_mempool);
}

static void wake_all_senders(struct drbd_tconn *tconn) {
	wake_up(&tconn->sender_work.q_wait);
}

/* must hold resource->req_lock */
static void start_new_tl_epoch(struct drbd_tconn *tconn)
{
	tconn->current_tle_writes = 0;
	atomic_inc(&tconn->current_tle_nr);
	wake_all_senders(tconn);
}

void complete_master_bio(struct drbd_conf *mdev,
		struct bio_and_error *m)
{
	bio_endio(m->bio, m->error);
	dec_ap_bio(mdev);
}

static void drbd_remove_request_interval(struct rb_root *root,
					 struct drbd_request *req)
{
	struct drbd_conf *mdev = req->w.mdev;
	struct drbd_interval *i = &req->i;

	drbd_remove_interval(root, i);

	/* Wake up any processes waiting for this request to complete.  */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
}

static void maybe_wakeup_conflicting_requests(struct drbd_request *req)
{
	const unsigned long s = req->rq_state;
	if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
		return;
	if (req->i.waiting)
		/* Retry all conflicting peer requests. */
		wake_up(&req->w.mdev->misc_wait);
}

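/* Called under the req_lock: drop the final kref on the request once neither
 * the local nor the network part of it is outstanding anymore. */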
void req_may_be_done(struct drbd_request *req)
{
	const unsigned long s = req->rq_state;

	/* req->master_bio still present means: Not yet completed.
	 *
	 * Unless this is RQ_POSTPONED, which will cause drbd_req_destroy() to
	 * queue it on the retry workqueue instead of destroying it.
	 */
	if (req->master_bio && !(s & RQ_POSTPONED))
		return;

	/* Local still pending, even though master_bio is already completed?
	 * may happen for RQ_LOCAL_ABORTED requests. */
	if (s & RQ_LOCAL_PENDING)
		return;

	if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
		/* this is disconnected (local only) operation,
		 * or protocol A, B, or C P_BARRIER_ACK,
		 * or killed from the transfer log due to connection loss. */
		kref_put(&req->kref, drbd_req_destroy);
	}
	/* else: network part and not DONE yet. that is
	 * protocol A, B, or C, barrier ack still pending... */
}

/* Helper for __req_mod().
 * Set m->bio to the master bio, if it is fit to be completed,
 * or leave it alone (it is initialized to NULL in __req_mod),
 * if it has already been completed, or cannot be completed yet.
 * If m->bio is set, the error status to be returned is placed in m->error.
 */
void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
{
	const unsigned long s = req->rq_state;
	struct drbd_conf *mdev = req->w.mdev;

	/* we must not complete the master bio, while it is
	 *	still being processed by _drbd_send_zc_bio (drbd_send_dblock)
	 *	not yet acknowledged by the peer
	 *	not yet completed by the local io subsystem
	 * these flags may get cleared in any order by
	 *	the worker,
	 *	the receiver,
	 *	the bio_endio completion callbacks.
	 */
	if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
		return;
	if (s & RQ_NET_QUEUED)
		return;
	if (s & RQ_NET_PENDING)
		return;

	/* FIXME:
	 * instead of all the RQ_FLAGS, actually use the completion_ref
	 * to decide if this is ready to be completed. */
	if (req->master_bio) {
		int complete = atomic_dec_and_test(&req->completion_ref);
		D_ASSERT(complete != 0);
	} else
		D_ASSERT(atomic_read(&req->completion_ref) == 0);

	if (req->master_bio) {
		int rw = bio_rw(req->master_bio);

		/* this is DATA_RECEIVED (remote read)
		 * or protocol C P_WRITE_ACK
		 * or protocol B P_RECV_ACK
		 * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
		 * or canceled or failed,
		 * or killed from the transfer log due to connection loss. */

		/*
		 * figure out whether to report success or failure.
		 *
		 * report success when at least one of the operations succeeded.
		 * or, to put the other way,
		 * only report failure, when both operations failed.
		 *
		 * what to do about the failures is handled elsewhere.
		 * what we need to do here is just: complete the master_bio.
		 *
		 * local completion error, if any, has been stored as ERR_PTR
		 * in private_bio within drbd_request_endio.
		 */
		int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
		int error = PTR_ERR(req->private_bio);

		/* remove the request from the conflict detection
		 * respective block_id verification hash */
		if (!drbd_interval_empty(&req->i)) {
			struct rb_root *root;

			if (rw == WRITE)
				root = &mdev->write_requests;
			else
				root = &mdev->read_requests;
			drbd_remove_request_interval(root, req);
		} else if (!(s & RQ_POSTPONED))
			D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);

		/* Before we can signal completion to the upper layers,
		 * we may need to close the current transfer log epoch.
		 * We are within the request lock, so we can simply compare
		 * the request epoch number with the current transfer log
		 * epoch number.  If they match, increase the current_tle_nr,
		 * and reset the transfer log epoch write_cnt.
		 */
		if (rw == WRITE &&
		    req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
			start_new_tl_epoch(mdev->tconn);

		/* Update disk stats */
		_drbd_end_io_acct(mdev, req);

		/* If READ failed,
		 * have it be pushed back to the retry work queue,
		 * so it will re-enter __drbd_make_request(),
		 * and be re-assigned to a suitable local or remote path,
		 * or failed if we do not have access to good data anymore.
		 *
		 * Unless it was failed early by __drbd_make_request(),
		 * because no path was available, in which case
		 * it was not even added to the transfer_log.
		 *
		 * READA may fail, and will not be retried.
		 *
		 * WRITE should have used all available paths already.
		 */
		if (!ok && rw == READ && !list_empty(&req->tl_requests))
			req->rq_state |= RQ_POSTPONED;

		if (!(req->rq_state & RQ_POSTPONED)) {
			m->error = ok ? 0 : (error ?: -EIO);
			m->bio = req->master_bio;
			req->master_bio = NULL;
		} else {
			/* Assert that this will be _req_is_done()
			 * with this very invocation. */
			/* FIXME:
			 * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
			 */
			D_ASSERT(!(s & RQ_LOCAL_PENDING));
			D_ASSERT((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE));
		}
	}
	req_may_be_done(req);
}

static void req_may_be_completed_not_susp(struct drbd_request *req, struct bio_and_error *m)
{
	struct drbd_conf *mdev = req->w.mdev;

	if (!drbd_suspended(mdev))
		req_may_be_completed(req, m);
}

/* obviously this could be coded as many single functions
 * instead of one huge switch,
 * or by putting the code directly in the respective locations
 * (as it has been before).
 *
 * but having it this way
 * enforces that it is all in this one place, where it is easier to audit,
 * it makes it obvious that whatever "event" "happens" to a request should
 * happen "atomically" within the req_lock,
 * and it enforces that we have to think in a very structured manner
 * about the "events" that may happen to a request during its life time ...
 */
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
		struct bio_and_error *m)
{
	struct drbd_conf *mdev = req->w.mdev;
	struct net_conf *nc;
	int p, rv = 0;

	if (m)
		m->bio = NULL;

	switch (what) {
	default:
		dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__, __LINE__);
		break;

	/* does not happen...
	 * initialization done in drbd_req_new
	case CREATED:
		break;
		*/

	case TO_BE_SENT: /* via network */
		/* reached via __drbd_make_request
		 * and from w_read_retry_remote */
		D_ASSERT(!(req->rq_state & RQ_NET_MASK));
		req->rq_state |= RQ_NET_PENDING;
		rcu_read_lock();
		nc = rcu_dereference(mdev->tconn->net_conf);
		p = nc->wire_protocol;
		rcu_read_unlock();
		req->rq_state |=
			p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
			p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
		inc_ap_pending(mdev);
		break;

	case TO_BE_SUBMITTED: /* locally */
		/* reached via __drbd_make_request */
		D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
		req->rq_state |= RQ_LOCAL_PENDING;
		break;

	case COMPLETED_OK:
		if (req->rq_state & RQ_WRITE)
			mdev->writ_cnt += req->i.size >> 9;
		else
			mdev->read_cnt += req->i.size >> 9;

		req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
		req->rq_state &= ~RQ_LOCAL_PENDING;

		maybe_wakeup_conflicting_requests(req);
		req_may_be_completed_not_susp(req, m);
		break;

	case ABORT_DISK_IO:
		req->rq_state |= RQ_LOCAL_ABORTED;
		req_may_be_completed_not_susp(req, m);
		break;

	case WRITE_COMPLETED_WITH_ERROR:
		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;

		__drbd_chk_io_error(mdev, false);
		maybe_wakeup_conflicting_requests(req);
		req_may_be_completed_not_susp(req, m);
		break;

	case READ_AHEAD_COMPLETED_WITH_ERROR:
		/* it is legal to fail READA */
		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;
		req_may_be_completed_not_susp(req, m);
		break;

	case READ_COMPLETED_WITH_ERROR:
		drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);

		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;

		D_ASSERT(!(req->rq_state & RQ_NET_MASK));

		__drbd_chk_io_error(mdev, false);
		req_may_be_completed_not_susp(req, m);
		break;

	case QUEUE_FOR_NET_READ:
		/* READ or READA, and
		 * no local disk,
		 * or target area marked as invalid,
		 * or just got an io-error. */
		/* from __drbd_make_request
		 * or from bio_endio during read io-error recovery */

		/* So we can verify the handle in the answer packet.
		 * Corresponding drbd_remove_request_interval is in
		 * req_may_be_completed() */
		D_ASSERT(drbd_interval_empty(&req->i));
		drbd_insert_interval(&mdev->read_requests, &req->i);

		set_bit(UNPLUG_REMOTE, &mdev->flags);

		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0);
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = w_send_read_req;
		drbd_queue_work(&mdev->tconn->sender_work, &req->w);
		break;

	case QUEUE_FOR_NET_WRITE:
		/* assert something? */
		/* from __drbd_make_request only */

		/* Corresponding drbd_remove_request_interval is in
		 * req_may_be_completed() */
		D_ASSERT(drbd_interval_empty(&req->i));
		drbd_insert_interval(&mdev->write_requests, &req->i);

		/* NOTE
		 * In case the req ended up on the transfer log before being
		 * queued on the worker, it could lead to this request being
		 * missed during cleanup after connection loss.
		 * So we have to do both operations here,
		 * within the same lock that protects the transfer log.
		 *
		 * _req_add_to_epoch(req); this has to be after the
		 * _maybe_start_new_epoch(req); which happened in
		 * __drbd_make_request, because we now may set the bit
		 * again ourselves to close the current epoch.
		 *
		 * Add req to the (now) current epoch (barrier). */

		/* otherwise we may lose an unplug, which may cause some remote
		 * io-scheduler timeout to expire, increasing maximum latency,
		 * hurting performance. */
		set_bit(UNPLUG_REMOTE, &mdev->flags);

		/* queue work item to send data */
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = w_send_dblock;
		drbd_queue_work(&mdev->tconn->sender_work, &req->w);

		/* close the epoch, in case it outgrew the limit */
		rcu_read_lock();
		nc = rcu_dereference(mdev->tconn->net_conf);
		p = nc->max_epoch_size;
		rcu_read_unlock();
		if (mdev->tconn->current_tle_writes >= p)
			start_new_tl_epoch(mdev->tconn);
		break;

	case QUEUE_FOR_SEND_OOS:
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = w_send_out_of_sync;
		drbd_queue_work(&mdev->tconn->sender_work, &req->w);
		break;

	case READ_RETRY_REMOTE_CANCELED:
	case SEND_CANCELED:
	case SEND_FAILED:
		/* real cleanup will be done from tl_clear.  just update flags
		 * so it is no longer marked as on the worker queue */
		req->rq_state &= ~RQ_NET_QUEUED;
		/* if we did it right, tl_clear should be scheduled only after
		 * this, so this should not be necessary! */
		req_may_be_completed_not_susp(req, m);
		break;

	case HANDED_OVER_TO_NETWORK:
		/* assert something? */
		if (bio_data_dir(req->master_bio) == WRITE)
			atomic_add(req->i.size >> 9, &mdev->ap_in_flight);

		if (bio_data_dir(req->master_bio) == WRITE &&
		    !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
			/* this is what is dangerous about protocol A:
			 * pretend it was successfully written on the peer. */
			if (req->rq_state & RQ_NET_PENDING) {
				dec_ap_pending(mdev);
				req->rq_state &= ~RQ_NET_PENDING;
				req->rq_state |= RQ_NET_OK;
			} /* else: neg-ack was faster... */
			/* it is still not yet RQ_NET_DONE until the
			 * corresponding epoch barrier got acked as well,
			 * so we know what to dirty on connection loss */
		}
		req->rq_state &= ~RQ_NET_QUEUED;
		req->rq_state |= RQ_NET_SENT;
		req_may_be_completed_not_susp(req, m);
		break;

	case OOS_HANDED_TO_NETWORK:
		/* Was not set PENDING, no longer QUEUED, so is now DONE
		 * as far as this connection is concerned. */
		req->rq_state &= ~RQ_NET_QUEUED;
		req->rq_state |= RQ_NET_DONE;
		req_may_be_completed_not_susp(req, m);
		break;

	case CONNECTION_LOST_WHILE_PENDING:
		/* transfer log cleanup after connection loss */
		/* assert something? */
		if (req->rq_state & RQ_NET_PENDING)
			dec_ap_pending(mdev);

		p = !(req->rq_state & RQ_WRITE) && req->rq_state & RQ_NET_PENDING;

		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
		req->rq_state |= RQ_NET_DONE;
		if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
			atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);

		req_may_be_completed(req, m); /* Allowed while state.susp */
		break;

	case DISCARD_WRITE:
		/* for discarded conflicting writes of multiple primaries,
		 * there is no need to keep anything in the tl, potential
		 * node crashes are covered by the activity log. */
		req->rq_state |= RQ_NET_DONE;
		/* fall through */
	case WRITE_ACKED_BY_PEER_AND_SIS:
	case WRITE_ACKED_BY_PEER:
		if (what == WRITE_ACKED_BY_PEER_AND_SIS)
			req->rq_state |= RQ_NET_SIS;
		D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
		/* protocol C; successfully written on peer.
		 * Nothing more to do here.
		 * We want to keep the tl in place for all protocols, to cater
		 * for volatile write-back caches on lower level devices. */

		goto ack_common;
	case RECV_ACKED_BY_PEER:
		D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
		/* protocol B; pretends to be successfully written on peer.
		 * see also notes above in HANDED_OVER_TO_NETWORK about
		 * protocol != C */
	ack_common:
		req->rq_state |= RQ_NET_OK;
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		dec_ap_pending(mdev);
		atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
		req->rq_state &= ~RQ_NET_PENDING;
		maybe_wakeup_conflicting_requests(req);
		req_may_be_completed_not_susp(req, m);
		break;

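	/* Two-primary write conflict: the peer asks us to hold back and
	 * resubmit this request later instead of acknowledging it now. */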
	case POSTPONE_WRITE:
		D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
		/* If this node has already detected the write conflict, the
		 * worker will be waiting on misc_wait.  Wake it up once this
		 * request has completed locally.
		 */
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_POSTPONED;
		maybe_wakeup_conflicting_requests(req);
		req_may_be_completed_not_susp(req, m);
		break;

	case NEG_ACKED:
		/* assert something? */
		if (req->rq_state & RQ_NET_PENDING) {
			dec_ap_pending(mdev);
			if (req->rq_state & RQ_WRITE)
				atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
		}
		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);

		req->rq_state |= RQ_NET_DONE;

		maybe_wakeup_conflicting_requests(req);
		req_may_be_completed_not_susp(req, m);
		/* else: done by HANDED_OVER_TO_NETWORK */
		break;

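	/* The next two events handle requests whose local part completed
	 * while I/O was frozen: on resume they are either completed upwards
	 * or resubmitted to the local disk. */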
	case FAIL_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		req_may_be_completed(req, m); /* Allowed while state.susp */
		break;

	case RESTART_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		req->rq_state &= ~RQ_LOCAL_COMPLETED;

		rv = MR_READ;
		if (bio_data_dir(req->master_bio) == WRITE)
			rv = MR_WRITE;

		get_ldev(mdev);
		req->w.cb = w_restart_disk_io;
		drbd_queue_work(&mdev->tconn->sender_work, &req->w);
		break;

	case RESEND:
		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
		   before the connection loss (B&C only); only P_BARRIER_ACK was missing.
		   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
		   During connection handshake, we ensure that the peer was not rebooted. */
		if (!(req->rq_state & RQ_NET_OK)) {
			if (req->w.cb) {
				/* w.cb expected to be w_send_dblock, or w_send_read_req */
				drbd_queue_work(&mdev->tconn->sender_work, &req->w);
				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
			}
			break;
		}
		/* else, fall through to BARRIER_ACKED */

	case BARRIER_ACKED:
		if (!(req->rq_state & RQ_WRITE))
			break;

		if (req->rq_state & RQ_NET_PENDING) {
			/* barrier came in before all requests were acked.
			 * this is bad, because if the connection is lost now,
			 * we won't be able to clean them up... */
			dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
		}
		if ((req->rq_state & RQ_NET_MASK) != 0) {
			req->rq_state |= RQ_NET_DONE;
			if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)))
				atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
		}
		req_may_be_done(req); /* Allowed while state.susp */
		break;

	case DATA_RECEIVED:
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		dec_ap_pending(mdev);
		req->rq_state &= ~RQ_NET_PENDING;
		req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
		req_may_be_completed_not_susp(req, m);
		break;
	};

	return rv;
}

/* we may do a local read if:
 * - we are consistent (of course),
 * - or we are generally inconsistent,
 *   BUT we are still/already IN SYNC for this area.
 *   since size may be bigger than BM_BLOCK_SIZE,
 *   we may need to check several bits.
 */
static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
{
	unsigned long sbnr, ebnr;
	sector_t esector, nr_sectors;

	if (mdev->state.disk == D_UP_TO_DATE)
		return true;
	if (mdev->state.disk != D_INCONSISTENT)
		return false;
	esector = sector + (size >> 9) - 1;
	nr_sectors = drbd_get_capacity(mdev->this_bdev);
	D_ASSERT(sector  < nr_sectors);
	D_ASSERT(esector < nr_sectors);

	sbnr = BM_SECT_TO_BIT(sector);
	ebnr = BM_SECT_TO_BIT(esector);

	return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
}

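/* Decide whether a read should be shipped to the peer according to the
 * configured read-balancing policy.  For the striping policies the stripe
 * size is 2^stripe_shift bytes (15 => 32KiB ... 20 => 1MiB); since a sector
 * is 2^9 bytes, bit (stripe_shift - 9) of the sector number selects between
 * the local and the remote node. */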
static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector,
					 enum drbd_read_balancing rbm)
{
	struct backing_dev_info *bdi;
	int stripe_shift;

	switch (rbm) {
	case RB_CONGESTED_REMOTE:
		bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
		return bdi_read_congested(bdi);
	case RB_LEAST_PENDING:
		return atomic_read(&mdev->local_cnt) >
			atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
	case RB_32K_STRIPING:  /* stripe_shift = 15 */
	case RB_64K_STRIPING:
	case RB_128K_STRIPING:
	case RB_256K_STRIPING:
	case RB_512K_STRIPING:
	case RB_1M_STRIPING:   /* stripe_shift = 20 */
		stripe_shift = (rbm - RB_32K_STRIPING + 15);
		return (sector >> (stripe_shift - 9)) & 1;
	case RB_ROUND_ROBIN:
		return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
	case RB_PREFER_REMOTE:
		return true;
	case RB_PREFER_LOCAL:
	default:
		return false;
	}
}

/**
 * complete_conflicting_writes  -  wait for any conflicting write requests
 *
 * The write_requests tree contains all active write requests which we
 * currently know about.  Wait for any requests to complete which conflict with
 * the new one.
 *
 * Only way out: remove the conflicting intervals from the tree.
 */
static void complete_conflicting_writes(struct drbd_request *req)
{
	DEFINE_WAIT(wait);
	struct drbd_conf *mdev = req->w.mdev;
	struct drbd_interval *i;
	sector_t sector = req->i.sector;
	int size = req->i.size;

	i = drbd_find_overlap(&mdev->write_requests, sector, size);
	if (!i)
		return;

	for (;;) {
		prepare_to_wait(&mdev->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
		i = drbd_find_overlap(&mdev->write_requests, sector, size);
		if (!i)
			break;
		/* Indicate to wake up device->misc_wait on progress.  */
		i->waiting = true;
		spin_unlock_irq(&mdev->tconn->req_lock);
		schedule();
		spin_lock_irq(&mdev->tconn->req_lock);
	}
	finish_wait(&mdev->misc_wait, &wait);
}

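/* Compare the amount of application data in flight and the number of hot
 * activity log extents against the configured congestion thresholds; when
 * congested, close the current epoch and go Ahead or disconnect, depending
 * on the on-congestion policy.  Returns whether congestion was detected. */
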
/* called within req_lock and rcu_read_lock() */
static bool conn_check_congested(struct drbd_conf *mdev)
{
	struct drbd_tconn *tconn = mdev->tconn;
	struct net_conf *nc;
	bool congested = false;
	enum drbd_on_congestion on_congestion;

	nc = rcu_dereference(tconn->net_conf);
	on_congestion = nc ? nc->on_congestion : OC_BLOCK;
	if (on_congestion == OC_BLOCK ||
	    tconn->agreed_pro_version < 96)
		return false;

	if (nc->cong_fill &&
	    atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
		dev_info(DEV, "Congestion-fill threshold reached\n");
		congested = true;
	}

	if (mdev->act_log->used >= nc->cong_extents) {
		dev_info(DEV, "Congestion-extents threshold reached\n");
		congested = true;
	}

	if (congested) {
		if (mdev->tconn->current_tle_writes)
			/* start a new epoch for non-mirrored writes */
			start_new_tl_epoch(mdev->tconn);

		if (on_congestion == OC_PULL_AHEAD)
			_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
		else  /*nc->on_congestion == OC_DISCONNECT */
			_drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
	}

	return congested;
}

/* If this returns false, and req->private_bio is still set,
 * this should be submitted locally.
 *
 * If it returns false, but req->private_bio is not set,
 * we do not have access to good data :(
 *
 * Otherwise, this destroys req->private_bio, if any,
 * and returns true.
 */
static bool do_remote_read(struct drbd_request *req)
{
	struct drbd_conf *mdev = req->w.mdev;
	enum drbd_read_balancing rbm;

	if (req->private_bio) {
		if (!drbd_may_do_local_read(mdev,
					req->i.sector, req->i.size)) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(mdev);
		}
	}

	if (mdev->state.pdsk != D_UP_TO_DATE)
		return false;

	/* TODO: improve read balancing decisions, take into account drbd
	 * protocol, pending requests etc. */

	rcu_read_lock();
	rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
	rcu_read_unlock();

	if (rbm == RB_PREFER_LOCAL && req->private_bio)
		return false; /* submit locally */

	if (req->private_bio == NULL)
		return true;

	if (remote_due_to_read_balancing(mdev, req->i.sector, rbm)) {
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(mdev);
		}
		return true;
	}

	return false;
}

/* returns number of connections (== 1, for drbd 8.4)
 * expected to actually write this data,
 * which does NOT include those that we are L_AHEAD for. */
static int drbd_process_write_request(struct drbd_request *req)
{
	struct drbd_conf *mdev = req->w.mdev;
	int remote, send_oos;

	rcu_read_lock();
	remote = drbd_should_do_remote(mdev->state);
	if (remote) {
		conn_check_congested(mdev);
		remote = drbd_should_do_remote(mdev->state);
	}
	send_oos = drbd_should_send_out_of_sync(mdev->state);
	rcu_read_unlock();

	if (!remote && !send_oos)
		return 0;

	D_ASSERT(!(remote && send_oos));

	if (remote) {
		_req_mod(req, TO_BE_SENT);
		_req_mod(req, QUEUE_FOR_NET_WRITE);
	} else if (drbd_set_out_of_sync(mdev, req->i.sector, req->i.size))
		_req_mod(req, QUEUE_FOR_SEND_OOS);

	return remote;
}

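/* Hand the cloned private bio to the local backing device.  The disk state is
 * re-checked via get_ldev() right before submission; if the local disk is
 * gone (or fault injection hits), the bio is failed with -EIO instead. */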
static void
drbd_submit_req_private_bio(struct drbd_request *req)
{
	struct drbd_conf *mdev = req->w.mdev;
	struct bio *bio = req->private_bio;
	const int rw = bio_rw(bio);

	bio->bi_bdev = mdev->ldev->backing_bdev;

	/* State may have changed since we grabbed our reference on the
	 * ->ldev member. Double check, and short-circuit to endio.
	 * In case the last activity log transaction failed to get on
	 * stable storage, and this is a WRITE, we may not even submit
	 * this bio. */
	if (get_ldev(mdev)) {
		if (drbd_insert_fault(mdev,
				      rw == WRITE ? DRBD_FAULT_DT_WR
				    : rw == READ  ? DRBD_FAULT_DT_RD
				    :               DRBD_FAULT_DT_RA))
			bio_endio(bio, -EIO);
		else
			generic_make_request(bio);
		put_ldev(mdev);
	} else
		bio_endio(bio, -EIO);
}

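/* Entry point for one master bio: allocate the request, take the activity log
 * reference for writes, then, under the req_lock, register the request in the
 * transfer log, queue the network work, and finally submit the private bio to
 * the local disk if we have one. */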
void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
{
	const int rw = bio_rw(bio);
	struct bio_and_error m = { NULL, };
	struct drbd_request *req;
	bool no_remote = false;

	/* allocate outside of all locks; */
	req = drbd_req_new(mdev, bio);
	if (!req) {
		dec_ap_bio(mdev);
		/* only pass the error to the upper layers.
		 * if user cannot handle io errors, that's not our business. */
		dev_err(DEV, "could not kmalloc() req\n");
		bio_endio(bio, -ENOMEM);
		return;
	}
	req->start_time = start_time;

	if (!get_ldev(mdev)) {
		bio_put(req->private_bio);
		req->private_bio = NULL;
	}

	/* For WRITES going to the local disk, grab a reference on the target
	 * extent. This waits for any resync activity in the corresponding
	 * resync extent to finish, and, if necessary, pulls in the target
	 * extent into the activity log, which involves further disk io because
	 * of transactional on-disk meta data updates. */
	if (rw == WRITE && req->private_bio
	    && !test_bit(AL_SUSPENDED, &mdev->flags)) {
		req->rq_state |= RQ_IN_ACT_LOG;
		drbd_al_begin_io(mdev, &req->i);
	}

	spin_lock_irq(&mdev->tconn->req_lock);
	if (rw == WRITE) {
		/* This may temporarily give up the req_lock,
		 * but will re-acquire it before it returns here.
		 * Needs to be before the check on drbd_suspended() */
		complete_conflicting_writes(req);
	}

	/* no more giving up req_lock from now on! */

	if (drbd_suspended(mdev)) {
		/* push back and retry: */
		req->rq_state |= RQ_POSTPONED;
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(mdev);
		}
		goto out;
	}

	/* Update disk stats */
	_drbd_start_io_acct(mdev, req, bio);

	/* We fail READ/READA early, if we can not serve it.
	 * We must do this before req is registered on any lists.
	 * Otherwise, req_may_be_completed() will queue failed READ for retry. */
	if (rw != WRITE) {
		if (!do_remote_read(req) && !req->private_bio)
			goto nodata;
	}

	/* which transfer log epoch does this belong to? */
	req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
	if (rw == WRITE)
		mdev->tconn->current_tle_writes++;

	list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);

	if (rw == WRITE) {
		if (!drbd_process_write_request(req))
			no_remote = true;
	} else {
		/* We either have a private_bio, or we can read from remote.
		 * Otherwise we had done the goto nodata above. */
		if (req->private_bio == NULL) {
			_req_mod(req, TO_BE_SENT);
			_req_mod(req, QUEUE_FOR_NET_READ);
		} else
			no_remote = true;
	}

	if (req->private_bio) {
		/* needs to be marked within the same spinlock */
		_req_mod(req, TO_BE_SUBMITTED);
		/* but we need to give up the spinlock to submit */
		spin_unlock_irq(&mdev->tconn->req_lock);
		drbd_submit_req_private_bio(req);
		/* once we have submitted, we must no longer look at req,
		 * it may already be destroyed. */
		return;
	} else if (no_remote) {
nodata:
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
		/* A write may have been queued for send_oos, however.
		 * So we can not simply free it, we must go through req_may_be_completed() */
	}

out:
	req_may_be_completed(req, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return;
}

int drbd_make_request(struct request_queue *q, struct bio *bio)
{
	struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
	unsigned long start_time;

	start_time = jiffies;

	/*
	 * what we "blindly" assume:
	 */
	D_ASSERT(bio->bi_size > 0);
	D_ASSERT(IS_ALIGNED(bio->bi_size, 512));

	inc_ap_bio(mdev);
	__drbd_make_request(mdev, bio, start_time);

	return 0;
}

/* This is called by bio_add_page().
 *
 * q->max_hw_sectors and other global limits are already enforced there.
 *
 * We need to call down to our lower level device,
 * in case it has special restrictions.
 *
 * We also may need to enforce configured max-bio-bvecs limits.
 *
 * As long as the BIO is empty we have to allow at least one bvec,
 * regardless of size and offset, so no need to ask lower levels.
 */
int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
{
	struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
	unsigned int bio_size = bvm->bi_size;
	int limit = DRBD_MAX_BIO_SIZE;
	int backing_limit;

	if (bio_size && get_ldev(mdev)) {
		struct request_queue * const b =
			mdev->ldev->backing_bdev->bd_disk->queue;
		if (b->merge_bvec_fn) {
			backing_limit = b->merge_bvec_fn(b, bvm, bvec);
			limit = min(limit, backing_limit);
		}
		put_ldev(mdev);
	}

	return limit;
}

*find_oldest_request(struct drbd_tconn
*tconn
)
1117 /* Walk the transfer log,
1118 * and find the oldest not yet completed request */
1119 struct drbd_request
*r
;
1120 list_for_each_entry(r
, &tconn
->transfer_log
, tl_requests
) {
1121 if (atomic_read(&r
->completion_ref
))
1127 void request_timer_fn(unsigned long data
)
1129 struct drbd_conf
*mdev
= (struct drbd_conf
*) data
;
1130 struct drbd_tconn
*tconn
= mdev
->tconn
;
1131 struct drbd_request
*req
; /* oldest request */
1132 struct net_conf
*nc
;
1133 unsigned long ent
= 0, dt
= 0, et
, nt
; /* effective timeout = ko_count * timeout */
1137 nc
= rcu_dereference(tconn
->net_conf
);
1138 if (nc
&& mdev
->state
.conn
>= C_WF_REPORT_PARAMS
)
1139 ent
= nc
->timeout
* HZ
/10 * nc
->ko_count
;
1141 if (get_ldev(mdev
)) { /* implicit state.disk >= D_INCONSISTENT */
1142 dt
= rcu_dereference(mdev
->ldev
->disk_conf
)->disk_timeout
* HZ
/ 10;
1147 et
= min_not_zero(dt
, ent
);
1150 return; /* Recurring timer stopped */
1154 spin_lock_irq(&tconn
->req_lock
);
1155 req
= find_oldest_request(tconn
);
1157 spin_unlock_irq(&tconn
->req_lock
);
1158 mod_timer(&mdev
->request_timer
, now
+ et
);
1162 /* The request is considered timed out, if
1163 * - we have some effective timeout from the configuration,
1164 * with above state restrictions applied,
1165 * - the oldest request is waiting for a response from the network
1166 * resp. the local disk,
1167 * - the oldest request is in fact older than the effective timeout,
1168 * - the connection was established (resp. disk was attached)
1169 * for longer than the timeout already.
1170 * Note that for 32bit jiffies and very stable connections/disks,
1171 * we may have a wrap around, which is catched by
1172 * !time_in_range(now, last_..._jif, last_..._jif + timeout).
1174 * Side effect: once per 32bit wrap-around interval, which means every
1175 * ~198 days with 250 HZ, we have a window where the timeout would need
1176 * to expire twice (worst case) to become effective. Good enough.
1178 if (ent
&& req
->rq_state
& RQ_NET_PENDING
&&
1179 time_after(now
, req
->start_time
+ ent
) &&
1180 !time_in_range(now
, tconn
->last_reconnect_jif
, tconn
->last_reconnect_jif
+ ent
)) {
1181 dev_warn(DEV
, "Remote failed to finish a request within ko-count * timeout\n");
1182 _drbd_set_state(_NS(mdev
, conn
, C_TIMEOUT
), CS_VERBOSE
| CS_HARD
, NULL
);
1184 if (dt
&& req
->rq_state
& RQ_LOCAL_PENDING
&& req
->w
.mdev
== mdev
&&
1185 time_after(now
, req
->start_time
+ dt
) &&
1186 !time_in_range(now
, mdev
->last_reattach_jif
, mdev
->last_reattach_jif
+ dt
)) {
1187 dev_warn(DEV
, "Local backing device failed to meet the disk-timeout\n");
1188 __drbd_chk_io_error(mdev
, 1);
1190 nt
= (time_after(now
, req
->start_time
+ et
) ? now
: req
->start_time
) + et
;
1191 spin_unlock_irq(&tconn
->req_lock
);
1192 mod_timer(&mdev
->request_timer
, nt
);