]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
update sources to v12.1.3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/stringify.h"
5#include "common/Timer.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "librbd/Utils.h"
9#include "ImageReplayer.h"
10#include "InstanceReplayer.h"
c07f9fc5 11#include "ServiceDaemon.h"
7c673cae
FG
12#include "Threads.h"
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rbd_mirror
16#undef dout_prefix
17#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18 << this << " " << __func__ << ": "
19
20namespace rbd {
21namespace mirror {
22
c07f9fc5
FG
namespace {

// Attribute keys passed to ServiceDaemon::add_or_update_attribute() when the
// per-pool image counters are published (see start_image_replayers()).
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY = "image_assigned_count";
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY = "image_warning_count";
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY = "image_error_count";

} // anonymous namespace
30
7c673cae
FG
31using librbd::util::create_async_context_callback;
32using librbd::util::create_context_callback;
33
34template <typename I>
35InstanceReplayer<I>::InstanceReplayer(
c07f9fc5
FG
36 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
37 ImageDeleter<I>* image_deleter, RadosRef local_rados,
38 const std::string &local_mirror_uuid, int64_t local_pool_id)
39 : m_threads(threads), m_service_daemon(service_daemon),
40 m_image_deleter(image_deleter), m_local_rados(local_rados),
41 m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
31f18b77 42 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
7c673cae
FG
43}
44
45template <typename I>
46InstanceReplayer<I>::~InstanceReplayer() {
47 assert(m_image_state_check_task == nullptr);
48 assert(m_async_op_tracker.empty());
49 assert(m_image_replayers.empty());
50}
51
52template <typename I>
53int InstanceReplayer<I>::init() {
54 C_SaferCond init_ctx;
55 init(&init_ctx);
56 return init_ctx.wait();
57}
58
59template <typename I>
60void InstanceReplayer<I>::init(Context *on_finish) {
61 dout(20) << dendl;
62
63 Context *ctx = new FunctionContext(
64 [this, on_finish] (int r) {
65 {
66 Mutex::Locker timer_locker(m_threads->timer_lock);
67 schedule_image_state_check_task();
68 }
69 on_finish->complete(0);
70 });
71
72 m_threads->work_queue->queue(ctx, 0);
73}
74
75template <typename I>
76void InstanceReplayer<I>::shut_down() {
77 C_SaferCond shut_down_ctx;
78 shut_down(&shut_down_ctx);
79 int r = shut_down_ctx.wait();
80 assert(r == 0);
81}
82
83template <typename I>
84void InstanceReplayer<I>::shut_down(Context *on_finish) {
85 dout(20) << dendl;
86
87 Mutex::Locker locker(m_lock);
88
89 assert(m_on_shut_down == nullptr);
90 m_on_shut_down = on_finish;
91
92 Context *ctx = new FunctionContext(
93 [this] (int r) {
94 cancel_image_state_check_task();
95 wait_for_ops();
96 });
97
98 m_threads->work_queue->queue(ctx, 0);
99}
100
101template <typename I>
d2e6a577 102void InstanceReplayer<I>::add_peer(std::string peer_uuid,
7c673cae 103 librados::IoCtx io_ctx) {
d2e6a577 104 dout(20) << peer_uuid << dendl;
7c673cae
FG
105
106 Mutex::Locker locker(m_lock);
d2e6a577 107 auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
7c673cae
FG
108 assert(result);
109}
110
7c673cae
FG
111template <typename I>
112void InstanceReplayer<I>::release_all(Context *on_finish) {
113 dout(20) << dendl;
114
115 Mutex::Locker locker(m_lock);
116
117 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
118 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
119 it = m_image_replayers.erase(it)) {
120 auto image_replayer = it->second;
121 auto ctx = gather_ctx->new_sub();
122 ctx = new FunctionContext(
123 [image_replayer, ctx] (int r) {
124 image_replayer->destroy();
125 ctx->complete(0);
126 });
127 stop_image_replayer(image_replayer, ctx);
128 }
129 gather_ctx->activate();
130}
131
132template <typename I>
31f18b77
FG
133void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
134 const std::string &global_image_id,
7c673cae 135 Context *on_finish) {
d2e6a577 136 dout(20) << "global_image_id=" << global_image_id << dendl;
7c673cae
FG
137
138 Mutex::Locker locker(m_lock);
139
140 assert(m_on_shut_down == nullptr);
141
142 auto it = m_image_replayers.find(global_image_id);
7c673cae
FG
143 if (it == m_image_replayers.end()) {
144 auto image_replayer = ImageReplayer<I>::create(
31f18b77
FG
145 m_threads, m_image_deleter, instance_watcher, m_local_rados,
146 m_local_mirror_uuid, m_local_pool_id, global_image_id);
7c673cae
FG
147
148 dout(20) << global_image_id << ": creating replayer " << image_replayer
149 << dendl;
150
151 it = m_image_replayers.insert(std::make_pair(global_image_id,
152 image_replayer)).first;
d2e6a577
FG
153
154 // TODO only a single peer is currently supported
155 assert(m_peers.size() == 1);
156 auto peer = *m_peers.begin();
157 image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
7c673cae
FG
158 }
159
d2e6a577
FG
160 auto& image_replayer = it->second;
161 // TODO temporary until policy integrated
162 image_replayer->set_finished(false);
7c673cae 163
7c673cae 164 start_image_replayer(image_replayer);
7c673cae
FG
165 m_threads->work_queue->queue(on_finish, 0);
166}
167
168template <typename I>
169void InstanceReplayer<I>::release_image(const std::string &global_image_id,
7c673cae 170 Context *on_finish) {
d2e6a577 171 dout(20) << "global_image_id=" << global_image_id << dendl;
7c673cae
FG
172
173 Mutex::Locker locker(m_lock);
7c673cae
FG
174 assert(m_on_shut_down == nullptr);
175
176 auto it = m_image_replayers.find(global_image_id);
7c673cae
FG
177 if (it == m_image_replayers.end()) {
178 dout(20) << global_image_id << ": not found" << dendl;
179 m_threads->work_queue->queue(on_finish, 0);
180 return;
181 }
182
183 auto image_replayer = it->second;
7c673cae
FG
184 m_image_replayers.erase(it);
185
186 on_finish = new FunctionContext(
187 [image_replayer, on_finish] (int r) {
188 image_replayer->destroy();
189 on_finish->complete(0);
190 });
d2e6a577
FG
191 stop_image_replayer(image_replayer, on_finish);
192}
7c673cae 193
d2e6a577
FG
194template <typename I>
195void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
196 const std::string &peer_mirror_uuid,
197 Context *on_finish) {
198 dout(20) << "global_image_id=" << global_image_id << ", "
199 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
7c673cae 200
d2e6a577
FG
201 Mutex::Locker locker(m_lock);
202 assert(m_on_shut_down == nullptr);
203
204 auto it = m_image_replayers.find(global_image_id);
205 if (it != m_image_replayers.end()) {
206 // TODO only a single peer is currently supported, therefore
207 // we can just interrupt the current image replayer and
208 // it will eventually detect that the peer image is missing and
209 // determine if a delete propagation is required.
210 auto image_replayer = it->second;
211 image_replayer->restart();
212 }
213 m_threads->work_queue->queue(on_finish, 0);
7c673cae
FG
214}
215
216template <typename I>
217void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
218 dout(20) << dendl;
219
220 if (!f) {
221 return;
222 }
223
224 Mutex::Locker locker(m_lock);
225
226 f->open_array_section("image_replayers");
227 for (auto &kv : m_image_replayers) {
228 auto &image_replayer = kv.second;
229 image_replayer->print_status(f, ss);
230 }
231 f->close_section();
232}
233
234template <typename I>
235void InstanceReplayer<I>::start()
236{
237 dout(20) << dendl;
238
239 Mutex::Locker locker(m_lock);
240
241 m_manual_stop = false;
242
243 for (auto &kv : m_image_replayers) {
244 auto &image_replayer = kv.second;
245 image_replayer->start(nullptr, true);
246 }
247}
248
249template <typename I>
250void InstanceReplayer<I>::stop()
251{
252 dout(20) << dendl;
253
254 Mutex::Locker locker(m_lock);
255
256 m_manual_stop = true;
257
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
260 image_replayer->stop(nullptr, true);
261 }
262}
263
264template <typename I>
265void InstanceReplayer<I>::restart()
266{
267 dout(20) << dendl;
268
269 Mutex::Locker locker(m_lock);
270
271 m_manual_stop = false;
272
273 for (auto &kv : m_image_replayers) {
274 auto &image_replayer = kv.second;
275 image_replayer->restart();
276 }
277}
278
279template <typename I>
280void InstanceReplayer<I>::flush()
281{
282 dout(20) << "enter" << dendl;
283
284 Mutex::Locker locker(m_lock);
285
286 for (auto &kv : m_image_replayers) {
287 auto &image_replayer = kv.second;
288 image_replayer->flush();
289 }
290}
291
292template <typename I>
293void InstanceReplayer<I>::start_image_replayer(
294 ImageReplayer<I> *image_replayer) {
295 assert(m_lock.is_locked());
296
297 std::string global_image_id = image_replayer->get_global_image_id();
298 dout(20) << "global_image_id=" << global_image_id << dendl;
299
300 if (!image_replayer->is_stopped()) {
301 return;
302 } else if (image_replayer->is_blacklisted()) {
303 derr << "blacklisted detected during image replay" << dendl;
304 return;
d2e6a577
FG
305 } else if (image_replayer->is_finished()) {
306 // TODO temporary until policy integrated
307 dout(5) << "removing image replayer for global_image_id="
308 << global_image_id << dendl;
309 m_image_replayers.erase(image_replayer->get_global_image_id());
310 image_replayer->destroy();
311 return;
7c673cae
FG
312 }
313
d2e6a577 314 image_replayer->start(nullptr, false);
7c673cae
FG
315}
316
317template <typename I>
c07f9fc5 318void InstanceReplayer<I>::queue_start_image_replayers() {
7c673cae
FG
319 dout(20) << dendl;
320
c07f9fc5
FG
321 Context *ctx = create_context_callback<
322 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
7c673cae
FG
323 m_async_op_tracker.start_op();
324 m_threads->work_queue->queue(ctx, 0);
325}
326
c07f9fc5
FG
327template <typename I>
328void InstanceReplayer<I>::start_image_replayers(int r) {
329 dout(20) << dendl;
330
331 Mutex::Locker locker(m_lock);
332 if (m_on_shut_down != nullptr) {
333 return;
334 }
335
d2e6a577
FG
336 uint64_t image_count = 0;
337 uint64_t warning_count = 0;
338 uint64_t error_count = 0;
339 for (auto it = m_image_replayers.begin();
340 it != m_image_replayers.end();) {
341 auto current_it(it);
342 ++it;
343
c07f9fc5 344 ++image_count;
d2e6a577 345 auto health_state = current_it->second->get_health_state();
c07f9fc5
FG
346 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
347 ++warning_count;
348 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
349 ++error_count;
350 }
351
d2e6a577 352 start_image_replayer(current_it->second);
c07f9fc5
FG
353 }
354
355 m_service_daemon->add_or_update_attribute(
356 m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
357 m_service_daemon->add_or_update_attribute(
358 m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
359 m_service_daemon->add_or_update_attribute(
360 m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
361
362 m_async_op_tracker.finish_op();
363}
7c673cae
FG
364
365template <typename I>
366void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
367 Context *on_finish) {
368 dout(20) << image_replayer << " global_image_id="
369 << image_replayer->get_global_image_id() << ", on_finish="
370 << on_finish << dendl;
371
372 if (image_replayer->is_stopped()) {
373 m_threads->work_queue->queue(on_finish, 0);
374 return;
375 }
376
377 m_async_op_tracker.start_op();
378 Context *ctx = create_async_context_callback(
379 m_threads->work_queue, new FunctionContext(
380 [this, image_replayer, on_finish] (int r) {
381 stop_image_replayer(image_replayer, on_finish);
382 m_async_op_tracker.finish_op();
383 }));
384
385 if (image_replayer->is_running()) {
386 image_replayer->stop(ctx, false);
387 } else {
388 int after = 1;
389 dout(20) << "scheduling image replayer " << image_replayer << " stop after "
390 << after << " sec (task " << ctx << ")" << dendl;
391 ctx = new FunctionContext(
392 [this, after, ctx] (int r) {
393 Mutex::Locker timer_locker(m_threads->timer_lock);
394 m_threads->timer->add_event_after(after, ctx);
395 });
396 m_threads->work_queue->queue(ctx, 0);
397 }
398}
399
400template <typename I>
401void InstanceReplayer<I>::wait_for_ops() {
402 dout(20) << dendl;
403
404 Context *ctx = create_context_callback<
405 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
406
407 m_async_op_tracker.wait_for_ops(ctx);
408}
409
410template <typename I>
411void InstanceReplayer<I>::handle_wait_for_ops(int r) {
412 dout(20) << "r=" << r << dendl;
413
414 assert(r == 0);
415
416 Mutex::Locker locker(m_lock);
417 stop_image_replayers();
418}
419
420template <typename I>
421void InstanceReplayer<I>::stop_image_replayers() {
422 dout(20) << dendl;
423
424 assert(m_lock.is_locked());
425
426 Context *ctx = create_async_context_callback(
427 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
428 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
429
430 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
431 for (auto &it : m_image_replayers) {
432 stop_image_replayer(it.second, gather_ctx->new_sub());
433 }
434 gather_ctx->activate();
435}
436
437template <typename I>
438void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
439 dout(20) << "r=" << r << dendl;
440
441 assert(r == 0);
442
443 Context *on_finish = nullptr;
444 {
445 Mutex::Locker locker(m_lock);
446
447 for (auto &it : m_image_replayers) {
448 assert(it.second->is_stopped());
449 it.second->destroy();
450 }
451 m_image_replayers.clear();
452
453 assert(m_on_shut_down != nullptr);
454 std::swap(on_finish, m_on_shut_down);
455 }
456 on_finish->complete(r);
457}
458
459template <typename I>
460void InstanceReplayer<I>::cancel_image_state_check_task() {
461 Mutex::Locker timer_locker(m_threads->timer_lock);
462
463 if (m_image_state_check_task == nullptr) {
464 return;
465 }
466
467 dout(20) << m_image_state_check_task << dendl;
468 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
469 assert(canceled);
470 m_image_state_check_task = nullptr;
471}
472
473template <typename I>
474void InstanceReplayer<I>::schedule_image_state_check_task() {
475 assert(m_threads->timer_lock.is_locked());
476 assert(m_image_state_check_task == nullptr);
477
478 m_image_state_check_task = new FunctionContext(
479 [this](int r) {
480 assert(m_threads->timer_lock.is_locked());
481 m_image_state_check_task = nullptr;
482 schedule_image_state_check_task();
c07f9fc5 483 queue_start_image_replayers();
7c673cae
FG
484 });
485
c07f9fc5 486 int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;
7c673cae
FG
487
488 dout(20) << "scheduling image state check after " << after << " sec (task "
489 << m_image_state_check_task << ")" << dendl;
490 m_threads->timer->add_event_after(after, m_image_state_check_task);
491}
492
493} // namespace mirror
494} // namespace rbd
495
496template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;