1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "common/Cond.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/ImageWatcher.h"
12 #include "librbd/internal.h"
13 #include "librbd/Utils.h"
14 #include "librbd/exclusive_lock/Policy.h"
15 #include "librbd/io/AioCompletion.h"
16 #include "librbd/io/ImageRequest.h"
17 #include "librbd/io/ImageDispatchSpec.h"
18 #include "common/EventTrace.h"
20 #define dout_subsys ceph_subsys_rbd
22 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
23 << " " << __func__ << ": "
27 using util::create_context_callback
;
// File-local helper: issue an internal (FLUSH_SOURCE_INTERNAL) flush against
// image_ctx; aio_comp completes on_finish once the flush finishes.
// NOTE(review): the extraction is fragmented — the tail of this helper
// (presumably the dispatch of `req` and the closing brace) is not visible
// here; confirm against the full file.
34 void flush_image(I
& image_ctx
, Context
* on_finish
) {
// Completion that fires on_finish when the flush request completes.
35 auto aio_comp
= librbd::io::AioCompletion::create_and_start(
36 on_finish
, util::get_image_ctx(&image_ctx
), librbd::io::AIO_TYPE_FLUSH
);
// Build the flush dispatch spec; {} is an empty/default trace.
37 auto req
= librbd::io::ImageDispatchSpec
<I
>::create_flush_request(
38 image_ctx
, aio_comp
, librbd::io::FLUSH_SOURCE_INTERNAL
, {});
43 } // anonymous namespace
// Context fired when an exclusive-lock acquisition attempt completes; it
// hands the result and the stalled request back to the work queue via
// handle_acquire_lock().
// NOTE(review): fragmented extraction — the constructor's closing brace and
// the struct's trailing "};" are not visible in this span.
46 struct ImageRequestWQ
<I
>::C_AcquireLock
: public Context
{
// Non-owning back-pointer to the work queue that stalled the IO.
47 ImageRequestWQ
*work_queue
;
// The IO request that was stalled waiting for the lock.
48 ImageDispatchSpec
<I
> *image_request
;
50 C_AcquireLock(ImageRequestWQ
*work_queue
, ImageDispatchSpec
<I
> *image_request
)
51 : work_queue(work_queue
), image_request(image_request
) {
// Forward the acquire result (r) and the stalled request to the queue.
54 void finish(int r
) override
{
55 work_queue
->handle_acquire_lock(r
, image_request
);
// Context fired once all in-flight writes have been flushed after
// block_writes(); forwards the result to handle_blocked_writes().
// NOTE(review): fragmented extraction — constructor/struct closing braces
// are not visible in this span.
60 struct ImageRequestWQ
<I
>::C_BlockedWrites
: public Context
{
// Non-owning back-pointer to the owning work queue.
61 ImageRequestWQ
*work_queue
;
62 explicit C_BlockedWrites(ImageRequestWQ
*_work_queue
)
63 : work_queue(_work_queue
) {
// Notify the work queue that writes are now fully blocked/flushed.
66 void finish(int r
) override
{
67 work_queue
->handle_blocked_writes(r
);
// Context fired when an image refresh (triggered from _void_dequeue) has
// completed; resumes the stalled request via handle_refreshed().
// NOTE(review): fragmented extraction — constructor/struct closing braces
// are not visible in this span.
72 struct ImageRequestWQ
<I
>::C_RefreshFinish
: public Context
{
// Non-owning back-pointer to the work queue that stalled the IO.
73 ImageRequestWQ
*work_queue
;
// The IO request delayed until the refresh completes.
74 ImageDispatchSpec
<I
> *image_request
;
76 C_RefreshFinish(ImageRequestWQ
*work_queue
,
77 ImageDispatchSpec
<I
> *image_request
)
78 : work_queue(work_queue
), image_request(image_request
) {
// Forward the refresh result (r) and the stalled request to the queue.
80 void finish(int r
) override
{
81 work_queue
->handle_refreshed(r
, image_request
);
// Maps each RBD QoS throttle flag bit to the config-option name used when
// constructing the corresponding TokenBucketThrottle (see the ImageRequestWQ
// constructor, which iterates this map to populate m_throttles).
// NOTE(review): fragmented extraction — the map's closing "};" is not
// visible in this span.
85 static std::map
<uint64_t, std::string
> throttle_flags
= {
86 { RBD_QOS_IOPS_THROTTLE
, "rbd_qos_iops_throttle" },
87 { RBD_QOS_BPS_THROTTLE
, "rbd_qos_bps_throttle" },
88 { RBD_QOS_READ_IOPS_THROTTLE
, "rbd_qos_read_iops_throttle" },
89 { RBD_QOS_WRITE_IOPS_THROTTLE
, "rbd_qos_write_iops_throttle" },
90 { RBD_QOS_READ_BPS_THROTTLE
, "rbd_qos_read_bps_throttle" },
91 { RBD_QOS_WRITE_BPS_THROTTLE
, "rbd_qos_write_bps_throttle" }
95 ImageRequestWQ
<I
>::ImageRequestWQ(I
*image_ctx
, const string
&name
,
96 time_t ti
, ThreadPool
*tp
)
97 : ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >(name
, ti
, 0, tp
),
98 m_image_ctx(*image_ctx
),
99 m_lock(ceph::make_shared_mutex(
100 util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
101 CephContext
*cct
= m_image_ctx
.cct
;
102 ldout(cct
, 5) << "ictx=" << image_ctx
<< dendl
;
105 ceph::mutex
*timer_lock
;
106 ImageCtx::get_timer_instance(cct
, &timer
, &timer_lock
);
108 for (auto flag
: throttle_flags
) {
109 m_throttles
.push_back(make_pair(
111 new TokenBucketThrottle(cct
, flag
.second
, 0, 0, timer
, timer_lock
)));
114 this->register_work_queue();
117 template <typename I
>
118 ImageRequestWQ
<I
>::~ImageRequestWQ() {
119 for (auto t
: m_throttles
) {
124 template <typename I
>
125 ssize_t ImageRequestWQ
<I
>::read(uint64_t off
, uint64_t len
,
126 ReadResult
&&read_result
, int op_flags
) {
127 CephContext
*cct
= m_image_ctx
.cct
;
128 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
129 << "len = " << len
<< dendl
;
132 AioCompletion
*c
= AioCompletion::create(&cond
);
133 aio_read(c
, off
, len
, std::move(read_result
), op_flags
, false);
137 template <typename I
>
138 ssize_t ImageRequestWQ
<I
>::write(uint64_t off
, uint64_t len
,
139 bufferlist
&&bl
, int op_flags
) {
140 CephContext
*cct
= m_image_ctx
.cct
;
141 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
142 << "len = " << len
<< dendl
;
144 m_image_ctx
.image_lock
.lock_shared();
145 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
146 m_image_ctx
.image_lock
.unlock_shared();
148 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
153 AioCompletion
*c
= AioCompletion::create(&cond
);
154 aio_write(c
, off
, len
, std::move(bl
), op_flags
, false);
163 template <typename I
>
164 ssize_t ImageRequestWQ
<I
>::discard(uint64_t off
, uint64_t len
,
165 uint32_t discard_granularity_bytes
) {
166 CephContext
*cct
= m_image_ctx
.cct
;
167 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
168 << "len = " << len
<< dendl
;
170 m_image_ctx
.image_lock
.lock_shared();
171 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
172 m_image_ctx
.image_lock
.unlock_shared();
174 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
179 AioCompletion
*c
= AioCompletion::create(&cond
);
180 aio_discard(c
, off
, len
, discard_granularity_bytes
, false);
189 template <typename I
>
190 ssize_t ImageRequestWQ
<I
>::writesame(uint64_t off
, uint64_t len
,
191 bufferlist
&&bl
, int op_flags
) {
192 CephContext
*cct
= m_image_ctx
.cct
;
193 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
194 << "len = " << len
<< ", data_len " << bl
.length() << dendl
;
196 m_image_ctx
.image_lock
.lock_shared();
197 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
198 m_image_ctx
.image_lock
.unlock_shared();
200 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
205 AioCompletion
*c
= AioCompletion::create(&cond
);
206 aio_writesame(c
, off
, len
, std::move(bl
), op_flags
, false);
215 template <typename I
>
216 ssize_t ImageRequestWQ
<I
>::compare_and_write(uint64_t off
, uint64_t len
,
219 uint64_t *mismatch_off
,
221 CephContext
*cct
= m_image_ctx
.cct
;
222 ldout(cct
, 20) << "compare_and_write ictx=" << &m_image_ctx
<< ", off="
223 << off
<< ", " << "len = " << len
<< dendl
;
225 m_image_ctx
.image_lock
.lock_shared();
226 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
227 m_image_ctx
.image_lock
.unlock_shared();
229 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
234 AioCompletion
*c
= AioCompletion::create(&cond
);
235 aio_compare_and_write(c
, off
, len
, std::move(cmp_bl
), std::move(bl
),
236 mismatch_off
, op_flags
, false);
246 template <typename I
>
247 int ImageRequestWQ
<I
>::flush() {
248 CephContext
*cct
= m_image_ctx
.cct
;
249 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
252 AioCompletion
*c
= AioCompletion::create(&cond
);
263 template <typename I
>
264 void ImageRequestWQ
<I
>::aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
265 ReadResult
&&read_result
, int op_flags
,
267 CephContext
*cct
= m_image_ctx
.cct
;
269 ZTracer::Trace trace
;
270 if (m_image_ctx
.blkin_trace_all
) {
271 trace
.init("wq: read", &m_image_ctx
.trace_endpoint
);
272 trace
.event("start");
275 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_READ
);
276 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
277 << "completion=" << c
<< ", off=" << off
<< ", "
278 << "len=" << len
<< ", " << "flags=" << op_flags
<< dendl
;
280 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
281 c
->set_event_notify(true);
284 if (!start_in_flight_io(c
)) {
288 // if journaling is enabled -- we need to replay the journal because
289 // it might contain an uncommitted write
290 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
291 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty() ||
292 require_lock_on_read()) {
293 queue(ImageDispatchSpec
<I
>::create_read_request(
294 m_image_ctx
, c
, {{off
, len
}}, std::move(read_result
), op_flags
,
298 ImageRequest
<I
>::aio_read(&m_image_ctx
, c
, {{off
, len
}},
299 std::move(read_result
), op_flags
, trace
);
300 finish_in_flight_io();
302 trace
.event("finish");
305 template <typename I
>
306 void ImageRequestWQ
<I
>::aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
307 bufferlist
&&bl
, int op_flags
,
309 CephContext
*cct
= m_image_ctx
.cct
;
311 ZTracer::Trace trace
;
312 if (m_image_ctx
.blkin_trace_all
) {
313 trace
.init("wq: write", &m_image_ctx
.trace_endpoint
);
317 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITE
);
318 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
319 << "completion=" << c
<< ", off=" << off
<< ", "
320 << "len=" << len
<< ", flags=" << op_flags
<< dendl
;
322 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
323 c
->set_event_notify(true);
326 if (!start_in_flight_io(c
)) {
330 auto tid
= ++m_last_tid
;
333 std::lock_guard locker
{m_lock
};
334 m_queued_or_blocked_io_tids
.insert(tid
);
337 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_write_request(
338 m_image_ctx
, c
, {{off
, len
}}, std::move(bl
), op_flags
, trace
, tid
);
340 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
341 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
344 process_io(req
, false);
345 finish_in_flight_io();
347 trace
.event("finish");
350 template <typename I
>
351 void ImageRequestWQ
<I
>::aio_discard(AioCompletion
*c
, uint64_t off
,
353 uint32_t discard_granularity_bytes
,
355 CephContext
*cct
= m_image_ctx
.cct
;
357 ZTracer::Trace trace
;
358 if (m_image_ctx
.blkin_trace_all
) {
359 trace
.init("wq: discard", &m_image_ctx
.trace_endpoint
);
363 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_DISCARD
);
364 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
365 << "completion=" << c
<< ", off=" << off
<< ", len=" << len
368 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
369 c
->set_event_notify(true);
372 if (!start_in_flight_io(c
)) {
376 auto tid
= ++m_last_tid
;
379 std::lock_guard locker
{m_lock
};
380 m_queued_or_blocked_io_tids
.insert(tid
);
383 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_discard_request(
384 m_image_ctx
, c
, off
, len
, discard_granularity_bytes
, trace
, tid
);
386 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
387 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
390 process_io(req
, false);
391 finish_in_flight_io();
393 trace
.event("finish");
396 template <typename I
>
397 void ImageRequestWQ
<I
>::aio_flush(AioCompletion
*c
, bool native_async
) {
398 CephContext
*cct
= m_image_ctx
.cct
;
400 ZTracer::Trace trace
;
401 if (m_image_ctx
.blkin_trace_all
) {
402 trace
.init("wq: flush", &m_image_ctx
.trace_endpoint
);
406 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_FLUSH
);
407 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
408 << "completion=" << c
<< dendl
;
410 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
411 c
->set_event_notify(true);
414 if (!start_in_flight_io(c
)) {
418 auto tid
= ++m_last_tid
;
420 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_flush_request(
421 m_image_ctx
, c
, FLUSH_SOURCE_USER
, trace
);
424 std::lock_guard locker
{m_lock
};
425 if(!m_queued_or_blocked_io_tids
.empty()) {
426 ldout(cct
, 20) << "queueing flush, tid: " << tid
<< dendl
;
427 m_queued_flushes
.emplace(tid
, req
);
433 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
434 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty()) {
437 process_io(req
, false);
438 finish_in_flight_io();
440 trace
.event("finish");
443 template <typename I
>
444 void ImageRequestWQ
<I
>::aio_writesame(AioCompletion
*c
, uint64_t off
,
445 uint64_t len
, bufferlist
&&bl
,
446 int op_flags
, bool native_async
) {
447 CephContext
*cct
= m_image_ctx
.cct
;
449 ZTracer::Trace trace
;
450 if (m_image_ctx
.blkin_trace_all
) {
451 trace
.init("wq: writesame", &m_image_ctx
.trace_endpoint
);
455 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITESAME
);
456 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
457 << "completion=" << c
<< ", off=" << off
<< ", "
458 << "len=" << len
<< ", data_len = " << bl
.length() << ", "
459 << "flags=" << op_flags
<< dendl
;
461 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
462 c
->set_event_notify(true);
465 if (!start_in_flight_io(c
)) {
469 auto tid
= ++m_last_tid
;
472 std::lock_guard locker
{m_lock
};
473 m_queued_or_blocked_io_tids
.insert(tid
);
476 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_write_same_request(
477 m_image_ctx
, c
, off
, len
, std::move(bl
), op_flags
, trace
, tid
);
479 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
480 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
483 process_io(req
, false);
484 finish_in_flight_io();
486 trace
.event("finish");
489 template <typename I
>
490 void ImageRequestWQ
<I
>::aio_compare_and_write(AioCompletion
*c
,
491 uint64_t off
, uint64_t len
,
494 uint64_t *mismatch_off
,
495 int op_flags
, bool native_async
) {
496 CephContext
*cct
= m_image_ctx
.cct
;
498 ZTracer::Trace trace
;
499 if (m_image_ctx
.blkin_trace_all
) {
500 trace
.init("wq: compare_and_write", &m_image_ctx
.trace_endpoint
);
504 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_COMPARE_AND_WRITE
);
505 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
506 << "completion=" << c
<< ", off=" << off
<< ", "
507 << "len=" << len
<< dendl
;
509 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
510 c
->set_event_notify(true);
513 if (!start_in_flight_io(c
)) {
517 auto tid
= ++m_last_tid
;
520 std::lock_guard locker
{m_lock
};
521 m_queued_or_blocked_io_tids
.insert(tid
);
524 ImageDispatchSpec
<I
> *req
= ImageDispatchSpec
<I
>::create_compare_and_write_request(
525 m_image_ctx
, c
, {{off
, len
}}, std::move(cmp_bl
), std::move(bl
),
526 mismatch_off
, op_flags
, trace
, tid
);
528 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
529 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
532 process_io(req
, false);
533 finish_in_flight_io();
535 trace
.event("finish");
538 template <typename I
>
539 bool ImageRequestWQ
<I
>::block_overlapping_io(
540 ImageExtentIntervals
* in_flight_image_extents
, uint64_t off
, uint64_t len
) {
541 CephContext
*cct
= m_image_ctx
.cct
;
542 ldout(cct
, 20) << "ictx=" << &m_image_ctx
543 << "off: " << off
<< " len: " << len
<<dendl
;
549 if (in_flight_image_extents
->empty() ||
550 !in_flight_image_extents
->intersects(off
, len
)) {
551 in_flight_image_extents
->insert(off
, len
);
558 template <typename I
>
559 void ImageRequestWQ
<I
>::unblock_overlapping_io(uint64_t offset
, uint64_t length
,
561 CephContext
*cct
= m_image_ctx
.cct
;
562 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
564 remove_in_flight_write_ios(offset
, length
, true, tid
);
566 std::unique_lock locker
{m_lock
};
567 if (!m_blocked_ios
.empty()) {
568 auto it
= m_blocked_ios
.begin();
569 while (it
!= m_blocked_ios
.end()) {
570 auto blocked_io
= *it
;
572 const auto& extents
= blocked_io
->get_image_extents();
573 uint64_t off
= extents
.size() ? extents
.front().first
: 0;
574 uint64_t len
= extents
.size() ? extents
.front().second
: 0;
576 if (block_overlapping_io(&m_in_flight_extents
, off
, len
)) {
579 ldout(cct
, 20) << "unblocking off: " << off
<< ", "
580 << "len: " << len
<< dendl
;
581 AioCompletion
*aio_comp
= blocked_io
->get_aio_completion();
583 m_blocked_ios
.erase(it
);
585 queue_unblocked_io(aio_comp
, blocked_io
);
591 template <typename I
>
592 void ImageRequestWQ
<I
>::unblock_flushes() {
593 CephContext
*cct
= m_image_ctx
.cct
;
594 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
595 std::unique_lock locker
{m_lock
};
596 auto io_tid_it
= m_queued_or_blocked_io_tids
.begin();
598 auto it
= m_queued_flushes
.begin();
599 if (it
== m_queued_flushes
.end() ||
600 (io_tid_it
!= m_queued_or_blocked_io_tids
.end() &&
601 *io_tid_it
< it
->first
)) {
605 auto blocked_flush
= *it
;
606 ldout(cct
, 20) << "unblocking flush: tid " << blocked_flush
.first
<< dendl
;
608 AioCompletion
*aio_comp
= blocked_flush
.second
->get_aio_completion();
610 m_queued_flushes
.erase(it
);
612 queue_unblocked_io(aio_comp
, blocked_flush
.second
);
617 template <typename I
>
618 void ImageRequestWQ
<I
>::queue_unblocked_io(AioCompletion
*comp
,
619 ImageDispatchSpec
<I
> *req
) {
620 if (!start_in_flight_io(comp
)) {
624 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
628 template <typename I
>
629 void ImageRequestWQ
<I
>::shut_down(Context
*on_shutdown
) {
630 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
633 std::unique_lock locker
{m_lock
};
634 ceph_assert(!m_shutdown
);
637 CephContext
*cct
= m_image_ctx
.cct
;
638 ldout(cct
, 5) << __func__
<< ": in_flight=" << m_in_flight_ios
.load()
640 if (m_in_flight_ios
> 0) {
641 m_on_shutdown
= on_shutdown
;
646 // ensure that all in-flight IO is flushed
647 flush_image(m_image_ctx
, on_shutdown
);
// Synchronous wrapper around the asynchronous block_writes(Context*):
// blocks the calling thread on a C_SaferCond until writes are fully
// blocked, then returns the completion code.
650 template <typename I
>
651 int ImageRequestWQ
<I
>::block_writes() {
652 C_SaferCond cond_ctx
;
653 block_writes(&cond_ctx
);
// Wait for the async block_writes() to signal completion.
654 return cond_ctx
.wait();
657 template <typename I
>
658 void ImageRequestWQ
<I
>::block_writes(Context
*on_blocked
) {
659 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
660 CephContext
*cct
= m_image_ctx
.cct
;
663 std::unique_lock locker
{m_lock
};
665 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
666 << m_write_blockers
<< dendl
;
667 if (!m_write_blocker_contexts
.empty() || m_in_flight_writes
> 0) {
668 m_write_blocker_contexts
.push_back(on_blocked
);
673 // ensure that all in-flight IO is flushed
674 flush_image(m_image_ctx
, on_blocked
);
677 template <typename I
>
678 void ImageRequestWQ
<I
>::unblock_writes() {
679 CephContext
*cct
= m_image_ctx
.cct
;
681 bool wake_up
= false;
682 Contexts waiter_contexts
;
684 std::unique_lock locker
{m_lock
};
685 ceph_assert(m_write_blockers
> 0);
688 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
689 << m_write_blockers
<< dendl
;
690 if (m_write_blockers
== 0) {
692 std::swap(waiter_contexts
, m_unblocked_write_waiter_contexts
);
697 for (auto ctx
: waiter_contexts
) {
704 template <typename I
>
705 void ImageRequestWQ
<I
>::wait_on_writes_unblocked(Context
*on_unblocked
) {
706 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
707 CephContext
*cct
= m_image_ctx
.cct
;
710 std::unique_lock locker
{m_lock
};
711 ldout(cct
, 20) << &m_image_ctx
<< ", " << "write_blockers="
712 << m_write_blockers
<< dendl
;
713 if (!m_unblocked_write_waiter_contexts
.empty() || m_write_blockers
> 0) {
714 m_unblocked_write_waiter_contexts
.push_back(on_unblocked
);
719 on_unblocked
->complete(0);
722 template <typename I
>
723 void ImageRequestWQ
<I
>::set_require_lock(Direction direction
, bool enabled
) {
724 CephContext
*cct
= m_image_ctx
.cct
;
725 ldout(cct
, 20) << dendl
;
727 bool wake_up
= false;
729 std::unique_lock locker
{m_lock
};
732 wake_up
= (enabled
!= m_require_lock_on_read
);
733 m_require_lock_on_read
= enabled
;
735 case DIRECTION_WRITE
:
736 wake_up
= (enabled
!= m_require_lock_on_write
);
737 m_require_lock_on_write
= enabled
;
740 wake_up
= (enabled
!= m_require_lock_on_read
||
741 enabled
!= m_require_lock_on_write
);
742 m_require_lock_on_read
= enabled
;
743 m_require_lock_on_write
= enabled
;
748 // wake up the thread pool whenever the state changes so that
749 // we can re-request the lock if required
// Propagate the minimum QoS scheduler tick (ms) to every configured
// token-bucket throttle in m_throttles.
// NOTE(review): `auto pair` copies each (flag, throttle*) element per
// iteration — `const auto&` (or a structured binding) would avoid the copy.
755 template <typename I
>
756 void ImageRequestWQ
<I
>::apply_qos_schedule_tick_min(uint64_t tick
){
757 for (auto pair
: m_throttles
) {
758 pair
.second
->set_schedule_tick_min(tick
);
762 template <typename I
>
763 void ImageRequestWQ
<I
>::apply_qos_limit(const uint64_t flag
,
766 CephContext
*cct
= m_image_ctx
.cct
;
767 TokenBucketThrottle
*throttle
= nullptr;
768 for (auto pair
: m_throttles
) {
769 if (flag
== pair
.first
) {
770 throttle
= pair
.second
;
774 ceph_assert(throttle
!= nullptr);
776 int r
= throttle
->set_limit(limit
, burst
);
778 lderr(cct
) << throttle
->get_name() << ": invalid qos parameter: "
779 << "burst(" << burst
<< ") is less than "
780 << "limit(" << limit
<< ")" << dendl
;
781 // if apply failed, we should at least make sure the limit works.
782 throttle
->set_limit(limit
, 0);
786 m_qos_enabled_flag
|= flag
;
788 m_qos_enabled_flag
&= ~flag
;
791 template <typename I
>
792 void ImageRequestWQ
<I
>::handle_throttle_ready(int r
, ImageDispatchSpec
<I
> *item
, uint64_t flag
) {
793 CephContext
*cct
= m_image_ctx
.cct
;
794 ldout(cct
, 15) << "r=" << r
<< ", " << "req=" << item
<< dendl
;
796 ceph_assert(m_io_throttled
.load() > 0);
797 item
->set_throttled(flag
);
798 if (item
->were_all_throttled()) {
799 this->requeue_back(item
);
805 template <typename I
>
806 bool ImageRequestWQ
<I
>::needs_throttle(ImageDispatchSpec
<I
> *item
) {
809 bool blocked
= false;
810 TokenBucketThrottle
* throttle
= nullptr;
812 for (auto t
: m_throttles
) {
814 if (item
->was_throttled(flag
))
817 if (!(m_qos_enabled_flag
& flag
)) {
818 item
->set_throttled(flag
);
823 if (item
->tokens_requested(flag
, &tokens
) &&
824 throttle
->get
<ImageRequestWQ
<I
>, ImageDispatchSpec
<I
>,
825 &ImageRequestWQ
<I
>::handle_throttle_ready
>(
826 tokens
, this, item
, flag
)) {
829 item
->set_throttled(flag
);
835 template <typename I
>
836 void *ImageRequestWQ
<I
>::_void_dequeue() {
837 CephContext
*cct
= m_image_ctx
.cct
;
838 ImageDispatchSpec
<I
> *peek_item
= this->front();
840 // no queued IO requests or all IO is blocked/stalled
841 if (peek_item
== nullptr || m_io_blockers
.load() > 0) {
845 if (needs_throttle(peek_item
)) {
846 ldout(cct
, 15) << "throttling IO " << peek_item
<< dendl
;
849 // dequeue the throttled item
850 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::_void_dequeue();
855 bool refresh_required
= m_image_ctx
.state
->is_refresh_required();
857 std::shared_lock locker
{m_lock
};
858 bool write_op
= peek_item
->is_write_op();
859 lock_required
= is_lock_required(write_op
);
861 if (!lock_required
&& m_write_blockers
> 0) {
862 // missing lock is not the write blocker
866 if (!lock_required
&& !refresh_required
&& !peek_item
->blocked
) {
867 // completed ops will requeue the IO -- don't count it as in-progress
868 m_in_flight_writes
++;
873 auto item
= reinterpret_cast<ImageDispatchSpec
<I
> *>(
874 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::_void_dequeue());
875 ceph_assert(peek_item
== item
);
878 this->get_pool_lock().unlock();
879 m_image_ctx
.owner_lock
.lock_shared();
880 if (m_image_ctx
.exclusive_lock
!= nullptr) {
881 ldout(cct
, 5) << "exclusive lock required: delaying IO " << item
<< dendl
;
882 if (!m_image_ctx
.get_exclusive_lock_policy()->may_auto_request_lock()) {
883 lderr(cct
) << "op requires exclusive lock" << dendl
;
884 fail_in_flight_io(m_image_ctx
.exclusive_lock
->get_unlocked_op_error(),
887 // wake up the IO since we won't be returning a request to process
890 // stall IO until the acquire completes
892 Context
*ctx
= new C_AcquireLock(this, item
);
893 ctx
= create_context_callback
<
894 Context
, &Context::complete
>(
895 ctx
, m_image_ctx
.exclusive_lock
);
896 m_image_ctx
.exclusive_lock
->acquire_lock(ctx
);
899 // raced with the exclusive lock being disabled
900 lock_required
= false;
902 m_image_ctx
.owner_lock
.unlock_shared();
903 this->get_pool_lock().lock();
910 if (refresh_required
) {
911 ldout(cct
, 5) << "image refresh required: delaying IO " << item
<< dendl
;
913 // stall IO until the refresh completes
916 this->get_pool_lock().unlock();
917 m_image_ctx
.state
->refresh(new C_RefreshFinish(this, item
));
918 this->get_pool_lock().lock();
925 template <typename I
>
926 void ImageRequestWQ
<I
>::process_io(ImageDispatchSpec
<I
> *req
,
927 bool non_blocking_io
) {
928 CephContext
*cct
= m_image_ctx
.cct
;
929 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
930 << "req=" << req
<< dendl
;
932 //extents are invalidated after the request is sent
933 //so gather them ahead of that
934 const auto& extents
= req
->get_image_extents();
935 bool write_op
= req
->is_write_op();
936 uint64_t tid
= req
->get_tid();
937 uint64_t offset
= extents
.size() ? extents
.front().first
: 0;
938 uint64_t length
= extents
.size() ? extents
.front().second
: 0;
940 if (write_op
&& !req
->blocked
) {
941 std::lock_guard locker
{m_lock
};
942 bool blocked
= block_overlapping_io(&m_in_flight_extents
, offset
, length
);
944 ldout(cct
, 20) << "blocking overlapping IO: " << "ictx="
945 << &m_image_ctx
<< ", "
946 << "off=" << offset
<< ", len=" << length
<< dendl
;
948 m_blocked_ios
.push_back(req
);
957 if (non_blocking_io
) {
958 finish_in_flight_write();
960 unblock_overlapping_io(offset
, length
, tid
);
966 template <typename I
>
967 void ImageRequestWQ
<I
>::process(ImageDispatchSpec
<I
> *req
) {
968 CephContext
*cct
= m_image_ctx
.cct
;
969 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
970 << "req=" << req
<< dendl
;
972 bool write_op
= req
->is_write_op();
974 process_io(req
, true);
976 finish_queued_io(write_op
);
977 finish_in_flight_io();
980 template <typename I
>
981 void ImageRequestWQ
<I
>::remove_in_flight_write_ios(uint64_t offset
, uint64_t length
,
982 bool write_op
, uint64_t tid
) {
983 CephContext
*cct
= m_image_ctx
.cct
;
984 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< dendl
;
986 std::lock_guard locker
{m_lock
};
989 if(!m_in_flight_extents
.empty()) {
990 CephContext
*cct
= m_image_ctx
.cct
;
991 ldout(cct
, 20) << "erasing in flight extents with tid:"
992 << tid
<< ", offset: " << offset
<< dendl
;
993 ImageExtentIntervals extents
;
994 extents
.insert(offset
, length
);
995 ImageExtentIntervals intersect
;
996 intersect
.intersection_of(extents
, m_in_flight_extents
);
997 m_in_flight_extents
.subtract(intersect
);
1000 m_queued_or_blocked_io_tids
.erase(tid
);
1005 template <typename I
>
1006 void ImageRequestWQ
<I
>::finish_queued_io(bool write_op
) {
1007 std::shared_lock locker
{m_lock
};
1009 ceph_assert(m_queued_writes
> 0);
1012 ceph_assert(m_queued_reads
> 0);
1017 template <typename I
>
1018 void ImageRequestWQ
<I
>::finish_in_flight_write() {
1019 bool writes_blocked
= false;
1021 std::shared_lock locker
{m_lock
};
1022 ceph_assert(m_in_flight_writes
> 0);
1023 if (--m_in_flight_writes
== 0 &&
1024 !m_write_blocker_contexts
.empty()) {
1025 writes_blocked
= true;
1028 if (writes_blocked
) {
1029 flush_image(m_image_ctx
, new C_BlockedWrites(this));
1033 template <typename I
>
1034 int ImageRequestWQ
<I
>::start_in_flight_io(AioCompletion
*c
) {
1035 std::shared_lock locker
{m_lock
};
1038 CephContext
*cct
= m_image_ctx
.cct
;
1039 lderr(cct
) << "IO received on closed image" << dendl
;
1041 c
->fail(-ESHUTDOWN
);
1045 if (!m_image_ctx
.data_ctx
.is_valid()) {
1046 CephContext
*cct
= m_image_ctx
.cct
;
1047 lderr(cct
) << "missing data pool" << dendl
;
1057 template <typename I
>
1058 void ImageRequestWQ
<I
>::finish_in_flight_io() {
1059 Context
*on_shutdown
;
1061 std::shared_lock locker
{m_lock
};
1062 if (--m_in_flight_ios
> 0 || !m_shutdown
) {
1065 on_shutdown
= m_on_shutdown
;
1068 CephContext
*cct
= m_image_ctx
.cct
;
1069 ldout(cct
, 5) << "completing shut down" << dendl
;
1071 ceph_assert(on_shutdown
!= nullptr);
1072 flush_image(m_image_ctx
, on_shutdown
);
1075 template <typename I
>
1076 void ImageRequestWQ
<I
>::fail_in_flight_io(
1077 int r
, ImageDispatchSpec
<I
> *req
) {
1078 this->process_finish();
1081 bool write_op
= req
->is_write_op();
1082 uint64_t tid
= req
->get_tid();
1083 const auto& extents
= req
->get_image_extents();
1084 uint64_t offset
= extents
.size() ? extents
.front().first
: 0;
1085 uint64_t length
= extents
.size() ? extents
.front().second
: 0;
1087 finish_queued_io(write_op
);
1088 remove_in_flight_write_ios(offset
, length
, write_op
, tid
);
1090 finish_in_flight_io();
// Returns true when this IO direction currently requires the exclusive
// lock: writes when m_require_lock_on_write is set, reads when
// m_require_lock_on_read is set. Caller must hold m_lock (asserted).
1093 template <typename I
>
1094 bool ImageRequestWQ
<I
>::is_lock_required(bool write_op
) const {
1095 ceph_assert(ceph_mutex_is_locked(m_lock
));
1096 return ((write_op
&& m_require_lock_on_write
) ||
1097 (!write_op
&& m_require_lock_on_read
));
1100 template <typename I
>
1101 void ImageRequestWQ
<I
>::queue(ImageDispatchSpec
<I
> *req
) {
1102 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
1104 CephContext
*cct
= m_image_ctx
.cct
;
1105 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
1106 << "req=" << req
<< dendl
;
1108 if (req
->is_write_op()) {
1114 ThreadPool::PointerWQ
<ImageDispatchSpec
<I
> >::queue(req
);
1117 template <typename I
>
1118 void ImageRequestWQ
<I
>::handle_acquire_lock(
1119 int r
, ImageDispatchSpec
<I
> *req
) {
1120 CephContext
*cct
= m_image_ctx
.cct
;
1121 ldout(cct
, 5) << "r=" << r
<< ", " << "req=" << req
<< dendl
;
1124 fail_in_flight_io(r
, req
);
1126 // since IO was stalled for acquire -- original IO order is preserved
1127 // if we requeue this op for work queue processing
1128 this->requeue_front(req
);
1131 ceph_assert(m_io_blockers
.load() > 0);
1136 template <typename I
>
1137 void ImageRequestWQ
<I
>::handle_refreshed(
1138 int r
, ImageDispatchSpec
<I
> *req
) {
1139 CephContext
*cct
= m_image_ctx
.cct
;
1140 ldout(cct
, 5) << "resuming IO after image refresh: r=" << r
<< ", "
1141 << "req=" << req
<< dendl
;
1143 fail_in_flight_io(r
, req
);
1145 // since IO was stalled for refresh -- original IO order is preserved
1146 // if we requeue this op for work queue processing
1147 this->requeue_front(req
);
1150 ceph_assert(m_io_blockers
.load() > 0);
1155 template <typename I
>
1156 void ImageRequestWQ
<I
>::handle_blocked_writes(int r
) {
1159 std::unique_lock locker
{m_lock
};
1160 contexts
.swap(m_write_blocker_contexts
);
1163 for (auto ctx
: contexts
) {
1168 template class librbd::io::ImageRequestWQ
<librbd::ImageCtx
>;
1171 } // namespace librbd