1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/admin_socket.h"
5 #include "common/ceph_argparse.h"
6 #include "common/ceph_context.h"
7 #include "common/common_init.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include "include/stringify.h"
12 #include "msg/Messenger.h"
14 #include "PeerReplayer.h"
15 #include "aio_utils.h"
16 #include "ServiceDaemon.h"
19 #include "common/Cond.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_cephfs_mirror
24 #define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__
31 const std::string
SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
32 const std::string
SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
34 class MirrorAdminSocketCommand
{
36 virtual ~MirrorAdminSocketCommand() {
38 virtual int call(Formatter
*f
) = 0;
41 class StatusCommand
: public MirrorAdminSocketCommand
{
43 explicit StatusCommand(FSMirror
*fs_mirror
)
44 : fs_mirror(fs_mirror
) {
47 int call(Formatter
*f
) override
{
48 fs_mirror
->mirror_status(f
);
56 } // anonymous namespace
58 class MirrorAdminSocketHook
: public AdminSocketHook
{
60 MirrorAdminSocketHook(CephContext
*cct
, const Filesystem
&filesystem
, FSMirror
*fs_mirror
)
61 : admin_socket(cct
->get_admin_socket()) {
65 // mirror status format is name@fscid
66 cmd
= "fs mirror status " + stringify(filesystem
.fs_name
) + "@" + stringify(filesystem
.fscid
);
67 r
= admin_socket
->register_command(
68 cmd
, this, "get filesystem mirror status");
70 commands
[cmd
] = new StatusCommand(fs_mirror
);
74 ~MirrorAdminSocketHook() override
{
75 admin_socket
->unregister_commands(this);
76 for (auto &[command
, cmdptr
] : commands
) {
81 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
82 Formatter
*f
, std::ostream
&errss
, bufferlist
&out
) override
{
83 auto p
= commands
.at(std::string(command
));
88 typedef std::map
<std::string
, MirrorAdminSocketCommand
*, std::less
<>> Commands
;
90 AdminSocket
*admin_socket
;
94 FSMirror::FSMirror(CephContext
*cct
, const Filesystem
&filesystem
, uint64_t pool_id
,
95 ServiceDaemon
*service_daemon
, std::vector
<const char*> args
,
96 ContextWQ
*work_queue
)
98 m_filesystem(filesystem
),
100 m_service_daemon(service_daemon
),
102 m_work_queue(work_queue
),
103 m_snap_listener(this),
104 m_asok_hook(new MirrorAdminSocketHook(cct
, filesystem
, this)) {
105 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
109 FSMirror::~FSMirror() {
113 std::scoped_lock
locker(m_lock
);
114 delete m_instance_watcher
;
115 delete m_mirror_watcher
;
117 // outside the lock so that in-progress commands can acquire
118 // lock and finish executing.
122 int FSMirror::init_replayer(PeerReplayer
*peer_replayer
) {
123 ceph_assert(ceph_mutex_is_locked(m_lock
));
124 return peer_replayer
->init();
127 void FSMirror::shutdown_replayer(PeerReplayer
*peer_replayer
) {
128 peer_replayer
->shutdown();
131 void FSMirror::cleanup() {
133 ceph_unmount(m_mount
);
134 ceph_release(m_mount
);
139 void FSMirror::reopen_logs() {
140 std::scoped_lock
locker(m_lock
);
143 reinterpret_cast<CephContext
*>(m_cluster
->cct())->reopen_logs();
145 for (auto &[peer
, replayer
] : m_peer_replayers
) {
146 replayer
->reopen_logs();
150 void FSMirror::init(Context
*on_finish
) {
153 std::scoped_lock
locker(m_lock
);
154 int r
= connect(g_ceph_context
->_conf
->name
.to_str(),
155 g_ceph_context
->_conf
->cluster
, &m_cluster
, "", "", m_args
);
157 m_init_failed
= true;
158 on_finish
->complete(r
);
162 r
= m_cluster
->ioctx_create2(m_pool_id
, m_ioctx
);
164 m_init_failed
= true;
166 derr
<< ": error accessing local pool (id=" << m_pool_id
<< "): "
167 << cpp_strerror(r
) << dendl
;
168 on_finish
->complete(r
);
172 r
= mount(m_cluster
, m_filesystem
, true, &m_mount
);
174 m_init_failed
= true;
177 on_finish
->complete(r
);
181 m_addrs
= m_cluster
->get_addrs();
182 dout(10) << ": rados addrs=" << m_addrs
<< dendl
;
184 init_instance_watcher(on_finish
);
187 void FSMirror::shutdown(Context
*on_finish
) {
191 std::scoped_lock
locker(m_lock
);
193 if (m_on_init_finish
!= nullptr) {
194 dout(10) << ": delaying shutdown -- init in progress" << dendl
;
195 m_on_shutdown_finish
= new LambdaContext([this, on_finish
](int r
) {
197 on_finish
->complete(0);
200 m_on_shutdown_finish
= on_finish
;
201 shutdown_peer_replayers();
206 m_on_shutdown_finish
= on_finish
;
209 shutdown_peer_replayers();
212 void FSMirror::shutdown_peer_replayers() {
215 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
216 dout(5) << ": shutting down replayer for peer=" << peer
<< dendl
;
217 shutdown_replayer(peer_replayer
.get());
219 m_peer_replayers
.clear();
221 shutdown_mirror_watcher();
224 void FSMirror::init_instance_watcher(Context
*on_finish
) {
227 m_on_init_finish
= new LambdaContext([this, on_finish
](int r
) {
229 std::scoped_lock
locker(m_lock
);
231 m_init_failed
= true;
234 on_finish
->complete(r
);
235 if (m_on_shutdown_finish
!= nullptr) {
236 m_on_shutdown_finish
->complete(r
);
240 Context
*ctx
= new C_CallbackAdapter
<
241 FSMirror
, &FSMirror::handle_init_instance_watcher
>(this);
242 m_instance_watcher
= InstanceWatcher::create(m_ioctx
, m_snap_listener
, m_work_queue
);
243 m_instance_watcher
->init(ctx
);
246 void FSMirror::handle_init_instance_watcher(int r
) {
247 dout(20) << ": r=" << r
<< dendl
;
249 Context
*on_init_finish
= nullptr;
251 std::scoped_lock
locker(m_lock
);
253 std::swap(on_init_finish
, m_on_init_finish
);
257 if (on_init_finish
!= nullptr) {
258 on_init_finish
->complete(r
);
262 init_mirror_watcher();
265 void FSMirror::init_mirror_watcher() {
268 std::scoped_lock
locker(m_lock
);
269 Context
*ctx
= new C_CallbackAdapter
<
270 FSMirror
, &FSMirror::handle_init_mirror_watcher
>(this);
271 m_mirror_watcher
= MirrorWatcher::create(m_ioctx
, this, m_work_queue
);
272 m_mirror_watcher
->init(ctx
);
275 void FSMirror::handle_init_mirror_watcher(int r
) {
276 dout(20) << ": r=" << r
<< dendl
;
278 Context
*on_init_finish
= nullptr;
280 std::scoped_lock
locker(m_lock
);
282 std::swap(on_init_finish
, m_on_init_finish
);
286 if (on_init_finish
!= nullptr) {
287 on_init_finish
->complete(r
);
291 m_retval
= r
; // save errcode for init context callback
292 shutdown_instance_watcher();
295 void FSMirror::shutdown_mirror_watcher() {
298 std::scoped_lock
locker(m_lock
);
299 Context
*ctx
= new C_CallbackAdapter
<
300 FSMirror
, &FSMirror::handle_shutdown_mirror_watcher
>(this);
301 m_mirror_watcher
->shutdown(ctx
);
304 void FSMirror::handle_shutdown_mirror_watcher(int r
) {
305 dout(20) << ": r=" << r
<< dendl
;
307 shutdown_instance_watcher();
310 void FSMirror::shutdown_instance_watcher() {
313 std::scoped_lock
locker(m_lock
);
314 Context
*ctx
= new C_CallbackAdapter
<
315 FSMirror
, &FSMirror::handle_shutdown_instance_watcher
>(this);
316 m_instance_watcher
->shutdown(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, ctx
));
319 void FSMirror::handle_shutdown_instance_watcher(int r
) {
320 dout(20) << ": r=" << r
<< dendl
;
324 Context
*on_init_finish
= nullptr;
325 Context
*on_shutdown_finish
= nullptr;
328 std::scoped_lock
locker(m_lock
);
329 std::swap(on_init_finish
, m_on_init_finish
);
330 std::swap(on_shutdown_finish
, m_on_shutdown_finish
);
333 if (on_init_finish
!= nullptr) {
334 on_init_finish
->complete(m_retval
);
336 if (on_shutdown_finish
!= nullptr) {
337 on_shutdown_finish
->complete(r
);
341 void FSMirror::handle_acquire_directory(string_view dir_path
) {
342 dout(5) << ": dir_path=" << dir_path
<< dendl
;
345 std::scoped_lock
locker(m_lock
);
346 m_directories
.emplace(dir_path
);
347 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
348 m_directories
.size());
350 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
351 dout(10) << ": peer=" << peer
<< dendl
;
352 peer_replayer
->add_directory(dir_path
);
357 void FSMirror::handle_release_directory(string_view dir_path
) {
358 dout(5) << ": dir_path=" << dir_path
<< dendl
;
361 std::scoped_lock
locker(m_lock
);
362 auto it
= m_directories
.find(dir_path
);
363 if (it
!= m_directories
.end()) {
364 m_directories
.erase(it
);
365 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
366 m_directories
.size());
367 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
368 dout(10) << ": peer=" << peer
<< dendl
;
369 peer_replayer
->remove_directory(dir_path
);
375 void FSMirror::add_peer(const Peer
&peer
) {
376 dout(10) << ": peer=" << peer
<< dendl
;
378 std::scoped_lock
locker(m_lock
);
379 m_all_peers
.emplace(peer
);
380 if (m_peer_replayers
.find(peer
) != m_peer_replayers
.end()) {
384 auto replayer
= std::make_unique
<PeerReplayer
>(
385 m_cct
, this, m_cluster
, m_filesystem
, peer
, m_directories
, m_mount
, m_service_daemon
);
386 int r
= init_replayer(replayer
.get());
388 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, peer
,
389 SERVICE_DAEMON_PEER_INIT_FAILED_KEY
,
393 m_peer_replayers
.emplace(peer
, std::move(replayer
));
394 ceph_assert(m_peer_replayers
.size() == 1); // support only a single peer
397 void FSMirror::remove_peer(const Peer
&peer
) {
398 dout(10) << ": peer=" << peer
<< dendl
;
400 std::unique_ptr
<PeerReplayer
> replayer
;
402 std::scoped_lock
locker(m_lock
);
403 m_all_peers
.erase(peer
);
404 auto it
= m_peer_replayers
.find(peer
);
405 if (it
!= m_peer_replayers
.end()) {
406 replayer
= std::move(it
->second
);
407 m_peer_replayers
.erase(it
);
412 dout(5) << ": shutting down replayers for peer=" << peer
<< dendl
;
413 shutdown_replayer(replayer
.get());
417 void FSMirror::mirror_status(Formatter
*f
) {
418 std::scoped_lock
locker(m_lock
);
419 f
->open_object_section("status");
421 f
->dump_string("state", "failed");
422 } else if (is_blocklisted(locker
)) {
423 f
->dump_string("state", "blocklisted");
425 // dump rados addr for blocklist test
426 f
->dump_string("rados_inst", m_addrs
);
427 f
->open_object_section("peers");
428 for ([[maybe_unused
]] auto &[peer
, peer_replayer
] : m_peer_replayers
) {
431 f
->close_section(); // peers
432 f
->open_object_section("snap_dirs");
433 f
->dump_int("dir_count", m_directories
.size());
434 f
->close_section(); // snap_dirs
436 f
->close_section(); // status
440 } // namespace mirror
441 } // namespace cephfs