1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "common/Cond.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/ImageWatcher.h"
12 #include "librbd/internal.h"
13 #include "librbd/Utils.h"
14 #include "librbd/exclusive_lock/Policy.h"
15 #include "librbd/io/AioCompletion.h"
16 #include "librbd/io/ImageRequest.h"
17 #include "librbd/io/ImageDispatchSpec.h"
18 #include "common/EventTrace.h"
20 #define dout_subsys ceph_subsys_rbd
22 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
23 << " " << __func__ << ": "
27 using util::create_context_callback
;
// Prepares an internal flush of `image_ctx`: starts a flush-type
// AioCompletion that wraps `on_finish` and builds the matching flush
// dispatch request tagged FLUSH_SOURCE_INTERNAL.
// NOTE(review): the statements that consume `req` are not part of this
// listing -- confirm how the request is sent and released.
34 void flush_image(I
& image_ctx
, Context
* on_finish
) {
35 auto aio_comp
= librbd::io::AioCompletion::create_and_start(
36 on_finish
, util::get_image_ctx(&image_ctx
), librbd::io::AIO_TYPE_FLUSH
);
37 auto req
= librbd::io::ImageDispatchSpec
<I
>::create_flush_request(
38 image_ctx
, aio_comp
, librbd::io::FLUSH_SOURCE_INTERNAL
, {});
43 } // anonymous namespace
// Context that remembers a work queue and an image request; finish() hands
// the result code and the stored request to
// ImageRequestWQ::handle_acquire_lock() on that work queue.
46 struct ImageRequestWQ
<I
>::C_AcquireLock
: public Context
{
47 ImageRequestWQ
*work_queue
;
48 ImageDispatchSpec
<I
> *image_request
;
50 C_AcquireLock(ImageRequestWQ
*work_queue
, ImageDispatchSpec
<I
> *image_request
)
51 : work_queue(work_queue
), image_request(image_request
) {
54 void finish(int r
) override
{
55 work_queue
->handle_acquire_lock(r
, image_request
);
// Context that remembers a work queue; finish() forwards the result code to
// ImageRequestWQ::handle_blocked_writes() on that work queue.
60 struct ImageRequestWQ
<I
>::C_BlockedWrites
: public Context
{
61 ImageRequestWQ
*work_queue
;
62 explicit C_BlockedWrites(ImageRequestWQ
*_work_queue
)
63 : work_queue(_work_queue
) {
66 void finish(int r
) override
{
67 work_queue
->handle_blocked_writes(r
);
// Context that remembers a work queue and an image request; finish() hands
// the result code and the stored request to
// ImageRequestWQ::handle_refreshed() on that work queue.
72 struct ImageRequestWQ
<I
>::C_RefreshFinish
: public Context
{
73 ImageRequestWQ
*work_queue
;
74 ImageDispatchSpec
<I
> *image_request
;
76 C_RefreshFinish(ImageRequestWQ
*work_queue
,
77 ImageDispatchSpec
<I
> *image_request
)
78 : work_queue(work_queue
), image_request(image_request
) {
80 void finish(int r
) override
{
81 work_queue
->handle_refreshed(r
, image_request
);
// Table of the six RBD_QOS_*_THROTTLE flags, each paired with a string.
// The ImageRequestWQ constructor iterates this table and passes the string
// to the TokenBucketThrottle it creates for the entry.
85 static std::map
<uint64_t, std::string
> throttle_flags
= {
86 { RBD_QOS_IOPS_THROTTLE
, "rbd_qos_iops_throttle" },
87 { RBD_QOS_BPS_THROTTLE
, "rbd_qos_bps_throttle" },
88 { RBD_QOS_READ_IOPS_THROTTLE
, "rbd_qos_read_iops_throttle" },
89 { RBD_QOS_WRITE_IOPS_THROTTLE
, "rbd_qos_write_iops_throttle" },
90 { RBD_QOS_READ_BPS_THROTTLE
, "rbd_qos_read_bps_throttle" },
91 { RBD_QOS_WRITE_BPS_THROTTLE
, "rbd_qos_write_bps_throttle" }
// Constructor. Forwards `name`, `ti` and `tp` to the ThreadPool::PointerWQ
// base, binds m_image_ctx to `*image_ctx`, creates m_lock under a unique
// lock name, logs the image context, obtains a timer and its mutex through
// ImageCtx::get_timer_instance(), allocates one TokenBucketThrottle per
// throttle_flags entry (constructed with 0, 0) into m_throttles, and calls
// register_work_queue().
// NOTE(review): the declaration of `timer` and the first member of each
// pair pushed into m_throttles are not part of this listing.
95 ImageRequestWQ
<I
>::ImageRequestWQ(I
*image_ctx
, const string
&name
,
96 time_t ti
, ThreadPool
*tp
)
97 : ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >(name
, ti
, 0, tp
),
98 m_image_ctx(*image_ctx
),
99 m_lock(ceph::make_shared_mutex(
100 util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
101 CephContext
*cct
= m_image_ctx
.cct
;
102 ldout(cct
, 5) << "ictx=" << image_ctx
<< dendl
;
105 ceph::mutex
*timer_lock
;
106 ImageCtx::get_timer_instance(cct
, &timer
, &timer_lock
);
108 for (auto flag
: throttle_flags
) {
109 m_throttles
.push_back(make_pair(
111 new TokenBucketThrottle(cct
, flag
.second
, 0, 0, timer
, timer_lock
)));
114 this->register_work_queue();
// Destructor: iterates over every entry of m_throttles.
// NOTE(review): the loop body is not part of this listing; presumably it
// releases the TokenBucketThrottle objects allocated by the constructor --
// confirm against the full file.
117 template <typename I
>
118 ImageRequestWQ
<I
>::~ImageRequestWQ() {
119 for (auto t
: m_throttles
) {
// Read entry point: logs the offset/length, creates an AioCompletion bound
// to `cond` and submits the read through aio_read(), moving `read_result`
// into the call and passing `false` as the final argument.
// NOTE(review): the declaration of `cond` and the return statement are not
// part of this listing; presumably the call waits on `cond` -- confirm.
124 template <typename I
>
125 ssize_t ImageRequestWQ
<I
>::read(uint64_t off
, uint64_t len
,
126 ReadResult
&&read_result
, int op_flags
) {
127 CephContext
*cct
= m_image_ctx
.cct
;
128 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
129 << "len = " << len
<< dendl
;
132 AioCompletion
*c
= AioCompletion::create(&cond
);
133 aio_read(c
, off
, len
, std::move(read_result
), op_flags
, false);
// Write entry point: logs the request, runs clip_io() on (off, &len) while
// holding image_lock in shared mode, logs "invalid IO request" for a
// clip_io() error, then submits the data through aio_write() with a
// completion bound to `cond` and `false` as the final argument.
// NOTE(review): the check guarding the error log, the declaration of `cond`
// and the return statements are not part of this listing.
137 template <typename I
>
138 ssize_t ImageRequestWQ
<I
>::write(uint64_t off
, uint64_t len
,
139 bufferlist
&&bl
, int op_flags
) {
140 CephContext
*cct
= m_image_ctx
.cct
;
141 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
142 << "len = " << len
<< dendl
;
144 m_image_ctx
.image_lock
.lock_shared();
145 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
146 m_image_ctx
.image_lock
.unlock_shared();
148 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
153 AioCompletion
*c
= AioCompletion::create(&cond
);
154 aio_write(c
, off
, len
, std::move(bl
), op_flags
, false);
// Discard entry point: logs the request, runs clip_io() on (off, &len)
// while holding image_lock in shared mode, logs "invalid IO request" for a
// clip_io() error, then submits the discard through aio_discard() with a
// completion bound to `cond`, forwarding `discard_granularity_bytes` and
// passing `false` as the final argument.
// NOTE(review): the check guarding the error log, the declaration of `cond`
// and the return statements are not part of this listing.
163 template <typename I
>
164 ssize_t ImageRequestWQ
<I
>::discard(uint64_t off
, uint64_t len
,
165 uint32_t discard_granularity_bytes
) {
166 CephContext
*cct
= m_image_ctx
.cct
;
167 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
168 << "len = " << len
<< dendl
;
170 m_image_ctx
.image_lock
.lock_shared();
171 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
172 m_image_ctx
.image_lock
.unlock_shared();
174 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
179 AioCompletion
*c
= AioCompletion::create(&cond
);
180 aio_discard(c
, off
, len
, discard_granularity_bytes
, false);
// Write-same entry point: logs the request (including the pattern buffer
// length), runs clip_io() on (off, &len) while holding image_lock in shared
// mode, logs "invalid IO request" for a clip_io() error, then submits the
// operation through aio_writesame() with a completion bound to `cond` and
// native_async = false.
// NOTE(review): the check guarding the error log, the declaration of `cond`
// and the return statements are not part of this listing.
189 template <typename I
>
190 ssize_t ImageRequestWQ
<I
>::writesame(uint64_t off
, uint64_t len
,
191 bufferlist
&&bl
, int op_flags
) {
192 CephContext
*cct
= m_image_ctx
.cct
;
193 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
194 << "len = " << len
<< ", data_len " << bl
.length() << dendl
;
196 m_image_ctx
.image_lock
.lock_shared();
197 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
198 m_image_ctx
.image_lock
.unlock_shared();
200 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
205 AioCompletion
*c
= AioCompletion::create(&cond
);
206 aio_writesame(c
, off
, len
, std::move(bl
), op_flags
, false);
// Write-zeroes entry point: logs the request, runs clip_io() on (off, &len)
// while holding image_lock in shared mode, logs "invalid IO request" for a
// clip_io() error, then submits the operation through aio_write_zeroes()
// with a completion bound to `ctx`, forwarding `zero_flags`/`op_flags` and
// passing `false` as the final argument.
// NOTE(review): the check guarding the error log, the declaration of `ctx`
// and the return statements are not part of this listing.
215 template <typename I
>
216 ssize_t ImageRequestWQ
<I
>::write_zeroes(uint64_t off
, uint64_t len
,
217 int zero_flags
, int op_flags
) {
218 auto cct
= m_image_ctx
.cct
;
219 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
220 << "len = " << len
<< dendl
;
222 m_image_ctx
.image_lock
.lock_shared();
223 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
224 m_image_ctx
.image_lock
.unlock_shared();
226 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
231 auto aio_comp
= io::AioCompletion::create(&ctx
);
232 aio_write_zeroes(aio_comp
, off
, len
, zero_flags
, op_flags
, false);
// Compare-and-write entry point: logs the request, runs clip_io() on
// (off, &len) while holding image_lock in shared mode, logs "invalid IO
// request" for a clip_io() error, then submits the operation through
// aio_compare_and_write() with a completion bound to `cond`, moving
// `cmp_bl` and `bl` into the call and passing native_async = false.
// NOTE(review): part of the parameter list (`cmp_bl`, `bl`, `op_flags`),
// the check guarding the error log, the declaration of `cond` and the
// return statements are not part of this listing.
241 template <typename I
>
242 ssize_t ImageRequestWQ
<I
>::compare_and_write(uint64_t off
, uint64_t len
,
245 uint64_t *mismatch_off
,
247 CephContext
*cct
= m_image_ctx
.cct
;
248 ldout(cct
, 20) << "compare_and_write ictx=" << &m_image_ctx
<< ", off="
249 << off
<< ", " << "len = " << len
<< dendl
;
251 m_image_ctx
.image_lock
.lock_shared();
252 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
253 m_image_ctx
.image_lock
.unlock_shared();
255 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
260 AioCompletion
*c
= AioCompletion::create(&cond
);
261 aio_compare_and_write(c
, off
, len
, std::move(cmp_bl
), std::move(bl
),
262 mismatch_off
, op_flags
, false);
// Flush entry point: logs the image context and creates an AioCompletion
// bound to `cond`.
// NOTE(review): the declaration of `cond`, the submission of the completion
// and the return statement are not part of this listing.
272 template <typename I
>
273 int ImageRequestWQ
<I
>::flush() {
274 CephContext
*cct
= m_image_ctx
.cct
;
275 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
278 AioCompletion
*c
= AioCompletion::create(&cond
);
// Asynchronous read. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: read" trace and records
//    a "start" event;
//  - stamps the completion with AIO_TYPE_READ and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - takes owner_lock in shared mode and, when non_blocking_aio is set,
//    writes are blocked, writes are queued, or reads require the lock,
//    queues a read dispatch request;
//  - the listing also shows a direct ImageRequest<I>::aio_read() call
//    followed by finish_in_flight_io(), presumably the non-queued branch
//    (the `else` is not shown -- confirm);
//  - records a "finish" trace event.
289 template <typename I
>
290 void ImageRequestWQ
<I
>::aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
291 ReadResult
&&read_result
, int op_flags
,
293 CephContext
*cct
= m_image_ctx
.cct
;
295 ZTracer::Trace trace
;
296 if (m_image_ctx
.blkin_trace_all
) {
297 trace
.init("wq: read", &m_image_ctx
.trace_endpoint
);
298 trace
.event("start");
301 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_READ
);
302 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
303 << "completion=" << c
<< ", off=" << off
<< ", "
304 << "len=" << len
<< ", " << "flags=" << op_flags
<< dendl
;
306 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
307 c
->set_event_notify(true);
310 if (!start_in_flight_io(c
)) {
314 // if journaling is enabled -- we need to replay the journal because
315 // it might contain an uncommitted write
316 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
317 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty() ||
318 require_lock_on_read()) {
319 queue(ImageDispatchSpec
<I
>::create_read_request(
320 m_image_ctx
, c
, {{off
, len
}}, std::move(read_result
), op_flags
,
324 ImageRequest
<I
>::aio_read(&m_image_ctx
, c
, {{off
, len
}},
325 std::move(read_result
), op_flags
, trace
);
326 finish_in_flight_io();
328 trace
.event("finish");
// Asynchronous write. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: write" trace;
//  - stamps the completion with AIO_TYPE_WRITE and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - allocates a new tid from m_last_tid and records it in
//    m_queued_or_blocked_io_tids under m_lock;
//  - builds the write dispatch request for the single extent {off, len};
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() (the body of that branch is not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
// NOTE(review): presumably the request is queued when the test is true and
// processed inline otherwise -- confirm against the full file.
331 template <typename I
>
332 void ImageRequestWQ
<I
>::aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
333 bufferlist
&&bl
, int op_flags
,
335 CephContext
*cct
= m_image_ctx
.cct
;
337 ZTracer::Trace trace
;
338 if (m_image_ctx
.blkin_trace_all
) {
339 trace
.init("wq: write", &m_image_ctx
.trace_endpoint
);
343 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITE
);
344 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
345 << "completion=" << c
<< ", off=" << off
<< ", "
346 << "len=" << len
<< ", flags=" << op_flags
<< dendl
;
348 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
349 c
->set_event_notify(true);
352 if (!start_in_flight_io(c
)) {
356 auto tid
= ++m_last_tid
;
359 std::lock_guard locker
{m_lock
};
360 m_queued_or_blocked_io_tids
.insert(tid
);
363 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_write_request(
364 m_image_ctx
, c
, {{off
, len
}}, std::move(bl
), op_flags
, trace
, tid
);
366 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
367 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
370 process_io(req
, false);
371 finish_in_flight_io();
373 trace
.event("finish");
// Asynchronous discard. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: discard" trace;
//  - stamps the completion with AIO_TYPE_DISCARD and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - allocates a new tid from m_last_tid and records it in
//    m_queued_or_blocked_io_tids under m_lock;
//  - builds the discard dispatch request, forwarding
//    `discard_granularity_bytes`;
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() (the body of that branch is not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
// NOTE(review): part of the parameter list (`len`, `native_async`) and the
// tail of the log statement are not part of this listing.
376 template <typename I
>
377 void ImageRequestWQ
<I
>::aio_discard(AioCompletion
*c
, uint64_t off
,
379 uint32_t discard_granularity_bytes
,
381 CephContext
*cct
= m_image_ctx
.cct
;
383 ZTracer::Trace trace
;
384 if (m_image_ctx
.blkin_trace_all
) {
385 trace
.init("wq: discard", &m_image_ctx
.trace_endpoint
);
389 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_DISCARD
);
390 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
391 << "completion=" << c
<< ", off=" << off
<< ", len=" << len
394 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
395 c
->set_event_notify(true);
398 if (!start_in_flight_io(c
)) {
402 auto tid
= ++m_last_tid
;
405 std::lock_guard locker
{m_lock
};
406 m_queued_or_blocked_io_tids
.insert(tid
);
409 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_discard_request(
410 m_image_ctx
, c
, off
, len
, discard_granularity_bytes
, trace
, tid
);
412 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
413 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
416 process_io(req
, false);
417 finish_in_flight_io();
419 trace
.event("finish");
// Asynchronous flush. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: flush" trace;
//  - stamps the completion with AIO_TYPE_FLUSH and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - allocates a new tid from m_last_tid and builds a flush dispatch request
//    tagged FLUSH_SOURCE_USER;
//  - under m_lock, if m_queued_or_blocked_io_tids is not empty, logs
//    "queueing flush" and stores (tid, req) in m_queued_flushes;
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() / !writes_empty() (branch body not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
// NOTE(review): what follows the m_queued_flushes.emplace() (for example an
// early return) is not part of this listing -- confirm.
422 template <typename I
>
423 void ImageRequestWQ
<I
>::aio_flush(AioCompletion
*c
, bool native_async
) {
424 CephContext
*cct
= m_image_ctx
.cct
;
426 ZTracer::Trace trace
;
427 if (m_image_ctx
.blkin_trace_all
) {
428 trace
.init("wq: flush", &m_image_ctx
.trace_endpoint
);
432 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_FLUSH
);
433 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
434 << "completion=" << c
<< dendl
;
436 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
437 c
->set_event_notify(true);
440 if (!start_in_flight_io(c
)) {
444 auto tid
= ++m_last_tid
;
446 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_flush_request(
447 m_image_ctx
, c
, FLUSH_SOURCE_USER
, trace
);
450 std::lock_guard locker
{m_lock
};
451 if(!m_queued_or_blocked_io_tids
.empty()) {
452 ldout(cct
, 20) << "queueing flush, tid: " << tid
<< dendl
;
453 m_queued_flushes
.emplace(tid
, req
);
459 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
460 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty()) {
463 process_io(req
, false);
464 finish_in_flight_io();
466 trace
.event("finish");
// Asynchronous write-same. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: writesame" trace;
//  - stamps the completion with AIO_TYPE_WRITESAME and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - allocates a new tid from m_last_tid and records it in
//    m_queued_or_blocked_io_tids under m_lock;
//  - builds the write-same dispatch request, moving `bl` into it;
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() (the body of that branch is not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
469 template <typename I
>
470 void ImageRequestWQ
<I
>::aio_writesame(AioCompletion
*c
, uint64_t off
,
471 uint64_t len
, bufferlist
&&bl
,
472 int op_flags
, bool native_async
) {
473 CephContext
*cct
= m_image_ctx
.cct
;
475 ZTracer::Trace trace
;
476 if (m_image_ctx
.blkin_trace_all
) {
477 trace
.init("wq: writesame", &m_image_ctx
.trace_endpoint
);
481 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITESAME
);
482 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
483 << "completion=" << c
<< ", off=" << off
<< ", "
484 << "len=" << len
<< ", data_len = " << bl
.length() << ", "
485 << "flags=" << op_flags
<< dendl
;
487 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
488 c
->set_event_notify(true);
491 if (!start_in_flight_io(c
)) {
495 auto tid
= ++m_last_tid
;
498 std::lock_guard locker
{m_lock
};
499 m_queued_or_blocked_io_tids
.insert(tid
);
502 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_write_same_request(
503 m_image_ctx
, c
, off
, len
, std::move(bl
), op_flags
, trace
, tid
);
505 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
506 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
509 process_io(req
, false);
510 finish_in_flight_io();
512 trace
.event("finish");
// Asynchronous write-zeroes, implemented on top of a discard request.
// Visible steps, in order:
//  - when blkin_trace_all is set, initialises an "io: write_zeroes" trace;
//  - stamps the completion with io::AIO_TYPE_DISCARD and logs the request;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - fails the completion with -EINVAL when `zero_flags` is non-zero;
//  - checks start_in_flight_io(aio_comp) (action on failure not shown);
//  - uses a discard granularity of 0, allocates a new tid from m_last_tid
//    and records it in m_queued_or_blocked_io_tids under m_lock;
//  - builds a discard dispatch request for (off, len);
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() (the body of that branch is not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
// NOTE(review): `op_flags` is not referenced by any visible statement, and
// the last parameter line (`native_async`) is not part of this listing.
516 template <typename I
>
517 void ImageRequestWQ
<I
>::aio_write_zeroes(io::AioCompletion
*aio_comp
,
518 uint64_t off
, uint64_t len
,
519 int zero_flags
, int op_flags
,
521 auto cct
= m_image_ctx
.cct
;
523 ZTracer::Trace trace
;
524 if (m_image_ctx
.blkin_trace_all
) {
525 trace
.init("io: write_zeroes", &m_image_ctx
.trace_endpoint
);
529 aio_comp
->init_time(util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_DISCARD
);
530 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
531 << "completion=" << aio_comp
<< ", off=" << off
<< ", "
532 << "len=" << len
<< dendl
;
534 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
535 aio_comp
->set_event_notify(true);
538 // validate the supported flags
539 if (zero_flags
!= 0U) {
540 aio_comp
->fail(-EINVAL
);
544 if (!start_in_flight_io(aio_comp
)) {
548 // enable partial discard (zeroing) of objects
549 uint32_t discard_granularity_bytes
= 0;
551 auto tid
= ++m_last_tid
;
554 std::lock_guard locker
{m_lock
};
555 m_queued_or_blocked_io_tids
.insert(tid
);
558 auto req
= ImageDispatchSpec
<I
>::create_discard_request(
559 m_image_ctx
, aio_comp
, off
, len
, discard_granularity_bytes
, trace
, tid
);
561 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
562 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
565 process_io(req
, false);
566 finish_in_flight_io();
568 trace
.event("finish");
// Asynchronous compare-and-write. Visible steps, in order:
//  - when blkin_trace_all is set, initialises a "wq: compare_and_write"
//    trace;
//  - stamps the completion with AIO_TYPE_COMPARE_AND_WRITE and logs it;
//  - enables event notification on the completion when `native_async` is
//    set and the image's event socket is valid;
//  - checks start_in_flight_io(c) (the action taken on failure is not shown);
//  - allocates a new tid from m_last_tid and records it in
//    m_queued_or_blocked_io_tids under m_lock;
//  - builds the compare-and-write dispatch request for extent {off, len},
//    moving `cmp_bl` and `bl` into it and forwarding `mismatch_off`;
//  - takes owner_lock in shared mode and tests non_blocking_aio /
//    writes_blocked() (the body of that branch is not shown);
//  - calls process_io(req, false) and finish_in_flight_io();
//  - records a "finish" trace event.
// NOTE(review): the parameter lines declaring `cmp_bl` and `bl` are not
// part of this listing.
571 template <typename I
>
572 void ImageRequestWQ
<I
>::aio_compare_and_write(AioCompletion
*c
,
573 uint64_t off
, uint64_t len
,
576 uint64_t *mismatch_off
,
577 int op_flags
, bool native_async
) {
578 CephContext
*cct
= m_image_ctx
.cct
;
580 ZTracer::Trace trace
;
581 if (m_image_ctx
.blkin_trace_all
) {
582 trace
.init("wq: compare_and_write", &m_image_ctx
.trace_endpoint
);
586 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_COMPARE_AND_WRITE
);
587 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
588 << "completion=" << c
<< ", off=" << off
<< ", "
589 << "len=" << len
<< dendl
;
591 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
592 c
->set_event_notify(true);
595 if (!start_in_flight_io(c
)) {
599 auto tid
= ++m_last_tid
;
602 std::lock_guard locker
{m_lock
};
603 m_queued_or_blocked_io_tids
.insert(tid
);
606 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_compare_and_write_request(
607 m_image_ctx
, c
, {{off
, len
}}, std::move(cmp_bl
), std::move(bl
),
608 mismatch_off
, op_flags
, trace
, tid
);
610 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
611 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
614 process_io(req
, false);
615 finish_in_flight_io();
617 trace
.event("finish");
// Tests the extent (off, len) against `in_flight_image_extents`. When the
// interval set is empty or does not intersect the extent, the extent is
// inserted into the set.
// NOTE(review): the return statements are not part of this listing;
// process_io() stores the result in a local named `blocked`, so presumably
// true means "overlaps an in-flight extent" -- confirm.
620 template <typename I
>
621 bool ImageRequestWQ
<I
>::block_overlapping_io(
622 ImageExtentIntervals
* in_flight_image_extents
, uint64_t off
, uint64_t len
) {
623 CephContext
*cct
= m_image_ctx
.cct
;
624 ldout(cct
, 20) << "ictx=" << &m_image_ctx
625 << "off: " << off
<< " len: " << len
<<dendl
;
631 if (in_flight_image_extents
->empty() ||
632 !in_flight_image_extents
->intersects(off
, len
)) {
633 in_flight_image_extents
->insert(off
, len
);
// Called once the IO identified by (offset, length, tid) no longer occupies
// its extent. Removes the extent/tid bookkeeping through
// remove_in_flight_write_ios(), then, under m_lock, walks m_blocked_ios from
// the front: for each blocked request it takes the first image extent
// (0/0 when the request has none), re-tests it with block_overlapping_io(),
// logs "unblocking", removes the entry from m_blocked_ios and hands the
// request to queue_unblocked_io().
// NOTE(review): the action taken when block_overlapping_io() returns true
// is not part of this listing.
// NOTE(review): `m_blocked_ios.erase(it)` does not reassign `it`, yet the
// `while` condition reads `it` again. Unless the unseen lines reset the
// iterator or leave the loop, this uses an invalidated iterator -- confirm
// and consider `it = m_blocked_ios.erase(it);`.
640 template <typename I
>
641 void ImageRequestWQ
<I
>::unblock_overlapping_io(uint64_t offset
, uint64_t length
,
643 CephContext
*cct
= m_image_ctx
.cct
;
644 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
646 remove_in_flight_write_ios(offset
, length
, true, tid
);
648 std::unique_lock locker
{m_lock
};
649 if (!m_blocked_ios
.empty()) {
650 auto it
= m_blocked_ios
.begin();
651 while (it
!= m_blocked_ios
.end()) {
652 auto blocked_io
= *it
;
654 const auto& extents
= blocked_io
->get_image_extents();
655 uint64_t off
= extents
.size() ? extents
.front().first
: 0;
656 uint64_t len
= extents
.size() ? extents
.front().second
: 0;
658 if (block_overlapping_io(&m_in_flight_extents
, off
, len
)) {
661 ldout(cct
, 20) << "unblocking off: " << off
<< ", "
662 << "len: " << len
<< dendl
;
663 AioCompletion
*aio_comp
= blocked_io
->get_aio_completion();
665 m_blocked_ios
.erase(it
);
667 queue_unblocked_io(aio_comp
, blocked_io
);
// Releases queued flush requests. Under m_lock it reads the first entry of
// m_queued_or_blocked_io_tids and the first entry of m_queued_flushes, and
// tests whether there is no queued flush or whether a tracked IO tid smaller
// than the flush's tid still exists. For a releasable flush it logs
// "unblocking flush", removes the entry from m_queued_flushes and hands the
// request to queue_unblocked_io().
// NOTE(review): the enclosing loop and the action taken when the test is
// true are not part of this listing; presumably processing stops there --
// confirm.
673 template <typename I
>
674 void ImageRequestWQ
<I
>::unblock_flushes() {
675 CephContext
*cct
= m_image_ctx
.cct
;
676 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
677 std::unique_lock locker
{m_lock
};
678 auto io_tid_it
= m_queued_or_blocked_io_tids
.begin();
680 auto it
= m_queued_flushes
.begin();
681 if (it
== m_queued_flushes
.end() ||
682 (io_tid_it
!= m_queued_or_blocked_io_tids
.end() &&
683 *io_tid_it
< it
->first
)) {
687 auto blocked_flush
= *it
;
688 ldout(cct
, 20) << "unblocking flush: tid " << blocked_flush
.first
<< dendl
;
690 AioCompletion
*aio_comp
= blocked_flush
.second
->get_aio_completion();
692 m_queued_flushes
.erase(it
);
694 queue_unblocked_io(aio_comp
, blocked_flush
.second
);
// Re-submits a previously blocked request: checks start_in_flight_io(comp)
// and then takes owner_lock in shared mode.
// NOTE(review): the action on start_in_flight_io() failure and the
// statement that actually queues `req` are not part of this listing.
699 template <typename I
>
700 void ImageRequestWQ
<I
>::queue_unblocked_io(AioCompletion
*comp
,
701 ImageDispatchSpec
<I
> *req
) {
702 if (!start_in_flight_io(comp
)) {
706 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
// Begins shutting down the queue. Requires the caller to hold owner_lock
// (asserted). Under m_lock it asserts that shutdown has not already begun
// and logs the in-flight count; when IO is still in flight, `on_shutdown`
// is stored in m_on_shutdown. The listing then shows a flush_image() call
// that completes `on_shutdown`.
// NOTE(review): the statement that sets m_shutdown and any early return
// after storing m_on_shutdown are not part of this listing; presumably the
// stored context is completed later by finish_in_flight_io() -- confirm.
711 template <typename I
>
712 void ImageRequestWQ
<I
>::shut_down(Context
*on_shutdown
) {
713 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
715 std::unique_lock locker
{m_lock
};
716 ceph_assert(!m_shutdown
);
719 CephContext
*cct
= m_image_ctx
.cct
;
720 ldout(cct
, 5) << __func__
<< ": in_flight=" << m_in_flight_ios
.load()
722 if (m_in_flight_ios
> 0) {
723 m_on_shutdown
= on_shutdown
;
728 // ensure that all in-flight IO is flushed
729 flush_image(m_image_ctx
, on_shutdown
);
// Blocking variant of block_writes(Context*): passes a C_SaferCond as the
// callback and returns the value produced by waiting on it.
732 template <typename I
>
733 int ImageRequestWQ
<I
>::block_writes() {
734 C_SaferCond cond_ctx
;
735 block_writes(&cond_ctx
);
736 return cond_ctx
.wait();
// Asynchronous request to block writes. Requires the caller to hold
// owner_lock (asserted). Under m_lock it logs m_write_blockers; when other
// blocker contexts are already waiting or writes are in flight,
// `on_blocked` is appended to m_write_blocker_contexts. The listing then
// shows a flush_image() call that completes `on_blocked`.
// NOTE(review): the update of m_write_blockers and any early return after
// the push_back are not part of this listing -- confirm.
739 template <typename I
>
740 void ImageRequestWQ
<I
>::block_writes(Context
*on_blocked
) {
741 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
742 CephContext
*cct
= m_image_ctx
.cct
;
745 std::unique_lock locker
{m_lock
};
747 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
748 << m_write_blockers
<< dendl
;
749 if (!m_write_blocker_contexts
.empty() || m_in_flight_writes
> 0) {
750 m_write_blocker_contexts
.push_back(on_blocked
);
755 // ensure that all in-flight IO is flushed
756 flush_image(m_image_ctx
, on_blocked
);
// Releases one write blocker. Under m_lock it asserts that at least one
// blocker is outstanding and logs the count; once m_write_blockers is zero
// the contexts waiting in m_unblocked_write_waiter_contexts are swapped into
// a local list, which is then iterated.
// NOTE(review): the update of m_write_blockers, the use of `wake_up` and
// the body of the final loop are not part of this listing; presumably each
// waiter context is completed -- confirm.
759 template <typename I
>
760 void ImageRequestWQ
<I
>::unblock_writes() {
761 CephContext
*cct
= m_image_ctx
.cct
;
763 bool wake_up
= false;
764 Contexts waiter_contexts
;
766 std::unique_lock locker
{m_lock
};
767 ceph_assert(m_write_blockers
> 0);
770 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
771 << m_write_blockers
<< dendl
;
772 if (m_write_blockers
== 0) {
774 std::swap(waiter_contexts
, m_unblocked_write_waiter_contexts
);
779 for (auto ctx
: waiter_contexts
) {
// Registers `on_unblocked` to be notified once writes are no longer
// blocked. Requires the caller to hold owner_lock (asserted). Under m_lock,
// when other waiters are already queued or write blockers are active, the
// context is appended to m_unblocked_write_waiter_contexts. The listing then
// shows the context being completed with 0.
// NOTE(review): any early return after the push_back is not part of this
// listing; presumably the immediate completion is the not-blocked path --
// confirm.
786 template <typename I
>
787 void ImageRequestWQ
<I
>::wait_on_writes_unblocked(Context
*on_unblocked
) {
788 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
789 CephContext
*cct
= m_image_ctx
.cct
;
792 std::unique_lock locker
{m_lock
};
793 ldout(cct
, 20) << &m_image_ctx
<< ", " << "write_blockers="
794 << m_write_blockers
<< dendl
;
795 if (!m_unblocked_write_waiter_contexts
.empty() || m_write_blockers
> 0) {
796 m_unblocked_write_waiter_contexts
.push_back(on_unblocked
);
801 on_unblocked
->complete(0);
// Updates the "lock required" flags for reads and/or writes. Under m_lock,
// three visible update groups set m_require_lock_on_read, set
// m_require_lock_on_write (under `case DIRECTION_WRITE`), or set both; each
// records in `wake_up` whether the new value differs from the old one.
// NOTE(review): the `switch` on `direction`, the other case labels and the
// code that acts on `wake_up` are not part of this listing.
804 template <typename I
>
805 void ImageRequestWQ
<I
>::set_require_lock(Direction direction
, bool enabled
) {
806 CephContext
*cct
= m_image_ctx
.cct
;
807 ldout(cct
, 20) << dendl
;
809 bool wake_up
= false;
811 std::unique_lock locker
{m_lock
};
814 wake_up
= (enabled
!= m_require_lock_on_read
);
815 m_require_lock_on_read
= enabled
;
817 case DIRECTION_WRITE
:
818 wake_up
= (enabled
!= m_require_lock_on_write
);
819 m_require_lock_on_write
= enabled
;
822 wake_up
= (enabled
!= m_require_lock_on_read
||
823 enabled
!= m_require_lock_on_write
);
824 m_require_lock_on_read
= enabled
;
825 m_require_lock_on_write
= enabled
;
830 // wake up the thread pool whenever the state changes so that
831 // we can re-request the lock if required
// Applies `tick` to every throttle in m_throttles by calling
// set_schedule_tick_min() on the second member of each entry.
837 template <typename I
>
838 void ImageRequestWQ
<I
>::apply_qos_schedule_tick_min(uint64_t tick
){
839 for (auto pair
: m_throttles
) {
840 pair
.second
->set_schedule_tick_min(tick
);
// Configures the throttle associated with `flag`. Looks the throttle up in
// m_throttles (asserting that one exists), applies (limit, burst) through
// set_limit(), and on the error path logs that the burst is less than the
// limit and re-applies the limit with a burst of 0. The listing also shows
// `flag` being set in, and cleared from, m_qos_enabled_flag.
// NOTE(review): the `limit`/`burst` parameter lines, the check on `r` and
// the condition choosing between setting and clearing the flag are not part
// of this listing.
844 template <typename I
>
845 void ImageRequestWQ
<I
>::apply_qos_limit(const uint64_t flag
,
848 CephContext
*cct
= m_image_ctx
.cct
;
849 TokenBucketThrottle
*throttle
= nullptr;
850 for (auto pair
: m_throttles
) {
851 if (flag
== pair
.first
) {
852 throttle
= pair
.second
;
856 ceph_assert(throttle
!= nullptr);
858 int r
= throttle
->set_limit(limit
, burst
);
860 lderr(cct
) << throttle
->get_name() << ": invalid qos parameter: "
861 << "burst(" << burst
<< ") is less than "
862 << "limit(" << limit
<< ")" << dendl
;
863 // if apply failed, we should at least make sure the limit works.
864 throttle
->set_limit(limit
, 0);
868 m_qos_enabled_flag
|= flag
;
870 m_qos_enabled_flag
&= ~flag
;
// Callback used by needs_throttle() when a throttle grants the tokens an
// item was waiting for. Takes the thread-pool lock, asserts that throttled
// IO is outstanding, marks `flag` as satisfied on the item and, once
// were_all_throttled() reports true, requeues the item at the back of the
// queue. The listing also shows the pool being signalled.
// NOTE(review): the `flag` parameter line and the update of m_io_throttled
// are not part of this listing.
873 template <typename I
>
874 void ImageRequestWQ
<I
>::handle_throttle_ready(int r
, ImageDispatchSpec
<I
> *item
,
876 CephContext
*cct
= m_image_ctx
.cct
;
877 ldout(cct
, 15) << "r=" << r
<< ", " << "req=" << item
<< dendl
;
879 std::lock_guard pool_locker
{this->get_pool_lock()};
880 ceph_assert(m_io_throttled
.load() > 0);
881 item
->set_throttled(flag
);
882 if (item
->were_all_throttled()) {
883 this->requeue_back(pool_locker
, item
);
885 this->signal(pool_locker
);
// Runs `item` through every throttle in m_throttles. Visible checks per
// throttle: whether the item was already throttled for the flag; whether
// the flag is absent from m_qos_enabled_flag (the item is then marked as
// throttled for it); and otherwise whether the item requests tokens for the
// flag and the throttle's get<>() call -- registered with
// handle_throttle_ready() as its callback -- reports true. The listing also
// shows a further set_throttled(flag) call.
// NOTE(review): the declarations of `flag`/`tokens`, the assignments to
// `throttle` and `blocked`, and the return statement are not part of this
// listing.
889 template <typename I
>
890 bool ImageRequestWQ
<I
>::needs_throttle(ImageDispatchSpec
<I
> *item
) {
893 bool blocked
= false;
894 TokenBucketThrottle
* throttle
= nullptr;
896 for (auto t
: m_throttles
) {
898 if (item
->was_throttled(flag
))
901 if (!(m_qos_enabled_flag
& flag
)) {
902 item
->set_throttled(flag
);
907 if (item
->tokens_requested(flag
, &tokens
) &&
908 throttle
->get
<ImageRequestWQ
<I
>, ImageDispatchSpec
<I
>,
909 &ImageRequestWQ
<I
>::handle_throttle_ready
>(
910 tokens
, this, item
, flag
)) {
913 item
->set_throttled(flag
);
// Dequeue hook for the thread pool. Visible steps, in order:
//  - peeks at the front item and tests for "nothing queued" or
//    m_io_blockers > 0;
//  - when needs_throttle() reports true, logs and removes the throttled item
//    from the base queue;
//  - samples whether an image refresh is required and, under a shared
//    m_lock, whether the item is a write and whether the exclusive lock is
//    required for it;
//  - tests for the case where no lock is required but write blockers exist;
//  - when neither lock nor refresh is required and the item is not marked
//    blocked, increments m_in_flight_writes;
//  - dequeues the item from the base queue and asserts it is the peeked one;
//  - lock-required path: drops the pool lock, takes owner_lock shared and,
//    if an exclusive lock object exists, either fails the IO with the
//    lock's unlocked-op error (policy forbids auto-requesting the lock) or
//    starts acquire_lock() with a C_AcquireLock callback; if no exclusive
//    lock object exists, clears lock_required; then restores the locks;
//  - refresh-required path: logs, drops the pool lock, starts a refresh
//    with a C_RefreshFinish callback and re-takes the pool lock.
// NOTE(review): return statements, several branch conditions/bodies and the
// m_io_blockers updates are not part of this listing; the grouping above is
// inferred from the visible comments and should be confirmed.
919 template <typename I
>
920 void *ImageRequestWQ
<I
>::_void_dequeue() {
921 CephContext
*cct
= m_image_ctx
.cct
;
922 ImageDispatchSpec
<I
> *peek_item
= this->front();
924 // no queued IO requests or all IO is blocked/stalled
925 if (peek_item
== nullptr || m_io_blockers
.load() > 0) {
929 if (needs_throttle(peek_item
)) {
930 ldout(cct
, 15) << "throttling IO " << peek_item
<< dendl
;
933 // dequeue the throttled item
934 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::_void_dequeue();
939 bool refresh_required
= m_image_ctx
.state
->is_refresh_required();
941 std::shared_lock locker
{m_lock
};
942 bool write_op
= peek_item
->is_write_op();
943 lock_required
= is_lock_required(write_op
);
945 if (!lock_required
&& m_write_blockers
> 0) {
946 // missing lock is not the write blocker
950 if (!lock_required
&& !refresh_required
&& !peek_item
->blocked
) {
951 // completed ops will requeue the IO -- don't count it as in-progress
952 m_in_flight_writes
++;
957 auto item
= reinterpret_cast<ImageDispatchSpec
<I
> *>(
958 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::_void_dequeue());
959 ceph_assert(peek_item
== item
);
962 this->get_pool_lock().unlock();
963 m_image_ctx
.owner_lock
.lock_shared();
964 if (m_image_ctx
.exclusive_lock
!= nullptr) {
965 ldout(cct
, 5) << "exclusive lock required: delaying IO " << item
<< dendl
;
966 if (!m_image_ctx
.get_exclusive_lock_policy()->may_auto_request_lock()) {
967 lderr(cct
) << "op requires exclusive lock" << dendl
;
968 fail_in_flight_io(m_image_ctx
.exclusive_lock
->get_unlocked_op_error(),
971 // wake up the IO since we won't be returning a request to process
974 // stall IO until the acquire completes
976 Context
*ctx
= new C_AcquireLock(this, item
);
977 ctx
= create_context_callback
<
978 Context
, &Context::complete
>(
979 ctx
, m_image_ctx
.exclusive_lock
);
980 m_image_ctx
.exclusive_lock
->acquire_lock(ctx
);
983 // raced with the exclusive lock being disabled
984 lock_required
= false;
986 m_image_ctx
.owner_lock
.unlock_shared();
987 this->get_pool_lock().lock();
994 if (refresh_required
) {
995 ldout(cct
, 5) << "image refresh required: delaying IO " << item
<< dendl
;
997 // stall IO until the refresh completes
1000 this->get_pool_lock().unlock();
1001 m_image_ctx
.state
->refresh(new C_RefreshFinish(this, item
));
1002 this->get_pool_lock().lock();
// Executes one dispatch request. Before the request is sent it captures the
// write flag, tid and first image extent (0/0 when there are no extents),
// because the extents are invalidated once the request is sent. For a write
// that is not already marked blocked, it tests the extent with
// block_overlapping_io() under m_lock; the listing shows the request being
// logged, marked blocked and appended to m_blocked_ios for the overlapping
// case. Afterwards, for `non_blocking_io`, finish_in_flight_write() is
// called, and unblock_overlapping_io() is invoked with the captured
// offset/length/tid.
// NOTE(review): the test of `blocked`, the statement that sends the request
// and the conditions around the final calls are not part of this listing.
1009 template <typename I
>
1010 void ImageRequestWQ
<I
>::process_io(ImageDispatchSpec
<I
> *req
,
1011 bool non_blocking_io
) {
1012 CephContext
*cct
= m_image_ctx
.cct
;
1013 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
1014 << "req=" << req
<< dendl
;
1016 //extents are invalidated after the request is sent
1017 //so gather them ahead of that
1018 const auto& extents
= req
->get_image_extents();
1019 bool write_op
= req
->is_write_op();
1020 uint64_t tid
= req
->get_tid();
1021 uint64_t offset
= extents
.size() ? extents
.front().first
: 0;
1022 uint64_t length
= extents
.size() ? extents
.front().second
: 0;
1024 if (write_op
&& !req
->blocked
) {
1025 std::lock_guard locker
{m_lock
};
1026 bool blocked
= block_overlapping_io(&m_in_flight_extents
, offset
, length
);
1028 ldout(cct
, 20) << "blocking overlapping IO: " << "ictx="
1029 << &m_image_ctx
<< ", "
1030 << "off=" << offset
<< ", len=" << length
<< dendl
;
1031 req
->blocked
= true;
1032 m_blocked_ios
.push_back(req
);
1041 if (non_blocking_io
) {
1042 finish_in_flight_write();
1044 unblock_overlapping_io(offset
, length
, tid
);
// Work-queue entry point for a dequeued request: logs it, remembers whether
// it is a write (read before process_io() runs), runs it through
// process_io(req, true), then updates the queued-IO accounting with
// finish_queued_io() and the in-flight accounting with
// finish_in_flight_io().
1050 template <typename I
>
1051 void ImageRequestWQ
<I
>::process(ImageDispatchSpec
<I
> *req
) {
1052 CephContext
*cct
= m_image_ctx
.cct
;
1053 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
1054 << "req=" << req
<< dendl
;
1056 bool write_op
= req
->is_write_op();
1058 process_io(req
, true);
1060 finish_queued_io(write_op
);
1061 finish_in_flight_io();
// Drops the bookkeeping for a finished or failed IO. Under m_lock, when
// m_in_flight_extents is not empty, the part of (offset, length) that
// intersects the in-flight set is subtracted from it; `tid` is erased from
// m_queued_or_blocked_io_tids.
// NOTE(review): the statements that use `write_op` and the call that
// follows the tid erase are not part of this listing. The inner `cct`
// declaration shadows the identical one at the top of the function.
1064 template <typename I
>
1065 void ImageRequestWQ
<I
>::remove_in_flight_write_ios(uint64_t offset
, uint64_t length
,
1066 bool write_op
, uint64_t tid
) {
1067 CephContext
*cct
= m_image_ctx
.cct
;
1068 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
1070 std::lock_guard locker
{m_lock
};
1073 if(!m_in_flight_extents
.empty()) {
1074 CephContext
*cct
= m_image_ctx
.cct
;
1075 ldout(cct
, 20) << "erasing in flight extents with tid:"
1076 << tid
<< ", offset: " << offset
<< dendl
;
1077 ImageExtentIntervals extents
;
1078 extents
.insert(offset
, length
);
1079 ImageExtentIntervals intersect
;
1080 intersect
.intersection_of(extents
, m_in_flight_extents
);
1081 m_in_flight_extents
.subtract(intersect
);
1084 m_queued_or_blocked_io_tids
.erase(tid
);
// Queued-IO accounting for a request that has left the queue. Under a
// shared m_lock it asserts that the relevant counter (m_queued_writes or
// m_queued_reads) is positive.
// NOTE(review): the branch on `write_op` and the counter decrements are not
// part of this listing.
1089 template <typename I
>
1090 void ImageRequestWQ
<I
>::finish_queued_io(bool write_op
) {
1091 std::shared_lock locker
{m_lock
};
1093 ceph_assert(m_queued_writes
> 0);
1096 ceph_assert(m_queued_reads
> 0);
// Marks one in-flight write as finished. Under a shared m_lock it asserts
// the counter is positive and decrements m_in_flight_writes; if that was the
// last in-flight write and blocker contexts are waiting, it remembers to
// flush. In that case, after the locked section, it issues an internal flush
// whose C_BlockedWrites callback leads to handle_blocked_writes().
1101 template <typename I
>
1102 void ImageRequestWQ
<I
>::finish_in_flight_write() {
1103 bool writes_blocked
= false;
1105 std::shared_lock locker
{m_lock
};
1106 ceph_assert(m_in_flight_writes
> 0);
1107 if (--m_in_flight_writes
== 0 &&
1108 !m_write_blocker_contexts
.empty()) {
1109 writes_blocked
= true;
1112 if (writes_blocked
) {
1113 flush_image(m_image_ctx
, new C_BlockedWrites(this));
// Admission check for a new IO. Under a shared m_lock the listing shows two
// rejection paths: one logs "IO received on closed image" and fails the
// completion with -ESHUTDOWN; the other, taken when the image's data_ctx is
// not valid, logs "missing data pool".
// NOTE(review): the condition guarding the first path (presumably
// m_shutdown), the handling after the "missing data pool" log, the
// in-flight counter update and the return statements are not part of this
// listing. Callers test the result with `!`, so a non-zero return
// presumably means "accepted" -- confirm.
1117 template <typename I
>
1118 int ImageRequestWQ
<I
>::start_in_flight_io(AioCompletion
*c
) {
1119 std::shared_lock locker
{m_lock
};
1122 CephContext
*cct
= m_image_ctx
.cct
;
1123 lderr(cct
) << "IO received on closed image" << dendl
;
1125 c
->fail(-ESHUTDOWN
);
1129 if (!m_image_ctx
.data_ctx
.is_valid()) {
1130 CephContext
*cct
= m_image_ctx
.cct
;
1131 lderr(cct
) << "missing data pool" << dendl
;
// Marks one in-flight IO as finished. Under a shared m_lock it decrements
// m_in_flight_ios and tests whether IO is still in flight or shutdown has
// not been requested; otherwise it picks up m_on_shutdown. The listing then
// logs "completing shut down", asserts the context is non-null and issues an
// internal flush that completes it.
// NOTE(review): the body of the `if` (presumably an early return) is not
// part of this listing -- confirm.
1141 template <typename I
>
1142 void ImageRequestWQ
<I
>::finish_in_flight_io() {
1143 Context
*on_shutdown
;
1145 std::shared_lock locker
{m_lock
};
1146 if (--m_in_flight_ios
> 0 || !m_shutdown
) {
1149 on_shutdown
= m_on_shutdown
;
1152 CephContext
*cct
= m_image_ctx
.cct
;
1153 ldout(cct
, 5) << "completing shut down" << dendl
;
1155 ceph_assert(on_shutdown
!= nullptr);
1156 flush_image(m_image_ctx
, on_shutdown
);
// Cleans up after a request that will not be executed. Calls
// process_finish(), captures the request's write flag, tid and first image
// extent (0/0 when there are none), then unwinds the accounting through
// finish_queued_io(), remove_in_flight_write_ios() and
// finish_in_flight_io().
// NOTE(review): no visible statement uses `r` or disposes of `req`;
// presumably the request is failed with `r` and released in lines that are
// not part of this listing -- confirm.
1159 template <typename I
>
1160 void ImageRequestWQ
<I
>::fail_in_flight_io(
1161 int r
, ImageDispatchSpec
<I
> *req
) {
1162 this->process_finish();
1165 bool write_op
= req
->is_write_op();
1166 uint64_t tid
= req
->get_tid();
1167 const auto& extents
= req
->get_image_extents();
1168 uint64_t offset
= extents
.size() ? extents
.front().first
: 0;
1169 uint64_t length
= extents
.size() ? extents
.front().second
: 0;
1171 finish_queued_io(write_op
);
1172 remove_in_flight_write_ios(offset
, length
, write_op
, tid
);
1174 finish_in_flight_io();
// Returns true when the exclusive lock is required for the given kind of
// operation: m_require_lock_on_write for writes, m_require_lock_on_read for
// reads. The caller must hold m_lock (asserted).
1177 template <typename I
>
1178 bool ImageRequestWQ
<I
>::is_lock_required(bool write_op
) const {
1179 ceph_assert(ceph_mutex_is_locked(m_lock
));
1180 return ((write_op
&& m_require_lock_on_write
) ||
1181 (!write_op
&& m_require_lock_on_read
));
// Adds a request to the work queue. Requires the caller to hold owner_lock
// (asserted); logs the request, branches on whether it is a write and
// finally hands it to ThreadPool::PointerWQ::queue().
// NOTE(review): the bodies of the write/read branches are not part of this
// listing; presumably they update the m_queued_writes / m_queued_reads
// counters that finish_queued_io() asserts on -- confirm.
1184 template <typename I
>
1185 void ImageRequestWQ
<I
>::queue(ImageDispatchSpec
<I
> *req
) {
1186 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
1188 CephContext
*cct
= m_image_ctx
.cct
;
1189 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
1190 << "req=" << req
<< dendl
;
1192 if (req
->is_write_op()) {
1198 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::queue(req
);
// Completion handler for the exclusive-lock acquisition started by
// _void_dequeue() through C_AcquireLock. Logs the result; the listing shows
// a failure path (fail_in_flight_io(r, req)) and a resume path that puts the
// request back at the front of the queue, followed by an assertion that
// m_io_blockers is positive.
// NOTE(review): the condition choosing between the two paths (presumably
// r < 0) and the m_io_blockers update / wake-up are not part of this
// listing -- confirm.
1201 template <typename I
>
1202 void ImageRequestWQ
<I
>::handle_acquire_lock(
1203 int r
, ImageDispatchSpec
<I
> *req
) {
1204 CephContext
*cct
= m_image_ctx
.cct
;
1205 ldout(cct
, 5) << "r=" << r
<< ", " << "req=" << req
<< dendl
;
1208 fail_in_flight_io(r
, req
);
1210 // since IO was stalled for acquire -- original IO order is preserved
1211 // if we requeue this op for work queue processing
1212 this->requeue_front(req
);
1215 ceph_assert(m_io_blockers
.load() > 0);
// Completion handler for the image refresh started by _void_dequeue()
// through C_RefreshFinish. Logs the result; the listing shows a failure path
// (fail_in_flight_io(r, req)) and a resume path that puts the request back
// at the front of the queue, followed by an assertion that m_io_blockers is
// positive.
// NOTE(review): the condition choosing between the two paths (presumably
// r < 0) and the m_io_blockers update / wake-up are not part of this
// listing -- confirm.
1220 template <typename I
>
1221 void ImageRequestWQ
<I
>::handle_refreshed(
1222 int r
, ImageDispatchSpec
<I
> *req
) {
1223 CephContext
*cct
= m_image_ctx
.cct
;
1224 ldout(cct
, 5) << "resuming IO after image refresh: r=" << r
<< ", "
1225 << "req=" << req
<< dendl
;
1227 fail_in_flight_io(r
, req
);
1229 // since IO was stalled for refresh -- original IO order is preserved
1230 // if we requeue this op for work queue processing
1231 this->requeue_front(req
);
1234 ceph_assert(m_io_blockers
.load() > 0);
// Runs after the flush issued by finish_in_flight_write() completes. Under
// m_lock it moves all pending blocker contexts out of
// m_write_blocker_contexts into a local list and then iterates over them.
// NOTE(review): the declaration of `contexts` and the loop body are not
// part of this listing; presumably each context is completed -- confirm.
1239 template <typename I
>
1240 void ImageRequestWQ
<I
>::handle_blocked_writes(int r
) {
1243 std::unique_lock locker
{m_lock
};
1244 contexts
.swap(m_write_blocker_contexts
);
1247 for (auto ctx
: contexts
) {
// Explicit instantiation of the work queue for librbd::ImageCtx.
1252 template class librbd::io::ImageRequestWQ
<librbd::ImageCtx
>;
1255 } // namespace librbd