1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
17 #define dout_subsys ceph_subsys_rbd
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
29 static NoOpProgressContext no_op_progress_callback
;
31 template <typename I
, typename E
>
32 struct ExecuteOp
: public Context
{
35 Context
*on_op_complete
;
37 ExecuteOp(I
&image_ctx
, const E
&event
, Context
*on_op_complete
)
38 : image_ctx(image_ctx
), event(event
), on_op_complete(on_op_complete
) {
41 void execute(const journal::SnapCreateEvent
&_
) {
42 image_ctx
.operations
->execute_snap_create(event
.snap_namespace
,
48 void execute(const journal::SnapRemoveEvent
&_
) {
49 image_ctx
.operations
->execute_snap_remove(event
.snap_namespace
,
54 void execute(const journal::SnapRenameEvent
&_
) {
55 image_ctx
.operations
->execute_snap_rename(event
.snap_id
,
60 void execute(const journal::SnapProtectEvent
&_
) {
61 image_ctx
.operations
->execute_snap_protect(event
.snap_namespace
,
66 void execute(const journal::SnapUnprotectEvent
&_
) {
67 image_ctx
.operations
->execute_snap_unprotect(event
.snap_namespace
,
72 void execute(const journal::SnapRollbackEvent
&_
) {
73 image_ctx
.operations
->execute_snap_rollback(event
.snap_namespace
,
75 no_op_progress_callback
,
79 void execute(const journal::RenameEvent
&_
) {
80 image_ctx
.operations
->execute_rename(event
.image_name
,
84 void execute(const journal::ResizeEvent
&_
) {
85 image_ctx
.operations
->execute_resize(event
.size
, true, no_op_progress_callback
,
86 on_op_complete
, event
.op_tid
);
89 void execute(const journal::FlattenEvent
&_
) {
90 image_ctx
.operations
->execute_flatten(no_op_progress_callback
,
94 void execute(const journal::SnapLimitEvent
&_
) {
95 image_ctx
.operations
->execute_snap_set_limit(event
.limit
, on_op_complete
);
98 void execute(const journal::UpdateFeaturesEvent
&_
) {
99 image_ctx
.operations
->execute_update_features(event
.features
, event
.enabled
,
100 on_op_complete
, event
.op_tid
);
103 void execute(const journal::MetadataSetEvent
&_
) {
104 image_ctx
.operations
->execute_metadata_set(event
.key
, event
.value
,
108 void execute(const journal::MetadataRemoveEvent
&_
) {
109 image_ctx
.operations
->execute_metadata_remove(event
.key
, on_op_complete
);
112 void finish(int r
) override
{
113 CephContext
*cct
= image_ctx
.cct
;
115 lderr(cct
) << ": ExecuteOp::" << __func__
<< ": r=" << r
<< dendl
;
116 on_op_complete
->complete(r
);
120 ldout(cct
, 20) << ": ExecuteOp::" << __func__
<< dendl
;
121 RWLock::RLocker
owner_locker(image_ctx
.owner_lock
);
123 if (image_ctx
.exclusive_lock
== nullptr ||
124 !image_ctx
.exclusive_lock
->accept_ops()) {
125 ldout(cct
, 5) << ": lost exclusive lock -- skipping op" << dendl
;
126 on_op_complete
->complete(-ECANCELED
);
134 template <typename I
>
135 struct C_RefreshIfRequired
: public Context
{
139 C_RefreshIfRequired(I
&image_ctx
, Context
*on_finish
)
140 : image_ctx(image_ctx
), on_finish(on_finish
) {
142 ~C_RefreshIfRequired() override
{
146 void finish(int r
) override
{
147 CephContext
*cct
= image_ctx
.cct
;
148 Context
*ctx
= on_finish
;
152 lderr(cct
) << ": C_RefreshIfRequired::" << __func__
<< ": r=" << r
<< dendl
;
153 image_ctx
.op_work_queue
->queue(ctx
, r
);
157 if (image_ctx
.state
->is_refresh_required()) {
158 ldout(cct
, 20) << ": C_RefreshIfRequired::" << __func__
<< ": "
159 << "refresh required" << dendl
;
160 image_ctx
.state
->refresh(ctx
);
164 image_ctx
.op_work_queue
->queue(ctx
, 0);
168 } // anonymous namespace
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
174 template <typename I
>
175 Replay
<I
>::Replay(I
&image_ctx
)
176 : m_image_ctx(image_ctx
), m_lock("Replay<I>::m_lock") {
179 template <typename I
>
180 Replay
<I
>::~Replay() {
181 ceph_assert(m_in_flight_aio_flush
== 0);
182 ceph_assert(m_in_flight_aio_modify
== 0);
183 ceph_assert(m_aio_modify_unsafe_contexts
.empty());
184 ceph_assert(m_aio_modify_safe_contexts
.empty());
185 ceph_assert(m_op_events
.empty());
186 ceph_assert(m_in_flight_op_events
== 0);
189 template <typename I
>
190 int Replay
<I
>::decode(bufferlist::const_iterator
*it
, EventEntry
*event_entry
) {
193 decode(*event_entry
, *it
);
194 } catch (const buffer::error
&err
) {
200 template <typename I
>
201 void Replay
<I
>::process(const EventEntry
&event_entry
,
202 Context
*on_ready
, Context
*on_safe
) {
203 CephContext
*cct
= m_image_ctx
.cct
;
204 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", on_safe=" << on_safe
207 on_ready
= util::create_async_context_callback(m_image_ctx
, on_ready
);
209 RWLock::RLocker
owner_lock(m_image_ctx
.owner_lock
);
210 if (m_image_ctx
.exclusive_lock
== nullptr ||
211 !m_image_ctx
.exclusive_lock
->accept_ops()) {
212 ldout(cct
, 5) << ": lost exclusive lock -- skipping event" << dendl
;
213 m_image_ctx
.op_work_queue
->queue(on_safe
, -ECANCELED
);
214 on_ready
->complete(0);
218 boost::apply_visitor(EventVisitor(this, on_ready
, on_safe
),
222 template <typename I
>
223 void Replay
<I
>::shut_down(bool cancel_ops
, Context
*on_finish
) {
224 CephContext
*cct
= m_image_ctx
.cct
;
225 ldout(cct
, 20) << dendl
;
227 io::AioCompletion
*flush_comp
= nullptr;
228 on_finish
= util::create_async_context_callback(
229 m_image_ctx
, on_finish
);
232 Mutex::Locker
locker(m_lock
);
234 // safely commit any remaining AIO modify operations
235 if ((m_in_flight_aio_flush
+ m_in_flight_aio_modify
) != 0) {
236 flush_comp
= create_aio_flush_completion(nullptr);
237 ceph_assert(flush_comp
!= nullptr);
240 for (auto &op_event_pair
: m_op_events
) {
241 OpEvent
&op_event
= op_event_pair
.second
;
243 // cancel ops that are waiting to start (waiting for
244 // OpFinishEvent or waiting for ready)
245 if (op_event
.on_start_ready
== nullptr &&
246 op_event
.on_op_finish_event
!= nullptr) {
247 Context
*on_op_finish_event
= nullptr;
248 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
249 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, -ERESTART
);
251 } else if (op_event
.on_op_finish_event
!= nullptr) {
252 // start ops waiting for OpFinishEvent
253 Context
*on_op_finish_event
= nullptr;
254 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
255 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, 0);
256 } else if (op_event
.on_start_ready
!= nullptr) {
257 // waiting for op ready
258 op_event_pair
.second
.finish_on_ready
= true;
262 ceph_assert(!m_shut_down
);
265 ceph_assert(m_flush_ctx
== nullptr);
266 if (m_in_flight_op_events
> 0 || flush_comp
!= nullptr) {
267 std::swap(m_flush_ctx
, on_finish
);
271 // execute the following outside of lock scope
272 if (flush_comp
!= nullptr) {
273 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
274 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
275 io::FLUSH_SOURCE_INTERNAL
, {});
277 if (on_finish
!= nullptr) {
278 on_finish
->complete(0);
282 template <typename I
>
283 void Replay
<I
>::flush(Context
*on_finish
) {
284 io::AioCompletion
*aio_comp
;
286 Mutex::Locker
locker(m_lock
);
287 aio_comp
= create_aio_flush_completion(
288 util::create_async_context_callback(m_image_ctx
, on_finish
));
289 if (aio_comp
== nullptr) {
294 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
295 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
,
296 io::FLUSH_SOURCE_INTERNAL
, {});
299 template <typename I
>
300 void Replay
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
301 CephContext
*cct
= m_image_ctx
.cct
;
302 ldout(cct
, 20) << ": op_tid=" << op_tid
<< dendl
;
304 Mutex::Locker
locker(m_lock
);
305 auto op_it
= m_op_events
.find(op_tid
);
306 ceph_assert(op_it
!= m_op_events
.end());
308 OpEvent
&op_event
= op_it
->second
;
309 ceph_assert(op_event
.op_in_progress
&&
310 op_event
.on_op_finish_event
== nullptr &&
311 op_event
.on_finish_ready
== nullptr &&
312 op_event
.on_finish_safe
== nullptr);
314 // resume processing replay events
315 Context
*on_start_ready
= nullptr;
316 std::swap(on_start_ready
, op_event
.on_start_ready
);
317 on_start_ready
->complete(0);
319 // cancel has been requested -- send error to paused state machine
320 if (!op_event
.finish_on_ready
&& m_flush_ctx
!= nullptr) {
321 m_image_ctx
.op_work_queue
->queue(on_resume
, -ERESTART
);
325 // resume the op state machine once the associated OpFinishEvent
327 op_event
.on_op_finish_event
= new FunctionContext(
329 on_resume
->complete(r
);
332 // shut down request -- don't expect OpFinishEvent
333 if (op_event
.finish_on_ready
) {
334 m_image_ctx
.op_work_queue
->queue(on_resume
, 0);
338 template <typename I
>
339 void Replay
<I
>::handle_event(const journal::AioDiscardEvent
&event
,
340 Context
*on_ready
, Context
*on_safe
) {
341 CephContext
*cct
= m_image_ctx
.cct
;
342 ldout(cct
, 20) << ": AIO discard event" << dendl
;
345 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
346 io::AIO_TYPE_DISCARD
,
349 if (aio_comp
== nullptr) {
353 if (!clipped_io(event
.offset
, aio_comp
)) {
354 io::ImageRequest
<I
>::aio_discard(&m_image_ctx
, aio_comp
,
355 {{event
.offset
, event
.length
}},
356 event
.discard_granularity_bytes
, {});
359 if (flush_required
) {
361 auto flush_comp
= create_aio_flush_completion(nullptr);
364 if (flush_comp
!= nullptr) {
365 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
366 io::FLUSH_SOURCE_INTERNAL
, {});
371 template <typename I
>
372 void Replay
<I
>::handle_event(const journal::AioWriteEvent
&event
,
373 Context
*on_ready
, Context
*on_safe
) {
374 CephContext
*cct
= m_image_ctx
.cct
;
375 ldout(cct
, 20) << ": AIO write event" << dendl
;
377 bufferlist data
= event
.data
;
379 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
383 if (aio_comp
== nullptr) {
387 if (!clipped_io(event
.offset
, aio_comp
)) {
388 io::ImageRequest
<I
>::aio_write(&m_image_ctx
, aio_comp
,
389 {{event
.offset
, event
.length
}},
390 std::move(data
), 0, {});
393 if (flush_required
) {
395 auto flush_comp
= create_aio_flush_completion(nullptr);
398 if (flush_comp
!= nullptr) {
399 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
400 io::FLUSH_SOURCE_INTERNAL
, {});
405 template <typename I
>
406 void Replay
<I
>::handle_event(const journal::AioFlushEvent
&event
,
407 Context
*on_ready
, Context
*on_safe
) {
408 CephContext
*cct
= m_image_ctx
.cct
;
409 ldout(cct
, 20) << ": AIO flush event" << dendl
;
411 io::AioCompletion
*aio_comp
;
413 Mutex::Locker
locker(m_lock
);
414 aio_comp
= create_aio_flush_completion(on_safe
);
417 if (aio_comp
!= nullptr) {
418 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
,
419 io::FLUSH_SOURCE_INTERNAL
, {});
421 on_ready
->complete(0);
424 template <typename I
>
425 void Replay
<I
>::handle_event(const journal::AioWriteSameEvent
&event
,
426 Context
*on_ready
, Context
*on_safe
) {
427 CephContext
*cct
= m_image_ctx
.cct
;
428 ldout(cct
, 20) << ": AIO writesame event" << dendl
;
430 bufferlist data
= event
.data
;
432 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
433 io::AIO_TYPE_WRITESAME
,
436 if (aio_comp
== nullptr) {
440 if (!clipped_io(event
.offset
, aio_comp
)) {
441 io::ImageRequest
<I
>::aio_writesame(&m_image_ctx
, aio_comp
,
442 {{event
.offset
, event
.length
}},
443 std::move(data
), 0, {});
446 if (flush_required
) {
448 auto flush_comp
= create_aio_flush_completion(nullptr);
451 if (flush_comp
!= nullptr) {
452 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
453 io::FLUSH_SOURCE_INTERNAL
, {});
458 template <typename I
>
459 void Replay
<I
>::handle_event(const journal::AioCompareAndWriteEvent
&event
,
460 Context
*on_ready
, Context
*on_safe
) {
461 CephContext
*cct
= m_image_ctx
.cct
;
462 ldout(cct
, 20) << ": AIO CompareAndWrite event" << dendl
;
464 bufferlist cmp_data
= event
.cmp_data
;
465 bufferlist write_data
= event
.write_data
;
467 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
468 io::AIO_TYPE_COMPARE_AND_WRITE
,
472 if (!clipped_io(event
.offset
, aio_comp
)) {
473 io::ImageRequest
<I
>::aio_compare_and_write(&m_image_ctx
, aio_comp
,
474 {{event
.offset
, event
.length
}},
476 std::move(write_data
),
480 if (flush_required
) {
482 auto flush_comp
= create_aio_flush_completion(nullptr);
485 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
,
486 io::FLUSH_SOURCE_INTERNAL
, {});
490 template <typename I
>
491 void Replay
<I
>::handle_event(const journal::OpFinishEvent
&event
,
492 Context
*on_ready
, Context
*on_safe
) {
493 CephContext
*cct
= m_image_ctx
.cct
;
494 ldout(cct
, 20) << ": Op finish event: "
495 << "op_tid=" << event
.op_tid
<< dendl
;
499 Context
*on_op_complete
= nullptr;
500 Context
*on_op_finish_event
= nullptr;
502 Mutex::Locker
locker(m_lock
);
503 auto op_it
= m_op_events
.find(event
.op_tid
);
504 if (op_it
== m_op_events
.end()) {
505 ldout(cct
, 10) << ": unable to locate associated op: assuming previously "
506 << "committed." << dendl
;
507 on_ready
->complete(0);
508 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
512 OpEvent
&op_event
= op_it
->second
;
513 ceph_assert(op_event
.on_finish_safe
== nullptr);
514 op_event
.on_finish_ready
= on_ready
;
515 op_event
.on_finish_safe
= on_safe
;
516 op_in_progress
= op_event
.op_in_progress
;
517 std::swap(on_op_complete
, op_event
.on_op_complete
);
518 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
520 // special errors which indicate op never started but was recorded
521 // as failed in the journal
522 filter_ret_val
= (op_event
.op_finish_error_codes
.count(event
.r
) != 0);
526 if (op_in_progress
) {
527 // bubble the error up to the in-progress op to cancel it
528 on_op_finish_event
->complete(event
.r
);
530 // op hasn't been started -- bubble the error up since
531 // our image is now potentially in an inconsistent state
532 // since simple errors should have been caught before
533 // creating the op event
534 delete on_op_complete
;
535 delete on_op_finish_event
;
536 handle_op_complete(event
.op_tid
, filter_ret_val
? 0 : event
.r
);
541 // journal recorded success -- apply the op now
542 on_op_finish_event
->complete(0);
545 template <typename I
>
546 void Replay
<I
>::handle_event(const journal::SnapCreateEvent
&event
,
547 Context
*on_ready
, Context
*on_safe
) {
548 CephContext
*cct
= m_image_ctx
.cct
;
549 ldout(cct
, 20) << ": Snap create event" << dendl
;
551 Mutex::Locker
locker(m_lock
);
553 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
555 if (on_op_complete
== nullptr) {
559 // ignore errors caused due to replay
560 op_event
->ignore_error_codes
= {-EEXIST
};
563 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
564 m_image_ctx
, new ExecuteOp
<I
, journal::SnapCreateEvent
>(m_image_ctx
, event
,
568 // do not process more events until the state machine is ready
569 // since it will affect IO
570 op_event
->op_in_progress
= true;
571 op_event
->on_start_ready
= on_ready
;
574 template <typename I
>
575 void Replay
<I
>::handle_event(const journal::SnapRemoveEvent
&event
,
576 Context
*on_ready
, Context
*on_safe
) {
577 CephContext
*cct
= m_image_ctx
.cct
;
578 ldout(cct
, 20) << ": Snap remove event" << dendl
;
580 Mutex::Locker
locker(m_lock
);
582 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
584 if (on_op_complete
== nullptr) {
588 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
589 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRemoveEvent
>(m_image_ctx
, event
,
592 // ignore errors caused due to replay
593 op_event
->ignore_error_codes
= {-ENOENT
};
595 on_ready
->complete(0);
598 template <typename I
>
599 void Replay
<I
>::handle_event(const journal::SnapRenameEvent
&event
,
600 Context
*on_ready
, Context
*on_safe
) {
601 CephContext
*cct
= m_image_ctx
.cct
;
602 ldout(cct
, 20) << ": Snap rename event" << dendl
;
604 Mutex::Locker
locker(m_lock
);
606 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
608 if (on_op_complete
== nullptr) {
612 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
613 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRenameEvent
>(m_image_ctx
, event
,
616 // ignore errors caused due to replay
617 op_event
->ignore_error_codes
= {-EEXIST
};
619 on_ready
->complete(0);
622 template <typename I
>
623 void Replay
<I
>::handle_event(const journal::SnapProtectEvent
&event
,
624 Context
*on_ready
, Context
*on_safe
) {
625 CephContext
*cct
= m_image_ctx
.cct
;
626 ldout(cct
, 20) << ": Snap protect event" << dendl
;
628 Mutex::Locker
locker(m_lock
);
630 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
632 if (on_op_complete
== nullptr) {
636 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
637 m_image_ctx
, new ExecuteOp
<I
, journal::SnapProtectEvent
>(m_image_ctx
, event
,
640 // ignore errors caused due to replay
641 op_event
->ignore_error_codes
= {-EBUSY
};
643 on_ready
->complete(0);
646 template <typename I
>
647 void Replay
<I
>::handle_event(const journal::SnapUnprotectEvent
&event
,
648 Context
*on_ready
, Context
*on_safe
) {
649 CephContext
*cct
= m_image_ctx
.cct
;
650 ldout(cct
, 20) << ": Snap unprotect event" << dendl
;
652 Mutex::Locker
locker(m_lock
);
654 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
656 if (on_op_complete
== nullptr) {
660 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
661 m_image_ctx
, new ExecuteOp
<I
, journal::SnapUnprotectEvent
>(m_image_ctx
,
665 // ignore errors recorded in the journal
666 op_event
->op_finish_error_codes
= {-EBUSY
};
668 // ignore errors caused due to replay
669 op_event
->ignore_error_codes
= {-EINVAL
};
671 on_ready
->complete(0);
674 template <typename I
>
675 void Replay
<I
>::handle_event(const journal::SnapRollbackEvent
&event
,
676 Context
*on_ready
, Context
*on_safe
) {
677 CephContext
*cct
= m_image_ctx
.cct
;
678 ldout(cct
, 20) << ": Snap rollback start event" << dendl
;
680 Mutex::Locker
locker(m_lock
);
682 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
684 if (on_op_complete
== nullptr) {
688 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
689 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRollbackEvent
>(m_image_ctx
,
693 on_ready
->complete(0);
696 template <typename I
>
697 void Replay
<I
>::handle_event(const journal::RenameEvent
&event
,
698 Context
*on_ready
, Context
*on_safe
) {
699 CephContext
*cct
= m_image_ctx
.cct
;
700 ldout(cct
, 20) << ": Rename event" << dendl
;
702 Mutex::Locker
locker(m_lock
);
704 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
706 if (on_op_complete
== nullptr) {
710 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
711 m_image_ctx
, new ExecuteOp
<I
, journal::RenameEvent
>(m_image_ctx
, event
,
714 // ignore errors caused due to replay
715 op_event
->ignore_error_codes
= {-EEXIST
};
717 on_ready
->complete(0);
720 template <typename I
>
721 void Replay
<I
>::handle_event(const journal::ResizeEvent
&event
,
722 Context
*on_ready
, Context
*on_safe
) {
723 CephContext
*cct
= m_image_ctx
.cct
;
724 ldout(cct
, 20) << ": Resize start event" << dendl
;
726 Mutex::Locker
locker(m_lock
);
728 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
730 if (on_op_complete
== nullptr) {
735 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
736 m_image_ctx
, new ExecuteOp
<I
, journal::ResizeEvent
>(m_image_ctx
, event
,
737 on_op_complete
)), 0);
739 // do not process more events until the state machine is ready
740 // since it will affect IO
741 op_event
->op_in_progress
= true;
742 op_event
->on_start_ready
= on_ready
;
745 template <typename I
>
746 void Replay
<I
>::handle_event(const journal::FlattenEvent
&event
,
747 Context
*on_ready
, Context
*on_safe
) {
748 CephContext
*cct
= m_image_ctx
.cct
;
749 ldout(cct
, 20) << ": Flatten start event" << dendl
;
751 Mutex::Locker
locker(m_lock
);
753 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
755 if (on_op_complete
== nullptr) {
759 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
760 m_image_ctx
, new ExecuteOp
<I
, journal::FlattenEvent
>(m_image_ctx
, event
,
763 // ignore errors caused due to replay
764 op_event
->ignore_error_codes
= {-EINVAL
};
766 on_ready
->complete(0);
// NOTE(review): garbled fragment -- closing brace (orig ~776) missing.
// Demote/promote events carry no replayable image mutation: the handler
// simply acknowledges the event as both ready and safe immediately.
769 template <typename I
>
770 void Replay
<I
>::handle_event(const journal::DemotePromoteEvent
&event
,
771 Context
*on_ready
, Context
*on_safe
) {
772 CephContext
*cct
= m_image_ctx
.cct
;
773 ldout(cct
, 20) << ": Demote/Promote event" << dendl
;
774 on_ready
->complete(0);
775 on_safe
->complete(0);
778 template <typename I
>
779 void Replay
<I
>::handle_event(const journal::SnapLimitEvent
&event
,
780 Context
*on_ready
, Context
*on_safe
) {
781 CephContext
*cct
= m_image_ctx
.cct
;
782 ldout(cct
, 20) << ": Snap limit event" << dendl
;
784 Mutex::Locker
locker(m_lock
);
786 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
788 if (on_op_complete
== nullptr) {
792 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
793 m_image_ctx
, new ExecuteOp
<I
, journal::SnapLimitEvent
>(m_image_ctx
,
797 op_event
->ignore_error_codes
= {-ERANGE
};
799 on_ready
->complete(0);
802 template <typename I
>
803 void Replay
<I
>::handle_event(const journal::UpdateFeaturesEvent
&event
,
804 Context
*on_ready
, Context
*on_safe
) {
805 CephContext
*cct
= m_image_ctx
.cct
;
806 ldout(cct
, 20) << ": Update features event" << dendl
;
808 Mutex::Locker
locker(m_lock
);
810 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
812 if (on_op_complete
== nullptr) {
817 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
818 m_image_ctx
, new ExecuteOp
<I
, journal::UpdateFeaturesEvent
>(
819 m_image_ctx
, event
, on_op_complete
)), 0);
821 // do not process more events until the state machine is ready
822 // since it will affect IO
823 op_event
->op_in_progress
= true;
824 op_event
->on_start_ready
= on_ready
;
827 template <typename I
>
828 void Replay
<I
>::handle_event(const journal::MetadataSetEvent
&event
,
829 Context
*on_ready
, Context
*on_safe
) {
830 CephContext
*cct
= m_image_ctx
.cct
;
831 ldout(cct
, 20) << ": Metadata set event" << dendl
;
833 Mutex::Locker
locker(m_lock
);
835 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
837 if (on_op_complete
== nullptr) {
841 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
842 op_event
->on_op_finish_event
= util::create_async_context_callback(
843 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataSetEvent
>(
844 m_image_ctx
, event
, on_op_complete
));
846 on_ready
->complete(0);
849 template <typename I
>
850 void Replay
<I
>::handle_event(const journal::MetadataRemoveEvent
&event
,
851 Context
*on_ready
, Context
*on_safe
) {
852 CephContext
*cct
= m_image_ctx
.cct
;
853 ldout(cct
, 20) << ": Metadata remove event" << dendl
;
855 Mutex::Locker
locker(m_lock
);
857 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
859 if (on_op_complete
== nullptr) {
863 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
864 op_event
->on_op_finish_event
= util::create_async_context_callback(
865 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataRemoveEvent
>(
866 m_image_ctx
, event
, on_op_complete
));
868 // ignore errors caused due to replay
869 op_event
->ignore_error_codes
= {-ENOENT
};
871 on_ready
->complete(0);
// NOTE(review): garbled fragment -- closing brace (orig ~881) missing.
// Unknown (unrecognized/newer-format) journal events are deliberately
// ignored for forward compatibility: log at debug level and complete both
// callbacks successfully so replay continues.
874 template <typename I
>
875 void Replay
<I
>::handle_event(const journal::UnknownEvent
&event
,
876 Context
*on_ready
, Context
*on_safe
) {
877 CephContext
*cct
= m_image_ctx
.cct
;
878 ldout(cct
, 20) << ": unknown event" << dendl
;
879 on_ready
->complete(0);
880 on_safe
->complete(0);
883 template <typename I
>
884 void Replay
<I
>::handle_aio_modify_complete(Context
*on_ready
, Context
*on_safe
,
885 int r
, std::set
<int> &filters
,
886 bool writeback_cache_enabled
) {
887 Mutex::Locker
locker(m_lock
);
888 CephContext
*cct
= m_image_ctx
.cct
;
889 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", "
890 << "on_safe=" << on_safe
<< ", r=" << r
<< dendl
;
892 if (on_ready
!= nullptr) {
893 on_ready
->complete(0);
896 if (filters
.find(r
) != filters
.end())
900 lderr(cct
) << ": AIO modify op failed: " << cpp_strerror(r
) << dendl
;
901 m_image_ctx
.op_work_queue
->queue(on_safe
, r
);
905 if (writeback_cache_enabled
) {
906 // will be completed after next flush operation completes
907 m_aio_modify_safe_contexts
.insert(on_safe
);
909 // IO is safely stored on disk
910 ceph_assert(m_in_flight_aio_modify
> 0);
911 --m_in_flight_aio_modify
;
913 if (m_on_aio_ready
!= nullptr) {
914 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
915 m_on_aio_ready
->complete(0);
916 m_on_aio_ready
= nullptr;
919 ldout(cct
, 20) << ": completing safe context: " << on_safe
<< dendl
;
920 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
924 template <typename I
>
925 void Replay
<I
>::handle_aio_flush_complete(Context
*on_flush_safe
,
926 Contexts
&on_safe_ctxs
, int r
) {
927 CephContext
*cct
= m_image_ctx
.cct
;
928 ldout(cct
, 20) << ": r=" << r
<< dendl
;
931 lderr(cct
) << ": AIO flush failed: " << cpp_strerror(r
) << dendl
;
934 Context
*on_aio_ready
= nullptr;
935 Context
*on_flush
= nullptr;
937 Mutex::Locker
locker(m_lock
);
938 ceph_assert(m_in_flight_aio_flush
> 0);
939 ceph_assert(m_in_flight_aio_modify
>= on_safe_ctxs
.size());
940 --m_in_flight_aio_flush
;
941 m_in_flight_aio_modify
-= on_safe_ctxs
.size();
943 std::swap(on_aio_ready
, m_on_aio_ready
);
944 if (m_in_flight_op_events
== 0 &&
945 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
946 on_flush
= m_flush_ctx
;
949 // strip out previously failed on_safe contexts
950 for (auto it
= on_safe_ctxs
.begin(); it
!= on_safe_ctxs
.end(); ) {
951 if (m_aio_modify_safe_contexts
.erase(*it
)) {
954 it
= on_safe_ctxs
.erase(it
);
959 if (on_aio_ready
!= nullptr) {
960 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
961 on_aio_ready
->complete(0);
964 if (on_flush_safe
!= nullptr) {
965 on_safe_ctxs
.push_back(on_flush_safe
);
967 for (auto ctx
: on_safe_ctxs
) {
968 ldout(cct
, 20) << ": completing safe context: " << ctx
<< dendl
;
972 if (on_flush
!= nullptr) {
973 ldout(cct
, 20) << ": completing flush context: " << on_flush
<< dendl
;
974 on_flush
->complete(r
);
978 template <typename I
>
979 Context
*Replay
<I
>::create_op_context_callback(uint64_t op_tid
,
982 OpEvent
**op_event
) {
983 CephContext
*cct
= m_image_ctx
.cct
;
985 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
986 on_ready
->complete(0);
987 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
991 ceph_assert(m_lock
.is_locked());
992 if (m_op_events
.count(op_tid
) != 0) {
993 lderr(cct
) << ": duplicate op tid detected: " << op_tid
<< dendl
;
995 // on_ready is already async but on failure invoke on_safe async
997 on_ready
->complete(0);
998 m_image_ctx
.op_work_queue
->queue(on_safe
, -EINVAL
);
1002 ++m_in_flight_op_events
;
1003 *op_event
= &m_op_events
[op_tid
];
1004 (*op_event
)->on_start_safe
= on_safe
;
1006 Context
*on_op_complete
= new C_OpOnComplete(this, op_tid
);
1007 (*op_event
)->on_op_complete
= on_op_complete
;
1008 return on_op_complete
;
1011 template <typename I
>
1012 void Replay
<I
>::handle_op_complete(uint64_t op_tid
, int r
) {
1013 CephContext
*cct
= m_image_ctx
.cct
;
1014 ldout(cct
, 20) << ": op_tid=" << op_tid
<< ", "
1015 << "r=" << r
<< dendl
;
1018 bool shutting_down
= false;
1020 Mutex::Locker
locker(m_lock
);
1021 auto op_it
= m_op_events
.find(op_tid
);
1022 ceph_assert(op_it
!= m_op_events
.end());
1024 op_event
= std::move(op_it
->second
);
1025 m_op_events
.erase(op_it
);
1028 ceph_assert(m_flush_ctx
!= nullptr);
1029 shutting_down
= true;
1033 ceph_assert(op_event
.on_start_ready
== nullptr || (r
< 0 && r
!= -ERESTART
));
1034 if (op_event
.on_start_ready
!= nullptr) {
1035 // blocking op event failed before it became ready
1036 ceph_assert(op_event
.on_finish_ready
== nullptr &&
1037 op_event
.on_finish_safe
== nullptr);
1039 op_event
.on_start_ready
->complete(0);
1041 // event kicked off by OpFinishEvent
1042 ceph_assert((op_event
.on_finish_ready
!= nullptr &&
1043 op_event
.on_finish_safe
!= nullptr) || shutting_down
);
1046 if (op_event
.on_op_finish_event
!= nullptr) {
1047 op_event
.on_op_finish_event
->complete(r
);
1050 if (op_event
.on_finish_ready
!= nullptr) {
1051 op_event
.on_finish_ready
->complete(0);
1054 // filter out errors caused by replay of the same op
1055 if (r
< 0 && op_event
.ignore_error_codes
.count(r
) != 0) {
1059 op_event
.on_start_safe
->complete(r
);
1060 if (op_event
.on_finish_safe
!= nullptr) {
1061 op_event
.on_finish_safe
->complete(r
);
1064 // shut down request might have occurred while lock was
1065 // dropped -- handle if pending
1066 Context
*on_flush
= nullptr;
1068 Mutex::Locker
locker(m_lock
);
1069 ceph_assert(m_in_flight_op_events
> 0);
1070 --m_in_flight_op_events
;
1071 if (m_in_flight_op_events
== 0 &&
1072 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
1073 on_flush
= m_flush_ctx
;
1076 if (on_flush
!= nullptr) {
1077 m_image_ctx
.op_work_queue
->queue(on_flush
, 0);
1081 template <typename I
>
1083 Replay
<I
>::create_aio_modify_completion(Context
*on_ready
,
1085 io::aio_type_t aio_type
,
1086 bool *flush_required
,
1087 std::set
<int> &&filters
) {
1088 Mutex::Locker
locker(m_lock
);
1089 CephContext
*cct
= m_image_ctx
.cct
;
1090 ceph_assert(m_on_aio_ready
== nullptr);
1093 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1094 on_ready
->complete(0);
1095 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1099 ++m_in_flight_aio_modify
;
1101 bool writeback_cache_enabled
= m_image_ctx
.is_writeback_cache_enabled();
1102 if (writeback_cache_enabled
) {
1103 m_aio_modify_unsafe_contexts
.push_back(on_safe
);
1106 // FLUSH if we hit the low-water mark -- on_safe contexts are
1107 // completed by flushes-only so that we don't move the journal
1108 // commit position until safely on-disk
1110 *flush_required
= (writeback_cache_enabled
&&
1111 m_aio_modify_unsafe_contexts
.size() ==
1112 IN_FLIGHT_IO_LOW_WATER_MARK
);
1113 if (*flush_required
) {
1114 ldout(cct
, 10) << ": hit AIO replay low-water mark: scheduling flush"
1118 // READY for more events if:
1119 // * not at high-water mark for IO
1120 // * in-flight ops are at a consistent point (snap create has IO flushed,
1121 // shrink has adjusted clip boundary, etc) -- should have already been
1122 // flagged not-ready
1123 if (m_in_flight_aio_modify
== IN_FLIGHT_IO_HIGH_WATER_MARK
) {
1124 ldout(cct
, 10) << ": hit AIO replay high-water mark: pausing replay"
1126 ceph_assert(m_on_aio_ready
== nullptr);
1127 std::swap(m_on_aio_ready
, on_ready
);
1130 // when the modification is ACKed by librbd, we can process the next
1131 // event. when flushed, the completion of the next flush will fire the
1133 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1134 new C_AioModifyComplete(this, on_ready
, on_safe
, std::move(filters
),
1135 writeback_cache_enabled
),
1136 util::get_image_ctx(&m_image_ctx
), aio_type
);
// NOTE(review): garbled fragment -- the shutdown-guard condition (orig
// ~1145), the nullptr return (orig ~1149-1151), and the final return of
// aio_comp are missing from this view; the visible lines imply the guard:
// on shutdown, queue on_safe with -ESHUTDOWN and return nullptr.
// Creates the flush AioCompletion that, on completion, marks all
// currently-unsafe modify contexts as safe (they are moved into the
// C_AioFlushComplete and the unsafe list is cleared). Caller must hold
// m_lock (asserted below).
1140 template <typename I
>
1141 io::AioCompletion
*Replay
<I
>::create_aio_flush_completion(Context
*on_safe
) {
1142 ceph_assert(m_lock
.is_locked());
1144 CephContext
*cct
= m_image_ctx
.cct
;
1146 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1147 if (on_safe
!= nullptr) {
1148 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1153 ++m_in_flight_aio_flush
;
1155 // associate all prior write/discard ops to this flush request
1156 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1157 new C_AioFlushComplete(this, on_safe
,
1158 std::move(m_aio_modify_unsafe_contexts
)),
1159 util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_FLUSH
);
1160 m_aio_modify_unsafe_contexts
.clear();
// NOTE(review): garbled fragment -- the return statements (presumably
// `return true;` inside the if and `return false;` after it, orig
// ~1178/1180-1186) are missing from this view; do not restyle without them.
// Guards replayed IO against events that start beyond the current image
// size: reads the image size under snap_lock, and if the event offset is
// past the end, logs and unblocks the completion so the IO is a no-op.
1164 template <typename I
>
1165 bool Replay
<I
>::clipped_io(uint64_t image_offset
, io::AioCompletion
*aio_comp
) {
1166 CephContext
*cct
= m_image_ctx
.cct
;
1168 m_image_ctx
.snap_lock
.get_read();
1169 size_t image_size
= m_image_ctx
.size
;
1170 m_image_ctx
.snap_lock
.put_read();
1172 if (image_offset
>= image_size
) {
1173 // rbd-mirror image sync might race an IO event w/ associated resize between
1174 // the point the peer is registered and the sync point is created, so no-op
1175 // IO events beyond the current image extents since under normal conditions
1176 // it wouldn't have been recorded in the journal
1177 ldout(cct
, 5) << ": no-op IO event beyond image size" << dendl
;
1179 aio_comp
->unblock();
1187 } // namespace journal
1188 } // namespace librbd
1190 template class librbd::journal::Replay
<librbd::ImageCtx
>;