update sources to 12.2.7
ceph/src/rgw/rgw_data_sync.cc
index 8fe6497f29b61a0d2ddc3b3da13b5e351189a6d6..703bdd7ee25e9cc0d700dd374595956f13a43ea9 100644
@@ -18,6 +18,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -151,6 +152,40 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
   return true;
 }
 
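+// Collects the set of datalog shards that still have keys in their per-shard
+// ".retry" omap objects, i.e. shards with sync errors awaiting retry.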
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  RGWDataSyncEnv *env;
+
+  uint64_t max_entries;
+  int num_shards;
+  int shard_id{0};
+
+  string marker;
+  map<int, std::set<std::string>> &entries_map;
+
+ public:
+  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+      map<int, std::set<std::string>>& _entries_map)
+    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+  {}
+  bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+  if (shard_id >= num_shards)
+    return false;
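+  // retry entries for each shard live in an omap object named "<shard status oid>.retry"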
+  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+                                  marker, &entries_map[shard_id], max_entries), false);
+
+  ++shard_id;
+  return true;
+}
+
 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
@@ -613,7 +648,8 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
 
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+                _source_zone, _sync_module, observer);
 
   if (initialized) {
     return 0;
@@ -652,9 +688,39 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
   return ret;
 }
 
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  map<int, std::set<std::string>> entries_map;
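+  // fetching a single key per shard is enough to tell that the shard is recovering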
+  uint64_t max_entries{1};
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  http_manager.stop();
+
+  if (ret == 0) {
+    for (const auto& entry : entries_map) {
+      if (entry.second.size() != 0) {
+        recovering_shards.insert(entry.first);
+      }
+    }
+  }
+
+  return ret;
+}
+
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
   rgw_data_sync_status sync_status;
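+  // record the shard count in the status header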
+  sync_status.sync_info.num_shards = num_shards;
+
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
   int ret = http_manager.set_threaded();
@@ -735,19 +801,20 @@ public:
 
   int operate() override {
     reenter(this) {
-      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
-                                                 store->get_zone_params().log_pool,
-                                                  oid_prefix);
       yield {
         string entrypoint = string("/admin/metadata/bucket.instance");
         /* FIXME: need a better scaling solution here, requires streaming output */
         call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                       entrypoint, NULL, &result));
       }
-      if (get_ret_status() < 0) {
+      if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
-        return set_state(RGWCoroutine_Error);
+        return set_cr_error(retcode);
       }
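+      // only create the sharded omap manager once the bucket.instance listing has succeeded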
+      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
+                                                 store->get_zone_params().log_pool,
+                                                  oid_prefix);
+      yield; // yield so OmapAppendCRs can start
       for (iter = result.begin(); iter != result.end(); ++iter) {
         ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
 
@@ -984,10 +1051,13 @@ public:
       }
 
       if (sync_status < 0) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
-                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
-        if (retcode < 0) {
-          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+        // write actual sync failures for 'radosgw-admin sync error list'
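+        // EBUSY and EAGAIN are transient: those entries are retried, not logged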
+        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
+          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
+                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
+          if (retcode < 0) {
+            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+          }
         }
         if (error_repo && !error_repo->append(raw_key)) {
           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
@@ -1000,6 +1070,9 @@ public:
              << error_repo->get_obj() << " retcode=" << retcode << dendl;
         }
       }
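+      // tell the registered observer (e.g. the bucket trim manager) that this bucket changed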
+      if (sync_env->observer) {
+        sync_env->observer->on_bucket_changed(bs.bucket.get_key());
+      }
       /* FIXME: what to do in case of error */
       if (marker_tracker && !entry_marker.empty()) {
         /* update marker */
@@ -1034,8 +1107,8 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
-  map<string, bufferlist> entries;
-  map<string, bufferlist>::iterator iter;
+  std::set<std::string> entries;
+  std::set<std::string>::iterator iter;
 
   string oid;
 
@@ -1076,7 +1149,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   string error_oid;
   RGWOmapAppend *error_repo;
-  map<string, bufferlist> error_entries;
+  std::set<std::string> error_entries;
   string error_marker;
   int max_error_entries;
 
@@ -1194,20 +1267,20 @@ public:
         }
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
+          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
           total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+          if (!marker_tracker->start(*iter, total_entries, real_time())) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
             if (retcode < 0) {
               lease_cr->go_down();
               drain_all();
               return set_cr_error(retcode);
             }
           }
-          sync_marker.marker = iter->first;
+          sync_marker.marker = *iter;
         }
       } while ((int)entries.size() == max_entries);
 
@@ -1274,9 +1347,9 @@ public:
         ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
         iter = error_entries.begin();
         for (; iter != error_entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
-          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
-          error_marker = iter->first;
+          error_marker = *iter;
+          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+          spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
         }
         if ((int)error_entries.size() != max_error_entries) {
           if (error_marker.empty() && error_entries.empty()) {
@@ -1480,6 +1553,7 @@ public:
 
       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
         /* call sync module init here */
+        sync_status.sync_info.num_shards = num_shards;
         yield call(data_sync_module->init_sync(sync_env));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
@@ -1595,8 +1669,9 @@ class RGWDataSyncControlCR : public RGWBackoffControlCR
   RGWDataSyncEnv *sync_env;
   uint32_t num_shards;
 
+  static constexpr bool exit_on_error = false; // retry on all errors
 public:
-  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, true),
+  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
                                                       sync_env(_sync_env), num_shards(_num_shards) {
   }
 
@@ -1670,7 +1745,9 @@ int RGWDataSyncStatusManager::init()
 
   RGWZoneParams& zone_params = store->get_zone_params();
 
-  sync_module = store->get_sync_module();
+  if (sync_module == nullptr) {
+    sync_module = store->get_sync_module();
+  }
 
   conn = store->get_zone_conn_by_id(source_zone);
   if (!conn) {
@@ -1736,35 +1813,22 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, source_zone, _sync_module, nullptr);
 
   return 0;
 }
 
-struct bucket_index_marker_info {
-  string bucket_ver;
-  string master_ver;
-  string max_marker;
-  bool syncstopped{false};
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
-    JSONDecoder::decode_json("master_ver", master_ver, obj);
-    JSONDecoder::decode_json("max_marker", max_marker, obj);
-    JSONDecoder::decode_json("syncstopped", syncstopped, obj);
-  }
-};
-
 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   const string instance_key;
 
-  bucket_index_marker_info *info;
+  rgw_bucket_index_marker_info *info;
 
 public:
   RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
-                                  bucket_index_marker_info *_info)
+                                  rgw_bucket_index_marker_info *_info)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
       instance_key(bs.get_key()), info(_info) {}
 
@@ -1777,7 +1841,7 @@ public:
                                        { NULL, NULL } };
 
         string p = "/admin/log/";
-        call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
+        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
       }
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1796,7 +1860,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
 
   rgw_bucket_shard_sync_info& status;
 
-  bucket_index_marker_info info;
+  rgw_bucket_index_marker_info info;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                         const rgw_bucket_shard& bs,
@@ -1920,6 +1984,171 @@ int RGWReadBucketSyncStatusCoroutine::operate()
   }
   return 0;
 }
+
+#define OMAP_READ_MAX_ENTRIES 10
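+// Scans one datalog shard's ".retry" omap object and returns the bucket shard
+// keys recorded there, i.e. those with failed entries awaiting retry.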
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& recovering_buckets;
+  string marker;
+  string error_oid;
+
+  set<string> error_entries;
+  int max_omap_entries;
+  int count;
+
+public:
+  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _recovering_buckets, const int _max_entries)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+      recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+  {
+    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+  }
+
+  int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+  reenter(this) {
+    // read recovering bucket shards
+    count = 0;
+    do {
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
+            marker, &error_entries, max_omap_entries));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with " 
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (error_entries.empty()) {
+        break;
+      }
+
+      count += error_entries.size();
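+      // resume the next omap listing after the last key returned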
+      marker = *error_entries.rbegin();
+      recovering_buckets.insert(error_entries.begin(), error_entries.end());
+    } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
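+// Lists datalog entries for a shard beyond its current sync marker; any bucket
+// shards named there have been logged but not yet processed by data sync.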
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& pending_buckets;
+  string marker;
+  string status_oid;
+
+  rgw_data_sync_marker* sync_marker;
+  int count;
+
+  list<rgw_data_change_log_entry> log_entries;
+  bool truncated;
+
+public:
+  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _pending_buckets,
+                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+  {
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+  }
+
+  int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+  reenter(this) {
+    // read sync status marker
+    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+    yield call(new CR(sync_env->async_rados, store, 
+                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                      sync_marker));
+    if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
+        << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    // read pending bucket shards
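+    // everything in the datalog past the shard's current sync position is pending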
+    marker = sync_marker->marker;
+    count = 0;
+    do {
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (log_entries.empty()) {
+        break;
+      }
+
+      count += log_entries.size();
+      for (const auto& entry : log_entries) {
+        pending_buckets.insert(entry.entry.key);
+      }
+    } while (truncated && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  list<RGWCoroutinesStack *> stacks;
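+  // run the recovering and pending scans concurrently, one coroutine stack each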
+  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+  stacks.push_back(recovering_stack);
+  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+  stacks.push_back(pending_stack);
+  ret = crs.run(stacks);
+  http_manager.stop();
+  return ret;
+}
+
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
@@ -2297,7 +2526,7 @@ public:
         sync_status = retcode;
       }
       if (!error_ss.str().empty()) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
+        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object: ") + cpp_strerror(-sync_status)));
       }
 done:
       if (sync_status == 0) {
@@ -2538,6 +2767,9 @@ int RGWBucketShardIncrementalSyncCR::operate()
           syncstopped = false;
           continue;
         }
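+        // a cancelled op never completed on the source, so there is nothing to sync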
+        if (e.op == CLS_RGW_OP_CANCEL) {
+          continue;
+        }
         if (e.state != CLS_RGW_STATE_COMPLETE) {
           continue;
         }
@@ -2962,6 +3194,55 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
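+// Reads the sync status of every shard of a bucket, keeping at most
+// max_concurrent_shards reads in flight.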
+class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWRados *const store;
+  RGWDataSyncEnv *const env;
+  const int num_shards;
+  rgw_bucket_shard bs;
+
+  using Vector = std::vector<rgw_bucket_shard_sync_info>;
+  Vector::iterator i, end;
+
+ public:
+  RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
+                               int num_shards, const rgw_bucket& bucket,
+                               Vector *status)
+    : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
+      store(store), env(env), num_shards(num_shards),
+      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+      i(status->begin()), end(status->end())
+  {}
+
+  bool spawn_next() override {
+    if (i == end) {
+      return false;
+    }
+    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+    ++i;
+    ++bs.shard_id;
+    return true;
+  }
+};
+
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+                           const RGWBucketInfo& bucket_info,
+                           std::vector<rgw_bucket_shard_sync_info> *status)
+{
+  const auto num_shards = bucket_info.num_shards;
+  status->clear();
+  status->resize(std::max<size_t>(1, num_shards));
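+  // unsharded buckets (num_shards == 0) still have a single status object at shard id -1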
+
+  RGWDataSyncEnv env;
+  RGWSyncModuleInstanceRef module; // null sync module
+  env.init(store->ctx(), store, nullptr, store->get_async_rados(),
+           nullptr, nullptr, source_zone, module, nullptr);
+
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
+                                                  bucket_info.bucket, status));
+}
+
 
 // TODO: move into rgw_data_sync_trim.cc
 #undef dout_prefix