git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_data_sync.cc
update sources to 12.2.10
index daaffb7cde9eb7b2c10b9eeb9490c7e9c581b588..dca9ea9196e1b3f71eb3fbdd8761d75d4ea6db55 100644
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #include <boost/utility/string_ref.hpp>
 
 #include "common/ceph_json.h"
@@ -18,6 +21,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -151,6 +155,40 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
   return true;
 }
 
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
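+  // spawns one omap-key listing per data log shard's ".retry" error object so
+  // the caller can see which shards still have error entries pending retry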
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  RGWDataSyncEnv *env;
+
+  uint64_t max_entries;
+  int num_shards;
+  int shard_id{0};
+
+  string marker;
+  map<int, std::set<std::string>> &entries_map;
+
+ public:
+  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+      map<int, std::set<std::string>>& _entries_map)
+    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+    max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+  {}
+  bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+  if (shard_id >= num_shards)
+    return false;
+  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+                                  marker, &entries_map[shard_id], max_entries), false);
+
+  ++shard_id;
+  return true;
+}
+
 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
@@ -613,7 +651,8 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
 
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+                _source_zone, _sync_module);
 
   if (initialized) {
     return 0;
@@ -652,9 +691,39 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
   return ret;
 }
 
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  map<int, std::set<std::string>> entries_map;
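+  // a single key per shard is enough to tell whether that shard is still recovering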
+  uint64_t max_entries{1};
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  http_manager.stop();
+
+  if (ret == 0) {
+    for (const auto& entry : entries_map) {
+      if (entry.second.size() != 0) {
+        recovering_shards.insert(entry.first);
+      }
+    }
+  }
+
+  return ret;
+}
+
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
   rgw_data_sync_status sync_status;
+  sync_status.sync_info.num_shards = num_shards;
+
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
   int ret = http_manager.set_threaded();
@@ -866,6 +935,10 @@ public:
     marker_to_key[marker] = key;
     return true;
   }
+
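+  // overlapping marker updates can be coalesced: only the most recent marker needs to be written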
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 // ostream wrappers to print buckets without copying strings
@@ -985,10 +1058,13 @@ public:
       }
 
       if (sync_status < 0) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
-                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
-        if (retcode < 0) {
-          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+        // write actual sync failures for 'radosgw-admin sync error list'
+        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
+          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
+                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
+          if (retcode < 0) {
+            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+          }
         }
         if (error_repo && !error_repo->append(raw_key)) {
           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
@@ -1021,12 +1097,6 @@ public:
 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
 #define DATA_SYNC_MAX_ERR_ENTRIES 10
 
-enum RemoteDatalogStatus {
-  RemoteNotTrimmed = 0,
-  RemoteTrimmed = 1,
-  RemoteMightTrimmed = 2
-};
-
 class RGWDataSyncShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -1035,8 +1105,8 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
-  map<string, bufferlist> entries;
-  map<string, bufferlist>::iterator iter;
+  std::set<std::string> entries;
+  std::set<std::string>::iterator iter;
 
   string oid;
 
@@ -1046,10 +1116,6 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   list<rgw_data_change_log_entry>::iterator log_iter;
   bool truncated;
 
-  RGWDataChangesLogInfo shard_info;
-  string datalog_marker;
-
-  RemoteDatalogStatus remote_trimmed;
   Mutex inc_lock;
   Cond inc_cond;
 
@@ -1077,11 +1143,11 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   string error_oid;
   RGWOmapAppend *error_repo;
-  map<string, bufferlist> error_entries;
+  std::set<std::string> error_entries;
   string error_marker;
   int max_error_entries;
 
-  ceph::real_time error_retry_time;
+  ceph::coarse_real_time error_retry_time;
 
 #define RETRY_BACKOFF_SECS_MIN 60
 #define RETRY_BACKOFF_SECS_DEFAULT 60
@@ -1097,7 +1163,7 @@ public:
                                                      pool(_pool),
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker),
-                                                      marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
+                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
                                                       lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
                                                       retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
@@ -1186,6 +1252,11 @@ public:
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
       total_entries = sync_marker.pos;
       do {
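+        // stop cleanly if the lease on the sync status object was lost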
+        if (!lease_cr->is_locked()) {
+          stop_spawned_services();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
@@ -1195,25 +1266,30 @@ public:
         }
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
+          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
           total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+          if (!marker_tracker->start(*iter, total_entries, real_time())) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
-            if (retcode < 0) {
-              lease_cr->go_down();
-              drain_all();
-              return set_cr_error(retcode);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
+          }
+          sync_marker.marker = *iter;
+
+          while ((int)num_spawned() > spawn_window) {
+            set_status() << "num_spawned() > spawn_window";
+            yield wait_for_child();
+            int ret;
+            while (collect(&ret, lease_stack.get())) {
+              if (ret < 0) {
+                ldout(cct, 10) << "a sync operation returned error" << dendl;
+              }
             }
           }
-          sync_marker.marker = iter->first;
         }
       } while ((int)entries.size() == max_entries);
 
-      lease_cr->go_down();
-      drain_all();
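+      // wait for spawned entry syncs to finish, but keep the lease coroutine running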
+      drain_all_but_stack(lease_stack.get());
 
       yield {
         /* update marker to reflect we're done with full sync */
@@ -1228,25 +1304,33 @@ public:
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
         lease_cr->go_down();
+        drain_all();
         return set_cr_error(retcode);
       }
+      // keep lease and transition to incremental_sync()
     }
     return 0;
   }
 
   int incremental_sync() {
     reenter(&incremental_cr) {
-      yield init_lease_cr();
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          ldout(cct, 5) << "lease cr failed, done early " << dendl;
-          set_status("lease lock failed, early abort");
-          return set_cr_error(lease_cr->get_ret_status());
+      ldout(cct, 10) << "start incremental sync" << dendl;
+      if (lease_cr) {
+        ldout(cct, 10) << "lease already held from full sync" << dendl;
+      } else {
+        yield init_lease_cr();
+        while (!lease_cr->is_locked()) {
+          if (lease_cr->is_done()) {
+            ldout(cct, 5) << "lease cr failed, done early " << dendl;
+            set_status("lease lock failed, early abort");
+            return set_cr_error(lease_cr->get_ret_status());
+          }
+          set_sleeping(true);
+          yield;
         }
-        set_sleeping(true);
-        yield;
+        set_status("lease acquired");
+        ldout(cct, 10) << "took lease" << dendl;
       }
-      set_status("lease acquired");
       error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                      rgw_raw_obj(pool, error_oid),
                                      1 /* no buffer */);
@@ -1255,6 +1339,11 @@ public:
       logger.log("inc sync");
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
       do {
+        if (!lease_cr->is_locked()) {
+          stop_spawned_services();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         current_modified.clear();
         inc_lock.Lock();
         current_modified.swap(modified_shards);
@@ -1268,104 +1357,105 @@ public:
           }
         }
 
-        /* process bucket shards that previously failed */
-        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                             error_marker, &error_entries,
-                                             max_error_entries));
-        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
-        iter = error_entries.begin();
-        for (; iter != error_entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
-          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
-          error_marker = iter->first;
-        }
-        if ((int)error_entries.size() != max_error_entries) {
-          if (error_marker.empty() && error_entries.empty()) {
-            /* the retry repo is empty, we back off a bit before calling it again */
-            retry_backoff_secs *= 2;
-            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
-              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+        if (error_retry_time <= ceph::coarse_real_clock::now()) {
+          /* process bucket shards that previously failed */
+          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+                                               error_marker, &error_entries,
+                                               max_error_entries));
+          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+          iter = error_entries.begin();
+          for (; iter != error_entries.end(); ++iter) {
+            error_marker = *iter;
+            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+          }
+          if ((int)error_entries.size() != max_error_entries) {
+            if (error_marker.empty() && error_entries.empty()) {
+              /* the retry repo is empty, we back off a bit before calling it again */
+              retry_backoff_secs *= 2;
+              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+              }
+            } else {
+              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
             }
-          } else {
-            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+            error_marker.clear();
           }
-          error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
-          error_marker.clear();
         }
 
-
-        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
+#define INCREMENTAL_MAX_ENTRIES 100
+        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
+        spawned_keys.clear();
+        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
           stop_spawned_services();
           drain_all();
           return set_cr_error(retcode);
         }
-        datalog_marker = shard_info.marker;
-        remote_trimmed = RemoteNotTrimmed;
-#define INCREMENTAL_MAX_ENTRIES 100
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (datalog_marker > sync_marker.marker) {
-          spawned_keys.clear();
-          if (sync_marker.marker.empty())
-            remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed;
-          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
-          if (retcode < 0) {
-            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
-            stop_spawned_services();
-            drain_all();
-            return set_cr_error(retcode);
+        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
+            ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+            continue;
           }
-          if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
-            remote_trimmed = RemoteTrimmed;
-          else
-            remote_trimmed = RemoteNotTrimmed;
-          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
-            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
-            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
-              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
-              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
-              continue;
-            }
-            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
-              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
-            } else {
-              /*
-               * don't spawn the same key more than once. We can do that as long as we don't yield
-               */
-              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
-                spawned_keys.insert(log_iter->entry.key);
-                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
-                if (retcode < 0) {
-                  stop_spawned_services();
-                  drain_all();
-                  return set_cr_error(retcode);
-                }
+          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+          } else {
+            /*
+             * don't spawn the same key more than once. We can do that as long as we don't yield
+             */
+            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+              spawned_keys.insert(log_iter->entry.key);
+              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
+              if (retcode < 0) {
+                stop_spawned_services();
+                drain_all();
+                return set_cr_error(retcode);
               }
             }
-         }
+          }
           while ((int)num_spawned() > spawn_window) {
             set_status() << "num_spawned() > spawn_window";
             yield wait_for_child();
             int ret;
             while (collect(&ret, lease_stack.get())) {
               if (ret < 0) {
-                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+                ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
                 /* we have reported this error */
               }
               /* not waiting for child here */
             }
+            /* not waiting for child here */
           }
-       }
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
-#define INCREMENTAL_INTERVAL 20
-         yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
-       }
+        }
+        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
+        if (!truncated) {
+          yield wait(get_idle_interval());
+        }
       } while (true);
     }
     return 0;
   }
+
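+  // sleep for up to INCREMENTAL_INTERVAL, but wake earlier if the error retry timer is due sooner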
+  utime_t get_idle_interval() const {
+#define INCREMENTAL_INTERVAL 20
+    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
+    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+      auto now = ceph::coarse_real_clock::now();
+      if (error_retry_time > now) {
+        auto d = error_retry_time - now;
+        if (interval > d) {
+          interval = d;
+        }
+      }
+    }
+    // convert timespan -> time_point -> utime_t
+    return utime_t(ceph::coarse_real_clock::zero() + interval);
+  }
+
   void stop_spawned_services() {
     lease_cr->go_down();
     if (error_repo) {
@@ -1481,6 +1571,7 @@ public:
 
       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
         /* call sync module init here */
+        sync_status.sync_info.num_shards = num_shards;
         yield call(data_sync_module->init_sync(sync_env));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
@@ -1547,7 +1638,7 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule {
 public:
   RGWDefaultDataSyncModule() {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
@@ -1568,7 +1659,7 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string,
   return 0;
 }
 
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                  key, versioned_epoch,
@@ -1672,7 +1763,9 @@ int RGWDataSyncStatusManager::init()
 
   RGWZoneParams& zone_params = store->get_zone_params();
 
-  sync_module = store->get_sync_module();
+  if (sync_module == nullptr) {
+    sync_module = store->get_sync_module();
+  }
 
   conn = store->get_zone_conn_by_id(source_zone);
   if (!conn) {
@@ -1738,35 +1831,22 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, source_zone, _sync_module);
 
   return 0;
 }
 
-struct bucket_index_marker_info {
-  string bucket_ver;
-  string master_ver;
-  string max_marker;
-  bool syncstopped{false};
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
-    JSONDecoder::decode_json("master_ver", master_ver, obj);
-    JSONDecoder::decode_json("max_marker", max_marker, obj);
-    JSONDecoder::decode_json("syncstopped", syncstopped, obj);
-  }
-};
-
 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   const string instance_key;
 
-  bucket_index_marker_info *info;
+  rgw_bucket_index_marker_info *info;
 
 public:
   RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
-                                  bucket_index_marker_info *_info)
+                                  rgw_bucket_index_marker_info *_info)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
       instance_key(bs.get_key()), info(_info) {}
 
@@ -1779,7 +1859,7 @@ public:
                                        { NULL, NULL } };
 
         string p = "/admin/log/";
-        call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
+        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
       }
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1798,7 +1878,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
 
   rgw_bucket_shard_sync_info& status;
 
-  bucket_index_marker_info info;
+  rgw_bucket_index_marker_info info;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                         const rgw_bucket_shard& bs,
@@ -1830,6 +1910,12 @@ public:
           call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
         }
       }
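+      // if sync is stopped on the source bucket, report -ENOENT instead of initializing sync status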
+      if (info.syncstopped) {
+        retcode = -ENOENT;
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
       return set_cr_done();
     }
     return 0;
@@ -1922,6 +2008,171 @@ int RGWReadBucketSyncStatusCoroutine::operate()
   }
   return 0;
 }
+
+#define OMAP_READ_MAX_ENTRIES 10
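+// lists the bucket shards recorded in this data log shard's ".retry" error object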
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& recovering_buckets;
+  string marker;
+  string error_oid;
+
+  set<string> error_entries;
+  int max_omap_entries;
+  int count;
+
+public:
+  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _recovering_buckets, const int _max_entries) 
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+  {
+    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+  }
+
+  int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read recovering bucket shards
+    count = 0;
+    do {
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
+            marker, &error_entries, max_omap_entries));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with " 
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (error_entries.empty()) {
+        break;
+      }
+
+      count += error_entries.size();
+      marker = *error_entries.rbegin();
+      recovering_buckets.insert(error_entries.begin(), error_entries.end());
+    } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
+  
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
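+// reads the shard's sync marker, then lists remote data log entries past it to
+// find bucket shards with changes that have not been synced yet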
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& pending_buckets;
+  string marker;
+  string status_oid;
+
+  rgw_data_sync_marker* sync_marker;
+  int count;
+
+  list<rgw_data_change_log_entry> log_entries;
+  bool truncated;
+
+public:
+  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _pending_buckets,
+                                      rgw_data_sync_marker* _sync_marker, const int _max_entries) 
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+  {
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+  }
+
+  int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read sync status marker
+    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+    yield call(new CR(sync_env->async_rados, store, 
+                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                      sync_marker));
+    if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
+        << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    //read pending bucket shards
+    marker = sync_marker->marker;
+    count = 0;
+    do {
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (log_entries.empty()) {
+        break;
+      }
+
+      count += log_entries.size();
+      for (const auto& entry : log_entries) {
+        pending_buckets.insert(entry.entry.key);
+      }
+    } while (truncated && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
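+  // run the recovering and pending scans concurrently on separate coroutine stacks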
+  list<RGWCoroutinesStack *> stacks;
+  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+  stacks.push_back(recovering_stack);
+  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+  stacks.push_back(pending_stack);
+  ret = crs.run(stacks);
+  http_manager.stop();
+  return ret;
+}
+
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
@@ -2103,6 +2354,10 @@ public:
                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                           attrs);
   }
+
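+  // queued marker updates can be coalesced; only the newest position needs to be persisted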
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
@@ -2112,16 +2367,26 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
   rgw_bucket_shard_inc_sync_marker sync_marker;
 
   map<rgw_obj_key, string> key_to_marker;
-  map<string, rgw_obj_key> marker_to_key;
+
+  struct operation {
+    rgw_obj_key key;
+    bool is_olh;
+  };
+  map<string, operation> marker_to_op;
+  std::set<std::string> pending_olh; // object names with pending olh operations
 
   void handle_finish(const string& marker) override {
-    map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
-    if (iter == marker_to_key.end()) {
+    auto iter = marker_to_op.find(marker);
+    if (iter == marker_to_op.end()) {
       return;
     }
-    key_to_marker.erase(iter->second);
-    reset_need_retry(iter->second);
-    marker_to_key.erase(iter);
+    auto& op = iter->second;
+    key_to_marker.erase(op.key);
+    reset_need_retry(op.key);
+    if (op.is_olh) {
+      pending_olh.erase(op.key.name);
+    }
+    marker_to_op.erase(iter);
   }
 
 public:
@@ -2156,19 +2421,32 @@ public:
    * Also, we should make sure that we don't run concurrent operations on the same key with
    * different ops.
    */
-  bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
-    if (key_to_marker.find(key) != key_to_marker.end()) {
+  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
+    auto result = key_to_marker.emplace(key, marker);
+    if (!result.second) { // exists
       set_need_retry(key);
       return false;
     }
-    key_to_marker[key] = marker;
-    marker_to_key[marker] = key;
+    marker_to_op[marker] = operation{key, is_olh};
+    if (is_olh) {
+      // prevent other olh ops from starting on this object name
+      pending_olh.insert(key.name);
+    }
     return true;
   }
 
-  bool can_do_op(const rgw_obj_key& key) {
+  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
+    // serialize olh ops on the same object name
+    if (is_olh && pending_olh.count(key.name)) {
+      ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
+      return false;
+    }
     return (key_to_marker.find(key) == key_to_marker.end());
   }
+
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 template <class T, class K>
@@ -2180,7 +2458,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
   rgw_obj_key key;
   bool versioned;
-  uint64_t versioned_epoch;
+  boost::optional<uint64_t> versioned_epoch;
   rgw_bucket_entry_owner owner;
   real_time timestamp;
   RGWModifyOp op;
@@ -2205,7 +2483,8 @@ public:
   RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                              RGWBucketInfo *_bucket_info,
                              const rgw_bucket_shard& bs,
-                             const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
+                             const rgw_obj_key& _key, bool _versioned,
+                             boost::optional<uint64_t> _versioned_epoch,
                              real_time& _timestamp,
                              const rgw_bucket_entry_owner& _owner,
                              RGWModifyOp _op, RGWPendingState _op_state,
@@ -2220,7 +2499,7 @@ public:
                                                       marker_tracker(_marker_tracker),
                                                       sync_status(0){
     stringstream ss;
-    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
+    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
     set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
     ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
     set_status("init");
@@ -2256,14 +2535,8 @@ public:
             retcode = -EIO;
           } else if (op == CLS_RGW_OP_ADD ||
                      op == CLS_RGW_OP_LINK_OLH) {
-            if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
-              set_status("skipping entry");
-              ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
-              goto done;
-
-            }
             set_status("syncing obj");
-            ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+            ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
             logger.log("fetch");
             call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
@@ -2272,12 +2545,12 @@ public:
               versioned = true;
             }
             logger.log("remove");
-            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
+            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
             logger.log("creating delete marker");
             set_status("creating delete marker");
-            ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
+            ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
+            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
           }
         }
       } while (marker_tracker->need_retry(key));
@@ -2299,7 +2572,7 @@ public:
         sync_status = retcode;
       }
       if (!error_ss.str().empty()) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
+        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object: ") + cpp_strerror(-sync_status)));
       }
 done:
       if (sync_status == 0) {
@@ -2326,7 +2599,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   bucket_list_result list_result;
   list<bucket_list_entry>::iterator entries_iter;
-  rgw_bucket_shard_full_sync_marker& full_marker;
+  rgw_bucket_shard_sync_info& sync_info;
   RGWBucketFullSyncShardMarkerTrack marker_tracker;
   rgw_obj_key list_marker;
   bucket_list_entry *entry{nullptr};
@@ -2345,10 +2618,10 @@ public:
                            RGWBucketInfo *_bucket_info,
                            const std::string& status_oid,
                            RGWContinuousLeaseCR *lease_cr,
-                           rgw_bucket_shard_full_sync_marker& _full_marker)
+                           rgw_bucket_shard_sync_info& sync_info)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
-      marker_tracker(sync_env, status_oid, full_marker),
+      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+      marker_tracker(sync_env, status_oid, sync_info.full_marker),
       status_oid(status_oid) {
     logger.init(sync_env, "BucketFull", bs.get_key());
     zones_trace.insert(sync_env->source_zone);
@@ -2361,9 +2634,9 @@ int RGWBucketShardFullSyncCR::operate()
 {
   int ret;
   reenter(this) {
-    list_marker = full_marker.position;
+    list_marker = sync_info.full_marker.position;
 
-    total_entries = full_marker.count;
+    total_entries = sync_info.full_marker.count;
     do {
       if (!lease_cr->is_locked()) {
         drain_all();
@@ -2407,7 +2680,7 @@ int RGWBucketShardFullSyncCR::operate()
           while (again) {
             again = collect(&ret, nullptr);
             if (ret < 0) {
-              ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+              ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
               sync_status = ret;
               /* we have reported this error */
             }
@@ -2423,7 +2696,7 @@ int RGWBucketShardFullSyncCR::operate()
       while (again) {
         again = collect(&ret, nullptr);
         if (ret < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+          ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
           sync_status = ret;
           /* we have reported this error */
         }
@@ -2435,17 +2708,16 @@ int RGWBucketShardFullSyncCR::operate()
     /* update sync state to incremental */
     if (sync_status == 0) {
       yield {
-        rgw_bucket_shard_sync_info sync_status;
-        sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
         map<string, bufferlist> attrs;
-        sync_status.encode_state_attr(attrs);
+        sync_info.encode_state_attr(attrs);
         RGWRados *store = sync_env->store;
         call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                             attrs));
       }
     } else {
-      ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+      ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
     }
     if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
       ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
@@ -2460,22 +2732,25 @@ int RGWBucketShardFullSyncCR::operate()
   return 0;
 }
 
+static bool has_olh_epoch(RGWModifyOp op) {
+  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
+}
+
 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   const rgw_bucket_shard& bs;
   RGWBucketInfo *bucket_info;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   list<rgw_bi_log_entry> list_result;
-  list<rgw_bi_log_entry>::iterator entries_iter;
+  list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
   map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
-  rgw_bucket_shard_inc_sync_marker& inc_marker;
+  rgw_bucket_shard_sync_info& sync_info;
   rgw_obj_key key;
   rgw_bi_log_entry *entry{nullptr};
   RGWBucketIncSyncShardMarkerTrack marker_tracker;
   bool updated_status{false};
   const string& status_oid;
   const string& zone_id;
-  ceph::real_time sync_modify_time;
 
   string cur_id;
 
@@ -2490,10 +2765,12 @@ public:
                                   RGWBucketInfo *_bucket_info,
                                   const std::string& status_oid,
                                   RGWContinuousLeaseCR *lease_cr,
-                                  rgw_bucket_shard_inc_sync_marker& _inc_marker)
+                                  rgw_bucket_shard_sync_info& sync_info)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
-      marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) , zone_id(_sync_env->store->get_zone().id){
+      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+      marker_tracker(sync_env, status_oid, sync_info.inc_marker),
+      status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
+  {
     set_description() << "bucket shard incremental sync bucket="
         << bucket_shard_str{bs};
     set_status("init");
@@ -2512,32 +2789,30 @@ int RGWBucketShardIncrementalSyncCR::operate()
         drain_all();
         return set_cr_error(-ECANCELED);
       }
-      ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << inc_marker.position << dendl;
-      set_status() << "listing bilog; position=" << inc_marker.position;
-      yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
+      ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync; position=" << sync_info.inc_marker.position << dendl;
+      set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
+      yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
                                              &list_result));
-      if (retcode < 0 && retcode != -ENOENT ) {
+      if (retcode < 0 && retcode != -ENOENT) {
+        /* wait for all operations to complete */
         drain_all();
-        if (!syncstopped) {
-          /* wait for all operations to complete */
-          return set_cr_error(retcode);
-        } else {
-          /* no need to retry */
-          break;       
-       }
+        return set_cr_error(retcode);
       }
       squash_map.clear();
-      for (auto& e : list_result) {
-        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
-          ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
-          sync_modify_time = e.timestamp;
+      entries_iter = list_result.begin();
+      entries_end = list_result.end();
+      for (; entries_iter != entries_end; ++entries_iter) {
+        auto e = *entries_iter;
+        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
+          ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
           syncstopped = true;
+          entries_end = entries_iter; // don't sync past here
+          break;
+        }
+        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
           continue;
         }
-        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
-          ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
-          sync_modify_time = e.timestamp;
-          syncstopped = false;
+        if (e.op == CLS_RGW_OP_CANCEL) {
           continue;
         }
         if (e.state != CLS_RGW_STATE_COMPLETE) {
@@ -2547,13 +2822,17 @@ int RGWBucketShardIncrementalSyncCR::operate()
           continue;
         }
         auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
+        // don't squash over olh entries - we need to apply their olh_epoch
+        if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
+          continue;
+        }
         if (squash_entry.first <= e.timestamp) {
           squash_entry = make_pair<>(e.timestamp, e.op);
         }
       }
 
       entries_iter = list_result.begin();
-      for (; entries_iter != list_result.end(); ++entries_iter) {
+      for (; entries_iter != entries_end; ++entries_iter) {
         if (!lease_cr->is_locked()) {
           drain_all();
           return set_cr_error(-ECANCELED);
@@ -2567,10 +2846,10 @@ int RGWBucketShardIncrementalSyncCR::operate()
             cur_id = entry->id.substr(p + 1);
           }
         }
-        inc_marker.position = cur_id;
+        sync_info.inc_marker.position = cur_id;
 
         if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
-          ldout(sync_env->cct, 20) << "detected syncstop or resync  on " << entries_iter->timestamp << " , skipping entry" << dendl;
+          ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
@@ -2617,13 +2896,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
           set_status() << "squashed operation, skipping";
           ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
               << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
-          /* not updating high marker though */
+          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
             << bucket_shard_str{bs} << "/" << key << dendl;
         updated_status = false;
-        while (!marker_tracker.can_do_op(key)) {
+        while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
           if (!updated_status) {
             set_status() << "can't do op, conflicting inflight operation";
             updated_status = true;
@@ -2639,8 +2918,14 @@ int RGWBucketShardIncrementalSyncCR::operate()
               /* we have reported this error */
             }
           }
+          if (sync_status != 0)
+            break;
+        }
+        if (sync_status != 0) {
+          /* get error, stop */
+          break;
         }
-        if (!marker_tracker.index_key_to_marker(key, cur_id)) {
+        if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
           set_status() << "can't do op, sync already in progress for object";
           ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
@@ -2651,7 +2936,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
           } else {
-            uint64_t versioned_epoch = 0;
+            boost::optional<uint64_t> versioned_epoch;
             rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
             if (entry->ver.pool < 0) {
               versioned_epoch = entry->ver.epoch;
@@ -2672,7 +2957,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           while (again) {
             again = collect(&ret, nullptr);
             if (ret < 0) {
-              ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+              ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
               sync_status = ret;
               /* we have reported this error */
             }
@@ -2680,19 +2965,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           }
         }
       }
-    } while (!list_result.empty() && sync_status == 0);
-
-    if (syncstopped) {
-      drain_all();
-
-      yield {
-        const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
-        RGWRados *store = sync_env->store;
-        call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
-      }
-      lease_cr->abort();
-      return set_cr_done();
-    }
+    } while (!list_result.empty() && sync_status == 0 && !syncstopped);
 
     while (num_spawned()) {
       yield wait_for_child();
@@ -2700,7 +2973,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
       while (again) {
         again = collect(&ret, nullptr);
         if (ret < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
+          ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
           sync_status = ret;
           /* we have reported this error */
         }
@@ -2708,22 +2981,24 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
     }
 
+    if (syncstopped) {
+      // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
+      // still disabled, we'll delete the sync status object. otherwise we'll
+      // restart full sync to catch any changes that happened while sync was
+      // disabled
+      sync_info.state = rgw_bucket_shard_sync_info::StateInit;
+      return set_cr_done();
+    }
+
     yield call(marker_tracker.flush());
     if (retcode < 0) {
       ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
       return set_cr_error(retcode);
     }
     if (sync_status < 0) {
-      ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
-    }
-
-    /* wait for all operations to complete */
-    drain_all();
-
-    if (sync_status < 0) {
+      ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
       return set_cr_error(sync_status);
     }
-
     return set_cr_done();
   }
   return 0;
@@ -2795,43 +3070,45 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_cr_error(retcode);
     }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
-      yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
-      if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
-            << " failed, retcode=" << retcode << dendl;
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+    do {
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
+        yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
+              << " failed, retcode=" << retcode << dendl;
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-    }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
-      yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
-                                              status_oid, lease_cr.get(),
-                                              sync_status.full_marker));
-      if (retcode < 0) {
-        ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
-            << " failed, retcode=" << retcode << dendl;
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
+        yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+                                                status_oid, lease_cr.get(),
+                                                sync_status));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
+              << " failed, retcode=" << retcode << dendl;
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-      sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
-    }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
-      yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
-                                                     status_oid, lease_cr.get(),
-                                                     sync_status.inc_marker));
-      if (retcode < 0) {
-        ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
-            << " failed, retcode=" << retcode << dendl;
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
+        yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+                                                       status_oid, lease_cr.get(),
+                                                       sync_status));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
+              << " failed, retcode=" << retcode << dendl;
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-    }
+      // loop back to previous states unless incremental sync returns normally
+    } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
 
     lease_cr->go_down();
     drain_all();
@@ -2964,6 +3241,55 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
+class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWRados *const store;
+  RGWDataSyncEnv *const env;
+  const int num_shards;
+  rgw_bucket_shard bs;
+
+  using Vector = std::vector<rgw_bucket_shard_sync_info>;
+  Vector::iterator i, end;
+
+ public:
+  RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
+                               int num_shards, const rgw_bucket& bucket,
+                               Vector *status)
+    : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
+      store(store), env(env), num_shards(num_shards),
+      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+      i(status->begin()), end(status->end())
+  {}
+
+  bool spawn_next() override {
+    if (i == end) {
+      return false;
+    }
+    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+    ++i;
+    ++bs.shard_id;
+    return true;
+  }
+};
+
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+                           const RGWBucketInfo& bucket_info,
+                           std::vector<rgw_bucket_shard_sync_info> *status)
+{
+  const auto num_shards = bucket_info.num_shards;
+  status->clear();
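+  // a bucket with no explicit shard count still has a single index shard (id -1)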
+  status->resize(std::max<size_t>(1, num_shards));
+
+  RGWDataSyncEnv env;
+  RGWSyncModuleInstanceRef module; // null sync module
+  env.init(store->ctx(), store, nullptr, store->get_async_rados(),
+           nullptr, nullptr, source_zone, module);
+
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
+                                                  bucket_info.bucket, status));
+}
+
 
 // TODO: move into rgw_data_sync_trim.cc
 #undef dout_prefix