// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace io {

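// Helper contexts used to route asynchronous events (exclusive lock
// acquisition, blocked-write flushes, image refreshes) back into the
// work queue so a stalled request can be requeued or failed.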
template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;
  ImageRequest<I> *image_request;

  C_AcquireLock(ImageRequestWQ *work_queue, ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;
  C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;
  ImageRequest<I> *image_request;

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};

template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
  this->register_work_queue();
}

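// Synchronous read: wraps aio_read() with a C_SaferCond-backed completion
// and blocks until the request completes, returning the byte count from
// the completion or a negative errno. A minimal usage sketch (assuming an
// open image; "ictx" is a hypothetical ImageCtx* whose io_work_queue
// points at this queue):
//
//   bufferlist bl;
//   ssize_t r = ictx->io_work_queue->read(0, 4096,
//                                         librbd::io::ReadResult{&bl}, 0);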
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

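// Synchronous write: clips the extent against the current image size under
// snap_lock, dispatches via aio_write(), and blocks on a C_SaferCond.
// Returns len on success or a negative errno.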
template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

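// Synchronous discard, following the same clip-and-wait pattern as write().
// When skip_partial_discard is set, sub-object extents may be skipped
// instead of being zeroed (behavior implemented by the discard request,
// not in this file).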
template <typename I>
ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
                                   bool skip_partial_discard) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, skip_partial_discard, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

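// Synchronous writesame: repeats the data_len-byte buffer across the
// [off, off + len) extent and blocks until the request completes.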
template <typename I>
ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
                                     bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

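// Synchronous compare-and-write: writes bl only if the on-disk extent
// matches cmp_bl; on a mismatch the request fails and *mismatch_off is
// expected to receive the offset of the first mismatched byte (semantics
// live in the compare-and-write request, not in this file).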
template <typename I>
ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
                                             bufferlist &&cmp_bl,
                                             bufferlist &&bl,
                                             uint64_t *mismatch_off,
                                             int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
                 << off << ", " << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
                        mismatch_off, op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }

  return len;
}

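// Asynchronous read. The request is queued whenever non-blocking AIO is
// enabled, writes are blocked or still queued (a journal replay may be
// pending), or the exclusive lock is required for reads; otherwise it is
// issued directly on the caller's thread.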
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageRequest<I>::create_read_request(
            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
            trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_write_request(
            m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
                               std::move(bl), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len, bool skip_partial_discard,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_discard_request(
            m_image_ctx, c, off, len, skip_partial_discard, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
                                 skip_partial_discard, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

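// Asynchronous flush. Queued whenever writes are blocked or still queued
// so the flush cannot overtake writes submitted before it.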
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(ImageRequest<I>::create_flush_request(m_image_ctx, c, trace));
  } else {
    ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_writesame_request(
            m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                   op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_compare_and_write_request(
            m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
            mismatch_off, op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
                                           std::move(cmp_bl), std::move(bl),
                                           mismatch_off, op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

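// Begins shutdown: marks the queue closed and defers on_shutdown until all
// in-flight IOs drain; the final flush guarantees nothing remains pending.
// Caller must hold owner_lock.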
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}

template <typename I>
int ImageRequestWQ<I>::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

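// Registers a write blocker. on_blocked fires only after in-flight writes
// drain and the image is flushed; queued writes stay parked until
// unblock_writes() drops the blocker count back to zero.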
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}

template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
    }
  }

  if (wake_up) {
    this->signal();
  }
}

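// Toggles whether the exclusive lock is required for the given IO
// direction(s), signalling the thread pool on any change so parked
// requests can re-evaluate (and, if needed, re-request) the lock.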
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}

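// Core dispatch gate, invoked by the thread pool with the pool lock held.
// Returns the next runnable request, or nullptr when IO must stall:
// nothing queued or m_io_blockers > 0; a write parked behind an active
// write blocker; the exclusive lock is required (the request fails with
// -EROFS if policy forbids auto-acquisition, or stalls behind an async
// acquire_lock()); or the image needs a refresh (stalls behind an async
// refresh()). The C_AcquireLock/C_RefreshFinish handlers later requeue
// the stalled request and drop m_io_blockers.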
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageRequest<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
    ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
  assert(peek_item == item);

  if (lock_required) {
    this->get_pool_lock().Unlock();
    m_image_ctx.owner_lock.get_read();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(-EROFS, item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.put_read();
    this->get_pool_lock().Lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}

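// Worker-thread entry point: issues the dequeued request, then unwinds the
// accounting taken in queue() and _void_dequeue().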
template <typename I>
void ImageRequestWQ<I>::process(ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  req->send();

  finish_queued_io(req);
  if (req->is_write_op()) {
    finish_in_flight_write();
  }
  delete req;

  finish_in_flight_io();
}

template <typename I>
void ImageRequestWQ<I>::finish_queued_io(ImageRequest<I> *req) {
  RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  if (writes_blocked) {
    m_image_ctx.flush(new C_BlockedWrites(this));
  }
}

template <typename I>
bool ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->get();
    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ios++;
  return true;
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}

template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(int r, ImageRequest<I> *req) {
  this->process_finish();
  req->fail(r);
  finish_queued_io(req);
  delete req;
  finish_in_flight_io();
}

template <typename I>
bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
  assert(m_lock.is_locked());
  return ((write_op && m_require_lock_on_write) ||
          (!write_op && m_require_lock_on_read));
}

template <typename I>
void ImageRequestWQ<I>::queue(ImageRequest<I> *req) {
  assert(m_image_ctx.owner_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageRequest<I> >::queue(req);
}

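// Completion handler for the asynchronous lock acquisition started in
// _void_dequeue(): requeues the stalled request (ordering is preserved
// because all IO was blocked), drops the IO blocker, and wakes the pool.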
template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    RWLock::WLocker locker(m_lock);
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd