1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPHFS_MIRROR_FS_MIRROR_H
5 #define CEPHFS_MIRROR_FS_MIRROR_H
7 #include "common/Formatter.h"
8 #include "common/Thread.h"
11 #include "InstanceWatcher.h"
12 #include "MirrorWatcher.h"
// Forward declaration. In the visible part of this header the type is only
// used through a pointer (m_asok_hook), which does not need the full definition.
19 class MirrorAdminSocketHook
;
23 // handle mirroring for a filesystem to a set of peers
// Constructor.
//
// Parameter roles below are inferred from names and from the matching
// m_filesystem / m_service_daemon / m_args / m_work_queue members; the
// constructor body is not in this header — TODO confirm in the .cc file.
//   cct            - Ceph context pointer
//   filesystem     - the filesystem this object mirrors (passed by const ref)
//   pool_id        - 64-bit pool identifier
//   service_daemon - non-owning-looking ServiceDaemon pointer
//   args           - vector of C-string arguments, taken by value
//   work_queue     - ContextWQ pointer
27 FSMirror(CephContext
*cct
, const Filesystem
&filesystem
, uint64_t pool_id
,
28 ServiceDaemon
*service_daemon
, std::vector
<const char*> args
,
29 ContextWQ
*work_queue
);
// Start this mirror handler. `on_finish` is a Context callback; presumably it
// is completed when initialization ends (an m_on_init_finish member exists
// that could hold it) — TODO confirm in the implementation.
32 void init(Context
*on_finish
);
// Stop this mirror handler. `on_finish` is a Context callback; presumably it
// is completed when shutdown ends (an m_on_shutdown_finish member exists
// that could hold it) — TODO confirm in the implementation.
33 void shutdown(Context
*on_finish
);
// Register a mirroring peer. Presumably creates an entry in m_peer_replayers
// (which is keyed by Peer) — TODO confirm in the implementation.
35 void add_peer(const Peer
&peer
);
// Unregister a mirroring peer. Presumably drops the peer's entry from
// m_peer_replayers — TODO confirm in the implementation.
36 void remove_peer(const Peer
&peer
);
// NOTE(review): the signature of the accessor that owns this statement is not
// visible here. The statement acquires m_lock; std::scoped_lock releases it
// when `locker` goes out of scope.
39 std::scoped_lock
locker(m_lock
);
// Thread-safe boolean accessor: takes m_lock before reading state.
// The remainder of the body is not visible here; presumably it returns
// m_init_failed — TODO confirm.
43 bool is_init_failed() {
44 std::scoped_lock
locker(m_lock
);
// Overall failure check (enclosing signature not visible here). Under m_lock,
// yields true when m_init_failed is set or when either watcher reports
// is_failed(). Evaluation short-circuits left to right, so the watchers are
// not consulted when m_init_failed is already true.
//
// NOTE(review): m_instance_watcher and m_mirror_watcher are dereferenced here
// without a null check, although both default to nullptr and the sibling
// timestamp accessors test them before use. Confirm callers only reach this
// once both watchers exist, or guard the dereferences.
49 std::scoped_lock
locker(m_lock
);
50 return m_init_failed
||
51 m_instance_watcher
->is_failed() ||
52 m_mirror_watcher
->is_failed();
// Returns the failure timestamp reported by a watcher, read under m_lock.
// The instance watcher is consulted first; the mirror watcher is used only
// when the instance watcher pointer is null. Each pointer is null-checked
// before use. The value returned when both are null is not visible here.
55 utime_t
get_failed_ts() {
56 std::scoped_lock
locker(m_lock
);
57 if (m_instance_watcher
) {
58 return m_instance_watcher
->get_failed_ts();
60 if (m_mirror_watcher
) {
61 return m_mirror_watcher
->get_failed_ts();
// Public blocklist query. Acquires m_lock, then delegates to the overload
// that takes the held lock as an argument.
67 bool is_blocklisted() {
68 std::scoped_lock
locker(m_lock
);
69 return is_blocklisted(locker
);
// Returns the blocklist timestamp reported by a watcher, read under m_lock.
// The instance watcher is consulted first; the mirror watcher is used only
// when the instance watcher pointer is null. Each pointer is null-checked
// before use. The value returned when both are null is not visible here.
72 utime_t
get_blocklisted_ts() {
73 std::scoped_lock
locker(m_lock
);
74 if (m_instance_watcher
) {
75 return m_instance_watcher
->get_blocklisted_ts();
77 if (m_mirror_watcher
) {
78 return m_mirror_watcher
->get_blocklisted_ts();
// NOTE(review): the signature of the accessor that owns this statement is not
// visible here. The statement acquires m_lock; std::scoped_lock releases it
// when `locker` goes out of scope.
85 std::scoped_lock
locker(m_lock
);
// Returns an address as a std::string, read under m_lock. How the address is
// obtained is not visible here — TODO confirm against the full body.
89 std::string
get_instance_addr() {
90 std::scoped_lock
locker(m_lock
);
94 // admin socket helpers
// Presumably writes this mirror's status into the supplied Formatter `f`;
// the implementation is not in this header — TODO confirm.
95 void mirror_status(Formatter
*f
);
// Lock-taking overload of the blocklist query.
//
// `locker` is not used in the visible body; presumably it exists so callers
// must show they already hold m_lock — TODO confirm.
//
// The result starts as false, is set from the instance watcher when that
// pointer is non-null, and is then OR-ed with the mirror watcher's answer when
// that pointer is non-null. The final return statement is not visible here.
100 bool is_blocklisted(const std::scoped_lock
<ceph::mutex
> &locker
) const {
101 bool blocklisted
= false;
102 if (m_instance_watcher
) {
103 blocklisted
= m_instance_watcher
->is_blocklisted();
105 if (m_mirror_watcher
) {
106 blocklisted
|= m_mirror_watcher
->is_blocklisted();
// Adapter that implements InstanceWatcher::Listener on behalf of an FSMirror.
// Both overridden callbacks forward the directory path unchanged to the
// matching FSMirror::handle_*_directory method.
// The declaration of the `fs_mirror` member and the closing braces are not
// visible in this view.
112 struct SnapListener
: public InstanceWatcher::Listener
{
// Stores the FSMirror pointer that the callbacks forward to.
115 SnapListener(FSMirror
*fs_mirror
)
116 : fs_mirror(fs_mirror
) {
// Listener callback: forwards to FSMirror::handle_acquire_directory.
119 void acquire_directory(std::string_view dir_path
) override
{
120 fs_mirror
->handle_acquire_directory(dir_path
);
// Listener callback: forwards to FSMirror::handle_release_directory.
123 void release_directory(std::string_view dir_path
) override
{
124 fs_mirror
->handle_release_directory(dir_path
);
// --- Data members -----------------------------------------------------------
// Roles marked "presumably" are inferred from names; confirm in the .cc file.

// Filesystem held by value; presumably a copy of the constructor argument.
129 Filesystem m_filesystem
;
// Raw ServiceDaemon pointer; presumably the constructor argument.
131 ServiceDaemon
*m_service_daemon
;
// Vector of C-string arguments; presumably the constructor argument.
132 std::vector
<const char *> m_args
;
// Raw ContextWQ pointer; presumably the constructor argument.
133 ContextWQ
*m_work_queue
;
// Mutex taken by the inline accessors in this header before they read state.
135 ceph::mutex m_lock
= ceph::make_mutex("cephfs::mirror::fs_mirror");
// Listener object that forwards directory callbacks to this FSMirror.
136 SnapListener m_snap_listener
;
// Set of directory path strings. std::less<> is the transparent comparator,
// which permits lookups with a std::string_view key.
137 std::set
<std::string
, std::less
<>> m_directories
;
// One uniquely-owned PeerReplayer per Peer.
139 std::map
<Peer
, std::unique_ptr
<PeerReplayer
>> m_peer_replayers
;
// RADOS I/O context; its use is not visible in this header.
143 librados::IoCtx m_ioctx
;
// Watcher pointers default to null; most accessors null-check them before use.
144 InstanceWatcher
*m_instance_watcher
= nullptr;
145 MirrorWatcher
*m_mirror_watcher
= nullptr;
// State flags, both initially false.
148 bool m_stopping
= false;
149 bool m_init_failed
= false;
// Completion callbacks, initially null; presumably the on_finish arguments of
// init() and shutdown() respectively.
150 Context
*m_on_init_finish
= nullptr;
151 Context
*m_on_shutdown_finish
= nullptr;
// Admin socket hook pointer, initially null.
153 MirrorAdminSocketHook
*m_asok_hook
= nullptr;
// Per-peer replayer helpers. init_replayer returns an int; presumably 0 on
// success and a negative error code otherwise — TODO confirm.
157 int init_replayer(PeerReplayer
*peer_replayer
);
158 void shutdown_replayer(PeerReplayer
*peer_replayer
);
// Watcher start-up steps. Each init_* declaration is paired with a handle_*
// declaration taking an int `r`; presumably `r` is the step's result code and
// the steps run instance watcher first, then mirror watcher — TODO confirm.
161 void init_instance_watcher(Context
*on_finish
);
162 void handle_init_instance_watcher(int r
);
164 void init_mirror_watcher();
165 void handle_init_mirror_watcher(int r
);
// Shutdown steps. The watcher shutdown_* declarations are each paired with a
// handle_* declaration taking an int `r`, presumably the step's result code.
// The order in which these run is not visible in this header — TODO confirm.
167 void shutdown_peer_replayers();
169 void shutdown_mirror_watcher();
170 void handle_shutdown_mirror_watcher(int r
);
172 void shutdown_instance_watcher();
173 void handle_shutdown_instance_watcher(int r
);
// Targets of the SnapListener callbacks: acquire_directory() and
// release_directory() forward their dir_path here. Presumably these add to
// and remove from m_directories — TODO confirm in the implementation.
175 void handle_acquire_directory(std::string_view dir_path
);
176 void handle_release_directory(std::string_view dir_path
);
179 } // namespace mirror
180 } // namespace cephfs
182 #endif // CEPHFS_MIRROR_FS_MIRROR_H