#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10
+enum RemoteDatalogStatus {
+ RemoteNotTrimmed = 0,
+ RemoteTrimmed = 1,
+ RemoteMightTrimmed = 2
+};
+
class RGWDataSyncShardCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWDataChangesLogInfo shard_info;
string datalog_marker;
+ RemoteDatalogStatus remote_trimmed;
Mutex inc_lock;
Cond inc_cond;
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
- marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
+ marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
return set_cr_error(retcode);
}
datalog_marker = shard_info.marker;
+ remote_trimmed = RemoteNotTrimmed;
#define INCREMENTAL_MAX_ENTRIES 100
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker > sync_marker.marker) {
spawned_keys.clear();
+ if (sync_marker.marker.empty())
+ remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed;
yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
drain_all();
return set_cr_error(retcode);
}
+ if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
+ remote_trimmed = RemoteTrimmed;
+ else
+ remote_trimmed = RemoteNotTrimmed;
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
}
}
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (datalog_marker == sync_marker.marker) {
+ if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
}