// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace io {

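// Helper completion contexts: each adapts a generic Context callback to the
// work queue's handler for delayed lock acquisition, blocked-write draining,
// or image refresh.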
template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;
  ImageRequest<I> *image_request;

  C_AcquireLock(ImageRequestWQ *work_queue, ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;
  C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;
  ImageRequest<I> *image_request;

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};
63
64template <typename I>
65ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
66 time_t ti, ThreadPool *tp)
67 : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
7c673cae 68 m_image_ctx(*image_ctx),
224ce89b 69 m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
7c673cae
FG
70 CephContext *cct = m_image_ctx.cct;
71 ldout(cct, 5) << "ictx=" << image_ctx << dendl;
224ce89b 72 this->register_work_queue();
7c673cae
FG
73}
74
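// Synchronous wrappers: each call below issues the corresponding aio_*
// request against a C_SaferCond completion and blocks until it fires.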
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
                                   bool skip_partial_discard) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, skip_partial_discard, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
                                     bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

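// Asynchronous entry points: each registers the IO as in-flight and then
// either queues the request (non-blocking AIO, blocked writes, or a required
// exclusive lock) or dispatches it inline on the caller's thread.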
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageRequest<I>::create_read_request(
      m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
      trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_write_request(
      m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
                               std::move(bl), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len, bool skip_partial_discard,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_discard_request(
      m_image_ctx, c, off, len, skip_partial_discard, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
                                 skip_partial_discard, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(ImageRequest<I>::create_flush_request(m_image_ctx, c, trace));
  } else {
    ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageRequest<I>::create_writesame_request(
      m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                   op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

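// Shutdown and write blocking: both wait for in-flight operations to drain
// before flushing the image and completing the supplied context.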
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}

template <typename I>
int ImageRequestWQ<I>::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}

template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
    }
  }

  if (wake_up) {
    this->signal();
  }
}

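// Lock requirement toggles: when the required-lock state changes, the
// thread pool is signalled so queued IO can re-evaluate whether it must
// first acquire the exclusive lock.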
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}

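// Queue dispatch: peeks at the front request and returns nullptr to stall
// the worker threads whenever IO is blocked, the exclusive lock must be
// acquired first, or the image needs to be refreshed.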
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageRequest<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
    ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
  assert(peek_item == item);

  if (lock_required) {
    this->get_pool_lock().Unlock();
    m_image_ctx.owner_lock.get_read();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(-EROFS, item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.put_read();
    this->get_pool_lock().Lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}

template <typename I>
void ImageRequestWQ<I>::process(ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  req->send();

  finish_queued_io(req);
  if (req->is_write_op()) {
    finish_in_flight_write();
  }
  delete req;

  finish_in_flight_io();
}

template <typename I>
void ImageRequestWQ<I>::finish_queued_io(ImageRequest<I> *req) {
  RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  if (writes_blocked) {
    m_image_ctx.flush(new C_BlockedWrites(this));
  }
}

template <typename I>
bool ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->get();
    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ios++;
  return true;
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}

template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(int r, ImageRequest<I> *req) {
  this->process_finish();
  req->fail(r);
  finish_queued_io(req);
  delete req;
  finish_in_flight_io();
}

template <typename I>
bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
  assert(m_lock.is_locked());
  return ((write_op && m_require_lock_on_write) ||
          (!write_op && m_require_lock_on_read));
}

template <typename I>
void ImageRequestWQ<I>::queue(ImageRequest<I> *req) {
  assert(m_image_ctx.owner_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageRequest<I> >::queue(req);
}

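// Completion handlers: once a delayed lock acquisition or image refresh
// finishes, the stalled request is requeued (preserving submission order)
// or failed, and the worker threads are woken back up.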
template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    RWLock::WLocker locker(m_lock);
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

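// explicit instantiation for the concrete ImageCtx type; the template
// parameter presumably allows tests to substitute a mock image context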
template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd