#include "rgw_http_client.h"
#include "rgw_sal.h"
+#include "rgw_datalog.h"
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
#include "rgw_sync_policy.h"
#include "rgw_bucket_sync.h"
+// represents an obligation to sync an entry up a given time
+struct rgw_data_sync_obligation {
+ std::string key;
+ std::string marker;
+ ceph::real_time timestamp;
+ bool retry = false;
+};
+
+inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
+ out << "key=" << o.key;
+ if (!o.marker.empty()) {
+ out << " marker=" << o.marker;
+ }
+ if (o.timestamp != ceph::real_time{}) {
+ out << " timestamp=" << o.timestamp;
+ }
+ if (o.retry) {
+ out << " retry";
+ }
+ return out;
+}
+
class JSONObj;
struct rgw_sync_bucket_pipe;
};
class RGWRados;
-class RGWDataChangesLogInfo;
class RGWRemoteDataLog : public RGWCoroutinesManager {
const DoutPrefixProvider *dpp;
struct rgw_bucket_shard_inc_sync_marker {
string position;
-
- rgw_bucket_shard_inc_sync_marker() {}
+ ceph::real_time timestamp;
void encode_attr(map<string, bufferlist>& attrs);
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(position, bl);
+ encode(timestamp, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(position, bl);
- DECODE_FINISH(bl);
+ if (struct_v >= 2) {
+ decode(timestamp, bl);
+ }
+ DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
void decode_json(JSONObj *obj);
-
- bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
- return (position < m.position);
- }
};
WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
const rgw_bucket& dest_bucket);
RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
- RGWCoroutine *init_sync_status_cr(int num);
+ RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker);
RGWCoroutine *run_sync_cr(int num);
int num_pipes() {
void wakeup();
};
+class BucketIndexShardsManager;
+
+int rgw_read_remote_bilog_info(RGWRESTConn* conn,
+ const rgw_bucket& bucket,
+ BucketIndexShardsManager& markers,
+ optional_yield y);
+
class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
rgw::sal::RGWRadosStore *store;
static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs);
static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
- const rgw_zone_id& source_zone, const rgw_obj& obj); /* specific source obj sync status,
+ const rgw_zone_id& source_zone, const rgw::sal::RGWObject* obj); /* specific source obj sync status,
can be used by sync modules */
// implements DoutPrefixProvider