]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/stringify.h" | |
9f95a23c | 5 | #include "common/Cond.h" |
7c673cae FG |
6 | #include "common/Timer.h" |
7 | #include "common/debug.h" | |
8 | #include "common/errno.h" | |
9 | #include "librbd/Utils.h" | |
f67539c2 | 10 | #include "librbd/asio/ContextWQ.h" |
7c673cae FG |
11 | #include "ImageReplayer.h" |
12 | #include "InstanceReplayer.h" | |
c07f9fc5 | 13 | #include "ServiceDaemon.h" |
7c673cae FG |
14 | #include "Threads.h" |
15 | ||
16 | #define dout_context g_ceph_context | |
17 | #define dout_subsys ceph_subsys_rbd_mirror | |
18 | #undef dout_prefix | |
19 | #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ | |
20 | << this << " " << __func__ << ": " | |
21 | ||
22 | namespace rbd { | |
23 | namespace mirror { | |
24 | ||
c07f9fc5 FG |
namespace {

// Attribute keys passed to the service daemon when the per-namespace image
// counters are published (assigned / warning / error).
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY = "image_assigned_count";
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY = "image_warning_count";
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY = "image_error_count";

} // anonymous namespace
32 | ||
7c673cae FG |
33 | using librbd::util::create_async_context_callback; |
34 | using librbd::util::create_context_callback; | |
35 | ||
36 | template <typename I> | |
37 | InstanceReplayer<I>::InstanceReplayer( | |
9f95a23c | 38 | librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, |
c07f9fc5 | 39 | Threads<I> *threads, ServiceDaemon<I>* service_daemon, |
9f95a23c TL |
40 | MirrorStatusUpdater<I>* local_status_updater, |
41 | journal::CacheManagerHandler *cache_manager_handler, | |
42 | PoolMetaCache* pool_meta_cache) | |
43 | : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid), | |
44 | m_threads(threads), m_service_daemon(service_daemon), | |
45 | m_local_status_updater(local_status_updater), | |
46 | m_cache_manager_handler(cache_manager_handler), | |
47 | m_pool_meta_cache(pool_meta_cache), | |
48 | m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " + | |
49 | stringify(local_io_ctx.get_id()))) { | |
7c673cae FG |
50 | } |
51 | ||
52 | template <typename I> | |
53 | InstanceReplayer<I>::~InstanceReplayer() { | |
11fdf7f2 TL |
54 | ceph_assert(m_image_state_check_task == nullptr); |
55 | ceph_assert(m_async_op_tracker.empty()); | |
56 | ceph_assert(m_image_replayers.empty()); | |
7c673cae FG |
57 | } |
58 | ||
9f95a23c | 59 | template <typename I> |
f67539c2 | 60 | bool InstanceReplayer<I>::is_blocklisted() const { |
9f95a23c | 61 | std::lock_guard locker{m_lock}; |
f67539c2 | 62 | return m_blocklisted; |
9f95a23c TL |
63 | } |
64 | ||
7c673cae FG |
65 | template <typename I> |
66 | int InstanceReplayer<I>::init() { | |
67 | C_SaferCond init_ctx; | |
68 | init(&init_ctx); | |
69 | return init_ctx.wait(); | |
70 | } | |
71 | ||
72 | template <typename I> | |
73 | void InstanceReplayer<I>::init(Context *on_finish) { | |
11fdf7f2 | 74 | dout(10) << dendl; |
7c673cae | 75 | |
9f95a23c | 76 | Context *ctx = new LambdaContext( |
7c673cae FG |
77 | [this, on_finish] (int r) { |
78 | { | |
9f95a23c | 79 | std::lock_guard timer_locker{m_threads->timer_lock}; |
7c673cae FG |
80 | schedule_image_state_check_task(); |
81 | } | |
82 | on_finish->complete(0); | |
83 | }); | |
84 | ||
85 | m_threads->work_queue->queue(ctx, 0); | |
86 | } | |
87 | ||
88 | template <typename I> | |
89 | void InstanceReplayer<I>::shut_down() { | |
90 | C_SaferCond shut_down_ctx; | |
91 | shut_down(&shut_down_ctx); | |
92 | int r = shut_down_ctx.wait(); | |
11fdf7f2 | 93 | ceph_assert(r == 0); |
7c673cae FG |
94 | } |
95 | ||
96 | template <typename I> | |
97 | void InstanceReplayer<I>::shut_down(Context *on_finish) { | |
11fdf7f2 | 98 | dout(10) << dendl; |
7c673cae | 99 | |
9f95a23c | 100 | std::lock_guard locker{m_lock}; |
7c673cae | 101 | |
11fdf7f2 | 102 | ceph_assert(m_on_shut_down == nullptr); |
7c673cae FG |
103 | m_on_shut_down = on_finish; |
104 | ||
9f95a23c | 105 | Context *ctx = new LambdaContext( |
7c673cae FG |
106 | [this] (int r) { |
107 | cancel_image_state_check_task(); | |
108 | wait_for_ops(); | |
109 | }); | |
110 | ||
111 | m_threads->work_queue->queue(ctx, 0); | |
112 | } | |
113 | ||
114 | template <typename I> | |
9f95a23c TL |
115 | void InstanceReplayer<I>::add_peer(const Peer<I>& peer) { |
116 | dout(10) << "peer=" << peer << dendl; | |
7c673cae | 117 | |
9f95a23c TL |
118 | std::lock_guard locker{m_lock}; |
119 | auto result = m_peers.insert(peer).second; | |
11fdf7f2 | 120 | ceph_assert(result); |
7c673cae FG |
121 | } |
122 | ||
7c673cae FG |
123 | template <typename I> |
124 | void InstanceReplayer<I>::release_all(Context *on_finish) { | |
11fdf7f2 | 125 | dout(10) << dendl; |
7c673cae | 126 | |
9f95a23c | 127 | std::lock_guard locker{m_lock}; |
7c673cae FG |
128 | |
129 | C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); | |
130 | for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); | |
131 | it = m_image_replayers.erase(it)) { | |
132 | auto image_replayer = it->second; | |
133 | auto ctx = gather_ctx->new_sub(); | |
9f95a23c | 134 | ctx = new LambdaContext( |
7c673cae FG |
135 | [image_replayer, ctx] (int r) { |
136 | image_replayer->destroy(); | |
137 | ctx->complete(0); | |
138 | }); | |
139 | stop_image_replayer(image_replayer, ctx); | |
140 | } | |
141 | gather_ctx->activate(); | |
142 | } | |
143 | ||
144 | template <typename I> | |
31f18b77 FG |
145 | void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher, |
146 | const std::string &global_image_id, | |
7c673cae | 147 | Context *on_finish) { |
11fdf7f2 | 148 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 149 | |
9f95a23c | 150 | std::lock_guard locker{m_lock}; |
7c673cae | 151 | |
11fdf7f2 | 152 | ceph_assert(m_on_shut_down == nullptr); |
7c673cae FG |
153 | |
154 | auto it = m_image_replayers.find(global_image_id); | |
7c673cae FG |
155 | if (it == m_image_replayers.end()) { |
156 | auto image_replayer = ImageReplayer<I>::create( | |
9f95a23c TL |
157 | m_local_io_ctx, m_local_mirror_uuid, global_image_id, |
158 | m_threads, instance_watcher, m_local_status_updater, | |
159 | m_cache_manager_handler, m_pool_meta_cache); | |
7c673cae | 160 | |
11fdf7f2 | 161 | dout(10) << global_image_id << ": creating replayer " << image_replayer |
7c673cae FG |
162 | << dendl; |
163 | ||
164 | it = m_image_replayers.insert(std::make_pair(global_image_id, | |
165 | image_replayer)).first; | |
d2e6a577 FG |
166 | |
167 | // TODO only a single peer is currently supported | |
11fdf7f2 | 168 | ceph_assert(m_peers.size() == 1); |
d2e6a577 | 169 | auto peer = *m_peers.begin(); |
9f95a23c | 170 | image_replayer->add_peer(peer); |
11fdf7f2 TL |
171 | start_image_replayer(image_replayer); |
172 | } else { | |
173 | // A duplicate acquire notification implies (1) connection hiccup or | |
174 | // (2) new leader election. For the second case, restart the replayer to | |
175 | // detect if the image has been deleted while the leader was offline | |
176 | auto& image_replayer = it->second; | |
177 | image_replayer->set_finished(false); | |
1911f103 | 178 | image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); |
7c673cae FG |
179 | } |
180 | ||
7c673cae FG |
181 | m_threads->work_queue->queue(on_finish, 0); |
182 | } | |
183 | ||
184 | template <typename I> | |
185 | void InstanceReplayer<I>::release_image(const std::string &global_image_id, | |
7c673cae | 186 | Context *on_finish) { |
11fdf7f2 | 187 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 188 | |
9f95a23c | 189 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 190 | ceph_assert(m_on_shut_down == nullptr); |
7c673cae FG |
191 | |
192 | auto it = m_image_replayers.find(global_image_id); | |
7c673cae | 193 | if (it == m_image_replayers.end()) { |
11fdf7f2 | 194 | dout(5) << global_image_id << ": not found" << dendl; |
7c673cae FG |
195 | m_threads->work_queue->queue(on_finish, 0); |
196 | return; | |
197 | } | |
198 | ||
199 | auto image_replayer = it->second; | |
7c673cae FG |
200 | m_image_replayers.erase(it); |
201 | ||
9f95a23c | 202 | on_finish = new LambdaContext( |
7c673cae FG |
203 | [image_replayer, on_finish] (int r) { |
204 | image_replayer->destroy(); | |
205 | on_finish->complete(0); | |
206 | }); | |
d2e6a577 FG |
207 | stop_image_replayer(image_replayer, on_finish); |
208 | } | |
7c673cae | 209 | |
d2e6a577 FG |
210 | template <typename I> |
211 | void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id, | |
212 | const std::string &peer_mirror_uuid, | |
213 | Context *on_finish) { | |
11fdf7f2 | 214 | dout(10) << "global_image_id=" << global_image_id << ", " |
d2e6a577 | 215 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; |
7c673cae | 216 | |
9f95a23c | 217 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 218 | ceph_assert(m_on_shut_down == nullptr); |
d2e6a577 FG |
219 | |
220 | auto it = m_image_replayers.find(global_image_id); | |
221 | if (it != m_image_replayers.end()) { | |
222 | // TODO only a single peer is currently supported, therefore | |
223 | // we can just interrupt the current image replayer and | |
224 | // it will eventually detect that the peer image is missing and | |
225 | // determine if a delete propagation is required. | |
226 | auto image_replayer = it->second; | |
1911f103 | 227 | image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); |
d2e6a577 FG |
228 | } |
229 | m_threads->work_queue->queue(on_finish, 0); | |
7c673cae FG |
230 | } |
231 | ||
232 | template <typename I> | |
9f95a23c | 233 | void InstanceReplayer<I>::print_status(Formatter *f) { |
11fdf7f2 | 234 | dout(10) << dendl; |
7c673cae | 235 | |
9f95a23c | 236 | std::lock_guard locker{m_lock}; |
7c673cae FG |
237 | |
238 | f->open_array_section("image_replayers"); | |
239 | for (auto &kv : m_image_replayers) { | |
240 | auto &image_replayer = kv.second; | |
9f95a23c | 241 | image_replayer->print_status(f); |
7c673cae FG |
242 | } |
243 | f->close_section(); | |
244 | } | |
245 | ||
246 | template <typename I> | |
247 | void InstanceReplayer<I>::start() | |
248 | { | |
11fdf7f2 | 249 | dout(10) << dendl; |
7c673cae | 250 | |
9f95a23c | 251 | std::lock_guard locker{m_lock}; |
7c673cae FG |
252 | |
253 | m_manual_stop = false; | |
254 | ||
1911f103 TL |
255 | auto cct = static_cast<CephContext *>(m_local_io_ctx.cct()); |
256 | auto gather_ctx = new C_Gather( | |
257 | cct, new C_TrackedOp(m_async_op_tracker, nullptr)); | |
7c673cae FG |
258 | for (auto &kv : m_image_replayers) { |
259 | auto &image_replayer = kv.second; | |
1911f103 | 260 | image_replayer->start(gather_ctx->new_sub(), true); |
7c673cae | 261 | } |
1911f103 TL |
262 | |
263 | gather_ctx->activate(); | |
7c673cae FG |
264 | } |
265 | ||
266 | template <typename I> | |
267 | void InstanceReplayer<I>::stop() | |
268 | { | |
1911f103 | 269 | stop(nullptr); |
7c673cae FG |
270 | } |
271 | ||
9f95a23c TL |
272 | template <typename I> |
273 | void InstanceReplayer<I>::stop(Context *on_finish) | |
274 | { | |
275 | dout(10) << dendl; | |
276 | ||
e306af50 TL |
277 | if (on_finish == nullptr) { |
278 | on_finish = new C_TrackedOp(m_async_op_tracker, on_finish); | |
279 | } else { | |
280 | on_finish = new LambdaContext( | |
281 | [this, on_finish] (int r) { | |
282 | m_async_op_tracker.wait_for_ops(on_finish); | |
283 | }); | |
284 | } | |
285 | ||
9f95a23c | 286 | auto cct = static_cast<CephContext *>(m_local_io_ctx.cct()); |
e306af50 | 287 | auto gather_ctx = new C_Gather(cct, on_finish); |
9f95a23c TL |
288 | { |
289 | std::lock_guard locker{m_lock}; | |
290 | ||
291 | m_manual_stop = true; | |
292 | ||
293 | for (auto &kv : m_image_replayers) { | |
294 | auto &image_replayer = kv.second; | |
295 | image_replayer->stop(gather_ctx->new_sub(), true); | |
296 | } | |
297 | } | |
298 | ||
299 | gather_ctx->activate(); | |
300 | } | |
301 | ||
7c673cae FG |
302 | template <typename I> |
303 | void InstanceReplayer<I>::restart() | |
304 | { | |
11fdf7f2 | 305 | dout(10) << dendl; |
7c673cae | 306 | |
9f95a23c | 307 | std::lock_guard locker{m_lock}; |
7c673cae FG |
308 | |
309 | m_manual_stop = false; | |
310 | ||
311 | for (auto &kv : m_image_replayers) { | |
312 | auto &image_replayer = kv.second; | |
1911f103 | 313 | image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); |
7c673cae FG |
314 | } |
315 | } | |
316 | ||
317 | template <typename I> | |
318 | void InstanceReplayer<I>::flush() | |
319 | { | |
11fdf7f2 | 320 | dout(10) << dendl; |
7c673cae | 321 | |
9f95a23c | 322 | std::lock_guard locker{m_lock}; |
7c673cae FG |
323 | |
324 | for (auto &kv : m_image_replayers) { | |
325 | auto &image_replayer = kv.second; | |
326 | image_replayer->flush(); | |
327 | } | |
328 | } | |
329 | ||
330 | template <typename I> | |
331 | void InstanceReplayer<I>::start_image_replayer( | |
332 | ImageReplayer<I> *image_replayer) { | |
9f95a23c | 333 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
334 | |
335 | std::string global_image_id = image_replayer->get_global_image_id(); | |
7c673cae FG |
336 | if (!image_replayer->is_stopped()) { |
337 | return; | |
f67539c2 TL |
338 | } else if (image_replayer->is_blocklisted()) { |
339 | derr << "global_image_id=" << global_image_id << ": blocklisted detected " | |
11fdf7f2 | 340 | << "during image replay" << dendl; |
f67539c2 | 341 | m_blocklisted = true; |
7c673cae | 342 | return; |
d2e6a577 FG |
343 | } else if (image_replayer->is_finished()) { |
344 | // TODO temporary until policy integrated | |
345 | dout(5) << "removing image replayer for global_image_id=" | |
346 | << global_image_id << dendl; | |
347 | m_image_replayers.erase(image_replayer->get_global_image_id()); | |
348 | image_replayer->destroy(); | |
349 | return; | |
11fdf7f2 TL |
350 | } else if (m_manual_stop) { |
351 | return; | |
7c673cae FG |
352 | } |
353 | ||
11fdf7f2 | 354 | dout(10) << "global_image_id=" << global_image_id << dendl; |
1911f103 | 355 | image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false); |
7c673cae FG |
356 | } |
357 | ||
358 | template <typename I> | |
c07f9fc5 | 359 | void InstanceReplayer<I>::queue_start_image_replayers() { |
11fdf7f2 | 360 | dout(10) << dendl; |
7c673cae | 361 | |
c07f9fc5 FG |
362 | Context *ctx = create_context_callback< |
363 | InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this); | |
7c673cae FG |
364 | m_async_op_tracker.start_op(); |
365 | m_threads->work_queue->queue(ctx, 0); | |
366 | } | |
367 | ||
c07f9fc5 FG |
368 | template <typename I> |
369 | void InstanceReplayer<I>::start_image_replayers(int r) { | |
11fdf7f2 | 370 | dout(10) << dendl; |
c07f9fc5 | 371 | |
9f95a23c | 372 | std::lock_guard locker{m_lock}; |
c07f9fc5 | 373 | if (m_on_shut_down != nullptr) { |
522d829b | 374 | m_async_op_tracker.finish_op(); |
c07f9fc5 FG |
375 | return; |
376 | } | |
377 | ||
d2e6a577 FG |
378 | uint64_t image_count = 0; |
379 | uint64_t warning_count = 0; | |
380 | uint64_t error_count = 0; | |
381 | for (auto it = m_image_replayers.begin(); | |
382 | it != m_image_replayers.end();) { | |
383 | auto current_it(it); | |
384 | ++it; | |
385 | ||
c07f9fc5 | 386 | ++image_count; |
d2e6a577 | 387 | auto health_state = current_it->second->get_health_state(); |
c07f9fc5 FG |
388 | if (health_state == image_replayer::HEALTH_STATE_WARNING) { |
389 | ++warning_count; | |
390 | } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { | |
391 | ++error_count; | |
392 | } | |
393 | ||
d2e6a577 | 394 | start_image_replayer(current_it->second); |
c07f9fc5 FG |
395 | } |
396 | ||
9f95a23c TL |
397 | m_service_daemon->add_or_update_namespace_attribute( |
398 | m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), | |
399 | SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); | |
400 | m_service_daemon->add_or_update_namespace_attribute( | |
401 | m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), | |
402 | SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); | |
403 | m_service_daemon->add_or_update_namespace_attribute( | |
404 | m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), | |
405 | SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); | |
c07f9fc5 FG |
406 | |
407 | m_async_op_tracker.finish_op(); | |
408 | } | |
7c673cae FG |
409 | |
410 | template <typename I> | |
411 | void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer, | |
412 | Context *on_finish) { | |
11fdf7f2 | 413 | dout(10) << image_replayer << " global_image_id=" |
7c673cae FG |
414 | << image_replayer->get_global_image_id() << ", on_finish=" |
415 | << on_finish << dendl; | |
416 | ||
417 | if (image_replayer->is_stopped()) { | |
418 | m_threads->work_queue->queue(on_finish, 0); | |
419 | return; | |
420 | } | |
421 | ||
422 | m_async_op_tracker.start_op(); | |
423 | Context *ctx = create_async_context_callback( | |
9f95a23c | 424 | m_threads->work_queue, new LambdaContext( |
7c673cae FG |
425 | [this, image_replayer, on_finish] (int r) { |
426 | stop_image_replayer(image_replayer, on_finish); | |
427 | m_async_op_tracker.finish_op(); | |
428 | })); | |
429 | ||
430 | if (image_replayer->is_running()) { | |
431 | image_replayer->stop(ctx, false); | |
432 | } else { | |
433 | int after = 1; | |
11fdf7f2 | 434 | dout(10) << "scheduling image replayer " << image_replayer << " stop after " |
7c673cae | 435 | << after << " sec (task " << ctx << ")" << dendl; |
9f95a23c | 436 | ctx = new LambdaContext( |
7c673cae | 437 | [this, after, ctx] (int r) { |
9f95a23c | 438 | std::lock_guard timer_locker{m_threads->timer_lock}; |
7c673cae FG |
439 | m_threads->timer->add_event_after(after, ctx); |
440 | }); | |
441 | m_threads->work_queue->queue(ctx, 0); | |
442 | } | |
443 | } | |
444 | ||
445 | template <typename I> | |
446 | void InstanceReplayer<I>::wait_for_ops() { | |
11fdf7f2 | 447 | dout(10) << dendl; |
7c673cae FG |
448 | |
449 | Context *ctx = create_context_callback< | |
450 | InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this); | |
451 | ||
452 | m_async_op_tracker.wait_for_ops(ctx); | |
453 | } | |
454 | ||
455 | template <typename I> | |
456 | void InstanceReplayer<I>::handle_wait_for_ops(int r) { | |
11fdf7f2 | 457 | dout(10) << "r=" << r << dendl; |
7c673cae | 458 | |
11fdf7f2 | 459 | ceph_assert(r == 0); |
7c673cae | 460 | |
9f95a23c | 461 | std::lock_guard locker{m_lock}; |
7c673cae FG |
462 | stop_image_replayers(); |
463 | } | |
464 | ||
465 | template <typename I> | |
466 | void InstanceReplayer<I>::stop_image_replayers() { | |
11fdf7f2 | 467 | dout(10) << dendl; |
7c673cae | 468 | |
9f95a23c | 469 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
470 | |
471 | Context *ctx = create_async_context_callback( | |
472 | m_threads->work_queue, create_context_callback<InstanceReplayer<I>, | |
473 | &InstanceReplayer<I>::handle_stop_image_replayers>(this)); | |
474 | ||
475 | C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); | |
476 | for (auto &it : m_image_replayers) { | |
477 | stop_image_replayer(it.second, gather_ctx->new_sub()); | |
478 | } | |
479 | gather_ctx->activate(); | |
480 | } | |
481 | ||
482 | template <typename I> | |
483 | void InstanceReplayer<I>::handle_stop_image_replayers(int r) { | |
11fdf7f2 | 484 | dout(10) << "r=" << r << dendl; |
7c673cae | 485 | |
11fdf7f2 | 486 | ceph_assert(r == 0); |
7c673cae FG |
487 | |
488 | Context *on_finish = nullptr; | |
489 | { | |
9f95a23c | 490 | std::lock_guard locker{m_lock}; |
7c673cae FG |
491 | |
492 | for (auto &it : m_image_replayers) { | |
11fdf7f2 | 493 | ceph_assert(it.second->is_stopped()); |
7c673cae FG |
494 | it.second->destroy(); |
495 | } | |
496 | m_image_replayers.clear(); | |
497 | ||
11fdf7f2 | 498 | ceph_assert(m_on_shut_down != nullptr); |
7c673cae FG |
499 | std::swap(on_finish, m_on_shut_down); |
500 | } | |
501 | on_finish->complete(r); | |
502 | } | |
503 | ||
504 | template <typename I> | |
505 | void InstanceReplayer<I>::cancel_image_state_check_task() { | |
9f95a23c | 506 | std::lock_guard timer_locker{m_threads->timer_lock}; |
7c673cae FG |
507 | |
508 | if (m_image_state_check_task == nullptr) { | |
509 | return; | |
510 | } | |
511 | ||
11fdf7f2 | 512 | dout(10) << m_image_state_check_task << dendl; |
7c673cae | 513 | bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); |
11fdf7f2 | 514 | ceph_assert(canceled); |
7c673cae FG |
515 | m_image_state_check_task = nullptr; |
516 | } | |
517 | ||
518 | template <typename I> | |
519 | void InstanceReplayer<I>::schedule_image_state_check_task() { | |
9f95a23c | 520 | ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); |
11fdf7f2 | 521 | ceph_assert(m_image_state_check_task == nullptr); |
7c673cae | 522 | |
9f95a23c | 523 | m_image_state_check_task = new LambdaContext( |
7c673cae | 524 | [this](int r) { |
9f95a23c | 525 | ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); |
7c673cae FG |
526 | m_image_state_check_task = nullptr; |
527 | schedule_image_state_check_task(); | |
c07f9fc5 | 528 | queue_start_image_replayers(); |
7c673cae FG |
529 | }); |
530 | ||
9f95a23c | 531 | auto cct = static_cast<CephContext *>(m_local_io_ctx.cct()); |
11fdf7f2 | 532 | int after = cct->_conf.get_val<uint64_t>( |
181888fb | 533 | "rbd_mirror_image_state_check_interval"); |
7c673cae | 534 | |
11fdf7f2 | 535 | dout(10) << "scheduling image state check after " << after << " sec (task " |
7c673cae FG |
536 | << m_image_state_check_task << ")" << dendl; |
537 | m_threads->timer->add_event_after(after, m_image_state_check_task); | |
538 | } | |
539 | ||
540 | } // namespace mirror | |
541 | } // namespace rbd | |
542 | ||
543 | template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>; |