]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_data_sync.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_data_sync.cc
index 11ff6759f09103a772292375a79c3158e3033c5c..57ee38f3267b9c4e3aa10e35fdffca5aa8aee045 100644 (file)
@@ -707,6 +707,22 @@ static string full_data_sync_index_shard_oid(const string& source_zone, int shar
   return string(buf);
 }
 
+struct read_metadata_list {
+  string marker;
+  bool truncated;
+  list<string> keys;
+  int count;
+
+  read_metadata_list() : truncated(false), count(0) {}
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("marker", marker, obj);
+    JSONDecoder::decode_json("truncated", truncated, obj);
+    JSONDecoder::decode_json("keys", keys, obj);
+    JSONDecoder::decode_json("count", count, obj);
+  }
+};
+
 struct bucket_instance_meta_info {
   string key;
   obj_version ver;
@@ -734,7 +750,6 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
   int req_ret;
   int ret;
 
-  list<string> result;
   list<string>::iterator iter;
 
   RGWShardedOmapCRManager *entries_index;
@@ -748,12 +763,14 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
   int i;
 
   bool failed;
+  bool truncated;
+  read_metadata_list result;
 
 public:
   RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                          rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                       store(sync_env->store), sync_status(_sync_status),
-                                                     req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
+                                                     req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
     oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; 
     path = "/admin/metadata/bucket.instance";
     num_shards = sync_status->sync_info.num_shards;
@@ -764,44 +781,53 @@ public:
 
   int operate() override {
     reenter(this) {
-      yield {
-        string entrypoint = string("/admin/metadata/bucket.instance");
-        /* FIXME: need a better scaling solution here, requires streaming output */
-        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
-                                                      entrypoint, NULL, &result));
-      }
-      if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
-        return set_cr_error(retcode);
-      }
       entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->svc.zone->get_zone_params().log_pool,
                                                   oid_prefix);
       yield; // yield so OmapAppendCRs can start
-      for (iter = result.begin(); iter != result.end(); ++iter) {
-        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
-
-        key = *iter;
 
+      do {
         yield {
-          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
-                                          { NULL, NULL } };
+          string entrypoint = string("/admin/metadata/bucket.instance");
 
-          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
+          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
+                                         {"marker", result.marker.c_str()},
+                                         {NULL, NULL}};
+
+          call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
+                                                             entrypoint, pairs, &result));
         }
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
+          return set_cr_error(retcode);
+        }
+
+        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
+          ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
+          key = *iter;
 
-        num_shards = meta_info.data.get_bucket_info().num_shards;
-        if (num_shards > 0) {
-          for (i = 0; i < num_shards; i++) {
-            char buf[16];
-            snprintf(buf, sizeof(buf), ":%d", i);
-            s = key + buf;
-            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+          yield {
+            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
+                                           {NULL, NULL}};
+
+            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
+          }
+
+          num_shards = meta_info.data.get_bucket_info().num_shards;
+          if (num_shards > 0) {
+            for (i = 0; i < num_shards; i++) {
+              char buf[16];
+              snprintf(buf, sizeof(buf), ":%d", i);
+              s = key + buf;
+              yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+            }
+          } else {
+            yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
           }
-        } else {
-          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
         }
-      }
+        truncated = result.truncated;
+      } while (truncated);
+
       yield {
         if (!entries_index->finish()) {
           failed = true;
@@ -814,25 +840,27 @@ public:
           marker.total_entries = entries_index->get_total_entries(shard_id);
           spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                                 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
-                                                                marker), true);
+                                                                marker),
+                true);
         }
       } else {
-          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
-                                                          EIO, string("failed to build bucket instances map")));
+        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
+                                                        EIO, string("failed to build bucket instances map")));
       }
       while (collect(&ret, NULL)) {
-       if (ret < 0) {
+        if (ret < 0) {
           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                           -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
-         req_ret = ret;
-       }
+          req_ret = ret;
+        }
         yield;
       }
+
       drain_all();
       if (req_ret < 0) {
         yield return set_cr_error(req_ret);
       }
-      yield return set_cr_done();
+       yield return set_cr_done();
     }
     return 0;
   }