]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include <fcntl.h>
#include <sys/file.h>
#include <sys/time.h>

#include <algorithm>
#include <memory>
#include <stack>
9 | ||
10 | #include "common/admin_socket.h" | |
11 | #include "common/ceph_context.h" | |
12 | #include "common/debug.h" | |
13 | #include "common/errno.h" | |
14 | #include "FSMirror.h" | |
15 | #include "PeerReplayer.h" | |
16 | #include "Utils.h" | |
17 | ||
18 | #include "json_spirit/json_spirit.h" | |
19 | ||
20 | #define dout_context g_ceph_context | |
21 | #define dout_subsys ceph_subsys_cephfs_mirror | |
22 | #undef dout_prefix | |
23 | #define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \ | |
24 | << m_peer.uuid << ") " << __func__ | |
25 | ||
26 | namespace cephfs { | |
27 | namespace mirror { | |
28 | ||
29 | namespace { | |
30 | ||
31 | const std::string PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"; | |
32 | ||
33 | std::string snapshot_dir_path(CephContext *cct, const std::string &path) { | |
34 | return path + "/" + cct->_conf->client_snapdir; | |
35 | } | |
36 | ||
// Join a snapshot directory path and a snapshot name into a full path.
std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
  std::string sp{snap_dir};
  sp.append(1, '/').append(snap_name);
  return sp;
}
40 | ||
41 | std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) { | |
42 | return path + "/" + cct->_conf->client_snapdir + "/" + snap_name; | |
43 | } | |
44 | ||
// Join a directory path and an entry name.
std::string entry_path(const std::string &dir, const std::string &name) {
  std::string ep{dir};
  ep += '/';
  ep += name;
  return ep;
}
48 | ||
49 | std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata, | |
50 | size_t nr_snap_metadata) { | |
51 | std::map<std::string, std::string> metadata; | |
52 | for (size_t i = 0; i < nr_snap_metadata; ++i) { | |
53 | metadata.emplace(snap_metadata[i].key, snap_metadata[i].value); | |
54 | } | |
55 | ||
56 | return metadata; | |
57 | } | |
58 | ||
59 | std::string peer_config_key(const std::string &fs_name, const std::string &uuid) { | |
60 | return PEER_CONFIG_KEY_PREFIX + "/" + fs_name + "/" + uuid; | |
61 | } | |
62 | ||
// Interface for admin-socket commands registered for a peer replayer.
class PeerAdminSocketCommand {
public:
  virtual ~PeerAdminSocketCommand() {
  }
  // Execute the command, emitting output through the formatter.
  // Returns 0 on success or a negative errno.
  virtual int call(Formatter *f) = 0;
};
69 | ||
70 | class StatusCommand : public PeerAdminSocketCommand { | |
71 | public: | |
72 | explicit StatusCommand(PeerReplayer *peer_replayer) | |
73 | : peer_replayer(peer_replayer) { | |
74 | } | |
75 | ||
76 | int call(Formatter *f) override { | |
77 | peer_replayer->peer_status(f); | |
78 | return 0; | |
79 | } | |
80 | ||
81 | private: | |
82 | PeerReplayer *peer_replayer; | |
83 | }; | |
84 | ||
85 | } // anonymous namespace | |
86 | ||
87 | class PeerReplayerAdminSocketHook : public AdminSocketHook { | |
88 | public: | |
89 | PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem, | |
90 | const Peer &peer, PeerReplayer *peer_replayer) | |
91 | : admin_socket(cct->get_admin_socket()) { | |
92 | int r; | |
93 | std::string cmd; | |
94 | ||
95 | // mirror peer status format is name@id uuid | |
96 | cmd = "fs mirror peer status " | |
97 | + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid) | |
98 | + " " | |
99 | + stringify(peer.uuid); | |
100 | r = admin_socket->register_command( | |
101 | cmd, this, "get peer mirror status"); | |
102 | if (r == 0) { | |
103 | commands[cmd] = new StatusCommand(peer_replayer); | |
104 | } | |
105 | } | |
106 | ||
107 | ~PeerReplayerAdminSocketHook() override { | |
108 | admin_socket->unregister_commands(this); | |
109 | for (auto &[command, cmdptr] : commands) { | |
110 | delete cmdptr; | |
111 | } | |
112 | } | |
113 | ||
114 | int call(std::string_view command, const cmdmap_t& cmdmap, | |
115 | Formatter *f, std::ostream &errss, bufferlist &out) override { | |
116 | auto p = commands.at(std::string(command)); | |
117 | return p->call(f); | |
118 | } | |
119 | ||
120 | private: | |
121 | typedef std::map<std::string, PeerAdminSocketCommand*, std::less<>> Commands; | |
122 | ||
123 | AdminSocket *admin_socket; | |
124 | Commands commands; | |
125 | }; | |
126 | ||
// Construct a replayer for a single (filesystem, peer) pair.  Installs an
// admin-socket hook for status queries and resets the failed/recovered
// directory counters published via the service daemon.
PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
                           RadosRef local_cluster, const Filesystem &filesystem,
                           const Peer &peer, const std::set<std::string, std::less<>> &directories,
                           MountRef mount, ServiceDaemon *service_daemon)
  : m_cct(cct),
    m_fs_mirror(fs_mirror),
    m_local_cluster(local_cluster),
    m_filesystem(filesystem),
    m_peer(peer),
    m_directories(directories.begin(), directories.end()),
    m_local_mount(mount),
    m_service_daemon(service_daemon),
    m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
    m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
  // reset sync stats sent via service daemon
  m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
  m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
}
147 | ||
// Tears down the admin-socket hook (which unregisters its commands).
PeerReplayer::~PeerReplayer() {
  delete m_asok_hook;
}
151 | ||
// Bootstrap the replayer: seed per-directory sync stats, fetch the remote
// cluster's monitor address and cephx key from the config-key store (if
// stashed there), connect to and mount the remote filesystem, then spawn
// the snapshot replayer threads.  Returns 0 on success or a negative errno.
int PeerReplayer::init() {
  dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
  for (auto &dir_path : m_directories) {
    m_snap_sync_stats.emplace(dir_path, SnapSyncStat());
  }

  auto &remote_client = m_peer.remote.client_name;
  auto &remote_cluster = m_peer.remote.cluster_name;
  auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name};

  // peer credentials may be stored under a well-known config key;
  // -ENOENT simply means none were stored and defaults apply.
  std::string key = peer_config_key(m_filesystem.fs_name, m_peer.uuid);
  std::string cmd =
    "{"
      "\"prefix\": \"config-key get\", "
      "\"key\": \"" + key + "\""
    "}";

  bufferlist in_bl;
  bufferlist out_bl;

  int r = m_local_cluster->mon_command(cmd, in_bl, &out_bl, nullptr);
  dout(5) << ": mon command r=" << r << dendl;
  if (r < 0 && r != -ENOENT) {
    return r;
  }

  std::string mon_host;
  std::string cephx_key;
  if (!r) {
    // config key exists -- parse {"mon_host": ..., "key": ...}
    json_spirit::mValue root;
    if (!json_spirit::read(out_bl.to_str(), root)) {
      derr << ": invalid config-key JSON" << dendl;
      return -EBADMSG;
    }
    try {
      auto &root_obj = root.get_obj();
      mon_host = root_obj.at("mon_host").get_str();
      cephx_key = root_obj.at("key").get_str();
      dout(0) << ": remote monitor host=" << mon_host << dendl;
    } catch (std::runtime_error&) {
      derr << ": unexpected JSON received" << dendl;
      return -EBADMSG;
    }
  }

  r = connect(remote_client, remote_cluster, &m_remote_cluster, mon_host, cephx_key);
  if (r < 0) {
    derr << ": error connecting to remote cluster: " << cpp_strerror(r)
         << dendl;
    return r;
  }

  r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount);
  if (r < 0) {
    // drop the cluster handle established above
    m_remote_cluster.reset();
    derr << ": error mounting remote filesystem=" << remote_filesystem << dendl;
    return r;
  }

  std::scoped_lock locker(m_lock);
  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
    "cephfs_mirror_max_concurrent_directory_syncs");
  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;

  while (nr_replayers-- > 0) {
    std::unique_ptr<SnapshotReplayerThread> replayer(
      new SnapshotReplayerThread(this));
    std::string name("replayer-" + stringify(nr_replayers));
    replayer->create(name.c_str());
    m_replayers.push_back(std::move(replayer));
  }

  return 0;
}
226 | ||
// Stop all snapshot replayer threads and tear down the remote mount and
// cluster connection.  Asserts it is called at most once.
void PeerReplayer::shutdown() {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    ceph_assert(!m_stopping);
    m_stopping = true;
    // wake replayer threads blocked on the condition variable so they
    // observe m_stopping and exit
    m_cond.notify_all();
  }

  for (auto &replayer : m_replayers) {
    replayer->join();
  }
  m_replayers.clear();
  ceph_unmount(m_remote_mount);
  ceph_release(m_remote_mount);
  m_remote_mount = nullptr;
  m_remote_cluster.reset();
}
246 | ||
247 | void PeerReplayer::add_directory(string_view dir_path) { | |
248 | dout(20) << ": dir_path=" << dir_path << dendl; | |
249 | ||
250 | std::scoped_lock locker(m_lock); | |
251 | m_directories.emplace_back(dir_path); | |
252 | m_snap_sync_stats.emplace(dir_path, SnapSyncStat()); | |
253 | m_cond.notify_all(); | |
254 | } | |
255 | ||
256 | void PeerReplayer::remove_directory(string_view dir_path) { | |
257 | dout(20) << ": dir_path=" << dir_path << dendl; | |
258 | auto _dir_path = std::string(dir_path); | |
259 | ||
260 | std::scoped_lock locker(m_lock); | |
261 | auto it = std::find(m_directories.begin(), m_directories.end(), _dir_path); | |
262 | if (it != m_directories.end()) { | |
263 | m_directories.erase(it); | |
264 | } | |
265 | ||
266 | auto it1 = m_registered.find(_dir_path); | |
267 | if (it1 == m_registered.end()) { | |
268 | m_snap_sync_stats.erase(_dir_path); | |
269 | } else { | |
270 | it1->second.replayer->cancel(); | |
271 | } | |
272 | m_cond.notify_all(); | |
273 | } | |
274 | ||
275 | boost::optional<std::string> PeerReplayer::pick_directory() { | |
276 | dout(20) << dendl; | |
277 | ||
278 | auto now = clock::now(); | |
279 | auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>( | |
280 | "cephfs_mirror_retry_failed_directories_interval"); | |
281 | ||
282 | boost::optional<std::string> candidate; | |
283 | for (auto &dir_path : m_directories) { | |
284 | auto &sync_stat = m_snap_sync_stats.at(dir_path); | |
285 | if (sync_stat.failed) { | |
286 | std::chrono::duration<double> d = now - *sync_stat.last_failed; | |
287 | if (d.count() < retry_timo) { | |
288 | continue; | |
289 | } | |
290 | } | |
291 | if (!m_registered.count(dir_path)) { | |
292 | candidate = dir_path; | |
293 | break; | |
294 | } | |
295 | } | |
296 | ||
297 | std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end()); | |
298 | return candidate; | |
299 | } | |
300 | ||
301 | int PeerReplayer::register_directory(const std::string &dir_path, | |
302 | SnapshotReplayerThread *replayer) { | |
303 | dout(20) << ": dir_path=" << dir_path << dendl; | |
304 | ceph_assert(m_registered.find(dir_path) == m_registered.end()); | |
305 | ||
306 | DirRegistry registry; | |
307 | int r = try_lock_directory(dir_path, replayer, ®istry); | |
308 | if (r < 0) { | |
309 | return r; | |
310 | } | |
311 | ||
312 | dout(5) << ": dir_path=" << dir_path << " registered with replayer=" | |
313 | << replayer << dendl; | |
314 | m_registered.emplace(dir_path, std::move(registry)); | |
315 | return 0; | |
316 | } | |
317 | ||
318 | void PeerReplayer::unregister_directory(const std::string &dir_path) { | |
319 | dout(20) << ": dir_path=" << dir_path << dendl; | |
320 | ||
321 | auto it = m_registered.find(dir_path); | |
322 | ceph_assert(it != m_registered.end()); | |
323 | ||
324 | unlock_directory(it->first, it->second); | |
325 | m_registered.erase(it); | |
326 | if (std::find(m_directories.begin(), m_directories.end(), dir_path) == m_directories.end()) { | |
327 | m_snap_sync_stats.erase(dir_path); | |
328 | } | |
329 | } | |
330 | ||
// Open (creating if absent) the remote directory and take an exclusive,
// non-blocking flock on it so only one cephfs-mirror instance synchronizes
// a given directory at a time.  On success the open fd and owning replayer
// are stored in *registry.  Returns 0 on success, -EWOULDBLOCK if another
// instance holds the lock, or another negative errno.
int PeerReplayer::try_lock_directory(const std::string &dir_path,
                                     SnapshotReplayerThread *replayer, DirRegistry *registry) {
  dout(20) << ": dir_path=" << dir_path << dendl;

  int r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
  if (r < 0 && r != -ENOENT) {
    derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }

  if (r == -ENOENT) {
    // we snap under dir_path, so mode does not matter much
    r = ceph_mkdirs(m_remote_mount, dir_path.c_str(), 0755);
    if (r < 0) {
      derr << ": failed to create remote directory=" << dir_path << ": " << cpp_strerror(r)
           << dendl;
      return r;
    }

    // re-open the freshly created directory
    r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
    if (r < 0) {
      derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
           << dendl;
      return r;
    }
  }

  int fd = r;
  // the lock owner is keyed by the replayer thread's id
  r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
  if (r != 0) {
    if (r == -EWOULDBLOCK) {
      dout(5) << ": dir_path=" << dir_path << " is locked by cephfs-mirror, "
              << "will retry again" << dendl;
    } else {
      derr << ": failed to lock dir_path=" << dir_path << ": " << cpp_strerror(r)
           << dendl;
    }

    // best-effort cleanup of the fd we opened; lock was not acquired
    if (ceph_close(m_remote_mount, fd) < 0) {
      derr << ": failed to close (cleanup) remote dir_path=" << dir_path << ": "
           << cpp_strerror(r) << dendl;
    }
    return r;
  }

  dout(10) << ": dir_path=" << dir_path << " locked" << dendl;

  registry->fd = fd;
  registry->replayer = replayer;
  return 0;
}
383 | ||
// Release the flock taken by try_lock_directory() and close the fd.
// NOTE(review): if the unlock fails the fd is left open (early return) --
// presumably deliberate, since closing the fd could drop the lock as a side
// effect; confirm against ceph_flock semantics.
void PeerReplayer::unlock_directory(const std::string &dir_path, const DirRegistry &registry) {
  dout(20) << ": dir_path=" << dir_path << dendl;

  int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
                     (uint64_t)registry.replayer->get_thread_id());
  if (r < 0) {
    derr << ": failed to unlock remote dir_path=" << dir_path << ": " << cpp_strerror(r)
         << dendl;
    return;
  }

  r = ceph_close(m_remote_mount, registry.fd);
  if (r < 0) {
    derr << ": failed to close remote dir_path=" << dir_path << ": " << cpp_strerror(r)
         << dendl;
  }

  dout(10) << ": dir_path=" << dir_path << " unlocked" << dendl;
}
403 | ||
// Build a snap-id -> snap-name map for dir_path's snapshot directory.  For
// the local filesystem the id is the snapshot's own id (info.id); for the
// remote (mirrored) side the id is recovered from the PRIMARY_SNAP_ID_KEY
// entry in the snapshot's metadata.  A missing snap directory on the remote
// side is not an error (returns 0 with an empty map).  Returns 0 on success
// or a negative errno.
int PeerReplayer::build_snap_map(const std::string &dir_path,
                                 std::map<uint64_t, std::string> *snap_map, bool is_remote) {
  auto snap_dir = snapshot_dir_path(m_cct, dir_path);
  dout(20) << ": dir_path=" << dir_path << ", snap_dir=" << snap_dir
           << ", is_remote=" << is_remote << dendl;

  auto lr_str = is_remote ? "remote" : "local";
  auto mnt = is_remote ? m_remote_mount : m_local_mount;

  ceph_dir_result *dirp = nullptr;
  int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp);
  if (r < 0) {
    if (is_remote && r == -ENOENT) {
      // nothing mirrored yet -- empty map, not an error
      return 0;
    }
    derr << ": failed to open " << lr_str << " snap directory=" << snap_dir
         << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  // collect snapshot names, skipping "." and ".."
  // NOTE(review): ceph_readdir() returning NULL is treated as end-of-dir;
  // a read error would be indistinguishable here -- confirm acceptable.
  std::set<std::string> snaps;
  auto entry = ceph_readdir(mnt, dirp);
  while (entry != NULL) {
    auto d_name = std::string(entry->d_name);
    dout(20) << ": entry=" << d_name << dendl;
    if (d_name != "." && d_name != "..") {
      snaps.emplace(d_name);
    }

    entry = ceph_readdir(mnt, dirp);
  }

  int rv = 0;
  for (auto &snap : snaps) {
    snap_info info;
    auto snap_path = snapshot_path(snap_dir, snap);
    r = ceph_get_snap_info(mnt, snap_path.c_str(), &info);
    if (r < 0) {
      derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path
           << ": " << cpp_strerror(r) << dendl;
      rv = r;
      break;
    }

    uint64_t snap_id;
    if (is_remote) {
      if (!info.nr_snap_metadata) {
        // mirrored snapshots must carry metadata identifying the primary
        derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot"
             << dendl;
        rv = -EINVAL;
      } else {
        auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata);
        dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl;
        auto it = metadata.find(PRIMARY_SNAP_ID_KEY);
        if (it == metadata.end()) {
          derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY
               << "\" in metadata" << dendl;
          rv = -EINVAL;
        } else {
          // NOTE(review): std::stoull throws on malformed input; assumed
          // the stored value is always numeric -- confirm.
          snap_id = std::stoull(it->second);
        }
        ceph_free_snap_info_buffer(&info);
      }
    } else {
      snap_id = info.id;
    }

    // an error above (rv set) means snap_id may be unset -- bail before use
    if (rv != 0) {
      break;
    }
    snap_map->emplace(snap_id, snap);
  }

  r = ceph_closedir(mnt, dirp);
  if (r < 0) {
    derr << ": failed to close " << lr_str << " snap directory=" << snap_dir
         << ": " << cpp_strerror(r) << dendl;
  }

  dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl;
  return rv;
}
486 | ||
487 | int PeerReplayer::propagate_snap_deletes(const std::string &dir_path, | |
488 | const std::set<std::string> &snaps) { | |
489 | dout(5) << ": dir_path=" << dir_path << ", deleted snapshots=" << snaps << dendl; | |
490 | ||
491 | for (auto &snap : snaps) { | |
492 | dout(20) << ": deleting dir_path=" << dir_path << ", snapshot=" << snap | |
493 | << dendl; | |
494 | int r = ceph_rmsnap(m_remote_mount, dir_path.c_str(), snap.c_str()); | |
495 | if (r < 0) { | |
496 | derr << ": failed to delete remote snap dir_path=" << dir_path | |
497 | << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl; | |
498 | return r; | |
499 | } | |
500 | inc_deleted_snap(dir_path); | |
501 | } | |
502 | ||
503 | return 0; | |
504 | } | |
505 | ||
506 | int PeerReplayer::propagate_snap_renames( | |
507 | const std::string &dir_path, | |
508 | const std::set<std::pair<std::string,std::string>> &snaps) { | |
509 | dout(10) << ": dir_path=" << dir_path << ", renamed snapshots=" << snaps << dendl; | |
510 | ||
511 | for (auto &snapp : snaps) { | |
512 | auto from = snapshot_path(m_cct, dir_path, snapp.first); | |
513 | auto to = snapshot_path(m_cct, dir_path, snapp.second); | |
514 | dout(20) << ": renaming dir_path=" << dir_path << ", snapshot from=" | |
515 | << from << ", to=" << to << dendl; | |
516 | int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str()); | |
517 | if (r < 0) { | |
518 | derr << ": failed to rename remote snap dir_path=" << dir_path | |
519 | << ", snapshot from =" << from << ", to=" << to << ": " | |
520 | << cpp_strerror(r) << dendl; | |
521 | return r; | |
522 | } | |
523 | inc_renamed_snap(dir_path); | |
524 | } | |
525 | ||
526 | return 0; | |
527 | } | |
528 | ||
// Create remote_path on the remote filesystem (existing directory is fine)
// and replicate the local directory's permission bits, ownership, and
// access/modification times from stx.  Returns 0 on success or a negative
// errno.
int PeerReplayer::remote_mkdir(const std::string &local_path,
                               const std::string &remote_path,
                               const struct ceph_statx &stx) {
  dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
           << dendl;

  // strip the file-type bit -- ceph_mkdir takes permission bits only
  int r = ceph_mkdir(m_remote_mount, remote_path.c_str(), stx.stx_mode & ~S_IFDIR);
  if (r < 0 && r != -EEXIST) {
    derr << ": failed to create remote directory=" << remote_path << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }

  r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
  if (r < 0) {
    derr << ": failed to chown remote directory=" << remote_path << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }

  // statx carries nanoseconds; lutimes wants microseconds
  struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
                            {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
  r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
  if (r < 0) {
    derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
560 | ||
#define NR_IOVECS 8 // # iovecs
#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
// Copy a regular file's contents from the local snapshot to the remote
// filesystem in NR_IOVECS * IOVEC_SIZE chunks via preadv/pwritev, checking
// for backoff (cancellation/shutdown) between chunks and fsync'ing the
// remote file on success.  Uses goto-based cleanup for the two fds -- keep
// the label order matching the acquisition order.  Returns 0 on success or
// a negative errno.
int PeerReplayer::remote_copy(const std::string &dir_path,
                              const std::string &local_path,
                              const std::string &remote_path,
                              const struct ceph_statx &stx) {
  dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
           << ", remote_path=" << remote_path << dendl;
  int l_fd;
  int r_fd;
  void *ptr;
  struct iovec iov[NR_IOVECS];

  int r = ceph_open(m_local_mount, local_path.c_str(), O_RDONLY, 0);
  if (r < 0) {
    derr << ": failed to open local file path=" << local_path << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  l_fd = r;
  r = ceph_open(m_remote_mount, remote_path.c_str(),
                O_CREAT | O_TRUNC | O_WRONLY, stx.stx_mode);
  if (r < 0) {
    derr << ": failed to create remote file path=" << remote_path << ": "
         << cpp_strerror(r) << dendl;
    goto close_local_fd;
  }

  r_fd = r;
  // single contiguous buffer carved into NR_IOVECS slices
  ptr = malloc(NR_IOVECS * IOVEC_SIZE);
  if (!ptr) {
    r = -ENOMEM;
    derr << ": failed to allocate memory" << dendl;
    goto close_remote_fd;
  }

  while (true) {
    if (should_backoff(dir_path, &r)) {
      dout(0) << ": backing off r=" << r << dendl;
      break;
    }

    // reset iovec lengths -- the tail iovec may have been shortened on the
    // previous iteration
    for (int i = 0; i < NR_IOVECS; ++i) {
      iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
      iov[i].iov_len = IOVEC_SIZE;
    }

    r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
    if (r < 0) {
      derr << ": failed to read local file path=" << local_path << ": "
           << cpp_strerror(r) << dendl;
      break;
    }
    if (r == 0) {
      // EOF
      break;
    }

    // trim the iovec list to exactly the number of bytes read
    int iovs = (int)(r / IOVEC_SIZE);
    int t = r % IOVEC_SIZE;
    if (t) {
      iov[iovs].iov_len = t;
      ++iovs;
    }

    r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
    if (r < 0) {
      derr << ": failed to write remote file path=" << remote_path << ": "
           << cpp_strerror(r) << dendl;
      break;
    }
  }

  // r == 0 here means the copy loop completed cleanly (EOF reached)
  if (r == 0) {
    r = ceph_fsync(m_remote_mount, r_fd, 0);
    if (r < 0) {
      derr << ": failed to sync data for dir_path=" << remote_path << ": "
           << cpp_strerror(r) << dendl;
    }
  }

  free(ptr);

close_remote_fd:
  if (ceph_close(m_remote_mount, r_fd) < 0) {
    derr << ": failed to close remote fd path=" << remote_path << ": " << cpp_strerror(r)
         << dendl;
    return -EINVAL;
  }

close_local_fd:
  if (ceph_close(m_local_mount, l_fd) < 0) {
    derr << ": failed to close local fd path=" << local_path << ": " << cpp_strerror(r)
         << dendl;
    return -EINVAL;
  }

  return r == 0 ? 0 : r;
}
660 | ||
661 | int PeerReplayer::remote_file_op(const std::string &dir_path, | |
662 | const std::string &local_path, | |
663 | const std::string &remote_path, | |
664 | const struct ceph_statx &stx) { | |
665 | dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path | |
666 | << ", remote_path=" << remote_path << dendl; | |
667 | ||
668 | int r; | |
669 | if (S_ISREG(stx.stx_mode)) { | |
670 | r = remote_copy(dir_path, local_path, remote_path, stx); | |
671 | if (r < 0) { | |
672 | derr << ": failed to copy path=" << local_path << ": " << cpp_strerror(r) | |
673 | << dendl; | |
674 | return r; | |
675 | } | |
676 | } else if (S_ISLNK(stx.stx_mode)) { | |
677 | char *target = (char *)alloca(stx.stx_size+1); | |
678 | r = ceph_readlink(m_local_mount, local_path.c_str(), target, stx.stx_size); | |
679 | if (r < 0) { | |
680 | derr << ": failed to readlink local path=" << local_path << ": " << cpp_strerror(r) | |
681 | << dendl; | |
682 | return r; | |
683 | } | |
684 | ||
685 | target[stx.stx_size] = '\0'; | |
686 | r = ceph_symlink(m_remote_mount, target, remote_path.c_str()); | |
687 | if (r < 0 && r != EEXIST) { | |
688 | derr << ": failed to symlink remote path=" << remote_path << " to target=" << target | |
689 | << ": " << cpp_strerror(r) << dendl; | |
690 | return r; | |
691 | } | |
692 | } else { | |
693 | dout(5) << ": skipping entry=" << local_path << ": unsupported mode=" << stx.stx_mode | |
694 | << dendl; | |
695 | return 0; | |
696 | } | |
697 | ||
698 | r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid); | |
699 | if (r < 0) { | |
700 | derr << ": failed to chown remote directory=" << remote_path << ": " | |
701 | << cpp_strerror(r) << dendl; | |
702 | return r; | |
703 | } | |
704 | ||
705 | struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000}, | |
706 | {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}}; | |
707 | r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times); | |
708 | if (r < 0) { | |
709 | derr << ": failed to change [am]time on remote directory=" << remote_path << ": " | |
710 | << cpp_strerror(r) << dendl; | |
711 | return r; | |
712 | } | |
713 | ||
714 | return 0; | |
715 | } | |
716 | ||
717 | int PeerReplayer::cleanup_remote_dir(const std::string &dir_path) { | |
718 | dout(20) << ": dir_path=" << dir_path << dendl; | |
719 | ||
720 | std::stack<SyncEntry> rm_stack; | |
721 | ceph_dir_result *tdirp; | |
722 | int r = ceph_opendir(m_remote_mount, dir_path.c_str(), &tdirp); | |
723 | if (r < 0) { | |
724 | derr << ": failed to open remote directory=" << dir_path << ": " | |
725 | << cpp_strerror(r) << dendl; | |
726 | return r; | |
727 | } | |
728 | ||
729 | struct ceph_statx tstx; | |
730 | r = ceph_statx(m_remote_mount, dir_path.c_str(), &tstx, | |
731 | CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | | |
732 | CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, | |
733 | AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW); | |
734 | if (r < 0) { | |
735 | derr << ": failed to stat remote directory=" << dir_path << ": " | |
736 | << cpp_strerror(r) << dendl; | |
737 | return r; | |
738 | } | |
739 | ||
740 | rm_stack.emplace(SyncEntry(dir_path, tdirp, tstx)); | |
741 | while (!rm_stack.empty()) { | |
742 | if (should_backoff(dir_path, &r)) { | |
743 | dout(0) << ": backing off r=" << r << dendl; | |
744 | break; | |
745 | } | |
746 | ||
747 | dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl; | |
748 | std::string e_name; | |
749 | auto &entry = rm_stack.top(); | |
750 | dout(20) << ": top of stack path=" << entry.epath << dendl; | |
751 | if (entry.is_directory()) { | |
752 | struct ceph_statx stx; | |
753 | struct dirent de; | |
754 | while (true) { | |
755 | r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx, | |
756 | CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL); | |
757 | if (r < 0) { | |
758 | derr << ": failed to read remote directory=" << entry.epath << dendl; | |
759 | break; | |
760 | } | |
761 | if (r == 0) { | |
762 | break; | |
763 | } | |
764 | ||
765 | auto d_name = std::string(de.d_name); | |
766 | if (d_name != "." && d_name != "..") { | |
767 | e_name = d_name; | |
768 | break; | |
769 | } | |
770 | } | |
771 | ||
772 | if (r == 0) { | |
773 | if (rm_stack.size() > 1) { | |
774 | r = ceph_rmdir(m_remote_mount, entry.epath.c_str()); | |
775 | if (r < 0) { | |
776 | derr << ": failed to remove remote directory=" << entry.epath << ": " | |
777 | << cpp_strerror(r) << dendl; | |
778 | break; | |
779 | } | |
780 | } | |
781 | ||
782 | dout(10) << ": done for remote directory=" << entry.epath << dendl; | |
783 | if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { | |
784 | derr << ": failed to close remote directory=" << entry.epath << dendl; | |
785 | } | |
786 | rm_stack.pop(); | |
787 | continue; | |
788 | } | |
789 | if (r < 0) { | |
790 | break; | |
791 | } | |
792 | ||
793 | auto epath = entry_path(entry.epath, e_name); | |
794 | if (S_ISDIR(stx.stx_mode)) { | |
795 | ceph_dir_result *dirp; | |
796 | r = ceph_opendir(m_remote_mount, epath.c_str(), &dirp); | |
797 | if (r < 0) { | |
798 | derr << ": failed to open remote directory=" << epath << ": " | |
799 | << cpp_strerror(r) << dendl; | |
800 | break; | |
801 | } | |
802 | rm_stack.emplace(SyncEntry(epath, dirp, stx)); | |
803 | } else { | |
804 | rm_stack.emplace(SyncEntry(epath, stx)); | |
805 | } | |
806 | } else { | |
807 | r = ceph_unlink(m_remote_mount, entry.epath.c_str()); | |
808 | if (r < 0) { | |
809 | derr << ": failed to remove remote directory=" << entry.epath << ": " | |
810 | << cpp_strerror(r) << dendl; | |
811 | break; | |
812 | } | |
813 | dout(10) << ": done for remote file=" << entry.epath << dendl; | |
814 | rm_stack.pop(); | |
815 | } | |
816 | } | |
817 | ||
818 | while (!rm_stack.empty()) { | |
819 | auto &entry = rm_stack.top(); | |
820 | if (entry.is_directory()) { | |
821 | dout(20) << ": closing remote directory=" << entry.epath << dendl; | |
822 | if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { | |
823 | derr << ": failed to close remote directory=" << entry.epath << dendl; | |
824 | } | |
825 | } | |
826 | ||
827 | rm_stack.pop(); | |
828 | } | |
829 | ||
830 | return r; | |
831 | } | |
832 | ||
833 | int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string &snap_name) { | |
834 | dout(20) << ": dir_path=" << dir_path << ", snap_name=" << snap_name << dendl; | |
835 | ||
836 | auto snap_path = snapshot_path(m_cct, dir_path, snap_name); | |
837 | std::stack<SyncEntry> sync_stack; | |
838 | ||
839 | ceph_dir_result *tdirp; | |
840 | int r = ceph_opendir(m_local_mount, snap_path.c_str(), &tdirp); | |
841 | if (r < 0) { | |
842 | derr << ": failed to open local directory=" << snap_path << ": " | |
843 | << cpp_strerror(r) << dendl; | |
844 | return r; | |
845 | } | |
846 | ||
847 | struct ceph_statx tstx; | |
848 | r = ceph_statx(m_local_mount, snap_path.c_str(), &tstx, | |
849 | CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | | |
850 | CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, | |
851 | AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW); | |
852 | if (r < 0) { | |
853 | derr << ": failed to stat local directory=" << snap_path << ": " | |
854 | << cpp_strerror(r) << dendl; | |
855 | return r; | |
856 | } | |
857 | ||
858 | sync_stack.emplace(SyncEntry("/", tdirp, tstx)); | |
859 | while (!sync_stack.empty()) { | |
860 | if (should_backoff(dir_path, &r)) { | |
861 | dout(0) << ": backing off r=" << r << dendl; | |
862 | break; | |
863 | } | |
864 | ||
865 | dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl; | |
866 | std::string e_name; | |
867 | auto &entry = sync_stack.top(); | |
868 | dout(20) << ": top of stack path=" << entry.epath << dendl; | |
869 | if (entry.is_directory()) { | |
870 | struct ceph_statx stx; | |
871 | struct dirent de; | |
872 | while (true) { | |
873 | r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx, | |
874 | CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | | |
875 | CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, | |
876 | AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL); | |
877 | if (r < 0) { | |
878 | derr << ": failed to local read directory=" << entry.epath << dendl; | |
879 | break; | |
880 | } | |
881 | if (r == 0) { | |
882 | break; | |
883 | } | |
884 | ||
885 | auto d_name = std::string(de.d_name); | |
886 | if (d_name != "." && d_name != "..") { | |
887 | e_name = d_name; | |
888 | break; | |
889 | } | |
890 | } | |
891 | ||
892 | if (r == 0) { | |
893 | dout(10) << ": done for directory=" << entry.epath << dendl; | |
894 | if (ceph_closedir(m_local_mount, entry.dirp) < 0) { | |
895 | derr << ": failed to close local directory=" << entry.epath << dendl; | |
896 | } | |
897 | sync_stack.pop(); | |
898 | continue; | |
899 | } | |
900 | if (r < 0) { | |
901 | break; | |
902 | } | |
903 | ||
904 | auto epath = entry_path(entry.epath, e_name); | |
905 | auto l_path = entry_path(snap_path, epath); | |
906 | auto r_path = entry_path(dir_path, epath); | |
907 | if (S_ISDIR(stx.stx_mode)) { | |
908 | r = remote_mkdir(l_path, r_path, stx); | |
909 | if (r < 0) { | |
910 | break; | |
911 | } | |
912 | ceph_dir_result *dirp; | |
913 | r = ceph_opendir(m_local_mount, l_path.c_str(), &dirp); | |
914 | if (r < 0) { | |
915 | derr << ": failed to open local directory=" << l_path << ": " | |
916 | << cpp_strerror(r) << dendl; | |
917 | break; | |
918 | } | |
919 | sync_stack.emplace(SyncEntry(epath, dirp, stx)); | |
920 | } else { | |
921 | sync_stack.emplace(SyncEntry(epath, stx)); | |
922 | } | |
923 | } else { | |
924 | auto l_path = entry_path(snap_path, entry.epath); | |
925 | auto r_path = entry_path(dir_path, entry.epath); | |
926 | r = remote_file_op(dir_path, l_path, r_path, entry.stx); | |
927 | if (r < 0) { | |
928 | break; | |
929 | } | |
930 | dout(10) << ": done for file=" << entry.epath << dendl; | |
931 | sync_stack.pop(); | |
932 | } | |
933 | } | |
934 | ||
935 | while (!sync_stack.empty()) { | |
936 | auto &entry = sync_stack.top(); | |
937 | if (entry.is_directory()) { | |
938 | dout(20) << ": closing local directory=" << entry.epath << dendl; | |
939 | if (ceph_closedir(m_local_mount, entry.dirp) < 0) { | |
940 | derr << ": failed to close local directory=" << entry.epath << dendl; | |
941 | } | |
942 | } | |
943 | ||
944 | sync_stack.pop(); | |
945 | } | |
946 | ||
947 | return r; | |
948 | } | |
949 | ||
950 | int PeerReplayer::synchronize(const std::string &dir_path, uint64_t snap_id, | |
951 | const std::string &snap_name) { | |
952 | dout(20) << ": dir_path=" << dir_path << ", snap_id=" << snap_id | |
953 | << ", snap_name=" << snap_name << dendl; | |
954 | ||
955 | auto snap_path = snapshot_path(m_cct, dir_path, snap_name); | |
956 | ||
957 | int r = cleanup_remote_dir(dir_path); | |
958 | if (r < 0) { | |
959 | derr << ": failed to cleanup remote directory=" << dir_path << dendl; | |
960 | return r; | |
961 | } | |
962 | ||
963 | r = do_synchronize(dir_path, snap_name); | |
964 | if (r < 0) { | |
965 | derr << ": failed to synchronize dir_path=" << dir_path << ", snapshot=" | |
966 | << snap_path << dendl; | |
967 | return r; | |
968 | } | |
969 | ||
970 | auto snap_id_str{stringify(snap_id)}; | |
971 | snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), snap_id_str.c_str()}}; | |
972 | r = ceph_mksnap(m_remote_mount, dir_path.c_str(), snap_name.c_str(), 0755, | |
973 | snap_meta, 1); | |
974 | if (r < 0) { | |
975 | derr << ": failed to snap remote directory dir_path=" << dir_path | |
976 | << ": " << cpp_strerror(r) << dendl; | |
977 | } | |
978 | ||
979 | return r; | |
980 | } | |
981 | ||
982 | int PeerReplayer::do_sync_snaps(const std::string &dir_path) { | |
983 | dout(20) << ": dir_path=" << dir_path << dendl; | |
984 | ||
985 | std::map<uint64_t, std::string> local_snap_map; | |
986 | std::map<uint64_t, std::string> remote_snap_map; | |
987 | ||
988 | int r = build_snap_map(dir_path, &local_snap_map); | |
989 | if (r < 0) { | |
990 | derr << ": failed to build local snap map" << dendl; | |
991 | return r; | |
992 | } | |
993 | ||
994 | r = build_snap_map(dir_path, &remote_snap_map, true); | |
995 | if (r < 0) { | |
996 | derr << ": failed to build remote snap map" << dendl; | |
997 | return r; | |
998 | } | |
999 | ||
1000 | // infer deleted and renamed snapshots from local and remote | |
1001 | // snap maps | |
1002 | std::set<std::string> snaps_deleted; | |
1003 | std::set<std::pair<std::string,std::string>> snaps_renamed; | |
1004 | for (auto &[primary_snap_id, snap_name] : remote_snap_map) { | |
1005 | auto it = local_snap_map.find(primary_snap_id); | |
1006 | if (it == local_snap_map.end()) { | |
1007 | snaps_deleted.emplace(snap_name); | |
1008 | } else if (it->second != snap_name) { | |
1009 | snaps_renamed.emplace(std::make_pair(snap_name, it->second)); | |
1010 | } | |
1011 | } | |
1012 | ||
1013 | r = propagate_snap_deletes(dir_path, snaps_deleted); | |
1014 | if (r < 0) { | |
1015 | derr << ": failed to propgate deleted snapshots" << dendl; | |
1016 | return r; | |
1017 | } | |
1018 | ||
1019 | r = propagate_snap_renames(dir_path, snaps_renamed); | |
1020 | if (r < 0) { | |
1021 | derr << ": failed to propgate renamed snapshots" << dendl; | |
1022 | return r; | |
1023 | } | |
1024 | ||
1025 | // start mirroring snapshots from the last snap-id synchronized | |
1026 | uint64_t last_snap_id = 0; | |
1027 | if (!remote_snap_map.empty()) { | |
1028 | auto last = remote_snap_map.rbegin(); | |
1029 | last_snap_id = last->first; | |
1030 | set_last_synced_snap(dir_path, last_snap_id, last->second); | |
1031 | } | |
1032 | ||
1033 | dout(5) << ": last snap-id transferred=" << last_snap_id << dendl; | |
1034 | auto it = local_snap_map.upper_bound(last_snap_id); | |
1035 | if (it == local_snap_map.end()) { | |
1036 | dout(20) << ": nothing to synchronize" << dendl; | |
1037 | return 0; | |
1038 | } | |
1039 | ||
1040 | auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>( | |
1041 | "cephfs_mirror_max_snapshot_sync_per_cycle"); | |
1042 | ||
1043 | dout(10) << ": synzhronizing from snap-id=" << it->first << dendl; | |
1044 | for (; it != local_snap_map.end(); ++it) { | |
1045 | set_current_syncing_snap(dir_path, it->first, it->second); | |
1046 | auto start = clock::now(); | |
1047 | r = synchronize(dir_path, it->first, it->second); | |
1048 | if (r < 0) { | |
1049 | derr << ": failed to synchronize dir_path=" << dir_path | |
1050 | << ", snapshot=" << it->second << dendl; | |
1051 | clear_current_syncing_snap(dir_path); | |
1052 | return r; | |
1053 | } | |
1054 | std::chrono::duration<double> duration = clock::now() - start; | |
1055 | set_last_synced_stat(dir_path, it->first, it->second, duration.count()); | |
1056 | if (--snaps_per_cycle == 0) { | |
1057 | break; | |
1058 | } | |
1059 | } | |
1060 | ||
1061 | return 0; | |
1062 | } | |
1063 | ||
1064 | void PeerReplayer::sync_snaps(const std::string &dir_path, | |
1065 | std::unique_lock<ceph::mutex> &locker) { | |
1066 | dout(20) << ": dir_path=" << dir_path << dendl; | |
1067 | locker.unlock(); | |
1068 | int r = do_sync_snaps(dir_path); | |
1069 | if (r < 0) { | |
1070 | derr << ": failed to sync snapshots for dir_path=" << dir_path << dendl; | |
1071 | } | |
1072 | locker.lock(); | |
1073 | if (r < 0) { | |
1074 | _inc_failed_count(dir_path); | |
1075 | } else { | |
1076 | _reset_failed_count(dir_path); | |
1077 | } | |
1078 | } | |
1079 | ||
// Main loop of a snapshot replayer thread: periodically scan the set of
// mirrored directories, pick an unclaimed one, and sync its snapshots.
// Exits when the replayer is stopped or the client gets blocklisted.
void PeerReplayer::run(SnapshotReplayerThread *replayer) {
  dout(10) << ": snapshot replayer=" << replayer << dendl;

  // force a scan on the first iteration
  time last_directory_scan = clock::zero();
  auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>(
    "cephfs_mirror_directory_scan_interval");

  std::unique_lock locker(m_lock);
  while (true) {
    // do not check if client is blocklisted under lock
    m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
    if (is_stopping()) {
      dout(5) << ": exiting" << dendl;
      break;
    }

    // drop the lock for the blocklist check (see comment above); note that
    // breaking out here leaves `locker` released, which is fine since
    // unique_lock only unlocks on destruction if it owns the mutex
    locker.unlock();

    if (m_fs_mirror->is_blocklisted()) {
      dout(5) << ": exiting as client is blocklisted" << dendl;
      break;
    }

    locker.lock();

    auto now = clock::now();
    std::chrono::duration<double> timo = now - last_directory_scan;
    if (timo.count() >= scan_interval && m_directories.size()) {
      dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
      auto dir_path = pick_directory();
      if (dir_path) {
        dout(5) << ": picked dir_path=" << *dir_path << dendl;
        // register_directory() claims the directory for this replayer;
        // sync_snaps() drops and re-acquires `locker` internally
        int r = register_directory(*dir_path, replayer);
        if (r == 0) {
          sync_snaps(*dir_path, locker);
          unregister_directory(*dir_path);
        }
      }

      last_directory_scan = now;
    }
  }
}
1123 | ||
// Dump per-directory synchronization status (state, in-flight and last
// synced snapshot, cumulative counters) for the peer-status admin socket
// command. Output format is an external contract consumed by tooling.
void PeerReplayer::peer_status(Formatter *f) {
  std::scoped_lock locker(m_lock);
  f->open_object_section("stats");
  for (auto &[dir_path, sync_stat] : m_snap_sync_stats) {
    f->open_object_section(dir_path);
    if (sync_stat.failed) {
      f->dump_string("state", "failed");
    } else if (!sync_stat.current_syncing_snap) {
      f->dump_string("state", "idle");
    } else {
      f->dump_string("state", "syncing");
      // NOTE(review): "current_sycning_snap" is a typo of
      // "current_syncing_snap"; left as-is since external consumers may
      // already parse this key -- confirm before renaming.
      f->open_object_section("current_sycning_snap");
      f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first);
      f->dump_string("name", (*sync_stat.current_syncing_snap).second);
      f->close_section();
    }
    if (sync_stat.last_synced_snap) {
      f->open_object_section("last_synced_snap");
      f->dump_unsigned("id", (*sync_stat.last_synced_snap).first);
      f->dump_string("name", (*sync_stat.last_synced_snap).second);
      // duration/timestamp are only present once at least one sync completed
      if (sync_stat.last_sync_duration) {
        f->dump_float("sync_duration", *sync_stat.last_sync_duration);
        f->dump_stream("sync_time_stamp") << sync_stat.last_synced;
      }
      f->close_section();
    }
    f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count);
    f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count);
    f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count);
    f->close_section(); // dir_path
  }
  f->close_section(); // stats
}
1157 | ||
1158 | } // namespace mirror | |
1159 | } // namespace cephfs |