1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/admin_socket.h"
5 #include "common/ceph_argparse.h"
6 #include "common/ceph_context.h"
7 #include "common/common_init.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include "include/stringify.h"
12 #include "msg/Messenger.h"
14 #include "PeerReplayer.h"
15 #include "aio_utils.h"
16 #include "ServiceDaemon.h"
19 #include "common/Cond.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_cephfs_mirror
24 #define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__
// Filesystem attribute key under which FSMirror reports m_directories.size()
// to the service daemon.
const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
// Peer attribute key that add_peer() reports to the service daemon after
// initializing the peer's replayer.
const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
// Abstract base for a single admin-socket command handler: a virtual
// destructor plus a pure-virtual call() that is handed a Formatter.
// NOTE(review): this extract lacks the access specifier and the closing
// braces of the class; restore them from the original file before building.
36 class MirrorAdminSocketCommand
{
38 virtual ~MirrorAdminSocketCommand() {
// Implemented by concrete commands; the meaning of the int result is not
// visible in this extract.
40 virtual int call(Formatter
*f
) = 0;
// Command handler that forwards a status request to
// FSMirror::mirror_status().
// NOTE(review): the member declaration, the return statement of call() and
// the closing braces are not visible in this extract.
43 class StatusCommand
: public MirrorAdminSocketCommand
{
// Remembers the FSMirror pointer that call() later dereferences.
45 explicit StatusCommand(FSMirror
*fs_mirror
)
46 : fs_mirror(fs_mirror
) {
// Hands the formatter straight to the mirror so it can dump its status.
49 int call(Formatter
*f
) override
{
50 fs_mirror
->mirror_status(f
);
58 } // anonymous namespace
// AdminSocketHook that registers the "fs mirror status <fs_name>@<fscid>"
// command and keeps a map from command string to its handler object.
// NOTE(review): several interior lines (access specifiers, local
// declarations, conditions, loop/function bodies, closing braces) are not
// visible in this extract; the comments below describe only what is shown.
60 class MirrorAdminSocketHook
: public AdminSocketHook
{
// Fetches the admin socket from cct, registers the status command with this
// hook as the handler, and stores a heap-allocated StatusCommand under the
// same command string. Whether the insertion is conditional on the
// registration result `r` cannot be told from here.
62 MirrorAdminSocketHook(CephContext
*cct
, const Filesystem
&filesystem
, FSMirror
*fs_mirror
)
63 : admin_socket(cct
->get_admin_socket()) {
67 // mirror status format is name@fscid
68 cmd
= "fs mirror status " + stringify(filesystem
.fs_name
) + "@" + stringify(filesystem
.fscid
);
69 r
= admin_socket
->register_command(
70 cmd
, this, "get filesystem mirror status");
72 commands
[cmd
] = new StatusCommand(fs_mirror
);
// Unregisters every command owned by this hook, then walks the command map.
// The loop body is not visible; presumably it frees each cmdptr created with
// `new` in the constructor -- TODO confirm.
76 ~MirrorAdminSocketHook() override
{
77 admin_socket
->unregister_commands(this);
78 for (auto &[command
, cmdptr
] : commands
) {
// Admin-socket entry point. Looks the handler up with std::map::at(), which
// throws std::out_of_range when the command string is not in the map. How
// the handler is invoked and what is returned is not visible here.
83 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
84 Formatter
*f
, std::ostream
&errss
, bufferlist
&out
) override
{
85 auto p
= commands
.at(std::string(command
));
// Command string -> handler pointer, ordered with the transparent
// std::less<> comparator.
90 typedef std::map
<std::string
, MirrorAdminSocketCommand
*, std::less
<>> Commands
;
// Admin socket obtained from the CephContext in the constructor.
92 AdminSocket
*admin_socket
;
// Constructs the mirror for one filesystem: stores the filesystem
// description, service daemon and work queue, builds the snapshot listener
// around `this`, creates the admin-socket hook, and then publishes the
// directory-count attribute for this filesystem to the service daemon.
// NOTE(review): some member initializers (e.g. for cct, pool_id, args) and
// the value passed for the attribute are not visible in this extract.
96 FSMirror::FSMirror(CephContext
*cct
, const Filesystem
&filesystem
, uint64_t pool_id
,
97 ServiceDaemon
*service_daemon
, std::vector
<const char*> args
,
98 ContextWQ
*work_queue
)
100 m_filesystem(filesystem
),
102 m_service_daemon(service_daemon
),
104 m_work_queue(work_queue
),
105 m_snap_listener(this),
106 m_asok_hook(new MirrorAdminSocketHook(cct
, filesystem
, this)) {
107 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
// Destroys the mirror: both watcher objects are deleted while m_lock is held.
// NOTE(review): the statement that the trailing comment refers to is not
// visible in this extract (presumably the admin-socket hook teardown --
// confirm against the original file).
111 FSMirror::~FSMirror() {
115 std::scoped_lock
locker(m_lock
);
116 delete m_instance_watcher
;
117 delete m_mirror_watcher
;
119 // outside the lock so that in-progress commands can acquire
120 // lock and finish executing.
124 int FSMirror::init_replayer(PeerReplayer
*peer_replayer
) {
125 ceph_assert(ceph_mutex_is_locked(m_lock
));
126 return peer_replayer
->init();
129 void FSMirror::shutdown_replayer(PeerReplayer
*peer_replayer
) {
130 peer_replayer
->shutdown();
// Tears down the local mount: unmounts m_mount and then releases the handle.
// NOTE(review): any further cleanup steps and the closing brace are not
// visible in this extract.
133 void FSMirror::cleanup() {
135 ceph_unmount(m_mount
);
136 ceph_release(m_mount
);
// Reopens log files while holding m_lock: first on the CephContext reached
// through m_cluster->cct(), then on every peer replayer.
// NOTE(review): a guard around the m_cluster access may exist on a line that
// is missing from this extract -- confirm.
141 void FSMirror::reopen_logs() {
142 std::scoped_lock
locker(m_lock
);
145 reinterpret_cast<CephContext
*>(m_cluster
->cct())->reopen_logs();
147 for (auto &[peer
, replayer
] : m_peer_replayers
) {
148 replayer
->reopen_logs();
// Starts mirror initialization under m_lock. Visible steps, in order:
//   1. connect() to the cluster using the process-wide config name/cluster,
//   2. create an ioctx for m_pool_id,
//   3. mount the filesystem into m_mount,
//   4. record and log the rados addresses,
//   5. continue asynchronously via init_instance_watcher(on_finish).
// After each of steps 1-3 there is a path that sets m_init_failed and
// completes on_finish with r.
// NOTE(review): the conditions guarding those paths (and their early
// returns) are on lines missing from this extract; presumably `r < 0`.
152 void FSMirror::init(Context
*on_finish
) {
155 std::scoped_lock
locker(m_lock
);
156 int r
= connect(g_ceph_context
->_conf
->name
.to_str(),
157 g_ceph_context
->_conf
->cluster
, &m_cluster
, "", "", m_args
);
159 m_init_failed
= true;
160 on_finish
->complete(r
);
164 r
= m_cluster
->ioctx_create2(m_pool_id
, m_ioctx
);
166 m_init_failed
= true;
168 derr
<< ": error accessing local pool (id=" << m_pool_id
<< "): "
169 << cpp_strerror(r
) << dendl
;
170 on_finish
->complete(r
);
174 r
= mount(m_cluster
, m_filesystem
, true, &m_mount
);
176 m_init_failed
= true;
179 on_finish
->complete(r
);
183 m_addrs
= m_cluster
->get_addrs();
184 dout(10) << ": rados addrs=" << m_addrs
<< dendl
;
186 init_instance_watcher(on_finish
);
// Begins shutdown under m_lock. If an init is still pending
// (m_on_init_finish is set) the shutdown is deferred: a LambdaContext is
// parked in m_on_shutdown_finish to run later. Otherwise on_finish is stored
// in m_on_shutdown_finish and shutdown_peer_replayers() is called.
// NOTE(review): the branch structure inside the deferred lambda and after
// the `if` is only partly visible in this extract; do not rely on this
// summary for the exact conditions.
189 void FSMirror::shutdown(Context
*on_finish
) {
193 std::scoped_lock
locker(m_lock
);
195 if (m_on_init_finish
!= nullptr) {
196 dout(10) << ": delaying shutdown -- init in progress" << dendl
;
197 m_on_shutdown_finish
= new LambdaContext([this, on_finish
](int r
) {
199 on_finish
->complete(0);
202 m_on_shutdown_finish
= on_finish
;
203 shutdown_peer_replayers();
208 m_on_shutdown_finish
= on_finish
;
211 shutdown_peer_replayers();
// Shuts down every peer replayer (logging each peer), empties
// m_peer_replayers, and then moves on to shutdown_mirror_watcher().
// No lock is taken in the visible lines.
214 void FSMirror::shutdown_peer_replayers() {
217 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
218 dout(5) << ": shutting down replayer for peer=" << peer
<< dendl
;
219 shutdown_replayer(peer_replayer
.get());
221 m_peer_replayers
.clear();
223 shutdown_mirror_watcher();
// Wraps on_finish in a LambdaContext stored in m_on_init_finish, then creates
// the InstanceWatcher and starts its init() with
// handle_init_instance_watcher() as the completion callback.
// When the stored lambda runs it takes m_lock, completes on_finish with r and,
// if a shutdown was parked in m_on_shutdown_finish, completes that too.
// NOTE(review): the conditions inside the lambda (e.g. when m_init_failed is
// set) are on lines missing from this extract.
226 void FSMirror::init_instance_watcher(Context
*on_finish
) {
229 m_on_init_finish
= new LambdaContext([this, on_finish
](int r
) {
231 std::scoped_lock
locker(m_lock
);
233 m_init_failed
= true;
236 on_finish
->complete(r
);
237 if (m_on_shutdown_finish
!= nullptr) {
238 m_on_shutdown_finish
->complete(r
);
242 Context
*ctx
= new C_CallbackAdapter
<
243 FSMirror
, &FSMirror::handle_init_instance_watcher
>(this);
244 m_instance_watcher
= InstanceWatcher::create(m_ioctx
, m_snap_listener
, m_work_queue
);
245 m_instance_watcher
->init(ctx
);
// Completion callback for InstanceWatcher::init(). Under m_lock the pending
// init context is swapped out of m_on_init_finish into a local; a non-null
// local is completed with r, and init_mirror_watcher() continues the chain.
// NOTE(review): the conditions that choose between completing the init
// context and continuing to init_mirror_watcher() are not visible in this
// extract.
248 void FSMirror::handle_init_instance_watcher(int r
) {
249 dout(20) << ": r=" << r
<< dendl
;
251 Context
*on_init_finish
= nullptr;
253 std::scoped_lock
locker(m_lock
);
255 std::swap(on_init_finish
, m_on_init_finish
);
259 if (on_init_finish
!= nullptr) {
260 on_init_finish
->complete(r
);
264 init_mirror_watcher();
// Creates the MirrorWatcher under m_lock and starts its init(), with
// handle_init_mirror_watcher() as the completion callback.
267 void FSMirror::init_mirror_watcher() {
270 std::scoped_lock
locker(m_lock
);
271 Context
*ctx
= new C_CallbackAdapter
<
272 FSMirror
, &FSMirror::handle_init_mirror_watcher
>(this);
273 m_mirror_watcher
= MirrorWatcher::create(m_ioctx
, this, m_work_queue
);
274 m_mirror_watcher
->init(ctx
);
// Completion callback for MirrorWatcher::init(). Under m_lock the pending
// init context is swapped out of m_on_init_finish; a non-null context is
// completed with r. On the other visible path r is saved in m_retval and the
// instance watcher is shut down, so the init context can later be completed
// with that saved code.
// NOTE(review): the conditions separating these two paths are not visible in
// this extract.
277 void FSMirror::handle_init_mirror_watcher(int r
) {
278 dout(20) << ": r=" << r
<< dendl
;
280 Context
*on_init_finish
= nullptr;
282 std::scoped_lock
locker(m_lock
);
284 std::swap(on_init_finish
, m_on_init_finish
);
288 if (on_init_finish
!= nullptr) {
289 on_init_finish
->complete(r
);
293 m_retval
= r
; // save errcode for init context callback
294 shutdown_instance_watcher();
// Under m_lock, asks the MirrorWatcher to shut down;
// handle_shutdown_mirror_watcher() is invoked when it completes.
297 void FSMirror::shutdown_mirror_watcher() {
300 std::scoped_lock
locker(m_lock
);
301 Context
*ctx
= new C_CallbackAdapter
<
302 FSMirror
, &FSMirror::handle_shutdown_mirror_watcher
>(this);
303 m_mirror_watcher
->shutdown(ctx
);
// Completion callback for MirrorWatcher::shutdown(): logs r and continues the
// teardown chain with shutdown_instance_watcher(). In the visible lines r is
// only logged, not propagated.
306 void FSMirror::handle_shutdown_mirror_watcher(int r
) {
307 dout(20) << ": r=" << r
<< dendl
;
309 shutdown_instance_watcher();
// Under m_lock, asks the InstanceWatcher to shut down. The completion is
// wrapped in C_AsyncCallback so handle_shutdown_instance_watcher() is
// dispatched through m_work_queue rather than called directly.
312 void FSMirror::shutdown_instance_watcher() {
315 std::scoped_lock
locker(m_lock
);
316 Context
*ctx
= new C_CallbackAdapter
<
317 FSMirror
, &FSMirror::handle_shutdown_instance_watcher
>(this);
318 m_instance_watcher
->shutdown(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, ctx
));
// Final step of the teardown chain. Under m_lock both pending contexts are
// swapped out of the members into locals. A pending init context is
// completed with the saved m_retval; a pending shutdown context is completed
// with r.
// NOTE(review): statements between the log line and the swaps (e.g. a call
// to cleanup()) and the scope of the lock are not visible in this extract.
321 void FSMirror::handle_shutdown_instance_watcher(int r
) {
322 dout(20) << ": r=" << r
<< dendl
;
326 Context
*on_init_finish
= nullptr;
327 Context
*on_shutdown_finish
= nullptr;
330 std::scoped_lock
locker(m_lock
);
331 std::swap(on_init_finish
, m_on_init_finish
);
332 std::swap(on_shutdown_finish
, m_on_shutdown_finish
);
335 if (on_init_finish
!= nullptr) {
336 on_init_finish
->complete(m_retval
);
338 if (on_shutdown_finish
!= nullptr) {
339 on_shutdown_finish
->complete(r
);
// Called when a directory is acquired for mirroring. Under m_lock: adds
// dir_path to m_directories, publishes the new directory count to the
// service daemon, and forwards the path to every peer replayer via
// add_directory().
343 void FSMirror::handle_acquire_directory(string_view dir_path
) {
344 dout(5) << ": dir_path=" << dir_path
<< dendl
;
347 std::scoped_lock
locker(m_lock
);
348 m_directories
.emplace(dir_path
);
349 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
350 m_directories
.size());
352 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
353 dout(10) << ": peer=" << peer
<< dendl
;
354 peer_replayer
->add_directory(dir_path
);
// Called when a directory is released from mirroring. Under m_lock: if
// dir_path is tracked in m_directories it is erased, the new directory count
// is published to the service daemon, and every peer replayer is told to
// remove the directory. An untracked path changes nothing in the visible
// lines.
359 void FSMirror::handle_release_directory(string_view dir_path
) {
360 dout(5) << ": dir_path=" << dir_path
<< dendl
;
363 std::scoped_lock
locker(m_lock
);
364 auto it
= m_directories
.find(dir_path
);
365 if (it
!= m_directories
.end()) {
366 m_directories
.erase(it
);
367 m_service_daemon
->add_or_update_fs_attribute(m_filesystem
.fscid
, SERVICE_DAEMON_DIR_COUNT_KEY
,
368 m_directories
.size());
369 for (auto &[peer
, peer_replayer
] : m_peer_replayers
) {
370 dout(10) << ": peer=" << peer
<< dendl
;
371 peer_replayer
->remove_directory(dir_path
);
// Registers a mirroring peer. Under m_lock: records the peer in m_all_peers,
// checks whether a replayer for it already exists, builds a PeerReplayer,
// initializes it via init_replayer() (which requires m_lock to be held),
// reports a SERVICE_DAEMON_PEER_INIT_FAILED_KEY attribute for the peer,
// stores the replayer, and asserts that exactly one replayer is present.
// NOTE(review): the body of the "already exists" branch, the condition on r
// and the attribute value are on lines missing from this extract
// (presumably an early return and an `r < 0` check -- confirm).
377 void FSMirror::add_peer(const Peer
&peer
) {
378 dout(10) << ": peer=" << peer
<< dendl
;
380 std::scoped_lock
locker(m_lock
);
381 m_all_peers
.emplace(peer
);
382 if (m_peer_replayers
.find(peer
) != m_peer_replayers
.end()) {
386 auto replayer
= std::make_unique
<PeerReplayer
>(
387 m_cct
, this, m_cluster
, m_filesystem
, peer
, m_directories
, m_mount
, m_service_daemon
);
388 int r
= init_replayer(replayer
.get());
390 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, peer
,
391 SERVICE_DAEMON_PEER_INIT_FAILED_KEY
,
395 m_peer_replayers
.emplace(peer
, std::move(replayer
));
396 ceph_assert(m_peer_replayers
.size() == 1); // support only a single peer
// Unregisters a mirroring peer. Under m_lock the peer is erased from
// m_all_peers and, if a replayer exists for it, ownership of that replayer is
// moved into a local and the map entry is erased. The replayer is then shut
// down through shutdown_replayer().
// NOTE(review): the extent of the locked scope and any null check before the
// shutdown are not visible in this extract.
399 void FSMirror::remove_peer(const Peer
&peer
) {
400 dout(10) << ": peer=" << peer
<< dendl
;
402 std::unique_ptr
<PeerReplayer
> replayer
;
404 std::scoped_lock
locker(m_lock
);
405 m_all_peers
.erase(peer
);
406 auto it
= m_peer_replayers
.find(peer
);
407 if (it
!= m_peer_replayers
.end()) {
408 replayer
= std::move(it
->second
);
409 m_peer_replayers
.erase(it
);
414 dout(5) << ": shutting down replayers for peer=" << peer
<< dendl
;
415 shutdown_replayer(replayer
.get());
// Dumps the mirror status into the formatter while holding m_lock. The
// output is a "status" object that carries either a "state" string
// ("failed" or "blocklisted") or, on the remaining path, "rados_inst", a
// "peers" section and a "snap_dirs" section holding "dir_count".
// NOTE(review): the first condition of the if/else chain and the body of the
// peers loop are on lines missing from this extract.
419 void FSMirror::mirror_status(Formatter
*f
) {
420 std::scoped_lock
locker(m_lock
);
421 f
->open_object_section("status");
423 f
->dump_string("state", "failed");
424 } else if (is_blocklisted(locker
)) {
425 f
->dump_string("state", "blocklisted");
427 // dump rados addr for blocklist test
428 f
->dump_string("rados_inst", m_addrs
);
429 f
->open_object_section("peers");
430 for ([[maybe_unused
]] auto &[peer
, peer_replayer
] : m_peer_replayers
) {
433 f
->close_section(); // peers
434 f
->open_object_section("snap_dirs");
435 f
->dump_int("dir_count", m_directories
.size());
436 f
->close_section(); // snap_dirs
438 f
->close_section(); // status
442 } // namespace mirror
443 } // namespace cephfs