// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
#include "journal/Journaler.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayHandler.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
                           << "Replayer: " << this << " " << __func__ << ": "

extern PerfCounters *g_perf_counters;

namespace rbd {
namespace mirror {
namespace image_replayer {
namespace journal {

namespace {

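// Helper: compute how many whole seconds remain before a remote journal
// event may be applied locally, given the event's timestamp and the image's
// configured mirroring replay delay (0 means apply immediately).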
uint32_t calculate_replay_delay(const utime_t &event_time,
                                int mirroring_replay_delay) {
  if (mirroring_replay_delay <= 0) {
    return 0;
  }

  utime_t now = ceph_clock_now();
  if (event_time + mirroring_replay_delay <= now) {
    return 0;
  }

  // ensure it is rounded up when converting to integer
  return (event_time + mirroring_replay_delay - now) + 1;
}

} // anonymous namespace

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;

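// Per-entry commit context: once the local journal reports the replayed
// event safe on disk, forward the entry (plus its byte count and start time,
// used for perf accounting) to handle_process_entry_safe().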
template <typename I>
struct Replayer<I>::C_ReplayCommitted : public Context {
  Replayer* replayer;
  ReplayEntry replay_entry;
  uint64_t replay_bytes;
  utime_t replay_start_time;

  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
                    uint64_t replay_bytes, const utime_t &replay_start_time)
    : replayer(replayer), replay_entry(std::move(replay_entry)),
      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
  }

  void finish(int r) override {
    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
                                        replay_start_time, r);
  }
};

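// Listens for remote journal metadata updates so client disconnect events
// are noticed during replay; the notification is bounced to the work queue
// and tracked as an in-flight op so shut-down can wait for it to drain.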
template <typename I>
struct Replayer<I>::RemoteJournalerListener
  : public ::journal::JournalMetadataListener {
  Replayer* replayer;

  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}

  void handle_update(::journal::JournalMetadata*) override {
    auto ctx = new C_TrackedOp(
      replayer->m_in_flight_op_tracker,
      new LambdaContext([this](int r) {
        replayer->handle_remote_journal_metadata_updated();
      }));
    replayer->m_threads->work_queue->queue(ctx, 0);
  }
};

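// Bridges the remote Journaler's replay callbacks into the Replayer: newly
// available entries kick handle_replay_ready(), and replay completion is
// translated into a human-readable error description for status reporting.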
template <typename I>
struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
  Replayer* replayer;

  RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
  ~RemoteReplayHandler() override {}

  void handle_entries_available() override {
    replayer->handle_replay_ready();
  }

  void handle_complete(int r) override {
    std::string error;
    if (r == -ENOMEM) {
      error = "not enough memory in autotune cache";
    } else if (r < 0) {
      error = "replay completed with error: " + cpp_strerror(r);
    }
    replayer->handle_replay_complete(r, error);
  }
};

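// Reacts to local journal events: journal close and forced promotion end
// replay cleanly, while a resync request is routed to handle_resync_image().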
template <typename I>
struct Replayer<I>::LocalJournalListener
  : public librbd::journal::Listener {
  Replayer* replayer;

  LocalJournalListener(Replayer* replayer) : replayer(replayer) {
  }

  void handle_close() override {
    replayer->handle_replay_complete(0, "");
  }

  void handle_promoted() override {
    replayer->handle_replay_complete(0, "force promoted");
  }

  void handle_resync() override {
    replayer->handle_resync_image();
  }
};

template <typename I>
Replayer<I>::Replayer(
    Threads<I>* threads,
    const std::string& local_mirror_uuid,
    StateBuilder<I>* state_builder,
    ReplayerListener* replayer_listener)
  : m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    m_state_builder(state_builder),
    m_replayer_listener(replayer_listener),
    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
      "rbd::mirror::image_replayer::journal::Replayer", this))) {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    register_perf_counters();
  }
}

template <typename I>
Replayer<I>::~Replayer() {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    unregister_perf_counters();
  }

  ceph_assert(m_remote_listener == nullptr);
  ceph_assert(m_local_journal_listener == nullptr);
  ceph_assert(m_local_journal_replay == nullptr);
  ceph_assert(m_remote_replay_handler == nullptr);
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_delayed_preprocess_task == nullptr);
  ceph_assert(m_flush_local_replay_task == nullptr);
  ceph_assert(m_state_builder->local_image_ctx == nullptr);
}

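// Initialization: capture the local image's journal handle, then initialize
// the remote journaler; any failure records the error, closes the local
// image and completes the init context through the shut-down path.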
template <typename I>
void Replayer<I>::init(Context* on_finish) {
  dout(10) << dendl;

  ceph_assert(m_local_journal == nullptr);
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                            local_image_ctx->name);
    m_local_journal = local_image_ctx->journal;
  }

  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_local_journal == nullptr) {
    std::unique_lock locker{m_lock};
    m_state = STATE_COMPLETE;
    m_state_builder->remote_journaler = nullptr;

    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
    close_local_image();
    return;
  }

  init_remote_journaler();
}

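// Shut-down drives the teardown chain: cancel pending timer tasks, shut down
// the local journal replay, wait for in-flight event replays, close the
// local image, stop the remote journaler, then wait for tracked ops before
// completing the context. If init() is still running, only the context is
// recorded and the tail of the init state machine finishes the shut-down.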
template <typename I>
void Replayer<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_state == STATE_INIT) {
    // raced with the last piece of the init state machine
    return;
  } else if (m_state == STATE_REPLAYING) {
    m_state = STATE_COMPLETE;
  }

  // if shutting down due to an error notification, we don't
  // need to propagate the same error again
  m_error_code = 0;
  m_error_description = "";

  cancel_delayed_preprocess_task();
  cancel_flush_local_replay_task();
  shut_down_local_journal_replay();
}

template <typename I>
void Replayer<I>::flush(Context* on_finish) {
  dout(10) << dendl;

  flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
}

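// Generates a replay status description via the ReplayStatusFormatter;
// completes with -EAGAIN when replay is not currently running.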
template <typename I>
bool Replayer<I>::get_replay_status(std::string* description,
                                    Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_replay_status_formatter == nullptr) {
    derr << "replay not running" << dendl;
    locker.unlock();

    on_finish->complete(-EAGAIN);
    return false;
  }

  on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
  return m_replay_status_formatter->get_or_send_update(description,
                                                       on_finish);
}

template <typename I>
void Replayer<I>::init_remote_journaler() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
  m_state_builder->remote_journaler->init(ctx);
}

template <typename I>
void Replayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "error initializing remote journal");
    close_local_image();
    return;
  }

  // listen for metadata updates to check for disconnect events
  ceph_assert(m_remote_listener == nullptr);
  m_remote_listener = new RemoteJournalerListener(this);
  m_state_builder->remote_journaler->add_listener(m_remote_listener);

  cls::journal::Client remote_client;
  r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                           &remote_client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    handle_replay_complete(locker, r, "error retrieving remote journal client");
    close_local_image();
    return;
  }

  std::string error;
  r = validate_remote_client_state(remote_client,
                                   &m_state_builder->remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    handle_replay_complete(locker, r, error);
    close_local_image();
    return;
  }

  start_external_replay();
}

template <typename I>
void Replayer<I>::start_external_replay() {
  dout(10) << dendl;

  Context *start_ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_start_external_replay>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
}

template <typename I>
void Replayer<I>::handle_start_external_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    ceph_assert(m_local_journal_replay == nullptr);
    derr << "error starting external replay on local image "
         << m_state_builder->local_image_ctx->id << ": "
         << cpp_strerror(r) << dendl;

    handle_replay_complete(locker, r, "error starting replay on local image");
    close_local_image();
    return;
  }

  if (!notify_init_complete(locker)) {
    return;
  }

  m_state = STATE_REPLAYING;

  // listen for promotion and resync requests against local journal
  m_local_journal_listener = new LocalJournalListener(this);
  m_local_journal->add_listener(m_local_journal_listener);

  // verify that the local image wasn't force-promoted and that a resync hasn't
  // been requested now that we are listening for events
  if (m_local_journal->is_tag_owner()) {
    dout(10) << "local image force-promoted" << dendl;
    handle_replay_complete(locker, 0, "force promoted");
    return;
  }

  bool resync_requested = false;
  r = m_local_journal->is_resync_requested(&resync_requested);
  if (r < 0) {
    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
             << dendl;
    handle_replay_complete(locker, r, "error parsing resync state");
    return;
  } else if (resync_requested) {
    dout(10) << "local image resync requested" << dendl;
    handle_replay_complete(locker, 0, "resync requested");
    return;
  }

  // start remote journal replay
  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
    m_local_mirror_uuid, &m_state_builder->remote_client_meta,
    m_threads->work_queue);
  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
    m_state_builder->remote_journaler, m_local_mirror_uuid);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  double poll_seconds = cct->_conf.get_val<double>(
    "rbd_mirror_journal_poll_age");
  m_remote_replay_handler = new RemoteReplayHandler(this);
  m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
                                                       poll_seconds);

  notify_status_updated();
}

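// Completes the stashed init context with m_lock temporarily dropped. If a
// shut-down request arrived while the lock was released, begin closing the
// local image and return false so the caller stops making forward progress.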
template <typename I>
bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_state == STATE_INIT);

  // notify that init has completed
  Context *on_finish = nullptr;
  std::swap(m_on_init_shutdown, on_finish);

  locker.unlock();
  on_finish->complete(0);
  locker.lock();

  if (m_on_init_shutdown != nullptr) {
    // shut down requested after we notified init complete but before we
    // grabbed the lock
    close_local_image();
    return false;
  }

  return true;
}

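// First stage of the teardown chain started by shut_down(): flush and stop
// the local journal replay (if running) before waiting for outstanding
// event replays to finish.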
template <typename I>
void Replayer<I>::shut_down_local_journal_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_local_journal_replay == nullptr) {
    wait_for_event_replay();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
  m_local_journal_replay->shut_down(true, ctx);
}

template <typename I>
void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to shut down local journal replay");
  }

  wait_for_event_replay();
}

template <typename I>
void Replayer<I>::wait_for_event_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
  m_event_replay_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_event_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  close_local_image();
}

template <typename I>
void Replayer<I>::close_local_image() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (m_state_builder->local_image_ctx == nullptr) {
    stop_remote_journaler_replay();
    return;
  }

  dout(10) << dendl;
  if (m_local_journal_listener != nullptr) {
    // blocks if listener notification is in-progress
    m_local_journal->remove_listener(m_local_journal_listener);
    delete m_local_journal_listener;
    m_local_journal_listener = nullptr;
  }

  if (m_local_journal_replay != nullptr) {
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  if (m_event_preprocessor != nullptr) {
    image_replayer::journal::EventPreprocessor<I>::destroy(
      m_event_preprocessor);
    m_event_preprocessor = nullptr;
  }

  m_local_journal.reset();

  // NOTE: it's important to ensure that the local image is fully
  // closed before attempting to close the remote journal in
  // case the remote cluster is unreachable
  ceph_assert(m_state_builder->local_image_ctx != nullptr);
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
  auto request = image_replayer::CloseImageRequest<I>::create(
    &m_state_builder->local_image_ctx, ctx);
  request->send();
}

template <typename I>
void Replayer<I>::handle_close_local_image(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error closing local image: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to close local image");
  }

  ceph_assert(m_state_builder->local_image_ctx == nullptr);
  stop_remote_journaler_replay();
}

template <typename I>
void Replayer<I>::stop_remote_journaler_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_state_builder->remote_journaler == nullptr ||
      m_remote_replay_handler == nullptr) {
    wait_for_in_flight_ops();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
  m_state_builder->remote_journaler->stop_replay(ctx);
}

template <typename I>
void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to stop remote journaler replay: " << cpp_strerror(r)
         << dendl;
    handle_replay_error(r, "failed to stop remote journaler replay");
  }

  delete m_remote_replay_handler;
  m_remote_replay_handler = nullptr;

  wait_for_in_flight_ops();
}

template <typename I>
void Replayer<I>::wait_for_in_flight_ops() {
  dout(10) << dendl;
  if (m_remote_listener != nullptr) {
    m_state_builder->remote_journaler->remove_listener(m_remote_listener);
    delete m_remote_listener;
    m_remote_listener = nullptr;
  }

  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
  m_in_flight_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  Context* on_init_shutdown = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_on_init_shutdown != nullptr);
    std::swap(m_on_init_shutdown, on_init_shutdown);
    m_state = STATE_COMPLETE;
  }
  on_init_shutdown->complete(m_error_code);
}

template <typename I>
void Replayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    return;
  }

  cls::journal::Client remote_client;
  int r = m_state_builder->remote_journaler->get_cached_client(
    m_local_mirror_uuid, &remote_client);
  if (r < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return;
  }

  librbd::journal::MirrorPeerClientMeta remote_client_meta;
  std::string error;
  r = validate_remote_client_state(remote_client, &remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    handle_replay_complete(locker, r, error);
  }
}

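// Schedule a deferred flush of the local replay roughly 30 seconds after an
// entry commits so the remote commit position and reported mirror status
// keep advancing between bursts of I/O.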
template <typename I>
void Replayer<I>::schedule_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
    return;
  }

  dout(15) << dendl;
  m_flush_local_replay_task = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
}

template <typename I>
void Replayer<I>::cancel_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_flush_local_replay_task != nullptr) {
    dout(10) << dendl;
    m_threads->timer->cancel_event(m_flush_local_replay_task);
    m_flush_local_replay_task = nullptr;
  }
}

template <typename I>
void Replayer<I>::handle_flush_local_replay_task(int) {
  dout(15) << dendl;

  m_in_flight_op_tracker.start_op();
  auto on_finish = new LambdaContext([this](int) {
    std::unique_lock locker{m_lock};

    {
      std::unique_lock timer_locker{m_threads->timer_lock};
      m_flush_local_replay_task = nullptr;
    }

    notify_status_updated();
    m_in_flight_op_tracker.finish_op();
  });
  flush_local_replay(on_finish);
}

template <typename I>
void Replayer<I>::flush_local_replay(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  } else if (m_local_journal_replay == nullptr) {
    // raced w/ a tag creation stop/start, which implies that
    // the replay is flushed
    locker.unlock();
    flush_commit_position(on_flush);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_journal_replay->flush(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
    on_flush->complete(r);
    return;
  }

  flush_commit_position(on_flush);
}

template <typename I>
void Replayer<I>::flush_commit_position(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_commit_position(on_flush, r);
    });
  m_state_builder->remote_journaler->flush_commit_position(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing remote journal commit position: "
         << cpp_strerror(r) << dendl;
  }

  on_flush->complete(r);
}

template <typename I>
void Replayer<I>::handle_replay_error(int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_error_code == 0) {
    m_error_code = r;
    m_error_description = error;
  }
}

template <typename I>
bool Replayer<I>::is_replay_complete() const {
  std::unique_lock locker{m_lock};
  return is_replay_complete(locker);
}

template <typename I>
bool Replayer<I>::is_replay_complete(
    const std::unique_lock<ceph::mutex>&) const {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  return (m_state == STATE_COMPLETE);
}

template <typename I>
void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
  std::unique_lock locker{m_lock};
  handle_replay_complete(locker, r, error);
}

template <typename I>
void Replayer<I>::handle_replay_complete(
    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << "r=" << r << ", error=" << error << dendl;
  if (r < 0) {
    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, error);
  }

  if (m_state != STATE_REPLAYING) {
    return;
  }

  m_state = STATE_COMPLETE;
  notify_status_updated();
}

template <typename I>
void Replayer<I>::handle_replay_ready() {
  std::unique_lock locker{m_lock};
  handle_replay_ready(locker);
}

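// Per-entry replay pipeline: pop the next remote entry; if its tag epoch
// changed, flush the local replay and allocate a matching local tag
// (replay_flush -> get_remote_tag -> allocate_local_tag); then preprocess
// and process the entry. The commit callback ultimately fires
// handle_process_entry_safe() once the event is safe locally.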
template <typename I>
void Replayer<I>::handle_replay_ready(
    std::unique_lock<ceph::mutex>& locker) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(20) << dendl;
  if (is_replay_complete(locker)) {
    return;
  }

  if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry,
                                                        &m_replay_tag_tid)) {
    dout(20) << "no entries ready for replay" << dendl;
    return;
  }

  // can safely drop lock once the entry is tracked
  m_event_replay_tracker.start_op();
  locker.unlock();

  dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() << ", "
           << "tag_tid=" << m_replay_tag_tid << dendl;
  if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
    // must allocate a new local journal tag prior to processing
    replay_flush();
    return;
  }

  preprocess_entry();
}

template <typename I>
void Replayer<I>::replay_flush() {
  dout(10) << dendl;

  // shut down the replay to flush all IO and ops and create a new
  // replayer to handle the new tag epoch
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
  ceph_assert(m_local_journal_replay != nullptr);
  m_local_journal_replay->shut_down(false, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush_shut_down(int r) {
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_local_journal != nullptr);
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    handle_replay_flush(r);
    return;
  }

  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "replay flush encountered an error");
    m_event_replay_tracker.finish_op();
    return;
  } else if (is_replay_complete()) {
    m_event_replay_tracker.finish_op();
    return;
  }

  get_remote_tag();
}

template <typename I>
void Replayer<I>::get_remote_tag() {
  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_get_remote_tag>(this);
  m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag,
                                             ctx);
}

template <typename I>
void Replayer<I>::handle_get_remote_tag(int r) {
  dout(15) << "r=" << r << dendl;

  if (r == 0) {
    try {
      auto it = m_replay_tag.data.cbegin();
      decode(m_replay_tag_data, it);
    } catch (const buffer::error &err) {
      r = -EBADMSG;
    }
  }

  if (r < 0) {
    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
         << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to retrieve remote tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_tag_valid = true;
  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
           << m_replay_tag_data << dendl;

  allocate_local_tag();
}

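// Translate the remote tag's mirror_uuid into the local journal's frame of
// reference before allocating the local tag: the remote's notion of "local"
// maps to the remote mirror uuid, a tag owned by this cluster maps back to
// LOCAL_MIRROR_UUID, and orphan (demotion) tags either skip a stale event or
// end replay. The tag predecessor is translated the same way.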
template <typename I>
void Replayer<I>::allocate_local_tag() {
  dout(15) << dendl;

  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (mirror_uuid == m_local_mirror_uuid) {
    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    // handle possible edge condition where daemon can failover and
    // the local image has already been promoted/demoted
    auto local_tag_data = m_local_journal->get_tag_data();
    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
        (local_tag_data.predecessor.commit_valid &&
         local_tag_data.predecessor.mirror_uuid ==
           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
      dout(15) << "skipping stale demotion event" << dendl;
      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
                                m_replay_start_time, 0);
      handle_replay_ready();
      return;
    } else {
      dout(5) << "encountered image demotion: stopping" << dendl;
      handle_replay_complete(0, "");
    }
  }

  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  }

  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
           << "predecessor=" << predecessor << ", "
           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
}

template <typename I>
void Replayer<I>::handle_allocate_local_tag(int r) {
  dout(15) << "r=" << r << ", "
           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
  if (r < 0) {
    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to allocate journal tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  preprocess_entry();
}

template <typename I>
void Replayer<I>::preprocess_entry() {
  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  bufferlist data = m_replay_entry.get_data();
  auto it = data.cbegin();
  int r = m_local_journal_replay->decode(&it, &m_event_entry);
  if (r < 0) {
    derr << "failed to decode journal event" << dendl;
    handle_replay_complete(r, "failed to decode journal event");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_bytes = data.length();
  uint32_t delay = calculate_replay_delay(
    m_event_entry.timestamp,
    m_state_builder->local_image_ctx->mirroring_replay_delay);
  if (delay == 0) {
    handle_preprocess_entry_ready(0);
    return;
  }

  std::unique_lock locker{m_lock};
  if (is_replay_complete(locker)) {
    // don't schedule a delayed replay task if a shut-down is in-progress
    m_event_replay_tracker.finish_op();
    return;
  }

  dout(20) << "delaying replay by " << delay << " sec" << dendl;
  std::unique_lock timer_locker{m_threads->timer_lock};
  ceph_assert(m_delayed_preprocess_task == nullptr);
  m_delayed_preprocess_task = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
}

template <typename I>
void Replayer<I>::handle_delayed_preprocess_task(int r) {
  dout(20) << "r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
  m_delayed_preprocess_task = nullptr;

  m_threads->work_queue->queue(create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_ready(int r) {
  dout(20) << "r=" << r << dendl;
  ceph_assert(r == 0);

  m_replay_start_time = ceph_clock_now();
  if (!m_event_preprocessor->is_required(m_event_entry)) {
    process_entry();
    return;
  }

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
  m_event_preprocessor->preprocess(&m_event_entry, ctx);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_safe(int r) {
  dout(20) << "r=" << r << dendl;

  if (r < 0) {
    if (r == -ECANCELED) {
      handle_replay_complete(0, "lost exclusive lock");
    } else {
      derr << "failed to preprocess journal event" << dendl;
      handle_replay_complete(r, "failed to preprocess journal event");
    }

    m_event_replay_tracker.finish_op();
    return;
  }

  process_entry();
}

template <typename I>
void Replayer<I>::process_entry() {
  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  Context *on_ready = create_context_callback<
    Replayer, &Replayer<I>::handle_process_entry_ready>(this);
  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
                                             m_replay_bytes,
                                             m_replay_start_time);

  m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
}

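// Called when the local replay can accept the next event: refresh the cached
// image spec (re-registering perf counters if the image was renamed), record
// per-entry statistics, and immediately attempt to pop the next entry.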
template <typename I>
void Replayer<I>::handle_process_entry_ready(int r) {
  std::unique_lock locker{m_lock};

  dout(20) << dendl;
  ceph_assert(r == 0);

  bool update_status = false;
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                               local_image_ctx->name);
    if (m_image_spec != image_spec) {
      m_image_spec = image_spec;
      update_status = true;
    }
  }

  m_replay_status_formatter->handle_entry_processed(m_replay_bytes);

  if (update_status) {
    unregister_perf_counters();
    register_perf_counters();
    notify_status_updated();
  }

  // attempt to process the next event
  handle_replay_ready(locker);
}

template <typename I>
void Replayer<I>::handle_process_entry_safe(
    const ReplayEntry &replay_entry, uint64_t replay_bytes,
    const utime_t &replay_start_time, int r) {
  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;

  if (r < 0) {
    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to commit journal event");
  } else {
    ceph_assert(m_state_builder->remote_journaler != nullptr);
    m_state_builder->remote_journaler->committed(replay_entry);
  }

  auto latency = ceph_clock_now() - replay_start_time;
  if (g_perf_counters) {
    g_perf_counters->inc(l_rbd_mirror_replay);
    g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
  }

  auto ctx = new LambdaContext(
    [this, replay_bytes, latency](int r) {
      std::unique_lock locker{m_lock};
      schedule_flush_local_replay_task();

      if (m_perf_counters) {
        m_perf_counters->inc(l_rbd_mirror_replay);
        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
      }

      m_event_replay_tracker.finish_op();
    });
  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void Replayer<I>::handle_resync_image() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  m_resync_requested = true;
  handle_replay_complete(locker, 0, "resync requested");
}

template <typename I>
void Replayer<I>::notify_status_updated() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;

  auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
    [this](int) {
      m_replayer_listener->handle_notification();
    }));
  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void Replayer<I>::cancel_delayed_preprocess_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  bool canceled_delayed_preprocess_task = false;
  {
    std::unique_lock timer_locker{m_threads->timer_lock};
    if (m_delayed_preprocess_task != nullptr) {
      dout(10) << dendl;
      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
        m_delayed_preprocess_task);
      ceph_assert(canceled_delayed_preprocess_task);
      m_delayed_preprocess_task = nullptr;
    }
  }

  if (canceled_delayed_preprocess_task) {
    // wake up sleeping replay
    m_event_replay_tracker.finish_op();
  }
}

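// Validate the remote journal's registered client for this local mirror:
// undecodable client metadata is treated as corruption (-EBADMSG), and a
// disconnected client stops replay with -ENOTCONN, optionally requesting an
// automatic resync when rbd_mirroring_resync_after_disconnect is enabled.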
template <typename I>
int Replayer<I>::validate_remote_client_state(
    const cls::journal::Client& remote_client,
    librbd::journal::MirrorPeerClientMeta* remote_client_meta,
    bool* resync_requested, std::string* error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!util::decode_client_meta(remote_client, remote_client_meta)) {
    // require operator intervention since the data is corrupt
    *error = "error retrieving remote journal client";
    return -EBADMSG;
  }

  auto local_image_ctx = m_state_builder->local_image_ctx;
  dout(5) << "image_id=" << local_image_ctx->id << ", "
          << "remote_client_meta.image_id="
          << remote_client_meta->image_id << ", "
          << "remote_client.state=" << remote_client.state << dendl;
  if (remote_client_meta->image_id == local_image_ctx->id &&
      remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
    if (local_image_ctx->config.template get_val<bool>(
          "rbd_mirroring_resync_after_disconnect")) {
      dout(10) << "disconnected: automatic resync" << dendl;
      *resync_requested = true;
      *error = "disconnected: automatic resync";
      return -ENOTCONN;
    } else {
      dout(10) << "disconnected" << dendl;
      *error = "disconnected";
      return -ENOTCONN;
    }
  }

  return 0;
}

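// Register the per-image perf counters (replay count, replayed bytes, replay
// latency) under "rbd_mirror_image_" + the computed image spec, at the
// priority configured by rbd_mirror_image_perf_stats_prio.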
template <typename I>
void Replayer<I>::register_perf_counters() {
  dout(5) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_perf_counters == nullptr);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
  PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
                          l_rbd_mirror_first, l_rbd_mirror_last);
  plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
  plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
                      "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
  plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
                   "Replay latency", "rl", prio);
  m_perf_counters = plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
}

template <typename I>
void Replayer<I>::unregister_perf_counters() {
  dout(5) << dendl;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  PerfCounters *perf_counters = nullptr;
  std::swap(perf_counters, m_perf_counters);

  if (perf_counters != nullptr) {
    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
    delete perf_counters;
  }
}

} // namespace journal
} // namespace image_replayer
} // namespace mirror
} // namespace rbd

template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;