1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "ImageReplayer.h"
10 #include "InstanceReplayer.h"
11 #include "ServiceDaemon.h"
// Debug logging setup for this file: the logging context, the rbd_mirror
// log subsystem, and a per-message prefix built from the class name, the
// `this` pointer and the name of the calling function.
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18 << this << " " << __func__ << ": "
// Attribute keys passed to m_service_daemon->add_or_update_attribute() by
// start_image_replayers() when it reports the assigned / warning / error
// image counts for the local pool.
25 const std::string
SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
26 const std::string
SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
27 const std::string
SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
29 } // anonymous namespace
31 using librbd::util::create_async_context_callback
;
32 using librbd::util::create_context_callback
;
// Constructor: stores each argument in the member of the same name
// (threads, service daemon, image deleter, local rados handle, local mirror
// uuid, local pool id). No other work is visible here.
35 InstanceReplayer
<I
>::InstanceReplayer(
36 Threads
<I
> *threads
, ServiceDaemon
<I
>* service_daemon
,
37 ImageDeleter
<I
>* image_deleter
, RadosRef local_rados
,
38 const std::string
&local_mirror_uuid
, int64_t local_pool_id
)
39 : m_threads(threads
), m_service_daemon(service_daemon
),
40 m_image_deleter(image_deleter
), m_local_rados(local_rados
),
41 m_local_mirror_uuid(local_mirror_uuid
), m_local_pool_id(local_pool_id
),
// The lock name embeds the pool id so locks of different pools can be
// told apart.
42 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id
)) {
// Destructor: only checks invariants. By the time the object is destroyed
// the image state check task must be cleared, the async op tracker must be
// empty and no image replayers may remain registered.
46 InstanceReplayer
<I
>::~InstanceReplayer() {
47 assert(m_image_state_check_task
== nullptr);
48 assert(m_async_op_tracker
.empty());
49 assert(m_image_replayers
.empty());
// Synchronous init overload: returns the result of init_ctx.wait().
// NOTE(review): the declaration of init_ctx and whatever hands it to the
// asynchronous overload are not visible in this excerpt -- confirm.
53 int InstanceReplayer
<I
>::init() {
56 return init_ctx
.wait();
// Asynchronous init overload. Queues a context on the work queue; when it
// runs it takes the timer lock, schedules the periodic image state check
// task and completes on_finish with 0.
//
// on_finish: completion invoked (with 0) from the queued context.
60 void InstanceReplayer
<I
>::init(Context
*on_finish
) {
63 Context
*ctx
= new FunctionContext(
64 [this, on_finish
] (int r
) {
// schedule_image_state_check_task() asserts that timer_lock is held.
66 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
67 schedule_image_state_check_task();
69 on_finish
->complete(0);
72 m_threads
->work_queue
->queue(ctx
, 0);
// Synchronous shut down overload: starts the asynchronous shut down with a
// C_SaferCond as completion and blocks until that completion fires.
// NOTE(review): how the result `r` is used afterwards is not visible here.
76 void InstanceReplayer
<I
>::shut_down() {
77 C_SaferCond shut_down_ctx
;
78 shut_down(&shut_down_ctx
);
79 int r
= shut_down_ctx
.wait();
// Asynchronous shut down overload. Under m_lock, records on_finish in
// m_on_shut_down (asserting that no shut down is already pending) and
// queues a context on the work queue that cancels the image state check
// task.
// NOTE(review): the lambda header and any further steps of the queued
// callback are not visible in this excerpt.
//
// on_finish: stored in m_on_shut_down; handle_stop_image_replayers()
//            completes whatever is stored there.
84 void InstanceReplayer
<I
>::shut_down(Context
*on_finish
) {
87 Mutex::Locker
locker(m_lock
);
89 assert(m_on_shut_down
== nullptr);
90 m_on_shut_down
= on_finish
;
92 Context
*ctx
= new FunctionContext(
94 cancel_image_state_check_task();
98 m_threads
->work_queue
->queue(ctx
, 0);
// Registers a peer: logs its uuid and, under m_lock, inserts
// Peer(peer_uuid, io_ctx) into m_peers.
//
// peer_uuid: uuid of the peer being added.
// io_ctx:    IoCtx stored alongside the uuid in the Peer entry.
101 template <typename I
>
102 void InstanceReplayer
<I
>::add_peer(std::string peer_uuid
,
103 librados::IoCtx io_ctx
) {
104 dout(20) << peer_uuid
<< dendl
;
106 Mutex::Locker
locker(m_lock
);
// `result` is true only if the peer was newly inserted.
// NOTE(review): the use of `result` is not visible in this excerpt.
107 auto result
= m_peers
.insert(Peer(peer_uuid
, io_ctx
)).second
;
// Releases every registered image replayer. Under m_lock each entry is
// erased from m_image_replayers and handed to stop_image_replayer() with a
// callback that destroys it; a C_Gather created with on_finish collects one
// sub-context per replayer.
//
// on_finish: passed to the C_Gather as its completion.
111 template <typename I
>
112 void InstanceReplayer
<I
>::release_all(Context
*on_finish
) {
115 Mutex::Locker
locker(m_lock
);
117 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, on_finish
);
// The loop "increment" erases the current entry, so the map is emptied as
// it is walked.
118 for (auto it
= m_image_replayers
.begin(); it
!= m_image_replayers
.end();
119 it
= m_image_replayers
.erase(it
)) {
120 auto image_replayer
= it
->second
;
// The lambda captures the gather sub-context by value before `ctx` is
// rebound to the wrapper created here.
121 auto ctx
= gather_ctx
->new_sub();
122 ctx
= new FunctionContext(
123 [image_replayer
, ctx
] (int r
) {
124 image_replayer
->destroy();
127 stop_image_replayer(image_replayer
, ctx
);
129 gather_ctx
->activate();
// Acquires (starts replaying) the image identified by global_image_id.
// Under m_lock: looks the image up in m_image_replayers, creating and
// registering a new ImageReplayer with the single configured peer if none
// exists, then clears its "finished" flag, calls start_image_replayer() and
// queues on_finish with 0 on the work queue.
//
// instance_watcher: forwarded to ImageReplayer<I>::create().
// global_image_id:  key into m_image_replayers.
// on_finish:        queued on the work queue with result 0.
132 template <typename I
>
133 void InstanceReplayer
<I
>::acquire_image(InstanceWatcher
<I
> *instance_watcher
,
134 const std::string
&global_image_id
,
135 Context
*on_finish
) {
136 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
138 Mutex::Locker
locker(m_lock
);
// Must not be called once a shut down has been requested.
140 assert(m_on_shut_down
== nullptr);
142 auto it
= m_image_replayers
.find(global_image_id
);
143 if (it
== m_image_replayers
.end()) {
144 auto image_replayer
= ImageReplayer
<I
>::create(
145 m_threads
, m_image_deleter
, instance_watcher
, m_local_rados
,
146 m_local_mirror_uuid
, m_local_pool_id
, global_image_id
);
148 dout(20) << global_image_id
<< ": creating replayer " << image_replayer
151 it
= m_image_replayers
.insert(std::make_pair(global_image_id
,
152 image_replayer
)).first
;
154 // TODO only a single peer is currently supported
155 assert(m_peers
.size() == 1);
156 auto peer
= *m_peers
.begin();
157 image_replayer
->add_peer(peer
.peer_uuid
, peer
.io_ctx
);
// From here on `it` refers to the existing or the newly inserted entry.
160 auto& image_replayer
= it
->second
;
161 // TODO temporary until policy integrated
162 image_replayer
->set_finished(false);
164 start_image_replayer(image_replayer
);
165 m_threads
->work_queue
->queue(on_finish
, 0);
// Releases the image identified by global_image_id. Under m_lock: if no
// replayer is registered, logs that and queues on_finish with 0. Otherwise
// the replayer is removed from m_image_replayers and handed to
// stop_image_replayer() with a wrapper callback that destroys it and then
// completes the caller's on_finish with 0.
// NOTE(review): the early exit after the "not found" branch is not visible
// in this excerpt.
//
// global_image_id: key into m_image_replayers.
// on_finish:       completed with 0 once the replayer has been destroyed,
//                  or queued with 0 if the image was not found.
168 template <typename I
>
169 void InstanceReplayer
<I
>::release_image(const std::string
&global_image_id
,
170 Context
*on_finish
) {
171 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
173 Mutex::Locker
locker(m_lock
);
174 assert(m_on_shut_down
== nullptr);
176 auto it
= m_image_replayers
.find(global_image_id
);
177 if (it
== m_image_replayers
.end()) {
178 dout(20) << global_image_id
<< ": not found" << dendl
;
179 m_threads
->work_queue
->queue(on_finish
, 0);
183 auto image_replayer
= it
->second
;
184 m_image_replayers
.erase(it
);
// The lambda captures the caller's on_finish by value before the local
// variable is rebound to the wrapper.
186 on_finish
= new FunctionContext(
187 [image_replayer
, on_finish
] (int r
) {
188 image_replayer
->destroy();
189 on_finish
->complete(0);
191 stop_image_replayer(image_replayer
, on_finish
);
// Handles removal of a peer's copy of an image. Under m_lock: if a replayer
// is registered for global_image_id it is restarted; on_finish is then
// queued on the work queue with 0.
//
// global_image_id:  key into m_image_replayers.
// peer_mirror_uuid: only logged by the visible code.
// on_finish:        queued on the work queue with result 0.
194 template <typename I
>
195 void InstanceReplayer
<I
>::remove_peer_image(const std::string
&global_image_id
,
196 const std::string
&peer_mirror_uuid
,
197 Context
*on_finish
) {
198 dout(20) << "global_image_id=" << global_image_id
<< ", "
199 << "peer_mirror_uuid=" << peer_mirror_uuid
<< dendl
;
201 Mutex::Locker
locker(m_lock
);
202 assert(m_on_shut_down
== nullptr);
204 auto it
= m_image_replayers
.find(global_image_id
);
205 if (it
!= m_image_replayers
.end()) {
206 // TODO only a single peer is currently supported, therefore
207 // we can just interrupt the current image replayer and
208 // it will eventually detect that the peer image is missing and
209 // determine if a delete propagation is required.
210 auto image_replayer
= it
->second
;
211 image_replayer
->restart();
213 m_threads
->work_queue
->queue(on_finish
, 0);
// Dumps the status of all image replayers. Under m_lock, opens an
// "image_replayers" array section on the formatter and forwards f and ss to
// each replayer's print_status().
//
// f:  formatter receiving the array section.
// ss: stream forwarded unchanged to each replayer.
216 template <typename I
>
217 void InstanceReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
) {
224 Mutex::Locker
locker(m_lock
);
226 f
->open_array_section("image_replayers");
227 for (auto &kv
: m_image_replayers
) {
228 auto &image_replayer
= kv
.second
;
229 image_replayer
->print_status(f
, ss
);
// Starts all image replayers: under m_lock, clears m_manual_stop and calls
// start(nullptr, true) on every registered replayer.
234 template <typename I
>
235 void InstanceReplayer
<I
>::start()
239 Mutex::Locker
locker(m_lock
);
241 m_manual_stop
= false;
243 for (auto &kv
: m_image_replayers
) {
244 auto &image_replayer
= kv
.second
;
245 image_replayer
->start(nullptr, true);
// Stops all image replayers: under m_lock, sets m_manual_stop and calls
// stop(nullptr, true) on every registered replayer.
249 template <typename I
>
250 void InstanceReplayer
<I
>::stop()
254 Mutex::Locker
locker(m_lock
);
256 m_manual_stop
= true;
258 for (auto &kv
: m_image_replayers
) {
259 auto &image_replayer
= kv
.second
;
260 image_replayer
->stop(nullptr, true);
// Restarts all image replayers: under m_lock, clears m_manual_stop and
// calls restart() on every registered replayer.
264 template <typename I
>
265 void InstanceReplayer
<I
>::restart()
269 Mutex::Locker
locker(m_lock
);
271 m_manual_stop
= false;
273 for (auto &kv
: m_image_replayers
) {
274 auto &image_replayer
= kv
.second
;
275 image_replayer
->restart();
// Flushes all image replayers: logs entry and, under m_lock, calls flush()
// on every registered replayer.
279 template <typename I
>
280 void InstanceReplayer
<I
>::flush()
282 dout(20) << "enter" << dendl
;
284 Mutex::Locker
locker(m_lock
);
286 for (auto &kv
: m_image_replayers
) {
287 auto &image_replayer
= kv
.second
;
288 image_replayer
->flush();
// Decides what to do with a single image replayer; the caller must hold
// m_lock. The visible branches distinguish:
//   - replayer not stopped,
//   - replayer blacklisted (an error is logged),
//   - replayer finished (it is erased from m_image_replayers and destroyed),
// and the function ends with a call to start(nullptr, false).
// NOTE(review): the bodies/returns that end each branch are not visible in
// this excerpt, so which branches fall through to start() cannot be
// confirmed from here.
//
// image_replayer: the replayer to examine; may be destroyed by this call.
292 template <typename I
>
293 void InstanceReplayer
<I
>::start_image_replayer(
294 ImageReplayer
<I
> *image_replayer
) {
295 assert(m_lock
.is_locked());
297 std::string global_image_id
= image_replayer
->get_global_image_id();
298 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
300 if (!image_replayer
->is_stopped()) {
302 } else if (image_replayer
->is_blacklisted()) {
303 derr
<< "blacklisted detected during image replay" << dendl
;
305 } else if (image_replayer
->is_finished()) {
306 // TODO temporary until policy integrated
307 dout(5) << "removing image replayer for global_image_id="
308 << global_image_id
<< dendl
;
309 m_image_replayers
.erase(image_replayer
->get_global_image_id());
310 image_replayer
->destroy();
314 image_replayer
->start(nullptr, false);
// Queues start_image_replayers() on the work queue. An async op is
// registered with m_async_op_tracker before queueing; the visible
// finish_op() at the end of start_image_replayers() presumably balances it.
317 template <typename I
>
318 void InstanceReplayer
<I
>::queue_start_image_replayers() {
321 Context
*ctx
= create_context_callback
<
322 InstanceReplayer
, &InstanceReplayer
<I
>::start_image_replayers
>(this);
323 m_async_op_tracker
.start_op();
324 m_threads
->work_queue
->queue(ctx
, 0);
// Work-queue callback created by queue_start_image_replayers(). Under
// m_lock it checks whether a shut down is pending (m_on_shut_down set),
// walks m_image_replayers reading each replayer's health state and calling
// start_image_replayer() on it, publishes the assigned / warning / error
// image counts for the local pool through the service daemon, and finally
// finishes the tracked async op.
// NOTE(review): the body of the shut-down check, the declaration of
// current_it, the iterator advance and the counter increments are not
// visible in this excerpt.
//
// r: callback result; not referenced by the visible code.
327 template <typename I
>
328 void InstanceReplayer
<I
>::start_image_replayers(int r
) {
331 Mutex::Locker
locker(m_lock
);
332 if (m_on_shut_down
!= nullptr) {
336 uint64_t image_count
= 0;
337 uint64_t warning_count
= 0;
338 uint64_t error_count
= 0;
// No increment in the for-header: start_image_replayer() can erase the
// current entry, so the iterator is presumably advanced inside the body.
339 for (auto it
= m_image_replayers
.begin();
340 it
!= m_image_replayers
.end();) {
345 auto health_state
= current_it
->second
->get_health_state();
346 if (health_state
== image_replayer::HEALTH_STATE_WARNING
) {
348 } else if (health_state
== image_replayer::HEALTH_STATE_ERROR
) {
352 start_image_replayer(current_it
->second
);
355 m_service_daemon
->add_or_update_attribute(
356 m_local_pool_id
, SERVICE_DAEMON_ASSIGNED_COUNT_KEY
, image_count
);
357 m_service_daemon
->add_or_update_attribute(
358 m_local_pool_id
, SERVICE_DAEMON_WARNING_COUNT_KEY
, warning_count
);
359 m_service_daemon
->add_or_update_attribute(
360 m_local_pool_id
, SERVICE_DAEMON_ERROR_COUNT_KEY
, error_count
);
362 m_async_op_tracker
.finish_op();
// Stops one image replayer and arranges for on_finish to be queued once it
// is stopped.
//   - Already stopped: on_finish is queued on the work queue with 0.
//   - Otherwise an async op is started and a retry callback is built that
//     calls stop_image_replayer() again with the same arguments and then
//     finishes the async op. If the replayer is running, stop(ctx, false)
//     is called with that callback; the remaining visible code schedules
//     the callback on the timer after `after` seconds, via the work queue.
// NOTE(review): the return after the "already stopped" branch, the `else`
// and the declaration of `after` are not visible in this excerpt.
//
// image_replayer: replayer to stop.
// on_finish:      queued with 0 once the replayer reports is_stopped().
365 template <typename I
>
366 void InstanceReplayer
<I
>::stop_image_replayer(ImageReplayer
<I
> *image_replayer
,
367 Context
*on_finish
) {
368 dout(20) << image_replayer
<< " global_image_id="
369 << image_replayer
->get_global_image_id() << ", on_finish="
370 << on_finish
<< dendl
;
372 if (image_replayer
->is_stopped()) {
373 m_threads
->work_queue
->queue(on_finish
, 0);
377 m_async_op_tracker
.start_op();
378 Context
*ctx
= create_async_context_callback(
379 m_threads
->work_queue
, new FunctionContext(
380 [this, image_replayer
, on_finish
] (int r
) {
381 stop_image_replayer(image_replayer
, on_finish
);
382 m_async_op_tracker
.finish_op();
385 if (image_replayer
->is_running()) {
386 image_replayer
->stop(ctx
, false);
389 dout(20) << "scheduling image replayer " << image_replayer
<< " stop after "
390 << after
<< " sec (task " << ctx
<< ")" << dendl
;
// The retry callback is captured by value before `ctx` is rebound to the
// wrapper that adds it to the timer under timer_lock.
391 ctx
= new FunctionContext(
392 [this, after
, ctx
] (int r
) {
393 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
394 m_threads
->timer
->add_event_after(after
, ctx
);
396 m_threads
->work_queue
->queue(ctx
, 0);
// Asks m_async_op_tracker to invoke handle_wait_for_ops() through a context
// callback (see m_async_op_tracker.wait_for_ops()).
400 template <typename I
>
401 void InstanceReplayer
<I
>::wait_for_ops() {
404 Context
*ctx
= create_context_callback
<
405 InstanceReplayer
, &InstanceReplayer
<I
>::handle_wait_for_ops
>(this);
407 m_async_op_tracker
.wait_for_ops(ctx
);
// Completion callback created by wait_for_ops(): logs r, then takes m_lock
// and calls stop_image_replayers().
//
// r: result delivered to the callback; only logged by the visible code.
410 template <typename I
>
411 void InstanceReplayer
<I
>::handle_wait_for_ops(int r
) {
412 dout(20) << "r=" << r
<< dendl
;
// stop_image_replayers() asserts that m_lock is held.
416 Mutex::Locker
locker(m_lock
);
417 stop_image_replayers();
// Stops every registered image replayer; the caller must hold m_lock.
// A C_Gather is created whose completion is an async (work-queue) callback
// to handle_stop_image_replayers(); each replayer is passed to
// stop_image_replayer() with its own gather sub-context.
420 template <typename I
>
421 void InstanceReplayer
<I
>::stop_image_replayers() {
424 assert(m_lock
.is_locked());
426 Context
*ctx
= create_async_context_callback(
427 m_threads
->work_queue
, create_context_callback
<InstanceReplayer
<I
>,
428 &InstanceReplayer
<I
>::handle_stop_image_replayers
>(this));
430 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
431 for (auto &it
: m_image_replayers
) {
432 stop_image_replayer(it
.second
, gather_ctx
->new_sub());
434 gather_ctx
->activate();
// Completion callback created by stop_image_replayers(). Under m_lock it
// asserts that every replayer is stopped, destroys each one, clears
// m_image_replayers, and moves the pending shut down completion out of
// m_on_shut_down (asserting it is set). That completion is then completed
// with r.
// NOTE(review): the scope braces are not visible, so whether complete()
// runs with m_lock released cannot be confirmed from this excerpt.
//
// r: result passed on to the shut down completion.
437 template <typename I
>
438 void InstanceReplayer
<I
>::handle_stop_image_replayers(int r
) {
439 dout(20) << "r=" << r
<< dendl
;
443 Context
*on_finish
= nullptr;
445 Mutex::Locker
locker(m_lock
);
447 for (auto &it
: m_image_replayers
) {
448 assert(it
.second
->is_stopped());
449 it
.second
->destroy();
451 m_image_replayers
.clear();
453 assert(m_on_shut_down
!= nullptr);
// Swapping leaves m_on_shut_down null and hands the completion to the local.
454 std::swap(on_finish
, m_on_shut_down
);
456 on_finish
->complete(r
);
// Cancels the pending image state check task, if any. Takes the timer
// lock, checks whether a task is scheduled, logs it, cancels it on the
// timer and clears m_image_state_check_task.
// NOTE(review): the early exit for the "no task" case and the use of
// `canceled` are not visible in this excerpt.
459 template <typename I
>
460 void InstanceReplayer
<I
>::cancel_image_state_check_task() {
461 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
463 if (m_image_state_check_task
== nullptr) {
467 dout(20) << m_image_state_check_task
<< dendl
;
468 bool canceled
= m_threads
->timer
->cancel_event(m_image_state_check_task
);
470 m_image_state_check_task
= nullptr;
// Schedules the periodic image state check; the caller must hold the timer
// lock and no task may already be pending. The task, when it fires, clears
// m_image_state_check_task, schedules the next check and queues
// start_image_replayers(). The delay is read from the
// rbd_mirror_image_state_check_interval configuration value.
// NOTE(review): the lambda header of the task is not visible here.
473 template <typename I
>
474 void InstanceReplayer
<I
>::schedule_image_state_check_task() {
475 assert(m_threads
->timer_lock
.is_locked());
476 assert(m_image_state_check_task
== nullptr);
478 m_image_state_check_task
= new FunctionContext(
480 assert(m_threads
->timer_lock
.is_locked());
// Clear the pointer first: the re-schedule below asserts it is null.
481 m_image_state_check_task
= nullptr;
482 schedule_image_state_check_task();
483 queue_start_image_replayers();
486 int after
= g_ceph_context
->_conf
->rbd_mirror_image_state_check_interval
;
488 dout(20) << "scheduling image state check after " << after
<< " sec (task "
489 << m_image_state_check_task
<< ")" << dendl
;
490 m_threads
->timer
->add_event_after(after
, m_image_state_check_task
);
493 } // namespace mirror
// Explicit instantiation of the InstanceReplayer template for
// librbd::ImageCtx.
496 template class rbd::mirror::InstanceReplayer
<librbd::ImageCtx
>;