return string(buf);
}
+// Response payload for one page of a metadata listing request
+// (GET /admin/metadata/bucket.instance with max-entries/marker params).
+// NOTE(review): field semantics inferred from the JSON keys decoded
+// below and from how the caller re-sends `marker` on the next request —
+// confirm against the radosgw admin metadata REST API.
+struct read_metadata_list {
+ string marker;        // pagination cursor; echo back to fetch the next page
+ bool truncated;       // presumably true while more entries remain — verify
+ list<string> keys;    // metadata keys returned in this page
+ int count;            // entry count reported by the server for this page
+
+ read_metadata_list() : truncated(false), count(0) {}
+
+ // Populate all four fields from the JSON reply object.
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("marker", marker, obj);
+ JSONDecoder::decode_json("truncated", truncated, obj);
+ JSONDecoder::decode_json("keys", keys, obj);
+ JSONDecoder::decode_json("count", count, obj);
+ }
+};
+
struct bucket_instance_meta_info {
string key;
obj_version ver;
int req_ret;
int ret;
- list<string> result;
list<string>::iterator iter;
RGWShardedOmapCRManager *entries_index;
int i;
bool failed;
+ bool truncated;
+ read_metadata_list result;
public:
RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
store(sync_env->store), sync_status(_sync_status),
- req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
+ req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
path = "/admin/metadata/bucket.instance";
num_shards = sync_status->sync_info.num_shards;
int operate() override {
reenter(this) {
- yield {
- string entrypoint = string("/admin/metadata/bucket.instance");
- /* FIXME: need a better scaling solution here, requires streaming output */
- call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
- entrypoint, NULL, &result));
- }
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
- return set_cr_error(retcode);
- }
entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
store->svc.zone->get_zone_params().log_pool,
oid_prefix);
yield; // yield so OmapAppendCRs can start
- for (iter = result.begin(); iter != result.end(); ++iter) {
- ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
-
- key = *iter;
+ do {
yield {
- rgw_http_param_pair pairs[] = { { "key", key.c_str() },
- { NULL, NULL } };
+ string entrypoint = string("/admin/metadata/bucket.instance");
- call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
+ rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
+ {"marker", result.marker.c_str()},
+ {NULL, NULL}};
+
+ call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
+ entrypoint, pairs, &result));
}
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
+ ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
+ key = *iter;
- num_shards = meta_info.data.get_bucket_info().num_shards;
- if (num_shards > 0) {
- for (i = 0; i < num_shards; i++) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", i);
- s = key + buf;
- yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+ yield {
+ rgw_http_param_pair pairs[] = {{"key", key.c_str()},
+ {NULL, NULL}};
+
+ call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
+ }
+
+ num_shards = meta_info.data.get_bucket_info().num_shards;
+ if (num_shards > 0) {
+ for (i = 0; i < num_shards; i++) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", i);
+ s = key + buf;
+ yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+ }
+ } else {
+ yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
}
- } else {
- yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
}
- }
+ truncated = result.truncated;
+ } while (truncated);
+
yield {
if (!entries_index->finish()) {
failed = true;
marker.total_entries = entries_index->get_total_entries(shard_id);
spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
- marker), true);
+ marker),
+ true);
}
} else {
- yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
- EIO, string("failed to build bucket instances map")));
+ yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
+ EIO, string("failed to build bucket instances map")));
}
while (collect(&ret, NULL)) {
- if (ret < 0) {
+ if (ret < 0) {
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
-ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
- req_ret = ret;
- }
+ req_ret = ret;
+ }
yield;
}
+
drain_all();
if (req_ret < 0) {
yield return set_cr_error(req_ret);
}
- yield return set_cr_done();
+ yield return set_cr_done();
}
return 0;
}