#include "rgw_common.h"
#include "rgw_rados.h"
+#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "cls/lock/cls_lock_client.h"
-#include "auth/Crypto.h"
+#include "services/svc_zone.h"
+#include "services/svc_sync_modules.h"
+
+#include "include/random.h"
#include <boost/asio/yield.hpp>
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
+static string object_status_oid_prefix = "bucket.sync-status";
-class RGWSyncDebugLogger {
- CephContext *cct;
- string prefix;
-
- bool ended;
-
-public:
- RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
- const string& sync_type, const string& sync_stage,
- const string& resource, bool log_start = true) {
- init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
- }
- RGWSyncDebugLogger() : cct(NULL), ended(false) {}
- ~RGWSyncDebugLogger();
-
- void init(CephContext *_cct, const string& source_zone,
- const string& sync_type, const string& sync_stage,
- const string& resource, bool log_start = true);
- void log(const string& state);
- void finish(int status);
-};
-
-void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
- const string& sync_type, const string& sync_section,
- const string& resource, bool log_start)
-{
- cct = _cct;
- ended = false;
- string zone_str = source_zone.substr(0, 8);
- prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
- if (log_start) {
- log("start");
- }
-}
-
-RGWSyncDebugLogger::~RGWSyncDebugLogger()
-{
- if (!ended) {
- log("finish");
- }
-}
-
-void RGWSyncDebugLogger::log(const string& state)
-{
- ldout(cct, 5) << prefix << ":" << state << dendl;
-}
-
-void RGWSyncDebugLogger::finish(int status)
-{
- ended = true;
- ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
-}
-
-class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
-public:
- RGWDataSyncDebugLogger() {}
- RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
- const string& resource, bool log_start = true) {
- init(sync_env, sync_section, resource, log_start);
- }
- void init(RGWDataSyncEnv *sync_env, const string& sync_section,
- const string& resource, bool log_start = true) {
- RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
- }
-
-};
void rgw_datalog_info::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("num_objects", num_shards, obj);
return false;
}
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- spawn(new CR(env->async_rados, env->store,
- rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
+ spawn(new CR(env->async_rados, env->store->svc.sysobj,
+ rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
&markers[shard_id]),
false);
shard_id++;
uint64_t max_entries;
int num_shards;
- int shard_id{0};;
+ int shard_id{0};
string marker;
- map<int, std::set<std::string>> &entries_map;
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
public:
RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
- map<int, std::set<std::string>>& _entries_map)
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
: RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
- max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+ max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
{}
bool spawn_next() override;
};
bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
- if (shard_id > num_shards)
+ if (shard_id >= num_shards)
return false;
string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
- spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
- marker, &entries_map[shard_id], max_entries), false);
+ auto& shard_keys = omapkeys[shard_id];
+ shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+ spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
+ marker, max_entries, shard_keys), false);
++shard_id;
return true;
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
yield {
bool empty_on_enoent = false; // fail on ENOENT
- call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
- rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+ rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
&sync_status->sync_info, empty_on_enoent));
}
if (retcode < 0) {
http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
- http_op->set_user_info((void *)stack);
+ init_new_io(http_op);
int ret = http_op->aio_read();
if (ret < 0) {
http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
- http_op->set_user_info((void *)stack);
+ init_new_io(http_op);
int ret = http_op->aio_read();
if (ret < 0) {
string p = "/admin/log/";
http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
- http_op->set_user_info((void *)stack);
+ init_new_io(http_op);
int ret = http_op->aio_read();
if (ret < 0) {
string cookie;
rgw_data_sync_status *status;
map<int, RGWDataChangesLogInfo> shards_info;
+
+ RGWSyncTraceNodeRef tn;
public:
RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
uint64_t instance_id,
+ RGWSyncTraceNodeRef& _tn_parent,
rgw_data_sync_status *status)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
- pool(store->get_zone_params().log_pool),
- num_shards(num_shards), status(status) {
+ pool(store->svc.zone->get_zone_params().log_pool),
+ num_shards(num_shards), status(status),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
lock_name = "sync_lock";
status->sync_info.instance_id = instance_id;
cookie = buf;
sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
+
}
int operate() override {
rgw_raw_obj{pool, sync_status_oid},
lock_name, cookie, lock_duration));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+ tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
return set_cr_error(retcode);
}
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
- yield call(new WriteInfoCR(sync_env->async_rados, store,
+ yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
return set_cr_error(retcode);
}
rgw_raw_obj{pool, sync_status_oid},
lock_name, cookie, lock_duration));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+ tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
return set_cr_error(retcode);
}
+ tn->log(10, "took lease");
+
/* fetch current position in logs */
yield {
- RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
+ RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
if (!conn) {
- ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
+ tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
return set_cr_error(-EIO);
}
for (uint32_t i = 0; i < num_shards; i++) {
}
while (collect(&ret, NULL)) {
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
+ tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
return set_state(RGWCoroutine_Error);
}
yield;
marker.timestamp = info.last_update;
const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
- spawn(new WriteMarkerCR(sync_env->async_rados, store,
+ spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
rgw_raw_obj{pool, oid}, marker), true);
}
}
while (collect(&ret, NULL)) {
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
+ tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
return set_state(RGWCoroutine_Error);
}
yield;
}
status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- yield call(new WriteInfoCR(sync_env->async_rados, store,
+ yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
return set_cr_error(retcode);
}
yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
return ret;
}
- ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
+ ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
return 0;
}
int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
- if (store->is_meta_master()) {
- return 0;
- }
-
return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
+ RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
{
- sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
- _source_zone, _sync_module);
+ sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+ _sync_tracer, _source_zone, _sync_module);
if (initialized) {
return 0;
}
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
+ tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");
+
initialized = true;
return 0;
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
- map<int, std::set<std::string>> entries_map;
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+ omapkeys.resize(num_shards);
uint64_t max_entries{1};
- ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+ ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
http_manager.stop();
if (ret == 0) {
- for (const auto& entry : entries_map) {
- if (entry.second.size() != 0) {
- recovering_shards.insert(entry.first);
+ for (int i = 0; i < num_shards; i++) {
+ if (omapkeys[i]->entries.size() != 0) {
+ recovering_shards.insert(i);
}
}
}
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
- uint64_t instance_id;
- get_random_bytes((char *)&instance_id, sizeof(instance_id));
- ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
+ auto instance_id = ceph::util::generate_random_number<uint64_t>();
+ ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
http_manager.stop();
return ret;
}
entrypoint, NULL, &result));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
return set_cr_error(retcode);
}
entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
- store->get_zone_params().log_pool,
+ store->svc.zone->get_zone_params().log_pool,
oid_prefix);
yield; // yield so OmapAppendCRs can start
for (iter = result.begin(); iter != result.end(); ++iter) {
- ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+ ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
key = *iter;
int shard_id = (int)iter->first;
rgw_data_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+ spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
marker), true);
}
} else {
marker_to_key.erase(iter);
}
+ RGWSyncTraceNodeRef tn;
+
public:
RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
const string& _marker_oid,
- const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+ const rgw_data_sync_marker& _marker,
+ RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
sync_env(_sync_env),
marker_oid(_marker_oid),
- sync_marker(_marker) {}
+ sync_marker(_marker),
+ tn(_tn) {}
RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.marker = new_marker;
sync_marker.pos = index_pos;
- ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
RGWRados *store = sync_env->store;
- return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
sync_marker);
}
return true;
}
- RGWOrderCallCR *allocate_order_control_cr() {
+ RGWOrderCallCR *allocate_order_control_cr() override {
return new RGWLastCallerWinsCR(sync_env->cct);
}
};
// ostream wrappers to print buckets without copying strings
struct bucket_str {
const rgw_bucket& b;
- bucket_str(const rgw_bucket& b) : b(b) {}
+ explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
auto& b = rhs.b;
return out;
}
+// ostream wrapper that prints a bucket as "tenant/name" (or just "name" when
+// the tenant is empty), without the bucket instance id; companion to
+// bucket_str above. Holds a non-owning reference, so the rgw_bucket must
+// outlive the wrapper -- intended for transient use inside a stream insertion.
+struct bucket_str_noinstance {
+  const rgw_bucket& b;
+  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
+};
+std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
+  auto& b = rhs.b;
+  // emit the tenant prefix only when present, matching the "tenant/name"
+  // form used for bucket keys elsewhere in rgw
+  if (!b.tenant.empty()) {
+    out << b.tenant << '/';
+  }
+  out << b.name;
+  return out;
+}
+
+
struct bucket_shard_str {
const rgw_bucket_shard& bs;
- bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
+ explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
auto& bs = rhs.bs;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
- RGWDataSyncDebugLogger logger;
const std::string status_oid;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+ RGWSyncTraceNodeRef tn;
+
public:
- RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
+ RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
- logger.init(sync_env, "Bucket", bs.get_key());
+ status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
+ SSTR(bucket_shard_str{bs}))) {
}
~RGWRunBucketSyncCoroutine() override {
if (lease_cr) {
set<string> keys;
+ RGWSyncTraceNodeRef tn;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
+ RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
raw_key(_raw_key), entry_marker(_entry_marker),
sync_status(0),
marker_tracker(_marker_tracker),
error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}
int operate() override {
if (marker_tracker) {
marker_tracker->reset_need_retry(raw_key);
}
- call(new RGWRunBucketSyncCoroutine(sync_env, bs));
+ tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
+ call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
}
} while (marker_tracker && marker_tracker->need_retry(raw_key));
// this was added when 'tenant/' was added to datalog entries, because
// preexisting tenant buckets could never sync and would stay in the
// error_repo forever
- ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
- "for missing bucket " << raw_key << dendl;
+ tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
sync_status = 0;
}
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
if (retcode < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
}
if (error_repo && !error_repo->append(raw_key)) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
}
} else if (error_repo && remove_from_repo) {
keys = {raw_key};
yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
if (retcode < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
- << error_repo->get_obj() << " retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
+ << error_repo->get_obj() << " retcode=" << retcode));
}
}
/* FIXME: what do do in case of error */
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
+ RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
std::set<std::string> entries;
std::set<std::string>::iterator iter;
#define RETRY_BACKOFF_SECS_MAX 600
uint32_t retry_backoff_secs;
- RGWDataSyncDebugLogger logger;
+ RGWSyncTraceNodeRef tn;
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
rgw_pool& _pool,
- uint32_t _shard_id, const rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+ uint32_t _shard_id, const rgw_data_sync_marker& _marker,
+ RGWSyncTraceNodeRef& _tn,
+ bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
pool(_pool),
shard_id(_shard_id),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
- retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
+ retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
error_oid = status_oid + ".retry";
-
- logger.init(sync_env, "DataShard", status_oid);
}
~RGWDataSyncShardCR() override {
case rgw_data_sync_marker::FullSync:
r = full_sync();
if (r < 0) {
- ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ if (r != -EBUSY) {
+ tn->log(10, SSTR("full sync failed (r=" << r << ")"));
+ }
return set_cr_error(r);
}
return 0;
case rgw_data_sync_marker::IncrementalSync:
r = incremental_sync();
if (r < 0) {
- ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ if (r != -EBUSY) {
+ tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
+ }
return set_cr_error(r);
}
return 0;
}
RGWRados *store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
lock_name, lock_duration, this));
lease_stack.reset(spawn(lease_cr.get(), false));
}
#define OMAP_GET_MAX_ENTRIES 100
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
+ tn->log(10, "start full sync");
yield init_lease_cr();
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
- ldout(cct, 5) << "lease cr failed, done early " << dendl;
+ tn->log(5, "failed to take lease");
set_status("lease lock failed, early abort");
drain_all();
return set_cr_error(lease_cr->get_ret_status());
set_sleeping(true);
yield;
}
- logger.log("full sync");
+ tn->log(10, "took lease");
oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
total_entries = sync_marker.pos;
do {
if (!lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
}
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
+ sync_marker.marker, max_entries, omapkeys));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
+ entries = std::move(omapkeys->entries);
+ if (entries.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
+ tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
- ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
+ tn->log(20, SSTR("full sync: " << *iter));
total_entries++;
if (!marker_tracker->start(*iter, total_entries, real_time())) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
+ tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
}
sync_marker.marker = *iter;
int ret;
while (collect(&ret, lease_stack.get())) {
if (ret < 0) {
- ldout(cct, 10) << "a sync operation returned error" << dendl;
+ tn->log(10, "a sync operation returned error");
}
}
}
}
- } while ((int)entries.size() == max_entries);
+ } while (omapkeys->more);
+ omapkeys.reset();
drain_all_but_stack(lease_stack.get());
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
yield {
/* update marker to reflect we're done with full sync */
sync_marker.state = rgw_data_sync_marker::IncrementalSync;
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
RGWRados *store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
sync_marker));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
int incremental_sync() {
reenter(&incremental_cr) {
- ldout(cct, 10) << "start incremental sync" << dendl;
+ tn->log(10, "start incremental sync");
if (lease_cr) {
- ldout(cct, 10) << "lease already held from full sync" << dendl;
+ tn->log(10, "lease already held from full sync");
} else {
yield init_lease_cr();
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
- ldout(cct, 5) << "lease cr failed, done early " << dendl;
+ tn->log(5, "failed to take lease");
set_status("lease lock failed, early abort");
drain_all();
return set_cr_error(lease_cr->get_ret_status());
yield;
}
set_status("lease acquired");
- ldout(cct, 10) << "took lease" << dendl;
+ tn->log(10, "took lease");
}
error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
rgw_raw_obj(pool, error_oid),
1 /* no buffer */);
error_repo->get();
spawn(error_repo, false);
- logger.log("inc sync");
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
do {
if (!lease_cr->is_locked()) {
stop_spawned_services();
current_modified.swap(modified_shards);
inc_lock.Unlock();
+ if (current_modified.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
/* process out of band updates */
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
yield {
- ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
+ tn->log(20, SSTR("received async update notification: " << *modified_iter));
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
}
}
if (error_retry_time <= ceph::coarse_real_clock::now()) {
/* process bucket shards that previously failed */
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, &error_entries,
- max_error_entries));
- ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+ error_marker, max_error_entries, omapkeys));
+ error_entries = std::move(omapkeys->entries);
+ tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
error_marker = *iter;
- ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+ tn->log(20, SSTR("handle error entry: " << error_marker));
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
- if ((int)error_entries.size() != max_error_entries) {
+ if (!omapkeys->more) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
error_marker.clear();
}
}
+ omapkeys.reset();
#define INCREMENTAL_MAX_ENTRIES 100
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
spawned_keys.clear();
yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
&next_marker, &log_entries, &truncated));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
+
+ if (log_entries.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
+
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+ tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+ tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
continue;
}
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+ tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
/*
* don't spawn the same key more than once. We can do that as long as we don't yield
*/
if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
if (retcode < 0) {
stop_spawned_services();
drain_all();
}
}
}
- while ((int)num_spawned() > spawn_window) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- int ret;
- while (collect(&ret, lease_stack.get())) {
- if (ret < 0) {
- ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
- /* we have reported this error */
- }
- /* not waiting for child here */
+ }
+ while ((int)num_spawned() > spawn_window) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ int ret;
+ while (collect(&ret, lease_stack.get())) {
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ /* we have reported this error */
}
/* not waiting for child here */
}
}
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
- << " next_marker=" << next_marker << " truncated=" << truncated << dendl;
- if (!truncated) {
- yield wait(get_idle_interval());
- }
+
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
+ << " next_marker=" << next_marker << " truncated=" << truncated));
if (!next_marker.empty()) {
sync_marker.marker = next_marker;
} else if (!log_entries.empty()) {
sync_marker.marker = log_entries.back().log_id;
}
+ if (!truncated) {
+ // we reached the end, wait a while before checking for more
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+ yield wait(get_idle_interval());
+ }
} while (true);
}
return 0;
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
+ RGWSyncTraceNodeRef tn;
public:
- RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
- uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false),
+ RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
+ uint32_t _shard_id, rgw_data_sync_marker& _marker,
+ RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false),
sync_env(_sync_env),
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker) {
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
}
RGWCoroutine *alloc_cr() override {
- return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
+ return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() override {
RGWRados *store = sync_env->store;
- return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+ return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
&sync_marker);
}
bool *reset_backoff;
- RGWDataSyncDebugLogger logger;
+ RGWSyncTraceNodeRef tn;
RGWDataSyncModule *data_sync_module{nullptr};
public:
- RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+ RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
num_shards(_num_shards),
marker_tracker(NULL),
shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
- reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
+ reset_backoff(_reset_backoff), tn(_tn) {
}
data_sync_module = sync_env->sync_module->get_data_handler();
if (retcode < 0 && retcode != -ENOENT) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
return set_cr_error(retcode);
}
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
- ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
+ tn->log(20, SSTR("init"));
sync_status.sync_info.num_shards = num_shards;
uint64_t instance_id;
- get_random_bytes((char *)&instance_id, sizeof(instance_id));
- yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
+ instance_id = ceph::util::generate_random_number<uint64_t>();
+ yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
return set_cr_error(retcode);
}
// sets state = StateBuildingFullSyncMaps
data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
+ tn->log(10, SSTR("building full sync maps"));
/* call sync module init here */
sync_status.sync_info.num_shards = num_shards;
yield call(data_sync_module->init_sync(sync_env));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
return set_cr_error(retcode);
}
/* state: building full sync maps */
- ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
return set_cr_error(retcode);
}
sync_status.sync_info.state = rgw_data_sync_info::StateSync;
/* update new state */
yield call(set_sync_info_cr());
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
return set_cr_error(retcode);
}
*reset_backoff = true;
}
+ yield call(data_sync_module->start_sync(sync_env));
+
yield {
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+ tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
- RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
- iter->first, iter->second);
+ RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
+ iter->first, iter->second, tn);
cr->get();
shard_crs_lock.Lock();
shard_crs[iter->first] = cr;
RGWCoroutine *set_sync_info_cr() {
RGWRados *store = sync_env->store;
- return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
sync_status.sync_info);
}
public:
RGWDefaultDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
RGWDataSyncModule *get_data_handler() override {
return &data_handler;
}
+ bool supports_user_writes() override {
+ return true;
+ }
};
-int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
+// Factory for the default (full-fidelity) sync module.  The JSONFormattable
+// config is accepted for interface uniformity; no options are read here.
+int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+// Default data handler: fetch the remote object into the local zone as-is.
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
-                                key, versioned_epoch,
+                                std::nullopt, // NOTE(review): presumably a source/dest bucket override slot — confirm against RGWFetchRemoteObjCR
+                                key, std::nullopt, versioned_epoch, // second nullopt is the dest-key slot (archive module passes dest_key here)
                                 true, zones_trace);
}
&owner.id, &owner.display_name, true, &mtime, zones_trace);
}
+// Data handler for the archive zone: derives from the default module but
+// overrides object sync/removal so that replicated data is preserved —
+// objects are kept as versions and source-side deletes do not remove data.
+class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
+public:
+  RGWArchiveDataSyncModule() {}
+
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+};
+
+// Sync-module instance for the archive zone: plugs in the archive data
+// handler and archive-specific bucket metadata handlers.
+class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
+  RGWArchiveDataSyncModule data_handler;
+public:
+  RGWArchiveSyncModuleInstance() {}
+  RGWDataSyncModule *get_data_handler() override {
+    return &data_handler;
+  }
+  // Bucket and bucket-instance metadata ops also get archive semantics.
+  RGWMetadataHandler *alloc_bucket_meta_handler() override {
+    return RGWArchiveBucketMetaHandlerAllocator::alloc();
+  }
+  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
+    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
+  }
+};
+
+// Factory for the archive sync module; config carries no options here.
+int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
+{
+  instance->reset(new RGWArchiveSyncModuleInstance());
+  return 0;
+}
+
+// Archive handler: keep every replicated object by forcing the destination
+// bucket to be versioned and landing each copy as a (possibly minted) version.
+RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+{
+  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+  // Lazily flip the local bucket to versioned, clearing any "suspended"
+  // flag, the first time we sync into it.
+  if (!bucket_info.versioned() ||
+      (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
+    ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
+    bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
+    // NOTE(review): synchronous RADOS write issued from a path that otherwise
+    // returns coroutines — confirm blocking here is acceptable.
+    int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
+    if (op_ret < 0) {
+      ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
+      return NULL; // no CR returned -- presumably the entry is treated as failed/skipped; TODO confirm
+    }
+  }
+
+  std::optional<rgw_obj_key> dest_key;
+
+  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
+    versioned_epoch = 0;
+    dest_key = key;
+    if (key.instance.empty()) {
+      // Source object is unversioned: mint a random local version id.
+      sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
+    }
+  }
+
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                                 bucket_info, std::nullopt,
+                                 key, dest_key, versioned_epoch,
+                                 true, zones_trace);
+}
+
+// Archive zone never deletes data: a source-side object removal is logged
+// and ignored (returning NULL means no local operation is performed).
+RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+                                                     real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+{
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+  return NULL;
+}
+
+// Source deletes surface in the archive as delete-markers: object versions
+// are preserved and only a marker is layered on top (via RGWRemoveObjCR on
+// a versioned bucket).
+RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                                            rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+{
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+                          << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                            bucket_info, key, versioned, versioned_epoch,
+                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
+}
+
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
RGWDataSyncEnv *sync_env;
uint32_t num_shards;
+ RGWSyncTraceNodeRef tn;
+
static constexpr bool exit_on_error = false; // retry on all errors
public:
- RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
- sync_env(_sync_env), num_shards(_num_shards) {
+ RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
+ RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
+ sync_env(_sync_env), num_shards(_num_shards) {
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
}
RGWCoroutine *alloc_cr() override {
- return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
+ return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
}
void wakeup(int shard_id, set<string>& keys) {
m.Unlock();
if (cr) {
+ tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
cr->wakeup(shard_id, keys);
}
int RGWRemoteDataLog::run_sync(int num_shards)
{
lock.get_write();
- data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
+ data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
data_sync_cr->get(); // run() will drop a ref, so take another
lock.unlock();
lock.unlock();
if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
return r;
}
return 0;
int RGWDataSyncStatusManager::init()
{
- auto zone_def_iter = store->zone_by_id.find(source_zone);
- if (zone_def_iter == store->zone_by_id.end()) {
- ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
+ RGWZone *zone_def;
+
+ if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
+ ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
return -EIO;
}
- auto& zone_def = zone_def_iter->second;
-
- if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
+ if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
return -ENOTSUP;
}
- RGWZoneParams& zone_params = store->get_zone_params();
+ const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();
if (sync_module == nullptr) {
sync_module = store->get_sync_module();
}
- conn = store->get_zone_conn_by_id(source_zone);
+ conn = store->svc.zone->get_zone_conn_by_id(source_zone);
if (!conn) {
- ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
+ ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
return -EINVAL;
}
error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
- int r = source_log.init(source_zone, conn, error_logger, sync_module);
+ int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(), sync_module);
if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
finalize();
return r;
}
rgw_datalog_info datalog_info;
r = source_log.read_log_info(&datalog_info);
if (r < 0) {
- ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
+ ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
finalize();
return r;
}
error_logger = nullptr;
}
+// DoutPrefixProvider: route this manager's log output to the rgw subsystem.
+unsigned RGWDataSyncStatusManager::get_subsys() const
+{
+  return dout_subsys;
+}
+
+// DoutPrefixProvider: tag log lines with a shortened (8-char) source zone id.
+std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
+{
+  auto zone = std::string_view{source_zone};
+  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
+}
+
string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
+// Bind this bucket-shard log to its source-zone connection and (re)build the
+// sync environment; the sync tracer is now threaded through for sync tracing.
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
+                            RGWSyncTraceManager *_sync_tracer,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  bs.bucket = bucket;
  bs.shard_id = shard_id;
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
-                _error_logger, source_zone, _sync_module);
+  // NOTE(review): passes the member `source_zone`, not the `_source_zone`
+  // parameter — presumably assigned elsewhere (context elided); confirm.
+  sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, _sync_tracer, source_zone, _sync_module);
  return 0;
}
}
yield {
auto store = sync_env->store;
- rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
+ rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);
if (info.syncstopped) {
call(new RGWRadosRemoveCR(store, obj));
status.inc_marker.position = info.max_marker;
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
}
}
if (info.syncstopped) {
return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}
+#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
+
+// Decode attribute `attr_name` from `attrs` into *val.  Returns true on
+// success; returns false with *val value-initialized when the attribute is
+// absent, or false (after logging) when decoding throws.
+// NOTE(review): on a decode exception *val may be left partially written;
+// callers retry with a legacy attr name — confirm that overwrite is safe.
template <class T>
-static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
+static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
-    return;
+    return false;
  }
