]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/cache/WriteAroundObjectDispatch.h" | |
5 | #include "common/dout.h" | |
6 | #include "common/errno.h" | |
9f95a23c TL |
7 | #include "librbd/ImageCtx.h" |
8 | #include "librbd/Utils.h" | |
f67539c2 | 9 | #include "librbd/asio/ContextWQ.h" |
9f95a23c | 10 | #include "librbd/io/ObjectDispatchSpec.h" |
f67539c2 | 11 | #include "librbd/io/ObjectDispatcherInterface.h" |
9f95a23c TL |
12 | |
13 | #define dout_subsys ceph_subsys_rbd | |
14 | #undef dout_prefix | |
15 | #define dout_prefix *_dout << "librbd::cache::WriteAroundObjectDispatch: " \ | |
16 | << this << " " << __func__ << ": " | |
17 | ||
18 | namespace librbd { | |
19 | namespace cache { | |
20 | ||
21 | using librbd::util::data_object_name; | |
22 | ||
23 | template <typename I> | |
24 | WriteAroundObjectDispatch<I>::WriteAroundObjectDispatch( | |
25 | I* image_ctx, size_t max_dirty, bool writethrough_until_flush) | |
26 | : m_image_ctx(image_ctx), m_init_max_dirty(max_dirty), m_max_dirty(max_dirty), | |
27 | m_lock(ceph::make_mutex(util::unique_lock_name( | |
28 | "librbd::cache::WriteAroundObjectDispatch::lock", this))) { | |
29 | if (writethrough_until_flush) { | |
30 | m_max_dirty = 0; | |
31 | } | |
32 | } | |
33 | ||
34 | template <typename I> | |
35 | WriteAroundObjectDispatch<I>::~WriteAroundObjectDispatch() { | |
36 | } | |
37 | ||
38 | template <typename I> | |
39 | void WriteAroundObjectDispatch<I>::init() { | |
40 | auto cct = m_image_ctx->cct; | |
41 | ldout(cct, 5) << dendl; | |
42 | ||
43 | // add ourself to the IO object dispatcher chain | |
44 | if (m_init_max_dirty > 0) { | |
45 | m_image_ctx->disable_zero_copy = true; | |
46 | } | |
f67539c2 | 47 | m_image_ctx->io_object_dispatcher->register_dispatch(this); |
9f95a23c TL |
48 | } |
49 | ||
50 | template <typename I> | |
51 | void WriteAroundObjectDispatch<I>::shut_down(Context* on_finish) { | |
52 | auto cct = m_image_ctx->cct; | |
53 | ldout(cct, 5) << dendl; | |
54 | ||
55 | on_finish->complete(0); | |
56 | } | |
57 | ||
58 | template <typename I> | |
59 | bool WriteAroundObjectDispatch<I>::read( | |
f67539c2 TL |
60 | uint64_t object_no, io::ReadExtents* extents, IOContext io_context, |
61 | int op_flags, int read_flags, const ZTracer::Trace &parent_trace, | |
62 | uint64_t* version, int* object_dispatch_flags, | |
63 | io::DispatchResult* dispatch_result, Context** on_finish, | |
64 | Context* on_dispatched) { | |
65 | bool handled = false; | |
66 | for (auto& extent: *extents) { | |
67 | handled |= dispatch_unoptimized_io(object_no, extent.offset, extent.length, | |
68 | dispatch_result, on_dispatched); | |
69 | } | |
70 | return handled; | |
9f95a23c TL |
71 | } |
72 | ||
73 | template <typename I> | |
74 | bool WriteAroundObjectDispatch<I>::discard( | |
75 | uint64_t object_no, uint64_t object_off, uint64_t object_len, | |
f67539c2 | 76 | IOContext io_context, int discard_flags, |
9f95a23c TL |
77 | const ZTracer::Trace &parent_trace, int* object_dispatch_flags, |
78 | uint64_t* journal_tid, io::DispatchResult* dispatch_result, | |
79 | Context** on_finish, Context* on_dispatched) { | |
80 | auto cct = m_image_ctx->cct; | |
81 | ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " | |
82 | << object_off << "~" << object_len << dendl; | |
83 | ||
84 | return dispatch_io(object_no, object_off, object_len, 0, dispatch_result, | |
85 | on_finish, on_dispatched); | |
86 | } | |
87 | ||
88 | template <typename I> | |
89 | bool WriteAroundObjectDispatch<I>::write( | |
90 | uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data, | |
f67539c2 TL |
91 | IOContext io_context, int op_flags, int write_flags, |
92 | std::optional<uint64_t> assert_version, | |
9f95a23c TL |
93 | const ZTracer::Trace &parent_trace, int* object_dispatch_flags, |
94 | uint64_t* journal_tid, io::DispatchResult* dispatch_result, | |
95 | Context**on_finish, Context* on_dispatched) { | |
96 | auto cct = m_image_ctx->cct; | |
97 | ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " | |
98 | << object_off << "~" << data.length() << dendl; | |
99 | ||
100 | return dispatch_io(object_no, object_off, data.length(), op_flags, | |
101 | dispatch_result, on_finish, on_dispatched); | |
102 | } | |
103 | ||
104 | template <typename I> | |
105 | bool WriteAroundObjectDispatch<I>::write_same( | |
106 | uint64_t object_no, uint64_t object_off, uint64_t object_len, | |
107 | io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data, | |
f67539c2 | 108 | IOContext io_context, int op_flags, |
9f95a23c TL |
109 | const ZTracer::Trace &parent_trace, int* object_dispatch_flags, |
110 | uint64_t* journal_tid, io::DispatchResult* dispatch_result, | |
111 | Context**on_finish, Context* on_dispatched) { | |
112 | auto cct = m_image_ctx->cct; | |
113 | ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " | |
114 | << object_off << "~" << object_len << dendl; | |
115 | ||
20effc67 TL |
116 | return dispatch_io(object_no, object_off, object_len, op_flags, |
117 | dispatch_result, on_finish, on_dispatched); | |
9f95a23c TL |
118 | } |
119 | ||
120 | template <typename I> | |
121 | bool WriteAroundObjectDispatch<I>::compare_and_write( | |
122 | uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data, | |
f67539c2 | 123 | ceph::bufferlist&& write_data, IOContext io_context, int op_flags, |
9f95a23c TL |
124 | const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, |
125 | int* object_dispatch_flags, uint64_t* journal_tid, | |
126 | io::DispatchResult* dispatch_result, Context** on_finish, | |
127 | Context* on_dispatched) { | |
128 | return dispatch_unoptimized_io(object_no, object_off, cmp_data.length(), | |
129 | dispatch_result, on_dispatched); | |
130 | } | |
131 | ||
132 | template <typename I> | |
133 | bool WriteAroundObjectDispatch<I>::flush( | |
134 | io::FlushSource flush_source, const ZTracer::Trace &parent_trace, | |
135 | uint64_t* journal_tid, io::DispatchResult* dispatch_result, | |
136 | Context** on_finish, Context* on_dispatched) { | |
137 | auto cct = m_image_ctx->cct; | |
138 | ldout(cct, 20) << dendl; | |
139 | ||
140 | std::lock_guard locker{m_lock}; | |
141 | if (flush_source == io::FLUSH_SOURCE_USER && !m_user_flushed) { | |
142 | m_user_flushed = true; | |
143 | if (m_max_dirty == 0 && m_init_max_dirty > 0) { | |
144 | ldout(cct, 5) << "first user flush: enabling write-around" << dendl; | |
145 | m_max_dirty = m_init_max_dirty; | |
146 | } | |
147 | } | |
148 | ||
149 | if (m_in_flight_io_tids.empty()) { | |
150 | // no in-flight IO (also implies no queued/blocked IO) | |
151 | return false; | |
152 | } | |
153 | ||
154 | auto tid = ++m_last_tid; | |
155 | auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish); | |
156 | ||
157 | *dispatch_result = io::DISPATCH_RESULT_CONTINUE; | |
158 | *on_finish = new LambdaContext([this, tid](int r) { | |
159 | handle_in_flight_flush_complete(r, tid); | |
160 | }); | |
161 | ||
162 | if (m_queued_ios.empty() && m_blocked_ios.empty()) { | |
163 | // immediately allow the flush to be dispatched | |
164 | ldout(cct, 20) << "dispatching: tid=" << tid << dendl; | |
165 | m_in_flight_flushes.emplace(tid, ctx); | |
166 | return false; | |
167 | } | |
168 | ||
169 | // cannot dispatch the flush until after preceeding IO is dispatched | |
170 | ldout(cct, 20) << "queueing: tid=" << tid << dendl; | |
171 | m_queued_flushes.emplace(tid, QueuedFlush{ctx, on_dispatched}); | |
172 | return true; | |
173 | } | |
174 | ||
175 | template <typename I> | |
176 | bool WriteAroundObjectDispatch<I>::dispatch_unoptimized_io( | |
177 | uint64_t object_no, uint64_t object_off, uint64_t object_len, | |
178 | io::DispatchResult* dispatch_result, Context* on_dispatched) { | |
179 | auto cct = m_image_ctx->cct; | |
180 | ||
181 | m_lock.lock(); | |
182 | auto in_flight_extents_it = m_in_flight_extents.find(object_no); | |
183 | if (in_flight_extents_it == m_in_flight_extents.end() || | |
184 | !in_flight_extents_it->second.intersects(object_off, object_len)) { | |
185 | // no IO in-flight to the specified extent | |
186 | m_lock.unlock(); | |
187 | return false; | |
188 | } | |
189 | ||
190 | // write IO is in-flight -- it needs to complete before the unoptimized | |
191 | // IO can be dispatched | |
192 | auto tid = ++m_last_tid; | |
193 | ldout(cct, 20) << "blocked by in-flight IO: tid=" << tid << dendl; | |
194 | *dispatch_result = io::DISPATCH_RESULT_CONTINUE; | |
195 | m_blocked_unoptimized_ios[object_no].emplace( | |
196 | tid, BlockedIO{object_off, object_len, nullptr, on_dispatched}); | |
197 | m_lock.unlock(); | |
198 | ||
199 | return true; | |
200 | } | |
201 | ||
202 | template <typename I> | |
203 | bool WriteAroundObjectDispatch<I>::dispatch_io( | |
204 | uint64_t object_no, uint64_t object_off, uint64_t object_len, | |
205 | int op_flags, io::DispatchResult* dispatch_result, Context** on_finish, | |
206 | Context* on_dispatched) { | |
207 | auto cct = m_image_ctx->cct; | |
208 | ||
209 | m_lock.lock(); | |
210 | if (m_max_dirty == 0) { | |
211 | // write-through mode is active -- no-op the cache | |
212 | m_lock.unlock(); | |
213 | return false; | |
214 | } | |
215 | ||
216 | if ((op_flags & LIBRADOS_OP_FLAG_FADVISE_FUA) != 0) { | |
217 | // force unit access flag is set -- disable write-around | |
218 | m_lock.unlock(); | |
219 | return dispatch_unoptimized_io(object_no, object_off, object_len, | |
220 | dispatch_result, on_dispatched); | |
221 | } | |
222 | ||
223 | auto tid = ++m_last_tid; | |
224 | auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish); | |
225 | ||
226 | *dispatch_result = io::DISPATCH_RESULT_CONTINUE; | |
227 | *on_finish = new LambdaContext( | |
228 | [this, tid, object_no, object_off, object_len](int r) { | |
229 | handle_in_flight_io_complete(r, tid, object_no, object_off, object_len); | |
230 | }); | |
231 | ||
232 | bool blocked = block_overlapping_io(&m_in_flight_extents[object_no], | |
233 | object_off, object_len); | |
234 | if (blocked) { | |
235 | ldout(cct, 20) << "blocked on overlap: tid=" << tid << dendl; | |
236 | m_queued_or_blocked_io_tids.insert(tid); | |
237 | m_blocked_ios[object_no].emplace(tid, BlockedIO{object_off, object_len, ctx, | |
238 | on_dispatched}); | |
239 | m_lock.unlock(); | |
240 | } else if (can_dispatch_io(tid, object_len)) { | |
241 | m_lock.unlock(); | |
242 | ||
243 | ldout(cct, 20) << "dispatching: tid=" << tid << dendl; | |
244 | on_dispatched->complete(0); | |
245 | ctx->complete(0); | |
246 | } else { | |
247 | ldout(cct, 20) << "queueing: tid=" << tid << dendl; | |
248 | m_queued_or_blocked_io_tids.insert(tid); | |
249 | m_queued_ios.emplace(tid, QueuedIO{object_len, ctx, on_dispatched}); | |
250 | m_lock.unlock(); | |
251 | } | |
252 | return true; | |
253 | } | |
254 | ||
255 | template <typename I> | |
256 | bool WriteAroundObjectDispatch<I>::block_overlapping_io( | |
257 | InFlightObjectExtents* in_flight_object_extents, uint64_t object_off, | |
258 | uint64_t object_len) { | |
259 | if (in_flight_object_extents->intersects(object_off, object_len)) { | |
260 | return true; | |
261 | } | |
262 | ||
263 | in_flight_object_extents->insert(object_off, object_len); | |
264 | return false; | |
265 | } | |
266 | ||
267 | template <typename I> | |
268 | void WriteAroundObjectDispatch<I>::unblock_overlapping_ios( | |
269 | uint64_t object_no, uint64_t object_off, uint64_t object_len, | |
270 | Contexts* unoptimized_io_dispatches) { | |
271 | auto cct = m_image_ctx->cct; | |
272 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
273 | ||
274 | auto in_flight_extents_it = m_in_flight_extents.find(object_no); | |
275 | ceph_assert(in_flight_extents_it != m_in_flight_extents.end()); | |
276 | ||
277 | auto& in_flight_object_extents = in_flight_extents_it->second; | |
278 | in_flight_object_extents.erase(object_off, object_len); | |
279 | ||
280 | // handle unoptimized IOs that were blocked by in-flight IO | |
281 | InFlightObjectExtents blocked_unoptimized_ios; | |
282 | auto blocked_unoptimized_ios_it = m_blocked_unoptimized_ios.find(object_no); | |
283 | if (blocked_unoptimized_ios_it != m_blocked_unoptimized_ios.end()) { | |
284 | auto& blocked_unoptimized_object_ios = blocked_unoptimized_ios_it->second; | |
285 | for (auto it = blocked_unoptimized_object_ios.begin(); | |
286 | it != blocked_unoptimized_object_ios.end();) { | |
287 | auto& blocked_io = it->second; | |
288 | if (!in_flight_object_extents.intersects(blocked_io.offset, | |
289 | blocked_io.length)) { | |
290 | unoptimized_io_dispatches->emplace(it->first, blocked_io.on_dispatched); | |
291 | it = blocked_unoptimized_object_ios.erase(it); | |
292 | } else { | |
293 | blocked_unoptimized_ios.union_insert(blocked_io.offset, | |
294 | blocked_io.length); | |
295 | ++it; | |
296 | } | |
297 | } | |
298 | ||
299 | if (blocked_unoptimized_object_ios.empty()) { | |
300 | m_blocked_unoptimized_ios.erase(blocked_unoptimized_ios_it); | |
301 | } | |
302 | } | |
303 | ||
304 | // handle optimized IOs that were blocked | |
305 | auto blocked_io_it = m_blocked_ios.find(object_no); | |
306 | if (blocked_io_it != m_blocked_ios.end()) { | |
307 | auto& blocked_object_ios = blocked_io_it->second; | |
308 | ||
309 | auto blocked_object_ios_it = blocked_object_ios.begin(); | |
310 | while (blocked_object_ios_it != blocked_object_ios.end()) { | |
311 | auto next_blocked_object_ios_it = blocked_object_ios_it; | |
312 | ++next_blocked_object_ios_it; | |
313 | ||
314 | auto& blocked_io = blocked_object_ios_it->second; | |
315 | if (blocked_unoptimized_ios.intersects(blocked_io.offset, | |
316 | blocked_io.length) || | |
317 | block_overlapping_io(&in_flight_object_extents, blocked_io.offset, | |
318 | blocked_io.length)) { | |
319 | break; | |
320 | } | |
321 | ||
322 | // move unblocked IO to the queued list, which will get processed when | |
323 | // there is capacity | |
324 | auto tid = blocked_object_ios_it->first; | |
325 | ldout(cct, 20) << "queueing unblocked: tid=" << tid << dendl; | |
326 | m_queued_ios.emplace(tid, blocked_io); | |
327 | ||
328 | blocked_object_ios.erase(blocked_object_ios_it); | |
329 | blocked_object_ios_it = next_blocked_object_ios_it; | |
330 | } | |
331 | ||
332 | if (blocked_object_ios.empty()) { | |
333 | m_blocked_ios.erase(blocked_io_it); | |
334 | } | |
335 | } | |
336 | ||
337 | if (in_flight_object_extents.empty()) { | |
338 | m_in_flight_extents.erase(in_flight_extents_it); | |
339 | } | |
340 | } | |
341 | ||
342 | template <typename I> | |
343 | bool WriteAroundObjectDispatch<I>::can_dispatch_io( | |
344 | uint64_t tid, uint64_t length) { | |
345 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
346 | ||
347 | if (m_in_flight_bytes == 0 || m_in_flight_bytes + length <= m_max_dirty) { | |
348 | // no in-flight IO or still under max write-around in-flight limit. | |
349 | // allow the dispatcher to proceed to send the IO but complete it back | |
350 | // to the invoker. | |
351 | m_in_flight_bytes += length; | |
352 | m_in_flight_io_tids.insert(tid); | |
353 | return true; | |
354 | } | |
355 | ||
356 | return false; | |
357 | } | |
358 | ||
359 | template <typename I> | |
360 | void WriteAroundObjectDispatch<I>::handle_in_flight_io_complete( | |
361 | int r, uint64_t tid, uint64_t object_no, uint64_t object_off, | |
362 | uint64_t object_len) { | |
363 | auto cct = m_image_ctx->cct; | |
364 | ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl; | |
365 | ||
366 | m_lock.lock(); | |
367 | m_in_flight_io_tids.erase(tid); | |
368 | ceph_assert(m_in_flight_bytes >= object_len); | |
369 | m_in_flight_bytes -= object_len; | |
370 | ||
371 | if (r < 0) { | |
372 | lderr(cct) << "IO error encountered: tid=" << tid << ": " | |
373 | << cpp_strerror(r) << dendl; | |
374 | if (m_pending_flush_error == 0) { | |
375 | m_pending_flush_error = r; | |
376 | } | |
377 | } | |
378 | ||
379 | // any overlapping blocked IOs can be queued now | |
380 | Contexts unoptimized_io_dispatches; | |
381 | unblock_overlapping_ios(object_no, object_off, object_len, | |
382 | &unoptimized_io_dispatches); | |
383 | ||
384 | // collect any flushes that are ready for completion | |
385 | int pending_flush_error = 0; | |
386 | auto finished_flushes = collect_finished_flushes(); | |
387 | if (!finished_flushes.empty()) { | |
388 | std::swap(pending_flush_error, m_pending_flush_error); | |
389 | } | |
390 | ||
391 | // collect any queued IOs that are ready for dispatch | |
392 | auto ready_ios = collect_ready_ios(); | |
393 | ||
394 | // collect any queued flushes that were tied to queued IOs | |
395 | auto ready_flushes = collect_ready_flushes(); | |
396 | m_lock.unlock(); | |
397 | ||
398 | // dispatch any ready unoptimized IOs | |
399 | for (auto& it : unoptimized_io_dispatches) { | |
400 | ldout(cct, 20) << "dispatching unoptimized IO: tid=" << it.first << dendl; | |
401 | it.second->complete(0); | |
402 | } | |
403 | ||
404 | // complete flushes that were waiting on in-flight IO | |
405 | // (and propogate any IO error to first flush) | |
406 | for (auto& it : finished_flushes) { | |
407 | ldout(cct, 20) << "completing flush: tid=" << it.first << ", " | |
408 | << "r=" << pending_flush_error << dendl; | |
409 | it.second->complete(pending_flush_error); | |
410 | } | |
411 | ||
412 | // dispatch any ready queued IOs | |
413 | for (auto& it : ready_ios) { | |
414 | ldout(cct, 20) << "dispatching IO: tid=" << it.first << dendl; | |
415 | it.second.on_dispatched->complete(0); | |
416 | it.second.on_finish->complete(0); | |
417 | } | |
418 | ||
419 | // dispatch any ready flushes | |
420 | for (auto& it : ready_flushes) { | |
421 | ldout(cct, 20) << "dispatching flush: tid=" << it.first << dendl; | |
422 | it.second->complete(0); | |
423 | } | |
424 | } | |
425 | ||
426 | template <typename I> | |
427 | void WriteAroundObjectDispatch<I>::handle_in_flight_flush_complete( | |
428 | int r, uint64_t tid) { | |
429 | auto cct = m_image_ctx->cct; | |
430 | ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl; | |
431 | ||
432 | m_lock.lock(); | |
433 | ||
434 | // move the in-flight flush to the pending completion list | |
435 | auto it = m_in_flight_flushes.find(tid); | |
436 | ceph_assert(it != m_in_flight_flushes.end()); | |
437 | ||
438 | m_pending_flushes.emplace(it->first, it->second); | |
439 | m_in_flight_flushes.erase(it); | |
440 | ||
441 | // collect any flushes that are ready for completion | |
442 | int pending_flush_error = 0; | |
443 | auto finished_flushes = collect_finished_flushes(); | |
444 | if (!finished_flushes.empty()) { | |
445 | std::swap(pending_flush_error, m_pending_flush_error); | |
446 | } | |
447 | m_lock.unlock(); | |
448 | ||
449 | // complete flushes that were waiting on in-flight IO | |
450 | // (and propogate any IO errors) | |
451 | for (auto& it : finished_flushes) { | |
452 | ldout(cct, 20) << "completing flush: tid=" << it.first << dendl; | |
453 | it.second->complete(pending_flush_error); | |
454 | pending_flush_error = 0; | |
455 | } | |
456 | } | |
457 | ||
458 | template <typename I> | |
459 | typename WriteAroundObjectDispatch<I>::QueuedIOs | |
460 | WriteAroundObjectDispatch<I>::collect_ready_ios() { | |
461 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
462 | ||
463 | QueuedIOs queued_ios; | |
464 | ||
465 | while (true) { | |
466 | auto it = m_queued_ios.begin(); | |
467 | if (it == m_queued_ios.end() || | |
468 | !can_dispatch_io(it->first, it->second.length)) { | |
469 | break; | |
470 | } | |
471 | ||
472 | queued_ios.emplace(it->first, it->second); | |
473 | m_queued_or_blocked_io_tids.erase(it->first); | |
474 | m_queued_ios.erase(it); | |
475 | } | |
476 | return queued_ios; | |
477 | } | |
478 | ||
479 | template <typename I> | |
480 | typename WriteAroundObjectDispatch<I>::Contexts | |
481 | WriteAroundObjectDispatch<I>::collect_ready_flushes() { | |
482 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
483 | ||
484 | Contexts ready_flushes; | |
485 | auto io_tid_it = m_queued_or_blocked_io_tids.begin(); | |
486 | while (true) { | |
487 | auto it = m_queued_flushes.begin(); | |
488 | if (it == m_queued_flushes.end() || | |
489 | (io_tid_it != m_queued_or_blocked_io_tids.end() && | |
490 | *io_tid_it < it->first)) { | |
491 | break; | |
492 | } | |
493 | ||
494 | m_in_flight_flushes.emplace(it->first, it->second.on_finish); | |
495 | ready_flushes.emplace(it->first, it->second.on_dispatched); | |
496 | m_queued_flushes.erase(it); | |
497 | } | |
498 | ||
499 | return ready_flushes; | |
500 | } | |
501 | ||
502 | template <typename I> | |
503 | typename WriteAroundObjectDispatch<I>::Contexts | |
504 | WriteAroundObjectDispatch<I>::collect_finished_flushes() { | |
505 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
506 | ||
507 | Contexts finished_flushes; | |
508 | auto io_tid_it = m_in_flight_io_tids.begin(); | |
509 | while (true) { | |
510 | auto it = m_pending_flushes.begin(); | |
511 | if (it == m_pending_flushes.end() || | |
512 | (io_tid_it != m_in_flight_io_tids.end() && *io_tid_it < it->first)) { | |
513 | break; | |
514 | } | |
515 | ||
516 | finished_flushes.emplace(it->first, it->second); | |
517 | m_pending_flushes.erase(it); | |
518 | } | |
519 | return finished_flushes; | |
520 | } | |
521 | ||
522 | } // namespace cache | |
523 | } // namespace librbd | |
524 | ||
525 | template class librbd::cache::WriteAroundObjectDispatch<librbd::ImageCtx>; |