]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_data_sync.h
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
index 8e876b9c97d77a0c66f026414fb686b7ecf79fd1..83f75c925578cb39374f0e03ec9406bafe110bd1 100644 (file)
 #include "common/RWLock.h"
 #include "common/ceph_json.h"
 
-namespace rgw {
-class BucketChangeObserver;
-}
-
 struct rgw_datalog_info {
   uint32_t num_shards;
 
@@ -128,7 +124,19 @@ struct rgw_data_sync_marker {
   }
 
   void dump(Formatter *f) const {
-    encode_json("state", (int)state, f);
+    const char *s{nullptr};
+    switch ((SyncState)state) {
+      case FullSync:
+        s = "full-sync";
+        break;
+      case IncrementalSync:
+        s = "incremental-sync";
+        break;
+      default:
+        s = "unknown";
+        break;
+    }
+    encode_json("status", s, f);
     encode_json("marker", marker, f);
     encode_json("next_step_marker", next_step_marker, f);
     encode_json("total_entries", total_entries, f);
@@ -136,9 +144,13 @@ struct rgw_data_sync_marker {
     encode_json("timestamp", utime_t(timestamp), f);
   }
   void decode_json(JSONObj *obj) {
-    int s;
-    JSONDecoder::decode_json("state", s, obj);
-    state = s;
+    std::string s;
+    JSONDecoder::decode_json("status", s, obj);
+    if (s == "full-sync") {
+      state = FullSync;
+    } else if (s == "incremental-sync") {
+      state = IncrementalSync;
+    }
     JSONDecoder::decode_json("marker", marker, obj);
     JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
     JSONDecoder::decode_json("total_entries", total_entries, obj);
@@ -222,15 +234,13 @@ struct RGWDataSyncEnv {
   RGWSyncErrorLogger *error_logger;
   string source_zone;
   RGWSyncModuleInstanceRef sync_module;
-  rgw::BucketChangeObserver *observer{nullptr};
 
   RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
 
   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
             RGWSyncErrorLogger *_error_logger, const string& _source_zone,
-            RGWSyncModuleInstanceRef& _sync_module,
-            rgw::BucketChangeObserver *_observer) {
+            RGWSyncModuleInstanceRef& _sync_module) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -239,7 +249,6 @@ struct RGWDataSyncEnv {
     error_logger = _error_logger;
     source_zone = _source_zone;
     sync_module = _sync_module;
-    observer = _observer;
   }
 
   string shard_obj_name(int shard_id);
@@ -249,7 +258,6 @@ struct RGWDataSyncEnv {
 class RGWRemoteDataLog : public RGWCoroutinesManager {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
-  rgw::BucketChangeObserver *observer;
   RGWHTTPManager http_manager;
 
   RGWDataSyncEnv sync_env;
@@ -260,10 +268,9 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;
 
 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                   rgw::BucketChangeObserver *observer)
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
-      store(_store), async_rados(async_rados), observer(observer),
+      store(_store), async_rados(async_rados),
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
@@ -274,6 +281,8 @@ public:
   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
   int read_sync_status(rgw_data_sync_status *sync_status);
+  int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
+  int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
   int init_sync_status(int num_shards);
   int run_sync(int num_shards);
 
@@ -300,17 +309,15 @@ class RGWDataSyncStatusManager {
 
 public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                           const string& _source_zone,
-                           rgw::BucketChangeObserver *observer = nullptr)
+                           const string& _source_zone)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
       sync_module(nullptr),
-      source_log(store, async_rados, observer), num_shards(0) {}
+      source_log(store, async_rados), num_shards(0) {}
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                           const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
-                           rgw::BucketChangeObserver *observer = nullptr)
+                           const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
       sync_module(_sync_module),
-      source_log(store, async_rados, observer), num_shards(0) {}
+      source_log(store, async_rados), num_shards(0) {}
   ~RGWDataSyncStatusManager() {
     finalize();
   }
@@ -323,6 +330,14 @@ public:
   int read_sync_status(rgw_data_sync_status *sync_status) {
     return source_log.read_sync_status(sync_status);
   }
+
+  int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
+    return source_log.read_recovering_shards(num_shards, recovering_shards);
+  }
+
+  int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
+    return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
+  }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
   int read_log_info(rgw_datalog_info *log_info) {
@@ -440,6 +455,20 @@ struct rgw_bucket_shard_sync_info {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
 
+struct rgw_bucket_index_marker_info {
+  string bucket_ver;
+  string master_ver;
+  string max_marker;
+  bool syncstopped{false};
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
+    JSONDecoder::decode_json("master_ver", master_ver, obj);
+    JSONDecoder::decode_json("max_marker", max_marker, obj);
+    JSONDecoder::decode_json("syncstopped", syncstopped, obj);
+  }
+};
+
 
 class RGWRemoteBucketLog : public RGWCoroutinesManager {
   RGWRados *store;
@@ -522,7 +551,7 @@ public:
 
 /// read the sync status of all bucket shards from the given source zone
 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
-                           const rgw_bucket& bucket,
+                           const RGWBucketInfo& bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status);
 
 class RGWDefaultSyncModule : public RGWSyncModule {