}
public:
RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone,
- rgw::BucketChangeObserver *observer)
+ const string& _source_zone)
: RGWSyncProcessorThread(_store, "data-sync"),
- sync(_store, async_rados, _source_zone, observer),
+ sync(_store, async_rados, _source_zone),
initialized(false) {}
void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
for (int i = 0; i < num_shards; ++i) {
Mutex::Locker l(*locks[i]);
for (auto c : completions[i]) {
- Mutex::Locker cl(c->lock);
c->stop();
}
}
ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
return ret;
}
+ data_log->set_observer(&*bucket_trim);
Mutex::Locker dl(data_sync_thread_lock);
for (auto iter : zone_data_sync_from_map) {
ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
- auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
- &*bucket_trim);
+ auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
orig_size = state->accounted_size;
}
- bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
+ bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||
+ !obj.key.instance.empty();
bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);
target->invalidate_state();
state = NULL;
- if (versioned_op) {
- r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
+ if (versioned_op && meta.olh_epoch) {
+ r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);
if (r < 0) {
return r;
}
bool copy_if_newer,
map<string, bufferlist>& attrs,
RGWObjCategory category,
- uint64_t olh_epoch,
+ boost::optional<uint64_t> olh_epoch,
real_time delete_at,
string *version_id,
string *ptag,
if (version_id && *version_id != "null") {
processor.set_version_id(*version_id);
}
- processor.set_olh_epoch(olh_epoch);
+ if (olh_epoch) {
+ processor.set_olh_epoch(*olh_epoch);
+ }
int ret = processor.prepare(this, NULL);
if (ret < 0) {
return ret;
return 0;
set_err_state:
if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
- ret = 0;
+ // we may have already fetched during sync of OP_ADD, but were waiting
+ // for OP_LINK_OLH to call set_olh() with a real olh_epoch
+ if (olh_epoch && *olh_epoch > 0) {
+ constexpr bool log_data_change = true;
+ ret = set_olh(obj_ctx, dest_bucket_info, dest_obj, false, nullptr,
+ *olh_epoch, real_time(), false, zones_trace, log_data_change);
+ } else {
+ // we already have the latest copy
+ ret = 0;
+ }
}
if (opstate) {
RGWOpState::OpState state;
}
result.version_id = marker.key.instance;
+ if (result.version_id.empty())
+ result.version_id = "null";
result.delete_marker = true;
struct rgw_bucket_dir_entry_meta meta;
const string& op_tag,
struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
+ real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *_zones_trace, bool log_data_change)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
return r;
}
+ if (log_data_change && bucket_info.datasync_flag_enabled()) {
+ data_log->add_entry(bs.bucket, bs.shard_id);
+ }
+
return 0;
}
}
int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
+ uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *zones_trace, bool log_data_change)
{
string op_tag;
}
return ret;
}
- ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
+ ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker,
+ op_tag, meta, olh_epoch, unmod_since, high_precision_time,
+ zones_trace, log_data_change);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {