class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- RGWRESTReadResource *http_op;
+ RGWRESTReadResource *http_op = nullptr;
int shard_id;
- string *pmarker;
+ const std::string& marker;
+ string *pnext_marker;
list<rgw_data_change_log_entry> *entries;
bool *truncated;
read_remote_data_log_response response;
public:
- RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
- int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- http_op(NULL),
- shard_id(_shard_id),
- pmarker(_pmarker),
- entries(_entries),
- truncated(_truncated) {
+ RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
+ const std::string& marker, string *pnext_marker,
+ list<rgw_data_change_log_entry> *_entries,
+ bool *_truncated)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
+ entries(_entries), truncated(_truncated) {
}
~RGWReadRemoteDataLogShardCR() override {
if (http_op) {
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
- { "marker", pmarker->c_str() },
+ { "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };
}
entries->clear();
entries->swap(response.entries);
- *pmarker = response.marker;
+ *pnext_marker = response.marker;
*truncated = response.truncated;
return set_cr_done();
}
RGWDataSyncShardMarkerTrack *marker_tracker;
+ std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
list<rgw_data_change_log_entry>::iterator log_iter;
bool truncated;
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
rgw_pool& _pool,
- uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+ uint32_t _shard_id, const rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
pool(_pool),
shard_id(_shard_id),
if (lease_cr->is_done()) {
ldout(cct, 5) << "lease cr failed, done early " << dendl;
set_status("lease lock failed, early abort");
+ drain_all();
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
if (lease_cr->is_done()) {
ldout(cct, 5) << "lease cr failed, done early " << dendl;
set_status("lease lock failed, early abort");
+ drain_all();
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
#define INCREMENTAL_MAX_ENTRIES 100
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
spawned_keys.clear();
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
+ &next_marker, &log_entries, &truncated));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
stop_spawned_services();
}
/* not waiting for child here */
}
- }
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
- if (!truncated) {
- yield wait(get_idle_interval());
- }
+ }
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
+ << " next_marker=" << next_marker << " truncated=" << truncated << dendl;
+ if (!truncated) {
+ yield wait(get_idle_interval());
+ }
+ if (!next_marker.empty()) {
+ sync_marker.marker = next_marker;
+ } else if (!log_entries.empty()) {
+ sync_marker.marker = log_entries.back().log_id;
+ }
} while (true);
}
return 0;
rgw_data_sync_marker* sync_marker;
int count;
+ std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
bool truncated;
marker = sync_marker->marker;
count = 0;
do{
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
+ &next_marker, &log_entries, &truncated));
if (retcode == -ENOENT) {
break;
JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
}
+
+ RGWModifyOp get_modify_op() const {
+ if (delete_marker) {
+ return CLS_RGW_OP_LINK_OLH_DM;
+ } else if (!key.instance.empty() && key.instance != "null") {
+ return CLS_RGW_OP_LINK_OLH;
+ } else {
+ return CLS_RGW_OP_ADD;
+ }
+ }
};
struct bucket_list_result {
RGWBucketFullSyncShardMarkerTrack marker_tracker;
rgw_obj_key list_marker;
bucket_list_entry *entry{nullptr};
- RGWModifyOp op{CLS_RGW_OP_ADD};
int total_entries{0};
if (!marker_tracker.start(entry->key, total_entries, real_time())) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
} else {
- op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
- entry->owner, op, CLS_RGW_STATE_COMPLETE,
+ entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
entry->key, &marker_tracker, zones_trace),
false);
}
if (lease_cr->is_done()) {
ldout(cct, 5) << "lease cr failed, done early" << dendl;
set_status("lease lock failed, early abort");
+ drain_all();
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+ if (retcode == -ENOENT) {
+ ldout(sync_env->cct, 0) << "bucket sync disabled" << dendl;
+ lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
+ lease_cr->wakeup();
+ lease_cr.reset();
+ drain_all();
+ return set_cr_done();
+ }
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
<< " failed, retcode=" << retcode << dendl;
return 0;
}
+RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
+ RGWHTTPManager *http,
+ int num_shards,
+ std::vector<std::string>& markers)
+{
+ return new DataLogTrimCR(store, http, num_shards, markers);
+}
+
class DataLogTrimPollCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http;