]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_data_sync.cc
import ceph 12.2.12
[ceph.git] / ceph / src / rgw / rgw_data_sync.cc
index ad43157824e592692b67a64680b8eebaee65febe..5da750df2341ea33963c7e6c7cd9ffe20054c2ac 100644 (file)
@@ -308,24 +308,24 @@ struct read_remote_data_log_response {
 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
-  RGWRESTReadResource *http_op;
+  RGWRESTReadResource *http_op = nullptr;
 
   int shard_id;
-  string *pmarker;
+  const std::string& marker;
+  string *pnext_marker;
   list<rgw_data_change_log_entry> *entries;
   bool *truncated;
 
   read_remote_data_log_response response;
 
 public:
-  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
-                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
-                                                      sync_env(_sync_env),
-                                                      http_op(NULL),
-                                                      shard_id(_shard_id),
-                                                      pmarker(_pmarker),
-                                                      entries(_entries),
-                                                      truncated(_truncated) {
+  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
+                              const std::string& marker, string *pnext_marker,
+                              list<rgw_data_change_log_entry> *_entries,
+                              bool *_truncated)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
+      entries(_entries), truncated(_truncated) {
   }
   ~RGWReadRemoteDataLogShardCR() override {
     if (http_op) {
@@ -340,7 +340,7 @@ public:
        snprintf(buf, sizeof(buf), "%d", shard_id);
         rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
-                                       { "marker", pmarker->c_str() },
+                                       { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };
 
@@ -366,7 +366,7 @@ public:
         }
         entries->clear();
         entries->swap(response.entries);
-        *pmarker = response.marker;
+        *pnext_marker = response.marker;
         *truncated = response.truncated;
         return set_cr_done();
       }
@@ -1112,6 +1112,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
+  std::string next_marker;
   list<rgw_data_change_log_entry> log_entries;
   list<rgw_data_change_log_entry>::iterator log_iter;
   bool truncated;
@@ -1158,7 +1159,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 public:
   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                      rgw_pool& _pool,
-                    uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+                    uint32_t _shard_id, const rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                      pool(_pool),
                                                      shard_id(_shard_id),
@@ -1242,6 +1243,7 @@ public:
         if (lease_cr->is_done()) {
           ldout(cct, 5) << "lease cr failed, done early " << dendl;
           set_status("lease lock failed, early abort");
+          drain_all();
           return set_cr_error(lease_cr->get_ret_status());
         }
         set_sleeping(true);
@@ -1323,6 +1325,7 @@ public:
           if (lease_cr->is_done()) {
             ldout(cct, 5) << "lease cr failed, done early " << dendl;
             set_status("lease lock failed, early abort");
+            drain_all();
             return set_cr_error(lease_cr->get_ret_status());
           }
           set_sleeping(true);
@@ -1387,7 +1390,8 @@ public:
 #define INCREMENTAL_MAX_ENTRIES 100
              ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
         spawned_keys.clear();
-        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
+        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
+                                                   &next_marker, &log_entries, &truncated));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
           stop_spawned_services();
@@ -1430,11 +1434,17 @@ public:
             }
             /* not waiting for child here */
           }
-             }
-             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
-             if (!truncated) {
-        yield wait(get_idle_interval());
-      }
+        }
+        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
+            << " next_marker=" << next_marker << " truncated=" << truncated << dendl;
+        if (!truncated) {
+          yield wait(get_idle_interval());
+        }
+        if (!next_marker.empty()) {
+          sync_marker.marker = next_marker;
+        } else if (!log_entries.empty()) {
+          sync_marker.marker = log_entries.back().log_id;
+        }
       } while (true);
     }
     return 0;
@@ -2086,6 +2096,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
   rgw_data_sync_marker* sync_marker;
   int count;
 
+  std::string next_marker;
   list<rgw_data_change_log_entry> log_entries;
   bool truncated;
 
@@ -2121,7 +2132,8 @@ int RGWReadPendingBucketShardsCoroutine::operate()
     marker = sync_marker->marker;
     count = 0;
     do{
-      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
+                                                 &next_marker, &log_entries, &truncated));
 
       if (retcode == -ENOENT) {
         break;
@@ -2229,6 +2241,16 @@ struct bucket_list_entry {
     JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
     JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
   }
+
+  RGWModifyOp get_modify_op() const {
+    if (delete_marker) {
+      return CLS_RGW_OP_LINK_OLH_DM;
+    } else if (!key.instance.empty() && key.instance != "null") {
+      return CLS_RGW_OP_LINK_OLH;
+    } else {
+      return CLS_RGW_OP_ADD;
+    }
+  }
 };
 
 struct bucket_list_result {
@@ -2607,7 +2629,6 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   RGWBucketFullSyncShardMarkerTrack marker_tracker;
   rgw_obj_key list_marker;
   bucket_list_entry *entry{nullptr};
-  RGWModifyOp op{CLS_RGW_OP_ADD};
 
   int total_entries{0};
 
@@ -2669,12 +2690,11 @@ int RGWBucketShardFullSyncCR::operate()
         if (!marker_tracker.start(entry->key, total_entries, real_time())) {
           ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
         } else {
-          op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
           using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
           yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
                                  false, /* versioned, only matters for object removal */
                                  entry->versioned_epoch, entry->mtime,
-                                 entry->owner, op, CLS_RGW_STATE_COMPLETE,
+                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                  entry->key, &marker_tracker, zones_trace),
                       false);
         }
@@ -3025,6 +3045,7 @@ int RGWRunBucketSyncCoroutine::operate()
       if (lease_cr->is_done()) {
         ldout(cct, 5) << "lease cr failed, done early" << dendl;
         set_status("lease lock failed, early abort");
+        drain_all();
         return set_cr_error(lease_cr->get_ret_status());
       }
       set_sleeping(true);
@@ -3077,6 +3098,14 @@ int RGWRunBucketSyncCoroutine::operate()
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
         yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+        if (retcode == -ENOENT) {
+          ldout(sync_env->cct, 0) << "bucket sync disabled" << dendl;
+          lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
+          lease_cr->wakeup();
+          lease_cr.reset();
+          drain_all();
+          return set_cr_done();
+        }
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
               << " failed, retcode=" << retcode << dendl;
@@ -3431,6 +3460,14 @@ int DataLogTrimCR::operate()
   return 0;
 }
 
+RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards,
+                                            std::vector<std::string>& markers)
+{
+  return new DataLogTrimCR(store, http, num_shards, markers);
+}
+
 class DataLogTrimPollCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http;