]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalPlayer.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / journal / JournalPlayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalPlayer.h"
5 #include "journal/Entry.h"
6 #include "journal/ReplayHandler.h"
7 #include "journal/Utils.h"
8
9 #define dout_subsys ceph_subsys_journaler
10 #undef dout_prefix
11 #define dout_prefix *_dout << "JournalPlayer: " << this << " "
12
13 namespace journal {
14
15 namespace {
16
17 struct C_HandleComplete : public Context {
18 ReplayHandler *replay_handler;
19
20 explicit C_HandleComplete(ReplayHandler *_replay_handler)
21 : replay_handler(_replay_handler) {
22 replay_handler->get();
23 }
24 ~C_HandleComplete() override {
25 replay_handler->put();
26 }
27 void finish(int r) override {
28 replay_handler->handle_complete(r);
29 }
30 };
31
32 struct C_HandleEntriesAvailable : public Context {
33 ReplayHandler *replay_handler;
34
35 explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
36 : replay_handler(_replay_handler) {
37 replay_handler->get();
38 }
39 ~C_HandleEntriesAvailable() override {
40 replay_handler->put();
41 }
42 void finish(int r) override {
43 replay_handler->handle_entries_available();
44 }
45 };
46
47 } // anonymous namespace
48
49 JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
50 const std::string &object_oid_prefix,
51 const JournalMetadataPtr& journal_metadata,
52 ReplayHandler *replay_handler)
53 : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
54 m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
55 m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
56 m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
57 m_replay_handler->get();
58 m_ioctx.dup(ioctx);
59 m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
60
61 ObjectSetPosition commit_position;
62 m_journal_metadata->get_commit_position(&commit_position);
63
64 if (!commit_position.object_positions.empty()) {
65 ldout(m_cct, 5) << "commit position: " << commit_position << dendl;
66
67 // start replay after the last committed entry's object
68 uint8_t splay_width = m_journal_metadata->get_splay_width();
69 auto &active_position = commit_position.object_positions.front();
70 m_active_tag_tid = active_position.tag_tid;
71 m_commit_position_valid = true;
72 m_commit_position = active_position;
73 m_splay_offset = active_position.object_number % splay_width;
74 for (auto &position : commit_position.object_positions) {
75 uint8_t splay_offset = position.object_number % splay_width;
76 m_commit_positions[splay_offset] = position;
77 }
78 }
79 }
80
81 JournalPlayer::~JournalPlayer() {
82 ceph_assert(m_async_op_tracker.empty());
83 {
84 Mutex::Locker locker(m_lock);
85 ceph_assert(m_shut_down);
86 ceph_assert(m_fetch_object_numbers.empty());
87 ceph_assert(!m_watch_scheduled);
88 }
89 m_replay_handler->put();
90 }
91
92 void JournalPlayer::prefetch() {
93 Mutex::Locker locker(m_lock);
94 ceph_assert(m_state == STATE_INIT);
95 m_state = STATE_PREFETCH;
96
97 m_active_set = m_journal_metadata->get_active_set();
98 uint8_t splay_width = m_journal_metadata->get_splay_width();
99 for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
100 m_prefetch_splay_offsets.insert(splay_offset);
101 }
102
103 // compute active object for each splay offset (might be before
104 // active set)
105 std::map<uint8_t, uint64_t> splay_offset_to_objects;
106 for (auto &position : m_commit_positions) {
107 ceph_assert(splay_offset_to_objects.count(position.first) == 0);
108 splay_offset_to_objects[position.first] = position.second.object_number;
109 }
110
111 // prefetch the active object for each splay offset
112 std::set<uint64_t> prefetch_object_numbers;
113 for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
114 uint64_t object_number = splay_offset;
115 if (splay_offset_to_objects.count(splay_offset) != 0) {
116 object_number = splay_offset_to_objects[splay_offset];
117 }
118
119 prefetch_object_numbers.insert(object_number);
120 }
121
122 ldout(m_cct, 10) << __func__ << ": prefetching "
123 << prefetch_object_numbers.size() << " " << "objects"
124 << dendl;
125 for (auto object_number : prefetch_object_numbers) {
126 fetch(object_number);
127 }
128 }
129
130 void JournalPlayer::prefetch_and_watch(double interval) {
131 {
132 Mutex::Locker locker(m_lock);
133 m_watch_enabled = true;
134 m_watch_interval = interval;
135 m_watch_step = WATCH_STEP_FETCH_CURRENT;
136 }
137 prefetch();
138 }
139
140 void JournalPlayer::shut_down(Context *on_finish) {
141 ldout(m_cct, 20) << __func__ << dendl;
142 Mutex::Locker locker(m_lock);
143
144 ceph_assert(!m_shut_down);
145 m_shut_down = true;
146 m_watch_enabled = false;
147
148 on_finish = utils::create_async_context_callback(
149 m_journal_metadata, on_finish);
150
151 if (m_watch_scheduled) {
152 ObjectPlayerPtr object_player = get_object_player();
153 switch (m_watch_step) {
154 case WATCH_STEP_FETCH_FIRST:
155 object_player = m_object_players.begin()->second;
156 // fallthrough
157 case WATCH_STEP_FETCH_CURRENT:
158 object_player->unwatch();
159 break;
160 case WATCH_STEP_ASSERT_ACTIVE:
161 break;
162 }
163 }
164
165 m_async_op_tracker.wait_for_ops(on_finish);
166 }
167
168 bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
169 ldout(m_cct, 20) << __func__ << dendl;
170 Mutex::Locker locker(m_lock);
171
172 if (m_state != STATE_PLAYBACK) {
173 m_handler_notified = false;
174 return false;
175 }
176
177 if (!verify_playback_ready()) {
178 if (!is_object_set_ready()) {
179 m_handler_notified = false;
180 } else {
181 refetch(true);
182 }
183 return false;
184 }
185
186 ObjectPlayerPtr object_player = get_object_player();
187 ceph_assert(object_player && !object_player->empty());
188
189 object_player->front(entry);
190 object_player->pop_front();
191
192 uint64_t last_entry_tid;
193 if (m_journal_metadata->get_last_allocated_entry_tid(
194 entry->get_tag_tid(), &last_entry_tid) &&
195 entry->get_entry_tid() != last_entry_tid + 1) {
196 lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
197
198 m_state = STATE_ERROR;
199 notify_complete(-ENOMSG);
200 return false;
201 }
202
203 advance_splay_object();
204 remove_empty_object_player(object_player);
205
206 m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
207 entry->get_entry_tid());
208 *commit_tid = m_journal_metadata->allocate_commit_tid(
209 object_player->get_object_number(), entry->get_tag_tid(),
210 entry->get_entry_tid());
211 return true;
212 }
213
214 void JournalPlayer::process_state(uint64_t object_number, int r) {
215 ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
216 << "r=" << r << dendl;
217
218 ceph_assert(m_lock.is_locked());
219 if (r >= 0) {
220 switch (m_state) {
221 case STATE_PREFETCH:
222 ldout(m_cct, 10) << "PREFETCH" << dendl;
223 r = process_prefetch(object_number);
224 break;
225 case STATE_PLAYBACK:
226 ldout(m_cct, 10) << "PLAYBACK" << dendl;
227 r = process_playback(object_number);
228 break;
229 case STATE_ERROR:
230 ldout(m_cct, 10) << "ERROR" << dendl;
231 break;
232 default:
233 lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl;
234 ceph_abort();
235 break;
236 }
237 }
238
239 if (r < 0) {
240 m_state = STATE_ERROR;
241 notify_complete(r);
242 }
243 }
244
// Handles a completed fetch while in STATE_PREFETCH (m_lock must be held).
//
// For the splay offset of object_number:
//  - skips entries up to and including the known commit position,
//    reserving their entry tids;
//  - refetches the object if it is empty but may have been only partially
//    decoded;
//  - otherwise moves on to the offset's next object if this one is empty.
// Once every splay offset has finished prefetching, it switches to
// STATE_PLAYBACK, positions playback after the commit position, and then
// either notifies the handler or refetches.  Always returns 0.
int JournalPlayer::process_prefetch(uint64_t object_number) {
  ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
  ceph_assert(m_lock.is_locked());

  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = object_number % splay_width;

  // this splay offset already finished prefetching -- nothing to do
  PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find(
    splay_offset);
  if (it == m_prefetch_splay_offsets.end()) {
    return 0;
  }

  bool prefetch_complete = false;
  ceph_assert(m_object_players.count(splay_offset) == 1);
  ObjectPlayerPtr object_player = m_object_players[splay_offset];

  // prefetch in-order since a newer splay object could prefetch first
  if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
    // skip past known committed records
    if (m_commit_positions.count(splay_offset) != 0 &&
        !object_player->empty()) {
      ObjectPosition &position = m_commit_positions[splay_offset];

      ldout(m_cct, 15) << "seeking known commit position " << position << " in "
                       << object_player->get_oid() << dendl;

      // entries before and at the commit position are popped; the loop stops
      // at the first entry after it.  If the commit position is not in this
      // object, every buffered entry is popped.
      bool found_commit = false;
      Entry entry;
      while (!object_player->empty()) {
        object_player->front(&entry);

        if (entry.get_tag_tid() == position.tag_tid &&
            entry.get_entry_tid() == position.entry_tid) {
          found_commit = true;
        } else if (found_commit) {
          ldout(m_cct, 10) << "located next uncommitted entry: " << entry
                           << dendl;
          break;
        }

        ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
        m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(),
                                              entry.get_entry_tid());
        object_player->pop_front();
      }

      // do not search for commit position for this object
      // if we've already seen it
      if (found_commit) {
        m_commit_positions.erase(splay_offset);
      }
    }

    // empty but possibly partially decoded: refetch this same object.
    // Otherwise remove_empty_object_player() fetches the next object of this
    // splay offset if this one is empty and outside the active set; when it
    // declines, this offset's prefetch is complete.
    if (object_player->empty() && object_player->refetch_required()) {
      ldout(m_cct, 10) << "refetching potentially partially decoded object"
                       << dendl;
      object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
      fetch(object_player);
    } else if (!remove_empty_object_player(object_player)) {
      ldout(m_cct, 10) << "prefetch of object complete" << dendl;
      prefetch_complete = true;
    }
  }

  if (!prefetch_complete) {
    return 0;
  }

  // wait until every splay offset has completed its prefetch
  m_prefetch_splay_offsets.erase(it);
  if (!m_prefetch_splay_offsets.empty()) {
    return 0;
  }

  ldout(m_cct, 10) << "switching to playback mode" << dendl;
  m_state = STATE_PLAYBACK;

  // if we have a valid commit position, our read should start with
  // the next consistent journal entry in the sequence
  if (m_commit_position_valid) {
    splay_offset = m_commit_position.object_number % splay_width;
    object_player = m_object_players[splay_offset];

    // advance past the commit object's splay offset when it is drained (and
    // needs no refetch) or still holds entries of the committed tag --
    // presumably the next in-sequence entry then lives in the following
    // splay object
    if (object_player->empty()) {
      if (!object_player->refetch_required()) {
        advance_splay_object();
      }
    } else {
      Entry entry;
      object_player->front(&entry);
      if (entry.get_tag_tid() == m_commit_position.tag_tid) {
        advance_splay_object();
      }
    }
  }

  if (verify_playback_ready()) {
    notify_entries_available();
  } else if (is_object_set_ready()) {
    refetch(false);
  }
  return 0;
}
349
350 int JournalPlayer::process_playback(uint64_t object_number) {
351 ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
352 ceph_assert(m_lock.is_locked());
353
354 if (verify_playback_ready()) {
355 notify_entries_available();
356 } else if (is_object_set_ready()) {
357 refetch(false);
358 }
359 return 0;
360 }
361
362 bool JournalPlayer::is_object_set_ready() const {
363 ceph_assert(m_lock.is_locked());
364 if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
365 ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
366 return false;
367 }
368
369 return true;
370 }
371
// Decides whether the entry at the current playback position can be handed
// out by try_pop_front() (m_lock must be held).
//
// Returns true when the current object player's front entry belongs to the
// active tag.  This includes adopting the first tag seen as active, and
// promoting a newer tag whose entry tid 0 is at the front.  Entries from
// stale tags (older than the active tag, or at/below the prune watermark)
// and exhausted active tags are pruned, and the check repeats.  Returns
// false while fetches/watches are outstanding, while the current object
// needs a refetch, or when no further in-sequence entry is available yet.
bool JournalPlayer::verify_playback_ready() {
  ceph_assert(m_lock.is_locked());

  while (true) {
    if (!is_object_set_ready()) {
      ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
      return false;
    }

    ObjectPlayerPtr object_player = get_object_player();
    ceph_assert(object_player);
    uint64_t object_num = object_player->get_object_number();

    // Verify if the active object player has another entry available
    // in the sequence
    // NOTE: replay currently does not check tag class to playback multiple tags
    // from different classes (issue #14909). When a new tag is discovered, it
    // is assumed that the previous tag was closed at the last replayable entry.
    Entry entry;
    if (!object_player->empty()) {
      // entries are available again, so drop any pending decision to prune
      // the active tag after an empty watch
      m_watch_prune_active_tag = false;
      object_player->front(&entry);

      if (!m_active_tag_tid) {
        // no active tag yet: adopt the tag of the first entry seen
        ldout(m_cct, 10) << __func__ << ": "
                         << "object_num=" << object_num << ", "
                         << "initial tag=" << entry.get_tag_tid()
                         << dendl;
        m_active_tag_tid = entry.get_tag_tid();
        return true;
      } else if (entry.get_tag_tid() < *m_active_tag_tid ||
                 (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) {
        // entry occurred before the current active tag
        ldout(m_cct, 10) << __func__ << ": detected stale entry: "
                         << "object_num=" << object_num << ", "
                         << "entry=" << entry << dendl;
        prune_tag(entry.get_tag_tid());
        continue;
      } else if (entry.get_tag_tid() > *m_active_tag_tid) {
        // new tag at current playback position -- implies that previous
        // tag ended abruptly without flushing out all records
        // search for the start record for the next tag
        ldout(m_cct, 10) << __func__ << ": new tag detected: "
                         << "object_num=" << object_num << ", "
                         << "active_tag=" << *m_active_tag_tid << ", "
                         << "new_tag=" << entry.get_tag_tid() << dendl;
        if (entry.get_entry_tid() == 0) {
          // first entry in new tag -- can promote to active
          prune_active_tag(entry.get_tag_tid());
          return true;
        } else {
          // prune current active and wait for initial entry for new tag
          prune_active_tag(boost::none);
          continue;
        }
      } else {
        // entry belongs to the active tag -- ready to play
        ldout(m_cct, 20) << __func__ << ": "
                         << "object_num=" << object_num << ", "
                         << "entry: " << entry << dendl;
        ceph_assert(entry.get_tag_tid() == *m_active_tag_tid);
        return true;
      }
    } else {
      // the current object has no buffered entries
      if (!m_active_tag_tid) {
        // waiting for our first entry
        ldout(m_cct, 10) << __func__ << ": waiting for first entry: "
                         << "object_num=" << object_num << dendl;
        return false;
      } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
        // the active tag itself has already been pruned
        ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
        return false;
      } else if (m_watch_enabled && m_watch_prune_active_tag) {
        // detected current tag is no longer active and we have re-read the
        // current object but it's still empty, so this tag is done
        ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: "
                         << "object_num=" << object_num << ", "
                         << "active_tag " << *m_active_tag_tid << dendl;
        prune_active_tag(boost::none);
        continue;
      } else if (object_player->refetch_required()) {
        // if the active object requires a refetch, don't proceed looking for a
        // new tag before this process completes
        ldout(m_cct, 10) << __func__ << ": refetch required: "
                         << "object_num=" << object_num << dendl;
        return false;
      } else if (!m_watch_enabled) {
        // current playback position is empty so this tag is done
        ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
                         << "object_num=" << object_num << ", "
                         << "active_tag=" << *m_active_tag_tid << dendl;
        prune_active_tag(boost::none);
        continue;
      } else if (!m_watch_scheduled) {
        // no more entries and we don't have an active watch in-progress
        ldout(m_cct, 10) << __func__ << ": no more entries -- watch required"
                         << dendl;
        return false;
      }
    }
  }
  // not reached: the loop above has no break and only exits via return
  return false;
}
474
475 void JournalPlayer::prune_tag(uint64_t tag_tid) {
476 ceph_assert(m_lock.is_locked());
477 ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
478 << tag_tid << dendl;
479
480 // prune records that are at or below the largest prune tag tid
481 if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) {
482 m_prune_tag_tid = tag_tid;
483 }
484
485 bool pruned = false;
486 for (auto &player_pair : m_object_players) {
487 ObjectPlayerPtr object_player(player_pair.second);
488 ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
489 << dendl;
490 while (!object_player->empty()) {
491 Entry entry;
492 object_player->front(&entry);
493 if (entry.get_tag_tid() == tag_tid) {
494 ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl;
495 object_player->pop_front();
496 pruned = true;
497 } else {
498 break;
499 }
500 }
501 }
502
503 // avoid watch delay when pruning stale tags from journal objects
504 if (pruned) {
505 ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate"
506 << dendl;
507 for (auto &player_pair : m_object_players) {
508 ObjectPlayerPtr object_player(player_pair.second);
509 object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
510 }
511 }
512
513 // trim empty player to prefetch the next available object
514 for (auto &player_pair : m_object_players) {
515 remove_empty_object_player(player_pair.second);
516 }
517 }
518
519 void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
520 ceph_assert(m_lock.is_locked());
521 ceph_assert(m_active_tag_tid);
522
523 uint64_t active_tag_tid = *m_active_tag_tid;
524 if (tag_tid) {
525 m_active_tag_tid = tag_tid;
526 }
527 m_splay_offset = 0;
528 m_watch_step = WATCH_STEP_FETCH_CURRENT;
529
530 prune_tag(active_tag_tid);
531 }
532
533 ObjectPlayerPtr JournalPlayer::get_object_player() const {
534 ceph_assert(m_lock.is_locked());
535
536 SplayedObjectPlayers::const_iterator it = m_object_players.find(
537 m_splay_offset);
538 ceph_assert(it != m_object_players.end());
539 return it->second;
540 }
541
542 ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
543 ceph_assert(m_lock.is_locked());
544
545 uint8_t splay_width = m_journal_metadata->get_splay_width();
546 uint8_t splay_offset = object_number % splay_width;
547 auto splay_it = m_object_players.find(splay_offset);
548 ceph_assert(splay_it != m_object_players.end());
549
550 ObjectPlayerPtr object_player = splay_it->second;
551 ceph_assert(object_player->get_object_number() == object_number);
552 return object_player;
553 }
554
555 void JournalPlayer::advance_splay_object() {
556 ceph_assert(m_lock.is_locked());
557 ++m_splay_offset;
558 m_splay_offset %= m_journal_metadata->get_splay_width();
559 m_watch_step = WATCH_STEP_FETCH_CURRENT;
560 ldout(m_cct, 20) << __func__ << ": new offset "
561 << static_cast<uint32_t>(m_splay_offset) << dendl;
562 }
563
564 bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
565 ceph_assert(m_lock.is_locked());
566 ceph_assert(!m_watch_scheduled);
567
568 uint8_t splay_width = m_journal_metadata->get_splay_width();
569 uint64_t object_set = player->get_object_number() / splay_width;
570 uint64_t active_set = m_journal_metadata->get_active_set();
571 if (!player->empty() || object_set == active_set) {
572 return false;
573 } else if (player->refetch_required()) {
574 ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
575 << "a refetch" << dendl;
576 return false;
577 } else if (m_active_set != active_set) {
578 ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
579 << "require refetch" << dendl;
580 m_active_set = active_set;
581 for (auto &pair : m_object_players) {
582 pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
583 }
584 return false;
585 }
586
587 ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
588 << dendl;
589
590 m_watch_prune_active_tag = false;
591 m_watch_step = WATCH_STEP_FETCH_CURRENT;
592
593 uint64_t next_object_num = player->get_object_number() + splay_width;
594 fetch(next_object_num);
595 return true;
596 }
597
598 void JournalPlayer::fetch(uint64_t object_num) {
599 ceph_assert(m_lock.is_locked());
600
601 ObjectPlayerPtr object_player(new ObjectPlayer(
602 m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
603 m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
604 m_journal_metadata->get_settings().max_fetch_bytes));
605
606 uint8_t splay_width = m_journal_metadata->get_splay_width();
607 m_object_players[object_num % splay_width] = object_player;
608 fetch(object_player);
609 }
610
611 void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
612 ceph_assert(m_lock.is_locked());
613
614 uint64_t object_num = object_player->get_object_number();
615 std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
616 ceph_assert(m_fetch_object_numbers.count(object_num) == 0);
617 m_fetch_object_numbers.insert(object_num);
618
619 ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
620 C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
621
622 object_player->fetch(fetch_ctx);
623 }
624
625 void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
626 ldout(m_cct, 10) << __func__ << ": "
627 << utils::get_object_name(m_object_oid_prefix, object_num)
628 << ": r=" << r << dendl;
629
630 Mutex::Locker locker(m_lock);
631 ceph_assert(m_fetch_object_numbers.count(object_num) == 1);
632 m_fetch_object_numbers.erase(object_num);
633
634 if (m_shut_down) {
635 return;
636 }
637
638 if (r == 0) {
639 ObjectPlayerPtr object_player = get_object_player(object_num);
640 remove_empty_object_player(object_player);
641 }
642 process_state(object_num, r);
643 }
644
645 void JournalPlayer::refetch(bool immediate) {
646 ldout(m_cct, 10) << __func__ << dendl;
647 ceph_assert(m_lock.is_locked());
648 m_handler_notified = false;
649
650 // if watching the object, handle the periodic re-fetch
651 if (m_watch_enabled) {
652 schedule_watch(immediate);
653 return;
654 }
655
656 ObjectPlayerPtr object_player = get_object_player();
657 if (object_player->refetch_required()) {
658 object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
659 fetch(object_player);
660 return;
661 }
662
663 notify_complete(0);
664 }
665
666 void JournalPlayer::schedule_watch(bool immediate) {
667 ldout(m_cct, 10) << __func__ << dendl;
668 ceph_assert(m_lock.is_locked());
669 if (m_watch_scheduled) {
670 return;
671 }
672
673 m_watch_scheduled = true;
674
675 if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
676 // detect if a new tag has been created in case we are blocked
677 // by an incomplete tag sequence
678 ldout(m_cct, 20) << __func__ << ": asserting active tag="
679 << *m_active_tag_tid << dendl;
680
681 m_async_op_tracker.start_op();
682 FunctionContext *ctx = new FunctionContext([this](int r) {
683 handle_watch_assert_active(r);
684 });
685 m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
686 return;
687 }
688
689 ObjectPlayerPtr object_player;
690 double watch_interval = m_watch_interval;
691
692 switch (m_watch_step) {
693 case WATCH_STEP_FETCH_CURRENT:
694 {
695 object_player = get_object_player();
696
697 uint8_t splay_width = m_journal_metadata->get_splay_width();
698 uint64_t active_set = m_journal_metadata->get_active_set();
699 uint64_t object_set = object_player->get_object_number() / splay_width;
700 if (immediate ||
701 (object_player->get_refetch_state() ==
702 ObjectPlayer::REFETCH_STATE_IMMEDIATE) ||
703 (object_set < active_set && object_player->refetch_required())) {
704 ldout(m_cct, 20) << __func__ << ": immediately refetching "
705 << object_player->get_oid()
706 << dendl;
707 object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
708 watch_interval = 0;
709 }
710 }
711 break;
712 case WATCH_STEP_FETCH_FIRST:
713 object_player = m_object_players.begin()->second;
714 watch_interval = 0;
715 break;
716 default:
717 ceph_abort();
718 }
719
720 ldout(m_cct, 20) << __func__ << ": scheduling watch on "
721 << object_player->get_oid() << dendl;
722 Context *ctx = utils::create_async_context_callback(
723 m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
724 object_player->watch(ctx, watch_interval);
725 }
726
727 void JournalPlayer::handle_watch(uint64_t object_num, int r) {
728 ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
729 Mutex::Locker locker(m_lock);
730 ceph_assert(m_watch_scheduled);
731 m_watch_scheduled = false;
732
733 if (m_shut_down || r == -ECANCELED) {
734 // unwatch of object player(s)
735 return;
736 }
737
738 ObjectPlayerPtr object_player = get_object_player(object_num);
739 if (r == 0 && object_player->empty()) {
740 // possibly need to prune this empty object player if we've
741 // already fetched it after the active set was advanced with no
742 // new records
743 remove_empty_object_player(object_player);
744 }
745
746 // determine what object to query on next watch schedule tick
747 uint8_t splay_width = m_journal_metadata->get_splay_width();
748 if (m_watch_step == WATCH_STEP_FETCH_CURRENT &&
749 object_player->get_object_number() % splay_width != 0) {
750 m_watch_step = WATCH_STEP_FETCH_FIRST;
751 } else if (m_active_tag_tid) {
752 m_watch_step = WATCH_STEP_ASSERT_ACTIVE;
753 } else {
754 m_watch_step = WATCH_STEP_FETCH_CURRENT;
755 }
756
757 process_state(object_num, r);
758 }
759
760 void JournalPlayer::handle_watch_assert_active(int r) {
761 ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
762
763 Mutex::Locker locker(m_lock);
764 ceph_assert(m_watch_scheduled);
765 m_watch_scheduled = false;
766
767 if (r == -ESTALE) {
768 // newer tag exists -- since we are at this step in the watch sequence,
769 // we know we can prune the active tag if watch fails again
770 ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
771 << "no longer active" << dendl;
772 m_watch_prune_active_tag = true;
773 }
774
775 m_watch_step = WATCH_STEP_FETCH_CURRENT;
776 if (!m_shut_down && m_watch_enabled) {
777 schedule_watch(false);
778 }
779 m_async_op_tracker.finish_op();
780 }
781
782 void JournalPlayer::notify_entries_available() {
783 ceph_assert(m_lock.is_locked());
784 if (m_handler_notified) {
785 return;
786 }
787 m_handler_notified = true;
788
789 ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
790 m_journal_metadata->queue(new C_HandleEntriesAvailable(
791 m_replay_handler), 0);
792 }
793
794 void JournalPlayer::notify_complete(int r) {
795 ceph_assert(m_lock.is_locked());
796 m_handler_notified = true;
797
798 ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
799 m_journal_metadata->queue(new C_HandleComplete(
800 m_replay_handler), r);
801 }
802
803 } // namespace journal