// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
#include "journal/Journaler.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayHandler.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
                           << "Replayer: " << this << " " << __func__ << ": "

extern PerfCounters *g_perf_counters;

namespace rbd {
namespace mirror {
namespace image_replayer {
namespace journal {

namespace {

uint32_t calculate_replay_delay(const utime_t &event_time,
                                int mirroring_replay_delay) {
  if (mirroring_replay_delay <= 0) {
    return 0;
  }

  utime_t now = ceph_clock_now();
  if (event_time + mirroring_replay_delay <= now) {
    return 0;
  }

  // ensure it is rounded up when converting to integer
  return (event_time + mirroring_replay_delay - now) + 1;
}
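
// A quick worked example of the rounding above (illustrative numbers, not
// from the source): with mirroring_replay_delay=30 and an event 10.5 seconds
// old, (event_time + 30 - now) is 19.5, which would truncate to 19 on
// conversion to uint32_t; adding 1 first yields 20.5 -> 20, so the delay is
// always rounded up and an entry is never replayed early.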

} // anonymous namespace

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;

template <typename I>
struct Replayer<I>::C_ReplayCommitted : public Context {
  Replayer* replayer;
  ReplayEntry replay_entry;
  uint64_t replay_bytes;
  utime_t replay_start_time;

  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
                    uint64_t replay_bytes, const utime_t &replay_start_time)
    : replayer(replayer), replay_entry(std::move(replay_entry)),
      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
  }

  void finish(int r) override {
    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
                                        replay_start_time, r);
  }
};

template <typename I>
struct Replayer<I>::C_TrackedOp : public Context {
  Replayer *replayer;
  Context* ctx;

  C_TrackedOp(Replayer* replayer, Context* ctx)
    : replayer(replayer), ctx(ctx) {
    replayer->m_in_flight_op_tracker.start_op();
  }

  void finish(int r) override {
    ctx->complete(r);
    replayer->m_in_flight_op_tracker.finish_op();
  }
};
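
// Note on the wrapper above: C_TrackedOp pins m_in_flight_op_tracker for the
// lifetime of the wrapped callback, so shut_down() can wait for every queued
// notification, flush, and status query to drain (see
// wait_for_in_flight_ops()) before completing.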

template <typename I>
struct Replayer<I>::RemoteJournalerListener
  : public ::journal::JournalMetadataListener {
  Replayer* replayer;

  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}

  void handle_update(::journal::JournalMetadata*) override {
    auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) {
      replayer->handle_remote_journal_metadata_updated();
    }));
    replayer->m_threads->work_queue->queue(ctx, 0);
  }
};

template <typename I>
struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
  Replayer* replayer;

  RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
  ~RemoteReplayHandler() override {}

  void handle_entries_available() override {
    replayer->handle_replay_ready();
  }

  void handle_complete(int r) override {
    std::string error;
    if (r == -ENOMEM) {
      error = "not enough memory in autotune cache";
    } else if (r < 0) {
      error = "replay completed with error: " + cpp_strerror(r);
    }
    replayer->handle_replay_complete(r, error);
  }
};

template <typename I>
struct Replayer<I>::LocalJournalListener
  : public librbd::journal::Listener {
  Replayer* replayer;

  LocalJournalListener(Replayer* replayer) : replayer(replayer) {
  }

  void handle_close() override {
    replayer->handle_replay_complete(0, "");
  }

  void handle_promoted() override {
    replayer->handle_replay_complete(0, "force promoted");
  }

  void handle_resync() override {
    replayer->handle_resync_image();
  }
};

template <typename I>
Replayer<I>::Replayer(
    Threads<I>* threads,
    const std::string& local_mirror_uuid,
    StateBuilder<I>* state_builder,
    ReplayerListener* replayer_listener)
  : m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    m_state_builder(state_builder),
    m_replayer_listener(replayer_listener),
    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
      "rbd::mirror::image_replayer::journal::Replayer", this))) {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    register_perf_counters();
  }
}

template <typename I>
Replayer<I>::~Replayer() {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    unregister_perf_counters();
  }

  ceph_assert(m_remote_listener == nullptr);
  ceph_assert(m_local_journal_listener == nullptr);
  ceph_assert(m_local_journal_replay == nullptr);
  ceph_assert(m_remote_replay_handler == nullptr);
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_delayed_preprocess_task == nullptr);
  ceph_assert(m_flush_local_replay_task == nullptr);
  ceph_assert(m_state_builder->local_image_ctx == nullptr);
}

template <typename I>
void Replayer<I>::init(Context* on_finish) {
  dout(10) << dendl;

  ceph_assert(m_local_journal == nullptr);
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                            local_image_ctx->name);
    m_local_journal = local_image_ctx->journal;
  }

  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_local_journal == nullptr) {
    std::unique_lock locker{m_lock};
    m_state = STATE_COMPLETE;
    m_state_builder->remote_journaler = nullptr;

    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
    close_local_image();
    return;
  }

  init_remote_journaler();
}
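
// For reference, the happy-path init sequence: init() captures the local
// journal, init_remote_journaler() -> handle_init_remote_journaler()
// validates the remote client registration, and start_external_replay() ->
// handle_start_external_replay() completes the init callback via
// notify_init_complete() before transitioning to STATE_REPLAYING. Any
// failure along the way funnels into handle_replay_complete() and the
// close_local_image() teardown chain instead.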

template <typename I>
void Replayer<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_state == STATE_INIT) {
    // raced with the last piece of the init state machine
    return;
  } else if (m_state == STATE_REPLAYING) {
    m_state = STATE_COMPLETE;
  }

  // if shutting down due to an error notification, we don't
  // need to propagate the same error again
  m_error_code = 0;
  m_error_description = "";

  cancel_delayed_preprocess_task();
  cancel_flush_local_replay_task();
  shut_down_local_journal_replay();
}
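
// Shutdown walks a fixed chain: shut_down_local_journal_replay() ->
// wait_for_event_replay() -> close_local_image() ->
// stop_remote_journaler_replay() -> wait_for_in_flight_ops(), which finally
// completes the saved context with the first recorded error code. Each step
// is a no-op when its resource was never created, so the same chain doubles
// as the error-path teardown during init.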

template <typename I>
void Replayer<I>::flush(Context* on_finish) {
  dout(10) << dendl;

  flush_local_replay(new C_TrackedOp(this, on_finish));
}

template <typename I>
bool Replayer<I>::get_replay_status(std::string* description,
                                    Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_replay_status_formatter == nullptr) {
    derr << "replay not running" << dendl;
    locker.unlock();

    on_finish->complete(-EAGAIN);
    return false;
  }

  on_finish = new C_TrackedOp(this, on_finish);
  return m_replay_status_formatter->get_or_send_update(description,
                                                       on_finish);
}

template <typename I>
void Replayer<I>::init_remote_journaler() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
  m_state_builder->remote_journaler->init(ctx);
}

template <typename I>
void Replayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "error initializing remote journal");
    close_local_image();
    return;
  }

  // listen for metadata updates to check for disconnect events
  ceph_assert(m_remote_listener == nullptr);
  m_remote_listener = new RemoteJournalerListener(this);
  m_state_builder->remote_journaler->add_listener(m_remote_listener);

  cls::journal::Client remote_client;
  r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                           &remote_client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    handle_replay_complete(locker, r, "error retrieving remote journal client");
    close_local_image();
    return;
  }

  std::string error;
  r = validate_remote_client_state(remote_client,
                                   &m_state_builder->remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    handle_replay_complete(locker, r, error);
    close_local_image();
    return;
  }

  start_external_replay();
}

template <typename I>
void Replayer<I>::start_external_replay() {
  dout(10) << dendl;

  Context *start_ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_start_external_replay>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
}

template <typename I>
void Replayer<I>::handle_start_external_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    ceph_assert(m_local_journal_replay == nullptr);
    derr << "error starting external replay on local image "
         << m_state_builder->local_image_ctx->id << ": "
         << cpp_strerror(r) << dendl;

    handle_replay_complete(locker, r, "error starting replay on local image");
    close_local_image();
    return;
  }

  if (!notify_init_complete(locker)) {
    return;
  }

  m_state = STATE_REPLAYING;

  // listen for promotion and resync requests against local journal
  m_local_journal_listener = new LocalJournalListener(this);
  m_local_journal->add_listener(m_local_journal_listener);

  // verify that the local image wasn't force-promoted and that a resync hasn't
  // been requested now that we are listening for events
  if (m_local_journal->is_tag_owner()) {
    dout(10) << "local image force-promoted" << dendl;
    handle_replay_complete(locker, 0, "force promoted");
    return;
  }

  bool resync_requested = false;
  r = m_local_journal->is_resync_requested(&resync_requested);
  if (r < 0) {
    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
             << dendl;
    handle_replay_complete(locker, r, "error parsing resync state");
    return;
  } else if (resync_requested) {
    dout(10) << "local image resync requested" << dendl;
    handle_replay_complete(locker, 0, "resync requested");
    return;
  }

  // start remote journal replay
  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
    m_local_mirror_uuid, &m_state_builder->remote_client_meta,
    m_threads->work_queue);
  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
    m_state_builder->remote_journaler, m_local_mirror_uuid);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  double poll_seconds = cct->_conf.get_val<double>(
    "rbd_mirror_journal_poll_age");
  m_remote_replay_handler = new RemoteReplayHandler(this);
  m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
                                                       poll_seconds);

  notify_status_updated();
}

template <typename I>
bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_state == STATE_INIT);

  // notify that init has completed
  Context *on_finish = nullptr;
  std::swap(m_on_init_shutdown, on_finish);

  locker.unlock();
  on_finish->complete(0);
  locker.lock();

  if (m_on_init_shutdown != nullptr) {
    // shut down requested after we notified init complete but before we
    // grabbed the lock
    close_local_image();
    return false;
  }

  return true;
}
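
// Because notify_init_complete() drops m_lock to run the init callback, a
// concurrent shut_down() can slip in and store a new m_on_init_shutdown; the
// re-check after re-acquiring the lock converts that race into an immediate
// teardown rather than a lost shutdown request.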

template <typename I>
void Replayer<I>::shut_down_local_journal_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_local_journal_replay == nullptr) {
    wait_for_event_replay();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
  m_local_journal_replay->shut_down(true, ctx);
}

template <typename I>
void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to shut down local journal replay");
  }

  wait_for_event_replay();
}

template <typename I>
void Replayer<I>::wait_for_event_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
  m_event_replay_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_event_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  close_local_image();
}

template <typename I>
void Replayer<I>::close_local_image() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (m_state_builder->local_image_ctx == nullptr) {
    stop_remote_journaler_replay();
    return;
  }

  dout(10) << dendl;
  if (m_local_journal_listener != nullptr) {
    // blocks if listener notification is in-progress
    m_local_journal->remove_listener(m_local_journal_listener);
    delete m_local_journal_listener;
    m_local_journal_listener = nullptr;
  }

  if (m_local_journal_replay != nullptr) {
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  if (m_event_preprocessor != nullptr) {
    image_replayer::journal::EventPreprocessor<I>::destroy(
      m_event_preprocessor);
    m_event_preprocessor = nullptr;
  }

  m_local_journal.reset();

  // NOTE: it's important to ensure that the local image is fully
  // closed before attempting to close the remote journal in
  // case the remote cluster is unreachable
  ceph_assert(m_state_builder->local_image_ctx != nullptr);
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
  auto request = image_replayer::CloseImageRequest<I>::create(
    &m_state_builder->local_image_ctx, ctx);
  request->send();
}

template <typename I>
void Replayer<I>::handle_close_local_image(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error closing local image: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to close local image");
  }

  ceph_assert(m_state_builder->local_image_ctx == nullptr);
  stop_remote_journaler_replay();
}

template <typename I>
void Replayer<I>::stop_remote_journaler_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_state_builder->remote_journaler == nullptr) {
    wait_for_in_flight_ops();
    return;
  } else if (m_remote_replay_handler == nullptr) {
    wait_for_in_flight_ops();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
  m_state_builder->remote_journaler->stop_replay(ctx);
}

template <typename I>
void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to stop remote journaler replay: " << cpp_strerror(r)
         << dendl;
    handle_replay_error(r, "failed to stop remote journaler replay");
  }

  delete m_remote_replay_handler;
  m_remote_replay_handler = nullptr;

  wait_for_in_flight_ops();
}

template <typename I>
void Replayer<I>::wait_for_in_flight_ops() {
  dout(10) << dendl;
  if (m_remote_listener != nullptr) {
    m_state_builder->remote_journaler->remove_listener(m_remote_listener);
    delete m_remote_listener;
    m_remote_listener = nullptr;
  }

  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
  m_in_flight_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  Context* on_init_shutdown = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_on_init_shutdown != nullptr);
    std::swap(m_on_init_shutdown, on_init_shutdown);
    m_state = STATE_COMPLETE;
  }
  on_init_shutdown->complete(m_error_code);
}

template <typename I>
void Replayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    return;
  }

  cls::journal::Client remote_client;
  int r = m_state_builder->remote_journaler->get_cached_client(
    m_local_mirror_uuid, &remote_client);
  if (r < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return;
  }

  librbd::journal::MirrorPeerClientMeta remote_client_meta;
  std::string error;
  r = validate_remote_client_state(remote_client, &remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    handle_replay_complete(locker, r, error);
  }
}

template <typename I>
void Replayer<I>::schedule_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
    return;
  }

  dout(15) << dendl;
  m_flush_local_replay_task = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
}

template <typename I>
void Replayer<I>::cancel_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_flush_local_replay_task != nullptr) {
    dout(10) << dendl;
    m_threads->timer->cancel_event(m_flush_local_replay_task);
    m_flush_local_replay_task = nullptr;
  }
}

template <typename I>
void Replayer<I>::handle_flush_local_replay_task(int) {
  dout(15) << dendl;

  m_in_flight_op_tracker.start_op();
  auto on_finish = new LambdaContext([this](int) {
    std::unique_lock locker{m_lock};

    {
      std::unique_lock timer_locker{m_threads->timer_lock};
      m_flush_local_replay_task = nullptr;
    }

    notify_status_updated();
    m_in_flight_op_tracker.finish_op();
  });
  flush_local_replay(on_finish);
}

template <typename I>
void Replayer<I>::flush_local_replay(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  } else if (m_local_journal_replay == nullptr) {
    // raced w/ a tag creation stop/start, which implies that
    // the replay is flushed
    locker.unlock();
    flush_commit_position(on_flush);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_journal_replay->flush(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
    on_flush->complete(r);
    return;
  }

  flush_commit_position(on_flush);
}

template <typename I>
void Replayer<I>::flush_commit_position(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_commit_position(on_flush, r);
    });
  m_state_builder->remote_journaler->flush_commit_position(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing remote journal commit position: "
         << cpp_strerror(r) << dendl;
  }

  on_flush->complete(r);
}
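
// The flush path is two stages in sequence: flush_local_replay() forces
// buffered events out to the local image, then flush_commit_position()
// persists the consumed position in the remote journal so it can be trimmed.
// Committing the remote position before the local data is durable would risk
// trimming entries that were never applied, which is presumably why the
// commit-position flush only runs after a successful local flush.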

template <typename I>
void Replayer<I>::handle_replay_error(int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_error_code == 0) {
    m_error_code = r;
    m_error_description = error;
  }
}

template <typename I>
bool Replayer<I>::is_replay_complete() const {
  std::unique_lock locker{m_lock};
  return is_replay_complete(locker);
}

template <typename I>
bool Replayer<I>::is_replay_complete(
    const std::unique_lock<ceph::mutex>&) const {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  return (m_state == STATE_COMPLETE);
}

template <typename I>
void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
  std::unique_lock locker{m_lock};
  handle_replay_complete(locker, r, error);
}

template <typename I>
void Replayer<I>::handle_replay_complete(
    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << "r=" << r << ", error=" << error << dendl;
  if (r < 0) {
    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, error);
  }

  if (m_state != STATE_REPLAYING) {
    return;
  }

  m_state = STATE_COMPLETE;
  notify_status_updated();
}

template <typename I>
void Replayer<I>::handle_replay_ready() {
  std::unique_lock locker{m_lock};
  handle_replay_ready(locker);
}

template <typename I>
void Replayer<I>::handle_replay_ready(
    std::unique_lock<ceph::mutex>& locker) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(20) << dendl;
  if (is_replay_complete(locker)) {
    return;
  }

  if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry,
                                                        &m_replay_tag_tid)) {
    dout(20) << "no entries ready for replay" << dendl;
    return;
  }

  // can safely drop lock once the entry is tracked
  m_event_replay_tracker.start_op();
  locker.unlock();

  dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() << ", "
           << "tag_tid=" << m_replay_tag_tid << dendl;
  if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
    // must allocate a new local journal tag prior to processing
    replay_flush();
    return;
  }

  preprocess_entry();
}
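
// Per-entry pipeline, as wired above and below: try_pop_front() hands out
// one entry at a time; on a tag boundary the path detours through
// replay_flush() -> get_remote_tag() -> allocate_local_tag() before joining
// preprocess_entry() -> process_entry(). C_ReplayCommitted later reports the
// remote commit via handle_process_entry_safe() once the event is safe
// locally, while handle_process_entry_ready() loops back into
// handle_replay_ready() for the next entry.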

template <typename I>
void Replayer<I>::replay_flush() {
  dout(10) << dendl;

  // shut down the replay to flush all IO and ops and create a new
  // replayer to handle the new tag epoch
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
  ceph_assert(m_local_journal_replay != nullptr);
  m_local_journal_replay->shut_down(false, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush_shut_down(int r) {
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_local_journal != nullptr);
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    handle_replay_flush(r);
    return;
  }

  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "replay flush encountered an error");
    m_event_replay_tracker.finish_op();
    return;
  } else if (is_replay_complete()) {
    m_event_replay_tracker.finish_op();
    return;
  }

  get_remote_tag();
}

template <typename I>
void Replayer<I>::get_remote_tag() {
  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_get_remote_tag>(this);
  m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag,
                                             ctx);
}

template <typename I>
void Replayer<I>::handle_get_remote_tag(int r) {
  dout(15) << "r=" << r << dendl;

  if (r == 0) {
    try {
      auto it = m_replay_tag.data.cbegin();
      decode(m_replay_tag_data, it);
    } catch (const buffer::error &err) {
      r = -EBADMSG;
    }
  }

  if (r < 0) {
    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
         << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to retrieve remote tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_tag_valid = true;
  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
           << m_replay_tag_data << dendl;

  allocate_local_tag();
}

template <typename I>
void Replayer<I>::allocate_local_tag() {
  dout(15) << dendl;

  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (mirror_uuid == m_local_mirror_uuid) {
    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    // handle possible edge condition where daemon can failover and
    // the local image has already been promoted/demoted
    auto local_tag_data = m_local_journal->get_tag_data();
    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
        (local_tag_data.predecessor.commit_valid &&
         local_tag_data.predecessor.mirror_uuid ==
           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
      dout(15) << "skipping stale demotion event" << dendl;
      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
                                m_replay_start_time, 0);
      handle_replay_ready();
      return;
    } else {
      dout(5) << "encountered image demotion: stopping" << dendl;
      handle_replay_complete(0, "");
    }
  }

  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  }

  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
           << "predecessor=" << predecessor << ", "
           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
}
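
// The UUID translation above re-expresses the remote tag in the local
// journal's frame of reference: a tag the remote cluster owns itself
// (LOCAL_MIRROR_UUID on its side) is re-attributed to remote_mirror_uuid
// here, while a tag the remote attributed to this cluster collapses back to
// LOCAL_MIRROR_UUID; the predecessor gets the same mapping.
// ORPHAN_MIRROR_UUID marks a demotion epoch: replay either stops or, when
// the local journal already recorded the matching demotion, commits and
// skips the stale event.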

template <typename I>
void Replayer<I>::handle_allocate_local_tag(int r) {
  dout(15) << "r=" << r << ", "
           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
  if (r < 0) {
    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to allocate journal tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  preprocess_entry();
}

template <typename I>
void Replayer<I>::preprocess_entry() {
  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  bufferlist data = m_replay_entry.get_data();
  auto it = data.cbegin();
  int r = m_local_journal_replay->decode(&it, &m_event_entry);
  if (r < 0) {
    derr << "failed to decode journal event" << dendl;
    handle_replay_complete(r, "failed to decode journal event");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_bytes = data.length();
  uint32_t delay = calculate_replay_delay(
    m_event_entry.timestamp,
    m_state_builder->local_image_ctx->mirroring_replay_delay);
  if (delay == 0) {
    handle_preprocess_entry_ready(0);
    return;
  }

  std::unique_lock locker{m_lock};
  if (is_replay_complete(locker)) {
    // don't schedule a delayed replay task if a shut-down is in-progress
    m_event_replay_tracker.finish_op();
    return;
  }

  dout(20) << "delaying replay by " << delay << " sec" << dendl;
  std::unique_lock timer_locker{m_threads->timer_lock};
  ceph_assert(m_delayed_preprocess_task == nullptr);
  m_delayed_preprocess_task = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
}
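
// The delay scheduled above comes from the image's mirroring_replay_delay
// setting: events younger than that many seconds are parked on the timer
// instead of being applied immediately. As an illustrative sketch
// (hypothetical pool/image names; check your release's rbd CLI), an operator
// could keep the local copy ten minutes behind with:
//
//   rbd config image set mypool/myimage rbd_mirroring_replay_delay 600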

template <typename I>
void Replayer<I>::handle_delayed_preprocess_task(int r) {
  dout(20) << "r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
  m_delayed_preprocess_task = nullptr;

  m_threads->work_queue->queue(create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_ready(int r) {
  dout(20) << "r=" << r << dendl;
  ceph_assert(r == 0);

  m_replay_start_time = ceph_clock_now();
  if (!m_event_preprocessor->is_required(m_event_entry)) {
    process_entry();
    return;
  }

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
  m_event_preprocessor->preprocess(&m_event_entry, ctx);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_safe(int r) {
  dout(20) << "r=" << r << dendl;

  if (r < 0) {
    if (r == -ECANCELED) {
      handle_replay_complete(0, "lost exclusive lock");
    } else {
      derr << "failed to preprocess journal event" << dendl;
      handle_replay_complete(r, "failed to preprocess journal event");
    }

    m_event_replay_tracker.finish_op();
    return;
  }

  process_entry();
}

template <typename I>
void Replayer<I>::process_entry() {
  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  Context *on_ready = create_context_callback<
    Replayer, &Replayer<I>::handle_process_entry_ready>(this);
  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
                                             m_replay_bytes,
                                             m_replay_start_time);

  m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
}

template <typename I>
void Replayer<I>::handle_process_entry_ready(int r) {
  std::unique_lock locker{m_lock};

  dout(20) << dendl;
  ceph_assert(r == 0);

  bool update_status = false;
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                               local_image_ctx->name);
    if (m_image_spec != image_spec) {
      m_image_spec = image_spec;
      update_status = true;
    }
  }

  if (update_status) {
    unregister_perf_counters();
    register_perf_counters();
    notify_status_updated();
  }

  // attempt to process the next event
  handle_replay_ready(locker);
}

template <typename I>
void Replayer<I>::handle_process_entry_safe(
    const ReplayEntry &replay_entry, uint64_t replay_bytes,
    const utime_t &replay_start_time, int r) {
  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;

  if (r < 0) {
    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to commit journal event");
  } else {
    ceph_assert(m_state_builder->remote_journaler != nullptr);
    m_state_builder->remote_journaler->committed(replay_entry);
  }

  auto latency = ceph_clock_now() - replay_start_time;
  if (g_perf_counters) {
    g_perf_counters->inc(l_rbd_mirror_replay);
    g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
  }

  auto ctx = new LambdaContext(
    [this, replay_bytes, latency](int r) {
      std::unique_lock locker{m_lock};
      schedule_flush_local_replay_task();

      if (m_perf_counters) {
        m_perf_counters->inc(l_rbd_mirror_replay);
        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
      }

      m_event_replay_tracker.finish_op();
    });
  m_threads->work_queue->queue(ctx, 0);
}
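
// Each committed event is counted twice by design: once in the daemon-wide
// g_perf_counters aggregate and once in the per-image m_perf_counters set
// registered under "rbd_mirror_image_" + m_image_spec. The per-image update
// is deferred to the work queue, where it can safely take m_lock outside the
// journal commit callback.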

template <typename I>
void Replayer<I>::handle_resync_image() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  m_resync_requested = true;
  handle_replay_complete(locker, 0, "resync requested");
}

template <typename I>
void Replayer<I>::notify_status_updated() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;

  auto ctx = new C_TrackedOp(this, new LambdaContext(
    [this](int) {
      m_replayer_listener->handle_notification();
    }));
  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void Replayer<I>::cancel_delayed_preprocess_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  bool canceled_delayed_preprocess_task = false;
  {
    std::unique_lock timer_locker{m_threads->timer_lock};
    if (m_delayed_preprocess_task != nullptr) {
      dout(10) << dendl;
      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
        m_delayed_preprocess_task);
      ceph_assert(canceled_delayed_preprocess_task);
      m_delayed_preprocess_task = nullptr;
    }
  }

  if (canceled_delayed_preprocess_task) {
    // wake up sleeping replay
    m_event_replay_tracker.finish_op();
  }
}

template <typename I>
int Replayer<I>::validate_remote_client_state(
    const cls::journal::Client& remote_client,
    librbd::journal::MirrorPeerClientMeta* remote_client_meta,
    bool* resync_requested, std::string* error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!util::decode_client_meta(remote_client, remote_client_meta)) {
    // require operator intervention since the data is corrupt
    *error = "error retrieving remote journal client";
    return -EBADMSG;
  }

  auto local_image_ctx = m_state_builder->local_image_ctx;
  dout(5) << "image_id=" << local_image_ctx->id << ", "
          << "remote_client_meta.image_id="
          << remote_client_meta->image_id << ", "
          << "remote_client.state=" << remote_client.state << dendl;
  if (remote_client_meta->image_id == local_image_ctx->id &&
      remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
    if (local_image_ctx->config.template get_val<bool>(
          "rbd_mirroring_resync_after_disconnect")) {
      dout(10) << "disconnected: automatic resync" << dendl;
      *resync_requested = true;
      *error = "disconnected: automatic resync";
      return -ENOTCONN;
    } else {
      dout(10) << "disconnected" << dendl;
      *error = "disconnected";
      return -ENOTCONN;
    }
  }

  return 0;
}
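
// A non-CONNECTED client state typically means the remote journal was
// trimmed past this daemon's commit position, so replay can no longer catch
// up; -ENOTCONN stops the replayer and rbd_mirroring_resync_after_disconnect
// decides whether a full image resync is requested automatically. As an
// illustrative sketch (hypothetical pool/image names), an operator could opt
// in per image with:
//
//   rbd config image set mypool/myimage \
//       rbd_mirroring_resync_after_disconnect true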

template <typename I>
void Replayer<I>::register_perf_counters() {
  dout(5) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_perf_counters == nullptr);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
  PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
                          l_rbd_mirror_first, l_rbd_mirror_last);
  plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
  plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
                      "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
  plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
                   "Replay latency", "rl", prio);
  m_perf_counters = plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
}
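
// The counters registered above are exposed through the rbd-mirror daemon's
// admin socket; a sketch of how to read them (the socket path varies by
// deployment):
//
//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok perf dump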

template <typename I>
void Replayer<I>::unregister_perf_counters() {
  dout(5) << dendl;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  PerfCounters *perf_counters = nullptr;
  std::swap(perf_counters, m_perf_counters);

  if (perf_counters != nullptr) {
    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
    delete perf_counters;
  }
}

} // namespace journal
} // namespace image_replayer
} // namespace mirror
} // namespace rbd

template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;