// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/cache/WriteAroundObjectDispatch.h"
#include "common/dout.h"
#include "common/errno.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::WriteAroundObjectDispatch: " \
                           << this << " " << __func__ << ": "

namespace librbd {
namespace cache {

using librbd::util::data_object_name;

template <typename I>
WriteAroundObjectDispatch<I>::WriteAroundObjectDispatch(
    I* image_ctx, size_t max_dirty, bool writethrough_until_flush)
  : m_image_ctx(image_ctx), m_init_max_dirty(max_dirty), m_max_dirty(max_dirty),
    m_lock(ceph::make_mutex(util::unique_lock_name(
      "librbd::cache::WriteAroundObjectDispatch::lock", this))) {
  if (writethrough_until_flush) {
    m_max_dirty = 0;
  }
}

template <typename I>
WriteAroundObjectDispatch<I>::~WriteAroundObjectDispatch() {
}

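// Registers this layer in the image's IO object dispatcher chain. Zero-copy
// writes are disabled whenever a non-zero dirty limit is configured,
// presumably because write-around completes the caller's IO early and the
// caller may then release its data buffer before the write actually reaches
// the backing store.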
template <typename I>
void WriteAroundObjectDispatch<I>::init() {
  auto cct = m_image_ctx->cct;
  ldout(cct, 5) << dendl;

  // add ourself to the IO object dispatcher chain
  if (m_init_max_dirty > 0) {
    m_image_ctx->disable_zero_copy = true;
  }
  m_image_ctx->io_object_dispatcher->register_dispatch(this);
}

template <typename I>
void WriteAroundObjectDispatch<I>::shut_down(Context* on_finish) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 5) << dendl;

  on_finish->complete(0);
}

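// Reads bypass the cache but cannot race ahead of writes that are still
// in flight to the same object extent; each read extent is therefore
// checked (and blocked if needed) via dispatch_unoptimized_io().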
template <typename I>
bool WriteAroundObjectDispatch<I>::read(
    uint64_t object_no, io::ReadExtents* extents, IOContext io_context,
    int op_flags, int read_flags, const ZTracer::Trace &parent_trace,
    uint64_t* version, int* object_dispatch_flags,
    io::DispatchResult* dispatch_result, Context** on_finish,
    Context* on_dispatched) {
  bool handled = false;
  for (auto& extent: *extents) {
    handled |= dispatch_unoptimized_io(object_no, extent.offset, extent.length,
                                       dispatch_result, on_dispatched);
  }
  return handled;
}

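// discard(), write(), and write_same() form the optimized write-around
// path: each is funneled through dispatch_io(), which may complete the
// caller's on_finish callback before the backing store has acknowledged
// the IO.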
template <typename I>
bool WriteAroundObjectDispatch<I>::discard(
    uint64_t object_no, uint64_t object_off, uint64_t object_len,
    IOContext io_context, int discard_flags,
    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
    Context** on_finish, Context* on_dispatched) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
                 << object_off << "~" << object_len << dendl;

  return dispatch_io(object_no, object_off, object_len, 0, dispatch_result,
                     on_finish, on_dispatched);
}

template <typename I>
bool WriteAroundObjectDispatch<I>::write(
    uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
    IOContext io_context, int op_flags, int write_flags,
    std::optional<uint64_t> assert_version,
    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
    Context** on_finish, Context* on_dispatched) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
                 << object_off << "~" << data.length() << dendl;

  return dispatch_io(object_no, object_off, data.length(), op_flags,
                     dispatch_result, on_finish, on_dispatched);
}

template <typename I>
bool WriteAroundObjectDispatch<I>::write_same(
    uint64_t object_no, uint64_t object_off, uint64_t object_len,
    io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data,
    IOContext io_context, int op_flags,
    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
    Context** on_finish, Context* on_dispatched) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
                 << object_off << "~" << object_len << dendl;

  return dispatch_io(object_no, object_off, object_len, op_flags,
                     dispatch_result, on_finish, on_dispatched);
}

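// compare-and-write is never optimized, presumably because its outcome
// depends on the data already on disk; it is only held back until any
// overlapping in-flight writes drain and is otherwise passed through
// untouched.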
template <typename I>
bool WriteAroundObjectDispatch<I>::compare_and_write(
    uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data,
    ceph::bufferlist&& write_data, IOContext io_context, int op_flags,
    const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
    int* object_dispatch_flags, uint64_t* journal_tid,
    io::DispatchResult* dispatch_result, Context** on_finish,
    Context* on_dispatched) {
  return dispatch_unoptimized_io(object_no, object_off, cmp_data.length(),
                                 dispatch_result, on_dispatched);
}

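// A flush can only be completed back to the caller once every IO with a
// smaller tid has completed. The first user-initiated flush also ends the
// writethrough_until_flush grace period by restoring the configured dirty
// limit.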
template <typename I>
bool WriteAroundObjectDispatch<I>::flush(
    io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
    Context** on_finish, Context* on_dispatched) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << dendl;

  std::lock_guard locker{m_lock};
  if (flush_source == io::FLUSH_SOURCE_USER && !m_user_flushed) {
    m_user_flushed = true;
    if (m_max_dirty == 0 && m_init_max_dirty > 0) {
      ldout(cct, 5) << "first user flush: enabling write-around" << dendl;
      m_max_dirty = m_init_max_dirty;
    }
  }

  if (m_in_flight_io_tids.empty()) {
    // no in-flight IO (also implies no queued/blocked IO)
    return false;
  }

  auto tid = ++m_last_tid;
  auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);

  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
  *on_finish = new LambdaContext([this, tid](int r) {
      handle_in_flight_flush_complete(r, tid);
    });

  if (m_queued_ios.empty() && m_blocked_ios.empty()) {
    // immediately allow the flush to be dispatched
    ldout(cct, 20) << "dispatching: tid=" << tid << dendl;
    m_in_flight_flushes.emplace(tid, ctx);
    return false;
  }

  // cannot dispatch the flush until after preceding IO is dispatched
  ldout(cct, 20) << "queueing: tid=" << tid << dendl;
  m_queued_flushes.emplace(tid, QueuedFlush{ctx, on_dispatched});
  return true;
}

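// Pass-through path for IO that cannot be optimized (reads,
// compare-and-write, FUA writes). If the target extent overlaps a write
// that is still in flight, the IO is parked in m_blocked_unoptimized_ios
// until the overlap drains; otherwise it falls straight through to the
// next layer in the dispatch chain.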
template <typename I>
bool WriteAroundObjectDispatch<I>::dispatch_unoptimized_io(
    uint64_t object_no, uint64_t object_off, uint64_t object_len,
    io::DispatchResult* dispatch_result, Context* on_dispatched) {
  auto cct = m_image_ctx->cct;

  m_lock.lock();
  auto in_flight_extents_it = m_in_flight_extents.find(object_no);
  if (in_flight_extents_it == m_in_flight_extents.end() ||
      !in_flight_extents_it->second.intersects(object_off, object_len)) {
    // no IO in-flight to the specified extent
    m_lock.unlock();
    return false;
  }

  // write IO is in-flight -- it needs to complete before the unoptimized
  // IO can be dispatched
  auto tid = ++m_last_tid;
  ldout(cct, 20) << "blocked by in-flight IO: tid=" << tid << dendl;
  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
  m_blocked_unoptimized_ios[object_no].emplace(
    tid, BlockedIO{object_off, object_len, nullptr, on_dispatched});
  m_lock.unlock();

  return true;
}

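// Core write-around logic. Unless writethrough is active or FUA is
// requested, the IO is assigned a tid and then either dispatched
// immediately (completing the caller's on_finish early), queued while the
// dirty limit is exceeded, or blocked behind an overlapping in-flight
// write.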
template <typename I>
bool WriteAroundObjectDispatch<I>::dispatch_io(
    uint64_t object_no, uint64_t object_off, uint64_t object_len,
    int op_flags, io::DispatchResult* dispatch_result, Context** on_finish,
    Context* on_dispatched) {
  auto cct = m_image_ctx->cct;

  m_lock.lock();
  if (m_max_dirty == 0) {
    // write-through mode is active -- no-op the cache
    m_lock.unlock();
    return false;
  }

  if ((op_flags & LIBRADOS_OP_FLAG_FADVISE_FUA) != 0) {
    // force unit access flag is set -- disable write-around
    m_lock.unlock();
    return dispatch_unoptimized_io(object_no, object_off, object_len,
                                   dispatch_result, on_dispatched);
  }

  auto tid = ++m_last_tid;
  auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);

  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
  *on_finish = new LambdaContext(
    [this, tid, object_no, object_off, object_len](int r) {
      handle_in_flight_io_complete(r, tid, object_no, object_off, object_len);
    });

  bool blocked = block_overlapping_io(&m_in_flight_extents[object_no],
                                      object_off, object_len);
  if (blocked) {
    ldout(cct, 20) << "blocked on overlap: tid=" << tid << dendl;
    m_queued_or_blocked_io_tids.insert(tid);
    m_blocked_ios[object_no].emplace(tid, BlockedIO{object_off, object_len, ctx,
                                                    on_dispatched});
    m_lock.unlock();
  } else if (can_dispatch_io(tid, object_len)) {
    m_lock.unlock();

    ldout(cct, 20) << "dispatching: tid=" << tid << dendl;
    on_dispatched->complete(0);
    ctx->complete(0);
  } else {
    ldout(cct, 20) << "queueing: tid=" << tid << dendl;
    m_queued_or_blocked_io_tids.insert(tid);
    m_queued_ios.emplace(tid, QueuedIO{object_len, ctx, on_dispatched});
    m_lock.unlock();
  }
  return true;
}

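// Returns true if the extent overlaps an in-flight write and must wait;
// otherwise records the extent as in-flight and returns false.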
template <typename I>
bool WriteAroundObjectDispatch<I>::block_overlapping_io(
    InFlightObjectExtents* in_flight_object_extents, uint64_t object_off,
    uint64_t object_len) {
  if (in_flight_object_extents->intersects(object_off, object_len)) {
    return true;
  }

  in_flight_object_extents->insert(object_off, object_len);
  return false;
}

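// Called after an in-flight write completes: releases its extent and
// re-evaluates the IOs it was blocking. Unoptimized IOs that no longer
// overlap anything are handed back for dispatch; blocked optimized IOs
// are promoted, in tid order, to the queued list (to wait for dirty-limit
// capacity), stopping at the first one that still conflicts so that
// per-object ordering is preserved.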
template <typename I>
void WriteAroundObjectDispatch<I>::unblock_overlapping_ios(
    uint64_t object_no, uint64_t object_off, uint64_t object_len,
    Contexts* unoptimized_io_dispatches) {
  auto cct = m_image_ctx->cct;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto in_flight_extents_it = m_in_flight_extents.find(object_no);
  ceph_assert(in_flight_extents_it != m_in_flight_extents.end());

  auto& in_flight_object_extents = in_flight_extents_it->second;
  in_flight_object_extents.erase(object_off, object_len);

  // handle unoptimized IOs that were blocked by in-flight IO
  InFlightObjectExtents blocked_unoptimized_ios;
  auto blocked_unoptimized_ios_it = m_blocked_unoptimized_ios.find(object_no);
  if (blocked_unoptimized_ios_it != m_blocked_unoptimized_ios.end()) {
    auto& blocked_unoptimized_object_ios = blocked_unoptimized_ios_it->second;
    for (auto it = blocked_unoptimized_object_ios.begin();
         it != blocked_unoptimized_object_ios.end();) {
      auto& blocked_io = it->second;
      if (!in_flight_object_extents.intersects(blocked_io.offset,
                                               blocked_io.length)) {
        unoptimized_io_dispatches->emplace(it->first, blocked_io.on_dispatched);
        it = blocked_unoptimized_object_ios.erase(it);
      } else {
        blocked_unoptimized_ios.union_insert(blocked_io.offset,
                                             blocked_io.length);
        ++it;
      }
    }

    if (blocked_unoptimized_object_ios.empty()) {
      m_blocked_unoptimized_ios.erase(blocked_unoptimized_ios_it);
    }
  }

  // handle optimized IOs that were blocked
  auto blocked_io_it = m_blocked_ios.find(object_no);
  if (blocked_io_it != m_blocked_ios.end()) {
    auto& blocked_object_ios = blocked_io_it->second;

    auto blocked_object_ios_it = blocked_object_ios.begin();
    while (blocked_object_ios_it != blocked_object_ios.end()) {
      auto next_blocked_object_ios_it = blocked_object_ios_it;
      ++next_blocked_object_ios_it;

      auto& blocked_io = blocked_object_ios_it->second;
      if (blocked_unoptimized_ios.intersects(blocked_io.offset,
                                             blocked_io.length) ||
          block_overlapping_io(&in_flight_object_extents, blocked_io.offset,
                               blocked_io.length)) {
        break;
      }

      // move unblocked IO to the queued list, which will get processed when
      // there is capacity
      auto tid = blocked_object_ios_it->first;
      ldout(cct, 20) << "queueing unblocked: tid=" << tid << dendl;
      m_queued_ios.emplace(tid, blocked_io);

      blocked_object_ios.erase(blocked_object_ios_it);
      blocked_object_ios_it = next_blocked_object_ios_it;
    }

    if (blocked_object_ios.empty()) {
      m_blocked_ios.erase(blocked_io_it);
    }
  }

  if (in_flight_object_extents.empty()) {
    m_in_flight_extents.erase(in_flight_extents_it);
  }
}

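// Admission check against the dirty limit. Note that an IO larger than
// m_max_dirty is still admitted when nothing else is in flight
// (m_in_flight_bytes == 0), presumably so that oversized writes cannot
// stall forever.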
template <typename I>
bool WriteAroundObjectDispatch<I>::can_dispatch_io(
    uint64_t tid, uint64_t length) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  if (m_in_flight_bytes == 0 || m_in_flight_bytes + length <= m_max_dirty) {
    // no in-flight IO or still under max write-around in-flight limit.
    // allow the dispatcher to proceed to send the IO but complete it back
    // to the invoker.
    m_in_flight_bytes += length;
    m_in_flight_io_tids.insert(tid);
    return true;
  }

  return false;
}

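// Completion hook for an optimized write: releases its bytes and extent,
// latches the first IO error for the next flush, and then drains -- in
// tid order -- unblocked unoptimized IOs, flushes satisfied by this
// completion, queued IOs that now fit under the dirty limit, and flushes
// that were waiting on those queued IOs. All callbacks fire outside
// m_lock.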
template <typename I>
void WriteAroundObjectDispatch<I>::handle_in_flight_io_complete(
    int r, uint64_t tid, uint64_t object_no, uint64_t object_off,
    uint64_t object_len) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl;

  m_lock.lock();
  m_in_flight_io_tids.erase(tid);
  ceph_assert(m_in_flight_bytes >= object_len);
  m_in_flight_bytes -= object_len;

  if (r < 0) {
    lderr(cct) << "IO error encountered: tid=" << tid << ": "
               << cpp_strerror(r) << dendl;
    if (m_pending_flush_error == 0) {
      m_pending_flush_error = r;
    }
  }

  // any overlapping blocked IOs can be queued now
  Contexts unoptimized_io_dispatches;
  unblock_overlapping_ios(object_no, object_off, object_len,
                          &unoptimized_io_dispatches);

  // collect any flushes that are ready for completion
  int pending_flush_error = 0;
  auto finished_flushes = collect_finished_flushes();
  if (!finished_flushes.empty()) {
    std::swap(pending_flush_error, m_pending_flush_error);
  }

  // collect any queued IOs that are ready for dispatch
  auto ready_ios = collect_ready_ios();

  // collect any queued flushes that were tied to queued IOs
  auto ready_flushes = collect_ready_flushes();
  m_lock.unlock();

  // dispatch any ready unoptimized IOs
  for (auto& it : unoptimized_io_dispatches) {
    ldout(cct, 20) << "dispatching unoptimized IO: tid=" << it.first << dendl;
    it.second->complete(0);
  }

  // complete flushes that were waiting on in-flight IO
  // (and propagate any IO error to first flush)
  for (auto& it : finished_flushes) {
    ldout(cct, 20) << "completing flush: tid=" << it.first << ", "
                   << "r=" << pending_flush_error << dendl;
    it.second->complete(pending_flush_error);
  }

  // dispatch any ready queued IOs
  for (auto& it : ready_ios) {
    ldout(cct, 20) << "dispatching IO: tid=" << it.first << dendl;
    it.second.on_dispatched->complete(0);
    it.second.on_finish->complete(0);
  }

  // dispatch any ready flushes
  for (auto& it : ready_flushes) {
    ldout(cct, 20) << "dispatching flush: tid=" << it.first << dendl;
    it.second->complete(0);
  }
}

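// Completion hook for a flush that was forwarded down the dispatch chain:
// the flush is moved to the pending list and only completed back to the
// caller once every IO with a smaller tid has also completed.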
template <typename I>
void WriteAroundObjectDispatch<I>::handle_in_flight_flush_complete(
    int r, uint64_t tid) {
  auto cct = m_image_ctx->cct;
  ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl;

  m_lock.lock();

  // move the in-flight flush to the pending completion list
  auto it = m_in_flight_flushes.find(tid);
  ceph_assert(it != m_in_flight_flushes.end());

  m_pending_flushes.emplace(it->first, it->second);
  m_in_flight_flushes.erase(it);

  // collect any flushes that are ready for completion
  int pending_flush_error = 0;
  auto finished_flushes = collect_finished_flushes();
  if (!finished_flushes.empty()) {
    std::swap(pending_flush_error, m_pending_flush_error);
  }
  m_lock.unlock();

  // complete flushes that were waiting on in-flight IO
  // (and propagate any IO errors)
  for (auto& it : finished_flushes) {
    ldout(cct, 20) << "completing flush: tid=" << it.first << dendl;
    it.second->complete(pending_flush_error);
    pending_flush_error = 0;
  }
}

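// Drain queued IOs, in tid order, while the dirty limit still has room;
// the caller dispatches them after dropping m_lock.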
template <typename I>
typename WriteAroundObjectDispatch<I>::QueuedIOs
WriteAroundObjectDispatch<I>::collect_ready_ios() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  QueuedIOs queued_ios;

  while (true) {
    auto it = m_queued_ios.begin();
    if (it == m_queued_ios.end() ||
        !can_dispatch_io(it->first, it->second.length)) {
      break;
    }

    queued_ios.emplace(it->first, it->second);
    m_queued_or_blocked_io_tids.erase(it->first);
    m_queued_ios.erase(it);
  }
  return queued_ios;
}

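// A queued flush becomes dispatchable once no queued or blocked IO has a
// smaller tid; each flush collected here is promoted to the in-flight set.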
template <typename I>
typename WriteAroundObjectDispatch<I>::Contexts
WriteAroundObjectDispatch<I>::collect_ready_flushes() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  Contexts ready_flushes;
  auto io_tid_it = m_queued_or_blocked_io_tids.begin();
  while (true) {
    auto it = m_queued_flushes.begin();
    if (it == m_queued_flushes.end() ||
        (io_tid_it != m_queued_or_blocked_io_tids.end() &&
         *io_tid_it < it->first)) {
      break;
    }

    m_in_flight_flushes.emplace(it->first, it->second.on_finish);
    ready_flushes.emplace(it->first, it->second.on_dispatched);
    m_queued_flushes.erase(it);
  }

  return ready_flushes;
}

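// A pending flush is finished once no in-flight IO has a smaller tid;
// such flushes can be completed back to the caller.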
template <typename I>
typename WriteAroundObjectDispatch<I>::Contexts
WriteAroundObjectDispatch<I>::collect_finished_flushes() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  Contexts finished_flushes;
  auto io_tid_it = m_in_flight_io_tids.begin();
  while (true) {
    auto it = m_pending_flushes.begin();
    if (it == m_pending_flushes.end() ||
        (io_tid_it != m_in_flight_io_tids.end() && *io_tid_it < it->first)) {
      break;
    }

    finished_flushes.emplace(it->first, it->second);
    m_pending_flushes.erase(it);
  }
  return finished_flushes;
}

} // namespace cache
} // namespace librbd

template class librbd::cache::WriteAroundObjectDispatch<librbd::ImageCtx>;