// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
#include "journal/Journaler.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayHandler.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
                           << "Replayer: " << this << " " << __func__ << ": "

extern PerfCounters *g_perf_counters;

namespace rbd {
namespace mirror {
namespace image_replayer {
namespace journal {

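// The journal-based Replayer drives one-way replication for a single image:
// it live-replays entries from the remote journal, optionally preprocesses
// them, applies them to the local image journal via external replay, and
// acknowledges the commit position back to the remote journaler.
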
namespace {

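// Compute the number of seconds to wait before applying an event when a
// mirroring replay delay is configured: zero if the delay is disabled or has
// already elapsed, otherwise the remaining time rounded up to a whole second.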
uint32_t calculate_replay_delay(const utime_t &event_time,
                                int mirroring_replay_delay) {
  if (mirroring_replay_delay <= 0) {
    return 0;
  }

  utime_t now = ceph_clock_now();
  if (event_time + mirroring_replay_delay <= now) {
    return 0;
  }

  // ensure it is rounded up when converting to integer
  return (event_time + mirroring_replay_delay - now) + 1;
}

} // anonymous namespace

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;

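// Completion context fired once a replayed entry has been committed locally;
// it carries the entry plus the byte count and start time needed to update
// the remote commit position and the perf counters.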
template <typename I>
struct Replayer<I>::C_ReplayCommitted : public Context {
  Replayer* replayer;
  ReplayEntry replay_entry;
  uint64_t replay_bytes;
  utime_t replay_start_time;

  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
                    uint64_t replay_bytes, const utime_t &replay_start_time)
    : replayer(replayer), replay_entry(std::move(replay_entry)),
      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
  }

  void finish(int r) override {
    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
                                        replay_start_time, r);
  }
};

template <typename I>
struct Replayer<I>::RemoteJournalerListener
  : public ::journal::JournalMetadataListener {
  Replayer* replayer;

  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}

  void handle_update(::journal::JournalMetadata*) override {
    auto ctx = new C_TrackedOp(
      replayer->m_in_flight_op_tracker,
      new LambdaContext([this](int r) {
          replayer->handle_remote_journal_metadata_updated();
        }));
    replayer->m_threads->work_queue->queue(ctx, 0);
  }
};

template <typename I>
struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
  Replayer* replayer;

  RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
  ~RemoteReplayHandler() override {}

  void handle_entries_available() override {
    replayer->handle_replay_ready();
  }

  void handle_complete(int r) override {
    std::string error;
    if (r == -ENOMEM) {
      error = "not enough memory in autotune cache";
    } else if (r < 0) {
      error = "replay completed with error: " + cpp_strerror(r);
    }
    replayer->handle_replay_complete(r, error);
  }
};

template <typename I>
struct Replayer<I>::LocalJournalListener
  : public librbd::journal::Listener {
  Replayer* replayer;

  LocalJournalListener(Replayer* replayer) : replayer(replayer) {
  }

  void handle_close() override {
    replayer->handle_replay_complete(0, "");
  }

  void handle_promoted() override {
    replayer->handle_replay_complete(0, "force promoted");
  }

  void handle_resync() override {
    replayer->handle_resync_image();
  }
};

template <typename I>
Replayer<I>::Replayer(
    Threads<I>* threads,
    const std::string& local_mirror_uuid,
    StateBuilder<I>* state_builder,
    ReplayerListener* replayer_listener)
  : m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    m_state_builder(state_builder),
    m_replayer_listener(replayer_listener),
    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
      "rbd::mirror::image_replayer::journal::Replayer", this))) {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    register_perf_counters();
  }
}

template <typename I>
Replayer<I>::~Replayer() {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    unregister_perf_counters();
  }

  ceph_assert(m_remote_listener == nullptr);
  ceph_assert(m_local_journal_listener == nullptr);
  ceph_assert(m_local_journal_replay == nullptr);
  ceph_assert(m_remote_replay_handler == nullptr);
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_delayed_preprocess_task == nullptr);
  ceph_assert(m_flush_local_replay_task == nullptr);
  ceph_assert(m_state_builder->local_image_ctx == nullptr);
}

template <typename I>
void Replayer<I>::init(Context* on_finish) {
  dout(10) << dendl;

  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                            local_image_ctx->name);
  }

  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  init_remote_journaler();
}

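// Shut down runs as an asynchronous chain: cancel pending timer tasks, wait
// for in-flight flushes, stop the local journal replay, drain outstanding
// event replays, close the local image, stop remote journaler replay, and
// finally drain any remaining in-flight ops before completing the context.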
template <typename I>
void Replayer<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_state == STATE_INIT) {
    // raced with the last piece of the init state machine
    return;
  } else if (m_state == STATE_REPLAYING) {
    m_state = STATE_COMPLETE;
  }

  // if shutting down due to an error notification, we don't
  // need to propagate the same error again
  m_error_code = 0;
  m_error_description = "";

  cancel_delayed_preprocess_task();
  cancel_flush_local_replay_task();
  wait_for_flush();
}

template <typename I>
void Replayer<I>::flush(Context* on_finish) {
  dout(10) << dendl;

  flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
}

template <typename I>
bool Replayer<I>::get_replay_status(std::string* description,
                                    Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_replay_status_formatter == nullptr) {
    derr << "replay not running" << dendl;
    locker.unlock();

    on_finish->complete(-EAGAIN);
    return false;
  }

  on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
  return m_replay_status_formatter->get_or_send_update(description,
                                                       on_finish);
}

template <typename I>
void Replayer<I>::init_remote_journaler() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
  m_state_builder->remote_journaler->init(ctx);
}

template <typename I>
void Replayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "error initializing remote journal");
    close_local_image();
    return;
  }

  // listen for metadata updates to check for disconnect events
  ceph_assert(m_remote_listener == nullptr);
  m_remote_listener = new RemoteJournalerListener(this);
  m_state_builder->remote_journaler->add_listener(m_remote_listener);

  cls::journal::Client remote_client;
  r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                           &remote_client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    handle_replay_complete(locker, r, "error retrieving remote journal client");
    close_local_image();
    return;
  }

  std::string error;
  r = validate_remote_client_state(remote_client,
                                   &m_state_builder->remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    handle_replay_complete(locker, r, error);
    close_local_image();
    return;
  }

  start_external_replay(locker);
}

template <typename I>
void Replayer<I>::start_external_replay(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  auto local_image_ctx = m_state_builder->local_image_ctx;
  std::shared_lock local_image_locker{local_image_ctx->image_lock};

  ceph_assert(m_local_journal == nullptr);
  m_local_journal = local_image_ctx->journal;
  if (m_local_journal == nullptr) {
    local_image_locker.unlock();

    derr << "local image journal closed" << dendl;
    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
    close_local_image();
    return;
  }

  // safe to hold pointer to journal after external playback starts
  Context *start_ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_start_external_replay>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
}

template <typename I>
void Replayer<I>::handle_start_external_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    ceph_assert(m_local_journal_replay == nullptr);
    derr << "error starting external replay on local image "
         << m_state_builder->local_image_ctx->id << ": "
         << cpp_strerror(r) << dendl;

    handle_replay_complete(locker, r, "error starting replay on local image");
    close_local_image();
    return;
  }

  if (!notify_init_complete(locker)) {
    return;
  }

  m_state = STATE_REPLAYING;

  // check for resync/promotion state after adding listener
  if (!add_local_journal_listener(locker)) {
    return;
  }

  // start remote journal replay
  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
    m_local_mirror_uuid, &m_state_builder->remote_client_meta,
    m_threads->work_queue);
  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
    m_state_builder->remote_journaler, m_local_mirror_uuid);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  double poll_seconds = cct->_conf.get_val<double>(
    "rbd_mirror_journal_poll_age");
  m_remote_replay_handler = new RemoteReplayHandler(this);
  m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
                                                       poll_seconds);

  notify_status_updated();
}

template <typename I>
bool Replayer<I>::add_local_journal_listener(
    std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  // listen for promotion and resync requests against local journal
  ceph_assert(m_local_journal_listener == nullptr);
  m_local_journal_listener = new LocalJournalListener(this);
  m_local_journal->add_listener(m_local_journal_listener);

  // verify that the local image wasn't force-promoted and that a resync hasn't
  // been requested now that we are listening for events
  if (m_local_journal->is_tag_owner()) {
    dout(10) << "local image force-promoted" << dendl;
    handle_replay_complete(locker, 0, "force promoted");
    return false;
  }

  bool resync_requested = false;
  int r = m_local_journal->is_resync_requested(&resync_requested);
  if (r < 0) {
    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
             << dendl;
    handle_replay_complete(locker, r, "error parsing resync state");
    return false;
  } else if (resync_requested) {
    dout(10) << "local image resync requested" << dendl;
    handle_replay_complete(locker, 0, "resync requested");
    return false;
  }

  return true;
}

template <typename I>
bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_state == STATE_INIT);

  // notify that init has completed
  Context *on_finish = nullptr;
  std::swap(m_on_init_shutdown, on_finish);

  locker.unlock();
  on_finish->complete(0);
  locker.lock();

  if (m_on_init_shutdown != nullptr) {
    // shut down requested after we notified init complete but before we
    // grabbed the lock
    close_local_image();
    return false;
  }

  return true;
}

template <typename I>
void Replayer<I>::wait_for_flush() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  // ensure that we don't have two concurrent local journal replay shut downs
  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_flush>(this));
  m_flush_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_flush(int r) {
  dout(10) << "r=" << r << dendl;

  shut_down_local_journal_replay();
}

template <typename I>
void Replayer<I>::shut_down_local_journal_replay() {
  std::unique_lock locker{m_lock};

  if (m_local_journal_replay == nullptr) {
    wait_for_event_replay();
    return;
  }

  // It's required to stop the local journal replay state machine prior to
  // waiting for the events to complete. This is to ensure that IO is properly
  // flushed (it might be batched), wait for any running ops to complete, and
  // to cancel any ops waiting for their associated OnFinish events.
  dout(10) << dendl;
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
  m_local_journal_replay->shut_down(true, ctx);
}

template <typename I>
void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to shut down local journal replay");
  }

  wait_for_event_replay();
}

template <typename I>
void Replayer<I>::wait_for_event_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
  m_event_replay_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_event_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  close_local_image();
}

template <typename I>
void Replayer<I>::close_local_image() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (m_state_builder->local_image_ctx == nullptr) {
    stop_remote_journaler_replay();
    return;
  }

  dout(10) << dendl;
  if (m_local_journal_listener != nullptr) {
    // blocks if listener notification is in-progress
    m_local_journal->remove_listener(m_local_journal_listener);
    delete m_local_journal_listener;
    m_local_journal_listener = nullptr;
  }

  if (m_local_journal_replay != nullptr) {
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  if (m_event_preprocessor != nullptr) {
    image_replayer::journal::EventPreprocessor<I>::destroy(
      m_event_preprocessor);
    m_event_preprocessor = nullptr;
  }

  m_local_journal.reset();

  // NOTE: it's important to ensure that the local image is fully
  // closed before attempting to close the remote journal in
  // case the remote cluster is unreachable
  ceph_assert(m_state_builder->local_image_ctx != nullptr);
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
  auto request = image_replayer::CloseImageRequest<I>::create(
    &m_state_builder->local_image_ctx, ctx);
  request->send();
}

template <typename I>
void Replayer<I>::handle_close_local_image(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
541 derr << "error closing local iamge: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to close local image");
  }

  ceph_assert(m_state_builder->local_image_ctx == nullptr);
  stop_remote_journaler_replay();
}

template <typename I>
void Replayer<I>::stop_remote_journaler_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_state_builder->remote_journaler == nullptr) {
    wait_for_in_flight_ops();
    return;
  } else if (m_remote_replay_handler == nullptr) {
    wait_for_in_flight_ops();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
  m_state_builder->remote_journaler->stop_replay(ctx);
}

template <typename I>
void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
574 derr << "failed to stop remote journaler replay : " << cpp_strerror(r)
         << dendl;
    handle_replay_error(r, "failed to stop remote journaler replay");
  }

  delete m_remote_replay_handler;
  m_remote_replay_handler = nullptr;

  wait_for_in_flight_ops();
}

template <typename I>
void Replayer<I>::wait_for_in_flight_ops() {
  dout(10) << dendl;
  if (m_remote_listener != nullptr) {
    m_state_builder->remote_journaler->remove_listener(m_remote_listener);
    delete m_remote_listener;
    m_remote_listener = nullptr;
  }

  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
  m_in_flight_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  Context* on_init_shutdown = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_on_init_shutdown != nullptr);
    std::swap(m_on_init_shutdown, on_init_shutdown);
    m_state = STATE_COMPLETE;
  }
  on_init_shutdown->complete(m_error_code);
}

template <typename I>
void Replayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    return;
  }

  cls::journal::Client remote_client;
  int r = m_state_builder->remote_journaler->get_cached_client(
    m_local_mirror_uuid, &remote_client);
  if (r < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return;
  }

  librbd::journal::MirrorPeerClientMeta remote_client_meta;
  std::string error;
  r = validate_remote_client_state(remote_client, &remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    handle_replay_complete(locker, r, error);
  }
}

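// While entries are being committed, a timer task is scheduled (30 seconds
// out, at most one outstanding) to flush the local replay and the remote
// commit position so the persisted commit offsets and the reported mirroring
// status stay reasonably current during long replays.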
template <typename I>
void Replayer<I>::schedule_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
    return;
  }

  dout(15) << dendl;
  m_flush_local_replay_task = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
}

template <typename I>
void Replayer<I>::cancel_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_flush_local_replay_task != nullptr) {
    dout(10) << dendl;
    m_threads->timer->cancel_event(m_flush_local_replay_task);
    m_flush_local_replay_task = nullptr;
  }
}

template <typename I>
void Replayer<I>::handle_flush_local_replay_task(int) {
  dout(15) << dendl;

  m_in_flight_op_tracker.start_op();
  auto on_finish = new LambdaContext([this](int) {
      std::unique_lock locker{m_lock};

      {
        std::unique_lock timer_locker{m_threads->timer_lock};
        m_flush_local_replay_task = nullptr;
      }

      notify_status_updated();
      m_in_flight_op_tracker.finish_op();
    });
  flush_local_replay(on_finish);
}

template <typename I>
void Replayer<I>::flush_local_replay(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  } else if (m_local_journal_replay == nullptr) {
    // raced w/ a tag creation stop/start, which implies that
    // the replay is flushed
    locker.unlock();
    flush_commit_position(on_flush);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_journal_replay->flush(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
    on_flush->complete(r);
    return;
  }

  flush_commit_position(on_flush);
}

template <typename I>
void Replayer<I>::flush_commit_position(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_commit_position(on_flush, r);
    });
  m_state_builder->remote_journaler->flush_commit_position(ctx);
}

template <typename I>
void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing remote journal commit position: "
         << cpp_strerror(r) << dendl;
  }

  on_flush->complete(r);
}

template <typename I>
void Replayer<I>::handle_replay_error(int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_error_code == 0) {
    m_error_code = r;
    m_error_description = error;
  }
}

template <typename I>
bool Replayer<I>::is_replay_complete() const {
  std::unique_lock locker{m_lock};
  return is_replay_complete(locker);
}

template <typename I>
bool Replayer<I>::is_replay_complete(
    const std::unique_lock<ceph::mutex>&) const {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  return (m_state == STATE_COMPLETE);
}

template <typename I>
void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
  std::unique_lock locker{m_lock};
  handle_replay_complete(locker, r, error);
}

template <typename I>
void Replayer<I>::handle_replay_complete(
    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << "r=" << r << ", error=" << error << dendl;
  if (r < 0) {
    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, error);
  }

  if (m_state != STATE_REPLAYING) {
    return;
  }

  m_state = STATE_COMPLETE;
  notify_status_updated();
}

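// Replay pipeline: handle_replay_ready() pops the next entry from the remote
// journal; replay_flush(), get_remote_tag() and allocate_local_tag() run
// whenever the tag epoch changes; preprocess_entry() decodes (and optionally
// delays) the event; and process_entry() applies it to the local journal.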
template <typename I>
void Replayer<I>::handle_replay_ready() {
  std::unique_lock locker{m_lock};
  handle_replay_ready(locker);
}

template <typename I>
void Replayer<I>::handle_replay_ready(
    std::unique_lock<ceph::mutex>& locker) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(20) << dendl;
  if (is_replay_complete(locker)) {
    return;
  }

  if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry,
                                                        &m_replay_tag_tid)) {
    dout(20) << "no entries ready for replay" << dendl;
    return;
  }

  // can safely drop lock once the entry is tracked
  m_event_replay_tracker.start_op();
  locker.unlock();

828 dout(20) << "entry tid=" << m_replay_entry.get_commit_tid()
829 << "tag_tid=" << m_replay_tag_tid << dendl;
  if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
    // must allocate a new local journal tag prior to processing
    replay_flush();
    return;
  }

  preprocess_entry();
}

template <typename I>
void Replayer<I>::replay_flush() {
  dout(10) << dendl;
  m_flush_tracker.start_op();

  // shut down the replay to flush all IO and ops and create a new
  // replayer to handle the new tag epoch
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
  ceph_assert(m_local_journal_replay != nullptr);
  m_local_journal_replay->shut_down(false, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush_shut_down(int r) {
  std::unique_lock locker{m_lock};
  dout(10) << "r=" << r << dendl;

  ceph_assert(m_local_journal != nullptr);
  ceph_assert(m_local_journal_listener != nullptr);

  // blocks if listener notification is in-progress
  m_local_journal->remove_listener(m_local_journal_listener);
  delete m_local_journal_listener;
  m_local_journal_listener = nullptr;

  m_local_journal->stop_external_replay();
  m_local_journal_replay = nullptr;
  m_local_journal.reset();

  if (r < 0) {
    locker.unlock();

    handle_replay_flush(r);
    return;
  }

  // journal might have been closed now that we stopped external replay
  auto local_image_ctx = m_state_builder->local_image_ctx;
  std::shared_lock local_image_locker{local_image_ctx->image_lock};
  m_local_journal = local_image_ctx->journal;
  if (m_local_journal == nullptr) {
    local_image_locker.unlock();
    locker.unlock();

    derr << "local image journal closed" << dendl;
    handle_replay_flush(-EINVAL);
    return;
  }

  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
}

template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
  std::unique_lock locker{m_lock};
  dout(10) << "r=" << r << dendl;
  m_flush_tracker.finish_op();

  if (r < 0) {
    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "replay flush encountered an error");
    m_event_replay_tracker.finish_op();
    return;
  } else if (is_replay_complete(locker)) {
    m_event_replay_tracker.finish_op();
    return;
  }

  // check for resync/promotion state after adding listener
  if (!add_local_journal_listener(locker)) {
    m_event_replay_tracker.finish_op();
    return;
  }
  locker.unlock();

  get_remote_tag();
}

template <typename I>
void Replayer<I>::get_remote_tag() {
  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_get_remote_tag>(this);
  m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag,
                                             ctx);
}

template <typename I>
void Replayer<I>::handle_get_remote_tag(int r) {
  dout(15) << "r=" << r << dendl;

  if (r == 0) {
    try {
      auto it = m_replay_tag.data.cbegin();
      decode(m_replay_tag_data, it);
    } catch (const buffer::error &err) {
      r = -EBADMSG;
    }
  }

  if (r < 0) {
    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
         << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to retrieve remote tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_tag_valid = true;
  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
           << m_replay_tag_data << dendl;

  allocate_local_tag();
}

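// When allocating the local tag, mirror uuids are translated between the two
// clusters' frames of reference: the remote's LOCAL_MIRROR_UUID becomes the
// remote mirror uuid here, while a tag owned by this cluster's uuid maps back
// to LOCAL_MIRROR_UUID. The same translation applies to the tag predecessor.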
template <typename I>
void Replayer<I>::allocate_local_tag() {
  dout(15) << dendl;

  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (mirror_uuid == m_local_mirror_uuid) {
    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    // handle possible edge condition where daemon can failover and
    // the local image has already been promoted/demoted
    auto local_tag_data = m_local_journal->get_tag_data();
    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
        (local_tag_data.predecessor.commit_valid &&
         local_tag_data.predecessor.mirror_uuid ==
           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
      dout(15) << "skipping stale demotion event" << dendl;
      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
                                m_replay_start_time, 0);
      handle_replay_ready();
      return;
    } else {
      dout(5) << "encountered image demotion: stopping" << dendl;
      handle_replay_complete(0, "");
    }
  }

  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  }

  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
           << "predecessor=" << predecessor << ", "
           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
}

template <typename I>
void Replayer<I>::handle_allocate_local_tag(int r) {
  dout(15) << "r=" << r << ", "
           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
  if (r < 0) {
    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to allocate journal tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  preprocess_entry();
}

template <typename I>
void Replayer<I>::preprocess_entry() {
  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  bufferlist data = m_replay_entry.get_data();
  auto it = data.cbegin();
  int r = m_local_journal_replay->decode(&it, &m_event_entry);
  if (r < 0) {
    derr << "failed to decode journal event" << dendl;
    handle_replay_complete(r, "failed to decode journal event");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_bytes = data.length();
  uint32_t delay = calculate_replay_delay(
    m_event_entry.timestamp,
    m_state_builder->local_image_ctx->mirroring_replay_delay);
  if (delay == 0) {
    handle_preprocess_entry_ready(0);
    return;
  }

  std::unique_lock locker{m_lock};
  if (is_replay_complete(locker)) {
    // don't schedule a delayed replay task if a shut-down is in-progress
    m_event_replay_tracker.finish_op();
    return;
  }

  dout(20) << "delaying replay by " << delay << " sec" << dendl;
  std::unique_lock timer_locker{m_threads->timer_lock};
  ceph_assert(m_delayed_preprocess_task == nullptr);
  m_delayed_preprocess_task = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
}

template <typename I>
void Replayer<I>::handle_delayed_preprocess_task(int r) {
  dout(20) << "r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
  m_delayed_preprocess_task = nullptr;

  m_threads->work_queue->queue(create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_ready(int r) {
  dout(20) << "r=" << r << dendl;
  ceph_assert(r == 0);

  m_replay_start_time = ceph_clock_now();
  if (!m_event_preprocessor->is_required(m_event_entry)) {
    process_entry();
    return;
  }

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
  m_event_preprocessor->preprocess(&m_event_entry, ctx);
}

template <typename I>
void Replayer<I>::handle_preprocess_entry_safe(int r) {
  dout(20) << "r=" << r << dendl;

  if (r < 0) {
    if (r == -ECANCELED) {
      handle_replay_complete(0, "lost exclusive lock");
    } else {
      derr << "failed to preprocess journal event" << dendl;
      handle_replay_complete(r, "failed to preprocess journal event");
    }

    m_event_replay_tracker.finish_op();
    return;
  }

  process_entry();
}

template <typename I>
void Replayer<I>::process_entry() {
  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  Context *on_ready = create_context_callback<
    Replayer, &Replayer<I>::handle_process_entry_ready>(this);
  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
                                             m_replay_bytes,
                                             m_replay_start_time);

  m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
}

template <typename I>
void Replayer<I>::handle_process_entry_ready(int r) {
  std::unique_lock locker{m_lock};

  dout(20) << dendl;
  ceph_assert(r == 0);

  bool update_status = false;
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                               local_image_ctx->name);
    if (m_image_spec != image_spec) {
      m_image_spec = image_spec;
      update_status = true;
    }
  }

  m_replay_status_formatter->handle_entry_processed(m_replay_bytes);

  if (update_status) {
    unregister_perf_counters();
    register_perf_counters();
    notify_status_updated();
  }

  // attempt to process the next event
  handle_replay_ready(locker);
}

template <typename I>
void Replayer<I>::handle_process_entry_safe(
    const ReplayEntry &replay_entry, uint64_t replay_bytes,
    const utime_t &replay_start_time, int r) {
  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;

  if (r < 0) {
    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to commit journal event");
  } else {
    ceph_assert(m_state_builder->remote_journaler != nullptr);
    m_state_builder->remote_journaler->committed(replay_entry);
  }

  auto latency = ceph_clock_now() - replay_start_time;
  if (g_perf_counters) {
    g_perf_counters->inc(l_rbd_mirror_replay);
    g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
  }

  auto ctx = new LambdaContext(
    [this, replay_bytes, latency](int r) {
      std::unique_lock locker{m_lock};
      schedule_flush_local_replay_task();

      if (m_perf_counters) {
        m_perf_counters->inc(l_rbd_mirror_replay);
        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
      }

      m_event_replay_tracker.finish_op();
    });
  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void Replayer<I>::handle_resync_image() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  m_resync_requested = true;
  handle_replay_complete(locker, 0, "resync requested");
}

template <typename I>
void Replayer<I>::notify_status_updated() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;

  auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
    [this](int) {
      m_replayer_listener->handle_notification();
    }));
  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void Replayer<I>::cancel_delayed_preprocess_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  bool canceled_delayed_preprocess_task = false;
  {
    std::unique_lock timer_locker{m_threads->timer_lock};
    if (m_delayed_preprocess_task != nullptr) {
      dout(10) << dendl;
      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
        m_delayed_preprocess_task);
      ceph_assert(canceled_delayed_preprocess_task);
      m_delayed_preprocess_task = nullptr;
    }
  }

  if (canceled_delayed_preprocess_task) {
    // wake up sleeping replay
    m_event_replay_tracker.finish_op();
  }
}

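// The local peer's client registered in the remote journal is validated on
// init and on every metadata update: a client flagged as disconnected stops
// replay with -ENOTCONN, optionally requesting an automatic resync when
// rbd_mirroring_resync_after_disconnect is enabled.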
template <typename I>
int Replayer<I>::validate_remote_client_state(
    const cls::journal::Client& remote_client,
    librbd::journal::MirrorPeerClientMeta* remote_client_meta,
    bool* resync_requested, std::string* error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!util::decode_client_meta(remote_client, remote_client_meta)) {
    // require operator intervention since the data is corrupt
    *error = "error retrieving remote journal client";
    return -EBADMSG;
  }

  auto local_image_ctx = m_state_builder->local_image_ctx;
  dout(5) << "image_id=" << local_image_ctx->id << ", "
          << "remote_client_meta.image_id="
          << remote_client_meta->image_id << ", "
          << "remote_client.state=" << remote_client.state << dendl;
  if (remote_client_meta->image_id == local_image_ctx->id &&
      remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
    if (local_image_ctx->config.template get_val<bool>(
          "rbd_mirroring_resync_after_disconnect")) {
      dout(10) << "disconnected: automatic resync" << dendl;
      *resync_requested = true;
      *error = "disconnected: automatic resync";
      return -ENOTCONN;
    } else {
      dout(10) << "disconnected" << dendl;
      *error = "disconnected";
      return -ENOTCONN;
    }
  }

  return 0;
}

template <typename I>
void Replayer<I>::register_perf_counters() {
  dout(5) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_perf_counters == nullptr);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
  PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
                          l_rbd_mirror_first, l_rbd_mirror_last);
  plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
  plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
                      "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
  plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
                   "Replay latency", "rl", prio);
  m_perf_counters = plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
}

template <typename I>
void Replayer<I>::unregister_perf_counters() {
  dout(5) << dendl;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  PerfCounters *perf_counters = nullptr;
  std::swap(perf_counters, m_perf_counters);

  if (perf_counters != nullptr) {
    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
    delete perf_counters;
  }
}

} // namespace journal
} // namespace image_replayer
} // namespace mirror
} // namespace rbd

template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;