-  bufferlist::iterator biter = iter->second.begin();
+  auto biter = iter->second.cbegin();
  try {
-    ::decode(*val, biter);
+    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
+    return false;
  }
+  return true;
}
+// Load shard sync state from raw object attrs.  Prefer the new
+// "bucket-sync."-prefixed attr names and fall back to the legacy unprefixed
+// names, so status objects written before the prefix change stay readable.
void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
-  decode_attr(cct, attrs, "state", &state);
-  decode_attr(cct, attrs, "full_marker", &full_marker);
-  decode_attr(cct, attrs, "inc_marker", &inc_marker);
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
+    decode_attr(cct, attrs, "state", &state);
+  }
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
+    decode_attr(cct, attrs, "full_marker", &full_marker);
+  }
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
+    decode_attr(cct, attrs, "inc_marker", &inc_marker);
+  }
}
void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
+// Writers always emit the new prefixed attr name; readers keep a legacy
+// fallback for compatibility.
void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
-  ::encode(state, attrs["state"]);
+  using ceph::encode;  // unqualified call enables ADL
+  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}
+// Serialize the full-sync marker under its new prefixed attr name.
void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
-  ::encode(*this, attrs["full_marker"]);
+  using ceph::encode;  // unqualified call enables ADL
+  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}
+// Serialize the incremental-sync marker under its new prefixed attr name.
void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
-  ::encode(*this, attrs["inc_marker"]);
+  using ceph::encode;  // unqualified call enables ADL
+  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}
class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
int RGWReadBucketSyncStatusCoroutine::operate()
{
reenter(this) {
- yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
- rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
- &attrs));
+ yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+ rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
+ &attrs, true));
if (retcode == -ENOENT) {
*status = rgw_bucket_shard_sync_info();
return set_cr_done();
string marker;
string error_oid;
+ RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
set<string> error_entries;
int max_omap_entries;
int count;
//read recovering bucket shards
count = 0;
do {
- yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
- marker, &error_entries, max_omap_entries));
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+ yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
+ marker, max_omap_entries, omapkeys));
if (retcode == -ENOENT) {
break;
return set_cr_error(retcode);
}
+ error_entries = std::move(omapkeys->entries);
if (error_entries.empty()) {
break;
}
count += error_entries.size();
marker = *error_entries.rbegin();
- recovering_buckets.insert(error_entries.begin(), error_entries.end());
- }while((int)error_entries.size() == max_omap_entries && count < max_entries);
+ recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
+ std::make_move_iterator(error_entries.end()));
+ } while (omapkeys->more && count < max_entries);
return set_cr_done();
}
reenter(this){
//read sync status marker
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- yield call(new CR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ yield call(new CR(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
sync_marker));
if (retcode < 0) {
ldout(sync_env->cct,0) << "failed to read sync status marker with "
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
JSONDecoder::decode_json("Owner", owner, obj);
JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
+ if (key.instance == "null" && !versioned_epoch) {
+ key.instance.clear();
+ }
}
RGWModifyOp get_modify_op() const {
string marker_oid;
rgw_bucket_shard_full_sync_marker sync_marker;
+ RGWSyncTraceNodeRef tn;
+
public:
RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
const string& _marker_oid,
marker_oid(_marker_oid),
sync_marker(_marker) {}
+ void set_tn(RGWSyncTraceNodeRef& _tn) {
+ tn = _tn;
+ }
+
RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.position = new_marker;
sync_marker.count = index_pos;
RGWRados *store = sync_env->store;
- ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+ tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
+ return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
attrs);
}
- RGWOrderCallCR *allocate_order_control_cr() {
+ RGWOrderCallCR *allocate_order_control_cr() override {
return new RGWLastCallerWinsCR(sync_env->cct);
}
};
map<string, operation> marker_to_op;
std::set<std::string> pending_olh; // object names with pending olh operations
+ RGWSyncTraceNodeRef tn;
+
void handle_finish(const string& marker) override {
auto iter = marker_to_op.find(marker);
if (iter == marker_to_op.end()) {
marker_oid(_marker_oid),
sync_marker(_marker) {}
+ void set_tn(RGWSyncTraceNodeRef& _tn) {
+ tn = _tn;
+ }
+
RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.position = new_marker;
RGWRados *store = sync_env->store;
- ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
- store,
- rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+ store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
attrs);
}
bool can_do_op(const rgw_obj_key& key, bool is_olh) {
// serialize olh ops on the same object name
if (is_olh && pending_olh.count(key.name)) {
- ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
+ tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
return false;
}
return (key_to_marker.find(key) == key_to_marker.end());
}
- RGWOrderCallCR *allocate_order_control_cr() {
+ RGWOrderCallCR *allocate_order_control_cr() override {
return new RGWLastCallerWinsCR(sync_env->cct);
}
};
rgw_obj_key key;
bool versioned;
- boost::optional<uint64_t> versioned_epoch;
+ std::optional<uint64_t> versioned_epoch;
rgw_bucket_entry_owner owner;
real_time timestamp;
RGWModifyOp op;
stringstream error_ss;
- RGWDataSyncDebugLogger logger;
-
bool error_injection;
RGWDataSyncModule *data_sync_module;
rgw_zone_set zones_trace;
+ RGWSyncTraceNodeRef tn;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo *_bucket_info,
const rgw_bucket_shard& bs,
const rgw_obj_key& _key, bool _versioned,
- boost::optional<uint64_t> _versioned_epoch,
+ std::optional<uint64_t> _versioned_epoch,
real_time& _timestamp,
const rgw_bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
- const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
+ const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
+ RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket_info(_bucket_info), bs(bs),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
stringstream ss;
ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
- ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
set_status("init");
- logger.init(sync_env, "Object", ss.str());
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
+ tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
data_sync_module = sync_env->sync_module->get_data_handler();
zones_trace = _zones_trace;
- zones_trace.insert(sync_env->store->get_zone().id);
+ zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
}
int operate() override {
if (op_state != CLS_RGW_STATE_COMPLETE) {
goto done;
}
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE);
do {
yield {
marker_tracker->reset_need_retry(key);
if (key.name.empty()) {
/* shouldn't happen */
set_status("skipping empty entry");
- ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
+ tn->log(0, "entry with empty obj name, skipping");
goto done;
}
if (error_injection &&
rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
- ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
+ tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
retcode = -EIO;
} else if (op == CLS_RGW_OP_ADD ||
op == CLS_RGW_OP_LINK_OLH) {
set_status("syncing obj");
- ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
- logger.log("fetch");
+ tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
set_status("removing obj");
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
- logger.log("remove");
+ tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
// our copy of the object is more recent, continue as if it succeeded
if (retcode == -ERR_PRECONDITION_FAILED) {
retcode = 0;
}
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
- logger.log("creating delete marker");
set_status("creating delete marker");
- ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
+ tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
}
+ tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
}
} while (marker_tracker->need_retry(key));
{
- stringstream ss;
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
if (retcode >= 0) {
- ss << "done";
+ tn->log(10, "success");
} else {
- ss << "done, retcode=" << retcode;
+ tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
}
- logger.log(ss.str());
}
if (retcode < 0 && retcode != -ENOENT) {
set_status() << "failed to sync obj; retcode=" << retcode;
- ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
- << bucket_shard_str{bs} << "/" << key.name << dendl;
+ tn->log(0, SSTR("ERROR: failed to sync object: "
+ << bucket_shard_str{bs} << "/" << key.name));
error_ss << bucket_shard_str{bs} << "/" << key.name;
sync_status = retcode;
}
const string& status_oid;
- RGWDataSyncDebugLogger logger;
rgw_zone_set zones_trace;
+
+ RGWSyncTraceNodeRef tn;
public:
RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
RGWBucketInfo *_bucket_info,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
- rgw_bucket_shard_sync_info& sync_info)
+ rgw_bucket_shard_sync_info& sync_info,
+ RGWSyncTraceNodeRef tn_parent)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sync_env, status_oid, sync_info.full_marker),
- status_oid(status_oid) {
- logger.init(sync_env, "BucketFull", bs.get_key());
+ status_oid(status_oid),
+ tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
+ SSTR(bucket_shard_str{bs}))) {
zones_trace.insert(sync_env->source_zone);
+ marker_tracker.set_tn(tn);
}
int operate() override;
return set_cr_error(-ECANCELED);
}
set_status("listing remote bucket");
- ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
+ tn->log(20, "listing bucket for full sync");
yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
&list_result));
if (retcode < 0 && retcode != -ENOENT) {
drain_all();
return set_cr_error(retcode);
}
+ if (list_result.entries.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
entries_iter = list_result.entries.begin();
for (; entries_iter != list_result.entries.end(); ++entries_iter) {
if (!lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
}
- ldout(sync_env->cct, 20) << "[full sync] syncing object: "
- << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
+ tn->log(20, SSTR("[full sync] syncing object: "
+ << bucket_shard_str{bs} << "/" << entries_iter->key));
entry = &(*entries_iter);
total_entries++;
list_marker = entries_iter->key;
if (!marker_tracker.start(entry->key, total_entries, real_time())) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
+ tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
} else {
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
- entry->key, &marker_tracker, zones_trace),
+ entry->key, &marker_tracker, zones_trace, tn),
false);
}
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+ tn->log(10, "a sync operation returned error");
sync_status = ret;
/* we have reported this error */
}
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+ tn->log(10, "a sync operation returned error");
sync_status = ret;
/* we have reported this error */
}
}
}
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
if (!lease_cr->is_locked()) {
return set_cr_error(-ECANCELED);
}
map<string, bufferlist> attrs;
sync_info.encode_state_attr(attrs);
RGWRados *store = sync_env->store;
- call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
attrs));
}
} else {
- ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+ tn->log(10, SSTR("backing out with sync_status=" << sync_status));
}
if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
- ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
- << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
+ << bucket_shard_str{bs} << " retcode=" << retcode));
return set_cr_error(retcode);
}
if (sync_status < 0) {
string cur_id;
- RGWDataSyncDebugLogger logger;
-
int sync_status{0};
bool syncstopped{false};
+ RGWSyncTraceNodeRef tn;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket_shard& bs,
RGWBucketInfo *_bucket_info,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
- rgw_bucket_shard_sync_info& sync_info)
+ rgw_bucket_shard_sync_info& sync_info,
+ RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sync_env, status_oid, sync_info.inc_marker),
- status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
+ status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
+ SSTR(bucket_shard_str{bs})))
{
set_description() << "bucket shard incremental sync bucket="
<< bucket_shard_str{bs};
set_status("init");
- logger.init(sync_env, "BucketInc", bs.get_key());
+ marker_tracker.set_tn(tn);
}
int operate() override;
do {
if (!lease_cr->is_locked()) {
drain_all();
+ tn->log(0, "ERROR: lease is not taken, abort");
return set_cr_error(-ECANCELED);
}
- ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << sync_info.inc_marker.position << dendl;
+ tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
&list_result));
if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
- ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
+ tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
- ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
+ tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
if (!key.ns.empty()) {
set_status() << "skipping entry in namespace: " << entry->object;
- ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
+ tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
- << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
+ tn->log(20, SSTR("skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (entry->state != CLS_RGW_STATE_COMPLETE) {
set_status() << "non-complete operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
- << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
+ tn->log(20, SSTR("skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
set_status() << "redundant operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
- <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation" << dendl;
+ tn->log(20, SSTR("skipping object: "
+ <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
set_status() << "squashed operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
- << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
+ tn->log(20, SSTR("skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
- ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
- << bucket_shard_str{bs} << "/" << key << dendl;
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE);
+ tn->log(20, SSTR("syncing object: "
+ << bucket_shard_str{bs} << "/" << key));
updated_status = false;
while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
if (!updated_status) {
set_status() << "can't do op, conflicting inflight operation";
updated_status = true;
}
- ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+ tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
yield wait_for_child();
bool again = true;
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
+ tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
sync_status = ret;
/* we have reported this error */
}
}
if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
set_status() << "can't do op, sync already in progress for object";
- ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
+ tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
// yield {
set_status() << "start object sync";
if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
+ tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
} else {
- boost::optional<uint64_t> versioned_epoch;
+ std::optional<uint64_t> versioned_epoch;
rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
if (entry->ver.pool < 0) {
versioned_epoch = entry->ver.epoch;
}
- ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
+ tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
spawn(new SyncCR(sync_env, bucket_info, bs, key,
entry->is_versioned(), versioned_epoch,
entry->timestamp, owner, entry->op, entry->state,
- cur_id, &marker_tracker, entry->zones_trace),
+ cur_id, &marker_tracker, entry->zones_trace, tn),
false);
}
// }
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+ tn->log(10, "a sync operation returned error");
sync_status = ret;
/* we have reported this error */
}
while (again) {
again = collect(&ret, nullptr);
if (ret < 0) {
- ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+ tn->log(10, "a sync operation returned error");
sync_status = ret;
/* we have reported this error */
}
/* not waiting for child here */
}
}
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
if (syncstopped) {
// transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
yield call(marker_tracker.flush());
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
if (sync_status < 0) {
- ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+ tn->log(10, SSTR("backing out with sync_status=" << sync_status));
return set_cr_error(sync_status);
}
return set_cr_done();
set_status("acquiring sync lock");
auto store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
"sync_lock",
cct->_conf->rgw_sync_lease_period,
this));
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
- ldout(cct, 5) << "lease cr failed, done early" << dendl;
+ tn->log(5, "failed to take lease");
set_status("lease lock failed, early abort");
drain_all();
return set_cr_error(lease_cr->get_ret_status());
yield;
}
+ tn->log(10, "took lease");
yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
- ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
- << bucket_shard_str{bs} << dendl;
+ tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
- ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
- << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
+ tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
- ldout(sync_env->cct, 10) << "no local info for bucket "
- << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
+ tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
string raw_key = string("bucket.instance:") + bs.bucket.get_key();
- meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
+ meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
+ sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
string() /* no marker */,
MDLOG_STATUS_COMPLETE,
- NULL /* no marker tracker */));
+ NULL /* no marker tracker */,
+ tn));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
+ tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
if (retcode == -ENOENT) {
- ldout(sync_env->cct, 0) << "bucket sync disabled" << dendl;
+ tn->log(0, "bucket sync disabled");
lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
lease_cr->wakeup();
lease_cr.reset();
return set_cr_done();
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
+ tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
status_oid, lease_cr.get(),
- sync_status));
+ sync_status, tn));
if (retcode < 0) {
- ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
+ tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
status_oid, lease_cr.get(),
- sync_status));
+ sync_status, tn));
if (retcode < 0) {
- ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
- << " failed, retcode=" << retcode << dendl;
+ tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
- return new RGWRunBucketSyncCoroutine(&sync_env, bs);
+ return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
}
int RGWBucketSyncStatusManager::init()
{
- conn = store->get_zone_conn_by_id(source_zone);
+ conn = store->svc.zone->get_zone_conn_by_id(source_zone);
if (!conn) {
- ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
+ ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
return -EINVAL;
}
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
bucket_instance_meta_info result;
ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
return ret;
}
auto async_rados = store->get_async_rados();
for (int i = 0; i < effective_num_shards; i++) {
- RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
- ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
+ RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
+ ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
}
source_logs[i] = l;
int ret = cr_mgr.run(stacks);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+ ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
<< bucket_str{bucket} << dendl;
return ret;
}
int ret = cr_mgr.run(stacks);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+ ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
<< bucket_str{bucket} << dendl;
return ret;
}
return 0;
}
+// DoutPrefixProvider hook: subsystem id under which ldpp_dout(this, ...)
+// messages from this manager are logged.
+unsigned RGWBucketSyncStatusManager::get_subsys() const
+{
+  return dout_subsys;
+}
+
+// DoutPrefixProvider hook: prefix every log line with the source zone id
+// and bucket name so interleaved per-bucket sync logs can be told apart.
+std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
+{
+  // only the first 8 chars of the zone id — keeps the prefix readable
+  auto zone = std::string_view{source_zone};
+  return out << "bucket sync zone:" << zone.substr(0, 8)
+             << " bucket:" << bucket.name << ' ';
+}
+
+// Name of the per-bucket-shard sync status rados object:
+//   "<bucket_status_oid_prefix>.<source_zone>:<bucket shard key>"
string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
                                              const rgw_bucket_shard& bs)
{
  return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}
