// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "common/Cond.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

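// ImageRequestWQ is the librbd IO front end: it funnels image-level requests
// (reads, writes, discards, flushes, ...) onto a ThreadPool work queue,
// deferring dispatch while the exclusive lock is being acquired, the image
// is refreshing, QoS token buckets are exhausted, or writes are blocked.
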
namespace librbd {

using util::create_context_callback;

namespace io {

namespace {

template <typename I>
void flush_image(I& image_ctx, Context* on_finish) {
  auto aio_comp = librbd::io::AioCompletion::create_and_start(
    on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
  auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
    image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
  req->send();
  delete req;
}

} // anonymous namespace

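// Completion contexts used to resume queue processing once an asynchronous
// prerequisite has finished: exclusive lock acquisition, a full flush after
// writes were blocked, or an image refresh.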
template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;
  explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};

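// Each QoS flag maps to an independent token-bucket throttle; a request must
// obtain tokens from every enabled throttle before it can be dispatched.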
static std::map<uint64_t, std::string> throttle_flags = {
  { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
  { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
  { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
  { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
  { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
  { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
};

template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(ceph::make_shared_mutex(
      util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;

  SafeTimer *timer;
  ceph::mutex *timer_lock;
  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);

  for (auto flag : throttle_flags) {
    m_throttles.push_back(make_pair(
      flag.first,
      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
  }

  this->register_work_queue();
}

template <typename I>
ImageRequestWQ<I>::~ImageRequestWQ() {
  for (auto t : m_throttles) {
    delete t.second;
  }
}

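// The synchronous entry points below (read/write/discard/writesame/...) all
// follow the same pattern: wrap a C_SaferCond in an AioCompletion, dispatch
// the asynchronous variant, and block on the condition until the IO
// completes. A caller-side sketch (hypothetical setup, error handling
// elided):
//
//   librbd::ImageCtx *ictx = ...;  // an open image context
//   bufferlist bl;
//   bl.append(std::string(512, '\0'));
//   uint64_t len = bl.length();
//   ssize_t r = ictx->io_work_queue->write(0, len, std::move(bl), 0);
//   // r == len on success, a negative errno on failure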
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
                                   uint32_t discard_granularity_bytes) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, discard_granularity_bytes, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
                                     bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len,
                                        int zero_flags, int op_flags) {
  auto cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond ctx;
  auto aio_comp = io::AioCompletion::create(&ctx);
  aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false);

  r = ctx.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
                                             bufferlist &&cmp_bl,
                                             bufferlist &&bl,
                                             uint64_t *mismatch_off,
                                             int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
                 << off << ", " << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
                        mismatch_off, op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }

  return len;
}

template <typename I>
int ImageRequestWQ<I>::flush() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_flush(c, false);

  int r = cond.wait();
  if (r < 0) {
    return r;
  }

  return 0;
}

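// Asynchronous entry points. Each aio_* call stamps the completion, accounts
// the request as in-flight, and then either queues the request onto the work
// queue (non-blocking AIO mode, blocked writes, or a required lock) or
// dispatches it inline on the caller's thread.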
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageDispatchSpec<I>::create_read_request(
            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
            trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

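// Write-type requests are assigned a monotonic tid that is tracked in
// m_queued_or_blocked_io_tids until completion; flushes draw from the same
// tid sequence so they can be held back until every earlier write finishes.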
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
    m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len,
                                    uint32_t discard_granularity_bytes,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
    m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

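// A user flush is only dispatched once every previously queued or blocked
// write tid has drained; otherwise it is parked in m_queued_flushes and
// resubmitted later by unblock_flushes().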
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
    m_image_ctx, c, FLUSH_SOURCE_USER, trace);

  {
    std::lock_guard locker{m_lock};
    if (!m_queued_or_blocked_io_tids.empty()) {
      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
      m_queued_flushes.emplace(tid, req);
      --m_in_flight_ios;
      return;
    }
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
    m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp,
                                         uint64_t off, uint64_t len,
                                         int zero_flags, int op_flags,
                                         bool native_async) {
  auto cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << aio_comp << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    aio_comp->set_event_notify(true);
  }

  // validate the supported flags
  if (zero_flags != 0U) {
    aio_comp->fail(-EINVAL);
    return;
  }

  if (!start_in_flight_io(aio_comp)) {
    return;
  }

  // enable partial discard (zeroing) of objects
  uint32_t discard_granularity_bytes = 0;

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  auto req = ImageDispatchSpec<I>::create_discard_request(
    m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
    m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
    mismatch_off, op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

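// Overlapping-write serialization: m_in_flight_extents is an interval set of
// image extents that currently have a write in flight. A new write that
// intersects the set is parked on m_blocked_ios until the conflicting
// extents drain. A minimal sketch of the interval-set semantics relied on
// here (illustrative values):
//
//   ImageExtentIntervals in_flight;
//   in_flight.insert(0, 4096);                       // write A: [0, 4096)
//   bool overlap = in_flight.intersects(2048, 512);  // write B must wait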
template <typename I>
bool ImageRequestWQ<I>::block_overlapping_io(
    ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "off: " << off << " len: " << len << dendl;

  if (len == 0) {
    return false;
  }

  if (in_flight_image_extents->empty() ||
      !in_flight_image_extents->intersects(off, len)) {
    in_flight_image_extents->insert(off, len);
    return false;
  }

  return true;
}

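// When a write completes, its extents are removed from the in-flight set and
// the blocked IOs are re-examined in FIFO order; the scan stops at the first
// IO that still overlaps, preserving ordering between conflicting writes.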
template <typename I>
void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
                                               uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  remove_in_flight_write_ios(offset, length, true, tid);

  std::unique_lock locker{m_lock};
  if (!m_blocked_ios.empty()) {
    auto it = m_blocked_ios.begin();
    while (it != m_blocked_ios.end()) {
      auto blocked_io = *it;

      const auto& extents = blocked_io->get_image_extents();
      uint64_t off = extents.size() ? extents.front().first : 0;
      uint64_t len = extents.size() ? extents.front().second : 0;

      if (block_overlapping_io(&m_in_flight_extents, off, len)) {
        break;
      }
      ldout(cct, 20) << "unblocking off: " << off << ", "
                     << "len: " << len << dendl;
      AioCompletion *aio_comp = blocked_io->get_aio_completion();

      m_blocked_ios.erase(it);
      locker.unlock();
      queue_unblocked_io(aio_comp, blocked_io);
      locker.lock();
    }
  }
}

template <typename I>
void ImageRequestWQ<I>::unblock_flushes() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  std::unique_lock locker{m_lock};
  auto io_tid_it = m_queued_or_blocked_io_tids.begin();
  while (true) {
    auto it = m_queued_flushes.begin();
    if (it == m_queued_flushes.end() ||
        (io_tid_it != m_queued_or_blocked_io_tids.end() &&
         *io_tid_it < it->first)) {
      break;
    }

    auto blocked_flush = *it;
    ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;

    AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();

    m_queued_flushes.erase(it);
    locker.unlock();
    queue_unblocked_io(aio_comp, blocked_flush.second);
    locker.lock();
  }
}

template <typename I>
void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
                                           ImageDispatchSpec<I> *req) {
  if (!start_in_flight_io(comp)) {
    return;
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  queue(req);
}

template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  {
    std::unique_lock locker{m_lock};
    ceph_assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_shutdown);
}

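// block_writes()/unblock_writes() implement a reference-counted write
// quiesce: every blocker bumps m_write_blockers, and the callback fires only
// after all in-flight writes have drained and the image has been flushed.
// A caller-side sketch (hypothetical, error handling elided; note the
// owner_lock must be held, per the assertion below):
//
//   image_ctx->io_work_queue->block_writes(new LambdaContext([](int r) {
//     // writes quiesced -- safe to mutate image state here
//   }));
//   ...
//   image_ctx->io_work_queue->unblock_writes();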
template <typename I>
int ImageRequestWQ<I>::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_blocked);
}

template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  Contexts waiter_contexts;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
    }
  }

  if (wake_up) {
    for (auto ctx : waiter_contexts) {
      ctx->complete(0);
    }
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
                   << m_write_blockers << dendl;
    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
      m_unblocked_write_waiter_contexts.push_back(on_unblocked);
      return;
    }
  }

  on_unblocked->complete(0);
}

template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    std::unique_lock locker{m_lock};
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick) {
  for (auto pair : m_throttles) {
    pair.second->set_schedule_tick_min(tick);
  }
}

template <typename I>
void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
                                        uint64_t limit,
                                        uint64_t burst) {
  CephContext *cct = m_image_ctx.cct;
  TokenBucketThrottle *throttle = nullptr;
  for (auto pair : m_throttles) {
    if (flag == pair.first) {
      throttle = pair.second;
      break;
    }
  }
  ceph_assert(throttle != nullptr);

  int r = throttle->set_limit(limit, burst);
  if (r < 0) {
    lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
               << "burst(" << burst << ") is less than "
               << "limit(" << limit << ")" << dendl;
    // if apply failed, we should at least make sure the limit works.
    throttle->set_limit(limit, 0);
  }

  if (limit)
    m_qos_enabled_flag |= flag;
  else
    m_qos_enabled_flag &= ~flag;
}

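// Throttle flow: needs_throttle() asks every enabled token bucket for tokens
// as the front item is examined. Each bucket that cannot supply them
// schedules handle_throttle_ready() for later; once the last outstanding
// bucket fires, the item is requeued at the back and the workers signalled.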
template <typename I>
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  ceph_assert(m_io_throttled.load() > 0);
  item->set_throttled(flag);
  if (item->were_all_throttled()) {
    this->requeue_back(item);
    --m_io_throttled;
    this->signal();
  }
}

template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  uint64_t tokens = 0;
  uint64_t flag = 0;
  bool blocked = false;
  TokenBucketThrottle* throttle = nullptr;

  for (auto t : m_throttles) {
    flag = t.first;
    if (item->was_throttled(flag))
      continue;

    if (!(m_qos_enabled_flag & flag)) {
      item->set_throttled(flag);
      continue;
    }

    throttle = t.second;
    if (item->tokens_requested(flag, &tokens) &&
        throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
                      &ImageRequestWQ<I>::handle_throttle_ready>(
          tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}

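// _void_dequeue() is the gatekeeper between the queue and the worker
// threads: it refuses to hand out an item while IO is blocked or throttled,
// and it stalls items behind exclusive-lock acquisition and image refreshes,
// requeueing them at the front afterwards to preserve dispatch order.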
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    std::shared_lock locker{m_lock};
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required && !peek_item->blocked) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.lock_shared();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        Context *ctx = new C_AcquireLock(this, item);
        ctx = create_context_callback<
          Context, &Context::complete>(
            ctx, m_image_ctx.exclusive_lock);
        m_image_ctx.exclusive_lock->acquire_lock(ctx);
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.unlock_shared();
    this->get_pool_lock().lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  return item;
}

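// process_io() performs the actual dispatch on a worker thread. For write
// ops it first consults the in-flight extent set: overlapping requests are
// marked blocked and parked, while dispatched ones release their extents
// after send() and retry any flushes waiting on earlier tids.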
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
                                   bool non_blocking_io) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  // extents are invalidated after the request is sent,
  // so gather them ahead of that
  const auto& extents = req->get_image_extents();
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  if (write_op && !req->blocked) {
    std::lock_guard locker{m_lock};
    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
    if (blocked) {
      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
                     << &m_image_ctx << ", "
                     << "off=" << offset << ", len=" << length << dendl;
      req->blocked = true;
      m_blocked_ios.push_back(req);
      return;
    }
  }

  req->start_op();
  req->send();

  if (write_op) {
    if (non_blocking_io) {
      finish_in_flight_write();
    }
    unblock_overlapping_io(offset, length, tid);
    unblock_flushes();
  }
  delete req;
}

template <typename I>
void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  bool write_op = req->is_write_op();

  process_io(req, true);

  finish_queued_io(write_op);
  finish_in_flight_io();
}

template <typename I>
void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
                                                   bool write_op, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  {
    std::lock_guard locker{m_lock};
    if (write_op) {
      if (length > 0) {
        if (!m_in_flight_extents.empty()) {
          CephContext *cct = m_image_ctx.cct;
          ldout(cct, 20) << "erasing in flight extents with tid:"
                         << tid << ", offset: " << offset << dendl;
          ImageExtentIntervals extents;
          extents.insert(offset, length);
          ImageExtentIntervals intersect;
          intersect.intersection_of(extents, m_in_flight_extents);
          m_in_flight_extents.subtract(intersect);
        }
      }
      m_queued_or_blocked_io_tids.erase(tid);
    }
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
  std::shared_lock locker{m_lock};
  if (write_op) {
    ceph_assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    ceph_assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    std::shared_lock locker{m_lock};
    ceph_assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }
  if (writes_blocked) {
    flush_image(m_image_ctx, new C_BlockedWrites(this));
  }
}

template <typename I>
int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  std::shared_lock locker{m_lock};

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  if (!m_image_ctx.data_ctx.is_valid()) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "missing data pool" << dendl;

    c->fail(-ENODEV);
    return false;
  }

  m_in_flight_ios++;
  return true;
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    std::shared_lock locker{m_lock};
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}

template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  this->process_finish();
  req->fail(r);

  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  const auto& extents = req->get_image_extents();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  finish_queued_io(write_op);
  remove_in_flight_write_ios(offset, length, write_op, tid);
  delete req;
  finish_in_flight_io();
}

template <typename I>
bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return ((write_op && m_require_lock_on_write) ||
          (!write_op && m_require_lock_on_read));
}

template <typename I>
void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
}

template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_refreshed(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    std::unique_lock locker{m_lock};
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

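// explicit instantiation for the concrete librbd::ImageCtx type -- the
// template parameter exists so that tests can presumably instantiate the
// queue with a mock image context instead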
template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd