#include "common/RWLock.h"
#include "common/ceph_json.h"
-namespace rgw {
-class BucketChangeObserver;
-}
-
struct rgw_datalog_info {
uint32_t num_shards;
}
void dump(Formatter *f) const {
- encode_json("state", (int)state, f);
+ const char *s{nullptr};
+ switch ((SyncState)state) {
+ case FullSync:
+ s = "full-sync";
+ break;
+ case IncrementalSync:
+ s = "incremental-sync";
+ break;
+ default:
+ s = "unknown";
+ break;
+ }
+ encode_json("status", s, f);
encode_json("marker", marker, f);
encode_json("next_step_marker", next_step_marker, f);
encode_json("total_entries", total_entries, f);
encode_json("timestamp", utime_t(timestamp), f);
}
void decode_json(JSONObj *obj) {
- int s;
- JSONDecoder::decode_json("state", s, obj);
- state = s;
+ std::string s;
+ JSONDecoder::decode_json("status", s, obj);
+ if (s == "full-sync") {
+ state = FullSync;
+ } else if (s == "incremental-sync") {
+ state = IncrementalSync;
+ }
JSONDecoder::decode_json("marker", marker, obj);
JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
JSONDecoder::decode_json("total_entries", total_entries, obj);
RGWSyncErrorLogger *error_logger;
string source_zone;
RGWSyncModuleInstanceRef sync_module;
- rgw::BucketChangeObserver *observer{nullptr};
RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
RGWSyncErrorLogger *_error_logger, const string& _source_zone,
- RGWSyncModuleInstanceRef& _sync_module,
- rgw::BucketChangeObserver *_observer) {
+ RGWSyncModuleInstanceRef& _sync_module) {
cct = _cct;
store = _store;
conn = _conn;
error_logger = _error_logger;
source_zone = _source_zone;
sync_module = _sync_module;
- observer = _observer;
}
string shard_obj_name(int shard_id);
class RGWRemoteDataLog : public RGWCoroutinesManager {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
- rgw::BucketChangeObserver *observer;
RGWHTTPManager http_manager;
RGWDataSyncEnv sync_env;
bool initialized;
public:
- RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- rgw::BucketChangeObserver *observer)
+ RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
- store(_store), async_rados(async_rados), observer(observer),
+ store(_store), async_rados(async_rados),
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
int read_sync_status(rgw_data_sync_status *sync_status);
+ int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
+ int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
int init_sync_status(int num_shards);
int run_sync(int num_shards);
public:
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone,
- rgw::BucketChangeObserver *observer = nullptr)
+ const string& _source_zone)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(nullptr),
- source_log(store, async_rados, observer), num_shards(0) {}
+ source_log(store, async_rados), num_shards(0) {}
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
- rgw::BucketChangeObserver *observer = nullptr)
+ const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(_sync_module),
- source_log(store, async_rados, observer), num_shards(0) {}
+ source_log(store, async_rados), num_shards(0) {}
~RGWDataSyncStatusManager() {
finalize();
}
int read_sync_status(rgw_data_sync_status *sync_status) {
return source_log.read_sync_status(sync_status);
}
+
+ int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
+ return source_log.read_recovering_shards(num_shards, recovering_shards);
+ }
+
+ int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
+ return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
+ }
int init_sync_status() { return source_log.init_sync_status(num_shards); }
int read_log_info(rgw_datalog_info *log_info) {
};
WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
+struct rgw_bucket_index_marker_info {
+ string bucket_ver;
+ string master_ver;
+ string max_marker;
+ bool syncstopped{false};
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
+ JSONDecoder::decode_json("master_ver", master_ver, obj);
+ JSONDecoder::decode_json("max_marker", max_marker, obj);
+ JSONDecoder::decode_json("syncstopped", syncstopped, obj);
+ }
+};
+
class RGWRemoteBucketLog : public RGWCoroutinesManager {
RGWRados *store;
/// read the sync status of all bucket shards from the given source zone
int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
- const rgw_bucket& bucket,
+ const RGWBucketInfo& bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status);
class RGWDefaultSyncModule : public RGWSyncModule {