1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
5 #define CEPHFS_MIRROR_PEER_REPLAYER_H
7 #include "common/Formatter.h"
8 #include "common/Thread.h"
10 #include "ServiceDaemon.h"
// Admin-socket hook type. Only a pointer to it (m_asok_hook) is held in this
// header, so an incomplete forward declaration is sufficient.
class PeerReplayerAdminSocketHook;
21 PeerReplayer(CephContext
*cct
, FSMirror
*fs_mirror
,
22 RadosRef local_cluster
, const Filesystem
&filesystem
,
23 const Peer
&peer
, const std::set
<std::string
, std::less
<>> &directories
,
24 MountRef mount
, ServiceDaemon
*service_daemon
);
27 // initialize replayer for a peer
30 // shutdown replayer for a peer
// Add a directory to the mirror queue.
// Spelled std::string_view, like SyncEntry's constructors in this header, so the
// declaration does not depend on an unqualified `string_view` being injected
// by some other header's using-declaration.
void add_directory(std::string_view dir_path);
// Remove a directory from the mirror queue.
// Spelled std::string_view, like SyncEntry's constructors in this header, so the
// declaration does not depend on an unqualified `string_view` being injected
// by some other header's using-declaration.
void remove_directory(std::string_view dir_path);
39 // admin socket helpers
40 void peer_status(Formatter
*f
);
// Key name used for the primary snapshot id.
static inline const std::string PRIMARY_SNAP_ID_KEY{"primary_snap_id"};
// Peer-attribute keys passed to ServiceDaemon::add_or_update_peer_attribute():
// the failed key by _inc_failed_count(), the recovered key by
// _reset_failed_count().
static inline const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY{"failure_count"};
static inline const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY{"recovery_count"};
// Worker thread for a PeerReplayer: entry() hands control back to the
// PeerReplayer by calling run(this). The canceled flag starts out false.
53 class SnapshotReplayerThread
: public Thread
{
// Stores the back-pointer to the PeerReplayer whose run() this thread executes.
55 SnapshotReplayerThread(PeerReplayer
*peer_replayer
)
56 : m_peer_replayer(peer_replayer
) {
// Thread entry point: delegate the work loop to PeerReplayer::run().
59 void *entry() override
{
60 m_peer_replayer
->run(this);
// NOTE(review): body not shown; presumably reports `canceled`. It is queried
// by PeerReplayer::should_backoff().
68 bool is_canceled() const {
// PeerReplayer that created this thread (set in the constructor).
73 PeerReplayer
*m_peer_replayer
;
// Cancellation flag, false until set; presumably what is_canceled() reports.
74 bool canceled
= false;
79 SnapshotReplayerThread
*replayer
;
// ceph_dir_result pointer for the entry; per the inline note, only meaningful
// when the entry is a directory.
84 ceph_dir_result
*dirp
; // valid for directories
// statx attributes of the entry; is_directory() inspects stx_mode.
85 struct ceph_statx stx
;
// Construct from a path and its statx (no directory handle supplied).
87 SyncEntry(std::string_view path
,
88 const struct ceph_statx
&stx
)
// Construct from a path, a directory handle and its statx.
92 SyncEntry(std::string_view path
,
93 ceph_dir_result
*dirp
,
94 const struct ceph_statx
&stx
)
// True when the stored statx mode describes a directory (S_ISDIR).
100 bool is_directory() const {
101 return S_ISDIR(stx
.stx_mode
);
105 using clock
= ceph::coarse_mono_clock
;
106 using time
= ceph::coarse_mono_time
;
108 // stats sent to service daemon
// Counters that are incremented and then published as peer attributes through
// ServiceDaemon::add_or_update_peer_attribute().
109 struct ServiceDaemonStats
{
// Bumped by _inc_failed_count() when a directory first hits the
// consecutive-failure cap; published under SERVICE_DAEMON_FAILED_DIR_COUNT_KEY.
110 uint64_t failed_dir_count
= 0;
// Bumped by _reset_failed_count() when a directory marked failed is reset;
// published under SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY.
111 uint64_t recovered_dir_count
= 0;
// Per-directory synchronization bookkeeping, stored in m_snap_sync_stats keyed
// by directory path.
114 struct SnapSyncStat
{
115 uint64_t nr_failures
= 0; // number of consecutive failures
116 boost::optional
<time
> last_failed
; // last failed timestamp
117 bool failed
= false; // hit upper cap for consecutive failures
// (snap id, snap name) recorded by _set_last_synced_snap().
118 boost::optional
<std::pair
<uint64_t, std::string
>> last_synced_snap
;
// (snap id, snap name) set by set_current_syncing_snap(). It is reset to
// boost::none by _set_last_synced_snap() and clear_current_syncing_snap().
119 boost::optional
<std::pair
<uint64_t, std::string
>> current_syncing_snap
;
// Incremented by set_last_synced_stat().
120 uint64_t synced_snap_count
= 0;
// Incremented by inc_deleted_snap().
121 uint64_t deleted_snap_count
= 0;
// Incremented by inc_renamed_snap().
122 uint64_t renamed_snap_count
= 0;
// Set to clock::now() by set_last_synced_stat(); clock::zero() until then.
123 time last_synced
= clock::zero();
// Duration passed to set_last_synced_stat(); units defined by the caller
// (presumably seconds -- confirm).
124 boost::optional
<double> last_sync_duration
;
// Record one more sync failure for dir_path.
// - Stamps last_failed.
// - Bumps nr_failures.
// - On the first time nr_failures reaches the configured
//   cephfs_mirror_max_consecutive_failures_per_directory limit, marks the
//   directory failed and publishes the incremented failed_dir_count to the
//   service daemon.
// No lock is taken here. Following the set_last_synced_snap() /
// _set_last_synced_snap() convention, the caller presumably holds m_lock --
// confirm.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
127 void _inc_failed_count(const std::string
&dir_path
) {
// The limit is re-read from the global config on every call.
128 auto max_failures
= g_ceph_context
->_conf
.get_val
<uint64_t>(
129 "cephfs_mirror_max_consecutive_failures_per_directory");
130 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
131 sync_stat
.last_failed
= clock::now();
// nr_failures keeps counting past the cap. The !failed test makes the
// failed_dir_count bump happen only once per failure streak.
132 if (++sync_stat
.nr_failures
>= max_failures
&& !sync_stat
.failed
) {
133 sync_stat
.failed
= true;
134 ++m_service_daemon_stats
.failed_dir_count
;
135 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, m_peer
,
136 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY
,
137 m_service_daemon_stats
.failed_dir_count
);
// Clear the consecutive-failure state of dir_path after a success.
// If the directory had been marked failed, it is first counted as recovered
// and recovered_dir_count is published to the service daemon.
// No lock is taken here; the caller presumably holds m_lock -- confirm.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
140 void _reset_failed_count(const std::string
&dir_path
) {
141 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
142 if (sync_stat
.failed
) {
143 ++m_service_daemon_stats
.recovered_dir_count
;
144 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, m_peer
,
145 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY
,
146 m_service_daemon_stats
.recovered_dir_count
);
// Reset the streak: counter, failed flag and last-failure timestamp.
148 sync_stat
.nr_failures
= 0;
149 sync_stat
.failed
= false;
150 sync_stat
.last_failed
= boost::none
;
// Record (snap_id, snap_name) as dir_path's last synced snapshot and clear
// current_syncing_snap. Unlocked helper: set_last_synced_snap() and
// set_last_synced_stat() call it while holding m_lock.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
153 void _set_last_synced_snap(const std::string
&dir_path
, uint64_t snap_id
,
154 const std::string
&snap_name
) {
155 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
156 sync_stat
.last_synced_snap
= std::make_pair(snap_id
, snap_name
);
157 sync_stat
.current_syncing_snap
= boost::none
;
// Locked wrapper: take m_lock, then run _set_last_synced_snap().
159 void set_last_synced_snap(const std::string
&dir_path
, uint64_t snap_id
,
160 const std::string
&snap_name
) {
161 std::scoped_lock
locker(m_lock
);
162 _set_last_synced_snap(dir_path
, snap_id
, snap_name
);
// Under m_lock, record (snap_id, snap_name) as the snapshot dir_path is
// currently syncing.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
164 void set_current_syncing_snap(const std::string
&dir_path
, uint64_t snap_id
,
165 const std::string
&snap_name
) {
166 std::scoped_lock
locker(m_lock
);
167 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
168 sync_stat
.current_syncing_snap
= std::make_pair(snap_id
, snap_name
);
// Under m_lock, forget the in-progress snapshot of dir_path.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
170 void clear_current_syncing_snap(const std::string
&dir_path
) {
171 std::scoped_lock
locker(m_lock
);
172 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
173 sync_stat
.current_syncing_snap
= boost::none
;
// Under m_lock, increment dir_path's deleted_snap_count.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
175 void inc_deleted_snap(const std::string
&dir_path
) {
176 std::scoped_lock
locker(m_lock
);
177 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
178 ++sync_stat
.deleted_snap_count
;
// Under m_lock, increment dir_path's renamed_snap_count.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
180 void inc_renamed_snap(const std::string
&dir_path
) {
181 std::scoped_lock
locker(m_lock
);
182 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
183 ++sync_stat
.renamed_snap_count
;
// Under m_lock, record a completed snapshot sync for dir_path:
// - last_synced_snap is set (and current_syncing_snap cleared) via
//   _set_last_synced_snap();
// - last_synced is stamped with clock::now();
// - `duration` is stored in last_sync_duration (units chosen by the caller);
// - synced_snap_count is incremented.
// Throws std::out_of_range (std::map::at) if dir_path has no SnapSyncStat.
185 void set_last_synced_stat(const std::string
&dir_path
, uint64_t snap_id
,
186 const std::string
&snap_name
, double duration
) {
187 std::scoped_lock
locker(m_lock
);
188 _set_last_synced_snap(dir_path
, snap_id
, snap_name
);
189 auto &sync_stat
= m_snap_sync_stats
.at(dir_path
);
190 sync_stat
.last_synced
= clock::now();
191 sync_stat
.last_sync_duration
= duration
;
192 ++sync_stat
.synced_snap_count
;
// Decide whether work on dir_path should stop, reporting the reason in *retval:
//   -EBLOCKLISTED  the file-system mirror is blocklisted (checked before m_lock
//                  is taken);
//   -EINPROGRESS   shutdown (chosen because EBLOCKLISTED aliases ESHUTDOWN);
//   -ECANCELED     the replayer thread registered for dir_path was canceled.
// NOTE(review): presumably returns true in each case above and false
// otherwise. The condition guarding the -EINPROGRESS branch is likewise
// assumed to be a stopping check -- confirm.
// Throws std::out_of_range (std::map::at) if dir_path is not in m_registered.
195 bool should_backoff(const std::string
&dir_path
, int *retval
) {
196 if (m_fs_mirror
->is_blocklisted()) {
197 *retval
= -EBLOCKLISTED
;
// The remaining checks read shared state, so they run under m_lock.
201 std::scoped_lock
locker(m_lock
);
203 // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
204 // EINPROGRESS to identify shutdown.
205 *retval
= -EINPROGRESS
;
208 auto &dr
= m_registered
.at(dir_path
);
209 if (dr
.replayer
->is_canceled()) {
210 *retval
= -ECANCELED
;
218 typedef std::vector
<std::unique_ptr
<SnapshotReplayerThread
>> SnapshotReplayers
;
221 FSMirror
*m_fs_mirror
;
222 RadosRef m_local_cluster
;
223 Filesystem m_filesystem
;
225 // probably need to be encapsulated when supporting cancelations
226 std::map
<std::string
, DirRegistry
> m_registered
;
227 std::vector
<std::string
> m_directories
;
228 std::map
<std::string
, SnapSyncStat
> m_snap_sync_stats
;
229 MountRef m_local_mount
;
230 ServiceDaemon
*m_service_daemon
;
231 PeerReplayerAdminSocketHook
*m_asok_hook
= nullptr;
234 ceph::condition_variable m_cond
;
235 RadosRef m_remote_cluster
;
236 MountRef m_remote_mount
;
237 bool m_stopping
= false;
238 SnapshotReplayers m_replayers
;
240 ServiceDaemonStats m_service_daemon_stats
;
242 void run(SnapshotReplayerThread
*replayer
);
244 boost::optional
<std::string
> pick_directory();
245 int register_directory(const std::string
&dir_path
, SnapshotReplayerThread
*replayer
);
246 void unregister_directory(const std::string
&dir_path
);
247 int try_lock_directory(const std::string
&dir_path
, SnapshotReplayerThread
*replayer
,
248 DirRegistry
*registry
);
249 void unlock_directory(const std::string
&dir_path
, const DirRegistry
®istry
);
250 void sync_snaps(const std::string
&dir_path
, std::unique_lock
<ceph::mutex
> &locker
);
// Snapshot synchronization steps. The int results are presumably 0 or a
// negative errno.
int do_sync_snaps(const std::string &dir_path);
// Fill *snap_map with snap id -> snap name for dir_path; is_remote (default
// false) presumably selects the peer side.
int build_snap_map(const std::string &dir_path,
                   std::map<uint64_t, std::string> *snap_map,
                   bool is_remote = false);
// Propagate snapshot deletions for dir_name.
int propagate_snap_deletes(const std::string &dir_name,
                           const std::set<std::string> &snaps);
// Propagate snapshot renames for dir_name; pairs are presumably (old, new).
int propagate_snap_renames(const std::string &dir_name,
                           const std::set<std::pair<std::string, std::string>> &snaps);
// Synchronize one snapshot, identified by id and name.
int synchronize(const std::string &dir_path, uint64_t snap_id,
                const std::string &snap_name);
int do_synchronize(const std::string &path, const std::string &snap_name);
// Operations against the remote (peer) file system; int results are presumably
// 0 or a negative errno.
int cleanup_remote_dir(const std::string &dir_path);
// Create remote_path mirroring local_path, using the local statx in `stx`.
int remote_mkdir(const std::string &local_path, const std::string &remote_path,
                 const struct ceph_statx &stx);
int remote_file_op(const std::string &dir_path, const std::string &local_path,
                   const std::string &remote_path, const struct ceph_statx &stx);
// Copy local_path to remote_path; local_stx describes the local source.
int remote_copy(const std::string &dir_path, const std::string &local_path,
                const std::string &remote_path,
                const struct ceph_statx &local_stx);
273 } // namespace mirror
274 } // namespace cephfs
276 #endif // CEPHFS_MIRROR_PEER_REPLAYER_H