// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/Journal.h"
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/Policy.h"
#include "journal/ReplayEntry.h"
#include "journal/Settings.h"
#include "journal/Utils.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
#include "librbd/journal/ObjectDispatch.h"
#include "librbd/journal/OpenRequest.h"
#include "librbd/journal/RemoveRequest.h"
#include "librbd/journal/ResetRequest.h"
#include "librbd/journal/Replay.h"
#include "librbd/journal/PromoteRequest.h"

#include <boost/scope_exit.hpp>

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::Journal: "

namespace librbd {
using util::create_async_context_callback;
using util::create_context_callback;
using journal::util::C_DecodeTag;
using journal::util::C_DecodeTags;

namespace {
// TODO: once journaler is 100% async and converted to ASIO, remove separate
// threads and reuse librbd's AsioEngine
class ThreadPoolSingleton : public ThreadPool {
public:
  ContextWQ *work_queue;

  explicit ThreadPoolSingleton(CephContext *cct)
    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
      work_queue(new ContextWQ("librbd::journal::work_queue",
                               ceph::make_timespan(
                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
                               this)) {
    start();
  }
  ~ThreadPoolSingleton() override {
    work_queue->drain();
    delete work_queue;

    stop();
  }
};
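
// The helper contexts below back the static is_tag_owner()/get_tag_owner()
// entry points: each opens a lightweight Journaler, walks the
// GET_CLIENT -> GET_TAGS sequence via get_tags(), and inspects the most
// recent tag. The image is considered the tag owner (i.e. primary) when the
// newest tag's mirror_uuid equals Journal<>::LOCAL_MIRROR_UUID.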
template <typename J>
struct C_IsTagOwner : public Context {
  librados::IoCtx &io_ctx;
  std::string image_id;
  bool *is_tag_owner;
  asio::ContextWQ *op_work_queue;
  Context *on_finish;

  CephContext *cct = nullptr;
  Journaler *journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
               bool *is_tag_owner, asio::ContextWQ *op_work_queue,
               Context *on_finish)
    : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
      op_work_queue(op_work_queue), on_finish(on_finish),
      cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
      journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
                              {}, nullptr)) {
  }

  void finish(int r) override {
    ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
                   << dendl;
    if (r < 0) {
      lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
                 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
    } else {
      *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
    }

    Journaler *journaler = this->journaler;
    Context *on_finish = this->on_finish;
    auto ctx = new LambdaContext(
      [journaler, on_finish](int r) {
        on_finish->complete(r);
        delete journaler;
      });
    op_work_queue->queue(ctx, r);
  }
};
struct C_GetTagOwner : public Context {
  std::string *mirror_uuid;
  Context *on_finish;

  Journaler journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
                std::string *mirror_uuid, Context *on_finish)
    : mirror_uuid(mirror_uuid), on_finish(on_finish),
      journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
  }

  virtual void finish(int r) {
    if (r >= 0) {
      *mirror_uuid = tag_data.mirror_uuid;
    }
    on_finish->complete(r);
  }
};
template <typename J>
struct GetTagsRequest {
  CephContext *cct;
  J *journaler;
  cls::journal::Client *client;
  journal::ImageClientMeta *client_meta;
  uint64_t *tag_tid;
  journal::TagData *tag_data;
  Context *on_finish;

  ceph::mutex lock = ceph::make_mutex("lock");

  GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
                 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
                 journal::TagData *tag_data, Context *on_finish)
    : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
      tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
  }

  /**
   * @verbatim
   *
   * <start>
   *    |
   *    v
   * GET_CLIENT * * * * * * * * * * * *
   *    |                             *
   *    v                             *
   * GET_TAGS * * * * * * * * * * * * * (error)
   *    |                             *
   *    v                             *
   * <finish> * * * * * * * * * * * * *
   *
   * @endverbatim
   */

  void send() {
    send_get_client();
  }

  void send_get_client() {
    ldout(cct, 20) << __func__ << dendl;

    auto ctx = new LambdaContext(
      [this](int r) {
        handle_get_client(r);
      });
    journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
  }

  void handle_get_client(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    if (r < 0) {
      complete(r);
      return;
    }

    librbd::journal::ClientData client_data;
    auto bl_it = client->data.cbegin();
    try {
      decode(client_data, bl_it);
    } catch (const buffer::error &err) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to decode client data" << dendl;
      complete(-EINVAL);
      return;
    }

    journal::ImageClientMeta *image_client_meta =
      boost::get<journal::ImageClientMeta>(&client_data.client_meta);
    if (image_client_meta == nullptr) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to get client meta" << dendl;
      complete(-EINVAL);
      return;
    }
    *client_meta = *image_client_meta;

    send_get_tags();
  }

  void send_get_tags() {
    ldout(cct, 20) << __func__ << dendl;

    auto ctx = new LambdaContext(
      [this](int r) {
        handle_get_tags(r);
      });
    C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
                                              ctx);
    journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
  }

  void handle_get_tags(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    complete(r);
  }

  void complete(int r) {
    on_finish->complete(r);
    delete this;
  }
};
template <typename J>
void get_tags(CephContext *cct, J *journaler,
              cls::journal::Client *client,
              journal::ImageClientMeta *client_meta,
              uint64_t *tag_tid, journal::TagData *tag_data,
              Context *on_finish) {
  ldout(cct, 20) << __func__ << dendl;

  GetTagsRequest<J> *req =
    new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
                          tag_data, on_finish);
  req->send();
}
template <typename J>
int allocate_journaler_tag(CephContext *cct, J *journaler,
                           uint64_t tag_class,
                           const journal::TagPredecessor &predecessor,
                           const std::string &mirror_uuid,
                           cls::journal::Tag *new_tag) {
  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_SaferCond allocate_tag_ctx;
  journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);

  int r = allocate_tag_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to allocate tag: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}

} // anonymous namespace
// client id for local image
template <typename I>
const std::string Journal<I>::IMAGE_CLIENT_ID("");

// mirror uuid to use for local images
template <typename I>
const std::string Journal<I>::LOCAL_MIRROR_UUID("");

// mirror uuid to use for orphaned (demoted) images
template <typename I>
const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
template <typename I>
std::ostream &operator<<(std::ostream &os,
                         const typename Journal<I>::State &state) {
  switch (state) {
  case Journal<I>::STATE_UNINITIALIZED:
    os << "Uninitialized";
    break;
  case Journal<I>::STATE_INITIALIZING:
    os << "Initializing";
    break;
  case Journal<I>::STATE_REPLAYING:
    os << "Replaying";
    break;
  case Journal<I>::STATE_FLUSHING_RESTART:
    os << "FlushingRestart";
    break;
  case Journal<I>::STATE_RESTARTING_REPLAY:
    os << "RestartingReplay";
    break;
  case Journal<I>::STATE_FLUSHING_REPLAY:
    os << "FlushingReplay";
    break;
  case Journal<I>::STATE_READY:
    os << "Ready";
    break;
  case Journal<I>::STATE_STOPPING:
    os << "Stopping";
    break;
  case Journal<I>::STATE_CLOSING:
    os << "Closing";
    break;
  case Journal<I>::STATE_CLOSED:
    os << "Closed";
    break;
  default:
    os << "Unknown (" << static_cast<uint32_t>(state) << ")";
    break;
  }
  return os;
}
template <typename I>
void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
  auto ctx = new LambdaContext([this](int r) {
    journal->handle_metadata_updated();
  });
  journal->m_work_queue->queue(ctx, 0);
}
template <typename I>
void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
  auto thread_pool_singleton =
    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
      "librbd::journal::thread_pool", false, cct);
  *work_queue = thread_pool_singleton->work_queue;
}
template <typename I>
Journal<I>::Journal(I &image_ctx)
  : RefCountedObject(image_ctx.cct),
    m_image_ctx(image_ctx), m_journaler(NULL),
    m_state(STATE_UNINITIALIZED),
    m_error_result(0), m_replay_handler(this), m_close_pending(false),
    m_event_tid(0),
    m_blocking_writes(false), m_journal_replay(NULL),
    m_metadata_listener(this) {

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;

  get_work_queue(cct, &m_work_queue);
  ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
}
template <typename I>
Journal<I>::~Journal() {
  if (m_work_queue != nullptr) {
    m_work_queue->drain();
  }

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
  ceph_assert(m_journaler == NULL);
  ceph_assert(m_journal_replay == NULL);
  ceph_assert(m_wait_for_state_contexts.empty());
}
template <typename I>
bool Journal<I>::is_journal_supported(I &image_ctx) {
  ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
  return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
          !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}
template <typename I>
int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
                       uint8_t order, uint8_t splay_width,
                       const std::string &object_pool) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  journal::TagData tag_data(LOCAL_MIRROR_UUID);
  journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
    io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
    tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
  req->send();

  return cond.wait();
}
template <typename I>
int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
    io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
  req->send();

  return cond.wait();
}
template <typename I>
int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
                                              Journal<>::LOCAL_MIRROR_UUID,
                                              work_queue, &cond);
  req->send();

  return cond.wait();
}
template <typename I>
void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
                              Context *on_finish) {
  Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
                           image_ctx->op_work_queue, on_finish);
}
template <typename I>
void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
                              bool *is_tag_owner,
                              asio::ContextWQ *op_work_queue,
                              Context *on_finish) {
  CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
    io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
  get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
           &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
           &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
}
template <typename I>
void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
                               std::string *mirror_uuid,
                               asio::ContextWQ *op_work_queue,
                               Context *on_finish) {
  CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
  get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
           &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
}
template <typename I>
int Journal<I>::request_resync(I *image_ctx) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
                      nullptr);

  ceph::mutex lock = ceph::make_mutex("lock");
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid;
  journal::TagData tag_data;

  C_SaferCond open_ctx;
  auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
                                                  &client_meta, &tag_tid,
                                                  &tag_data, &open_ctx);
  open_req->send();

  BOOST_SCOPE_EXIT_ALL(&journaler) {
    journaler.shut_down();
  };

  int r = open_ctx.wait();
  if (r < 0) {
    return r;
  }

  client_meta.resync_requested = true;

  journal::ClientData client_data(client_meta);
  bufferlist client_data_bl;
  encode(client_data, client_data_bl);

  C_SaferCond update_client_ctx;
  journaler.update_client(client_data_bl, &update_client_ctx);

  r = update_client_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to update client: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}
template <typename I>
void Journal<I>::promote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
                                                        on_finish);
  promote_req->send();
}
template <typename I>
void Journal<I>::demote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
  req->send();
}
template <typename I>
bool Journal<I>::is_journal_ready() const {
  std::lock_guard locker{m_lock};
  return (m_state == STATE_READY);
}
template <typename I>
bool Journal<I>::is_journal_replaying() const {
  std::lock_guard locker{m_lock};
  return is_journal_replaying(m_lock);
}
template <typename I>
bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return (m_state == STATE_REPLAYING ||
          m_state == STATE_FLUSHING_REPLAY ||
          m_state == STATE_FLUSHING_RESTART ||
          m_state == STATE_RESTARTING_REPLAY);
}
template <typename I>
bool Journal<I>::is_journal_appending() const {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
  std::lock_guard locker{m_lock};
  return (m_state == STATE_READY &&
          !m_image_ctx.get_journal_policy()->append_disabled());
}
template <typename I>
void Journal<I>::wait_for_journal_ready(Context *on_ready) {
  on_ready = create_async_context_callback(m_image_ctx, on_ready);

  std::lock_guard locker{m_lock};
  if (m_state == STATE_READY) {
    on_ready->complete(m_error_result);
  } else {
    wait_for_steady_state(on_ready);
  }
}
template <typename I>
void Journal<I>::open(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = create_context_callback<Context>(on_finish, this);

  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  // inject our handler into the object dispatcher chain
  m_image_ctx.io_object_dispatcher->register_dispatch(
    journal::ObjectDispatch<I>::create(&m_image_ctx, this));

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_UNINITIALIZED);
  wait_for_steady_state(on_finish);
  create_journaler();
}
template <typename I>
void Journal<I>::close(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = create_context_callback<Context>(on_finish, this);

  on_finish = new LambdaContext([this, on_finish](int r) {
      // remove our handler from object dispatcher chain - preserve error
      auto ctx = new LambdaContext([on_finish, r](int _) {
          on_finish->complete(r);
        });
      m_image_ctx.io_object_dispatcher->shut_down_dispatch(
        io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
    });
  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  std::unique_lock locker{m_lock};
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  locker.unlock();
  for (auto listener : listeners) {
    listener->handle_close();
  }

  locker.lock();
  m_listener_notify = false;
  m_listener_cond.notify_all();

  ceph_assert(m_state != STATE_UNINITIALIZED);
  if (m_state == STATE_CLOSED) {
    on_finish->complete(m_error_result);
    return;
  }

  if (m_state == STATE_READY) {
    stop_recording();
  }

  m_close_pending = true;
  wait_for_steady_state(on_finish);
}
template <typename I>
bool Journal<I>::is_tag_owner() const {
  std::lock_guard locker{m_lock};
  return is_tag_owner(m_lock);
}
template <typename I>
bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
  std::lock_guard locker{m_lock};
  return m_tag_tid;
}
template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
  std::lock_guard locker{m_lock};
  return m_tag_data;
}
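
// Tag allocation: every journal entry is appended under a tag, and the tag's
// mirror_uuid records which site owned the image when the entry was written.
// allocate_local_tag() chains a new local (primary) tag to the predecessor
// derived from the client's registered commit position; allocate_tag()
// encodes the TagData and decodes the newly allocated tag back into
// m_tag_tid/m_tag_data via C_DecodeTag.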
template <typename I>
void Journal<I>::allocate_local_tag(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  journal::TagPredecessor predecessor;
  predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));

    cls::journal::Client client;
    int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
    if (r < 0) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
      m_image_ctx.op_work_queue->queue(on_finish, r);
      return;
    }

    // since we are primary, populate the predecessor with our known commit
    // position
    ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
    if (!client.commit_position.object_positions.empty()) {
      auto position = client.commit_position.object_positions.front();
      predecessor.commit_valid = true;
      predecessor.tag_tid = position.tag_tid;
      predecessor.entry_tid = position.entry_tid;
    }
  }

  allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
}
template <typename I>
void Journal<I>::allocate_tag(const std::string &mirror_uuid,
                              const journal::TagPredecessor &predecessor,
                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
                 << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journaler != nullptr);

  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
                                                &m_tag_data, on_finish);
  m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
                            decode_tag_ctx);
}
template <typename I>
void Journal<I>::flush_commit_position(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journaler != nullptr);
  m_journaler->flush_commit_position(on_finish);
}
template <typename I>
void Journal<I>::user_flushed() {
  if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
      m_image_ctx.config.template get_val<bool>(
        "rbd_journal_object_writethrough_until_flush")) {
    std::lock_guard locker{m_lock};
    if (m_state == STATE_READY) {
      CephContext *cct = m_image_ctx.cct;
      ldout(cct, 5) << this << " " << __func__ << dendl;

      ceph_assert(m_journaler != nullptr);
      m_journaler->set_append_batch_options(
        m_image_ctx.config.template get_val<uint64_t>(
          "rbd_journal_object_flush_interval"),
        m_image_ctx.config.template get_val<Option::size_t>(
          "rbd_journal_object_flush_bytes"),
        m_image_ctx.config.template get_val<double>(
          "rbd_journal_object_flush_age"));
    } else {
      m_user_flushed = false;
    }
  }
}
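
// A single image extent write may exceed the journaler's maximum append
// size, so append_write_event() splits the payload into multiple
// AioWriteEvent entries of at most (m_max_append_size - fixed event
// overhead) bytes each. All resulting entries are recorded under one event
// tid so they complete, and are later committed, as a unit.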
template <typename I>
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                                        const bufferlist &bl,
                                        bool flush_entry) {
  ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
  uint64_t max_write_data_size =
    m_max_append_size - journal::AioWriteEvent::get_fixed_size();

  // ensure that the write event fits within the journal entry
  Bufferlists bufferlists;
  uint64_t bytes_remaining = length;
  uint64_t event_offset = 0;
  do {
    uint64_t event_length = std::min(bytes_remaining, max_write_data_size);

    bufferlist event_bl;
    event_bl.substr_of(bl, event_offset, event_length);
    journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
                                                           event_length,
                                                           event_bl),
                                    ceph_clock_now());

    bufferlists.emplace_back();
    encode(event_entry, bufferlists.back());

    event_offset += event_length;
    bytes_remaining -= event_length;
  } while (bytes_remaining > 0);

  return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
                          length, flush_entry, 0);
}
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
                                     uint64_t offset, size_t length,
                                     bool flush_entry, int filter_ret_val) {
  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);
  return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
                          flush_entry, filter_ret_val);
}
template <typename I>
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
                                      const Bufferlists &bufferlists,
                                      uint64_t offset, size_t length,
                                      bool flush_entry, int filter_ret_val) {
  ceph_assert(!bufferlists.empty());

  uint64_t tid;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    tid = ++m_event_tid;
    ceph_assert(tid != 0);
  }

  Futures futures;
  for (auto &bl : bufferlists) {
    ceph_assert(bl.length() <= m_max_append_size);
    futures.push_back(m_journaler->append(m_tag_tid, bl));
  }

  {
    std::lock_guard event_locker{m_event_lock};
    m_events[tid] = Event(futures, offset, length, filter_ret_val);
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "event=" << event_type << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "flush=" << flush_entry << ", tid=" << tid << dendl;

  Context *on_safe = create_async_context_callback(
    m_image_ctx, new C_IOEventSafe(this, tid));
  if (flush_entry) {
    futures.back().flush(on_safe);
  } else {
    futures.back().wait(on_safe);
  }

  return tid;
}
template <typename I>
void Journal<I>::commit_io_event(uint64_t tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "r=" << r << dendl;

  std::lock_guard event_locker{m_event_lock};
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }
  complete_event(it, r);
}
template <typename I>
void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
                                        uint64_t length, int r) {
  ceph_assert(length > 0);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "r=" << r << dendl;

  std::lock_guard event_locker{m_event_lock};
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }

  Event &event = it->second;
  if (event.ret_val == 0 && r < 0) {
    event.ret_val = r;
  }

  ExtentInterval extent;
  extent.insert(offset, length);

  ExtentInterval intersect;
  intersect.intersection_of(extent, event.pending_extents);

  event.pending_extents.subtract(intersect);
  if (!event.pending_extents.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "pending extents: " << event.pending_extents << dendl;
    return;
  }
  complete_event(it, event.ret_val);
}
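
// Op events are journaled in two halves: append_op_event() records the start
// of an operation and parks its future in m_op_futures, while
// commit_op_event() later appends the matching OpFinishEvent. Committing the
// start event is deliberately deferred until the finish event is safe so
// that replay always observes either a complete op or none of it.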
template <typename I>
void Journal<I>::append_op_event(uint64_t op_tid,
                                 journal::EventEntry &&event_entry,
                                 Context *on_safe) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);

  Future future;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    future = m_journaler->append(m_tag_tid, bl);

    // delay committing op event to ensure consistent replay
    ceph_assert(m_op_futures.count(op_tid) == 0);
    m_op_futures[op_tid] = future;
  }

  on_safe = create_async_context_callback(m_image_ctx, on_safe);
  on_safe = new LambdaContext([this, on_safe](int r) {
      // ensure all committed IO before this op is committed
      m_journaler->flush_commit_position(on_safe);
    });
  future.flush(on_safe);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": "
                 << "op_tid=" << op_tid << ", "
                 << "event=" << event_entry.get_event_type() << dendl;
}
template <typename I>
void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
                                  ceph_clock_now());

  bufferlist bl;
  encode(event_entry, bl);

  Future op_start_future;
  Future op_finish_future;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    // ready to commit op event
    auto it = m_op_futures.find(op_tid);
    ceph_assert(it != m_op_futures.end());
    op_start_future = it->second;
    m_op_futures.erase(it);

    op_finish_future = m_journaler->append(m_tag_tid, bl);
  }

  op_finish_future.flush(create_async_context_callback(
    m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
                                   op_finish_future, on_safe)));
}
template <typename I>
void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_journal_replay != nullptr);
    m_journal_replay->replay_op_ready(op_tid, on_resume);
  }
}
template <typename I>
void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  on_safe = create_context_callback<Context>(on_safe, this);

  Future future;
  {
    std::lock_guard event_locker{m_event_lock};
    future = wait_event(m_lock, tid, on_safe);
  }

  if (future.is_valid()) {
    future.flush(nullptr);
  }
}
template <typename I>
void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  on_safe = create_context_callback<Context>(on_safe, this);

  std::lock_guard event_locker{m_event_lock};
  wait_event(m_lock, tid, on_safe);
}
template <typename I>
typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
                                                   Context *on_safe) {
  ceph_assert(ceph_mutex_is_locked(m_event_lock));
  CephContext *cct = m_image_ctx.cct;

  typename Events::iterator it = m_events.find(tid);
  ceph_assert(it != m_events.end());

  Event &event = it->second;
  if (event.safe) {
    // journal entry already safe
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "journal entry already safe" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
    return Future();
  }

  event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
                                                                 on_safe));
  return event.futures.back();
}
template <typename I>
void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
                                       Context *on_start) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  on_start = util::create_async_context_callback(m_image_ctx, on_start);
  on_start = new LambdaContext(
    [this, journal_replay, on_start](int r) {
      handle_start_external_replay(r, journal_replay, on_start);
    });

  // safely flush all in-flight events before starting external replay
  m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
                                                               on_start));
}
template <typename I>
void Journal<I>::handle_start_external_replay(int r,
                                              journal::Replay<I> **journal_replay,
                                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to stop recording: " << cpp_strerror(r) << dendl;
    *journal_replay = nullptr;

    // get back to a sane-state
    start_append();
    on_finish->complete(r);
    return;
  }

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  *journal_replay = m_journal_replay;
  on_finish->complete(0);
}
template <typename I>
void Journal<I>::stop_external_replay() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journal_replay != nullptr);
  ceph_assert(m_state == STATE_REPLAYING);

  delete m_journal_replay;
  m_journal_replay = nullptr;

  if (m_close_pending) {
    destroy_journaler(0);
    return;
  }

  start_append();
}
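
// Journaler lifecycle: create_journaler() drives STATE_INITIALIZING ->
// STATE_REPLAYING via journal::OpenRequest, destroy_journaler() tears the
// instance down through STATE_CLOSING once in-flight async ops drain, and
// recreate_journaler() shuts down and re-opens after a replay failure
// (STATE_RESTARTING_REPLAY).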
template <typename I>
void Journal<I>::create_journaler() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
  ceph_assert(m_journaler == NULL);

  transition_state(STATE_INITIALIZING, 0);
  ::journal::Settings settings;
  settings.commit_interval =
    m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
  settings.max_payload_bytes =
    m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
  settings.max_concurrent_object_sets =
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
  // TODO: a configurable filter to exclude certain peers from being
  // disconnected.
  settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};

  m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
                              m_image_ctx.md_ctx, m_image_ctx.id,
                              IMAGE_CLIENT_ID, settings, nullptr);
  m_journaler->add_listener(&m_metadata_listener);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_open>(this));
  auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
                                                  &m_lock, &m_client_meta,
                                                  &m_tag_tid, &m_tag_data, ctx);
  open_req->send();
}
template <typename I>
void Journal<I>::destroy_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_CLOSING, r);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
  ctx = new LambdaContext(
    [this, ctx](int r) {
      std::lock_guard locker{m_lock};
      m_journaler->shut_down(ctx);
    });
  ctx = create_async_context_callback(m_image_ctx, ctx);
  m_async_journal_op_tracker.wait_for_ops(ctx);
}
template <typename I>
void Journal<I>::recreate_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_RESTARTING_REPLAY, r);
  m_journaler->shut_down(create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
}
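
// An IO event is released only after both sides finish: "safe" means the
// journal append was persisted, "committed_io" means the write completed
// against the image. complete_event() handles the image side; it downgrades
// errors matching filter_ret_val, then marks the journaler futures committed
// and erases the event once it is also safe.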
template <typename I>
void Journal<I>::complete_event(typename Events::iterator it, int r) {
  ceph_assert(ceph_mutex_is_locked(m_event_lock));
  ceph_assert(m_state == STATE_READY);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
                 << "r=" << r << dendl;

  Event &event = it->second;
  if (r < 0 && r == event.filter_ret_val) {
    // ignore allowed error codes
    r = 0;
  }
  if (r < 0) {
    // event recorded to journal but failed to update disk, we cannot
    // commit this IO event. this event must be replayed.
    ceph_assert(event.safe);
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO to disk, replay required: "
               << cpp_strerror(r) << dendl;
  }

  event.committed_io = true;
  if (event.safe) {
    if (r >= 0) {
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }
    m_events.erase(it);
  }
}
template <typename I>
void Journal<I>::start_append() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  m_journaler->start_append(
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
  if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
    m_journaler->set_append_batch_options(
      m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
      m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
      m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
  }

  transition_state(STATE_READY, 0);
}
template <typename I>
void Journal<I>::handle_open(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_INITIALIZING);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to initialize journal: " << cpp_strerror(r)
               << dendl;
    destroy_journaler(r);
    return;
  }

  m_tag_class = m_client_meta.tag_class;
  m_max_append_size = m_journaler->get_max_append_size();
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "tag_class=" << m_tag_class << ", "
                 << "max_append_size=" << m_max_append_size << dendl;

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  m_journaler->start_replay(&m_replay_handler);
}
template <typename I>
void Journal<I>::handle_replay_ready() {
  CephContext *cct = m_image_ctx.cct;
  ReplayEntry replay_entry;
  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << dendl;
    if (!m_journaler->try_pop_front(&replay_entry)) {
      return;
    }

    // only one entry should be in-flight at a time
    ceph_assert(!m_processing_entry);
    m_processing_entry = true;
  }

  m_async_journal_op_tracker.start_op();

  bufferlist data = replay_entry.get_data();
  auto it = data.cbegin();

  journal::EventEntry event_entry;
  int r = m_journal_replay->decode(&it, &event_entry);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode journal event entry" << dendl;
    handle_replay_process_safe(replay_entry, r);
    return;
  }

  Context *on_ready = create_context_callback<
    Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
  Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
  m_journal_replay->process(event_entry, on_ready, on_commit);
}
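
// End-of-replay sequencing. The LambdaContext chain below is built in
// reverse execution order: stop_replay() fires the outermost context first,
// which shuts down journal::Replay, waits for outstanding async ops,
// flushes the commit position, and finally dispatches to
// handle_flushing_restart() or handle_flushing_replay() based on the
// resulting state.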
template <typename I>
void Journal<I>::handle_replay_complete(int r) {
  CephContext *cct = m_image_ctx.cct;

  bool cancel_ops = false;
  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
    if (r < 0) {
      cancel_ops = true;
      transition_state(STATE_FLUSHING_RESTART, r);
    } else {
      // state might change back to FLUSHING_RESTART on flush error
      transition_state(STATE_FLUSHING_REPLAY, 0);
    }
  }

  Context *ctx = new LambdaContext([this, cct](int r) {
      ldout(cct, 20) << this << " handle_replay_complete: "
                     << "handle shut down replay" << dendl;

      State state;
      {
        std::lock_guard locker{m_lock};
        ceph_assert(m_state == STATE_FLUSHING_RESTART ||
                    m_state == STATE_FLUSHING_REPLAY);
        state = m_state;
      }

      if (state == STATE_FLUSHING_RESTART) {
        handle_flushing_restart(0);
      } else {
        handle_flushing_replay();
      }
    });
  ctx = new LambdaContext([this, ctx](int r) {
      // ensure the commit position is flushed to disk
      m_journaler->flush_commit_position(ctx);
    });
  ctx = create_async_context_callback(m_image_ctx, ctx);
  ctx = new LambdaContext([this, ctx](int r) {
      m_async_journal_op_tracker.wait_for_ops(ctx);
    });
  ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
      ldout(cct, 20) << this << " handle_replay_complete: "
                     << "shut down replay" << dendl;
      m_journal_replay->shut_down(cancel_ops, ctx);
    });

  m_journaler->stop_replay(ctx);
}
template <typename I>
void Journal<I>::handle_replay_process_ready(int r) {
  // journal::Replay is ready for more events -- attempt to pop another
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_processing_entry);
    m_processing_entry = false;
  }
  handle_replay_ready();
}
template <typename I>
void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
  CephContext *cct = m_image_ctx.cct;

  std::unique_lock locker{m_lock};
  ceph_assert(m_state == STATE_REPLAYING ||
              m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
  if (r < 0) {
    if (r != -ECANCELED) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to commit journal event to disk: "
                 << cpp_strerror(r) << dendl;
    }

    if (m_state == STATE_REPLAYING) {
      // abort the replay if we have an error
      transition_state(STATE_FLUSHING_RESTART, r);
      locker.unlock();

      // stop replay, shut down, and restart
      Context* ctx = create_context_callback<
        Journal<I>, &Journal<I>::handle_flushing_restart>(this);
      ctx = new LambdaContext([this, ctx](int r) {
          // ensure the commit position is flushed to disk
          m_journaler->flush_commit_position(ctx);
        });
      ctx = new LambdaContext([this, cct, ctx](int r) {
          ldout(cct, 20) << this << " handle_replay_process_safe: "
                         << "shut down replay" << dendl;
          {
            std::lock_guard locker{m_lock};
            ceph_assert(m_state == STATE_FLUSHING_RESTART);
          }

          m_journal_replay->shut_down(true, ctx);
        });
      m_journaler->stop_replay(ctx);
      m_async_journal_op_tracker.finish_op();
      return;
    } else if (m_state == STATE_FLUSHING_REPLAY) {
      // end-of-replay flush in-progress -- we need to restart replay
      transition_state(STATE_FLUSHING_RESTART, r);
      locker.unlock();
      m_async_journal_op_tracker.finish_op();
      return;
    }
  } else {
    // only commit the entry if written successfully
    m_journaler->committed(replay_entry);
  }
  locker.unlock();
  m_async_journal_op_tracker.finish_op();
}
template <typename I>
void Journal<I>::handle_flushing_restart(int r) {
  std::lock_guard locker{m_lock};

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  ceph_assert(m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(r);
    return;
  }

  recreate_journaler(r);
}
template <typename I>
void Journal<I>::handle_flushing_replay() {
  std::lock_guard locker{m_lock};

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
              m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(0);
    return;
  } else if (m_state == STATE_FLUSHING_RESTART) {
    // failed to replay one-or-more events -- restart
    recreate_journaler(0);
    return;
  }

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_error_result = 0;
  start_append();
}
template <typename I>
void Journal<I>::handle_recording_stopped(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_STOPPING);

  destroy_journaler(r);
}
template <typename I>
void Journal<I>::handle_journal_destroyed(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << this << " " << __func__
               << "error detected while closing journal: " << cpp_strerror(r)
               << dendl;
  }

  std::lock_guard locker{m_lock};
  delete m_journaler;
  m_journaler = nullptr;

  ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
  if (m_state == STATE_RESTARTING_REPLAY) {
    create_journaler();
    return;
  }

  transition_state(STATE_CLOSED, r);
}
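
// Invoked (via C_IOEventSafe) once a journal append is persisted. On append
// failure, or when the IO already completed against the image
// (committed_io), the futures are committed immediately so the entry will
// not be replayed; otherwise the event is only marked safe and waits for
// commit_io_event(). The queued on_safe contexts are then completed with r.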
template <typename I>
void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO event: " << cpp_strerror(r) << dendl;
  }

  Contexts on_safe_contexts;
  {
    std::lock_guard event_locker{m_event_lock};
    typename Events::iterator it = m_events.find(tid);
    ceph_assert(it != m_events.end());

    Event &event = it->second;
    on_safe_contexts.swap(event.on_safe_contexts);

    if (r < 0 || event.committed_io) {
      // failed journal write so IO won't be sent -- or IO extent was
      // overwritten by future IO operations so this was a no-op IO event
      event.ret_val = r;
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }

    if (event.committed_io) {
      m_events.erase(it);
    } else {
      event.safe = true;
    }
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "completing tid=" << tid << dendl;

  // alert the cache about the journal event status
  for (Contexts::iterator it = on_safe_contexts.begin();
       it != on_safe_contexts.end(); ++it) {
    (*it)->complete(r);
  }
}
template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
                                      const Future &op_start_future,
                                      const Future &op_finish_future,
                                      Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit op event: " << cpp_strerror(r) << dendl;
  }

  m_journaler->committed(op_start_future);
  m_journaler->committed(op_finish_future);

  // reduce the replay window after committing an op event
  m_journaler->flush_commit_position(on_safe);
}
template <typename I>
void Journal<I>::stop_recording() {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_journaler != NULL);

  ceph_assert(m_state == STATE_READY);
  transition_state(STATE_STOPPING, 0);

  m_journaler->stop_append(util::create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
}
template <typename I>
void Journal<I>::transition_state(State state, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));
  m_state = state;

  if (m_error_result == 0 && r < 0) {
    m_error_result = r;
  }

  if (is_steady_state()) {
    auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
    m_wait_for_state_contexts.clear();

    for (auto ctx : wait_for_state_contexts) {
      ctx->complete(m_error_result);
    }
  }
}
template <typename I>
bool Journal<I>::is_steady_state() const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  switch (m_state) {
  case STATE_READY:
  case STATE_CLOSED:
    return true;
  case STATE_UNINITIALIZED:
  case STATE_INITIALIZING:
  case STATE_REPLAYING:
  case STATE_FLUSHING_RESTART:
  case STATE_RESTARTING_REPLAY:
  case STATE_FLUSHING_REPLAY:
  case STATE_STOPPING:
  case STATE_CLOSING:
    break;
  }
  return false;
}
template <typename I>
void Journal<I>::wait_for_steady_state(Context *on_state) {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(!is_steady_state());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
                 << dendl;
  m_wait_for_state_contexts.push_back(on_state);
}
template <typename I>
int Journal<I>::is_resync_requested(bool *do_resync) {
  std::lock_guard l{m_lock};
  return check_resync_requested(do_resync);
}
template <typename I>
int Journal<I>::check_resync_requested(bool *do_resync) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(do_resync != nullptr);

  cls::journal::Client client;
  int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return r;
  }

  librbd::journal::ClientData client_data;
  auto bl_it = client.data.cbegin();
  try {
    decode(client_data, bl_it);
  } catch (const buffer::error &err) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode client data: " << err.what() << dendl;
    return -EINVAL;
  }

  journal::ImageClientMeta *image_client_meta =
    boost::get<journal::ImageClientMeta>(&client_data.client_meta);
  if (image_client_meta == nullptr) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to access image client meta struct" << dendl;
    return -EINVAL;
  }

  *do_resync = image_client_meta->resync_requested;

  return 0;
}
struct C_RefreshTags : public Context {
  AsyncOpTracker &async_op_tracker;
  Context *on_finish = nullptr;

  ceph::mutex lock =
    ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
    : async_op_tracker(async_op_tracker) {
    async_op_tracker.start_op();
  }
  ~C_RefreshTags() override {
    async_op_tracker.finish_op();
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
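
// Remote journal metadata changed (new tags or client updates). Each
// notification bumps m_refresh_sequence so that only the most recent
// in-flight refresh applies its result; handle_refresh_metadata() then
// detects promotion (tag ownership gained) or a peer-requested resync and
// fans the event out to the registered listeners.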
template <typename I>
void Journal<I>::handle_metadata_updated() {
  CephContext *cct = m_image_ctx.cct;
  std::lock_guard locker{m_lock};

  if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (is_tag_owner(m_lock)) {
    ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
    return;
  } else if (m_listeners.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
    return;
  }

  uint64_t refresh_sequence = ++m_refresh_sequence;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << dendl;

  // pull the most recent tags from the journal, decode, and
  // update the internal tag state
  C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
  refresh_ctx->on_finish = new LambdaContext(
    [this, refresh_sequence, refresh_ctx](int r) {
      handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
                              refresh_ctx->tag_data, r);
    });
  C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
    cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
    &refresh_ctx->tag_data, refresh_ctx);
  m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
                        &decode_tags_ctx->tags, decode_tags_ctx);
}
template <typename I>
void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
                                         uint64_t tag_tid,
                                         journal::TagData tag_data, int r) {
  CephContext *cct = m_image_ctx.cct;
  std::unique_lock locker{m_lock};

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
               << cpp_strerror(r) << dendl;
    return;
  } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (refresh_sequence != m_refresh_sequence) {
    // another, more up-to-date refresh is in-flight
    return;
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << ", "
                 << "tag_tid=" << tag_tid << ", "
                 << "tag_data=" << tag_data << dendl;
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });

  bool was_tag_owner = is_tag_owner(m_lock);
  if (m_tag_tid < tag_tid) {
    m_tag_tid = tag_tid;
    m_tag_data = tag_data;
  }
  bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));

  bool resync_requested = false;
  r = check_resync_requested(&resync_requested);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to check if a resync was requested" << dendl;
    return;
  }

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  locker.unlock();

  if (promoted_to_primary) {
    for (auto listener : listeners) {
      listener->handle_promoted();
    }
  } else if (resync_requested) {
    for (auto listener : listeners) {
      listener->handle_resync();
    }
  }

  locker.lock();
  m_listener_notify = false;
  m_listener_cond.notify_all();
}
template <typename I>
void Journal<I>::add_listener(journal::Listener *listener) {
  std::lock_guard locker{m_lock};
  m_listeners.insert(listener);
}
template <typename I>
void Journal<I>::remove_listener(journal::Listener *listener) {
  std::unique_lock locker{m_lock};
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
  m_listeners.erase(listener);
}
} // namespace librbd

#ifndef TEST_F
template class librbd::Journal<librbd::ImageCtx>;
#endif