update sources to ceph Nautilus 14.2.1

diff --git a/ceph/src/rgw/rgw_data_sync.cc b/ceph/src/rgw/rgw_data_sync.cc
index 5da750df2341ea33963c7e6c7cd9ffe20054c2ac..90eb6bc234410de229b00c2338f6c9397de528c5 100644
@@ -12,6 +12,7 @@
 
 #include "rgw_common.h"
 #include "rgw_rados.h"
+#include "rgw_zone.h"
 #include "rgw_sync.h"
 #include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
 
 #include "cls/lock/cls_lock_client.h"
 
-#include "auth/Crypto.h"
+#include "services/svc_zone.h"
+#include "services/svc_sync_modules.h"
+
+#include "include/random.h"
 
 #include <boost/asio/yield.hpp>
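
Note: the dropped auth/Crypto.h include was only needed for get_random_bytes(); later hunks replace that call with the header-only helper from include/random.h. A minimal sketch of the new idiom, exactly as used further down in this patch:

    #include "include/random.h"

    // draws a uniformly distributed value; replaces
    // get_random_bytes((char *)&instance_id, sizeof(instance_id))
    uint64_t instance_id = ceph::util::generate_random_number<uint64_t>();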
 
@@ -38,73 +42,8 @@ static string datalog_sync_status_oid_prefix = "datalog.sync-status";
 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
 static string bucket_status_oid_prefix = "bucket.sync-status";
+static string object_status_oid_prefix = "bucket.sync-status";
 
-class RGWSyncDebugLogger {
-  CephContext *cct;
-  string prefix;
-
-  bool ended;
-
-public:
-  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
-                     const string& sync_type, const string& sync_stage,
-                     const string& resource, bool log_start = true) {
-    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
-  }
-  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
-  ~RGWSyncDebugLogger();
-
-  void init(CephContext *_cct, const string& source_zone,
-            const string& sync_type, const string& sync_stage,
-            const string& resource, bool log_start = true);
-  void log(const string& state);
-  void finish(int status);
-};
-
-void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
-                              const string& sync_type, const string& sync_section,
-                              const string& resource, bool log_start)
-{
-  cct = _cct;
-  ended = false;
-  string zone_str = source_zone.substr(0, 8);
-  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
-  if (log_start) {
-    log("start");
-  }
-}
-
-RGWSyncDebugLogger::~RGWSyncDebugLogger()
-{
-  if (!ended) {
-    log("finish");
-  }
-}
-
-void RGWSyncDebugLogger::log(const string& state)
-{
-  ldout(cct, 5) << prefix << ":" << state << dendl;
-}
-
-void RGWSyncDebugLogger::finish(int status)
-{
-  ended = true;
-  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
-}
-
-class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
-public:
-  RGWDataSyncDebugLogger() {}
-  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
-                         const string& resource, bool log_start = true) {
-    init(sync_env, sync_section, resource, log_start);
-  }
-  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
-            const string& resource, bool log_start = true) {
-    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
-  }
-
-};
 
 void rgw_datalog_info::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("num_objects", num_shards, obj);
@@ -147,8 +86,8 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
     return false;
   }
   using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
-  spawn(new CR(env->async_rados, env->store,
-               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
+  spawn(new CR(env->async_rados, env->store->svc.sysobj,
+               rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
                &markers[shard_id]),
         false);
   shard_id++;
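
This hunk shows the substitution repeated throughout the patch: raw RGWRados accessors give way to the new service layer, so the sysobj read/write coroutines take store->svc.sysobj and zone parameters come from store->svc.zone. Side by side (a sketch of the substitution only):

    // before
    rgw_raw_obj(store->get_zone_params().log_pool, oid)
    // after: zone info lives behind the zone service
    rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, oid)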
@@ -162,28 +101,30 @@ class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
 
   uint64_t max_entries;
   int num_shards;
-  int shard_id{0};;
+  int shard_id{0};
 
   string marker;
-  map<int, std::set<std::string>> &entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
 
  public:
   RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
-      map<int, std::set<std::string>>& _entries_map)
+                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
-    max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
   {}
   bool spawn_next() override;
 };
 
 bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
 {
-  if (shard_id > num_shards)
+  if (shard_id >= num_shards)
     return false;
  
   string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
-  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
-                                  marker, &entries_map[shard_id], max_entries), false);
+  auto& shard_keys = omapkeys[shard_id];
+  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
+                                  marker, max_entries, shard_keys), false);
 
   ++shard_id;
   return true;
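
For context, RGWRadosGetOmapKeysCR now returns its output through a shared Result instead of filling a caller-owned set; the Result carries the listed keys plus a continuation flag. A sketch of the new calling convention, using only the fields visible in this diff (entries, more):

    auto keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
    call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(pool, oid),
                                   marker, max_entries, keys));
    // once the coroutine completes:
    for (const auto& k : keys->entries) { /* handle key */ }
    if (keys->more) { /* list again, starting after the last key */ }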
@@ -208,8 +149,8 @@ int RGWReadDataSyncStatusCoroutine::operate()
     using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
     yield {
       bool empty_on_enoent = false; // fail on ENOENT
-      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
-                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+                          rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                           &sync_status->sync_info, empty_on_enoent));
     }
     if (retcode < 0) {
@@ -268,7 +209,7 @@ public:
 
         http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
 
-        http_op->set_user_info((void *)stack);
+        init_new_io(http_op);
 
         int ret = http_op->aio_read();
         if (ret < 0) {
@@ -348,7 +289,7 @@ public:
 
         http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
 
-        http_op->set_user_info((void *)stack);
+        init_new_io(http_op);
 
         int ret = http_op->aio_read();
         if (ret < 0) {
@@ -439,7 +380,7 @@ public:
     string p = "/admin/log/";
 
     http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
-    http_op->set_user_info((void *)stack);
+    init_new_io(http_op);
 
     int ret = http_op->aio_read();
     if (ret < 0) {
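
The three hunks above all make the same substitution: instead of stashing the coroutine stack pointer on the op by hand, the coroutine registers the REST op through init_new_io(), which appears to wire up the op's completion routing for it:

    // before: manual association with the coroutine stack
    http_op->set_user_info((void *)stack);
    // after: let the coroutine register the I/O provider itself
    init_new_io(http_op);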
@@ -509,13 +450,17 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   string cookie;
   rgw_data_sync_status *status;
   map<int, RGWDataChangesLogInfo> shards_info;
+
+  RGWSyncTraceNodeRef tn;
 public:
   RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                  uint64_t instance_id,
+                                 RGWSyncTraceNodeRef& _tn_parent,
                                  rgw_data_sync_status *status)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
-      pool(store->get_zone_params().log_pool),
-      num_shards(num_shards), status(status) {
+      pool(store->svc.zone->get_zone_params().log_pool),
+      num_shards(num_shards), status(status),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
     lock_name = "sync_lock";
 
     status->sync_info.instance_id = instance_id;
@@ -527,6 +472,7 @@ public:
     cookie = buf;
 
     sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
+
   }
 
   int operate() override {
@@ -537,15 +483,15 @@ public:
                             rgw_raw_obj{pool, sync_status_oid},
                             lock_name, cookie, lock_duration));
       if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
         return set_cr_error(retcode);
       }
       using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
-      yield call(new WriteInfoCR(sync_env->async_rados, store,
+      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, sync_status_oid},
                                  status->sync_info));
       if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
         return set_cr_error(retcode);
       }
 
@@ -554,15 +500,17 @@ public:
                             rgw_raw_obj{pool, sync_status_oid},
                             lock_name, cookie, lock_duration));
       if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
         return set_cr_error(retcode);
       }
 
+      tn->log(10, "took lease");
+
       /* fetch current position in logs */
       yield {
-        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
+        RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
         if (!conn) {
-          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
+          tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
           return set_cr_error(-EIO);
         }
         for (uint32_t i = 0; i < num_shards; i++) {
@@ -571,7 +519,7 @@ public:
       }
       while (collect(&ret, NULL)) {
         if (ret < 0) {
-          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
+          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
           return set_state(RGWCoroutine_Error);
         }
         yield;
@@ -584,24 +532,24 @@ public:
           marker.timestamp = info.last_update;
           const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
           using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
-          spawn(new WriteMarkerCR(sync_env->async_rados, store,
+          spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
                                   rgw_raw_obj{pool, oid}, marker), true);
         }
       }
       while (collect(&ret, NULL)) {
         if (ret < 0) {
-          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
+          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
           return set_state(RGWCoroutine_Error);
         }
         yield;
       }
 
       status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
-      yield call(new WriteInfoCR(sync_env->async_rados, store,
+      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, sync_status_oid},
                                  status->sync_info));
       if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
         return set_cr_error(retcode);
       }
       yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
@@ -620,11 +568,11 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
 
   int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
     return ret;
   }
 
-  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
+  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
 
   return 0;
 }
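
Logging in read_log_info() moves from the bare cluster context (ldout) to the dout prefix provider (ldpp_dout), so every line carries the instance's prefix; dpp here is assumed to be the DoutPrefixProvider the class already holds:

    ldout(store->ctx(), 20) << "remote datalog" << dendl;  // before: no per-instance prefix
    ldpp_dout(dpp, 20) << "remote datalog" << dendl;       // after: prefixed output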
@@ -642,28 +590,27 @@ int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo
 
 int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
 {
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
   return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
 }
 
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
+                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
-                _source_zone, _sync_module);
+  sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+                _sync_tracer, _source_zone, _sync_module);
 
   if (initialized) {
     return 0;
   }
 
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
 
+  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");
+
   initialized = true;
 
   return 0;
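
The RGWSyncTraceNode references (tn) threaded through the rest of the patch follow a small, consistent API; a sketch assembled purely from calls visible in this diff:

    // create a child node under a parent (here: a per-shard node)
    RGWSyncTraceNodeRef tn =
        sync_env->sync_tracer->add_node(parent, "shard", std::to_string(shard_id));
    tn->log(10, "start full sync");               // plain message at a log level
    tn->log(20, SSTR("shard_id=" << shard_id));   // streamed message via SSTR
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);            // mark the node active while entries sync
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);          // clear once the shard goes idle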
@@ -679,9 +626,9 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
   // cannot run concurrently with run_sync(), so run in a separate manager
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
   RGWDataSyncEnv sync_env_local = sync_env;
@@ -696,22 +643,23 @@ int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& rec
   // cannot run concurrently with run_sync(), so run in a separate manager
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
   RGWDataSyncEnv sync_env_local = sync_env;
   sync_env_local.http_manager = &http_manager;
-  map<int, std::set<std::string>> entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+  omapkeys.resize(num_shards);
   uint64_t max_entries{1};
-  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
   http_manager.stop();
 
   if (ret == 0) {
-    for (const auto& entry : entries_map) {
-      if (entry.second.size() != 0) {
-        recovering_shards.insert(entry.first);
+    for (int i = 0; i < num_shards; i++) {
+      if (omapkeys[i]->entries.size() != 0) {
+        recovering_shards.insert(i);
       }
     }
   }
@@ -726,16 +674,15 @@ int RGWRemoteDataLog::init_sync_status(int num_shards)
 
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
   RGWDataSyncEnv sync_env_local = sync_env;
   sync_env_local.http_manager = &http_manager;
-  uint64_t instance_id;
-  get_random_bytes((char *)&instance_id, sizeof(instance_id));
-  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
+  auto instance_id = ceph::util::generate_random_number<uint64_t>();
+  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
   http_manager.stop();
   return ret;
 }
@@ -811,15 +758,15 @@ public:
                                                       entrypoint, NULL, &result));
       }
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
         return set_cr_error(retcode);
       }
       entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
-                                                 store->get_zone_params().log_pool,
+                                                 store->svc.zone->get_zone_params().log_pool,
                                                   oid_prefix);
       yield; // yield so OmapAppendCRs can start
       for (iter = result.begin(); iter != result.end(); ++iter) {
-        ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
 
         key = *iter;
 
@@ -852,8 +799,8 @@ public:
           int shard_id = (int)iter->first;
           rgw_data_sync_marker& marker = iter->second;
           marker.total_entries = entries_index->get_total_entries(shard_id);
-          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
-                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                 marker), true);
         }
       } else {
@@ -899,23 +846,27 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
     marker_to_key.erase(iter);
   }
 
+  RGWSyncTraceNodeRef tn;
+
 public:
   RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                          const string& _marker_oid,
-                         const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+                         const rgw_data_sync_marker& _marker,
+                         RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
                                                                 sync_env(_sync_env),
                                                                 marker_oid(_marker_oid),
-                                                                sync_marker(_marker) {}
+                                                                sync_marker(_marker),
+                                                                tn(_tn) {}
 
   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.marker = new_marker;
     sync_marker.pos = index_pos;
 
-    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
     RGWRados *store = sync_env->store;
 
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
-                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                            sync_marker);
   }
 
@@ -936,7 +887,7 @@ public:
     return true;
   }
 
-  RGWOrderCallCR *allocate_order_control_cr() {
+  RGWOrderCallCR *allocate_order_control_cr() override {
     return new RGWLastCallerWinsCR(sync_env->cct);
   }
 };
@@ -944,7 +895,7 @@ public:
 // ostream wrappers to print buckets without copying strings
 struct bucket_str {
   const rgw_bucket& b;
-  bucket_str(const rgw_bucket& b) : b(b) {}
+  explicit bucket_str(const rgw_bucket& b) : b(b) {}
 };
 std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
   auto& b = rhs.b;
@@ -958,9 +909,22 @@ std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
   return out;
 }
 
+struct bucket_str_noinstance {
+  const rgw_bucket& b;
+  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
+};
+std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
+  auto& b = rhs.b;
+  if (!b.tenant.empty()) {
+    out << b.tenant << '/';
+  }
+  out << b.name;
+  return out;
+}
+
 struct bucket_shard_str {
   const rgw_bucket_shard& bs;
-  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
+  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
 };
 std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
   auto& bs = rhs.bs;
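
A usage sketch contrasting the two wrappers (values hypothetical; the elided body of the bucket_str operator also appends ':' plus bucket_id, which is exactly what the _noinstance variant omits):

    rgw_bucket b;
    b.tenant = "t1";
    b.name = "mybucket";
    b.bucket_id = "abc123.4567.1";
    ldout(cct, 20) << bucket_str{b} << dendl;             // t1/mybucket:abc123.4567.1
    ldout(cct, 20) << bucket_str_noinstance{b} << dendl;  // t1/mybucket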
@@ -978,17 +942,19 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   rgw_bucket_shard_sync_info sync_status;
   RGWMetaSyncEnv meta_sync_env;
 
-  RGWDataSyncDebugLogger logger;
   const std::string status_oid;
 
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
 
+  RGWSyncTraceNodeRef tn;
+
 public:
-  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
+  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
-    logger.init(sync_env, "Bucket", bs.get_key());
+      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
+                                         SSTR(bucket_shard_str{bs}))) {
   }
   ~RGWRunBucketSyncCoroutine() override {
     if (lease_cr) {
@@ -1018,16 +984,18 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
 
   set<string> keys;
 
+  RGWSyncTraceNodeRef tn;
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
-                           RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
+                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                      raw_key(_raw_key), entry_marker(_entry_marker),
                                                       sync_status(0),
                                                       marker_tracker(_marker_tracker),
                                                       error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
     set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
   }
 
   int operate() override {
@@ -1042,7 +1010,8 @@ public:
           if (marker_tracker) {
             marker_tracker->reset_need_retry(raw_key);
           }
-          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
+          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
+          call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
         }
       } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
@@ -1052,8 +1021,7 @@ public:
         // this was added when 'tenant/' was added to datalog entries, because
         // preexisting tenant buckets could never sync and would stay in the
         // error_repo forever
-        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
-            "for missing bucket " << raw_key << dendl;
+        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
         sync_status = 0;
       }
 
@@ -1063,18 +1031,18 @@ public:
           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                           -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
           if (retcode < 0) {
-            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
           }
         }
         if (error_repo && !error_repo->append(raw_key)) {
-          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
         }
       } else if (error_repo && remove_from_repo) {
         keys = {raw_key};
         yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
         if (retcode < 0) {
-          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
-             << error_repo->get_obj() << " retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
+             << error_repo->get_obj() << " retcode=" << retcode));
         }
       }
      /* FIXME: what to do in case of error */
@@ -1105,6 +1073,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
   std::set<std::string>::iterator iter;
 
@@ -1155,11 +1124,13 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 #define RETRY_BACKOFF_SECS_MAX 600
   uint32_t retry_backoff_secs;
 
-  RGWDataSyncDebugLogger logger;
+  RGWSyncTraceNodeRef tn;
 public:
   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                      rgw_pool& _pool,
-                    uint32_t _shard_id, const rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+                    uint32_t _shard_id, const rgw_data_sync_marker& _marker,
+                     RGWSyncTraceNodeRef& _tn,
+                     bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                      pool(_pool),
                                                      shard_id(_shard_id),
@@ -1167,12 +1138,10 @@ public:
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
                                                       lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
-                                                      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
+                                                      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
     error_oid = status_oid + ".retry";
-
-    logger.init(sync_env, "DataShard", status_oid);
   }
 
   ~RGWDataSyncShardCR() override {
@@ -1202,14 +1171,18 @@ public:
       case rgw_data_sync_marker::FullSync:
         r = full_sync();
         if (r < 0) {
-          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          if (r != -EBUSY) {
+            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
+          }
           return set_cr_error(r);
         }
         return 0;
       case rgw_data_sync_marker::IncrementalSync:
         r  = incremental_sync();
         if (r < 0) {
-          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          if (r != -EBUSY) {
+            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
+          }
           return set_cr_error(r);
         }
         return 0;
@@ -1229,7 +1202,7 @@ public:
     }
     RGWRados *store = sync_env->store;
     lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                             lock_name, lock_duration, this));
     lease_stack.reset(spawn(lease_cr.get(), false));
   }
@@ -1238,10 +1211,11 @@ public:
 #define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
+      tn->log(10, "start full sync");
       yield init_lease_cr();
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
-          ldout(cct, 5) << "lease cr failed, done early " << dendl;
+          tn->log(5, "failed to take lease");
           set_status("lease lock failed, early abort");
           drain_all();
           return set_cr_error(lease_cr->get_ret_status());
@@ -1249,9 +1223,9 @@ public:
         set_sleeping(true);
         yield;
       }
-      logger.log("full sync");
+      tn->log(10, "took lease");
       oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
-      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
       total_entries = sync_marker.pos;
       do {
         if (!lease_cr->is_locked()) {
@@ -1259,22 +1233,29 @@ public:
           drain_all();
           return set_cr_error(-ECANCELED);
         }
-        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
+                                             sync_marker.marker, max_entries, omapkeys));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
+        entries = std::move(omapkeys->entries);
+        if (entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
+          tn->log(20, SSTR("full sync: " << *iter));
           total_entries++;
           if (!marker_tracker->start(*iter, total_entries, real_time())) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
+            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
           }
           sync_marker.marker = *iter;
 
@@ -1284,27 +1265,30 @@ public:
             int ret;
             while (collect(&ret, lease_stack.get())) {
               if (ret < 0) {
-                ldout(cct, 10) << "a sync operation returned error" << dendl;
+                tn->log(10, "a sync operation returned error");
               }
             }
           }
         }
-      } while ((int)entries.size() == max_entries);
+      } while (omapkeys->more);
+      omapkeys.reset();
 
       drain_all_but_stack(lease_stack.get());
 
+      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
       yield {
         /* update marker to reflect we're done with full sync */
         sync_marker.state = rgw_data_sync_marker::IncrementalSync;
         sync_marker.marker = sync_marker.next_step_marker;
         sync_marker.next_step_marker.clear();
         RGWRados *store = sync_env->store;
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
-                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+                                                             rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                                              sync_marker));
       }
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
         lease_cr->go_down();
         drain_all();
         return set_cr_error(retcode);
@@ -1316,14 +1300,14 @@ public:
 
   int incremental_sync() {
     reenter(&incremental_cr) {
-      ldout(cct, 10) << "start incremental sync" << dendl;
+      tn->log(10, "start incremental sync");
       if (lease_cr) {
-        ldout(cct, 10) << "lease already held from full sync" << dendl;
+        tn->log(10, "lease already held from full sync");
       } else {
         yield init_lease_cr();
         while (!lease_cr->is_locked()) {
           if (lease_cr->is_done()) {
-            ldout(cct, 5) << "lease cr failed, done early " << dendl;
+            tn->log(5, "failed to take lease");
             set_status("lease lock failed, early abort");
             drain_all();
             return set_cr_error(lease_cr->get_ret_status());
@@ -1332,15 +1316,14 @@ public:
           yield;
         }
         set_status("lease acquired");
-        ldout(cct, 10) << "took lease" << dendl;
+        tn->log(10, "took lease");
       }
       error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                      rgw_raw_obj(pool, error_oid),
                                      1 /* no buffer */);
       error_repo->get();
       spawn(error_repo, false);
-      logger.log("inc sync");
-      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
       do {
         if (!lease_cr->is_locked()) {
           stop_spawned_services();
@@ -1352,27 +1335,31 @@ public:
         current_modified.swap(modified_shards);
         inc_lock.Unlock();
 
+        if (current_modified.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
         /* process out of band updates */
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
-            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
+            tn->log(20, SSTR("received async update notification: " << *modified_iter));
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
           }
         }
 
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
           /* process bucket shards that previously failed */
+          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
           yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                               error_marker, &error_entries,
-                                               max_error_entries));
-          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+                                               error_marker, max_error_entries, omapkeys));
+          error_entries = std::move(omapkeys->entries);
+          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
             error_marker = *iter;
-            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+            tn->log(20, SSTR("handle error entry: " << error_marker));
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
           }
-          if ((int)error_entries.size() != max_error_entries) {
+          if (!omapkeys->more) {
             if (error_marker.empty() && error_entries.empty()) {
               /* the retry repo is empty, we back off a bit before calling it again */
               retry_backoff_secs *= 2;
@@ -1386,34 +1373,40 @@ public:
             error_marker.clear();
           }
         }
+        omapkeys.reset();
 
 #define INCREMENTAL_MAX_ENTRIES 100
-             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
         spawned_keys.clear();
         yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                    &next_marker, &log_entries, &truncated));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
           stop_spawned_services();
           drain_all();
           return set_cr_error(retcode);
         }
+
+        if (log_entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+
         for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
-          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
           if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
-            ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+            tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
             marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
             continue;
           }
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
             /*
              * don't spawn the same key more than once. We can do that as long as we don't yield
              */
             if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
               spawned_keys.insert(log_iter->entry.key);
-              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
+              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
               if (retcode < 0) {
                 stop_spawned_services();
                 drain_all();
@@ -1421,30 +1414,32 @@ public:
               }
             }
           }
-          while ((int)num_spawned() > spawn_window) {
-            set_status() << "num_spawned() > spawn_window";
-            yield wait_for_child();
-            int ret;
-            while (collect(&ret, lease_stack.get())) {
-              if (ret < 0) {
-                ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
-                /* we have reported this error */
-              }
-              /* not waiting for child here */
+        }
+        while ((int)num_spawned() > spawn_window) {
+          set_status() << "num_spawned() > spawn_window";
+          yield wait_for_child();
+          int ret;
+          while (collect(&ret, lease_stack.get())) {
+            if (ret < 0) {
+              tn->log(10, "a sync operation returned error");
+              /* we have reported this error */
             }
             /* not waiting for child here */
           }
         }
-        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
-            << " next_marker=" << next_marker << " truncated=" << truncated << dendl;
-        if (!truncated) {
-          yield wait(get_idle_interval());
-        }
+
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
+                         << " next_marker=" << next_marker << " truncated=" << truncated));
         if (!next_marker.empty()) {
           sync_marker.marker = next_marker;
         } else if (!log_entries.empty()) {
           sync_marker.marker = log_entries.back().log_id;
         }
+        if (!truncated) {
+          // we reached the end, wait a while before checking for more
+          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+          yield wait(get_idle_interval());
+        }
       } while (true);
     }
     return 0;
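
Note the structural fix in the hunk above: the spawn-window throttle used to run inside the per-entry for loop and now runs once per fetched batch, after all entries have been spawned. Schematically:

    for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
      /* spawn one RGWDataSyncSingleEntryCR per datalog key */
    }
    // throttle per batch: wait until in-flight children fall back to spawn_window
    while ((int)num_spawned() > spawn_window) {
      yield wait_for_child();
      int ret;
      while (collect(&ret, lease_stack.get())) { /* reap children, log errors */ }
    }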
@@ -1484,23 +1479,26 @@ class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
+  RGWSyncTraceNodeRef tn;
 public:
-  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
-                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false),
+  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker,
+                     RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false),
                                                       sync_env(_sync_env),
                                                      pool(_pool),
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker) {
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
   }
 
   RGWCoroutine *alloc_cr() override {
-    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
+    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
   }
 
   RGWCoroutine *alloc_finisher_cr() override {
     RGWRados *store = sync_env->store;
-    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
-                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
+                                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                           &sync_marker);
   }
 
@@ -1529,16 +1527,16 @@ class RGWDataSyncCR : public RGWCoroutine {
 
   bool *reset_backoff;
 
-  RGWDataSyncDebugLogger logger;
+  RGWSyncTraceNodeRef tn;
 
   RGWDataSyncModule *data_sync_module{nullptr};
 public:
-  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                       num_shards(_num_shards),
                                                       marker_tracker(NULL),
                                                       shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
-                                                      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
+                                                      reset_backoff(_reset_backoff), tn(_tn) {
 
   }
 
@@ -1557,19 +1555,19 @@ public:
       data_sync_module = sync_env->sync_module->get_data_handler();
 
       if (retcode < 0 && retcode != -ENOENT) {
-        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
         return set_cr_error(retcode);
       }
 
       /* state: init status */
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
-        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
+        tn->log(20, SSTR("init"));
         sync_status.sync_info.num_shards = num_shards;
         uint64_t instance_id;
-        get_random_bytes((char *)&instance_id, sizeof(instance_id));
-        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
+        instance_id = ceph::util::generate_random_number<uint64_t>();
+        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
           return set_cr_error(retcode);
         }
         // sets state = StateBuildingFullSyncMaps
@@ -1580,18 +1578,18 @@ public:
       data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
 
       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
+        tn->log(10, SSTR("building full sync maps"));
         /* call sync module init here */
         sync_status.sync_info.num_shards = num_shards;
         yield call(data_sync_module->init_sync(sync_env));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
           return set_cr_error(retcode);
         }
         /* state: building full sync maps */
-        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
         yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
           return set_cr_error(retcode);
         }
         sync_status.sync_info.state = rgw_data_sync_info::StateSync;
@@ -1599,19 +1597,22 @@ public:
         /* update new state */
         yield call(set_sync_info_cr());
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
           return set_cr_error(retcode);
         }
 
         *reset_backoff = true;
       }
 
+      yield call(data_sync_module->start_sync(sync_env));
+
       yield {
         if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+          tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
           for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
                iter != sync_status.sync_markers.end(); ++iter) {
-            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
-                                                                          iter->first, iter->second);
+            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
+                                                                          iter->first, iter->second, tn);
             cr->get();
             shard_crs_lock.Lock();
             shard_crs[iter->first] = cr;
@@ -1628,8 +1629,8 @@ public:
 
   RGWCoroutine *set_sync_info_cr() {
     RGWRados *store = sync_env->store;
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
-                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
+                                                         rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                          sync_status.sync_info);
   }
 
@@ -1648,7 +1649,7 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule {
 public:
   RGWDefaultDataSyncModule() {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
@@ -1661,18 +1662,22 @@ public:
   RGWDataSyncModule *get_data_handler() override {
     return &data_handler;
   }
+  bool supports_user_writes() override {
+    return true;
+  }
 };
 
-int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
+int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
 {
   instance->reset(new RGWDefaultSyncModuleInstance());
   return 0;
 }
 
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
-                                 key, versioned_epoch,
+                                 std::nullopt,
+                                 key, std::nullopt, versioned_epoch,
                                  true, zones_trace);
 }
 
@@ -1692,19 +1697,101 @@ RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *syn
                             &owner.id, &owner.display_name, true, &mtime, zones_trace);
 }
 
+class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
+public:
+  RGWArchiveDataSyncModule() {}
+
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+};
+
+class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
+  RGWArchiveDataSyncModule data_handler;
+public:
+  RGWArchiveSyncModuleInstance() {}
+  RGWDataSyncModule *get_data_handler() override {
+    return &data_handler;
+  }
+  RGWMetadataHandler *alloc_bucket_meta_handler() override {
+    return RGWArchiveBucketMetaHandlerAllocator::alloc();
+  }
+  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
+    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
+  }
+};
+
+int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
+{
+  instance->reset(new RGWArchiveSyncModuleInstance());
+  return 0;
+}
+
+RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+{
+  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+  if (!bucket_info.versioned() ||
+     (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
+      ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
+      bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
+      int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
+      if (op_ret < 0) {
+         ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
+         return NULL;
+      }
+  }
+
+  std::optional<rgw_obj_key> dest_key;
+
+  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
+    versioned_epoch = 0;
+    dest_key = key;
+    if (key.instance.empty()) {
+      sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
+    }
+  }
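+  // No epoch from the source means an unversioned (or epoch-0) object:
+  // engage the optional with 0 and, when the key carries no instance,
+  // generate a random instance id so the copy lands as its own version.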
+
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                                 bucket_info, std::nullopt,
+                                 key, dest_key, versioned_epoch,
+                                 true, zones_trace);
+}
+
+RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+                                                     real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+{
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
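+  // Intentionally a no-op: no coroutine is scheduled here, so removals on
+  // the source zone never delete data from the archive.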
+  return NULL;
+}
+
+RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                                            rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
+{
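+  // Unlike plain removals (ignored above), delete markers are replicated:
+  // the archive records that the object was deleted at the source while
+  // keeping all earlier versions available.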
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+                                   << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                            bucket_info, key, versioned, versioned_epoch,
+                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
+}
+
 class RGWDataSyncControlCR : public RGWBackoffControlCR
 {
   RGWDataSyncEnv *sync_env;
   uint32_t num_shards;
 
+  RGWSyncTraceNodeRef tn;
+
   static constexpr bool exit_on_error = false; // retry on all errors
 public:
-  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
-                                                      sync_env(_sync_env), num_shards(_num_shards) {
+  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
+                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
+                                                          sync_env(_sync_env), num_shards(_num_shards) {
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
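+    // Hang a "sync" node off the parent's trace tree; RGWDataSyncCR and its
+    // children attach their own nodes beneath it (see alloc_cr() below).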
   }
 
   RGWCoroutine *alloc_cr() override {
-    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
+    return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
   }
 
   void wakeup(int shard_id, set<string>& keys) {
@@ -1721,6 +1808,7 @@ public:
     m.Unlock();
 
     if (cr) {
+      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
       cr->wakeup(shard_id, keys);
     }
 
@@ -1739,7 +1827,7 @@ void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
 int RGWRemoteDataLog::run_sync(int num_shards)
 {
   lock.get_write();
-  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
+  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
   data_sync_cr->get(); // run() will drop a ref, so take another
   lock.unlock();
 
@@ -1751,7 +1839,7 @@ int RGWRemoteDataLog::run_sync(int num_shards)
   lock.unlock();
 
   if (r < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
     return r;
   }
   return 0;
@@ -1759,35 +1847,34 @@ int RGWRemoteDataLog::run_sync(int num_shards)
 
 int RGWDataSyncStatusManager::init()
 {
-  auto zone_def_iter = store->zone_by_id.find(source_zone);
-  if (zone_def_iter == store->zone_by_id.end()) {
-    ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
+  RGWZone *zone_def;
+
+  if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
+    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
     return -EIO;
   }
 
-  auto& zone_def = zone_def_iter->second;
-
-  if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
+  if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
     return -ENOTSUP;
   }
 
-  RGWZoneParams& zone_params = store->get_zone_params();
+  const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();
 
   if (sync_module == nullptr) { 
     sync_module = store->get_sync_module();
   }
 
-  conn = store->get_zone_conn_by_id(source_zone);
+  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
   if (!conn) {
-    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
+    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
     return -EINVAL;
   }
 
   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
-  int r = source_log.init(source_zone, conn, error_logger, sync_module);
+  int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(), sync_module);
   if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
+    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
     finalize();
     return r;
   }
@@ -1795,7 +1882,7 @@ int RGWDataSyncStatusManager::init()
   rgw_datalog_info datalog_info;
   r = source_log.read_log_info(&datalog_info);
   if (r < 0) {
-    ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
+    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
     finalize();
     return r;
   }
@@ -1815,6 +1902,17 @@ void RGWDataSyncStatusManager::finalize()
   error_logger = nullptr;
 }
 
+unsigned RGWDataSyncStatusManager::get_subsys() const
+{
+  return dout_subsys;
+}
+
+std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
+{
+  auto zone = std::string_view{source_zone};
+  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
+}
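+// get_subsys()/gen_prefix() implement the DoutPrefixProvider interface, so
+// the ldpp_dout() calls above are prefixed with a truncated source zone id.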
+
 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
 {
   char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
@@ -1834,6 +1932,7 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s
 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                              const rgw_bucket& bucket, int shard_id,
                              RGWSyncErrorLogger *_error_logger,
+                             RGWSyncTraceManager *_sync_tracer,
                              RGWSyncModuleInstanceRef& _sync_module)
 {
   conn = _conn;
@@ -1841,8 +1940,8 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
-                _error_logger, source_zone, _sync_module);
+  sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, _sync_tracer, source_zone, _sync_module);
 
   return 0;
 }
@@ -1908,7 +2007,7 @@ public:
       }
       yield {
         auto store = sync_env->store;
-        rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
+        rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);
 
         if (info.syncstopped) {
           call(new RGWRadosRemoveCR(store, obj));
@@ -1917,7 +2016,7 @@ public:
           status.inc_marker.position = info.max_marker;
           map<string, bufferlist> attrs;
           status.encode_all_attrs(attrs);
-          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
+          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
         }
       }
       if (info.syncstopped) {
@@ -1937,28 +2036,38 @@ RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
   return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
 }
 
+#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
+
 template <class T>
-static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
+static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
 {
   map<string, bufferlist>::iterator iter = attrs.find(attr_name);
   if (iter == attrs.end()) {
     *val = T();
-    return;
+    return false;
   }
 
-  bufferlist::iterator biter = iter->second.begin();
+  auto biter = iter->second.cbegin();
   try {
-    ::decode(*val, biter);
+    decode(*val, biter);
   } catch (buffer::error& err) {
     ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
+    return false;
   }
+  return true;
 }
 
 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
 {
-  decode_attr(cct, attrs, "state", &state);
-  decode_attr(cct, attrs, "full_marker", &full_marker);
-  decode_attr(cct, attrs, "inc_marker", &inc_marker);
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
+    decode_attr(cct, attrs, "state", &state);
+  }
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
+    decode_attr(cct, attrs, "full_marker", &full_marker);
+  }
+  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
+    decode_attr(cct, attrs, "inc_marker", &inc_marker);
+  }
 }
 
 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
@@ -1970,17 +2079,20 @@ void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs
 
 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
 {
-  ::encode(state, attrs["state"]);
+  using ceph::encode;
+  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
 }
 
 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
 {
-  ::encode(*this, attrs["full_marker"]);
+  using ceph::encode;
+  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
 }
 
 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
 {
-  ::encode(*this, attrs["inc_marker"]);
+  using ceph::encode;
+  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
 }
 
 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
@@ -2002,9 +2114,9 @@ public:
 int RGWReadBucketSyncStatusCoroutine::operate()
 {
   reenter(this) {
-    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
-                                                   rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
-                                                   &attrs));
+    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+                                             rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
+                                             &attrs, true));
     if (retcode == -ENOENT) {
       *status = rgw_bucket_shard_sync_info();
       return set_cr_done();
@@ -2031,6 +2143,7 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
   string marker;
   string error_oid;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   set<string> error_entries;
   int max_omap_entries;
   int count;
@@ -2054,8 +2167,9 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
     //read recovering bucket shards
     count = 0;
     do {
-      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
-            marker, &error_entries, max_omap_entries));
+      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
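+      // The listing result (entries plus a 'more' flag) is heap-allocated
+      // and shared so it stays valid while the spawned rados op completes.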
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid), 
+            marker, max_omap_entries, omapkeys));
 
       if (retcode == -ENOENT) {
         break;
@@ -2067,14 +2181,16 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
         return set_cr_error(retcode);
       }
 
+      error_entries = std::move(omapkeys->entries);
       if (error_entries.empty()) {
         break;
       }
 
       count += error_entries.size();
       marker = *error_entries.rbegin();
-      recovering_buckets.insert(error_entries.begin(), error_entries.end());
-    }while((int)error_entries.size() == max_omap_entries && count < max_entries);
+      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
+                                std::make_move_iterator(error_entries.end()));
+    } while (omapkeys->more && count < max_entries);
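+    // 'more' comes from the omap listing itself, which is more robust than
+    // the old size()==max heuristic if the backend returns a short page.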
   
     return set_cr_done();
   }
@@ -2119,8 +2235,8 @@ int RGWReadPendingBucketShardsCoroutine::operate()
   reenter(this){
     //read sync status marker
     using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
-    yield call(new CR(sync_env->async_rados, store,
-                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+    yield call(new CR(sync_env->async_rados, store->svc.sysobj,
+                      rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                       sync_marker));
     if (retcode < 0) {
       ldout(sync_env->cct,0) << "failed to read sync status marker with " 
@@ -2166,9 +2282,9 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
   // cannot run concurrently with run_sync(), so run in a separate manager
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
   RGWDataSyncEnv sync_env_local = sync_env;
@@ -2240,6 +2356,9 @@ struct bucket_list_entry {
     JSONDecoder::decode_json("Owner", owner, obj);
     JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
     JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
+    if (key.instance == "null" && !versioned_epoch) {
+      key.instance.clear();
+    }
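+    // Normalize: a literal "null" instance with no VersionedEpoch denotes
+    // an unversioned object, so clear it to match the local unversioned key.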
   }
 
   RGWModifyOp get_modify_op() const {
@@ -2354,6 +2473,8 @@ class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj
   string marker_oid;
   rgw_bucket_shard_full_sync_marker sync_marker;
 
+  RGWSyncTraceNodeRef tn;
+
 public:
   RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                          const string& _marker_oid,
@@ -2362,6 +2483,10 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
+  void set_tn(RGWSyncTraceNodeRef& _tn) {
+    tn = _tn;
+  }
+
   RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
     sync_marker.count = index_pos;
@@ -2371,13 +2496,13 @@ public:
 
     RGWRados *store = sync_env->store;
 
-    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
-    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
-                                          rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
+    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
+                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                           attrs);
   }
 
-  RGWOrderCallCR *allocate_order_control_cr() {
+  RGWOrderCallCR *allocate_order_control_cr() override {
     return new RGWLastCallerWinsCR(sync_env->cct);
   }
 };
@@ -2397,6 +2522,8 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
   map<string, operation> marker_to_op;
   std::set<std::string> pending_olh; // object names with pending olh operations
 
+  RGWSyncTraceNodeRef tn;
+
   void handle_finish(const string& marker) override {
     auto iter = marker_to_op.find(marker);
     if (iter == marker_to_op.end()) {
@@ -2419,6 +2546,10 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
+  void set_tn(RGWSyncTraceNodeRef& _tn) {
+    tn = _tn;
+  }
+
   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
 
@@ -2427,10 +2558,10 @@ public:
 
     RGWRados *store = sync_env->store;
 
-    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
     return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
-                                          store,
-                                          rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
+                                          store->svc.sysobj,
+                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                           attrs);
   }
 
@@ -2460,13 +2591,13 @@ public:
   bool can_do_op(const rgw_obj_key& key, bool is_olh) {
     // serialize olh ops on the same object name
     if (is_olh && pending_olh.count(key.name)) {
-      ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
+      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
       return false;
     }
     return (key_to_marker.find(key) == key_to_marker.end());
   }
 
-  RGWOrderCallCR *allocate_order_control_cr() {
+  RGWOrderCallCR *allocate_order_control_cr() override {
     return new RGWLastCallerWinsCR(sync_env->cct);
   }
 };
@@ -2480,7 +2611,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
   rgw_obj_key key;
   bool versioned;
-  boost::optional<uint64_t> versioned_epoch;
+  std::optional<uint64_t> versioned_epoch;
   rgw_bucket_entry_owner owner;
   real_time timestamp;
   RGWModifyOp op;
@@ -2493,24 +2624,24 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
   stringstream error_ss;
 
-  RGWDataSyncDebugLogger logger;
-
   bool error_injection;
 
   RGWDataSyncModule *data_sync_module;
   
   rgw_zone_set zones_trace;
 
+  RGWSyncTraceNodeRef tn;
 public:
   RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                              RGWBucketInfo *_bucket_info,
                              const rgw_bucket_shard& bs,
                              const rgw_obj_key& _key, bool _versioned,
-                             boost::optional<uint64_t> _versioned_epoch,
+                             std::optional<uint64_t> _versioned_epoch,
                              real_time& _timestamp,
                              const rgw_bucket_entry_owner& _owner,
                              RGWModifyOp _op, RGWPendingState _op_state,
-                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
+                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
+                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
                                                       bucket_info(_bucket_info), bs(bs),
                                                       key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
@@ -2523,17 +2654,17 @@ public:
     stringstream ss;
     ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
     set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
-    ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
     set_status("init");
 
-    logger.init(sync_env, "Object", ss.str());
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
 
+    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
     error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
 
     data_sync_module = sync_env->sync_module->get_data_handler();
     
     zones_trace = _zones_trace;
-    zones_trace.insert(sync_env->store->get_zone().id);
+    zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
   }
 
   int operate() override {
@@ -2542,58 +2673,57 @@ public:
       if (op_state != CLS_RGW_STATE_COMPLETE) {
         goto done;
       }
+      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
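+      // Mark this trace node active while the entry is in flight; the flag
+      // is cleared again below once the operation settles.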
       do {
         yield {
           marker_tracker->reset_need_retry(key);
           if (key.name.empty()) {
             /* shouldn't happen */
             set_status("skipping empty entry");
-            ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
+            tn->log(0, "entry with empty obj name, skipping");
             goto done;
           }
           if (error_injection &&
               rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
-            ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
+            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
             retcode = -EIO;
           } else if (op == CLS_RGW_OP_ADD ||
                      op == CLS_RGW_OP_LINK_OLH) {
             set_status("syncing obj");
-            ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
-            logger.log("fetch");
+            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
             call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
             set_status("removing obj");
             if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
               versioned = true;
             }
-            logger.log("remove");
+            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
             call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
             // our copy of the object is more recent, continue as if it succeeded
             if (retcode == -ERR_PRECONDITION_FAILED) {
               retcode = 0;
             }
           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
-            logger.log("creating delete marker");
             set_status("creating delete marker");
-            ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
+            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
             call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
           }
+          tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
         }
       } while (marker_tracker->need_retry(key));
       {
-        stringstream ss;
+        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
         if (retcode >= 0) {
-          ss << "done";
+          tn->log(10, "success");
         } else {
-          ss << "done, retcode=" << retcode;
+          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
         }
-        logger.log(ss.str());
       }
 
       if (retcode < 0 && retcode != -ENOENT) {
         set_status() << "failed to sync obj; retcode=" << retcode;
-        ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
-            << bucket_shard_str{bs} << "/" << key.name << dendl;
+        tn->log(0, SSTR("ERROR: failed to sync object: "
+            << bucket_shard_str{bs} << "/" << key.name));
         error_ss << bucket_shard_str{bs} << "/" << key.name;
         sync_status = retcode;
       }
@@ -2636,20 +2766,24 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
 
   const string& status_oid;
 
-  RGWDataSyncDebugLogger logger;
   rgw_zone_set zones_trace;
+
+  RGWSyncTraceNodeRef tn;
 public:
   RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                            RGWBucketInfo *_bucket_info,
                            const std::string& status_oid,
                            RGWContinuousLeaseCR *lease_cr,
-                           rgw_bucket_shard_sync_info& sync_info)
+                           rgw_bucket_shard_sync_info& sync_info,
+                           RGWSyncTraceNodeRef tn_parent)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
       bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sync_env, status_oid, sync_info.full_marker),
-      status_oid(status_oid) {
-    logger.init(sync_env, "BucketFull", bs.get_key());
+      status_oid(status_oid),
+      tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
+                                         SSTR(bucket_shard_str{bs}))) {
     zones_trace.insert(sync_env->source_zone);
+    marker_tracker.set_tn(tn);
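+    // The marker tracker logs through the same trace node, so its
+    // "updating marker" messages nest under this shard's full_sync entry.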
   }
 
   int operate() override;
@@ -2668,7 +2802,7 @@ int RGWBucketShardFullSyncCR::operate()
         return set_cr_error(-ECANCELED);
       }
       set_status("listing remote bucket");
-      ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
+      tn->log(20, "listing bucket for full sync");
       yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
                                           &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
@@ -2676,26 +2810,29 @@ int RGWBucketShardFullSyncCR::operate()
         drain_all();
         return set_cr_error(retcode);
       }
+      if (list_result.entries.size() > 0) {
+        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+      }
       entries_iter = list_result.entries.begin();
       for (; entries_iter != list_result.entries.end(); ++entries_iter) {
         if (!lease_cr->is_locked()) {
           drain_all();
           return set_cr_error(-ECANCELED);
         }
-        ldout(sync_env->cct, 20) << "[full sync] syncing object: "
-            << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
+        tn->log(20, SSTR("[full sync] syncing object: "
+            << bucket_shard_str{bs} << "/" << entries_iter->key));
         entry = &(*entries_iter);
         total_entries++;
         list_marker = entries_iter->key;
         if (!marker_tracker.start(entry->key, total_entries, real_time())) {
-          ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
+          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
         } else {
           using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
           yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
                                  false, /* versioned, only matters for object removal */
                                  entry->versioned_epoch, entry->mtime,
                                  entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
-                                 entry->key, &marker_tracker, zones_trace),
+                                 entry->key, &marker_tracker, zones_trace, tn),
                       false);
         }
         while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
@@ -2704,7 +2841,7 @@ int RGWBucketShardFullSyncCR::operate()
           while (again) {
             again = collect(&ret, nullptr);
             if (ret < 0) {
-              ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+              tn->log(10, "a sync operation returned error");
               sync_status = ret;
               /* we have reported this error */
             }
@@ -2720,12 +2857,13 @@ int RGWBucketShardFullSyncCR::operate()
       while (again) {
         again = collect(&ret, nullptr);
         if (ret < 0) {
-          ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+          tn->log(10, "a sync operation returned error");
           sync_status = ret;
           /* we have reported this error */
         }
       }
     }
+    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
     if (!lease_cr->is_locked()) {
       return set_cr_error(-ECANCELED);
     }
@@ -2736,16 +2874,16 @@ int RGWBucketShardFullSyncCR::operate()
         map<string, bufferlist> attrs;
         sync_info.encode_state_attr(attrs);
         RGWRados *store = sync_env->store;
-        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
-                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
+                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                             attrs));
       }
     } else {
-      ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
     }
     if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
-      ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
-          << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
+      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
+          << bucket_shard_str{bs} << " retcode=" << retcode));
       return set_cr_error(retcode);
     }
     if (sync_status < 0) {
@@ -2778,27 +2916,29 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
 
   string cur_id;
 
-  RGWDataSyncDebugLogger logger;
-
   int sync_status{0};
   bool syncstopped{false};
 
+  RGWSyncTraceNodeRef tn;
 public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
                                   RGWBucketInfo *_bucket_info,
                                   const std::string& status_oid,
                                   RGWContinuousLeaseCR *lease_cr,
-                                  rgw_bucket_shard_sync_info& sync_info)
+                                  rgw_bucket_shard_sync_info& sync_info,
+                                  RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
       bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sync_env, status_oid, sync_info.inc_marker),
-      status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
+      status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
+                                         SSTR(bucket_shard_str{bs})))
   {
     set_description() << "bucket shard incremental sync bucket="
         << bucket_shard_str{bs};
     set_status("init");
-    logger.init(sync_env, "BucketInc", bs.get_key());
+    marker_tracker.set_tn(tn);
   }
 
   int operate() override;
@@ -2811,9 +2951,10 @@ int RGWBucketShardIncrementalSyncCR::operate()
     do {
       if (!lease_cr->is_locked()) {
         drain_all();
+        tn->log(0, "ERROR: lease is not taken, abort");
         return set_cr_error(-ECANCELED);
       }
-      ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << sync_info.inc_marker.position << dendl;
+      tn->log(20, SSTR("listing bilog for incremental sync" << sync_info.inc_marker.position));
       set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
       yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
                                              &list_result));
@@ -2880,16 +3021,16 @@ int RGWBucketShardIncrementalSyncCR::operate()
 
         if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
           set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
-          ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
+          tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
 
-        ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
+        tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
 
         if (!key.ns.empty()) {
           set_status() << "skipping entry in namespace: " << entry->object;
-          ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
+          tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
@@ -2897,47 +3038,48 @@ int RGWBucketShardIncrementalSyncCR::operate()
         set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
         if (entry->op == CLS_RGW_OP_CANCEL) {
           set_status() << "canceled operation, skipping";
-          ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
-              << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
+          tn->log(20, SSTR("skipping object: "
+              << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         if (entry->state != CLS_RGW_STATE_COMPLETE) {
           set_status() << "non-complete operation, skipping";
-          ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
-              << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
+          tn->log(20, SSTR("skipping object: "
+              << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
           set_status() << "redundant operation, skipping";
-          ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
-              <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation" << dendl;
+          tn->log(20, SSTR("skipping object: "
+              <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
           set_status() << "squashed operation, skipping";
-          ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
-              << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
+          tn->log(20, SSTR("skipping object: "
+              << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
-        ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
-            << bucket_shard_str{bs} << "/" << key << dendl;
+        tn->set_flag(RGW_SNS_FLAG_ACTIVE);
+        tn->log(20, SSTR("syncing object: "
+            << bucket_shard_str{bs} << "/" << key));
         updated_status = false;
         while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
           if (!updated_status) {
             set_status() << "can't do op, conflicting inflight operation";
             updated_status = true;
           }
-          ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+          tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
           yield wait_for_child();
           bool again = true;
           while (again) {
             again = collect(&ret, nullptr);
             if (ret < 0) {
-              ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
+              tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
               sync_status = ret;
               /* we have reported this error */
             }
@@ -2951,26 +3093,26 @@ int RGWBucketShardIncrementalSyncCR::operate()
         }
         if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
           set_status() << "can't do op, sync already in progress for object";
-          ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
+          tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         // yield {
           set_status() << "start object sync";
           if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
+            tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
           } else {
-            boost::optional<uint64_t> versioned_epoch;
+            std::optional<uint64_t> versioned_epoch;
             rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
             if (entry->ver.pool < 0) {
               versioned_epoch = entry->ver.epoch;
             }
-            ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
+            tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
             using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
             spawn(new SyncCR(sync_env, bucket_info, bs, key,
                              entry->is_versioned(), versioned_epoch,
                              entry->timestamp, owner, entry->op, entry->state,
-                             cur_id, &marker_tracker, entry->zones_trace),
+                             cur_id, &marker_tracker, entry->zones_trace, tn),
                   false);
           }
         // }
@@ -2981,7 +3123,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           while (again) {
             again = collect(&ret, nullptr);
             if (ret < 0) {
-              ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+              tn->log(10, "a sync operation returned error");
               sync_status = ret;
               /* we have reported this error */
             }
@@ -2997,13 +3139,14 @@ int RGWBucketShardIncrementalSyncCR::operate()
       while (again) {
         again = collect(&ret, nullptr);
         if (ret < 0) {
-          ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
+          tn->log(10, "a sync operation returned error");
           sync_status = ret;
           /* we have reported this error */
         }
         /* not waiting for child here */
       }
     }
+    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
 
     if (syncstopped) {
       // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
@@ -3016,11 +3159,11 @@ int RGWBucketShardIncrementalSyncCR::operate()
 
     yield call(marker_tracker.flush());
     if (retcode < 0) {
-      ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
+      tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
       return set_cr_error(retcode);
     }
     if (sync_status < 0) {
-      ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
+      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
       return set_cr_error(sync_status);
     }
     return set_cr_done();
@@ -3035,7 +3178,7 @@ int RGWRunBucketSyncCoroutine::operate()
       set_status("acquiring sync lock");
       auto store = sync_env->store;
       lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                              rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                                              rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                               "sync_lock",
                                               cct->_conf->rgw_sync_lease_period,
                                               this));
@@ -3043,7 +3186,7 @@ int RGWRunBucketSyncCoroutine::operate()
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
-        ldout(cct, 5) << "lease cr failed, done early" << dendl;
+        tn->log(5, "failed to take lease");
         set_status("lease lock failed, early abort");
         drain_all();
         return set_cr_error(lease_cr->get_ret_status());
@@ -3052,35 +3195,35 @@ int RGWRunBucketSyncCoroutine::operate()
       yield;
     }
 
+    tn->log(10, "took lease");
     yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
     if (retcode < 0 && retcode != -ENOENT) {
-      ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
-          << bucket_shard_str{bs} << dendl;
+      tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
 
-    ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
-        << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
+    tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
 
     yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
     if (retcode == -ENOENT) {
       /* bucket instance info has not been synced in yet, fetch it now */
       yield {
-        ldout(sync_env->cct, 10) << "no local info for bucket "
-            << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
+        tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
         string raw_key = string("bucket.instance:") + bs.bucket.get_key();
 
-        meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
+        meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
+                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
 
         call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
                                           string() /* no marker */,
                                           MDLOG_STATUS_COMPLETE,
-                                          NULL /* no marker tracker */));
+                                          NULL /* no marker tracker */,
+                                          tn));
       }
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
+        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
         lease_cr->go_down();
         drain_all();
         return set_cr_error(retcode);
@@ -3089,7 +3232,7 @@ int RGWRunBucketSyncCoroutine::operate()
       yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
     }
     if (retcode < 0) {
-      ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
@@ -3099,7 +3242,7 @@ int RGWRunBucketSyncCoroutine::operate()
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
         yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
         if (retcode == -ENOENT) {
-          ldout(sync_env->cct, 0) << "bucket sync disabled" << dendl;
+          tn->log(0, "bucket sync disabled");
           lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
           lease_cr->wakeup();
           lease_cr.reset();
@@ -3107,8 +3250,7 @@ int RGWRunBucketSyncCoroutine::operate()
           return set_cr_done();
         }
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
-              << " failed, retcode=" << retcode << dendl;
+          tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
@@ -3118,10 +3260,9 @@ int RGWRunBucketSyncCoroutine::operate()
       if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
         yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
                                                 status_oid, lease_cr.get(),
-                                                sync_status));
+                                                sync_status, tn));
         if (retcode < 0) {
-          ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
-              << " failed, retcode=" << retcode << dendl;
+          tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
@@ -3131,10 +3272,9 @@ int RGWRunBucketSyncCoroutine::operate()
       if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
         yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
                                                        status_oid, lease_cr.get(),
-                                                       sync_status));
+                                                       sync_status, tn));
         if (retcode < 0) {
-          ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
-              << " failed, retcode=" << retcode << dendl;
+          tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
@@ -3153,20 +3293,20 @@ int RGWRunBucketSyncCoroutine::operate()
 
 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
 {
-  return new RGWRunBucketSyncCoroutine(&sync_env, bs);
+  return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
 }
 
 int RGWBucketSyncStatusManager::init()
 {
-  conn = store->get_zone_conn_by_id(source_zone);
+  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
   if (!conn) {
-    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
+    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
     return -EINVAL;
   }
 
-  int ret = http_manager.set_threaded();
+  int ret = http_manager.start();
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
 
@@ -3181,7 +3321,7 @@ int RGWBucketSyncStatusManager::init()
   bucket_instance_meta_info result;
   ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+    ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
     return ret;
   }
 
@@ -3197,10 +3337,10 @@ int RGWBucketSyncStatusManager::init()
   auto async_rados = store->get_async_rados();
 
   for (int i = 0; i < effective_num_shards; i++) {
-    RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
-    ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
+    RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
+    ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
     if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
       return ret;
     }
     source_logs[i] = l;
@@ -3238,7 +3378,7 @@ int RGWBucketSyncStatusManager::read_sync_status()
 
   int ret = cr_mgr.run(stacks);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
         << bucket_str{bucket} << dendl;
     return ret;
   }
@@ -3260,7 +3400,7 @@ int RGWBucketSyncStatusManager::run()
 
   int ret = cr_mgr.run(stacks);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
         << bucket_str{bucket} << dendl;
     return ret;
   }
@@ -3268,12 +3408,31 @@ int RGWBucketSyncStatusManager::run()
   return 0;
 }
 
+unsigned RGWBucketSyncStatusManager::get_subsys() const
+{
+  return dout_subsys;
+}
+
+std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
+{
+  auto zone = std::string_view{source_zone};
+  return out << "bucket sync zone:" << zone.substr(0, 8)
+      << " bucket:" << bucket.name << ' ';
+}
+
 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
                                               const rgw_bucket_shard& bs)
 {
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
+string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
+                                                  const rgw_obj& obj)
+{
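+  // oid layout: <prefix>.<zone>:<bucket key>:<object name>:<instance>,
+  // mirroring status_oid() above but keyed per object.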
+  return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
+         obj.key.name + ":" + obj.key.instance;
+}
+
 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
   static constexpr int max_concurrent_shards = 16;
   RGWRados *const store;
@@ -3305,7 +3464,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
   }
 };
 
-int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
                            const RGWBucketInfo& bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status)
 {
@@ -3315,8 +3474,8 @@ int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
 
   RGWDataSyncEnv env;
   RGWSyncModuleInstanceRef module; // null sync module
-  env.init(store->ctx(), store, nullptr, store->get_async_rados(),
-           nullptr, nullptr, source_zone, module);
+  env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
+           nullptr, nullptr, nullptr, source_zone, module);
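+  // conn/http_manager/error_logger/sync_tracer stay null here: collecting
+  // shard status only reads local RADOS objects, so no remote machinery is
+  // needed.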
 
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
@@ -3387,8 +3546,8 @@ class DataLogTrimCR : public RGWCoroutine {
                    int num_shards, std::vector<std::string>& last_trim)
     : RGWCoroutine(store->ctx()), store(store), http(http),
       num_shards(num_shards),
-      zone_id(store->get_zone().id),
-      peer_status(store->zone_conn_map.size()),
+      zone_id(store->svc.zone->get_zone().id),
+      peer_status(store->svc.zone->get_zone_conn_map().size()),
       min_shard_markers(num_shards),
       last_trim(last_trim)
   {}
@@ -3411,7 +3570,7 @@ int DataLogTrimCR::operate()
       };
 
       auto p = peer_status.begin();
-      for (auto& c : store->zone_conn_map) {
+      for (auto& c : store->svc.zone->get_zone_conn_map()) {
         ldout(cct, 20) << "query sync status from " << c.first << dendl;
         using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
         spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
@@ -3501,7 +3660,7 @@ int DataLogTrimPollCR::operate()
       // prevent other gateways from attempting to trim for the duration
       set_status("acquiring trim lock");
       yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
-                                          rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
+                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
                                           "data_trim", lock_cookie,
                                           interval.sec()));
       if (retcode < 0) {