1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/WorkQueue.h"
10 #include "librbd/Utils.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
// Logging setup for this file: dout()/derr go through g_ceph_context under
// the rbd_mirror subsystem, and every message is prefixed with
// "rbd::mirror::InstanceReplayer: ", the object address and the calling
// function name.
// NOTE(review): if an included header already defines dout_prefix, an
// `#undef dout_prefix` must precede the definition below to avoid a macro
// redefinition diagnostic; none is visible in this copy — confirm.
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
// Namespace attribute keys under which start_image_replayers() publishes,
// to the service daemon, the number of assigned images and how many of
// them are in the warning and error health states.
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
31 } // anonymous namespace
33 using librbd::util::create_async_context_callback
;
34 using librbd::util::create_context_callback
;
37 InstanceReplayer
<I
>::InstanceReplayer(
38 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
39 Threads
<I
> *threads
, ServiceDaemon
<I
>* service_daemon
,
40 MirrorStatusUpdater
<I
>* local_status_updater
,
41 journal::CacheManagerHandler
*cache_manager_handler
,
42 PoolMetaCache
* pool_meta_cache
)
43 : m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
44 m_threads(threads
), m_service_daemon(service_daemon
),
45 m_local_status_updater(local_status_updater
),
46 m_cache_manager_handler(cache_manager_handler
),
47 m_pool_meta_cache(pool_meta_cache
),
48 m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
49 stringify(local_io_ctx
.get_id()))) {
53 InstanceReplayer
<I
>::~InstanceReplayer() {
54 ceph_assert(m_image_state_check_task
== nullptr);
55 ceph_assert(m_async_op_tracker
.empty());
56 ceph_assert(m_image_replayers
.empty());
// Accessor for the blacklisted state; takes m_lock before reading.
// m_blacklisted is set by start_image_replayer() when an image replayer
// reports that it is blacklisted.
// NOTE(review): this copy is missing the template header, the return
// statement (presumably `return m_blacklisted;`) and the closing brace —
// only the lock acquisition remains. TODO restore.
60 bool InstanceReplayer
<I
>::is_blacklisted() const {
61 std::lock_guard locker
{m_lock
};
// Presumably the synchronous counterpart of init(Context*): it waits on
// `init_ctx` and returns the result (mirrors shut_down()).
// NOTE(review): the template header, the declaration of `init_ctx`, the
// call that hands it to init(Context*) and the closing brace are missing
// from this copy; only the final wait remains. TODO restore.
66 int InstanceReplayer
<I
>::init() {
69 return init_ctx
.wait();
// Asynchronous initialization. Queues a work-queue callback that takes the
// timer lock, schedules the periodic image state check task
// (schedule_image_state_check_task() asserts the timer lock is held) and
// completes on_finish with 0.
// NOTE(review): the template header, a line after the signature, the scope
// braces inside the lambda and the closing `});` / `}` are missing from
// this copy. Whether the timer lock is released before
// on_finish->complete(0) cannot be determined from what remains — confirm
// against upstream.
73 void InstanceReplayer
<I
>::init(Context
*on_finish
) {
76 Context
*ctx
= new LambdaContext(
77 [this, on_finish
] (int r
) {
79 std::lock_guard timer_locker
{m_threads
->timer_lock
};
80 schedule_image_state_check_task();
82 on_finish
->complete(0);
85 m_threads
->work_queue
->queue(ctx
, 0);
// Synchronous shut down: runs shut_down(Context*) with a C_SaferCond and
// blocks until it completes.
// NOTE(review): `r` is captured from the wait, but nothing after it is
// present in this copy (upstream presumably checks it). The template
// header and closing brace are also missing. TODO restore.
89 void InstanceReplayer
<I
>::shut_down() {
90 C_SaferCond shut_down_ctx
;
91 shut_down(&shut_down_ctx
);
92 int r
= shut_down_ctx
.wait();
// Begins asynchronous shut down. Under m_lock it records on_finish in
// m_on_shut_down. Only one shut down may be pending, and
// acquire_image()/release_image()/remove_peer_image() assert that none is.
// It then queues a work-queue callback that cancels the periodic image
// state check task. on_finish is eventually completed by
// handle_stop_image_replayers().
// NOTE(review): the lambda's capture list and the rest of its body are
// missing from this copy; only cancel_image_state_check_task() is visible.
// Upstream presumably continues with wait_for_ops(), the path that leads to
// handle_stop_image_replayers(). The template header and braces are also
// missing. TODO restore.
97 void InstanceReplayer
<I
>::shut_down(Context
*on_finish
) {
100 std::lock_guard locker
{m_lock
};
102 ceph_assert(m_on_shut_down
== nullptr);
103 m_on_shut_down
= on_finish
;
105 Context
*ctx
= new LambdaContext(
107 cancel_image_state_check_task();
111 m_threads
->work_queue
->queue(ctx
, 0);
// Registers a remote peer in m_peers under m_lock. acquire_image() attaches
// the (currently single) registered peer to each image replayer it creates.
114 template <typename I
>
115 void InstanceReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
116 dout(10) << "peer=" << peer
<< dendl
;
118 std::lock_guard locker
{m_lock
};
// NOTE(review): `result` (whether the peer was newly inserted) is not used
// in what remains; the statement consuming it (presumably an assertion
// against duplicate peers) and the closing brace appear to be missing from
// this copy. TODO restore.
119 auto result
= m_peers
.insert(peer
).second
;
// Releases every image under m_lock. Each entry is erased from
// m_image_replayers (the erase is the loop increment) and its replayer is
// passed to stop_image_replayer() with a callback that destroys it. The
// per-image callbacks are sub-contexts of a C_Gather built on on_finish;
// presumably on_finish completes once every sub-context has completed.
// NOTE(review): inside the per-image lambda only image_replayer->destroy()
// is visible. The call that completes the gather sub-context `ctx`, plus
// the closing `});` / `}` lines, are missing from this copy. Without that
// completion the gather could never finish. TODO restore.
123 template <typename I
>
124 void InstanceReplayer
<I
>::release_all(Context
*on_finish
) {
127 std::lock_guard locker
{m_lock
};
129 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, on_finish
);
130 for (auto it
= m_image_replayers
.begin(); it
!= m_image_replayers
.end();
131 it
= m_image_replayers
.erase(it
)) {
132 auto image_replayer
= it
->second
;
133 auto ctx
= gather_ctx
->new_sub();
134 ctx
= new LambdaContext(
135 [image_replayer
, ctx
] (int r
) {
136 image_replayer
->destroy();
139 stop_image_replayer(image_replayer
, ctx
);
141 gather_ctx
->activate();
// Handles an acquire request for global_image_id under m_lock. It must not
// be called once shut down has begun.
// If no replayer exists, it creates one bound to instance_watcher, records
// it in m_image_replayers, attaches the single registered peer and starts
// it via start_image_replayer(). For a duplicate acquire (an existing entry
// at it->second), the replayer is marked not finished and restarted, as
// the inline comment explains.
// In every case on_finish is queued on the work queue (with 0), not
// completed inline.
// NOTE(review): this copy is missing the template-less structural lines:
// the terminator of the "creating replayer" log statement, the `} else {`
// that separates the create path from the duplicate-acquire path, and the
// closing braces. TODO restore.
144 template <typename I
>
145 void InstanceReplayer
<I
>::acquire_image(InstanceWatcher
<I
> *instance_watcher
,
146 const std::string
&global_image_id
,
147 Context
*on_finish
) {
148 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
150 std::lock_guard locker
{m_lock
};
152 ceph_assert(m_on_shut_down
== nullptr);
154 auto it
= m_image_replayers
.find(global_image_id
);
155 if (it
== m_image_replayers
.end()) {
156 auto image_replayer
= ImageReplayer
<I
>::create(
157 m_local_io_ctx
, m_local_mirror_uuid
, global_image_id
,
158 m_threads
, instance_watcher
, m_local_status_updater
,
159 m_cache_manager_handler
, m_pool_meta_cache
);
161 dout(10) << global_image_id
<< ": creating replayer " << image_replayer
164 it
= m_image_replayers
.insert(std::make_pair(global_image_id
,
165 image_replayer
)).first
;
167 // TODO only a single peer is currently supported
168 ceph_assert(m_peers
.size() == 1);
169 auto peer
= *m_peers
.begin();
170 image_replayer
->add_peer(peer
);
171 start_image_replayer(image_replayer
);
173 // A duplicate acquire notification implies (1) connection hiccup or
174 // (2) new leader election. For the second case, restart the replayer to
175 // detect if the image has been deleted while the leader was offline
176 auto& image_replayer
= it
->second
;
177 image_replayer
->set_finished(false);
178 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
181 m_threads
->work_queue
->queue(on_finish
, 0);
// Handles a release request for global_image_id under m_lock. It must not
// be called once shut down has begun.
// An unknown image just queues on_finish. Otherwise the entry is removed
// from m_image_replayers and the replayer is stopped via
// stop_image_replayer(). on_finish is wrapped so the replayer is destroyed
// first and on_finish is then completed with 0.
184 template <typename I
>
185 void InstanceReplayer
<I
>::release_image(const std::string
&global_image_id
,
186 Context
*on_finish
) {
187 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
189 std::lock_guard locker
{m_lock
};
190 ceph_assert(m_on_shut_down
== nullptr);
192 auto it
= m_image_replayers
.find(global_image_id
);
193 if (it
== m_image_replayers
.end()) {
194 dout(5) << global_image_id
<< ": not found" << dendl
;
195 m_threads
->work_queue
->queue(on_finish
, 0);
// NOTE(review): an early `return;` and the `}` closing the not-found branch
// are missing from this copy. As written, the not-found path would queue
// on_finish and then dereference the end() iterator below. TODO restore.
199 auto image_replayer
= it
->second
;
200 m_image_replayers
.erase(it
);
202 on_finish
= new LambdaContext(
203 [image_replayer
, on_finish
] (int r
) {
204 image_replayer
->destroy();
205 on_finish
->complete(0);
207 stop_image_replayer(image_replayer
, on_finish
);
210 template <typename I
>
211 void InstanceReplayer
<I
>::remove_peer_image(const std::string
&global_image_id
,
212 const std::string
&peer_mirror_uuid
,
213 Context
*on_finish
) {
214 dout(10) << "global_image_id=" << global_image_id
<< ", "
215 << "peer_mirror_uuid=" << peer_mirror_uuid
<< dendl
;
217 std::lock_guard locker
{m_lock
};
218 ceph_assert(m_on_shut_down
== nullptr);
220 auto it
= m_image_replayers
.find(global_image_id
);
221 if (it
!= m_image_replayers
.end()) {
222 // TODO only a single peer is currently supported, therefore
223 // we can just interrupt the current image replayer and
224 // it will eventually detect that the peer image is missing and
225 // determine if a delete propagation is required.
226 auto image_replayer
= it
->second
;
227 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
229 m_threads
->work_queue
->queue(on_finish
, 0);
// Under m_lock, opens an "image_replayers" array section on the formatter
// and lets each image replayer print its own status into it.
// NOTE(review): the lines after the loop are missing from this copy, so no
// close of the "image_replayers" section is visible (presumably
// f->close_section()). The closing braces and a possible log line are also
// missing. TODO restore.
232 template <typename I
>
233 void InstanceReplayer
<I
>::print_status(Formatter
*f
) {
236 std::lock_guard locker
{m_lock
};
238 f
->open_array_section("image_replayers");
239 for (auto &kv
: m_image_replayers
) {
240 auto &image_replayer
= kv
.second
;
241 image_replayer
->print_status(f
);
// Under m_lock, clears the manual-stop flag and starts every image replayer,
// passing true as the second start() argument (presumably "manual start" —
// confirm in ImageReplayer). The per-replayer completions are gathered by a
// C_Gather whose final context is a C_TrackedOp on m_async_op_tracker.
// NOTE(review): the opening `{` of the body, the closing braces and
// possibly a log line are missing from this copy. TODO restore.
246 template <typename I
>
247 void InstanceReplayer
<I
>::start()
251 std::lock_guard locker
{m_lock
};
253 m_manual_stop
= false;
255 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
256 auto gather_ctx
= new C_Gather(
257 cct
, new C_TrackedOp(m_async_op_tracker
, nullptr));
258 for (auto &kv
: m_image_replayers
) {
259 auto &image_replayer
= kv
.second
;
260 image_replayer
->start(gather_ctx
->new_sub(), true);
263 gather_ctx
->activate();
// Stops all image replayers without a caller-supplied completion.
// NOTE(review): the entire body is missing from this copy. It presumably
// delegates to stop(nullptr), since stop(Context*) explicitly handles
// on_finish == nullptr. TODO restore.
266 template <typename I
>
267 void InstanceReplayer
<I
>::stop()
// Sets the manual-stop flag, which start_image_replayer() checks, and stops
// every image replayer. The stop completions are gathered by a C_Gather.
// If on_finish is null, the gather's final context is a C_TrackedOp with a
// null inner context. Otherwise on_finish is wrapped so that, once the
// gather completes, it is handed to m_async_op_tracker.wait_for_ops() —
// presumably so the caller completes only after tracked ops drain.
// NOTE(review): this copy is missing the `} else {` between the two
// on_finish branches (as shown, the null case would be overwritten by the
// lambda), the scope braces around the m_lock critical section, and the
// body's opening/closing braces. TODO restore.
272 template <typename I
>
273 void InstanceReplayer
<I
>::stop(Context
*on_finish
)
277 if (on_finish
== nullptr) {
278 on_finish
= new C_TrackedOp(m_async_op_tracker
, on_finish
);
280 on_finish
= new LambdaContext(
281 [this, on_finish
] (int r
) {
282 m_async_op_tracker
.wait_for_ops(on_finish
);
286 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
287 auto gather_ctx
= new C_Gather(cct
, on_finish
);
289 std::lock_guard locker
{m_lock
};
291 m_manual_stop
= true;
293 for (auto &kv
: m_image_replayers
) {
294 auto &image_replayer
= kv
.second
;
295 image_replayer
->stop(gather_ctx
->new_sub(), true);
299 gather_ctx
->activate();
// Under m_lock, clears the manual-stop flag and restarts every image
// replayer. Each restart completes into its own C_TrackedOp on
// m_async_op_tracker.
// NOTE(review): the body's opening/closing braces (and possibly a log line)
// are missing from this copy. TODO restore.
302 template <typename I
>
303 void InstanceReplayer
<I
>::restart()
307 std::lock_guard locker
{m_lock
};
309 m_manual_stop
= false;
311 for (auto &kv
: m_image_replayers
) {
312 auto &image_replayer
= kv
.second
;
313 image_replayer
->restart(new C_TrackedOp(m_async_op_tracker
, nullptr));
// Under m_lock, calls flush() on every image replayer.
// NOTE(review): the body's opening/closing braces (and possibly a log line)
// are missing from this copy. TODO restore.
317 template <typename I
>
318 void InstanceReplayer
<I
>::flush()
322 std::lock_guard locker
{m_lock
};
324 for (auto &kv
: m_image_replayers
) {
325 auto &image_replayer
= kv
.second
;
326 image_replayer
->flush();
// Decides what to do with one image replayer. Requires m_lock. It is called
// from acquire_image() and from start_image_replayers(). The branches are:
//  - not stopped: no action is visible (body missing; presumably returns);
//  - blacklisted: logs an error and sets m_blacklisted;
//  - finished: erases it from m_image_replayers and destroys it;
//  - manual stop in effect (m_manual_stop): branch body missing;
//  - otherwise: starts it, passing false as the second start() argument,
//    with a C_TrackedOp completion.
// NOTE(review): the `return;` statements ending each guarded branch, and
// the closing braces, are missing from this copy. As shown, control would
// fall through to the final start() call, even after destroy() in the
// finished case. TODO restore.
330 template <typename I
>
331 void InstanceReplayer
<I
>::start_image_replayer(
332 ImageReplayer
<I
> *image_replayer
) {
333 ceph_assert(ceph_mutex_is_locked(m_lock
));
335 std::string global_image_id
= image_replayer
->get_global_image_id();
336 if (!image_replayer
->is_stopped()) {
338 } else if (image_replayer
->is_blacklisted()) {
339 derr
<< "global_image_id=" << global_image_id
<< ": blacklisted detected "
340 << "during image replay" << dendl
;
341 m_blacklisted
= true;
343 } else if (image_replayer
->is_finished()) {
344 // TODO temporary until policy integrated
345 dout(5) << "removing image replayer for global_image_id="
346 << global_image_id
<< dendl
;
347 m_image_replayers
.erase(image_replayer
->get_global_image_id());
348 image_replayer
->destroy();
350 } else if (m_manual_stop
) {
354 dout(10) << "global_image_id=" << global_image_id
<< dendl
;
355 image_replayer
->start(new C_TrackedOp(m_async_op_tracker
, nullptr), false);
// Queues start_image_replayers() on the work queue. It first starts an
// async op on m_async_op_tracker; start_image_replayers() calls the
// matching finish_op(). It is invoked by the periodic image state check
// task created in schedule_image_state_check_task().
// NOTE(review): a line after the signature (likely a log line) and the
// closing brace are missing from this copy.
358 template <typename I
>
359 void InstanceReplayer
<I
>::queue_start_image_replayers() {
362 Context
*ctx
= create_context_callback
<
363 InstanceReplayer
, &InstanceReplayer
<I
>::start_image_replayers
>(this);
364 m_async_op_tracker
.start_op();
365 m_threads
->work_queue
->queue(ctx
, 0);
// Work-queue callback queued by queue_start_image_replayers(); runs under
// m_lock.
// Once shut down has begun (m_on_shut_down set), it does nothing further.
// Otherwise it walks every image replayer, tallies those in the WARNING and
// ERROR health states, and calls start_image_replayer() on each. It then
// publishes the assigned/warning/error counts as namespace attributes to
// the service daemon, and finishes the async op started by
// queue_start_image_replayers().
// NOTE(review): several lines are missing from this copy:
//  - the body of the shut-down branch. It should also call
//    m_async_op_tracker.finish_op() before returning; otherwise the op
//    started in queue_start_image_replayers() is never finished and
//    wait_for_ops() cannot drain. Confirm.
//  - the declaration of `current_it` and the advance of `it` (the loop
//    header has no increment). The iterator must be advanced BEFORE
//    start_image_replayer(), because that call may erase the current
//    entry from m_image_replayers.
//  - the image_count / warning_count / error_count increments.
// TODO restore.
368 template <typename I
>
369 void InstanceReplayer
<I
>::start_image_replayers(int r
) {
372 std::lock_guard locker
{m_lock
};
373 if (m_on_shut_down
!= nullptr) {
377 uint64_t image_count
= 0;
378 uint64_t warning_count
= 0;
379 uint64_t error_count
= 0;
380 for (auto it
= m_image_replayers
.begin();
381 it
!= m_image_replayers
.end();) {
386 auto health_state
= current_it
->second
->get_health_state();
387 if (health_state
== image_replayer::HEALTH_STATE_WARNING
) {
389 } else if (health_state
== image_replayer::HEALTH_STATE_ERROR
) {
393 start_image_replayer(current_it
->second
);
396 m_service_daemon
->add_or_update_namespace_attribute(
397 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
398 SERVICE_DAEMON_ASSIGNED_COUNT_KEY
, image_count
);
399 m_service_daemon
->add_or_update_namespace_attribute(
400 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
401 SERVICE_DAEMON_WARNING_COUNT_KEY
, warning_count
);
402 m_service_daemon
->add_or_update_namespace_attribute(
403 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
404 SERVICE_DAEMON_ERROR_COUNT_KEY
, error_count
);
406 m_async_op_tracker
.finish_op();
// Stops image_replayer and queues on_finish on the work queue once it is
// stopped.
// An already-stopped replayer just has on_finish queued. Otherwise an async
// op is started and a retry callback is built. The callback is wrapped
// with create_async_context_callback on the work queue, re-invokes
// stop_image_replayer() (re-checking is_stopped()), and then finishes the
// op.
// A running replayer is asked to stop with that callback. A replayer that
// is neither stopped nor running has the callback re-armed as a timer
// event after `after` seconds.
409 template <typename I
>
410 void InstanceReplayer
<I
>::stop_image_replayer(ImageReplayer
<I
> *image_replayer
,
411 Context
*on_finish
) {
412 dout(10) << image_replayer
<< " global_image_id="
413 << image_replayer
->get_global_image_id() << ", on_finish="
414 << on_finish
<< dendl
;
416 if (image_replayer
->is_stopped()) {
417 m_threads
->work_queue
->queue(on_finish
, 0);
// NOTE(review): the `return;` and `}` ending the already-stopped branch are
// missing from this copy; without them on_finish would be used twice.
// TODO restore.
421 m_async_op_tracker
.start_op();
422 Context
*ctx
= create_async_context_callback(
423 m_threads
->work_queue
, new LambdaContext(
424 [this, image_replayer
, on_finish
] (int r
) {
425 stop_image_replayer(image_replayer
, on_finish
);
426 m_async_op_tracker
.finish_op();
429 if (image_replayer
->is_running()) {
430 image_replayer
->stop(ctx
, false);
// NOTE(review): the `} else {` before the timer path and the declaration of
// `after` (used below but never defined in what remains) are missing from
// this copy. TODO restore.
433 dout(10) << "scheduling image replayer " << image_replayer
<< " stop after "
434 << after
<< " sec (task " << ctx
<< ")" << dendl
;
435 ctx
= new LambdaContext(
436 [this, after
, ctx
] (int r
) {
437 std::lock_guard timer_locker
{m_threads
->timer_lock
};
438 m_threads
->timer
->add_event_after(after
, ctx
);
440 m_threads
->work_queue
->queue(ctx
, 0);
// Hands m_async_op_tracker a callback bound to handle_wait_for_ops(),
// presumably invoked once all tracked async ops have finished.
// handle_wait_for_ops() continues shut down by stopping the image
// replayers.
// NOTE(review): a line after the signature (likely a log line) and the
// closing brace are missing from this copy.
444 template <typename I
>
445 void InstanceReplayer
<I
>::wait_for_ops() {
448 Context
*ctx
= create_context_callback
<
449 InstanceReplayer
, &InstanceReplayer
<I
>::handle_wait_for_ops
>(this);
451 m_async_op_tracker
.wait_for_ops(ctx
);
// Completion of wait_for_ops(). It takes m_lock, which
// stop_image_replayers() asserts is held, and stops all remaining image
// replayers.
// NOTE(review): in what remains, `r` is only logged. Lines between the log
// and the lock, and the closing brace, are missing from this copy; upstream
// may validate `r` there. TODO confirm.
454 template <typename I
>
455 void InstanceReplayer
<I
>::handle_wait_for_ops(int r
) {
456 dout(10) << "r=" << r
<< dendl
;
460 std::lock_guard locker
{m_lock
};
461 stop_image_replayers();
// Requires m_lock. Calls stop_image_replayer() for every image replayer and
// gathers the completions into handle_stop_image_replayers().
// The final callback is dispatched through the work queue
// (create_async_context_callback). Presumably this keeps it from running
// inline while m_lock is still held here, since handle_stop_image_replayers()
// takes m_lock itself; this matters e.g. when the map is empty and the
// gather completes immediately.
// NOTE(review): a log line and the closing braces are missing from this
// copy.
464 template <typename I
>
465 void InstanceReplayer
<I
>::stop_image_replayers() {
468 ceph_assert(ceph_mutex_is_locked(m_lock
));
470 Context
*ctx
= create_async_context_callback(
471 m_threads
->work_queue
, create_context_callback
<InstanceReplayer
<I
>,
472 &InstanceReplayer
<I
>::handle_stop_image_replayers
>(this));
474 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
475 for (auto &it
: m_image_replayers
) {
476 stop_image_replayer(it
.second
, gather_ctx
->new_sub());
478 gather_ctx
->activate();
// Final shut-down step. Under m_lock it asserts that every replayer is
// stopped, destroys them all and clears m_image_replayers. It then moves
// the pending shut-down completion out of m_on_shut_down (leaving it null)
// and completes it with r.
// NOTE(review): on_finish is declared before the lock, which suggests that
// scope braces (missing from this copy) release m_lock before
// on_finish->complete(r). Completing while still holding m_lock would be
// unsafe if the callback re-enters this object. A line between the log and
// the declaration is also missing. Confirm against upstream.
481 template <typename I
>
482 void InstanceReplayer
<I
>::handle_stop_image_replayers(int r
) {
483 dout(10) << "r=" << r
<< dendl
;
487 Context
*on_finish
= nullptr;
489 std::lock_guard locker
{m_lock
};
491 for (auto &it
: m_image_replayers
) {
492 ceph_assert(it
.second
->is_stopped());
493 it
.second
->destroy();
495 m_image_replayers
.clear();
497 ceph_assert(m_on_shut_down
!= nullptr);
498 std::swap(on_finish
, m_on_shut_down
);
500 on_finish
->complete(r
);
// Under the timer lock, cancels the pending periodic image state check
// event (if any) and clears m_image_state_check_task. The destructor
// asserts that it is null.
// NOTE(review): the body of the `== nullptr` branch (presumably an early
// return) and its closing brace are missing from this copy. As shown, the
// null case would fall through to cancel_event(nullptr), presumably
// tripping ceph_assert(canceled). TODO restore.
503 template <typename I
>
504 void InstanceReplayer
<I
>::cancel_image_state_check_task() {
505 std::lock_guard timer_locker
{m_threads
->timer_lock
};
507 if (m_image_state_check_task
== nullptr) {
511 dout(10) << m_image_state_check_task
<< dendl
;
512 bool canceled
= m_threads
->timer
->cancel_event(m_image_state_check_task
);
513 ceph_assert(canceled
);
514 m_image_state_check_task
= nullptr;
// Requires the timer lock, and that no check task is pending. It creates a
// self-rescheduling timer task. When the task fires (it asserts that the
// timer lock is held), it clears m_image_state_check_task, schedules the
// next check, and queues start_image_replayers() via
// queue_start_image_replayers().
// The delay comes from the "rbd_mirror_image_state_check_interval" config
// option; the log message treats it as seconds.
// NOTE(review): the lambda's capture/parameter line and its closing `});`
// are missing from this copy. Note also that the uint64_t config value is
// narrowed to int for `after`.
517 template <typename I
>
518 void InstanceReplayer
<I
>::schedule_image_state_check_task() {
519 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
520 ceph_assert(m_image_state_check_task
== nullptr);
522 m_image_state_check_task
= new LambdaContext(
524 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
525 m_image_state_check_task
= nullptr;
526 schedule_image_state_check_task();
527 queue_start_image_replayers();
530 auto cct
= static_cast<CephContext
*>(m_local_io_ctx
.cct());
531 int after
= cct
->_conf
.get_val
<uint64_t>(
532 "rbd_mirror_image_state_check_interval");
534 dout(10) << "scheduling image state check after " << after
<< " sec (task "
535 << m_image_state_check_task
<< ")" << dendl
;
536 m_threads
->timer
->add_event_after(after
, m_image_state_check_task
);
539 } // namespace mirror
// Explicit instantiation for librbd::ImageCtx, so the template member
// definitions in this file are emitted in this translation unit.
// NOTE(review): because the instantiation uses the fully qualified
// rbd::mirror name, an enclosing `} // namespace rbd` line is expected
// before it, but it appears to be missing from this copy. Confirm.
542 template class rbd::mirror::InstanceReplayer
<librbd::ImageCtx
>;