]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
95bb67129ddb6c7749c3fdd104b8b9a184d0cc6e
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/WorkQueue.h"
10 #include "librbd/Utils.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
14 #include "Threads.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
21
22 namespace rbd {
23 namespace mirror {
24
namespace {

// Attribute keys under which start_image_replayers() publishes the
// per-namespace image counters to the service daemon.
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY{"image_assigned_count"};
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY{"image_warning_count"};
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY{"image_error_count"};

} // anonymous namespace
32
33 using librbd::util::create_async_context_callback;
34 using librbd::util::create_context_callback;
35
36 template <typename I>
37 InstanceReplayer<I>::InstanceReplayer(
38 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
39 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
40 MirrorStatusUpdater<I>* local_status_updater,
41 journal::CacheManagerHandler *cache_manager_handler,
42 PoolMetaCache* pool_meta_cache)
43 : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
44 m_threads(threads), m_service_daemon(service_daemon),
45 m_local_status_updater(local_status_updater),
46 m_cache_manager_handler(cache_manager_handler),
47 m_pool_meta_cache(pool_meta_cache),
48 m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
49 stringify(local_io_ctx.get_id()))) {
50 }
51
52 template <typename I>
53 InstanceReplayer<I>::~InstanceReplayer() {
54 ceph_assert(m_image_state_check_task == nullptr);
55 ceph_assert(m_async_op_tracker.empty());
56 ceph_assert(m_image_replayers.empty());
57 }
58
59 template <typename I>
60 bool InstanceReplayer<I>::is_blacklisted() const {
61 std::lock_guard locker{m_lock};
62 return m_blacklisted;
63 }
64
65 template <typename I>
66 int InstanceReplayer<I>::init() {
67 C_SaferCond init_ctx;
68 init(&init_ctx);
69 return init_ctx.wait();
70 }
71
72 template <typename I>
73 void InstanceReplayer<I>::init(Context *on_finish) {
74 dout(10) << dendl;
75
76 Context *ctx = new LambdaContext(
77 [this, on_finish] (int r) {
78 {
79 std::lock_guard timer_locker{m_threads->timer_lock};
80 schedule_image_state_check_task();
81 }
82 on_finish->complete(0);
83 });
84
85 m_threads->work_queue->queue(ctx, 0);
86 }
87
88 template <typename I>
89 void InstanceReplayer<I>::shut_down() {
90 C_SaferCond shut_down_ctx;
91 shut_down(&shut_down_ctx);
92 int r = shut_down_ctx.wait();
93 ceph_assert(r == 0);
94 }
95
96 template <typename I>
97 void InstanceReplayer<I>::shut_down(Context *on_finish) {
98 dout(10) << dendl;
99
100 std::lock_guard locker{m_lock};
101
102 ceph_assert(m_on_shut_down == nullptr);
103 m_on_shut_down = on_finish;
104
105 Context *ctx = new LambdaContext(
106 [this] (int r) {
107 cancel_image_state_check_task();
108 wait_for_ops();
109 });
110
111 m_threads->work_queue->queue(ctx, 0);
112 }
113
114 template <typename I>
115 void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
116 dout(10) << "peer=" << peer << dendl;
117
118 std::lock_guard locker{m_lock};
119 auto result = m_peers.insert(peer).second;
120 ceph_assert(result);
121 }
122
123 template <typename I>
124 void InstanceReplayer<I>::release_all(Context *on_finish) {
125 dout(10) << dendl;
126
127 std::lock_guard locker{m_lock};
128
129 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
130 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
131 it = m_image_replayers.erase(it)) {
132 auto image_replayer = it->second;
133 auto ctx = gather_ctx->new_sub();
134 ctx = new LambdaContext(
135 [image_replayer, ctx] (int r) {
136 image_replayer->destroy();
137 ctx->complete(0);
138 });
139 stop_image_replayer(image_replayer, ctx);
140 }
141 gather_ctx->activate();
142 }
143
144 template <typename I>
145 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
146 const std::string &global_image_id,
147 Context *on_finish) {
148 dout(10) << "global_image_id=" << global_image_id << dendl;
149
150 std::lock_guard locker{m_lock};
151
152 ceph_assert(m_on_shut_down == nullptr);
153
154 auto it = m_image_replayers.find(global_image_id);
155 if (it == m_image_replayers.end()) {
156 auto image_replayer = ImageReplayer<I>::create(
157 m_local_io_ctx, m_local_mirror_uuid, global_image_id,
158 m_threads, instance_watcher, m_local_status_updater,
159 m_cache_manager_handler, m_pool_meta_cache);
160
161 dout(10) << global_image_id << ": creating replayer " << image_replayer
162 << dendl;
163
164 it = m_image_replayers.insert(std::make_pair(global_image_id,
165 image_replayer)).first;
166
167 // TODO only a single peer is currently supported
168 ceph_assert(m_peers.size() == 1);
169 auto peer = *m_peers.begin();
170 image_replayer->add_peer(peer);
171 start_image_replayer(image_replayer);
172 } else {
173 // A duplicate acquire notification implies (1) connection hiccup or
174 // (2) new leader election. For the second case, restart the replayer to
175 // detect if the image has been deleted while the leader was offline
176 auto& image_replayer = it->second;
177 image_replayer->set_finished(false);
178 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
179 }
180
181 m_threads->work_queue->queue(on_finish, 0);
182 }
183
184 template <typename I>
185 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
186 Context *on_finish) {
187 dout(10) << "global_image_id=" << global_image_id << dendl;
188
189 std::lock_guard locker{m_lock};
190 ceph_assert(m_on_shut_down == nullptr);
191
192 auto it = m_image_replayers.find(global_image_id);
193 if (it == m_image_replayers.end()) {
194 dout(5) << global_image_id << ": not found" << dendl;
195 m_threads->work_queue->queue(on_finish, 0);
196 return;
197 }
198
199 auto image_replayer = it->second;
200 m_image_replayers.erase(it);
201
202 on_finish = new LambdaContext(
203 [image_replayer, on_finish] (int r) {
204 image_replayer->destroy();
205 on_finish->complete(0);
206 });
207 stop_image_replayer(image_replayer, on_finish);
208 }
209
210 template <typename I>
211 void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
212 const std::string &peer_mirror_uuid,
213 Context *on_finish) {
214 dout(10) << "global_image_id=" << global_image_id << ", "
215 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
216
217 std::lock_guard locker{m_lock};
218 ceph_assert(m_on_shut_down == nullptr);
219
220 auto it = m_image_replayers.find(global_image_id);
221 if (it != m_image_replayers.end()) {
222 // TODO only a single peer is currently supported, therefore
223 // we can just interrupt the current image replayer and
224 // it will eventually detect that the peer image is missing and
225 // determine if a delete propagation is required.
226 auto image_replayer = it->second;
227 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
228 }
229 m_threads->work_queue->queue(on_finish, 0);
230 }
231
232 template <typename I>
233 void InstanceReplayer<I>::print_status(Formatter *f) {
234 dout(10) << dendl;
235
236 std::lock_guard locker{m_lock};
237
238 f->open_array_section("image_replayers");
239 for (auto &kv : m_image_replayers) {
240 auto &image_replayer = kv.second;
241 image_replayer->print_status(f);
242 }
243 f->close_section();
244 }
245
246 template <typename I>
247 void InstanceReplayer<I>::start()
248 {
249 dout(10) << dendl;
250
251 std::lock_guard locker{m_lock};
252
253 m_manual_stop = false;
254
255 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
256 auto gather_ctx = new C_Gather(
257 cct, new C_TrackedOp(m_async_op_tracker, nullptr));
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
260 image_replayer->start(gather_ctx->new_sub(), true);
261 }
262
263 gather_ctx->activate();
264 }
265
266 template <typename I>
267 void InstanceReplayer<I>::stop()
268 {
269 stop(nullptr);
270 }
271
272 template <typename I>
273 void InstanceReplayer<I>::stop(Context *on_finish)
274 {
275 dout(10) << dendl;
276
277 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
278 auto gather_ctx = new C_Gather(
279 cct, new C_TrackedOp(m_async_op_tracker, on_finish));
280 {
281 std::lock_guard locker{m_lock};
282
283 m_manual_stop = true;
284
285 for (auto &kv : m_image_replayers) {
286 auto &image_replayer = kv.second;
287 image_replayer->stop(gather_ctx->new_sub(), true);
288 }
289 }
290
291 gather_ctx->activate();
292 }
293
294 template <typename I>
295 void InstanceReplayer<I>::restart()
296 {
297 dout(10) << dendl;
298
299 std::lock_guard locker{m_lock};
300
301 m_manual_stop = false;
302
303 for (auto &kv : m_image_replayers) {
304 auto &image_replayer = kv.second;
305 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
306 }
307 }
308
309 template <typename I>
310 void InstanceReplayer<I>::flush()
311 {
312 dout(10) << dendl;
313
314 std::lock_guard locker{m_lock};
315
316 for (auto &kv : m_image_replayers) {
317 auto &image_replayer = kv.second;
318 image_replayer->flush();
319 }
320 }
321
322 template <typename I>
323 void InstanceReplayer<I>::start_image_replayer(
324 ImageReplayer<I> *image_replayer) {
325 ceph_assert(ceph_mutex_is_locked(m_lock));
326
327 std::string global_image_id = image_replayer->get_global_image_id();
328 if (!image_replayer->is_stopped()) {
329 return;
330 } else if (image_replayer->is_blacklisted()) {
331 derr << "global_image_id=" << global_image_id << ": blacklisted detected "
332 << "during image replay" << dendl;
333 m_blacklisted = true;
334 return;
335 } else if (image_replayer->is_finished()) {
336 // TODO temporary until policy integrated
337 dout(5) << "removing image replayer for global_image_id="
338 << global_image_id << dendl;
339 m_image_replayers.erase(image_replayer->get_global_image_id());
340 image_replayer->destroy();
341 return;
342 } else if (m_manual_stop) {
343 return;
344 }
345
346 dout(10) << "global_image_id=" << global_image_id << dendl;
347 image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
348 }
349
350 template <typename I>
351 void InstanceReplayer<I>::queue_start_image_replayers() {
352 dout(10) << dendl;
353
354 Context *ctx = create_context_callback<
355 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
356 m_async_op_tracker.start_op();
357 m_threads->work_queue->queue(ctx, 0);
358 }
359
360 template <typename I>
361 void InstanceReplayer<I>::start_image_replayers(int r) {
362 dout(10) << dendl;
363
364 std::lock_guard locker{m_lock};
365 if (m_on_shut_down != nullptr) {
366 return;
367 }
368
369 uint64_t image_count = 0;
370 uint64_t warning_count = 0;
371 uint64_t error_count = 0;
372 for (auto it = m_image_replayers.begin();
373 it != m_image_replayers.end();) {
374 auto current_it(it);
375 ++it;
376
377 ++image_count;
378 auto health_state = current_it->second->get_health_state();
379 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
380 ++warning_count;
381 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
382 ++error_count;
383 }
384
385 start_image_replayer(current_it->second);
386 }
387
388 m_service_daemon->add_or_update_namespace_attribute(
389 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
390 SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
391 m_service_daemon->add_or_update_namespace_attribute(
392 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
393 SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
394 m_service_daemon->add_or_update_namespace_attribute(
395 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
396 SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
397
398 m_async_op_tracker.finish_op();
399 }
400
401 template <typename I>
402 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
403 Context *on_finish) {
404 dout(10) << image_replayer << " global_image_id="
405 << image_replayer->get_global_image_id() << ", on_finish="
406 << on_finish << dendl;
407
408 if (image_replayer->is_stopped()) {
409 m_threads->work_queue->queue(on_finish, 0);
410 return;
411 }
412
413 m_async_op_tracker.start_op();
414 Context *ctx = create_async_context_callback(
415 m_threads->work_queue, new LambdaContext(
416 [this, image_replayer, on_finish] (int r) {
417 stop_image_replayer(image_replayer, on_finish);
418 m_async_op_tracker.finish_op();
419 }));
420
421 if (image_replayer->is_running()) {
422 image_replayer->stop(ctx, false);
423 } else {
424 int after = 1;
425 dout(10) << "scheduling image replayer " << image_replayer << " stop after "
426 << after << " sec (task " << ctx << ")" << dendl;
427 ctx = new LambdaContext(
428 [this, after, ctx] (int r) {
429 std::lock_guard timer_locker{m_threads->timer_lock};
430 m_threads->timer->add_event_after(after, ctx);
431 });
432 m_threads->work_queue->queue(ctx, 0);
433 }
434 }
435
436 template <typename I>
437 void InstanceReplayer<I>::wait_for_ops() {
438 dout(10) << dendl;
439
440 Context *ctx = create_context_callback<
441 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
442
443 m_async_op_tracker.wait_for_ops(ctx);
444 }
445
446 template <typename I>
447 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
448 dout(10) << "r=" << r << dendl;
449
450 ceph_assert(r == 0);
451
452 std::lock_guard locker{m_lock};
453 stop_image_replayers();
454 }
455
456 template <typename I>
457 void InstanceReplayer<I>::stop_image_replayers() {
458 dout(10) << dendl;
459
460 ceph_assert(ceph_mutex_is_locked(m_lock));
461
462 Context *ctx = create_async_context_callback(
463 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
464 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
465
466 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
467 for (auto &it : m_image_replayers) {
468 stop_image_replayer(it.second, gather_ctx->new_sub());
469 }
470 gather_ctx->activate();
471 }
472
473 template <typename I>
474 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
475 dout(10) << "r=" << r << dendl;
476
477 ceph_assert(r == 0);
478
479 Context *on_finish = nullptr;
480 {
481 std::lock_guard locker{m_lock};
482
483 for (auto &it : m_image_replayers) {
484 ceph_assert(it.second->is_stopped());
485 it.second->destroy();
486 }
487 m_image_replayers.clear();
488
489 ceph_assert(m_on_shut_down != nullptr);
490 std::swap(on_finish, m_on_shut_down);
491 }
492 on_finish->complete(r);
493 }
494
495 template <typename I>
496 void InstanceReplayer<I>::cancel_image_state_check_task() {
497 std::lock_guard timer_locker{m_threads->timer_lock};
498
499 if (m_image_state_check_task == nullptr) {
500 return;
501 }
502
503 dout(10) << m_image_state_check_task << dendl;
504 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
505 ceph_assert(canceled);
506 m_image_state_check_task = nullptr;
507 }
508
509 template <typename I>
510 void InstanceReplayer<I>::schedule_image_state_check_task() {
511 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
512 ceph_assert(m_image_state_check_task == nullptr);
513
514 m_image_state_check_task = new LambdaContext(
515 [this](int r) {
516 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
517 m_image_state_check_task = nullptr;
518 schedule_image_state_check_task();
519 queue_start_image_replayers();
520 });
521
522 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
523 int after = cct->_conf.get_val<uint64_t>(
524 "rbd_mirror_image_state_check_interval");
525
526 dout(10) << "scheduling image state check after " << after << " sec (task "
527 << m_image_state_check_task << ")" << dendl;
528 m_threads->timer->add_event_after(after, m_image_state_check_task);
529 }
530
531 } // namespace mirror
532 } // namespace rbd
533
534 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;