]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
4ef838fa43ebbfcd1671d07231b01b153c7d035a
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/WorkQueue.h"
10 #include "librbd/Utils.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
14 #include "Threads.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
21
22 namespace rbd {
23 namespace mirror {
24
namespace {

// Attribute keys reported to the service daemon for this pool/namespace
// (per-namespace image counts tallied in start_image_replayers()).
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");

} // anonymous namespace
32
33 using librbd::util::create_async_context_callback;
34 using librbd::util::create_context_callback;
35
// Constructs the replayer responsible for all mirrored images of a single
// local pool/namespace. All pointer dependencies are borrowed (not owned)
// and must outlive this instance.
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
    MirrorStatusUpdater<I>* local_status_updater,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache)
  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
    m_threads(threads), m_service_daemon(service_daemon),
    m_local_status_updater(local_status_updater),
    m_cache_manager_handler(cache_manager_handler),
    m_pool_meta_cache(pool_meta_cache),
    // include the pool id in the lock name to disambiguate instances in
    // lockdep/debug output
    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
        stringify(local_io_ctx.get_id()))) {
}
51
template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
  // shut_down() must have fully completed before destruction: no pending
  // timer task, no in-flight tracked async ops, no remaining replayers.
  ceph_assert(m_image_state_check_task == nullptr);
  ceph_assert(m_async_op_tracker.empty());
  ceph_assert(m_image_replayers.empty());
}
58
59 template <typename I>
60 bool InstanceReplayer<I>::is_blacklisted() const {
61 std::lock_guard locker{m_lock};
62 return m_blacklisted;
63 }
64
65 template <typename I>
66 int InstanceReplayer<I>::init() {
67 C_SaferCond init_ctx;
68 init(&init_ctx);
69 return init_ctx.wait();
70 }
71
72 template <typename I>
73 void InstanceReplayer<I>::init(Context *on_finish) {
74 dout(10) << dendl;
75
76 Context *ctx = new LambdaContext(
77 [this, on_finish] (int r) {
78 {
79 std::lock_guard timer_locker{m_threads->timer_lock};
80 schedule_image_state_check_task();
81 }
82 on_finish->complete(0);
83 });
84
85 m_threads->work_queue->queue(ctx, 0);
86 }
87
88 template <typename I>
89 void InstanceReplayer<I>::shut_down() {
90 C_SaferCond shut_down_ctx;
91 shut_down(&shut_down_ctx);
92 int r = shut_down_ctx.wait();
93 ceph_assert(r == 0);
94 }
95
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  // Only one shut down may be in flight; the callback is fired from
  // handle_stop_image_replayers() once every replayer has stopped.
  ceph_assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  // Tear down asynchronously from the work queue: cancel the periodic
  // state-check task first, then wait for tracked ops to drain before
  // stopping the image replayers.
  Context *ctx = new LambdaContext(
    [this] (int r) {
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}
113
114 template <typename I>
115 void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
116 dout(10) << "peer=" << peer << dendl;
117
118 std::lock_guard locker{m_lock};
119 auto result = m_peers.insert(peer).second;
120 ceph_assert(result);
121 }
122
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  // Stop and destroy every tracked image replayer; on_finish completes
  // once the last sub-context fires.
  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  // erase-while-iterating: erase() yields the next valid iterator
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    // destroy the replayer only after it has fully stopped
    ctx = new LambdaContext(
      [image_replayer, ctx] (int r) {
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}
143
// Handles an "acquire image" notification: create (or restart) the replayer
// for the given global image id. The acknowledgment (on_finish) is queued
// immediately; replay proceeds asynchronously.
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                        const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};

  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    // first acquisition: create and start a replayer for this image
    auto image_replayer = ImageReplayer<I>::create(
        m_local_io_ctx, m_local_mirror_uuid, global_image_id,
        m_threads, instance_watcher, m_local_status_updater,
        m_cache_manager_handler, m_pool_meta_cache);

    dout(10) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;

    // TODO only a single peer is currently supported
    ceph_assert(m_peers.size() == 1);
    auto peer = *m_peers.begin();
    image_replayer->add_peer(peer);
    start_image_replayer(image_replayer);
  } else {
    // A duplicate acquire notification implies (1) connection hiccup or
    // (2) new leader election. For the second case, restart the replayer to
    // detect if the image has been deleted while the leader was offline
    auto& image_replayer = it->second;
    image_replayer->set_finished(false);
    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
  }

  m_threads->work_queue->queue(on_finish, 0);
}
183
// Handles a "release image" notification: stop and destroy the replayer for
// the given global image id (no-op if it was never acquired).
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    // nothing to release (never acquired or already released)
    dout(5) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  m_image_replayers.erase(it);

  // destroy the replayer once it has fully stopped, then notify the caller
  on_finish = new LambdaContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });
  stop_image_replayer(image_replayer, on_finish);
}
209
// Handles a "peer image removed" notification: restart the affected
// replayer so it can re-evaluate the peer image state.
template <typename I>
void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
                                            const std::string &peer_mirror_uuid,
                                            Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << ", "
           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it != m_image_replayers.end()) {
    // TODO only a single peer is currently supported, therefore
    // we can just interrupt the current image replayer and
    // it will eventually detect that the peer image is missing and
    // determine if a delete propagation is required.
    auto image_replayer = it->second;
    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
  }
  // acknowledge immediately; the restart completes asynchronously
  m_threads->work_queue->queue(on_finish, 0);
}
231
232 template <typename I>
233 void InstanceReplayer<I>::print_status(Formatter *f) {
234 dout(10) << dendl;
235
236 std::lock_guard locker{m_lock};
237
238 f->open_array_section("image_replayers");
239 for (auto &kv : m_image_replayers) {
240 auto &image_replayer = kv.second;
241 image_replayer->print_status(f);
242 }
243 f->close_section();
244 }
245
246 template <typename I>
247 void InstanceReplayer<I>::start()
248 {
249 dout(10) << dendl;
250
251 std::lock_guard locker{m_lock};
252
253 m_manual_stop = false;
254
255 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
256 auto gather_ctx = new C_Gather(
257 cct, new C_TrackedOp(m_async_op_tracker, nullptr));
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
260 image_replayer->start(gather_ctx->new_sub(), true);
261 }
262
263 gather_ctx->activate();
264 }
265
266 template <typename I>
267 void InstanceReplayer<I>::stop()
268 {
269 stop(nullptr);
270 }
271
template <typename I>
void InstanceReplayer<I>::stop(Context *on_finish)
{
  dout(10) << dendl;

  if (on_finish == nullptr) {
    // fire-and-forget: wrap a null callback in C_TrackedOp so the stop is
    // still accounted for in m_async_op_tracker (shut_down() waits on it)
    on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
  } else {
    // delay the caller's completion until all in-flight async ops drain
    on_finish = new LambdaContext(
      [this, on_finish] (int r) {
        m_async_op_tracker.wait_for_ops(on_finish);
      });
  }

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  auto gather_ctx = new C_Gather(cct, on_finish);
  {
    std::lock_guard locker{m_lock};

    // suppress automatic restarts until start()/restart() is invoked
    m_manual_stop = true;

    for (auto &kv : m_image_replayers) {
      auto &image_replayer = kv.second;
      image_replayer->stop(gather_ctx->new_sub(), true);
    }
  }

  // activate outside m_lock; sub-contexts may complete synchronously
  gather_ctx->activate();
}
301
302 template <typename I>
303 void InstanceReplayer<I>::restart()
304 {
305 dout(10) << dendl;
306
307 std::lock_guard locker{m_lock};
308
309 m_manual_stop = false;
310
311 for (auto &kv : m_image_replayers) {
312 auto &image_replayer = kv.second;
313 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
314 }
315 }
316
317 template <typename I>
318 void InstanceReplayer<I>::flush()
319 {
320 dout(10) << dendl;
321
322 std::lock_guard locker{m_lock};
323
324 for (auto &kv : m_image_replayers) {
325 auto &image_replayer = kv.second;
326 image_replayer->flush();
327 }
328 }
329
330 template <typename I>
331 void InstanceReplayer<I>::start_image_replayer(
332 ImageReplayer<I> *image_replayer) {
333 ceph_assert(ceph_mutex_is_locked(m_lock));
334
335 std::string global_image_id = image_replayer->get_global_image_id();
336 if (!image_replayer->is_stopped()) {
337 return;
338 } else if (image_replayer->is_blacklisted()) {
339 derr << "global_image_id=" << global_image_id << ": blacklisted detected "
340 << "during image replay" << dendl;
341 m_blacklisted = true;
342 return;
343 } else if (image_replayer->is_finished()) {
344 // TODO temporary until policy integrated
345 dout(5) << "removing image replayer for global_image_id="
346 << global_image_id << dendl;
347 m_image_replayers.erase(image_replayer->get_global_image_id());
348 image_replayer->destroy();
349 return;
350 } else if (m_manual_stop) {
351 return;
352 }
353
354 dout(10) << "global_image_id=" << global_image_id << dendl;
355 image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
356 }
357
template <typename I>
void InstanceReplayer<I>::queue_start_image_replayers() {
  dout(10) << dendl;

  // Defer the scan to the work queue. The start_op() here is balanced by
  // the finish_op() inside start_image_replayers().
  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
  m_async_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}
