#include <iostream>
#include <sstream>
#include <string>
+#include <chrono>
+#include <thread>
#include <boost/optional.hpp>
}
+static int trim_sync_error_log(int shard_id, const ceph::real_time& start_time,
+ const ceph::real_time& end_time,
+ const string& start_marker, const string& end_marker,
+ int delay_ms)
+{
+ auto oid = RGWSyncErrorLogger::get_shard_oid(RGW_SYNC_ERROR_LOG_SHARD_PREFIX,
+ shard_id);
+ // call cls_log_trim() until it returns -ENODATA
+ for (;;) {
+ int ret = store->time_log_trim(oid, start_time, end_time,
+ start_marker, end_marker);
+ if (ret == -ENODATA) {
+ return 0;
+ }
+ if (ret < 0) {
+ return ret;
+ }
+ if (delay_ms) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+ }
+ }
+ // unreachable
+}
+
int main(int argc, const char **argv)
{
vector<const char*> args;
boost::optional<std::string> compression_type;
+ int trim_delay_ms = 0;
+
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
perm_policy_doc = val;
} else if (ceph_argparse_witharg(args, i, &val, "--path-prefix", (char*)NULL)) {
path_prefix = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--trim-delay-ms", (char*)NULL)) {
+ trim_delay_ms = atoi(val.c_str());
} else if (strncmp(*i, "-", 1) == 0) {
cerr << "ERROR: invalid flag " << *i << std::endl;
return EINVAL;
}
for (; shard_id < ERROR_LOGGER_SHARDS; ++shard_id) {
- string oid = RGWSyncErrorLogger::get_shard_oid(RGW_SYNC_ERROR_LOG_SHARD_PREFIX, shard_id);
- ret = store->time_log_trim(oid, start_time.to_real_time(), end_time.to_real_time(), start_marker, end_marker);
- if (ret < 0 && ret != -ENODATA) {
+ ret = trim_sync_error_log(shard_id, start_time.to_real_time(),
+ end_time.to_real_time(), start_marker,
+ end_marker, trim_delay_ms);
+ if (ret < 0) {
cerr << "ERROR: sync error trim: " << cpp_strerror(-ret) << std::endl;
return -ret;
}