1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/JournalMetadata.h"
5 #include "journal/Utils.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "cls/journal/cls_journal_client.h"
12 #define dout_subsys ceph_subsys_journaler
14 #define dout_prefix *_dout << "JournalMetadata: " << this << " "
18 using namespace cls::journal
;
// Asynchronous request that reads one client's record from the object `oid`
// and stores the decoded result in *client before completing on_finish.
//
// ioctx, oid and async_op_tracker are held by reference, so the referenced
// objects must outlive this request; client_id is copied.
22 struct C_GetClient
: public Context
{
24 librados::IoCtx
&ioctx
;
25 const std::string
&oid
;
26 AsyncOpTracker
&async_op_tracker
;
27 std::string client_id
;
28 cls::journal::Client
*client
;
// The request registers itself with the op tracker on construction and
// unregisters in the destructor, so the tracker counts it for its whole
// lifetime.
33 C_GetClient(CephContext
*cct
, librados::IoCtx
&ioctx
, const std::string
&oid
,
34 AsyncOpTracker
&async_op_tracker
, const std::string
&client_id
,
35 cls::journal::Client
*client
, Context
*on_finish
)
36 : cct(cct
), ioctx(ioctx
), oid(oid
), async_op_tracker(async_op_tracker
),
37 client_id(client_id
), client(client
), on_finish(on_finish
) {
38 async_op_tracker
.start_op();
40 ~C_GetClient() override
{
41 async_op_tracker
.finish_op();
// Build the get_client read op for client_id and submit it; the completion
// is bound to handle_get_client() and the reply lands in out_bl.
48 void send_get_client() {
49 ldout(cct
, 20) << "C_GetClient: " << __func__
<< dendl
;
51 librados::ObjectReadOperation op
;
52 client::get_client_start(&op
, client_id
);
54 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
55 this, nullptr, &utils::rados_state_callback
<
56 C_GetClient
, &C_GetClient::handle_get_client
>);
58 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Completion of the read: decode the reply buffer into *client. r is
// replaced by the decode result.
63 void handle_get_client(int r
) {
64 ldout(cct
, 20) << "C_GetClient: " << __func__
<< ": r=" << r
<< dendl
;
67 bufferlist::iterator it
= out_bl
.begin();
68 r
= client::get_client_finish(&it
, client
);
// Hand the final result code to the caller-supplied callback.
73 void finish(int r
) override
{
74 on_finish
->complete(r
);
// Multi-step asynchronous request that allocates a new tag on the object
// `oid`. The visible steps are:
//   1. get_next_tag_tid  -> decoded into tag->tid
//   2. tag_create        -> write op using tag->tid, tag_class, tag->data
//   3. get_tag           -> read the tag identified by tag->tid back
// handle_tag_create() can loop back to step 1.
// NOTE(review): presumably that retry happens when the create lost a race
// with another allocator -- confirm the exact condition.
//
// ioctx, oid and async_op_tracker are held by reference and must outlive
// the request.
78 struct C_AllocateTag
: public Context
{
80 librados::IoCtx
&ioctx
;
81 const std::string
&oid
;
82 AsyncOpTracker
&async_op_tracker
;
// Registers the request with the op tracker; the destructor unregisters it.
89 C_AllocateTag(CephContext
*cct
, librados::IoCtx
&ioctx
,
90 const std::string
&oid
, AsyncOpTracker
&async_op_tracker
,
91 uint64_t tag_class
, const bufferlist
&data
, Tag
*tag
,
93 : cct(cct
), ioctx(ioctx
), oid(oid
), async_op_tracker(async_op_tracker
),
94 tag_class(tag_class
), tag(tag
), on_finish(on_finish
) {
95 async_op_tracker
.start_op();
98 ~C_AllocateTag() override
{
99 async_op_tracker
.finish_op();
// Entry point of the state machine: start with step 1.
103 send_get_next_tag_tid();
// Step 1: ask for the next tag tid; completion goes to
// handle_get_next_tag_tid().
106 void send_get_next_tag_tid() {
107 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< dendl
;
109 librados::ObjectReadOperation op
;
110 client::get_next_tag_tid_start(&op
);
112 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
113 this, nullptr, &utils::rados_state_callback
<
114 C_AllocateTag
, &C_AllocateTag::handle_get_next_tag_tid
>);
117 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Decode the reply directly into tag->tid; r becomes the decode result.
122 void handle_get_next_tag_tid(int r
) {
123 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< ": r=" << r
<< dendl
;
126 bufferlist::iterator iter
= out_bl
.begin();
127 r
= client::get_next_tag_tid_finish(&iter
, &tag
->tid
);
// Step 2: submit the tag_create write op for the tid obtained in step 1;
// completion goes to handle_tag_create().
136 void send_tag_create() {
137 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< dendl
;
139 librados::ObjectWriteOperation op
;
140 client::tag_create(&op
, tag
->tid
, tag_class
, tag
->data
);
142 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
143 this, nullptr, &utils::rados_state_callback
<
144 C_AllocateTag
, &C_AllocateTag::handle_tag_create
>);
146 int r
= ioctx
.aio_operate(oid
, comp
, &op
);
// Result of the create. One path restarts the sequence from step 1 (see
// the NOTE(review) on the struct).
151 void handle_tag_create(int r
) {
152 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< ": r=" << r
<< dendl
;
155 send_get_next_tag_tid();
// Step 3: read back the tag identified by tag->tid; completion goes to
// handle_get_tag().
165 void send_get_tag() {
166 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< dendl
;
168 librados::ObjectReadOperation op
;
169 client::get_tag_start(&op
, tag
->tid
);
171 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
172 this, nullptr, &utils::rados_state_callback
<
173 C_AllocateTag
, &C_AllocateTag::handle_get_tag
>);
176 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Decode the reply into a local cls::journal::Tag; r becomes the decode
// result.
181 void handle_get_tag(int r
) {
182 ldout(cct
, 20) << "C_AllocateTag: " << __func__
<< ": r=" << r
<< dendl
;
185 bufferlist::iterator iter
= out_bl
.begin();
187 cls::journal::Tag journal_tag
;
188 r
= client::get_tag_finish(&iter
, &journal_tag
);
// Hand the final result code to the caller-supplied callback.
196 void finish(int r
) override
{
197 on_finish
->complete(r
);
// Asynchronous request that reads the tag identified by tag_tid from the
// object `oid` and decodes it into *tag before completing on_finish.
//
// ioctx, oid and async_op_tracker are held by reference and must outlive
// the request.
201 struct C_GetTag
: public Context
{
203 librados::IoCtx
&ioctx
;
204 const std::string
&oid
;
205 AsyncOpTracker
&async_op_tracker
;
207 JournalMetadata::Tag
*tag
;
// Registers the request with the op tracker; the destructor unregisters it.
212 C_GetTag(CephContext
*cct
, librados::IoCtx
&ioctx
, const std::string
&oid
,
213 AsyncOpTracker
&async_op_tracker
, uint64_t tag_tid
,
214 JournalMetadata::Tag
*tag
, Context
*on_finish
)
215 : cct(cct
), ioctx(ioctx
), oid(oid
), async_op_tracker(async_op_tracker
),
216 tag_tid(tag_tid
), tag(tag
), on_finish(on_finish
) {
217 async_op_tracker
.start_op();
219 ~C_GetTag() override
{
220 async_op_tracker
.finish_op();
// Submit the get_tag read op; completion goes to handle_get_tag() and the
// reply lands in out_bl.
227 void send_get_tag() {
228 librados::ObjectReadOperation op
;
229 client::get_tag_start(&op
, tag_tid
);
231 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
232 this, nullptr, &utils::rados_state_callback
<
233 C_GetTag
, &C_GetTag::handle_get_tag
>);
235 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Decode the reply into *tag; r becomes the decode result.
240 void handle_get_tag(int r
) {
242 bufferlist::iterator iter
= out_bl
.begin();
243 r
= client::get_tag_finish(&iter
, tag
);
// Hand the final result code to the caller-supplied callback.
248 void finish(int r
) override
{
249 on_finish
->complete(r
);
// Asynchronous request that lists tags for client_id from the object `oid`,
// appending every returned tag to *tags. Tags are requested in pages of at
// most MAX_RETURN entries, starting after start_after_tag_tid.
//
// ioctx, oid, client_id and async_op_tracker are held by reference and must
// outlive the request.
253 struct C_GetTags
: public Context
{
255 librados::IoCtx
&ioctx
;
256 const std::string
&oid
;
257 const std::string
&client_id
;
258 AsyncOpTracker
&async_op_tracker
;
// Paging cursor: updated to the tid of each tag as it is consumed.
259 uint64_t start_after_tag_tid
;
260 boost::optional
<uint64_t> tag_class
;
261 JournalMetadata::Tags
*tags
;
// Page size passed to tag_list_start().
264 const uint64_t MAX_RETURN
= 64;
// Registers the request with the op tracker; the destructor unregisters it.
267 C_GetTags(CephContext
*cct
, librados::IoCtx
&ioctx
, const std::string
&oid
,
268 const std::string
&client_id
, AsyncOpTracker
&async_op_tracker
,
269 uint64_t start_after_tag_tid
,
270 const boost::optional
<uint64_t> &tag_class
,
271 JournalMetadata::Tags
*tags
, Context
*on_finish
)
272 : cct(cct
), ioctx(ioctx
), oid(oid
), client_id(client_id
),
273 async_op_tracker(async_op_tracker
),
274 start_after_tag_tid(start_after_tag_tid
), tag_class(tag_class
),
275 tags(tags
), on_finish(on_finish
) {
276 async_op_tracker
.start_op();
278 ~C_GetTags() override
{
279 async_op_tracker
.finish_op();
// Request one page of tags after the current cursor; completion goes to
// handle_tag_list().
286 void send_tag_list() {
287 librados::ObjectReadOperation op
;
288 client::tag_list_start(&op
, start_after_tag_tid
, MAX_RETURN
, client_id
,
291 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
292 this, nullptr, &utils::rados_state_callback
<
293 C_GetTags
, &C_GetTags::handle_tag_list
>);
296 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Decode one page, append its tags to *tags and advance the cursor to the
// last tid seen.
301 void handle_tag_list(int r
) {
303 std::set
<cls::journal::Tag
> journal_tags
;
304 bufferlist::iterator iter
= out_bl
.begin();
305 r
= client::tag_list_finish(&iter
, &journal_tags
);
307 for (auto &journal_tag
: journal_tags
) {
308 tags
->push_back(journal_tag
);
309 start_after_tag_tid
= journal_tag
.tid
;
// A completely full page means more tags may remain.
// NOTE(review): presumably this branch issues another send_tag_list() --
// confirm.
312 if (journal_tags
.size() == MAX_RETURN
) {
// Hand the final result code to the caller-supplied callback.
321 void finish(int r
) override
{
322 on_finish
->complete(r
);
// Context that fans a single completion out to two callbacks: the optional
// commit_position_ctx (skipped when null) first, then on_finish. Both
// receive the same result code.
326 struct C_FlushCommitPosition
: public Context
{
327 Context
*commit_position_ctx
;
330 C_FlushCommitPosition(Context
*commit_position_ctx
, Context
*on_finish
)
331 : commit_position_ctx(commit_position_ctx
), on_finish(on_finish
) {
333 void finish(int r
) override
{
334 if (commit_position_ctx
!= nullptr) {
335 commit_position_ctx
->complete(r
);
337 on_finish
->complete(r
);
// Asynchronous request that checks whether a tag newer than tag_tid exists
// for client_id on the object `oid`, then completes on_finish.
//
// ioctx, oid and async_op_tracker are held by reference and must outlive
// the request; client_id is copied.
341 struct C_AssertActiveTag
: public Context
{
343 librados::IoCtx
&ioctx
;
344 const std::string
&oid
;
345 AsyncOpTracker
&async_op_tracker
;
346 std::string client_id
;
// Registers the request with the op tracker; the destructor unregisters it.
352 C_AssertActiveTag(CephContext
*cct
, librados::IoCtx
&ioctx
,
353 const std::string
&oid
, AsyncOpTracker
&async_op_tracker
,
354 const std::string
&client_id
, uint64_t tag_tid
,
356 : cct(cct
), ioctx(ioctx
), oid(oid
), async_op_tracker(async_op_tracker
),
357 client_id(client_id
), tag_tid(tag_tid
), on_finish(on_finish
) {
358 async_op_tracker
.start_op();
360 ~C_AssertActiveTag() override
{
361 async_op_tracker
.finish_op();
// Submit a tag list request limited to 2 entries with no tag class filter
// (boost::none); completion goes to handle_send().
365 ldout(cct
, 20) << "C_AssertActiveTag: " << __func__
<< dendl
;
367 librados::ObjectReadOperation op
;
368 client::tag_list_start(&op
, tag_tid
, 2, client_id
, boost::none
);
370 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
371 this, nullptr, &utils::rados_state_callback
<
372 C_AssertActiveTag
, &C_AssertActiveTag::handle_send
>);
374 int r
= ioctx
.aio_operate(oid
, comp
, &op
, &out_bl
);
// Decode the returned tags and compare the highest tid against tag_tid.
379 void handle_send(int r
) {
380 ldout(cct
, 20) << "C_AssertActiveTag: " << __func__
<< ": r=" << r
<< dendl
;
382 std::set
<cls::journal::Tag
> tags
;
384 bufferlist::iterator it
= out_bl
.begin();
385 r
= client::tag_list_finish(&it
, &tags
);
388 // NOTE: since 0 is treated as an uninitialized list filter, we need to
389 // load two entries and look at the last tid
// True when the highest returned tid is greater than tag_tid, i.e. a newer
// tag exists.
// NOTE(review): presumably this branch turns the result into an error --
// confirm which code.
390 if (r
== 0 && !tags
.empty() && tags
.rbegin()->tid
> tag_tid
) {
// Hand the final result code to the caller-supplied callback.
396 void finish(int r
) override
{
397 on_finish
->complete(r
);
401 } // anonymous namespace
// Construct an uninitialized metadata handle for the object `oid` on behalf
// of `client_id`. All counters and sets start at zero, the pool id at -1,
// and no commit-position callback or timer task is pending. init() must be
// called before use (m_initialized starts false).
403 JournalMetadata::JournalMetadata(ContextWQ
*work_queue
, SafeTimer
*timer
,
404 Mutex
*timer_lock
, librados::IoCtx
&ioctx
,
405 const std::string
&oid
,
406 const std::string
&client_id
,
407 const Settings
&settings
)
408 : RefCountedObject(NULL
, 0), m_cct(NULL
), m_oid(oid
),
409 m_client_id(client_id
), m_settings(settings
), m_order(0),
410 m_splay_width(0), m_pool_id(-1), m_initialized(false),
411 m_work_queue(work_queue
), m_timer(timer
), m_timer_lock(timer_lock
),
412 m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
413 m_watch_handle(0), m_minimum_set(0), m_active_set(0),
414 m_update_notifications(0), m_commit_position_ctx(NULL
),
415 m_commit_position_task_ctx(NULL
) {
// m_cct was initialized to NULL above; take the real context from the
// IoCtx.
417 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
// Destruction is only legal while the object is not flagged initialized
// (shut_down() clears the flag); this is asserted under m_lock.
420 JournalMetadata::~JournalMetadata() {
421 Mutex::Locker
locker(m_lock
);
422 assert(!m_initialized
);
// Begin asynchronous initialization; on_finish is completed when the chain
// below finishes. Must only be called on an uninitialized object
// (asserted).
//
// The callbacks are built in reverse order, each wrapping the previous
// one, so the last one constructed runs first: the watch is established,
// then its result is handled, then the immutable metadata is fetched, and
// finally the caller's callback runs via the async wrapper.
425 void JournalMetadata::init(Context
*on_finish
) {
427 Mutex::Locker
locker(m_lock
);
428 assert(!m_initialized
);
429 m_initialized
= true;
432 // chain the init sequence (reverse order)
433 on_finish
= utils::create_async_context_callback(
435 on_finish
= new C_ImmutableMetadata(this, on_finish
);
// Handles the result of the watch request issued at the bottom of this
// function: a failure is logged and forwarded to the wrapped callback.
436 on_finish
= new FunctionContext([this, on_finish
](int r
) {
438 lderr(m_cct
) << __func__
<< ": failed to watch journal"
439 << cpp_strerror(r
) << dendl
;
440 Mutex::Locker
locker(m_lock
);
442 on_finish
->complete(r
);
446 get_immutable_metadata(&m_order
, &m_splay_width
, &m_pool_id
, on_finish
);
// Kick off the chain by registering a watch on the object.
449 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
450 on_finish
, nullptr, utils::rados_ctx_callback
);
451 int r
= m_ioctx
.aio_watch(m_oid
, comp
, &m_watch_handle
, &m_watch_ctx
);
// Begin asynchronous shut down; on_finish is completed when the chain
// below finishes.
//
// The callbacks are built in reverse order, each wrapping the previous
// one, so execution order is: unwatch (only if a watch handle was held) ->
// flush_commit_position -> aio_watch_flush -> wait for tracked async ops
// -> caller's callback via the async wrapper.
456 void JournalMetadata::shut_down(Context
*on_finish
) {
458 ldout(m_cct
, 20) << __func__
<< dendl
;
// Clear the initialized flag and take ownership of the watch handle under
// the lock; m_watch_handle is left at 0.
460 uint64_t watch_handle
= 0;
462 Mutex::Locker
locker(m_lock
);
463 m_initialized
= false;
464 std::swap(watch_handle
, m_watch_handle
);
467 // chain the shut down sequence (reverse order)
468 on_finish
= utils::create_async_context_callback(
470 on_finish
= new FunctionContext([this, on_finish
](int r
) {
471 ldout(m_cct
, 20) << "shut_down: waiting for ops" << dendl
;
472 m_async_op_tracker
.wait_for_ops(on_finish
);
474 on_finish
= new FunctionContext([this, on_finish
](int r
) {
475 ldout(m_cct
, 20) << "shut_down: flushing watch" << dendl
;
476 librados::Rados
rados(m_ioctx
);
477 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
478 on_finish
, nullptr, utils::rados_ctx_callback
);
479 r
= rados
.aio_watch_flush(comp
);
483 on_finish
= new FunctionContext([this, on_finish
](int r
) {
484 flush_commit_position(on_finish
);
// Start the chain: unwatch asynchronously when a handle was held.
// NOTE(review): the complete(0) below presumably belongs to the
// no-handle path -- confirm.
486 if (watch_handle
!= 0) {
487 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
488 on_finish
, nullptr, utils::rados_ctx_callback
);
489 int r
= m_ioctx
.aio_unwatch(watch_handle
, comp
);
493 on_finish
->complete(0);
// Forward to client::get_immutable_metadata() for this object (m_ioctx /
// m_oid); the results are written through the order, splay_width and
// pool_id out-pointers.
497 void JournalMetadata::get_immutable_metadata(uint8_t *order
,
498 uint8_t *splay_width
,
500 Context
*on_finish
) {
501 client::get_immutable_metadata(m_ioctx
, m_oid
, order
, splay_width
, pool_id
,
// Forward to client::get_mutable_metadata() for this object (m_ioctx /
// m_oid); the results are written through the minimum_set, active_set and
// clients out-pointers.
505 void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set
,
506 uint64_t *active_set
,
507 RegisteredClients
*clients
,
508 Context
*on_finish
) {
509 client::get_mutable_metadata(m_ioctx
, m_oid
, minimum_set
, active_set
, clients
,
// Asynchronously register m_client_id with the given opaque data on the
// object m_oid. on_finish is wrapped in a C_NotifyUpdate before being
// attached to the write's completion.
513 void JournalMetadata::register_client(const bufferlist
&data
,
514 Context
*on_finish
) {
515 ldout(m_cct
, 10) << __func__
<< ": " << m_client_id
<< dendl
;
516 librados::ObjectWriteOperation op
;
517 client::client_register(&op
, m_client_id
, data
);
519 C_NotifyUpdate
*ctx
= new C_NotifyUpdate(this, on_finish
);
521 librados::AioCompletion
*comp
=
522 librados::Rados::aio_create_completion(ctx
, NULL
,
523 utils::rados_ctx_callback
);
524 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
// Asynchronously replace the opaque data stored for m_client_id on the
// object m_oid. on_finish is wrapped in a C_NotifyUpdate before being
// attached to the write's completion.
529 void JournalMetadata::update_client(const bufferlist
&data
,
530 Context
*on_finish
) {
531 ldout(m_cct
, 10) << __func__
<< ": " << m_client_id
<< dendl
;
532 librados::ObjectWriteOperation op
;
533 client::client_update_data(&op
, m_client_id
, data
);
535 C_NotifyUpdate
*ctx
= new C_NotifyUpdate(this, on_finish
);
537 librados::AioCompletion
*comp
=
538 librados::Rados::aio_create_completion(ctx
, NULL
,
539 utils::rados_ctx_callback
);
540 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
// Asynchronously unregister m_client_id from the object m_oid. Requires a
// non-empty client id (asserted). on_finish is wrapped in a C_NotifyUpdate
// before being attached to the write's completion.
545 void JournalMetadata::unregister_client(Context
*on_finish
) {
546 assert(!m_client_id
.empty());
548 ldout(m_cct
, 10) << __func__
<< ": " << m_client_id
<< dendl
;
549 librados::ObjectWriteOperation op
;
550 client::client_unregister(&op
, m_client_id
);
552 C_NotifyUpdate
*ctx
= new C_NotifyUpdate(this, on_finish
);
554 librados::AioCompletion
*comp
=
555 librados::Rados::aio_create_completion(ctx
, NULL
,
556 utils::rados_ctx_callback
);
557 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
// Allocate a new tag of class tag_class carrying `data`; the result is
// written to *tag. on_finish is wrapped in a C_NotifyUpdate and the work is
// delegated to a heap-allocated C_AllocateTag request.
562 void JournalMetadata::allocate_tag(uint64_t tag_class
, const bufferlist
&data
,
563 Tag
*tag
, Context
*on_finish
) {
564 on_finish
= new C_NotifyUpdate(this, on_finish
);
565 C_AllocateTag
*ctx
= new C_AllocateTag(m_cct
, m_ioctx
, m_oid
,
566 m_async_op_tracker
, tag_class
,
567 data
, tag
, on_finish
);
// Look up the record for client_id; the result is written to *client. The
// work is delegated to a heap-allocated C_GetClient request tracked by
// m_async_op_tracker.
571 void JournalMetadata::get_client(const std::string
&client_id
,
572 cls::journal::Client
*client
,
573 Context
*on_finish
) {
574 C_GetClient
*ctx
= new C_GetClient(m_cct
, m_ioctx
, m_oid
, m_async_op_tracker
,
575 client_id
, client
, on_finish
);
// Fetch the tag identified by tag_tid; the result is written to *tag. The
// work is delegated to a heap-allocated C_GetTag request tracked by
// m_async_op_tracker.
579 void JournalMetadata::get_tag(uint64_t tag_tid
, Tag
*tag
, Context
*on_finish
) {
580 C_GetTag
*ctx
= new C_GetTag(m_cct
, m_ioctx
, m_oid
, m_async_op_tracker
,
581 tag_tid
, tag
, on_finish
);
// List this client's (m_client_id) tags after start_after_tag_tid,
// optionally restricted to tag_class, appending them to *tags. The work is
// delegated to a heap-allocated C_GetTags request tracked by
// m_async_op_tracker.
585 void JournalMetadata::get_tags(uint64_t start_after_tag_tid
,
586 const boost::optional
<uint64_t> &tag_class
,
587 Tags
*tags
, Context
*on_finish
) {
588 C_GetTags
*ctx
= new C_GetTags(m_cct
, m_ioctx
, m_oid
, m_client_id
,
589 m_async_op_tracker
, start_after_tag_tid
,
590 tag_class
, tags
, on_finish
);
// Append a listener to m_listeners. Blocks on m_update_cond while
// m_update_notifications is non-zero, so the list is not modified while
// listeners are being notified.
594 void JournalMetadata::add_listener(JournalMetadataListener
*listener
) {
595 Mutex::Locker
locker(m_lock
);
596 while (m_update_notifications
> 0) {
597 m_update_cond
.Wait(m_lock
);
599 m_listeners
.push_back(listener
);
// Remove a listener from m_listeners. Blocks on m_update_cond while
// m_update_notifications is non-zero, so the list is not modified while
// listeners are being notified.
602 void JournalMetadata::remove_listener(JournalMetadataListener
*listener
) {
603 Mutex::Locker
locker(m_lock
);
604 while (m_update_notifications
> 0) {
605 m_update_cond
.Wait(m_lock
);
607 m_listeners
.remove(listener
);
// Advance the minimum object set to object_set. The request is compared
// against the cached m_minimum_set first (values that are not larger are
// special-cased); otherwise an async write is issued and the cached value
// is updated immediately, without waiting for the write to complete.
610 void JournalMetadata::set_minimum_set(uint64_t object_set
) {
611 Mutex::Locker
locker(m_lock
);
613 ldout(m_cct
, 20) << __func__
<< ": current=" << m_minimum_set
614 << ", new=" << object_set
<< dendl
;
// NOTE(review): presumably a no-op return when the set would not advance
// -- confirm.
615 if (m_minimum_set
>= object_set
) {
619 librados::ObjectWriteOperation op
;
620 client::set_minimum_set(&op
, object_set
);
// No caller callback here: C_NotifyUpdate is created with only `this`.
622 C_NotifyUpdate
*ctx
= new C_NotifyUpdate(this);
623 librados::AioCompletion
*comp
=
624 librados::Rados::aio_create_completion(ctx
, NULL
,
625 utils::rados_ctx_callback
);
626 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
630 m_minimum_set
= object_set
;
// Overload returning int: delegates to the Context-taking overload,
// passing the address of a local `ctx`.
// NOTE(review): presumably blocks on ctx and returns its result -- confirm.
633 int JournalMetadata::set_active_set(uint64_t object_set
) {
635 set_active_set(object_set
, &ctx
);
// Advance the active object set to object_set. If the cached m_active_set
// is already at or beyond it, on_finish is queued on the work queue with
// result 0. Otherwise an async write is issued (on_finish wrapped in a
// C_NotifyUpdate) and the cached value is updated immediately, without
// waiting for the write to complete.
639 void JournalMetadata::set_active_set(uint64_t object_set
, Context
*on_finish
) {
640 Mutex::Locker
locker(m_lock
);
642 ldout(m_cct
, 20) << __func__
<< ": current=" << m_active_set
643 << ", new=" << object_set
<< dendl
;
644 if (m_active_set
>= object_set
) {
645 m_work_queue
->queue(on_finish
, 0);
649 librados::ObjectWriteOperation op
;
650 client::set_active_set(&op
, object_set
);
652 C_NotifyUpdate
*ctx
= new C_NotifyUpdate(this, on_finish
);
653 librados::AioCompletion
*comp
=
654 librados::Rados::aio_create_completion(ctx
, NULL
,
655 utils::rados_ctx_callback
);
656 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
660 m_active_set
= object_set
;
// Asynchronously check tag_tid for this client (m_client_id) via a
// heap-allocated C_AssertActiveTag request, created while holding m_lock.
663 void JournalMetadata::assert_active_tag(uint64_t tag_tid
, Context
*on_finish
) {
664 Mutex::Locker
locker(m_lock
);
666 C_AssertActiveTag
*ctx
= new C_AssertActiveTag(m_cct
, m_ioctx
, m_oid
,
668 m_client_id
, tag_tid
,
// Flush the pending commit position now instead of waiting for the timer:
// cancel the scheduled task and run its handler directly.
// Lock order: the timer lock is taken before m_lock.
673 void JournalMetadata::flush_commit_position() {
674 ldout(m_cct
, 20) << __func__
<< dendl
;
676 Mutex::Locker
timer_locker(*m_timer_lock
);
677 Mutex::Locker
locker(m_lock
);
// NOTE(review): presumably returns early when no commit position update is
// pending -- confirm.
678 if (m_commit_position_ctx
== nullptr) {
682 cancel_commit_task();
683 handle_commit_position_task();
// Flush the pending commit position and notify on_safe (which may be null).
// When nothing is pending, a non-null on_safe is queued on the work queue
// with result 0. Otherwise a non-null on_safe is chained onto the pending
// commit callback through C_FlushCommitPosition, the scheduled task is
// cancelled and its handler is run directly.
// Lock order: the timer lock is taken before m_lock.
686 void JournalMetadata::flush_commit_position(Context
*on_safe
) {
687 ldout(m_cct
, 20) << __func__
<< dendl
;
689 Mutex::Locker
timer_locker(*m_timer_lock
);
690 Mutex::Locker
locker(m_lock
);
691 if (m_commit_position_ctx
== nullptr) {
693 if (on_safe
!= nullptr) {
694 m_work_queue
->queue(on_safe
, 0);
699 if (on_safe
!= nullptr) {
700 m_commit_position_ctx
= new C_FlushCommitPosition(
701 m_commit_position_ctx
, on_safe
);
703 cancel_commit_task();
704 handle_commit_position_task();
// Record that entry_tid is in use for tag_tid. The per-tag value stored in
// m_allocated_entry_tids is the next free entry tid, so it is raised to
// entry_tid + 1 whenever it is not already beyond entry_tid. operator[]
// creates a zero-initialized slot for a tag seen for the first time.
707 void JournalMetadata::reserve_entry_tid(uint64_t tag_tid
, uint64_t entry_tid
) {
708 Mutex::Locker
locker(m_lock
);
709 uint64_t &allocated_entry_tid
= m_allocated_entry_tids
[tag_tid
];
710 if (allocated_entry_tid
<= entry_tid
) {
711 allocated_entry_tid
= entry_tid
+ 1;
// Look up the most recently allocated entry tid for tag_tid. The map
// stores the next free tid, so the value written to *entry_tid is the
// stored value minus one (the stored value is asserted to be positive).
// NOTE(review): presumably returns false when the tag has no entry and true
// otherwise -- confirm.
715 bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid
,
716 uint64_t *entry_tid
) const {
717 Mutex::Locker
locker(m_lock
);
719 AllocatedEntryTids::const_iterator it
= m_allocated_entry_tids
.find(tag_tid
);
720 if (it
== m_allocated_entry_tids
.end()) {
724 assert(it
->second
> 0);
725 *entry_tid
= it
->second
- 1;
// Completion of the immutable metadata fetch started by init(). On the
// failure path the error is logged and forwarded to on_init; otherwise
// success is logged.
729 void JournalMetadata::handle_immutable_metadata(int r
, Context
*on_init
) {
731 lderr(m_cct
) << "failed to initialize immutable metadata: "
732 << cpp_strerror(r
) << dendl
;
733 on_init
->complete(r
);
737 ldout(m_cct
, 10) << "initialized immutable metadata" << dendl
;
// Re-read the mutable metadata. A heap-allocated C_Refresh both receives
// the results (its minimum_set, active_set and registered_clients members
// are used as out-parameters) and acts as the completion callback.
741 void JournalMetadata::refresh(Context
*on_complete
) {
742 ldout(m_cct
, 10) << "refreshing mutable metadata" << dendl
;
743 C_Refresh
*refresh
= new C_Refresh(this, on_complete
);
744 get_mutable_metadata(&refresh
->minimum_set
, &refresh
->active_set
,
745 &refresh
->registered_clients
, refresh
);
// Completion of refresh(): merge the freshly read metadata into the cached
// state and notify listeners, then complete the refresh's on_finish (if
// any) with r.
748 void JournalMetadata::handle_refresh_complete(C_Refresh
*refresh
, int r
) {
749 ldout(m_cct
, 10) << "refreshed mutable metadata: r=" << r
<< dendl
;
751 Mutex::Locker
locker(m_lock
);
// Locate this client's own record in the refreshed client set, using a
// probe Client built from m_client_id with empty data.
753 Client
client(m_client_id
, bufferlist());
754 RegisteredClients::iterator it
= refresh
->registered_clients
.find(client
);
755 if (it
!= refresh
->registered_clients
.end()) {
756 if (it
->state
== cls::journal::CLIENT_STATE_DISCONNECTED
) {
757 ldout(m_cct
, 0) << "client flagged disconnected: " << m_client_id
// The cached sets only ever move forward: keep the larger of the cached
// and the refreshed value.
760 m_minimum_set
= MAX(m_minimum_set
, refresh
->minimum_set
);
761 m_active_set
= MAX(m_active_set
, refresh
->active_set
);
762 m_registered_clients
= refresh
->registered_clients
;
// m_update_notifications is raised while listeners are being called;
// add_listener()/remove_listener() wait on m_update_cond until it drops
// back to zero.
765 ++m_update_notifications
;
767 for (Listeners::iterator it
= m_listeners
.begin();
768 it
!= m_listeners
.end(); ++it
) {
769 (*it
)->handle_update(this);
772 if (--m_update_notifications
== 0) {
773 m_update_cond
.Signal();
776 lderr(m_cct
) << "failed to locate client: " << m_client_id
<< dendl
;
781 if (refresh
->on_finish
!= NULL
) {
782 refresh
->on_finish
->complete(r
);
// Cancel the scheduled commit-position timer event and clear the task
// pointer.
// Preconditions (asserted): the timer lock and m_lock are held, a commit
// position callback is pending and a timer task is scheduled.
786 void JournalMetadata::cancel_commit_task() {
787 ldout(m_cct
, 20) << __func__
<< dendl
;
789 assert(m_timer_lock
->is_locked());
790 assert(m_lock
.is_locked());
791 assert(m_commit_position_ctx
!= nullptr);
792 assert(m_commit_position_task_ctx
!= nullptr);
794 m_timer
->cancel_event(m_commit_position_task_ctx
);
795 m_commit_position_task_ctx
= NULL
;
// Schedule a commit-position timer task to fire after
// m_settings.commit_interval, unless one is already scheduled.
// Preconditions (asserted): the timer lock and m_lock are held and a commit
// position callback is pending.
798 void JournalMetadata::schedule_commit_task() {
799 ldout(m_cct
, 20) << __func__
<< dendl
;
801 assert(m_timer_lock
->is_locked());
802 assert(m_lock
.is_locked());
803 assert(m_commit_position_ctx
!= nullptr);
804 if (m_commit_position_task_ctx
== NULL
) {
805 m_commit_position_task_ctx
= new C_CommitPositionTask(this);
806 m_timer
->add_event_after(m_settings
.commit_interval
,
807 m_commit_position_task_ctx
);
// Write the current m_commit_position for m_client_id to the object m_oid.
// Called both by the timer task and directly by flush_commit_position().
// Preconditions (asserted): the timer lock and m_lock are held.
811 void JournalMetadata::handle_commit_position_task() {
812 assert(m_timer_lock
->is_locked());
813 assert(m_lock
.is_locked());
814 ldout(m_cct
, 20) << __func__
<< ": "
815 << "client_id=" << m_client_id
<< ", "
816 << "commit_position=" << m_commit_position
<< dendl
;
818 librados::ObjectWriteOperation op
;
819 client::client_commit(&op
, m_client_id
, m_commit_position
);
// Ownership of the pending commit callback moves into the completion
// chain; the member is cleared so a later commit installs a fresh one.
821 Context
*ctx
= new C_NotifyUpdate(this, m_commit_position_ctx
);
822 m_commit_position_ctx
= NULL
;
// The completion may be extended with disconnect steps for clients that
// have fallen too far behind.
824 ctx
= schedule_laggy_clients_disconnect(ctx
);
826 librados::AioCompletion
*comp
=
827 librados::Rados::aio_create_completion(ctx
, NULL
,
828 utils::rados_ctx_callback
);
829 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
833 m_commit_position_task_ctx
= NULL
;
// Arm a timer event that runs a C_WatchReset after a delay of 1 (the unit
// is whatever add_event_after() takes). Requires the timer lock to be held
// (asserted).
836 void JournalMetadata::schedule_watch_reset() {
837 assert(m_timer_lock
->is_locked());
838 m_timer
->add_event_after(1, new C_WatchReset(this));
// Timer callback that tries to re-establish the watch on m_oid with a
// synchronous watch2() call. Requires the timer lock to be held (asserted).
// The outcome is logged by cause: header not found, client blacklisted,
// other failure, or success ("reset journal watch").
// NOTE(review): schedule_watch_reset() is called again on a failure path --
// confirm exactly which error codes re-arm the timer.
841 void JournalMetadata::handle_watch_reset() {
842 assert(m_timer_lock
->is_locked());
// NOTE(review): presumably nothing is done once the object has been shut
// down -- confirm.
843 if (!m_initialized
) {
847 int r
= m_ioctx
.watch2(m_oid
, &m_watch_handle
, &m_watch_ctx
);
850 ldout(m_cct
, 5) << __func__
<< ": journal header not found" << dendl
;
851 } else if (r
== -EBLACKLISTED
) {
852 ldout(m_cct
, 5) << __func__
<< ": client blacklisted" << dendl
;
854 lderr(m_cct
) << __func__
<< ": failed to watch journal: "
855 << cpp_strerror(r
) << dendl
;
857 schedule_watch_reset();
859 ldout(m_cct
, 10) << __func__
<< ": reset journal watch" << dendl
;
// Watch callback invoked when a notification arrives for m_oid: log it and
// acknowledge the notification with the buffer `bl`.
864 void JournalMetadata::handle_watch_notify(uint64_t notify_id
, uint64_t cookie
) {
865 ldout(m_cct
, 10) << "journal header updated" << dendl
;
868 m_ioctx
.notify_ack(m_oid
, notify_id
, cookie
, bl
);
// Watch callback invoked when the watch on m_oid reports an error. The
// error is logged by cause, any existing watch handle is released, and a
// watch reset is scheduled if the object is still initialized and the
// error is not -ENOENT.
873 void JournalMetadata::handle_watch_error(int err
) {
874 if (err
== -ENOTCONN
) {
875 ldout(m_cct
, 5) << "journal watch error: header removed" << dendl
;
876 } else if (err
== -EBLACKLISTED
) {
877 lderr(m_cct
) << "journal watch error: client blacklisted" << dendl
;
879 lderr(m_cct
) << "journal watch error: " << cpp_strerror(err
) << dendl
;
// Lock order: the timer lock is taken before m_lock.
882 Mutex::Locker
timer_locker(*m_timer_lock
);
883 Mutex::Locker
locker(m_lock
);
885 // release old watch on error
886 if (m_watch_handle
!= 0) {
887 m_ioctx
.unwatch2(m_watch_handle
);
891 if (m_initialized
&& err
!= -ENOENT
) {
892 schedule_watch_reset();
// Allocate the next commit tid (pre-incrementing m_commit_tid under m_lock)
// and record a pending CommitEntry for it in m_pending_commit_tids,
// describing the object number, tag tid and entry tid being committed.
896 uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num
,
898 uint64_t entry_tid
) {
899 Mutex::Locker
locker(m_lock
);
900 uint64_t commit_tid
= ++m_commit_tid
;
901 m_pending_commit_tids
[commit_tid
] = CommitEntry(object_num
, tag_tid
,
904 ldout(m_cct
, 20) << "allocated commit tid: commit_tid=" << commit_tid
<< " ["
905 << "object_num=" << object_num
<< ", "
906 << "tag_tid=" << tag_tid
<< ", "
907 << "entry_tid=" << entry_tid
<< "]"
// Re-point a pending commit entry at a later object. The commit tid must
// be pending and the new object number must be strictly greater than the
// recorded one (both asserted).
912 void JournalMetadata::overflow_commit_tid(uint64_t commit_tid
,
913 uint64_t object_num
) {
914 Mutex::Locker
locker(m_lock
);
916 auto it
= m_pending_commit_tids
.find(commit_tid
);
917 assert(it
!= m_pending_commit_tids
.end());
918 assert(it
->second
.object_num
< object_num
);
920 ldout(m_cct
, 20) << __func__
<< ": "
921 << "commit_tid=" << commit_tid
<< ", "
922 << "old_object_num=" << it
->second
.object_num
<< ", "
923 << "new_object_num=" << object_num
<< dendl
;
924 it
->second
.object_num
= object_num
;
// Copy the fields of the pending commit entry for commit_tid into the
// three out-parameters. The commit tid must be pending (asserted).
927 void JournalMetadata::get_commit_entry(uint64_t commit_tid
,
928 uint64_t *object_num
,
929 uint64_t *tag_tid
, uint64_t *entry_tid
) {
930 Mutex::Locker
locker(m_lock
);
932 auto it
= m_pending_commit_tids
.find(commit_tid
);
933 assert(it
!= m_pending_commit_tids
.end());
935 *object_num
= it
->second
.object_num
;
936 *tag_tid
= it
->second
.tag_tid
;
937 *entry_tid
= it
->second
.entry_tid
;
// Mark commit_tid as committed and, if that lets the commit position
// advance, publish a new position and schedule it to be written.
//
// create_context is invoked to produce the callback for the new position.
// A callback left over from a superseded position is completed with
// -ESTALE.
940 void JournalMetadata::committed(uint64_t commit_tid
,
941 const CreateContext
&create_context
) {
942 ldout(m_cct
, 20) << "committed tid=" << commit_tid
<< dendl
;
944 ObjectSetPosition commit_position
;
945 Context
*stale_ctx
= nullptr;
// Lock order: the timer lock is taken before m_lock.
947 Mutex::Locker
timer_locker(*m_timer_lock
);
948 Mutex::Locker
locker(m_lock
);
949 assert(commit_tid
> m_commit_position_tid
);
// Choose the baseline position to extend.
951 if (!m_commit_position
.object_positions
.empty()) {
952 // in-flight commit position update
953 commit_position
= m_commit_position
;
955 // safe commit position
956 commit_position
= m_client
.commit_position
;
959 CommitTids::iterator it
= m_pending_commit_tids
.find(commit_tid
);
960 assert(it
!= m_pending_commit_tids
.end());
962 CommitEntry
&commit_entry
= it
->second
;
963 commit_entry
.committed
= true;
// Consume pending entries from the lowest tid upward for as long as they
// are committed; each consumed entry is pushed onto the front of the
// position list and erased from the pending map.
965 bool update_commit_position
= false;
966 while (!m_pending_commit_tids
.empty()) {
967 CommitTids::iterator it
= m_pending_commit_tids
.begin();
968 CommitEntry
&commit_entry
= it
->second
;
969 if (!commit_entry
.committed
) {
973 commit_position
.object_positions
.emplace_front(
974 commit_entry
.object_num
, commit_entry
.tag_tid
,
975 commit_entry
.entry_tid
);
976 m_pending_commit_tids
.erase(it
);
977 update_commit_position
= true;
// NOTE(review): presumably returns here when nothing was consumed --
// confirm.
980 if (!update_commit_position
) {
984 // prune the position to have one position per splay offset
// The list is walked front to back, so the first position seen for a
// splay offset is kept and later duplicates are erased.
985 std::set
<uint8_t> in_use_splay_offsets
;
986 ObjectPositions::iterator ob_it
= commit_position
.object_positions
.begin();
987 while (ob_it
!= commit_position
.object_positions
.end()) {
988 uint8_t splay_offset
= ob_it
->object_number
% m_splay_width
;
989 if (!in_use_splay_offsets
.insert(splay_offset
).second
) {
990 ob_it
= commit_position
.object_positions
.erase(ob_it
);
// Publish the new position; the previous callback (if any) is kept aside
// so it can be completed with -ESTALE below.
996 stale_ctx
= m_commit_position_ctx
;
997 m_commit_position_ctx
= create_context();
998 m_commit_position
= commit_position
;
999 m_commit_position_tid
= commit_tid
;
1001 ldout(m_cct
, 20) << "updated commit position: " << commit_position
<< ", "
1002 << "on_safe=" << m_commit_position_ctx
<< dendl
;
1003 schedule_commit_task();
1007 if (stale_ctx
!= nullptr) {
1008 ldout(m_cct
, 20) << "canceling stale commit: on_safe=" << stale_ctx
1010 stale_ctx
->complete(-ESTALE
);
// Synchronously notify watchers of m_oid via notify2(), passing 5000 as
// the timeout argument and NULL for the reply buffer. The payload is the
// buffer `bl`.
1014 void JournalMetadata::notify_update() {
1015 ldout(m_cct
, 10) << "notifying journal header update" << dendl
;
1018 m_ioctx
.notify2(m_oid
, bl
, 5000, NULL
);
// Asynchronous counterpart of notify_update(): sends the notification with
// aio_notify() (5000 as the timeout argument, NULL reply buffer). on_safe
// is wrapped in a C_AioNotify that is attached to the completion.
1021 void JournalMetadata::async_notify_update(Context
*on_safe
) {
1022 ldout(m_cct
, 10) << "async notifying journal header update" << dendl
;
1024 C_AioNotify
*ctx
= new C_AioNotify(this, on_safe
);
1025 librados::AioCompletion
*comp
=
1026 librados::Rados::aio_create_completion(ctx
, NULL
,
1027 utils::rados_ctx_callback
);
1030 int r
= m_ioctx
.aio_notify(m_oid
, comp
, bl
, 5000, NULL
);
// Ask the async op tracker to signal a local `ctx` once all tracked ops
// have finished.
// NOTE(review): presumably blocks on ctx afterwards -- confirm.
1036 void JournalMetadata::wait_for_ops() {
1038 m_async_op_tracker
.wait_for_ops(&ctx
);
// Completion of an asynchronous header-update notification: log the
// result.
1042 void JournalMetadata::handle_notified(int r
) {
1043 ldout(m_cct
, 10) << "notified journal header update: r=" << r
<< dendl
;
// Build a completion chain that flags lagging clients as disconnected
// before finally running on_finish. Requires m_lock to be held (asserted).
//
// For each lagging client a FunctionContext is created that wraps the
// chain built so far; when it runs it issues a client_update_state write
// marking that client CLIENT_STATE_DISCONNECTED, with the wrapped context
// as the write's completion.
1046 Context
*JournalMetadata::schedule_laggy_clients_disconnect(Context
*on_finish
) {
1047 assert(m_lock
.is_locked());
1049 ldout(m_cct
, 20) << __func__
<< dendl
;
// A non-positive max_concurrent_object_sets is special-cased.
// NOTE(review): presumably returns on_finish unchanged (feature disabled)
// -- confirm.
1051 if (m_settings
.max_concurrent_object_sets
<= 0) {
1055 Context
*ctx
= on_finish
;
1057 for (auto &c
: m_registered_clients
) {
// Clients matched here are already disconnected, are this client itself,
// or are whitelisted.
// NOTE(review): presumably these are skipped -- confirm.
1058 if (c
.state
== cls::journal::CLIENT_STATE_DISCONNECTED
||
1059 c
.id
== m_client_id
||
1060 m_settings
.whitelisted_laggy_clients
.count(c
.id
) > 0) {
// Object set of the client's first recorded commit position (0 when it has
// none).
1063 const std::string
&client_id
= c
.id
;
1064 uint64_t object_set
= 0;
1065 if (!c
.commit_position
.object_positions
.empty()) {
1066 auto &position
= *(c
.commit_position
.object_positions
.begin());
1067 object_set
= position
.object_number
/ m_splay_width
;
// The client is lagging when the active set is more than
// max_concurrent_object_sets ahead of the client's object set.
1070 if (m_active_set
> object_set
+ m_settings
.max_concurrent_object_sets
) {
1071 ldout(m_cct
, 1) << __func__
<< ": " << client_id
1072 << ": scheduling disconnect" << dendl
;
// client_id is captured by value, so the lambda does not depend on the
// lifetime of the m_registered_clients element.
1074 ctx
= new FunctionContext([this, client_id
, ctx
](int r1
) {
1075 ldout(m_cct
, 10) << __func__
<< ": " << client_id
1076 << ": flagging disconnected" << dendl
;
1078 librados::ObjectWriteOperation op
;
1079 client::client_update_state(&op
, client_id
,
1080 cls::journal::CLIENT_STATE_DISCONNECTED
);
1082 librados::AioCompletion
*comp
=
1083 librados::Rados::aio_create_completion(ctx
, nullptr,
1084 utils::rados_ctx_callback
);
1085 int r
= m_ioctx
.aio_operate(m_oid
, comp
, &op
);
// ctx still equal to on_finish means no disconnect step was added.
1092 if (ctx
== on_finish
) {
1093 ldout(m_cct
, 20) << __func__
<< ": no laggy clients to disconnect" << dendl
;
// Stream the registered clients as a list separated by ", " (no separator
// before the first element).
1099 std::ostream
&operator<<(std::ostream
&os
,
1100 const JournalMetadata::RegisteredClients
&clients
) {
1102 for (JournalMetadata::RegisteredClients::const_iterator c
= clients
.begin();
1103 c
!= clients
.end(); ++c
) {
1104 os
<< (c
== clients
.begin() ? "" : ", " ) << *c
;
// Stream a bracketed, comma-separated summary of the metadata state.
// jm.m_lock is held while the fields are read. m_order and m_splay_width
// are cast to int so they print as numbers rather than characters.
1110 std::ostream
&operator<<(std::ostream
&os
,
1111 const JournalMetadata
&jm
) {
1112 Mutex::Locker
locker(jm
.m_lock
);
1113 os
<< "[oid=" << jm
.m_oid
<< ", "
1114 << "initialized=" << jm
.m_initialized
<< ", "
1115 << "order=" << (int)jm
.m_order
<< ", "
1116 << "splay_width=" << (int)jm
.m_splay_width
<< ", "
1117 << "pool_id=" << jm
.m_pool_id
<< ", "
1118 << "minimum_set=" << jm
.m_minimum_set
<< ", "
1119 << "active_set=" << jm
.m_active_set
<< ", "
1120 << "client_id=" << jm
.m_client_id
<< ", "
1121 << "commit_tid=" << jm
.m_commit_tid
<< ", "
1122 << "commit_interval=" << jm
.m_settings
.commit_interval
<< ", "
1123 << "commit_position=" << jm
.m_commit_position
<< ", "
1124 << "registered_clients=" << jm
.m_registered_clients
<< "]";
1128 } // namespace journal