#include "common/RWLock.h"
#include "common/ceph_json.h"
+namespace rgw {
+class BucketChangeObserver;
+}
struct rgw_datalog_info {
uint32_t num_shards;
JSONDecoder::decode_json("num_shards", num_shards, obj);
JSONDecoder::decode_json("instance_id", num_shards, obj);
}
+ static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
};
JSONDecoder::decode_json("timestamp", t, obj);
timestamp = t.to_real_time();
}
+ static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
};
WRITE_CLASS_ENCODER(rgw_data_sync_marker)
JSONDecoder::decode_json("info", sync_info, obj);
JSONDecoder::decode_json("markers", sync_markers, obj);
}
+ static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
};
WRITE_CLASS_ENCODER(rgw_data_sync_status)
RGWSyncErrorLogger *error_logger;
string source_zone;
RGWSyncModuleInstanceRef sync_module;
+ rgw::BucketChangeObserver *observer{nullptr};
RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
RGWSyncErrorLogger *_error_logger, const string& _source_zone,
- RGWSyncModuleInstanceRef& _sync_module) {
+ RGWSyncModuleInstanceRef& _sync_module,
+ rgw::BucketChangeObserver *_observer) {
cct = _cct;
store = _store;
conn = _conn;
error_logger = _error_logger;
source_zone = _source_zone;
sync_module = _sync_module;
+ observer = _observer;
}
string shard_obj_name(int shard_id);
class RGWRemoteDataLog : public RGWCoroutinesManager {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
+ rgw::BucketChangeObserver *observer;
RGWHTTPManager http_manager;
RGWDataSyncEnv sync_env;
bool initialized;
public:
- RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+ RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ rgw::BucketChangeObserver *observer)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
- store(_store), async_rados(async_rados),
+ store(_store), async_rados(async_rados), observer(observer),
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
public:
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone)
+ const string& _source_zone,
+ rgw::BucketChangeObserver *observer = nullptr)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(nullptr),
- source_log(store, async_rados), num_shards(0) {}
+ source_log(store, async_rados, observer), num_shards(0) {}
+ RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
+ rgw::BucketChangeObserver *observer = nullptr)
+ : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
+ sync_module(_sync_module),
+ source_log(store, async_rados, observer), num_shards(0) {}
~RGWDataSyncStatusManager() {
finalize();
}
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const {
- encode_json("position", position, f);
- encode_json("count", count, f);
- }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
};
WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const {
- encode_json("position", position, f);
- }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
return (position < m.position);
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const {
- string s;
- switch ((SyncState)state) {
- case StateInit:
- s = "init";
- break;
- case StateFullSync:
- s = "full-sync";
- break;
- case StateIncrementalSync:
- s = "incremental-sync";
- break;
- default:
- s = "unknown";
- break;
- }
- encode_json("status", s, f);
- encode_json("full_marker", full_marker, f);
- encode_json("inc_marker", inc_marker, f);
- }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
rgw_bucket_shard_sync_info() : state((int)StateInit) {}
int run();
};
+/// read the sync status of all bucket shards from the given source zone
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+ const rgw_bucket& bucket,
+ std::vector<rgw_bucket_shard_sync_info> *status);
+
class RGWDefaultSyncModule : public RGWSyncModule {
public:
RGWDefaultSyncModule() {}