]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
9a8b50596c6ac66909994613cbb11722d05772a9
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/WorkQueue.h"
10 #include "librbd/Utils.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
14 #include "Threads.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
21
22 namespace rbd {
23 namespace mirror {
24
25 namespace {
26
27 const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
28 const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
29 const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
30
31 } // anonymous namespace
32
33 using librbd::util::create_async_context_callback;
34 using librbd::util::create_context_callback;
35
36 template <typename I>
37 InstanceReplayer<I>::InstanceReplayer(
38 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
39 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
40 MirrorStatusUpdater<I>* local_status_updater,
41 journal::CacheManagerHandler *cache_manager_handler,
42 PoolMetaCache* pool_meta_cache)
43 : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
44 m_threads(threads), m_service_daemon(service_daemon),
45 m_local_status_updater(local_status_updater),
46 m_cache_manager_handler(cache_manager_handler),
47 m_pool_meta_cache(pool_meta_cache),
48 m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
49 stringify(local_io_ctx.get_id()))) {
50 }
51
52 template <typename I>
53 InstanceReplayer<I>::~InstanceReplayer() {
54 ceph_assert(m_image_state_check_task == nullptr);
55 ceph_assert(m_async_op_tracker.empty());
56 ceph_assert(m_image_replayers.empty());
57 }
58
59 template <typename I>
60 bool InstanceReplayer<I>::is_blacklisted() const {
61 std::lock_guard locker{m_lock};
62 return m_blacklisted;
63 }
64
65 template <typename I>
66 int InstanceReplayer<I>::init() {
67 C_SaferCond init_ctx;
68 init(&init_ctx);
69 return init_ctx.wait();
70 }
71
72 template <typename I>
73 void InstanceReplayer<I>::init(Context *on_finish) {
74 dout(10) << dendl;
75
76 Context *ctx = new LambdaContext(
77 [this, on_finish] (int r) {
78 {
79 std::lock_guard timer_locker{m_threads->timer_lock};
80 schedule_image_state_check_task();
81 }
82 on_finish->complete(0);
83 });
84
85 m_threads->work_queue->queue(ctx, 0);
86 }
87
88 template <typename I>
89 void InstanceReplayer<I>::shut_down() {
90 C_SaferCond shut_down_ctx;
91 shut_down(&shut_down_ctx);
92 int r = shut_down_ctx.wait();
93 ceph_assert(r == 0);
94 }
95
96 template <typename I>
97 void InstanceReplayer<I>::shut_down(Context *on_finish) {
98 dout(10) << dendl;
99
100 std::lock_guard locker{m_lock};
101
102 ceph_assert(m_on_shut_down == nullptr);
103 m_on_shut_down = on_finish;
104
105 Context *ctx = new LambdaContext(
106 [this] (int r) {
107 cancel_image_state_check_task();
108 wait_for_ops();
109 });
110
111 m_threads->work_queue->queue(ctx, 0);
112 }
113
114 template <typename I>
115 void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
116 dout(10) << "peer=" << peer << dendl;
117
118 std::lock_guard locker{m_lock};
119 auto result = m_peers.insert(peer).second;
120 ceph_assert(result);
121 }
122
123 template <typename I>
124 void InstanceReplayer<I>::release_all(Context *on_finish) {
125 dout(10) << dendl;
126
127 std::lock_guard locker{m_lock};
128
129 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
130 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
131 it = m_image_replayers.erase(it)) {
132 auto image_replayer = it->second;
133 auto ctx = gather_ctx->new_sub();
134 ctx = new LambdaContext(
135 [image_replayer, ctx] (int r) {
136 image_replayer->destroy();
137 ctx->complete(0);
138 });
139 stop_image_replayer(image_replayer, ctx);
140 }
141 gather_ctx->activate();
142 }
143
144 template <typename I>
145 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
146 const std::string &global_image_id,
147 Context *on_finish) {
148 dout(10) << "global_image_id=" << global_image_id << dendl;
149
150 std::lock_guard locker{m_lock};
151
152 ceph_assert(m_on_shut_down == nullptr);
153
154 auto it = m_image_replayers.find(global_image_id);
155 if (it == m_image_replayers.end()) {
156 auto image_replayer = ImageReplayer<I>::create(
157 m_local_io_ctx, m_local_mirror_uuid, global_image_id,
158 m_threads, instance_watcher, m_local_status_updater,
159 m_cache_manager_handler, m_pool_meta_cache);
160
161 dout(10) << global_image_id << ": creating replayer " << image_replayer
162 << dendl;
163
164 it = m_image_replayers.insert(std::make_pair(global_image_id,
165 image_replayer)).first;
166
167 // TODO only a single peer is currently supported
168 ceph_assert(m_peers.size() == 1);
169 auto peer = *m_peers.begin();
170 image_replayer->add_peer(peer);
171 start_image_replayer(image_replayer);
172 } else {
173 // A duplicate acquire notification implies (1) connection hiccup or
174 // (2) new leader election. For the second case, restart the replayer to
175 // detect if the image has been deleted while the leader was offline
176 auto& image_replayer = it->second;
177 image_replayer->set_finished(false);
178 image_replayer->restart();
179 }
180
181 m_threads->work_queue->queue(on_finish, 0);
182 }
183
184 template <typename I>
185 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
186 Context *on_finish) {
187 dout(10) << "global_image_id=" << global_image_id << dendl;
188
189 std::lock_guard locker{m_lock};
190 ceph_assert(m_on_shut_down == nullptr);
191
192 auto it = m_image_replayers.find(global_image_id);
193 if (it == m_image_replayers.end()) {
194 dout(5) << global_image_id << ": not found" << dendl;
195 m_threads->work_queue->queue(on_finish, 0);
196 return;
197 }
198
199 auto image_replayer = it->second;
200 m_image_replayers.erase(it);
201
202 on_finish = new LambdaContext(
203 [image_replayer, on_finish] (int r) {
204 image_replayer->destroy();
205 on_finish->complete(0);
206 });
207 stop_image_replayer(image_replayer, on_finish);
208 }
209
210 template <typename I>
211 void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
212 const std::string &peer_mirror_uuid,
213 Context *on_finish) {
214 dout(10) << "global_image_id=" << global_image_id << ", "
215 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
216
217 std::lock_guard locker{m_lock};
218 ceph_assert(m_on_shut_down == nullptr);
219
220 auto it = m_image_replayers.find(global_image_id);
221 if (it != m_image_replayers.end()) {
222 // TODO only a single peer is currently supported, therefore
223 // we can just interrupt the current image replayer and
224 // it will eventually detect that the peer image is missing and
225 // determine if a delete propagation is required.
226 auto image_replayer = it->second;
227 image_replayer->restart();
228 }
229 m_threads->work_queue->queue(on_finish, 0);
230 }
231
232 template <typename I>
233 void InstanceReplayer<I>::print_status(Formatter *f) {
234 dout(10) << dendl;
235
236 std::lock_guard locker{m_lock};
237
238 f->open_array_section("image_replayers");
239 for (auto &kv : m_image_replayers) {
240 auto &image_replayer = kv.second;
241 image_replayer->print_status(f);
242 }
243 f->close_section();
244 }
245
246 template <typename I>
247 void InstanceReplayer<I>::start()
248 {
249 dout(10) << dendl;
250
251 std::lock_guard locker{m_lock};
252
253 m_manual_stop = false;
254
255 for (auto &kv : m_image_replayers) {
256 auto &image_replayer = kv.second;
257 image_replayer->start(nullptr, true);
258 }
259 }
260
261 template <typename I>
262 void InstanceReplayer<I>::stop()
263 {
264 dout(10) << dendl;
265
266 std::lock_guard locker{m_lock};
267
268 m_manual_stop = true;
269
270 for (auto &kv : m_image_replayers) {
271 auto &image_replayer = kv.second;
272 image_replayer->stop(nullptr, true);
273 }
274 }
275
276 template <typename I>
277 void InstanceReplayer<I>::stop(Context *on_finish)
278 {
279 dout(10) << dendl;
280
281 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
282 auto gather_ctx = new C_Gather(cct, on_finish);
283 {
284 std::lock_guard locker{m_lock};
285
286 m_manual_stop = true;
287
288 for (auto &kv : m_image_replayers) {
289 auto &image_replayer = kv.second;
290 image_replayer->stop(gather_ctx->new_sub(), true);
291 }
292 }
293
294 gather_ctx->activate();
295 }
296
297 template <typename I>
298 void InstanceReplayer<I>::restart()
299 {
300 dout(10) << dendl;
301
302 std::lock_guard locker{m_lock};
303
304 m_manual_stop = false;
305
306 for (auto &kv : m_image_replayers) {
307 auto &image_replayer = kv.second;
308 image_replayer->restart();
309 }
310 }
311
312 template <typename I>
313 void InstanceReplayer<I>::flush()
314 {
315 dout(10) << dendl;
316
317 std::lock_guard locker{m_lock};
318
319 for (auto &kv : m_image_replayers) {
320 auto &image_replayer = kv.second;
321 image_replayer->flush();
322 }
323 }
324
325 template <typename I>
326 void InstanceReplayer<I>::start_image_replayer(
327 ImageReplayer<I> *image_replayer) {
328 ceph_assert(ceph_mutex_is_locked(m_lock));
329
330 std::string global_image_id = image_replayer->get_global_image_id();
331 if (!image_replayer->is_stopped()) {
332 return;
333 } else if (image_replayer->is_blacklisted()) {
334 derr << "global_image_id=" << global_image_id << ": blacklisted detected "
335 << "during image replay" << dendl;
336 m_blacklisted = true;
337 return;
338 } else if (image_replayer->is_finished()) {
339 // TODO temporary until policy integrated
340 dout(5) << "removing image replayer for global_image_id="
341 << global_image_id << dendl;
342 m_image_replayers.erase(image_replayer->get_global_image_id());
343 image_replayer->destroy();
344 return;
345 } else if (m_manual_stop) {
346 return;
347 }
348
349 dout(10) << "global_image_id=" << global_image_id << dendl;
350 image_replayer->start(nullptr, false);
351 }
352
353 template <typename I>
354 void InstanceReplayer<I>::queue_start_image_replayers() {
355 dout(10) << dendl;
356
357 Context *ctx = create_context_callback<
358 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
359 m_async_op_tracker.start_op();
360 m_threads->work_queue->queue(ctx, 0);
361 }
362
363 template <typename I>
364 void InstanceReplayer<I>::start_image_replayers(int r) {
365 dout(10) << dendl;
366
367 std::lock_guard locker{m_lock};
368 if (m_on_shut_down != nullptr) {
369 return;
370 }
371
372 uint64_t image_count = 0;
373 uint64_t warning_count = 0;
374 uint64_t error_count = 0;
375 for (auto it = m_image_replayers.begin();
376 it != m_image_replayers.end();) {
377 auto current_it(it);
378 ++it;
379
380 ++image_count;
381 auto health_state = current_it->second->get_health_state();
382 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
383 ++warning_count;
384 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
385 ++error_count;
386 }
387
388 start_image_replayer(current_it->second);
389 }
390
391 m_service_daemon->add_or_update_namespace_attribute(
392 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
393 SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
394 m_service_daemon->add_or_update_namespace_attribute(
395 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
396 SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
397 m_service_daemon->add_or_update_namespace_attribute(
398 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
399 SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
400
401 m_async_op_tracker.finish_op();
402 }
403
404 template <typename I>
405 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
406 Context *on_finish) {
407 dout(10) << image_replayer << " global_image_id="
408 << image_replayer->get_global_image_id() << ", on_finish="
409 << on_finish << dendl;
410
411 if (image_replayer->is_stopped()) {
412 m_threads->work_queue->queue(on_finish, 0);
413 return;
414 }
415
416 m_async_op_tracker.start_op();
417 Context *ctx = create_async_context_callback(
418 m_threads->work_queue, new LambdaContext(
419 [this, image_replayer, on_finish] (int r) {
420 stop_image_replayer(image_replayer, on_finish);
421 m_async_op_tracker.finish_op();
422 }));
423
424 if (image_replayer->is_running()) {
425 image_replayer->stop(ctx, false);
426 } else {
427 int after = 1;
428 dout(10) << "scheduling image replayer " << image_replayer << " stop after "
429 << after << " sec (task " << ctx << ")" << dendl;
430 ctx = new LambdaContext(
431 [this, after, ctx] (int r) {
432 std::lock_guard timer_locker{m_threads->timer_lock};
433 m_threads->timer->add_event_after(after, ctx);
434 });
435 m_threads->work_queue->queue(ctx, 0);
436 }
437 }
438
439 template <typename I>
440 void InstanceReplayer<I>::wait_for_ops() {
441 dout(10) << dendl;
442
443 Context *ctx = create_context_callback<
444 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
445
446 m_async_op_tracker.wait_for_ops(ctx);
447 }
448
449 template <typename I>
450 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
451 dout(10) << "r=" << r << dendl;
452
453 ceph_assert(r == 0);
454
455 std::lock_guard locker{m_lock};
456 stop_image_replayers();
457 }
458
459 template <typename I>
460 void InstanceReplayer<I>::stop_image_replayers() {
461 dout(10) << dendl;
462
463 ceph_assert(ceph_mutex_is_locked(m_lock));
464
465 Context *ctx = create_async_context_callback(
466 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
467 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
468
469 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
470 for (auto &it : m_image_replayers) {
471 stop_image_replayer(it.second, gather_ctx->new_sub());
472 }
473 gather_ctx->activate();
474 }
475
476 template <typename I>
477 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
478 dout(10) << "r=" << r << dendl;
479
480 ceph_assert(r == 0);
481
482 Context *on_finish = nullptr;
483 {
484 std::lock_guard locker{m_lock};
485
486 for (auto &it : m_image_replayers) {
487 ceph_assert(it.second->is_stopped());
488 it.second->destroy();
489 }
490 m_image_replayers.clear();
491
492 ceph_assert(m_on_shut_down != nullptr);
493 std::swap(on_finish, m_on_shut_down);
494 }
495 on_finish->complete(r);
496 }
497
498 template <typename I>
499 void InstanceReplayer<I>::cancel_image_state_check_task() {
500 std::lock_guard timer_locker{m_threads->timer_lock};
501
502 if (m_image_state_check_task == nullptr) {
503 return;
504 }
505
506 dout(10) << m_image_state_check_task << dendl;
507 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
508 ceph_assert(canceled);
509 m_image_state_check_task = nullptr;
510 }
511
512 template <typename I>
513 void InstanceReplayer<I>::schedule_image_state_check_task() {
514 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
515 ceph_assert(m_image_state_check_task == nullptr);
516
517 m_image_state_check_task = new LambdaContext(
518 [this](int r) {
519 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
520 m_image_state_check_task = nullptr;
521 schedule_image_state_check_task();
522 queue_start_image_replayers();
523 });
524
525 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
526 int after = cct->_conf.get_val<uint64_t>(
527 "rbd_mirror_image_state_check_interval");
528
529 dout(10) << "scheduling image state check after " << after << " sec (task "
530 << m_image_state_check_task << ")" << dendl;
531 m_threads->timer->add_event_after(after, m_image_state_check_task);
532 }
533
534 } // namespace mirror
535 } // namespace rbd
536
537 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;