// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/WorkQueue.h"
10 #include "librbd/Utils.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
// Attribute keys under which start_image_replayers() publishes the
// per-namespace assigned / warning / error image counts to the service daemon.
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
31 } // anonymous namespace
33 using librbd::util::create_async_context_callback
;
34 using librbd::util::create_context_callback
;
37 InstanceReplayer
<I
>::InstanceReplayer(
38 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
39 Threads
<I
> *threads
, ServiceDaemon
<I
>* service_daemon
,
40 MirrorStatusUpdater
<I
>* local_status_updater
,
41 journal::CacheManagerHandler
*cache_manager_handler
,
42 PoolMetaCache
* pool_meta_cache
)
43 : m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
44 m_threads(threads
), m_service_daemon(service_daemon
),
45 m_local_status_updater(local_status_updater
),
46 m_cache_manager_handler(cache_manager_handler
),
47 m_pool_meta_cache(pool_meta_cache
),
48 m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
49 stringify(local_io_ctx
.get_id()))) {
53 InstanceReplayer
<I
>::~InstanceReplayer() {
54 ceph_assert(m_image_state_check_task
== nullptr);
55 ceph_assert(m_async_op_tracker
.empty());
56 ceph_assert(m_image_replayers
.empty());
// Const query returning bool; holds m_lock for the duration of the call.
// NOTE(review): the listing's embedded numbering jumps from 61 to 66, so the
// return statement is not visible here -- presumably it reports
// m_blacklisted (set in start_image_replayer()); confirm against the full file.
60 bool InstanceReplayer
<I
>::is_blacklisted() const {
61 std::lock_guard locker
{m_lock
};
// Blocking initializer: returns whatever waiting on init_ctx yields.
// NOTE(review): embedded lines 67-68 are absent from this listing; presumably
// they declare init_ctx and pass it to the asynchronous init(Context*)
// overload, mirroring what shut_down() does with shut_down_ctx -- confirm.
66 int InstanceReplayer
<I
>::init() {
69 return init_ctx
.wait();
// Asynchronous initializer. Queues a task on the work queue that schedules
// the periodic image state check while holding timer_lock and then completes
// on_finish with 0.
// NOTE(review): embedded lines 74, 78, 81 and 83 are missing from this
// listing (likely a log line plus the braces scoping timer_locker) -- confirm.
73 void InstanceReplayer
<I
>::init(Context
*on_finish
) {
76 Context
*ctx
= new LambdaContext(
77 [this, on_finish
] (int r
) {
// schedule_image_state_check_task() asserts that timer_lock is held.
79 std::lock_guard timer_locker
{m_threads
->timer_lock
};
80 schedule_image_state_check_task();
82 on_finish
->complete(0);
85 m_threads
->work_queue
->queue(ctx
, 0);
// Blocking shut down: runs the asynchronous shut_down(Context*) overload with
// a C_SaferCond and waits for it to complete.
// NOTE(review): embedded line 93 is missing from this listing; r is otherwise
// unused, so presumably the result is asserted there -- confirm.
89 void InstanceReplayer
<I
>::shut_down() {
90 C_SaferCond shut_down_ctx
;
91 shut_down(&shut_down_ctx
);
92 int r
= shut_down_ctx
.wait();
// Asynchronous shut down. Under m_lock, records on_finish in m_on_shut_down
// (which must not already be set) and queues a work-queue task that cancels
// the periodic image state check.
// NOTE(review): embedded lines 106 and 108-109 are missing from this listing,
// i.e. the lambda's capture list and the rest of its body. The stored
// callback is later completed by handle_stop_image_replayers(), so presumably
// the lambda continues into wait_for_ops() -- confirm.
97 void InstanceReplayer
<I
>::shut_down(Context
*on_finish
) {
100 std::lock_guard locker
{m_lock
};
102 ceph_assert(m_on_shut_down
== nullptr);
103 m_on_shut_down
= on_finish
;
105 Context
*ctx
= new LambdaContext(
107 cancel_image_state_check_task();
111 m_threads
->work_queue
->queue(ctx
, 0);
// Logs the peer and, under m_lock, inserts it into m_peers. `result` holds
// whether the insertion actually took place.
// NOTE(review): embedded line 120 is missing from this listing; `result` is
// otherwise unused, so presumably it is asserted there -- confirm.
114 template <typename I
>
115 void InstanceReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
116 dout(10) << "peer=" << peer
<< dendl
;
118 std::lock_guard locker
{m_lock
};
119 auto result
= m_peers
.insert(peer
).second
;
// Under m_lock, removes every entry from m_image_replayers and asks
// stop_image_replayer() to stop each one. Each replayer gets a sub-context of
// a C_Gather whose finisher is on_finish, wrapped in a lambda that destroys
// the replayer.
// NOTE(review): embedded lines 125 and 137-138 are missing from this listing;
// the wrapper lambda presumably also completes the captured sub-context so
// that the gather can finish -- confirm.
123 template <typename I
>
124 void InstanceReplayer
<I
>::release_all(Context
*on_finish
) {
127 std::lock_guard locker
{m_lock
};
129 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, on_finish
);
// The loop's increment expression erases the current entry, so the map is
// drained as it is walked.
130 for (auto it
= m_image_replayers
.begin(); it
!= m_image_replayers
.end();
131 it
= m_image_replayers
.erase(it
)) {
132 auto image_replayer
= it
->second
;
133 auto ctx
= gather_ctx
->new_sub();
// ctx is rebound to a wrapper that captures the original sub-context by value.
134 ctx
= new LambdaContext(
135 [image_replayer
, ctx
] (int r
) {
136 image_replayer
->destroy();
139 stop_image_replayer(image_replayer
, ctx
);
141 gather_ctx
->activate();
144 template <typename I
>
145 void InstanceReplayer
<I
>::acquire_image(InstanceWatcher
<I
> *instance_watcher
,
146 const std::string
&global_image_id
,
147 Context
*on_finish
) {
148 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
150 std::lock_guard locker
{m_lock
};
152 ceph_assert(m_on_shut_down
== nullptr);
154 auto it
= m_image_replayers
.find(global_image_id
);
155 if (it
== m_image_replayers
.end()) {
156 auto image_replayer
= ImageReplayer
<I
>::create(
157 m_local_io_ctx
, m_local_mirror_uuid
, global_image_id
,
158 m_threads
, instance_watcher
, m_local_status_updater
,
159 m_cache_manager_handler
, m_pool_meta_cache
);
161 dout(10) << global_image_id
<< ": creating replayer " << image_replayer
164 it
= m_image_replayers
.insert(std::make_pair(global_image_id
,
165 image_replayer
)).first
;
167 // TODO only a single peer is currently supported
168 ceph_assert(m_peers
.size() == 1);
169 auto peer
= *m_peers
.begin();
170 image_replayer
->add_peer(peer
);
171 start_image_replayer(image_replayer
);
173 // A duplicate acquire notification implies (1) connection hiccup or
174 // (2) new leader election. For the second case, restart the replayer to
175 // detect if the image has been deleted while the leader was offline
176 auto& image_replayer
= it
->second
;
177 image_replayer
->set_finished(false);
178 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
181 m_threads
->work_queue
->queue(on_finish
, 0);
184 template <typename I
>
185 void InstanceReplayer
<I
>::release_image(const std::string
&global_image_id
,
186 Context
*on_finish
) {
187 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
189 std::lock_guard locker
{m_lock
};
190 ceph_assert(m_on_shut_down
== nullptr);
192 auto it
= m_image_replayers
.find(global_image_id
);
193 if (it
== m_image_replayers
.end()) {
194 dout(5) << global_image_id
<< ": not found" << dendl
;
195 m_threads
->work_queue
->queue(on_finish
, 0);
199 auto image_replayer
= it
->second
;
200 m_image_replayers
.erase(it
);
202 on_finish
= new LambdaContext(
203 [image_replayer
, on_finish
] (int r
) {
204 image_replayer
->destroy();
205 on_finish
->complete(0);
207 stop_image_replayer(image_replayer
, on_finish
);
210 template <typename I
>
211 void InstanceReplayer
<I
>::remove_peer_image(const std::string
&global_image_id
,
212 const std::string
&peer_mirror_uuid
,
213 Context
*on_finish
) {
214 dout(10) << "global_image_id=" << global_image_id
<< ", "
215 << "peer_mirror_uuid=" << peer_mirror_uuid
<< dendl
;
217 std::lock_guard locker
{m_lock
};
218 ceph_assert(m_on_shut_down
== nullptr);
220 auto it
= m_image_replayers
.find(global_image_id
);
221 if (it
!= m_image_replayers
.end()) {
222 // TODO only a single peer is currently supported, therefore
223 // we can just interrupt the current image replayer and
224 // it will eventually detect that the peer image is missing and
225 // determine if a delete propagation is required.
226 auto image_replayer
= it
->second
;
227 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
229 m_threads
->work_queue
->queue(on_finish
, 0);
// Under m_lock, opens an "image_replayers" array section on the formatter and
// lets every replayer print its own status into it.
// NOTE(review): embedded lines 234 and 242-244 are missing from this listing;
// presumably the array section is closed after the loop -- confirm.
232 template <typename I
>
233 void InstanceReplayer
<I
>::print_status(Formatter
*f
) {
236 std::lock_guard locker
{m_lock
};
238 f
->open_array_section("image_replayers");
239 for (auto &kv
: m_image_replayers
) {
240 auto &image_replayer
= kv
.second
;
241 image_replayer
->print_status(f
);
// Clears m_manual_stop under m_lock and calls start() on every replayer. The
// per-replayer completions are sub-contexts of a C_Gather whose finisher is a
// C_TrackedOp on m_async_op_tracker.
// NOTE(review): embedded lines 248-250 are missing from this listing (opening
// brace and likely a log line). The meaning of the `true` argument to start()
// is not visible here -- confirm against ImageReplayer.
246 template <typename I
>
247 void InstanceReplayer
<I
>::start()
251 std::lock_guard locker
{m_lock
};
253 m_manual_stop
= false;
255 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
256 auto gather_ctx
= new C_Gather(
257 cct
, new C_TrackedOp(m_async_op_tracker
, nullptr));
258 for (auto &kv
: m_image_replayers
) {
259 auto &image_replayer
= kv
.second
;
260 image_replayer
->start(gather_ctx
->new_sub(), true);
263 gather_ctx
->activate();
// Parameterless stop().
// NOTE(review): the whole body (embedded lines 268-270) is missing from this
// listing; presumably it delegates to stop(Context*) -- confirm.
266 template <typename I
>
267 void InstanceReplayer
<I
>::stop()
// Sets m_manual_stop and calls stop() on every replayer. The per-replayer
// completions are sub-contexts of a C_Gather whose finisher is a C_TrackedOp
// on m_async_op_tracker that wraps on_finish.
// NOTE(review): embedded lines 274-276, 280 and 288-290 are missing from this
// listing. The gaps around the lock suggest m_lock is taken in an inner scope
// that ends before activate(), but that cannot be confirmed from here.
272 template <typename I
>
273 void InstanceReplayer
<I
>::stop(Context
*on_finish
)
// The gather context is built before m_lock is taken.
277 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
278 auto gather_ctx
= new C_Gather(
279 cct
, new C_TrackedOp(m_async_op_tracker
, on_finish
));
281 std::lock_guard locker
{m_lock
};
283 m_manual_stop
= true;
285 for (auto &kv
: m_image_replayers
) {
286 auto &image_replayer
= kv
.second
;
287 image_replayer
->stop(gather_ctx
->new_sub(), true);
291 gather_ctx
->activate();
// Clears m_manual_stop under m_lock and restarts every replayer; each restart
// is tracked individually by a C_TrackedOp on m_async_op_tracker.
// NOTE(review): embedded lines 296-298 are missing from this listing (opening
// brace and likely a log line) -- confirm.
294 template <typename I
>
295 void InstanceReplayer
<I
>::restart()
299 std::lock_guard locker
{m_lock
};
301 m_manual_stop
= false;
303 for (auto &kv
: m_image_replayers
) {
304 auto &image_replayer
= kv
.second
;
305 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
// Under m_lock, calls flush() on every replayer in m_image_replayers.
// NOTE(review): embedded lines 311-313 are missing from this listing (opening
// brace and likely a log line) -- confirm.
309 template <typename I
>
310 void InstanceReplayer
<I
>::flush()
314 std::lock_guard locker
{m_lock
};
316 for (auto &kv
: m_image_replayers
) {
317 auto &image_replayer
= kv
.second
;
318 image_replayer
->flush();
// Decides what to do with one replayer; the caller must hold m_lock.
// Visible decision chain, in order:
//  - the replayer is not stopped            (branch body not visible)
//  - the replayer reports being blacklisted: log an error, set m_blacklisted
//  - the replayer is finished: erase it from m_image_replayers and destroy it
//  - m_manual_stop is set                   (branch body not visible)
// The final visible statements log the image id and start the replayer with a
// C_TrackedOp on m_async_op_tracker.
// NOTE(review): embedded lines 329, 334, 341 and 343-345 are missing from
// this listing; presumably each branch returns early so that only a stopped,
// healthy, not manually stopped replayer reaches start() -- confirm.
322 template <typename I
>
323 void InstanceReplayer
<I
>::start_image_replayer(
324 ImageReplayer
<I
> *image_replayer
) {
325 ceph_assert(ceph_mutex_is_locked(m_lock
));
327 std::string global_image_id
= image_replayer
->get_global_image_id();
328 if (!image_replayer
->is_stopped()) {
330 } else if (image_replayer
->is_blacklisted()) {
331 derr
<< "global_image_id=" << global_image_id
<< ": blacklisted detected "
332 << "during image replay" << dendl
;
333 m_blacklisted
= true;
335 } else if (image_replayer
->is_finished()) {
336 // TODO temporary until policy integrated
337 dout(5) << "removing image replayer for global_image_id="
338 << global_image_id
<< dendl
;
339 m_image_replayers
.erase(image_replayer
->get_global_image_id());
340 image_replayer
->destroy();
342 } else if (m_manual_stop
) {
346 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
347 image_replayer
->start(new C_TrackedOp(m_async_op_tracker
, nullptr), false);
// Queues start_image_replayers() on the work queue. An op is started on
// m_async_op_tracker before queuing; start_image_replayers() contains the
// matching finish_op() call.
// NOTE(review): embedded lines 352-353 are missing from this listing (likely
// a log line) -- confirm.
350 template <typename I
>
351 void InstanceReplayer
<I
>::queue_start_image_replayers() {
354 Context
*ctx
= create_context_callback
<
355 InstanceReplayer
, &InstanceReplayer
<I
>::start_image_replayers
>(this);
356 m_async_op_tracker
.start_op();
357 m_threads
->work_queue
->queue(ctx
, 0);
// Work-queue callback queued by queue_start_image_replayers(). Under m_lock
// it walks m_image_replayers, inspects each replayer's health state, hands
// each one to start_image_replayer(), and publishes three counters
// (assigned / warning / error) as namespace attributes on the service daemon.
// It ends with finish_op() on m_async_op_tracker.
// NOTE(review): this listing drops embedded lines 362-363, 366-368, 374-377,
// 380 and 382-384, so the shut-down branch body, the definition and advance
// of current_it, and every counter increment are not visible. Presumably the
// iterator is advanced before start_image_replayer() is called, since that
// call can erase the entry -- confirm.
360 template <typename I
>
361 void InstanceReplayer
<I
>::start_image_replayers(int r
) {
364 std::lock_guard locker
{m_lock
};
// m_on_shut_down is non-null once shut_down(Context*) has been called.
365 if (m_on_shut_down
!= nullptr) {
369 uint64_t image_count
= 0;
370 uint64_t warning_count
= 0;
371 uint64_t error_count
= 0;
// The for-header has no increment expression, so the advance must happen in
// the body.
372 for (auto it
= m_image_replayers
.begin();
373 it
!= m_image_replayers
.end();) {
378 auto health_state
= current_it
->second
->get_health_state();
379 if (health_state
== image_replayer::HEALTH_STATE_WARNING
) {
381 } else if (health_state
== image_replayer::HEALTH_STATE_ERROR
) {
385 start_image_replayer(current_it
->second
);
388 m_service_daemon
->add_or_update_namespace_attribute(
389 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
390 SERVICE_DAEMON_ASSIGNED_COUNT_KEY
, image_count
);
391 m_service_daemon
->add_or_update_namespace_attribute(
392 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
393 SERVICE_DAEMON_WARNING_COUNT_KEY
, warning_count
);
394 m_service_daemon
->add_or_update_namespace_attribute(
395 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
396 SERVICE_DAEMON_ERROR_COUNT_KEY
, error_count
);
398 m_async_op_tracker
.finish_op();
// Drives one replayer towards the stopped state and then fires on_finish.
//  - Already stopped: on_finish is queued on the work queue with 0.
//  - Otherwise an op is started on m_async_op_tracker and a retry context is
//    built that re-enters stop_image_replayer() and then finishes the op.
//    If the replayer is running, stop() is called with that context. The
//    remaining visible code schedules the retry context on the timer after
//    `after` seconds, via a work-queue task that takes timer_lock.
// NOTE(review): embedded lines 410-412, 419-420, 423-424 and 431 are missing
// from this listing: the early return, the `else` between stop() and the
// timer path, and the definition of `after` are not visible -- confirm.
401 template <typename I
>
402 void InstanceReplayer
<I
>::stop_image_replayer(ImageReplayer
<I
> *image_replayer
,
403 Context
*on_finish
) {
404 dout(10) << image_replayer
<< " global_image_id="
405 << image_replayer
->get_global_image_id() << ", on_finish="
406 << on_finish
<< dendl
;
408 if (image_replayer
->is_stopped()) {
409 m_threads
->work_queue
->queue(on_finish
, 0);
413 m_async_op_tracker
.start_op();
414 Context
*ctx
= create_async_context_callback(
415 m_threads
->work_queue
, new LambdaContext(
416 [this, image_replayer
, on_finish
] (int r
) {
417 stop_image_replayer(image_replayer
, on_finish
);
418 m_async_op_tracker
.finish_op();
421 if (image_replayer
->is_running()) {
422 image_replayer
->stop(ctx
, false);
425 dout(10) << "scheduling image replayer " << image_replayer
<< " stop after "
426 << after
<< " sec (task " << ctx
<< ")" << dendl
;
// ctx is rebound to a wrapper that adds the retry context to the timer.
427 ctx
= new LambdaContext(
428 [this, after
, ctx
] (int r
) {
429 std::lock_guard timer_locker
{m_threads
->timer_lock
};
430 m_threads
->timer
->add_event_after(after
, ctx
);
432 m_threads
->work_queue
->queue(ctx
, 0);
// Registers handle_wait_for_ops() as the callback passed to
// m_async_op_tracker.wait_for_ops().
// NOTE(review): embedded lines 438-439 are missing from this listing (likely
// a log line) -- confirm.
436 template <typename I
>
437 void InstanceReplayer
<I
>::wait_for_ops() {
440 Context
*ctx
= create_context_callback
<
441 InstanceReplayer
, &InstanceReplayer
<I
>::handle_wait_for_ops
>(this);
443 m_async_op_tracker
.wait_for_ops(ctx
);
// Callback registered by wait_for_ops(): logs r, then takes m_lock and calls
// stop_image_replayers().
// NOTE(review): embedded lines 449-451 are missing from this listing;
// presumably r is asserted to be 0 there -- confirm.
446 template <typename I
>
447 void InstanceReplayer
<I
>::handle_wait_for_ops(int r
) {
448 dout(10) << "r=" << r
<< dendl
;
452 std::lock_guard locker
{m_lock
};
453 stop_image_replayers();
// Stops every replayer in m_image_replayers; the caller must hold m_lock.
// Each stop gets a sub-context of a C_Gather whose finisher invokes
// handle_stop_image_replayers() through an async work-queue callback.
// NOTE(review): embedded lines 458-459 are missing from this listing (likely
// a log line) -- confirm.
456 template <typename I
>
457 void InstanceReplayer
<I
>::stop_image_replayers() {
460 ceph_assert(ceph_mutex_is_locked(m_lock
));
462 Context
*ctx
= create_async_context_callback(
463 m_threads
->work_queue
, create_context_callback
<InstanceReplayer
<I
>,
464 &InstanceReplayer
<I
>::handle_stop_image_replayers
>(this));
466 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
467 for (auto &it
: m_image_replayers
) {
468 stop_image_replayer(it
.second
, gather_ctx
->new_sub());
470 gather_ctx
->activate();
// Finisher for stop_image_replayers(). Under m_lock it asserts that every
// replayer is stopped, destroys them all and clears the map, then takes the
// pending shut-down callback out of m_on_shut_down (leaving it null) and
// completes it with r.
// NOTE(review): embedded lines 476-478, 480 and 491 are missing from this
// listing; presumably r is asserted to be 0 and the lock is held in an inner
// scope that ends before on_finish is completed -- confirm.
473 template <typename I
>
474 void InstanceReplayer
<I
>::handle_stop_image_replayers(int r
) {
475 dout(10) << "r=" << r
<< dendl
;
479 Context
*on_finish
= nullptr;
481 std::lock_guard locker
{m_lock
};
483 for (auto &it
: m_image_replayers
) {
484 ceph_assert(it
.second
->is_stopped());
485 it
.second
->destroy();
487 m_image_replayers
.clear();
489 ceph_assert(m_on_shut_down
!= nullptr);
490 std::swap(on_finish
, m_on_shut_down
);
492 on_finish
->complete(r
);
495 template <typename I
>
496 void InstanceReplayer
<I
>::cancel_image_state_check_task() {
497 std::lock_guard timer_locker
{m_threads
->timer_lock
};
499 if (m_image_state_check_task
== nullptr) {
503 dout(10) << m_image_state_check_task
<< dendl
;
504 bool canceled
= m_threads
->timer
->cancel_event(m_image_state_check_task
);
505 ceph_assert(canceled
);
506 m_image_state_check_task
= nullptr;
509 template <typename I
>
510 void InstanceReplayer
<I
>::schedule_image_state_check_task() {
511 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
512 ceph_assert(m_image_state_check_task
== nullptr);
514 m_image_state_check_task
= new LambdaContext(
516 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
517 m_image_state_check_task
= nullptr;
518 schedule_image_state_check_task();
519 queue_start_image_replayers();
522 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
523 int after
= cct
->_conf
.get_val
<uint64_t>(
524 "rbd_mirror_image_state_check_interval");
526 dout(10) << "scheduling image state check after " << after
<< " sec (task "
527 << m_image_state_check_task
<< ")" << dendl
;
528 m_threads
->timer
->add_event_after(after
, m_image_state_check_task
);
531 } // namespace mirror
534 template class rbd::mirror::InstanceReplayer
<librbd::ImageCtx
>;