]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_data_sync.h
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
index 934f3f322c43af28f365a0c27131fca6022ee900..7716fabadcc6e57468b5b4ae11d878d9ce626f19 100644 (file)
@@ -10,6 +10,9 @@
 #include "common/RWLock.h"
 #include "common/ceph_json.h"
 
+namespace rgw {
+class BucketChangeObserver;
+}
 
 struct rgw_datalog_info {
   uint32_t num_shards;
@@ -82,6 +85,7 @@ struct rgw_data_sync_info {
     JSONDecoder::decode_json("num_shards", num_shards, obj);
     JSONDecoder::decode_json("instance_id", num_shards, obj);
   }
+  static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
 
   rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
 };
@@ -143,6 +147,7 @@ struct rgw_data_sync_marker {
     JSONDecoder::decode_json("timestamp", t, obj);
     timestamp = t.to_real_time();
   }
+  static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
 
@@ -174,6 +179,7 @@ struct rgw_data_sync_status {
     JSONDecoder::decode_json("info", sync_info, obj);
     JSONDecoder::decode_json("markers", sync_markers, obj);
   }
+  static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_status)
 
@@ -216,13 +222,15 @@ struct RGWDataSyncEnv {
   RGWSyncErrorLogger *error_logger;
   string source_zone;
   RGWSyncModuleInstanceRef sync_module;
+  rgw::BucketChangeObserver *observer{nullptr};
 
   RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
 
   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
             RGWSyncErrorLogger *_error_logger, const string& _source_zone,
-            RGWSyncModuleInstanceRef& _sync_module) {
+            RGWSyncModuleInstanceRef& _sync_module,
+            rgw::BucketChangeObserver *_observer) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -231,6 +239,7 @@ struct RGWDataSyncEnv {
     error_logger = _error_logger;
     source_zone = _source_zone;
     sync_module = _sync_module;
+    observer = _observer;
   }
 
   string shard_obj_name(int shard_id);
@@ -240,6 +249,7 @@ struct RGWDataSyncEnv {
 class RGWRemoteDataLog : public RGWCoroutinesManager {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
+  rgw::BucketChangeObserver *observer;
   RGWHTTPManager http_manager;
 
   RGWDataSyncEnv sync_env;
@@ -250,9 +260,10 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;
 
 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                   rgw::BucketChangeObserver *observer)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
-      store(_store), async_rados(async_rados),
+      store(_store), async_rados(async_rados), observer(observer),
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
@@ -289,10 +300,11 @@ class RGWDataSyncStatusManager {
 
 public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                           const string& _source_zone)
+                           const string& _source_zone,
+                           rgw::BucketChangeObserver *observer = nullptr)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
       sync_module(nullptr),
-      source_log(store, async_rados), num_shards(0) {}
+      source_log(store, async_rados, observer), num_shards(0) {}
   ~RGWDataSyncStatusManager() {
     finalize();
   }
@@ -350,10 +362,8 @@ struct rgw_bucket_shard_full_sync_marker {
      DECODE_FINISH(bl);
   }
 
-  void dump(Formatter *f) const {
-    encode_json("position", position, f);
-    encode_json("count", count, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
 };
 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
 
@@ -376,9 +386,8 @@ struct rgw_bucket_shard_inc_sync_marker {
      DECODE_FINISH(bl);
   }
 
-  void dump(Formatter *f) const {
-    encode_json("position", position, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
 
   bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
     return (position < m.position);
@@ -417,26 +426,8 @@ struct rgw_bucket_shard_sync_info {
      DECODE_FINISH(bl);
   }
 
-  void dump(Formatter *f) const {
-    string s;
-    switch ((SyncState)state) {
-      case StateInit:
-       s = "init";
-       break;
-      case StateFullSync:
-       s = "full-sync";
-       break;
-      case StateIncrementalSync:
-       s = "incremental-sync";
-       break;
-      default:
-       s = "unknown";
-       break;
-    }
-    encode_json("status", s, f);
-    encode_json("full_marker", full_marker, f);
-    encode_json("inc_marker", inc_marker, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
 
   rgw_bucket_shard_sync_info() : state((int)StateInit) {}
 
@@ -523,6 +514,11 @@ public:
   int run();
 };
 
+/// read the sync status of all bucket shards from the given source zone
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+                           const rgw_bucket& bucket,
+                           std::vector<rgw_bucket_shard_sync_info> *status);
+
 class RGWDefaultSyncModule : public RGWSyncModule {
 public:
   RGWDefaultSyncModule() {}