]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/cephfs_mirror/PeerReplayer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / tools / cephfs_mirror / PeerReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <fcntl.h>
#include <sys/file.h>
#include <sys/time.h>

#include <algorithm>
#include <exception>
#include <memory>
#include <stack>
#include <vector>

#include <boost/scope_exit.hpp>

#include "common/admin_socket.h"
#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "FSMirror.h"
#include "PeerReplayer.h"
#include "Utils.h"

#include "json_spirit/json_spirit.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_cephfs_mirror
#undef dout_prefix
#define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \
                           << m_peer.uuid << ") " << __func__

using namespace std;
28
29 namespace cephfs {
30 namespace mirror {
31
32 namespace {
33
34 const std::string PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer";
35
36 std::string snapshot_dir_path(CephContext *cct, const std::string &path) {
37 return path + "/" + cct->_conf->client_snapdir;
38 }
39
40 std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
41 return snap_dir + "/" + snap_name;
42 }
43
44 std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) {
45 return path + "/" + cct->_conf->client_snapdir + "/" + snap_name;
46 }
47
48 std::string entry_path(const std::string &dir, const std::string &name) {
49 return dir + "/" + name;
50 }
51
52 std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
53 size_t nr_snap_metadata) {
54 std::map<std::string, std::string> metadata;
55 for (size_t i = 0; i < nr_snap_metadata; ++i) {
56 metadata.emplace(snap_metadata[i].key, snap_metadata[i].value);
57 }
58
59 return metadata;
60 }
61
62 std::string peer_config_key(const std::string &fs_name, const std::string &uuid) {
63 return PEER_CONFIG_KEY_PREFIX + "/" + fs_name + "/" + uuid;
64 }
65
66 class PeerAdminSocketCommand {
67 public:
68 virtual ~PeerAdminSocketCommand() {
69 }
70 virtual int call(Formatter *f) = 0;
71 };
72
73 class StatusCommand : public PeerAdminSocketCommand {
74 public:
75 explicit StatusCommand(PeerReplayer *peer_replayer)
76 : peer_replayer(peer_replayer) {
77 }
78
79 int call(Formatter *f) override {
80 peer_replayer->peer_status(f);
81 return 0;
82 }
83
84 private:
85 PeerReplayer *peer_replayer;
86 };
87
88 // helper to open a directory relative to a file descriptor
89 int opendirat(MountRef mnt, int dirfd, const std::string &relpath, int flags,
90 ceph_dir_result **dirp) {
91 int r = ceph_openat(mnt, dirfd, relpath.c_str(), flags, 0);
92 if (r < 0) {
93 return r;
94 }
95
96 int fd = r;
97 r = ceph_fdopendir(mnt, fd, dirp);
98 ceph_close(mnt, fd);
99 return r;
100 }
101
102 } // anonymous namespace
103
104 class PeerReplayerAdminSocketHook : public AdminSocketHook {
105 public:
106 PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem,
107 const Peer &peer, PeerReplayer *peer_replayer)
108 : admin_socket(cct->get_admin_socket()) {
109 int r;
110 std::string cmd;
111
112 // mirror peer status format is name@id uuid
113 cmd = "fs mirror peer status "
114 + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid)
115 + " "
116 + stringify(peer.uuid);
117 r = admin_socket->register_command(
118 cmd, this, "get peer mirror status");
119 if (r == 0) {
120 commands[cmd] = new StatusCommand(peer_replayer);
121 }
122 }
123
124 ~PeerReplayerAdminSocketHook() override {
125 admin_socket->unregister_commands(this);
126 for (auto &[command, cmdptr] : commands) {
127 delete cmdptr;
128 }
129 }
130
131 int call(std::string_view command, const cmdmap_t& cmdmap,
132 Formatter *f, std::ostream &errss, bufferlist &out) override {
133 auto p = commands.at(std::string(command));
134 return p->call(f);
135 }
136
137 private:
138 typedef std::map<std::string, PeerAdminSocketCommand*, std::less<>> Commands;
139
140 AdminSocket *admin_socket;
141 Commands commands;
142 };
143
144 PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
145 RadosRef local_cluster, const Filesystem &filesystem,
146 const Peer &peer, const std::set<std::string, std::less<>> &directories,
147 MountRef mount, ServiceDaemon *service_daemon)
148 : m_cct(cct),
149 m_fs_mirror(fs_mirror),
150 m_local_cluster(local_cluster),
151 m_filesystem(filesystem),
152 m_peer(peer),
153 m_directories(directories.begin(), directories.end()),
154 m_local_mount(mount),
155 m_service_daemon(service_daemon),
156 m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
157 m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
158 // reset sync stats sent via service daemon
159 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
160 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
161 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
162 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
163 }
164
165 PeerReplayer::~PeerReplayer() {
166 delete m_asok_hook;
167 }
168
169 int PeerReplayer::init() {
170 dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
171 for (auto &dir_root : m_directories) {
172 m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
173 }
174
175 auto &remote_client = m_peer.remote.client_name;
176 auto &remote_cluster = m_peer.remote.cluster_name;
177 auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name};
178
179 std::string key = peer_config_key(m_filesystem.fs_name, m_peer.uuid);
180 std::string cmd =
181 "{"
182 "\"prefix\": \"config-key get\", "
183 "\"key\": \"" + key + "\""
184 "}";
185
186 bufferlist in_bl;
187 bufferlist out_bl;
188
189 int r = m_local_cluster->mon_command(cmd, in_bl, &out_bl, nullptr);
190 dout(5) << ": mon command r=" << r << dendl;
191 if (r < 0 && r != -ENOENT) {
192 return r;
193 }
194
195 std::string mon_host;
196 std::string cephx_key;
197 if (!r) {
198 json_spirit::mValue root;
199 if (!json_spirit::read(out_bl.to_str(), root)) {
200 derr << ": invalid config-key JSON" << dendl;
201 return -EBADMSG;
202 }
203 try {
204 auto &root_obj = root.get_obj();
205 mon_host = root_obj.at("mon_host").get_str();
206 cephx_key = root_obj.at("key").get_str();
207 dout(0) << ": remote monitor host=" << mon_host << dendl;
208 } catch (std::runtime_error&) {
209 derr << ": unexpected JSON received" << dendl;
210 return -EBADMSG;
211 }
212 }
213
214 r = connect(remote_client, remote_cluster, &m_remote_cluster, mon_host, cephx_key);
215 if (r < 0) {
216 derr << ": error connecting to remote cluster: " << cpp_strerror(r)
217 << dendl;
218 return r;
219 }
220
221 r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount);
222 if (r < 0) {
223 m_remote_cluster.reset();
224 derr << ": error mounting remote filesystem=" << remote_filesystem << dendl;
225 return r;
226 }
227
228 std::scoped_lock locker(m_lock);
229 auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
230 "cephfs_mirror_max_concurrent_directory_syncs");
231 dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
232
233 while (nr_replayers-- > 0) {
234 std::unique_ptr<SnapshotReplayerThread> replayer(
235 new SnapshotReplayerThread(this));
236 std::string name("replayer-" + stringify(nr_replayers));
237 replayer->create(name.c_str());
238 m_replayers.push_back(std::move(replayer));
239 }
240
241 return 0;
242 }
243
244 void PeerReplayer::shutdown() {
245 dout(20) << dendl;
246
247 {
248 std::scoped_lock locker(m_lock);
249 ceph_assert(!m_stopping);
250 m_stopping = true;
251 m_cond.notify_all();
252 }
253
254 for (auto &replayer : m_replayers) {
255 replayer->join();
256 }
257 m_replayers.clear();
258 ceph_unmount(m_remote_mount);
259 ceph_release(m_remote_mount);
260 m_remote_mount = nullptr;
261 m_remote_cluster.reset();
262 }
263
264 void PeerReplayer::add_directory(string_view dir_root) {
265 dout(20) << ": dir_root=" << dir_root << dendl;
266
267 std::scoped_lock locker(m_lock);
268 m_directories.emplace_back(dir_root);
269 m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
270 m_cond.notify_all();
271 }
272
273 void PeerReplayer::remove_directory(string_view dir_root) {
274 dout(20) << ": dir_root=" << dir_root << dendl;
275 auto _dir_root = std::string(dir_root);
276
277 std::scoped_lock locker(m_lock);
278 auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root);
279 if (it != m_directories.end()) {
280 m_directories.erase(it);
281 }
282
283 auto it1 = m_registered.find(_dir_root);
284 if (it1 == m_registered.end()) {
285 m_snap_sync_stats.erase(_dir_root);
286 } else {
287 it1->second.canceled = true;
288 }
289 m_cond.notify_all();
290 }
291
292 boost::optional<std::string> PeerReplayer::pick_directory() {
293 dout(20) << dendl;
294
295 auto now = clock::now();
296 auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>(
297 "cephfs_mirror_retry_failed_directories_interval");
298
299 boost::optional<std::string> candidate;
300 for (auto &dir_root : m_directories) {
301 auto &sync_stat = m_snap_sync_stats.at(dir_root);
302 if (sync_stat.failed) {
303 std::chrono::duration<double> d = now - *sync_stat.last_failed;
304 if (d.count() < retry_timo) {
305 continue;
306 }
307 }
308 if (!m_registered.count(dir_root)) {
309 candidate = dir_root;
310 break;
311 }
312 }
313
314 std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
315 return candidate;
316 }
317
318 int PeerReplayer::register_directory(const std::string &dir_root,
319 SnapshotReplayerThread *replayer) {
320 dout(20) << ": dir_root=" << dir_root << dendl;
321 ceph_assert(m_registered.find(dir_root) == m_registered.end());
322
323 DirRegistry registry;
324 int r = try_lock_directory(dir_root, replayer, &registry);
325 if (r < 0) {
326 return r;
327 }
328
329 dout(5) << ": dir_root=" << dir_root << " registered with replayer="
330 << replayer << dendl;
331 m_registered.emplace(dir_root, std::move(registry));
332 return 0;
333 }
334
335 void PeerReplayer::unregister_directory(const std::string &dir_root) {
336 dout(20) << ": dir_root=" << dir_root << dendl;
337
338 auto it = m_registered.find(dir_root);
339 ceph_assert(it != m_registered.end());
340
341 unlock_directory(it->first, it->second);
342 m_registered.erase(it);
343 if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) {
344 m_snap_sync_stats.erase(dir_root);
345 }
346 }
347
348 int PeerReplayer::try_lock_directory(const std::string &dir_root,
349 SnapshotReplayerThread *replayer, DirRegistry *registry) {
350 dout(20) << ": dir_root=" << dir_root << dendl;
351
352 int r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
353 if (r < 0 && r != -ENOENT) {
354 derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
355 << dendl;
356 return r;
357 }
358
359 if (r == -ENOENT) {
360 // we snap under dir_root, so mode does not matter much
361 r = ceph_mkdirs(m_remote_mount, dir_root.c_str(), 0755);
362 if (r < 0) {
363 derr << ": failed to create remote directory=" << dir_root << ": " << cpp_strerror(r)
364 << dendl;
365 return r;
366 }
367
368 r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
369 if (r < 0) {
370 derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
371 << dendl;
372 return r;
373 }
374 }
375
376 int fd = r;
377 r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
378 if (r != 0) {
379 if (r == -EWOULDBLOCK) {
380 dout(5) << ": dir_root=" << dir_root << " is locked by cephfs-mirror, "
381 << "will retry again" << dendl;
382 } else {
383 derr << ": failed to lock dir_root=" << dir_root << ": " << cpp_strerror(r)
384 << dendl;
385 }
386
387 if (ceph_close(m_remote_mount, fd) < 0) {
388 derr << ": failed to close (cleanup) remote dir_root=" << dir_root << ": "
389 << cpp_strerror(r) << dendl;
390 }
391 return r;
392 }
393
394 dout(10) << ": dir_root=" << dir_root << " locked" << dendl;
395
396 registry->fd = fd;
397 registry->replayer = replayer;
398 return 0;
399 }
400
401 void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegistry &registry) {
402 dout(20) << ": dir_root=" << dir_root << dendl;
403
404 int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
405 (uint64_t)registry.replayer->get_thread_id());
406 if (r < 0) {
407 derr << ": failed to unlock remote dir_root=" << dir_root << ": " << cpp_strerror(r)
408 << dendl;
409 return;
410 }
411
412 r = ceph_close(m_remote_mount, registry.fd);
413 if (r < 0) {
414 derr << ": failed to close remote dir_root=" << dir_root << ": " << cpp_strerror(r)
415 << dendl;
416 }
417
418 dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl;
419 }
420
421 int PeerReplayer::build_snap_map(const std::string &dir_root,
422 std::map<uint64_t, std::string> *snap_map, bool is_remote) {
423 auto snap_dir = snapshot_dir_path(m_cct, dir_root);
424 dout(20) << ": dir_root=" << dir_root << ", snap_dir=" << snap_dir
425 << ", is_remote=" << is_remote << dendl;
426
427 auto lr_str = is_remote ? "remote" : "local";
428 auto mnt = is_remote ? m_remote_mount : m_local_mount;
429
430 ceph_dir_result *dirp = nullptr;
431 int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp);
432 if (r < 0) {
433 if (is_remote && r == -ENOENT) {
434 return 0;
435 }
436 derr << ": failed to open " << lr_str << " snap directory=" << snap_dir
437 << ": " << cpp_strerror(r) << dendl;
438 return r;
439 }
440
441 std::set<std::string> snaps;
442 auto entry = ceph_readdir(mnt, dirp);
443 while (entry != NULL) {
444 auto d_name = std::string(entry->d_name);
445 dout(20) << ": entry=" << d_name << dendl;
446 if (d_name != "." && d_name != ".." && d_name.rfind("_", 0) != 0) {
447 snaps.emplace(d_name);
448 }
449
450 entry = ceph_readdir(mnt, dirp);
451 }
452
453 int rv = 0;
454 for (auto &snap : snaps) {
455 snap_info info;
456 auto snap_path = snapshot_path(snap_dir, snap);
457 r = ceph_get_snap_info(mnt, snap_path.c_str(), &info);
458 if (r < 0) {
459 derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path
460 << ": " << cpp_strerror(r) << dendl;
461 rv = r;
462 break;
463 }
464
465 uint64_t snap_id;
466 if (is_remote) {
467 if (!info.nr_snap_metadata) {
468 derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot"
469 << dendl;
470 rv = -EINVAL;
471 } else {
472 auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata);
473 dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl;
474 auto it = metadata.find(PRIMARY_SNAP_ID_KEY);
475 if (it == metadata.end()) {
476 derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY
477 << "\" in metadata" << dendl;
478 rv = -EINVAL;
479 } else {
480 snap_id = std::stoull(it->second);
481 }
482 ceph_free_snap_info_buffer(&info);
483 }
484 } else {
485 snap_id = info.id;
486 }
487
488 if (rv != 0) {
489 break;
490 }
491 snap_map->emplace(snap_id, snap);
492 }
493
494 r = ceph_closedir(mnt, dirp);
495 if (r < 0) {
496 derr << ": failed to close " << lr_str << " snap directory=" << snap_dir
497 << ": " << cpp_strerror(r) << dendl;
498 }
499
500 dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl;
501 return rv;
502 }
503
504 int PeerReplayer::propagate_snap_deletes(const std::string &dir_root,
505 const std::set<std::string> &snaps) {
506 dout(5) << ": dir_root=" << dir_root << ", deleted snapshots=" << snaps << dendl;
507
508 for (auto &snap : snaps) {
509 dout(20) << ": deleting dir_root=" << dir_root << ", snapshot=" << snap
510 << dendl;
511 int r = ceph_rmsnap(m_remote_mount, dir_root.c_str(), snap.c_str());
512 if (r < 0) {
513 derr << ": failed to delete remote snap dir_root=" << dir_root
514 << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl;
515 return r;
516 }
517 inc_deleted_snap(dir_root);
518 }
519
520 return 0;
521 }
522
523 int PeerReplayer::propagate_snap_renames(
524 const std::string &dir_root,
525 const std::set<std::pair<std::string,std::string>> &snaps) {
526 dout(10) << ": dir_root=" << dir_root << ", renamed snapshots=" << snaps << dendl;
527
528 for (auto &snapp : snaps) {
529 auto from = snapshot_path(m_cct, dir_root, snapp.first);
530 auto to = snapshot_path(m_cct, dir_root, snapp.second);
531 dout(20) << ": renaming dir_root=" << dir_root << ", snapshot from="
532 << from << ", to=" << to << dendl;
533 int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str());
534 if (r < 0) {
535 derr << ": failed to rename remote snap dir_root=" << dir_root
536 << ", snapshot from =" << from << ", to=" << to << ": "
537 << cpp_strerror(r) << dendl;
538 return r;
539 }
540 inc_renamed_snap(dir_root);
541 }
542
543 return 0;
544 }
545
546 int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx &stx,
547 const FHandles &fh) {
548 dout(10) << ": remote epath=" << epath << dendl;
549
550 int r = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFDIR);
551 if (r < 0 && r != -EEXIST) {
552 derr << ": failed to create remote directory=" << epath << ": " << cpp_strerror(r)
553 << dendl;
554 return r;
555 }
556
557 r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid,
558 AT_SYMLINK_NOFOLLOW);
559 if (r < 0) {
560 derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r)
561 << dendl;
562 return r;
563 }
564
565 r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT,
566 AT_SYMLINK_NOFOLLOW);
567 if (r < 0) {
568 derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r)
569 << dendl;
570 return r;
571 }
572
573 struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
574 {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
575 r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW);
576 if (r < 0) {
577 derr << ": failed to change [am]time on remote directory=" << epath << ": "
578 << cpp_strerror(r) << dendl;
579 return r;
580 }
581
582 return 0;
583 }
584
585 #define NR_IOVECS 8 // # iovecs
586 #define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
587 int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath,
588 const struct ceph_statx &stx, const FHandles &fh) {
589 dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
590 int l_fd;
591 int r_fd;
592 void *ptr;
593 struct iovec iov[NR_IOVECS];
594
595 int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
596 if (r < 0) {
597 derr << ": failed to open local file path=" << epath << ": "
598 << cpp_strerror(r) << dendl;
599 return r;
600 }
601
602 l_fd = r;
603 r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(),
604 O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode);
605 if (r < 0) {
606 derr << ": failed to create remote file path=" << epath << ": "
607 << cpp_strerror(r) << dendl;
608 goto close_local_fd;
609 }
610
611 r_fd = r;
612 ptr = malloc(NR_IOVECS * IOVEC_SIZE);
613 if (!ptr) {
614 r = -ENOMEM;
615 derr << ": failed to allocate memory" << dendl;
616 goto close_remote_fd;
617 }
618
619 while (true) {
620 if (should_backoff(dir_root, &r)) {
621 dout(0) << ": backing off r=" << r << dendl;
622 break;
623 }
624
625 for (int i = 0; i < NR_IOVECS; ++i) {
626 iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
627 iov[i].iov_len = IOVEC_SIZE;
628 }
629
630 r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
631 if (r < 0) {
632 derr << ": failed to read local file path=" << epath << ": "
633 << cpp_strerror(r) << dendl;
634 break;
635 }
636 if (r == 0) {
637 break;
638 }
639
640 int iovs = (int)(r / IOVEC_SIZE);
641 int t = r % IOVEC_SIZE;
642 if (t) {
643 iov[iovs].iov_len = t;
644 ++iovs;
645 }
646
647 r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
648 if (r < 0) {
649 derr << ": failed to write remote file path=" << epath << ": "
650 << cpp_strerror(r) << dendl;
651 break;
652 }
653 }
654
655 if (r == 0) {
656 r = ceph_fsync(m_remote_mount, r_fd, 0);
657 if (r < 0) {
658 derr << ": failed to sync data for file path=" << epath << ": "
659 << cpp_strerror(r) << dendl;
660 }
661 }
662
663 free(ptr);
664
665 close_remote_fd:
666 if (ceph_close(m_remote_mount, r_fd) < 0) {
667 derr << ": failed to close remote fd path=" << epath << ": " << cpp_strerror(r)
668 << dendl;
669 return -EINVAL;
670 }
671
672 close_local_fd:
673 if (ceph_close(m_local_mount, l_fd) < 0) {
674 derr << ": failed to close local fd path=" << epath << ": " << cpp_strerror(r)
675 << dendl;
676 return -EINVAL;
677 }
678
679 return r == 0 ? 0 : r;
680 }
681
682 int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath,
683 const struct ceph_statx &stx, const FHandles &fh,
684 bool need_data_sync, bool need_attr_sync) {
685 dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
686 << ", need_attr_sync=" << need_attr_sync << dendl;
687
688 int r;
689 if (need_data_sync) {
690 if (S_ISREG(stx.stx_mode)) {
691 r = copy_to_remote(dir_root, epath, stx, fh);
692 if (r < 0) {
693 derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl;
694 return r;
695 }
696 } else if (S_ISLNK(stx.stx_mode)) {
697 // free the remote link before relinking
698 r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0);
699 if (r < 0 && r != -ENOENT) {
700 derr << ": failed to remove remote symlink=" << epath << dendl;
701 return r;
702 }
703 char *target = (char *)alloca(stx.stx_size+1);
704 r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, stx.stx_size);
705 if (r < 0) {
706 derr << ": failed to readlink local path=" << epath << ": " << cpp_strerror(r)
707 << dendl;
708 return r;
709 }
710
711 target[stx.stx_size] = '\0';
712 r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, epath.c_str());
713 if (r < 0 && r != EEXIST) {
714 derr << ": failed to symlink remote path=" << epath << " to target=" << target
715 << ": " << cpp_strerror(r) << dendl;
716 return r;
717 }
718 } else {
719 dout(5) << ": skipping entry=" << epath << ": unsupported mode=" << stx.stx_mode
720 << dendl;
721 return 0;
722 }
723 }
724
725 if (need_attr_sync) {
726 r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid,
727 AT_SYMLINK_NOFOLLOW);
728 if (r < 0) {
729 derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r)
730 << dendl;
731 return r;
732 }
733
734 r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT,
735 AT_SYMLINK_NOFOLLOW);
736 if (r < 0) {
737 derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r)
738 << dendl;
739 return r;
740 }
741
742 struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
743 {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
744 r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW);
745 if (r < 0) {
746 derr << ": failed to change [am]time on remote directory=" << epath << ": "
747 << cpp_strerror(r) << dendl;
748 return r;
749 }
750 }
751
752 return 0;
753 }
754
755 int PeerReplayer::cleanup_remote_dir(const std::string &dir_root,
756 const std::string &epath, const FHandles &fh) {
757 dout(20) << ": dir_root=" << dir_root << ", epath=" << epath
758 << dendl;
759
760 struct ceph_statx tstx;
761 int r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), &tstx,
762 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
763 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
764 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
765 if (r < 0) {
766 derr << ": failed to stat remote directory=" << epath << ": "
767 << cpp_strerror(r) << dendl;
768 return r;
769 }
770
771 ceph_dir_result *tdirp;
772 r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
773 &tdirp);
774 if (r < 0) {
775 derr << ": failed to open remote directory=" << epath << ": "
776 << cpp_strerror(r) << dendl;
777 return r;
778 }
779
780 std::stack<SyncEntry> rm_stack;
781 rm_stack.emplace(SyncEntry(epath, tdirp, tstx));
782 while (!rm_stack.empty()) {
783 if (should_backoff(dir_root, &r)) {
784 dout(0) << ": backing off r=" << r << dendl;
785 break;
786 }
787
788 dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl;
789 std::string e_name;
790 auto &entry = rm_stack.top();
791 dout(20) << ": top of stack path=" << entry.epath << dendl;
792 if (entry.is_directory()) {
793 struct ceph_statx stx;
794 struct dirent de;
795 while (true) {
796 r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx,
797 CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
798 if (r < 0) {
799 derr << ": failed to read remote directory=" << entry.epath << dendl;
800 break;
801 }
802 if (r == 0) {
803 break;
804 }
805
806 auto d_name = std::string(de.d_name);
807 if (d_name != "." && d_name != "..") {
808 e_name = d_name;
809 break;
810 }
811 }
812
813 if (r == 0) {
814 r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), AT_REMOVEDIR);
815 if (r < 0) {
816 derr << ": failed to remove remote directory=" << entry.epath << ": "
817 << cpp_strerror(r) << dendl;
818 break;
819 }
820
821 dout(10) << ": done for remote directory=" << entry.epath << dendl;
822 if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
823 derr << ": failed to close remote directory=" << entry.epath << dendl;
824 }
825 rm_stack.pop();
826 continue;
827 }
828 if (r < 0) {
829 break;
830 }
831
832 auto epath = entry_path(entry.epath, e_name);
833 if (S_ISDIR(stx.stx_mode)) {
834 ceph_dir_result *dirp;
835 r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
836 &dirp);
837 if (r < 0) {
838 derr << ": failed to open remote directory=" << epath << ": "
839 << cpp_strerror(r) << dendl;
840 break;
841 }
842 rm_stack.emplace(SyncEntry(epath, dirp, stx));
843 } else {
844 rm_stack.emplace(SyncEntry(epath, stx));
845 }
846 } else {
847 r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), 0);
848 if (r < 0) {
849 derr << ": failed to remove remote directory=" << entry.epath << ": "
850 << cpp_strerror(r) << dendl;
851 break;
852 }
853 dout(10) << ": done for remote file=" << entry.epath << dendl;
854 rm_stack.pop();
855 }
856 }
857
858 while (!rm_stack.empty()) {
859 auto &entry = rm_stack.top();
860 if (entry.is_directory()) {
861 dout(20) << ": closing remote directory=" << entry.epath << dendl;
862 if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
863 derr << ": failed to close remote directory=" << entry.epath << dendl;
864 }
865 }
866
867 rm_stack.pop();
868 }
869
870 return r;
871 }
872
873 int PeerReplayer::should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
874 const FHandles &fh, bool *need_data_sync, bool *need_attr_sync) {
875 dout(10) << ": epath=" << epath << dendl;
876
877 *need_data_sync = false;
878 *need_attr_sync = false;
879 struct ceph_statx pstx;
880 int r = ceph_statxat(fh.p_mnt, fh.p_fd, epath.c_str(), &pstx,
881 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
882 CEPH_STATX_SIZE | CEPH_STATX_CTIME | CEPH_STATX_MTIME,
883 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
884 if (r < 0 && r != -ENOENT && r != -ENOTDIR) {
885 derr << ": failed to stat prev entry= " << epath << ": " << cpp_strerror(r)
886 << dendl;
887 return r;
888 }
889
890 if (r < 0) {
891 // inode does not exist in prev snapshot or file type has changed
892 // (file was S_IFREG earlier, S_IFDIR now).
893 dout(5) << ": entry=" << epath << ", r=" << r << dendl;
894 *need_data_sync = true;
895 *need_attr_sync = true;
896 return 0;
897 }
898
899 dout(10) << ": local cur statx: mode=" << cstx.stx_mode << ", uid=" << cstx.stx_uid
900 << ", gid=" << cstx.stx_gid << ", size=" << cstx.stx_size << ", ctime="
901 << cstx.stx_ctime << ", mtime=" << cstx.stx_mtime << dendl;
902 dout(10) << ": local prev statx: mode=" << pstx.stx_mode << ", uid=" << pstx.stx_uid
903 << ", gid=" << pstx.stx_gid << ", size=" << pstx.stx_size << ", ctime="
904 << pstx.stx_ctime << ", mtime=" << pstx.stx_mtime << dendl;
905 if ((cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)) {
906 dout(5) << ": entry=" << epath << " has mode mismatch" << dendl;
907 *need_data_sync = true;
908 *need_attr_sync = true;
909 } else {
910 *need_data_sync = (cstx.stx_size != pstx.stx_size) || (cstx.stx_mtime != pstx.stx_mtime);
911 *need_attr_sync = (cstx.stx_ctime != pstx.stx_ctime);
912 }
913
914 return 0;
915 }
916
917 int PeerReplayer::propagate_deleted_entries(const std::string &dir_root,
918 const std::string &epath, const FHandles &fh) {
919 dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
920
921 ceph_dir_result *dirp;
922 int r = opendirat(fh.p_mnt, fh.p_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
923 if (r < 0) {
924 if (r == -ELOOP) {
925 dout(5) << ": epath=" << epath << " is a symbolic link -- mode sync"
926 << " done when traversing parent" << dendl;
927 return 0;
928 }
929 if (r == -ENOTDIR) {
930 dout(5) << ": epath=" << epath << " is not a directory -- mode sync"
931 << " done when traversing parent" << dendl;
932 return 0;
933 }
934 if (r == -ENOENT) {
935 dout(5) << ": epath=" << epath << " missing in previous-snap/remote dir-root"
936 << dendl;
937 }
938 return r;
939 }
940
941 struct dirent *dire = (struct dirent *)alloca(512 * sizeof(struct dirent));
942 while (true) {
943 if (should_backoff(dir_root, &r)) {
944 dout(0) << ": backing off r=" << r << dendl;
945 break;
946 }
947
948 int len = ceph_getdents(fh.p_mnt, dirp, (char *)dire, 512);
949 if (len < 0) {
950 derr << ": failed to read directory entries: " << cpp_strerror(len) << dendl;
951 r = len;
952 // flip errno to signal that we got an err (possible the
953 // snapshot getting deleted in midst).
954 if (r == -ENOENT) {
955 r = -EINVAL;
956 }
957 break;
958 }
959 if (len == 0) {
960 dout(10) << ": reached EOD" << dendl;
961 break;
962 }
963 int nr = len / sizeof(struct dirent);
964 for (int i = 0; i < nr; ++i) {
965 if (should_backoff(dir_root, &r)) {
966 dout(0) << ": backing off r=" << r << dendl;
967 break;
968 }
969 std::string d_name = std::string(dire[i].d_name);
970 if (d_name == "." || d_name == "..") {
971 continue;
972 }
973
974 struct ceph_statx pstx;
975 auto dpath = entry_path(epath, d_name);
976 r = ceph_statxat(fh.p_mnt, fh.p_fd, dpath.c_str(), &pstx,
977 CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
978 if (r < 0) {
979 derr << ": failed to stat (prev) directory=" << dpath << ": "
980 << cpp_strerror(r) << dendl;
981 // flip errno to signal that we got an err (possible the
982 // snapshot getting deleted in midst).
983 if (r == -ENOENT) {
984 r = -EINVAL;
985 }
986 return r;
987 }
988
989 struct ceph_statx cstx;
990 r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx,
991 CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
992 if (r < 0 && r != -ENOENT) {
993 derr << ": failed to stat local (cur) directory=" << dpath << ": "
994 << cpp_strerror(r) << dendl;
995 return r;
996 }
997
998 bool purge_remote = true;
999 if (r == 0) {
1000 // directory entry present in both snapshots -- check inode
1001 // type
1002 if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) {
1003 dout(5) << ": mode matches for entry=" << d_name << dendl;
1004 purge_remote = false;
1005 } else {
1006 dout(5) << ": mode mismatch for entry=" << d_name << dendl;
1007 }
1008 } else {
1009 dout(5) << ": entry=" << d_name << " missing in current snapshot" << dendl;
1010 }
1011
1012 if (purge_remote) {
1013 dout(5) << ": purging remote entry=" << dpath << dendl;
1014 if (S_ISDIR(pstx.stx_mode)) {
1015 r = cleanup_remote_dir(dir_root, dpath, fh);
1016 } else {
1017 r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, dpath.c_str(), 0);
1018 }
1019
1020 if (r < 0 && r != -ENOENT) {
1021 derr << ": failed to cleanup remote entry=" << d_name << ": "
1022 << cpp_strerror(r) << dendl;
1023 return r;
1024 }
1025 }
1026 }
1027 }
1028
1029 ceph_closedir(fh.p_mnt, dirp);
1030 return r;
1031 }
1032
1033 int PeerReplayer::open_dir(MountRef mnt, const std::string &dir_path,
1034 boost::optional<uint64_t> snap_id) {
1035 dout(20) << ": dir_path=" << dir_path << dendl;
1036 if (snap_id) {
1037 dout(20) << ": expected snapshot id=" << *snap_id << dendl;
1038 }
1039
1040 int fd = ceph_open(mnt, dir_path.c_str(), O_DIRECTORY | O_RDONLY, 0);
1041 if (fd < 0) {
1042 derr << ": cannot open dir_path=" << dir_path << ": " << cpp_strerror(fd)
1043 << dendl;
1044 return fd;
1045 }
1046
1047 if (!snap_id) {
1048 return fd;
1049 }
1050
1051 snap_info info;
1052 int r = ceph_get_snap_info(mnt, dir_path.c_str(), &info);
1053 if (r < 0) {
1054 derr << ": failed to fetch snap_info for path=" << dir_path
1055 << ": " << cpp_strerror(r) << dendl;
1056 ceph_close(mnt, fd);
1057 return r;
1058 }
1059
1060 if (info.id != *snap_id) {
1061 dout(5) << ": got mismatching snapshot id for path=" << dir_path << " (" << info.id
1062 << " vs " << *snap_id << ") -- possible recreate" << dendl;
1063 ceph_close(mnt, fd);
1064 return -EINVAL;
1065 }
1066
1067 return fd;
1068 }
1069
1070 int PeerReplayer::pre_sync_check_and_open_handles(
1071 const std::string &dir_root,
1072 const Snapshot &current, boost::optional<Snapshot> prev,
1073 FHandles *fh) {
1074 dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
1075 if (prev) {
1076 dout(20) << ": prev=" << prev << dendl;
1077 }
1078
1079 auto cur_snap_path = snapshot_path(m_cct, dir_root, current.first);
1080 auto fd = open_dir(m_local_mount, cur_snap_path, current.second);
1081 if (fd < 0) {
1082 return fd;
1083 }
1084
1085 // current snapshot file descriptor
1086 fh->c_fd = fd;
1087
1088 MountRef mnt;
1089 if (prev) {
1090 mnt = m_local_mount;
1091 auto prev_snap_path = snapshot_path(m_cct, dir_root, (*prev).first);
1092 fd = open_dir(mnt, prev_snap_path, (*prev).second);
1093 } else {
1094 mnt = m_remote_mount;
1095 fd = open_dir(mnt, dir_root, boost::none);
1096 }
1097
1098 if (fd < 0) {
1099 if (!prev || fd != -ENOENT) {
1100 ceph_close(m_local_mount, fh->c_fd);
1101 return fd;
1102 }
1103
1104 // ENOENT of previous snap
1105 dout(5) << ": previous snapshot=" << *prev << " missing" << dendl;
1106 mnt = m_remote_mount;
1107 fd = open_dir(mnt, dir_root, boost::none);
1108 if (fd < 0) {
1109 ceph_close(m_local_mount, fh->c_fd);
1110 return fd;
1111 }
1112 }
1113
1114 // "previous" snapshot or dir_root file descriptor
1115 fh->p_fd = fd;
1116 fh->p_mnt = mnt;
1117
1118 {
1119 std::scoped_lock locker(m_lock);
1120 auto it = m_registered.find(dir_root);
1121 ceph_assert(it != m_registered.end());
1122 fh->r_fd_dir_root = it->second.fd;
1123 }
1124
1125 dout(5) << ": using " << ((fh->p_mnt == m_local_mount) ? "local (previous) snapshot" : "remote dir_root")
1126 << " for incremental transfer" << dendl;
1127 return 0;
1128 }
1129
1130 void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
1131 dout(20) << dendl;
1132
1133 // @FHandles.r_fd_dir_root is closed in @unregister_directory since
1134 // its used to acquire an exclusive lock on remote dir_root.
1135 ceph_close(m_local_mount, fh.c_fd);
1136 ceph_close(fh.p_mnt, fh.p_fd);
1137 }
1138
// Transfer the contents of snapshot @current (under @dir_root) to the remote
// filesystem.
//
// The comparison base is chosen by pre_sync_check_and_open_handles(). It is
// the local @prev snapshot when given and present, else the remote dir_root.
// Before any data is touched, the remote dir_root is tagged with
// "ceph.mirror.dirty_snap_id" = @current's id.
//
// Returns 0 on success or a negative errno. This includes the value reported
// by should_backoff() when the sync is interrupted.
int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
                                 boost::optional<Snapshot> prev) {
  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
  if (prev) {
    dout(20) << ": incremental sync check from prev=" << prev << dendl;
  }

  FHandles fh;
  int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
  if (r < 0) {
    dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl;
    return r;
  }

  // from here on, every return path closes the handles opened above
  BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
    post_sync_close_handles(fh);
  };

  // record that we are going to "dirty" the data under this
  // directory root
  auto snap_id_str{stringify(current.second)};
  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
                     snap_id_str.c_str(), snap_id_str.size(), 0);
  if (r < 0) {
    derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
         << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  // attributes of the snapshot root itself; they seed the root stack entry
  struct ceph_statx tstx;
  r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx,
                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
                  AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
  if (r < 0) {
    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }

  ceph_dir_result *tdirp;
  r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp);
  if (r < 0) {
    derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }

  // Iterative depth-first walk of the current snapshot, rooted at "."
  // (child paths are built relative to it and opened against fh.c_fd).
  // A directory entry keeps its dir handle open and yields one child per
  // iteration. The child is pushed on top and fully processed before the
  // parent resumes. A non-directory entry is synced and popped in a single
  // iteration.
  std::stack<SyncEntry> sync_stack;
  sync_stack.emplace(SyncEntry(".", tdirp, tstx));
  while (!sync_stack.empty()) {
    if (should_backoff(dir_root, &r)) {
      dout(0) << ": backing off r=" << r << dendl;
      break;
    }

    dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
    std::string e_name;
    auto &entry = sync_stack.top();
    dout(20) << ": top of stack path=" << entry.epath << dendl;
    if (entry.is_directory()) {
      // entry is a directory -- propagate deletes for missing entries
      // (and changed inode types) to the remote filesystem.
      // NOTE(review): the flag names read inverted. Presumably
      // set_remote_synced() makes needs_remote_sync() true, so deletes are
      // propagated only on the first visit of a directory. Confirm against
      // SyncEntry.
      if (!entry.needs_remote_sync()) {
        r = propagate_deleted_entries(dir_root, entry.epath, fh);
        if (r < 0 && r != -ENOENT) {
          derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
          break;
        }
        entry.set_remote_synced();
      }

      // Read the next child other than "." and "..". Afterwards, r == 0
      // means the directory is exhausted and r < 0 is a read error.
      // Otherwise e_name/stx describe the child.
      struct ceph_statx stx;
      struct dirent de;
      while (true) {
        r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
                               CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
                               CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
                               AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
        if (r < 0) {
          derr << ": failed to local read directory=" << entry.epath << dendl;
          break;
        }
        if (r == 0) {
          break;
        }

        auto d_name = std::string(de.d_name);
        if (d_name != "." && d_name != "..") {
          e_name = d_name;
          break;
        }
      }

      // directory exhausted: release its handle and resume the parent
      if (r == 0) {
        dout(10) << ": done for directory=" << entry.epath << dendl;
        if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
          derr << ": failed to close local directory=" << entry.epath << dendl;
        }
        sync_stack.pop();
        continue;
      }
      if (r < 0) {
        break;
      }

      auto epath = entry_path(entry.epath, e_name);
      if (S_ISDIR(stx.stx_mode)) {
        // create the directory remotely first, then descend into it locally
        r = remote_mkdir(epath, stx, fh);
        if (r < 0) {
          break;
        }
        ceph_dir_result *dirp;
        r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
        if (r < 0) {
          derr << ": failed to open local directory=" << epath << ": "
               << cpp_strerror(r) << dendl;
          break;
        }
        sync_stack.emplace(SyncEntry(epath, dirp, stx));
      } else {
        sync_stack.emplace(SyncEntry(epath, stx));
      }
    } else {
      // non-directory: ask should_sync_entry() whether data and/or
      // attributes differ from the base, then transfer what is needed
      bool need_data_sync = true;
      bool need_attr_sync = true;
      r = should_sync_entry(entry.epath, entry.stx, fh,
                            &need_data_sync, &need_attr_sync);
      if (r < 0) {
        break;
      }

      dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync
              << ", attr_sync=" << need_attr_sync << dendl;
      if (need_data_sync || need_attr_sync) {
        r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync,
                           need_attr_sync);
        if (r < 0) {
          break;
        }
      }
      dout(10) << ": done for epath=" << entry.epath << dendl;
      sync_stack.pop();
    }
  }

  // The walk was interrupted (error or back off) if entries remain.
  // Close any directory handles still held by them.
  while (!sync_stack.empty()) {
    auto &entry = sync_stack.top();
    if (entry.is_directory()) {
      dout(20) << ": closing local directory=" << entry.epath << dendl;
      if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
        derr << ": failed to close local directory=" << entry.epath << dendl;
      }
    }

    sync_stack.pop();
  }

  // r holds the status of the last step (or the back-off value)
  return r;
}
1299
1300 int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &current,
1301 boost::optional<Snapshot> prev) {
1302 dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
1303 if (prev) {
1304 dout(20) << ": prev=" << prev << dendl;
1305 }
1306
1307 int r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", nullptr, 0);
1308 if (r < 0 && r != -ENODATA) {
1309 derr << ": failed to fetch primary_snap_id length from dir_root=" << dir_root
1310 << ": " << cpp_strerror(r) << dendl;
1311 return r;
1312 }
1313
1314 // no xattr, can't determine which snap the data belongs to!
1315 if (r < 0) {
1316 dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
1317 << " incremental sync with remote scan" << dendl;
1318 r = do_synchronize(dir_root, current, boost::none);
1319 } else {
1320 size_t xlen = r;
1321 char *val = (char *)alloca(xlen+1);
1322 r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", (void*)val, xlen);
1323 if (r < 0) {
1324 derr << ": failed to fetch \"dirty_snap_id\" for dir_root: " << dir_root
1325 << ": " << cpp_strerror(r) << dendl;
1326 return r;
1327 }
1328
1329 val[xlen] = '\0';
1330 uint64_t dirty_snap_id = atoll(val);
1331
1332 dout(20) << ": dirty_snap_id: " << dirty_snap_id << " vs (" << current.second
1333 << "," << (prev ? stringify((*prev).second) : "~") << ")" << dendl;
1334 if (prev && (dirty_snap_id == (*prev).second || dirty_snap_id == current.second)) {
1335 dout(5) << ": match -- using incremental sync with local scan" << dendl;
1336 r = do_synchronize(dir_root, current, prev);
1337 } else {
1338 dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl;
1339 r = do_synchronize(dir_root, current, boost::none);
1340 }
1341 }
1342
1343 // snap sync failed -- bail out!
1344 if (r < 0) {
1345 return r;
1346 }
1347
1348 auto cur_snap_id_str{stringify(current.second)};
1349 snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), cur_snap_id_str.c_str()}};
1350 r = ceph_mksnap(m_remote_mount, dir_root.c_str(), current.first.c_str(), 0755,
1351 snap_meta, sizeof(snap_meta)/sizeof(snap_metadata));
1352 if (r < 0) {
1353 derr << ": failed to snap remote directory dir_root=" << dir_root
1354 << ": " << cpp_strerror(r) << dendl;
1355 }
1356
1357 return r;
1358 }
1359
1360 int PeerReplayer::do_sync_snaps(const std::string &dir_root) {
1361 dout(20) << ": dir_root=" << dir_root << dendl;
1362
1363 std::map<uint64_t, std::string> local_snap_map;
1364 std::map<uint64_t, std::string> remote_snap_map;
1365
1366 int r = build_snap_map(dir_root, &local_snap_map);
1367 if (r < 0) {
1368 derr << ": failed to build local snap map" << dendl;
1369 return r;
1370 }
1371
1372 r = build_snap_map(dir_root, &remote_snap_map, true);
1373 if (r < 0) {
1374 derr << ": failed to build remote snap map" << dendl;
1375 return r;
1376 }
1377
1378 // infer deleted and renamed snapshots from local and remote
1379 // snap maps
1380 std::set<std::string> snaps_deleted;
1381 std::set<std::pair<std::string,std::string>> snaps_renamed;
1382 for (auto &[primary_snap_id, snap_name] : remote_snap_map) {
1383 auto it = local_snap_map.find(primary_snap_id);
1384 if (it == local_snap_map.end()) {
1385 snaps_deleted.emplace(snap_name);
1386 } else if (it->second != snap_name) {
1387 snaps_renamed.emplace(std::make_pair(snap_name, it->second));
1388 }
1389 }
1390
1391 r = propagate_snap_deletes(dir_root, snaps_deleted);
1392 if (r < 0) {
1393 derr << ": failed to propgate deleted snapshots" << dendl;
1394 return r;
1395 }
1396
1397 r = propagate_snap_renames(dir_root, snaps_renamed);
1398 if (r < 0) {
1399 derr << ": failed to propgate renamed snapshots" << dendl;
1400 return r;
1401 }
1402
1403 // start mirroring snapshots from the last snap-id synchronized
1404 uint64_t last_snap_id = 0;
1405 std::string last_snap_name;
1406 if (!remote_snap_map.empty()) {
1407 auto last = remote_snap_map.rbegin();
1408 last_snap_id = last->first;
1409 last_snap_name = last->second;
1410 set_last_synced_snap(dir_root, last_snap_id, last_snap_name);
1411 }
1412
1413 dout(5) << ": last snap-id transferred=" << last_snap_id << dendl;
1414 auto it = local_snap_map.upper_bound(last_snap_id);
1415 if (it == local_snap_map.end()) {
1416 dout(20) << ": nothing to synchronize" << dendl;
1417 return 0;
1418 }
1419
1420 auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>(
1421 "cephfs_mirror_max_snapshot_sync_per_cycle");
1422
1423 dout(10) << ": synchronizing from snap-id=" << it->first << dendl;
1424 for (; it != local_snap_map.end(); ++it) {
1425 set_current_syncing_snap(dir_root, it->first, it->second);
1426 auto start = clock::now();
1427 boost::optional<Snapshot> prev = boost::none;
1428 if (last_snap_id != 0) {
1429 prev = std::make_pair(last_snap_name, last_snap_id);
1430 }
1431 r = synchronize(dir_root, std::make_pair(it->second, it->first), prev);
1432 if (r < 0) {
1433 derr << ": failed to synchronize dir_root=" << dir_root
1434 << ", snapshot=" << it->second << dendl;
1435 clear_current_syncing_snap(dir_root);
1436 return r;
1437 }
1438 std::chrono::duration<double> duration = clock::now() - start;
1439 set_last_synced_stat(dir_root, it->first, it->second, duration.count());
1440 if (--snaps_per_cycle == 0) {
1441 break;
1442 }
1443
1444 last_snap_name = it->second;
1445 last_snap_id = it->first;
1446 }
1447
1448 return 0;
1449 }
1450
1451 void PeerReplayer::sync_snaps(const std::string &dir_root,
1452 std::unique_lock<ceph::mutex> &locker) {
1453 dout(20) << ": dir_root=" << dir_root << dendl;
1454 locker.unlock();
1455 int r = do_sync_snaps(dir_root);
1456 if (r < 0) {
1457 derr << ": failed to sync snapshots for dir_root=" << dir_root << dendl;
1458 }
1459 locker.lock();
1460 if (r < 0) {
1461 _inc_failed_count(dir_root);
1462 } else {
1463 _reset_failed_count(dir_root);
1464 }
1465 }
1466
1467 void PeerReplayer::run(SnapshotReplayerThread *replayer) {
1468 dout(10) << ": snapshot replayer=" << replayer << dendl;
1469
1470 time last_directory_scan = clock::zero();
1471 auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>(
1472 "cephfs_mirror_directory_scan_interval");
1473
1474 std::unique_lock locker(m_lock);
1475 while (true) {
1476 // do not check if client is blocklisted under lock
1477 m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
1478 if (is_stopping()) {
1479 dout(5) << ": exiting" << dendl;
1480 break;
1481 }
1482
1483 locker.unlock();
1484
1485 if (m_fs_mirror->is_blocklisted()) {
1486 dout(5) << ": exiting as client is blocklisted" << dendl;
1487 break;
1488 }
1489
1490 locker.lock();
1491
1492 auto now = clock::now();
1493 std::chrono::duration<double> timo = now - last_directory_scan;
1494 if (timo.count() >= scan_interval && m_directories.size()) {
1495 dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
1496 auto dir_root = pick_directory();
1497 if (dir_root) {
1498 dout(5) << ": picked dir_root=" << *dir_root << dendl;
1499 int r = register_directory(*dir_root, replayer);
1500 if (r == 0) {
1501 sync_snaps(*dir_root, locker);
1502 unregister_directory(*dir_root);
1503 }
1504 }
1505
1506 last_directory_scan = now;
1507 }
1508 }
1509 }
1510
1511 void PeerReplayer::peer_status(Formatter *f) {
1512 std::scoped_lock locker(m_lock);
1513 f->open_object_section("stats");
1514 for (auto &[dir_root, sync_stat] : m_snap_sync_stats) {
1515 f->open_object_section(dir_root);
1516 if (sync_stat.failed) {
1517 f->dump_string("state", "failed");
1518 } else if (!sync_stat.current_syncing_snap) {
1519 f->dump_string("state", "idle");
1520 } else {
1521 f->dump_string("state", "syncing");
1522 f->open_object_section("current_sycning_snap");
1523 f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first);
1524 f->dump_string("name", (*sync_stat.current_syncing_snap).second);
1525 f->close_section();
1526 }
1527 if (sync_stat.last_synced_snap) {
1528 f->open_object_section("last_synced_snap");
1529 f->dump_unsigned("id", (*sync_stat.last_synced_snap).first);
1530 f->dump_string("name", (*sync_stat.last_synced_snap).second);
1531 if (sync_stat.last_sync_duration) {
1532 f->dump_float("sync_duration", *sync_stat.last_sync_duration);
1533 f->dump_stream("sync_time_stamp") << sync_stat.last_synced;
1534 }
1535 f->close_section();
1536 }
1537 f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count);
1538 f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count);
1539 f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count);
1540 f->close_section(); // dir_root
1541 }
1542 f->close_section(); // stats
1543 }
1544
1545 void PeerReplayer::reopen_logs() {
1546 std::scoped_lock locker(m_lock);
1547
1548 if (m_remote_cluster) {
1549 reinterpret_cast<CephContext *>(m_remote_cluster->cct())->reopen_logs();
1550 }
1551 }
1552
1553 } // namespace mirror
1554 } // namespace cephfs