]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
40a8c1b43205a6f7d6e79043ff4437050ee9c619
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "ImageReplayer.h"
10 #include "InstanceReplayer.h"
11 #include "ServiceDaemon.h"
12 #include "Threads.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18 << this << " " << __func__ << ": "
19
20 namespace rbd {
21 namespace mirror {
22
namespace {

// Attribute keys used when publishing per-pool image counters through
// ServiceDaemon::add_or_update_attribute().
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY = "image_assigned_count";
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY = "image_warning_count";
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY = "image_error_count";

} // anonymous namespace
30
31 using librbd::util::create_async_context_callback;
32 using librbd::util::create_context_callback;
33
34 template <typename I>
35 InstanceReplayer<I>::InstanceReplayer(
36 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
37 ImageDeleter<I>* image_deleter, RadosRef local_rados,
38 const std::string &local_mirror_uuid, int64_t local_pool_id)
39 : m_threads(threads), m_service_daemon(service_daemon),
40 m_image_deleter(image_deleter), m_local_rados(local_rados),
41 m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
42 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
43 }
44
45 template <typename I>
46 InstanceReplayer<I>::~InstanceReplayer() {
47 assert(m_image_state_check_task == nullptr);
48 assert(m_async_op_tracker.empty());
49 assert(m_image_replayers.empty());
50 }
51
52 template <typename I>
53 int InstanceReplayer<I>::init() {
54 C_SaferCond init_ctx;
55 init(&init_ctx);
56 return init_ctx.wait();
57 }
58
59 template <typename I>
60 void InstanceReplayer<I>::init(Context *on_finish) {
61 dout(20) << dendl;
62
63 Context *ctx = new FunctionContext(
64 [this, on_finish] (int r) {
65 {
66 Mutex::Locker timer_locker(m_threads->timer_lock);
67 schedule_image_state_check_task();
68 }
69 on_finish->complete(0);
70 });
71
72 m_threads->work_queue->queue(ctx, 0);
73 }
74
75 template <typename I>
76 void InstanceReplayer<I>::shut_down() {
77 C_SaferCond shut_down_ctx;
78 shut_down(&shut_down_ctx);
79 int r = shut_down_ctx.wait();
80 assert(r == 0);
81 }
82
83 template <typename I>
84 void InstanceReplayer<I>::shut_down(Context *on_finish) {
85 dout(20) << dendl;
86
87 Mutex::Locker locker(m_lock);
88
89 assert(m_on_shut_down == nullptr);
90 m_on_shut_down = on_finish;
91
92 Context *ctx = new FunctionContext(
93 [this] (int r) {
94 cancel_image_state_check_task();
95 wait_for_ops();
96 });
97
98 m_threads->work_queue->queue(ctx, 0);
99 }
100
101 template <typename I>
102 void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
103 librados::IoCtx io_ctx) {
104 dout(20) << mirror_uuid << dendl;
105
106 Mutex::Locker locker(m_lock);
107 auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
108 assert(result);
109 }
110
111 template <typename I>
112 void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
113 dout(20) << mirror_uuid << dendl;
114
115 Mutex::Locker locker(m_lock);
116 auto result = m_peers.erase(Peer(mirror_uuid));
117 assert(result > 0);
118 }
119
120 template <typename I>
121 void InstanceReplayer<I>::release_all(Context *on_finish) {
122 dout(20) << dendl;
123
124 Mutex::Locker locker(m_lock);
125
126 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
127 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
128 it = m_image_replayers.erase(it)) {
129 auto image_replayer = it->second;
130 auto ctx = gather_ctx->new_sub();
131 ctx = new FunctionContext(
132 [image_replayer, ctx] (int r) {
133 image_replayer->destroy();
134 ctx->complete(0);
135 });
136 stop_image_replayer(image_replayer, ctx);
137 }
138 gather_ctx->activate();
139 }
140
141 template <typename I>
142 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
143 const std::string &global_image_id,
144 const std::string &peer_mirror_uuid,
145 const std::string &peer_image_id,
146 Context *on_finish) {
147 dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
148 << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
149
150 Mutex::Locker locker(m_lock);
151
152 assert(m_on_shut_down == nullptr);
153
154 auto it = m_image_replayers.find(global_image_id);
155
156 if (it == m_image_replayers.end()) {
157 auto image_replayer = ImageReplayer<I>::create(
158 m_threads, m_image_deleter, instance_watcher, m_local_rados,
159 m_local_mirror_uuid, m_local_pool_id, global_image_id);
160
161 dout(20) << global_image_id << ": creating replayer " << image_replayer
162 << dendl;
163
164 it = m_image_replayers.insert(std::make_pair(global_image_id,
165 image_replayer)).first;
166 }
167
168 auto image_replayer = it->second;
169 if (!peer_mirror_uuid.empty()) {
170 auto iter = m_peers.find(Peer(peer_mirror_uuid));
171 assert(iter != m_peers.end());
172 auto io_ctx = iter->io_ctx;
173
174 image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
175 }
176 start_image_replayer(image_replayer);
177
178 m_threads->work_queue->queue(on_finish, 0);
179 }
180
181 template <typename I>
182 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
183 const std::string &peer_mirror_uuid,
184 const std::string &peer_image_id,
185 bool schedule_delete,
186 Context *on_finish) {
187 dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
188 << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
189
190 Mutex::Locker locker(m_lock);
191
192 assert(m_on_shut_down == nullptr);
193
194 auto it = m_image_replayers.find(global_image_id);
195
196 if (it == m_image_replayers.end()) {
197 dout(20) << global_image_id << ": not found" << dendl;
198 m_threads->work_queue->queue(on_finish, 0);
199 return;
200 }
201
202 auto image_replayer = it->second;
203 if (!peer_mirror_uuid.empty()) {
204 image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
205 schedule_delete);
206 }
207
208 if (!image_replayer->remote_images_empty()) {
209 dout(20) << global_image_id << ": still has peer images" << dendl;
210 m_threads->work_queue->queue(on_finish, 0);
211 return;
212 }
213
214 m_image_replayers.erase(it);
215
216 on_finish = new FunctionContext(
217 [image_replayer, on_finish] (int r) {
218 image_replayer->destroy();
219 on_finish->complete(0);
220 });
221
222 if (schedule_delete) {
223 on_finish = new FunctionContext(
224 [this, image_replayer, on_finish] (int r) {
225 auto global_image_id = image_replayer->get_global_image_id();
226 m_image_deleter->schedule_image_delete(
227 m_local_rados, m_local_pool_id, global_image_id, false);
228 on_finish->complete(0);
229 });
230 }
231
232 stop_image_replayer(image_replayer, on_finish);
233 }
234
235 template <typename I>
236 void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
237 dout(20) << dendl;
238
239 if (!f) {
240 return;
241 }
242
243 Mutex::Locker locker(m_lock);
244
245 f->open_array_section("image_replayers");
246 for (auto &kv : m_image_replayers) {
247 auto &image_replayer = kv.second;
248 image_replayer->print_status(f, ss);
249 }
250 f->close_section();
251 }
252
253 template <typename I>
254 void InstanceReplayer<I>::start()
255 {
256 dout(20) << dendl;
257
258 Mutex::Locker locker(m_lock);
259
260 m_manual_stop = false;
261
262 for (auto &kv : m_image_replayers) {
263 auto &image_replayer = kv.second;
264 image_replayer->start(nullptr, true);
265 }
266 }
267
268 template <typename I>
269 void InstanceReplayer<I>::stop()
270 {
271 dout(20) << dendl;
272
273 Mutex::Locker locker(m_lock);
274
275 m_manual_stop = true;
276
277 for (auto &kv : m_image_replayers) {
278 auto &image_replayer = kv.second;
279 image_replayer->stop(nullptr, true);
280 }
281 }
282
283 template <typename I>
284 void InstanceReplayer<I>::restart()
285 {
286 dout(20) << dendl;
287
288 Mutex::Locker locker(m_lock);
289
290 m_manual_stop = false;
291
292 for (auto &kv : m_image_replayers) {
293 auto &image_replayer = kv.second;
294 image_replayer->restart();
295 }
296 }
297
298 template <typename I>
299 void InstanceReplayer<I>::flush()
300 {
301 dout(20) << "enter" << dendl;
302
303 Mutex::Locker locker(m_lock);
304
305 for (auto &kv : m_image_replayers) {
306 auto &image_replayer = kv.second;
307 image_replayer->flush();
308 }
309 }
310
311 template <typename I>
312 void InstanceReplayer<I>::start_image_replayer(
313 ImageReplayer<I> *image_replayer) {
314 assert(m_lock.is_locked());
315
316 std::string global_image_id = image_replayer->get_global_image_id();
317 dout(20) << "global_image_id=" << global_image_id << dendl;
318
319 if (!image_replayer->is_stopped()) {
320 return;
321 } else if (image_replayer->is_blacklisted()) {
322 derr << "blacklisted detected during image replay" << dendl;
323 return;
324 }
325
326 FunctionContext *ctx = new FunctionContext(
327 [this, global_image_id] (int r) {
328 dout(20) << "image deleter result: r=" << r << ", "
329 << "global_image_id=" << global_image_id << dendl;
330
331 Mutex::Locker locker(m_lock);
332 m_async_op_tracker.finish_op();
333
334 if (r == -ESTALE || r == -ECANCELED) {
335 return;
336 }
337
338 auto it = m_image_replayers.find(global_image_id);
339 if (it == m_image_replayers.end()) {
340 return;
341 }
342
343 auto image_replayer = it->second;
344 if (r >= 0) {
345 image_replayer->start(nullptr, false);
346 } else {
347 start_image_replayer(image_replayer);
348 }
349 });
350
351 m_async_op_tracker.start_op();
352 m_image_deleter->wait_for_scheduled_deletion(
353 m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
354 }
355
356 template <typename I>
357 void InstanceReplayer<I>::queue_start_image_replayers() {
358 dout(20) << dendl;
359
360 Context *ctx = create_context_callback<
361 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
362 m_async_op_tracker.start_op();
363 m_threads->work_queue->queue(ctx, 0);
364 }
365
366 template <typename I>
367 void InstanceReplayer<I>::start_image_replayers(int r) {
368 dout(20) << dendl;
369
370 Mutex::Locker locker(m_lock);
371 if (m_on_shut_down != nullptr) {
372 return;
373 }
374
375 size_t image_count = 0;
376 size_t warning_count = 0;
377 size_t error_count = 0;
378 for (auto &it : m_image_replayers) {
379 ++image_count;
380 auto health_state = it.second->get_health_state();
381 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
382 ++warning_count;
383 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
384 ++error_count;
385 }
386
387 start_image_replayer(it.second);
388 }
389
390 m_service_daemon->add_or_update_attribute(
391 m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
392 m_service_daemon->add_or_update_attribute(
393 m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
394 m_service_daemon->add_or_update_attribute(
395 m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
396
397 m_async_op_tracker.finish_op();
398 }
399
400 template <typename I>
401 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
402 Context *on_finish) {
403 dout(20) << image_replayer << " global_image_id="
404 << image_replayer->get_global_image_id() << ", on_finish="
405 << on_finish << dendl;
406
407 if (image_replayer->is_stopped()) {
408 m_threads->work_queue->queue(on_finish, 0);
409 return;
410 }
411
412 m_async_op_tracker.start_op();
413 Context *ctx = create_async_context_callback(
414 m_threads->work_queue, new FunctionContext(
415 [this, image_replayer, on_finish] (int r) {
416 stop_image_replayer(image_replayer, on_finish);
417 m_async_op_tracker.finish_op();
418 }));
419
420 if (image_replayer->is_running()) {
421 image_replayer->stop(ctx, false);
422 } else {
423 int after = 1;
424 dout(20) << "scheduling image replayer " << image_replayer << " stop after "
425 << after << " sec (task " << ctx << ")" << dendl;
426 ctx = new FunctionContext(
427 [this, after, ctx] (int r) {
428 Mutex::Locker timer_locker(m_threads->timer_lock);
429 m_threads->timer->add_event_after(after, ctx);
430 });
431 m_threads->work_queue->queue(ctx, 0);
432 }
433 }
434
435 template <typename I>
436 void InstanceReplayer<I>::wait_for_ops() {
437 dout(20) << dendl;
438
439 Context *ctx = create_context_callback<
440 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
441
442 m_async_op_tracker.wait_for_ops(ctx);
443 }
444
445 template <typename I>
446 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
447 dout(20) << "r=" << r << dendl;
448
449 assert(r == 0);
450
451 Mutex::Locker locker(m_lock);
452 stop_image_replayers();
453 }
454
455 template <typename I>
456 void InstanceReplayer<I>::stop_image_replayers() {
457 dout(20) << dendl;
458
459 assert(m_lock.is_locked());
460
461 Context *ctx = create_async_context_callback(
462 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
463 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
464
465 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
466 for (auto &it : m_image_replayers) {
467 stop_image_replayer(it.second, gather_ctx->new_sub());
468 }
469 gather_ctx->activate();
470 }
471
472 template <typename I>
473 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
474 dout(20) << "r=" << r << dendl;
475
476 assert(r == 0);
477
478 Context *on_finish = nullptr;
479 {
480 Mutex::Locker locker(m_lock);
481
482 for (auto &it : m_image_replayers) {
483 assert(it.second->is_stopped());
484 it.second->destroy();
485 }
486 m_image_replayers.clear();
487
488 assert(m_on_shut_down != nullptr);
489 std::swap(on_finish, m_on_shut_down);
490 }
491 on_finish->complete(r);
492 }
493
494 template <typename I>
495 void InstanceReplayer<I>::cancel_image_state_check_task() {
496 Mutex::Locker timer_locker(m_threads->timer_lock);
497
498 if (m_image_state_check_task == nullptr) {
499 return;
500 }
501
502 dout(20) << m_image_state_check_task << dendl;
503 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
504 assert(canceled);
505 m_image_state_check_task = nullptr;
506 }
507
508 template <typename I>
509 void InstanceReplayer<I>::schedule_image_state_check_task() {
510 assert(m_threads->timer_lock.is_locked());
511 assert(m_image_state_check_task == nullptr);
512
513 m_image_state_check_task = new FunctionContext(
514 [this](int r) {
515 assert(m_threads->timer_lock.is_locked());
516 m_image_state_check_task = nullptr;
517 schedule_image_state_check_task();
518 queue_start_image_replayers();
519 });
520
521 int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;
522
523 dout(20) << "scheduling image state check after " << after << " sec (task "
524 << m_image_state_check_task << ")" << dendl;
525 m_threads->timer->add_event_after(after, m_image_state_check_task);
526 }
527
528 } // namespace mirror
529 } // namespace rbd
530
531 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;