git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/cephfs_mirror/PeerReplayer.h
update source to Ceph Pacific 16.2.2
diff --git a/ceph/src/tools/cephfs_mirror/PeerReplayer.h b/ceph/src/tools/cephfs_mirror/PeerReplayer.h
new file mode 100644
index 0000000..42d58b5
--- /dev/null
@@ -0,0 +1,276 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
+#define CEPHFS_MIRROR_PEER_REPLAYER_H
+
+#include "common/Formatter.h"
+#include "common/Thread.h"
+#include "mds/FSMap.h"
+#include "ServiceDaemon.h"
+#include "Types.h"
+
+namespace cephfs {
+namespace mirror {
+
+class FSMirror;
+class PeerReplayerAdminSocketHook;
+
+class PeerReplayer {
+public:
+  PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
+               RadosRef local_cluster, const Filesystem &filesystem,
+               const Peer &peer, const std::set<std::string, std::less<>> &directories,
+               MountRef mount, ServiceDaemon *service_daemon);
+  ~PeerReplayer();
+
+  // initialize replayer for a peer
+  int init();
+
+  // shutdown replayer for a peer
+  void shutdown();
+
+  // add a directory to mirror queue
+  void add_directory(std::string_view dir_path);
+
+  // remove a directory from queue
+  void remove_directory(std::string_view dir_path);
+
+  // admin socket helpers
+  void peer_status(Formatter *f);
+
+private:
+  inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
+
+  inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
+  inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
+
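+  // true once shutdown() has been requested; checked by replayer threads under m_lock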
+  bool is_stopping() {
+    return m_stopping;
+  }
+
+  struct Replayer;
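+  // worker thread that drives snapshot replay for this peer (its entry() calls run())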
+  class SnapshotReplayerThread : public Thread {
+  public:
+    SnapshotReplayerThread(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) {
+    }
+
+    void *entry() override {
+      m_peer_replayer->run(this);
+      return nullptr;
+    }
+
+    void cancel() {
+      canceled = true;
+    }
+
+    bool is_canceled() const {
+      return canceled;
+    }
+
+  private:
+    PeerReplayer *m_peer_replayer;
+    bool canceled = false;
+  };
+
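+  // registration state for a mirrored directory: its open file descriptor and the
+  // replayer thread currently responsible for it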
+  struct DirRegistry {
+    int fd;
+    SnapshotReplayerThread *replayer;
+  };
+
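+  // a path plus its statx (and, for directories, an open ceph_dir_result) tracked
+  // while synchronizing a snapshot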
+  struct SyncEntry {
+    std::string epath;
+    ceph_dir_result *dirp = nullptr; // valid for directories
+    struct ceph_statx stx;
+
+    SyncEntry(std::string_view path,
+              const struct ceph_statx &stx)
+      : epath(path),
+        stx(stx) {
+    }
+    SyncEntry(std::string_view path,
+              ceph_dir_result *dirp,
+              const struct ceph_statx &stx)
+      : epath(path),
+        dirp(dirp),
+        stx(stx) {
+    }
+
+    bool is_directory() const {
+      return S_ISDIR(stx.stx_mode);
+    }
+  };
+
+  using clock = ceph::coarse_mono_clock;
+  using time = ceph::coarse_mono_time;
+
+  // stats sent to service daemon
+  struct ServiceDaemonStats {
+    uint64_t failed_dir_count = 0;
+    uint64_t recovered_dir_count = 0;
+  };
+
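+  // per-directory snapshot synchronization statistics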
+  struct SnapSyncStat {
+    uint64_t nr_failures = 0; // number of consecutive failures
+    boost::optional<time> last_failed; // last failed timestamp
+    bool failed = false; // hit upper cap for consecutive failures
+    boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
+    boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
+    uint64_t synced_snap_count = 0;
+    uint64_t deleted_snap_count = 0;
+    uint64_t renamed_snap_count = 0;
+    time last_synced = clock::zero();
+    boost::optional<double> last_sync_duration;
+  };
+
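+  // record a failed sync for dir_path; once the configured number of consecutive
+  // failures is reached, mark the directory as failed and report it to the service daemon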
+  void _inc_failed_count(const std::string &dir_path) {
+    auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
+      "cephfs_mirror_max_consecutive_failures_per_directory");
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    sync_stat.last_failed = clock::now();
+    if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
+      sync_stat.failed = true;
+      ++m_service_daemon_stats.failed_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.failed_dir_count);
+    }
+  }
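+  // clear failure tracking for dir_path; if the directory had been marked failed,
+  // report it as recovered to the service daemon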
+  void _reset_failed_count(const std::string &dir_path) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    if (sync_stat.failed) {
+      ++m_service_daemon_stats.recovered_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.recovered_dir_count);
+    }
+    sync_stat.nr_failures = 0;
+    sync_stat.failed = false;
+    sync_stat.last_failed = boost::none;
+  }
+
+  void _set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
+                             const std::string &snap_name) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
+    sync_stat.current_syncing_snap = boost::none;
+  }
+  void set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
+                            const std::string &snap_name) {
+    std::scoped_lock locker(m_lock);
+    _set_last_synced_snap(dir_path, snap_id, snap_name);
+  }
+  void set_current_syncing_snap(const std::string &dir_path, uint64_t snap_id,
+                                const std::string &snap_name) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
+  }
+  void clear_current_syncing_snap(const std::string &dir_path) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    sync_stat.current_syncing_snap = boost::none;
+  }
+  void inc_deleted_snap(const std::string &dir_path) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    ++sync_stat.deleted_snap_count;
+  }
+  void inc_renamed_snap(const std::string &dir_path) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    ++sync_stat.renamed_snap_count;
+  }
+  void set_last_synced_stat(const std::string &dir_path, uint64_t snap_id,
+                            const std::string &snap_name, double duration) {
+    std::scoped_lock locker(m_lock);
+    _set_last_synced_snap(dir_path, snap_id, snap_name);
+    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    sync_stat.last_synced = clock::now();
+    sync_stat.last_sync_duration = duration;
+    ++sync_stat.synced_snap_count;
+  }
+
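+  // check whether the sync of dir_path should back off: the instance is blocklisted,
+  // the replayer is shutting down, or this directory's replayer thread was canceled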
+  bool should_backoff(const std::string &dir_path, int *retval) {
+    if (m_fs_mirror->is_blocklisted()) {
+      *retval = -EBLOCKLISTED;
+      return true;
+    }
+
+    std::scoped_lock locker(m_lock);
+    if (is_stopping()) {
+      // Ceph defines EBLOCKLISTED as ESHUTDOWN (108), so use
+      // EINPROGRESS to identify shutdown.
+      *retval = -EINPROGRESS;
+      return true;
+    }
+    auto &dr = m_registered.at(dir_path);
+    if (dr.replayer->is_canceled()) {
+      *retval = -ECANCELED;
+      return true;
+    }
+
+    *retval = 0;
+    return false;
+  }
+
+  typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+
+  CephContext *m_cct;
+  FSMirror *m_fs_mirror;
+  RadosRef m_local_cluster;
+  Filesystem m_filesystem;
+  Peer m_peer;
+  // probably needs to be encapsulated when supporting cancellations
+  std::map<std::string, DirRegistry> m_registered;
+  std::vector<std::string> m_directories;
+  std::map<std::string, SnapSyncStat> m_snap_sync_stats;
+  MountRef m_local_mount;
+  ServiceDaemon *m_service_daemon;
+  PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
+
+  ceph::mutex m_lock;
+  ceph::condition_variable m_cond;
+  RadosRef m_remote_cluster;
+  MountRef m_remote_mount;
+  bool m_stopping = false;
+  SnapshotReplayers m_replayers;
+
+  ServiceDaemonStats m_service_daemon_stats;
+
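+  // main loop executed by each SnapshotReplayerThread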
+  void run(SnapshotReplayerThread *replayer);
+
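+  // directory scheduling: pick a directory, register and lock it for a replayer
+  // thread, sync its snapshots, and unlock/unregister it when done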
+  boost::optional<std::string> pick_directory();
+  int register_directory(const std::string &dir_path, SnapshotReplayerThread *replayer);
+  void unregister_directory(const std::string &dir_path);
+  int try_lock_directory(const std::string &dir_path, SnapshotReplayerThread *replayer,
+                         DirRegistry *registry);
+  void unlock_directory(const std::string &dir_path, const DirRegistry &registry);
+  void sync_snaps(const std::string &dir_path, std::unique_lock<ceph::mutex> &locker);
+
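+  // snapshot bookkeeping: build the local/remote snapshot maps, propagate snapshot
+  // deletions and renames to the peer, and synchronize individual snapshots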
+  int do_sync_snaps(const std::string &dir_path);
+  int build_snap_map(const std::string &dir_path, std::map<uint64_t, std::string> *snap_map,
+                     bool is_remote=false);
+  int propagate_snap_deletes(const std::string &dir_name, const std::set<std::string> &snaps);
+  int propagate_snap_renames(const std::string &dir_name,
+                             const std::set<std::pair<std::string,std::string>> &snaps);
+  int synchronize(const std::string &dir_path, uint64_t snap_id, const std::string &snap_name);
+  int do_synchronize(const std::string &path, const std::string &snap_name);
+
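+  // peer-side operations used while syncing: clean up stale remote directories,
+  // create directories, and replicate file data/metadata on the remote filesystem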
+  int cleanup_remote_dir(const std::string &dir_path);
+  int remote_mkdir(const std::string &local_path, const std::string &remote_path,
+                   const struct ceph_statx &stx);
+  int remote_file_op(const std::string &dir_path,
+                     const std::string &local_path,
+                     const std::string &remote_path, const struct ceph_statx &stx);
+  int remote_copy(const std::string &dir_path,
+                  const std::string &local_path,
+                  const std::string &remote_path,
+                  const struct ceph_statx &local_stx);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_PEER_REPLAYER_H