#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
#include "cls/lock/cls_lock_client.h"
return true;
}
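+// Collects the omap keys of each data sync shard's error-retry object, up to
+// MAX_CONCURRENT_SHARDS shards at a time; a shard whose retry object holds
+// any keys is considered to be recovering.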
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+ RGWDataSyncEnv *env;
+
+ uint64_t max_entries;
+ int num_shards;
+ int shard_id{0};
+
+ string marker;
+ map<int, std::set<std::string>>& entries_map;
+
+ public:
+ RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+ map<int, std::set<std::string>>& _entries_map)
+ : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+ max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+ {}
+ bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+ if (shard_id >= num_shards)
+ return false;
+
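+ // each shard's retry entries live in the shard status object's ".retry" omap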
+ string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+ spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+ marker, &entries_map[shard_id], max_entries), false);
+
+ ++shard_id;
+ return true;
+}
+
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
- sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
+ sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+ _source_zone, _sync_module, observer);
if (initialized) {
return 0;
return ret;
}
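+// Scans every shard's error-retry object and reports the ids of the shards
+// that still hold retry entries, i.e. the shards recovering from sync errors.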
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+ // cannot run concurrently with run_sync(), so run in a separate manager
+ RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+ RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ int ret = http_manager.set_threaded();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ return ret;
+ }
+ RGWDataSyncEnv sync_env_local = sync_env;
+ sync_env_local.http_manager = &http_manager;
+ map<int, std::set<std::string>> entries_map;
+ uint64_t max_entries{1};
+ ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+ http_manager.stop();
+
+ if (ret == 0) {
+ for (const auto& entry : entries_map) {
+ if (!entry.second.empty()) {
+ recovering_shards.insert(entry.first);
+ }
+ }
+ }
+
+ return ret;
+}
+
int RGWRemoteDataLog::init_sync_status(int num_shards)
{
rgw_data_sync_status sync_status;
+ sync_status.sync_info.num_shards = num_shards;
+
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
int ret = http_manager.set_threaded();
int operate() override {
reenter(this) {
- entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
- store->get_zone_params().log_pool,
- oid_prefix);
yield {
string entrypoint = string("/admin/metadata/bucket.instance");
/* FIXME: need a better scaling solution here, requires streaming output */
call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
entrypoint, NULL, &result));
}
- if (get_ret_status() < 0) {
+ if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
- return set_state(RGWCoroutine_Error);
+ return set_cr_error(retcode);
}
+ entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
+ store->get_zone_params().log_pool,
+ oid_prefix);
+ yield; // yield so OmapAppendCRs can start
for (iter = result.begin(); iter != result.end(); ++iter) {
ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
}
if (sync_status < 0) {
- yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
- -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
- if (retcode < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+ // write actual sync failures for 'radosgw-admin sync error list'
+ if (sync_status != -EBUSY && sync_status != -EAGAIN) {
+ yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
+ -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+ }
}
if (error_repo && !error_repo->append(raw_key)) {
ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: "
    << error_repo->get_obj() << " retcode=" << retcode << dendl;
}
}
+ if (sync_env->observer) {
+ sync_env->observer->on_bucket_changed(bs.bucket.get_key());
+ }
/* FIXME: what to do in case of error */
if (marker_tracker && !entry_marker.empty()) {
/* update marker */
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
- map<string, bufferlist> entries;
- map<string, bufferlist>::iterator iter;
+ std::set<std::string> entries;
+ std::set<std::string>::iterator iter;
string oid;
string error_oid;
RGWOmapAppend *error_repo;
- map<string, bufferlist> error_entries;
+ std::set<std::string> error_entries;
string error_marker;
int max_error_entries;
}
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
- ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
total_entries++;
- if (!marker_tracker->start(iter->first, total_entries, real_time())) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+ if (!marker_tracker->start(*iter, total_entries, real_time())) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
}
- sync_marker.marker = iter->first;
+ sync_marker.marker = *iter;
}
} while ((int)entries.size() == max_entries);
ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
- ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
- error_marker = iter->first;
+ error_marker = *iter;
+ ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
}
if ((int)error_entries.size() != max_error_entries) {
if (error_marker.empty() && error_entries.empty()) {
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
/* call sync module init here */
+ sync_status.sync_info.num_shards = num_shards;
yield call(data_sync_module->init_sync(sync_env));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
RGWDataSyncEnv *sync_env;
uint32_t num_shards;
+ static constexpr bool exit_on_error = false; // retry on all errors
public:
- RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, true),
+ RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
sync_env(_sync_env), num_shards(_num_shards) {
}
RGWZoneParams& zone_params = store->get_zone_params();
- sync_module = store->get_sync_module();
+ if (sync_module == nullptr) {
+ sync_module = store->get_sync_module();
+ }
conn = store->get_zone_conn_by_id(source_zone);
if (!conn) {
bs.bucket = bucket;
bs.shard_id = shard_id;
- sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
+ sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+ _error_logger, source_zone, _sync_module, nullptr);
return 0;
}
-struct bucket_index_marker_info {
- string bucket_ver;
- string master_ver;
- string max_marker;
- bool syncstopped{false};
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
- JSONDecoder::decode_json("master_ver", master_ver, obj);
- JSONDecoder::decode_json("max_marker", max_marker, obj);
- JSONDecoder::decode_json("syncstopped", syncstopped, obj);
- }
-};
-
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
const string instance_key;
- bucket_index_marker_info *info;
+ rgw_bucket_index_marker_info *info;
public:
RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket_shard& bs,
- bucket_index_marker_info *_info)
+ rgw_bucket_index_marker_info *_info)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
instance_key(bs.get_key()), info(_info) {}
{ NULL, NULL } };
string p = "/admin/log/";
- call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
+ call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
}
if (retcode < 0) {
return set_cr_error(retcode);
rgw_bucket_shard_sync_info& status;
- bucket_index_marker_info info;
+ rgw_bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
const rgw_bucket_shard& bs,
}
return 0;
}
+
+#define OMAP_READ_MAX_ENTRIES 10
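+// Lists the bucket shards recorded in one datalog shard's error-retry object,
+// paging through its omap keys OMAP_READ_MAX_ENTRIES at a time.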
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRados *store;
+
+ const int shard_id;
+ int max_entries;
+
+ set<string>& recovering_buckets;
+ string marker;
+ string error_oid;
+
+ set<string> error_entries;
+ int max_omap_entries;
+ int count;
+
+public:
+ RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+ set<string>& _recovering_buckets, const int _max_entries)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+ recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+ {
+ error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+ }
+
+ int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+ reenter(this) {
+ // read recovering bucket shards
+ count = 0;
+ do {
+ yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
+ marker, &error_entries, max_omap_entries));
+
+ if (retcode == -ENOENT) {
+ break;
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (error_entries.empty()) {
+ break;
+ }
+
+ count += error_entries.size();
+ marker = *error_entries.rbegin();
+ recovering_buckets.insert(error_entries.begin(), error_entries.end());
+ } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
+
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
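+// Reads the datalog entries past a shard's sync marker to report the bucket
+// shards with changes that have not been synced yet.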
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRados *store;
+
+ const int shard_id;
+ int max_entries;
+
+ set<string>& pending_buckets;
+ string marker;
+ string status_oid;
+
+ rgw_data_sync_marker* sync_marker;
+ int count;
+
+ list<rgw_data_change_log_entry> log_entries;
+ bool truncated;
+
+public:
+ RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+ set<string>& _pending_buckets,
+ rgw_data_sync_marker* _sync_marker, const int _max_entries)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+ pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+ {
+ status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+ }
+
+ int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+ reenter(this) {
+ // read sync status marker
+ using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+ yield call(new CR(sync_env->async_rados, store,
+ rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ sync_marker));
+ if (retcode < 0) {
+ ldout(sync_env->cct,0) << "failed to read sync status marker with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ // read pending bucket shards
+ marker = sync_marker->marker;
+ count = 0;
+ do {
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+ if (retcode == -ENOENT) {
+ break;
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct,0) << "failed to read remote data log info with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (log_entries.empty()) {
+ break;
+ }
+
+ count += log_entries.size();
+ for (const auto& entry : log_entries) {
+ pending_buckets.insert(entry.entry.key);
+ }
+ } while (truncated && count < max_entries);
+
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
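+// Collects both the pending and the recovering bucket shards of a single
+// datalog shard by running the two coroutines above on parallel stacks.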
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+ // cannot run concurrently with run_sync(), so run in a separate manager
+ RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+ RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ int ret = http_manager.set_threaded();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+ return ret;
+ }
+ RGWDataSyncEnv sync_env_local = sync_env;
+ sync_env_local.http_manager = &http_manager;
+ list<RGWCoroutinesStack *> stacks;
+ RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+ recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+ stacks.push_back(recovering_stack);
+ RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+ pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+ stacks.push_back(pending_stack);
+ ret = crs.run(stacks);
+ http_manager.stop();
+ return ret;
+}
+
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
sync_status = retcode;
}
if (!error_ss.str().empty()) {
- yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
+ yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object: ") + cpp_strerror(-sync_status)));
}
done:
if (sync_status == 0) {
syncstopped = false;
continue;
}
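+ // skip entries for cancelled index operations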
+ if (e.op == CLS_RGW_OP_CANCEL) {
+ continue;
+ }
if (e.state != CLS_RGW_STATE_COMPLETE) {
continue;
}
return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}
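+// Reads the sync status of every shard of the given bucket, at most
+// max_concurrent_shards at a time.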
+class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
+ static constexpr int max_concurrent_shards = 16;
+ RGWRados *const store;
+ RGWDataSyncEnv *const env;
+ const int num_shards;
+ rgw_bucket_shard bs;
+
+ using Vector = std::vector<rgw_bucket_shard_sync_info>;
+ Vector::iterator i, end;
+
+ public:
+ RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
+ int num_shards, const rgw_bucket& bucket,
+ Vector *status)
+ : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
+ store(store), env(env), num_shards(num_shards),
+ bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+ i(status->begin()), end(status->end())
+ {}
+
+ bool spawn_next() override {
+ if (i == end) {
+ return false;
+ }
+ spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+ ++i;
+ ++bs.shard_id;
+ return true;
+ }
+};
+
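+// Fills one rgw_bucket_shard_sync_info per bucket shard (a single entry for a
+// non-sharded bucket); runs with a null sync module and no remote connection.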
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+ const RGWBucketInfo& bucket_info,
+ std::vector<rgw_bucket_shard_sync_info> *status)
+{
+ const auto num_shards = bucket_info.num_shards;
+ status->clear();
+ status->resize(std::max<size_t>(1, num_shards));
+
+ RGWDataSyncEnv env;
+ RGWSyncModuleInstanceRef module; // null sync module
+ env.init(store->ctx(), store, nullptr, store->get_async_rados(),
+ nullptr, nullptr, source_zone, module, nullptr);
+
+ RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+ return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
+ bucket_info.bucket, status));
+}
+
// TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix