]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/PeerReplayer.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / tools / cephfs_mirror / PeerReplayer.h
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
5#define CEPHFS_MIRROR_PEER_REPLAYER_H
6
7#include "common/Formatter.h"
8#include "common/Thread.h"
9#include "mds/FSMap.h"
10#include "ServiceDaemon.h"
11#include "Types.h"
12
13namespace cephfs {
14namespace mirror {
15
16class FSMirror;
17class PeerReplayerAdminSocketHook;
18
19class PeerReplayer {
20public:
21 PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
22 RadosRef local_cluster, const Filesystem &filesystem,
23 const Peer &peer, const std::set<std::string, std::less<>> &directories,
24 MountRef mount, ServiceDaemon *service_daemon);
25 ~PeerReplayer();
26
27 // initialize replayer for a peer
28 int init();
29
30 // shutdown replayer for a peer
31 void shutdown();
32
33 // add a directory to mirror queue
20effc67 34 void add_directory(std::string_view dir_root);
f67539c2
TL
35
36 // remove a directory from queue
20effc67 37 void remove_directory(std::string_view dir_root);
f67539c2
TL
38
39 // admin socket helpers
40 void peer_status(Formatter *f);
41
b3b6e05e
TL
42 // reopen logs
43 void reopen_logs();
44
f67539c2
TL
45private:
46 inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
47
48 inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
49 inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
50
b3b6e05e
TL
51 using Snapshot = std::pair<std::string, uint64_t>;
52
53 // file descriptor "triplet" for synchronizing a snapshot
54 // w/ an added MountRef for accessing "previous" snapshot.
55 struct FHandles {
56 // open file descriptor on the snap directory for snapshot
57 // currently being synchronized. Always use this fd with
58 // @m_local_mount.
59 int c_fd;
60
61 // open file descriptor on the "previous" snapshot or on
62 // dir_root on remote filesystem (based on if the snapshot
63 // can be used for incremental transfer). Always use this
64 // fd with p_mnt which either points to @m_local_mount (
65 // for local incremental comparison) or @m_remote_mount (
66 // for remote incremental comparison).
67 int p_fd;
68 MountRef p_mnt;
69
70 // open file descriptor on dir_root on remote filesystem.
71 // Always use this fd with @m_remote_mount.
72 int r_fd_dir_root;
73 };
74
f67539c2
TL
75 bool is_stopping() {
76 return m_stopping;
77 }
78
79 struct Replayer;
80 class SnapshotReplayerThread : public Thread {
81 public:
82 SnapshotReplayerThread(PeerReplayer *peer_replayer)
83 : m_peer_replayer(peer_replayer) {
84 }
85
86 void *entry() override {
87 m_peer_replayer->run(this);
88 return 0;
89 }
90
f67539c2
TL
91 private:
92 PeerReplayer *m_peer_replayer;
f67539c2
TL
93 };
94
95 struct DirRegistry {
96 int fd;
522d829b 97 bool canceled = false;
f67539c2
TL
98 SnapshotReplayerThread *replayer;
99 };
100
101 struct SyncEntry {
102 std::string epath;
103 ceph_dir_result *dirp; // valid for directories
104 struct ceph_statx stx;
b3b6e05e
TL
105 // set by incremental sync _after_ ensuring missing entries
106 // in the currently synced snapshot have been propagated to
107 // the remote filesystem.
108 bool remote_synced = false;
f67539c2
TL
109
110 SyncEntry(std::string_view path,
111 const struct ceph_statx &stx)
112 : epath(path),
113 stx(stx) {
114 }
115 SyncEntry(std::string_view path,
116 ceph_dir_result *dirp,
117 const struct ceph_statx &stx)
118 : epath(path),
119 dirp(dirp),
120 stx(stx) {
121 }
122
123 bool is_directory() const {
124 return S_ISDIR(stx.stx_mode);
125 }
b3b6e05e
TL
126
127 bool needs_remote_sync() const {
128 return remote_synced;
129 }
130 void set_remote_synced() {
131 remote_synced = true;
132 }
f67539c2
TL
133 };
134
135 using clock = ceph::coarse_mono_clock;
136 using time = ceph::coarse_mono_time;
137
// counters reported to the service daemon; both start at zero
struct ServiceDaemonStats {
  // bumped by _inc_failed_count() when a directory is marked failed
  uint64_t failed_dir_count{0};
  // bumped by _reset_failed_count() when a failed directory recovers
  uint64_t recovered_dir_count{0};
};
143
144 struct SnapSyncStat {
145 uint64_t nr_failures = 0; // number of consecutive failures
146 boost::optional<time> last_failed; // lat failed timestamp
147 bool failed = false; // hit upper cap for consecutive failures
148 boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
149 boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
150 uint64_t synced_snap_count = 0;
151 uint64_t deleted_snap_count = 0;
152 uint64_t renamed_snap_count = 0;
153 time last_synced = clock::zero();
154 boost::optional<double> last_sync_duration;
155 };
156
b3b6e05e 157 void _inc_failed_count(const std::string &dir_root) {
f67539c2
TL
158 auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
159 "cephfs_mirror_max_consecutive_failures_per_directory");
b3b6e05e 160 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
161 sync_stat.last_failed = clock::now();
162 if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
163 sync_stat.failed = true;
164 ++m_service_daemon_stats.failed_dir_count;
165 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
166 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
167 m_service_daemon_stats.failed_dir_count);
168 }
169 }
b3b6e05e
TL
170 void _reset_failed_count(const std::string &dir_root) {
171 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
172 if (sync_stat.failed) {
173 ++m_service_daemon_stats.recovered_dir_count;
174 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
175 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
176 m_service_daemon_stats.recovered_dir_count);
177 }
178 sync_stat.nr_failures = 0;
179 sync_stat.failed = false;
180 sync_stat.last_failed = boost::none;
181 }
182
b3b6e05e 183 void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2 184 const std::string &snap_name) {
b3b6e05e 185 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
186 sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
187 sync_stat.current_syncing_snap = boost::none;
188 }
b3b6e05e 189 void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2
TL
190 const std::string &snap_name) {
191 std::scoped_lock locker(m_lock);
b3b6e05e 192 _set_last_synced_snap(dir_root, snap_id, snap_name);
f67539c2 193 }
b3b6e05e 194 void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2
TL
195 const std::string &snap_name) {
196 std::scoped_lock locker(m_lock);
b3b6e05e 197 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
198 sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
199 }
b3b6e05e 200 void clear_current_syncing_snap(const std::string &dir_root) {
f67539c2 201 std::scoped_lock locker(m_lock);
b3b6e05e 202 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
203 sync_stat.current_syncing_snap = boost::none;
204 }
b3b6e05e 205 void inc_deleted_snap(const std::string &dir_root) {
f67539c2 206 std::scoped_lock locker(m_lock);
b3b6e05e 207 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
208 ++sync_stat.deleted_snap_count;
209 }
b3b6e05e 210 void inc_renamed_snap(const std::string &dir_root) {
f67539c2 211 std::scoped_lock locker(m_lock);
b3b6e05e 212 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
213 ++sync_stat.renamed_snap_count;
214 }
b3b6e05e 215 void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id,
f67539c2
TL
216 const std::string &snap_name, double duration) {
217 std::scoped_lock locker(m_lock);
b3b6e05e
TL
218 _set_last_synced_snap(dir_root, snap_id, snap_name);
219 auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2
TL
220 sync_stat.last_synced = clock::now();
221 sync_stat.last_sync_duration = duration;
222 ++sync_stat.synced_snap_count;
223 }
224
b3b6e05e 225 bool should_backoff(const std::string &dir_root, int *retval) {
f67539c2
TL
226 if (m_fs_mirror->is_blocklisted()) {
227 *retval = -EBLOCKLISTED;
228 return true;
229 }
230
231 std::scoped_lock locker(m_lock);
232 if (is_stopping()) {
233 // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
234 // EINPROGRESS to identify shutdown.
235 *retval = -EINPROGRESS;
236 return true;
237 }
b3b6e05e 238 auto &dr = m_registered.at(dir_root);
522d829b 239 if (dr.canceled) {
f67539c2
TL
240 *retval = -ECANCELED;
241 return true;
242 }
243
244 *retval = 0;
245 return false;
246 }
247
248 typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
249
250 CephContext *m_cct;
251 FSMirror *m_fs_mirror;
252 RadosRef m_local_cluster;
253 Filesystem m_filesystem;
254 Peer m_peer;
255 // probably need to be encapsulated when supporting cancelations
256 std::map<std::string, DirRegistry> m_registered;
257 std::vector<std::string> m_directories;
258 std::map<std::string, SnapSyncStat> m_snap_sync_stats;
259 MountRef m_local_mount;
260 ServiceDaemon *m_service_daemon;
261 PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
262
263 ceph::mutex m_lock;
264 ceph::condition_variable m_cond;
265 RadosRef m_remote_cluster;
266 MountRef m_remote_mount;
267 bool m_stopping = false;
268 SnapshotReplayers m_replayers;
269
270 ServiceDaemonStats m_service_daemon_stats;
271
272 void run(SnapshotReplayerThread *replayer);
273
274 boost::optional<std::string> pick_directory();
b3b6e05e
TL
275 int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
276 void unregister_directory(const std::string &dir_root);
277 int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer,
f67539c2 278 DirRegistry *registry);
b3b6e05e
TL
279 void unlock_directory(const std::string &dir_root, const DirRegistry &registry);
280 void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker);
f67539c2 281
b3b6e05e
TL
282
283 int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map,
f67539c2 284 bool is_remote=false);
b3b6e05e
TL
285
286 int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps);
287 int propagate_snap_renames(const std::string &dir_root,
f67539c2 288 const std::set<std::pair<std::string,std::string>> &snaps);
b3b6e05e
TL
289 int propagate_deleted_entries(const std::string &dir_root, const std::string &epath,
290 const FHandles &fh);
291 int cleanup_remote_dir(const std::string &dir_root, const std::string &epath,
292 const FHandles &fh);
293
294 int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
295 const FHandles &fh, bool *need_data_sync, bool *need_attr_sync);
296
297 int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id);
298 int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot &current,
299 boost::optional<Snapshot> prev, FHandles *fh);
300 void post_sync_close_handles(const FHandles &fh);
301
302 int do_synchronize(const std::string &dir_root, const Snapshot &current,
303 boost::optional<Snapshot> prev);
304
305 int synchronize(const std::string &dir_root, const Snapshot &current,
306 boost::optional<Snapshot> prev);
307 int do_sync_snaps(const std::string &dir_root);
308
309 int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
310 int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
311 const FHandles &fh, bool need_data_sync, bool need_attr_sync);
312 int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
313 const FHandles &fh);
aee94f69 314 int sync_perms(const std::string& path);
f67539c2
TL
315};
316
317} // namespace mirror
318} // namespace cephfs
319
320#endif // CEPHFS_MIRROR_PEER_REPLAYER_H