]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/PeerReplayer.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / cephfs_mirror / PeerReplayer.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <stack>
5#include <fcntl.h>
6#include <algorithm>
7#include <sys/time.h>
8#include <sys/file.h>
9
10#include "common/admin_socket.h"
11#include "common/ceph_context.h"
12#include "common/debug.h"
13#include "common/errno.h"
14#include "FSMirror.h"
15#include "PeerReplayer.h"
16#include "Utils.h"
17
18#include "json_spirit/json_spirit.h"
19
20#define dout_context g_ceph_context
21#define dout_subsys ceph_subsys_cephfs_mirror
22#undef dout_prefix
23#define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \
24 << m_peer.uuid << ") " << __func__
25
26namespace cephfs {
27namespace mirror {
28
29namespace {
30
// Common leading component of the per-peer config-key names built by
// peer_config_key().
const std::string PEER_CONFIG_KEY_PREFIX{"cephfs/mirror/peer"};
32
33std::string snapshot_dir_path(CephContext *cct, const std::string &path) {
34 return path + "/" + cct->_conf->client_snapdir;
35}
36
// Join an already-built snapshot directory path with a snapshot name.
std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
  std::string full_path;
  full_path.reserve(snap_dir.size() + 1 + snap_name.size());
  full_path.append(snap_dir).append("/").append(snap_name);
  return full_path;
}
40
41std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) {
42 return path + "/" + cct->_conf->client_snapdir + "/" + snap_name;
43}
44
// Join a directory path and an entry name with a single '/' separator.
std::string entry_path(const std::string &dir, const std::string &name) {
  std::string joined;
  joined.reserve(dir.size() + 1 + name.size());
  joined.append(dir).append("/").append(name);
  return joined;
}
48
49std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
50 size_t nr_snap_metadata) {
51 std::map<std::string, std::string> metadata;
52 for (size_t i = 0; i < nr_snap_metadata; ++i) {
53 metadata.emplace(snap_metadata[i].key, snap_metadata[i].value);
54 }
55
56 return metadata;
57}
58
59std::string peer_config_key(const std::string &fs_name, const std::string &uuid) {
60 return PEER_CONFIG_KEY_PREFIX + "/" + fs_name + "/" + uuid;
61}
62
63class PeerAdminSocketCommand {
64public:
65 virtual ~PeerAdminSocketCommand() {
66 }
67 virtual int call(Formatter *f) = 0;
68};
69
70class StatusCommand : public PeerAdminSocketCommand {
71public:
72 explicit StatusCommand(PeerReplayer *peer_replayer)
73 : peer_replayer(peer_replayer) {
74 }
75
76 int call(Formatter *f) override {
77 peer_replayer->peer_status(f);
78 return 0;
79 }
80
81private:
82 PeerReplayer *peer_replayer;
83};
84
85} // anonymous namespace
86
87class PeerReplayerAdminSocketHook : public AdminSocketHook {
88public:
89 PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem,
90 const Peer &peer, PeerReplayer *peer_replayer)
91 : admin_socket(cct->get_admin_socket()) {
92 int r;
93 std::string cmd;
94
95 // mirror peer status format is name@id uuid
96 cmd = "fs mirror peer status "
97 + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid)
98 + " "
99 + stringify(peer.uuid);
100 r = admin_socket->register_command(
101 cmd, this, "get peer mirror status");
102 if (r == 0) {
103 commands[cmd] = new StatusCommand(peer_replayer);
104 }
105 }
106
107 ~PeerReplayerAdminSocketHook() override {
108 admin_socket->unregister_commands(this);
109 for (auto &[command, cmdptr] : commands) {
110 delete cmdptr;
111 }
112 }
113
114 int call(std::string_view command, const cmdmap_t& cmdmap,
115 Formatter *f, std::ostream &errss, bufferlist &out) override {
116 auto p = commands.at(std::string(command));
117 return p->call(f);
118 }
119
120private:
121 typedef std::map<std::string, PeerAdminSocketCommand*, std::less<>> Commands;
122
123 AdminSocket *admin_socket;
124 Commands commands;
125};
126
127PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
128 RadosRef local_cluster, const Filesystem &filesystem,
129 const Peer &peer, const std::set<std::string, std::less<>> &directories,
130 MountRef mount, ServiceDaemon *service_daemon)
131 : m_cct(cct),
132 m_fs_mirror(fs_mirror),
133 m_local_cluster(local_cluster),
134 m_filesystem(filesystem),
135 m_peer(peer),
136 m_directories(directories.begin(), directories.end()),
137 m_local_mount(mount),
138 m_service_daemon(service_daemon),
139 m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
140 m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
141 // reset sync stats sent via service daemon
142 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
143 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
144 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
145 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
146}
147
148PeerReplayer::~PeerReplayer() {
149 delete m_asok_hook;
150}
151
152int PeerReplayer::init() {
153 dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
154 for (auto &dir_path : m_directories) {
155 m_snap_sync_stats.emplace(dir_path, SnapSyncStat());
156 }
157
158 auto &remote_client = m_peer.remote.client_name;
159 auto &remote_cluster = m_peer.remote.cluster_name;
160 auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name};
161
162 std::string key = peer_config_key(m_filesystem.fs_name, m_peer.uuid);
163 std::string cmd =
164 "{"
165 "\"prefix\": \"config-key get\", "
166 "\"key\": \"" + key + "\""
167 "}";
168
169 bufferlist in_bl;
170 bufferlist out_bl;
171
172 int r = m_local_cluster->mon_command(cmd, in_bl, &out_bl, nullptr);
173 dout(5) << ": mon command r=" << r << dendl;
174 if (r < 0 && r != -ENOENT) {
175 return r;
176 }
177
178 std::string mon_host;
179 std::string cephx_key;
180 if (!r) {
181 json_spirit::mValue root;
182 if (!json_spirit::read(out_bl.to_str(), root)) {
183 derr << ": invalid config-key JSON" << dendl;
184 return -EBADMSG;
185 }
186 try {
187 auto &root_obj = root.get_obj();
188 mon_host = root_obj.at("mon_host").get_str();
189 cephx_key = root_obj.at("key").get_str();
190 dout(0) << ": remote monitor host=" << mon_host << dendl;
191 } catch (std::runtime_error&) {
192 derr << ": unexpected JSON received" << dendl;
193 return -EBADMSG;
194 }
195 }
196
197 r = connect(remote_client, remote_cluster, &m_remote_cluster, mon_host, cephx_key);
198 if (r < 0) {
199 derr << ": error connecting to remote cluster: " << cpp_strerror(r)
200 << dendl;
201 return r;
202 }
203
204 r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount);
205 if (r < 0) {
206 m_remote_cluster.reset();
207 derr << ": error mounting remote filesystem=" << remote_filesystem << dendl;
208 return r;
209 }
210
211 std::scoped_lock locker(m_lock);
212 auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
213 "cephfs_mirror_max_concurrent_directory_syncs");
214 dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
215
216 while (nr_replayers-- > 0) {
217 std::unique_ptr<SnapshotReplayerThread> replayer(
218 new SnapshotReplayerThread(this));
219 std::string name("replayer-" + stringify(nr_replayers));
220 replayer->create(name.c_str());
221 m_replayers.push_back(std::move(replayer));
222 }
223
224 return 0;
225}
226
227void PeerReplayer::shutdown() {
228 dout(20) << dendl;
229
230 {
231 std::scoped_lock locker(m_lock);
232 ceph_assert(!m_stopping);
233 m_stopping = true;
234 m_cond.notify_all();
235 }
236
237 for (auto &replayer : m_replayers) {
238 replayer->join();
239 }
240 m_replayers.clear();
241 ceph_unmount(m_remote_mount);
242 ceph_release(m_remote_mount);
243 m_remote_mount = nullptr;
244 m_remote_cluster.reset();
245}
246
247void PeerReplayer::add_directory(string_view dir_path) {
248 dout(20) << ": dir_path=" << dir_path << dendl;
249
250 std::scoped_lock locker(m_lock);
251 m_directories.emplace_back(dir_path);
252 m_snap_sync_stats.emplace(dir_path, SnapSyncStat());
253 m_cond.notify_all();
254}
255
256void PeerReplayer::remove_directory(string_view dir_path) {
257 dout(20) << ": dir_path=" << dir_path << dendl;
258 auto _dir_path = std::string(dir_path);
259
260 std::scoped_lock locker(m_lock);
261 auto it = std::find(m_directories.begin(), m_directories.end(), _dir_path);
262 if (it != m_directories.end()) {
263 m_directories.erase(it);
264 }
265
266 auto it1 = m_registered.find(_dir_path);
267 if (it1 == m_registered.end()) {
268 m_snap_sync_stats.erase(_dir_path);
269 } else {
270 it1->second.replayer->cancel();
271 }
272 m_cond.notify_all();
273}
274
275boost::optional<std::string> PeerReplayer::pick_directory() {
276 dout(20) << dendl;
277
278 auto now = clock::now();
279 auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>(
280 "cephfs_mirror_retry_failed_directories_interval");
281
282 boost::optional<std::string> candidate;
283 for (auto &dir_path : m_directories) {
284 auto &sync_stat = m_snap_sync_stats.at(dir_path);
285 if (sync_stat.failed) {
286 std::chrono::duration<double> d = now - *sync_stat.last_failed;
287 if (d.count() < retry_timo) {
288 continue;
289 }
290 }
291 if (!m_registered.count(dir_path)) {
292 candidate = dir_path;
293 break;
294 }
295 }
296
297 std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
298 return candidate;
299}
300
301int PeerReplayer::register_directory(const std::string &dir_path,
302 SnapshotReplayerThread *replayer) {
303 dout(20) << ": dir_path=" << dir_path << dendl;
304 ceph_assert(m_registered.find(dir_path) == m_registered.end());
305
306 DirRegistry registry;
307 int r = try_lock_directory(dir_path, replayer, &registry);
308 if (r < 0) {
309 return r;
310 }
311
312 dout(5) << ": dir_path=" << dir_path << " registered with replayer="
313 << replayer << dendl;
314 m_registered.emplace(dir_path, std::move(registry));
315 return 0;
316}
317
318void PeerReplayer::unregister_directory(const std::string &dir_path) {
319 dout(20) << ": dir_path=" << dir_path << dendl;
320
321 auto it = m_registered.find(dir_path);
322 ceph_assert(it != m_registered.end());
323
324 unlock_directory(it->first, it->second);
325 m_registered.erase(it);
326 if (std::find(m_directories.begin(), m_directories.end(), dir_path) == m_directories.end()) {
327 m_snap_sync_stats.erase(dir_path);
328 }
329}
330
331int PeerReplayer::try_lock_directory(const std::string &dir_path,
332 SnapshotReplayerThread *replayer, DirRegistry *registry) {
333 dout(20) << ": dir_path=" << dir_path << dendl;
334
335 int r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
336 if (r < 0 && r != -ENOENT) {
337 derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
338 << dendl;
339 return r;
340 }
341
342 if (r == -ENOENT) {
343 // we snap under dir_path, so mode does not matter much
344 r = ceph_mkdirs(m_remote_mount, dir_path.c_str(), 0755);
345 if (r < 0) {
346 derr << ": failed to create remote directory=" << dir_path << ": " << cpp_strerror(r)
347 << dendl;
348 return r;
349 }
350
351 r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
352 if (r < 0) {
353 derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
354 << dendl;
355 return r;
356 }
357 }
358
359 int fd = r;
360 r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
361 if (r != 0) {
362 if (r == -EWOULDBLOCK) {
363 dout(5) << ": dir_path=" << dir_path << " is locked by cephfs-mirror, "
364 << "will retry again" << dendl;
365 } else {
366 derr << ": failed to lock dir_path=" << dir_path << ": " << cpp_strerror(r)
367 << dendl;
368 }
369
370 if (ceph_close(m_remote_mount, fd) < 0) {
371 derr << ": failed to close (cleanup) remote dir_path=" << dir_path << ": "
372 << cpp_strerror(r) << dendl;
373 }
374 return r;
375 }
376
377 dout(10) << ": dir_path=" << dir_path << " locked" << dendl;
378
379 registry->fd = fd;
380 registry->replayer = replayer;
381 return 0;
382}
383
384void PeerReplayer::unlock_directory(const std::string &dir_path, const DirRegistry &registry) {
385 dout(20) << ": dir_path=" << dir_path << dendl;
386
387 int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
388 (uint64_t)registry.replayer->get_thread_id());
389 if (r < 0) {
390 derr << ": failed to unlock remote dir_path=" << dir_path << ": " << cpp_strerror(r)
391 << dendl;
392 return;
393 }
394
395 r = ceph_close(m_remote_mount, registry.fd);
396 if (r < 0) {
397 derr << ": failed to close remote dir_path=" << dir_path << ": " << cpp_strerror(r)
398 << dendl;
399 }
400
401 dout(10) << ": dir_path=" << dir_path << " unlocked" << dendl;
402}
403
404int PeerReplayer::build_snap_map(const std::string &dir_path,
405 std::map<uint64_t, std::string> *snap_map, bool is_remote) {
406 auto snap_dir = snapshot_dir_path(m_cct, dir_path);
407 dout(20) << ": dir_path=" << dir_path << ", snap_dir=" << snap_dir
408 << ", is_remote=" << is_remote << dendl;
409
410 auto lr_str = is_remote ? "remote" : "local";
411 auto mnt = is_remote ? m_remote_mount : m_local_mount;
412
413 ceph_dir_result *dirp = nullptr;
414 int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp);
415 if (r < 0) {
416 if (is_remote && r == -ENOENT) {
417 return 0;
418 }
419 derr << ": failed to open " << lr_str << " snap directory=" << snap_dir
420 << ": " << cpp_strerror(r) << dendl;
421 return r;
422 }
423
424 std::set<std::string> snaps;
425 auto entry = ceph_readdir(mnt, dirp);
426 while (entry != NULL) {
427 auto d_name = std::string(entry->d_name);
428 dout(20) << ": entry=" << d_name << dendl;
429 if (d_name != "." && d_name != "..") {
430 snaps.emplace(d_name);
431 }
432
433 entry = ceph_readdir(mnt, dirp);
434 }
435
436 int rv = 0;
437 for (auto &snap : snaps) {
438 snap_info info;
439 auto snap_path = snapshot_path(snap_dir, snap);
440 r = ceph_get_snap_info(mnt, snap_path.c_str(), &info);
441 if (r < 0) {
442 derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path
443 << ": " << cpp_strerror(r) << dendl;
444 rv = r;
445 break;
446 }
447
448 uint64_t snap_id;
449 if (is_remote) {
450 if (!info.nr_snap_metadata) {
451 derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot"
452 << dendl;
453 rv = -EINVAL;
454 } else {
455 auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata);
456 dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl;
457 auto it = metadata.find(PRIMARY_SNAP_ID_KEY);
458 if (it == metadata.end()) {
459 derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY
460 << "\" in metadata" << dendl;
461 rv = -EINVAL;
462 } else {
463 snap_id = std::stoull(it->second);
464 }
465 ceph_free_snap_info_buffer(&info);
466 }
467 } else {
468 snap_id = info.id;
469 }
470
471 if (rv != 0) {
472 break;
473 }
474 snap_map->emplace(snap_id, snap);
475 }
476
477 r = ceph_closedir(mnt, dirp);
478 if (r < 0) {
479 derr << ": failed to close " << lr_str << " snap directory=" << snap_dir
480 << ": " << cpp_strerror(r) << dendl;
481 }
482
483 dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl;
484 return rv;
485}
486
487int PeerReplayer::propagate_snap_deletes(const std::string &dir_path,
488 const std::set<std::string> &snaps) {
489 dout(5) << ": dir_path=" << dir_path << ", deleted snapshots=" << snaps << dendl;
490
491 for (auto &snap : snaps) {
492 dout(20) << ": deleting dir_path=" << dir_path << ", snapshot=" << snap
493 << dendl;
494 int r = ceph_rmsnap(m_remote_mount, dir_path.c_str(), snap.c_str());
495 if (r < 0) {
496 derr << ": failed to delete remote snap dir_path=" << dir_path
497 << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl;
498 return r;
499 }
500 inc_deleted_snap(dir_path);
501 }
502
503 return 0;
504}
505
506int PeerReplayer::propagate_snap_renames(
507 const std::string &dir_path,
508 const std::set<std::pair<std::string,std::string>> &snaps) {
509 dout(10) << ": dir_path=" << dir_path << ", renamed snapshots=" << snaps << dendl;
510
511 for (auto &snapp : snaps) {
512 auto from = snapshot_path(m_cct, dir_path, snapp.first);
513 auto to = snapshot_path(m_cct, dir_path, snapp.second);
514 dout(20) << ": renaming dir_path=" << dir_path << ", snapshot from="
515 << from << ", to=" << to << dendl;
516 int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str());
517 if (r < 0) {
518 derr << ": failed to rename remote snap dir_path=" << dir_path
519 << ", snapshot from =" << from << ", to=" << to << ": "
520 << cpp_strerror(r) << dendl;
521 return r;
522 }
523 inc_renamed_snap(dir_path);
524 }
525
526 return 0;
527}
528
529int PeerReplayer::remote_mkdir(const std::string &local_path,
530 const std::string &remote_path,
531 const struct ceph_statx &stx) {
532 dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
533 << dendl;
534
535 int r = ceph_mkdir(m_remote_mount, remote_path.c_str(), stx.stx_mode & ~S_IFDIR);
536 if (r < 0 && r != -EEXIST) {
537 derr << ": failed to create remote directory=" << remote_path << ": " << cpp_strerror(r)
538 << dendl;
539 return r;
540 }
541
542 r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
543 if (r < 0) {
544 derr << ": failed to chown remote directory=" << remote_path << ": " << cpp_strerror(r)
545 << dendl;
546 return r;
547 }
548
549 struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
550 {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
551 r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
552 if (r < 0) {
553 derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
554 << cpp_strerror(r) << dendl;
555 return r;
556 }
557
558 return 0;
559}
560
561#define NR_IOVECS 8 // # iovecs
562#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
563int PeerReplayer::remote_copy(const std::string &dir_path,
564 const std::string &local_path,
565 const std::string &remote_path,
566 const struct ceph_statx &stx) {
567 dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
568 << ", remote_path=" << remote_path << dendl;
569 int l_fd;
570 int r_fd;
571 void *ptr;
572 struct iovec iov[NR_IOVECS];
573
574 int r = ceph_open(m_local_mount, local_path.c_str(), O_RDONLY, 0);
575 if (r < 0) {
576 derr << ": failed to open local file path=" << local_path << ": "
577 << cpp_strerror(r) << dendl;
578 return r;
579 }
580
581 l_fd = r;
582 r = ceph_open(m_remote_mount, remote_path.c_str(),
583 O_CREAT | O_TRUNC | O_WRONLY, stx.stx_mode);
584 if (r < 0) {
585 derr << ": failed to create remote file path=" << remote_path << ": "
586 << cpp_strerror(r) << dendl;
587 goto close_local_fd;
588 }
589
590 r_fd = r;
591 ptr = malloc(NR_IOVECS * IOVEC_SIZE);
592 if (!ptr) {
593 r = -ENOMEM;
594 derr << ": failed to allocate memory" << dendl;
595 goto close_remote_fd;
596 }
597
598 while (true) {
599 if (should_backoff(dir_path, &r)) {
600 dout(0) << ": backing off r=" << r << dendl;
601 break;
602 }
603
604 for (int i = 0; i < NR_IOVECS; ++i) {
605 iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
606 iov[i].iov_len = IOVEC_SIZE;
607 }
608
609 r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
610 if (r < 0) {
611 derr << ": failed to read local file path=" << local_path << ": "
612 << cpp_strerror(r) << dendl;
613 break;
614 }
615 if (r == 0) {
616 break;
617 }
618
619 int iovs = (int)(r / IOVEC_SIZE);
620 int t = r % IOVEC_SIZE;
621 if (t) {
622 iov[iovs].iov_len = t;
623 ++iovs;
624 }
625
626 r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
627 if (r < 0) {
628 derr << ": failed to write remote file path=" << remote_path << ": "
629 << cpp_strerror(r) << dendl;
630 break;
631 }
632 }
633
634 if (r == 0) {
635 r = ceph_fsync(m_remote_mount, r_fd, 0);
636 if (r < 0) {
637 derr << ": failed to sync data for dir_path=" << remote_path << ": "
638 << cpp_strerror(r) << dendl;
639 }
640 }
641
642 free(ptr);
643
644close_remote_fd:
645 if (ceph_close(m_remote_mount, r_fd) < 0) {
646 derr << ": failed to close remote fd path=" << remote_path << ": " << cpp_strerror(r)
647 << dendl;
648 return -EINVAL;
649 }
650
651close_local_fd:
652 if (ceph_close(m_local_mount, l_fd) < 0) {
653 derr << ": failed to close local fd path=" << local_path << ": " << cpp_strerror(r)
654 << dendl;
655 return -EINVAL;
656 }
657
658 return r == 0 ? 0 : r;
659}
660
661int PeerReplayer::remote_file_op(const std::string &dir_path,
662 const std::string &local_path,
663 const std::string &remote_path,
664 const struct ceph_statx &stx) {
665 dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
666 << ", remote_path=" << remote_path << dendl;
667
668 int r;
669 if (S_ISREG(stx.stx_mode)) {
670 r = remote_copy(dir_path, local_path, remote_path, stx);
671 if (r < 0) {
672 derr << ": failed to copy path=" << local_path << ": " << cpp_strerror(r)
673 << dendl;
674 return r;
675 }
676 } else if (S_ISLNK(stx.stx_mode)) {
677 char *target = (char *)alloca(stx.stx_size+1);
678 r = ceph_readlink(m_local_mount, local_path.c_str(), target, stx.stx_size);
679 if (r < 0) {
680 derr << ": failed to readlink local path=" << local_path << ": " << cpp_strerror(r)
681 << dendl;
682 return r;
683 }
684
685 target[stx.stx_size] = '\0';
686 r = ceph_symlink(m_remote_mount, target, remote_path.c_str());
687 if (r < 0 && r != EEXIST) {
688 derr << ": failed to symlink remote path=" << remote_path << " to target=" << target
689 << ": " << cpp_strerror(r) << dendl;
690 return r;
691 }
692 } else {
693 dout(5) << ": skipping entry=" << local_path << ": unsupported mode=" << stx.stx_mode
694 << dendl;
695 return 0;
696 }
697
698 r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
699 if (r < 0) {
700 derr << ": failed to chown remote directory=" << remote_path << ": "
701 << cpp_strerror(r) << dendl;
702 return r;
703 }
704
705 struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
706 {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
707 r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
708 if (r < 0) {
709 derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
710 << cpp_strerror(r) << dendl;
711 return r;
712 }
713
714 return 0;
715}
716
717int PeerReplayer::cleanup_remote_dir(const std::string &dir_path) {
718 dout(20) << ": dir_path=" << dir_path << dendl;
719
720 std::stack<SyncEntry> rm_stack;
721 ceph_dir_result *tdirp;
722 int r = ceph_opendir(m_remote_mount, dir_path.c_str(), &tdirp);
723 if (r < 0) {
724 derr << ": failed to open remote directory=" << dir_path << ": "
725 << cpp_strerror(r) << dendl;
726 return r;
727 }
728
729 struct ceph_statx tstx;
730 r = ceph_statx(m_remote_mount, dir_path.c_str(), &tstx,
731 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
732 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
733 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
734 if (r < 0) {
735 derr << ": failed to stat remote directory=" << dir_path << ": "
736 << cpp_strerror(r) << dendl;
737 return r;
738 }
739
740 rm_stack.emplace(SyncEntry(dir_path, tdirp, tstx));
741 while (!rm_stack.empty()) {
742 if (should_backoff(dir_path, &r)) {
743 dout(0) << ": backing off r=" << r << dendl;
744 break;
745 }
746
747 dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl;
748 std::string e_name;
749 auto &entry = rm_stack.top();
750 dout(20) << ": top of stack path=" << entry.epath << dendl;
751 if (entry.is_directory()) {
752 struct ceph_statx stx;
753 struct dirent de;
754 while (true) {
755 r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx,
756 CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
757 if (r < 0) {
758 derr << ": failed to read remote directory=" << entry.epath << dendl;
759 break;
760 }
761 if (r == 0) {
762 break;
763 }
764
765 auto d_name = std::string(de.d_name);
766 if (d_name != "." && d_name != "..") {
767 e_name = d_name;
768 break;
769 }
770 }
771
772 if (r == 0) {
773 if (rm_stack.size() > 1) {
774 r = ceph_rmdir(m_remote_mount, entry.epath.c_str());
775 if (r < 0) {
776 derr << ": failed to remove remote directory=" << entry.epath << ": "
777 << cpp_strerror(r) << dendl;
778 break;
779 }
780 }
781
782 dout(10) << ": done for remote directory=" << entry.epath << dendl;
783 if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
784 derr << ": failed to close remote directory=" << entry.epath << dendl;
785 }
786 rm_stack.pop();
787 continue;
788 }
789 if (r < 0) {
790 break;
791 }
792
793 auto epath = entry_path(entry.epath, e_name);
794 if (S_ISDIR(stx.stx_mode)) {
795 ceph_dir_result *dirp;
796 r = ceph_opendir(m_remote_mount, epath.c_str(), &dirp);
797 if (r < 0) {
798 derr << ": failed to open remote directory=" << epath << ": "
799 << cpp_strerror(r) << dendl;
800 break;
801 }
802 rm_stack.emplace(SyncEntry(epath, dirp, stx));
803 } else {
804 rm_stack.emplace(SyncEntry(epath, stx));
805 }
806 } else {
807 r = ceph_unlink(m_remote_mount, entry.epath.c_str());
808 if (r < 0) {
809 derr << ": failed to remove remote directory=" << entry.epath << ": "
810 << cpp_strerror(r) << dendl;
811 break;
812 }
813 dout(10) << ": done for remote file=" << entry.epath << dendl;
814 rm_stack.pop();
815 }
816 }
817
818 while (!rm_stack.empty()) {
819 auto &entry = rm_stack.top();
820 if (entry.is_directory()) {
821 dout(20) << ": closing remote directory=" << entry.epath << dendl;
822 if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
823 derr << ": failed to close remote directory=" << entry.epath << dendl;
824 }
825 }
826
827 rm_stack.pop();
828 }
829
830 return r;
831}
832
833int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string &snap_name) {
834 dout(20) << ": dir_path=" << dir_path << ", snap_name=" << snap_name << dendl;
835
836 auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
837 std::stack<SyncEntry> sync_stack;
838
839 ceph_dir_result *tdirp;
840 int r = ceph_opendir(m_local_mount, snap_path.c_str(), &tdirp);
841 if (r < 0) {
842 derr << ": failed to open local directory=" << snap_path << ": "
843 << cpp_strerror(r) << dendl;
844 return r;
845 }
846
847 struct ceph_statx tstx;
848 r = ceph_statx(m_local_mount, snap_path.c_str(), &tstx,
849 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
850 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
851 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
852 if (r < 0) {
853 derr << ": failed to stat local directory=" << snap_path << ": "
854 << cpp_strerror(r) << dendl;
855 return r;
856 }
857
858 sync_stack.emplace(SyncEntry("/", tdirp, tstx));
859 while (!sync_stack.empty()) {
860 if (should_backoff(dir_path, &r)) {
861 dout(0) << ": backing off r=" << r << dendl;
862 break;
863 }
864
865 dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
866 std::string e_name;
867 auto &entry = sync_stack.top();
868 dout(20) << ": top of stack path=" << entry.epath << dendl;
869 if (entry.is_directory()) {
870 struct ceph_statx stx;
871 struct dirent de;
872 while (true) {
873 r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
874 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
875 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
876 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
877 if (r < 0) {
878 derr << ": failed to local read directory=" << entry.epath << dendl;
879 break;
880 }
881 if (r == 0) {
882 break;
883 }
884
885 auto d_name = std::string(de.d_name);
886 if (d_name != "." && d_name != "..") {
887 e_name = d_name;
888 break;
889 }
890 }
891
892 if (r == 0) {
893 dout(10) << ": done for directory=" << entry.epath << dendl;
894 if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
895 derr << ": failed to close local directory=" << entry.epath << dendl;
896 }
897 sync_stack.pop();
898 continue;
899 }
900 if (r < 0) {
901 break;
902 }
903
904 auto epath = entry_path(entry.epath, e_name);
905 auto l_path = entry_path(snap_path, epath);
906 auto r_path = entry_path(dir_path, epath);
907 if (S_ISDIR(stx.stx_mode)) {
908 r = remote_mkdir(l_path, r_path, stx);
909 if (r < 0) {
910 break;
911 }
912 ceph_dir_result *dirp;
913 r = ceph_opendir(m_local_mount, l_path.c_str(), &dirp);
914 if (r < 0) {
915 derr << ": failed to open local directory=" << l_path << ": "
916 << cpp_strerror(r) << dendl;
917 break;
918 }
919 sync_stack.emplace(SyncEntry(epath, dirp, stx));
920 } else {
921 sync_stack.emplace(SyncEntry(epath, stx));
922 }
923 } else {
924 auto l_path = entry_path(snap_path, entry.epath);
925 auto r_path = entry_path(dir_path, entry.epath);
926 r = remote_file_op(dir_path, l_path, r_path, entry.stx);
927 if (r < 0) {
928 break;
929 }
930 dout(10) << ": done for file=" << entry.epath << dendl;
931 sync_stack.pop();
932 }
933 }
934
935 while (!sync_stack.empty()) {
936 auto &entry = sync_stack.top();
937 if (entry.is_directory()) {
938 dout(20) << ": closing local directory=" << entry.epath << dendl;
939 if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
940 derr << ": failed to close local directory=" << entry.epath << dendl;
941 }
942 }
943
944 sync_stack.pop();
945 }
946
947 return r;
948}
949
950int PeerReplayer::synchronize(const std::string &dir_path, uint64_t snap_id,
951 const std::string &snap_name) {
952 dout(20) << ": dir_path=" << dir_path << ", snap_id=" << snap_id
953 << ", snap_name=" << snap_name << dendl;
954
955 auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
956
957 int r = cleanup_remote_dir(dir_path);
958 if (r < 0) {
959 derr << ": failed to cleanup remote directory=" << dir_path << dendl;
960 return r;
961 }
962
963 r = do_synchronize(dir_path, snap_name);
964 if (r < 0) {
965 derr << ": failed to synchronize dir_path=" << dir_path << ", snapshot="
966 << snap_path << dendl;
967 return r;
968 }
969
970 auto snap_id_str{stringify(snap_id)};
971 snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), snap_id_str.c_str()}};
972 r = ceph_mksnap(m_remote_mount, dir_path.c_str(), snap_name.c_str(), 0755,
973 snap_meta, 1);
974 if (r < 0) {
975 derr << ": failed to snap remote directory dir_path=" << dir_path
976 << ": " << cpp_strerror(r) << dendl;
977 }
978
979 return r;
980}
981
982int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
983 dout(20) << ": dir_path=" << dir_path << dendl;
984
985 std::map<uint64_t, std::string> local_snap_map;
986 std::map<uint64_t, std::string> remote_snap_map;
987
988 int r = build_snap_map(dir_path, &local_snap_map);
989 if (r < 0) {
990 derr << ": failed to build local snap map" << dendl;
991 return r;
992 }
993
994 r = build_snap_map(dir_path, &remote_snap_map, true);
995 if (r < 0) {
996 derr << ": failed to build remote snap map" << dendl;
997 return r;
998 }
999
1000 // infer deleted and renamed snapshots from local and remote
1001 // snap maps
1002 std::set<std::string> snaps_deleted;
1003 std::set<std::pair<std::string,std::string>> snaps_renamed;
1004 for (auto &[primary_snap_id, snap_name] : remote_snap_map) {
1005 auto it = local_snap_map.find(primary_snap_id);
1006 if (it == local_snap_map.end()) {
1007 snaps_deleted.emplace(snap_name);
1008 } else if (it->second != snap_name) {
1009 snaps_renamed.emplace(std::make_pair(snap_name, it->second));
1010 }
1011 }
1012
1013 r = propagate_snap_deletes(dir_path, snaps_deleted);
1014 if (r < 0) {
1015 derr << ": failed to propgate deleted snapshots" << dendl;
1016 return r;
1017 }
1018
1019 r = propagate_snap_renames(dir_path, snaps_renamed);
1020 if (r < 0) {
1021 derr << ": failed to propgate renamed snapshots" << dendl;
1022 return r;
1023 }
1024
1025 // start mirroring snapshots from the last snap-id synchronized
1026 uint64_t last_snap_id = 0;
1027 if (!remote_snap_map.empty()) {
1028 auto last = remote_snap_map.rbegin();
1029 last_snap_id = last->first;
1030 set_last_synced_snap(dir_path, last_snap_id, last->second);
1031 }
1032
1033 dout(5) << ": last snap-id transferred=" << last_snap_id << dendl;
1034 auto it = local_snap_map.upper_bound(last_snap_id);
1035 if (it == local_snap_map.end()) {
1036 dout(20) << ": nothing to synchronize" << dendl;
1037 return 0;
1038 }
1039
1040 auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>(
1041 "cephfs_mirror_max_snapshot_sync_per_cycle");
1042
1043 dout(10) << ": synzhronizing from snap-id=" << it->first << dendl;
1044 for (; it != local_snap_map.end(); ++it) {
1045 set_current_syncing_snap(dir_path, it->first, it->second);
1046 auto start = clock::now();
1047 r = synchronize(dir_path, it->first, it->second);
1048 if (r < 0) {
1049 derr << ": failed to synchronize dir_path=" << dir_path
1050 << ", snapshot=" << it->second << dendl;
1051 clear_current_syncing_snap(dir_path);
1052 return r;
1053 }
1054 std::chrono::duration<double> duration = clock::now() - start;
1055 set_last_synced_stat(dir_path, it->first, it->second, duration.count());
1056 if (--snaps_per_cycle == 0) {
1057 break;
1058 }
1059 }
1060
1061 return 0;
1062}
1063
1064void PeerReplayer::sync_snaps(const std::string &dir_path,
1065 std::unique_lock<ceph::mutex> &locker) {
1066 dout(20) << ": dir_path=" << dir_path << dendl;
1067 locker.unlock();
1068 int r = do_sync_snaps(dir_path);
1069 if (r < 0) {
1070 derr << ": failed to sync snapshots for dir_path=" << dir_path << dendl;
1071 }
1072 locker.lock();
1073 if (r < 0) {
1074 _inc_failed_count(dir_path);
1075 } else {
1076 _reset_failed_count(dir_path);
1077 }
1078}
1079
1080void PeerReplayer::run(SnapshotReplayerThread *replayer) {
1081 dout(10) << ": snapshot replayer=" << replayer << dendl;
1082
1083 time last_directory_scan = clock::zero();
1084 auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>(
1085 "cephfs_mirror_directory_scan_interval");
1086
1087 std::unique_lock locker(m_lock);
1088 while (true) {
1089 // do not check if client is blocklisted under lock
1090 m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
1091 if (is_stopping()) {
1092 dout(5) << ": exiting" << dendl;
1093 break;
1094 }
1095
1096 locker.unlock();
1097
1098 if (m_fs_mirror->is_blocklisted()) {
1099 dout(5) << ": exiting as client is blocklisted" << dendl;
1100 break;
1101 }
1102
1103 locker.lock();
1104
1105 auto now = clock::now();
1106 std::chrono::duration<double> timo = now - last_directory_scan;
1107 if (timo.count() >= scan_interval && m_directories.size()) {
1108 dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
1109 auto dir_path = pick_directory();
1110 if (dir_path) {
1111 dout(5) << ": picked dir_path=" << *dir_path << dendl;
1112 int r = register_directory(*dir_path, replayer);
1113 if (r == 0) {
1114 sync_snaps(*dir_path, locker);
1115 unregister_directory(*dir_path);
1116 }
1117 }
1118
1119 last_directory_scan = now;
1120 }
1121 }
1122}
1123
1124void PeerReplayer::peer_status(Formatter *f) {
1125 std::scoped_lock locker(m_lock);
1126 f->open_object_section("stats");
1127 for (auto &[dir_path, sync_stat] : m_snap_sync_stats) {
1128 f->open_object_section(dir_path);
1129 if (sync_stat.failed) {
1130 f->dump_string("state", "failed");
1131 } else if (!sync_stat.current_syncing_snap) {
1132 f->dump_string("state", "idle");
1133 } else {
1134 f->dump_string("state", "syncing");
1135 f->open_object_section("current_sycning_snap");
1136 f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first);
1137 f->dump_string("name", (*sync_stat.current_syncing_snap).second);
1138 f->close_section();
1139 }
1140 if (sync_stat.last_synced_snap) {
1141 f->open_object_section("last_synced_snap");
1142 f->dump_unsigned("id", (*sync_stat.last_synced_snap).first);
1143 f->dump_string("name", (*sync_stat.last_synced_snap).second);
1144 if (sync_stat.last_sync_duration) {
1145 f->dump_float("sync_duration", *sync_stat.last_sync_duration);
1146 f->dump_stream("sync_time_stamp") << sync_stat.last_synced;
1147 }
1148 f->close_section();
1149 }
1150 f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count);
1151 f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count);
1152 f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count);
1153 f->close_section(); // dir_path
1154 }
1155 f->close_section(); // stats
1156}
1157
1158} // namespace mirror
1159} // namespace cephfs