1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Operations.h"
12 #include "librbd/Utils.h"
13 #include "librbd/io/AioCompletion.h"
14 #include "librbd/io/ImageRequest.h"
16 #define dout_subsys ceph_subsys_rbd
18 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
// Flow-control thresholds for replayed in-flight AIO: a journal flush is
// scheduled once the unsafe-context count reaches the low-water mark, and
// replay pauses (holds back on_ready) at the high-water mark.
25 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
26 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
// Shared no-op progress callback used when re-executing long-running ops
// (resize/flatten/rollback) during replay -- there is no user to notify.
28 static NoOpProgressContext no_op_progress_callback
;
// Helper Context that re-executes a journaled maintenance operation of type
// E against the image's Operations API when fired.  Each execute() overload
// maps one journal event type to the matching execute_* call, forwarding
// on_op_complete as the operation's completion callback.
// NOTE(review): this extraction is missing interleaved source lines (member
// declarations, trailing call arguments, closing braces) -- confirm against
// the upstream file before editing.
30 template <typename I
, typename E
>
31 struct ExecuteOp
: public Context
{
34 Context
*on_op_complete
;
36 ExecuteOp(I
&image_ctx
, const E
&event
, Context
*on_op_complete
)
37 : image_ctx(image_ctx
), event(event
), on_op_complete(on_op_complete
) {
// Each overload dispatches on the (otherwise unused) event-type argument.
40 void execute(const journal::SnapCreateEvent
&_
) {
41 image_ctx
.operations
->execute_snap_create(event
.snap_namespace
,
47 void execute(const journal::SnapRemoveEvent
&_
) {
48 image_ctx
.operations
->execute_snap_remove(event
.snap_namespace
,
53 void execute(const journal::SnapRenameEvent
&_
) {
54 image_ctx
.operations
->execute_snap_rename(event
.snap_id
,
59 void execute(const journal::SnapProtectEvent
&_
) {
60 image_ctx
.operations
->execute_snap_protect(event
.snap_namespace
,
65 void execute(const journal::SnapUnprotectEvent
&_
) {
66 image_ctx
.operations
->execute_snap_unprotect(event
.snap_namespace
,
71 void execute(const journal::SnapRollbackEvent
&_
) {
72 image_ctx
.operations
->execute_snap_rollback(event
.snap_namespace
,
74 no_op_progress_callback
,
78 void execute(const journal::RenameEvent
&_
) {
79 image_ctx
.operations
->execute_rename(event
.image_name
,
83 void execute(const journal::ResizeEvent
&_
) {
84 image_ctx
.operations
->execute_resize(event
.size
, true, no_op_progress_callback
,
85 on_op_complete
, event
.op_tid
);
88 void execute(const journal::FlattenEvent
&_
) {
89 image_ctx
.operations
->execute_flatten(no_op_progress_callback
,
93 void execute(const journal::SnapLimitEvent
&_
) {
94 image_ctx
.operations
->execute_snap_set_limit(event
.limit
, on_op_complete
);
97 void execute(const journal::UpdateFeaturesEvent
&_
) {
98 image_ctx
.operations
->execute_update_features(event
.features
, event
.enabled
,
99 on_op_complete
, event
.op_tid
);
102 void execute(const journal::MetadataSetEvent
&_
) {
103 image_ctx
.operations
->execute_metadata_set(event
.key
, event
.value
,
107 void execute(const journal::MetadataRemoveEvent
&_
) {
108 image_ctx
.operations
->execute_metadata_remove(event
.key
, on_op_complete
);
// finish(): on error, propagate r straight to on_op_complete; otherwise
// (code below) take the owner lock and execute the op.
111 void finish(int r
) override
{
112 CephContext
*cct
= image_ctx
.cct
;
114 lderr(cct
) << ": ExecuteOp::" << __func__
<< ": r=" << r
<< dendl
;
115 on_op_complete
->complete(r
);
119 ldout(cct
, 20) << ": ExecuteOp::" << __func__
<< dendl
;
// owner lock must be held while invoking Operations::execute_*.
120 RWLock::RLocker
owner_locker(image_ctx
.owner_lock
);
// Context wrapper that, before firing on_finish, refreshes the image if the
// image state reports a refresh is required; otherwise (or on error) it
// queues on_finish on the op work queue.
// NOTE(review): extraction is missing member declarations and closing
// braces -- confirm against the upstream file.
125 template <typename I
>
126 struct C_RefreshIfRequired
: public Context
{
130 C_RefreshIfRequired(I
&image_ctx
, Context
*on_finish
)
131 : image_ctx(image_ctx
), on_finish(on_finish
) {
133 ~C_RefreshIfRequired() override
{
137 void finish(int r
) override
{
138 CephContext
*cct
= image_ctx
.cct
;
139 Context
*ctx
= on_finish
;
// error path: forward r to the wrapped context asynchronously.
143 lderr(cct
) << ": C_RefreshIfRequired::" << __func__
<< ": r=" << r
<< dendl
;
144 image_ctx
.op_work_queue
->queue(ctx
, r
);
148 if (image_ctx
.state
->is_refresh_required()) {
149 ldout(cct
, 20) << ": C_RefreshIfRequired::" << __func__
<< ": "
150 << "refresh required" << dendl
;
// refresh() will complete ctx once the image metadata is reloaded.
151 image_ctx
.state
->refresh(ctx
);
155 image_ctx
.op_work_queue
->queue(ctx
, 0);
159 } // anonymous namespace
162 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
// Constructor: binds the replay state machine to an image and initializes
// the internal mutex guarding all replay bookkeeping.
165 template <typename I
>
166 Replay
<I
>::Replay(I
&image_ctx
)
167 : m_image_ctx(image_ctx
), m_lock("Replay<I>::m_lock") {
// Destructor: replay must be fully quiesced before destruction -- no
// in-flight AIO flush/modify ops, no pending safe/unsafe contexts, and no
// outstanding op events.
170 template <typename I
>
171 Replay
<I
>::~Replay() {
172 assert(m_in_flight_aio_flush
== 0);
173 assert(m_in_flight_aio_modify
== 0);
174 assert(m_aio_modify_unsafe_contexts
.empty());
175 assert(m_aio_modify_safe_contexts
.empty());
176 assert(m_op_events
.empty());
177 assert(m_in_flight_op_events
== 0);
// Decode a journal entry payload from the bufferlist iterator into
// *event_entry; decode errors are caught (error-return path is outside
// this extraction's visible lines).
180 template <typename I
>
181 int Replay
<I
>::decode(bufferlist::iterator
*it
, EventEntry
*event_entry
) {
183 ::decode(*event_entry
, *it
);
184 } catch (const buffer::error
&err
) {
// Entry point for replaying a decoded event: wraps on_ready so it always
// completes asynchronously, then dispatches to the matching handle_event()
// overload via boost::apply_visitor while holding the image owner lock.
190 template <typename I
>
191 void Replay
<I
>::process(const EventEntry
&event_entry
,
192 Context
*on_ready
, Context
*on_safe
) {
193 CephContext
*cct
= m_image_ctx
.cct
;
194 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", on_safe=" << on_safe
197 on_ready
= util::create_async_context_callback(m_image_ctx
, on_ready
);
199 RWLock::RLocker
owner_lock(m_image_ctx
.owner_lock
);
200 boost::apply_visitor(EventVisitor(this, on_ready
, on_safe
),
// Shut down replay: flush any remaining in-flight AIO modifications, then
// cancel (-ERESTART) or kick-start each pending op event depending on its
// state.  If work remains in flight, on_finish is parked in m_flush_ctx and
// fired later; otherwise it completes immediately.
// NOTE(review): lines are missing from this extraction (lock scopes,
// cancel_ops handling) -- confirm against the upstream file.
204 template <typename I
>
205 void Replay
<I
>::shut_down(bool cancel_ops
, Context
*on_finish
) {
206 CephContext
*cct
= m_image_ctx
.cct
;
207 ldout(cct
, 20) << dendl
;
209 io::AioCompletion
*flush_comp
= nullptr;
210 on_finish
= util::create_async_context_callback(
211 m_image_ctx
, on_finish
);
214 Mutex::Locker
locker(m_lock
);
216 // safely commit any remaining AIO modify operations
217 if ((m_in_flight_aio_flush
+ m_in_flight_aio_modify
) != 0) {
218 flush_comp
= create_aio_flush_completion(nullptr);
221 for (auto &op_event_pair
: m_op_events
) {
222 OpEvent
&op_event
= op_event_pair
.second
;
224 // cancel ops that are waiting to start (waiting for
225 // OpFinishEvent or waiting for ready)
226 if (op_event
.on_start_ready
== nullptr &&
227 op_event
.on_op_finish_event
!= nullptr) {
228 Context
*on_op_finish_event
= nullptr;
229 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
230 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, -ERESTART
);
232 } else if (op_event
.on_op_finish_event
!= nullptr) {
233 // start ops waiting for OpFinishEvent
234 Context
*on_op_finish_event
= nullptr;
235 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
236 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, 0);
237 } else if (op_event
.on_start_ready
!= nullptr) {
238 // waiting for op ready
239 op_event_pair
.second
.finish_on_ready
= true;
// defer on_finish until all in-flight ops/AIO have drained.
243 assert(m_flush_ctx
== nullptr);
244 if (m_in_flight_op_events
> 0 || flush_comp
!= nullptr) {
245 std::swap(m_flush_ctx
, on_finish
);
249 // execute the following outside of lock scope
250 if (flush_comp
!= nullptr) {
251 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
252 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
);
254 if (on_finish
!= nullptr) {
255 on_finish
->complete(0);
// Flush all outstanding replayed modifications: creates a flush completion
// chained to on_finish (made async) and issues an image aio_flush under the
// owner lock.
259 template <typename I
>
260 void Replay
<I
>::flush(Context
*on_finish
) {
261 io::AioCompletion
*aio_comp
;
263 Mutex::Locker
locker(m_lock
);
264 aio_comp
= create_aio_flush_completion(
265 util::create_async_context_callback(m_image_ctx
, on_finish
));
268 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
269 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
);
// Called when a blocking op state machine (snap create / resize / update
// features) reaches its "ready" point: releases the deferred on_start_ready
// so event processing resumes, then either cancels the paused state machine
// (-ERESTART during shutdown), arms on_op_finish_event to resume it when the
// OpFinishEvent arrives, or resumes immediately if finish_on_ready is set.
272 template <typename I
>
273 void Replay
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
274 CephContext
*cct
= m_image_ctx
.cct
;
275 ldout(cct
, 20) << ": op_tid=" << op_tid
<< dendl
;
277 Mutex::Locker
locker(m_lock
);
278 auto op_it
= m_op_events
.find(op_tid
);
279 assert(op_it
!= m_op_events
.end());
281 OpEvent
&op_event
= op_it
->second
;
282 assert(op_event
.op_in_progress
&&
283 op_event
.on_op_finish_event
== nullptr &&
284 op_event
.on_finish_ready
== nullptr &&
285 op_event
.on_finish_safe
== nullptr);
287 // resume processing replay events
288 Context
*on_start_ready
= nullptr;
289 std::swap(on_start_ready
, op_event
.on_start_ready
);
290 on_start_ready
->complete(0);
292 // cancel has been requested -- send error to paused state machine
293 if (!op_event
.finish_on_ready
&& m_flush_ctx
!= nullptr) {
294 m_image_ctx
.op_work_queue
->queue(on_resume
, -ERESTART
);
298 // resume the op state machine once the associated OpFinishEvent
300 op_event
.on_op_finish_event
= new FunctionContext(
302 on_resume
->complete(r
);
305 // shut down request -- don't expect OpFinishEvent
306 if (op_event
.finish_on_ready
) {
307 m_image_ctx
.op_work_queue
->queue(on_resume
, 0);
// Replay an AIO discard event: issues the discard with a modify completion
// (which tracks on_ready/on_safe), and if the low-water mark was hit,
// follows up with a flush so unsafe contexts can be committed.
311 template <typename I
>
312 void Replay
<I
>::handle_event(const journal::AioDiscardEvent
&event
,
313 Context
*on_ready
, Context
*on_safe
) {
314 CephContext
*cct
= m_image_ctx
.cct
;
315 ldout(cct
, 20) << ": AIO discard event" << dendl
;
318 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
319 io::AIO_TYPE_DISCARD
,
321 io::ImageRequest
<I
>::aio_discard(&m_image_ctx
, aio_comp
, event
.offset
,
322 event
.length
, event
.skip_partial_discard
);
323 if (flush_required
) {
325 auto flush_comp
= create_aio_flush_completion(nullptr);
328 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
);
// Replay an AIO write event: copies the journaled payload (event.data) and
// issues the write with a modify completion; schedules a flush if the
// low-water mark was reached.
332 template <typename I
>
333 void Replay
<I
>::handle_event(const journal::AioWriteEvent
&event
,
334 Context
*on_ready
, Context
*on_safe
) {
335 CephContext
*cct
= m_image_ctx
.cct
;
336 ldout(cct
, 20) << ": AIO write event" << dendl
;
338 bufferlist data
= event
.data
;
340 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
343 io::ImageRequest
<I
>::aio_write(&m_image_ctx
, aio_comp
,
344 {{event
.offset
, event
.length
}},
346 if (flush_required
) {
348 auto flush_comp
= create_aio_flush_completion(nullptr);
351 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
);
// Replay an AIO flush event: the flush completion adopts all prior unsafe
// modify contexts (plus on_safe); on_ready fires immediately since flushes
// impose no replay back-pressure.
355 template <typename I
>
356 void Replay
<I
>::handle_event(const journal::AioFlushEvent
&event
,
357 Context
*on_ready
, Context
*on_safe
) {
358 CephContext
*cct
= m_image_ctx
.cct
;
359 ldout(cct
, 20) << ": AIO flush event" << dendl
;
361 io::AioCompletion
*aio_comp
;
363 Mutex::Locker
locker(m_lock
);
364 aio_comp
= create_aio_flush_completion(on_safe
);
366 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
);
368 on_ready
->complete(0);
// Replay an AIO writesame event: mirrors the write path but moves the
// pattern buffer into aio_writesame; schedules a flush at the low-water
// mark.
371 template <typename I
>
372 void Replay
<I
>::handle_event(const journal::AioWriteSameEvent
&event
,
373 Context
*on_ready
, Context
*on_safe
) {
374 CephContext
*cct
= m_image_ctx
.cct
;
375 ldout(cct
, 20) << ": AIO writesame event" << dendl
;
377 bufferlist data
= event
.data
;
379 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
380 io::AIO_TYPE_WRITESAME
,
382 io::ImageRequest
<I
>::aio_writesame(&m_image_ctx
, aio_comp
, event
.offset
,
383 event
.length
, std::move(data
), 0);
384 if (flush_required
) {
386 auto flush_comp
= create_aio_flush_completion(nullptr);
389 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
);
// Replay an OpFinishEvent: pairs the journaled completion with the matching
// start event in m_op_events.  If the op is unknown it was committed before
// this replay window; otherwise record the finish contexts and either
// forward the journaled result to the in-progress op, fail the never-started
// op, or apply the op now on success.
393 template <typename I
>
394 void Replay
<I
>::handle_event(const journal::OpFinishEvent
&event
,
395 Context
*on_ready
, Context
*on_safe
) {
396 CephContext
*cct
= m_image_ctx
.cct
;
397 ldout(cct
, 20) << ": Op finish event: "
398 << "op_tid=" << event
.op_tid
<< dendl
;
402 Context
*on_op_complete
= nullptr;
403 Context
*on_op_finish_event
= nullptr;
405 Mutex::Locker
locker(m_lock
);
406 auto op_it
= m_op_events
.find(event
.op_tid
);
407 if (op_it
== m_op_events
.end()) {
408 ldout(cct
, 10) << ": unable to locate associated op: assuming previously "
409 << "committed." << dendl
;
410 on_ready
->complete(0);
411 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
415 OpEvent
&op_event
= op_it
->second
;
416 assert(op_event
.on_finish_safe
== nullptr);
417 op_event
.on_finish_ready
= on_ready
;
418 op_event
.on_finish_safe
= on_safe
;
419 op_in_progress
= op_event
.op_in_progress
;
420 std::swap(on_op_complete
, op_event
.on_op_complete
);
421 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
423 // special errors which indicate op never started but was recorded
424 // as failed in the journal
425 filter_ret_val
= (op_event
.op_finish_error_codes
.count(event
.r
) != 0);
429 if (op_in_progress
) {
430 // bubble the error up to the in-progress op to cancel it
431 on_op_finish_event
->complete(event
.r
);
433 // op hasn't been started -- bubble the error up since
434 // our image is now potentially in an inconsistent state
435 // since simple errors should have been caught before
436 // creating the op event
437 delete on_op_complete
;
438 delete on_op_finish_event
;
439 handle_op_complete(event
.op_tid
, filter_ret_val
? 0 : event
.r
);
444 // journal recorded success -- apply the op now
445 on_op_finish_event
->complete(0);
// Replay a snapshot-create start event.  Snap create is a blocking op: it
// starts immediately on the op work queue and on_ready is withheld until
// the state machine signals readiness (replay_op_ready); -EEXIST is ignored
// since the snap may already exist from a prior replay.
448 template <typename I
>
449 void Replay
<I
>::handle_event(const journal::SnapCreateEvent
&event
,
450 Context
*on_ready
, Context
*on_safe
) {
451 CephContext
*cct
= m_image_ctx
.cct
;
452 ldout(cct
, 20) << ": Snap create event" << dendl
;
454 Mutex::Locker
locker(m_lock
);
456 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
458 if (on_op_complete
== nullptr) {
462 // ignore errors caused due to replay
463 op_event
->ignore_error_codes
= {-EEXIST
};
466 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
467 m_image_ctx
, new ExecuteOp
<I
, journal::SnapCreateEvent
>(m_image_ctx
, event
,
471 // do not process more events until the state machine is ready
472 // since it will affect IO
473 op_event
->op_in_progress
= true;
474 op_event
->on_start_ready
= on_ready
;
// Replay a snapshot-remove start event.  Deferred op: execution is armed in
// on_op_finish_event and triggered by the matching OpFinishEvent; -ENOENT
// is ignored (snap may already be gone from a prior replay).
477 template <typename I
>
478 void Replay
<I
>::handle_event(const journal::SnapRemoveEvent
&event
,
479 Context
*on_ready
, Context
*on_safe
) {
480 CephContext
*cct
= m_image_ctx
.cct
;
481 ldout(cct
, 20) << ": Snap remove event" << dendl
;
483 Mutex::Locker
locker(m_lock
);
485 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
487 if (on_op_complete
== nullptr) {
491 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
492 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRemoveEvent
>(m_image_ctx
, event
,
495 // ignore errors caused due to replay
496 op_event
->ignore_error_codes
= {-ENOENT
};
498 on_ready
->complete(0);
// Replay a snapshot-rename start event.  Deferred op keyed by snap_id;
// -EEXIST is ignored (rename may have already been applied).
501 template <typename I
>
502 void Replay
<I
>::handle_event(const journal::SnapRenameEvent
&event
,
503 Context
*on_ready
, Context
*on_safe
) {
504 CephContext
*cct
= m_image_ctx
.cct
;
505 ldout(cct
, 20) << ": Snap rename event" << dendl
;
507 Mutex::Locker
locker(m_lock
);
509 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
511 if (on_op_complete
== nullptr) {
515 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
516 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRenameEvent
>(m_image_ctx
, event
,
519 // ignore errors caused due to replay
520 op_event
->ignore_error_codes
= {-EEXIST
};
522 on_ready
->complete(0);
// Replay a snapshot-protect start event.  Deferred op; -EBUSY is ignored
// (snap may already be protected from a prior replay).
525 template <typename I
>
526 void Replay
<I
>::handle_event(const journal::SnapProtectEvent
&event
,
527 Context
*on_ready
, Context
*on_safe
) {
528 CephContext
*cct
= m_image_ctx
.cct
;
529 ldout(cct
, 20) << ": Snap protect event" << dendl
;
531 Mutex::Locker
locker(m_lock
);
533 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
535 if (on_op_complete
== nullptr) {
539 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
540 m_image_ctx
, new ExecuteOp
<I
, journal::SnapProtectEvent
>(m_image_ctx
, event
,
543 // ignore errors caused due to replay
544 op_event
->ignore_error_codes
= {-EBUSY
};
546 on_ready
->complete(0);
// Replay a snapshot-unprotect start event.  Deferred op; a journaled -EBUSY
// finish means the op never actually started (filtered to success), while
// -EINVAL during re-execution is ignored (already unprotected).
549 template <typename I
>
550 void Replay
<I
>::handle_event(const journal::SnapUnprotectEvent
&event
,
551 Context
*on_ready
, Context
*on_safe
) {
552 CephContext
*cct
= m_image_ctx
.cct
;
553 ldout(cct
, 20) << ": Snap unprotect event" << dendl
;
555 Mutex::Locker
locker(m_lock
);
557 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
559 if (on_op_complete
== nullptr) {
563 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
564 m_image_ctx
, new ExecuteOp
<I
, journal::SnapUnprotectEvent
>(m_image_ctx
,
568 // ignore errors recorded in the journal
569 op_event
->op_finish_error_codes
= {-EBUSY
};
571 // ignore errors caused due to replay
572 op_event
->ignore_error_codes
= {-EINVAL
};
574 on_ready
->complete(0);
// Replay a snapshot-rollback start event.  Deferred op; rollback is
// idempotent so no replay error codes are filtered here.
577 template <typename I
>
578 void Replay
<I
>::handle_event(const journal::SnapRollbackEvent
&event
,
579 Context
*on_ready
, Context
*on_safe
) {
580 CephContext
*cct
= m_image_ctx
.cct
;
581 ldout(cct
, 20) << ": Snap rollback start event" << dendl
;
583 Mutex::Locker
locker(m_lock
);
585 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
587 if (on_op_complete
== nullptr) {
591 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
592 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRollbackEvent
>(m_image_ctx
,
596 on_ready
->complete(0);
// Replay an image-rename start event.  Deferred op; -EEXIST is ignored
// (the image may already carry the new name from a prior replay).
599 template <typename I
>
600 void Replay
<I
>::handle_event(const journal::RenameEvent
&event
,
601 Context
*on_ready
, Context
*on_safe
) {
602 CephContext
*cct
= m_image_ctx
.cct
;
603 ldout(cct
, 20) << ": Rename event" << dendl
;
605 Mutex::Locker
locker(m_lock
);
607 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
609 if (on_op_complete
== nullptr) {
613 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
614 m_image_ctx
, new ExecuteOp
<I
, journal::RenameEvent
>(m_image_ctx
, event
,
617 // ignore errors caused due to replay
618 op_event
->ignore_error_codes
= {-EEXIST
};
620 on_ready
->complete(0);
// Replay a resize start event.  Blocking op like snap create: executed
// immediately on the op work queue, with on_ready withheld until the
// resize state machine reports ready (it may clip in-flight IO).
623 template <typename I
>
624 void Replay
<I
>::handle_event(const journal::ResizeEvent
&event
,
625 Context
*on_ready
, Context
*on_safe
) {
626 CephContext
*cct
= m_image_ctx
.cct
;
627 ldout(cct
, 20) << ": Resize start event" << dendl
;
629 Mutex::Locker
locker(m_lock
);
631 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
633 if (on_op_complete
== nullptr) {
638 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
639 m_image_ctx
, new ExecuteOp
<I
, journal::ResizeEvent
>(m_image_ctx
, event
,
640 on_op_complete
)), 0);
642 // do not process more events until the state machine is ready
643 // since it will affect IO
644 op_event
->op_in_progress
= true;
645 op_event
->on_start_ready
= on_ready
;
// Replay a flatten start event.  Deferred op; -EINVAL is ignored (image may
// already be flattened from a prior replay).
648 template <typename I
>
649 void Replay
<I
>::handle_event(const journal::FlattenEvent
&event
,
650 Context
*on_ready
, Context
*on_safe
) {
651 CephContext
*cct
= m_image_ctx
.cct
;
652 ldout(cct
, 20) << ": Flatten start event" << dendl
;
654 Mutex::Locker
locker(m_lock
);
656 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
658 if (on_op_complete
== nullptr) {
662 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
663 m_image_ctx
, new ExecuteOp
<I
, journal::FlattenEvent
>(m_image_ctx
, event
,
666 // ignore errors caused due to replay
667 op_event
->ignore_error_codes
= {-EINVAL
};
669 on_ready
->complete(0);
672 template <typename I
>
673 void Replay
<I
>::handle_event(const journal::DemotePromoteEvent
&event
,
674 Context
*on_ready
, Context
*on_safe
) {
675 CephContext
*cct
= m_image_ctx
.cct
;
676 ldout(cct
, 20) << ": Demote/Promote event" << dendl
;
677 on_ready
->complete(0);
678 on_safe
->complete(0);
// Replay a snapshot-limit start event.  Deferred op executed via
// execute_snap_set_limit when the matching OpFinishEvent arrives.
681 template <typename I
>
682 void Replay
<I
>::handle_event(const journal::SnapLimitEvent
&event
,
683 Context
*on_ready
, Context
*on_safe
) {
684 CephContext
*cct
= m_image_ctx
.cct
;
685 ldout(cct
, 20) << ": Snap limit event" << dendl
;
687 Mutex::Locker
locker(m_lock
);
689 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
691 if (on_op_complete
== nullptr) {
695 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
696 m_image_ctx
, new ExecuteOp
<I
, journal::SnapLimitEvent
>(m_image_ctx
,
700 on_ready
->complete(0);
// Replay an update-features start event.  Blocking op: executed immediately
// on the op work queue with on_ready withheld until the state machine is
// ready, since feature toggles can affect in-flight IO.
703 template <typename I
>
704 void Replay
<I
>::handle_event(const journal::UpdateFeaturesEvent
&event
,
705 Context
*on_ready
, Context
*on_safe
) {
706 CephContext
*cct
= m_image_ctx
.cct
;
707 ldout(cct
, 20) << ": Update features event" << dendl
;
709 Mutex::Locker
locker(m_lock
);
711 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
713 if (on_op_complete
== nullptr) {
718 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
719 m_image_ctx
, new ExecuteOp
<I
, journal::UpdateFeaturesEvent
>(
720 m_image_ctx
, event
, on_op_complete
)), 0);
722 // do not process more events until the state machine is ready
723 // since it will affect IO
724 op_event
->op_in_progress
= true;
725 op_event
->on_start_ready
= on_ready
;
// Replay a metadata-set start event.  Deferred op executed via
// execute_metadata_set when the matching OpFinishEvent arrives.
728 template <typename I
>
729 void Replay
<I
>::handle_event(const journal::MetadataSetEvent
&event
,
730 Context
*on_ready
, Context
*on_safe
) {
731 CephContext
*cct
= m_image_ctx
.cct
;
732 ldout(cct
, 20) << ": Metadata set event" << dendl
;
734 Mutex::Locker
locker(m_lock
);
736 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
738 if (on_op_complete
== nullptr) {
742 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
743 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataSetEvent
>(
744 m_image_ctx
, event
, on_op_complete
));
746 on_ready
->complete(0);
// Replay a metadata-remove start event.  Deferred op; -ENOENT is ignored
// (the key may already be gone from a prior replay).
749 template <typename I
>
750 void Replay
<I
>::handle_event(const journal::MetadataRemoveEvent
&event
,
751 Context
*on_ready
, Context
*on_safe
) {
752 CephContext
*cct
= m_image_ctx
.cct
;
753 ldout(cct
, 20) << ": Metadata remove event" << dendl
;
755 Mutex::Locker
locker(m_lock
);
757 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
759 if (on_op_complete
== nullptr) {
763 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
764 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataRemoveEvent
>(
765 m_image_ctx
, event
, on_op_complete
));
767 // ignore errors caused due to replay
768 op_event
->ignore_error_codes
= {-ENOENT
};
770 on_ready
->complete(0);
773 template <typename I
>
774 void Replay
<I
>::handle_event(const journal::UnknownEvent
&event
,
775 Context
*on_ready
, Context
*on_safe
) {
776 CephContext
*cct
= m_image_ctx
.cct
;
777 ldout(cct
, 20) << ": unknown event" << dendl
;
778 on_ready
->complete(0);
779 on_safe
->complete(0);
// Completion handler for a replayed write/discard/writesame: releases any
// withheld on_ready, surfaces failures directly to on_safe, and otherwise
// records on_safe for completion by the next flush (so the journal commit
// position only advances once data is on disk).
782 template <typename I
>
783 void Replay
<I
>::handle_aio_modify_complete(Context
*on_ready
, Context
*on_safe
,
785 Mutex::Locker
locker(m_lock
);
786 CephContext
*cct
= m_image_ctx
.cct
;
787 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", "
788 << "on_safe=" << on_safe
<< ", r=" << r
<< dendl
;
790 if (on_ready
!= nullptr) {
791 on_ready
->complete(0);
794 lderr(cct
) << ": AIO modify op failed: " << cpp_strerror(r
) << dendl
;
795 on_safe
->complete(r
);
799 // will be completed after next flush operation completes
800 m_aio_modify_safe_contexts
.insert(on_safe
);
// Completion handler for a replayed flush: retires the flush and its
// associated modify ops from the in-flight counters, resumes replay if it
// was paused at the high-water mark, strips on_safe contexts already failed
// by handle_aio_modify_complete, completes the survivors, and fires any
// pending shutdown/flush context once everything has drained.
803 template <typename I
>
804 void Replay
<I
>::handle_aio_flush_complete(Context
*on_flush_safe
,
805 Contexts
&on_safe_ctxs
, int r
) {
806 CephContext
*cct
= m_image_ctx
.cct
;
807 ldout(cct
, 20) << ": r=" << r
<< dendl
;
810 lderr(cct
) << ": AIO flush failed: " << cpp_strerror(r
) << dendl
;
813 Context
*on_aio_ready
= nullptr;
814 Context
*on_flush
= nullptr;
816 Mutex::Locker
locker(m_lock
);
817 assert(m_in_flight_aio_flush
> 0);
818 assert(m_in_flight_aio_modify
>= on_safe_ctxs
.size());
819 --m_in_flight_aio_flush
;
820 m_in_flight_aio_modify
-= on_safe_ctxs
.size();
822 std::swap(on_aio_ready
, m_on_aio_ready
);
823 if (m_in_flight_op_events
== 0 &&
824 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
825 on_flush
= m_flush_ctx
;
828 // strip out previously failed on_safe contexts
829 for (auto it
= on_safe_ctxs
.begin(); it
!= on_safe_ctxs
.end(); ) {
830 if (m_aio_modify_safe_contexts
.erase(*it
)) {
833 it
= on_safe_ctxs
.erase(it
);
838 if (on_aio_ready
!= nullptr) {
839 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
840 on_aio_ready
->complete(0);
843 if (on_flush_safe
!= nullptr) {
844 on_safe_ctxs
.push_back(on_flush_safe
);
846 for (auto ctx
: on_safe_ctxs
) {
847 ldout(cct
, 20) << ": completing safe context: " << ctx
<< dendl
;
851 if (on_flush
!= nullptr) {
852 ldout(cct
, 20) << ": completing flush context: " << on_flush
<< dendl
;
853 on_flush
->complete(r
);
// Register a new op event for op_tid and return its completion callback.
// Rejects duplicate tids (-EINVAL delivered via on_safe) by returning
// nullptr; otherwise bumps the in-flight op counter, stores on_safe, and
// wires a C_OpOnComplete that routes into handle_op_complete().
// Caller must hold m_lock (asserted below).
857 template <typename I
>
858 Context
*Replay
<I
>::create_op_context_callback(uint64_t op_tid
,
861 OpEvent
**op_event
) {
862 CephContext
*cct
= m_image_ctx
.cct
;
864 assert(m_lock
.is_locked());
865 if (m_op_events
.count(op_tid
) != 0) {
866 lderr(cct
) << ": duplicate op tid detected: " << op_tid
<< dendl
;
868 // on_ready is already async but on failure invoke on_safe async
870 on_ready
->complete(0);
871 m_image_ctx
.op_work_queue
->queue(on_safe
, -EINVAL
);
875 ++m_in_flight_op_events
;
876 *op_event
= &m_op_events
[op_tid
];
877 (*op_event
)->on_start_safe
= on_safe
;
879 Context
*on_op_complete
= new C_OpOnComplete(this, op_tid
);
880 (*op_event
)->on_op_complete
= on_op_complete
;
881 return on_op_complete
;
// Final stage of an op event: removes it from m_op_events, unblocks a
// not-yet-ready blocked op on failure, fires the remaining ready/safe
// contexts (filtering errors listed in ignore_error_codes), and completes
// a pending shutdown flush context once the last op/AIO has drained.
884 template <typename I
>
885 void Replay
<I
>::handle_op_complete(uint64_t op_tid
, int r
) {
886 CephContext
*cct
= m_image_ctx
.cct
;
887 ldout(cct
, 20) << ": op_tid=" << op_tid
<< ", "
888 << "r=" << r
<< dendl
;
891 bool shutting_down
= false;
893 Mutex::Locker
locker(m_lock
);
894 auto op_it
= m_op_events
.find(op_tid
);
895 assert(op_it
!= m_op_events
.end());
897 op_event
= std::move(op_it
->second
);
898 m_op_events
.erase(op_it
);
900 shutting_down
= (m_flush_ctx
!= nullptr);
903 assert(op_event
.on_start_ready
== nullptr || (r
< 0 && r
!= -ERESTART
));
904 if (op_event
.on_start_ready
!= nullptr) {
905 // blocking op event failed before it became ready
906 assert(op_event
.on_finish_ready
== nullptr &&
907 op_event
.on_finish_safe
== nullptr);
909 op_event
.on_start_ready
->complete(0);
911 // event kicked off by OpFinishEvent
912 assert((op_event
.on_finish_ready
!= nullptr &&
913 op_event
.on_finish_safe
!= nullptr) || shutting_down
);
916 if (op_event
.on_op_finish_event
!= nullptr) {
917 op_event
.on_op_finish_event
->complete(r
);
920 if (op_event
.on_finish_ready
!= nullptr) {
921 op_event
.on_finish_ready
->complete(0);
924 // filter out errors caused by replay of the same op
925 if (r
< 0 && op_event
.ignore_error_codes
.count(r
) != 0) {
929 op_event
.on_start_safe
->complete(r
);
930 if (op_event
.on_finish_safe
!= nullptr) {
931 op_event
.on_finish_safe
->complete(r
);
934 // shut down request might have occurred while lock was
935 // dropped -- handle if pending
936 Context
*on_flush
= nullptr;
938 Mutex::Locker
locker(m_lock
);
939 assert(m_in_flight_op_events
> 0);
940 --m_in_flight_op_events
;
941 if (m_in_flight_op_events
== 0 &&
942 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
943 on_flush
= m_flush_ctx
;
946 if (on_flush
!= nullptr) {
947 m_image_ctx
.op_work_queue
->queue(on_flush
, 0);
// Build the AioCompletion for a replayed modify op: tracks on_safe in the
// unsafe-context list, sets *flush_required when the low-water mark is hit,
// and pauses replay (parks on_ready in m_on_aio_ready) at the high-water
// mark.  Completion routes to handle_aio_modify_complete via
// C_AioModifyComplete.
951 template <typename I
>
953 Replay
<I
>::create_aio_modify_completion(Context
*on_ready
, Context
*on_safe
,
954 io::aio_type_t aio_type
,
955 bool *flush_required
) {
956 Mutex::Locker
locker(m_lock
);
957 CephContext
*cct
= m_image_ctx
.cct
;
958 assert(m_on_aio_ready
== nullptr);
960 ++m_in_flight_aio_modify
;
961 m_aio_modify_unsafe_contexts
.push_back(on_safe
);
963 // FLUSH if we hit the low-water mark -- on_safe contexts are
964 // completed by flushes-only so that we don't move the journal
965 // commit position until safely on-disk
967 *flush_required
= (m_aio_modify_unsafe_contexts
.size() ==
968 IN_FLIGHT_IO_LOW_WATER_MARK
);
969 if (*flush_required
) {
970 ldout(cct
, 10) << ": hit AIO replay low-water mark: scheduling flush"
974 // READY for more events if:
975 // * not at high-water mark for IO
976 // * in-flight ops are at a consistent point (snap create has IO flushed,
977 // shrink has adjusted clip boundary, etc) -- should have already been
979 if (m_in_flight_aio_modify
== IN_FLIGHT_IO_HIGH_WATER_MARK
) {
980 ldout(cct
, 10) << ": hit AIO replay high-water mark: pausing replay"
982 assert(m_on_aio_ready
== nullptr);
983 std::swap(m_on_aio_ready
, on_ready
);
986 // when the modification is ACKed by librbd, we can process the next
987 // event. when flushed, the completion of the next flush will fire the
989 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
990 new C_AioModifyComplete(this, on_ready
, on_safe
),
991 util::get_image_ctx(&m_image_ctx
), aio_type
);
// Build the AioCompletion for a replayed flush: adopts (moves) the whole
// unsafe-context list so those on_safe callbacks fire only once this flush
// completes.  Caller must hold m_lock (asserted below).
995 template <typename I
>
996 io::AioCompletion
*Replay
<I
>::create_aio_flush_completion(Context
*on_safe
) {
997 assert(m_lock
.is_locked());
999 ++m_in_flight_aio_flush
;
1001 // associate all prior write/discard ops to this flush request
1002 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1003 new C_AioFlushComplete(this, on_safe
,
1004 std::move(m_aio_modify_unsafe_contexts
)),
1005 util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_FLUSH
);
1006 m_aio_modify_unsafe_contexts
.clear();
1010 } // namespace journal
1011 } // namespace librbd
// Explicit instantiation for the production ImageCtx-based replay engine.
1013 template class librbd::journal::Replay
<librbd::ImageCtx
>;