+// Name of the per-object sync status rados object:
+//   "<object_status_oid_prefix>.<source_zone>:<bucket key>:<obj name>:<instance>"
+// NOTE(review): object_status_oid_prefix is initialized to the same
+// "bucket.sync-status" string as bucket_status_oid_prefix — confirm the key
+// shapes cannot collide with the per-shard status objects above.
+string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
+                                                  const rgw_obj& obj)
+{
+  return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
+         obj.key.name + ":" + obj.key.instance;
+}
+
class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
static constexpr int max_concurrent_shards = 16;
RGWRados *const store;
}
};
-int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
const RGWBucketInfo& bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status)
{
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
- env.init(store->ctx(), store, nullptr, store->get_async_rados(),
- nullptr, nullptr, source_zone, module);
+ env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
+ nullptr, nullptr, nullptr, source_zone, module);
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
int num_shards, std::vector<std::string>& last_trim)
: RGWCoroutine(store->ctx()), store(store), http(http),
num_shards(num_shards),
- zone_id(store->get_zone().id),
- peer_status(store->zone_conn_map.size()),
+ zone_id(store->svc.zone->get_zone().id),
+ peer_status(store->svc.zone->get_zone_conn_map().size()),
min_shard_markers(num_shards),
last_trim(last_trim)
{}
};
auto p = peer_status.begin();
- for (auto& c : store->zone_conn_map) {
+ for (auto& c : store->svc.zone->get_zone_conn_map()) {
ldout(cct, 20) << "query sync status from " << c.first << dendl;
using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
// prevent other gateways from attempting to trim for the duration
set_status("acquiring trim lock");
yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
- rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
+ rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
"data_trim", lock_cookie,
interval.sec()));
if (retcode < 0) {