]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "Replayer.h" | |
5 | #include "common/debug.h" | |
6 | #include "common/errno.h" | |
7 | #include "common/Timer.h" | |
8 | #include "common/WorkQueue.h" | |
9 | #include "librbd/Journal.h" | |
10 | #include "librbd/Utils.h" | |
11 | #include "librbd/journal/Replay.h" | |
12 | #include "journal/Journaler.h" | |
13 | #include "journal/JournalMetadataListener.h" | |
14 | #include "journal/ReplayHandler.h" | |
15 | #include "tools/rbd_mirror/Threads.h" | |
16 | #include "tools/rbd_mirror/Types.h" | |
17 | #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" | |
18 | #include "tools/rbd_mirror/image_replayer/ReplayerListener.h" | |
19 | #include "tools/rbd_mirror/image_replayer/Utils.h" | |
20 | #include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" | |
21 | #include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" | |
22 | #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" | |
23 | ||
24 | #define dout_context g_ceph_context | |
25 | #define dout_subsys ceph_subsys_rbd_mirror | |
26 | #undef dout_prefix | |
27 | #define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ | |
28 | << "Replayer: " << this << " " << __func__ << ": " | |
29 | ||
30 | extern PerfCounters *g_perf_counters; | |
31 | ||
32 | namespace rbd { | |
33 | namespace mirror { | |
34 | namespace image_replayer { | |
35 | namespace journal { | |
36 | ||
37 | namespace { | |
38 | ||
39 | uint32_t calculate_replay_delay(const utime_t &event_time, | |
40 | int mirroring_replay_delay) { | |
41 | if (mirroring_replay_delay <= 0) { | |
42 | return 0; | |
43 | } | |
44 | ||
45 | utime_t now = ceph_clock_now(); | |
46 | if (event_time + mirroring_replay_delay <= now) { | |
47 | return 0; | |
48 | } | |
49 | ||
50 | // ensure it is rounded up when converting to integer | |
51 | return (event_time + mirroring_replay_delay - now) + 1; | |
52 | } | |
53 | ||
54 | } // anonymous namespace | |
55 | ||
56 | using librbd::util::create_async_context_callback; | |
57 | using librbd::util::create_context_callback; | |
58 | ||
// Completion fired once a replayed journal entry has been safely
// committed; forwards the result plus perf-accounting data back to the
// owning Replayer.
template <typename I>
struct Replayer<I>::C_ReplayCommitted : public Context {
  Replayer* replayer;          // owning replayer (not owned here)
  ReplayEntry replay_entry;    // entry whose commit is being tracked
  uint64_t replay_bytes;       // entry size, for perf counters
  utime_t replay_start_time;   // when replay of this entry began

  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
                    uint64_t replay_bytes, const utime_t &replay_start_time)
    : replayer(replayer), replay_entry(std::move(replay_entry)),
      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
  }

  void finish(int r) override {
    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
                                        replay_start_time, r);
  }
};
77 | ||
9f95a23c TL |
// Listens for remote journal metadata updates (e.g. client disconnect
// events) and bounces the notification onto the work queue so it is
// processed outside the journal metadata callback context.
template <typename I>
struct Replayer<I>::RemoteJournalerListener
  : public ::journal::JournalMetadataListener {
  Replayer* replayer;

  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}

  void handle_update(::journal::JournalMetadata*) override {
    // tracked so shut_down waits for the queued callback to drain
    auto ctx = new C_TrackedOp(
      replayer->m_in_flight_op_tracker,
      new LambdaContext([this](int r) {
        replayer->handle_remote_journal_metadata_updated();
      }));
    replayer->m_threads->work_queue->queue(ctx, 0);
  }
};
94 | ||
95 | template <typename I> | |
96 | struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler { | |
97 | Replayer* replayer; | |
98 | ||
99 | RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {} | |
100 | ~RemoteReplayHandler() override {}; | |
101 | ||
102 | void handle_entries_available() override { | |
103 | replayer->handle_replay_ready(); | |
104 | } | |
105 | ||
106 | void handle_complete(int r) override { | |
107 | std::string error; | |
108 | if (r == -ENOMEM) { | |
109 | error = "not enough memory in autotune cache"; | |
110 | } else if (r < 0) { | |
111 | error = "replay completed with error: " + cpp_strerror(r); | |
112 | } | |
113 | replayer->handle_replay_complete(r, error); | |
114 | } | |
115 | }; | |
116 | ||
117 | template <typename I> | |
118 | struct Replayer<I>::LocalJournalListener | |
119 | : public librbd::journal::Listener { | |
120 | Replayer* replayer; | |
121 | ||
122 | LocalJournalListener(Replayer* replayer) : replayer(replayer) { | |
123 | } | |
124 | ||
125 | void handle_close() override { | |
126 | replayer->handle_replay_complete(0, ""); | |
127 | } | |
128 | ||
129 | void handle_promoted() override { | |
130 | replayer->handle_replay_complete(0, "force promoted"); | |
131 | } | |
132 | ||
133 | void handle_resync() override { | |
134 | replayer->handle_resync_image(); | |
135 | } | |
136 | }; | |
137 | ||
// Construct a journal-based image replayer.  All pointer parameters are
// borrowed (owned by the caller) and must outlive this object.
// Registers the perf counters for this image under m_lock.
template <typename I>
Replayer<I>::Replayer(
    Threads<I>* threads,
    const std::string& local_mirror_uuid,
    StateBuilder<I>* state_builder,
    ReplayerListener* replayer_listener)
  : m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    m_state_builder(state_builder),
    m_replayer_listener(replayer_listener),
    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
      "rbd::mirror::image_replayer::journal::Replayer", this))) {
  dout(10) << dendl;

  {
    // register_perf_counters() expects m_lock to be held
    std::unique_lock locker{m_lock};
    register_perf_counters();
  }
}
157 | ||
// Destructor: by this point shut_down() must have completed; assert
// that every listener, replay handle, helper and task was torn down.
template <typename I>
Replayer<I>::~Replayer() {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    unregister_perf_counters();
  }

  ceph_assert(m_remote_listener == nullptr);
  ceph_assert(m_local_journal_listener == nullptr);
  ceph_assert(m_local_journal_replay == nullptr);
  ceph_assert(m_remote_replay_handler == nullptr);
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_delayed_preprocess_task == nullptr);
  ceph_assert(m_flush_local_replay_task == nullptr);
  ceph_assert(m_state_builder->local_image_ctx == nullptr);
}
177 | ||
// Begin async initialization: capture the local image's journal and
// display spec, then initialize the remote journaler.  on_finish fires
// once replay is running or init has failed.
template <typename I>
void Replayer<I>::init(Context* on_finish) {
  dout(10) << dendl;

  ceph_assert(m_local_journal == nullptr);
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                            local_image_ctx->name);
    m_local_journal = local_image_ctx->journal;
  }

  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_local_journal == nullptr) {
    // image has no journal to replay against: fail init and close the
    // local image
    std::unique_lock locker{m_lock};
    m_state = STATE_COMPLETE;
    m_state_builder->remote_journaler = nullptr;

    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
    close_local_image();
    return;
  }

  init_remote_journaler();
}
206 | ||
// Begin async shutdown: cancel pending timer tasks and tear down the
// local journal replay.  on_finish fires from
// handle_wait_for_in_flight_ops() once everything has drained.
template <typename I>
void Replayer<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_state == STATE_INIT) {
    // raced with the last piece of the init state machine
    return;
  } else if (m_state == STATE_REPLAYING) {
    m_state = STATE_COMPLETE;
  }

  // if shutting down due to an error notification, we don't
  // need to propagate the same error again
  m_error_code = 0;
  m_error_description = "";

  cancel_delayed_preprocess_task();
  cancel_flush_local_replay_task();
  shut_down_local_journal_replay();
}
231 | ||
// Flush the local replay and remote commit position; the callback is
// tracked so shut_down waits for an in-flight flush to complete.
template <typename I>
void Replayer<I>::flush(Context* on_finish) {
  dout(10) << dendl;

  flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
}
238 | ||
// Populate description with the formatted replay status.  Returns true
// when the description was immediately available; otherwise on_finish
// completes once an async update has been fetched.  Completes with
// -EAGAIN when replay is not currently running.
template <typename I>
bool Replayer<I>::get_replay_status(std::string* description,
                                    Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_replay_status_formatter == nullptr) {
    derr << "replay not running" << dendl;
    locker.unlock();

    on_finish->complete(-EAGAIN);
    return false;
  }

  // track the callback so shut_down waits for it to drain
  on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
  return m_replay_status_formatter->get_or_send_update(description,
                                                       on_finish);
}
257 | ||
258 | template <typename I> | |
259 | void Replayer<I>::init_remote_journaler() { | |
260 | dout(10) << dendl; | |
261 | ||
262 | Context *ctx = create_context_callback< | |
263 | Replayer, &Replayer<I>::handle_init_remote_journaler>(this); | |
264 | m_state_builder->remote_journaler->init(ctx); | |
265 | } | |
266 | ||
// Remote journaler initialized: register the metadata listener and
// validate our registered journal client state before starting the
// local external replay.
template <typename I>
void Replayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "error initializing remote journal");
    close_local_image();
    return;
  }

  // listen for metadata updates to check for disconnect events
  ceph_assert(m_remote_listener == nullptr);
  m_remote_listener = new RemoteJournalerListener(this);
  m_state_builder->remote_journaler->add_listener(m_remote_listener);

  cls::journal::Client remote_client;
  r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                           &remote_client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    handle_replay_complete(locker, r, "error retrieving remote journal client");
    close_local_image();
    return;
  }

  std::string error;
  r = validate_remote_client_state(remote_client,
                                   &m_state_builder->remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    // client disconnected or a resync was flagged
    handle_replay_complete(locker, r, error);
    close_local_image();
    return;
  }

  start_external_replay();
}
307 | ||
308 | template <typename I> | |
309 | void Replayer<I>::start_external_replay() { | |
310 | dout(10) << dendl; | |
311 | ||
312 | Context *start_ctx = create_context_callback< | |
313 | Replayer, &Replayer<I>::handle_start_external_replay>(this); | |
314 | m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx); | |
315 | } | |
316 | ||
// External replay started on the local journal: complete init, attach
// local listeners, re-check for promotion/resync races, then begin
// live replay of the remote journal.
template <typename I>
void Replayer<I>::handle_start_external_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    ceph_assert(m_local_journal_replay == nullptr);
    derr << "error starting external replay on local image "
         << m_state_builder->local_image_ctx->id << ": "
         << cpp_strerror(r) << dendl;

    handle_replay_complete(locker, r, "error starting replay on local image");
    close_local_image();
    return;
  }

  // notify_init_complete() drops m_lock; abort if a shutdown raced in
  if (!notify_init_complete(locker)) {
    return;
  }

  m_state = STATE_REPLAYING;

  // listen for promotion and resync requests against local journal
  m_local_journal_listener = new LocalJournalListener(this);
  m_local_journal->add_listener(m_local_journal_listener);

  // verify that the local image wasn't force-promoted and that a resync hasn't
  // been requested now that we are listening for events
  if (m_local_journal->is_tag_owner()) {
    dout(10) << "local image force-promoted" << dendl;
    handle_replay_complete(locker, 0, "force promoted");
    return;
  }

  bool resync_requested = false;
  r = m_local_journal->is_resync_requested(&resync_requested);
  if (r < 0) {
    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
             << dendl;
    handle_replay_complete(locker, r, "error parsing resync state");
    return;
  } else if (resync_requested) {
    dout(10) << "local image resync requested" << dendl;
    handle_replay_complete(locker, 0, "resync requested");
    return;
  }

  // start remote journal replay
  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
    m_local_mirror_uuid, &m_state_builder->remote_client_meta,
    m_threads->work_queue);
  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
    m_state_builder->remote_journaler, m_local_mirror_uuid);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  double poll_seconds = cct->_conf.get_val<double>(
    "rbd_mirror_journal_poll_age");
  m_remote_replay_handler = new RemoteReplayHandler(this);
  m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
                                                       poll_seconds);

  notify_status_updated();
}
381 | ||
// Complete the stashed init callback outside of m_lock.  Returns false
// (after initiating image close) if a shutdown was requested while the
// lock was dropped.
template <typename I>
bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_state == STATE_INIT);

  // notify that init has completed
  Context *on_finish = nullptr;
  std::swap(m_on_init_shutdown, on_finish);

  // must drop the lock: the callback may re-enter the replayer
  locker.unlock();
  on_finish->complete(0);
  locker.lock();

  if (m_on_init_shutdown != nullptr) {
    // shut down requested after we notified init complete but before we
    // grabbed the lock
    close_local_image();
    return false;
  }

  return true;
}
406 | ||
// Shut down the local journal replay (if it was started); proceeds to
// wait_for_event_replay() directly or via the async callback.
template <typename I>
void Replayer<I>::shut_down_local_journal_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_local_journal_replay == nullptr) {
    // replay never started or was already torn down
    wait_for_event_replay();
    return;
  }

  dout(10) << dendl;
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
  m_local_journal_replay->shut_down(true, ctx);
}
421 | ||
// Local journal replay shut down (possibly with error): record the
// failure, then drain in-flight replayed events.
template <typename I>
void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to shut down local journal replay");
  }

  wait_for_event_replay();
}
434 | ||
// Wait for all in-flight replayed events to commit before closing the
// local image; bounced through the work queue to avoid completing the
// callback while m_lock is held.
template <typename I>
void Replayer<I>::wait_for_event_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
  m_event_replay_tracker.wait_for_ops(ctx);
}
445 | ||
// All replayed events have committed; proceed to close the local image.
template <typename I>
void Replayer<I>::handle_wait_for_event_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  close_local_image();
}
453 | ||
// Tear down all local-image state (journal listener, external replay,
// event preprocessor) then asynchronously close the image.  Skips
// straight to stopping the remote journaler if already closed.
template <typename I>
void Replayer<I>::close_local_image() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (m_state_builder->local_image_ctx == nullptr) {
    stop_remote_journaler_replay();
    return;
  }

  dout(10) << dendl;
  if (m_local_journal_listener != nullptr) {
    // blocks if listener notification is in-progress
    m_local_journal->remove_listener(m_local_journal_listener);
    delete m_local_journal_listener;
    m_local_journal_listener = nullptr;
  }

  if (m_local_journal_replay != nullptr) {
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  if (m_event_preprocessor != nullptr) {
    image_replayer::journal::EventPreprocessor<I>::destroy(
      m_event_preprocessor);
    m_event_preprocessor = nullptr;
  }

  // drop our shared reference to the journal
  m_local_journal.reset();

  // NOTE: it's important to ensure that the local image is fully
  // closed before attempting to close the remote journal in
  // case the remote cluster is unreachable
  ceph_assert(m_state_builder->local_image_ctx != nullptr);
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
  auto request = image_replayer::CloseImageRequest<I>::create(
    &m_state_builder->local_image_ctx, ctx);
  request->send();
}
493 | ||
494 | ||
495 | template <typename I> | |
496 | void Replayer<I>::handle_close_local_image(int r) { | |
497 | dout(10) << "r=" << r << dendl; | |
498 | ||
499 | std::unique_lock locker{m_lock}; | |
500 | if (r < 0) { | |
501 | derr << "error closing local iamge: " << cpp_strerror(r) << dendl; | |
502 | handle_replay_error(r, "failed to close local image"); | |
503 | } | |
504 | ||
505 | ceph_assert(m_state_builder->local_image_ctx == nullptr); | |
506 | stop_remote_journaler_replay(); | |
507 | } | |
508 | ||
509 | template <typename I> | |
510 | void Replayer<I>::stop_remote_journaler_replay() { | |
511 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
512 | ||
513 | if (m_state_builder->remote_journaler == nullptr) { | |
514 | wait_for_in_flight_ops(); | |
515 | return; | |
516 | } else if (m_remote_replay_handler == nullptr) { | |
517 | wait_for_in_flight_ops(); | |
518 | return; | |
519 | } | |
520 | ||
521 | dout(10) << dendl; | |
522 | auto ctx = create_async_context_callback( | |
523 | m_threads->work_queue, create_context_callback< | |
524 | Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this)); | |
525 | m_state_builder->remote_journaler->stop_replay(ctx); | |
526 | } | |
527 | ||
// Remote live replay stopped: record any error, free the replay
// handler, then drain the remaining tracked in-flight ops.
template <typename I>
void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to stop remote journaler replay : " << cpp_strerror(r)
         << dendl;
    handle_replay_error(r, "failed to stop remote journaler replay");
  }

  delete m_remote_replay_handler;
  m_remote_replay_handler = nullptr;

  wait_for_in_flight_ops();
}
544 | ||
// Remove the remote metadata listener and wait for all tracked
// in-flight callbacks (status updates, queued notifications) to drain.
template <typename I>
void Replayer<I>::wait_for_in_flight_ops() {
  dout(10) << dendl;
  if (m_remote_listener != nullptr) {
    m_state_builder->remote_journaler->remove_listener(m_remote_listener);
    delete m_remote_listener;
    m_remote_listener = nullptr;
  }

  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
  m_in_flight_op_tracker.wait_for_ops(ctx);
}
559 | ||
// Final teardown stage: destroy the status formatter, mark the state
// complete and fire the stashed init/shutdown callback outside the
// lock with the first recorded error code.
template <typename I>
void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  Context* on_init_shutdown = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_on_init_shutdown != nullptr);
    std::swap(m_on_init_shutdown, on_init_shutdown);
    m_state = STATE_COMPLETE;
  }
  // complete outside the lock; the callback may re-enter the replayer
  on_init_shutdown->complete(m_error_code);
}
576 | ||
// Remote journal metadata changed: re-validate our registered journal
// client (detects disconnect/resync) and stop replay if it is no
// longer valid.  No-op unless actively replaying.
template <typename I>
void Replayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    return;
  }

  cls::journal::Client remote_client;
  int r = m_state_builder->remote_journaler->get_cached_client(
    m_local_mirror_uuid, &remote_client);
  if (r < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return;
  }

  librbd::journal::MirrorPeerClientMeta remote_client_meta;
  std::string error;
  r = validate_remote_client_state(remote_client, &remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    handle_replay_complete(locker, r, error);
  }
}
603 | ||
// Arm a 30-second timer task to flush the local replay and publish a
// status update; no-op if a task is already armed or replay stopped.
template <typename I>
void Replayer<I>::schedule_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
    return;
  }

  dout(15) << dendl;
  m_flush_local_replay_task = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
}
619 | ||
620 | template <typename I> | |
621 | void Replayer<I>::cancel_flush_local_replay_task() { | |
622 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
623 | ||
624 | std::unique_lock timer_locker{m_threads->timer_lock}; | |
625 | if (m_flush_local_replay_task != nullptr) { | |
626 | dout(10) << dendl; | |
627 | m_threads->timer->cancel_event(m_flush_local_replay_task); | |
628 | m_flush_local_replay_task = nullptr; | |
629 | } | |
630 | } | |
631 | ||
// Timer callback: flush the local replay, then publish a status
// update.  Tracked so shutdown waits for the in-flight flush.
template <typename I>
void Replayer<I>::handle_flush_local_replay_task(int) {
  dout(15) << dendl;

  m_in_flight_op_tracker.start_op();
  auto on_finish = new LambdaContext([this](int) {
    std::unique_lock locker{m_lock};

    {
      // the task has fired; allow a new one to be scheduled
      std::unique_lock timer_locker{m_threads->timer_lock};
      m_flush_local_replay_task = nullptr;
    }

    notify_status_updated();
    m_in_flight_op_tracker.finish_op();
  });
  flush_local_replay(on_finish);
}
650 | ||
// Flush buffered local journal replay events, then the remote commit
// position.  Completes immediately (success) if replay has stopped.
template <typename I>
void Replayer<I>::flush_local_replay(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  } else if (m_local_journal_replay == nullptr) {
    // raced w/ a tag creation stop/start, which implies that
    // the replay is flushed
    locker.unlock();
    flush_commit_position(on_flush);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_journal_replay->flush(ctx);
}
673 | ||
674 | template <typename I> | |
675 | void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) { | |
676 | dout(15) << "r=" << r << dendl; | |
677 | if (r < 0) { | |
678 | derr << "error flushing local replay: " << cpp_strerror(r) << dendl; | |
679 | on_flush->complete(r); | |
680 | return; | |
681 | } | |
682 | ||
683 | flush_commit_position(on_flush); | |
684 | } | |
685 | ||
// Flush the remote journaler's commit position; completes immediately
// with success if replay has already stopped.
template <typename I>
void Replayer<I>::flush_commit_position(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_commit_position(on_flush, r);
    });
  m_state_builder->remote_journaler->flush_commit_position(ctx);
}
702 | ||
// Log any commit-position flush failure and propagate the result.
template <typename I>
void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing remote journal commit position: "
         << cpp_strerror(r) << dendl;
  }

  on_flush->complete(r);
}
713 | ||
714 | template <typename I> | |
715 | void Replayer<I>::handle_replay_error(int r, const std::string &error) { | |
716 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
717 | ||
718 | if (m_error_code == 0) { | |
719 | m_error_code = r; | |
720 | m_error_description = error; | |
721 | } | |
722 | } | |
723 | ||
// Thread-safe check for replay completion.
template <typename I>
bool Replayer<I>::is_replay_complete() const {
  std::unique_lock locker{m_lock};
  return is_replay_complete(locker);
}
729 | ||
// Lock-held variant: the unused lock parameter documents that the
// caller must already hold m_lock.
template <typename I>
bool Replayer<I>::is_replay_complete(
    const std::unique_lock<ceph::mutex>&) const {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  return (m_state == STATE_COMPLETE);
}
736 | ||
// Thread-safe wrapper: acquire m_lock and delegate to the lock-held
// variant.
template <typename I>
void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
  std::unique_lock locker{m_lock};
  handle_replay_complete(locker, r, error);
}
742 | ||
// Lock-held variant: record the error (first error wins) and, if still
// replaying, transition to STATE_COMPLETE and notify the listener.
template <typename I>
void Replayer<I>::handle_replay_complete(
    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << "r=" << r << ", error=" << error << dendl;
  if (r < 0) {
    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, error);
  }

  if (m_state != STATE_REPLAYING) {
    return;
  }

  m_state = STATE_COMPLETE;
  notify_status_updated();
}
761 | ||
// Thread-safe wrapper invoked when remote journal entries are ready.
template <typename I>
void Replayer<I>::handle_replay_ready() {
  std::unique_lock locker{m_lock};
  handle_replay_ready(locker);
}
767 | ||
768 | template <typename I> | |
769 | void Replayer<I>::handle_replay_ready( | |
770 | std::unique_lock<ceph::mutex>& locker) { | |
771 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
772 | ||
773 | dout(20) << dendl; | |
774 | if (is_replay_complete(locker)) { | |
775 | return; | |
776 | } | |
777 | ||
778 | if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry, | |
779 | &m_replay_tag_tid)) { | |
780 | dout(20) << "no entries ready for replay" << dendl; | |
781 | return; | |
782 | } | |
783 | ||
784 | // can safely drop lock once the entry is tracked | |
785 | m_event_replay_tracker.start_op(); | |
786 | locker.unlock(); | |
787 | ||
788 | dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() | |
789 | << "tag_tid=" << m_replay_tag_tid << dendl; | |
790 | if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) { | |
791 | // must allocate a new local journal tag prior to processing | |
792 | replay_flush(); | |
793 | return; | |
794 | } | |
795 | ||
796 | preprocess_entry(); | |
797 | } | |
798 | ||
// A new tag epoch was encountered: shut down the current local replay
// to flush all in-flight IO/ops; a fresh external replay is started
// from handle_replay_flush_shut_down().
template <typename I>
void Replayer<I>::replay_flush() {
  dout(10) << dendl;

  // shut down the replay to flush all IO and ops and create a new
  // replayer to handle the new tag epoch
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
  ceph_assert(m_local_journal_replay != nullptr);
  m_local_journal_replay->shut_down(false, ctx);
}
810 | ||
// Previous local replay shut down for the tag flush: stop external
// replay and, unless the shutdown failed, restart it for the new tag
// epoch.
template <typename I>
void Replayer<I>::handle_replay_flush_shut_down(int r) {
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_local_journal != nullptr);
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    // propagate the shutdown failure without restarting replay
    handle_replay_flush(r);
    return;
  }

  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
}
830 | ||
// External replay restarted (or failed) after a tag flush: on error or
// if replay completed meanwhile, release the tracked event; otherwise
// fetch the remote tag for the new epoch.
template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "replay flush encountered an error");
    m_event_replay_tracker.finish_op();
    return;
  } else if (is_replay_complete()) {
    m_event_replay_tracker.finish_op();
    return;
  }

  get_remote_tag();
}
846 | ||
847 | template <typename I> | |
848 | void Replayer<I>::get_remote_tag() { | |
849 | dout(15) << "tag_tid: " << m_replay_tag_tid << dendl; | |
850 | ||
851 | Context *ctx = create_context_callback< | |
852 | Replayer, &Replayer<I>::handle_get_remote_tag>(this); | |
853 | m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, | |
854 | ctx); | |
855 | } | |
856 | ||
// Remote tag fetched: decode it; on failure stop replay and release
// the tracked event, otherwise allocate the matching local tag.
template <typename I>
void Replayer<I>::handle_get_remote_tag(int r) {
  dout(15) << "r=" << r << dendl;

  if (r == 0) {
    try {
      auto it = m_replay_tag.data.cbegin();
      decode(m_replay_tag_data, it);
    } catch (const buffer::error &err) {
      // tag payload failed to decode
      r = -EBADMSG;
    }
  }

  if (r < 0) {
    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
         << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to retrieve remote tag");
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_tag_valid = true;
  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
           << m_replay_tag_data << dendl;

  allocate_local_tag();
}
884 | ||
template <typename I>
void Replayer<I>::allocate_local_tag() {
  dout(15) << dendl;

  // Translate the remote tag's owner uuid into the local journal's frame
  // of reference before allocating the matching local tag.
  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    // remote-owned events are attributed to the remote cluster's uuid here
    mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (mirror_uuid == m_local_mirror_uuid) {
    // events that originated on this cluster map back to the local owner
    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    // handle possible edge condition where daemon can failover and
    // the local image has already been promoted/demoted
    auto local_tag_data = m_local_journal->get_tag_data();
    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
        (local_tag_data.predecessor.commit_valid &&
         local_tag_data.predecessor.mirror_uuid ==
           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
      // the demotion has already been applied locally: commit the entry
      // as a no-op and continue with the next event
      dout(15) << "skipping stale demotion event" << dendl;
      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
                                m_replay_start_time, 0);
      handle_replay_ready();
      return;
    } else {
      // NOTE: replay is flagged complete but execution intentionally falls
      // through so the demotion tag itself is still allocated below
      dout(5) << "encountered image demotion: stopping" << dendl;
      handle_replay_complete(0, "");
    }
  }

  // apply the same local/remote translation to the predecessor uuid
  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid;
  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  }

  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
           << "predecessor=" << predecessor << ", "
           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
}
927 | ||
928 | template <typename I> | |
929 | void Replayer<I>::handle_allocate_local_tag(int r) { | |
930 | dout(15) << "r=" << r << ", " | |
931 | << "tag_tid=" << m_local_journal->get_tag_tid() << dendl; | |
932 | if (r < 0) { | |
933 | derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; | |
934 | handle_replay_complete(r, "failed to allocate journal tag"); | |
935 | m_event_replay_tracker.finish_op(); | |
936 | return; | |
937 | } | |
938 | ||
939 | preprocess_entry(); | |
940 | } | |
941 | ||
template <typename I>
void Replayer<I>::preprocess_entry() {
  // Decode the fetched journal entry and, when a mirroring replay delay is
  // configured, defer processing via a timer task.
  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  bufferlist data = m_replay_entry.get_data();
  auto it = data.cbegin();
  int r = m_local_journal_replay->decode(&it, &m_event_entry);
  if (r < 0) {
    derr << "failed to decode journal event" << dendl;
    handle_replay_complete(r, "failed to decode journal event");
    // release the in-flight event tracked by handle_replay_ready()
    m_event_replay_tracker.finish_op();
    return;
  }

  m_replay_bytes = data.length();
  uint32_t delay = calculate_replay_delay(
    m_event_entry.timestamp,
    m_state_builder->local_image_ctx->mirroring_replay_delay);
  if (delay == 0) {
    // no delay configured (or the event is already old enough): proceed now
    handle_preprocess_entry_ready(0);
    return;
  }

  std::unique_lock locker{m_lock};
  if (is_replay_complete(locker)) {
    // don't schedule a delayed replay task if a shut-down is in-progress
    m_event_replay_tracker.finish_op();
    return;
  }

  // lock order: m_lock is held while taking the timer lock
  dout(20) << "delaying replay by " << delay << " sec" << dendl;
  std::unique_lock timer_locker{m_threads->timer_lock};
  ceph_assert(m_delayed_preprocess_task == nullptr);
  m_delayed_preprocess_task = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
}
980 | ||
981 | template <typename I> | |
982 | void Replayer<I>::handle_delayed_preprocess_task(int r) { | |
983 | dout(20) << "r=" << r << dendl; | |
984 | ||
985 | ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); | |
986 | m_delayed_preprocess_task = nullptr; | |
987 | ||
988 | m_threads->work_queue->queue(create_context_callback< | |
989 | Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0); | |
990 | } | |
991 | ||
992 | template <typename I> | |
993 | void Replayer<I>::handle_preprocess_entry_ready(int r) { | |
994 | dout(20) << "r=" << r << dendl; | |
995 | ceph_assert(r == 0); | |
996 | ||
997 | m_replay_start_time = ceph_clock_now(); | |
998 | if (!m_event_preprocessor->is_required(m_event_entry)) { | |
999 | process_entry(); | |
1000 | return; | |
1001 | } | |
1002 | ||
1003 | Context *ctx = create_context_callback< | |
1004 | Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this); | |
1005 | m_event_preprocessor->preprocess(&m_event_entry, ctx); | |
1006 | } | |
1007 | ||
1008 | template <typename I> | |
1009 | void Replayer<I>::handle_preprocess_entry_safe(int r) { | |
1010 | dout(20) << "r=" << r << dendl; | |
1011 | ||
1012 | if (r < 0) { | |
1013 | if (r == -ECANCELED) { | |
1014 | handle_replay_complete(0, "lost exclusive lock"); | |
1015 | } else { | |
1016 | derr << "failed to preprocess journal event" << dendl; | |
1017 | handle_replay_complete(r, "failed to preprocess journal event"); | |
1018 | } | |
1019 | ||
1020 | m_event_replay_tracker.finish_op(); | |
1021 | return; | |
1022 | } | |
1023 | ||
1024 | process_entry(); | |
1025 | } | |
1026 | ||
1027 | template <typename I> | |
1028 | void Replayer<I>::process_entry() { | |
1029 | dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() | |
1030 | << dendl; | |
1031 | ||
1032 | Context *on_ready = create_context_callback< | |
1033 | Replayer, &Replayer<I>::handle_process_entry_ready>(this); | |
1034 | Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry), | |
1035 | m_replay_bytes, | |
1036 | m_replay_start_time); | |
1037 | ||
1038 | m_local_journal_replay->process(m_event_entry, on_ready, on_commit); | |
1039 | } | |
1040 | ||
template <typename I>
void Replayer<I>::handle_process_entry_ready(int r) {
  std::unique_lock locker{m_lock};

  dout(20) << dendl;
  ceph_assert(r == 0);

  // re-compute the image spec under the image lock -- a rename of the
  // local image requires re-registering perf counters under the new name
  bool update_status = false;
  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                               local_image_ctx->name);
    if (m_image_spec != image_spec) {
      m_image_spec = image_spec;
      update_status = true;
    }
  }

  m_replay_status_formatter->handle_entry_processed(m_replay_bytes);

  if (update_status) {
    unregister_perf_counters();
    register_perf_counters();
    notify_status_updated();
  }

  // attempt to process the next event
  handle_replay_ready(locker);
}
1071 | ||
template <typename I>
void Replayer<I>::handle_process_entry_safe(
    const ReplayEntry &replay_entry, uint64_t replay_bytes,
    const utime_t &replay_start_time, int r) {
  // Invoked once the local journal has safely committed the replayed event.
  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;

  if (r < 0) {
    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to commit journal event");
  } else {
    // advance the remote journal's commit position past this entry
    ceph_assert(m_state_builder->remote_journaler != nullptr);
    m_state_builder->remote_journaler->committed(replay_entry);
  }

  // daemon-wide counters can be updated inline ...
  auto latency = ceph_clock_now() - replay_start_time;
  if (g_perf_counters) {
    g_perf_counters->inc(l_rbd_mirror_replay);
    g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
  }

  // ... while the per-image counters require m_lock, so defer to the work
  // queue; finish_op() runs last since it may unblock a pending shut down
  auto ctx = new LambdaContext(
    [this, replay_bytes, latency](int r) {
      std::unique_lock locker{m_lock};
      schedule_flush_local_replay_task();

      if (m_perf_counters) {
        m_perf_counters->inc(l_rbd_mirror_replay);
        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
      }

      m_event_replay_tracker.finish_op();
    });
  m_threads->work_queue->queue(ctx, 0);
}
1109 | ||
1110 | template <typename I> | |
1111 | void Replayer<I>::handle_resync_image() { | |
1112 | dout(10) << dendl; | |
1113 | ||
1114 | std::unique_lock locker{m_lock}; | |
1115 | m_resync_requested = true; | |
1116 | handle_replay_complete(locker, 0, "resync requested"); | |
1117 | } | |
1118 | ||
1119 | template <typename I> | |
1120 | void Replayer<I>::notify_status_updated() { | |
1121 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1122 | ||
1123 | dout(10) << dendl; | |
1124 | ||
1911f103 | 1125 | auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext( |
9f95a23c TL |
1126 | [this](int) { |
1127 | m_replayer_listener->handle_notification(); | |
1128 | })); | |
1129 | m_threads->work_queue->queue(ctx, 0); | |
1130 | } | |
1131 | ||
1132 | template <typename I> | |
1133 | void Replayer<I>::cancel_delayed_preprocess_task() { | |
1134 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1135 | ||
1136 | bool canceled_delayed_preprocess_task = false; | |
1137 | { | |
1138 | std::unique_lock timer_locker{m_threads->timer_lock}; | |
1139 | if (m_delayed_preprocess_task != nullptr) { | |
1140 | dout(10) << dendl; | |
1141 | canceled_delayed_preprocess_task = m_threads->timer->cancel_event( | |
1142 | m_delayed_preprocess_task); | |
1143 | ceph_assert(canceled_delayed_preprocess_task); | |
1144 | m_delayed_preprocess_task = nullptr; | |
1145 | } | |
1146 | } | |
1147 | ||
1148 | if (canceled_delayed_preprocess_task) { | |
1149 | // wake up sleeping replay | |
1150 | m_event_replay_tracker.finish_op(); | |
1151 | } | |
1152 | } | |
1153 | ||
1154 | template <typename I> | |
1155 | int Replayer<I>::validate_remote_client_state( | |
1156 | const cls::journal::Client& remote_client, | |
1157 | librbd::journal::MirrorPeerClientMeta* remote_client_meta, | |
1158 | bool* resync_requested, std::string* error) { | |
1159 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1160 | ||
1161 | if (!util::decode_client_meta(remote_client, remote_client_meta)) { | |
1162 | // require operator intervention since the data is corrupt | |
1163 | *error = "error retrieving remote journal client"; | |
1164 | return -EBADMSG; | |
1165 | } | |
1166 | ||
1167 | auto local_image_ctx = m_state_builder->local_image_ctx; | |
1168 | dout(5) << "image_id=" << local_image_ctx->id << ", " | |
1169 | << "remote_client_meta.image_id=" | |
1170 | << remote_client_meta->image_id << ", " | |
1171 | << "remote_client.state=" << remote_client.state << dendl; | |
1172 | if (remote_client_meta->image_id == local_image_ctx->id && | |
1173 | remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) { | |
1174 | dout(5) << "client flagged disconnected, stopping image replay" << dendl; | |
1175 | if (local_image_ctx->config.template get_val<bool>( | |
1176 | "rbd_mirroring_resync_after_disconnect")) { | |
1177 | dout(10) << "disconnected: automatic resync" << dendl; | |
1178 | *resync_requested = true; | |
1179 | *error = "disconnected: automatic resync"; | |
1180 | return -ENOTCONN; | |
1181 | } else { | |
1182 | dout(10) << "disconnected" << dendl; | |
1183 | *error = "disconnected"; | |
1184 | return -ENOTCONN; | |
1185 | } | |
1186 | } | |
1187 | ||
1188 | return 0; | |
1189 | } | |
1190 | ||
1191 | template <typename I> | |
1192 | void Replayer<I>::register_perf_counters() { | |
1193 | dout(5) << dendl; | |
1194 | ||
1195 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1196 | ceph_assert(m_perf_counters == nullptr); | |
1197 | ||
1198 | auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct); | |
1199 | auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio"); | |
1200 | PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec, | |
1201 | l_rbd_mirror_first, l_rbd_mirror_last); | |
1202 | plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio); | |
1203 | plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes", | |
1204 | "Replayed data", "rb", prio, unit_t(UNIT_BYTES)); | |
1205 | plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency", | |
1206 | "Replay latency", "rl", prio); | |
1207 | m_perf_counters = plb.create_perf_counters(); | |
1208 | g_ceph_context->get_perfcounters_collection()->add(m_perf_counters); | |
1209 | } | |
1210 | ||
1211 | template <typename I> | |
1212 | void Replayer<I>::unregister_perf_counters() { | |
1213 | dout(5) << dendl; | |
1214 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1215 | ||
1216 | PerfCounters *perf_counters = nullptr; | |
1217 | std::swap(perf_counters, m_perf_counters); | |
1218 | ||
1219 | if (perf_counters != nullptr) { | |
1220 | g_ceph_context->get_perfcounters_collection()->remove(perf_counters); | |
1221 | delete perf_counters; | |
1222 | } | |
1223 | } | |
1224 | ||
1225 | } // namespace journal | |
1226 | } // namespace image_replayer | |
1227 | } // namespace mirror | |
1228 | } // namespace rbd | |
1229 | ||
1230 | template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>; |