367
368 template <typename I>
369 void InstanceReplayer<I>::start_image_replayers(int r) {
370 dout(10) << dendl;
371
372 std::lock_guard locker{m_lock};
373 if (m_on_shut_down != nullptr) {
374 return;
375 }
376
377 uint64_t image_count = 0;
378 uint64_t warning_count = 0;
379 uint64_t error_count = 0;
380 for (auto it = m_image_replayers.begin();
381 it != m_image_replayers.end();) {
382 auto current_it(it);
383 ++it;
384
385 ++image_count;
386 auto health_state = current_it->second->get_health_state();
387 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
388 ++warning_count;
389 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
390 ++error_count;
391 }
392
393 start_image_replayer(current_it->second);
394 }
395
396 m_service_daemon->add_or_update_namespace_attribute(
397 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
398 SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
399 m_service_daemon->add_or_update_namespace_attribute(
400 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
401 SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
402 m_service_daemon->add_or_update_namespace_attribute(
403 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
404 SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
405
406 m_async_op_tracker.finish_op();
407 }
408
// Stops a single image replayer, retrying until it reports stopped. The
// replayer may be in a transitional state (starting/stopping), in which
// case the retry is re-scheduled via the timer after a short delay.
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(10) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    // already stopped -- complete via the work queue
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  // retry context: re-invokes this method until the replayer has stopped;
  // tracked so shut_down() waits for any in-flight retry
  m_async_op_tracker.start_op();
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new LambdaContext(
      [this, image_replayer, on_finish] (int r) {
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    // request the stop and retry once it completes
    image_replayer->stop(ctx, false);
  } else {
    // transitional state: poll again after a short timer-driven delay
    int after = 1;
    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    ctx = new LambdaContext(
      [this, after, ctx] (int r) {
        std::lock_guard timer_locker{m_threads->timer_lock};
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}
443
444 template <typename I>
445 void InstanceReplayer<I>::wait_for_ops() {
446 dout(10) << dendl;
447
448 Context *ctx = create_context_callback<
449 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
450
451 m_async_op_tracker.wait_for_ops(ctx);
452 }
453
454 template <typename I>
455 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
456 dout(10) << "r=" << r << dendl;
457
458 ceph_assert(r == 0);
459
460 std::lock_guard locker{m_lock};
461 stop_image_replayers();
462 }
463
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  // dispatch the completion through the work queue so the handler does not
  // run while m_lock is still held by this call chain
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  // stop all replayers in parallel; the gather fires once all have stopped
  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}
480
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(10) << "r=" << r << dendl;

  ceph_assert(r == 0);

  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};

    // every replayer must have stopped before it can be destroyed
    for (auto &it : m_image_replayers) {
      ceph_assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    // claim the shut down callback so it completes outside m_lock
    ceph_assert(m_on_shut_down != nullptr);
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}
502
503 template <typename I>
504 void InstanceReplayer<I>::cancel_image_state_check_task() {
505 std::lock_guard timer_locker{m_threads->timer_lock};
506
507 if (m_image_state_check_task == nullptr) {
508 return;
509 }
510
511 dout(10) << m_image_state_check_task << dendl;
512 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
513 ceph_assert(canceled);
514 m_image_state_check_task = nullptr;
515 }
516
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(m_image_state_check_task == nullptr);

  // self-rearming timer task: on expiry it reschedules itself and queues a
  // pass over all image replayers (cancelled on shut down)
  m_image_state_check_task = new LambdaContext(
    [this](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      queue_start_image_replayers();
    });

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  // NOTE(review): the uint64_t config value is narrowed to int here --
  // assumed small in practice; confirm against the option's bounds
  int after = cct->_conf.get_val<uint64_t>(
    "rbd_mirror_image_state_check_interval");

  dout(10) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}
538
539 } // namespace mirror
540 } // namespace rbd
541
542 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;