1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "librbd/ExclusiveLock.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Operations.h"
12 #include "librbd/Utils.h"
13 #include "librbd/asio/ContextWQ.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
17 #define dout_subsys ceph_subsys_rbd
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
// Number of not-yet-flushed AIO modify contexts at which replay schedules a
// flush (compared against m_aio_modify_unsafe_contexts.size()).
static constexpr uint64_t IN_FLIGHT_IO_LOW_WATER_MARK = 32;

// Number of in-flight AIO modify ops at which replay is paused until a flush
// completes (compared against m_in_flight_aio_modify).
static constexpr uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK = 64;
29 static NoOpProgressContext no_op_progress_callback
;
31 template <typename I
, typename E
>
32 struct ExecuteOp
: public Context
{
35 Context
*on_op_complete
;
37 ExecuteOp(I
&image_ctx
, const E
&event
, Context
*on_op_complete
)
38 : image_ctx(image_ctx
), event(event
), on_op_complete(on_op_complete
) {
41 void execute(const journal::SnapCreateEvent
&_
) {
42 image_ctx
.operations
->execute_snap_create(event
.snap_namespace
,
46 SNAP_CREATE_FLAG_SKIP_NOTIFY_QUIESCE
,
47 no_op_progress_callback
);
50 void execute(const journal::SnapRemoveEvent
&_
) {
51 image_ctx
.operations
->execute_snap_remove(event
.snap_namespace
,
56 void execute(const journal::SnapRenameEvent
&_
) {
57 image_ctx
.operations
->execute_snap_rename(event
.snap_id
,
62 void execute(const journal::SnapProtectEvent
&_
) {
63 image_ctx
.operations
->execute_snap_protect(event
.snap_namespace
,
68 void execute(const journal::SnapUnprotectEvent
&_
) {
69 image_ctx
.operations
->execute_snap_unprotect(event
.snap_namespace
,
74 void execute(const journal::SnapRollbackEvent
&_
) {
75 image_ctx
.operations
->execute_snap_rollback(event
.snap_namespace
,
77 no_op_progress_callback
,
81 void execute(const journal::RenameEvent
&_
) {
82 image_ctx
.operations
->execute_rename(event
.image_name
,
86 void execute(const journal::ResizeEvent
&_
) {
87 image_ctx
.operations
->execute_resize(event
.size
, true, no_op_progress_callback
,
88 on_op_complete
, event
.op_tid
);
91 void execute(const journal::FlattenEvent
&_
) {
92 image_ctx
.operations
->execute_flatten(no_op_progress_callback
,
96 void execute(const journal::SnapLimitEvent
&_
) {
97 image_ctx
.operations
->execute_snap_set_limit(event
.limit
, on_op_complete
);
100 void execute(const journal::UpdateFeaturesEvent
&_
) {
101 image_ctx
.operations
->execute_update_features(event
.features
, event
.enabled
,
102 on_op_complete
, event
.op_tid
);
105 void execute(const journal::MetadataSetEvent
&_
) {
106 image_ctx
.operations
->execute_metadata_set(event
.key
, event
.value
,
110 void execute(const journal::MetadataRemoveEvent
&_
) {
111 image_ctx
.operations
->execute_metadata_remove(event
.key
, on_op_complete
);
114 void finish(int r
) override
{
115 CephContext
*cct
= image_ctx
.cct
;
117 lderr(cct
) << ": ExecuteOp::" << __func__
<< ": r=" << r
<< dendl
;
118 on_op_complete
->complete(r
);
122 ldout(cct
, 20) << ": ExecuteOp::" << __func__
<< dendl
;
123 std::shared_lock owner_locker
{image_ctx
.owner_lock
};
125 if (image_ctx
.exclusive_lock
== nullptr ||
126 !image_ctx
.exclusive_lock
->accept_ops()) {
127 ldout(cct
, 5) << ": lost exclusive lock -- skipping op" << dendl
;
128 on_op_complete
->complete(-ECANCELED
);
136 template <typename I
>
137 struct C_RefreshIfRequired
: public Context
{
141 C_RefreshIfRequired(I
&image_ctx
, Context
*on_finish
)
142 : image_ctx(image_ctx
), on_finish(on_finish
) {
144 ~C_RefreshIfRequired() override
{
148 void finish(int r
) override
{
149 CephContext
*cct
= image_ctx
.cct
;
150 Context
*ctx
= on_finish
;
154 lderr(cct
) << ": C_RefreshIfRequired::" << __func__
<< ": r=" << r
<< dendl
;
155 image_ctx
.op_work_queue
->queue(ctx
, r
);
159 if (image_ctx
.state
->is_refresh_required()) {
160 ldout(cct
, 20) << ": C_RefreshIfRequired::" << __func__
<< ": "
161 << "refresh required" << dendl
;
162 image_ctx
.state
->refresh(ctx
);
166 image_ctx
.op_work_queue
->queue(ctx
, 0);
170 } // anonymous namespace
173 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
176 template <typename I
>
177 Replay
<I
>::Replay(I
&image_ctx
)
178 : m_image_ctx(image_ctx
) {
181 template <typename I
>
182 Replay
<I
>::~Replay() {
183 std::lock_guard locker
{m_lock
};
184 ceph_assert(m_in_flight_aio_flush
== 0);
185 ceph_assert(m_in_flight_aio_modify
== 0);
186 ceph_assert(m_aio_modify_unsafe_contexts
.empty());
187 ceph_assert(m_aio_modify_safe_contexts
.empty());
188 ceph_assert(m_op_events
.empty());
189 ceph_assert(m_in_flight_op_events
== 0);
192 template <typename I
>
193 int Replay
<I
>::decode(bufferlist::const_iterator
*it
, EventEntry
*event_entry
) {
196 decode(*event_entry
, *it
);
197 } catch (const buffer::error
&err
) {
203 template <typename I
>
204 void Replay
<I
>::process(const EventEntry
&event_entry
,
205 Context
*on_ready
, Context
*on_safe
) {
206 CephContext
*cct
= m_image_ctx
.cct
;
207 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", on_safe=" << on_safe
210 on_ready
= util::create_async_context_callback(m_image_ctx
, on_ready
);
212 std::shared_lock owner_lock
{m_image_ctx
.owner_lock
};
213 if (m_image_ctx
.exclusive_lock
== nullptr ||
214 !m_image_ctx
.exclusive_lock
->accept_ops()) {
215 ldout(cct
, 5) << ": lost exclusive lock -- skipping event" << dendl
;
216 m_image_ctx
.op_work_queue
->queue(on_safe
, -ECANCELED
);
217 on_ready
->complete(0);
221 boost::apply_visitor(EventVisitor(this, on_ready
, on_safe
),
225 template <typename I
>
226 void Replay
<I
>::shut_down(bool cancel_ops
, Context
*on_finish
) {
227 CephContext
*cct
= m_image_ctx
.cct
;
228 ldout(cct
, 20) << dendl
;
230 io::AioCompletion
*flush_comp
= nullptr;
231 on_finish
= util::create_async_context_callback(
232 m_image_ctx
, on_finish
);
235 std::lock_guard locker
{m_lock
};
237 // safely commit any remaining AIO modify operations
238 if ((m_in_flight_aio_flush
+ m_in_flight_aio_modify
) != 0) {
239 flush_comp
= create_aio_flush_completion(nullptr);
240 ceph_assert(flush_comp
!= nullptr);
243 for (auto &op_event_pair
: m_op_events
) {
244 OpEvent
&op_event
= op_event_pair
.second
;
246 // cancel ops that are waiting to start (waiting for
247 // OpFinishEvent or waiting for ready)
248 if (op_event
.on_start_ready
== nullptr &&
249 op_event
.on_op_finish_event
!= nullptr) {
250 Context
*on_op_finish_event
= nullptr;
251 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
252 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, -ERESTART
);
254 } else if (op_event
.on_op_finish_event
!= nullptr) {
255 // start ops waiting for OpFinishEvent
256 Context
*on_op_finish_event
= nullptr;
257 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
258 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, 0);
259 } else if (op_event
.on_start_ready
!= nullptr) {
260 // waiting for op ready
261 op_event_pair
.second
.finish_on_ready
= true;
265 ceph_assert(!m_shut_down
);
268 ceph_assert(m_flush_ctx
== nullptr);
269 if (m_in_flight_op_events
> 0 || flush_comp
!= nullptr) {
270 std::swap(m_flush_ctx
, on_finish
);
274 // execute the following outside of lock scope
275 if (flush_comp
!= nullptr) {
276 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
277 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
278 io::FLUSH_SOURCE_INTERNAL
, {});
280 if (on_finish
!= nullptr) {
281 on_finish
->complete(0);
285 template <typename I
>
286 void Replay
<I
>::flush(Context
*on_finish
) {
287 io::AioCompletion
*aio_comp
;
289 std::lock_guard locker
{m_lock
};
290 aio_comp
= create_aio_flush_completion(
291 util::create_async_context_callback(m_image_ctx
, on_finish
));
292 if (aio_comp
== nullptr) {
297 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
298 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
,
299 io::FLUSH_SOURCE_INTERNAL
, {});
302 template <typename I
>
303 void Replay
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
304 CephContext
*cct
= m_image_ctx
.cct
;
305 ldout(cct
, 20) << ": op_tid=" << op_tid
<< dendl
;
307 std::lock_guard locker
{m_lock
};
308 auto op_it
= m_op_events
.find(op_tid
);
309 ceph_assert(op_it
!= m_op_events
.end());
311 OpEvent
&op_event
= op_it
->second
;
312 ceph_assert(op_event
.op_in_progress
&&
313 op_event
.on_op_finish_event
== nullptr &&
314 op_event
.on_finish_ready
== nullptr &&
315 op_event
.on_finish_safe
== nullptr);
317 // resume processing replay events
318 Context
*on_start_ready
= nullptr;
319 std::swap(on_start_ready
, op_event
.on_start_ready
);
320 on_start_ready
->complete(0);
322 // cancel has been requested -- send error to paused state machine
323 if (!op_event
.finish_on_ready
&& m_flush_ctx
!= nullptr) {
324 m_image_ctx
.op_work_queue
->queue(on_resume
, -ERESTART
);
328 // resume the op state machine once the associated OpFinishEvent
330 op_event
.on_op_finish_event
= new LambdaContext(
332 on_resume
->complete(r
);
335 // shut down request -- don't expect OpFinishEvent
336 if (op_event
.finish_on_ready
) {
337 m_image_ctx
.op_work_queue
->queue(on_resume
, 0);
341 template <typename I
>
342 void Replay
<I
>::handle_event(const journal::AioDiscardEvent
&event
,
343 Context
*on_ready
, Context
*on_safe
) {
344 CephContext
*cct
= m_image_ctx
.cct
;
345 ldout(cct
, 20) << ": AIO discard event" << dendl
;
348 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
349 io::AIO_TYPE_DISCARD
,
352 if (aio_comp
== nullptr) {
356 if (!clipped_io(event
.offset
, aio_comp
)) {
357 io::ImageRequest
<I
>::aio_discard(&m_image_ctx
, aio_comp
,
358 {{event
.offset
, event
.length
}},
360 event
.discard_granularity_bytes
, {});
363 if (flush_required
) {
365 auto flush_comp
= create_aio_flush_completion(nullptr);
368 if (flush_comp
!= nullptr) {
369 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
370 io::FLUSH_SOURCE_INTERNAL
, {});
375 template <typename I
>
376 void Replay
<I
>::handle_event(const journal::AioWriteEvent
&event
,
377 Context
*on_ready
, Context
*on_safe
) {
378 CephContext
*cct
= m_image_ctx
.cct
;
379 ldout(cct
, 20) << ": AIO write event" << dendl
;
381 bufferlist data
= event
.data
;
383 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
387 if (aio_comp
== nullptr) {
391 if (!clipped_io(event
.offset
, aio_comp
)) {
392 io::ImageRequest
<I
>::aio_write(&m_image_ctx
, aio_comp
,
393 {{event
.offset
, event
.length
}},
394 io::ImageArea::DATA
, std::move(data
),
398 if (flush_required
) {
400 auto flush_comp
= create_aio_flush_completion(nullptr);
403 if (flush_comp
!= nullptr) {
404 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
405 io::FLUSH_SOURCE_INTERNAL
, {});
410 template <typename I
>
411 void Replay
<I
>::handle_event(const journal::AioFlushEvent
&event
,
412 Context
*on_ready
, Context
*on_safe
) {
413 CephContext
*cct
= m_image_ctx
.cct
;
414 ldout(cct
, 20) << ": AIO flush event" << dendl
;
416 io::AioCompletion
*aio_comp
;
418 std::lock_guard locker
{m_lock
};
419 aio_comp
= create_aio_flush_completion(on_safe
);
422 if (aio_comp
!= nullptr) {
423 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
,
424 io::FLUSH_SOURCE_INTERNAL
, {});
426 on_ready
->complete(0);
429 template <typename I
>
430 void Replay
<I
>::handle_event(const journal::AioWriteSameEvent
&event
,
431 Context
*on_ready
, Context
*on_safe
) {
432 CephContext
*cct
= m_image_ctx
.cct
;
433 ldout(cct
, 20) << ": AIO writesame event" << dendl
;
435 bufferlist data
= event
.data
;
437 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
438 io::AIO_TYPE_WRITESAME
,
441 if (aio_comp
== nullptr) {
445 if (!clipped_io(event
.offset
, aio_comp
)) {
446 io::ImageRequest
<I
>::aio_writesame(&m_image_ctx
, aio_comp
,
447 {{event
.offset
, event
.length
}},
448 io::ImageArea::DATA
, std::move(data
),
452 if (flush_required
) {
454 auto flush_comp
= create_aio_flush_completion(nullptr);
457 if (flush_comp
!= nullptr) {
458 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
459 io::FLUSH_SOURCE_INTERNAL
, {});
464 template <typename I
>
465 void Replay
<I
>::handle_event(const journal::AioCompareAndWriteEvent
&event
,
466 Context
*on_ready
, Context
*on_safe
) {
467 CephContext
*cct
= m_image_ctx
.cct
;
468 ldout(cct
, 20) << ": AIO CompareAndWrite event" << dendl
;
470 bufferlist cmp_data
= event
.cmp_data
;
471 bufferlist write_data
= event
.write_data
;
473 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
474 io::AIO_TYPE_COMPARE_AND_WRITE
,
478 if (!clipped_io(event
.offset
, aio_comp
)) {
479 io::ImageRequest
<I
>::aio_compare_and_write(&m_image_ctx
, aio_comp
,
480 {{event
.offset
, event
.length
}},
483 std::move(write_data
),
487 if (flush_required
) {
489 auto flush_comp
= create_aio_flush_completion(nullptr);
492 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
493 io::FLUSH_SOURCE_INTERNAL
, {});
497 template <typename I
>
498 void Replay
<I
>::handle_event(const journal::OpFinishEvent
&event
,
499 Context
*on_ready
, Context
*on_safe
) {
500 CephContext
*cct
= m_image_ctx
.cct
;
501 ldout(cct
, 20) << ": Op finish event: "
502 << "op_tid=" << event
.op_tid
<< dendl
;
506 Context
*on_op_complete
= nullptr;
507 Context
*on_op_finish_event
= nullptr;
509 std::lock_guard locker
{m_lock
};
510 auto op_it
= m_op_events
.find(event
.op_tid
);
511 if (op_it
== m_op_events
.end()) {
512 ldout(cct
, 10) << ": unable to locate associated op: assuming previously "
513 << "committed." << dendl
;
514 on_ready
->complete(0);
515 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
519 OpEvent
&op_event
= op_it
->second
;
520 ceph_assert(op_event
.on_finish_safe
== nullptr);
521 op_event
.on_finish_ready
= on_ready
;
522 op_event
.on_finish_safe
= on_safe
;
523 op_in_progress
= op_event
.op_in_progress
;
524 std::swap(on_op_complete
, op_event
.on_op_complete
);
525 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
527 // special errors which indicate op never started but was recorded
528 // as failed in the journal
529 filter_ret_val
= (op_event
.op_finish_error_codes
.count(event
.r
) != 0);
533 if (op_in_progress
) {
534 // bubble the error up to the in-progress op to cancel it
535 on_op_finish_event
->complete(event
.r
);
537 // op hasn't been started -- bubble the error up since
538 // our image is now potentially in an inconsistent state
539 // since simple errors should have been caught before
540 // creating the op event
541 delete on_op_complete
;
542 delete on_op_finish_event
;
543 handle_op_complete(event
.op_tid
, filter_ret_val
? 0 : event
.r
);
548 // journal recorded success -- apply the op now
549 on_op_finish_event
->complete(0);
552 template <typename I
>
553 void Replay
<I
>::handle_event(const journal::SnapCreateEvent
&event
,
554 Context
*on_ready
, Context
*on_safe
) {
555 CephContext
*cct
= m_image_ctx
.cct
;
556 ldout(cct
, 20) << ": Snap create event" << dendl
;
558 std::lock_guard locker
{m_lock
};
560 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
562 if (on_op_complete
== nullptr) {
566 // ignore errors caused due to replay
567 op_event
->ignore_error_codes
= {-EEXIST
};
570 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
571 m_image_ctx
, new ExecuteOp
<I
, journal::SnapCreateEvent
>(m_image_ctx
, event
,
575 // do not process more events until the state machine is ready
576 // since it will affect IO
577 op_event
->op_in_progress
= true;
578 op_event
->on_start_ready
= on_ready
;
581 template <typename I
>
582 void Replay
<I
>::handle_event(const journal::SnapRemoveEvent
&event
,
583 Context
*on_ready
, Context
*on_safe
) {
584 CephContext
*cct
= m_image_ctx
.cct
;
585 ldout(cct
, 20) << ": Snap remove event" << dendl
;
587 std::lock_guard locker
{m_lock
};
589 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
591 if (on_op_complete
== nullptr) {
595 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
596 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRemoveEvent
>(m_image_ctx
, event
,
599 // ignore errors caused due to replay
600 op_event
->ignore_error_codes
= {-ENOENT
};
602 on_ready
->complete(0);
605 template <typename I
>
606 void Replay
<I
>::handle_event(const journal::SnapRenameEvent
&event
,
607 Context
*on_ready
, Context
*on_safe
) {
608 CephContext
*cct
= m_image_ctx
.cct
;
609 ldout(cct
, 20) << ": Snap rename event" << dendl
;
611 std::lock_guard locker
{m_lock
};
613 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
615 if (on_op_complete
== nullptr) {
619 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
620 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRenameEvent
>(m_image_ctx
, event
,
623 // ignore errors caused due to replay
624 op_event
->ignore_error_codes
= {-EEXIST
};
626 on_ready
->complete(0);
629 template <typename I
>
630 void Replay
<I
>::handle_event(const journal::SnapProtectEvent
&event
,
631 Context
*on_ready
, Context
*on_safe
) {
632 CephContext
*cct
= m_image_ctx
.cct
;
633 ldout(cct
, 20) << ": Snap protect event" << dendl
;
635 std::lock_guard locker
{m_lock
};
637 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
639 if (on_op_complete
== nullptr) {
643 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
644 m_image_ctx
, new ExecuteOp
<I
, journal::SnapProtectEvent
>(m_image_ctx
, event
,
647 // ignore errors caused due to replay
648 op_event
->ignore_error_codes
= {-EBUSY
};
650 on_ready
->complete(0);
653 template <typename I
>
654 void Replay
<I
>::handle_event(const journal::SnapUnprotectEvent
&event
,
655 Context
*on_ready
, Context
*on_safe
) {
656 CephContext
*cct
= m_image_ctx
.cct
;
657 ldout(cct
, 20) << ": Snap unprotect event" << dendl
;
659 std::lock_guard locker
{m_lock
};
661 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
663 if (on_op_complete
== nullptr) {
667 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
668 m_image_ctx
, new ExecuteOp
<I
, journal::SnapUnprotectEvent
>(m_image_ctx
,
672 // ignore errors recorded in the journal
673 op_event
->op_finish_error_codes
= {-EBUSY
};
675 // ignore errors caused due to replay
676 op_event
->ignore_error_codes
= {-EINVAL
};
678 on_ready
->complete(0);
681 template <typename I
>
682 void Replay
<I
>::handle_event(const journal::SnapRollbackEvent
&event
,
683 Context
*on_ready
, Context
*on_safe
) {
684 CephContext
*cct
= m_image_ctx
.cct
;
685 ldout(cct
, 20) << ": Snap rollback start event" << dendl
;
687 std::lock_guard locker
{m_lock
};
689 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
691 if (on_op_complete
== nullptr) {
695 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
696 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRollbackEvent
>(m_image_ctx
,
700 on_ready
->complete(0);
703 template <typename I
>
704 void Replay
<I
>::handle_event(const journal::RenameEvent
&event
,
705 Context
*on_ready
, Context
*on_safe
) {
706 CephContext
*cct
= m_image_ctx
.cct
;
707 ldout(cct
, 20) << ": Rename event" << dendl
;
709 std::lock_guard locker
{m_lock
};
711 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
713 if (on_op_complete
== nullptr) {
717 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
718 m_image_ctx
, new ExecuteOp
<I
, journal::RenameEvent
>(m_image_ctx
, event
,
721 // ignore errors caused due to replay
722 op_event
->ignore_error_codes
= {-EEXIST
};
724 on_ready
->complete(0);
727 template <typename I
>
728 void Replay
<I
>::handle_event(const journal::ResizeEvent
&event
,
729 Context
*on_ready
, Context
*on_safe
) {
730 CephContext
*cct
= m_image_ctx
.cct
;
731 ldout(cct
, 20) << ": Resize start event" << dendl
;
733 std::lock_guard locker
{m_lock
};
735 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
737 if (on_op_complete
== nullptr) {
742 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
743 m_image_ctx
, new ExecuteOp
<I
, journal::ResizeEvent
>(m_image_ctx
, event
,
744 on_op_complete
)), 0);
746 // do not process more events until the state machine is ready
747 // since it will affect IO
748 op_event
->op_in_progress
= true;
749 op_event
->on_start_ready
= on_ready
;
752 template <typename I
>
753 void Replay
<I
>::handle_event(const journal::FlattenEvent
&event
,
754 Context
*on_ready
, Context
*on_safe
) {
755 CephContext
*cct
= m_image_ctx
.cct
;
756 ldout(cct
, 20) << ": Flatten start event" << dendl
;
758 std::lock_guard locker
{m_lock
};
760 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
762 if (on_op_complete
== nullptr) {
766 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
767 m_image_ctx
, new ExecuteOp
<I
, journal::FlattenEvent
>(m_image_ctx
, event
,
770 // ignore errors caused due to replay
771 op_event
->ignore_error_codes
= {-EINVAL
};
773 on_ready
->complete(0);
776 template <typename I
>
777 void Replay
<I
>::handle_event(const journal::DemotePromoteEvent
&event
,
778 Context
*on_ready
, Context
*on_safe
) {
779 CephContext
*cct
= m_image_ctx
.cct
;
780 ldout(cct
, 20) << ": Demote/Promote event" << dendl
;
781 on_ready
->complete(0);
782 on_safe
->complete(0);
785 template <typename I
>
786 void Replay
<I
>::handle_event(const journal::SnapLimitEvent
&event
,
787 Context
*on_ready
, Context
*on_safe
) {
788 CephContext
*cct
= m_image_ctx
.cct
;
789 ldout(cct
, 20) << ": Snap limit event" << dendl
;
791 std::lock_guard locker
{m_lock
};
793 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
795 if (on_op_complete
== nullptr) {
799 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
800 m_image_ctx
, new ExecuteOp
<I
, journal::SnapLimitEvent
>(m_image_ctx
,
804 op_event
->ignore_error_codes
= {-ERANGE
};
806 on_ready
->complete(0);
809 template <typename I
>
810 void Replay
<I
>::handle_event(const journal::UpdateFeaturesEvent
&event
,
811 Context
*on_ready
, Context
*on_safe
) {
812 CephContext
*cct
= m_image_ctx
.cct
;
813 ldout(cct
, 20) << ": Update features event" << dendl
;
815 std::lock_guard locker
{m_lock
};
817 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
819 if (on_op_complete
== nullptr) {
824 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
825 m_image_ctx
, new ExecuteOp
<I
, journal::UpdateFeaturesEvent
>(
826 m_image_ctx
, event
, on_op_complete
)), 0);
828 // do not process more events until the state machine is ready
829 // since it will affect IO
830 op_event
->op_in_progress
= true;
831 op_event
->on_start_ready
= on_ready
;
834 template <typename I
>
835 void Replay
<I
>::handle_event(const journal::MetadataSetEvent
&event
,
836 Context
*on_ready
, Context
*on_safe
) {
837 CephContext
*cct
= m_image_ctx
.cct
;
838 ldout(cct
, 20) << ": Metadata set event" << dendl
;
840 std::lock_guard locker
{m_lock
};
842 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
844 if (on_op_complete
== nullptr) {
848 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
849 op_event
->on_op_finish_event
= util::create_async_context_callback(
850 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataSetEvent
>(
851 m_image_ctx
, event
, on_op_complete
));
853 on_ready
->complete(0);
856 template <typename I
>
857 void Replay
<I
>::handle_event(const journal::MetadataRemoveEvent
&event
,
858 Context
*on_ready
, Context
*on_safe
) {
859 CephContext
*cct
= m_image_ctx
.cct
;
860 ldout(cct
, 20) << ": Metadata remove event" << dendl
;
862 std::lock_guard locker
{m_lock
};
864 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
866 if (on_op_complete
== nullptr) {
870 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
871 op_event
->on_op_finish_event
= util::create_async_context_callback(
872 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataRemoveEvent
>(
873 m_image_ctx
, event
, on_op_complete
));
875 // ignore errors caused due to replay
876 op_event
->ignore_error_codes
= {-ENOENT
};
878 on_ready
->complete(0);
881 template <typename I
>
882 void Replay
<I
>::handle_event(const journal::UnknownEvent
&event
,
883 Context
*on_ready
, Context
*on_safe
) {
884 CephContext
*cct
= m_image_ctx
.cct
;
885 ldout(cct
, 20) << ": unknown event" << dendl
;
886 on_ready
->complete(0);
887 on_safe
->complete(0);
890 template <typename I
>
891 void Replay
<I
>::handle_aio_modify_complete(Context
*on_ready
, Context
*on_safe
,
892 int r
, std::set
<int> &filters
) {
893 std::lock_guard locker
{m_lock
};
894 CephContext
*cct
= m_image_ctx
.cct
;
895 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", "
896 << "on_safe=" << on_safe
<< ", r=" << r
<< dendl
;
898 if (on_ready
!= nullptr) {
899 on_ready
->complete(0);
902 if (filters
.find(r
) != filters
.end())
906 lderr(cct
) << ": AIO modify op failed: " << cpp_strerror(r
) << dendl
;
907 m_image_ctx
.op_work_queue
->queue(on_safe
, r
);
911 // will be completed after next flush operation completes
912 m_aio_modify_safe_contexts
.insert(on_safe
);
915 template <typename I
>
916 void Replay
<I
>::handle_aio_flush_complete(Context
*on_flush_safe
,
917 Contexts
&on_safe_ctxs
, int r
) {
918 CephContext
*cct
= m_image_ctx
.cct
;
919 ldout(cct
, 20) << ": r=" << r
<< dendl
;
922 lderr(cct
) << ": AIO flush failed: " << cpp_strerror(r
) << dendl
;
925 Context
*on_aio_ready
= nullptr;
926 Context
*on_flush
= nullptr;
928 std::lock_guard locker
{m_lock
};
929 ceph_assert(m_in_flight_aio_flush
> 0);
930 ceph_assert(m_in_flight_aio_modify
>= on_safe_ctxs
.size());
931 --m_in_flight_aio_flush
;
932 m_in_flight_aio_modify
-= on_safe_ctxs
.size();
934 std::swap(on_aio_ready
, m_on_aio_ready
);
935 if (m_in_flight_op_events
== 0 &&
936 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
937 on_flush
= m_flush_ctx
;
940 // strip out previously failed on_safe contexts
941 for (auto it
= on_safe_ctxs
.begin(); it
!= on_safe_ctxs
.end(); ) {
942 if (m_aio_modify_safe_contexts
.erase(*it
)) {
945 it
= on_safe_ctxs
.erase(it
);
950 if (on_aio_ready
!= nullptr) {
951 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
952 on_aio_ready
->complete(0);
955 if (on_flush_safe
!= nullptr) {
956 on_safe_ctxs
.push_back(on_flush_safe
);
958 for (auto ctx
: on_safe_ctxs
) {
959 ldout(cct
, 20) << ": completing safe context: " << ctx
<< dendl
;
963 if (on_flush
!= nullptr) {
964 ldout(cct
, 20) << ": completing flush context: " << on_flush
<< dendl
;
965 on_flush
->complete(r
);
969 template <typename I
>
970 Context
*Replay
<I
>::create_op_context_callback(uint64_t op_tid
,
973 OpEvent
**op_event
) {
974 CephContext
*cct
= m_image_ctx
.cct
;
976 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
977 on_ready
->complete(0);
978 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
982 ceph_assert(ceph_mutex_is_locked(m_lock
));
983 if (m_op_events
.count(op_tid
) != 0) {
984 lderr(cct
) << ": duplicate op tid detected: " << op_tid
<< dendl
;
986 // on_ready is already async but on failure invoke on_safe async
988 on_ready
->complete(0);
989 m_image_ctx
.op_work_queue
->queue(on_safe
, -EINVAL
);
993 ++m_in_flight_op_events
;
994 *op_event
= &m_op_events
[op_tid
];
995 (*op_event
)->on_start_safe
= on_safe
;
997 Context
*on_op_complete
= new C_OpOnComplete(this, op_tid
);
998 (*op_event
)->on_op_complete
= on_op_complete
;
999 return on_op_complete
;
1002 template <typename I
>
1003 void Replay
<I
>::handle_op_complete(uint64_t op_tid
, int r
) {
1004 CephContext
*cct
= m_image_ctx
.cct
;
1005 ldout(cct
, 20) << ": op_tid=" << op_tid
<< ", "
1006 << "r=" << r
<< dendl
;
1009 bool shutting_down
= false;
1011 std::lock_guard locker
{m_lock
};
1012 auto op_it
= m_op_events
.find(op_tid
);
1013 ceph_assert(op_it
!= m_op_events
.end());
1015 op_event
= std::move(op_it
->second
);
1016 m_op_events
.erase(op_it
);
1019 ceph_assert(m_flush_ctx
!= nullptr);
1020 shutting_down
= true;
1024 ceph_assert(op_event
.on_start_ready
== nullptr || (r
< 0 && r
!= -ERESTART
));
1025 if (op_event
.on_start_ready
!= nullptr) {
1026 // blocking op event failed before it became ready
1027 ceph_assert(op_event
.on_finish_ready
== nullptr &&
1028 op_event
.on_finish_safe
== nullptr);
1030 op_event
.on_start_ready
->complete(0);
1032 // event kicked off by OpFinishEvent
1033 ceph_assert((op_event
.on_finish_ready
!= nullptr &&
1034 op_event
.on_finish_safe
!= nullptr) || shutting_down
);
1037 if (op_event
.on_op_finish_event
!= nullptr) {
1038 op_event
.on_op_finish_event
->complete(r
);
1041 if (op_event
.on_finish_ready
!= nullptr) {
1042 op_event
.on_finish_ready
->complete(0);
1045 // filter out errors caused by replay of the same op
1046 if (r
< 0 && op_event
.ignore_error_codes
.count(r
) != 0) {
1050 op_event
.on_start_safe
->complete(r
);
1051 if (op_event
.on_finish_safe
!= nullptr) {
1052 op_event
.on_finish_safe
->complete(r
);
1055 // shut down request might have occurred while lock was
1056 // dropped -- handle if pending
1057 Context
*on_flush
= nullptr;
1059 std::lock_guard locker
{m_lock
};
1060 ceph_assert(m_in_flight_op_events
> 0);
1061 --m_in_flight_op_events
;
1062 if (m_in_flight_op_events
== 0 &&
1063 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
1064 on_flush
= m_flush_ctx
;
1067 if (on_flush
!= nullptr) {
1068 m_image_ctx
.op_work_queue
->queue(on_flush
, 0);
1072 template <typename I
>
1074 Replay
<I
>::create_aio_modify_completion(Context
*on_ready
,
1076 io::aio_type_t aio_type
,
1077 bool *flush_required
,
1078 std::set
<int> &&filters
) {
1079 std::lock_guard locker
{m_lock
};
1080 CephContext
*cct
= m_image_ctx
.cct
;
1081 ceph_assert(m_on_aio_ready
== nullptr);
1084 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1085 on_ready
->complete(0);
1086 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1090 ++m_in_flight_aio_modify
;
1091 m_aio_modify_unsafe_contexts
.push_back(on_safe
);
1093 // FLUSH if we hit the low-water mark -- on_safe contexts are
1094 // completed by flushes-only so that we don't move the journal
1095 // commit position until safely on-disk
1097 *flush_required
= (m_aio_modify_unsafe_contexts
.size() ==
1098 IN_FLIGHT_IO_LOW_WATER_MARK
);
1099 if (*flush_required
) {
1100 ldout(cct
, 10) << ": hit AIO replay low-water mark: scheduling flush"
1104 // READY for more events if:
1105 // * not at high-water mark for IO
1106 // * in-flight ops are at a consistent point (snap create has IO flushed,
1107 // shrink has adjusted clip boundary, etc) -- should have already been
1108 // flagged not-ready
1109 if (m_in_flight_aio_modify
== IN_FLIGHT_IO_HIGH_WATER_MARK
) {
1110 ldout(cct
, 10) << ": hit AIO replay high-water mark: pausing replay"
1112 ceph_assert(m_on_aio_ready
== nullptr);
1113 std::swap(m_on_aio_ready
, on_ready
);
1116 // when the modification is ACKed by librbd, we can process the next
1117 // event. when flushed, the completion of the next flush will fire the
1119 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1120 new C_AioModifyComplete(this, on_ready
, on_safe
, std::move(filters
)),
1121 util::get_image_ctx(&m_image_ctx
), aio_type
);
1125 template <typename I
>
1126 io::AioCompletion
*Replay
<I
>::create_aio_flush_completion(Context
*on_safe
) {
1127 ceph_assert(ceph_mutex_is_locked(m_lock
));
1129 CephContext
*cct
= m_image_ctx
.cct
;
1131 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1132 if (on_safe
!= nullptr) {
1133 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1138 ++m_in_flight_aio_flush
;
1140 // associate all prior write/discard ops to this flush request
1141 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1142 new C_AioFlushComplete(this, on_safe
,
1143 std::move(m_aio_modify_unsafe_contexts
)),
1144 util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_FLUSH
);
1145 m_aio_modify_unsafe_contexts
.clear();
1149 template <typename I
>
1150 bool Replay
<I
>::clipped_io(uint64_t image_offset
, io::AioCompletion
*aio_comp
) {
1151 CephContext
*cct
= m_image_ctx
.cct
;
1153 m_image_ctx
.image_lock
.lock_shared();
1154 size_t image_size
= m_image_ctx
.size
;
1155 m_image_ctx
.image_lock
.unlock_shared();
1157 if (image_offset
>= image_size
) {
1158 // rbd-mirror image sync might race an IO event w/ associated resize between
1159 // the point the peer is registered and the sync point is created, so no-op
1160 // IO events beyond the current image extents since under normal conditions
1161 // it wouldn't have been recorded in the journal
1162 ldout(cct
, 5) << ": no-op IO event beyond image size" << dendl
;
1164 aio_comp
->set_request_count(0);
1172 } // namespace journal
1173 } // namespace librbd
1175 template class librbd::journal::Replay
<librbd::ImageCtx
>;