git.proxmox.com Git - ceph.git/blob - ceph/src/tools/cephfs_mirror/PeerReplayer.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / cephfs_mirror / PeerReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
5 #define CEPHFS_MIRROR_PEER_REPLAYER_H
6
7 #include "common/Formatter.h"
8 #include "common/Thread.h"
9 #include "mds/FSMap.h"
10 #include "ServiceDaemon.h"
11 #include "Types.h"
12
13 namespace cephfs {
14 namespace mirror {
15
// Forward declarations: PeerReplayer only stores/uses pointers to these
// types, so their full definitions are not needed in this header.
class PeerReplayerAdminSocketHook; // admin socket hook owned via m_asok_hook
class FSMirror;                    // owning mirror, referenced via m_fs_mirror
18
19 class PeerReplayer {
20 public:
21 PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
22 RadosRef local_cluster, const Filesystem &filesystem,
23 const Peer &peer, const std::set<std::string, std::less<>> &directories,
24 MountRef mount, ServiceDaemon *service_daemon);
25 ~PeerReplayer();
26
27 // initialize replayer for a peer
28 int init();
29
30 // shutdown replayer for a peer
31 void shutdown();
32
33 // add a directory to mirror queue
34 void add_directory(string_view dir_path);
35
36 // remove a directory from queue
37 void remove_directory(string_view dir_path);
38
39 // admin socket helpers
40 void peer_status(Formatter *f);
41
42 private:
43 inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
44
45 inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
46 inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
47
48 bool is_stopping() {
49 return m_stopping;
50 }
51
52 struct Replayer;
53 class SnapshotReplayerThread : public Thread {
54 public:
55 SnapshotReplayerThread(PeerReplayer *peer_replayer)
56 : m_peer_replayer(peer_replayer) {
57 }
58
59 void *entry() override {
60 m_peer_replayer->run(this);
61 return 0;
62 }
63
64 void cancel() {
65 canceled = true;
66 }
67
68 bool is_canceled() const {
69 return canceled;
70 }
71
72 private:
73 PeerReplayer *m_peer_replayer;
74 bool canceled = false;
75 };
76
77 struct DirRegistry {
78 int fd;
79 SnapshotReplayerThread *replayer;
80 };
81
82 struct SyncEntry {
83 std::string epath;
84 ceph_dir_result *dirp; // valid for directories
85 struct ceph_statx stx;
86
87 SyncEntry(std::string_view path,
88 const struct ceph_statx &stx)
89 : epath(path),
90 stx(stx) {
91 }
92 SyncEntry(std::string_view path,
93 ceph_dir_result *dirp,
94 const struct ceph_statx &stx)
95 : epath(path),
96 dirp(dirp),
97 stx(stx) {
98 }
99
100 bool is_directory() const {
101 return S_ISDIR(stx.stx_mode);
102 }
103 };
104
105 using clock = ceph::coarse_mono_clock;
106 using time = ceph::coarse_mono_time;
107
108 // stats sent to service daemon
109 struct ServiceDaemonStats {
110 uint64_t failed_dir_count = 0;
111 uint64_t recovered_dir_count = 0;
112 };
113
114 struct SnapSyncStat {
115 uint64_t nr_failures = 0; // number of consecutive failures
116 boost::optional<time> last_failed; // lat failed timestamp
117 bool failed = false; // hit upper cap for consecutive failures
118 boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
119 boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
120 uint64_t synced_snap_count = 0;
121 uint64_t deleted_snap_count = 0;
122 uint64_t renamed_snap_count = 0;
123 time last_synced = clock::zero();
124 boost::optional<double> last_sync_duration;
125 };
126
127 void _inc_failed_count(const std::string &dir_path) {
128 auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
129 "cephfs_mirror_max_consecutive_failures_per_directory");
130 auto &sync_stat = m_snap_sync_stats.at(dir_path);
131 sync_stat.last_failed = clock::now();
132 if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
133 sync_stat.failed = true;
134 ++m_service_daemon_stats.failed_dir_count;
135 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
136 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
137 m_service_daemon_stats.failed_dir_count);
138 }
139 }
140 void _reset_failed_count(const std::string &dir_path) {
141 auto &sync_stat = m_snap_sync_stats.at(dir_path);
142 if (sync_stat.failed) {
143 ++m_service_daemon_stats.recovered_dir_count;
144 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
145 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
146 m_service_daemon_stats.recovered_dir_count);
147 }
148 sync_stat.nr_failures = 0;
149 sync_stat.failed = false;
150 sync_stat.last_failed = boost::none;
151 }
152
153 void _set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
154 const std::string &snap_name) {
155 auto &sync_stat = m_snap_sync_stats.at(dir_path);
156 sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
157 sync_stat.current_syncing_snap = boost::none;
158 }
159 void set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
160 const std::string &snap_name) {
161 std::scoped_lock locker(m_lock);
162 _set_last_synced_snap(dir_path, snap_id, snap_name);
163 }
164 void set_current_syncing_snap(const std::string &dir_path, uint64_t snap_id,
165 const std::string &snap_name) {
166 std::scoped_lock locker(m_lock);
167 auto &sync_stat = m_snap_sync_stats.at(dir_path);
168 sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
169 }
170 void clear_current_syncing_snap(const std::string &dir_path) {
171 std::scoped_lock locker(m_lock);
172 auto &sync_stat = m_snap_sync_stats.at(dir_path);
173 sync_stat.current_syncing_snap = boost::none;
174 }
175 void inc_deleted_snap(const std::string &dir_path) {
176 std::scoped_lock locker(m_lock);
177 auto &sync_stat = m_snap_sync_stats.at(dir_path);
178 ++sync_stat.deleted_snap_count;
179 }
180 void inc_renamed_snap(const std::string &dir_path) {
181 std::scoped_lock locker(m_lock);
182 auto &sync_stat = m_snap_sync_stats.at(dir_path);
183 ++sync_stat.renamed_snap_count;
184 }
185 void set_last_synced_stat(const std::string &dir_path, uint64_t snap_id,
186 const std::string &snap_name, double duration) {
187 std::scoped_lock locker(m_lock);
188 _set_last_synced_snap(dir_path, snap_id, snap_name);
189 auto &sync_stat = m_snap_sync_stats.at(dir_path);
190 sync_stat.last_synced = clock::now();
191 sync_stat.last_sync_duration = duration;
192 ++sync_stat.synced_snap_count;
193 }
194
195 bool should_backoff(const std::string &dir_path, int *retval) {
196 if (m_fs_mirror->is_blocklisted()) {
197 *retval = -EBLOCKLISTED;
198 return true;
199 }
200
201 std::scoped_lock locker(m_lock);
202 if (is_stopping()) {
203 // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
204 // EINPROGRESS to identify shutdown.
205 *retval = -EINPROGRESS;
206 return true;
207 }
208 auto &dr = m_registered.at(dir_path);
209 if (dr.replayer->is_canceled()) {
210 *retval = -ECANCELED;
211 return true;
212 }
213
214 *retval = 0;
215 return false;
216 }
217
218 typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
219
220 CephContext *m_cct;
221 FSMirror *m_fs_mirror;
222 RadosRef m_local_cluster;
223 Filesystem m_filesystem;
224 Peer m_peer;
225 // probably need to be encapsulated when supporting cancelations
226 std::map<std::string, DirRegistry> m_registered;
227 std::vector<std::string> m_directories;
228 std::map<std::string, SnapSyncStat> m_snap_sync_stats;
229 MountRef m_local_mount;
230 ServiceDaemon *m_service_daemon;
231 PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
232
233 ceph::mutex m_lock;
234 ceph::condition_variable m_cond;
235 RadosRef m_remote_cluster;
236 MountRef m_remote_mount;
237 bool m_stopping = false;
238 SnapshotReplayers m_replayers;
239
240 ServiceDaemonStats m_service_daemon_stats;
241
242 void run(SnapshotReplayerThread *replayer);
243
244 boost::optional<std::string> pick_directory();
245 int register_directory(const std::string &dir_path, SnapshotReplayerThread *replayer);
246 void unregister_directory(const std::string &dir_path);
247 int try_lock_directory(const std::string &dir_path, SnapshotReplayerThread *replayer,
248 DirRegistry *registry);
249 void unlock_directory(const std::string &dir_path, const DirRegistry &registry);
250 void sync_snaps(const std::string &dir_path, std::unique_lock<ceph::mutex> &locker);
251
252 int do_sync_snaps(const std::string &dir_path);
253 int build_snap_map(const std::string &dir_path, std::map<uint64_t, std::string> *snap_map,
254 bool is_remote=false);
255 int propagate_snap_deletes(const std::string &dir_name, const std::set<std::string> &snaps);
256 int propagate_snap_renames(const std::string &dir_name,
257 const std::set<std::pair<std::string,std::string>> &snaps);
258 int synchronize(const std::string &dir_path, uint64_t snap_id, const std::string &snap_name);
259 int do_synchronize(const std::string &path, const std::string &snap_name);
260
261 int cleanup_remote_dir(const std::string &dir_path);
262 int remote_mkdir(const std::string &local_path, const std::string &remote_path,
263 const struct ceph_statx &stx);
264 int remote_file_op(const std::string &dir_path,
265 const std::string &local_path,
266 const std::string &remote_path, const struct ceph_statx &stx);
267 int remote_copy(const std::string &dir_path,
268 const std::string &local_path,
269 const std::string &remote_path,
270 const struct ceph_statx &local_stx);
271 };
272
273 } // namespace mirror
274 } // namespace cephfs
275
276 #endif // CEPHFS_MIRROR_PEER_REPLAYER_H