1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/Journal.h"
5 #include "include/rados/librados.hpp"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include "journal/Journaler.h"
11 #include "journal/Policy.h"
12 #include "journal/ReplayEntry.h"
13 #include "journal/Settings.h"
14 #include "journal/Utils.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/io/ImageRequestWQ.h"
18 #include "librbd/io/ObjectRequest.h"
19 #include "librbd/journal/CreateRequest.h"
20 #include "librbd/journal/DemoteRequest.h"
21 #include "librbd/journal/OpenRequest.h"
22 #include "librbd/journal/RemoveRequest.h"
23 #include "librbd/journal/Replay.h"
24 #include "librbd/journal/PromoteRequest.h"
26 #include <boost/scope_exit.hpp>
29 #define dout_subsys ceph_subsys_rbd
31 #define dout_prefix *_dout << "librbd::Journal: "
35 using util::create_async_context_callback
;
36 using util::create_context_callback
;
37 using journal::util::C_DecodeTag
;
38 using journal::util::C_DecodeTags
;
42 // TODO: once journaler is 100% async, remove separate threads and
43 // reuse ImageCtx's thread pool
// ThreadPool subclass that the Journal constructor obtains through
// CephContext::lookup_or_create_singleton_object() under the key
// "librbd::journal::thread_pool" (presumably one instance per CephContext).
// Pool name "librbd::Journal", thread name "tp_librbd_journ"; the trailing
// constructor argument (1) is presumably the thread count -- confirm against
// ThreadPool's constructor.
// NOTE(review): this listing is a lossy extract -- gaps in the embedded
// original line numbers show that lines (e.g. the ctor/dtor bodies and the
// closing braces) are missing, so this class is incomplete as shown.
44 class ThreadPoolSingleton
: public ThreadPool
{
46 explicit ThreadPoolSingleton(CephContext
*cct
)
47 : ThreadPool(cct
, "librbd::Journal", "tp_librbd_journ", 1) {
50 ~ThreadPoolSingleton() override
{
// Completion context used by Journal<I>::is_tag_owner(IoCtx&, ...).
// It allocates its own Journaler (via new) for the image and carries the
// client / client_meta / tag_data storage that get_tags() fills in. When it
// finishes, it reports through *is_tag_owner whether the image's current tag
// belongs to the local image (mirror_uuid == LOCAL_MIRROR_UUID).
56 struct C_IsTagOwner
: public Context
{
57 librados::IoCtx
&io_ctx
;
60 ContextWQ
*op_work_queue
;
63 CephContext
*cct
= nullptr;
// Storage populated by get_tags() before finish() runs.
65 cls::journal::Client client
;
66 journal::ImageClientMeta client_meta
;
68 journal::TagData tag_data
;
70 C_IsTagOwner(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
71 bool *is_tag_owner
, ContextWQ
*op_work_queue
, Context
*on_finish
)
72 : io_ctx(io_ctx
), image_id(image_id
), is_tag_owner(is_tag_owner
),
73 op_work_queue(op_work_queue
), on_finish(on_finish
),
74 cct(reinterpret_cast<CephContext
*>(io_ctx
.cct())),
75 journaler(new Journaler(io_ctx
, image_id
, Journal
<>::IMAGE_CLIENT_ID
,
79 void finish(int r
) override
{
80 ldout(cct
, 20) << this << " C_IsTagOwner::" << __func__
<< ": r=" << r
// Failure path: log the error (the condition guarding it is not visible in
// this extract).
83 lderr(cct
) << this << " C_IsTagOwner::" << __func__
<< ": "
84 << "failed to get tag owner: " << cpp_strerror(r
) << dendl
;
// The tag is owned locally iff its mirror_uuid is LOCAL_MIRROR_UUID.
86 *is_tag_owner
= (tag_data
.mirror_uuid
== Journal
<>::LOCAL_MIRROR_UUID
);
// Copy the members the deferred lambda needs into locals; presumably this
// Context is destroyed once finish() returns, so the lambda must not use
// `this`. The captured journaler is presumably released on lines not
// visible in this extract.
89 Journaler
*journaler
= this->journaler
;
90 Context
*on_finish
= this->on_finish
;
91 FunctionContext
*ctx
= new FunctionContext(
92 [journaler
, on_finish
](int r
) {
93 on_finish
->complete(r
);
// Defer completion of the caller's on_finish to op_work_queue, passing r.
96 op_work_queue
->queue(ctx
, r
);
// Completion context used by Journal<I>::get_tag_owner().
// It holds a Journaler by value, constructed for IMAGE_CLIENT_ID with an
// empty `{}` final argument (presumably default settings), plus the storage
// that get_tags() fills in. When it finishes, it copies the current tag's
// mirror_uuid into the caller's *mirror_uuid and completes on_finish with r.
100 struct C_GetTagOwner
: public Context
{
101 std::string
*mirror_uuid
;
// Storage populated by get_tags() before finish() runs.
105 cls::journal::Client client
;
106 journal::ImageClientMeta client_meta
;
108 journal::TagData tag_data
;
110 C_GetTagOwner(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
111 std::string
*mirror_uuid
, Context
*on_finish
)
112 : mirror_uuid(mirror_uuid
), on_finish(on_finish
),
113 journaler(io_ctx
, image_id
, Journal
<>::IMAGE_CLIENT_ID
, {}) {
// NOTE(review): declared `virtual` without `override`, unlike
// C_IsTagOwner::finish(); consider `void finish(int r) override`.
116 virtual void finish(int r
) {
// Any condition guarding this copy is not visible in this extract.
118 *mirror_uuid
= tag_data
.mirror_uuid
;
120 on_finish
->complete(r
);
// Asynchronous two-step request over a journaler of type J:
//   1. get_client(IMAGE_CLIENT_ID): decode the client's ClientData and
//      extract its ImageClientMeta into *client_meta;
//   2. get_tags(client_meta->tag_class): decode the tags (via C_DecodeTags)
//      into *tag_tid / *tag_data;
// then complete on_finish. All results are written through the
// caller-supplied pointers.
124 template <typename J
>
125 struct GetTagsRequest
{
128 cls::journal::Client
*client
;
129 journal::ImageClientMeta
*client_meta
;
131 journal::TagData
*tag_data
;
136 GetTagsRequest(CephContext
*cct
, J
*journaler
, cls::journal::Client
*client
,
137 journal::ImageClientMeta
*client_meta
, uint64_t *tag_tid
,
138 journal::TagData
*tag_data
, Context
*on_finish
)
139 : cct(cct
), journaler(journaler
), client(client
), client_meta(client_meta
),
140 tag_tid(tag_tid
), tag_data(tag_data
), on_finish(on_finish
), lock("lock") {
149 * GET_CLIENT * * * * * * * * * * * *
152 * GET_TAGS * * * * * * * * * * * * * (error)
155 * <finish> * * * * * * * * * * * * *
// Step 1: fetch the IMAGE_CLIENT_ID client record into *client;
// handle_get_client() runs on completion.
164 void send_get_client() {
165 ldout(cct
, 20) << __func__
<< dendl
;
167 FunctionContext
*ctx
= new FunctionContext(
169 handle_get_client(r
);
171 journaler
->get_client(Journal
<ImageCtx
>::IMAGE_CLIENT_ID
, client
, ctx
);
// Decode client->data as librbd::journal::ClientData, require that its
// client_meta holds an ImageClientMeta, and copy that into *client_meta.
174 void handle_get_client(int r
) {
175 ldout(cct
, 20) << __func__
<< ": r=" << r
<< dendl
;
182 librbd::journal::ClientData client_data
;
183 bufferlist::iterator bl_it
= client
->data
.begin();
185 ::decode(client_data
, bl_it
);
186 } catch (const buffer::error
&err
) {
// NOTE(review): these log messages say "OpenJournalerRequest::" although
// this is GetTagsRequest -- likely copy/paste; consider fixing the prefix.
187 lderr(cct
) << this << " OpenJournalerRequest::" << __func__
<< ": "
188 << "failed to decode client data" << dendl
;
// boost::get on a pointer yields nullptr when the variant holds another type.
193 journal::ImageClientMeta
*image_client_meta
=
194 boost::get
<journal::ImageClientMeta
>(&client_data
.client_meta
);
195 if (image_client_meta
== nullptr) {
196 lderr(cct
) << this << " OpenJournalerRequest::" << __func__
<< ": "
197 << "failed to get client meta" << dendl
;
201 *client_meta
= *image_client_meta
;
// Step 2: list the tags of client_meta->tag_class. C_DecodeTags is given
// tag_tid, tag_data and &lock (presumably it decodes the relevant tag into
// them while holding that lock).
206 void send_get_tags() {
207 ldout(cct
, 20) << __func__
<< dendl
;
209 FunctionContext
*ctx
= new FunctionContext(
213 C_DecodeTags
*tags_ctx
= new C_DecodeTags(cct
, &lock
, tag_tid
, tag_data
,
215 journaler
->get_tags(client_meta
->tag_class
, &tags_ctx
->tags
, tags_ctx
);
218 void handle_get_tags(int r
) {
219 ldout(cct
, 20) << __func__
<< ": r=" << r
<< dendl
;
// Forward the final result to the caller's on_finish.
224 void complete(int r
) {
225 on_finish
->complete(r
);
// Entry point for GetTagsRequest: heap-allocates a GetTagsRequest<J> that
// fills *client, *client_meta, *tag_tid and *tag_data and then completes
// on_finish. The statement that starts the request is not visible in this
// extract.
230 template <typename J
>
231 void get_tags(CephContext
*cct
, J
*journaler
,
232 cls::journal::Client
*client
,
233 journal::ImageClientMeta
*client_meta
,
234 uint64_t *tag_tid
, journal::TagData
*tag_data
,
235 Context
*on_finish
) {
236 ldout(cct
, 20) << __func__
<< dendl
;
238 GetTagsRequest
<J
> *req
=
239 new GetTagsRequest
<J
>(cct
, journaler
, client
, client_meta
, tag_tid
,
240 tag_data
, on_finish
);
// Synchronously allocate a new journal tag.
// Encodes a TagData carrying mirror_uuid and predecessor, asks the journaler
// to allocate it (result written to *new_tag), and blocks on a C_SaferCond
// until that completes. r holds the completion code; failures are logged as
// "failed to allocate tag" (the return statements are not visible here).
// NOTE(review): `tag_class` and `tag_bl` are declared on lines missing from
// this extract (tag_class is presumably a parameter).
244 template <typename J
>
245 int allocate_journaler_tag(CephContext
*cct
, J
*journaler
,
247 const journal::TagPredecessor
&predecessor
,
248 const std::string
&mirror_uuid
,
249 cls::journal::Tag
*new_tag
) {
250 journal::TagData tag_data
;
251 tag_data
.mirror_uuid
= mirror_uuid
;
252 tag_data
.predecessor
= predecessor
;
255 ::encode(tag_data
, tag_bl
);
257 C_SaferCond allocate_tag_ctx
;
258 journaler
->allocate_tag(tag_class
, tag_bl
, new_tag
, &allocate_tag_ctx
);
// Block until the asynchronous allocation completes.
260 int r
= allocate_tag_ctx
.wait();
262 lderr(cct
) << __func__
<< ": "
263 << "failed to allocate tag: " << cpp_strerror(r
) << dendl
;
269 } // anonymous namespace
271 // client id for local image
// (the empty string; used as the journal client id throughout this file)
272 template <typename I
>
273 const std::string Journal
<I
>::IMAGE_CLIENT_ID("");
275 // mirror uuid to use for local images
// (the empty string; a tag whose mirror_uuid equals this value is treated
// as locally owned by Journal<I>::is_tag_owner)
276 template <typename I
>
277 const std::string Journal
<I
>::LOCAL_MIRROR_UUID("");
279 // mirror uuid to use for orphaned (demoted) images
// (the literal "<orphan>")
280 template <typename I
>
281 const std::string Journal
<I
>::ORPHAN_MIRROR_UUID("<orphan>");
// Stream a human-readable name for a Journal<I>::State. Values not matched
// by any case print as "Unknown (<numeric value>)".
// NOTE(review): `I` appears only in the non-deduced context
// `typename Journal<I>::State`, so template argument deduction cannot select
// this overload for a plain `os << state`; callers must name I explicitly
// (or a non-template overload is needed).
283 template <typename I
>
284 std::ostream
&operator<<(std::ostream
&os
,
285 const typename Journal
<I
>::State
&state
) {
287 case Journal
<I
>::STATE_UNINITIALIZED
:
288 os
<< "Uninitialized";
290 case Journal
<I
>::STATE_INITIALIZING
:
291 os
<< "Initializing";
293 case Journal
<I
>::STATE_REPLAYING
:
296 case Journal
<I
>::STATE_FLUSHING_RESTART
:
297 os
<< "FlushingRestart";
299 case Journal
<I
>::STATE_RESTARTING_REPLAY
:
300 os
<< "RestartingReplay";
302 case Journal
<I
>::STATE_FLUSHING_REPLAY
:
303 os
<< "FlushingReplay";
305 case Journal
<I
>::STATE_READY
:
308 case Journal
<I
>::STATE_STOPPING
:
311 case Journal
<I
>::STATE_CLOSING
:
314 case Journal
<I
>::STATE_CLOSED
:
318 os
<< "Unknown (" << static_cast<uint32_t>(state
) << ")";
// Construct the journal wrapper for image_ctx.
// It starts in STATE_UNINITIALIZED with no journaler and no replay object.
// The constructor also creates this instance's ContextWQ on the
// ThreadPoolSingleton obtained from the CephContext, using the
// "rbd_op_thread_timeout" config value, and fetches the timer and timer lock
// from ImageCtx::get_timer_instance().
324 template <typename I
>
325 Journal
<I
>::Journal(I
&image_ctx
)
326 : m_image_ctx(image_ctx
), m_journaler(NULL
),
327 m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED
),
328 m_error_result(0), m_replay_handler(this), m_close_pending(false),
329 m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
330 m_blocking_writes(false), m_journal_replay(NULL
),
331 m_metadata_listener(this) {
333 CephContext
*cct
= m_image_ctx
.cct
;
334 ldout(cct
, 5) << this << ": ictx=" << &m_image_ctx
<< dendl
;
336 ThreadPoolSingleton
*thread_pool_singleton
;
337 cct
->lookup_or_create_singleton_object
<ThreadPoolSingleton
>(
338 thread_pool_singleton
, "librbd::journal::thread_pool");
339 m_work_queue
= new ContextWQ("librbd::journal::work_queue",
340 cct
->_conf
->get_val
<int64_t>("rbd_op_thread_timeout"),
341 thread_pool_singleton
);
342 ImageCtx::get_timer_instance(cct
, &m_timer
, &m_timer_lock
);
// Drain any queued work before teardown. The asserts require that:
// - the journal is uninitialized or closed;
// - the journaler and replay objects were already released;
// - no state waiters are outstanding.
// NOTE(review): the lines after drain() (presumably deleting m_work_queue)
// are not visible in this extract.
345 template <typename I
>
346 Journal
<I
>::~Journal() {
347 if (m_work_queue
!= nullptr) {
348 m_work_queue
->drain();
352 assert(m_state
== STATE_UNINITIALIZED
|| m_state
== STATE_CLOSED
);
353 assert(m_journaler
== NULL
);
354 assert(m_journal_replay
== NULL
);
355 assert(m_wait_for_state_contexts
.empty());
// Journaling applies only when all three hold:
// - the image has RBD_FEATURE_JOURNALING;
// - the image is not read-only;
// - snap_id == CEPH_NOSNAP (i.e. not a snapshot).
// Caller must hold image_ctx.snap_lock (asserted).
358 template <typename I
>
359 bool Journal
<I
>::is_journal_supported(I
&image_ctx
) {
360 assert(image_ctx
.snap_lock
.is_locked());
361 return ((image_ctx
.features
& RBD_FEATURE_JOURNALING
) &&
362 !image_ctx
.read_only
&& image_ctx
.snap_id
== CEPH_NOSNAP
);
// Create a new journal for image_id with the given object order, splay width
// and object pool. It runs a journal::CreateRequest on the op work queue from
// ImageCtx::get_thread_pool_instance(), passing:
// - TAG_CLASS_NEW as the tag class;
// - a TagData carrying LOCAL_MIRROR_UUID;
// - IMAGE_CLIENT_ID as the client id.
// `cond` and the send/wait/return lines are not visible in this extract
// (presumably a C_SaferCond whose result is returned).
365 template <typename I
>
366 int Journal
<I
>::create(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
367 uint8_t order
, uint8_t splay_width
,
368 const std::string
&object_pool
) {
369 CephContext
*cct
= reinterpret_cast<CephContext
*>(io_ctx
.cct());
370 ldout(cct
, 5) << __func__
<< ": image=" << image_id
<< dendl
;
372 ThreadPool
*thread_pool
;
373 ContextWQ
*op_work_queue
;
374 ImageCtx::get_thread_pool_instance(cct
, &thread_pool
, &op_work_queue
);
377 journal::TagData
tag_data(LOCAL_MIRROR_UUID
);
378 journal::CreateRequest
<I
> *req
= journal::CreateRequest
<I
>::create(
379 io_ctx
, image_id
, order
, splay_width
, object_pool
, cls::journal::Tag::TAG_CLASS_NEW
,
380 tag_data
, IMAGE_CLIENT_ID
, op_work_queue
, &cond
);
// Remove the journal of image_id by running a journal::RemoveRequest for
// IMAGE_CLIENT_ID on the op work queue from
// ImageCtx::get_thread_pool_instance(). `cond` and the send/wait/return lines
// are not visible in this extract (presumably a C_SaferCond whose result is
// returned).
386 template <typename I
>
387 int Journal
<I
>::remove(librados::IoCtx
&io_ctx
, const std::string
&image_id
) {
388 CephContext
*cct
= reinterpret_cast<CephContext
*>(io_ctx
.cct());
389 ldout(cct
, 5) << __func__
<< ": image=" << image_id
<< dendl
;
391 ThreadPool
*thread_pool
;
392 ContextWQ
*op_work_queue
;
393 ImageCtx::get_thread_pool_instance(cct
, &thread_pool
, &op_work_queue
);
396 journal::RemoveRequest
<I
> *req
= journal::RemoveRequest
<I
>::create(
397 io_ctx
, image_id
, IMAGE_CLIENT_ID
, op_work_queue
, &cond
);
// Reset (empty) the journal of image_id while keeping its layout:
// 1. read the existing order, splay width and data pool;
// 2. remove the journal;
// 3. recreate it via create() with the same parameters.
// Each failure is logged. The early returns are not visible in this extract.
403 template <typename I
>
404 int Journal
<I
>::reset(librados::IoCtx
&io_ctx
, const std::string
&image_id
) {
405 CephContext
*cct
= reinterpret_cast<CephContext
*>(io_ctx
.cct());
406 ldout(cct
, 5) << __func__
<< ": image=" << image_id
<< dendl
;
408 Journaler
journaler(io_ctx
, image_id
, IMAGE_CLIENT_ID
, {});
411 journaler
.init(&cond
);
// Shut the temporary journaler down on every exit path below.
412 BOOST_SCOPE_EXIT_ALL(&journaler
) {
413 journaler
.shut_down();
420 lderr(cct
) << __func__
<< ": "
421 << "failed to initialize journal: " << cpp_strerror(r
) << dendl
;
// Capture the existing layout so the journal can be recreated identically.
425 uint8_t order
, splay_width
;
427 journaler
.get_metadata(&order
, &splay_width
, &pool_id
);
// create() takes a pool name, so translate the journal's pool id.
429 std::string pool_name
;
431 librados::Rados
rados(io_ctx
);
432 r
= rados
.pool_reverse_lookup(pool_id
, &pool_name
);
434 lderr(cct
) << __func__
<< ": "
435 << "failed to lookup data pool: " << cpp_strerror(r
) << dendl
;
// NOTE(review): the boolean argument is presumably a "force" flag -- confirm
// against Journaler::remove().
441 journaler
.remove(true, &ctx1
);
444 lderr(cct
) << __func__
<< ": "
445 << "failed to reset journal: " << cpp_strerror(r
) << dendl
;
// Recreate an empty journal with the captured order/splay width/pool.
449 r
= create(io_ctx
, image_id
, order
, splay_width
, pool_name
);
451 lderr(cct
) << __func__
<< ": "
452 << "failed to create journal: " << cpp_strerror(r
) << dendl
;
// Convenience overload: determine tag ownership for image_ctx using its
// md_ctx, id and op_work_queue, completing on_finish with *owner set.
458 template <typename I
>
459 void Journal
<I
>::is_tag_owner(I
*image_ctx
, bool *owner
,
460 Context
*on_finish
) {
461 Journal
<I
>::is_tag_owner(image_ctx
->md_ctx
, image_ctx
->id
, owner
,
462 image_ctx
->op_work_queue
, on_finish
);
// Asynchronously determine whether the image's current journal tag is owned
// locally. A C_IsTagOwner holds the lookup state and is passed to get_tags()
// as the completion. C_IsTagOwner::finish() then sets *is_tag_owner and
// completes on_finish via op_work_queue.
465 template <typename I
>
466 void Journal
<I
>::is_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
467 bool *is_tag_owner
, ContextWQ
*op_work_queue
,
468 Context
*on_finish
) {
469 CephContext
*cct
= reinterpret_cast<CephContext
*>(io_ctx
.cct());
470 ldout(cct
, 20) << __func__
<< dendl
;
472 C_IsTagOwner
<I
> *is_tag_owner_ctx
= new C_IsTagOwner
<I
>(
473 io_ctx
, image_id
, is_tag_owner
, op_work_queue
, on_finish
);
474 get_tags(cct
, is_tag_owner_ctx
->journaler
, &is_tag_owner_ctx
->client
,
475 &is_tag_owner_ctx
->client_meta
, &is_tag_owner_ctx
->tag_tid
,
476 &is_tag_owner_ctx
->tag_data
, is_tag_owner_ctx
);
// Asynchronously fetch the mirror uuid that owns the image's current tag into
// *mirror_uuid, then complete on_finish. The C_GetTagOwner completion is
// wrapped with create_async_context_callback(op_work_queue, ...), presumably
// so it runs on that queue rather than inline.
// NOTE(review): uses a C-style cast for io_ctx.cct(); the sibling functions
// in this file use reinterpret_cast<CephContext *>.
479 template <typename I
>
480 void Journal
<I
>::get_tag_owner(IoCtx
& io_ctx
, std::string
& image_id
,
481 std::string
*mirror_uuid
,
482 ContextWQ
*op_work_queue
, Context
*on_finish
) {
483 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
484 ldout(cct
, 20) << __func__
<< dendl
;
486 auto ctx
= new C_GetTagOwner(io_ctx
, image_id
, mirror_uuid
, on_finish
);
487 get_tags(cct
, &ctx
->journaler
, &ctx
->client
, &ctx
->client_meta
, &ctx
->tag_tid
,
488 &ctx
->tag_data
, create_async_context_callback(op_work_queue
, ctx
));
// Synchronously flag the local image client as needing a resync:
// 1. open the journal with a temporary Journaler via journal::OpenRequest;
// 2. set client_meta.resync_requested;
// 3. re-encode the ClientData and push it with update_client().
// Each asynchronous step is waited on with a C_SaferCond.
// `lock`, `tag_tid` and the early returns are on lines not visible here.
491 template <typename I
>
492 int Journal
<I
>::request_resync(I
*image_ctx
) {
493 CephContext
*cct
= image_ctx
->cct
;
494 ldout(cct
, 20) << __func__
<< dendl
;
496 Journaler
journaler(image_ctx
->md_ctx
, image_ctx
->id
, IMAGE_CLIENT_ID
, {});
499 journal::ImageClientMeta client_meta
;
501 journal::TagData tag_data
;
503 C_SaferCond open_ctx
;
504 auto open_req
= journal::OpenRequest
<I
>::create(image_ctx
, &journaler
, &lock
,
505 &client_meta
, &tag_tid
,
506 &tag_data
, &open_ctx
);
// Shut the temporary journaler down on every exit path below.
509 BOOST_SCOPE_EXIT_ALL(&journaler
) {
510 journaler
.shut_down();
513 int r
= open_ctx
.wait();
518 client_meta
.resync_requested
= true;
520 journal::ClientData
client_data(client_meta
);
521 bufferlist client_data_bl
;
522 ::encode(client_data
, client_data_bl
);
524 C_SaferCond update_client_ctx
;
525 journaler
.update_client(client_data_bl
, &update_client_ctx
);
527 r
= update_client_ctx
.wait();
529 lderr(cct
) << __func__
<< ": "
530 << "failed to update client: " << cpp_strerror(r
) << dendl
;
// Start a journal::PromoteRequest for image_ctx. The second argument is
// `false` (presumably force=false). The remaining arguments and the send call
// are not visible in this extract.
536 template <typename I
>
537 void Journal
<I
>::promote(I
*image_ctx
, Context
*on_finish
) {
538 CephContext
*cct
= image_ctx
->cct
;
539 ldout(cct
, 20) << __func__
<< dendl
;
541 auto promote_req
= journal::PromoteRequest
<I
>::create(image_ctx
, false,
// Start a journal::DemoteRequest for *image_ctx that completes on_finish.
// The send call is not visible in this extract.
546 template <typename I
>
547 void Journal
<I
>::demote(I
*image_ctx
, Context
*on_finish
) {
548 CephContext
*cct
= image_ctx
->cct
;
549 ldout(cct
, 20) << __func__
<< dendl
;
551 auto req
= journal::DemoteRequest
<I
>::create(*image_ctx
, on_finish
);
// Returns true iff the journal is in STATE_READY; acquires m_lock.
555 template <typename I
>
556 bool Journal
<I
>::is_journal_ready() const {
557 Mutex::Locker
locker(m_lock
);
558 return (m_state
== STATE_READY
);
// Locking wrapper: acquires m_lock and delegates to the lock-held overload.
561 template <typename I
>
562 bool Journal
<I
>::is_journal_replaying() const {
563 Mutex::Locker
locker(m_lock
);
564 return is_journal_replaying(m_lock
);
// Lock-held variant. The unnamed Mutex& argument only signals that the
// caller holds m_lock, which is asserted. Returns true in any replay-related
// state: REPLAYING, FLUSHING_REPLAY, FLUSHING_RESTART, RESTARTING_REPLAY.
567 template <typename I
>
568 bool Journal
<I
>::is_journal_replaying(const Mutex
&) const {
569 assert(m_lock
.is_locked());
570 return (m_state
== STATE_REPLAYING
||
571 m_state
== STATE_FLUSHING_REPLAY
||
572 m_state
== STATE_FLUSHING_RESTART
||
573 m_state
== STATE_RESTARTING_REPLAY
);
// True when the journal is READY and the image's journal policy does not
// disable appends. Caller must hold m_image_ctx.snap_lock (asserted); m_lock
// is acquired internally.
576 template <typename I
>
577 bool Journal
<I
>::is_journal_appending() const {
578 assert(m_image_ctx
.snap_lock
.is_locked());
579 Mutex::Locker
locker(m_lock
);
580 return (m_state
== STATE_READY
&&
581 !m_image_ctx
.get_journal_policy()->append_disabled());
// Complete on_ready once the journal is READY. on_ready is wrapped with
// create_async_context_callback, so presumably it never runs inline.
// If the journal is already READY, on_ready completes with m_error_result;
// otherwise it is queued until the next steady state.
584 template <typename I
>
585 void Journal
<I
>::wait_for_journal_ready(Context
*on_ready
) {
586 on_ready
= create_async_context_callback(m_image_ctx
, on_ready
);
588 Mutex::Locker
locker(m_lock
);
589 if (m_state
== STATE_READY
) {
590 on_ready
->complete(m_error_result
);
592 wait_for_steady_state(on_ready
);
// Begin opening the journal; must be called in STATE_UNINITIALIZED
// (asserted). on_finish is wrapped for asynchronous completion and is
// registered to fire at the next steady state. The call that actually starts
// initialization is not visible in this extract.
596 template <typename I
>
597 void Journal
<I
>::open(Context
*on_finish
) {
598 CephContext
*cct
= m_image_ctx
.cct
;
599 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
601 on_finish
= create_async_context_callback(m_image_ctx
, on_finish
);
603 Mutex::Locker
locker(m_lock
);
604 assert(m_state
== STATE_UNINITIALIZED
);
605 wait_for_steady_state(on_finish
);
// Close the journal:
// 1. notify every registered listener via handle_close();
// 2. if the journal is already CLOSED, complete on_finish immediately with
//    m_error_result;
// 3. otherwise mark a close as pending and wait for the next steady state.
// Parts of the READY handling are on lines not visible in this extract.
609 template <typename I
>
610 void Journal
<I
>::close(Context
*on_finish
) {
611 CephContext
*cct
= m_image_ctx
.cct
;
612 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
614 on_finish
= create_async_context_callback(m_image_ctx
, on_finish
);
616 Mutex::Locker
locker(m_lock
);
// Serialize with any in-progress listener notification.
617 while (m_listener_notify
) {
618 m_listener_cond
.Wait(m_lock
);
// Iterate a copy of the listener set (presumably so m_lock can be dropped
// around the callbacks; the unlock/relock lines are not visible here).
621 Listeners
listeners(m_listeners
);
622 m_listener_notify
= true;
624 for (auto listener
: listeners
) {
625 listener
->handle_close();
629 m_listener_notify
= false;
630 m_listener_cond
.Signal();
632 assert(m_state
!= STATE_UNINITIALIZED
);
633 if (m_state
== STATE_CLOSED
) {
634 on_finish
->complete(m_error_result
);
638 if (m_state
== STATE_READY
) {
642 m_close_pending
= true;
643 wait_for_steady_state(on_finish
);
// Locking wrapper: acquires m_lock and delegates to the lock-held overload.
646 template <typename I
>
647 bool Journal
<I
>::is_tag_owner() const {
648 Mutex::Locker
locker(m_lock
);
649 return is_tag_owner(m_lock
);
// Lock-held variant (m_lock asserted). The current tag is locally owned iff
// its mirror_uuid equals LOCAL_MIRROR_UUID.
652 template <typename I
>
653 bool Journal
<I
>::is_tag_owner(const Mutex
&) const {
654 assert(m_lock
.is_locked());
655 return (m_tag_data
.mirror_uuid
== LOCAL_MIRROR_UUID
);
// Accessor guarded by m_lock; the return statement is not visible in this
// extract (presumably it returns m_tag_tid).
658 template <typename I
>
659 uint64_t Journal
<I
>::get_tag_tid() const {
660 Mutex::Locker
locker(m_lock
);
// Accessor guarded by m_lock; the return statement is not visible in this
// extract (presumably it returns a copy of m_tag_data).
664 template <typename I
>
665 journal::TagData Journal
<I
>::get_tag_data() const {
666 Mutex::Locker
locker(m_lock
);
// Allocate a new locally-owned tag whose predecessor records this client's
// last known commit position, if any. Requires an open journaler and local
// tag ownership (asserted). If the cached client cannot be read, on_finish is
// queued on the image's op work queue with the error.
670 template <typename I
>
671 void Journal
<I
>::allocate_local_tag(Context
*on_finish
) {
672 CephContext
*cct
= m_image_ctx
.cct
;
673 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
675 journal::TagPredecessor predecessor
;
676 predecessor
.mirror_uuid
= LOCAL_MIRROR_UUID
;
678 Mutex::Locker
locker(m_lock
);
679 assert(m_journaler
!= nullptr && is_tag_owner(m_lock
));
681 cls::journal::Client client
;
682 int r
= m_journaler
->get_cached_client(IMAGE_CLIENT_ID
, &client
);
684 lderr(cct
) << this << " " << __func__
<< ": "
685 << "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
686 m_image_ctx
.op_work_queue
->queue(on_finish
, r
);
690 // since we are primary, populate the predecessor with our known commit
// position (taken from the first recorded object position, if any)
692 assert(m_tag_data
.mirror_uuid
== LOCAL_MIRROR_UUID
);
693 if (!client
.commit_position
.object_positions
.empty()) {
694 auto position
= client
.commit_position
.object_positions
.front();
695 predecessor
.commit_valid
= true;
696 predecessor
.tag_tid
= position
.tag_tid
;
697 predecessor
.entry_tid
= position
.entry_tid
;
// NOTE(review): allocate_tag() acquires m_lock itself; confirm that the
// locker above goes out of scope before this call (the scope braces are not
// visible in this extract).
701 allocate_tag(LOCAL_MIRROR_UUID
, predecessor
, on_finish
);
// Allocate a new tag in m_tag_class carrying mirror_uuid and predecessor.
// The C_DecodeTag completion receives &m_lock, &m_tag_tid and &m_tag_data
// plus on_finish (presumably it decodes the allocated tag into those members
// under m_lock and then completes on_finish).
// Acquires m_lock; requires an open journaler (asserted).
704 template <typename I
>
705 void Journal
<I
>::allocate_tag(const std::string
&mirror_uuid
,
706 const journal::TagPredecessor
&predecessor
,
707 Context
*on_finish
) {
708 CephContext
*cct
= m_image_ctx
.cct
;
709 ldout(cct
, 20) << this << " " << __func__
<< ": mirror_uuid=" << mirror_uuid
712 Mutex::Locker
locker(m_lock
);
713 assert(m_journaler
!= nullptr);
715 journal::TagData tag_data
;
716 tag_data
.mirror_uuid
= mirror_uuid
;
717 tag_data
.predecessor
= predecessor
;
720 ::encode(tag_data
, tag_bl
);
722 C_DecodeTag
*decode_tag_ctx
= new C_DecodeTag(cct
, &m_lock
, &m_tag_tid
,
723 &m_tag_data
, on_finish
);
724 m_journaler
->allocate_tag(m_tag_class
, tag_bl
, &decode_tag_ctx
->tag
,
// Ask the journaler to flush its commit position, completing on_finish.
// Acquires m_lock; requires an open journaler (asserted).
728 template <typename I
>
729 void Journal
<I
>::flush_commit_position(Context
*on_finish
) {
730 CephContext
*cct
= m_image_ctx
.cct
;
731 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
733 Mutex::Locker
locker(m_lock
);
734 assert(m_journaler
!= nullptr);
735 m_journaler
->flush_commit_position(on_finish
);
// Journal a write of `length` bytes at `offset`. The data is split into
// chunks of at most (m_max_append_size - AioWriteEvent fixed size) bytes, and
// each chunk is encoded as its own AioWriteEvent entry. The batch is appended
// via append_io_events() as EVENT_TYPE_AIO_WRITE with no tolerated error
// code (0). Returns whatever append_io_events() returns.
738 template <typename I
>
739 uint64_t Journal
<I
>::append_write_event(uint64_t offset
, size_t length
,
740 const bufferlist
&bl
,
741 const IOObjectRequests
&requests
,
// The fixed per-event overhead must leave room for at least one data byte.
743 assert(m_max_append_size
> journal::AioWriteEvent::get_fixed_size());
744 uint64_t max_write_data_size
=
745 m_max_append_size
- journal::AioWriteEvent::get_fixed_size();
747 // ensure that the write event fits within the journal entry
748 Bufferlists bufferlists
;
749 uint64_t bytes_remaining
= length
;
750 uint64_t event_offset
= 0;
// The loop tests its condition after the body, so even a zero-length write
// yields one (empty) event.
752 uint64_t event_length
= MIN(bytes_remaining
, max_write_data_size
);
755 event_bl
.substr_of(bl
, event_offset
, event_length
);
756 journal::EventEntry
event_entry(journal::AioWriteEvent(offset
+ event_offset
,
761 bufferlists
.emplace_back();
762 ::encode(event_entry
, bufferlists
.back());
764 event_offset
+= event_length
;
765 bytes_remaining
-= event_length
;
766 } while (bytes_remaining
> 0);
768 return append_io_events(journal::EVENT_TYPE_AIO_WRITE
, bufferlists
, requests
,
769 offset
, length
, flush_entry
, 0);
// Timestamp and encode a single IO event, then append it as a one-entry
// batch via append_io_events(), forwarding offset, length, flush_entry and
// filter_ret_val.
772 template <typename I
>
773 uint64_t Journal
<I
>::append_io_event(journal::EventEntry
&&event_entry
,
774 const IOObjectRequests
&requests
,
775 uint64_t offset
, size_t length
,
776 bool flush_entry
, int filter_ret_val
) {
778 event_entry
.timestamp
= ceph_clock_now();
779 ::encode(event_entry
, bl
);
780 return append_io_events(event_entry
.get_event_type(), {bl
}, requests
, offset
,
781 length
, flush_entry
, filter_ret_val
);
// Append one or more pre-encoded IO event entries under the current tag.
// Requires at least one entry and STATE_READY (asserted).
// The entries are recorded as a single Event keyed by `tid`, together with
// the object requests, the affected extent and the tolerated error code;
// `tid` is presumably allocated on lines not visible here.
// C_IOEventSafe(this, tid) runs asynchronously once the last future is safe:
// the future is either flushed or waited on (the selecting condition,
// presumably flush_entry, is not visible here).
784 template <typename I
>
785 uint64_t Journal
<I
>::append_io_events(journal::EventType event_type
,
786 const Bufferlists
&bufferlists
,
787 const IOObjectRequests
&requests
,
788 uint64_t offset
, size_t length
,
789 bool flush_entry
, int filter_ret_val
) {
790 assert(!bufferlists
.empty());
794 Mutex::Locker
locker(m_lock
);
795 assert(m_state
== STATE_READY
);
// Each encoded entry must fit within a single journal append.
802 for (auto &bl
: bufferlists
) {
803 assert(bl
.length() <= m_max_append_size
);
804 futures
.push_back(m_journaler
->append(m_tag_tid
, bl
));
808 Mutex::Locker
event_locker(m_event_lock
);
809 m_events
[tid
] = Event(futures
, requests
, offset
, length
, filter_ret_val
);
812 CephContext
*cct
= m_image_ctx
.cct
;
813 ldout(cct
, 20) << this << " " << __func__
<< ": "
814 << "event=" << event_type
<< ", "
815 << "new_reqs=" << requests
.size() << ", "
816 << "offset=" << offset
<< ", "
817 << "length=" << length
<< ", "
818 << "flush=" << flush_entry
<< ", tid=" << tid
<< dendl
;
820 Context
*on_safe
= create_async_context_callback(
821 m_image_ctx
, new C_IOEventSafe(this, tid
));
823 futures
.back().flush(on_safe
);
825 futures
.back().wait(on_safe
);
// Complete the IO event `tid` with result r via complete_event(), holding
// m_event_lock. How an unknown tid is handled is not visible in this extract.
831 template <typename I
>
832 void Journal
<I
>::commit_io_event(uint64_t tid
, int r
) {
833 CephContext
*cct
= m_image_ctx
.cct
;
834 ldout(cct
, 20) << this << " " << __func__
<< ": tid=" << tid
<< ", "
837 Mutex::Locker
event_locker(m_event_lock
);
838 typename
Events::iterator it
= m_events
.find(tid
);
839 if (it
== m_events
.end()) {
842 complete_event(it
, r
);
// Record completion of the sub-extent [offset, offset+length) of IO event
// `tid`, holding m_event_lock.
// - If the event has no error yet and r < 0, the error branch runs (its body
//   is not visible here; presumably it stores r in event.ret_val).
// - The extent is removed from event.pending_extents.
// - While extents remain pending, only a log line is emitted. Otherwise the
//   event is completed with its accumulated ret_val.
845 template <typename I
>
846 void Journal
<I
>::commit_io_event_extent(uint64_t tid
, uint64_t offset
,
847 uint64_t length
, int r
) {
850 CephContext
*cct
= m_image_ctx
.cct
;
851 ldout(cct
, 20) << this << " " << __func__
<< ": tid=" << tid
<< ", "
852 << "offset=" << offset
<< ", "
853 << "length=" << length
<< ", "
854 << "r=" << r
<< dendl
;
856 Mutex::Locker
event_locker(m_event_lock
);
857 typename
Events::iterator it
= m_events
.find(tid
);
858 if (it
== m_events
.end()) {
862 Event
&event
= it
->second
;
863 if (event
.ret_val
== 0 && r
< 0) {
// Subtract only the part of the committed extent that is still pending.
867 ExtentInterval extent
;
868 extent
.insert(offset
, length
);
870 ExtentInterval intersect
;
871 intersect
.intersection_of(extent
, event
.pending_extents
);
873 event
.pending_extents
.subtract(intersect
);
874 if (!event
.pending_extents
.empty()) {
875 ldout(cct
, 20) << this << " " << __func__
<< ": "
876 << "pending extents: " << event
.pending_extents
<< dendl
;
879 complete_event(it
, event
.ret_val
);
// Append the start event of op `op_tid` under the current tag.
// Requires the image owner_lock (asserted) and STATE_READY (asserted).
// The entry's future is stashed in m_op_futures[op_tid] instead of being
// committed, so commit_op_event() can handle it together with the finish
// event. Once the entry is safe, the journaler's commit position is flushed
// before the caller's on_safe runs.
882 template <typename I
>
883 void Journal
<I
>::append_op_event(uint64_t op_tid
,
884 journal::EventEntry
&&event_entry
,
886 assert(m_image_ctx
.owner_lock
.is_locked());
889 event_entry
.timestamp
= ceph_clock_now();
890 ::encode(event_entry
, bl
);
894 Mutex::Locker
locker(m_lock
);
895 assert(m_state
== STATE_READY
);
897 future
= m_journaler
->append(m_tag_tid
, bl
);
899 // delay committing op event to ensure consistent replay
900 assert(m_op_futures
.count(op_tid
) == 0);
901 m_op_futures
[op_tid
] = future
;
// Chain: entry safe -> flush commit position -> (async) caller's on_safe.
904 on_safe
= create_async_context_callback(m_image_ctx
, on_safe
);
905 on_safe
= new FunctionContext([this, on_safe
](int r
) {
906 // ensure all committed IO before this op is committed
907 m_journaler
->flush_commit_position(on_safe
);
909 future
.flush(on_safe
);
911 CephContext
*cct
= m_image_ctx
.cct
;
912 ldout(cct
, 10) << this << " " << __func__
<< ": "
913 << "op_tid=" << op_tid
<< ", "
914 << "event=" << event_entry
.get_event_type() << dendl
;
// Append an OpFinishEvent(op_tid, r) and, once it is flushed/safe, run
// C_OpEventSafe asynchronously with both the start and finish futures and the
// caller's on_safe. The start future must have been recorded by
// append_op_event() (asserted); it is removed from m_op_futures here.
// Requires STATE_READY (asserted).
917 template <typename I
>
918 void Journal
<I
>::commit_op_event(uint64_t op_tid
, int r
, Context
*on_safe
) {
919 CephContext
*cct
= m_image_ctx
.cct
;
920 ldout(cct
, 10) << this << " " << __func__
<< ": op_tid=" << op_tid
<< ", "
921 << "r=" << r
<< dendl
;
923 journal::EventEntry
event_entry((journal::OpFinishEvent(op_tid
, r
)),
927 ::encode(event_entry
, bl
);
929 Future op_start_future
;
930 Future op_finish_future
;
932 Mutex::Locker
locker(m_lock
);
933 assert(m_state
== STATE_READY
);
935 // ready to commit op event
936 auto it
= m_op_futures
.find(op_tid
);
937 assert(it
!= m_op_futures
.end());
938 op_start_future
= it
->second
;
939 m_op_futures
.erase(it
);
941 op_finish_future
= m_journaler
->append(m_tag_tid
, bl
);
944 op_finish_future
.flush(create_async_context_callback(
945 m_image_ctx
, new C_OpEventSafe(this, op_tid
, op_start_future
,
946 op_finish_future
, on_safe
)));
// Forward an op-ready notification (op_tid, on_resume) to the active journal
// replay. Acquires m_lock; a replay must be in progress (asserted).
949 template <typename I
>
950 void Journal
<I
>::replay_op_ready(uint64_t op_tid
, Context
*on_resume
) {
951 CephContext
*cct
= m_image_ctx
.cct
;
952 ldout(cct
, 10) << this << " " << __func__
<< ": op_tid=" << op_tid
<< dendl
;
955 Mutex::Locker
locker(m_lock
);
956 assert(m_journal_replay
!= nullptr);
957 m_journal_replay
->replay_op_ready(op_tid
, on_resume
);
// Register on_safe for event `tid` (via wait_event) and, if a valid future
// was returned, flush it so the entry is written without further delay.
// NOTE(review): whether m_event_lock is released before flush() depends on
// scope lines not visible in this extract.
961 template <typename I
>
962 void Journal
<I
>::flush_event(uint64_t tid
, Context
*on_safe
) {
963 CephContext
*cct
= m_image_ctx
.cct
;
964 ldout(cct
, 20) << this << " " << __func__
<< ": tid=" << tid
<< ", "
965 << "on_safe=" << on_safe
<< dendl
;
969 Mutex::Locker
event_locker(m_event_lock
);
970 future
= wait_event(m_lock
, tid
, on_safe
);
973 if (future
.is_valid()) {
974 future
.flush(nullptr);
// Register on_safe to fire once event `tid` is safe, without forcing a
// flush. Acquires m_event_lock and delegates to the lock-held overload.
978 template <typename I
>
979 void Journal
<I
>::wait_event(uint64_t tid
, Context
*on_safe
) {
980 CephContext
*cct
= m_image_ctx
.cct
;
981 ldout(cct
, 20) << this << " " << __func__
<< ": tid=" << tid
<< ", "
982 << "on_safe=" << on_safe
<< dendl
;
984 Mutex::Locker
event_locker(m_event_lock
);
985 wait_event(m_lock
, tid
, on_safe
);
// Lock-held helper: m_event_lock must be held (asserted) and `tid` must name
// a known event (asserted).
// - If the entry is already safe, on_safe is queued on the image's op work
//   queue with the event's ret_val.
// - Otherwise on_safe is appended (wrapped for async completion) to the
//   event's on_safe_contexts, and the event's last future is returned so the
//   caller may flush it.
// The already-safe condition and its return value are not visible in this
// extract.
// NOTE(review): the `lock` parameter is not referenced in the visible lines;
// the assertion checks m_event_lock.
988 template <typename I
>
989 typename Journal
<I
>::Future Journal
<I
>::wait_event(Mutex
&lock
, uint64_t tid
,
991 assert(m_event_lock
.is_locked());
992 CephContext
*cct
= m_image_ctx
.cct
;
994 typename
Events::iterator it
= m_events
.find(tid
);
995 assert(it
!= m_events
.end());
997 Event
&event
= it
->second
;
999 // journal entry already safe
1000 ldout(cct
, 20) << this << " " << __func__
<< ": "
1001 << "journal entry already safe" << dendl
;
1002 m_image_ctx
.op_work_queue
->queue(on_safe
, event
.ret_val
);
1006 event
.on_safe_contexts
.push_back(create_async_context_callback(m_image_ctx
,
1008 return event
.futures
.back();
// Hand journal replay to an external caller. Requires STATE_READY with no
// active replay (asserted).
// Appending is stopped first (flushing in-flight events). On completion,
// handle_start_external_replay() creates the journal::Replay, publishes it
// through *journal_replay, and completes on_start asynchronously.
1011 template <typename I
>
1012 void Journal
<I
>::start_external_replay(journal::Replay
<I
> **journal_replay
,
1013 Context
*on_start
) {
1014 CephContext
*cct
= m_image_ctx
.cct
;
1015 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1017 Mutex::Locker
locker(m_lock
);
1018 assert(m_state
== STATE_READY
);
1019 assert(m_journal_replay
== nullptr);
1021 on_start
= util::create_async_context_callback(m_image_ctx
, on_start
);
1022 on_start
= new FunctionContext(
1023 [this, journal_replay
, on_start
](int r
) {
1024 handle_start_external_replay(r
, journal_replay
, on_start
);
1027 // safely flush all in-flight events before starting external replay
1028 m_journaler
->stop_append(util::create_async_context_callback(m_image_ctx
,
// Completion of stop_append() issued by start_external_replay().
// - On failure: log, set *journal_replay to nullptr, and complete on_finish
//   with r. Part of the recovery on this path is not visible here.
// - On success: move to STATE_REPLAYING, create a journal::Replay, publish it
//   via *journal_replay, and complete on_finish with 0.
1032 template <typename I
>
1033 void Journal
<I
>::handle_start_external_replay(int r
,
1034 journal::Replay
<I
> **journal_replay
,
1035 Context
*on_finish
) {
1036 CephContext
*cct
= m_image_ctx
.cct
;
1037 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1039 Mutex::Locker
locker(m_lock
);
1040 assert(m_state
== STATE_READY
);
1041 assert(m_journal_replay
== nullptr);
1044 lderr(cct
) << this << " " << __func__
<< ": "
1045 << "failed to stop recording: " << cpp_strerror(r
) << dendl
;
1046 *journal_replay
= nullptr;
1048 // get back to a sane-state
1050 on_finish
->complete(r
);
1054 transition_state(STATE_REPLAYING
, 0);
1055 m_journal_replay
= journal::Replay
<I
>::create(m_image_ctx
);
1056 *journal_replay
= m_journal_replay
;
1057 on_finish
->complete(0);
// End an external replay: requires an active replay in STATE_REPLAYING
// (asserted), then deletes the journal::Replay. If a close was requested in
// the meantime (m_close_pending), the journaler is destroyed with r=0. The
// other branch is not visible in this extract.
1060 template <typename I
>
1061 void Journal
<I
>::stop_external_replay() {
1062 CephContext
*cct
= m_image_ctx
.cct
;
1063 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1065 Mutex::Locker
locker(m_lock
);
1066 assert(m_journal_replay
!= nullptr);
1067 assert(m_state
== STATE_REPLAYING
);
1069 delete m_journal_replay
;
1070 m_journal_replay
= nullptr;
1072 if (m_close_pending
) {
1073 destroy_journaler(0);
// Create and open a new Journaler for the image. Caller holds m_lock; only
// valid from UNINITIALIZED or RESTARTING_REPLAY with no existing journaler
// (all asserted).
// - Moves to STATE_INITIALIZING.
// - Builds ::journal::Settings from the image's journal_* configuration.
// - Registers m_metadata_listener on the new journaler.
// - Creates a journal::OpenRequest that fills m_client_meta / m_tag_tid /
//   m_tag_data; handle_open() runs asynchronously when it completes (the
//   send call is not visible here).
1080 template <typename I
>
1081 void Journal
<I
>::create_journaler() {
1082 CephContext
*cct
= m_image_ctx
.cct
;
1083 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1085 assert(m_lock
.is_locked());
1086 assert(m_state
== STATE_UNINITIALIZED
|| m_state
== STATE_RESTARTING_REPLAY
);
1087 assert(m_journaler
== NULL
);
1089 transition_state(STATE_INITIALIZING
, 0);
1090 ::journal::Settings settings
;
1091 settings
.commit_interval
= m_image_ctx
.journal_commit_age
;
1092 settings
.max_payload_bytes
= m_image_ctx
.journal_max_payload_bytes
;
1093 settings
.max_concurrent_object_sets
=
1094 m_image_ctx
.journal_max_concurrent_object_sets
;
1095 // TODO: a configurable filter to exclude certain peers from being
// (For now only the local image client, IMAGE_CLIENT_ID, is whitelisted.)
1097 settings
.whitelisted_laggy_clients
= {IMAGE_CLIENT_ID
};
1099 m_journaler
= new Journaler(m_work_queue
, m_timer
, m_timer_lock
,
1100 m_image_ctx
.md_ctx
, m_image_ctx
.id
,
1101 IMAGE_CLIENT_ID
, settings
);
1102 m_journaler
->add_listener(&m_metadata_listener
);
1104 Context
*ctx
= create_async_context_callback(
1105 m_image_ctx
, create_context_callback
<
1106 Journal
<I
>, &Journal
<I
>::handle_open
>(this));
1107 auto open_req
= journal::OpenRequest
<I
>::create(&m_image_ctx
, m_journaler
,
1108 &m_lock
, &m_client_meta
,
1109 &m_tag_tid
, &m_tag_data
, ctx
);
// Begin asynchronous journal shutdown with result r. Caller holds m_lock
// (asserted). Steps:
// 1. delete any replay object;
// 2. detach the metadata listener;
// 3. enter STATE_CLOSING;
// 4. once m_async_journal_op_tracker reports that tracked async ops have
//    drained, shut the journaler down under m_lock.
// handle_journal_destroyed() runs asynchronously after shutdown completes.
1113 template <typename I
>
1114 void Journal
<I
>::destroy_journaler(int r
) {
1115 CephContext
*cct
= m_image_ctx
.cct
;
1116 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1118 assert(m_lock
.is_locked());
1120 delete m_journal_replay
;
1121 m_journal_replay
= NULL
;
1123 m_journaler
->remove_listener(&m_metadata_listener
);
1125 transition_state(STATE_CLOSING
, r
);
1127 Context
*ctx
= create_async_context_callback(
1128 m_image_ctx
, create_context_callback
<
1129 Journal
<I
>, &Journal
<I
>::handle_journal_destroyed
>(this));
1130 ctx
= new FunctionContext(
1131 [this, ctx
](int r
) {
1132 Mutex::Locker
locker(m_lock
);
1133 m_journaler
->shut_down(ctx
);
1135 m_async_journal_op_tracker
.wait(m_image_ctx
, ctx
);
// Tear down the journaler so replay can be restarted. Caller holds m_lock;
// valid only in FLUSHING_RESTART or FLUSHING_REPLAY (asserted). Steps:
// 1. delete the replay object;
// 2. detach the metadata listener;
// 3. enter STATE_RESTARTING_REPLAY with result r;
// 4. shut the journaler down, with handle_journal_destroyed() running
//    asynchronously afterwards.
1138 template <typename I
>
1139 void Journal
<I
>::recreate_journaler(int r
) {
1140 CephContext
*cct
= m_image_ctx
.cct
;
1141 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1143 assert(m_lock
.is_locked());
1144 assert(m_state
== STATE_FLUSHING_RESTART
||
1145 m_state
== STATE_FLUSHING_REPLAY
);
1147 delete m_journal_replay
;
1148 m_journal_replay
= NULL
;
1150 m_journaler
->remove_listener(&m_metadata_listener
);
1152 transition_state(STATE_RESTARTING_REPLAY
, r
);
1153 m_journaler
->shut_down(create_async_context_callback(
1154 m_image_ctx
, create_context_callback
<
1155 Journal
<I
>, &Journal
<I
>::handle_journal_destroyed
>(this)));
// Finalize the IO event at `it` with result r. Requires m_event_lock and
// STATE_READY (asserted).
// - An error equal to the event's filter_ret_val is tolerated.
// - Any other error means the data update failed after the entry was
//   journaled, so the entry must not be committed and must be replayed.
// - Otherwise the event is marked committed_io and each of its journal
//   futures is reported as committed. Some gating conditions are on lines
//   not visible in this extract.
1158 template <typename I
>
1159 void Journal
<I
>::complete_event(typename
Events::iterator it
, int r
) {
1160 assert(m_event_lock
.is_locked());
1161 assert(m_state
== STATE_READY
);
1163 CephContext
*cct
= m_image_ctx
.cct
;
1164 ldout(cct
, 20) << this << " " << __func__
<< ": tid=" << it
->first
<< " "
1165 << "r=" << r
<< dendl
;
1167 Event
&event
= it
->second
;
1168 if (r
< 0 && r
== event
.filter_ret_val
) {
1169 // ignore allowed error codes
1173 // event recorded to journal but failed to update disk, we cannot
1174 // commit this IO event. this event must be replayed.
1176 lderr(cct
) << this << " " << __func__
<< ": "
1177 << "failed to commit IO to disk, replay required: "
1178 << cpp_strerror(r
) << dendl
;
1181 event
.committed_io
= true;
1184 for (auto &future
: event
.futures
) {
1185 m_journaler
->committed(future
);
1192 template <typename I
>
1193 void Journal
<I
>::start_append() {
1194 assert(m_lock
.is_locked());
1195 m_journaler
->start_append(m_image_ctx
.journal_object_flush_interval
,
1196 m_image_ctx
.journal_object_flush_bytes
,
1197 m_image_ctx
.journal_object_flush_age
);
1198 transition_state(STATE_READY
, 0);
1201 template <typename I
>
1202 void Journal
<I
>::handle_open(int r
) {
1203 CephContext
*cct
= m_image_ctx
.cct
;
1204 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1206 Mutex::Locker
locker(m_lock
);
1207 assert(m_state
== STATE_INITIALIZING
);
1210 lderr(cct
) << this << " " << __func__
<< ": "
1211 << "failed to initialize journal: " << cpp_strerror(r
)
1213 destroy_journaler(r
);
1217 m_tag_class
= m_client_meta
.tag_class
;
1218 m_max_append_size
= m_journaler
->get_max_append_size();
1219 ldout(cct
, 20) << this << " " << __func__
<< ": "
1220 << "tag_class=" << m_tag_class
<< ", "
1221 << "max_append_size=" << m_max_append_size
<< dendl
;
1223 transition_state(STATE_REPLAYING
, 0);
1224 m_journal_replay
= journal::Replay
<I
>::create(m_image_ctx
);
1225 m_journaler
->start_replay(&m_replay_handler
);
1228 template <typename I
>
1229 void Journal
<I
>::handle_replay_ready() {
1230 CephContext
*cct
= m_image_ctx
.cct
;
1231 ReplayEntry replay_entry
;
1233 Mutex::Locker
locker(m_lock
);
1234 if (m_state
!= STATE_REPLAYING
) {
1238 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1239 if (!m_journaler
->try_pop_front(&replay_entry
)) {
1243 // only one entry should be in-flight at a time
1244 assert(!m_processing_entry
);
1245 m_processing_entry
= true;
1248 bufferlist data
= replay_entry
.get_data();
1249 bufferlist::iterator it
= data
.begin();
1251 journal::EventEntry event_entry
;
1252 int r
= m_journal_replay
->decode(&it
, &event_entry
);
1254 lderr(cct
) << this << " " << __func__
<< ": "
1255 << "failed to decode journal event entry" << dendl
;
1256 handle_replay_process_safe(replay_entry
, r
);
1260 Context
*on_ready
= create_context_callback
<
1261 Journal
<I
>, &Journal
<I
>::handle_replay_process_ready
>(this);
1262 Context
*on_commit
= new C_ReplayProcessSafe(this, std::move(replay_entry
));
1263 m_journal_replay
->process(event_entry
, on_ready
, on_commit
);
1266 template <typename I
>
1267 void Journal
<I
>::handle_replay_complete(int r
) {
1268 CephContext
*cct
= m_image_ctx
.cct
;
1270 bool cancel_ops
= false;
1272 Mutex::Locker
locker(m_lock
);
1273 if (m_state
!= STATE_REPLAYING
) {
1277 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1280 transition_state(STATE_FLUSHING_RESTART
, r
);
1282 // state might change back to FLUSHING_RESTART on flush error
1283 transition_state(STATE_FLUSHING_REPLAY
, 0);
1287 Context
*ctx
= new FunctionContext([this, cct
](int r
) {
1288 ldout(cct
, 20) << this << " handle_replay_complete: "
1289 << "handle shut down replay" << dendl
;
1293 Mutex::Locker
locker(m_lock
);
1294 assert(m_state
== STATE_FLUSHING_RESTART
||
1295 m_state
== STATE_FLUSHING_REPLAY
);
1299 if (state
== STATE_FLUSHING_RESTART
) {
1300 handle_flushing_restart(0);
1302 handle_flushing_replay();
1305 ctx
= new FunctionContext([this, cct
, cancel_ops
, ctx
](int r
) {
1306 ldout(cct
, 20) << this << " handle_replay_complete: "
1307 << "shut down replay" << dendl
;
1308 m_journal_replay
->shut_down(cancel_ops
, ctx
);
1310 m_journaler
->stop_replay(ctx
);
1313 template <typename I
>
1314 void Journal
<I
>::handle_replay_process_ready(int r
) {
1315 // journal::Replay is ready for more events -- attempt to pop another
1316 CephContext
*cct
= m_image_ctx
.cct
;
1317 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1321 Mutex::Locker
locker(m_lock
);
1322 assert(m_processing_entry
);
1323 m_processing_entry
= false;
1325 handle_replay_ready();
1328 template <typename I
>
1329 void Journal
<I
>::handle_replay_process_safe(ReplayEntry replay_entry
, int r
) {
1330 CephContext
*cct
= m_image_ctx
.cct
;
1333 assert(m_state
== STATE_REPLAYING
||
1334 m_state
== STATE_FLUSHING_RESTART
||
1335 m_state
== STATE_FLUSHING_REPLAY
);
1337 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1339 if (r
!= -ECANCELED
) {
1340 lderr(cct
) << this << " " << __func__
<< ": "
1341 << "failed to commit journal event to disk: "
1342 << cpp_strerror(r
) << dendl
;
1345 if (m_state
== STATE_REPLAYING
) {
1346 // abort the replay if we have an error
1347 transition_state(STATE_FLUSHING_RESTART
, r
);
1350 // stop replay, shut down, and restart
1351 Context
*ctx
= new FunctionContext([this, cct
](int r
) {
1352 ldout(cct
, 20) << this << " handle_replay_process_safe: "
1353 << "shut down replay" << dendl
;
1355 Mutex::Locker
locker(m_lock
);
1356 assert(m_state
== STATE_FLUSHING_RESTART
);
1359 m_journal_replay
->shut_down(true, create_context_callback
<
1360 Journal
<I
>, &Journal
<I
>::handle_flushing_restart
>(this));
1362 m_journaler
->stop_replay(ctx
);
1364 } else if (m_state
== STATE_FLUSHING_REPLAY
) {
1365 // end-of-replay flush in-progress -- we need to restart replay
1366 transition_state(STATE_FLUSHING_RESTART
, r
);
1371 // only commit the entry if written successfully
1372 m_journaler
->committed(replay_entry
);
1377 template <typename I
>
1378 void Journal
<I
>::handle_flushing_restart(int r
) {
1379 Mutex::Locker
locker(m_lock
);
1381 CephContext
*cct
= m_image_ctx
.cct
;
1382 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1385 assert(m_state
== STATE_FLUSHING_RESTART
);
1386 if (m_close_pending
) {
1387 destroy_journaler(r
);
1391 recreate_journaler(r
);
1394 template <typename I
>
1395 void Journal
<I
>::handle_flushing_replay() {
1396 Mutex::Locker
locker(m_lock
);
1398 CephContext
*cct
= m_image_ctx
.cct
;
1399 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1401 assert(m_state
== STATE_FLUSHING_REPLAY
|| m_state
== STATE_FLUSHING_RESTART
);
1402 if (m_close_pending
) {
1403 destroy_journaler(0);
1405 } else if (m_state
== STATE_FLUSHING_RESTART
) {
1406 // failed to replay one-or-more events -- restart
1407 recreate_journaler(0);
1411 delete m_journal_replay
;
1412 m_journal_replay
= NULL
;
1418 template <typename I
>
1419 void Journal
<I
>::handle_recording_stopped(int r
) {
1420 CephContext
*cct
= m_image_ctx
.cct
;
1421 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1423 Mutex::Locker
locker(m_lock
);
1424 assert(m_state
== STATE_STOPPING
);
1426 destroy_journaler(r
);
1429 template <typename I
>
1430 void Journal
<I
>::handle_journal_destroyed(int r
) {
1431 CephContext
*cct
= m_image_ctx
.cct
;
1432 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1435 lderr(cct
) << this << " " << __func__
1436 << "error detected while closing journal: " << cpp_strerror(r
)
1440 Mutex::Locker
locker(m_lock
);
1442 m_journaler
= nullptr;
1444 assert(m_state
== STATE_CLOSING
|| m_state
== STATE_RESTARTING_REPLAY
);
1445 if (m_state
== STATE_RESTARTING_REPLAY
) {
1450 transition_state(STATE_CLOSED
, r
);
1453 template <typename I
>
1454 void Journal
<I
>::handle_io_event_safe(int r
, uint64_t tid
) {
1455 CephContext
*cct
= m_image_ctx
.cct
;
1456 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< ", "
1457 << "tid=" << tid
<< dendl
;
1459 // journal will be flushed before closing
1460 assert(m_state
== STATE_READY
|| m_state
== STATE_STOPPING
);
1462 lderr(cct
) << this << " " << __func__
<< ": "
1463 << "failed to commit IO event: " << cpp_strerror(r
) << dendl
;
1466 IOObjectRequests aio_object_requests
;
1467 Contexts on_safe_contexts
;
1469 Mutex::Locker
event_locker(m_event_lock
);
1470 typename
Events::iterator it
= m_events
.find(tid
);
1471 assert(it
!= m_events
.end());
1473 Event
&event
= it
->second
;
1474 aio_object_requests
.swap(event
.aio_object_requests
);
1475 on_safe_contexts
.swap(event
.on_safe_contexts
);
1477 if (r
< 0 || event
.committed_io
) {
1478 // failed journal write so IO won't be sent -- or IO extent was
1479 // overwritten by future IO operations so this was a no-op IO event
1481 for (auto &future
: event
.futures
) {
1482 m_journaler
->committed(future
);
1486 if (event
.committed_io
) {
1493 ldout(cct
, 20) << this << " " << __func__
<< ": "
1494 << "completing tid=" << tid
<< dendl
;
1495 for (IOObjectRequests::iterator it
= aio_object_requests
.begin();
1496 it
!= aio_object_requests
.end(); ++it
) {
1498 // don't send aio requests if the journal fails -- bubble error up
1501 // send any waiting aio requests now that journal entry is safe
1506 // alert the cache about the journal event status
1507 for (Contexts::iterator it
= on_safe_contexts
.begin();
1508 it
!= on_safe_contexts
.end(); ++it
) {
1513 template <typename I
>
1514 void Journal
<I
>::handle_op_event_safe(int r
, uint64_t tid
,
1515 const Future
&op_start_future
,
1516 const Future
&op_finish_future
,
1518 CephContext
*cct
= m_image_ctx
.cct
;
1519 ldout(cct
, 20) << this << " " << __func__
<< ": r=" << r
<< ", "
1520 << "tid=" << tid
<< dendl
;
1522 // journal will be flushed before closing
1523 assert(m_state
== STATE_READY
|| m_state
== STATE_STOPPING
);
1525 lderr(cct
) << this << " " << __func__
<< ": "
1526 << "failed to commit op event: " << cpp_strerror(r
) << dendl
;
1529 m_journaler
->committed(op_start_future
);
1530 m_journaler
->committed(op_finish_future
);
1532 // reduce the replay window after committing an op event
1533 m_journaler
->flush_commit_position(on_safe
);
1536 template <typename I
>
1537 void Journal
<I
>::stop_recording() {
1538 assert(m_lock
.is_locked());
1539 assert(m_journaler
!= NULL
);
1541 assert(m_state
== STATE_READY
);
1542 transition_state(STATE_STOPPING
, 0);
1544 m_journaler
->stop_append(util::create_async_context_callback(
1545 m_image_ctx
, create_context_callback
<
1546 Journal
<I
>, &Journal
<I
>::handle_recording_stopped
>(this)));
1549 template <typename I
>
1550 void Journal
<I
>::transition_state(State state
, int r
) {
1551 CephContext
*cct
= m_image_ctx
.cct
;
1552 ldout(cct
, 20) << this << " " << __func__
<< ": new state=" << state
<< dendl
;
1553 assert(m_lock
.is_locked());
1556 if (m_error_result
== 0 && r
< 0) {
1560 if (is_steady_state()) {
1561 Contexts
wait_for_state_contexts(std::move(m_wait_for_state_contexts
));
1562 for (auto ctx
: wait_for_state_contexts
) {
1563 ctx
->complete(m_error_result
);
1568 template <typename I
>
1569 bool Journal
<I
>::is_steady_state() const {
1570 assert(m_lock
.is_locked());
1575 case STATE_UNINITIALIZED
:
1576 case STATE_INITIALIZING
:
1577 case STATE_REPLAYING
:
1578 case STATE_FLUSHING_RESTART
:
1579 case STATE_RESTARTING_REPLAY
:
1580 case STATE_FLUSHING_REPLAY
:
1581 case STATE_STOPPING
:
1588 template <typename I
>
1589 void Journal
<I
>::wait_for_steady_state(Context
*on_state
) {
1590 assert(m_lock
.is_locked());
1591 assert(!is_steady_state());
1593 CephContext
*cct
= m_image_ctx
.cct
;
1594 ldout(cct
, 20) << this << " " << __func__
<< ": on_state=" << on_state
1596 m_wait_for_state_contexts
.push_back(on_state
);
1599 template <typename I
>
1600 int Journal
<I
>::is_resync_requested(bool *do_resync
) {
1601 Mutex::Locker
l(m_lock
);
1602 return check_resync_requested(do_resync
);
1605 template <typename I
>
1606 int Journal
<I
>::check_resync_requested(bool *do_resync
) {
1607 CephContext
*cct
= m_image_ctx
.cct
;
1608 ldout(cct
, 20) << this << " " << __func__
<< dendl
;
1610 assert(m_lock
.is_locked());
1611 assert(do_resync
!= nullptr);
1613 cls::journal::Client client
;
1614 int r
= m_journaler
->get_cached_client(IMAGE_CLIENT_ID
, &client
);
1616 lderr(cct
) << this << " " << __func__
<< ": "
1617 << "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1621 librbd::journal::ClientData client_data
;
1622 bufferlist::iterator bl_it
= client
.data
.begin();
1624 ::decode(client_data
, bl_it
);
1625 } catch (const buffer::error
&err
) {
1626 lderr(cct
) << this << " " << __func__
<< ": "
1627 << "failed to decode client data: " << err
<< dendl
;
1631 journal::ImageClientMeta
*image_client_meta
=
1632 boost::get
<journal::ImageClientMeta
>(&client_data
.client_meta
);
1633 if (image_client_meta
== nullptr) {
1634 lderr(cct
) << this << " " << __func__
<< ": "
1635 << "failed to access image client meta struct" << dendl
;
1639 *do_resync
= image_client_meta
->resync_requested
;
1644 struct C_RefreshTags
: public Context
{
1645 util::AsyncOpTracker
&async_op_tracker
;
1646 Context
*on_finish
= nullptr;
1650 journal::TagData tag_data
;
1652 C_RefreshTags(util::AsyncOpTracker
&async_op_tracker
)
1653 : async_op_tracker(async_op_tracker
),
1654 lock("librbd::Journal::C_RefreshTags::lock") {
1655 async_op_tracker
.start_op();
1657 ~C_RefreshTags() override
{
1658 async_op_tracker
.finish_op();
1661 void finish(int r
) override
{
1662 on_finish
->complete(r
);
1666 template <typename I
>
1667 void Journal
<I
>::handle_metadata_updated() {
1668 CephContext
*cct
= m_image_ctx
.cct
;
1669 Mutex::Locker
locker(m_lock
);
1671 if (m_state
!= STATE_READY
&& !is_journal_replaying(m_lock
)) {
1673 } else if (is_tag_owner(m_lock
)) {
1674 ldout(cct
, 20) << this << " " << __func__
<< ": primary image" << dendl
;
1676 } else if (m_listeners
.empty()) {
1677 ldout(cct
, 20) << this << " " << __func__
<< ": no listeners" << dendl
;
1681 uint64_t refresh_sequence
= ++m_refresh_sequence
;
1682 ldout(cct
, 20) << this << " " << __func__
<< ": "
1683 << "refresh_sequence=" << refresh_sequence
<< dendl
;
1685 // pull the most recent tags from the journal, decode, and
1686 // update the internal tag state
1687 C_RefreshTags
*refresh_ctx
= new C_RefreshTags(m_async_journal_op_tracker
);
1688 refresh_ctx
->on_finish
= new FunctionContext(
1689 [this, refresh_sequence
, refresh_ctx
](int r
) {
1690 handle_refresh_metadata(refresh_sequence
, refresh_ctx
->tag_tid
,
1691 refresh_ctx
->tag_data
, r
);
1693 C_DecodeTags
*decode_tags_ctx
= new C_DecodeTags(
1694 cct
, &refresh_ctx
->lock
, &refresh_ctx
->tag_tid
,
1695 &refresh_ctx
->tag_data
, refresh_ctx
);
1696 m_journaler
->get_tags(m_tag_tid
== 0 ? 0 : m_tag_tid
- 1, m_tag_class
,
1697 &decode_tags_ctx
->tags
, decode_tags_ctx
);
1700 template <typename I
>
1701 void Journal
<I
>::handle_refresh_metadata(uint64_t refresh_sequence
,
1703 journal::TagData tag_data
, int r
) {
1704 CephContext
*cct
= m_image_ctx
.cct
;
1705 Mutex::Locker
locker(m_lock
);
1708 lderr(cct
) << this << " " << __func__
<< ": failed to refresh metadata: "
1709 << cpp_strerror(r
) << dendl
;
1711 } else if (m_state
!= STATE_READY
&& !is_journal_replaying(m_lock
)) {
1713 } else if (refresh_sequence
!= m_refresh_sequence
) {
1714 // another, more up-to-date refresh is in-flight
1718 ldout(cct
, 20) << this << " " << __func__
<< ": "
1719 << "refresh_sequence=" << refresh_sequence
<< ", "
1720 << "tag_tid=" << tag_tid
<< ", "
1721 << "tag_data=" << tag_data
<< dendl
;
1722 while (m_listener_notify
) {
1723 m_listener_cond
.Wait(m_lock
);
1726 bool was_tag_owner
= is_tag_owner(m_lock
);
1727 if (m_tag_tid
< tag_tid
) {
1728 m_tag_tid
= tag_tid
;
1729 m_tag_data
= tag_data
;
1731 bool promoted_to_primary
= (!was_tag_owner
&& is_tag_owner(m_lock
));
1733 bool resync_requested
= false;
1734 r
= check_resync_requested(&resync_requested
);
1736 lderr(cct
) << this << " " << __func__
<< ": "
1737 << "failed to check if a resync was requested" << dendl
;
1741 Listeners
listeners(m_listeners
);
1742 m_listener_notify
= true;
1745 if (promoted_to_primary
) {
1746 for (auto listener
: listeners
) {
1747 listener
->handle_promoted();
1749 } else if (resync_requested
) {
1750 for (auto listener
: listeners
) {
1751 listener
->handle_resync();
1756 m_listener_notify
= false;
1757 m_listener_cond
.Signal();
1760 template <typename I
>
1761 void Journal
<I
>::add_listener(journal::Listener
*listener
) {
1762 Mutex::Locker
locker(m_lock
);
1763 m_listeners
.insert(listener
);
1766 template <typename I
>
1767 void Journal
<I
>::remove_listener(journal::Listener
*listener
) {
1768 Mutex::Locker
locker(m_lock
);
1769 while (m_listener_notify
) {
1770 m_listener_cond
.Wait(m_lock
);
1772 m_listeners
.erase(listener
);
1775 } // namespace librbd
1778 template class librbd::Journal
<librbd::ImageCtx
>;