// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/cache/WriteAroundObjectDispatch.h"
#include "common/dout.h"
#include "common/errno.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::WriteAroundObjectDispatch: " \
                           << this << " " << __func__ << ": "
21 using librbd::util::data_object_name
;
24 WriteAroundObjectDispatch
<I
>::WriteAroundObjectDispatch(
25 I
* image_ctx
, size_t max_dirty
, bool writethrough_until_flush
)
26 : m_image_ctx(image_ctx
), m_init_max_dirty(max_dirty
), m_max_dirty(max_dirty
),
27 m_lock(ceph::make_mutex(util::unique_lock_name(
28 "librbd::cache::WriteAroundObjectDispatch::lock", this))) {
29 if (writethrough_until_flush
) {
35 WriteAroundObjectDispatch
<I
>::~WriteAroundObjectDispatch() {
39 void WriteAroundObjectDispatch
<I
>::init() {
40 auto cct
= m_image_ctx
->cct
;
41 ldout(cct
, 5) << dendl
;
43 // add ourself to the IO object dispatcher chain
44 if (m_init_max_dirty
> 0) {
45 m_image_ctx
->disable_zero_copy
= true;
47 m_image_ctx
->io_object_dispatcher
->register_dispatch(this);
51 void WriteAroundObjectDispatch
<I
>::shut_down(Context
* on_finish
) {
52 auto cct
= m_image_ctx
->cct
;
53 ldout(cct
, 5) << dendl
;
55 on_finish
->complete(0);
59 bool WriteAroundObjectDispatch
<I
>::read(
60 uint64_t object_no
, io::ReadExtents
* extents
, IOContext io_context
,
61 int op_flags
, int read_flags
, const ZTracer::Trace
&parent_trace
,
62 uint64_t* version
, int* object_dispatch_flags
,
63 io::DispatchResult
* dispatch_result
, Context
** on_finish
,
64 Context
* on_dispatched
) {
66 for (auto& extent
: *extents
) {
67 handled
|= dispatch_unoptimized_io(object_no
, extent
.offset
, extent
.length
,
68 dispatch_result
, on_dispatched
);
74 bool WriteAroundObjectDispatch
<I
>::discard(
75 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
76 IOContext io_context
, int discard_flags
,
77 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
78 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
79 Context
** on_finish
, Context
* on_dispatched
) {
80 auto cct
= m_image_ctx
->cct
;
81 ldout(cct
, 20) << data_object_name(m_image_ctx
, object_no
) << " "
82 << object_off
<< "~" << object_len
<< dendl
;
84 return dispatch_io(object_no
, object_off
, object_len
, 0, dispatch_result
,
85 on_finish
, on_dispatched
);
89 bool WriteAroundObjectDispatch
<I
>::write(
90 uint64_t object_no
, uint64_t object_off
, ceph::bufferlist
&& data
,
91 IOContext io_context
, int op_flags
, int write_flags
,
92 std::optional
<uint64_t> assert_version
,
93 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
94 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
95 Context
**on_finish
, Context
* on_dispatched
) {
96 auto cct
= m_image_ctx
->cct
;
97 ldout(cct
, 20) << data_object_name(m_image_ctx
, object_no
) << " "
98 << object_off
<< "~" << data
.length() << dendl
;
100 return dispatch_io(object_no
, object_off
, data
.length(), op_flags
,
101 dispatch_result
, on_finish
, on_dispatched
);
104 template <typename I
>
105 bool WriteAroundObjectDispatch
<I
>::write_same(
106 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
107 io::LightweightBufferExtents
&& buffer_extents
, ceph::bufferlist
&& data
,
108 IOContext io_context
, int op_flags
,
109 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
110 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
111 Context
**on_finish
, Context
* on_dispatched
) {
112 auto cct
= m_image_ctx
->cct
;
113 ldout(cct
, 20) << data_object_name(m_image_ctx
, object_no
) << " "
114 << object_off
<< "~" << object_len
<< dendl
;
116 return dispatch_io(object_no
, object_off
, object_len
, op_flags
,
117 dispatch_result
, on_finish
, on_dispatched
);
120 template <typename I
>
121 bool WriteAroundObjectDispatch
<I
>::compare_and_write(
122 uint64_t object_no
, uint64_t object_off
, ceph::bufferlist
&& cmp_data
,
123 ceph::bufferlist
&& write_data
, IOContext io_context
, int op_flags
,
124 const ZTracer::Trace
&parent_trace
, uint64_t* mismatch_offset
,
125 int* object_dispatch_flags
, uint64_t* journal_tid
,
126 io::DispatchResult
* dispatch_result
, Context
** on_finish
,
127 Context
* on_dispatched
) {
128 return dispatch_unoptimized_io(object_no
, object_off
, cmp_data
.length(),
129 dispatch_result
, on_dispatched
);
132 template <typename I
>
133 bool WriteAroundObjectDispatch
<I
>::flush(
134 io::FlushSource flush_source
, const ZTracer::Trace
&parent_trace
,
135 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
136 Context
** on_finish
, Context
* on_dispatched
) {
137 auto cct
= m_image_ctx
->cct
;
138 ldout(cct
, 20) << dendl
;
140 std::lock_guard locker
{m_lock
};
141 if (flush_source
== io::FLUSH_SOURCE_USER
&& !m_user_flushed
) {
142 m_user_flushed
= true;
143 if (m_max_dirty
== 0 && m_init_max_dirty
> 0) {
144 ldout(cct
, 5) << "first user flush: enabling write-around" << dendl
;
145 m_max_dirty
= m_init_max_dirty
;
149 if (m_in_flight_io_tids
.empty()) {
150 // no in-flight IO (also implies no queued/blocked IO)
154 auto tid
= ++m_last_tid
;
155 auto ctx
= util::create_async_context_callback(*m_image_ctx
, *on_finish
);
157 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
158 *on_finish
= new LambdaContext([this, tid
](int r
) {
159 handle_in_flight_flush_complete(r
, tid
);
162 if (m_queued_ios
.empty() && m_blocked_ios
.empty()) {
163 // immediately allow the flush to be dispatched
164 ldout(cct
, 20) << "dispatching: tid=" << tid
<< dendl
;
165 m_in_flight_flushes
.emplace(tid
, ctx
);
169 // cannot dispatch the flush until after preceeding IO is dispatched
170 ldout(cct
, 20) << "queueing: tid=" << tid
<< dendl
;
171 m_queued_flushes
.emplace(tid
, QueuedFlush
{ctx
, on_dispatched
});
175 template <typename I
>
176 bool WriteAroundObjectDispatch
<I
>::dispatch_unoptimized_io(
177 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
178 io::DispatchResult
* dispatch_result
, Context
* on_dispatched
) {
179 auto cct
= m_image_ctx
->cct
;
182 auto in_flight_extents_it
= m_in_flight_extents
.find(object_no
);
183 if (in_flight_extents_it
== m_in_flight_extents
.end() ||
184 !in_flight_extents_it
->second
.intersects(object_off
, object_len
)) {
185 // no IO in-flight to the specified extent
190 // write IO is in-flight -- it needs to complete before the unoptimized
191 // IO can be dispatched
192 auto tid
= ++m_last_tid
;
193 ldout(cct
, 20) << "blocked by in-flight IO: tid=" << tid
<< dendl
;
194 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
195 m_blocked_unoptimized_ios
[object_no
].emplace(
196 tid
, BlockedIO
{object_off
, object_len
, nullptr, on_dispatched
});
202 template <typename I
>
203 bool WriteAroundObjectDispatch
<I
>::dispatch_io(
204 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
205 int op_flags
, io::DispatchResult
* dispatch_result
, Context
** on_finish
,
206 Context
* on_dispatched
) {
207 auto cct
= m_image_ctx
->cct
;
210 if (m_max_dirty
== 0) {
211 // write-through mode is active -- no-op the cache
216 if ((op_flags
& LIBRADOS_OP_FLAG_FADVISE_FUA
) != 0) {
217 // force unit access flag is set -- disable write-around
219 return dispatch_unoptimized_io(object_no
, object_off
, object_len
,
220 dispatch_result
, on_dispatched
);
223 auto tid
= ++m_last_tid
;
224 auto ctx
= util::create_async_context_callback(*m_image_ctx
, *on_finish
);
226 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
227 *on_finish
= new LambdaContext(
228 [this, tid
, object_no
, object_off
, object_len
](int r
) {
229 handle_in_flight_io_complete(r
, tid
, object_no
, object_off
, object_len
);
232 bool blocked
= block_overlapping_io(&m_in_flight_extents
[object_no
],
233 object_off
, object_len
);
235 ldout(cct
, 20) << "blocked on overlap: tid=" << tid
<< dendl
;
236 m_queued_or_blocked_io_tids
.insert(tid
);
237 m_blocked_ios
[object_no
].emplace(tid
, BlockedIO
{object_off
, object_len
, ctx
,
240 } else if (can_dispatch_io(tid
, object_len
)) {
243 ldout(cct
, 20) << "dispatching: tid=" << tid
<< dendl
;
244 on_dispatched
->complete(0);
247 ldout(cct
, 20) << "queueing: tid=" << tid
<< dendl
;
248 m_queued_or_blocked_io_tids
.insert(tid
);
249 m_queued_ios
.emplace(tid
, QueuedIO
{object_len
, ctx
, on_dispatched
});
255 template <typename I
>
256 bool WriteAroundObjectDispatch
<I
>::block_overlapping_io(
257 InFlightObjectExtents
* in_flight_object_extents
, uint64_t object_off
,
258 uint64_t object_len
) {
259 if (in_flight_object_extents
->intersects(object_off
, object_len
)) {
263 in_flight_object_extents
->insert(object_off
, object_len
);
267 template <typename I
>
268 void WriteAroundObjectDispatch
<I
>::unblock_overlapping_ios(
269 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
270 Contexts
* unoptimized_io_dispatches
) {
271 auto cct
= m_image_ctx
->cct
;
272 ceph_assert(ceph_mutex_is_locked(m_lock
));
274 auto in_flight_extents_it
= m_in_flight_extents
.find(object_no
);
275 ceph_assert(in_flight_extents_it
!= m_in_flight_extents
.end());
277 auto& in_flight_object_extents
= in_flight_extents_it
->second
;
278 in_flight_object_extents
.erase(object_off
, object_len
);
280 // handle unoptimized IOs that were blocked by in-flight IO
281 InFlightObjectExtents blocked_unoptimized_ios
;
282 auto blocked_unoptimized_ios_it
= m_blocked_unoptimized_ios
.find(object_no
);
283 if (blocked_unoptimized_ios_it
!= m_blocked_unoptimized_ios
.end()) {
284 auto& blocked_unoptimized_object_ios
= blocked_unoptimized_ios_it
->second
;
285 for (auto it
= blocked_unoptimized_object_ios
.begin();
286 it
!= blocked_unoptimized_object_ios
.end();) {
287 auto& blocked_io
= it
->second
;
288 if (!in_flight_object_extents
.intersects(blocked_io
.offset
,
289 blocked_io
.length
)) {
290 unoptimized_io_dispatches
->emplace(it
->first
, blocked_io
.on_dispatched
);
291 it
= blocked_unoptimized_object_ios
.erase(it
);
293 blocked_unoptimized_ios
.union_insert(blocked_io
.offset
,
299 if (blocked_unoptimized_object_ios
.empty()) {
300 m_blocked_unoptimized_ios
.erase(blocked_unoptimized_ios_it
);
304 // handle optimized IOs that were blocked
305 auto blocked_io_it
= m_blocked_ios
.find(object_no
);
306 if (blocked_io_it
!= m_blocked_ios
.end()) {
307 auto& blocked_object_ios
= blocked_io_it
->second
;
309 auto blocked_object_ios_it
= blocked_object_ios
.begin();
310 while (blocked_object_ios_it
!= blocked_object_ios
.end()) {
311 auto next_blocked_object_ios_it
= blocked_object_ios_it
;
312 ++next_blocked_object_ios_it
;
314 auto& blocked_io
= blocked_object_ios_it
->second
;
315 if (blocked_unoptimized_ios
.intersects(blocked_io
.offset
,
316 blocked_io
.length
) ||
317 block_overlapping_io(&in_flight_object_extents
, blocked_io
.offset
,
318 blocked_io
.length
)) {
322 // move unblocked IO to the queued list, which will get processed when
324 auto tid
= blocked_object_ios_it
->first
;
325 ldout(cct
, 20) << "queueing unblocked: tid=" << tid
<< dendl
;
326 m_queued_ios
.emplace(tid
, blocked_io
);
328 blocked_object_ios
.erase(blocked_object_ios_it
);
329 blocked_object_ios_it
= next_blocked_object_ios_it
;
332 if (blocked_object_ios
.empty()) {
333 m_blocked_ios
.erase(blocked_io_it
);
337 if (in_flight_object_extents
.empty()) {
338 m_in_flight_extents
.erase(in_flight_extents_it
);
342 template <typename I
>
343 bool WriteAroundObjectDispatch
<I
>::can_dispatch_io(
344 uint64_t tid
, uint64_t length
) {
345 ceph_assert(ceph_mutex_is_locked(m_lock
));
347 if (m_in_flight_bytes
== 0 || m_in_flight_bytes
+ length
<= m_max_dirty
) {
348 // no in-flight IO or still under max write-around in-flight limit.
349 // allow the dispatcher to proceed to send the IO but complete it back
351 m_in_flight_bytes
+= length
;
352 m_in_flight_io_tids
.insert(tid
);
359 template <typename I
>
360 void WriteAroundObjectDispatch
<I
>::handle_in_flight_io_complete(
361 int r
, uint64_t tid
, uint64_t object_no
, uint64_t object_off
,
362 uint64_t object_len
) {
363 auto cct
= m_image_ctx
->cct
;
364 ldout(cct
, 20) << "r=" << r
<< ", tid=" << tid
<< dendl
;
367 m_in_flight_io_tids
.erase(tid
);
368 ceph_assert(m_in_flight_bytes
>= object_len
);
369 m_in_flight_bytes
-= object_len
;
372 lderr(cct
) << "IO error encountered: tid=" << tid
<< ": "
373 << cpp_strerror(r
) << dendl
;
374 if (m_pending_flush_error
== 0) {
375 m_pending_flush_error
= r
;
379 // any overlapping blocked IOs can be queued now
380 Contexts unoptimized_io_dispatches
;
381 unblock_overlapping_ios(object_no
, object_off
, object_len
,
382 &unoptimized_io_dispatches
);
384 // collect any flushes that are ready for completion
385 int pending_flush_error
= 0;
386 auto finished_flushes
= collect_finished_flushes();
387 if (!finished_flushes
.empty()) {
388 std::swap(pending_flush_error
, m_pending_flush_error
);
391 // collect any queued IOs that are ready for dispatch
392 auto ready_ios
= collect_ready_ios();
394 // collect any queued flushes that were tied to queued IOs
395 auto ready_flushes
= collect_ready_flushes();
398 // dispatch any ready unoptimized IOs
399 for (auto& it
: unoptimized_io_dispatches
) {
400 ldout(cct
, 20) << "dispatching unoptimized IO: tid=" << it
.first
<< dendl
;
401 it
.second
->complete(0);
404 // complete flushes that were waiting on in-flight IO
405 // (and propogate any IO error to first flush)
406 for (auto& it
: finished_flushes
) {
407 ldout(cct
, 20) << "completing flush: tid=" << it
.first
<< ", "
408 << "r=" << pending_flush_error
<< dendl
;
409 it
.second
->complete(pending_flush_error
);
412 // dispatch any ready queued IOs
413 for (auto& it
: ready_ios
) {
414 ldout(cct
, 20) << "dispatching IO: tid=" << it
.first
<< dendl
;
415 it
.second
.on_dispatched
->complete(0);
416 it
.second
.on_finish
->complete(0);
419 // dispatch any ready flushes
420 for (auto& it
: ready_flushes
) {
421 ldout(cct
, 20) << "dispatching flush: tid=" << it
.first
<< dendl
;
422 it
.second
->complete(0);
426 template <typename I
>
427 void WriteAroundObjectDispatch
<I
>::handle_in_flight_flush_complete(
428 int r
, uint64_t tid
) {
429 auto cct
= m_image_ctx
->cct
;
430 ldout(cct
, 20) << "r=" << r
<< ", tid=" << tid
<< dendl
;
434 // move the in-flight flush to the pending completion list
435 auto it
= m_in_flight_flushes
.find(tid
);
436 ceph_assert(it
!= m_in_flight_flushes
.end());
438 m_pending_flushes
.emplace(it
->first
, it
->second
);
439 m_in_flight_flushes
.erase(it
);
441 // collect any flushes that are ready for completion
442 int pending_flush_error
= 0;
443 auto finished_flushes
= collect_finished_flushes();
444 if (!finished_flushes
.empty()) {
445 std::swap(pending_flush_error
, m_pending_flush_error
);
449 // complete flushes that were waiting on in-flight IO
450 // (and propogate any IO errors)
451 for (auto& it
: finished_flushes
) {
452 ldout(cct
, 20) << "completing flush: tid=" << it
.first
<< dendl
;
453 it
.second
->complete(pending_flush_error
);
454 pending_flush_error
= 0;
458 template <typename I
>
459 typename WriteAroundObjectDispatch
<I
>::QueuedIOs
460 WriteAroundObjectDispatch
<I
>::collect_ready_ios() {
461 ceph_assert(ceph_mutex_is_locked(m_lock
));
463 QueuedIOs queued_ios
;
466 auto it
= m_queued_ios
.begin();
467 if (it
== m_queued_ios
.end() ||
468 !can_dispatch_io(it
->first
, it
->second
.length
)) {
472 queued_ios
.emplace(it
->first
, it
->second
);
473 m_queued_or_blocked_io_tids
.erase(it
->first
);
474 m_queued_ios
.erase(it
);
479 template <typename I
>
480 typename WriteAroundObjectDispatch
<I
>::Contexts
481 WriteAroundObjectDispatch
<I
>::collect_ready_flushes() {
482 ceph_assert(ceph_mutex_is_locked(m_lock
));
484 Contexts ready_flushes
;
485 auto io_tid_it
= m_queued_or_blocked_io_tids
.begin();
487 auto it
= m_queued_flushes
.begin();
488 if (it
== m_queued_flushes
.end() ||
489 (io_tid_it
!= m_queued_or_blocked_io_tids
.end() &&
490 *io_tid_it
< it
->first
)) {
494 m_in_flight_flushes
.emplace(it
->first
, it
->second
.on_finish
);
495 ready_flushes
.emplace(it
->first
, it
->second
.on_dispatched
);
496 m_queued_flushes
.erase(it
);
499 return ready_flushes
;
502 template <typename I
>
503 typename WriteAroundObjectDispatch
<I
>::Contexts
504 WriteAroundObjectDispatch
<I
>::collect_finished_flushes() {
505 ceph_assert(ceph_mutex_is_locked(m_lock
));
507 Contexts finished_flushes
;
508 auto io_tid_it
= m_in_flight_io_tids
.begin();
510 auto it
= m_pending_flushes
.begin();
511 if (it
== m_pending_flushes
.end() ||
512 (io_tid_it
!= m_in_flight_io_tids
.end() && *io_tid_it
< it
->first
)) {
516 finished_flushes
.emplace(it
->first
, it
->second
);
517 m_pending_flushes
.erase(it
);
519 return finished_flushes
;
} // namespace cache
} // namespace librbd
525 template class librbd::cache::WriteAroundObjectDispatch
<librbd::ImageCtx
>;