1 /*
2 drbd_req.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27
28 #include <linux/slab.h>
29 #include <linux/drbd.h>
30 #include "drbd_int.h"
31 #include "drbd_req.h"
32
33
34 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
35
36 /* Update disk stats at start of I/O request */
37 static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
38 {
39 const int rw = bio_data_dir(bio);
40 int cpu;
41 cpu = part_stat_lock();
42 part_round_stats(cpu, &mdev->vdisk->part0);
43 part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
44 part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
45 (void) cpu; /* The macro invocations above want the cpu argument; otherwise
46 the compiler warns that cpu is only assigned but never used. */
47 part_inc_in_flight(&mdev->vdisk->part0, rw);
48 part_stat_unlock();
49 }
50
51 /* Update disk stats when completing request upwards */
52 static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
53 {
54 int rw = bio_data_dir(req->master_bio);
55 unsigned long duration = jiffies - req->start_time;
56 int cpu;
57 cpu = part_stat_lock();
58 part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
59 part_round_stats(cpu, &mdev->vdisk->part0);
60 part_dec_in_flight(&mdev->vdisk->part0, rw);
61 part_stat_unlock();
62 }
63
64 static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
65 struct bio *bio_src)
66 {
67 struct drbd_request *req;
68
69 req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
70 if (!req)
71 return NULL;
72
73 drbd_req_make_private_bio(req, bio_src);
74 req->rq_state = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
75 req->w.mdev = mdev;
76 req->master_bio = bio_src;
77 req->epoch = 0;
78
79 drbd_clear_interval(&req->i);
80 req->i.sector = bio_src->bi_sector;
81 req->i.size = bio_src->bi_size;
82 req->i.local = true;
83 req->i.waiting = false;
84
85 INIT_LIST_HEAD(&req->tl_requests);
86 INIT_LIST_HEAD(&req->w.list);
87
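/* completion_ref is taken once here for the pending master_bio and is
 * dropped in req_may_be_completed() when the bio is handed back upwards
 * (the FIXME there notes the intent to base completion on this count
 * instead of the RQ_* flags).  kref governs the lifetime of the request
 * object itself and is finally dropped in req_may_be_done(), which
 * releases it through drbd_req_destroy(). */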
88 atomic_set(&req->completion_ref, 1);
89 kref_init(&req->kref);
90 return req;
91 }
92
93 static void drbd_req_destroy(struct kref *kref)
94 {
95 struct drbd_request *req = container_of(kref, struct drbd_request, kref);
96 struct drbd_conf *mdev = req->w.mdev;
97 const unsigned long s = req->rq_state;
98
99 /* remove it from the transfer log.
100 * well, only if it had been there in the first
101 * place... if it had not (local only or conflicting
102 * and never sent), it should still be "empty" as
103 * initialized in drbd_req_new(), so we can list_del() it
104 * here unconditionally */
105 list_del_init(&req->tl_requests);
106
107 /* if it was a write, we may have to set the corresponding
108 * bit(s) out-of-sync first. If it had a local part, we need to
109 * release the reference to the activity log. */
110 if (s & RQ_WRITE) {
111 /* Set out-of-sync unless both OK flags are set
112 * (local only or remote failed).
113 * Other places where we set out-of-sync:
114 * READ with local io-error */
115 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
116 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
117
118 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
119 drbd_set_in_sync(mdev, req->i.sector, req->i.size);
120
121 /* one might be tempted to move the drbd_al_complete_io
122 * to the local io completion callback drbd_request_endio.
123 * but, if this was a mirror write, we may only
124 * drbd_al_complete_io after this is RQ_NET_DONE,
125 * otherwise the extent could be dropped from the al
126 * before it has actually been written on the peer.
127 * if we crash before our peer knows about the request,
128 * but after the extent has been dropped from the al,
129 * we would forget to resync the corresponding extent.
130 */
131 if (s & RQ_LOCAL_MASK) {
132 if (get_ldev_if_state(mdev, D_FAILED)) {
133 if (s & RQ_IN_ACT_LOG)
134 drbd_al_complete_io(mdev, &req->i);
135 put_ldev(mdev);
136 } else if (__ratelimit(&drbd_ratelimit_state)) {
137 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
138 "but my Disk seems to have failed :(\n",
139 (unsigned long long) req->i.sector, req->i.size);
140 }
141 }
142 }
143
144 if (s & RQ_POSTPONED)
145 drbd_restart_request(req);
146 else
147 mempool_free(req, drbd_request_mempool);
148 }
149
150 static void wake_all_senders(struct drbd_tconn *tconn) {
151 wake_up(&tconn->sender_work.q_wait);
152 }
153
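/* A transfer log epoch is the set of writes delimited by barriers on the
 * wire.  Bumping current_tle_nr closes the current epoch; the senders are
 * woken so they notice the change, and the peer's barrier ack eventually
 * comes back as BARRIER_ACKED in __req_mod(). */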
154 /* must hold resource->req_lock */
155 static void start_new_tl_epoch(struct drbd_tconn *tconn)
156 {
157 tconn->current_tle_writes = 0;
158 atomic_inc(&tconn->current_tle_nr);
159 wake_all_senders(tconn);
160 }
161
162 void complete_master_bio(struct drbd_conf *mdev,
163 struct bio_and_error *m)
164 {
165 bio_endio(m->bio, m->error);
166 dec_ap_bio(mdev);
167 }
168
169
170 static void drbd_remove_request_interval(struct rb_root *root,
171 struct drbd_request *req)
172 {
173 struct drbd_conf *mdev = req->w.mdev;
174 struct drbd_interval *i = &req->i;
175
176 drbd_remove_interval(root, i);
177
178 /* Wake up any processes waiting for this request to complete. */
179 if (i->waiting)
180 wake_up(&mdev->misc_wait);
181 }
182
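/* Counterpart to complete_conflicting_writes() below (and to waiting peer
 * requests): whoever finds a conflicting overlap sets i->waiting on this
 * request's interval and sleeps on misc_wait; wake them up once this
 * request has made sufficient local progress. */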
183 static void maybe_wakeup_conflicting_requests(struct drbd_request *req)
184 {
185 const unsigned long s = req->rq_state;
186 if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
187 return;
188 if (req->i.waiting)
189 /* Retry all conflicting peer requests. */
190 wake_up(&req->w.mdev->misc_wait);
191 }
192
193 static
194 void req_may_be_done(struct drbd_request *req)
195 {
196 const unsigned long s = req->rq_state;
197
198 /* req->master_bio still present means: Not yet completed.
199 *
200 * Unless this is RQ_POSTPONED, which will cause drbd_req_destroy() to
201 * queue it on the retry workqueue instead of destroying it.
202 */
203 if (req->master_bio && !(s & RQ_POSTPONED))
204 return;
205
206 /* Local still pending, even though master_bio is already completed?
207 * may happen for RQ_LOCAL_ABORTED requests. */
208 if (s & RQ_LOCAL_PENDING)
209 return;
210
211 if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
212 /* this is disconnected (local only) operation,
213 * or protocol A, B, or C P_BARRIER_ACK,
214 * or killed from the transfer log due to connection loss. */
215 kref_put(&req->kref, drbd_req_destroy);
216 }
217 /* else: network part and not DONE yet. that is
218 * protocol A, B, or C, barrier ack still pending... */
219 }
220
221 /* Helper for __req_mod().
222 * Set m->bio to the master bio, if it is fit to be completed,
223 * or leave it alone (it is initialized to NULL in __req_mod),
224 * if it has already been completed, or cannot be completed yet.
225 * If m->bio is set, the error status to be returned is placed in m->error.
226 */
227 static
228 void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
229 {
230 const unsigned long s = req->rq_state;
231 struct drbd_conf *mdev = req->w.mdev;
232
233 /* we must not complete the master bio, while it is
234 * still being processed by _drbd_send_zc_bio (drbd_send_dblock)
235 * not yet acknowledged by the peer
236 * not yet completed by the local io subsystem
237 * these flags may get cleared in any order by
238 * the worker,
239 * the receiver,
240 * the bio_endio completion callbacks.
241 */
242 if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
243 return;
244 if (s & RQ_NET_QUEUED)
245 return;
246 if (s & RQ_NET_PENDING)
247 return;
248
249 /* FIXME
250 * instead of all the RQ_FLAGS, actually use the completion_ref
251 * to decide if this is ready to be completed. */
252 if (req->master_bio) {
253 int complete = atomic_dec_and_test(&req->completion_ref);
254 D_ASSERT(complete != 0);
255 } else
256 D_ASSERT(atomic_read(&req->completion_ref) == 0);
257
258 if (req->master_bio) {
259 int rw = bio_rw(req->master_bio);
260
261 /* this is DATA_RECEIVED (remote read)
262 * or protocol C P_WRITE_ACK
263 * or protocol B P_RECV_ACK
264 * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
265 * or canceled or failed,
266 * or killed from the transfer log due to connection loss.
267 */
268
269 /*
270 * figure out whether to report success or failure.
271 *
272 * report success when at least one of the operations succeeded.
273 * or, to put it the other way,
274 * only report failure, when both operations failed.
275 *
276 * what to do about the failures is handled elsewhere.
277 * what we need to do here is just: complete the master_bio.
278 *
279 * local completion error, if any, has been stored as ERR_PTR
280 * in private_bio within drbd_request_endio.
281 */
282 int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
283 int error = PTR_ERR(req->private_bio);
284
285 /* remove the request from the conflict detection
286 * respectively block_id verification interval tree */
287 if (!drbd_interval_empty(&req->i)) {
288 struct rb_root *root;
289
290 if (rw == WRITE)
291 root = &mdev->write_requests;
292 else
293 root = &mdev->read_requests;
294 drbd_remove_request_interval(root, req);
295 } else if (!(s & RQ_POSTPONED))
296 D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
297
298 /* Before we can signal completion to the upper layers,
299 * we may need to close the current transfer log epoch.
300 * We are within the request lock, so we can simply compare
301 * the request epoch number with the current transfer log
302 * epoch number. If they match, increase the current_tle_nr,
303 * and reset the transfer log epoch write_cnt.
304 */
305 if (rw == WRITE &&
306 req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
307 start_new_tl_epoch(mdev->tconn);
308
309 /* Update disk stats */
310 _drbd_end_io_acct(mdev, req);
311
312 /* If READ failed,
313 * have it be pushed back to the retry work queue,
314 * so it will re-enter __drbd_make_request(),
315 * and be re-assigned to a suitable local or remote path,
316 * or failed if we do not have access to good data anymore.
317 *
318 * Unless it was failed early by __drbd_make_request(),
319 * because no path was available, in which case
320 * it was not even added to the transfer_log.
321 *
322 * READA may fail, and will not be retried.
323 *
324 * WRITE should have used all available paths already.
325 */
326 if (!ok && rw == READ && !list_empty(&req->tl_requests))
327 req->rq_state |= RQ_POSTPONED;
328
329 if (!(req->rq_state & RQ_POSTPONED)) {
330 m->error = ok ? 0 : (error ?: -EIO);
331 m->bio = req->master_bio;
332 req->master_bio = NULL;
333 } else {
334 /* Assert that req_may_be_done() below will complete this request
335 * with this very invocation. */
336 /* FIXME:
337 * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
338 */
339 D_ASSERT(!(s & RQ_LOCAL_PENDING));
340 D_ASSERT((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE));
341 }
342 }
343 req_may_be_done(req);
344 }
345
346 static void req_may_be_completed_not_susp(struct drbd_request *req, struct bio_and_error *m)
347 {
348 struct drbd_conf *mdev = req->w.mdev;
349
350 if (!drbd_suspended(mdev))
351 req_may_be_completed(req, m);
352 }
353
354 /* obviously this could be coded as many single functions
355 * instead of one huge switch,
356 * or by putting the code directly in the respective locations
357 * (as it has been before).
358 *
359 * but having it this way
360 * enforces that it is all in this one place, where it is easier to audit,
361 * it makes it obvious that whatever "event" "happens" to a request should
362 * happen "atomically" within the req_lock,
363 * and it enforces that we have to think in a very structured manner
364 * about the "events" that may happen to a request during its life time ...
365 */
366 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
367 struct bio_and_error *m)
368 {
369 struct drbd_conf *mdev = req->w.mdev;
370 struct net_conf *nc;
371 int p, rv = 0;
372
373 if (m)
374 m->bio = NULL;
375
376 switch (what) {
377 default:
378 dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
379 break;
380
381 /* does not happen...
382 * initialization done in drbd_req_new
383 case CREATED:
384 break;
385 */
386
387 case TO_BE_SENT: /* via network */
388 /* reached via __drbd_make_request
389 * and from w_read_retry_remote */
390 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
391 req->rq_state |= RQ_NET_PENDING;
392 rcu_read_lock();
393 nc = rcu_dereference(mdev->tconn->net_conf);
394 p = nc->wire_protocol;
395 rcu_read_unlock();
396 req->rq_state |=
397 p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
398 p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
399 inc_ap_pending(mdev);
400 break;
401
402 case TO_BE_SUBMITTED: /* locally */
403 /* reached via __drbd_make_request */
404 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
405 req->rq_state |= RQ_LOCAL_PENDING;
406 break;
407
408 case COMPLETED_OK:
409 if (req->rq_state & RQ_WRITE)
410 mdev->writ_cnt += req->i.size >> 9;
411 else
412 mdev->read_cnt += req->i.size >> 9;
413
414 req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
415 req->rq_state &= ~RQ_LOCAL_PENDING;
416
417 maybe_wakeup_conflicting_requests(req);
418 req_may_be_completed_not_susp(req, m);
419 break;
420
421 case ABORT_DISK_IO:
422 req->rq_state |= RQ_LOCAL_ABORTED;
423 req_may_be_completed_not_susp(req, m);
424 break;
425
426 case WRITE_COMPLETED_WITH_ERROR:
427 req->rq_state |= RQ_LOCAL_COMPLETED;
428 req->rq_state &= ~RQ_LOCAL_PENDING;
429
430 __drbd_chk_io_error(mdev, false);
431 maybe_wakeup_conflicting_requests(req);
432 req_may_be_completed_not_susp(req, m);
433 break;
434
435 case READ_AHEAD_COMPLETED_WITH_ERROR:
436 /* it is legal to fail READA */
437 req->rq_state |= RQ_LOCAL_COMPLETED;
438 req->rq_state &= ~RQ_LOCAL_PENDING;
439 req_may_be_completed_not_susp(req, m);
440 break;
441
442 case READ_COMPLETED_WITH_ERROR:
443 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
444
445 req->rq_state |= RQ_LOCAL_COMPLETED;
446 req->rq_state &= ~RQ_LOCAL_PENDING;
447
448 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
449
450 __drbd_chk_io_error(mdev, false);
451 req_may_be_completed_not_susp(req, m);
452 break;
453
454 case QUEUE_FOR_NET_READ:
455 /* READ or READA, and
456 * no local disk,
457 * or target area marked as invalid,
458 * or just got an io-error. */
459 /* from __drbd_make_request
460 * or from bio_endio during read io-error recovery */
461
462 /* So we can verify the handle in the answer packet.
463 * Corresponding drbd_remove_request_interval is in
464 * req_may_be_completed() */
465 D_ASSERT(drbd_interval_empty(&req->i));
466 drbd_insert_interval(&mdev->read_requests, &req->i);
467
468 set_bit(UNPLUG_REMOTE, &mdev->flags);
469
470 D_ASSERT(req->rq_state & RQ_NET_PENDING);
471 D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0);
472 req->rq_state |= RQ_NET_QUEUED;
473 req->w.cb = w_send_read_req;
474 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
475 break;
476
477 case QUEUE_FOR_NET_WRITE:
478 /* assert something? */
479 /* from __drbd_make_request only */
480
481 /* Corresponding drbd_remove_request_interval is in
482 * req_may_be_completed() */
483 D_ASSERT(drbd_interval_empty(&req->i));
484 drbd_insert_interval(&mdev->write_requests, &req->i);
485
486 /* NOTE
487 * In case the req ended up on the transfer log before being
488 * queued on the worker, it could lead to this request being
489 * missed during cleanup after connection loss.
490 * So we have to do both operations here,
491 * within the same lock that protects the transfer log.
492 *
493 * _req_add_to_epoch(req); this has to be after the
494 * _maybe_start_new_epoch(req); which happened in
495 * __drbd_make_request, because we now may set the bit
496 * again ourselves to close the current epoch.
497 *
498 * Add req to the (now) current epoch (barrier). */
499
500 /* otherwise we may lose an unplug, which may cause some remote
501 * io-scheduler timeout to expire, increasing maximum latency,
502 * hurting performance. */
503 set_bit(UNPLUG_REMOTE, &mdev->flags);
504
505 /* queue work item to send data */
506 D_ASSERT(req->rq_state & RQ_NET_PENDING);
507 req->rq_state |= RQ_NET_QUEUED;
508 req->w.cb = w_send_dblock;
509 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
510
511 /* close the epoch, in case it outgrew the limit */
512 rcu_read_lock();
513 nc = rcu_dereference(mdev->tconn->net_conf);
514 p = nc->max_epoch_size;
515 rcu_read_unlock();
516 if (mdev->tconn->current_tle_writes >= p)
517 start_new_tl_epoch(mdev->tconn);
518
519 break;
520
521 case QUEUE_FOR_SEND_OOS:
522 req->rq_state |= RQ_NET_QUEUED;
523 req->w.cb = w_send_out_of_sync;
524 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
525 break;
526
527 case READ_RETRY_REMOTE_CANCELED:
528 case SEND_CANCELED:
529 case SEND_FAILED:
530 /* real cleanup will be done from tl_clear. just update flags
531 * so it is no longer marked as on the worker queue */
532 req->rq_state &= ~RQ_NET_QUEUED;
533 /* if we did it right, tl_clear should be scheduled only after
534 * this, so this should not be necessary! */
535 req_may_be_completed_not_susp(req, m);
536 break;
537
538 case HANDED_OVER_TO_NETWORK:
539 /* assert something? */
540 if (bio_data_dir(req->master_bio) == WRITE)
541 atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
542
543 if (bio_data_dir(req->master_bio) == WRITE &&
544 !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
545 /* this is what is dangerous about protocol A:
546 * pretend it was successfully written on the peer. */
547 if (req->rq_state & RQ_NET_PENDING) {
548 dec_ap_pending(mdev);
549 req->rq_state &= ~RQ_NET_PENDING;
550 req->rq_state |= RQ_NET_OK;
551 } /* else: neg-ack was faster... */
552 /* it is still not yet RQ_NET_DONE until the
553 * corresponding epoch barrier got acked as well,
554 * so we know what to dirty on connection loss */
555 }
556 req->rq_state &= ~RQ_NET_QUEUED;
557 req->rq_state |= RQ_NET_SENT;
558 req_may_be_completed_not_susp(req, m);
559 break;
560
561 case OOS_HANDED_TO_NETWORK:
562 /* Was not set PENDING, no longer QUEUED, so is now DONE
563 * as far as this connection is concerned. */
564 req->rq_state &= ~RQ_NET_QUEUED;
565 req->rq_state |= RQ_NET_DONE;
566 req_may_be_completed_not_susp(req, m);
567 break;
568
569 case CONNECTION_LOST_WHILE_PENDING:
570 /* transfer log cleanup after connection loss */
571 /* assert something? */
572 if (req->rq_state & RQ_NET_PENDING)
573 dec_ap_pending(mdev);
574
575 p = !(req->rq_state & RQ_WRITE) && req->rq_state & RQ_NET_PENDING;
576
577 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
578 req->rq_state |= RQ_NET_DONE;
579 if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
580 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
581
582 req_may_be_completed(req, m); /* Allowed while state.susp */
583 break;
584
585 case DISCARD_WRITE:
586 /* for discarded conflicting writes of multiple primaries,
587 * there is no need to keep anything in the tl; potential
588 * node crashes are covered by the activity log. */
589 req->rq_state |= RQ_NET_DONE;
590 /* fall through */
591 case WRITE_ACKED_BY_PEER_AND_SIS:
592 case WRITE_ACKED_BY_PEER:
593 if (what == WRITE_ACKED_BY_PEER_AND_SIS)
594 req->rq_state |= RQ_NET_SIS;
595 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
596 /* protocol C; successfully written on peer.
597 * Nothing more to do here.
598 * We want to keep the tl in place for all protocols, to cater
599 * for volatile write-back caches on lower level devices. */
600
601 goto ack_common;
602 case RECV_ACKED_BY_PEER:
603 D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
604 /* protocol B; pretends to be successfully written on peer.
605 * see also notes above in HANDED_OVER_TO_NETWORK about
606 * protocol != C */
607 ack_common:
608 req->rq_state |= RQ_NET_OK;
609 D_ASSERT(req->rq_state & RQ_NET_PENDING);
610 dec_ap_pending(mdev);
611 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
612 req->rq_state &= ~RQ_NET_PENDING;
613 maybe_wakeup_conflicting_requests(req);
614 req_may_be_completed_not_susp(req, m);
615 break;
616
617 case POSTPONE_WRITE:
618 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
619 /* If this node has already detected the write conflict, the
620 * worker will be waiting on misc_wait. Wake it up once this
621 * request has completed locally.
622 */
623 D_ASSERT(req->rq_state & RQ_NET_PENDING);
624 req->rq_state |= RQ_POSTPONED;
625 maybe_wakeup_conflicting_requests(req);
626 req_may_be_completed_not_susp(req, m);
627 break;
628
629 case NEG_ACKED:
630 /* assert something? */
631 if (req->rq_state & RQ_NET_PENDING) {
632 dec_ap_pending(mdev);
633 if (req->rq_state & RQ_WRITE)
634 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
635 }
636 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
637
638 req->rq_state |= RQ_NET_DONE;
639
640 maybe_wakeup_conflicting_requests(req);
641 req_may_be_completed_not_susp(req, m);
642 /* else: done by HANDED_OVER_TO_NETWORK */
643 break;
644
645 case FAIL_FROZEN_DISK_IO:
646 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
647 break;
648
649 req_may_be_completed(req, m); /* Allowed while state.susp */
650 break;
651
652 case RESTART_FROZEN_DISK_IO:
653 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
654 break;
655
656 req->rq_state &= ~RQ_LOCAL_COMPLETED;
657
658 rv = MR_READ;
659 if (bio_data_dir(req->master_bio) == WRITE)
660 rv = MR_WRITE;
661
662 get_ldev(mdev);
663 req->w.cb = w_restart_disk_io;
664 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
665 break;
666
667 case RESEND:
668 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
669 before the connection loss (B&C only); only P_BARRIER_ACK was missing.
670 Throw them out of the TL here by pretending we got a BARRIER_ACK.
671 During connection handshake, we ensure that the peer was not rebooted. */
672 if (!(req->rq_state & RQ_NET_OK)) {
673 if (req->w.cb) {
674 /* w.cb expected to be w_send_dblock, or w_send_read_req */
675 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
676 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
677 }
678 break;
679 }
680 /* else, fall through to BARRIER_ACKED */
681
682 case BARRIER_ACKED:
683 if (!(req->rq_state & RQ_WRITE))
684 break;
685
686 if (req->rq_state & RQ_NET_PENDING) {
687 /* barrier came in before all requests were acked.
688 * this is bad, because if the connection is lost now,
689 * we won't be able to clean them up... */
690 dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
691 }
692 if ((req->rq_state & RQ_NET_MASK) != 0) {
693 req->rq_state |= RQ_NET_DONE;
694 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)))
695 atomic_sub(req->i.size>>9, &mdev->ap_in_flight);
696 }
697 req_may_be_done(req); /* Allowed while state.susp */
698 break;
699
700 case DATA_RECEIVED:
701 D_ASSERT(req->rq_state & RQ_NET_PENDING);
702 dec_ap_pending(mdev);
703 req->rq_state &= ~RQ_NET_PENDING;
704 req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
705 req_may_be_completed_not_susp(req, m);
706 break;
707 };
708
709 return rv;
710 }
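/* Calling convention sketch (not additional driver code): __req_mod() must
 * run under tconn->req_lock and only records in *m what should happen to
 * the master bio; actually completing it is done after dropping the lock,
 * mirroring what __drbd_make_request() below does:
 *
 *	struct bio_and_error m = { NULL, };
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	__req_mod(req, COMPLETED_OK, &m);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *	if (m.bio)
 *		complete_master_bio(mdev, &m);
 */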
711
712 /* we may do a local read if:
713 * - we are consistent (of course),
714 * - or we are generally inconsistent,
715 * BUT we are still/already IN SYNC for this area.
716 * since size may be bigger than BM_BLOCK_SIZE,
717 * we may need to check several bits.
718 */
719 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
720 {
721 unsigned long sbnr, ebnr;
722 sector_t esector, nr_sectors;
723
724 if (mdev->state.disk == D_UP_TO_DATE)
725 return true;
726 if (mdev->state.disk != D_INCONSISTENT)
727 return false;
728 esector = sector + (size >> 9) - 1;
729 nr_sectors = drbd_get_capacity(mdev->this_bdev);
730 D_ASSERT(sector < nr_sectors);
731 D_ASSERT(esector < nr_sectors);
732
733 sbnr = BM_SECT_TO_BIT(sector);
734 ebnr = BM_SECT_TO_BIT(esector);
735
736 return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
737 }
738
739 static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector,
740 enum drbd_read_balancing rbm)
741 {
742 struct backing_dev_info *bdi;
743 int stripe_shift;
744
745 switch (rbm) {
746 case RB_CONGESTED_REMOTE:
747 bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
748 return bdi_read_congested(bdi);
749 case RB_LEAST_PENDING:
750 return atomic_read(&mdev->local_cnt) >
751 atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
752 case RB_32K_STRIPING: /* stripe_shift = 15 */
753 case RB_64K_STRIPING:
754 case RB_128K_STRIPING:
755 case RB_256K_STRIPING:
756 case RB_512K_STRIPING:
757 case RB_1M_STRIPING: /* stripe_shift = 20 */
758 stripe_shift = (rbm - RB_32K_STRIPING + 15);
759 return (sector >> (stripe_shift - 9)) & 1;
760 case RB_ROUND_ROBIN:
761 return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
762 case RB_PREFER_REMOTE:
763 return true;
764 case RB_PREFER_LOCAL:
765 default:
766 return false;
767 }
768 }
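/* Worked example for the striping modes above: RB_64K_STRIPING gives
 * stripe_shift = 16, so the test becomes (sector >> 7) & 1, i.e. the read
 * target alternates between local and remote every 128 sectors
 * (128 * 512 B = 64 KiB) of device offset. */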
769
770 /*
771 * complete_conflicting_writes - wait for any conflicting write requests
772 *
773 * The write_requests tree contains all active write requests which we
774 * currently know about. Wait for any requests to complete which conflict with
775 * the new one.
776 *
777 * Only way out: remove the conflicting intervals from the tree.
778 */
779 static void complete_conflicting_writes(struct drbd_request *req)
780 {
781 DEFINE_WAIT(wait);
782 struct drbd_conf *mdev = req->w.mdev;
783 struct drbd_interval *i;
784 sector_t sector = req->i.sector;
785 int size = req->i.size;
786
787 i = drbd_find_overlap(&mdev->write_requests, sector, size);
788 if (!i)
789 return;
790
791 for (;;) {
792 prepare_to_wait(&mdev->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
793 i = drbd_find_overlap(&mdev->write_requests, sector, size);
794 if (!i)
795 break;
796 /* Indicate to wake up mdev->misc_wait on progress. */
797 i->waiting = true;
798 spin_unlock_irq(&mdev->tconn->req_lock);
799 schedule();
800 spin_lock_irq(&mdev->tconn->req_lock);
801 }
802 finish_wait(&mdev->misc_wait, &wait);
803 }
804
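/* Note that ap_in_flight is accounted in 512-byte sectors (__req_mod()
 * adjusts it by req->i.size >> 9), and nc->cong_fill is compared against
 * it directly, i.e. in the same unit. */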
805 /* called within req_lock and rcu_read_lock() */
806 static bool conn_check_congested(struct drbd_conf *mdev)
807 {
808 struct drbd_tconn *tconn = mdev->tconn;
809 struct net_conf *nc;
810 bool congested = false;
811 enum drbd_on_congestion on_congestion;
812
813 nc = rcu_dereference(tconn->net_conf);
814 on_congestion = nc ? nc->on_congestion : OC_BLOCK;
815 if (on_congestion == OC_BLOCK ||
816 tconn->agreed_pro_version < 96)
817 return false;
818
819 if (nc->cong_fill &&
820 atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
821 dev_info(DEV, "Congestion-fill threshold reached\n");
822 congested = true;
823 }
824
825 if (mdev->act_log->used >= nc->cong_extents) {
826 dev_info(DEV, "Congestion-extents threshold reached\n");
827 congested = true;
828 }
829
830 if (congested) {
831 if (mdev->tconn->current_tle_writes)
832 /* start a new epoch for non-mirrored writes */
833 start_new_tl_epoch(mdev->tconn);
834
835 if (on_congestion == OC_PULL_AHEAD)
836 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
837 else /*nc->on_congestion == OC_DISCONNECT */
838 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
839 }
840
841 return congested;
842 }
843
844 /* If this returns false, and req->private_bio is still set,
845 * this should be submitted locally.
846 *
847 * If it returns false, but req->private_bio is not set,
848 * we do not have access to good data :(
849 *
850 * Otherwise, this destroys req->private_bio, if any,
851 * and returns true.
852 */
853 static bool do_remote_read(struct drbd_request *req)
854 {
855 struct drbd_conf *mdev = req->w.mdev;
856 enum drbd_read_balancing rbm;
857
858 if (req->private_bio) {
859 if (!drbd_may_do_local_read(mdev,
860 req->i.sector, req->i.size)) {
861 bio_put(req->private_bio);
862 req->private_bio = NULL;
863 put_ldev(mdev);
864 }
865 }
866
867 if (mdev->state.pdsk != D_UP_TO_DATE)
868 return false;
869
870 /* TODO: improve read balancing decisions, take into account drbd
871 * protocol, pending requests etc. */
872
873 rcu_read_lock();
874 rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
875 rcu_read_unlock();
876
877 if (rbm == RB_PREFER_LOCAL && req->private_bio)
878 return false; /* submit locally */
879
880 if (req->private_bio == NULL)
881 return true;
882
883 if (remote_due_to_read_balancing(mdev, req->i.sector, rbm)) {
884 if (req->private_bio) {
885 bio_put(req->private_bio);
886 req->private_bio = NULL;
887 put_ldev(mdev);
888 }
889 return true;
890 }
891
892 return false;
893 }
894
895 /* returns number of connections (== 1, for drbd 8.4)
896 * expected to actually write this data,
897 * which does NOT include those that we are L_AHEAD for. */
898 static int drbd_process_write_request(struct drbd_request *req)
899 {
900 struct drbd_conf *mdev = req->w.mdev;
901 int remote, send_oos;
902
903 rcu_read_lock();
904 remote = drbd_should_do_remote(mdev->state);
905 if (remote) {
906 conn_check_congested(mdev);
907 remote = drbd_should_do_remote(mdev->state);
908 }
909 send_oos = drbd_should_send_out_of_sync(mdev->state);
910 rcu_read_unlock();
911
912 if (!remote && !send_oos)
913 return 0;
914
915 D_ASSERT(!(remote && send_oos));
916
917 if (remote) {
918 _req_mod(req, TO_BE_SENT);
919 _req_mod(req, QUEUE_FOR_NET_WRITE);
920 } else if (drbd_set_out_of_sync(mdev, req->i.sector, req->i.size))
921 _req_mod(req, QUEUE_FOR_SEND_OOS);
922
923 return remote;
924 }
925
926 static void
927 drbd_submit_req_private_bio(struct drbd_request *req)
928 {
929 struct drbd_conf *mdev = req->w.mdev;
930 struct bio *bio = req->private_bio;
931 const int rw = bio_rw(bio);
932
933 bio->bi_bdev = mdev->ldev->backing_bdev;
934
935 /* State may have changed since we grabbed our reference on the
936 * ->ldev member. Double check, and short-circuit to endio.
937 * In case the last activity log transaction failed to get on
938 * stable storage, and this is a WRITE, we may not even submit
939 * this bio. */
940 if (get_ldev(mdev)) {
941 if (drbd_insert_fault(mdev,
942 rw == WRITE ? DRBD_FAULT_DT_WR
943 : rw == READ ? DRBD_FAULT_DT_RD
944 : DRBD_FAULT_DT_RA))
945 bio_endio(bio, -EIO);
946 else
947 generic_make_request(bio);
948 put_ldev(mdev);
949 } else
950 bio_endio(bio, -EIO);
951 }
952
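/* Request submission path, in brief: allocate the drbd_request (outside any
 * lock), pull the target extent into the activity log for local writes,
 * then, under tconn->req_lock: wait out conflicting writes, push the
 * request back if IO is suspended, do the accounting, assign the transfer
 * log epoch, add it to the transfer log, and queue the network work
 * item(s).  The private bio, if any, is submitted only after dropping the
 * lock. */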
953 void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
954 {
955 const int rw = bio_rw(bio);
956 struct bio_and_error m = { NULL, };
957 struct drbd_request *req;
958 bool no_remote = false;
959
960 /* allocate outside of all locks; */
961 req = drbd_req_new(mdev, bio);
962 if (!req) {
963 dec_ap_bio(mdev);
964 /* only pass the error to the upper layers.
965 * if user cannot handle io errors, that's not our business. */
966 dev_err(DEV, "could not kmalloc() req\n");
967 bio_endio(bio, -ENOMEM);
968 return;
969 }
970 req->start_time = start_time;
971
972 if (!get_ldev(mdev)) {
973 bio_put(req->private_bio);
974 req->private_bio = NULL;
975 }
976
977 /* For WRITES going to the local disk, grab a reference on the target
978 * extent. This waits for any resync activity in the corresponding
979 * resync extent to finish, and, if necessary, pulls in the target
980 * extent into the activity log, which involves further disk io because
981 * of transactional on-disk meta data updates. */
982 if (rw == WRITE && req->private_bio
983 && !test_bit(AL_SUSPENDED, &mdev->flags)) {
984 req->rq_state |= RQ_IN_ACT_LOG;
985 drbd_al_begin_io(mdev, &req->i);
986 }
987
988 spin_lock_irq(&mdev->tconn->req_lock);
989 if (rw == WRITE) {
990 /* This may temporarily give up the req_lock,
991 * but will re-acquire it before it returns here.
992 * Needs to be before the check on drbd_suspended() */
993 complete_conflicting_writes(req);
994 }
995
996 /* no more giving up req_lock from now on! */
997
998 if (drbd_suspended(mdev)) {
999 /* push back and retry: */
1000 req->rq_state |= RQ_POSTPONED;
1001 if (req->private_bio) {
1002 bio_put(req->private_bio);
1003 req->private_bio = NULL;
1004 }
1005 goto out;
1006 }
1007
1008 /* Update disk stats */
1009 _drbd_start_io_acct(mdev, req, bio);
1010
1011 /* We fail READ/READA early if we cannot serve it.
1012 * We must do this before req is registered on any lists.
1013 * Otherwise, req_may_be_completed() will queue failed READ for retry. */
1014 if (rw != WRITE) {
1015 if (!do_remote_read(req) && !req->private_bio)
1016 goto nodata;
1017 }
1018
1019 /* which transfer log epoch does this belong to? */
1020 req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
1021 if (rw == WRITE)
1022 mdev->tconn->current_tle_writes++;
1023
1024 list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
1025
1026 if (rw == WRITE) {
1027 if (!drbd_process_write_request(req))
1028 no_remote = true;
1029 } else {
1030 /* We either have a private_bio, or we can read from remote.
1031 * Otherwise we had done the goto nodata above. */
1032 if (req->private_bio == NULL) {
1033 _req_mod(req, TO_BE_SENT);
1034 _req_mod(req, QUEUE_FOR_NET_READ);
1035 } else
1036 no_remote = true;
1037 }
1038
1039 if (req->private_bio) {
1040 /* needs to be marked within the same spinlock */
1041 _req_mod(req, TO_BE_SUBMITTED);
1042 /* but we need to give up the spinlock to submit */
1043 spin_unlock_irq(&mdev->tconn->req_lock);
1044 drbd_submit_req_private_bio(req);
1045 /* once we have submitted, we must no longer look at req,
1046 * it may already be destroyed. */
1047 return;
1048 } else if (no_remote) {
1049 nodata:
1050 if (__ratelimit(&drbd_ratelimit_state))
1051 dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
1052 /* A write may have been queued for send_oos, however.
1053 * So we can not simply free it, we must go through req_may_be_completed() */
1054 }
1055
1056 out:
1057 req_may_be_completed(req, &m);
1058 spin_unlock_irq(&mdev->tconn->req_lock);
1059
1060 if (m.bio)
1061 complete_master_bio(mdev, &m);
1062 return;
1063 }
1064
1065 int drbd_make_request(struct request_queue *q, struct bio *bio)
1066 {
1067 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1068 unsigned long start_time;
1069
1070 start_time = jiffies;
1071
1072 /*
1073 * what we "blindly" assume:
1074 */
1075 D_ASSERT(bio->bi_size > 0);
1076 D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1077
1078 inc_ap_bio(mdev);
1079 __drbd_make_request(mdev, bio, start_time);
1080
1081 return 0;
1082 }
1083
1084 /* This is called by bio_add_page().
1085 *
1086 * q->max_hw_sectors and other global limits are already enforced there.
1087 *
1088 * We need to call down to our lower level device,
1089 * in case it has special restrictions.
1090 *
1091 * We also may need to enforce configured max-bio-bvecs limits.
1092 *
1093 * As long as the BIO is empty we have to allow at least one bvec,
1094 * regardless of size and offset, so no need to ask lower levels.
1095 */
1096 int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1097 {
1098 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1099 unsigned int bio_size = bvm->bi_size;
1100 int limit = DRBD_MAX_BIO_SIZE;
1101 int backing_limit;
1102
1103 if (bio_size && get_ldev(mdev)) {
1104 struct request_queue * const b =
1105 mdev->ldev->backing_bdev->bd_disk->queue;
1106 if (b->merge_bvec_fn) {
1107 backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1108 limit = min(limit, backing_limit);
1109 }
1110 put_ldev(mdev);
1111 }
1112 return limit;
1113 }
1114
1115 struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
1116 {
1117 /* Walk the transfer log,
1118 * and find the oldest not yet completed request */
1119 struct drbd_request *r;
1120 list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
1121 if (atomic_read(&r->completion_ref))
1122 return r;
1123 }
1124 return NULL;
1125 }
1126
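/* Example of the effective timeout computed below: the net_conf timeout is
 * given in tenths of a second (hence the HZ/10), so e.g. timeout=60 and
 * ko_count=7 give ent = 42 seconds worth of jiffies; disk_timeout works the
 * same way, and the smaller non-zero of the two re-arms the timer. */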
1127 void request_timer_fn(unsigned long data)
1128 {
1129 struct drbd_conf *mdev = (struct drbd_conf *) data;
1130 struct drbd_tconn *tconn = mdev->tconn;
1131 struct drbd_request *req; /* oldest request */
1132 struct net_conf *nc;
1133 unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1134 unsigned long now;
1135
1136 rcu_read_lock();
1137 nc = rcu_dereference(tconn->net_conf);
1138 if (nc && mdev->state.conn >= C_WF_REPORT_PARAMS)
1139 ent = nc->timeout * HZ/10 * nc->ko_count;
1140
1141 if (get_ldev(mdev)) { /* implicit state.disk >= D_INCONSISTENT */
1142 dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1143 put_ldev(mdev);
1144 }
1145 rcu_read_unlock();
1146
1147 et = min_not_zero(dt, ent);
1148
1149 if (!et)
1150 return; /* Recurring timer stopped */
1151
1152 now = jiffies;
1153
1154 spin_lock_irq(&tconn->req_lock);
1155 req = find_oldest_request(tconn);
1156 if (!req) {
1157 spin_unlock_irq(&tconn->req_lock);
1158 mod_timer(&mdev->request_timer, now + et);
1159 return;
1160 }
1161
1162 /* The request is considered timed out, if
1163 * - we have some effective timeout from the configuration,
1164 * with above state restrictions applied,
1165 * - the oldest request is waiting for a response from the network
1166 * resp. the local disk,
1167 * - the oldest request is in fact older than the effective timeout,
1168 * - the connection was established (resp. disk was attached)
1169 * for longer than the timeout already.
1170 * Note that for 32bit jiffies and very stable connections/disks,
1171 * we may have a wrap-around, which is caught by
1172 * !time_in_range(now, last_..._jif, last_..._jif + timeout).
1173 *
1174 * Side effect: once per 32bit wrap-around interval, which means every
1175 * ~198 days with 250 HZ, we have a window where the timeout would need
1176 * to expire twice (worst case) to become effective. Good enough.
1177 */
1178 if (ent && req->rq_state & RQ_NET_PENDING &&
1179 time_after(now, req->start_time + ent) &&
1180 !time_in_range(now, tconn->last_reconnect_jif, tconn->last_reconnect_jif + ent)) {
1181 dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1182 _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1183 }
1184 if (dt && req->rq_state & RQ_LOCAL_PENDING && req->w.mdev == mdev &&
1185 time_after(now, req->start_time + dt) &&
1186 !time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) {
1187 dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1188 __drbd_chk_io_error(mdev, 1);
1189 }
1190 nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et;
1191 spin_unlock_irq(&tconn->req_lock);
1192 mod_timer(&mdev->request_timer, nt);
1193 }