]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPHFS_MIRROR_PEER_REPLAYER_H | |
5 | #define CEPHFS_MIRROR_PEER_REPLAYER_H | |
6 | ||
7 | #include "common/Formatter.h" | |
8 | #include "common/Thread.h" | |
9 | #include "mds/FSMap.h" | |
10 | #include "ServiceDaemon.h" | |
11 | #include "Types.h" | |
12 | ||
13 | namespace cephfs { | |
14 | namespace mirror { | |
15 | ||
16 | class FSMirror; | |
17 | class PeerReplayerAdminSocketHook; | |
18 | ||
19 | class PeerReplayer { | |
20 | public: | |
21 | PeerReplayer(CephContext *cct, FSMirror *fs_mirror, | |
22 | RadosRef local_cluster, const Filesystem &filesystem, | |
23 | const Peer &peer, const std::set<std::string, std::less<>> &directories, | |
24 | MountRef mount, ServiceDaemon *service_daemon); | |
25 | ~PeerReplayer(); | |
26 | ||
27 | // initialize replayer for a peer | |
28 | int init(); | |
29 | ||
30 | // shutdown replayer for a peer | |
31 | void shutdown(); | |
32 | ||
33 | // add a directory to mirror queue | |
20effc67 | 34 | void add_directory(std::string_view dir_root); |
f67539c2 TL |
35 | |
36 | // remove a directory from queue | |
20effc67 | 37 | void remove_directory(std::string_view dir_root); |
f67539c2 TL |
38 | |
39 | // admin socket helpers | |
40 | void peer_status(Formatter *f); | |
41 | ||
b3b6e05e TL |
42 | // reopen logs |
43 | void reopen_logs(); | |
44 | ||
f67539c2 TL |
45 | private: |
46 | inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; | |
47 | ||
48 | inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; | |
49 | inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; | |
50 | ||
b3b6e05e TL |
51 | using Snapshot = std::pair<std::string, uint64_t>; |
52 | ||
53 | // file descriptor "triplet" for synchronizing a snapshot | |
54 | // w/ an added MountRef for accessing "previous" snapshot. | |
// The int members carry no default initializers, so a default-constructed
// FHandles holds indeterminate descriptors until they are assigned.
// NOTE(review): presumably populated by pre_sync_check_and_open_handles()
// (which takes an FHandles *) and released by post_sync_close_handles()
// -- confirm in the implementation file.
55 | struct FHandles { | |
56 | // open file descriptor on the snap directory for snapshot | |
57 | // currently being synchronized. Always use this fd with | |
58 | // @m_local_mount. | |
59 | int c_fd; | |
60 | ||
61 | // open file descriptor on the "previous" snapshot or on | |
62 | // dir_root on remote filesystem (based on if the snapshot | |
63 | // can be used for incremental transfer). Always use this | |
64 | // fd with p_mnt which either points to @m_local_mount ( | |
65 | // for local incremental comparison) or @m_remote_mount ( | |
66 | // for remote incremental comparison). | |
67 | int p_fd; | |
68 | MountRef p_mnt; | |
69 | ||
70 | // open file descriptor on dir_root on remote filesystem. | |
71 | // Always use this fd with @m_remote_mount. | |
72 | int r_fd_dir_root; | |
73 | }; | |
74 | ||
f67539c2 TL |
// Returns the current value of the m_stopping flag.
// should_backoff() acquires m_lock before calling this.
// NOTE(review): m_stopping is a plain bool, so other callers presumably
// need to hold m_lock as well -- confirm.
75 | bool is_stopping() {
76 | return m_stopping; | |
77 | } | |
78 | ||
79 | struct Replayer; | |
// Thread subclass whose entry point hands control to PeerReplayer::run(),
// passing itself as the argument. It stores a raw pointer to the
// PeerReplayer; nothing in this class deletes it.
80 | class SnapshotReplayerThread : public Thread { | |
81 | public: | |
82 | SnapshotReplayerThread(PeerReplayer *peer_replayer) | |
83 | : m_peer_replayer(peer_replayer) { | |
84 | } | |
85 | ||
// Thread body: runs the replayer loop for this thread. The return value
// carries no information, so a null pointer is returned.
86 | void *entry() override { | |
87 | m_peer_replayer->run(this); | |
88 | return nullptr; | |
89 | } | |
90 | ||
f67539c2 TL |
91 | private: |
92 | PeerReplayer *m_peer_replayer; | |
f67539c2 TL |
93 | }; |
94 | ||
// Per-directory bookkeeping stored in m_registered (keyed by dir_root).
95 | struct DirRegistry { | |
// NOTE(review): presumably a descriptor tied to the directory lock taken by
// try_lock_directory() -- confirm. It has no default initializer.
96 | int fd; | |
// When true, should_backoff() reports -ECANCELED for this directory.
522d829b | 97 | bool canceled = false;
f67539c2 TL |
// NOTE(review): presumably the replayer thread passed to
// register_directory() for this directory -- confirm. No default initializer.
98 | SnapshotReplayerThread *replayer; |
99 | }; | |
100 | ||
// One filesystem entry (file or directory) queued during a snapshot sync:
// its path, its statx data and, for directories, the open directory handle.
101 | struct SyncEntry { | |
102 | std::string epath; | |
// Defaults to nullptr so entries built by the constructor without a
// directory handle never carry an indeterminate pointer.
103 | ceph_dir_result *dirp = nullptr; // valid for directories | |
104 | struct ceph_statx stx; | |
b3b6e05e TL |
105 | // set by incremental sync _after_ ensuring missing entries |
106 | // in the currently synced snapshot have been propagated to | |
107 | // the remote filesystem. | |
108 | bool remote_synced = false; | |
f67539c2 TL |
109 | |
// Construct an entry without a directory handle (dirp stays nullptr).
110 | SyncEntry(std::string_view path, | |
111 | const struct ceph_statx &stx) | |
112 | : epath(path), | |
113 | stx(stx) { | |
114 | } | |
// Construct an entry that carries an open directory handle.
115 | SyncEntry(std::string_view path, | |
116 | ceph_dir_result *dirp, | |
117 | const struct ceph_statx &stx) | |
118 | : epath(path), | |
119 | dirp(dirp), | |
120 | stx(stx) { | |
121 | } | |
122 | ||
// True when the statx mode bits describe a directory.
123 | bool is_directory() const { | |
124 | return S_ISDIR(stx.stx_mode); | |
125 | } | |
b3b6e05e TL |
126 | |
// NOTE(review): despite its name this returns the remote_synced flag, i.e.
// it is true once set_remote_synced() has been called and false before.
// The name is kept because callers outside this block depend on it.
127 | bool needs_remote_sync() const { | |
128 | return remote_synced; | |
129 | } | |
// Mark the entry as synced to the remote filesystem.
130 | void set_remote_synced() { | |
131 | remote_synced = true; | |
132 | } | |
f67539c2 TL |
133 | }; |
134 | ||
135 | using clock = ceph::coarse_mono_clock; | |
136 | using time = ceph::coarse_mono_time; | |
137 | ||
138 | // stats sent to service daemon | |
// failed_dir_count is incremented by _inc_failed_count() and
// recovered_dir_count by _reset_failed_count(); each new value is published
// through m_service_daemon->add_or_update_peer_attribute().
139 | struct ServiceDaemonStats { | |
140 | uint64_t failed_dir_count = 0; | |
141 | uint64_t recovered_dir_count = 0; | |
142 | }; | |
143 | ||
// Per-directory synchronization statistics, stored in m_snap_sync_stats
// keyed by dir_root and updated by the helper methods of PeerReplayer.
144 | struct SnapSyncStat { | |
145 | uint64_t nr_failures = 0; // number of consecutive failures | |
146 | boost::optional<time> last_failed; // last failed timestamp | |
147 | bool failed = false; // hit upper cap for consecutive failures | |
// (snap_id, snap_name) of the most recently synced snapshot; unset until
// _set_last_synced_snap() runs.
148 | boost::optional<std::pair<uint64_t, std::string>> last_synced_snap; | |
// (snap_id, snap_name) of the snapshot being synced; set by
// set_current_syncing_snap() and cleared again once a sync is recorded or
// clear_current_syncing_snap() is called.
149 | boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap; | |
// incremented by set_last_synced_stat()
150 | uint64_t synced_snap_count = 0; | |
// incremented by inc_deleted_snap()
151 | uint64_t deleted_snap_count = 0; | |
// incremented by inc_renamed_snap()
152 | uint64_t renamed_snap_count = 0; | |
// clock::now() at the time set_last_synced_stat() was last called
153 | time last_synced = clock::zero(); | |
// duration passed to set_last_synced_stat(); NOTE(review): unit is not
// visible in this header -- presumably seconds, confirm.
154 | boost::optional<double> last_sync_duration; | |
155 | }; | |
156 | ||
// Record one more failure for dir_root: stamps last_failed with the current
// time and increments nr_failures. When the count reaches the configured
// "cephfs_mirror_max_consecutive_failures_per_directory" value and the
// directory is not already flagged, it is marked failed and the updated
// failed-directory count is published to the service daemon.
// The option is read from the global g_ceph_context (not m_cct).
// m_snap_sync_stats.at() throws std::out_of_range for an unknown dir_root.
// NOTE(review): takes no lock itself; the caller is presumably expected to
// hold m_lock -- confirm.
b3b6e05e | 157 | void _inc_failed_count(const std::string &dir_root) {
f67539c2 TL |
158 | auto max_failures = g_ceph_context->_conf.get_val<uint64_t>( |
159 | "cephfs_mirror_max_consecutive_failures_per_directory"); | |
b3b6e05e | 160 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
161 | sync_stat.last_failed = clock::now(); |
162 | if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) { | |
163 | sync_stat.failed = true; | |
164 | ++m_service_daemon_stats.failed_dir_count; | |
165 | m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, | |
166 | SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, | |
167 | m_service_daemon_stats.failed_dir_count); | |
168 | } | |
169 | } | |
b3b6e05e TL |
// Clear the failure state of dir_root. If the directory had been flagged as
// failed, the recovered-directory count is incremented and published to the
// service daemon first; then nr_failures, failed and last_failed are reset.
// m_snap_sync_stats.at() throws std::out_of_range for an unknown dir_root.
// NOTE(review): takes no lock itself; the caller is presumably expected to
// hold m_lock -- confirm.
170 | void _reset_failed_count(const std::string &dir_root) { |
171 | auto &sync_stat = m_snap_sync_stats.at(dir_root); | |
f67539c2 TL |
172 | if (sync_stat.failed) { |
173 | ++m_service_daemon_stats.recovered_dir_count; | |
174 | m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, | |
175 | SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, | |
176 | m_service_daemon_stats.recovered_dir_count); | |
177 | } | |
178 | sync_stat.nr_failures = 0; | |
179 | sync_stat.failed = false; | |
180 | sync_stat.last_failed = boost::none; | |
181 | } | |
182 | ||
// Store (snap_id, snap_name) as the last synced snapshot of dir_root and
// clear its current_syncing_snap. Does not take m_lock; the callers visible
// in this header (set_last_synced_snap, set_last_synced_stat) acquire it
// before calling. Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 183 | void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2 | 184 | const std::string &snap_name) {
b3b6e05e | 185 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
186 | sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name); |
187 | sync_stat.current_syncing_snap = boost::none; | |
188 | } | |
// Locking wrapper: acquires m_lock and forwards to _set_last_synced_snap().
b3b6e05e | 189 | void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2 TL |
190 | const std::string &snap_name) { |
191 | std::scoped_lock locker(m_lock); | |
b3b6e05e | 192 | _set_last_synced_snap(dir_root, snap_id, snap_name);
f67539c2 | 193 | }
// Under m_lock, record (snap_id, snap_name) as the snapshot currently being
// synced for dir_root. Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 194 | void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
f67539c2 TL |
195 | const std::string &snap_name) { |
196 | std::scoped_lock locker(m_lock); | |
b3b6e05e | 197 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
198 | sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name); |
199 | } | |
// Under m_lock, reset current_syncing_snap of dir_root to "none".
// Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 200 | void clear_current_syncing_snap(const std::string &dir_root) {
f67539c2 | 201 | std::scoped_lock locker(m_lock);
b3b6e05e | 202 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
203 | sync_stat.current_syncing_snap = boost::none; |
204 | } | |
// Under m_lock, increment deleted_snap_count of dir_root.
// Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 205 | void inc_deleted_snap(const std::string &dir_root) {
f67539c2 | 206 | std::scoped_lock locker(m_lock);
b3b6e05e | 207 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
208 | ++sync_stat.deleted_snap_count; |
209 | } | |
// Under m_lock, increment renamed_snap_count of dir_root.
// Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 210 | void inc_renamed_snap(const std::string &dir_root) {
f67539c2 | 211 | std::scoped_lock locker(m_lock);
b3b6e05e | 212 | auto &sync_stat = m_snap_sync_stats.at(dir_root);
f67539c2 TL |
213 | ++sync_stat.renamed_snap_count; |
214 | } | |
// Under m_lock, record a completed sync for dir_root: stores
// (snap_id, snap_name) as the last synced snapshot (which also clears
// current_syncing_snap), stamps last_synced with the current time, saves
// the supplied duration and increments synced_snap_count.
// Throws std::out_of_range for an unknown dir_root.
b3b6e05e | 215 | void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id,
f67539c2 TL |
216 | const std::string &snap_name, double duration) { |
217 | std::scoped_lock locker(m_lock); | |
b3b6e05e TL |
218 | _set_last_synced_snap(dir_root, snap_id, snap_name); |
219 | auto &sync_stat = m_snap_sync_stats.at(dir_root); | |
f67539c2 TL |
220 | sync_stat.last_synced = clock::now(); |
221 | sync_stat.last_sync_duration = duration; | |
222 | ++sync_stat.synced_snap_count; | |
223 | } | |
224 | ||
// Decide whether work on dir_root should stop, storing the reason in
// *retval:
//   -EBLOCKLISTED  m_fs_mirror reports blocklisting (checked before m_lock
//                  is taken)
//   -EINPROGRESS   the replayer is stopping
//   -ECANCELED     the directory's entry in m_registered is marked canceled
// Returns true in those cases; otherwise sets *retval to 0 and returns
// false. m_registered.at() throws std::out_of_range if dir_root is not
// registered.
b3b6e05e | 225 | bool should_backoff(const std::string &dir_root, int *retval) {
f67539c2 TL |
226 | if (m_fs_mirror->is_blocklisted()) { |
227 | *retval = -EBLOCKLISTED; | |
228 | return true; | |
229 | } | |
230 | ||
231 | std::scoped_lock locker(m_lock); | |
232 | if (is_stopping()) { | |
233 | // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use | |
234 | // EINPROGRESS to identify shutdown. | |
235 | *retval = -EINPROGRESS; | |
236 | return true; | |
237 | } | |
b3b6e05e | 238 | auto &dr = m_registered.at(dir_root);
522d829b | 239 | if (dr.canceled) {
f67539c2 TL |
240 | *retval = -ECANCELED; |
241 | return true; | |
242 | } | |
243 | ||
244 | *retval = 0; | |
245 | return false; | |
246 | } | |
247 | ||
248 | typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers; | |
249 | ||
250 | CephContext *m_cct; | |
251 | FSMirror *m_fs_mirror; | |
252 | RadosRef m_local_cluster; | |
253 | Filesystem m_filesystem; | |
254 | Peer m_peer; | |
255 | // probably need to be encapsulated when supporting cancelations | |
256 | std::map<std::string, DirRegistry> m_registered; | |
257 | std::vector<std::string> m_directories; | |
258 | std::map<std::string, SnapSyncStat> m_snap_sync_stats; | |
259 | MountRef m_local_mount; | |
260 | ServiceDaemon *m_service_daemon; | |
261 | PeerReplayerAdminSocketHook *m_asok_hook = nullptr; | |
262 | ||
263 | ceph::mutex m_lock; | |
264 | ceph::condition_variable m_cond; | |
265 | RadosRef m_remote_cluster; | |
266 | MountRef m_remote_mount; | |
267 | bool m_stopping = false; | |
268 | SnapshotReplayers m_replayers; | |
269 | ||
270 | ServiceDaemonStats m_service_daemon_stats; | |
271 | ||
272 | void run(SnapshotReplayerThread *replayer); | |
273 | ||
274 | boost::optional<std::string> pick_directory(); | |
b3b6e05e TL |
275 | int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); |
276 | void unregister_directory(const std::string &dir_root); | |
277 | int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer, | |
f67539c2 | 278 | DirRegistry *registry); |
b3b6e05e TL |
// NOTE(review): presumably releases the lock that try_lock_directory() took
// on dir_root, using the DirRegistry it filled in -- confirm in the
// implementation file.
279 | void unlock_directory(const std::string &dir_root, const DirRegistry &registry); |
280 | void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker); | |
f67539c2 | 281 | |
b3b6e05e TL |
282 | |
283 | int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map, | |
f67539c2 | 284 | bool is_remote=false); |
b3b6e05e TL |
285 | |
286 | int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps); | |
287 | int propagate_snap_renames(const std::string &dir_root, | |
f67539c2 | 288 | const std::set<std::pair<std::string,std::string>> &snaps); |
b3b6e05e TL |
289 | int propagate_deleted_entries(const std::string &dir_root, const std::string &epath, |
290 | const FHandles &fh); | |
291 | int cleanup_remote_dir(const std::string &dir_root, const std::string &epath, | |
292 | const FHandles &fh); | |
293 | ||
294 | int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx, | |
295 | const FHandles &fh, bool *need_data_sync, bool *need_attr_sync); | |
296 | ||
297 | int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id); | |
// current / prev are Snapshot values (std::pair<std::string, uint64_t>);
// prev may be unset. The result is written through fh.
// NOTE(review): presumably opens the descriptors described on FHandles and
// returns 0 or a negative errno -- confirm in the implementation file.
298 | int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot &current, | |
299 | boost::optional<Snapshot> prev, FHandles *fh); | |
300 | void post_sync_close_handles(const FHandles &fh); | |
301 | ||
// current / prev are Snapshot values (std::pair<std::string, uint64_t>);
// prev may be unset.
// NOTE(review): presumably performs the transfer of snapshot `current` for
// dir_root and returns 0 or a negative errno -- confirm.
302 | int do_synchronize(const std::string &dir_root, const Snapshot &current, | |
303 | boost::optional<Snapshot> prev); | |
304 | ||
// current / prev are Snapshot values (std::pair<std::string, uint64_t>);
// prev may be unset.
// NOTE(review): presumably the entry point that wraps do_synchronize() for
// one snapshot of dir_root; returns 0 or a negative errno -- confirm.
305 | int synchronize(const std::string &dir_root, const Snapshot &current, | |
306 | boost::optional<Snapshot> prev); | |
307 | int do_sync_snaps(const std::string &dir_root); | |
308 | ||
309 | int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh); | |
310 | int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, | |
311 | const FHandles &fh, bool need_data_sync, bool need_attr_sync); | |
312 | int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, | |
313 | const FHandles &fh); | |
aee94f69 | 314 | int sync_perms(const std::string& path); |
f67539c2 TL |
315 | }; |
316 | ||
317 | } // namespace mirror | |
318 | } // namespace cephfs | |
319 | ||
320 | #endif // CEPHFS_MIRROR_PEER_REPLAYER_H |