1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/JournalPlayer.h"
5 #include "journal/Entry.h"
6 #include "journal/ReplayHandler.h"
7 #include "journal/Utils.h"
9 #define dout_subsys ceph_subsys_journaler
11 #define dout_prefix *_dout << "JournalPlayer: " << this << " "
17 struct C_HandleComplete
: public Context
{
18 ReplayHandler
*replay_handler
;
20 explicit C_HandleComplete(ReplayHandler
*_replay_handler
)
21 : replay_handler(_replay_handler
) {
22 replay_handler
->get();
24 ~C_HandleComplete() override
{
25 replay_handler
->put();
27 void finish(int r
) override
{
28 replay_handler
->handle_complete(r
);
32 struct C_HandleEntriesAvailable
: public Context
{
33 ReplayHandler
*replay_handler
;
35 explicit C_HandleEntriesAvailable(ReplayHandler
*_replay_handler
)
36 : replay_handler(_replay_handler
) {
37 replay_handler
->get();
39 ~C_HandleEntriesAvailable() override
{
40 replay_handler
->put();
42 void finish(int r
) override
{
43 replay_handler
->handle_entries_available();
47 } // anonymous namespace
49 JournalPlayer::JournalPlayer(librados::IoCtx
&ioctx
,
50 const std::string
&object_oid_prefix
,
51 const JournalMetadataPtr
& journal_metadata
,
52 ReplayHandler
*replay_handler
)
53 : m_cct(NULL
), m_object_oid_prefix(object_oid_prefix
),
54 m_journal_metadata(journal_metadata
), m_replay_handler(replay_handler
),
55 m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT
), m_splay_offset(0),
56 m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
57 m_replay_handler
->get();
59 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
61 ObjectSetPosition commit_position
;
62 m_journal_metadata
->get_commit_position(&commit_position
);
64 if (!commit_position
.object_positions
.empty()) {
65 ldout(m_cct
, 5) << "commit position: " << commit_position
<< dendl
;
67 // start replay after the last committed entry's object
68 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
69 auto &active_position
= commit_position
.object_positions
.front();
70 m_active_tag_tid
= active_position
.tag_tid
;
71 m_commit_position_valid
= true;
72 m_commit_position
= active_position
;
73 m_splay_offset
= active_position
.object_number
% splay_width
;
74 for (auto &position
: commit_position
.object_positions
) {
75 uint8_t splay_offset
= position
.object_number
% splay_width
;
76 m_commit_positions
[splay_offset
] = position
;
81 JournalPlayer::~JournalPlayer() {
82 ceph_assert(m_async_op_tracker
.empty());
84 Mutex::Locker
locker(m_lock
);
85 ceph_assert(m_shut_down
);
86 ceph_assert(m_fetch_object_numbers
.empty());
87 ceph_assert(!m_watch_scheduled
);
89 m_replay_handler
->put();
92 void JournalPlayer::prefetch() {
93 Mutex::Locker
locker(m_lock
);
94 ceph_assert(m_state
== STATE_INIT
);
95 m_state
= STATE_PREFETCH
;
97 m_active_set
= m_journal_metadata
->get_active_set();
98 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
99 for (uint8_t splay_offset
= 0; splay_offset
< splay_width
; ++splay_offset
) {
100 m_prefetch_splay_offsets
.insert(splay_offset
);
103 // compute active object for each splay offset (might be before
105 std::map
<uint8_t, uint64_t> splay_offset_to_objects
;
106 for (auto &position
: m_commit_positions
) {
107 ceph_assert(splay_offset_to_objects
.count(position
.first
) == 0);
108 splay_offset_to_objects
[position
.first
] = position
.second
.object_number
;
111 // prefetch the active object for each splay offset
112 std::set
<uint64_t> prefetch_object_numbers
;
113 for (uint8_t splay_offset
= 0; splay_offset
< splay_width
; ++splay_offset
) {
114 uint64_t object_number
= splay_offset
;
115 if (splay_offset_to_objects
.count(splay_offset
) != 0) {
116 object_number
= splay_offset_to_objects
[splay_offset
];
119 prefetch_object_numbers
.insert(object_number
);
122 ldout(m_cct
, 10) << __func__
<< ": prefetching "
123 << prefetch_object_numbers
.size() << " " << "objects"
125 for (auto object_number
: prefetch_object_numbers
) {
126 fetch(object_number
);
130 void JournalPlayer::prefetch_and_watch(double interval
) {
132 Mutex::Locker
locker(m_lock
);
133 m_watch_enabled
= true;
134 m_watch_interval
= interval
;
135 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
140 void JournalPlayer::shut_down(Context
*on_finish
) {
141 ldout(m_cct
, 20) << __func__
<< dendl
;
142 Mutex::Locker
locker(m_lock
);
144 ceph_assert(!m_shut_down
);
146 m_watch_enabled
= false;
148 on_finish
= utils::create_async_context_callback(
149 m_journal_metadata
, on_finish
);
151 if (m_watch_scheduled
) {
152 ObjectPlayerPtr object_player
= get_object_player();
153 switch (m_watch_step
) {
154 case WATCH_STEP_FETCH_FIRST
:
155 object_player
= m_object_players
.begin()->second
;
157 case WATCH_STEP_FETCH_CURRENT
:
158 object_player
->unwatch();
160 case WATCH_STEP_ASSERT_ACTIVE
:
165 m_async_op_tracker
.wait_for_ops(on_finish
);
168 bool JournalPlayer::try_pop_front(Entry
*entry
, uint64_t *commit_tid
) {
169 ldout(m_cct
, 20) << __func__
<< dendl
;
170 Mutex::Locker
locker(m_lock
);
172 if (m_state
!= STATE_PLAYBACK
) {
173 m_handler_notified
= false;
177 if (!verify_playback_ready()) {
178 if (!is_object_set_ready()) {
179 m_handler_notified
= false;
186 ObjectPlayerPtr object_player
= get_object_player();
187 ceph_assert(object_player
&& !object_player
->empty());
189 object_player
->front(entry
);
190 object_player
->pop_front();
192 uint64_t last_entry_tid
;
193 if (m_journal_metadata
->get_last_allocated_entry_tid(
194 entry
->get_tag_tid(), &last_entry_tid
) &&
195 entry
->get_entry_tid() != last_entry_tid
+ 1) {
196 lderr(m_cct
) << "missing prior journal entry: " << *entry
<< dendl
;
198 m_state
= STATE_ERROR
;
199 notify_complete(-ENOMSG
);
203 advance_splay_object();
204 remove_empty_object_player(object_player
);
206 m_journal_metadata
->reserve_entry_tid(entry
->get_tag_tid(),
207 entry
->get_entry_tid());
208 *commit_tid
= m_journal_metadata
->allocate_commit_tid(
209 object_player
->get_object_number(), entry
->get_tag_tid(),
210 entry
->get_entry_tid());
214 void JournalPlayer::process_state(uint64_t object_number
, int r
) {
215 ldout(m_cct
, 10) << __func__
<< ": object_num=" << object_number
<< ", "
216 << "r=" << r
<< dendl
;
218 ceph_assert(m_lock
.is_locked());
222 ldout(m_cct
, 10) << "PREFETCH" << dendl
;
223 r
= process_prefetch(object_number
);
226 ldout(m_cct
, 10) << "PLAYBACK" << dendl
;
227 r
= process_playback(object_number
);
230 ldout(m_cct
, 10) << "ERROR" << dendl
;
233 lderr(m_cct
) << "UNEXPECTED STATE (" << m_state
<< ")" << dendl
;
240 m_state
= STATE_ERROR
;
245 int JournalPlayer::process_prefetch(uint64_t object_number
) {
246 ldout(m_cct
, 10) << __func__
<< ": object_num=" << object_number
<< dendl
;
247 ceph_assert(m_lock
.is_locked());
249 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
250 uint8_t splay_offset
= object_number
% splay_width
;
252 PrefetchSplayOffsets::iterator it
= m_prefetch_splay_offsets
.find(
254 if (it
== m_prefetch_splay_offsets
.end()) {
258 bool prefetch_complete
= false;
259 ceph_assert(m_object_players
.count(splay_offset
) == 1);
260 ObjectPlayerPtr object_player
= m_object_players
[splay_offset
];
262 // prefetch in-order since a newer splay object could prefetch first
263 if (m_fetch_object_numbers
.count(object_player
->get_object_number()) == 0) {
264 // skip past known committed records
265 if (m_commit_positions
.count(splay_offset
) != 0 &&
266 !object_player
->empty()) {
267 ObjectPosition
&position
= m_commit_positions
[splay_offset
];
269 ldout(m_cct
, 15) << "seeking known commit position " << position
<< " in "
270 << object_player
->get_oid() << dendl
;
272 bool found_commit
= false;
274 while (!object_player
->empty()) {
275 object_player
->front(&entry
);
277 if (entry
.get_tag_tid() == position
.tag_tid
&&
278 entry
.get_entry_tid() == position
.entry_tid
) {
280 } else if (found_commit
) {
281 ldout(m_cct
, 10) << "located next uncommitted entry: " << entry
286 ldout(m_cct
, 20) << "skipping committed entry: " << entry
<< dendl
;
287 m_journal_metadata
->reserve_entry_tid(entry
.get_tag_tid(),
288 entry
.get_entry_tid());
289 object_player
->pop_front();
292 // do not search for commit position for this object
293 // if we've already seen it
295 m_commit_positions
.erase(splay_offset
);
299 // if the object is empty, pre-fetch the next splay object
300 if (object_player
->empty() && object_player
->refetch_required()) {
301 ldout(m_cct
, 10) << "refetching potentially partially decoded object"
303 object_player
->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE
);
304 fetch(object_player
);
305 } else if (!remove_empty_object_player(object_player
)) {
306 ldout(m_cct
, 10) << "prefetch of object complete" << dendl
;
307 prefetch_complete
= true;
311 if (!prefetch_complete
) {
315 m_prefetch_splay_offsets
.erase(it
);
316 if (!m_prefetch_splay_offsets
.empty()) {
320 ldout(m_cct
, 10) << "switching to playback mode" << dendl
;
321 m_state
= STATE_PLAYBACK
;
323 // if we have a valid commit position, our read should start with
324 // the next consistent journal entry in the sequence
325 if (m_commit_position_valid
) {
326 splay_offset
= m_commit_position
.object_number
% splay_width
;
327 object_player
= m_object_players
[splay_offset
];
329 if (object_player
->empty()) {
330 if (!object_player
->refetch_required()) {
331 advance_splay_object();
335 object_player
->front(&entry
);
336 if (entry
.get_tag_tid() == m_commit_position
.tag_tid
) {
337 advance_splay_object();
342 if (verify_playback_ready()) {
343 notify_entries_available();
344 } else if (is_object_set_ready()) {
350 int JournalPlayer::process_playback(uint64_t object_number
) {
351 ldout(m_cct
, 10) << __func__
<< ": object_num=" << object_number
<< dendl
;
352 ceph_assert(m_lock
.is_locked());
354 if (verify_playback_ready()) {
355 notify_entries_available();
356 } else if (is_object_set_ready()) {
362 bool JournalPlayer::is_object_set_ready() const {
363 ceph_assert(m_lock
.is_locked());
364 if (m_watch_scheduled
|| !m_fetch_object_numbers
.empty()) {
365 ldout(m_cct
, 20) << __func__
<< ": waiting for in-flight fetch" << dendl
;
372 bool JournalPlayer::verify_playback_ready() {
373 ceph_assert(m_lock
.is_locked());
376 if (!is_object_set_ready()) {
377 ldout(m_cct
, 10) << __func__
<< ": waiting for full object set" << dendl
;
381 ObjectPlayerPtr object_player
= get_object_player();
382 ceph_assert(object_player
);
383 uint64_t object_num
= object_player
->get_object_number();
385 // Verify is the active object player has another entry available
387 // NOTE: replay currently does not check tag class to playback multiple tags
388 // from different classes (issue #14909). When a new tag is discovered, it
389 // is assumed that the previous tag was closed at the last replayable entry.
391 if (!object_player
->empty()) {
392 m_watch_prune_active_tag
= false;
393 object_player
->front(&entry
);
395 if (!m_active_tag_tid
) {
396 ldout(m_cct
, 10) << __func__
<< ": "
397 << "object_num=" << object_num
<< ", "
398 << "initial tag=" << entry
.get_tag_tid()
400 m_active_tag_tid
= entry
.get_tag_tid();
402 } else if (entry
.get_tag_tid() < *m_active_tag_tid
||
403 (m_prune_tag_tid
&& entry
.get_tag_tid() <= *m_prune_tag_tid
)) {
404 // entry occurred before the current active tag
405 ldout(m_cct
, 10) << __func__
<< ": detected stale entry: "
406 << "object_num=" << object_num
<< ", "
407 << "entry=" << entry
<< dendl
;
408 prune_tag(entry
.get_tag_tid());
410 } else if (entry
.get_tag_tid() > *m_active_tag_tid
) {
411 // new tag at current playback position -- implies that previous
412 // tag ended abruptly without flushing out all records
413 // search for the start record for the next tag
414 ldout(m_cct
, 10) << __func__
<< ": new tag detected: "
415 << "object_num=" << object_num
<< ", "
416 << "active_tag=" << *m_active_tag_tid
<< ", "
417 << "new_tag=" << entry
.get_tag_tid() << dendl
;
418 if (entry
.get_entry_tid() == 0) {
419 // first entry in new tag -- can promote to active
420 prune_active_tag(entry
.get_tag_tid());
423 // prune current active and wait for initial entry for new tag
424 prune_active_tag(boost::none
);
428 ldout(m_cct
, 20) << __func__
<< ": "
429 << "object_num=" << object_num
<< ", "
430 << "entry: " << entry
<< dendl
;
431 ceph_assert(entry
.get_tag_tid() == *m_active_tag_tid
);
435 if (!m_active_tag_tid
) {
436 // waiting for our first entry
437 ldout(m_cct
, 10) << __func__
<< ": waiting for first entry: "
438 << "object_num=" << object_num
<< dendl
;
440 } else if (m_prune_tag_tid
&& *m_prune_tag_tid
== *m_active_tag_tid
) {
441 ldout(m_cct
, 10) << __func__
<< ": no more entries" << dendl
;
443 } else if (m_watch_enabled
&& m_watch_prune_active_tag
) {
444 // detected current tag is now longer active and we have re-read the
445 // current object but it's still empty, so this tag is done
446 ldout(m_cct
, 10) << __func__
<< ": assuming no more in-sequence entries: "
447 << "object_num=" << object_num
<< ", "
448 << "active_tag " << *m_active_tag_tid
<< dendl
;
449 prune_active_tag(boost::none
);
451 } else if (object_player
->refetch_required()) {
452 // if the active object requires a refetch, don't proceed looking for a
453 // new tag before this process completes
454 ldout(m_cct
, 10) << __func__
<< ": refetch required: "
455 << "object_num=" << object_num
<< dendl
;
457 } else if (!m_watch_enabled
) {
458 // current playback position is empty so this tag is done
459 ldout(m_cct
, 10) << __func__
<< ": no more in-sequence entries: "
460 << "object_num=" << object_num
<< ", "
461 << "active_tag=" << *m_active_tag_tid
<< dendl
;
462 prune_active_tag(boost::none
);
464 } else if (!m_watch_scheduled
) {
465 // no more entries and we don't have an active watch in-progress
466 ldout(m_cct
, 10) << __func__
<< ": no more entries -- watch required"
475 void JournalPlayer::prune_tag(uint64_t tag_tid
) {
476 ceph_assert(m_lock
.is_locked());
477 ldout(m_cct
, 10) << __func__
<< ": pruning remaining entries for tag "
480 // prune records that are at or below the largest prune tag tid
481 if (!m_prune_tag_tid
|| *m_prune_tag_tid
< tag_tid
) {
482 m_prune_tag_tid
= tag_tid
;
486 for (auto &player_pair
: m_object_players
) {
487 ObjectPlayerPtr
object_player(player_pair
.second
);
488 ldout(m_cct
, 15) << __func__
<< ": checking " << object_player
->get_oid()
490 while (!object_player
->empty()) {
492 object_player
->front(&entry
);
493 if (entry
.get_tag_tid() == tag_tid
) {
494 ldout(m_cct
, 20) << __func__
<< ": pruned " << entry
<< dendl
;
495 object_player
->pop_front();
503 // avoid watch delay when pruning stale tags from journal objects
505 ldout(m_cct
, 15) << __func__
<< ": resetting refetch state to immediate"
507 for (auto &player_pair
: m_object_players
) {
508 ObjectPlayerPtr
object_player(player_pair
.second
);
509 object_player
->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE
);
513 // trim empty player to prefetch the next available object
514 for (auto &player_pair
: m_object_players
) {
515 remove_empty_object_player(player_pair
.second
);
519 void JournalPlayer::prune_active_tag(const boost::optional
<uint64_t>& tag_tid
) {
520 ceph_assert(m_lock
.is_locked());
521 ceph_assert(m_active_tag_tid
);
523 uint64_t active_tag_tid
= *m_active_tag_tid
;
525 m_active_tag_tid
= tag_tid
;
528 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
530 prune_tag(active_tag_tid
);
533 ObjectPlayerPtr
JournalPlayer::get_object_player() const {
534 ceph_assert(m_lock
.is_locked());
536 SplayedObjectPlayers::const_iterator it
= m_object_players
.find(
538 ceph_assert(it
!= m_object_players
.end());
542 ObjectPlayerPtr
JournalPlayer::get_object_player(uint64_t object_number
) const {
543 ceph_assert(m_lock
.is_locked());
545 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
546 uint8_t splay_offset
= object_number
% splay_width
;
547 auto splay_it
= m_object_players
.find(splay_offset
);
548 ceph_assert(splay_it
!= m_object_players
.end());
550 ObjectPlayerPtr object_player
= splay_it
->second
;
551 ceph_assert(object_player
->get_object_number() == object_number
);
552 return object_player
;
555 void JournalPlayer::advance_splay_object() {
556 ceph_assert(m_lock
.is_locked());
558 m_splay_offset
%= m_journal_metadata
->get_splay_width();
559 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
560 ldout(m_cct
, 20) << __func__
<< ": new offset "
561 << static_cast<uint32_t>(m_splay_offset
) << dendl
;
564 bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr
&player
) {
565 ceph_assert(m_lock
.is_locked());
566 ceph_assert(!m_watch_scheduled
);
568 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
569 uint64_t object_set
= player
->get_object_number() / splay_width
;
570 uint64_t active_set
= m_journal_metadata
->get_active_set();
571 if (!player
->empty() || object_set
== active_set
) {
573 } else if (player
->refetch_required()) {
574 ldout(m_cct
, 20) << __func__
<< ": " << player
->get_oid() << " requires "
575 << "a refetch" << dendl
;
577 } else if (m_active_set
!= active_set
) {
578 ldout(m_cct
, 20) << __func__
<< ": new active set detected, all players "
579 << "require refetch" << dendl
;
580 m_active_set
= active_set
;
581 for (auto &pair
: m_object_players
) {
582 pair
.second
->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE
);
587 ldout(m_cct
, 15) << __func__
<< ": " << player
->get_oid() << " empty"
590 m_watch_prune_active_tag
= false;
591 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
593 uint64_t next_object_num
= player
->get_object_number() + splay_width
;
594 fetch(next_object_num
);
598 void JournalPlayer::fetch(uint64_t object_num
) {
599 ceph_assert(m_lock
.is_locked());
601 ObjectPlayerPtr
object_player(new ObjectPlayer(
602 m_ioctx
, m_object_oid_prefix
, object_num
, m_journal_metadata
->get_timer(),
603 m_journal_metadata
->get_timer_lock(), m_journal_metadata
->get_order(),
604 m_journal_metadata
->get_settings().max_fetch_bytes
));
606 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
607 m_object_players
[object_num
% splay_width
] = object_player
;
608 fetch(object_player
);
611 void JournalPlayer::fetch(const ObjectPlayerPtr
&object_player
) {
612 ceph_assert(m_lock
.is_locked());
614 uint64_t object_num
= object_player
->get_object_number();
615 std::string oid
= utils::get_object_name(m_object_oid_prefix
, object_num
);
616 ceph_assert(m_fetch_object_numbers
.count(object_num
) == 0);
617 m_fetch_object_numbers
.insert(object_num
);
619 ldout(m_cct
, 10) << __func__
<< ": " << oid
<< dendl
;
620 C_Fetch
*fetch_ctx
= new C_Fetch(this, object_num
);
622 object_player
->fetch(fetch_ctx
);
625 void JournalPlayer::handle_fetched(uint64_t object_num
, int r
) {
626 ldout(m_cct
, 10) << __func__
<< ": "
627 << utils::get_object_name(m_object_oid_prefix
, object_num
)
628 << ": r=" << r
<< dendl
;
630 Mutex::Locker
locker(m_lock
);
631 ceph_assert(m_fetch_object_numbers
.count(object_num
) == 1);
632 m_fetch_object_numbers
.erase(object_num
);
639 ObjectPlayerPtr object_player
= get_object_player(object_num
);
640 remove_empty_object_player(object_player
);
642 process_state(object_num
, r
);
645 void JournalPlayer::refetch(bool immediate
) {
646 ldout(m_cct
, 10) << __func__
<< dendl
;
647 ceph_assert(m_lock
.is_locked());
648 m_handler_notified
= false;
650 // if watching the object, handle the periodic re-fetch
651 if (m_watch_enabled
) {
652 schedule_watch(immediate
);
656 ObjectPlayerPtr object_player
= get_object_player();
657 if (object_player
->refetch_required()) {
658 object_player
->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE
);
659 fetch(object_player
);
666 void JournalPlayer::schedule_watch(bool immediate
) {
667 ldout(m_cct
, 10) << __func__
<< dendl
;
668 ceph_assert(m_lock
.is_locked());
669 if (m_watch_scheduled
) {
673 m_watch_scheduled
= true;
675 if (m_watch_step
== WATCH_STEP_ASSERT_ACTIVE
) {
676 // detect if a new tag has been created in case we are blocked
677 // by an incomplete tag sequence
678 ldout(m_cct
, 20) << __func__
<< ": asserting active tag="
679 << *m_active_tag_tid
<< dendl
;
681 m_async_op_tracker
.start_op();
682 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
683 handle_watch_assert_active(r
);
685 m_journal_metadata
->assert_active_tag(*m_active_tag_tid
, ctx
);
689 ObjectPlayerPtr object_player
;
690 double watch_interval
= m_watch_interval
;
692 switch (m_watch_step
) {
693 case WATCH_STEP_FETCH_CURRENT
:
695 object_player
= get_object_player();
697 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
698 uint64_t active_set
= m_journal_metadata
->get_active_set();
699 uint64_t object_set
= object_player
->get_object_number() / splay_width
;
701 (object_player
->get_refetch_state() ==
702 ObjectPlayer::REFETCH_STATE_IMMEDIATE
) ||
703 (object_set
< active_set
&& object_player
->refetch_required())) {
704 ldout(m_cct
, 20) << __func__
<< ": immediately refetching "
705 << object_player
->get_oid()
707 object_player
->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE
);
712 case WATCH_STEP_FETCH_FIRST
:
713 object_player
= m_object_players
.begin()->second
;
720 ldout(m_cct
, 20) << __func__
<< ": scheduling watch on "
721 << object_player
->get_oid() << dendl
;
722 Context
*ctx
= utils::create_async_context_callback(
723 m_journal_metadata
, new C_Watch(this, object_player
->get_object_number()));
724 object_player
->watch(ctx
, watch_interval
);
727 void JournalPlayer::handle_watch(uint64_t object_num
, int r
) {
728 ldout(m_cct
, 10) << __func__
<< ": r=" << r
<< dendl
;
729 Mutex::Locker
locker(m_lock
);
730 ceph_assert(m_watch_scheduled
);
731 m_watch_scheduled
= false;
733 if (m_shut_down
|| r
== -ECANCELED
) {
734 // unwatch of object player(s)
738 ObjectPlayerPtr object_player
= get_object_player(object_num
);
739 if (r
== 0 && object_player
->empty()) {
740 // possibly need to prune this empty object player if we've
741 // already fetched it after the active set was advanced with no
743 remove_empty_object_player(object_player
);
746 // determine what object to query on next watch schedule tick
747 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
748 if (m_watch_step
== WATCH_STEP_FETCH_CURRENT
&&
749 object_player
->get_object_number() % splay_width
!= 0) {
750 m_watch_step
= WATCH_STEP_FETCH_FIRST
;
751 } else if (m_active_tag_tid
) {
752 m_watch_step
= WATCH_STEP_ASSERT_ACTIVE
;
754 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
757 process_state(object_num
, r
);
760 void JournalPlayer::handle_watch_assert_active(int r
) {
761 ldout(m_cct
, 10) << __func__
<< ": r=" << r
<< dendl
;
763 Mutex::Locker
locker(m_lock
);
764 ceph_assert(m_watch_scheduled
);
765 m_watch_scheduled
= false;
768 // newer tag exists -- since we are at this step in the watch sequence,
769 // we know we can prune the active tag if watch fails again
770 ldout(m_cct
, 10) << __func__
<< ": tag " << *m_active_tag_tid
<< " "
771 << "no longer active" << dendl
;
772 m_watch_prune_active_tag
= true;
775 m_watch_step
= WATCH_STEP_FETCH_CURRENT
;
776 if (!m_shut_down
&& m_watch_enabled
) {
777 schedule_watch(false);
779 m_async_op_tracker
.finish_op();
782 void JournalPlayer::notify_entries_available() {
783 ceph_assert(m_lock
.is_locked());
784 if (m_handler_notified
) {
787 m_handler_notified
= true;
789 ldout(m_cct
, 10) << __func__
<< ": entries available" << dendl
;
790 m_journal_metadata
->queue(new C_HandleEntriesAvailable(
791 m_replay_handler
), 0);
794 void JournalPlayer::notify_complete(int r
) {
795 ceph_assert(m_lock
.is_locked());
796 m_handler_notified
= true;
798 ldout(m_cct
, 10) << __func__
<< ": replay complete: r=" << r
<< dendl
;
799 m_journal_metadata
->queue(new C_HandleComplete(
800 m_replay_handler
), r
);
803 } // namespace journal