1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "ImageReplayer.h"
10 #include "InstanceReplayer.h"
11 #include "ServiceDaemon.h"
// Debug-logging setup for this file: dout/derr output goes to the
// rbd_mirror subsystem, and every message is prefixed with the class name,
// the object address and the name of the calling function.
11 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18 << this << " " << __func__ << ": "
// Attribute keys under which start_image_replayers() publishes this pool's
// image counts (all assigned images, images in WARNING health, images in
// ERROR health) to the service daemon.
25 const std::string
SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
26 const std::string
SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
27 const std::string
SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
29 } // anonymous namespace
31 using librbd::util::create_async_context_callback
;
32 using librbd::util::create_context_callback
;
// Constructor. Stores the shared threads (work queue and timer), the service
// daemon, the image deleter, the local RADOS handle, the local mirror UUID
// and the local pool id that this replayer manages images for. The name of
// the internal lock m_lock embeds the pool id.
35 InstanceReplayer
<I
>::InstanceReplayer(
36 Threads
<I
> *threads
, ServiceDaemon
<I
>* service_daemon
,
37 ImageDeleter
<I
>* image_deleter
, RadosRef local_rados
,
38 const std::string
&local_mirror_uuid
, int64_t local_pool_id
)
39 : m_threads(threads
), m_service_daemon(service_daemon
),
40 m_image_deleter(image_deleter
), m_local_rados(local_rados
),
41 m_local_mirror_uuid(local_mirror_uuid
), m_local_pool_id(local_pool_id
),
42 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id
)) {
// Destructor. Shutdown must already have completed:
//  - the periodic image state check is cancelled,
//  - no tracked async operations are outstanding,
//  - every image replayer has been released.
46 InstanceReplayer
<I
>::~InstanceReplayer() {
47 assert(m_image_state_check_task
== nullptr);
48 assert(m_async_op_tracker
.empty());
49 assert(m_image_replayers
.empty());
// Synchronous initialization: blocks on init_ctx and returns its result.
// NOTE(review): the declaration of init_ctx and the call that passes it to
// init(Context*) are not visible in this excerpt; presumably it is a
// C_SaferCond, as in shut_down(). Confirm.
53 int InstanceReplayer
<I
>::init() {
56 return init_ctx
.wait();
// Asynchronous initialization. Queues a callback on the work queue that
// takes the timer lock, arms the first periodic image state check, and then
// completes on_finish with 0. The callback's own r is not propagated.
60 void InstanceReplayer
<I
>::init(Context
*on_finish
) {
63 Context
*ctx
= new FunctionContext(
64 [this, on_finish
] (int r
) {
// schedule_image_state_check_task() asserts that timer_lock is held.
66 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
67 schedule_image_state_check_task();
69 on_finish
->complete(0);
72 m_threads
->work_queue
->queue(ctx
, 0);
// Synchronous shutdown: starts shut_down(Context*) and blocks until it
// completes.
// NOTE(review): how the result r is used is not visible in this excerpt.
76 void InstanceReplayer
<I
>::shut_down() {
77 C_SaferCond shut_down_ctx
;
78 shut_down(&shut_down_ctx
);
79 int r
= shut_down_ctx
.wait();
// Begins asynchronous shutdown. Only one shutdown may be in flight; this is
// asserted under m_lock. on_finish is parked in m_on_shut_down and is
// completed later by handle_stop_image_replayers(). A context is then queued
// on the work queue.
// NOTE(review): only the first step of that context (cancelling the periodic
// image state check) is visible in this excerpt.
84 void InstanceReplayer
<I
>::shut_down(Context
*on_finish
) {
87 Mutex::Locker
locker(m_lock
);
89 assert(m_on_shut_down
== nullptr);
90 m_on_shut_down
= on_finish
;
92 Context
*ctx
= new FunctionContext(
94 cancel_image_state_check_task();
98 m_threads
->work_queue
->queue(ctx
, 0);
// Registers a remote peer (its mirror UUID and IoCtx) under m_lock.
// acquire_image() later looks the peer up by UUID and asserts that it exists.
// NOTE(review): `result` is the "newly inserted" flag from insert(); how it
// is checked is not visible in this excerpt.
101 template <typename I
>
102 void InstanceReplayer
<I
>::add_peer(std::string mirror_uuid
,
103 librados::IoCtx io_ctx
) {
104 dout(20) << mirror_uuid
<< dendl
;
106 Mutex::Locker
locker(m_lock
);
107 auto result
= m_peers
.insert(Peer(mirror_uuid
, io_ctx
)).second
;
// Unregisters the peer with the given mirror UUID, under m_lock.
// NOTE(review): `result` presumably holds the number of erased entries
// (erase-by-key); how it is checked is not visible in this excerpt.
111 template <typename I
>
112 void InstanceReplayer
<I
>::remove_peer(std::string mirror_uuid
) {
113 dout(20) << mirror_uuid
<< dendl
;
115 Mutex::Locker
locker(m_lock
);
116 auto result
= m_peers
.erase(Peer(mirror_uuid
));
// Releases every image replayer, under m_lock. The loop advances with
// erase(), so each entry is removed from m_image_replayers as it is visited.
// Each replayer is then stopped, and a wrapper destroys it after the stop.
// A C_Gather completes on_finish once every per-replayer sub-context has
// completed.
// NOTE(review): the line in the wrapper that completes the gather
// sub-context after destroy() is not visible in this excerpt.
120 template <typename I
>
121 void InstanceReplayer
<I
>::release_all(Context
*on_finish
) {
124 Mutex::Locker
locker(m_lock
);
126 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, on_finish
);
127 for (auto it
= m_image_replayers
.begin(); it
!= m_image_replayers
.end();
128 it
= m_image_replayers
.erase(it
)) {
129 auto image_replayer
= it
->second
;
130 auto ctx
= gather_ctx
->new_sub();
// Wrap the gather sub-context so the replayer is destroyed once stopped.
131 ctx
= new FunctionContext(
132 [image_replayer
, ctx
] (int r
) {
133 image_replayer
->destroy();
136 stop_image_replayer(image_replayer
, ctx
);
138 gather_ctx
->activate();
// Takes ownership of global_image_id on this instance, under m_lock:
//  - creates and registers an ImageReplayer on first acquisition;
//  - attaches the peer's remote image when peer_mirror_uuid is non-empty;
//  - (re)starts the replayer via start_image_replayer();
//  - queues on_finish on the work queue.
// Must not be called once shut_down() has begun (asserted).
141 template <typename I
>
142 void InstanceReplayer
<I
>::acquire_image(InstanceWatcher
<I
> *instance_watcher
,
143 const std::string
&global_image_id
,
144 const std::string
&peer_mirror_uuid
,
145 const std::string
&peer_image_id
,
146 Context
*on_finish
) {
147 dout(20) << "global_image_id=" << global_image_id
<< ", peer_mirror_uuid="
148 << peer_mirror_uuid
<< ", peer_image_id=" << peer_image_id
<< dendl
;
150 Mutex::Locker
locker(m_lock
);
152 assert(m_on_shut_down
== nullptr);
154 auto it
= m_image_replayers
.find(global_image_id
);
156 if (it
== m_image_replayers
.end()) {
157 auto image_replayer
= ImageReplayer
<I
>::create(
158 m_threads
, m_image_deleter
, instance_watcher
, m_local_rados
,
159 m_local_mirror_uuid
, m_local_pool_id
, global_image_id
);
161 dout(20) << global_image_id
<< ": creating replayer " << image_replayer
164 it
= m_image_replayers
.insert(std::make_pair(global_image_id
,
165 image_replayer
)).first
;
168 auto image_replayer
= it
->second
;
// The peer must already be registered via add_peer(); this is asserted.
169 if (!peer_mirror_uuid
.empty()) {
170 auto iter
= m_peers
.find(Peer(peer_mirror_uuid
));
171 assert(iter
!= m_peers
.end());
172 auto io_ctx
= iter
->io_ctx
;
174 image_replayer
->add_remote_image(peer_mirror_uuid
, peer_image_id
, io_ctx
);
// start_image_replayer() requires m_lock, which is held here.
176 start_image_replayer(image_replayer
);
178 m_threads
->work_queue
->queue(on_finish
, 0);
// Detaches peer_mirror_uuid's image from the replayer for global_image_id,
// under m_lock.
//
// on_finish is simply queued on the work queue when:
//  - the image is unknown, or
//  - the replayer still has other peer images.
//
// Otherwise the replayer is removed from the map and stopped. Once the stop
// completes, three things happen in order:
//  1. the local image is scheduled for deletion (only when schedule_delete);
//  2. the replayer is destroyed;
//  3. the caller's on_finish is completed with 0.
//
// Must not be called once shut_down() has begun (asserted).
// NOTE(review): the remaining arguments of remove_remote_image() and the
// early returns after the queued on_finish calls are not visible in this
// excerpt.
181 template <typename I
>
182 void InstanceReplayer
<I
>::release_image(const std::string
&global_image_id
,
183 const std::string
&peer_mirror_uuid
,
184 const std::string
&peer_image_id
,
185 bool schedule_delete
,
186 Context
*on_finish
) {
187 dout(20) << "global_image_id=" << global_image_id
<< ", peer_mirror_uuid="
188 << peer_mirror_uuid
<< ", peer_image_id=" << peer_image_id
<< dendl
;
190 Mutex::Locker
locker(m_lock
);
192 assert(m_on_shut_down
== nullptr);
194 auto it
= m_image_replayers
.find(global_image_id
);
196 if (it
== m_image_replayers
.end()) {
197 dout(20) << global_image_id
<< ": not found" << dendl
;
198 m_threads
->work_queue
->queue(on_finish
, 0);
202 auto image_replayer
= it
->second
;
203 if (!peer_mirror_uuid
.empty()) {
204 image_replayer
->remove_remote_image(peer_mirror_uuid
, peer_image_id
,
208 if (!image_replayer
->remote_images_empty()) {
209 dout(20) << global_image_id
<< ": still has peer images" << dendl
;
210 m_threads
->work_queue
->queue(on_finish
, 0);
// Last peer image gone: this instance no longer tracks the replayer.
214 m_image_replayers
.erase(it
);
// Inner wrapper: destroy the replayer, then complete the caller's context.
216 on_finish
= new FunctionContext(
217 [image_replayer
, on_finish
] (int r
) {
218 image_replayer
->destroy();
219 on_finish
->complete(0);
// Optional outer wrapper (runs before the inner one): schedule deletion of
// the local image.
222 if (schedule_delete
) {
223 on_finish
= new FunctionContext(
224 [this, image_replayer
, on_finish
] (int r
) {
225 auto global_image_id
= image_replayer
->get_global_image_id();
226 m_image_deleter
->schedule_image_delete(
227 m_local_rados
, m_local_pool_id
, global_image_id
, false);
228 on_finish
->complete(0);
232 stop_image_replayer(image_replayer
, on_finish
);
// Writes the status of every image replayer into an "image_replayers" array
// section of f, under m_lock.
// NOTE(review): the beginning of the body (lines 237-242) and the closing of
// the array section are not visible in this excerpt.
235 template <typename I
>
236 void InstanceReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
) {
243 Mutex::Locker
locker(m_lock
);
245 f
->open_array_section("image_replayers");
246 for (auto &kv
: m_image_replayers
) {
247 auto &image_replayer
= kv
.second
;
248 image_replayer
->print_status(f
, ss
);
// Clears m_manual_stop and asks every image replayer to start, under m_lock.
// No completion context is passed.
// NOTE(review): the `true` argument presumably marks this as a manual
// (admin-requested) start; confirm against ImageReplayer::start().
253 template <typename I
>
254 void InstanceReplayer
<I
>::start()
258 Mutex::Locker
locker(m_lock
);
260 m_manual_stop
= false;
262 for (auto &kv
: m_image_replayers
) {
263 auto &image_replayer
= kv
.second
;
264 image_replayer
->start(nullptr, true);
// Sets m_manual_stop and asks every image replayer to stop, under m_lock.
// No completion context is passed.
// NOTE(review): the `true` argument presumably marks this as a manual
// (admin-requested) stop; confirm against ImageReplayer::stop().
268 template <typename I
>
269 void InstanceReplayer
<I
>::stop()
273 Mutex::Locker
locker(m_lock
);
275 m_manual_stop
= true;
277 for (auto &kv
: m_image_replayers
) {
278 auto &image_replayer
= kv
.second
;
279 image_replayer
->stop(nullptr, true);
// Clears m_manual_stop and restarts every image replayer, under m_lock.
283 template <typename I
>
284 void InstanceReplayer
<I
>::restart()
288 Mutex::Locker
locker(m_lock
);
290 m_manual_stop
= false;
292 for (auto &kv
: m_image_replayers
) {
293 auto &image_replayer
= kv
.second
;
294 image_replayer
->restart();
// Calls flush() on every image replayer, under m_lock.
298 template <typename I
>
299 void InstanceReplayer
<I
>::flush()
301 dout(20) << "enter" << dendl
;
303 Mutex::Locker
locker(m_lock
);
305 for (auto &kv
: m_image_replayers
) {
306 auto &image_replayer
= kv
.second
;
307 image_replayer
->flush();
// Starts a stopped image replayer once any deletion scheduled for its image
// has been processed by the image deleter. Requires m_lock.
// For a replayer that is not stopped nothing further is visible here; a
// blacklisted replayer is logged.
// NOTE(review): the bodies of those two branches are only partly visible in
// this excerpt.
311 template <typename I
>
312 void InstanceReplayer
<I
>::start_image_replayer(
313 ImageReplayer
<I
> *image_replayer
) {
314 assert(m_lock
.is_locked());
316 std::string global_image_id
= image_replayer
->get_global_image_id();
317 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
319 if (!image_replayer
->is_stopped()) {
321 } else if (image_replayer
->is_blacklisted()) {
322 derr
<< "blacklisted detected during image replay" << dendl
;
// Completion for wait_for_scheduled_deletion(). It captures the image id
// rather than the replayer pointer and looks the replayer up again under
// m_lock, presumably because the image may have been released while waiting.
// It finishes the async op started just before the wait below.
326 FunctionContext
*ctx
= new FunctionContext(
327 [this, global_image_id
] (int r
) {
328 dout(20) << "image deleter result: r=" << r
<< ", "
329 << "global_image_id=" << global_image_id
<< dendl
;
331 Mutex::Locker
locker(m_lock
);
332 m_async_op_tracker
.finish_op();
// NOTE(review): not visible in this excerpt:
//  - the handling of -ESTALE/-ECANCELED;
//  - the handling of a missing replayer;
//  - the condition that chooses between start(nullptr, false) and a
//    recursive start_image_replayer().
334 if (r
== -ESTALE
|| r
== -ECANCELED
) {
338 auto it
= m_image_replayers
.find(global_image_id
);
339 if (it
== m_image_replayers
.end()) {
343 auto image_replayer
= it
->second
;
345 image_replayer
->start(nullptr, false);
347 start_image_replayer(image_replayer
);
351 m_async_op_tracker
.start_op();
352 m_image_deleter
->wait_for_scheduled_deletion(
353 m_local_pool_id
, image_replayer
->get_global_image_id(), ctx
, false);
// Queues a start_image_replayers() pass on the work queue. An async op is
// registered with m_async_op_tracker first and is finished by
// start_image_replayers(). Presumably this makes wait_for_ops() wait for
// pending passes during shutdown.
356 template <typename I
>
357 void InstanceReplayer
<I
>::queue_start_image_replayers() {
360 Context
*ctx
= create_context_callback
<
361 InstanceReplayer
, &InstanceReplayer
<I
>::start_image_replayers
>(this);
362 m_async_op_tracker
.start_op();
363 m_threads
->work_queue
->queue(ctx
, 0);
// Work-queue callback queued by queue_start_image_replayers(). Under m_lock
// it:
//  - (re)starts every image replayer via start_image_replayer();
//  - classifies each replayer by health state;
//  - publishes this pool's assigned/warning/error image counts to the service
//    daemon;
//  - finishes the async op registered by queue_start_image_replayers().
366 template <typename I
>
367 void InstanceReplayer
<I
>::start_image_replayers(int r
) {
370 Mutex::Locker
locker(m_lock
);
// Shutdown has begun.
// NOTE(review): the body of this branch is not visible here. Confirm that it
// calls m_async_op_tracker.finish_op() before returning; otherwise
// wait_for_ops() would never drain.
371 if (m_on_shut_down
!= nullptr) {
375 size_t image_count
= 0;
376 size_t warning_count
= 0;
377 size_t error_count
= 0;
// NOTE(review): the increments of image_count, warning_count and error_count
// are not visible in this excerpt. As shown, all three counters would be
// reported as 0.
378 for (auto &it
: m_image_replayers
) {
380 auto health_state
= it
.second
->get_health_state();
381 if (health_state
== image_replayer::HEALTH_STATE_WARNING
) {
383 } else if (health_state
== image_replayer::HEALTH_STATE_ERROR
) {
387 start_image_replayer(it
.second
);
390 m_service_daemon
->add_or_update_attribute(
391 m_local_pool_id
, SERVICE_DAEMON_ASSIGNED_COUNT_KEY
, image_count
);
392 m_service_daemon
->add_or_update_attribute(
393 m_local_pool_id
, SERVICE_DAEMON_WARNING_COUNT_KEY
, warning_count
);
394 m_service_daemon
->add_or_update_attribute(
395 m_local_pool_id
, SERVICE_DAEMON_ERROR_COUNT_KEY
, error_count
);
397 m_async_op_tracker
.finish_op();
// Stops image_replayer and completes on_finish, queued on the work queue,
// once the replayer reports is_stopped().
//
// Until then it retries. A retry callback that re-enters
// stop_image_replayer() is created and tracked in m_async_op_tracker; it is
// wrapped by create_async_context_callback, presumably so it completes on the
// work queue. The retry callback re-enters this function without taking
// m_lock.
//  - A running replayer is asked to stop with that callback.
//  - Otherwise the callback is armed on the timer `after` seconds later.
//
// NOTE(review): the early return after queueing on_finish and the declaration
// of `after` are not visible in this excerpt.
400 template <typename I
>
401 void InstanceReplayer
<I
>::stop_image_replayer(ImageReplayer
<I
> *image_replayer
,
402 Context
*on_finish
) {
403 dout(20) << image_replayer
<< " global_image_id="
404 << image_replayer
->get_global_image_id() << ", on_finish="
405 << on_finish
<< dendl
;
407 if (image_replayer
->is_stopped()) {
408 m_threads
->work_queue
->queue(on_finish
, 0);
412 m_async_op_tracker
.start_op();
413 Context
*ctx
= create_async_context_callback(
414 m_threads
->work_queue
, new FunctionContext(
415 [this, image_replayer
, on_finish
] (int r
) {
416 stop_image_replayer(image_replayer
, on_finish
);
417 m_async_op_tracker
.finish_op();
420 if (image_replayer
->is_running()) {
421 image_replayer
->stop(ctx
, false);
424 dout(20) << "scheduling image replayer " << image_replayer
<< " stop after "
425 << after
<< " sec (task " << ctx
<< ")" << dendl
;
// Not running: from a work-queue context, take timer_lock and arm the retry
// on the timer.
426 ctx
= new FunctionContext(
427 [this, after
, ctx
] (int r
) {
428 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
429 m_threads
->timer
->add_event_after(after
, ctx
);
431 m_threads
->work_queue
->queue(ctx
, 0);
// Shutdown step. Waits for every operation tracked by m_async_op_tracker to
// finish, then continues in handle_wait_for_ops(). That chain ends in
// handle_stop_image_replayers(), which completes m_on_shut_down.
435 template <typename I
>
436 void InstanceReplayer
<I
>::wait_for_ops() {
439 Context
*ctx
= create_context_callback
<
440 InstanceReplayer
, &InstanceReplayer
<I
>::handle_wait_for_ops
>(this);
442 m_async_op_tracker
.wait_for_ops(ctx
);
// Continuation of wait_for_ops(). Takes m_lock, which stop_image_replayers()
// asserts is held, and stops all image replayers.
// NOTE(review): any check on r (lines 448-450) is not visible in this
// excerpt.
445 template <typename I
>
446 void InstanceReplayer
<I
>::handle_wait_for_ops(int r
) {
447 dout(20) << "r=" << r
<< dendl
;
451 Mutex::Locker
locker(m_lock
);
452 stop_image_replayers();
// Requires m_lock. Stops all image replayers concurrently. A C_Gather runs
// handle_stop_image_replayers() once every replayer has stopped. Entries are
// not removed from m_image_replayers here; handle_stop_image_replayers()
// destroys them and clears the map.
455 template <typename I
>
456 void InstanceReplayer
<I
>::stop_image_replayers() {
459 assert(m_lock
.is_locked());
// Wrapping with create_async_context_callback presumably prevents
// handle_stop_image_replayers() from running synchronously while m_lock is
// still held here, since it takes m_lock itself. This matters for example
// when the map is empty and activate() completes the gather at once.
461 Context
*ctx
= create_async_context_callback(
462 m_threads
->work_queue
, create_context_callback
<InstanceReplayer
<I
>,
463 &InstanceReplayer
<I
>::handle_stop_image_replayers
>(this));
465 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
466 for (auto &it
: m_image_replayers
) {
467 stop_image_replayer(it
.second
, gather_ctx
->new_sub());
469 gather_ctx
->activate();
// Final shutdown step. Destroys every image replayer (each is asserted to be
// stopped), empties m_image_replayers, and completes the context saved by
// shut_down(Context*) with r.
472 template <typename I
>
473 void InstanceReplayer
<I
>::handle_stop_image_replayers(int r
) {
474 dout(20) << "r=" << r
<< dendl
;
// Declared outside the locked section, presumably so the completion below
// runs after m_lock is released.
// NOTE(review): the scope braces are not visible in this excerpt.
478 Context
*on_finish
= nullptr;
480 Mutex::Locker
locker(m_lock
);
482 for (auto &it
: m_image_replayers
) {
483 assert(it
.second
->is_stopped());
484 it
.second
->destroy();
486 m_image_replayers
.clear();
488 assert(m_on_shut_down
!= nullptr);
// swap() both takes the pending context and resets m_on_shut_down to
// nullptr.
489 std::swap(on_finish
, m_on_shut_down
);
491 on_finish
->complete(r
);
// Under timer_lock, cancels the pending periodic image state check (if one
// is armed) and clears m_image_state_check_task. The destructor asserts that
// this pointer is nullptr.
// NOTE(review): not visible in this excerpt:
//  - the early return for the no-task case;
//  - the use of `canceled` (line 504, presumably an assert).
494 template <typename I
>
495 void InstanceReplayer
<I
>::cancel_image_state_check_task() {
496 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
498 if (m_image_state_check_task
== nullptr) {
502 dout(20) << m_image_state_check_task
<< dendl
;
503 bool canceled
= m_threads
->timer
->cancel_event(m_image_state_check_task
);
505 m_image_state_check_task
= nullptr;
// Arms the periodic image state check as a timer event that fires after
// rbd_mirror_image_state_check_interval seconds. Requires timer_lock, and
// requires that no check is currently armed (both asserted).
// When the event fires it runs with timer_lock held (it asserts this) and:
//  - clears the task pointer;
//  - re-arms the next check;
//  - queues a start_image_replayers() pass on the work queue.
508 template <typename I
>
509 void InstanceReplayer
<I
>::schedule_image_state_check_task() {
510 assert(m_threads
->timer_lock
.is_locked());
511 assert(m_image_state_check_task
== nullptr);
513 m_image_state_check_task
= new FunctionContext(
515 assert(m_threads
->timer_lock
.is_locked());
516 m_image_state_check_task
= nullptr;
517 schedule_image_state_check_task();
518 queue_start_image_replayers();
521 int after
= g_ceph_context
->_conf
->rbd_mirror_image_state_check_interval
;
523 dout(20) << "scheduling image state check after " << after
<< " sec (task "
524 << m_image_state_check_task
<< ")" << dendl
;
525 m_threads
->timer
->add_event_after(after
, m_image_state_check_task
);
528 } // namespace mirror
// Explicit instantiation of InstanceReplayer for librbd::ImageCtx.
531 template class rbd::mirror::InstanceReplayer
<librbd::ImageCtx
>;