1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
9 #include <boost/scope_exit.hpp>
11 #include "common/admin_socket.h"
12 #include "common/ceph_context.h"
13 #include "common/debug.h"
14 #include "common/errno.h"
16 #include "PeerReplayer.h"
19 #include "json_spirit/json_spirit.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_cephfs_mirror
24 #define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \
25 << m_peer.uuid << ") " << __func__
// Prefix of the per-peer config-key name; peer_config_key() appends
// "/<fs_name>/<uuid>" to it.
const std::string PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer";
36 std::string
snapshot_dir_path(CephContext
*cct
, const std::string
&path
) {
37 return path
+ "/" + cct
->_conf
->client_snapdir
;
// Returns the path of snapshot @snap_name inside an already-resolved
// snapshot directory @snap_dir ("<snap_dir>/<snap_name>").
std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
  return snap_dir + "/" + snap_name;
}
44 std::string
snapshot_path(CephContext
*cct
, const std::string
&path
, const std::string
&snap_name
) {
45 return path
+ "/" + cct
->_conf
->client_snapdir
+ "/" + snap_name
;
// Joins a directory path and an entry name with a single "/".
// No normalization is done: an empty @dir yields "/<name>".
std::string entry_path(const std::string &dir, const std::string &name) {
  return dir + "/" + name;
}
52 std::map
<std::string
, std::string
> decode_snap_metadata(snap_metadata
*snap_metadata
,
53 size_t nr_snap_metadata
) {
54 std::map
<std::string
, std::string
> metadata
;
55 for (size_t i
= 0; i
< nr_snap_metadata
; ++i
) {
56 metadata
.emplace(snap_metadata
[i
].key
, snap_metadata
[i
].value
);
62 std::string
peer_config_key(const std::string
&fs_name
, const std::string
&uuid
) {
63 return PEER_CONFIG_KEY_PREFIX
+ "/" + fs_name
+ "/" + uuid
;
66 class PeerAdminSocketCommand
{
68 virtual ~PeerAdminSocketCommand() {
70 virtual int call(Formatter
*f
) = 0;
73 class StatusCommand
: public PeerAdminSocketCommand
{
75 explicit StatusCommand(PeerReplayer
*peer_replayer
)
76 : peer_replayer(peer_replayer
) {
79 int call(Formatter
*f
) override
{
80 peer_replayer
->peer_status(f
);
85 PeerReplayer
*peer_replayer
;
88 // helper to open a directory relative to a file descriptor
89 int opendirat(MountRef mnt
, int dirfd
, const std::string
&relpath
, int flags
,
90 ceph_dir_result
**dirp
) {
91 int r
= ceph_openat(mnt
, dirfd
, relpath
.c_str(), flags
, 0);
97 r
= ceph_fdopendir(mnt
, fd
, dirp
);
102 } // anonymous namespace
104 class PeerReplayerAdminSocketHook
: public AdminSocketHook
{
106 PeerReplayerAdminSocketHook(CephContext
*cct
, const Filesystem
&filesystem
,
107 const Peer
&peer
, PeerReplayer
*peer_replayer
)
108 : admin_socket(cct
->get_admin_socket()) {
112 // mirror peer status format is name@id uuid
113 cmd
= "fs mirror peer status "
114 + stringify(filesystem
.fs_name
) + "@" + stringify(filesystem
.fscid
)
116 + stringify(peer
.uuid
);
117 r
= admin_socket
->register_command(
118 cmd
, this, "get peer mirror status");
120 commands
[cmd
] = new StatusCommand(peer_replayer
);
124 ~PeerReplayerAdminSocketHook() override
{
125 admin_socket
->unregister_commands(this);
126 for (auto &[command
, cmdptr
] : commands
) {
131 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
132 Formatter
*f
, std::ostream
&errss
, bufferlist
&out
) override
{
133 auto p
= commands
.at(std::string(command
));
138 typedef std::map
<std::string
, PeerAdminSocketCommand
*, std::less
<>> Commands
;
140 AdminSocket
*admin_socket
;
144 PeerReplayer::PeerReplayer(CephContext
*cct
, FSMirror
*fs_mirror
,
145 RadosRef local_cluster
, const Filesystem
&filesystem
,
146 const Peer
&peer
, const std::set
<std::string
, std::less
<>> &directories
,
147 MountRef mount
, ServiceDaemon
*service_daemon
)
149 m_fs_mirror(fs_mirror
),
150 m_local_cluster(local_cluster
),
151 m_filesystem(filesystem
),
153 m_directories(directories
.begin(), directories
.end()),
154 m_local_mount(mount
),
155 m_service_daemon(service_daemon
),
156 m_asok_hook(new PeerReplayerAdminSocketHook(cct
, filesystem
, peer
, this)),
157 m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer
.uuid
))) {
158 // reset sync stats sent via service daemon
159 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, m_peer
,
160 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY
, (uint64_t)0);
161 m_service_daemon
->add_or_update_peer_attribute(m_filesystem
.fscid
, m_peer
,
162 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY
, (uint64_t)0);
165 PeerReplayer::~PeerReplayer() {
169 int PeerReplayer::init() {
170 dout(20) << ": initial dir list=[" << m_directories
<< "]" << dendl
;
171 for (auto &dir_root
: m_directories
) {
172 m_snap_sync_stats
.emplace(dir_root
, SnapSyncStat());
175 auto &remote_client
= m_peer
.remote
.client_name
;
176 auto &remote_cluster
= m_peer
.remote
.cluster_name
;
177 auto remote_filesystem
= Filesystem
{0, m_peer
.remote
.fs_name
};
179 std::string key
= peer_config_key(m_filesystem
.fs_name
, m_peer
.uuid
);
182 "\"prefix\": \"config-key get\", "
183 "\"key\": \"" + key
+ "\""
189 int r
= m_local_cluster
->mon_command(cmd
, in_bl
, &out_bl
, nullptr);
190 dout(5) << ": mon command r=" << r
<< dendl
;
191 if (r
< 0 && r
!= -ENOENT
) {
195 std::string mon_host
;
196 std::string cephx_key
;
198 json_spirit::mValue root
;
199 if (!json_spirit::read(out_bl
.to_str(), root
)) {
200 derr
<< ": invalid config-key JSON" << dendl
;
204 auto &root_obj
= root
.get_obj();
205 mon_host
= root_obj
.at("mon_host").get_str();
206 cephx_key
= root_obj
.at("key").get_str();
207 dout(0) << ": remote monitor host=" << mon_host
<< dendl
;
208 } catch (std::runtime_error
&) {
209 derr
<< ": unexpected JSON received" << dendl
;
214 r
= connect(remote_client
, remote_cluster
, &m_remote_cluster
, mon_host
, cephx_key
);
216 derr
<< ": error connecting to remote cluster: " << cpp_strerror(r
)
221 r
= mount(m_remote_cluster
, remote_filesystem
, false, &m_remote_mount
);
223 m_remote_cluster
.reset();
224 derr
<< ": error mounting remote filesystem=" << remote_filesystem
<< dendl
;
228 std::scoped_lock
locker(m_lock
);
229 auto nr_replayers
= g_ceph_context
->_conf
.get_val
<uint64_t>(
230 "cephfs_mirror_max_concurrent_directory_syncs");
231 dout(20) << ": spawning " << nr_replayers
<< " snapshot replayer(s)" << dendl
;
233 while (nr_replayers
-- > 0) {
234 std::unique_ptr
<SnapshotReplayerThread
> replayer(
235 new SnapshotReplayerThread(this));
236 std::string
name("replayer-" + stringify(nr_replayers
));
237 replayer
->create(name
.c_str());
238 m_replayers
.push_back(std::move(replayer
));
244 void PeerReplayer::shutdown() {
248 std::scoped_lock
locker(m_lock
);
249 ceph_assert(!m_stopping
);
254 for (auto &replayer
: m_replayers
) {
258 ceph_unmount(m_remote_mount
);
259 ceph_release(m_remote_mount
);
260 m_remote_mount
= nullptr;
261 m_remote_cluster
.reset();
264 void PeerReplayer::add_directory(string_view dir_root
) {
265 dout(20) << ": dir_root=" << dir_root
<< dendl
;
267 std::scoped_lock
locker(m_lock
);
268 m_directories
.emplace_back(dir_root
);
269 m_snap_sync_stats
.emplace(dir_root
, SnapSyncStat());
273 void PeerReplayer::remove_directory(string_view dir_root
) {
274 dout(20) << ": dir_root=" << dir_root
<< dendl
;
275 auto _dir_root
= std::string(dir_root
);
277 std::scoped_lock
locker(m_lock
);
278 auto it
= std::find(m_directories
.begin(), m_directories
.end(), _dir_root
);
279 if (it
!= m_directories
.end()) {
280 m_directories
.erase(it
);
283 auto it1
= m_registered
.find(_dir_root
);
284 if (it1
== m_registered
.end()) {
285 m_snap_sync_stats
.erase(_dir_root
);
287 it1
->second
.canceled
= true;
292 boost::optional
<std::string
> PeerReplayer::pick_directory() {
295 auto now
= clock::now();
296 auto retry_timo
= g_ceph_context
->_conf
.get_val
<uint64_t>(
297 "cephfs_mirror_retry_failed_directories_interval");
299 boost::optional
<std::string
> candidate
;
300 for (auto &dir_root
: m_directories
) {
301 auto &sync_stat
= m_snap_sync_stats
.at(dir_root
);
302 if (sync_stat
.failed
) {
303 std::chrono::duration
<double> d
= now
- *sync_stat
.last_failed
;
304 if (d
.count() < retry_timo
) {
308 if (!m_registered
.count(dir_root
)) {
309 candidate
= dir_root
;
314 std::rotate(m_directories
.begin(), m_directories
.begin() + 1, m_directories
.end());
318 int PeerReplayer::register_directory(const std::string
&dir_root
,
319 SnapshotReplayerThread
*replayer
) {
320 dout(20) << ": dir_root=" << dir_root
<< dendl
;
321 ceph_assert(m_registered
.find(dir_root
) == m_registered
.end());
323 DirRegistry registry
;
324 int r
= try_lock_directory(dir_root
, replayer
, ®istry
);
329 dout(5) << ": dir_root=" << dir_root
<< " registered with replayer="
330 << replayer
<< dendl
;
331 m_registered
.emplace(dir_root
, std::move(registry
));
335 void PeerReplayer::unregister_directory(const std::string
&dir_root
) {
336 dout(20) << ": dir_root=" << dir_root
<< dendl
;
338 auto it
= m_registered
.find(dir_root
);
339 ceph_assert(it
!= m_registered
.end());
341 unlock_directory(it
->first
, it
->second
);
342 m_registered
.erase(it
);
343 if (std::find(m_directories
.begin(), m_directories
.end(), dir_root
) == m_directories
.end()) {
344 m_snap_sync_stats
.erase(dir_root
);
348 int PeerReplayer::try_lock_directory(const std::string
&dir_root
,
349 SnapshotReplayerThread
*replayer
, DirRegistry
*registry
) {
350 dout(20) << ": dir_root=" << dir_root
<< dendl
;
352 int r
= ceph_open(m_remote_mount
, dir_root
.c_str(), O_RDONLY
| O_DIRECTORY
, 0);
353 if (r
< 0 && r
!= -ENOENT
) {
354 derr
<< ": failed to open remote dir_root=" << dir_root
<< ": " << cpp_strerror(r
)
360 // we snap under dir_root, so mode does not matter much
361 r
= ceph_mkdirs(m_remote_mount
, dir_root
.c_str(), 0755);
363 derr
<< ": failed to create remote directory=" << dir_root
<< ": " << cpp_strerror(r
)
368 r
= ceph_open(m_remote_mount
, dir_root
.c_str(), O_RDONLY
| O_DIRECTORY
, 0);
370 derr
<< ": failed to open remote dir_root=" << dir_root
<< ": " << cpp_strerror(r
)
377 r
= ceph_flock(m_remote_mount
, fd
, LOCK_EX
| LOCK_NB
, (uint64_t)replayer
->get_thread_id());
379 if (r
== -EWOULDBLOCK
) {
380 dout(5) << ": dir_root=" << dir_root
<< " is locked by cephfs-mirror, "
381 << "will retry again" << dendl
;
383 derr
<< ": failed to lock dir_root=" << dir_root
<< ": " << cpp_strerror(r
)
387 if (ceph_close(m_remote_mount
, fd
) < 0) {
388 derr
<< ": failed to close (cleanup) remote dir_root=" << dir_root
<< ": "
389 << cpp_strerror(r
) << dendl
;
394 dout(10) << ": dir_root=" << dir_root
<< " locked" << dendl
;
397 registry
->replayer
= replayer
;
401 void PeerReplayer::unlock_directory(const std::string
&dir_root
, const DirRegistry
®istry
) {
402 dout(20) << ": dir_root=" << dir_root
<< dendl
;
404 int r
= ceph_flock(m_remote_mount
, registry
.fd
, LOCK_UN
,
405 (uint64_t)registry
.replayer
->get_thread_id());
407 derr
<< ": failed to unlock remote dir_root=" << dir_root
<< ": " << cpp_strerror(r
)
412 r
= ceph_close(m_remote_mount
, registry
.fd
);
414 derr
<< ": failed to close remote dir_root=" << dir_root
<< ": " << cpp_strerror(r
)
418 dout(10) << ": dir_root=" << dir_root
<< " unlocked" << dendl
;
421 int PeerReplayer::build_snap_map(const std::string
&dir_root
,
422 std::map
<uint64_t, std::string
> *snap_map
, bool is_remote
) {
423 auto snap_dir
= snapshot_dir_path(m_cct
, dir_root
);
424 dout(20) << ": dir_root=" << dir_root
<< ", snap_dir=" << snap_dir
425 << ", is_remote=" << is_remote
<< dendl
;
427 auto lr_str
= is_remote
? "remote" : "local";
428 auto mnt
= is_remote
? m_remote_mount
: m_local_mount
;
430 ceph_dir_result
*dirp
= nullptr;
431 int r
= ceph_opendir(mnt
, snap_dir
.c_str(), &dirp
);
433 if (is_remote
&& r
== -ENOENT
) {
436 derr
<< ": failed to open " << lr_str
<< " snap directory=" << snap_dir
437 << ": " << cpp_strerror(r
) << dendl
;
441 std::set
<std::string
> snaps
;
442 auto entry
= ceph_readdir(mnt
, dirp
);
443 while (entry
!= NULL
) {
444 auto d_name
= std::string(entry
->d_name
);
445 dout(20) << ": entry=" << d_name
<< dendl
;
446 if (d_name
!= "." && d_name
!= ".." && d_name
.rfind("_", 0) != 0) {
447 snaps
.emplace(d_name
);
450 entry
= ceph_readdir(mnt
, dirp
);
454 for (auto &snap
: snaps
) {
456 auto snap_path
= snapshot_path(snap_dir
, snap
);
457 r
= ceph_get_snap_info(mnt
, snap_path
.c_str(), &info
);
459 derr
<< ": failed to fetch " << lr_str
<< " snap info for snap_path=" << snap_path
460 << ": " << cpp_strerror(r
) << dendl
;
467 if (!info
.nr_snap_metadata
) {
468 derr
<< ": snap_path=" << snap_path
<< " has invalid metadata in remote snapshot"
472 auto metadata
= decode_snap_metadata(info
.snap_metadata
, info
.nr_snap_metadata
);
473 dout(20) << ": snap_path=" << snap_path
<< ", metadata=" << metadata
<< dendl
;
474 auto it
= metadata
.find(PRIMARY_SNAP_ID_KEY
);
475 if (it
== metadata
.end()) {
476 derr
<< ": snap_path=" << snap_path
<< " has missing \"" << PRIMARY_SNAP_ID_KEY
477 << "\" in metadata" << dendl
;
480 snap_id
= std::stoull(it
->second
);
482 ceph_free_snap_info_buffer(&info
);
491 snap_map
->emplace(snap_id
, snap
);
494 r
= ceph_closedir(mnt
, dirp
);
496 derr
<< ": failed to close " << lr_str
<< " snap directory=" << snap_dir
497 << ": " << cpp_strerror(r
) << dendl
;
500 dout(10) << ": " << lr_str
<< " snap_map=" << *snap_map
<< dendl
;
504 int PeerReplayer::propagate_snap_deletes(const std::string
&dir_root
,
505 const std::set
<std::string
> &snaps
) {
506 dout(5) << ": dir_root=" << dir_root
<< ", deleted snapshots=" << snaps
<< dendl
;
508 for (auto &snap
: snaps
) {
509 dout(20) << ": deleting dir_root=" << dir_root
<< ", snapshot=" << snap
511 int r
= ceph_rmsnap(m_remote_mount
, dir_root
.c_str(), snap
.c_str());
513 derr
<< ": failed to delete remote snap dir_root=" << dir_root
514 << ", snapshot=" << snaps
<< ": " << cpp_strerror(r
) << dendl
;
517 inc_deleted_snap(dir_root
);
523 int PeerReplayer::propagate_snap_renames(
524 const std::string
&dir_root
,
525 const std::set
<std::pair
<std::string
,std::string
>> &snaps
) {
526 dout(10) << ": dir_root=" << dir_root
<< ", renamed snapshots=" << snaps
<< dendl
;
528 for (auto &snapp
: snaps
) {
529 auto from
= snapshot_path(m_cct
, dir_root
, snapp
.first
);
530 auto to
= snapshot_path(m_cct
, dir_root
, snapp
.second
);
531 dout(20) << ": renaming dir_root=" << dir_root
<< ", snapshot from="
532 << from
<< ", to=" << to
<< dendl
;
533 int r
= ceph_rename(m_remote_mount
, from
.c_str(), to
.c_str());
535 derr
<< ": failed to rename remote snap dir_root=" << dir_root
536 << ", snapshot from =" << from
<< ", to=" << to
<< ": "
537 << cpp_strerror(r
) << dendl
;
540 inc_renamed_snap(dir_root
);
546 int PeerReplayer::remote_mkdir(const std::string
&epath
, const struct ceph_statx
&stx
,
547 const FHandles
&fh
) {
548 dout(10) << ": remote epath=" << epath
<< dendl
;
550 int r
= ceph_mkdirat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), stx
.stx_mode
& ~S_IFDIR
);
551 if (r
< 0 && r
!= -EEXIST
) {
552 derr
<< ": failed to create remote directory=" << epath
<< ": " << cpp_strerror(r
)
557 r
= ceph_chownat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), stx
.stx_uid
, stx
.stx_gid
,
558 AT_SYMLINK_NOFOLLOW
);
560 derr
<< ": failed to chown remote directory=" << epath
<< ": " << cpp_strerror(r
)
565 r
= ceph_chmodat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), stx
.stx_mode
& ~S_IFMT
,
566 AT_SYMLINK_NOFOLLOW
);
568 derr
<< ": failed to chmod remote directory=" << epath
<< ": " << cpp_strerror(r
)
573 struct timespec times
[] = {{stx
.stx_atime
.tv_sec
, stx
.stx_atime
.tv_nsec
},
574 {stx
.stx_mtime
.tv_sec
, stx
.stx_mtime
.tv_nsec
}};
575 r
= ceph_utimensat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), times
, AT_SYMLINK_NOFOLLOW
);
577 derr
<< ": failed to change [am]time on remote directory=" << epath
<< ": "
578 << cpp_strerror(r
) << dendl
;
585 #define NR_IOVECS 8 // # iovecs
586 #define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
587 int PeerReplayer::copy_to_remote(const std::string
&dir_root
, const std::string
&epath
,
588 const struct ceph_statx
&stx
, const FHandles
&fh
) {
589 dout(10) << ": dir_root=" << dir_root
<< ", epath=" << epath
<< dendl
;
593 struct iovec iov
[NR_IOVECS
];
595 int r
= ceph_openat(m_local_mount
, fh
.c_fd
, epath
.c_str(), O_RDONLY
| O_NOFOLLOW
, 0);
597 derr
<< ": failed to open local file path=" << epath
<< ": "
598 << cpp_strerror(r
) << dendl
;
603 r
= ceph_openat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(),
604 O_CREAT
| O_TRUNC
| O_WRONLY
| O_NOFOLLOW
, stx
.stx_mode
);
606 derr
<< ": failed to create remote file path=" << epath
<< ": "
607 << cpp_strerror(r
) << dendl
;
612 ptr
= malloc(NR_IOVECS
* IOVEC_SIZE
);
615 derr
<< ": failed to allocate memory" << dendl
;
616 goto close_remote_fd
;
620 if (should_backoff(dir_root
, &r
)) {
621 dout(0) << ": backing off r=" << r
<< dendl
;
625 for (int i
= 0; i
< NR_IOVECS
; ++i
) {
626 iov
[i
].iov_base
= (char*)ptr
+ IOVEC_SIZE
*i
;
627 iov
[i
].iov_len
= IOVEC_SIZE
;
630 r
= ceph_preadv(m_local_mount
, l_fd
, iov
, NR_IOVECS
, -1);
632 derr
<< ": failed to read local file path=" << epath
<< ": "
633 << cpp_strerror(r
) << dendl
;
640 int iovs
= (int)(r
/ IOVEC_SIZE
);
641 int t
= r
% IOVEC_SIZE
;
643 iov
[iovs
].iov_len
= t
;
647 r
= ceph_pwritev(m_remote_mount
, r_fd
, iov
, iovs
, -1);
649 derr
<< ": failed to write remote file path=" << epath
<< ": "
650 << cpp_strerror(r
) << dendl
;
656 r
= ceph_fsync(m_remote_mount
, r_fd
, 0);
658 derr
<< ": failed to sync data for file path=" << epath
<< ": "
659 << cpp_strerror(r
) << dendl
;
666 if (ceph_close(m_remote_mount
, r_fd
) < 0) {
667 derr
<< ": failed to close remote fd path=" << epath
<< ": " << cpp_strerror(r
)
673 if (ceph_close(m_local_mount
, l_fd
) < 0) {
674 derr
<< ": failed to close local fd path=" << epath
<< ": " << cpp_strerror(r
)
679 return r
== 0 ? 0 : r
;
682 int PeerReplayer::remote_file_op(const std::string
&dir_root
, const std::string
&epath
,
683 const struct ceph_statx
&stx
, const FHandles
&fh
,
684 bool need_data_sync
, bool need_attr_sync
) {
685 dout(10) << ": dir_root=" << dir_root
<< ", epath=" << epath
<< ", need_data_sync=" << need_data_sync
686 << ", need_attr_sync=" << need_attr_sync
<< dendl
;
689 if (need_data_sync
) {
690 if (S_ISREG(stx
.stx_mode
)) {
691 r
= copy_to_remote(dir_root
, epath
, stx
, fh
);
693 derr
<< ": failed to copy path=" << epath
<< ": " << cpp_strerror(r
) << dendl
;
696 } else if (S_ISLNK(stx
.stx_mode
)) {
697 // free the remote link before relinking
698 r
= ceph_unlinkat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), 0);
699 if (r
< 0 && r
!= -ENOENT
) {
700 derr
<< ": failed to remove remote symlink=" << epath
<< dendl
;
703 char *target
= (char *)alloca(stx
.stx_size
+1);
704 r
= ceph_readlinkat(m_local_mount
, fh
.c_fd
, epath
.c_str(), target
, stx
.stx_size
);
706 derr
<< ": failed to readlink local path=" << epath
<< ": " << cpp_strerror(r
)
711 target
[stx
.stx_size
] = '\0';
712 r
= ceph_symlinkat(m_remote_mount
, target
, fh
.r_fd_dir_root
, epath
.c_str());
713 if (r
< 0 && r
!= EEXIST
) {
714 derr
<< ": failed to symlink remote path=" << epath
<< " to target=" << target
715 << ": " << cpp_strerror(r
) << dendl
;
719 dout(5) << ": skipping entry=" << epath
<< ": unsupported mode=" << stx
.stx_mode
725 if (need_attr_sync
) {
726 r
= ceph_chownat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), stx
.stx_uid
, stx
.stx_gid
,
727 AT_SYMLINK_NOFOLLOW
);
729 derr
<< ": failed to chown remote directory=" << epath
<< ": " << cpp_strerror(r
)
734 r
= ceph_chmodat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), stx
.stx_mode
& ~S_IFMT
,
735 AT_SYMLINK_NOFOLLOW
);
737 derr
<< ": failed to chmod remote directory=" << epath
<< ": " << cpp_strerror(r
)
742 struct timespec times
[] = {{stx
.stx_atime
.tv_sec
, stx
.stx_atime
.tv_nsec
},
743 {stx
.stx_mtime
.tv_sec
, stx
.stx_mtime
.tv_nsec
}};
744 r
= ceph_utimensat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), times
, AT_SYMLINK_NOFOLLOW
);
746 derr
<< ": failed to change [am]time on remote directory=" << epath
<< ": "
747 << cpp_strerror(r
) << dendl
;
755 int PeerReplayer::cleanup_remote_dir(const std::string
&dir_root
,
756 const std::string
&epath
, const FHandles
&fh
) {
757 dout(20) << ": dir_root=" << dir_root
<< ", epath=" << epath
760 struct ceph_statx tstx
;
761 int r
= ceph_statxat(m_remote_mount
, fh
.r_fd_dir_root
, epath
.c_str(), &tstx
,
762 CEPH_STATX_MODE
| CEPH_STATX_UID
| CEPH_STATX_GID
|
763 CEPH_STATX_SIZE
| CEPH_STATX_ATIME
| CEPH_STATX_MTIME
,
764 AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
);
766 derr
<< ": failed to stat remote directory=" << epath
<< ": "
767 << cpp_strerror(r
) << dendl
;
771 ceph_dir_result
*tdirp
;
772 r
= opendirat(m_remote_mount
, fh
.r_fd_dir_root
, epath
, AT_SYMLINK_NOFOLLOW
,
775 derr
<< ": failed to open remote directory=" << epath
<< ": "
776 << cpp_strerror(r
) << dendl
;
780 std::stack
<SyncEntry
> rm_stack
;
781 rm_stack
.emplace(SyncEntry(epath
, tdirp
, tstx
));
782 while (!rm_stack
.empty()) {
783 if (should_backoff(dir_root
, &r
)) {
784 dout(0) << ": backing off r=" << r
<< dendl
;
788 dout(20) << ": " << rm_stack
.size() << " entries in stack" << dendl
;
790 auto &entry
= rm_stack
.top();
791 dout(20) << ": top of stack path=" << entry
.epath
<< dendl
;
792 if (entry
.is_directory()) {
793 struct ceph_statx stx
;
796 r
= ceph_readdirplus_r(m_remote_mount
, entry
.dirp
, &de
, &stx
,
797 CEPH_STATX_MODE
, AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
, NULL
);
799 derr
<< ": failed to read remote directory=" << entry
.epath
<< dendl
;
806 auto d_name
= std::string(de
.d_name
);
807 if (d_name
!= "." && d_name
!= "..") {
814 r
= ceph_unlinkat(m_remote_mount
, fh
.r_fd_dir_root
, entry
.epath
.c_str(), AT_REMOVEDIR
);
816 derr
<< ": failed to remove remote directory=" << entry
.epath
<< ": "
817 << cpp_strerror(r
) << dendl
;
821 dout(10) << ": done for remote directory=" << entry
.epath
<< dendl
;
822 if (ceph_closedir(m_remote_mount
, entry
.dirp
) < 0) {
823 derr
<< ": failed to close remote directory=" << entry
.epath
<< dendl
;
832 auto epath
= entry_path(entry
.epath
, e_name
);
833 if (S_ISDIR(stx
.stx_mode
)) {
834 ceph_dir_result
*dirp
;
835 r
= opendirat(m_remote_mount
, fh
.r_fd_dir_root
, epath
, AT_SYMLINK_NOFOLLOW
,
838 derr
<< ": failed to open remote directory=" << epath
<< ": "
839 << cpp_strerror(r
) << dendl
;
842 rm_stack
.emplace(SyncEntry(epath
, dirp
, stx
));
844 rm_stack
.emplace(SyncEntry(epath
, stx
));
847 r
= ceph_unlinkat(m_remote_mount
, fh
.r_fd_dir_root
, entry
.epath
.c_str(), 0);
849 derr
<< ": failed to remove remote directory=" << entry
.epath
<< ": "
850 << cpp_strerror(r
) << dendl
;
853 dout(10) << ": done for remote file=" << entry
.epath
<< dendl
;
858 while (!rm_stack
.empty()) {
859 auto &entry
= rm_stack
.top();
860 if (entry
.is_directory()) {
861 dout(20) << ": closing remote directory=" << entry
.epath
<< dendl
;
862 if (ceph_closedir(m_remote_mount
, entry
.dirp
) < 0) {
863 derr
<< ": failed to close remote directory=" << entry
.epath
<< dendl
;
873 int PeerReplayer::should_sync_entry(const std::string
&epath
, const struct ceph_statx
&cstx
,
874 const FHandles
&fh
, bool *need_data_sync
, bool *need_attr_sync
) {
875 dout(10) << ": epath=" << epath
<< dendl
;
877 *need_data_sync
= false;
878 *need_attr_sync
= false;
879 struct ceph_statx pstx
;
880 int r
= ceph_statxat(fh
.p_mnt
, fh
.p_fd
, epath
.c_str(), &pstx
,
881 CEPH_STATX_MODE
| CEPH_STATX_UID
| CEPH_STATX_GID
|
882 CEPH_STATX_SIZE
| CEPH_STATX_CTIME
| CEPH_STATX_MTIME
,
883 AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
);
884 if (r
< 0 && r
!= -ENOENT
&& r
!= -ENOTDIR
) {
885 derr
<< ": failed to stat prev entry= " << epath
<< ": " << cpp_strerror(r
)
891 // inode does not exist in prev snapshot or file type has changed
892 // (file was S_IFREG earlier, S_IFDIR now).
893 dout(5) << ": entry=" << epath
<< ", r=" << r
<< dendl
;
894 *need_data_sync
= true;
895 *need_attr_sync
= true;
899 dout(10) << ": local cur statx: mode=" << cstx
.stx_mode
<< ", uid=" << cstx
.stx_uid
900 << ", gid=" << cstx
.stx_gid
<< ", size=" << cstx
.stx_size
<< ", ctime="
901 << cstx
.stx_ctime
<< ", mtime=" << cstx
.stx_mtime
<< dendl
;
902 dout(10) << ": local prev statx: mode=" << pstx
.stx_mode
<< ", uid=" << pstx
.stx_uid
903 << ", gid=" << pstx
.stx_gid
<< ", size=" << pstx
.stx_size
<< ", ctime="
904 << pstx
.stx_ctime
<< ", mtime=" << pstx
.stx_mtime
<< dendl
;
905 if ((cstx
.stx_mode
& S_IFMT
) != (pstx
.stx_mode
& S_IFMT
)) {
906 dout(5) << ": entry=" << epath
<< " has mode mismatch" << dendl
;
907 *need_data_sync
= true;
908 *need_attr_sync
= true;
910 *need_data_sync
= (cstx
.stx_size
!= pstx
.stx_size
) || (cstx
.stx_mtime
!= pstx
.stx_mtime
);
911 *need_attr_sync
= (cstx
.stx_ctime
!= pstx
.stx_ctime
);
917 int PeerReplayer::propagate_deleted_entries(const std::string
&dir_root
,
918 const std::string
&epath
, const FHandles
&fh
) {
919 dout(10) << ": dir_root=" << dir_root
<< ", epath=" << epath
<< dendl
;
921 ceph_dir_result
*dirp
;
922 int r
= opendirat(fh
.p_mnt
, fh
.p_fd
, epath
, AT_SYMLINK_NOFOLLOW
, &dirp
);
925 dout(5) << ": epath=" << epath
<< " is a symbolic link -- mode sync"
926 << " done when traversing parent" << dendl
;
930 dout(5) << ": epath=" << epath
<< " is not a directory -- mode sync"
931 << " done when traversing parent" << dendl
;
935 dout(5) << ": epath=" << epath
<< " missing in previous-snap/remote dir-root"
941 struct dirent
*dire
= (struct dirent
*)alloca(512 * sizeof(struct dirent
));
943 if (should_backoff(dir_root
, &r
)) {
944 dout(0) << ": backing off r=" << r
<< dendl
;
948 int len
= ceph_getdents(fh
.p_mnt
, dirp
, (char *)dire
, 512);
950 derr
<< ": failed to read directory entries: " << cpp_strerror(len
) << dendl
;
952 // flip errno to signal that we got an err (possible the
953 // snapshot getting deleted in midst).
960 dout(10) << ": reached EOD" << dendl
;
963 int nr
= len
/ sizeof(struct dirent
);
964 for (int i
= 0; i
< nr
; ++i
) {
965 if (should_backoff(dir_root
, &r
)) {
966 dout(0) << ": backing off r=" << r
<< dendl
;
969 std::string d_name
= std::string(dire
[i
].d_name
);
970 if (d_name
== "." || d_name
== "..") {
974 struct ceph_statx pstx
;
975 auto dpath
= entry_path(epath
, d_name
);
976 r
= ceph_statxat(fh
.p_mnt
, fh
.p_fd
, dpath
.c_str(), &pstx
,
977 CEPH_STATX_MODE
, AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
);
979 derr
<< ": failed to stat (prev) directory=" << dpath
<< ": "
980 << cpp_strerror(r
) << dendl
;
981 // flip errno to signal that we got an err (possible the
982 // snapshot getting deleted in midst).
989 struct ceph_statx cstx
;
990 r
= ceph_statxat(m_local_mount
, fh
.c_fd
, dpath
.c_str(), &cstx
,
991 CEPH_STATX_MODE
, AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
);
992 if (r
< 0 && r
!= -ENOENT
) {
993 derr
<< ": failed to stat local (cur) directory=" << dpath
<< ": "
994 << cpp_strerror(r
) << dendl
;
998 bool purge_remote
= true;
1000 // directory entry present in both snapshots -- check inode
1002 if ((pstx
.stx_mode
& S_IFMT
) == (cstx
.stx_mode
& S_IFMT
)) {
1003 dout(5) << ": mode matches for entry=" << d_name
<< dendl
;
1004 purge_remote
= false;
1006 dout(5) << ": mode mismatch for entry=" << d_name
<< dendl
;
1009 dout(5) << ": entry=" << d_name
<< " missing in current snapshot" << dendl
;
1013 dout(5) << ": purging remote entry=" << dpath
<< dendl
;
1014 if (S_ISDIR(pstx
.stx_mode
)) {
1015 r
= cleanup_remote_dir(dir_root
, dpath
, fh
);
1017 r
= ceph_unlinkat(m_remote_mount
, fh
.r_fd_dir_root
, dpath
.c_str(), 0);
1020 if (r
< 0 && r
!= -ENOENT
) {
1021 derr
<< ": failed to cleanup remote entry=" << d_name
<< ": "
1022 << cpp_strerror(r
) << dendl
;
1029 ceph_closedir(fh
.p_mnt
, dirp
);
1033 int PeerReplayer::open_dir(MountRef mnt
, const std::string
&dir_path
,
1034 boost::optional
<uint64_t> snap_id
) {
1035 dout(20) << ": dir_path=" << dir_path
<< dendl
;
1037 dout(20) << ": expected snapshot id=" << *snap_id
<< dendl
;
1040 int fd
= ceph_open(mnt
, dir_path
.c_str(), O_DIRECTORY
| O_RDONLY
, 0);
1042 derr
<< ": cannot open dir_path=" << dir_path
<< ": " << cpp_strerror(fd
)
1052 int r
= ceph_get_snap_info(mnt
, dir_path
.c_str(), &info
);
1054 derr
<< ": failed to fetch snap_info for path=" << dir_path
1055 << ": " << cpp_strerror(r
) << dendl
;
1056 ceph_close(mnt
, fd
);
1060 if (info
.id
!= *snap_id
) {
1061 dout(5) << ": got mismatching snapshot id for path=" << dir_path
<< " (" << info
.id
1062 << " vs " << *snap_id
<< ") -- possible recreate" << dendl
;
1063 ceph_close(mnt
, fd
);
1070 int PeerReplayer::pre_sync_check_and_open_handles(
1071 const std::string
&dir_root
,
1072 const Snapshot
¤t
, boost::optional
<Snapshot
> prev
,
1074 dout(20) << ": dir_root=" << dir_root
<< ", current=" << current
<< dendl
;
1076 dout(20) << ": prev=" << prev
<< dendl
;
1079 auto cur_snap_path
= snapshot_path(m_cct
, dir_root
, current
.first
);
1080 auto fd
= open_dir(m_local_mount
, cur_snap_path
, current
.second
);
1085 // current snapshot file descriptor
1090 mnt
= m_local_mount
;
1091 auto prev_snap_path
= snapshot_path(m_cct
, dir_root
, (*prev
).first
);
1092 fd
= open_dir(mnt
, prev_snap_path
, (*prev
).second
);
1094 mnt
= m_remote_mount
;
1095 fd
= open_dir(mnt
, dir_root
, boost::none
);
1099 if (!prev
|| fd
!= -ENOENT
) {
1100 ceph_close(m_local_mount
, fh
->c_fd
);
1104 // ENOENT of previous snap
1105 dout(5) << ": previous snapshot=" << *prev
<< " missing" << dendl
;
1106 mnt
= m_remote_mount
;
1107 fd
= open_dir(mnt
, dir_root
, boost::none
);
1109 ceph_close(m_local_mount
, fh
->c_fd
);
1114 // "previous" snapshot or dir_root file descriptor
1119 std::scoped_lock
locker(m_lock
);
1120 auto it
= m_registered
.find(dir_root
);
1121 ceph_assert(it
!= m_registered
.end());
1122 fh
->r_fd_dir_root
= it
->second
.fd
;
1125 dout(5) << ": using " << ((fh
->p_mnt
== m_local_mount
) ? "local (previous) snapshot" : "remote dir_root")
1126 << " for incremental transfer" << dendl
;
1130 void PeerReplayer::post_sync_close_handles(const FHandles
&fh
) {
1133 // @FHandles.r_fd_dir_root is closed in @unregister_directory since
1134 // its used to acquire an exclusive lock on remote dir_root.
1135 ceph_close(m_local_mount
, fh
.c_fd
);
1136 ceph_close(fh
.p_mnt
, fh
.p_fd
);
// Synchronizes the contents of snapshot `current` under `dir_root` to the
// remote filesystem.
//
// Steps visible in this listing:
//   - open the handles used by the sync via pre_sync_check_and_open_handles()
//     and arrange for post_sync_close_handles() to run when the scope is left;
//   - set the "ceph.mirror.dirty_snap_id" xattr (stringified current.second)
//     through fh.r_fd_dir_root on the remote mount;
//   - walk the local snapshot with an explicit stack of SyncEntry objects:
//     directories are created remotely with remote_mkdir() and pushed for
//     traversal, other entries go through should_sync_entry() and
//     remote_file_op();
//   - close the local directory handles of anything left on the stack.
//
// @param dir_root  directory root being mirrored
// @param current   snapshot to synchronize; `first` is logged as the snap and
//                  `second` is written as the dirty snap id
// @param prev      optional previous snapshot, forwarded to
//                  pre_sync_check_and_open_handles()
//
// NOTE(review): several lines of this function (error-path returns, loop
// exits, the declarations of `fh`, `de` and `e_name`, the final return) are
// not present in this listing. Presumably the function returns the last
// error code held in `r` -- confirm against the complete file.
1139 int PeerReplayer::do_synchronize(const std::string
&dir_root
, const Snapshot
¤t
,
1140 boost::optional
<Snapshot
> prev
) {
1141 dout(20) << ": dir_root=" << dir_root
<< ", current=" << current
<< dendl
;
1143 dout(20) << ": incremental sync check from prev=" << prev
<< dendl
;
// open the handles (collected in `fh`) that the rest of the sync works with
1147 int r
= pre_sync_check_and_open_handles(dir_root
, current
, prev
, &fh
);
// NOTE(review): "proceeed" in the message below is misspelled in the log text
1149 dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r
) << dendl
;
// scope-exit hook: close the handles opened above when this scope is left
1153 BOOST_SCOPE_EXIT_ALL( (this)(&fh
) ) {
1154 post_sync_close_handles(fh
);
1157 // record that we are going to "dirty" the data under this
1159 auto snap_id_str
{stringify(current
.second
)};
1160 r
= ceph_fsetxattr(m_remote_mount
, fh
.r_fd_dir_root
, "ceph.mirror.dirty_snap_id",
1161 snap_id_str
.c_str(), snap_id_str
.size(), 0);
1163 derr
<< ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
1164 << ": " << cpp_strerror(r
) << dendl
;
// stat the local snapshot root through fh.c_fd; this stat seeds the first
// stack entry below
1168 struct ceph_statx tstx
;
1169 r
= ceph_fstatx(m_local_mount
, fh
.c_fd
, &tstx
,
1170 CEPH_STATX_MODE
| CEPH_STATX_UID
| CEPH_STATX_GID
|
1171 CEPH_STATX_SIZE
| CEPH_STATX_ATIME
| CEPH_STATX_MTIME
,
1172 AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
);
1174 derr
<< ": failed to stat snap=" << current
.first
<< ": " << cpp_strerror(r
)
// open a directory stream on the same descriptor to start the traversal
1179 ceph_dir_result
*tdirp
;
1180 r
= ceph_fdopendir(m_local_mount
, fh
.c_fd
, &tdirp
);
1182 derr
<< ": failed to open local snap=" << current
.first
<< ": " << cpp_strerror(r
)
// iterative traversal: the entry on top of the stack is the one being
// processed; the root is pushed as "." with its open handle and stat
1187 std::stack
<SyncEntry
> sync_stack
;
1188 sync_stack
.emplace(SyncEntry(".", tdirp
, tstx
));
1189 while (!sync_stack
.empty()) {
// should_backoff() receives &r, so it can report a code through `r`
1190 if (should_backoff(dir_root
, &r
)) {
1191 dout(0) << ": backing off r=" << r
<< dendl
;
1195 dout(20) << ": " << sync_stack
.size() << " entries in stack" << dendl
;
1197 auto &entry
= sync_stack
.top();
1198 dout(20) << ": top of stack path=" << entry
.epath
<< dendl
;
1199 if (entry
.is_directory()) {
1200 // entry is a directory -- propagate deletes for missing entries
1201 // (and changed inode types) to the remote filesystem.
1202 if (!entry
.needs_remote_sync()) {
1203 r
= propagate_deleted_entries(dir_root
, entry
.epath
, fh
);
// -ENOENT from the delete propagation is tolerated; other errors are logged
1204 if (r
< 0 && r
!= -ENOENT
) {
1205 derr
<< ": failed to propagate missing dirs: " << cpp_strerror(r
) << dendl
;
1208 entry
.set_remote_synced();
1211 struct ceph_statx stx
;
// read the next local directory entry together with its stat data
1214 r
= ceph_readdirplus_r(m_local_mount
, entry
.dirp
, &de
, &stx
,
1215 CEPH_STATX_MODE
| CEPH_STATX_UID
| CEPH_STATX_GID
|
1216 CEPH_STATX_SIZE
| CEPH_STATX_ATIME
| CEPH_STATX_MTIME
,
1217 AT_NO_ATTR_SYNC
| AT_SYMLINK_NOFOLLOW
, NULL
);
1219 derr
<< ": failed to local read directory=" << entry
.epath
<< dendl
;
// only names other than "." and ".." are considered
1226 auto d_name
= std::string(de
.d_name
);
1227 if (d_name
!= "." && d_name
!= "..") {
// directory finished: close its local handle (a failed close is logged)
1234 dout(10) << ": done for directory=" << entry
.epath
<< dendl
;
1235 if (ceph_closedir(m_local_mount
, entry
.dirp
) < 0) {
1236 derr
<< ": failed to close local directory=" << entry
.epath
<< dendl
;
// child path = parent path + "/" + name (see entry_path())
1245 auto epath
= entry_path(entry
.epath
, e_name
);
// sub-directories are created on the remote, opened locally relative to
// fh.c_fd and pushed with their handle; anything else is pushed with only
// its stat
1246 if (S_ISDIR(stx
.stx_mode
)) {
1247 r
= remote_mkdir(epath
, stx
, fh
);
1251 ceph_dir_result
*dirp
;
1252 r
= opendirat(m_local_mount
, fh
.c_fd
, epath
, AT_SYMLINK_NOFOLLOW
, &dirp
);
1254 derr
<< ": failed to open local directory=" << epath
<< ": "
1255 << cpp_strerror(r
) << dendl
;
1258 sync_stack
.emplace(SyncEntry(epath
, dirp
, stx
));
1260 sync_stack
.emplace(SyncEntry(epath
, stx
));
// non-directory entry: both flags start out true and should_sync_entry()
// may change them through the pointers it is given
1263 bool need_data_sync
= true;
1264 bool need_attr_sync
= true;
1265 r
= should_sync_entry(entry
.epath
, entry
.stx
, fh
,
1266 &need_data_sync
, &need_attr_sync
);
1271 dout(5) << ": entry=" << entry
.epath
<< ", data_sync=" << need_data_sync
1272 << ", attr_sync=" << need_attr_sync
<< dendl
;
// remote_file_op() is only invoked when at least one kind of sync is needed
1273 if (need_data_sync
|| need_attr_sync
) {
1274 r
= remote_file_op(dir_root
, entry
.epath
, entry
.stx
, fh
, need_data_sync
,
1280 dout(10) << ": done for epath=" << entry
.epath
<< dendl
;
// cleanup pass: close the local directory handle of every directory entry
// that is still on the stack
1285 while (!sync_stack
.empty()) {
1286 auto &entry
= sync_stack
.top();
1287 if (entry
.is_directory()) {
1288 dout(20) << ": closing local directory=" << entry
.epath
<< dendl
;
1289 if (ceph_closedir(m_local_mount
, entry
.dirp
) < 0) {
1290 derr
<< ": failed to close local directory=" << entry
.epath
<< dendl
;
// Synchronizes snapshot `current` of `dir_root` to the remote filesystem and
// then creates the matching snapshot on the remote.
//
// The remote "ceph.mirror.dirty_snap_id" xattr on dir_root selects how
// do_synchronize() is invoked (per the log messages in this function):
//   - xattr missing: do_synchronize(dir_root, current, boost::none)
//     ("incremental sync with remote scan");
//   - `prev` is set and the xattr equals prev's or current's snap id:
//     do_synchronize(dir_root, current, prev)
//     ("incremental sync with local scan");
//   - otherwise: do_synchronize(dir_root, current, boost::none).
// Afterwards ceph_mksnap() creates a snapshot named current.first on the
// remote dir_root, with current.second stored under PRIMARY_SNAP_ID_KEY in
// the snapshot metadata.
//
// @param dir_root  directory root being mirrored
// @param current   snapshot to synchronize (first = name, second = id)
// @param prev      optional previously synchronized snapshot
//
// NOTE(review): the branch structure, the declaration of `xlen` and the
// return statements are not present in this listing; the description above
// is pieced together from the visible calls and log messages.
1300 int PeerReplayer::synchronize(const std::string
&dir_root
, const Snapshot
¤t
,
1301 boost::optional
<Snapshot
> prev
) {
1302 dout(20) << ": dir_root=" << dir_root
<< ", current=" << current
<< dendl
;
1304 dout(20) << ": prev=" << prev
<< dendl
;
// first call passes a null buffer of size 0; the error message below
// describes this as fetching the value's length
1307 int r
= ceph_getxattr(m_remote_mount
, dir_root
.c_str(), "ceph.mirror.dirty_snap_id", nullptr, 0);
// -ENODATA is not treated as a failure by this check
// NOTE(review): the message below says "primary_snap_id" although the xattr
// being queried is "ceph.mirror.dirty_snap_id"
1308 if (r
< 0 && r
!= -ENODATA
) {
1309 derr
<< ": failed to fetch primary_snap_id length from dir_root=" << dir_root
1310 << ": " << cpp_strerror(r
) << dendl
;
1314 // no xattr, can't determine which snap the data belongs to!
1316 dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
1317 << " incremental sync with remote scan" << dendl
;
1318 r
= do_synchronize(dir_root
, current
, boost::none
);
// stack buffer with one byte more than xlen; second call reads the value
// NOTE(review): atoll() below needs a NUL-terminated string -- the write of
// the terminator is not visible in this listing, confirm it exists
1321 char *val
= (char *)alloca(xlen
+1);
1322 r
= ceph_getxattr(m_remote_mount
, dir_root
.c_str(), "ceph.mirror.dirty_snap_id", (void*)val
, xlen
);
1324 derr
<< ": failed to fetch \"dirty_snap_id\" for dir_root: " << dir_root
1325 << ": " << cpp_strerror(r
) << dendl
;
1330 uint64_t dirty_snap_id
= atoll(val
);
1332 dout(20) << ": dirty_snap_id: " << dirty_snap_id
<< " vs (" << current
.second
1333 << "," << (prev
? stringify((*prev
).second
) : "~") << ")" << dendl
;
// `prev` is passed on only when it is set and the recorded dirty snap id
// equals either prev's or current's id
1334 if (prev
&& (dirty_snap_id
== (*prev
).second
|| dirty_snap_id
== current
.second
)) {
1335 dout(5) << ": match -- using incremental sync with local scan" << dendl
;
1336 r
= do_synchronize(dir_root
, current
, prev
);
1338 dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl
;
1339 r
= do_synchronize(dir_root
, current
, boost::none
);
1343 // snap sync failed -- bail out!
// snapshot the remote dir_root under the same name, tagging it with the
// local snap id as metadata
1348 auto cur_snap_id_str
{stringify(current
.second
)};
1349 snap_metadata snap_meta
[] = {{PRIMARY_SNAP_ID_KEY
.c_str(), cur_snap_id_str
.c_str()}};
1350 r
= ceph_mksnap(m_remote_mount
, dir_root
.c_str(), current
.first
.c_str(), 0755,
1351 snap_meta
, sizeof(snap_meta
)/sizeof(snap_metadata
));
1353 derr
<< ": failed to snap remote directory dir_root=" << dir_root
1354 << ": " << cpp_strerror(r
) << dendl
;
// Reconciles the remote snapshots of `dir_root` with the local ones and then
// mirrors local snapshots that are newer than the last one on the remote.
//
// Steps visible in this listing:
//   1. build snap-id -> snap-name maps for the local and the remote side
//      (build_snap_map(); the remote call passes `true` as third argument);
//   2. derive deleted and renamed snapshots from the two maps and hand them
//      to propagate_snap_deletes() / propagate_snap_renames();
//   3. take the largest key of the remote map as the last synchronized snap
//      and record it with set_last_synced_snap();
//   4. for each local snapshot with a larger id, in ascending id order, call
//      synchronize() and record the elapsed time with set_last_synced_stat();
//      "cephfs_mirror_max_snapshot_sync_per_cycle" is counted down once per
//      snapshot.
//
// @param dir_root  directory root whose snapshots are mirrored
//
// NOTE(review): the error checks after each call, the loop exits and the
// final return are not present in this listing.
1360 int PeerReplayer::do_sync_snaps(const std::string
&dir_root
) {
1361 dout(20) << ": dir_root=" << dir_root
<< dendl
;
1363 std::map
<uint64_t, std::string
> local_snap_map
;
1364 std::map
<uint64_t, std::string
> remote_snap_map
;
1366 int r
= build_snap_map(dir_root
, &local_snap_map
);
1368 derr
<< ": failed to build local snap map" << dendl
;
1372 r
= build_snap_map(dir_root
, &remote_snap_map
, true);
1374 derr
<< ": failed to build remote snap map" << dendl
;
1378 // infer deleted and renamed snapshots from local and remote
1380 std::set
<std::string
> snaps_deleted
;
1381 std::set
<std::pair
<std::string
,std::string
>> snaps_renamed
;
// a remote snapshot whose key is absent from the local map counts as deleted;
// one whose key exists locally under a different name counts as renamed and
// is recorded as (remote name, local name)
1382 for (auto &[primary_snap_id
, snap_name
] : remote_snap_map
) {
1383 auto it
= local_snap_map
.find(primary_snap_id
);
1384 if (it
== local_snap_map
.end()) {
1385 snaps_deleted
.emplace(snap_name
);
1386 } else if (it
->second
!= snap_name
) {
1387 snaps_renamed
.emplace(std::make_pair(snap_name
, it
->second
));
1391 r
= propagate_snap_deletes(dir_root
, snaps_deleted
);
1393 derr
<< ": failed to propgate deleted snapshots" << dendl
;
1397 r
= propagate_snap_renames(dir_root
, snaps_renamed
);
1399 derr
<< ": failed to propgate renamed snapshots" << dendl
;
1403 // start mirroring snapshots from the last snap-id synchronized
1404 uint64_t last_snap_id
= 0;
1405 std::string last_snap_name
;
// std::map iterates in key order, so rbegin() is the entry with the largest
// snap id in the remote map
1406 if (!remote_snap_map
.empty()) {
1407 auto last
= remote_snap_map
.rbegin();
1408 last_snap_id
= last
->first
;
1409 last_snap_name
= last
->second
;
1410 set_last_synced_snap(dir_root
, last_snap_id
, last_snap_name
);
1413 dout(5) << ": last snap-id transferred=" << last_snap_id
<< dendl
;
// first local snapshot whose id is strictly greater than last_snap_id
1414 auto it
= local_snap_map
.upper_bound(last_snap_id
);
1415 if (it
== local_snap_map
.end()) {
1416 dout(20) << ": nothing to synchronize" << dendl
;
1420 auto snaps_per_cycle
= g_ceph_context
->_conf
.get_val
<uint64_t>(
1421 "cephfs_mirror_max_snapshot_sync_per_cycle");
1423 dout(10) << ": synchronizing from snap-id=" << it
->first
<< dendl
;
1424 for (; it
!= local_snap_map
.end(); ++it
) {
1425 set_current_syncing_snap(dir_root
, it
->first
, it
->second
);
1426 auto start
= clock::now();
// a previous snapshot is offered to synchronize() only when last_snap_id is
// non-zero
1427 boost::optional
<Snapshot
> prev
= boost::none
;
1428 if (last_snap_id
!= 0) {
1429 prev
= std::make_pair(last_snap_name
, last_snap_id
);
// Snapshot pairs are built as (name, id)
1431 r
= synchronize(dir_root
, std::make_pair(it
->second
, it
->first
), prev
);
1433 derr
<< ": failed to synchronize dir_root=" << dir_root
1434 << ", snapshot=" << it
->second
<< dendl
;
1435 clear_current_syncing_snap(dir_root
);
// duration<double> with the default period, i.e. elapsed seconds
1438 std::chrono::duration
<double> duration
= clock::now() - start
;
1439 set_last_synced_stat(dir_root
, it
->first
, it
->second
, duration
.count());
// per-cycle budget; what happens once it reaches zero is not visible here
1440 if (--snaps_per_cycle
== 0) {
// the snapshot just handled becomes the `prev` candidate of the next round
1444 last_snap_name
= it
->second
;
1445 last_snap_id
= it
->first
;
// Runs do_sync_snaps() for `dir_root` and updates the directory's failure
// bookkeeping through _inc_failed_count() / _reset_failed_count().
//
// @param dir_root  directory root whose snapshots are synchronized
// @param locker    lock object supplied by the caller (run() passes the
//                  std::unique_lock it constructed on m_lock)
//
// NOTE(review): the lines that use `locker` and the conditions that choose
// between the two counter updates are not present in this listing;
// presumably the counter is incremented on failure and reset on success --
// confirm against the complete file.
1451 void PeerReplayer::sync_snaps(const std::string
&dir_root
,
1452 std::unique_lock
<ceph::mutex
> &locker
) {
1453 dout(20) << ": dir_root=" << dir_root
<< dendl
;
1455 int r
= do_sync_snaps(dir_root
);
1457 derr
<< ": failed to sync snapshots for dir_root=" << dir_root
<< dendl
;
1461 _inc_failed_count(dir_root
);
1463 _reset_failed_count(dir_root
);
// Body of a snapshot replayer thread.
//
// Visible behaviour: wait on m_cond (up to 1s, or until is_stopping() holds),
// log and leave when stopping or when the client is blocklisted, and -- once
// at least "cephfs_mirror_directory_scan_interval" has elapsed since the last
// scan and m_directories is non-empty -- pick a directory, register it for
// this replayer, synchronize its snapshots and unregister it again.
//
// @param replayer  the thread object this loop runs on; passed on to
//                  register_directory()
//
// NOTE(review): the loop construct, the loop exits, the checks on
// pick_directory()/register_directory() results and any unlock/lock of
// `locker` around the blocklist check are not present in this listing.
1467 void PeerReplayer::run(SnapshotReplayerThread
*replayer
) {
1468 dout(10) << ": snapshot replayer=" << replayer
<< dendl
;
// starting from clock::zero() makes the first interval check below succeed
// as soon as directories exist (assuming zero() is far in the past -- TODO
// confirm)
1470 time last_directory_scan
= clock::zero();
1471 auto scan_interval
= g_ceph_context
->_conf
.get_val
<uint64_t>(
1472 "cephfs_mirror_directory_scan_interval");
1474 std::unique_lock
locker(m_lock
);
1476 // do not check if client is blocklisted under lock
// predicate form of wait_for: returns early once is_stopping() is true
1477 m_cond
.wait_for(locker
, 1s
, [this]{return is_stopping();});
1478 if (is_stopping()) {
1479 dout(5) << ": exiting" << dendl
;
1485 if (m_fs_mirror
->is_blocklisted()) {
1486 dout(5) << ": exiting as client is blocklisted" << dendl
;
// duration<double> with the default period, so timo.count() is in seconds
// and scan_interval is compared as seconds
1492 auto now
= clock::now();
1493 std::chrono::duration
<double> timo
= now
- last_directory_scan
;
1494 if (timo
.count() >= scan_interval
&& m_directories
.size()) {
1495 dout(20) << ": trying to pick from " << m_directories
.size() << " directories" << dendl
;
// pick_directory()'s result is dereferenced below, i.e. it is optional-like
1496 auto dir_root
= pick_directory();
1498 dout(5) << ": picked dir_root=" << *dir_root
<< dendl
;
1499 int r
= register_directory(*dir_root
, replayer
);
1501 sync_snaps(*dir_root
, locker
);
1502 unregister_directory(*dir_root
);
1506 last_directory_scan
= now
;
// Dumps per-directory snapshot sync statistics into `f` while holding m_lock.
//
// Output layout visible in this listing: a "stats" section that holds one
// section per dir_root in m_snap_sync_stats, each with
//   - "state": "failed", "idle" or "syncing";
//   - "current_sycning_snap" {id, name} alongside the "syncing" state;
//   - "last_synced_snap" {id, name[, sync_duration, sync_time_stamp]} when a
//     last synced snapshot is recorded;
//   - the counters "snaps_synced", "snaps_deleted" and "snaps_renamed".
//
// @param f  formatter that receives the output
//
// NOTE(review): "current_sycning_snap" is misspelled, but it is an emitted
// key, so consumers may depend on the exact spelling.
// NOTE(review): the `else` introducing the "syncing" branch and the
// close_section() calls of the two sub-sections are not present in this
// listing.
1511 void PeerReplayer::peer_status(Formatter
*f
) {
1512 std::scoped_lock
locker(m_lock
);
1513 f
->open_object_section("stats");
1514 for (auto &[dir_root
, sync_stat
] : m_snap_sync_stats
) {
1515 f
->open_object_section(dir_root
);
// state: failed takes precedence, then idle when no snapshot is being synced
1516 if (sync_stat
.failed
) {
1517 f
->dump_string("state", "failed");
1518 } else if (!sync_stat
.current_syncing_snap
) {
1519 f
->dump_string("state", "idle");
1521 f
->dump_string("state", "syncing");
1522 f
->open_object_section("current_sycning_snap");
1523 f
->dump_unsigned("id", (*sync_stat
.current_syncing_snap
).first
);
1524 f
->dump_string("name", (*sync_stat
.current_syncing_snap
).second
);
// details of the last synchronized snapshot, if one is recorded
1527 if (sync_stat
.last_synced_snap
) {
1528 f
->open_object_section("last_synced_snap");
1529 f
->dump_unsigned("id", (*sync_stat
.last_synced_snap
).first
);
1530 f
->dump_string("name", (*sync_stat
.last_synced_snap
).second
);
// duration and timestamp are only emitted when a duration was recorded
1531 if (sync_stat
.last_sync_duration
) {
1532 f
->dump_float("sync_duration", *sync_stat
.last_sync_duration
);
1533 f
->dump_stream("sync_time_stamp") << sync_stat
.last_synced
;
1537 f
->dump_unsigned("snaps_synced", sync_stat
.synced_snap_count
);
1538 f
->dump_unsigned("snaps_deleted", sync_stat
.deleted_snap_count
);
1539 f
->dump_unsigned("snaps_renamed", sync_stat
.renamed_snap_count
);
1540 f
->close_section(); // dir_root
1542 f
->close_section(); // stats
1545 void PeerReplayer::reopen_logs() {
1546 std::scoped_lock
locker(m_lock
);
1548 if (m_remote_cluster
) {
1549 reinterpret_cast<CephContext
*>(m_remote_cluster
->cct())->reopen_logs();
1553 } // namespace mirror
1554 } // namespace cephfs