1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/ceph_argparse.h"
5 #include "common/ceph_context.h"
6 #include "common/common_init.h"
7 #include "common/Cond.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "include/types.h"
13 #include "mon/MonClient.h"
14 #include "msg/Messenger.h"
15 #include "aio_utils.h"
// Hooks consumed by Ceph's dout()/derr logging macros in this file: messages
// go through g_ceph_context, are filed under the ceph_subsys_cephfs_mirror
// subsystem, and are prefixed with "cephfs::mirror::Mirror " followed by the
// name of the calling function (__func__).
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_cephfs_mirror
21 #define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__
// Service-daemon attribute key set on a filesystem (keyed by its fscid) when
// its FSMirror fails to initialize; both handle_enable_mirroring() overloads
// use it on their failure paths.
28 const std::string
SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed");
// Timer on which Mirror schedules its mirror update task (see
// schedule_mirror_update_task()). The Mirror constructor obtains it through
// lookup_or_create_singleton_object() under the key
// "cephfs::mirror::safe_timer", so presumably one instance is shared per
// CephContext. The timer's mutex lives in this object (timer_lock); Mirror
// keeps a pointer to it as m_timer_lock.
// NOTE(review): the rest of the constructor body and the closing braces are
// not visible in this copy.
30 class SafeTimerSingleton
: public CommonSafeTimer
<ceph::mutex
> {
32 ceph::mutex timer_lock
= ceph::make_mutex("cephfs::mirror::timer_lock");
// NOTE(review): base classes are constructed before non-static members, so
// the SafeTimer base below receives a reference to timer_lock before
// timer_lock itself is initialized. That is only safe if the base constructor
// merely stores the reference -- confirm against CommonSafeTimer. The third
// argument (true) presumably selects SafeTimer's safe-callbacks mode --
// confirm.
34 explicit SafeTimerSingleton(CephContext
*cct
)
35 : SafeTimer(cct
, timer_lock
, true) {
// Thread pool obtained by the Mirror constructor through
// lookup_or_create_singleton_object() under the key
// "cephfs::mirror::thread_pool"; its work_queue becomes Mirror::m_work_queue.
// The pool is named "Mirror::thread_pool" with thread name "tp_mirror"; the
// final ThreadPool argument (1) is presumably the thread count -- confirm.
40 class ThreadPoolSingleton
: public ThreadPool
{
42 ContextWQ
*work_queue
= nullptr;
// NOTE(review): the constructor allocates the ContextWQ with raw `new`; the
// code that frees it (and the rest of this class) is not visible in this copy
// -- confirm it is deleted on teardown.
44 explicit ThreadPoolSingleton(CephContext
*cct
)
45 : ThreadPool(cct
, "Mirror::thread_pool", "tp_mirror", 1) {
46 work_queue
= new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this);
// Closes the anonymous namespace holding the file-local helpers above (its
// opening line is not visible in this copy).
52 } // anonymous namespace
// Deferred "enable mirroring" action for one filesystem. mirroring_enabled()
// queues it on the filesystem's MirrorAction::action_ctxs. enable_mirroring()
// starts an FSMirror through Mirror::enable_mirroring(), and the result is
// reported back through Mirror::handle_enable_mirroring(filesystem, r).
// NOTE(review): several lines of this struct (the `mirror` and `pool_id`
// members, other constructor initializers, the finish() and complete()
// bodies, closing braces) are not visible in this copy.
54 struct Mirror::C_EnableMirroring
: Context
{
56 Filesystem filesystem
;
59 C_EnableMirroring(Mirror
*mirror
, const Filesystem
&filesystem
, uint64_t pool_id
)
61 filesystem(filesystem
),
65 void finish(int r
) override
{
// Mirror::enable_mirroring() asserts that mirror->m_lock is held, so this
// must be invoked under that lock.
69 void enable_mirroring() {
70 Context
*ctx
= new C_CallbackAdapter
<C_EnableMirroring
,
71 &C_EnableMirroring::handle_enable_mirroring
>(this);
72 mirror
->enable_mirroring(filesystem
, pool_id
, ctx
);
// Completion callback (bound via C_CallbackAdapter above): forwards the
// result r to the owning Mirror.
75 void handle_enable_mirroring(int r
) {
76 mirror
->handle_enable_mirroring(filesystem
, r
);
// complete() is overridden so this object is not destroyed when it completes;
// where it is eventually freed is not visible in this copy.
80 // context needs to live post completion
81 void complete(int r
) override
{
// Deferred "disable mirroring" action for one filesystem. mirroring_disabled()
// queues it on the filesystem's MirrorAction::action_ctxs.
// disable_mirroring() stops the FSMirror through Mirror::disable_mirroring(),
// and the result is reported back through
// Mirror::handle_disable_mirroring(filesystem, r).
// NOTE(review): several lines of this struct (the `mirror` member, other
// constructor initializers, the finish() and complete() bodies, closing
// braces) are not visible in this copy.
86 struct Mirror::C_DisableMirroring
: Context
{
88 Filesystem filesystem
;
90 C_DisableMirroring(Mirror
*mirror
, const Filesystem
&filesystem
)
92 filesystem(filesystem
) {
95 void finish(int r
) override
{
// Mirror::disable_mirroring() asserts that mirror->m_lock is held, so this
// must be invoked under that lock.
99 void disable_mirroring() {
100 Context
*ctx
= new C_CallbackAdapter
<C_DisableMirroring
,
101 &C_DisableMirroring::handle_disable_mirroring
>(this);
102 mirror
->disable_mirroring(filesystem
, ctx
);
// Completion callback (bound via C_CallbackAdapter above): forwards the
// result r to the owning Mirror.
105 void handle_disable_mirroring(int r
) {
106 mirror
->handle_disable_mirroring(filesystem
, r
);
// complete() is overridden so this object is not destroyed when it completes;
// where it is eventually freed is not visible in this copy.
110 // context needs to live post completion
111 void complete(int r
) override
{
// Deferred peer add/remove for one filesystem, queued on the filesystem's
// MirrorAction::action_ctxs. peer_added() uses the three-argument
// constructor; peer_removed() uses the four-argument one with remove=true.
// NOTE(review): the member declarations (mirror, peer, remove), parts of both
// constructors and the branch inside finish() are not visible in this copy.
116 struct Mirror::C_PeerUpdate
: Context
{
118 Filesystem filesystem
;
122 C_PeerUpdate(Mirror
*mirror
, const Filesystem
&filesystem
,
125 filesystem(filesystem
),
128 C_PeerUpdate(Mirror
*mirror
, const Filesystem
&filesystem
,
129 const Peer
&peer
, bool remove
)
131 filesystem(filesystem
),
// Applies the update with Mirror::remove_peer() or Mirror::add_peer(),
// presumably selected by the `remove` flag. Both assert that mirror->m_lock
// is held and that the filesystem has an FSMirror with no action in progress.
136 void finish(int r
) override
{
138 mirror
->remove_peer(filesystem
, peer
);
140 mirror
->add_peer(filesystem
, peer
);
// Restarts mirroring for one filesystem. update_fs_mirrors() creates it when
// the FSMirror is detected as failed or blocklisted, passing the entry's
// pool_id and the peers captured from the old FSMirror.
// The intended sequence is:
//   disable_mirroring() -> handle_disable_mirroring() -> enable_mirroring()
//   (with is_restart=true) -> handle_enable_mirroring().
// The last step re-adds `peers` through the peers-aware
// Mirror::handle_enable_mirroring() overload.
// NOTE(review): the link from handle_disable_mirroring() to
// enable_mirroring() is presumed; that body, the `mirror`/`pool_id`/`peers`
// member declarations and closing braces are not visible in this copy.
145 struct Mirror::C_RestartMirroring
: Context
{
147 Filesystem filesystem
;
151 C_RestartMirroring(Mirror
*mirror
, const Filesystem
&filesystem
,
152 uint64_t pool_id
, const Peers
&peers
)
154 filesystem(filesystem
),
159 void finish(int r
) override
{
// Does not take mirror->m_lock itself, but Mirror::disable_mirroring()
// asserts it is held, so the caller must hold it.
163 void disable_mirroring() {
164 Context
*ctx
= new C_CallbackAdapter
<C_RestartMirroring
,
165 &C_RestartMirroring::handle_disable_mirroring
>(this);
166 mirror
->disable_mirroring(filesystem
, ctx
);
// NOTE(review): body not visible in this copy.
169 void handle_disable_mirroring(int r
) {
// Unlike disable_mirroring() above, this acquires mirror->m_lock itself,
// because Mirror::enable_mirroring() asserts it is held.
173 void enable_mirroring() {
174 std::scoped_lock
locker(mirror
->m_lock
);
175 Context
*ctx
= new C_CallbackAdapter
<C_RestartMirroring
,
176 &C_RestartMirroring::handle_enable_mirroring
>(this);
177 mirror
->enable_mirroring(filesystem
, pool_id
, ctx
, true);
// Completion callback: forwards r together with the saved peers so that
// Mirror can re-add them to the new FSMirror.
180 void handle_enable_mirroring(int r
) {
181 mirror
->handle_enable_mirroring(filesystem
, peers
, r
);
// complete() is overridden so this object is not destroyed when it completes;
// where it is eventually freed is not visible in this copy.
185 // context needs to live post completion
186 void complete(int r
) override
{
// Builds the mirror daemon core.
// m_last_blocklist_check and m_last_failure_check start at "now", so
// update_fs_mirrors() performs its first blocklist/failure restart checks
// only after the respective configured interval has elapsed.
// A librados::Rados handle is allocated for the local cluster; init()
// connects it later. The thread pool (with its work queue) and the timer are
// obtained as CephContext singletons and stored as raw pointers.
// The first update task is scheduled before returning.
// NOTE(review): the remaining member initializers are not visible in this
// copy.
191 Mirror::Mirror(CephContext
*cct
, const std::vector
<const char*> &args
,
192 MonClient
*monc
, Messenger
*msgr
)
198 m_last_blocklist_check(ceph_clock_now()),
199 m_last_failure_check(ceph_clock_now()),
200 m_local(new librados::Rados()) {
201 auto thread_pool
= &(cct
->lookup_or_create_singleton_object
<ThreadPoolSingleton
>(
202 "cephfs::mirror::thread_pool", false, cct
));
203 auto safe_timer
= &(cct
->lookup_or_create_singleton_object
<SafeTimerSingleton
>(
204 "cephfs::mirror::safe_timer", false, cct
));
205 m_thread_pool
= thread_pool
;
206 m_work_queue
= thread_pool
->work_queue
;
207 m_timer
= safe_timer
;
208 m_timer_lock
= &safe_timer
->timer_lock
;
// schedule_mirror_update_task() asserts that *m_timer_lock is held.
209 std::scoped_lock
timer_lock(*m_timer_lock
);
210 schedule_mirror_update_task();
// NOTE(review): the enclosing function header is not visible in this copy;
// given its position right after the constructor this is presumably the
// teardown path (~Mirror) -- confirm.
// Visible steps:
//  - take the timer lock (the statements run under it are not visible);
//  - drain the shared work queue;
//  - stop the thread pool while holding m_lock.
216 std::scoped_lock
timer_lock(*m_timer_lock
);
220 m_work_queue
->drain();
223 std::scoped_lock
locker(m_lock
);
224 m_thread_pool
->stop();
// Sets up the MonClient:
//  - attaches it to the messenger (as the head dispatcher) and requests MON
//    keys;
//  - initializes it;
//  - authenticates to the monitors, passing client_mount_timeout (seconds) as
//    the timeout;
//  - finally names the messenger entity_name_t::CLIENT(<global id>).
// NOTE(review): the error checks and early returns around the derr lines, and
// the final return statement, are not visible in this copy.
228 int Mirror::init_mon_client() {
231 m_monc
->set_messenger(m_msgr
);
232 m_msgr
->add_dispatcher_head(m_monc
);
233 m_monc
->set_want_keys(CEPH_ENTITY_TYPE_MON
);
235 int r
= m_monc
->init();
237 derr
<< ": failed to init mon client: " << cpp_strerror(r
) << dendl
;
241 r
= m_monc
->authenticate(std::chrono::duration
<double>(m_cct
->_conf
.get_val
<std::chrono::seconds
>("client_mount_timeout")).count());
243 derr
<< ": failed to authenticate to monitor: " << cpp_strerror(r
) << dendl
;
247 client_t me
= m_monc
->get_global_id();
248 m_msgr
->set_myname(entity_name_t::CLIENT(me
.v
));
// One-time daemon setup, performed under m_lock:
//  - initialize and connect the local rados handle;
//  - create and register the ServiceDaemon;
//  - set up the MonClient via init_mon_client().
// NOTE(review): the error checks and returns after each step, any use of
// `reason`, and the final return are not visible in this copy. The first two
// derr messages, unlike the later ones, do not include cpp_strerror(r);
// consider adding it for easier diagnosis.
252 int Mirror::init(std::string
&reason
) {
255 std::scoped_lock
locker(m_lock
);
257 int r
= m_local
->init_with_context(m_cct
);
259 derr
<< ": could not initialize rados handler" << dendl
;
263 r
= m_local
->connect();
265 derr
<< ": error connecting to local cluster" << dendl
;
269 m_service_daemon
= std::make_unique
<ServiceDaemon
>(m_cct
, m_local
);
270 r
= m_service_daemon
->init();
272 derr
<< ": error registering service daemon: " << cpp_strerror(r
) << dendl
;
276 r
= init_mon_client();
// Begins daemon shutdown by shutting down the ClusterWatcher that run()
// created.
// NOTE(review): the other statements of this function are not visible in this
// copy. run() blocks on m_cond until m_stopping is set, so presumably this
// also sets m_stopping and notifies m_cond -- confirm.
284 void Mirror::shutdown() {
287 m_cluster_watcher
->shutdown();
291 void Mirror::reopen_logs() {
292 for (auto &[filesystem
, mirror_action
] : m_mirror_actions
) {
293 mirror_action
.fs_mirror
->reopen_logs();
295 g_ceph_context
->reopen_logs();
// Handles a signal delivered to the daemon; all handling happens under m_lock.
// NOTE(review): the per-signal dispatch is not visible in this copy. The
// ceph_abort_msgf() below presumably sits in the fallback branch, so any
// signal that is not explicitly handled aborts the process.
298 void Mirror::handle_signal(int signum
) {
299 dout(10) << ": signal=" << signum
<< dendl
;
301 std::scoped_lock
locker(m_lock
);
311 ceph_abort_msgf("unexpected signal %d", signum
);
// Completion for a restart (reached from C_RestartMirroring): called with the
// result r of the FSMirror::init() that enable_mirroring() started.
// Under m_lock it clears the in-progress flag, then either records the enable
// failure on the service daemon or re-adds the peers that the previous
// FSMirror had.
// NOTE(review): the failure/success branching, the third argument of
// add_or_update_fs_attribute() and any early return are not visible in this
// copy. run() waits on m_cond for action_in_progress to clear, and no notify
// of m_cond is visible here -- confirm one happens.
315 void Mirror::handle_enable_mirroring(const Filesystem
&filesystem
,
316 const Peers
&peers
, int r
) {
317 dout(20) << ": filesystem=" << filesystem
<< ", peers=" << peers
318 << ", r=" << r
<< dendl
;
320 std::scoped_lock
locker(m_lock
);
321 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
322 ceph_assert(mirror_action
.action_in_progress
);
// The enable attempt is over; update_fs_mirrors() may now dispatch the next
// queued action for this filesystem.
324 mirror_action
.action_in_progress
= false;
// Failure path: log and flag the filesystem on the service daemon.
327 derr
<< ": failed to initialize FSMirror for filesystem=" << filesystem
328 << ": " << cpp_strerror(r
) << dendl
;
329 m_service_daemon
->add_or_update_fs_attribute(filesystem
.fscid
,
330 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY
,
// Success path: restore the peers captured before the restart.
335 for (auto &peer
: peers
) {
336 mirror_action
.fs_mirror
->add_peer(peer
);
339 dout(10) << ": Initialized FSMirror for filesystem=" << filesystem
<< dendl
;
// Completion for a plain enable (reached from C_EnableMirroring): called with
// the result r of the FSMirror::init() that enable_mirroring() started.
// Under m_lock it clears the in-progress flag, then either records the enable
// failure on the service daemon or logs success.
// NOTE(review): the branching, the third add_or_update_fs_attribute()
// argument and any early return are not visible in this copy. This body
// duplicates the peers-aware overload minus the peer re-add; a shared helper
// would keep the two in sync.
342 void Mirror::handle_enable_mirroring(const Filesystem
&filesystem
, int r
) {
343 dout(20) << ": filesystem=" << filesystem
<< ", r=" << r
<< dendl
;
345 std::scoped_lock
locker(m_lock
);
346 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
347 ceph_assert(mirror_action
.action_in_progress
);
// The enable attempt is over; update_fs_mirrors() may now dispatch the next
// queued action for this filesystem.
349 mirror_action
.action_in_progress
= false;
// Failure path: log and flag the filesystem on the service daemon.
352 derr
<< ": failed to initialize FSMirror for filesystem=" << filesystem
353 << ": " << cpp_strerror(r
) << dendl
;
354 m_service_daemon
->add_or_update_fs_attribute(filesystem
.fscid
,
355 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY
,
360 dout(10) << ": Initialized FSMirror for filesystem=" << filesystem
<< dendl
;
// Starts (or restarts) the FSMirror for `filesystem`.
// Preconditions: m_lock is held, and no other action is in progress for the
// filesystem.
// It creates a fresh FSMirror for local_pool_id, marks the action in progress,
// and kicks off FSMirror::init(). on_finish is wrapped in a C_AsyncCallback so
// that it is completed via m_work_queue.
// NOTE(review): the conditions around the fs_mirror.reset() and the two
// asserts below are not visible in this copy. Presumably the reset applies
// only when is_restart is true (an unconditional reset would make the
// !fs_mirror assert trivially true) -- confirm.
363 void Mirror::enable_mirroring(const Filesystem
&filesystem
, uint64_t local_pool_id
,
364 Context
*on_finish
, bool is_restart
) {
365 ceph_assert(ceph_mutex_is_locked(m_lock
));
367 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
369 mirror_action
.fs_mirror
.reset();
371 ceph_assert(!mirror_action
.action_in_progress
);
374 ceph_assert(!mirror_action
.fs_mirror
);
376 dout(10) << ": starting FSMirror: filesystem=" << filesystem
<< dendl
;
// Cleared again by handle_enable_mirroring() when init completes.
378 mirror_action
.action_in_progress
= true;
379 mirror_action
.fs_mirror
= std::make_unique
<FSMirror
>(m_cct
, filesystem
, local_pool_id
,
380 m_service_daemon
.get(), m_args
, m_work_queue
);
381 mirror_action
.fs_mirror
->init(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, on_finish
));
// Presumably invoked by the cluster watcher when mirroring is enabled for
// `filesystem` in pool local_pool_id. Under m_lock it ensures a MirrorAction
// entry exists and queues a C_EnableMirroring for update_fs_mirrors() to run
// later.
// NOTE(review): emplace() leaves an existing entry untouched, so if one is
// already present its pool_id is not updated to local_pool_id. The queued
// C_EnableMirroring still carries local_pool_id, but a later
// C_RestartMirroring would use the entry's pool_id -- confirm this is
// intended. Lines between the lock and the emplace are not visible in this
// copy.
384 void Mirror::mirroring_enabled(const Filesystem
&filesystem
, uint64_t local_pool_id
) {
385 dout(10) << ": filesystem=" << filesystem
<< ", pool_id=" << local_pool_id
<< dendl
;
387 std::scoped_lock
locker(m_lock
);
392 auto p
= m_mirror_actions
.emplace(filesystem
, MirrorAction(local_pool_id
));
393 auto &mirror_action
= p
.first
->second
;
394 mirror_action
.action_ctxs
.push_back(new C_EnableMirroring(this, filesystem
, local_pool_id
));
// Completion for Mirror::disable_mirroring() (reached from
// C_DisableMirroring::handle_disable_mirroring()).
// When the FSMirror's init had failed, disable_mirroring() does not mark the
// action in progress; it just completes on_finish with -EINVAL. So the
// in-progress flag is only checked and cleared in the other case.
// The FSMirror is then destroyed, and the MirrorAction entry is erased when
// no further actions are queued for the filesystem.
// NOTE(review): lines between the flag update and the reset are not visible
// in this copy. `mirror_action` refers into m_mirror_actions and must not be
// used after the erase below.
397 void Mirror::handle_disable_mirroring(const Filesystem
&filesystem
, int r
) {
398 dout(10) << ": filesystem=" << filesystem
<< ", r=" << r
<< dendl
;
400 std::scoped_lock
locker(m_lock
);
401 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
403 if (!mirror_action
.fs_mirror
->is_init_failed()) {
404 ceph_assert(mirror_action
.action_in_progress
);
405 mirror_action
.action_in_progress
= false;
410 mirror_action
.fs_mirror
.reset();
411 if (mirror_action
.action_ctxs
.empty()) {
412 dout(10) << ": no pending actions for filesystem=" << filesystem
<< dendl
;
413 m_mirror_actions
.erase(filesystem
);
// Stops the FSMirror for `filesystem`.
// Preconditions (asserted): m_lock is held, the filesystem has an FSMirror,
// and no action is in progress.
// If the FSMirror never initialized successfully there is nothing to shut
// down, so on_finish is queued on m_work_queue with -EINVAL. Otherwise the
// action is marked in progress and FSMirror::shutdown() completes on_finish
// via m_work_queue.
// NOTE(review): the init-failed branch must return after queueing on_finish.
// That return is not visible in this copy; without it on_finish would be
// completed twice.
418 void Mirror::disable_mirroring(const Filesystem
&filesystem
, Context
*on_finish
) {
419 ceph_assert(ceph_mutex_is_locked(m_lock
));
421 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
422 ceph_assert(mirror_action
.fs_mirror
);
423 ceph_assert(!mirror_action
.action_in_progress
);
425 if (mirror_action
.fs_mirror
->is_init_failed()) {
426 dout(10) << ": init failed for filesystem=" << filesystem
<< dendl
;
427 m_work_queue
->queue(on_finish
, -EINVAL
);
// Cleared again by handle_disable_mirroring() when shutdown completes.
431 mirror_action
.action_in_progress
= true;
432 mirror_action
.fs_mirror
->shutdown(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, on_finish
));
// Presumably invoked by the cluster watcher when mirroring is disabled for
// `filesystem`. Under m_lock it queues a C_DisableMirroring on the
// filesystem's MirrorAction for update_fs_mirrors() to run.
// NOTE(review): the "shutting down" log presumably belongs to a stop check
// whose condition and early return are not visible in this copy. If
// m_mirror_actions is a std::map-like container, at() throws
// std::out_of_range when no entry exists for `filesystem`.
435 void Mirror::mirroring_disabled(const Filesystem
&filesystem
) {
436 dout(10) << ": filesystem=" << filesystem
<< dendl
;
438 std::scoped_lock
locker(m_lock
);
440 dout(5) << "shutting down" << dendl
;
444 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
445 mirror_action
.action_ctxs
.push_back(new C_DisableMirroring(this, filesystem
));
// Adds `peer` to the running FSMirror of `filesystem`. Invoked from the
// deferred C_PeerUpdate action.
// Preconditions (asserted): m_lock is held, the filesystem has an FSMirror,
// and no enable/disable action is in progress.
448 void Mirror::add_peer(const Filesystem
&filesystem, const Peer
&peer
) {
449 ceph_assert(ceph_mutex_is_locked(m_lock
));
451 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
452 ceph_assert(mirror_action
.fs_mirror
);
453 ceph_assert(!mirror_action
.action_in_progress
);
455 mirror_action
.fs_mirror
->add_peer(peer
);
// Presumably invoked by the cluster watcher when `peer` is added to
// `filesystem`. Under m_lock it queues a C_PeerUpdate (add) on the
// filesystem's MirrorAction for update_fs_mirrors() to run.
// NOTE(review): the "shutting down" log presumably belongs to a stop check
// whose condition and early return are not visible in this copy.
458 void Mirror::peer_added(const Filesystem
&filesystem, const Peer
&peer
) {
459 dout(20) << ": filesystem=" << filesystem
<< ", peer=" << peer
<< dendl
;
461 std::scoped_lock
locker(m_lock
);
463 dout(5) << "shutting down" << dendl
;
467 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
468 mirror_action
.action_ctxs
.push_back(new C_PeerUpdate(this, filesystem
, peer
));
// Removes `peer` from the running FSMirror of `filesystem`. Invoked from the
// deferred C_PeerUpdate action.
// Preconditions (asserted): m_lock is held, the filesystem has an FSMirror,
// and no enable/disable action is in progress.
471 void Mirror::remove_peer(const Filesystem
&filesystem, const Peer
&peer
) {
472 ceph_assert(ceph_mutex_is_locked(m_lock
));
474 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
475 ceph_assert(mirror_action
.fs_mirror
);
476 ceph_assert(!mirror_action
.action_in_progress
);
478 mirror_action
.fs_mirror
->remove_peer(peer
);
// Presumably invoked by the cluster watcher when `peer` is removed from
// `filesystem`. Under m_lock it queues a C_PeerUpdate with remove=true on the
// filesystem's MirrorAction for update_fs_mirrors() to run.
// NOTE(review): the "shutting down" log presumably belongs to a stop check
// whose condition and early return are not visible in this copy.
481 void Mirror::peer_removed(const Filesystem
&filesystem, const Peer
&peer
) {
482 dout(20) << ": filesystem=" << filesystem
<< ", peer=" << peer
<< dendl
;
484 std::scoped_lock
locker(m_lock
);
486 dout(5) << "shutting down" << dendl
;
490 auto &mirror_action
= m_mirror_actions
.at(filesystem
);
491 mirror_action
.action_ctxs
.push_back(new C_PeerUpdate(this, filesystem
, peer
, true));
// Periodic maintenance pass over all filesystems, done under m_lock.
// For each filesystem with no action in progress:
//  - restarts (via C_RestartMirroring) a failed FSMirror once
//    cephfs_mirror_restart_mirror_on_failure_interval has elapsed since the
//    last failure check;
//  - otherwise restarts a blocklisted FSMirror once
//    cephfs_mirror_restart_mirror_on_blocklist_interval has elapsed;
//  - otherwise, if the FSMirror is neither failed nor blocklisted (including
//    when there is no FSMirror yet), pops the next queued action from
//    action_ctxs.
// An interval <= 0 disables the corresponding restart check.
// Afterwards it records the check times and calls
// schedule_mirror_update_task() to arm the next pass.
// NOTE(review): what is done with each `ctx` (presumably it is completed),
// the guard around m_last_failure_check, and the timer-lock acquisition that
// schedule_mirror_update_task() asserts are not visible in this copy.
494 void Mirror::update_fs_mirrors() {
497 auto now
= ceph_clock_now();
498 double blocklist_interval
= g_ceph_context
->_conf
.get_val
<std::chrono::seconds
>
499 ("cephfs_mirror_restart_mirror_on_blocklist_interval").count();
500 bool check_blocklist
= blocklist_interval
> 0 && ((now
- m_last_blocklist_check
) >= blocklist_interval
);
502 double failed_interval
= g_ceph_context
->_conf
.get_val
<std::chrono::seconds
>
503 ("cephfs_mirror_restart_mirror_on_failure_interval").count();
504 bool check_failure
= failed_interval
> 0 && ((now
- m_last_failure_check
) >= failed_interval
);
507 std::scoped_lock
locker(m_lock
);
508 for (auto &[filesystem
, mirror_action
] : m_mirror_actions
) {
// Both flags are false when the filesystem has no FSMirror yet.
509 auto failed
= mirror_action
.fs_mirror
&& mirror_action
.fs_mirror
->is_failed();
510 auto blocklisted
= mirror_action
.fs_mirror
&& mirror_action
.fs_mirror
->is_blocklisted();
// Restarts are only attempted while no other action is in flight.
512 if (check_failure
&& !mirror_action
.action_in_progress
&& failed
) {
513 // about to restart failed mirror instance -- nothing
515 dout(5) << ": filesystem=" << filesystem
<< " failed mirroring -- restarting" << dendl
;
516 auto peers
= mirror_action
.fs_mirror
->get_peers();
517 auto ctx
= new C_RestartMirroring(this, filesystem
, mirror_action
.pool_id
, peers
);
519 } else if (check_blocklist
&& !mirror_action
.action_in_progress
&& blocklisted
) {
520 // about to restart blocklisted mirror instance -- nothing
522 dout(5) << ": filesystem=" << filesystem
<< " is blocklisted -- restarting" << dendl
;
523 auto peers
= mirror_action
.fs_mirror
->get_peers();
524 auto ctx
= new C_RestartMirroring(this, filesystem
, mirror_action
.pool_id
, peers
);
// Queued actions are dispatched one at a time, and only when the FSMirror is
// neither failed nor blocklisted.
527 if (!failed
&& !blocklisted
&& !mirror_action
.action_ctxs
.empty()
528 && !mirror_action
.action_in_progress
) {
// NOTE(review): action_ctxs entries are pushed from `new`, so std::move here
// just copies the pointer; pop_front() then drops the queue's copy.
529 auto ctx
= std::move(mirror_action
.action_ctxs
.front());
530 mirror_action
.action_ctxs
.pop_front();
535 if (check_blocklist
) {
536 m_last_blocklist_check
= now
;
539 m_last_failure_check
= now
;
543 schedule_mirror_update_task();
// Arms the timer event for the next update pass.
// Preconditions (asserted): *m_timer_lock is held and no task is pending.
// The delay is cephfs_mirror_action_update_interval seconds.
// The event's lambda first clears m_timer_task, so the next call's
// `m_timer_task == nullptr` assert holds. The rest of the lambda is not
// visible in this copy; presumably it calls update_fs_mirrors(), which calls
// back into this function.
546 void Mirror::schedule_mirror_update_task() {
547 ceph_assert(m_timer_task
== nullptr);
548 ceph_assert(ceph_mutex_is_locked(*m_timer_lock
));
550 m_timer_task
= new LambdaContext([this](int _
) {
551 m_timer_task
= nullptr;
554 double after
= g_ceph_context
->_conf
.get_val
<std::chrono::seconds
>
555 ("cephfs_mirror_action_update_interval").count();
556 dout(20) << ": scheduling fs mirror update (" << m_timer_task
<< ") after "
557 << after
<< " seconds" << dendl
;
558 m_timer
->add_event_after(after
, m_timer_task
);
// NOTE(review): the function header is not visible in this copy; this is
// presumably Mirror::run(), the daemon's main loop.
// Steps:
//  - create and register the ClusterWatcher;
//  - block on m_cond until m_stopping is set;
//  - cancel any pending update task;
//  - for every filesystem, wait for its in-progress action to finish, shut its
//    FSMirror down (unless it is already stopping or never initialized), then
//    destroy it.
564 std::unique_lock
locker(m_lock
);
565 m_cluster_watcher
.reset(new ClusterWatcher(m_cct
, m_monc
, m_service_daemon
.get(), m_listener
));
566 m_msgr
->add_dispatcher_tail(m_cluster_watcher
.get());
568 m_cluster_watcher
->init();
569 m_cond
.wait(locker
, [this]{return m_stopping
;});
// Stop the periodic update task so no further update passes run.
573 std::scoped_lock
timer_lock(*m_timer_lock
);
574 if (m_timer_task
!= nullptr) {
575 dout(10) << ": canceling timer task=" << m_timer_task
<< dendl
;
576 m_timer
->cancel_event(m_timer_task
);
577 m_timer_task
= nullptr;
582 for (auto &[filesystem
, mirror_action
] : m_mirror_actions
) {
583 dout(10) << ": trying to shutdown filesystem=" << filesystem
<< dendl
;
// The init-capture below is needed because C++17 lambdas cannot capture
// structured bindings directly. m_cond.wait() releases m_lock while blocked.
// NOTE(review): if a completion handler erased this entry meanwhile
// (handle_disable_mirroring() erases entries with no queued actions),
// mirror_action and the loop iterator would dangle -- confirm that cannot
// happen during shutdown.
584 // wait for in-progress action and shutdown
585 m_cond
.wait(locker
, [&mirror_action
=mirror_action
]
586 {return !mirror_action
.action_in_progress
;});
587 if (mirror_action
.fs_mirror
&&
588 !mirror_action
.fs_mirror
->is_stopping() &&
589 !mirror_action
.fs_mirror
->is_init_failed()) {
// NOTE(review): the declarations of `cond` and `r` (presumably a waitable
// context and the result of waiting on it) are not visible in this copy.
591 mirror_action
.fs_mirror
->shutdown(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, &cond
));
593 dout(10) << ": shutdown filesystem=" << filesystem
<< ", r=" << r
<< dendl
;
596 mirror_action
.fs_mirror
.reset();
600 } // namespace mirror
601 } // namespace cephfs