1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
17 #define dout_subsys ceph_subsys_rbd
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
// File-scope progress sink shared by replayed maintenance ops (resize,
// flatten, snap rollback): replay has no caller to report progress to.
29 static NoOpProgressContext no_op_progress_callback
;
// ExecuteOp<I, E>: a one-shot Context that, when completed successfully,
// dispatches the captured journal event to the matching
// image_ctx.operations->execute_*() method, passing on_op_complete as the
// continuation. finish() aborts early (forwarding r) on error, and completes
// with -ECANCELED when the exclusive lock is gone or no longer accepting ops.
// NOTE(review): this chunk is truncated by extraction -- member declarations,
// statement tails and closing braces are missing; verify against upstream.
31 template <typename I
, typename E
>
32 struct ExecuteOp
: public Context
{
35 Context
*on_op_complete
;
37 ExecuteOp(I
&image_ctx
, const E
&event
, Context
*on_op_complete
)
38 : image_ctx(image_ctx
), event(event
), on_op_complete(on_op_complete
) {
41 void execute(const journal::SnapCreateEvent
&_
) {
42 image_ctx
.operations
->execute_snap_create(event
.snap_namespace
,
48 void execute(const journal::SnapRemoveEvent
&_
) {
49 image_ctx
.operations
->execute_snap_remove(event
.snap_namespace
,
54 void execute(const journal::SnapRenameEvent
&_
) {
55 image_ctx
.operations
->execute_snap_rename(event
.snap_id
,
60 void execute(const journal::SnapProtectEvent
&_
) {
61 image_ctx
.operations
->execute_snap_protect(event
.snap_namespace
,
66 void execute(const journal::SnapUnprotectEvent
&_
) {
67 image_ctx
.operations
->execute_snap_unprotect(event
.snap_namespace
,
72 void execute(const journal::SnapRollbackEvent
&_
) {
73 image_ctx
.operations
->execute_snap_rollback(event
.snap_namespace
,
75 no_op_progress_callback
,
79 void execute(const journal::RenameEvent
&_
) {
80 image_ctx
.operations
->execute_rename(event
.image_name
,
84 void execute(const journal::ResizeEvent
&_
) {
85 image_ctx
.operations
->execute_resize(event
.size
, true, no_op_progress_callback
,
86 on_op_complete
, event
.op_tid
);
89 void execute(const journal::FlattenEvent
&_
) {
90 image_ctx
.operations
->execute_flatten(no_op_progress_callback
,
94 void execute(const journal::SnapLimitEvent
&_
) {
95 image_ctx
.operations
->execute_snap_set_limit(event
.limit
, on_op_complete
);
98 void execute(const journal::UpdateFeaturesEvent
&_
) {
99 image_ctx
.operations
->execute_update_features(event
.features
, event
.enabled
,
100 on_op_complete
, event
.op_tid
);
103 void execute(const journal::MetadataSetEvent
&_
) {
104 image_ctx
.operations
->execute_metadata_set(event
.key
, event
.value
,
108 void execute(const journal::MetadataRemoveEvent
&_
) {
109 image_ctx
.operations
->execute_metadata_remove(event
.key
, on_op_complete
);
// finish(): entry point when this Context is completed. On error, forward r
// straight to on_op_complete; otherwise take the owner lock, verify the
// exclusive lock still accepts ops (else complete with -ECANCELED), and
// dispatch to the execute() overload for the captured event type.
112 void finish(int r
) override
{
113 CephContext
*cct
= image_ctx
.cct
;
115 lderr(cct
) << ": ExecuteOp::" << __func__
<< ": r=" << r
<< dendl
;
116 on_op_complete
->complete(r
);
120 ldout(cct
, 20) << ": ExecuteOp::" << __func__
<< dendl
;
121 RWLock::RLocker
owner_locker(image_ctx
.owner_lock
);
123 if (image_ctx
.exclusive_lock
== nullptr ||
124 !image_ctx
.exclusive_lock
->accept_ops()) {
125 ldout(cct
, 5) << ": lost exclusive lock -- skipping op" << dendl
;
126 on_op_complete
->complete(-ECANCELED
);
// C_RefreshIfRequired<I>: Context decorator that, before running on_finish,
// refreshes the image state if ImageState reports a refresh is required;
// otherwise (or on error r != 0) it queues on_finish on the op work queue.
// Used so replayed ops always run against up-to-date image metadata.
// NOTE(review): truncated by extraction -- member declarations and closing
// braces are missing; verify against upstream.
134 template <typename I
>
135 struct C_RefreshIfRequired
: public Context
{
139 C_RefreshIfRequired(I
&image_ctx
, Context
*on_finish
)
140 : image_ctx(image_ctx
), on_finish(on_finish
) {
142 ~C_RefreshIfRequired() override
{
146 void finish(int r
) override
{
147 CephContext
*cct
= image_ctx
.cct
;
148 Context
*ctx
= on_finish
;
// on error: propagate r to the wrapped context via the work queue.
152 lderr(cct
) << ": C_RefreshIfRequired::" << __func__
<< ": r=" << r
<< dendl
;
153 image_ctx
.op_work_queue
->queue(ctx
, r
);
157 if (image_ctx
.state
->is_refresh_required()) {
158 ldout(cct
, 20) << ": C_RefreshIfRequired::" << __func__
<< ": "
159 << "refresh required" << dendl
;
160 image_ctx
.state
->refresh(ctx
);
164 image_ctx
.op_work_queue
->queue(ctx
, 0);
168 } // anonymous namespace
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
// Constructor: binds the replay state machine to its image context and
// names the internal mutex guarding all replay bookkeeping.
174 template <typename I
>
175 Replay
<I
>::Replay(I
&image_ctx
)
176 : m_image_ctx(image_ctx
), m_lock("Replay<I>::m_lock") {
// Destructor: asserts the replay was fully quiesced -- no in-flight AIO
// flush/modify ops, no pending safe/unsafe contexts, and no op events.
// Destruction without a prior shut_down()/drain would trip these asserts.
179 template <typename I
>
180 Replay
<I
>::~Replay() {
181 assert(m_in_flight_aio_flush
== 0);
182 assert(m_in_flight_aio_modify
== 0);
183 assert(m_aio_modify_unsafe_contexts
.empty());
184 assert(m_aio_modify_safe_contexts
.empty());
185 assert(m_op_events
.empty());
186 assert(m_in_flight_op_events
== 0);
// decode(): deserializes one journal EventEntry from the bufferlist
// iterator; catches buffer::error on malformed input (truncated -- the
// error-return path is not visible in this extraction).
189 template <typename I
>
190 int Replay
<I
>::decode(bufferlist::iterator
*it
, EventEntry
*event_entry
) {
192 ::decode(*event_entry
, *it
);
193 } catch (const buffer::error
&err
) {
// process(): entry point for one decoded journal event. Wraps on_ready in an
// async callback, verifies the exclusive lock still accepts ops (otherwise
// completes on_safe with -ECANCELED and skips the event), then dispatches to
// the matching handle_event() overload via boost::apply_visitor.
199 template <typename I
>
200 void Replay
<I
>::process(const EventEntry
&event_entry
,
201 Context
*on_ready
, Context
*on_safe
) {
202 CephContext
*cct
= m_image_ctx
.cct
;
203 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", on_safe=" << on_safe
206 on_ready
= util::create_async_context_callback(m_image_ctx
, on_ready
);
208 RWLock::RLocker
owner_lock(m_image_ctx
.owner_lock
);
209 if (m_image_ctx
.exclusive_lock
== nullptr ||
210 !m_image_ctx
.exclusive_lock
->accept_ops()) {
211 ldout(cct
, 5) << ": lost exclusive lock -- skipping event" << dendl
;
212 m_image_ctx
.op_work_queue
->queue(on_safe
, -ECANCELED
);
213 on_ready
->complete(0);
217 boost::apply_visitor(EventVisitor(this, on_ready
, on_safe
),
// shut_down(): stops the replay. Under m_lock it (1) issues a final AIO
// flush if any modify/flush ops are in flight, (2) walks pending op events,
// cancelling not-yet-started ops with -ERESTART, kicking ops that were only
// waiting for their OpFinishEvent, and flagging ops still waiting for ready
// via finish_on_ready, then (3) defers on_finish into m_flush_ctx if work
// remains. The flush and any immediate on_finish completion run outside the
// lock. NOTE(review): the cancel_ops parameter's use is not visible in this
// truncated extraction -- verify against upstream.
221 template <typename I
>
222 void Replay
<I
>::shut_down(bool cancel_ops
, Context
*on_finish
) {
223 CephContext
*cct
= m_image_ctx
.cct
;
224 ldout(cct
, 20) << dendl
;
226 io::AioCompletion
*flush_comp
= nullptr;
227 on_finish
= util::create_async_context_callback(
228 m_image_ctx
, on_finish
);
231 Mutex::Locker
locker(m_lock
);
233 // safely commit any remaining AIO modify operations
234 if ((m_in_flight_aio_flush
+ m_in_flight_aio_modify
) != 0) {
235 flush_comp
= create_aio_flush_completion(nullptr);
236 assert(flush_comp
!= nullptr);
239 for (auto &op_event_pair
: m_op_events
) {
240 OpEvent
&op_event
= op_event_pair
.second
;
242 // cancel ops that are waiting to start (waiting for
243 // OpFinishEvent or waiting for ready)
244 if (op_event
.on_start_ready
== nullptr &&
245 op_event
.on_op_finish_event
!= nullptr) {
246 Context
*on_op_finish_event
= nullptr;
247 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
248 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, -ERESTART
);
250 } else if (op_event
.on_op_finish_event
!= nullptr) {
251 // start ops waiting for OpFinishEvent
252 Context
*on_op_finish_event
= nullptr;
253 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
254 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, 0);
255 } else if (op_event
.on_start_ready
!= nullptr) {
256 // waiting for op ready
257 op_event_pair
.second
.finish_on_ready
= true;
261 assert(!m_shut_down
);
264 assert(m_flush_ctx
== nullptr);
265 if (m_in_flight_op_events
> 0 || flush_comp
!= nullptr) {
266 std::swap(m_flush_ctx
, on_finish
);
270 // execute the following outside of lock scope
271 if (flush_comp
!= nullptr) {
272 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
273 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
275 if (on_finish
!= nullptr) {
276 on_finish
->complete(0);
// flush(): creates an AIO flush completion (wrapping on_finish as an async
// callback) under m_lock, then issues the image flush under the owner lock.
// If create_aio_flush_completion returns nullptr (shut down), the flush is
// skipped -- the completion path is handled inside that helper.
280 template <typename I
>
281 void Replay
<I
>::flush(Context
*on_finish
) {
282 io::AioCompletion
*aio_comp
;
284 Mutex::Locker
locker(m_lock
);
285 aio_comp
= create_aio_flush_completion(
286 util::create_async_context_callback(m_image_ctx
, on_finish
));
287 if (aio_comp
== nullptr) {
292 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
293 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
, {});
// replay_op_ready(): called when a blocking op state machine (e.g. snap
// create, resize) reaches its "ready" point. Completes the stashed
// on_start_ready so event processing resumes; if shutdown was requested
// (m_flush_ctx set) the paused op is resumed with -ERESTART, otherwise
// on_resume is parked in a FunctionContext fired by the matching
// OpFinishEvent -- or queued immediately when finish_on_ready is set
// (shutdown won't deliver an OpFinishEvent).
296 template <typename I
>
297 void Replay
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
298 CephContext
*cct
= m_image_ctx
.cct
;
299 ldout(cct
, 20) << ": op_tid=" << op_tid
<< dendl
;
301 Mutex::Locker
locker(m_lock
);
302 auto op_it
= m_op_events
.find(op_tid
);
303 assert(op_it
!= m_op_events
.end());
305 OpEvent
&op_event
= op_it
->second
;
306 assert(op_event
.op_in_progress
&&
307 op_event
.on_op_finish_event
== nullptr &&
308 op_event
.on_finish_ready
== nullptr &&
309 op_event
.on_finish_safe
== nullptr);
311 // resume processing replay events
312 Context
*on_start_ready
= nullptr;
313 std::swap(on_start_ready
, op_event
.on_start_ready
);
314 on_start_ready
->complete(0);
316 // cancel has been requested -- send error to paused state machine
317 if (!op_event
.finish_on_ready
&& m_flush_ctx
!= nullptr) {
318 m_image_ctx
.op_work_queue
->queue(on_resume
, -ERESTART
);
322 // resume the op state machine once the associated OpFinishEvent
324 op_event
.on_op_finish_event
= new FunctionContext(
326 on_resume
->complete(r
);
329 // shut down request -- don't expect OpFinishEvent
330 if (op_event
.finish_on_ready
) {
331 m_image_ctx
.op_work_queue
->queue(on_resume
, 0);
// handle_event(AioDiscardEvent): replays a discard. Obtains a modify
// completion (nullptr => shut down, already handled inside the helper),
// issues the aio_discard, and if the low-water mark was hit schedules a
// flush so accumulated on_safe contexts can be committed.
335 template <typename I
>
336 void Replay
<I
>::handle_event(const journal::AioDiscardEvent
&event
,
337 Context
*on_ready
, Context
*on_safe
) {
338 CephContext
*cct
= m_image_ctx
.cct
;
339 ldout(cct
, 20) << ": AIO discard event" << dendl
;
342 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
343 io::AIO_TYPE_DISCARD
,
345 if (aio_comp
== nullptr) {
349 io::ImageRequest
<I
>::aio_discard(&m_image_ctx
, aio_comp
, event
.offset
,
350 event
.length
, event
.skip_partial_discard
,
352 if (flush_required
) {
354 auto flush_comp
= create_aio_flush_completion(nullptr);
357 if (flush_comp
!= nullptr) {
358 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// handle_event(AioWriteEvent): replays a write. Copies the event payload
// (aio_write takes ownership via std::move), obtains a modify completion,
// issues the write over [offset, offset+length), and schedules a flush when
// the low-water mark is reached.
363 template <typename I
>
364 void Replay
<I
>::handle_event(const journal::AioWriteEvent
&event
,
365 Context
*on_ready
, Context
*on_safe
) {
366 CephContext
*cct
= m_image_ctx
.cct
;
367 ldout(cct
, 20) << ": AIO write event" << dendl
;
369 bufferlist data
= event
.data
;
371 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
374 if (aio_comp
== nullptr) {
378 io::ImageRequest
<I
>::aio_write(&m_image_ctx
, aio_comp
,
379 {{event
.offset
, event
.length
}},
380 std::move(data
), 0, {});
381 if (flush_required
) {
383 auto flush_comp
= create_aio_flush_completion(nullptr);
386 if (flush_comp
!= nullptr) {
387 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// handle_event(AioFlushEvent): replays a flush. The flush completion itself
// carries on_safe (committed once all prior modifies are on disk); on_ready
// is completed immediately since a flush never blocks further events.
392 template <typename I
>
393 void Replay
<I
>::handle_event(const journal::AioFlushEvent
&event
,
394 Context
*on_ready
, Context
*on_safe
) {
395 CephContext
*cct
= m_image_ctx
.cct
;
396 ldout(cct
, 20) << ": AIO flush event" << dendl
;
398 io::AioCompletion
*aio_comp
;
400 Mutex::Locker
locker(m_lock
);
401 aio_comp
= create_aio_flush_completion(on_safe
);
404 if (aio_comp
!= nullptr) {
405 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
, {});
407 on_ready
->complete(0);
// handle_event(AioWriteSameEvent): replays a writesame (repeated pattern
// write). Mirrors the AioWriteEvent path: copy payload, obtain modify
// completion, issue aio_writesame, flush on low-water mark.
410 template <typename I
>
411 void Replay
<I
>::handle_event(const journal::AioWriteSameEvent
&event
,
412 Context
*on_ready
, Context
*on_safe
) {
413 CephContext
*cct
= m_image_ctx
.cct
;
414 ldout(cct
, 20) << ": AIO writesame event" << dendl
;
416 bufferlist data
= event
.data
;
418 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
419 io::AIO_TYPE_WRITESAME
,
421 if (aio_comp
== nullptr) {
425 io::ImageRequest
<I
>::aio_writesame(&m_image_ctx
, aio_comp
, event
.offset
,
426 event
.length
, std::move(data
), 0, {});
427 if (flush_required
) {
429 auto flush_comp
= create_aio_flush_completion(nullptr);
432 if (flush_comp
!= nullptr) {
433 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// handle_event(OpFinishEvent): pairs a journal "op finished" record with its
// earlier op-start event. Unknown op_tid => op was already committed; just
// complete on_ready/on_safe. Otherwise record the finish contexts and either
// (a) success: fire the stashed on_op_finish_event to apply the op now,
// (b) recorded failure of an in-progress op: bubble event.r into the op to
// cancel it, or (c) recorded failure of a never-started op: free the stashed
// contexts and complete directly, mapping "expected" failure codes
// (op_finish_error_codes) to 0 via filter_ret_val.
438 template <typename I
>
439 void Replay
<I
>::handle_event(const journal::OpFinishEvent
&event
,
440 Context
*on_ready
, Context
*on_safe
) {
441 CephContext
*cct
= m_image_ctx
.cct
;
442 ldout(cct
, 20) << ": Op finish event: "
443 << "op_tid=" << event
.op_tid
<< dendl
;
447 Context
*on_op_complete
= nullptr;
448 Context
*on_op_finish_event
= nullptr;
450 Mutex::Locker
locker(m_lock
);
451 auto op_it
= m_op_events
.find(event
.op_tid
);
452 if (op_it
== m_op_events
.end()) {
453 ldout(cct
, 10) << ": unable to locate associated op: assuming previously "
454 << "committed." << dendl
;
455 on_ready
->complete(0);
456 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
460 OpEvent
&op_event
= op_it
->second
;
461 assert(op_event
.on_finish_safe
== nullptr);
462 op_event
.on_finish_ready
= on_ready
;
463 op_event
.on_finish_safe
= on_safe
;
464 op_in_progress
= op_event
.op_in_progress
;
465 std::swap(on_op_complete
, op_event
.on_op_complete
);
466 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
468 // special errors which indicate op never started but was recorded
469 // as failed in the journal
470 filter_ret_val
= (op_event
.op_finish_error_codes
.count(event
.r
) != 0);
474 if (op_in_progress
) {
475 // bubble the error up to the in-progress op to cancel it
476 on_op_finish_event
->complete(event
.r
);
478 // op hasn't been started -- bubble the error up since
479 // our image is now potentially in an inconsistent state
480 // since simple errors should have been caught before
481 // creating the op event
482 delete on_op_complete
;
483 delete on_op_finish_event
;
484 handle_op_complete(event
.op_tid
, filter_ret_val
? 0 : event
.r
);
489 // journal recorded success -- apply the op now
490 on_op_finish_event
->complete(0);
// handle_event(SnapCreateEvent): registers an op event, ignores -EEXIST
// (snapshot already created by a prior replay), and queues the refresh +
// execute pipeline immediately. Snap create is a blocking op: on_start_ready
// is held and op_in_progress set, so no further events are processed until
// replay_op_ready() fires.
493 template <typename I
>
494 void Replay
<I
>::handle_event(const journal::SnapCreateEvent
&event
,
495 Context
*on_ready
, Context
*on_safe
) {
496 CephContext
*cct
= m_image_ctx
.cct
;
497 ldout(cct
, 20) << ": Snap create event" << dendl
;
499 Mutex::Locker
locker(m_lock
);
501 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
503 if (on_op_complete
== nullptr) {
507 // ignore errors caused due to replay
508 op_event
->ignore_error_codes
= {-EEXIST
};
511 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
512 m_image_ctx
, new ExecuteOp
<I
, journal::SnapCreateEvent
>(m_image_ctx
, event
,
516 // do not process more events until the state machine is ready
517 // since it will affect IO
518 op_event
->op_in_progress
= true;
519 op_event
->on_start_ready
= on_ready
;
// handle_event(SnapRemoveEvent): non-blocking op. Stashes the refresh +
// execute pipeline in on_op_finish_event (run when the OpFinishEvent
// arrives), ignores -ENOENT from re-replay, and readies immediately.
522 template <typename I
>
523 void Replay
<I
>::handle_event(const journal::SnapRemoveEvent
&event
,
524 Context
*on_ready
, Context
*on_safe
) {
525 CephContext
*cct
= m_image_ctx
.cct
;
526 ldout(cct
, 20) << ": Snap remove event" << dendl
;
528 Mutex::Locker
locker(m_lock
);
530 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
532 if (on_op_complete
== nullptr) {
536 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
537 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRemoveEvent
>(m_image_ctx
, event
,
540 // ignore errors caused due to replay
541 op_event
->ignore_error_codes
= {-ENOENT
};
543 on_ready
->complete(0);
// handle_event(SnapRenameEvent): non-blocking op, deferred until its
// OpFinishEvent; -EEXIST (already renamed by prior replay) is ignored.
546 template <typename I
>
547 void Replay
<I
>::handle_event(const journal::SnapRenameEvent
&event
,
548 Context
*on_ready
, Context
*on_safe
) {
549 CephContext
*cct
= m_image_ctx
.cct
;
550 ldout(cct
, 20) << ": Snap rename event" << dendl
;
552 Mutex::Locker
locker(m_lock
);
554 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
556 if (on_op_complete
== nullptr) {
560 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
561 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRenameEvent
>(m_image_ctx
, event
,
564 // ignore errors caused due to replay
565 op_event
->ignore_error_codes
= {-EEXIST
};
567 on_ready
->complete(0);
// handle_event(SnapProtectEvent): non-blocking op, deferred until its
// OpFinishEvent; -EBUSY (already protected by prior replay) is ignored.
570 template <typename I
>
571 void Replay
<I
>::handle_event(const journal::SnapProtectEvent
&event
,
572 Context
*on_ready
, Context
*on_safe
) {
573 CephContext
*cct
= m_image_ctx
.cct
;
574 ldout(cct
, 20) << ": Snap protect event" << dendl
;
576 Mutex::Locker
locker(m_lock
);
578 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
580 if (on_op_complete
== nullptr) {
584 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
585 m_image_ctx
, new ExecuteOp
<I
, journal::SnapProtectEvent
>(m_image_ctx
, event
,
588 // ignore errors caused due to replay
589 op_event
->ignore_error_codes
= {-EBUSY
};
591 on_ready
->complete(0);
// handle_event(SnapUnprotectEvent): non-blocking op, deferred until its
// OpFinishEvent. -EBUSY recorded in the journal means the op never started
// (op_finish_error_codes); -EINVAL during replay (already unprotected) is
// ignored.
594 template <typename I
>
595 void Replay
<I
>::handle_event(const journal::SnapUnprotectEvent
&event
,
596 Context
*on_ready
, Context
*on_safe
) {
597 CephContext
*cct
= m_image_ctx
.cct
;
598 ldout(cct
, 20) << ": Snap unprotect event" << dendl
;
600 Mutex::Locker
locker(m_lock
);
602 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
604 if (on_op_complete
== nullptr) {
608 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
609 m_image_ctx
, new ExecuteOp
<I
, journal::SnapUnprotectEvent
>(m_image_ctx
,
613 // ignore errors recorded in the journal
614 op_event
->op_finish_error_codes
= {-EBUSY
};
616 // ignore errors caused due to replay
617 op_event
->ignore_error_codes
= {-EINVAL
};
619 on_ready
->complete(0);
// handle_event(SnapRollbackEvent): non-blocking op, deferred until its
// OpFinishEvent; no replay error codes are ignored (rollback is idempotent
// from the visible setup -- TODO confirm against upstream).
622 template <typename I
>
623 void Replay
<I
>::handle_event(const journal::SnapRollbackEvent
&event
,
624 Context
*on_ready
, Context
*on_safe
) {
625 CephContext
*cct
= m_image_ctx
.cct
;
626 ldout(cct
, 20) << ": Snap rollback start event" << dendl
;
628 Mutex::Locker
locker(m_lock
);
630 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
632 if (on_op_complete
== nullptr) {
636 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
637 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRollbackEvent
>(m_image_ctx
,
641 on_ready
->complete(0);
// handle_event(RenameEvent): non-blocking op, deferred until its
// OpFinishEvent; -EEXIST (image already renamed by prior replay) is ignored.
644 template <typename I
>
645 void Replay
<I
>::handle_event(const journal::RenameEvent
&event
,
646 Context
*on_ready
, Context
*on_safe
) {
647 CephContext
*cct
= m_image_ctx
.cct
;
648 ldout(cct
, 20) << ": Rename event" << dendl
;
650 Mutex::Locker
locker(m_lock
);
652 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
654 if (on_op_complete
== nullptr) {
658 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
659 m_image_ctx
, new ExecuteOp
<I
, journal::RenameEvent
>(m_image_ctx
, event
,
662 // ignore errors caused due to replay
663 op_event
->ignore_error_codes
= {-EEXIST
};
665 on_ready
->complete(0);
// handle_event(ResizeEvent): blocking op -- queued for immediate execution
// (refresh + execute_resize) and event processing is paused via
// op_in_progress / on_start_ready until replay_op_ready() fires, since a
// shrink must adjust the clip boundary before further IO is replayed.
668 template <typename I
>
669 void Replay
<I
>::handle_event(const journal::ResizeEvent
&event
,
670 Context
*on_ready
, Context
*on_safe
) {
671 CephContext
*cct
= m_image_ctx
.cct
;
672 ldout(cct
, 20) << ": Resize start event" << dendl
;
674 Mutex::Locker
locker(m_lock
);
676 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
678 if (on_op_complete
== nullptr) {
683 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
684 m_image_ctx
, new ExecuteOp
<I
, journal::ResizeEvent
>(m_image_ctx
, event
,
685 on_op_complete
)), 0);
687 // do not process more events until the state machine is ready
688 // since it will affect IO
689 op_event
->op_in_progress
= true;
690 op_event
->on_start_ready
= on_ready
;
// handle_event(FlattenEvent): non-blocking op, deferred until its
// OpFinishEvent; -EINVAL (image already flattened by prior replay) is
// ignored.
693 template <typename I
>
694 void Replay
<I
>::handle_event(const journal::FlattenEvent
&event
,
695 Context
*on_ready
, Context
*on_safe
) {
696 CephContext
*cct
= m_image_ctx
.cct
;
697 ldout(cct
, 20) << ": Flatten start event" << dendl
;
699 Mutex::Locker
locker(m_lock
);
701 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
703 if (on_op_complete
== nullptr) {
707 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
708 m_image_ctx
, new ExecuteOp
<I
, journal::FlattenEvent
>(m_image_ctx
, event
,
711 // ignore errors caused due to replay
712 op_event
->ignore_error_codes
= {-EINVAL
};
714 on_ready
->complete(0);
// handle_event(DemotePromoteEvent): mirroring ownership-change marker; no
// local action is required during replay, so both contexts complete
// immediately.
717 template <typename I
>
718 void Replay
<I
>::handle_event(const journal::DemotePromoteEvent
&event
,
719 Context
*on_ready
, Context
*on_safe
) {
720 CephContext
*cct
= m_image_ctx
.cct
;
721 ldout(cct
, 20) << ": Demote/Promote event" << dendl
;
722 on_ready
->complete(0);
723 on_safe
->complete(0);
// handle_event(SnapLimitEvent): non-blocking op, deferred until its
// OpFinishEvent; applies the recorded snapshot-count limit.
726 template <typename I
>
727 void Replay
<I
>::handle_event(const journal::SnapLimitEvent
&event
,
728 Context
*on_ready
, Context
*on_safe
) {
729 CephContext
*cct
= m_image_ctx
.cct
;
730 ldout(cct
, 20) << ": Snap limit event" << dendl
;
732 Mutex::Locker
locker(m_lock
);
734 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
736 if (on_op_complete
== nullptr) {
740 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
741 m_image_ctx
, new ExecuteOp
<I
, journal::SnapLimitEvent
>(m_image_ctx
,
745 on_ready
->complete(0);
// handle_event(UpdateFeaturesEvent): blocking op -- queued for immediate
// execution and event processing pauses (op_in_progress / on_start_ready)
// until replay_op_ready(), since feature toggles affect in-flight IO.
748 template <typename I
>
749 void Replay
<I
>::handle_event(const journal::UpdateFeaturesEvent
&event
,
750 Context
*on_ready
, Context
*on_safe
) {
751 CephContext
*cct
= m_image_ctx
.cct
;
752 ldout(cct
, 20) << ": Update features event" << dendl
;
754 Mutex::Locker
locker(m_lock
);
756 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
758 if (on_op_complete
== nullptr) {
763 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
764 m_image_ctx
, new ExecuteOp
<I
, journal::UpdateFeaturesEvent
>(
765 m_image_ctx
, event
, on_op_complete
)), 0);
767 // do not process more events until the state machine is ready
768 // since it will affect IO
769 op_event
->op_in_progress
= true;
770 op_event
->on_start_ready
= on_ready
;
// handle_event(MetadataSetEvent): non-blocking op, deferred until its
// OpFinishEvent; re-setting the same key/value is naturally idempotent (no
// ignore_error_codes are registered here).
773 template <typename I
>
774 void Replay
<I
>::handle_event(const journal::MetadataSetEvent
&event
,
775 Context
*on_ready
, Context
*on_safe
) {
776 CephContext
*cct
= m_image_ctx
.cct
;
777 ldout(cct
, 20) << ": Metadata set event" << dendl
;
779 Mutex::Locker
locker(m_lock
);
781 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
783 if (on_op_complete
== nullptr) {
787 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
788 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataSetEvent
>(
789 m_image_ctx
, event
, on_op_complete
));
791 on_ready
->complete(0);
// handle_event(MetadataRemoveEvent): non-blocking op, deferred until its
// OpFinishEvent; -ENOENT (key already removed by prior replay) is ignored.
794 template <typename I
>
795 void Replay
<I
>::handle_event(const journal::MetadataRemoveEvent
&event
,
796 Context
*on_ready
, Context
*on_safe
) {
797 CephContext
*cct
= m_image_ctx
.cct
;
798 ldout(cct
, 20) << ": Metadata remove event" << dendl
;
800 Mutex::Locker
locker(m_lock
);
802 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
804 if (on_op_complete
== nullptr) {
808 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
809 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataRemoveEvent
>(
810 m_image_ctx
, event
, on_op_complete
));
812 // ignore errors caused due to replay
813 op_event
->ignore_error_codes
= {-ENOENT
};
815 on_ready
->complete(0);
// handle_event(UnknownEvent): forward-compatibility catch-all for event
// types this build does not understand -- acknowledge and commit without
// side effects.
818 template <typename I
>
819 void Replay
<I
>::handle_event(const journal::UnknownEvent
&event
,
820 Context
*on_ready
, Context
*on_safe
) {
821 CephContext
*cct
= m_image_ctx
.cct
;
822 ldout(cct
, 20) << ": unknown event" << dendl
;
823 on_ready
->complete(0);
824 on_safe
->complete(0);
// handle_aio_modify_complete(): AIO write/discard/writesame completion.
// Releases a paused on_ready if one was captured at the high-water mark; on
// failure completes on_safe with the error immediately, otherwise parks
// on_safe in m_aio_modify_safe_contexts to be committed by the next flush
// (the journal commit position must not advance until data is on disk).
827 template <typename I
>
828 void Replay
<I
>::handle_aio_modify_complete(Context
*on_ready
, Context
*on_safe
,
830 Mutex::Locker
locker(m_lock
);
831 CephContext
*cct
= m_image_ctx
.cct
;
832 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", "
833 << "on_safe=" << on_safe
<< ", r=" << r
<< dendl
;
835 if (on_ready
!= nullptr) {
836 on_ready
->complete(0);
839 lderr(cct
) << ": AIO modify op failed: " << cpp_strerror(r
) << dendl
;
840 on_safe
->complete(r
);
844 // will be completed after next flush operation completes
845 m_aio_modify_safe_contexts
.insert(on_safe
);
// handle_aio_flush_complete(): AIO flush completion. Decrements the flush
// counter and retires the modify ops the flush covered; resumes any AIO
// paused at the high-water mark; drops on_safe contexts that already failed
// (present in m_aio_modify_safe_contexts); completes all remaining on_safe
// contexts (plus the flush's own on_flush_safe) with r; and fires the
// pending shut-down m_flush_ctx once nothing is in flight.
848 template <typename I
>
849 void Replay
<I
>::handle_aio_flush_complete(Context
*on_flush_safe
,
850 Contexts
&on_safe_ctxs
, int r
) {
851 CephContext
*cct
= m_image_ctx
.cct
;
852 ldout(cct
, 20) << ": r=" << r
<< dendl
;
855 lderr(cct
) << ": AIO flush failed: " << cpp_strerror(r
) << dendl
;
858 Context
*on_aio_ready
= nullptr;
859 Context
*on_flush
= nullptr;
861 Mutex::Locker
locker(m_lock
);
862 assert(m_in_flight_aio_flush
> 0);
863 assert(m_in_flight_aio_modify
>= on_safe_ctxs
.size());
864 --m_in_flight_aio_flush
;
865 m_in_flight_aio_modify
-= on_safe_ctxs
.size();
867 std::swap(on_aio_ready
, m_on_aio_ready
);
868 if (m_in_flight_op_events
== 0 &&
869 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
870 on_flush
= m_flush_ctx
;
873 // strip out previously failed on_safe contexts
874 for (auto it
= on_safe_ctxs
.begin(); it
!= on_safe_ctxs
.end(); ) {
875 if (m_aio_modify_safe_contexts
.erase(*it
)) {
878 it
= on_safe_ctxs
.erase(it
);
883 if (on_aio_ready
!= nullptr) {
884 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
885 on_aio_ready
->complete(0);
888 if (on_flush_safe
!= nullptr) {
889 on_safe_ctxs
.push_back(on_flush_safe
);
891 for (auto ctx
: on_safe_ctxs
) {
892 ldout(cct
, 20) << ": completing safe context: " << ctx
<< dendl
;
896 if (on_flush
!= nullptr) {
897 ldout(cct
, 20) << ": completing flush context: " << on_flush
<< dendl
;
898 on_flush
->complete(r
);
// create_op_context_callback(): registers a new OpEvent keyed by op_tid and
// returns the C_OpOnComplete context the op will fire when done. Returns
// nullptr (after completing on_ready and queueing on_safe with -ESHUTDOWN /
// -EINVAL) when shut down or when a duplicate op_tid is detected. Caller
// must hold m_lock (asserted).
902 template <typename I
>
903 Context
*Replay
<I
>::create_op_context_callback(uint64_t op_tid
,
906 OpEvent
**op_event
) {
907 CephContext
*cct
= m_image_ctx
.cct
;
909 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
910 on_ready
->complete(0);
911 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
915 assert(m_lock
.is_locked());
916 if (m_op_events
.count(op_tid
) != 0) {
917 lderr(cct
) << ": duplicate op tid detected: " << op_tid
<< dendl
;
919 // on_ready is already async but on failure invoke on_safe async
921 on_ready
->complete(0);
922 m_image_ctx
.op_work_queue
->queue(on_safe
, -EINVAL
);
926 ++m_in_flight_op_events
;
927 *op_event
= &m_op_events
[op_tid
];
928 (*op_event
)->on_start_safe
= on_safe
;
930 Context
*on_op_complete
= new C_OpOnComplete(this, op_tid
);
931 (*op_event
)->on_op_complete
= on_op_complete
;
932 return on_op_complete
;
// handle_op_complete(): final teardown for one op event. Removes the
// OpEvent from the map, unblocks a still-held on_start_ready (failed
// blocking op), fires any unconsumed on_op_finish_event with r, completes
// on_finish_ready, filters errors listed in ignore_error_codes, then
// completes on_start_safe/on_finish_safe. Finally decrements
// m_in_flight_op_events and fires the shut-down m_flush_ctx if this was the
// last outstanding work.
935 template <typename I
>
936 void Replay
<I
>::handle_op_complete(uint64_t op_tid
, int r
) {
937 CephContext
*cct
= m_image_ctx
.cct
;
938 ldout(cct
, 20) << ": op_tid=" << op_tid
<< ", "
939 << "r=" << r
<< dendl
;
942 bool shutting_down
= false;
944 Mutex::Locker
locker(m_lock
);
945 auto op_it
= m_op_events
.find(op_tid
);
946 assert(op_it
!= m_op_events
.end());
948 op_event
= std::move(op_it
->second
);
949 m_op_events
.erase(op_it
);
952 assert(m_flush_ctx
!= nullptr);
953 shutting_down
= true;
957 assert(op_event
.on_start_ready
== nullptr || (r
< 0 && r
!= -ERESTART
));
958 if (op_event
.on_start_ready
!= nullptr) {
959 // blocking op event failed before it became ready
960 assert(op_event
.on_finish_ready
== nullptr &&
961 op_event
.on_finish_safe
== nullptr);
963 op_event
.on_start_ready
->complete(0);
965 // event kicked off by OpFinishEvent
966 assert((op_event
.on_finish_ready
!= nullptr &&
967 op_event
.on_finish_safe
!= nullptr) || shutting_down
);
970 if (op_event
.on_op_finish_event
!= nullptr) {
971 op_event
.on_op_finish_event
->complete(r
);
974 if (op_event
.on_finish_ready
!= nullptr) {
975 op_event
.on_finish_ready
->complete(0);
978 // filter out errors caused by replay of the same op
979 if (r
< 0 && op_event
.ignore_error_codes
.count(r
) != 0) {
983 op_event
.on_start_safe
->complete(r
);
984 if (op_event
.on_finish_safe
!= nullptr) {
985 op_event
.on_finish_safe
->complete(r
);
988 // shut down request might have occurred while lock was
989 // dropped -- handle if pending
990 Context
*on_flush
= nullptr;
992 Mutex::Locker
locker(m_lock
);
993 assert(m_in_flight_op_events
> 0);
994 --m_in_flight_op_events
;
995 if (m_in_flight_op_events
== 0 &&
996 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
997 on_flush
= m_flush_ctx
;
1000 if (on_flush
!= nullptr) {
1001 m_image_ctx
.op_work_queue
->queue(on_flush
, 0);
// create_aio_modify_completion(): bookkeeping for a replayed write/discard/
// writesame. Returns nullptr after shut down (completing on_ready, queueing
// on_safe with -ESHUTDOWN). Otherwise tracks on_safe in
// m_aio_modify_unsafe_contexts, sets *flush_required when the low-water
// mark (32) is reached, and at the high-water mark (64) parks on_ready in
// m_on_aio_ready so replay pauses until a completion drains. The returned
// AioCompletion fires C_AioModifyComplete.
1005 template <typename I
>
1007 Replay
<I
>::create_aio_modify_completion(Context
*on_ready
, Context
*on_safe
,
1008 io::aio_type_t aio_type
,
1009 bool *flush_required
) {
1010 Mutex::Locker
locker(m_lock
);
1011 CephContext
*cct
= m_image_ctx
.cct
;
1012 assert(m_on_aio_ready
== nullptr);
1015 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1016 on_ready
->complete(0);
1017 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1021 ++m_in_flight_aio_modify
;
1022 m_aio_modify_unsafe_contexts
.push_back(on_safe
);
1024 // FLUSH if we hit the low-water mark -- on_safe contexts are
1025 // completed by flushes-only so that we don't move the journal
1026 // commit position until safely on-disk
1028 *flush_required
= (m_aio_modify_unsafe_contexts
.size() ==
1029 IN_FLIGHT_IO_LOW_WATER_MARK
);
1030 if (*flush_required
) {
1031 ldout(cct
, 10) << ": hit AIO replay low-water mark: scheduling flush"
1035 // READY for more events if:
1036 // * not at high-water mark for IO
1037 // * in-flight ops are at a consistent point (snap create has IO flushed,
1038 // shrink has adjusted clip boundary, etc) -- should have already been
1039 // flagged not-ready
1040 if (m_in_flight_aio_modify
== IN_FLIGHT_IO_HIGH_WATER_MARK
) {
1041 ldout(cct
, 10) << ": hit AIO replay high-water mark: pausing replay"
1043 assert(m_on_aio_ready
== nullptr);
1044 std::swap(m_on_aio_ready
, on_ready
);
1047 // when the modification is ACKed by librbd, we can process the next
1048 // event. when flushed, the completion of the next flush will fire the
1050 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1051 new C_AioModifyComplete(this, on_ready
, on_safe
),
1052 util::get_image_ctx(&m_image_ctx
), aio_type
);
// create_aio_flush_completion(): bookkeeping for a replayed flush. Caller
// must hold m_lock (asserted). Returns nullptr after shut down (queueing
// on_safe with -ESHUTDOWN if provided). Otherwise bumps the in-flight flush
// counter and builds a C_AioFlushComplete that adopts -- and empties -- all
// accumulated unsafe modify contexts, so they are committed only once this
// flush reaches disk.
1056 template <typename I
>
1057 io::AioCompletion
*Replay
<I
>::create_aio_flush_completion(Context
*on_safe
) {
1058 assert(m_lock
.is_locked());
1060 CephContext
*cct
= m_image_ctx
.cct
;
1062 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1063 if (on_safe
!= nullptr) {
1064 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1069 ++m_in_flight_aio_flush
;
1071 // associate all prior write/discard ops to this flush request
1072 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1073 new C_AioFlushComplete(this, on_safe
,
1074 std::move(m_aio_modify_unsafe_contexts
)),
1075 util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_FLUSH
);
1076 m_aio_modify_unsafe_contexts
.clear();
1080 } // namespace journal
1081 } // namespace librbd
// Explicit instantiation for the production image-context type; tests
// instantiate Replay with a mock context elsewhere.
1083 template class librbd::journal::Replay
<librbd::ImageCtx
>;