+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include <boost/utility/string_ref.hpp>
#include "common/ceph_json.h"
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
- _source_zone, _sync_module, observer);
+ _source_zone, _sync_module);
if (initialized) {
return 0;
marker_to_key[marker] = key;
return true;
}
+
+ // Factory for the order-control coroutine used by ordered marker updates.
+ // RGWLastCallerWinsCR suggests that when several ordered calls queue up,
+ // only the most recent one is executed — presumably collapsing redundant
+ // queued flushes (NOTE(review): confirm against RGWLastCallerWinsCR).
+ RGWOrderCallCR *allocate_order_control_cr() {
+ return new RGWLastCallerWinsCR(sync_env->cct);
+ }
};
// ostream wrappers to print buckets without copying strings
<< error_repo->get_obj() << " retcode=" << retcode << dendl;
}
}
- if (sync_env->observer) {
- sync_env->observer->on_bucket_changed(bs.bucket.get_key());
- }
/* FIXME: what do do in case of error */
if (marker_tracker && !entry_marker.empty()) {
/* update marker */
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10
-enum RemoteDatalogStatus {
- RemoteNotTrimmed = 0,
- RemoteTrimmed = 1,
- RemoteMightTrimmed = 2
-};
-
class RGWDataSyncShardCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
list<rgw_data_change_log_entry>::iterator log_iter;
bool truncated;
- RGWDataChangesLogInfo shard_info;
- string datalog_marker;
-
- RemoteDatalogStatus remote_trimmed;
Mutex inc_lock;
Cond inc_cond;
string error_marker;
int max_error_entries;
- ceph::real_time error_retry_time;
+ ceph::coarse_real_time error_retry_time;
#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
- marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
+ marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
total_entries = sync_marker.pos;
do {
+ if (!lease_cr->is_locked()) {
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
} else {
// fetch remote and write locally
yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
- if (retcode < 0) {
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
}
sync_marker.marker = *iter;
+
+ while ((int)num_spawned() > spawn_window) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ int ret;
+ while (collect(&ret, lease_stack.get())) {
+ if (ret < 0) {
+ ldout(cct, 10) << "a sync operation returned error" << dendl;
+ }
+ }
+ }
}
} while ((int)entries.size() == max_entries);
- lease_cr->go_down();
- drain_all();
+ drain_all_but_stack(lease_stack.get());
yield {
/* update marker to reflect we're done with full sync */
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
lease_cr->go_down();
+ drain_all();
return set_cr_error(retcode);
}
+ // keep lease and transition to incremental_sync()
}
return 0;
}
int incremental_sync() {
reenter(&incremental_cr) {
- yield init_lease_cr();
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- ldout(cct, 5) << "lease cr failed, done early " << dendl;
- set_status("lease lock failed, early abort");
- return set_cr_error(lease_cr->get_ret_status());
+ ldout(cct, 10) << "start incremental sync" << dendl;
+ if (lease_cr) {
+ ldout(cct, 10) << "lease already held from full sync" << dendl;
+ } else {
+ yield init_lease_cr();
+ while (!lease_cr->is_locked()) {
+ if (lease_cr->is_done()) {
+ ldout(cct, 5) << "lease cr failed, done early " << dendl;
+ set_status("lease lock failed, early abort");
+ return set_cr_error(lease_cr->get_ret_status());
+ }
+ set_sleeping(true);
+ yield;
}
- set_sleeping(true);
- yield;
+ set_status("lease acquired");
+ ldout(cct, 10) << "took lease" << dendl;
}
- set_status("lease acquired");
error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
rgw_raw_obj(pool, error_oid),
1 /* no buffer */);
logger.log("inc sync");
set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
do {
+ if (!lease_cr->is_locked()) {
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
current_modified.clear();
inc_lock.Lock();
current_modified.swap(modified_shards);
}
}
- /* process bucket shards that previously failed */
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, &error_entries,
- max_error_entries));
- ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
- iter = error_entries.begin();
- for (; iter != error_entries.end(); ++iter) {
- error_marker = *iter;
- ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
- }
- if ((int)error_entries.size() != max_error_entries) {
- if (error_marker.empty() && error_entries.empty()) {
- /* the retry repo is empty, we back off a bit before calling it again */
- retry_backoff_secs *= 2;
- if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
- retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ if (error_retry_time <= ceph::coarse_real_clock::now()) {
+ /* process bucket shards that previously failed */
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ error_marker, &error_entries,
+ max_error_entries));
+ ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ error_marker = *iter;
+ ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+ }
+ if ((int)error_entries.size() != max_error_entries) {
+ if (error_marker.empty() && error_entries.empty()) {
+ /* the retry repo is empty, we back off a bit before calling it again */
+ retry_backoff_secs *= 2;
+ if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ }
+ } else {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
}
- } else {
- retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+ error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+ error_marker.clear();
}
- error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
- error_marker.clear();
}
-
- yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
+#define INCREMENTAL_MAX_ENTRIES 100
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
+ spawned_keys.clear();
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
- datalog_marker = shard_info.marker;
- remote_trimmed = RemoteNotTrimmed;
-#define INCREMENTAL_MAX_ENTRIES 100
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (datalog_marker > sync_marker.marker) {
- spawned_keys.clear();
- if (sync_marker.marker.empty())
- remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed;
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
+ for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+ if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
+ ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+ marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+ continue;
}
- if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
- remote_trimmed = RemoteTrimmed;
- else
- remote_trimmed = RemoteNotTrimmed;
- for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
- if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
- marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
- continue;
- }
- if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
- } else {
- /*
- * don't spawn the same key more than once. We can do that as long as we don't yield
- */
- if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
- spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
- if (retcode < 0) {
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
- }
+ if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+ } else {
+ /*
+ * don't spawn the same key more than once. We can do that as long as we don't yield
+ */
+ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+ spawned_keys.insert(log_iter->entry.key);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
+ if (retcode < 0) {
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(retcode);
}
}
- }
+ }
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
int ret;
while (collect(&ret, lease_stack.get())) {
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
/* we have reported this error */
}
/* not waiting for child here */
}
+ /* not waiting for child here */
}
- }
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
-#define INCREMENTAL_INTERVAL 20
- yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
- }
+ }
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
+ if (!truncated) {
+ yield wait(get_idle_interval());
+ }
} while (true);
}
return 0;
}
+
+ // How long to sleep when the datalog listing was not truncated (nothing
+ // left to sync right now). Normally INCREMENTAL_INTERVAL seconds, but
+ // capped at the time remaining until error_retry_time so the idle wait
+ // never delays the scheduled error-repo retry.
+ utime_t get_idle_interval() const {
+#define INCREMENTAL_INTERVAL 20
+ ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
+ // a zero error_retry_time means no error retry is pending; keep the
+ // full idle interval in that case
+ if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+ auto now = ceph::coarse_real_clock::now();
+ if (error_retry_time > now) {
+ auto d = error_retry_time - now;
+ if (interval > d) {
+ interval = d;
+ }
+ }
+ }
+ // convert timespan -> time_point -> utime_t
+ return utime_t(ceph::coarse_real_clock::zero() + interval);
+ }
+
void stop_spawned_services() {
lease_cr->go_down();
if (error_repo) {
public:
RGWDefaultDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
return 0;
}
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
key, versioned_epoch,
bs.shard_id = shard_id;
sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
- _error_logger, source_zone, _sync_module, nullptr);
+ _error_logger, source_zone, _sync_module);
return 0;
}
call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
}
}
+ if (info.syncstopped) {
+ retcode = -ENOENT;
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
return set_cr_done();
}
return 0;
rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
attrs);
}
+
+ // Order-control coroutine for this marker tracker; RGWLastCallerWinsCR
+ // keeps only the latest queued ordered call — presumably so back-to-back
+ // marker flushes collapse into one (NOTE(review): confirm against
+ // RGWLastCallerWinsCR).
+ RGWOrderCallCR *allocate_order_control_cr() {
+ return new RGWLastCallerWinsCR(sync_env->cct);
+ }
};
class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
rgw_bucket_shard_inc_sync_marker sync_marker;
map<rgw_obj_key, string> key_to_marker;
- map<string, rgw_obj_key> marker_to_key;
+
+ struct operation {
+ rgw_obj_key key;
+ bool is_olh;
+ };
+ map<string, operation> marker_to_op;
+ std::set<std::string> pending_olh; // object names with pending olh operations
void handle_finish(const string& marker) override {
- map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
- if (iter == marker_to_key.end()) {
+ auto iter = marker_to_op.find(marker);
+ if (iter == marker_to_op.end()) {
return;
}
- key_to_marker.erase(iter->second);
- reset_need_retry(iter->second);
- marker_to_key.erase(iter);
+ auto& op = iter->second;
+ key_to_marker.erase(op.key);
+ reset_need_retry(op.key);
+ if (op.is_olh) {
+ pending_olh.erase(op.key.name);
+ }
+ marker_to_op.erase(iter);
}
public:
* Also, we should make sure that we don't run concurrent operations on the same key with
* different ops.
*/
- bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
- if (key_to_marker.find(key) != key_to_marker.end()) {
+ bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
+ auto result = key_to_marker.emplace(key, marker);
+ if (!result.second) { // exists
set_need_retry(key);
return false;
}
- key_to_marker[key] = marker;
- marker_to_key[marker] = key;
+ marker_to_op[marker] = operation{key, is_olh};
+ if (is_olh) {
+ // prevent other olh ops from starting on this object name
+ pending_olh.insert(key.name);
+ }
return true;
}
- bool can_do_op(const rgw_obj_key& key) {
+ bool can_do_op(const rgw_obj_key& key, bool is_olh) {
+ // serialize olh ops on the same object name
+ if (is_olh && pending_olh.count(key.name)) {
+ ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
+ return false;
+ }
return (key_to_marker.find(key) == key_to_marker.end());
}
+
+ // Order-control coroutine for the incremental-sync marker tracker;
+ // RGWLastCallerWinsCR keeps only the most recent queued ordered call
+ // (NOTE(review): confirm semantics against RGWLastCallerWinsCR).
+ RGWOrderCallCR *allocate_order_control_cr() {
+ return new RGWLastCallerWinsCR(sync_env->cct);
+ }
};
template <class T, class K>
rgw_obj_key key;
bool versioned;
- uint64_t versioned_epoch;
+ boost::optional<uint64_t> versioned_epoch;
rgw_bucket_entry_owner owner;
real_time timestamp;
RGWModifyOp op;
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo *_bucket_info,
const rgw_bucket_shard& bs,
- const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
+ const rgw_obj_key& _key, bool _versioned,
+ boost::optional<uint64_t> _versioned_epoch,
real_time& _timestamp,
const rgw_bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
marker_tracker(_marker_tracker),
sync_status(0){
stringstream ss;
- ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
+ ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
set_status("init");
retcode = -EIO;
} else if (op == CLS_RGW_OP_ADD ||
op == CLS_RGW_OP_LINK_OLH) {
- if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
- set_status("skipping entry");
- ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
- goto done;
-
- }
set_status("syncing obj");
- ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+ ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
logger.log("fetch");
call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
logger.log("remove");
- call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
+ call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
logger.log("creating delete marker");
set_status("creating delete marker");
- ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
+ ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
+ call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
}
}
} while (marker_tracker->need_retry(key));
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
- rgw_bucket_shard_full_sync_marker& full_marker;
+ rgw_bucket_shard_sync_info& sync_info;
RGWBucketFullSyncShardMarkerTrack marker_tracker;
rgw_obj_key list_marker;
bucket_list_entry *entry{nullptr};
RGWBucketInfo *_bucket_info,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
- rgw_bucket_shard_full_sync_marker& _full_marker)
+ rgw_bucket_shard_sync_info& sync_info)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
- marker_tracker(sync_env, status_oid, full_marker),
+ bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+ marker_tracker(sync_env, status_oid, sync_info.full_marker),
status_oid(status_oid) {
logger.init(sync_env, "BucketFull", bs.get_key());
zones_trace.insert(sync_env->source_zone);
{
int ret;
reenter(this) {
- list_marker = full_marker.position;
+ list_marker = sync_info.full_marker.position;
- total_entries = full_marker.count;
+ total_entries = sync_info.full_marker.count;
do {
if (!lease_cr->is_locked()) {
drain_all();
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
sync_status = ret;
/* we have reported this error */
}
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
sync_status = ret;
/* we have reported this error */
}
/* update sync state to incremental */
if (sync_status == 0) {
yield {
- rgw_bucket_shard_sync_info sync_status;
- sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+ sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
map<string, bufferlist> attrs;
- sync_status.encode_state_attr(attrs);
+ sync_info.encode_state_attr(attrs);
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
attrs));
}
} else {
- ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+ ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
}
if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
return 0;
}
+// True for bilog op types that carry an olh (object logical head) epoch.
+// Such entries must not be squashed away (their olh_epoch has to be applied)
+// and olh ops on the same object name are serialized via can_do_op()/
+// index_key_to_marker().
+static bool has_olh_epoch(RGWModifyOp op) {
+ return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
+}
+
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
const rgw_bucket_shard& bs;
RGWBucketInfo *bucket_info;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
list<rgw_bi_log_entry> list_result;
- list<rgw_bi_log_entry>::iterator entries_iter;
+ list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
- rgw_bucket_shard_inc_sync_marker& inc_marker;
+ rgw_bucket_shard_sync_info& sync_info;
rgw_obj_key key;
rgw_bi_log_entry *entry{nullptr};
RGWBucketIncSyncShardMarkerTrack marker_tracker;
bool updated_status{false};
const string& status_oid;
const string& zone_id;
- ceph::real_time sync_modify_time;
string cur_id;
RGWBucketInfo *_bucket_info,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
- rgw_bucket_shard_inc_sync_marker& _inc_marker)
+ rgw_bucket_shard_sync_info& sync_info)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
- marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) , zone_id(_sync_env->store->get_zone().id){
+ bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+ marker_tracker(sync_env, status_oid, sync_info.inc_marker),
+ status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
+ {
set_description() << "bucket shard incremental sync bucket="
<< bucket_shard_str{bs};
set_status("init");
drain_all();
return set_cr_error(-ECANCELED);
}
- ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << inc_marker.position << dendl;
- set_status() << "listing bilog; position=" << inc_marker.position;
- yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
+ ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << sync_info.inc_marker.position << dendl;
+ set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
+ yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
&list_result));
- if (retcode < 0 && retcode != -ENOENT ) {
+ if (retcode < 0 && retcode != -ENOENT) {
+ /* wait for all operations to complete */
drain_all();
- if (!syncstopped) {
- /* wait for all operations to complete */
- return set_cr_error(retcode);
- } else {
- /* no need to retry */
- break;
- }
+ return set_cr_error(retcode);
}
squash_map.clear();
- for (auto& e : list_result) {
- if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
- ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
- sync_modify_time = e.timestamp;
+ entries_iter = list_result.begin();
+ entries_end = list_result.end();
+ for (; entries_iter != entries_end; ++entries_iter) {
+ auto e = *entries_iter;
+ if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
+ ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
syncstopped = true;
- continue;
+ entries_end = entries_iter; // dont sync past here
+ break;
}
- if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
- ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
- sync_modify_time = e.timestamp;
- syncstopped = false;
+ if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
continue;
}
if (e.op == CLS_RGW_OP_CANCEL) {
continue;
}
auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
+ // don't squash over olh entries - we need to apply their olh_epoch
+ if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
+ continue;
+ }
if (squash_entry.first <= e.timestamp) {
squash_entry = make_pair<>(e.timestamp, e.op);
}
}
entries_iter = list_result.begin();
- for (; entries_iter != list_result.end(); ++entries_iter) {
+ for (; entries_iter != entries_end; ++entries_iter) {
if (!lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
cur_id = entry->id.substr(p + 1);
}
}
- inc_marker.position = cur_id;
+ sync_info.inc_marker.position = cur_id;
if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
- ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << " , skipping entry" << dendl;
+ ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
set_status() << "squashed operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
<< bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
- /* not updating high marker though */
+ marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
<< bucket_shard_str{bs} << "/" << key << dendl;
updated_status = false;
- while (!marker_tracker.can_do_op(key)) {
+ while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
if (!updated_status) {
set_status() << "can't do op, conflicting inflight operation";
updated_status = true;
/* get error, stop */
break;
}
- if (!marker_tracker.index_key_to_marker(key, cur_id)) {
+ if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
set_status() << "can't do op, sync already in progress for object";
ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
} else {
- uint64_t versioned_epoch = 0;
+ boost::optional<uint64_t> versioned_epoch;
rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
if (entry->ver.pool < 0) {
versioned_epoch = entry->ver.epoch;
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
sync_status = ret;
/* we have reported this error */
}
}
}
}
- } while (!list_result.empty() && sync_status == 0);
-
- if (syncstopped) {
- drain_all();
-
- yield {
- const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
- RGWRados *store = sync_env->store;
- call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
- }
- lease_cr->abort();
- return set_cr_done();
- }
+ } while (!list_result.empty() && sync_status == 0 && !syncstopped);
while (num_spawned()) {
yield wait_for_child();
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
sync_status = ret;
/* we have reported this error */
}
}
}
+ if (syncstopped) {
+ // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
+ // still disabled, we'll delete the sync status object. otherwise we'll
+ // restart full sync to catch any changes that happened while sync was
+ // disabled
+ sync_info.state = rgw_bucket_shard_sync_info::StateInit;
+ return set_cr_done();
+ }
+
yield call(marker_tracker.flush());
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
if (sync_status < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
- }
-
- /* wait for all operations to complete */
- drain_all();
-
- if (sync_status < 0) {
+ ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
return set_cr_error(sync_status);
}
-
return set_cr_done();
}
return 0;
return set_cr_error(retcode);
}
- if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ do {
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- }
- if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
- status_oid, lease_cr.get(),
- sync_status.full_marker));
- if (retcode < 0) {
- ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
+ yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+ status_oid, lease_cr.get(),
+ sync_status));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
- }
- if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
- status_oid, lease_cr.get(),
- sync_status.inc_marker));
- if (retcode < 0) {
- ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
+ yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+ status_oid, lease_cr.get(),
+ sync_status));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- }
+ // loop back to previous states unless incremental sync returns normally
+ } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
lease_cr->go_down();
drain_all();
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
env.init(store->ctx(), store, nullptr, store->get_async_rados(),
- nullptr, nullptr, source_zone, module, nullptr);
+ nullptr, nullptr, source_zone, module);
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,