1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
17 #define dout_subsys ceph_subsys_rbd
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
// Replay flow-control thresholds: a journal flush is scheduled once the
// number of unsafe AIO modify contexts reaches the low-water mark, and
// replay pauses (stops accepting events) at the high-water mark -- see
// create_aio_modify_completion() below, which compares against both.
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
// Shared no-op progress callback handed to long-running maintenance ops
// (resize/flatten/rollback) replayed below, since replay has no UI to update.
29 static NoOpProgressContext no_op_progress_callback
;
// ExecuteOp<I, E>: a Context that, when fired, dispatches a journaled
// maintenance event E to the matching image_ctx.operations->execute_*()
// call, forwarding on_op_complete as the operation's completion callback.
// finish() re-checks the exclusive lock and cancels with -ECANCELED when
// the lock (or its ability to accept ops) has been lost.
// NOTE(review): this extraction is truncated -- constructor body, several
// argument tails, and closing braces are missing from view; the stray
// integers at line starts are fused original line numbers, not code.
31 template <typename I
, typename E
>
32 struct ExecuteOp
: public Context
{
35 Context
*on_op_complete
;
37 ExecuteOp(I
&image_ctx
, const E
&event
, Context
*on_op_complete
)
38 : image_ctx(image_ctx
), event(event
), on_op_complete(on_op_complete
) {
// One execute() overload per replayable op event type; each forwards the
// event's payload fields to the corresponding Operations<> entry point.
41 void execute(const journal::SnapCreateEvent
&_
) {
42 image_ctx
.operations
->execute_snap_create(event
.snap_namespace
,
48 void execute(const journal::SnapRemoveEvent
&_
) {
49 image_ctx
.operations
->execute_snap_remove(event
.snap_namespace
,
54 void execute(const journal::SnapRenameEvent
&_
) {
55 image_ctx
.operations
->execute_snap_rename(event
.snap_id
,
60 void execute(const journal::SnapProtectEvent
&_
) {
61 image_ctx
.operations
->execute_snap_protect(event
.snap_namespace
,
66 void execute(const journal::SnapUnprotectEvent
&_
) {
67 image_ctx
.operations
->execute_snap_unprotect(event
.snap_namespace
,
72 void execute(const journal::SnapRollbackEvent
&_
) {
73 image_ctx
.operations
->execute_snap_rollback(event
.snap_namespace
,
75 no_op_progress_callback
,
79 void execute(const journal::RenameEvent
&_
) {
80 image_ctx
.operations
->execute_rename(event
.image_name
,
84 void execute(const journal::ResizeEvent
&_
) {
85 image_ctx
.operations
->execute_resize(event
.size
, true, no_op_progress_callback
,
86 on_op_complete
, event
.op_tid
);
89 void execute(const journal::FlattenEvent
&_
) {
90 image_ctx
.operations
->execute_flatten(no_op_progress_callback
,
94 void execute(const journal::SnapLimitEvent
&_
) {
95 image_ctx
.operations
->execute_snap_set_limit(event
.limit
, on_op_complete
);
98 void execute(const journal::UpdateFeaturesEvent
&_
) {
99 image_ctx
.operations
->execute_update_features(event
.features
, event
.enabled
,
100 on_op_complete
, event
.op_tid
);
103 void execute(const journal::MetadataSetEvent
&_
) {
104 image_ctx
.operations
->execute_metadata_set(event
.key
, event
.value
,
108 void execute(const journal::MetadataRemoveEvent
&_
) {
109 image_ctx
.operations
->execute_metadata_remove(event
.key
, on_op_complete
);
// finish(): on error, propagate r straight to on_op_complete; otherwise
// take the owner lock, verify the exclusive lock still accepts ops, and
// dispatch execute(event) -- lost lock short-circuits with -ECANCELED.
112 void finish(int r
) override
{
113 CephContext
*cct
= image_ctx
.cct
;
115 lderr(cct
) << ": ExecuteOp::" << __func__
<< ": r=" << r
<< dendl
;
116 on_op_complete
->complete(r
);
120 ldout(cct
, 20) << ": ExecuteOp::" << __func__
<< dendl
;
121 RWLock::RLocker
owner_locker(image_ctx
.owner_lock
);
123 if (image_ctx
.exclusive_lock
== nullptr ||
124 !image_ctx
.exclusive_lock
->accept_ops()) {
125 ldout(cct
, 5) << ": lost exclusive lock -- skipping op" << dendl
;
126 on_op_complete
->complete(-ECANCELED
);
// C_RefreshIfRequired<I>: wrapper Context that, before invoking on_finish,
// refreshes the image if image_ctx.state reports a refresh is required
// (chaining on_finish into the refresh); otherwise it queues on_finish on
// the op work queue. On error (r != 0 path visible via lderr below) the
// wrapped context is queued immediately with r.
// NOTE(review): truncated extraction -- the if(r<0) header, member decls,
// and closing braces fall outside the visible lines.
134 template <typename I
>
135 struct C_RefreshIfRequired
: public Context
{
139 C_RefreshIfRequired(I
&image_ctx
, Context
*on_finish
)
140 : image_ctx(image_ctx
), on_finish(on_finish
) {
142 ~C_RefreshIfRequired() override
{
146 void finish(int r
) override
{
147 CephContext
*cct
= image_ctx
.cct
;
148 Context
*ctx
= on_finish
;
152 lderr(cct
) << ": C_RefreshIfRequired::" << __func__
<< ": r=" << r
<< dendl
;
153 image_ctx
.op_work_queue
->queue(ctx
, r
);
157 if (image_ctx
.state
->is_refresh_required()) {
158 ldout(cct
, 20) << ": C_RefreshIfRequired::" << __func__
<< ": "
159 << "refresh required" << dendl
;
160 image_ctx
.state
->refresh(ctx
);
164 image_ctx
.op_work_queue
->queue(ctx
, 0);
168 } // anonymous namespace
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
// Constructor: binds the replay state machine to its image context and
// initializes the internal mutex guarding all replay bookkeeping.
174 template <typename I
>
175 Replay
<I
>::Replay(I
&image_ctx
)
176 : m_image_ctx(image_ctx
), m_lock("Replay<I>::m_lock") {
// Destructor: asserts the replay was fully quiesced -- no in-flight AIO
// flush/modify requests, no pending unsafe/safe AIO contexts, and no
// outstanding op events. Violations indicate shut_down() was skipped.
179 template <typename I
>
180 Replay
<I
>::~Replay() {
181 assert(m_in_flight_aio_flush
== 0);
182 assert(m_in_flight_aio_modify
== 0);
183 assert(m_aio_modify_unsafe_contexts
.empty());
184 assert(m_aio_modify_safe_contexts
.empty());
185 assert(m_op_events
.empty());
186 assert(m_in_flight_op_events
== 0);
// decode(): deserializes a journal EventEntry from the bufferlist iterator,
// catching buffer::error on malformed input. NOTE(review): the try header,
// the error-path return value, and the success return are outside the
// visible lines -- presumably returns 0 / -EBADMSG style codes; confirm
// against the full source.
189 template <typename I
>
190 int Replay
<I
>::decode(bufferlist::iterator
*it
, EventEntry
*event_entry
) {
192 ::decode(*event_entry
, *it
);
193 } catch (const buffer::error
&err
) {
// process(): entry point for replaying one decoded journal event.
// Wraps on_ready in an async callback, then -- under the image owner lock --
// bails out (-ECANCELED queued to on_safe, on_ready fired) if the exclusive
// lock is gone or no longer accepting ops; otherwise dispatches the event
// variant to the matching handle_event() overload via boost::apply_visitor.
199 template <typename I
>
200 void Replay
<I
>::process(const EventEntry
&event_entry
,
201 Context
*on_ready
, Context
*on_safe
) {
202 CephContext
*cct
= m_image_ctx
.cct
;
203 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", on_safe=" << on_safe
206 on_ready
= util::create_async_context_callback(m_image_ctx
, on_ready
);
208 RWLock::RLocker
owner_lock(m_image_ctx
.owner_lock
);
209 if (m_image_ctx
.exclusive_lock
== nullptr ||
210 !m_image_ctx
.exclusive_lock
->accept_ops()) {
211 ldout(cct
, 5) << ": lost exclusive lock -- skipping event" << dendl
;
212 m_image_ctx
.op_work_queue
->queue(on_safe
, -ECANCELED
);
213 on_ready
->complete(0);
217 boost::apply_visitor(EventVisitor(this, on_ready
, on_safe
),
// shut_down(): quiesces the replay state machine.
//  * If any AIO modify/flush is in flight, creates a flush completion to
//    commit the remaining modifications.
//  * Walks m_op_events: ops not yet started are cancelled (-ERESTART),
//    ops waiting only for an OpFinishEvent are kicked with success, and
//    ops still waiting for "ready" are flagged finish_on_ready.
//  * Defers on_finish into m_flush_ctx when op events or a flush remain
//    outstanding; flush + final completion run outside the lock.
// NOTE(review): cancel_ops is not referenced in the visible lines --
// its use presumably falls in the elided fragments; confirm upstream.
221 template <typename I
>
222 void Replay
<I
>::shut_down(bool cancel_ops
, Context
*on_finish
) {
223 CephContext
*cct
= m_image_ctx
.cct
;
224 ldout(cct
, 20) << dendl
;
226 io::AioCompletion
*flush_comp
= nullptr;
227 on_finish
= util::create_async_context_callback(
228 m_image_ctx
, on_finish
);
231 Mutex::Locker
locker(m_lock
);
233 // safely commit any remaining AIO modify operations
234 if ((m_in_flight_aio_flush
+ m_in_flight_aio_modify
) != 0) {
235 flush_comp
= create_aio_flush_completion(nullptr);
236 assert(flush_comp
!= nullptr);
239 for (auto &op_event_pair
: m_op_events
) {
240 OpEvent
&op_event
= op_event_pair
.second
;
242 // cancel ops that are waiting to start (waiting for
243 // OpFinishEvent or waiting for ready)
244 if (op_event
.on_start_ready
== nullptr &&
245 op_event
.on_op_finish_event
!= nullptr) {
246 Context
*on_op_finish_event
= nullptr;
247 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
248 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, -ERESTART
);
250 } else if (op_event
.on_op_finish_event
!= nullptr) {
251 // start ops waiting for OpFinishEvent
252 Context
*on_op_finish_event
= nullptr;
253 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
254 m_image_ctx
.op_work_queue
->queue(on_op_finish_event
, 0);
255 } else if (op_event
.on_start_ready
!= nullptr) {
256 // waiting for op ready
257 op_event_pair
.second
.finish_on_ready
= true;
261 assert(!m_shut_down
);
264 assert(m_flush_ctx
== nullptr);
265 if (m_in_flight_op_events
> 0 || flush_comp
!= nullptr) {
266 std::swap(m_flush_ctx
, on_finish
);
270 // execute the following outside of lock scope
271 if (flush_comp
!= nullptr) {
272 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
273 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
275 if (on_finish
!= nullptr) {
276 on_finish
->complete(0);
// flush(): creates an AIO flush completion tied (asynchronously) to
// on_finish and, if one was created (i.e. not shut down), issues an
// image flush under the owner lock so all prior unsafe modify contexts
// get committed.
280 template <typename I
>
281 void Replay
<I
>::flush(Context
*on_finish
) {
282 io::AioCompletion
*aio_comp
;
284 Mutex::Locker
locker(m_lock
);
285 aio_comp
= create_aio_flush_completion(
286 util::create_async_context_callback(m_image_ctx
, on_finish
));
287 if (aio_comp
== nullptr) {
292 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
293 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
, {});
// replay_op_ready(): invoked when a blocking op state machine (snap
// create, resize, update features) reaches its "ready" point. Fires the
// stashed on_start_ready to resume event processing; if a shut-down
// flush is pending (and finish_on_ready is not set) the paused state
// machine is resumed with -ERESTART; otherwise on_resume is parked in a
// FunctionContext to run when the matching OpFinishEvent arrives, or is
// queued immediately with success when finish_on_ready was flagged by
// shut_down().
296 template <typename I
>
297 void Replay
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
298 CephContext
*cct
= m_image_ctx
.cct
;
299 ldout(cct
, 20) << ": op_tid=" << op_tid
<< dendl
;
301 Mutex::Locker
locker(m_lock
);
302 auto op_it
= m_op_events
.find(op_tid
);
303 assert(op_it
!= m_op_events
.end());
305 OpEvent
&op_event
= op_it
->second
;
306 assert(op_event
.op_in_progress
&&
307 op_event
.on_op_finish_event
== nullptr &&
308 op_event
.on_finish_ready
== nullptr &&
309 op_event
.on_finish_safe
== nullptr);
311 // resume processing replay events
312 Context
*on_start_ready
= nullptr;
313 std::swap(on_start_ready
, op_event
.on_start_ready
);
314 on_start_ready
->complete(0);
316 // cancel has been requested -- send error to paused state machine
317 if (!op_event
.finish_on_ready
&& m_flush_ctx
!= nullptr) {
318 m_image_ctx
.op_work_queue
->queue(on_resume
, -ERESTART
);
322 // resume the op state machine once the associated OpFinishEvent
324 op_event
.on_op_finish_event
= new FunctionContext(
326 on_resume
->complete(r
);
329 // shut down request -- don't expect OpFinishEvent
330 if (op_event
.finish_on_ready
) {
331 m_image_ctx
.op_work_queue
->queue(on_resume
, 0);
// AioDiscardEvent: replays a journaled discard. Builds a modify completion
// (which also computes flush_required against the low-water mark), issues
// the discard, then schedules a batching flush when required.
// NOTE(review): the flush_required declaration and some argument tails are
// in elided lines.
335 template <typename I
>
336 void Replay
<I
>::handle_event(const journal::AioDiscardEvent
&event
,
337 Context
*on_ready
, Context
*on_safe
) {
338 CephContext
*cct
= m_image_ctx
.cct
;
339 ldout(cct
, 20) << ": AIO discard event" << dendl
;
342 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
343 io::AIO_TYPE_DISCARD
,
346 if (aio_comp
== nullptr) {
350 io::ImageRequest
<I
>::aio_discard(&m_image_ctx
, aio_comp
, event
.offset
,
351 event
.length
, event
.skip_partial_discard
,
353 if (flush_required
) {
355 auto flush_comp
= create_aio_flush_completion(nullptr);
358 if (flush_comp
!= nullptr) {
359 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// AioWriteEvent: replays a journaled write. Copies the event's data
// bufferlist (the event is const; aio_write takes the data by move),
// issues the write with the extent from the event, then schedules a
// batching flush when the low-water mark was hit.
364 template <typename I
>
365 void Replay
<I
>::handle_event(const journal::AioWriteEvent
&event
,
366 Context
*on_ready
, Context
*on_safe
) {
367 CephContext
*cct
= m_image_ctx
.cct
;
368 ldout(cct
, 20) << ": AIO write event" << dendl
;
370 bufferlist data
= event
.data
;
372 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
376 if (aio_comp
== nullptr) {
380 io::ImageRequest
<I
>::aio_write(&m_image_ctx
, aio_comp
,
381 {{event
.offset
, event
.length
}},
382 std::move(data
), 0, {});
383 if (flush_required
) {
385 auto flush_comp
= create_aio_flush_completion(nullptr);
388 if (flush_comp
!= nullptr) {
389 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// AioFlushEvent: replays a journaled flush. The flush completion adopts
// all accumulated unsafe modify contexts (see create_aio_flush_completion)
// and fires on_safe when durable; on_ready is completed immediately since
// a flush does not block further replay.
394 template <typename I
>
395 void Replay
<I
>::handle_event(const journal::AioFlushEvent
&event
,
396 Context
*on_ready
, Context
*on_safe
) {
397 CephContext
*cct
= m_image_ctx
.cct
;
398 ldout(cct
, 20) << ": AIO flush event" << dendl
;
400 io::AioCompletion
*aio_comp
;
402 Mutex::Locker
locker(m_lock
);
403 aio_comp
= create_aio_flush_completion(on_safe
);
406 if (aio_comp
!= nullptr) {
407 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, aio_comp
, {});
409 on_ready
->complete(0);
// AioWriteSameEvent: replays a journaled writesame (repeating pattern
// write). Mirrors the AioWriteEvent handler: copy data, build a modify
// completion, issue the writesame, flush when the low-water mark is hit.
412 template <typename I
>
413 void Replay
<I
>::handle_event(const journal::AioWriteSameEvent
&event
,
414 Context
*on_ready
, Context
*on_safe
) {
415 CephContext
*cct
= m_image_ctx
.cct
;
416 ldout(cct
, 20) << ": AIO writesame event" << dendl
;
418 bufferlist data
= event
.data
;
420 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
421 io::AIO_TYPE_WRITESAME
,
424 if (aio_comp
== nullptr) {
428 io::ImageRequest
<I
>::aio_writesame(&m_image_ctx
, aio_comp
, event
.offset
,
429 event
.length
, std::move(data
), 0, {});
430 if (flush_required
) {
432 auto flush_comp
= create_aio_flush_completion(nullptr);
435 if (flush_comp
!= nullptr) {
436 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// AioCompareAndWriteEvent: replays a journaled compare-and-write. Copies
// both the compare and write payloads (aio_compare_and_write consumes
// them by move), then follows the same modify-completion / conditional
// flush pattern as the other AIO handlers.
441 template <typename I
>
442 void Replay
<I
>::handle_event(const journal::AioCompareAndWriteEvent
&event
,
443 Context
*on_ready
, Context
*on_safe
) {
444 CephContext
*cct
= m_image_ctx
.cct
;
445 ldout(cct
, 20) << ": AIO CompareAndWrite event" << dendl
;
447 bufferlist cmp_data
= event
.cmp_data
;
448 bufferlist write_data
= event
.write_data
;
450 auto aio_comp
= create_aio_modify_completion(on_ready
, on_safe
,
451 io::AIO_TYPE_COMPARE_AND_WRITE
,
454 io::ImageRequest
<I
>::aio_compare_and_write(&m_image_ctx
, aio_comp
,
455 {{event
.offset
, event
.length
}},
457 std::move(write_data
),
459 if (flush_required
) {
461 auto flush_comp
= create_aio_flush_completion(nullptr);
464 io::ImageRequest
<I
>::aio_flush(&m_image_ctx
, flush_comp
, {});
// OpFinishEvent: pairs a journaled op-completion record with the op-start
// event tracked in m_op_events.
//  * Unknown op_tid => op was committed before replay started; complete
//    on_ready/on_safe immediately.
//  * Otherwise record the finish callbacks on the OpEvent and detach its
//    on_op_complete / on_op_finish_event contexts under the lock.
//  * On journaled failure: an in-progress op gets the error bubbled into
//    its paused state machine; a never-started op is torn down directly
//    via handle_op_complete(), with op_finish_error_codes (ops that
//    legitimately record a failure, e.g. snap-unprotect -EBUSY) filtered
//    to success.
//  * On journaled success: kick the deferred op via on_op_finish_event.
468 template <typename I
>
469 void Replay
<I
>::handle_event(const journal::OpFinishEvent
&event
,
470 Context
*on_ready
, Context
*on_safe
) {
471 CephContext
*cct
= m_image_ctx
.cct
;
472 ldout(cct
, 20) << ": Op finish event: "
473 << "op_tid=" << event
.op_tid
<< dendl
;
477 Context
*on_op_complete
= nullptr;
478 Context
*on_op_finish_event
= nullptr;
480 Mutex::Locker
locker(m_lock
);
481 auto op_it
= m_op_events
.find(event
.op_tid
);
482 if (op_it
== m_op_events
.end()) {
483 ldout(cct
, 10) << ": unable to locate associated op: assuming previously "
484 << "committed." << dendl
;
485 on_ready
->complete(0);
486 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
490 OpEvent
&op_event
= op_it
->second
;
491 assert(op_event
.on_finish_safe
== nullptr);
492 op_event
.on_finish_ready
= on_ready
;
493 op_event
.on_finish_safe
= on_safe
;
494 op_in_progress
= op_event
.op_in_progress
;
495 std::swap(on_op_complete
, op_event
.on_op_complete
);
496 std::swap(on_op_finish_event
, op_event
.on_op_finish_event
);
498 // special errors which indicate op never started but was recorded
499 // as failed in the journal
500 filter_ret_val
= (op_event
.op_finish_error_codes
.count(event
.r
) != 0);
504 if (op_in_progress
) {
505 // bubble the error up to the in-progress op to cancel it
506 on_op_finish_event
->complete(event
.r
);
508 // op hasn't been started -- bubble the error up since
509 // our image is now potentially in an inconsistent state
510 // since simple errors should have been caught before
511 // creating the op event
512 delete on_op_complete
;
513 delete on_op_finish_event
;
514 handle_op_complete(event
.op_tid
, filter_ret_val
? 0 : event
.r
);
519 // journal recorded success -- apply the op now
520 on_op_finish_event
->complete(0);
// SnapCreateEvent: registers an OpEvent for the tid and queues the snap
// create (wrapped in C_RefreshIfRequired) immediately. -EEXIST is ignored
// since replay may re-apply an already-created snapshot. This is a
// blocking op: on_ready is held back (on_start_ready) until the state
// machine signals replay_op_ready().
523 template <typename I
>
524 void Replay
<I
>::handle_event(const journal::SnapCreateEvent
&event
,
525 Context
*on_ready
, Context
*on_safe
) {
526 CephContext
*cct
= m_image_ctx
.cct
;
527 ldout(cct
, 20) << ": Snap create event" << dendl
;
529 Mutex::Locker
locker(m_lock
);
531 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
533 if (on_op_complete
== nullptr) {
537 // ignore errors caused due to replay
538 op_event
->ignore_error_codes
= {-EEXIST
};
541 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
542 m_image_ctx
, new ExecuteOp
<I
, journal::SnapCreateEvent
>(m_image_ctx
, event
,
546 // do not process more events until the state machine is ready
547 // since it will affect IO
548 op_event
->op_in_progress
= true;
549 op_event
->on_start_ready
= on_ready
;
// SnapRemoveEvent: registers an OpEvent and defers the snap remove (as
// on_op_finish_event) until the matching OpFinishEvent arrives; -ENOENT
// from a replayed removal is ignored. Non-blocking: on_ready completes
// immediately.
552 template <typename I
>
553 void Replay
<I
>::handle_event(const journal::SnapRemoveEvent
&event
,
554 Context
*on_ready
, Context
*on_safe
) {
555 CephContext
*cct
= m_image_ctx
.cct
;
556 ldout(cct
, 20) << ": Snap remove event" << dendl
;
558 Mutex::Locker
locker(m_lock
);
560 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
562 if (on_op_complete
== nullptr) {
566 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
567 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRemoveEvent
>(m_image_ctx
, event
,
570 // ignore errors caused due to replay
571 op_event
->ignore_error_codes
= {-ENOENT
};
573 on_ready
->complete(0);
// SnapRenameEvent: same deferred-op pattern as snap remove, executing the
// rename on OpFinishEvent; -EEXIST (target name already applied) is
// ignored during replay.
576 template <typename I
>
577 void Replay
<I
>::handle_event(const journal::SnapRenameEvent
&event
,
578 Context
*on_ready
, Context
*on_safe
) {
579 CephContext
*cct
= m_image_ctx
.cct
;
580 ldout(cct
, 20) << ": Snap rename event" << dendl
;
582 Mutex::Locker
locker(m_lock
);
584 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
586 if (on_op_complete
== nullptr) {
590 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
591 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRenameEvent
>(m_image_ctx
, event
,
594 // ignore errors caused due to replay
595 op_event
->ignore_error_codes
= {-EEXIST
};
597 on_ready
->complete(0);
// SnapProtectEvent: deferred-op pattern; executes snap protect on
// OpFinishEvent. -EBUSY (already protected) is ignored during replay.
600 template <typename I
>
601 void Replay
<I
>::handle_event(const journal::SnapProtectEvent
&event
,
602 Context
*on_ready
, Context
*on_safe
) {
603 CephContext
*cct
= m_image_ctx
.cct
;
604 ldout(cct
, 20) << ": Snap protect event" << dendl
;
606 Mutex::Locker
locker(m_lock
);
608 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
610 if (on_op_complete
== nullptr) {
614 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
615 m_image_ctx
, new ExecuteOp
<I
, journal::SnapProtectEvent
>(m_image_ctx
, event
,
618 // ignore errors caused due to replay
619 op_event
->ignore_error_codes
= {-EBUSY
};
621 on_ready
->complete(0);
// SnapUnprotectEvent: deferred-op pattern. A journaled -EBUSY finish code
// means the unprotect never actually started (child images existed), so
// it is treated as success (op_finish_error_codes); a replayed -EINVAL
// (already unprotected) is ignored.
624 template <typename I
>
625 void Replay
<I
>::handle_event(const journal::SnapUnprotectEvent
&event
,
626 Context
*on_ready
, Context
*on_safe
) {
627 CephContext
*cct
= m_image_ctx
.cct
;
628 ldout(cct
, 20) << ": Snap unprotect event" << dendl
;
630 Mutex::Locker
locker(m_lock
);
632 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
634 if (on_op_complete
== nullptr) {
638 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
639 m_image_ctx
, new ExecuteOp
<I
, journal::SnapUnprotectEvent
>(m_image_ctx
,
643 // ignore errors recorded in the journal
644 op_event
->op_finish_error_codes
= {-EBUSY
};
646 // ignore errors caused due to replay
647 op_event
->ignore_error_codes
= {-EINVAL
};
649 on_ready
->complete(0);
// SnapRollbackEvent: deferred-op pattern; the rollback (with the shared
// no-op progress callback) runs when the matching OpFinishEvent arrives.
// No replay-specific error codes are filtered here.
652 template <typename I
>
653 void Replay
<I
>::handle_event(const journal::SnapRollbackEvent
&event
,
654 Context
*on_ready
, Context
*on_safe
) {
655 CephContext
*cct
= m_image_ctx
.cct
;
656 ldout(cct
, 20) << ": Snap rollback start event" << dendl
;
658 Mutex::Locker
locker(m_lock
);
660 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
662 if (on_op_complete
== nullptr) {
666 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
667 m_image_ctx
, new ExecuteOp
<I
, journal::SnapRollbackEvent
>(m_image_ctx
,
671 on_ready
->complete(0);
// RenameEvent: deferred-op pattern; executes the image rename on
// OpFinishEvent. -EEXIST (rename already applied) is ignored on replay.
674 template <typename I
>
675 void Replay
<I
>::handle_event(const journal::RenameEvent
&event
,
676 Context
*on_ready
, Context
*on_safe
) {
677 CephContext
*cct
= m_image_ctx
.cct
;
678 ldout(cct
, 20) << ": Rename event" << dendl
;
680 Mutex::Locker
locker(m_lock
);
682 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
684 if (on_op_complete
== nullptr) {
688 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
689 m_image_ctx
, new ExecuteOp
<I
, journal::RenameEvent
>(m_image_ctx
, event
,
692 // ignore errors caused due to replay
693 op_event
->ignore_error_codes
= {-EEXIST
};
695 on_ready
->complete(0);
// ResizeEvent: blocking op, like snap create -- the resize is queued
// immediately (it must adjust IO boundaries before further events can be
// replayed) and on_ready is parked on on_start_ready until
// replay_op_ready() fires.
698 template <typename I
>
699 void Replay
<I
>::handle_event(const journal::ResizeEvent
&event
,
700 Context
*on_ready
, Context
*on_safe
) {
701 CephContext
*cct
= m_image_ctx
.cct
;
702 ldout(cct
, 20) << ": Resize start event" << dendl
;
704 Mutex::Locker
locker(m_lock
);
706 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
708 if (on_op_complete
== nullptr) {
713 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
714 m_image_ctx
, new ExecuteOp
<I
, journal::ResizeEvent
>(m_image_ctx
, event
,
715 on_op_complete
)), 0);
717 // do not process more events until the state machine is ready
718 // since it will affect IO
719 op_event
->op_in_progress
= true;
720 op_event
->on_start_ready
= on_ready
;
// FlattenEvent: deferred-op pattern; executes the flatten (with the no-op
// progress callback) on OpFinishEvent. -EINVAL (image already flattened)
// is ignored on replay.
723 template <typename I
>
724 void Replay
<I
>::handle_event(const journal::FlattenEvent
&event
,
725 Context
*on_ready
, Context
*on_safe
) {
726 CephContext
*cct
= m_image_ctx
.cct
;
727 ldout(cct
, 20) << ": Flatten start event" << dendl
;
729 Mutex::Locker
locker(m_lock
);
731 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
733 if (on_op_complete
== nullptr) {
737 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
738 m_image_ctx
, new ExecuteOp
<I
, journal::FlattenEvent
>(m_image_ctx
, event
,
741 // ignore errors caused due to replay
742 op_event
->ignore_error_codes
= {-EINVAL
};
744 on_ready
->complete(0);
// DemotePromoteEvent: mirroring ownership transitions carry no replayable
// image mutation here -- acknowledge (on_ready) and commit (on_safe)
// immediately.
747 template <typename I
>
748 void Replay
<I
>::handle_event(const journal::DemotePromoteEvent
&event
,
749 Context
*on_ready
, Context
*on_safe
) {
750 CephContext
*cct
= m_image_ctx
.cct
;
751 ldout(cct
, 20) << ": Demote/Promote event" << dendl
;
752 on_ready
->complete(0);
753 on_safe
->complete(0);
// SnapLimitEvent: deferred-op pattern; applies the snapshot-count limit
// on OpFinishEvent. No replay error codes are filtered.
756 template <typename I
>
757 void Replay
<I
>::handle_event(const journal::SnapLimitEvent
&event
,
758 Context
*on_ready
, Context
*on_safe
) {
759 CephContext
*cct
= m_image_ctx
.cct
;
760 ldout(cct
, 20) << ": Snap limit event" << dendl
;
762 Mutex::Locker
locker(m_lock
);
764 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
766 if (on_op_complete
== nullptr) {
770 op_event
->on_op_finish_event
= new C_RefreshIfRequired
<I
>(
771 m_image_ctx
, new ExecuteOp
<I
, journal::SnapLimitEvent
>(m_image_ctx
,
775 on_ready
->complete(0);
// UpdateFeaturesEvent: blocking op (feature toggles affect IO paths) --
// queued immediately like resize/snap-create, with on_ready parked on
// on_start_ready until replay_op_ready().
778 template <typename I
>
779 void Replay
<I
>::handle_event(const journal::UpdateFeaturesEvent
&event
,
780 Context
*on_ready
, Context
*on_safe
) {
781 CephContext
*cct
= m_image_ctx
.cct
;
782 ldout(cct
, 20) << ": Update features event" << dendl
;
784 Mutex::Locker
locker(m_lock
);
786 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
788 if (on_op_complete
== nullptr) {
793 m_image_ctx
.op_work_queue
->queue(new C_RefreshIfRequired
<I
>(
794 m_image_ctx
, new ExecuteOp
<I
, journal::UpdateFeaturesEvent
>(
795 m_image_ctx
, event
, on_op_complete
)), 0);
797 // do not process more events until the state machine is ready
798 // since it will affect IO
799 op_event
->op_in_progress
= true;
800 op_event
->on_start_ready
= on_ready
;
// MetadataSetEvent: deferred-op pattern, but the refresh wrapper is
// applied to on_op_complete and the ExecuteOp itself is wrapped in an
// async callback (metadata ops are executed directly rather than via the
// op work-queue path used by other handlers).
803 template <typename I
>
804 void Replay
<I
>::handle_event(const journal::MetadataSetEvent
&event
,
805 Context
*on_ready
, Context
*on_safe
) {
806 CephContext
*cct
= m_image_ctx
.cct
;
807 ldout(cct
, 20) << ": Metadata set event" << dendl
;
809 Mutex::Locker
locker(m_lock
);
811 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
813 if (on_op_complete
== nullptr) {
817 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
818 op_event
->on_op_finish_event
= util::create_async_context_callback(
819 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataSetEvent
>(
820 m_image_ctx
, event
, on_op_complete
));
822 on_ready
->complete(0);
// MetadataRemoveEvent: same shape as the metadata-set handler; -ENOENT
// (key already removed) is ignored on replay.
825 template <typename I
>
826 void Replay
<I
>::handle_event(const journal::MetadataRemoveEvent
&event
,
827 Context
*on_ready
, Context
*on_safe
) {
828 CephContext
*cct
= m_image_ctx
.cct
;
829 ldout(cct
, 20) << ": Metadata remove event" << dendl
;
831 Mutex::Locker
locker(m_lock
);
833 Context
*on_op_complete
= create_op_context_callback(event
.op_tid
, on_ready
,
835 if (on_op_complete
== nullptr) {
839 on_op_complete
= new C_RefreshIfRequired
<I
>(m_image_ctx
, on_op_complete
);
840 op_event
->on_op_finish_event
= util::create_async_context_callback(
841 m_image_ctx
, new ExecuteOp
<I
, journal::MetadataRemoveEvent
>(
842 m_image_ctx
, event
, on_op_complete
));
844 // ignore errors caused due to replay
845 op_event
->ignore_error_codes
= {-ENOENT
};
847 on_ready
->complete(0);
// UnknownEvent: unrecognized/future journal entry -- log, acknowledge,
// and commit immediately so replay can proceed past it.
850 template <typename I
>
851 void Replay
<I
>::handle_event(const journal::UnknownEvent
&event
,
852 Context
*on_ready
, Context
*on_safe
) {
853 CephContext
*cct
= m_image_ctx
.cct
;
854 ldout(cct
, 20) << ": unknown event" << dendl
;
855 on_ready
->complete(0);
856 on_safe
->complete(0);
// handle_aio_modify_complete(): completion for a replayed modify AIO.
// Fires the (possibly deferred-at-high-water-mark) on_ready; filtered
// error codes are suppressed, others fail on_safe immediately. With the
// writeback cache enabled, on_safe is parked in m_aio_modify_safe_contexts
// (completed only by a later flush so the journal commit position never
// runs ahead of disk); without it the IO is already durable, so the
// in-flight count drops, any paused replay (m_on_aio_ready) resumes, and
// on_safe is queued with success.
859 template <typename I
>
860 void Replay
<I
>::handle_aio_modify_complete(Context
*on_ready
, Context
*on_safe
,
861 int r
, std::set
<int> &filters
,
862 bool writeback_cache_enabled
) {
863 Mutex::Locker
locker(m_lock
);
864 CephContext
*cct
= m_image_ctx
.cct
;
865 ldout(cct
, 20) << ": on_ready=" << on_ready
<< ", "
866 << "on_safe=" << on_safe
<< ", r=" << r
<< dendl
;
868 if (on_ready
!= nullptr) {
869 on_ready
->complete(0);
872 if (filters
.find(r
) != filters
.end())
876 lderr(cct
) << ": AIO modify op failed: " << cpp_strerror(r
) << dendl
;
877 on_safe
->complete(r
);
881 if (writeback_cache_enabled
) {
882 // will be completed after next flush operation completes
883 m_aio_modify_safe_contexts
.insert(on_safe
);
885 // IO is safely stored on disk
886 assert(m_in_flight_aio_modify
> 0);
887 --m_in_flight_aio_modify
;
889 if (m_on_aio_ready
!= nullptr) {
890 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
891 m_on_aio_ready
->complete(0);
892 m_on_aio_ready
= nullptr;
895 ldout(cct
, 20) << ": completing safe context: " << on_safe
<< dendl
;
896 m_image_ctx
.op_work_queue
->queue(on_safe
, 0);
// handle_aio_flush_complete(): completion for a replayed flush carrying
// the batch of on_safe contexts it made durable. Under the lock it
// decrements the flush/modify in-flight counters, grabs any paused-replay
// resume context, and detects full quiescence (arming m_flush_ctx for
// shut_down). Contexts whose modify op already failed (still present in
// m_aio_modify_safe_contexts) are stripped so they are not completed
// twice; the remainder -- plus the flush's own on_flush_safe -- are
// completed with r, and a pending shut-down flush context is fired last.
900 template <typename I
>
901 void Replay
<I
>::handle_aio_flush_complete(Context
*on_flush_safe
,
902 Contexts
&on_safe_ctxs
, int r
) {
903 CephContext
*cct
= m_image_ctx
.cct
;
904 ldout(cct
, 20) << ": r=" << r
<< dendl
;
907 lderr(cct
) << ": AIO flush failed: " << cpp_strerror(r
) << dendl
;
910 Context
*on_aio_ready
= nullptr;
911 Context
*on_flush
= nullptr;
913 Mutex::Locker
locker(m_lock
);
914 assert(m_in_flight_aio_flush
> 0);
915 assert(m_in_flight_aio_modify
>= on_safe_ctxs
.size());
916 --m_in_flight_aio_flush
;
917 m_in_flight_aio_modify
-= on_safe_ctxs
.size();
919 std::swap(on_aio_ready
, m_on_aio_ready
);
920 if (m_in_flight_op_events
== 0 &&
921 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
922 on_flush
= m_flush_ctx
;
925 // strip out previously failed on_safe contexts
926 for (auto it
= on_safe_ctxs
.begin(); it
!= on_safe_ctxs
.end(); ) {
927 if (m_aio_modify_safe_contexts
.erase(*it
)) {
930 it
= on_safe_ctxs
.erase(it
);
935 if (on_aio_ready
!= nullptr) {
936 ldout(cct
, 10) << ": resuming paused AIO" << dendl
;
937 on_aio_ready
->complete(0);
940 if (on_flush_safe
!= nullptr) {
941 on_safe_ctxs
.push_back(on_flush_safe
);
943 for (auto ctx
: on_safe_ctxs
) {
944 ldout(cct
, 20) << ": completing safe context: " << ctx
<< dendl
;
948 if (on_flush
!= nullptr) {
949 ldout(cct
, 20) << ": completing flush context: " << on_flush
<< dendl
;
950 on_flush
->complete(r
);
// create_op_context_callback(): registers a new OpEvent for op_tid and
// returns the C_OpOnComplete context that the executed operation will
// fire (routing into handle_op_complete). Returns nullptr -- after
// completing on_ready and failing on_safe -- when replay has shut down
// (-ESHUTDOWN) or the tid is a duplicate (-EINVAL). Caller must hold
// m_lock; the on_ready/on_safe parameters arrive via elided signature
// lines (original lines 956-957 not in view).
954 template <typename I
>
955 Context
*Replay
<I
>::create_op_context_callback(uint64_t op_tid
,
958 OpEvent
**op_event
) {
959 CephContext
*cct
= m_image_ctx
.cct
;
961 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
962 on_ready
->complete(0);
963 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
967 assert(m_lock
.is_locked());
968 if (m_op_events
.count(op_tid
) != 0) {
969 lderr(cct
) << ": duplicate op tid detected: " << op_tid
<< dendl
;
971 // on_ready is already async but on failure invoke on_safe async
973 on_ready
->complete(0);
974 m_image_ctx
.op_work_queue
->queue(on_safe
, -EINVAL
);
978 ++m_in_flight_op_events
;
979 *op_event
= &m_op_events
[op_tid
];
980 (*op_event
)->on_start_safe
= on_safe
;
982 Context
*on_op_complete
= new C_OpOnComplete(this, op_tid
);
983 (*op_event
)->on_op_complete
= on_op_complete
;
984 return on_op_complete
;
// handle_op_complete(): final teardown for a replayed op. Removes the
// OpEvent under the lock (noting shut-down via m_flush_ctx), then:
//  * an op that failed before becoming ready fires its stashed
//    on_start_ready (the assert pins that this only happens on real,
//    non -ERESTART errors);
//  * otherwise the op was driven by an OpFinishEvent (or shut-down) --
//    any remaining on_op_finish_event and on_finish_ready are completed;
//  * errors listed in ignore_error_codes are filtered before completing
//    on_start_safe / on_finish_safe with the (possibly filtered) result;
//  * finally, the in-flight op count is dropped under a re-acquired lock
//    and a pending shut-down flush context is queued once everything
//    (ops + AIO) is quiescent.
987 template <typename I
>
988 void Replay
<I
>::handle_op_complete(uint64_t op_tid
, int r
) {
989 CephContext
*cct
= m_image_ctx
.cct
;
990 ldout(cct
, 20) << ": op_tid=" << op_tid
<< ", "
991 << "r=" << r
<< dendl
;
994 bool shutting_down
= false;
996 Mutex::Locker
locker(m_lock
);
997 auto op_it
= m_op_events
.find(op_tid
);
998 assert(op_it
!= m_op_events
.end());
1000 op_event
= std::move(op_it
->second
);
1001 m_op_events
.erase(op_it
);
1004 assert(m_flush_ctx
!= nullptr);
1005 shutting_down
= true;
1009 assert(op_event
.on_start_ready
== nullptr || (r
< 0 && r
!= -ERESTART
));
1010 if (op_event
.on_start_ready
!= nullptr) {
1011 // blocking op event failed before it became ready
1012 assert(op_event
.on_finish_ready
== nullptr &&
1013 op_event
.on_finish_safe
== nullptr);
1015 op_event
.on_start_ready
->complete(0);
1017 // event kicked off by OpFinishEvent
1018 assert((op_event
.on_finish_ready
!= nullptr &&
1019 op_event
.on_finish_safe
!= nullptr) || shutting_down
);
1022 if (op_event
.on_op_finish_event
!= nullptr) {
1023 op_event
.on_op_finish_event
->complete(r
);
1026 if (op_event
.on_finish_ready
!= nullptr) {
1027 op_event
.on_finish_ready
->complete(0);
1030 // filter out errors caused by replay of the same op
1031 if (r
< 0 && op_event
.ignore_error_codes
.count(r
) != 0) {
1035 op_event
.on_start_safe
->complete(r
);
1036 if (op_event
.on_finish_safe
!= nullptr) {
1037 op_event
.on_finish_safe
->complete(r
);
1040 // shut down request might have occurred while lock was
1041 // dropped -- handle if pending
1042 Context
*on_flush
= nullptr;
1044 Mutex::Locker
locker(m_lock
);
1045 assert(m_in_flight_op_events
> 0);
1046 --m_in_flight_op_events
;
1047 if (m_in_flight_op_events
== 0 &&
1048 (m_in_flight_aio_flush
+ m_in_flight_aio_modify
) == 0) {
1049 on_flush
= m_flush_ctx
;
1052 if (on_flush
!= nullptr) {
1053 m_image_ctx
.op_work_queue
->queue(on_flush
, 0);
// create_aio_modify_completion(): builds the AioCompletion used by every
// replayed modify IO (write/discard/writesame/compare-and-write).
//  * Returns nullptr (on_ready completed, on_safe failed -ESHUTDOWN) if
//    replay has shut down -- the shut-down check header is in elided lines.
//  * Tracks the modify in m_in_flight_aio_modify; with writeback cache
//    enabled, on_safe is deferred into m_aio_modify_unsafe_contexts and
//    *flush_required is set when the low-water mark (32) is reached.
//  * At the high-water mark (64) replay pauses: on_ready is stashed in
//    m_on_aio_ready instead of being passed through, and is fired later
//    by a modify/flush completion.
//  * The returned completion routes into C_AioModifyComplete with the
//    caller's error-code filter set.
1057 template <typename I
>
1059 Replay
<I
>::create_aio_modify_completion(Context
*on_ready
,
1061 io::aio_type_t aio_type
,
1062 bool *flush_required
,
1063 std::set
<int> &&filters
) {
1064 Mutex::Locker
locker(m_lock
);
1065 CephContext
*cct
= m_image_ctx
.cct
;
1066 assert(m_on_aio_ready
== nullptr);
1069 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1070 on_ready
->complete(0);
1071 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1075 ++m_in_flight_aio_modify
;
1077 bool writeback_cache_enabled
= m_image_ctx
.is_writeback_cache_enabled();
1078 if (writeback_cache_enabled
) {
1079 m_aio_modify_unsafe_contexts
.push_back(on_safe
);
1082 // FLUSH if we hit the low-water mark -- on_safe contexts are
1083 // completed by flushes-only so that we don't move the journal
1084 // commit position until safely on-disk
1086 *flush_required
= (writeback_cache_enabled
&&
1087 m_aio_modify_unsafe_contexts
.size() ==
1088 IN_FLIGHT_IO_LOW_WATER_MARK
);
1089 if (*flush_required
) {
1090 ldout(cct
, 10) << ": hit AIO replay low-water mark: scheduling flush"
1094 // READY for more events if:
1095 // * not at high-water mark for IO
1096 // * in-flight ops are at a consistent point (snap create has IO flushed,
1097 // shrink has adjusted clip boundary, etc) -- should have already been
1098 // flagged not-ready
1099 if (m_in_flight_aio_modify
== IN_FLIGHT_IO_HIGH_WATER_MARK
) {
1100 ldout(cct
, 10) << ": hit AIO replay high-water mark: pausing replay"
1102 assert(m_on_aio_ready
== nullptr);
1103 std::swap(m_on_aio_ready
, on_ready
);
1106 // when the modification is ACKed by librbd, we can process the next
1107 // event. when flushed, the completion of the next flush will fire the
1109 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1110 new C_AioModifyComplete(this, on_ready
, on_safe
, std::move(filters
),
1111 writeback_cache_enabled
),
1112 util::get_image_ctx(&m_image_ctx
), aio_type
);
// create_aio_flush_completion(): builds the AioCompletion for a replayed
// flush. Returns nullptr (failing on_safe with -ESHUTDOWN when provided)
// if replay has shut down -- the shut-down check header is in elided
// lines. Otherwise it bumps the in-flight flush count and hands ALL
// accumulated unsafe modify contexts to C_AioFlushComplete, clearing the
// local list so subsequent modifies start a fresh batch. Caller must
// hold m_lock.
1116 template <typename I
>
1117 io::AioCompletion
*Replay
<I
>::create_aio_flush_completion(Context
*on_safe
) {
1118 assert(m_lock
.is_locked());
1120 CephContext
*cct
= m_image_ctx
.cct
;
1122 ldout(cct
, 5) << ": ignoring event after shut down" << dendl
;
1123 if (on_safe
!= nullptr) {
1124 m_image_ctx
.op_work_queue
->queue(on_safe
, -ESHUTDOWN
);
1129 ++m_in_flight_aio_flush
;
1131 // associate all prior write/discard ops to this flush request
1132 auto aio_comp
= io::AioCompletion::create_and_start
<Context
>(
1133 new C_AioFlushComplete(this, on_safe
,
1134 std::move(m_aio_modify_unsafe_contexts
)),
1135 util::get_image_ctx(&m_image_ctx
), io::AIO_TYPE_FLUSH
);
1136 m_aio_modify_unsafe_contexts
.clear();
1140 } // namespace journal
1141 } // namespace librbd
// Explicit instantiation for the production ImageCtx type (tests supply
// their own mock instantiation elsewhere).
1143 template class librbd::journal::Replay
<librbd::ImageCtx
>;