// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "common/Cond.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {

using util::create_context_callback;

namespace io {

namespace {

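// Internal helper: issues a flush tagged FLUSH_SOURCE_INTERNAL so that all
// in-flight IO is committed before the supplied context fires. Used by the
// shutdown and write-blocking paths below.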
template <typename I>
void flush_image(I& image_ctx, Context* on_finish) {
  auto aio_comp = librbd::io::AioCompletion::create_and_start(
    on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
  auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
    image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
  req->send();
  delete req;
}

} // anonymous namespace

template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;
  explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};

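// One TokenBucketThrottle is instantiated per QoS flag below; the flag value
// doubles as a bit in m_qos_enabled_flag and the string names the throttle
// for logging.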
static std::map<uint64_t, std::string> throttle_flags = {
  { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
  { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
  { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
  { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
  { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
  { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
};

template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(ceph::make_shared_mutex(
      util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;

  SafeTimer *timer;
  ceph::mutex *timer_lock;
  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);

  for (auto flag : throttle_flags) {
    m_throttles.push_back(make_pair(
      flag.first,
      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
  }

  this->register_work_queue();
}

template <typename I>
ImageRequestWQ<I>::~ImageRequestWQ() {
  for (auto t : m_throttles) {
    delete t.second;
  }
}

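// The synchronous wrappers below (read/write/discard/writesame/
// compare_and_write/flush) share one pattern: create an AioCompletion bound
// to a C_SaferCond, dispatch the asynchronous variant, and block on the
// condition. A minimal caller-side sketch of the same pattern (hypothetical
// usage, not part of this file):
//
//   C_SaferCond cond;
//   AioCompletion *c = AioCompletion::create(&cond);
//   wq->aio_read(c, off, len, std::move(read_result), op_flags, false);
//   ssize_t r = cond.wait();  // 0 or negative errno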
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
                                   uint32_t discard_granularity_bytes) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, discard_granularity_bytes, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
                                     bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
                                             bufferlist &&cmp_bl,
                                             bufferlist &&bl,
                                             uint64_t *mismatch_off,
                                             int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
                 << off << ", " << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
                        mismatch_off, op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }

  return len;
}

template <typename I>
int ImageRequestWQ<I>::flush() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_flush(c, false);

  int r = cond.wait();
  if (r < 0) {
    return r;
  }

  return 0;
}

template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageDispatchSpec<I>::create_read_request(
      m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
      trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

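// Write-type requests are assigned a monotonic tid and tracked in
// m_queued_or_blocked_io_tids until they finish; aio_flush() uses this set
// to defer flushes behind earlier writes.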
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
    m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len,
                                    uint32_t discard_granularity_bytes,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
    m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

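// A user flush must not pass earlier writes: if any write tids are still
// queued or blocked, the flush is parked in m_queued_flushes (keyed by tid)
// and re-dispatched by unblock_flushes() once those writes drain.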
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
    m_image_ctx, c, FLUSH_SOURCE_USER, trace);

  {
    std::lock_guard locker{m_lock};
    if (!m_queued_or_blocked_io_tids.empty()) {
      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
      m_queued_flushes.emplace(tid, req);
      --m_in_flight_ios;
      return;
    }
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
    m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
    m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
    mismatch_off, op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

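// Overlap gate for in-flight writes: m_in_flight_extents is an interval set
// of image extents with a write outstanding. A new write only dispatches if
// it does not intersect the set; e.g. with {[0, 512)} in flight, a write at
// off=256 len=128 is blocked while a write at off=512 len=512 is admitted
// (and its extent inserted).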
template <typename I>
bool ImageRequestWQ<I>::block_overlapping_io(
    ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "off: " << off << " len: " << len << dendl;

  if (len == 0) {
    return false;
  }

  if (in_flight_image_extents->empty() ||
      !in_flight_image_extents->intersects(off, len)) {
    in_flight_image_extents->insert(off, len);
    return false;
  }

  return true;
}

template <typename I>
void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
                                               uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  remove_in_flight_write_ios(offset, length, true, tid);

  std::unique_lock locker{m_lock};
  if (!m_blocked_ios.empty()) {
    auto it = m_blocked_ios.begin();
    while (it != m_blocked_ios.end()) {
      auto blocked_io = *it;

      const auto& extents = blocked_io->get_image_extents();
      uint64_t off = extents.size() ? extents.front().first : 0;
      uint64_t len = extents.size() ? extents.front().second : 0;

      if (block_overlapping_io(&m_in_flight_extents, off, len)) {
        break;
      }
      ldout(cct, 20) << "unblocking off: " << off << ", "
                     << "len: " << len << dendl;
      AioCompletion *aio_comp = blocked_io->get_aio_completion();

      m_blocked_ios.erase(it);
      locker.unlock();
      queue_unblocked_io(aio_comp, blocked_io);
      locker.lock();
    }
  }
}

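// Flushes parked by aio_flush() are released in tid order: a flush is only
// dispatched once no queued/blocked write with a smaller tid remains.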
template <typename I>
void ImageRequestWQ<I>::unblock_flushes() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  std::unique_lock locker{m_lock};
  auto io_tid_it = m_queued_or_blocked_io_tids.begin();
  while (true) {
    auto it = m_queued_flushes.begin();
    if (it == m_queued_flushes.end() ||
        (io_tid_it != m_queued_or_blocked_io_tids.end() &&
         *io_tid_it < it->first)) {
      break;
    }

    auto blocked_flush = *it;
    ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;

    AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();

    m_queued_flushes.erase(it);
    locker.unlock();
    queue_unblocked_io(aio_comp, blocked_flush.second);
    locker.lock();
  }
}

template <typename I>
void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
                                           ImageDispatchSpec<I> *req) {
  if (!start_in_flight_io(comp)) {
    return;
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  queue(req);
}

template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  {
    std::unique_lock locker{m_lock};
    ceph_assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_shutdown);
}

template <typename I>
int ImageRequestWQ<I>::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_blocked);
}

template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  Contexts waiter_contexts;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
    }
  }

  if (wake_up) {
    for (auto ctx : waiter_contexts) {
      ctx->complete(0);
    }
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
                   << m_write_blockers << dendl;
    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
      m_unblocked_write_waiter_contexts.push_back(on_unblocked);
      return;
    }
  }

  on_unblocked->complete(0);
}

template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    std::unique_lock locker{m_lock};
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick) {
  for (auto pair : m_throttles) {
    pair.second->set_schedule_tick_min(tick);
  }
}

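// Applies a bytes- or IOPS-limit to the matching throttle. If the requested
// burst is invalid (smaller than the limit), the limit is still applied with
// burst disabled; a zero limit clears the QoS-enabled bit for the flag.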
template <typename I>
void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
                                        uint64_t limit,
                                        uint64_t burst) {
  CephContext *cct = m_image_ctx.cct;
  TokenBucketThrottle *throttle = nullptr;
  for (auto pair : m_throttles) {
    if (flag == pair.first) {
      throttle = pair.second;
      break;
    }
  }
  ceph_assert(throttle != nullptr);

  int r = throttle->set_limit(limit, burst);
  if (r < 0) {
    lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
               << "burst(" << burst << ") is less than "
               << "limit(" << limit << ")" << dendl;
    // if apply failed, we should at least make sure the limit works.
    throttle->set_limit(limit, 0);
  }

  if (limit)
    m_qos_enabled_flag |= flag;
  else
    m_qos_enabled_flag &= ~flag;
}

template <typename I>
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item,
                                              uint64_t flag) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  ceph_assert(m_io_throttled.load() > 0);
  item->set_throttled(flag);
  if (item->were_all_throttled()) {
    this->requeue_back(item);
    --m_io_throttled;
    this->signal();
  }
}

template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  uint64_t tokens = 0;
  uint64_t flag = 0;
  bool blocked = false;
  TokenBucketThrottle* throttle = nullptr;

  for (auto t : m_throttles) {
    flag = t.first;
    if (item->was_throttled(flag))
      continue;

    if (!(m_qos_enabled_flag & flag)) {
      item->set_throttled(flag);
      continue;
    }

    throttle = t.second;
    if (item->tokens_requested(flag, &tokens) &&
        throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
                      &ImageRequestWQ<I>::handle_throttle_ready>(
          tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}

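// Dequeue override that gates dispatch: an item is only handed to a worker
// thread once it (1) is not stalled behind m_io_blockers, (2) has acquired
// all enabled QoS throttle tokens, and (3) needs neither the exclusive lock
// nor an image refresh -- otherwise it is requeued after the blocking
// operation completes.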
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    std::shared_lock locker{m_lock};
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required && !peek_item->blocked) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.lock_shared();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        Context *ctx = new C_AcquireLock(this, item);
        ctx = create_context_callback<
          Context, &Context::complete>(
            ctx, m_image_ctx.exclusive_lock);
        m_image_ctx.exclusive_lock->acquire_lock(ctx);
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.unlock_shared();
    this->get_pool_lock().lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  return item;
}

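// Sends a dispatch spec, first deferring write ops that overlap an in-flight
// extent; after dispatching a write op it releases the extent and any
// flushes or blocked IOs that were waiting on it.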
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
                                   bool non_blocking_io) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  // extents are invalidated after the request is sent
  // so gather them ahead of that
  const auto& extents = req->get_image_extents();
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  if (write_op && !req->blocked) {
    std::lock_guard locker{m_lock};
    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
    if (blocked) {
      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
                     << &m_image_ctx << ", "
                     << "off=" << offset << ", len=" << length << dendl;
      req->blocked = true;
      m_blocked_ios.push_back(req);
      return;
    }
  }

  req->start_op();
  req->send();

  if (write_op) {
    if (non_blocking_io) {
      finish_in_flight_write();
    }
    unblock_overlapping_io(offset, length, tid);
    unblock_flushes();
  }
  delete req;
}

template <typename I>
void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  bool write_op = req->is_write_op();

  process_io(req, true);

  finish_queued_io(write_op);
  finish_in_flight_io();
}

template <typename I>
void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
                                                   bool write_op, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  {
    std::lock_guard locker{m_lock};
    if (write_op) {
      if (length > 0) {
        if (!m_in_flight_extents.empty()) {
          ldout(cct, 20) << "erasing in flight extents with tid:"
                         << tid << ", offset: " << offset << dendl;
          ImageExtentIntervals extents;
          extents.insert(offset, length);
          ImageExtentIntervals intersect;
          intersect.intersection_of(extents, m_in_flight_extents);
          m_in_flight_extents.subtract(intersect);
        }
      }
      m_queued_or_blocked_io_tids.erase(tid);
    }
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
  std::shared_lock locker{m_lock};
  if (write_op) {
    ceph_assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    ceph_assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    std::shared_lock locker{m_lock};
    ceph_assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }
  if (writes_blocked) {
    flush_image(m_image_ctx, new C_BlockedWrites(this));
  }
}

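// Entry gate for every new IO: fails the completion with -ESHUTDOWN once
// shut_down() has started and with -ENODEV when the image's data pool is
// missing; otherwise the IO is counted as in flight.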
template <typename I>
bool ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  std::shared_lock locker{m_lock};

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  if (!m_image_ctx.data_ctx.is_valid()) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "missing data pool" << dendl;

    c->fail(-ENODEV);
    return false;
  }

  m_in_flight_ios++;
  return true;
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    std::shared_lock locker{m_lock};
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}

template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  this->process_finish();
  req->fail(r);

  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  const auto& extents = req->get_image_extents();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  finish_queued_io(write_op);
  remove_in_flight_write_ios(offset, length, write_op, tid);
  delete req;
  finish_in_flight_io();
}

template <typename I>
bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return ((write_op && m_require_lock_on_write) ||
          (!write_op && m_require_lock_on_read));
}

template <typename I>
void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
}

template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_refreshed(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    std::unique_lock locker{m_lock};
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd