]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index 562b067beb23530d8de9b0aba5aa85aed9df08c8..100f77a699bc0310cfb1b0684cfd7dc174f9dedf 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "include/compat.h"
 #include <errno.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include "cls/timeindex/cls_timeindex_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/user/cls_user_client.h"
+#include "osd/osd_types.h"
 
 #include "rgw_tools.h"
 #include "rgw_coroutine.h"
 #include "rgw_compression.h"
 
-#include "rgw_boost_asio_yield.h"
 #undef fork // fails to compile RGWPeriod::fork() below
 
 #include "common/Clock.h"
@@ -69,11 +70,10 @@ using namespace librados;
 #include "rgw_sync.h"
 #include "rgw_data_sync.h"
 #include "rgw_realm_watcher.h"
+#include "rgw_reshard.h"
 
 #include "compressor/Compressor.h"
 
-#include <atomic>
-
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
@@ -130,7 +130,7 @@ static bool rgw_get_obj_data_pool(const RGWZoneGroup& zonegroup, const RGWZonePa
     if (!obj.in_extra_data) {
       *pool = placement.data_pool;
     } else {
-      *pool = placement.data_extra_pool;
+      *pool = placement.get_data_extra_pool();
     }
   }
 
@@ -170,13 +170,28 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, IoCtx& ioctx, b
   int r = rados->ioctx_create(pool.name.c_str(), ioctx);
   if (r == -ENOENT && create) {
     r = rados->pool_create(pool.name.c_str());
+    if (r == -ERANGE) {
+      dout(0)
+        << __func__
+        << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+        << " (this can be due to a pool or placement group misconfiguration, e.g."
+        << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+        << dendl;
+    }
     if (r < 0 && r != -EEXIST) {
       return r;
     }
 
     r = rados->ioctx_create(pool.name.c_str(), ioctx);
-  }
-  if (r < 0) {
+    if (r < 0) {
+      return r;
+    }
+
+    r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
+    if (r < 0 && r != -EOPNOTSUPP) {
+      return r;
+    }
+  } else if (r < 0) {
     return r;
   }
   if (!pool.ns.empty()) {
@@ -427,7 +442,6 @@ void RGWZoneGroup::post_process_params()
   for (map<string, RGWZone>::iterator iter = zones.begin(); iter != zones.end(); ++iter) {
     RGWZone& zone = iter->second;
     zone.log_data = log_data;
-    zone.log_meta = (is_master && zone.id == master_zone);
 
     RGWZoneParams zone_params(zone.id, zone.name);
     int ret = zone_params.init(cct, store);
@@ -474,9 +488,9 @@ int RGWZoneGroup::read_default_id(string& default_id, bool old_format)
     /* try using default realm */
     RGWRealm realm;
     int ret = realm.init(cct, store);
+    // no default realm exist
     if (ret < 0) {
-      ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
-      return -ENOENT;
+      return read_id(default_zonegroup_name, default_id);
     }
     realm_id = realm.get_id();
   }
@@ -1089,11 +1103,6 @@ int RGWPeriod::get_zonegroup(RGWZoneGroup& zonegroup, const string& zonegroup_id
   return -ENOENT;
 }
 
-bool RGWPeriod::is_single_zonegroup(CephContext *cct, RGWRados *store)
-{
-  return (period_map.zonegroups.size() == 1);
-}
-
 const string& RGWPeriod::get_latest_epoch_oid()
 {
   if (cct->_conf->rgw_period_latest_epoch_info_oid.empty()) {
@@ -1122,14 +1131,15 @@ const string RGWPeriod::get_period_oid()
   return oss.str();
 }
 
-int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info)
+int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info,
+                                 RGWObjVersionTracker *objv)
 {
   string oid = get_period_oid_prefix() + get_latest_epoch_oid();
 
   rgw_pool pool(get_pool(cct));
   bufferlist bl;
   RGWObjectCtx obj_ctx(store);
-  int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, NULL, NULL);
+  int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, objv, nullptr);
   if (ret < 0) {
     ldout(cct, 1) << "error read_lastest_epoch " << pool << ":" << oid << dendl;
     return ret;
@@ -1172,7 +1182,8 @@ int RGWPeriod::use_latest_epoch()
   return 0;
 }
 
-int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive)
+int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive,
+                                RGWObjVersionTracker *objv)
 {
   string oid = get_period_oid_prefix() + get_latest_epoch_oid();
 
@@ -1185,7 +1196,52 @@ int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive)
   ::encode(info, bl);
 
   return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
-                            exclusive, NULL, real_time(), NULL);
+                            exclusive, objv, real_time(), nullptr);
+}
+
+int RGWPeriod::update_latest_epoch(epoch_t epoch)
+{
+  static constexpr int MAX_RETRIES = 20;
+
+  for (int i = 0; i < MAX_RETRIES; i++) {
+    RGWPeriodLatestEpochInfo info;
+    RGWObjVersionTracker objv;
+    bool exclusive = false;
+
+    // read existing epoch
+    int r = read_latest_epoch(info, &objv);
+    if (r == -ENOENT) {
+      // use an exclusive create to set the epoch atomically
+      exclusive = true;
+      ldout(cct, 20) << "creating initial latest_epoch=" << epoch
+          << " for period=" << id << dendl;
+    } else if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to read latest_epoch" << dendl;
+      return r;
+    } else if (epoch <= info.epoch) {
+      r = -EEXIST; // fail with EEXIST if epoch is not newer
+      ldout(cct, 1) << "found existing latest_epoch " << info.epoch
+          << " >= given epoch " << epoch << ", returning r=" << r << dendl;
+      return r;
+    } else {
+      ldout(cct, 20) << "updating latest_epoch from " << info.epoch
+          << " -> " << epoch << " on period=" << id << dendl;
+    }
+
+    r = set_latest_epoch(epoch, exclusive, &objv);
+    if (r == -EEXIST) {
+      continue; // exclusive create raced with another update, retry
+    } else if (r == -ECANCELED) {
+      continue; // write raced with a conflicting version, retry
+    }
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to write latest_epoch" << dendl;
+      return r;
+    }
+    return 0; // return success
+  }
+
+  return -ECANCELED; // fail after max retries
 }
 
 int RGWPeriod::delete_obj()
@@ -1255,6 +1311,7 @@ int RGWPeriod::create(bool exclusive)
   ret = store_info(exclusive);
   if (ret < 0) {
     ldout(cct, 0) << "ERROR:  storing info for " << id << ": " << cpp_strerror(-ret) << dendl;
+    return ret;
   }
 
   ret = set_latest_epoch(epoch);
@@ -1267,31 +1324,14 @@ int RGWPeriod::create(bool exclusive)
 
 int RGWPeriod::store_info(bool exclusive)
 {
-  epoch_t latest_epoch = FIRST_EPOCH - 1;
-  int ret = get_latest_epoch(latest_epoch);
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(cct, 0) << "ERROR: RGWPeriod::get_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
-    return ret;
-  }
-
   rgw_pool pool(get_pool(cct));
 
   string oid = get_period_oid();
   bufferlist bl;
   ::encode(*this, bl);
-  ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), exclusive, NULL, real_time(), NULL);
-  if (ret < 0) {
-    ldout(cct, 0) << "ERROR: rgw_put_system_obj(" << pool << ":" << oid << "): " << cpp_strerror(-ret) << dendl;
-    return ret;
-  }
-  if (latest_epoch < epoch) {
-    ret = set_latest_epoch(epoch);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: RGWPeriod::set_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
-      return ret;
-    }
-  }
-  return 0;
+
+  return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
+                            exclusive, NULL, real_time(), NULL);
 }
 
 rgw_pool RGWPeriod::get_pool(CephContext *cct)
@@ -1302,28 +1342,6 @@ rgw_pool RGWPeriod::get_pool(CephContext *cct)
   return rgw_pool(cct->_conf->rgw_period_root_pool);
 }
 
-int RGWPeriod::use_next_epoch()
-{
-  epoch_t latest_epoch;
-  int ret = get_latest_epoch(latest_epoch);
-  if (ret < 0) {
-    return ret;
-  }
-  epoch = latest_epoch + 1;
-  ret = read_info();
-  if (ret < 0 && ret != -ENOENT) {
-    return ret;
-  }
-  if (ret == -ENOENT) {
-    ret = create();
-    if (ret < 0) {
-      ldout(cct, 0) << "Error creating new epoch " << epoch << dendl;
-      return ret;
-    }
-  }
-  return 0;
-}
-
 int RGWPeriod::add_zonegroup(const RGWZoneGroup& zonegroup)
 {
   if (zonegroup.realm_id != realm_id) {
@@ -1565,7 +1583,11 @@ int RGWPeriod::commit(RGWRealm& realm, const RGWPeriod& current_period,
     return r;
   }
   // set as latest epoch
-  r = set_latest_epoch(epoch);
+  r = update_latest_epoch(epoch);
+  if (r == -EEXIST) {
+    // already have this epoch (or a more recent one)
+    return 0;
+  }
   if (r < 0) {
     ldout(cct, 0) << "failed to set latest epoch: " << cpp_strerror(-r) << dendl;
     return r;
@@ -1624,6 +1646,7 @@ int get_zones_pool_set(CephContext* cct,
       pool_names.insert(zone.user_swift_pool);
       pool_names.insert(zone.user_uid_pool);
       pool_names.insert(zone.roles_pool);
+      pool_names.insert(zone.reshard_pool);
       for(auto& iter : zone.placement_pools) {
        pool_names.insert(iter.second.index_pool);
        pool_names.insert(iter.second.data_pool);
@@ -1694,6 +1717,7 @@ int RGWZoneParams::fix_pool_names()
   user_swift_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:users.swift", user_swift_pool);
   user_uid_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:users.uid", user_uid_pool);
   roles_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:roles", roles_pool);
+  reshard_pool = fix_zone_pool_dup(pools, name, ".rgw.log:reshard", reshard_pool);
 
   for(auto& iter : placement_pools) {
     iter.second.index_pool = fix_zone_pool_dup(pools, name, "." + default_bucket_index_pool_suffix,
@@ -1790,9 +1814,9 @@ int RGWZoneParams::read_default_id(string& default_id, bool old_format)
     /* try using default realm */
     RGWRealm realm;
     int ret = realm.init(cct, store);
+    //no default realm exist
     if (ret < 0) {
-      ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
-      return -ENOENT;
+      return read_id(default_zone_name, default_id);
     }
     realm_id = realm.get_id();
   }
@@ -1852,7 +1876,7 @@ void RGWPeriodMap::decode(bufferlist::iterator& bl) {
        iter != zonegroups.end(); ++iter) {
     RGWZoneGroup& zonegroup = iter->second;
     zonegroups_by_api[zonegroup.api_name] = zonegroup;
-    if (zonegroup.is_master) {
+    if (zonegroup.is_master_zonegroup()) {
       master_zonegroup = zonegroup.get_id();
     }
   }
@@ -1873,7 +1897,7 @@ static uint32_t gen_short_zone_id(const std::string zone_id)
 
 int RGWPeriodMap::update(const RGWZoneGroup& zonegroup, CephContext *cct)
 {
-  if (zonegroup.is_master && (!master_zonegroup.empty() && zonegroup.get_id() != master_zonegroup)) {
+  if (zonegroup.is_master_zonegroup() && (!master_zonegroup.empty() && zonegroup.get_id() != master_zonegroup)) {
     ldout(cct,0) << "Error updating periodmap, multiple master zonegroups configured "<< dendl;
     ldout(cct,0) << "master zonegroup: " << master_zonegroup << " and  " << zonegroup.get_id() <<dendl;
     return -EINVAL;
@@ -1891,7 +1915,7 @@ int RGWPeriodMap::update(const RGWZoneGroup& zonegroup, CephContext *cct)
     zonegroups_by_api[zonegroup.api_name] = zonegroup;
   }
 
-  if (zonegroup.is_master) {
+  if (zonegroup.is_master_zonegroup()) {
     master_zonegroup = zonegroup.get_id();
   } else if (master_zonegroup == zonegroup.get_id()) {
     master_zonegroup = "";
@@ -1993,7 +2017,7 @@ void RGWZoneGroupMap::decode(bufferlist::iterator& bl) {
        iter != zonegroups.end(); ++iter) {
     RGWZoneGroup& zonegroup = iter->second;
     zonegroups_by_api[zonegroup.api_name] = zonegroup;
-    if (zonegroup.is_master) {
+    if (zonegroup.is_master_zonegroup()) {
       master_zonegroup = zonegroup.get_name();
     }
   }
@@ -2385,9 +2409,10 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct)
 int RGWPutObjProcessor::complete(size_t accounted_size, const string& etag,
                                  real_time *mtime, real_time set_mtime,
                                  map<string, bufferlist>& attrs, real_time delete_at,
-                                 const char *if_match, const char *if_nomatch, const string *user_data)
+                                 const char *if_match, const char *if_nomatch, const string *user_data,
+                                 rgw_zone_set *zones_trace)
 {
-  int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data);
+  int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace);
   if (r < 0)
     return r;
 
@@ -2560,8 +2585,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
 
   *pobj = cur_obj;
 
-  if (!bl.length())
+  if (!bl.length()) {
+    *phandle = nullptr;
     return 0;
+  }
 
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
@@ -2686,7 +2713,7 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
     obj_len = (uint64_t)first_chunk.length();
   }
   while (pending_data_bl.length()) {
-    void *handle;
+    void *handle = nullptr;
     rgw_raw_obj obj;
     uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
     if (max_write_size > pending_data_bl.length()) {
@@ -2732,7 +2759,8 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
                                            map<string, bufferlist>& attrs,
                                            real_time delete_at,
                                            const char *if_match,
-                                           const char *if_nomatch, const string *user_data) {
+                                           const char *if_nomatch, const string *user_data,
+                                           rgw_zone_set *zones_trace) {
   int r = complete_writing_data();
   if (r < 0)
     return r;
@@ -2758,6 +2786,8 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
   obj_op.meta.olh_epoch = olh_epoch;
   obj_op.meta.delete_at = delete_at;
   obj_op.meta.user_data = user_data;
+  obj_op.meta.zones_trace = zones_trace;
+  obj_op.meta.modify_tail = true;
 
   r = obj_op.write_meta(obj_len, accounted_size, attrs);
   if (r < 0) {
@@ -2769,6 +2799,22 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
   return 0;
 }
 
+const char* RGWRados::admin_commands[4][3] = {
+  { "cache list",
+    "cache list name=filter,type=CephString,req=false",
+    "cache list [filter_str]: list object cache, possibly matching substrings" },
+  { "cache inspect",
+    "cache inspect name=target,type=CephString,req=true",
+    "cache inspect target: print cache element" },
+  { "cache erase",
+    "cache erase name=target,type=CephString,req=true",
+    "cache erase target: erase element from cache" },
+  { "cache zap",
+    "cache zap",
+    "cache zap: erase all elements from cache" }
+};
+
+
 int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
   int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
   if (r < 0)
@@ -2946,10 +2992,20 @@ class RGWRadosThread {
     Mutex lock;
     Cond cond;
 
+    void wait() {
+      Mutex::Locker l(lock);
+      cond.Wait(lock);
+    };
+
+    void wait_interval(const utime_t& wait_time) {
+      Mutex::Locker l(lock);
+      cond.WaitInterval(lock, wait_time);
+    }
+
   public:
     Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
     void *entry() override;
-    void stop() {
+    void signal() {
       Mutex::Locker l(lock);
       cond.Signal();
     }
@@ -2981,6 +3037,12 @@ public:
 
   void start();
   void stop();
+
+  void signal() {
+    if (worker) {
+      worker->signal();
+    }
+  }
 };
 
 void RGWRadosThread::start()
@@ -2994,7 +3056,7 @@ void RGWRadosThread::stop()
   down_flag = true;
   stop_process();
   if (worker) {
-    worker->stop();
+    worker->signal();
     worker->join();
   }
   delete worker;
@@ -3031,13 +3093,9 @@ void *RGWRadosThread::Worker::entry() {
       utime_t wait_time = interval;
       wait_time -= end;
 
-      lock.Lock();
-      cond.WaitInterval(lock, wait_time);
-      lock.Unlock();
+      wait_interval(wait_time);
     } else {
-      lock.Lock();
-      cond.Wait(lock);
-      lock.Unlock();
+      wait();
     }
   } while (!processor->going_down());
 
@@ -3081,7 +3139,7 @@ class RGWDataNotifier : public RGWRadosThread {
   RGWDataNotifierManager notify_mgr;
 
   uint64_t interval_msec() override {
-    return cct->_conf->rgw_md_notify_interval_msec;
+    return cct->_conf->get_val<int64_t>("rgw_data_notify_interval_msec");
   }
 public:
   RGWDataNotifier(RGWRados *_store) : RGWRadosThread(_store, "data-notifier"), notify_mgr(_store) {}
@@ -3175,8 +3233,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
   }
 public:
   RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                             const string& _source_zone)
-    : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+                             const string& _source_zone,
+                             rgw::BucketChangeObserver *observer)
+    : RGWSyncProcessorThread(_store, "data-sync"),
+      sync(_store, async_rados, _source_zone, observer),
       initialized(false) {}
 
   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
@@ -3212,15 +3272,18 @@ class RGWSyncLogTrimThread : public RGWSyncProcessorThread
 {
   RGWCoroutinesManager crs;
   RGWRados *store;
+  rgw::BucketTrimManager *bucket_trim;
   RGWHTTPManager http;
   const utime_t trim_interval;
 
   uint64_t interval_msec() override { return 0; }
   void stop_process() override { crs.stop(); }
 public:
-  RGWSyncLogTrimThread(RGWRados *store, int interval)
+  RGWSyncLogTrimThread(RGWRados *store, rgw::BucketTrimManager *bucket_trim,
+                       int interval)
     : RGWSyncProcessorThread(store, "sync-log-trim"),
       crs(store->ctx(), store->get_cr_registry()), store(store),
+      bucket_trim(bucket_trim),
       http(store->ctx(), crs.get_completion_mgr()),
       trim_interval(interval, 0)
   {}
@@ -3242,6 +3305,10 @@ public:
                                        trim_interval));
     stacks.push_back(data);
 
+    auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+    bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+    stacks.push_back(bucket);
+
     crs.run(stacks);
     return 0;
   }
@@ -3327,7 +3394,7 @@ int RGWRados::get_required_alignment(const rgw_pool& pool, uint64_t *alignment)
 
 int RGWRados::get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size)
 {
-  uint64_t alignment;
+  uint64_t alignment = 0;
   int r = get_required_alignment(pool, &alignment);
   if (r < 0) {
     return r;
@@ -3362,8 +3429,278 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob
   return get_max_chunk_size(pool, max_chunk_size);
 }
 
+class RGWIndexCompletionManager;
+
+struct complete_op_data {
+  Mutex lock{"complete_op_data"};
+  AioCompletion *rados_completion{nullptr};
+  int manager_shard_id{-1};
+  RGWIndexCompletionManager *manager{nullptr};
+  rgw_obj obj;
+  RGWModifyOp op;
+  string tag;
+  rgw_bucket_entry_ver ver;
+  cls_rgw_obj_key key;
+  rgw_bucket_dir_entry_meta dir_meta;
+  list<cls_rgw_obj_key> remove_objs;
+  bool log_op;
+  uint16_t bilog_op;
+  rgw_zone_set zones_trace;
+
+  bool stopped{false};
+
+  void stop() {
+    Mutex::Locker l(lock);
+    stopped = true;
+  }
+};
+
+class RGWIndexCompletionThread : public RGWRadosThread {
+  RGWRados *store;
+
+  uint64_t interval_msec() override {
+    return 0;
+  }
+
+  list<complete_op_data *> completions;
+
+  Mutex completions_lock;
+public:
+  RGWIndexCompletionThread(RGWRados *_store)
+    : RGWRadosThread(_store, "index-complete"), store(_store), completions_lock("RGWIndexCompletionThread::completions_lock") {}
+
+  int process() override;
+
+  void add_completion(complete_op_data *completion) {
+    {
+      Mutex::Locker l(completions_lock);
+      completions.push_back(completion);
+    }
+
+    signal();
+  }
+};
+
+int RGWIndexCompletionThread::process()
+{
+  list<complete_op_data *> comps;
+
+  {
+    Mutex::Locker l(completions_lock);
+    completions.swap(comps);
+  }
+
+  for (auto c : comps) {
+    std::unique_ptr<complete_op_data> up{c};
+
+    if (going_down()) {
+      continue;
+    }
+    ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+
+    RGWRados::BucketShard bs(store);
+
+    int r = bs.init(c->obj.bucket, c->obj);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
+      /* not much to do */
+      continue;
+    }
+
+    r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *bs) -> int { 
+                             librados::ObjectWriteOperation o;
+                             cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+                             cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+                                                        c->log_op, c->bilog_op, &c->zones_trace);
+
+                             return bs->index_ctx.operate(bs->bucket_obj, &o);
+                             });
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
+      /* ignoring error, can't do anything about it */
+      continue;
+    }
+    r = store->data_log->add_entry(bs.bucket, bs.shard_id);
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+    }
+  }
+
+  return 0;
+}
+
+class RGWIndexCompletionManager {
+  RGWRados *store{nullptr};
+  vector<Mutex *> locks;
+  vector<set<complete_op_data *> > completions;
+
+  RGWIndexCompletionThread *completion_thread{nullptr};
+
+  int num_shards;
+
+  std::atomic<int> cur_shard {0};
+
+
+public:
+  RGWIndexCompletionManager(RGWRados *_store) : store(_store) {
+    num_shards = store->ctx()->_conf->rgw_thread_pool_size;
+
+    for (int i = 0; i < num_shards; i++) {
+      char buf[64];
+      snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i);
+      locks.push_back(new Mutex(buf));
+    }
+
+    completions.resize(num_shards);
+  }
+  ~RGWIndexCompletionManager() {
+    stop();
+
+    for (auto l : locks) {
+      delete l;
+    }
+  }
+
+  int next_shard() {
+    int result = cur_shard % num_shards;
+    cur_shard++;
+    return result;
+  }
+
+  void create_completion(const rgw_obj& obj,
+                         RGWModifyOp op, string& tag,
+                         rgw_bucket_entry_ver& ver,
+                         const cls_rgw_obj_key& key,
+                         rgw_bucket_dir_entry_meta& dir_meta,
+                         list<cls_rgw_obj_key> *remove_objs, bool log_op,
+                         uint16_t bilog_op,
+                         rgw_zone_set *zones_trace,
+                         complete_op_data **result);
+  bool handle_completion(completion_t cb, complete_op_data *arg);
+
+  int start() {
+    completion_thread = new RGWIndexCompletionThread(store);
+    int ret = completion_thread->init();
+    if (ret < 0) {
+      return ret;
+    }
+    completion_thread->start();
+    return 0;
+  }
+  void stop() {
+    if (completion_thread) {
+      completion_thread->stop();
+      delete completion_thread;
+    }
+
+    for (int i = 0; i < num_shards; ++i) {
+      Mutex::Locker l(*locks[i]);
+      for (auto c : completions[i]) {
+        Mutex::Locker cl(c->lock);
+        c->stop();
+      }
+    }
+    completions.clear();
+  }
+};
+
+static void obj_complete_cb(completion_t cb, void *arg)
+{
+  complete_op_data *completion = (complete_op_data *)arg;
+  completion->lock.Lock();
+  if (completion->stopped) {
+    completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
+    delete completion;
+    return;
+  }
+  bool need_delete = completion->manager->handle_completion(cb, completion);
+  completion->lock.Unlock();
+  if (need_delete) {
+    delete completion;
+  }
+}
+
+
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
+                                                  RGWModifyOp op, string& tag,
+                                                  rgw_bucket_entry_ver& ver,
+                                                  const cls_rgw_obj_key& key,
+                                                  rgw_bucket_dir_entry_meta& dir_meta,
+                                                  list<cls_rgw_obj_key> *remove_objs, bool log_op,
+                                                  uint16_t bilog_op,
+                                                  rgw_zone_set *zones_trace,
+                                                  complete_op_data **result)
+{
+  complete_op_data *entry = new complete_op_data;
+
+  int shard_id = next_shard();
+
+  entry->manager_shard_id = shard_id;
+  entry->manager = this;
+  entry->obj = obj;
+  entry->op = op;
+  entry->tag = tag;
+  entry->ver = ver;
+  entry->key = key;
+  entry->dir_meta = dir_meta;
+  entry->log_op = log_op;
+  entry->bilog_op = bilog_op;
+
+  if (remove_objs) {
+    for (auto iter = remove_objs->begin(); iter != remove_objs->end(); ++iter) {
+      entry->remove_objs.push_back(*iter);
+    }
+  }
+
+  if (zones_trace) {
+    entry->zones_trace = *zones_trace;
+  } else {
+    entry->zones_trace.insert(store->get_zone().id);
+  }
+
+  *result = entry;
+
+  entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
+
+  Mutex::Locker l(*locks[shard_id]);
+  completions[shard_id].insert(entry);
+}
+
+bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
+{
+  int shard_id = arg->manager_shard_id;
+  {
+    Mutex::Locker l(*locks[shard_id]);
+
+    auto& comps = completions[shard_id];
+
+    auto iter = comps.find(arg);
+    if (iter == comps.end()) {
+      return true;
+    }
+
+    comps.erase(iter);
+  }
+
+  int r = rados_aio_get_return_value(cb);
+  if (r != -ERR_BUSY_RESHARDING) {
+    return true;
+  }
+  completion_thread->add_completion(arg);
+  return false;
+}
+
 void RGWRados::finalize()
 {
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->unregister_command(cmd[0]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to unregister admin socket command (r=" << r
+                 << ")" << dendl;
+    }
+  }
+
   if (run_sync_thread) {
     Mutex::Locker l(meta_sync_thread_lock);
     meta_sync_processor_thread->stop();
@@ -3391,6 +3728,7 @@ void RGWRados::finalize()
     data_sync_processor_threads.clear();
     delete sync_log_trimmer;
     sync_log_trimmer = nullptr;
+    bucket_trim = boost::none;
   }
   if (finisher) {
     finisher->stop();
@@ -3417,19 +3755,13 @@ void RGWRados::finalize()
   if (async_rados) {
     delete async_rados;
   }
-  if (use_gc_thread) {
-    gc->stop_processor();
-    obj_expirer->stop_processor();
-  }
+  
+  delete lc;
+  lc = NULL; 
+
   delete gc;
   gc = NULL;
 
-  if (use_lc_thread) {
-    lc->stop_processor();
-  }
-  delete lc;
-  lc = NULL;
-
   delete obj_expirer;
   obj_expirer = NULL;
 
@@ -3453,6 +3785,17 @@ void RGWRados::finalize()
   delete binfo_cache;
   delete obj_tombstone_cache;
   delete sync_modules_manager;
+
+  if (reshard_wait.get()) {
+    reshard_wait->stop();
+    reshard_wait.reset();
+  }
+
+  if (run_reshard_thread) {
+    reshard->stop_processor();
+  }
+  delete reshard;
+  delete index_completion_manager;
 }
 
 /** 
@@ -3462,6 +3805,17 @@ void RGWRados::finalize()
 int RGWRados::init_rados()
 {
   int ret = 0;
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->register_command(cmd[0], cmd[1], this,
+                                           cmd[2]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to register admin socket command (r=" << r
+                 << ")" << dendl;
+      return r;
+    }
+  }
+
   auto handles = std::vector<librados::Rados>{cct->_conf->rgw_num_rados_handles};
 
   for (auto& r : handles) {
@@ -3469,7 +3823,6 @@ int RGWRados::init_rados()
     if (ret < 0) {
       return ret;
     }
-
     ret = r.connect();
     if (ret < 0) {
       return ret;
@@ -3495,6 +3848,28 @@ int RGWRados::init_rados()
   return ret;
 }
 
+
+int RGWRados::register_to_service_map(const string& daemon_type, const map<string, string>& meta)
+{
+  map<string,string> metadata = meta;
+  metadata["num_handles"] = stringify(rados.size());
+  metadata["zonegroup_id"] = zonegroup.get_id();
+  metadata["zonegroup_name"] = zonegroup.get_name();
+  metadata["zone_name"] = zone_name();
+  metadata["zone_id"] = zone_id();;
+  string name = cct->_conf->name.get_id();
+  if (name.find("rgw.") == 0) {
+    name = name.substr(4);
+  }
+  int ret = rados[0].service_daemon_register(daemon_type, name, metadata);
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: service_daemon_register() returned ret=" << ret << ": " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
 /**
  * Add new connection to connections map
  * @param zonegroup_conn_map map which new connection will be added to
@@ -3655,7 +4030,7 @@ int RGWRados::replace_region_with_zonegroup()
          ldout(cct, 0) <<  __func__ << " failed init region "<< *iter << ": " << cpp_strerror(-ret) << dendl;
          return ret;
       }
-      if (region.is_master) {
+      if (region.is_master_zonegroup()) {
        master_region = region.get_id();
        master_zone = region.master_zone;
       }
@@ -3947,7 +4322,7 @@ int RGWRados::init_zg_from_local(bool *creating_defaults)
     }
   }
   ldout(cct, 20) << "zonegroup " << zonegroup.get_name() << dendl;
-  if (zonegroup.is_master) {
+  if (zonegroup.is_master_zonegroup()) {
     // use endpoints from the zonegroup's master zone
     auto master = zonegroup.zones.find(zonegroup.master_zone);
     if (master == zonegroup.zones.end()) {
@@ -4075,10 +4450,12 @@ int RGWRados::init_complete()
 
   zone_short_id = current_period.get_map().get_zone_short_id(zone_params.get_id());
 
-  ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module);
-  if (ret < 0) {
-    lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
-    return ret;
+  if (run_sync_thread) {
+    ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module);
+    if (ret < 0) {
+      lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
+      return ret;
+    }
   }
 
   writeable_zone = (zone_public_config.tier_type.empty() || zone_public_config.tier_type == "rgw");
@@ -4107,7 +4484,7 @@ int RGWRados::init_complete()
     zone_id_by_name[z.name] = id;
     zone_by_id[id] = z;
   }
-  
+
   if (zone_by_id.find(zone_id()) == zone_by_id.end()) {
     ldout(cct, 0) << "WARNING: could not find zone config in zonegroup for local zone (" << zone_id() << "), will use defaults" << dendl;
   }
@@ -4154,6 +4531,10 @@ int RGWRados::init_complete()
   if (ret < 0)
     return ret;
 
+  ret = open_reshard_pool_ctx();
+  if (ret < 0)
+    return ret;
+
   pools_initialized = true;
 
   gc = new RGWGC();
@@ -4166,13 +4547,6 @@ int RGWRados::init_complete()
     obj_expirer->start_processor();
   }
 
-  if (run_sync_thread) {
-    // initialize the log period history. we want to do this any time we're not
-    // running under radosgw-admin, so we check run_sync_thread here before
-    // disabling it based on the zone/zonegroup setup
-    meta_mgr->init_oldest_log_period();
-  }
-
   /* no point of running sync thread if we don't have a master zone configured
     or there is no rest_master_conn */
   if (get_zonegroup().master_zone.empty() || !rest_master_conn
@@ -4180,6 +4554,11 @@ int RGWRados::init_complete()
     run_sync_thread = false;
   }
 
+  if (run_sync_thread) {
+    // initialize the log period history
+    meta_mgr->init_oldest_log_period();
+  }
+
   async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
   async_rados->start();
 
@@ -4206,10 +4585,22 @@ int RGWRados::init_complete()
     }
     meta_sync_processor_thread->start();
 
+    // configure the bucket trim manager
+    rgw::BucketTrimConfig config;
+    rgw::configure_bucket_trim(cct, config);
+
+    bucket_trim.emplace(this, config);
+    ret = bucket_trim->init();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
+      return ret;
+    }
+
     Mutex::Locker dl(data_sync_thread_lock);
     for (auto iter : zone_data_sync_from_map) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
+                                                    &*bucket_trim);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
@@ -4220,7 +4611,7 @@ int RGWRados::init_complete()
     }
     auto interval = cct->_conf->rgw_sync_log_trim_interval;
     if (interval > 0) {
-      sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+      sync_log_trimmer = new RGWSyncLogTrimThread(this, &*bucket_trim, interval);
       ret = sync_log_trimmer->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
@@ -4234,18 +4625,18 @@ int RGWRados::init_complete()
 
   lc = new RGWLC();
   lc->initialize(cct, this);
-  
+
   if (use_lc_thread)
     lc->start_processor();
-  
+
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
                              get_zone().bucket_index_max_shards);
-  if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) {
-    bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME;
+  if (bucket_index_max_shards > get_max_bucket_shards()) {
+    bucket_index_max_shards = get_max_bucket_shards();
     ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: "
-      << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl;
+      << get_max_bucket_shards() << dendl;
   }
   ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
 
@@ -4258,6 +4649,19 @@ int RGWRados::init_complete()
     obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
   }
 
+  reshard_wait = std::make_shared<RGWReshardWait>(this);
+
+  reshard = new RGWReshard(this);
+
+  /* only the master zone in the zonegroup reshards buckets */
+  run_reshard_thread = run_reshard_thread && (get_zonegroup().master_zone == zone_public_config.id);
+  if (run_reshard_thread)  {
+    reshard->start_processor();
+  }
+
+  index_completion_manager = new RGWIndexCompletionManager(this);
+  ret = index_completion_manager->start();
+
   return ret;
 }
 
@@ -4404,6 +4808,11 @@ int RGWRados::open_objexp_pool_ctx()
   return rgw_init_ioctx(get_rados_handle(), get_zone_params().log_pool, objexp_pool_ctx, true);
 }
 
+int RGWRados::open_reshard_pool_ctx()
+{
+  return rgw_init_ioctx(get_rados_handle(), get_zone_params().reshard_pool, reshard_pool_ctx, true);
+}
+
 int RGWRados::init_watch()
 {
   int r = rgw_init_ioctx(&rados[0], get_zone_params().control_pool, control_pool_ctx, true);
@@ -4460,21 +4869,10 @@ void RGWRados::pick_control_oid(const string& key, string& notify_oid)
   notify_oid.append(buf);
 }
 
-int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx&  io_ctx)
+int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
 {
-  librados::Rados *rad = get_rados_handle();
-  int r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r != -ENOENT)
-    return r;
-
-  if (!pools_initialized)
-    return r;
-
-  r = rad->pool_create(pool.name.c_str());
-  if (r < 0 && r != -EEXIST)
-    return r;
-
-  return rgw_init_ioctx(rad, pool, io_ctx);
+  constexpr bool create = true; // create the pool if it doesn't exist
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
@@ -4488,6 +4886,12 @@ void RGWRados::build_bucket_index_marker(const string& shard_id_str, const strin
 
 int RGWRados::open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx)
 {
+  const rgw_pool& explicit_pool = bucket_info.bucket.explicit_placement.index_pool;
+
+  if (!explicit_pool.empty()) {
+    return open_pool_ctx(explicit_pool, index_ctx);
+  }
+
   const string *rule = &bucket_info.placement_rule;
   if (rule->empty()) {
     rule = &zonegroup.default_placement;
@@ -4682,12 +5086,12 @@ static void usage_log_hash(CephContext *cct, const string& name, string& hash, u
   uint32_t val = index;
 
   if (!name.empty()) {
-    int max_user_shards = max(cct->_conf->rgw_usage_max_user_shards, 1);
+    int max_user_shards = cct->_conf->rgw_usage_max_user_shards;
     val %= max_user_shards;
     val += ceph_str_hash_linux(name.c_str(), name.size());
   }
   char buf[17];
-  int max_shards = max(cct->_conf->rgw_usage_max_shards, 1);
+  int max_shards = cct->_conf->rgw_usage_max_shards;
   snprintf(buf, sizeof(buf), RGW_USAGE_OBJ_PREFIX "%u", (unsigned)(val % max_shards));
   hash = buf;
 }
@@ -4787,28 +5191,21 @@ int RGWRados::trim_usage(rgw_user& user, uint64_t start_epoch, uint64_t end_epoc
   usage_log_hash(cct, user_str, first_hash, index);
 
   hash = first_hash;
-
   do {
     int ret =  cls_obj_usage_log_trim(hash, user_str, start_epoch, end_epoch);
-    if (ret == -ENOENT)
-      goto next;
 
-    if (ret < 0)
+    if (ret < 0 && ret != -ENOENT)
       return ret;
 
-next:
     usage_log_hash(cct, user_str, hash, ++index);
   } while (hash != first_hash);
 
   return 0;
 }
 
-#define MAX_SHARDS_PRIME 7877
-
 int RGWRados::key_to_shard_id(const string& key, int max_shards)
 {
-  uint32_t val = ceph_str_hash_linux(key.c_str(), key.size()) % MAX_SHARDS_PRIME;
-  return val % max_shards;
+  return rgw_shards_hash(key, max_shards);
 }
 
 void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
@@ -4985,16 +5382,14 @@ string RGWRados::objexp_hint_get_shardname(int shard_num)
   return objname + buf;
 }
 
-#define MAX_OBJEXP_SHARDS_PRIME 7877
-
 int RGWRados::objexp_key_shard(const rgw_obj_index_key& key)
 {
   string obj_key = key.name + key.instance;
   int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
   uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
   uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-  sid = sid2 % MAX_OBJEXP_SHARDS_PRIME % num_shards;
-  return sid % num_shards;
+  sid = rgw_shards_mod(sid2, num_shards);
+  return sid;
 }
 
 static string objexp_hint_get_keyext(const string& tenant_name,
@@ -5163,6 +5558,21 @@ int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset,
 }
 
 
+int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id)
+{
+  rgw_bucket bucket = bucket_info.bucket;
+  bucket.update_bucket_id(new_bucket_id);
+
+  RGWObjectCtx obj_ctx(store);
+
+  int ret = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
 /** 
  * get listing of the objects in a bucket.
  *
@@ -5178,7 +5588,8 @@ int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset,
  * common_prefixes: if delim is filled in, any matching prefixes are placed here.
  * is_truncated: if number of objects in the bucket is bigger than max, then truncated.
  */
-int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *result,
+int RGWRados::Bucket::List::list_objects(int64_t max,
+                                         vector<rgw_bucket_dir_entry> *result,
                                          map<string, bool> *common_prefixes,
                                          bool *is_truncated)
 {
@@ -5193,17 +5604,13 @@ int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *
   result->clear();
 
   rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
-
-  rgw_obj_key end_marker_obj;
-  rgw_obj_index_key cur_end_marker;
-  if (!params.ns.empty()) {
-    end_marker_obj = rgw_obj_key(params.end_marker.name, params.end_marker.instance, params.ns);
-    end_marker_obj.ns = params.ns;
-    end_marker_obj.get_index_key(&cur_end_marker);
-  }
   rgw_obj_index_key cur_marker;
   marker_obj.get_index_key(&cur_marker);
 
+  rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+                             params.ns);
+  rgw_obj_index_key cur_end_marker;
+  end_marker_obj.get_index_key(&cur_end_marker);
   const bool cur_end_marker_valid = !params.end_marker.empty();
 
   rgw_obj_key prefix_obj(params.prefix);
@@ -5225,7 +5632,7 @@ int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *
     bigger_than_delim = buf;
 
     /* if marker points at a common prefix, fast forward it into its upperbound string */
-    int delim_pos = cur_marker.name.find(params.delim, params.prefix.size());
+    int delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
     if (delim_pos >= 0) {
       string s = cur_marker.name.substr(0, delim_pos);
       s.append(bigger_than_delim);
@@ -5309,7 +5716,9 @@ int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *
             next_marker = prefix_key;
             (*common_prefixes)[prefix_key] = true;
 
-            skip_after_delim = obj.name.substr(0, delim_pos);
+            int marker_delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
+
+            skip_after_delim = cur_marker.name.substr(0, marker_delim_pos);
             skip_after_delim.append(bigger_than_delim);
 
             ldout(cct, 20) << "skip_after_delim=" << skip_after_delim << dendl;
@@ -5348,34 +5757,21 @@ done:
  */
 int RGWRados::create_pool(const rgw_pool& pool)
 {
-  int ret = 0;
-
-  librados::Rados *rad = get_rados_handle();
-  ret = rad->pool_create(pool.name.c_str(), 0);
-  if (ret == -EEXIST)
-    ret = 0;
-  else if (ret == -ERANGE) {
-    ldout(cct, 0)
-      << __func__
-      << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
-      << " (this can be due to a pool or placement group misconfiguration, e.g., pg_num < pgp_num)"
-      << dendl;
-  }
-  if (ret < 0)
-    return ret;
-
-  return 0;
+  librados::IoCtx io_ctx;
+  constexpr bool create = true;
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
 {
   librados::IoCtx index_ctx; // context for new bucket
 
+  string dir_oid =  dir_oid_prefix;
   int r = open_bucket_index_ctx(bucket_info, index_ctx);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
 
-  string dir_oid =  dir_oid_prefix;
   dir_oid.append(bucket_info.bucket.bucket_id);
 
   map<int, string> bucket_objs;
@@ -5393,10 +5789,6 @@ void RGWRados::create_bucket_id(string *bucket_id)
   *bucket_id = buf;
 }
 
-/**
- * create a bucket with name bucket and the given list of attrs
- * returns 0 on success, -ERR# otherwise.
- */
 int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
                             const string& zonegroup_id,
                             const string& placement_rule,
@@ -5515,7 +5907,7 @@ int RGWRados::select_new_bucket_location(RGWUserInfo& user_info, const string& z
                                          string *pselected_rule_name, RGWZonePlacementInfo *rule_info)
 
 {
-  /* first check that rule exists within the specific zonegroup */
+  /* first check that zonegroup exists within current period. */
   RGWZoneGroup zonegroup;
   int ret = get_zonegroup(zonegroup_id, zonegroup);
   if (ret < 0) {
@@ -5523,37 +5915,48 @@ int RGWRados::select_new_bucket_location(RGWUserInfo& user_info, const string& z
     return ret;
   }
 
-  /* now check that tag exists within zonegroup */
   /* find placement rule. Hierarchy: request rule > user default rule > zonegroup default rule */
-  string rule = request_rule;
-  if (rule.empty()) {
-    rule = user_info.default_placement;
-    if (rule.empty())
-      rule = zonegroup.default_placement;
-  }
-
-  if (rule.empty()) {
-    ldout(cct, 0) << "misconfiguration, should not have an empty placement rule name" << dendl;
-    return -EIO;
-  }
-
-  map<string, RGWZoneGroupPlacementTarget>::iterator titer = zonegroup.placement_targets.find(rule);
-  if (titer == zonegroup.placement_targets.end()) {
-    ldout(cct, 0) << "could not find placement rule " << rule << " within zonegroup " << dendl;
-    return -EINVAL;
+  std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
+
+  if (!request_rule.empty()) {
+    titer = zonegroup.placement_targets.find(request_rule);
+    if (titer == zonegroup.placement_targets.end()) {
+      ldout(cct, 0) << "could not find requested placement id " << request_rule 
+                    << " within zonegroup " << dendl;
+      return -ERR_INVALID_LOCATION_CONSTRAINT;
+    }
+  } else if (!user_info.default_placement.empty()) {
+    titer = zonegroup.placement_targets.find(user_info.default_placement);
+    if (titer == zonegroup.placement_targets.end()) {
+      ldout(cct, 0) << "could not find user default placement id " << user_info.default_placement
+                    << " within zonegroup " << dendl;
+      return -ERR_INVALID_LOCATION_CONSTRAINT;
+    }
+  } else {
+    if (zonegroup.default_placement.empty()) { // zonegroup default rule as fallback, it should not be empty.
+      ldout(cct, 0) << "misconfiguration, zonegroup default placement id should not be empty." << dendl;
+      return -ERR_ZONEGROUP_DEFAULT_PLACEMENT_MISCONFIGURATION;
+    } else {
+      titer = zonegroup.placement_targets.find(zonegroup.default_placement);
+      if (titer == zonegroup.placement_targets.end()) {
+        ldout(cct, 0) << "could not find zonegroup default placement id " << zonegroup.default_placement
+                      << " within zonegroup " << dendl;
+        return -ERR_INVALID_LOCATION_CONSTRAINT;
+      }
+    }
   }
 
   /* now check tag for the rule, whether user is permitted to use rule */
-  RGWZoneGroupPlacementTarget& target_rule = titer->second;
+  const auto& target_rule = titer->second;
   if (!target_rule.user_permitted(user_info.placement_tags)) {
-    ldout(cct, 0) << "user not permitted to use placement rule" << dendl;
+    ldout(cct, 0) << "user not permitted to use placement rule " << titer->first  << dendl;
     return -EPERM;
   }
 
   if (pselected_rule_name)
-    *pselected_rule_name = rule;
+    *pselected_rule_name = titer->first;
 
-  return select_bucket_location_by_rule(rule, rule_info);
+  return select_bucket_location_by_rule(titer->first, rule_info);
 }
 
 int RGWRados::select_bucket_location_by_rule(const string& location_rule, RGWZonePlacementInfo *rule_info)
@@ -5573,7 +5976,7 @@ int RGWRados::select_bucket_location_by_rule(const string& location_rule, RGWZon
   map<string, RGWZonePlacementInfo>::iterator piter = get_zone_params().placement_pools.find(location_rule);
   if (piter == get_zone_params().placement_pools.end()) {
     /* couldn't find, means we cannot really place data for this bucket in this zone */
-    if (get_zonegroup().equals(zonegroup_id)) {
+    if (get_zonegroup().equals(zonegroup.get_id())) {
       /* that's a configuration error, zone should have that rule, as we're within the requested
        * zonegroup */
       return -EINVAL;
@@ -5783,6 +6186,7 @@ int RGWRados::create_pools(vector<rgw_pool>& pools, vector<int>& retcodes)
   vector<int>::iterator riter;
   vector<librados::PoolAsyncCompletion *>::iterator citer;
 
+  bool error = false;
   assert(rets.size() == completions.size());
   for (riter = rets.begin(), citer = completions.begin(); riter != rets.end(); ++riter, ++citer) {
     int r = *riter;
@@ -5792,11 +6196,55 @@ int RGWRados::create_pools(vector<rgw_pool>& pools, vector<int>& retcodes)
       r = c->get_return_value();
       if (r < 0) {
         ldout(cct, 0) << "WARNING: async pool_create returned " << r << dendl;
+        error = true;
       }
     }
     c->release();
     retcodes.push_back(r);
   }
+  if (error) {
+    return 0;
+  }
+
+  std::vector<librados::IoCtx> io_ctxs;
+  retcodes.clear();
+  for (auto pool : pools) {
+    io_ctxs.emplace_back();
+    int ret = rad->ioctx_create(pool.name.c_str(), io_ctxs.back());
+    if (ret < 0) {
+      ldout(cct, 0) << "WARNING: ioctx_create returned " << ret << dendl;
+      error = true;
+    }
+    retcodes.push_back(ret);
+  }
+  if (error) {
+    return 0;
+  }
+
+  completions.clear();
+  for (auto &io_ctx : io_ctxs) {
+    librados::PoolAsyncCompletion *c =
+      librados::Rados::pool_async_create_completion();
+    completions.push_back(c);
+    int ret = io_ctx.application_enable_async(pg_pool_t::APPLICATION_NAME_RGW,
+                                              false, c);
+    assert(ret == 0);
+  }
+
+  retcodes.clear();
+  for (auto c : completions) {
+    c->wait();
+    int ret = c->get_return_value();
+    if (ret == -EOPNOTSUPP) {
+      ret = 0;
+    } else if (ret < 0) {
+      ldout(cct, 0) << "WARNING: async application_enable returned " << ret
+                    << dendl;
+      error = true;
+    }
+    c->release();
+    retcodes.push_back(ret);
+  }
   return 0;
 }
 
@@ -5841,7 +6289,7 @@ int RGWRados::get_obj_head_ref(const RGWBucketInfo& bucket_info, const rgw_obj&
   return 0;
 }
 
-int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_pool *pool)
+int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
 {
   ref->oid = obj.oid;
   ref->key = obj.loc;
@@ -5854,9 +6302,6 @@ int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_po
   } else {
     ref->pool = obj.pool;
   }
-  if (pool) {
-    *pool = ref->pool;
-  }
   r = open_pool_ctx(ref->pool, ref->ioctx);
   if (r < 0)
     return r;
@@ -5866,9 +6311,9 @@ int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_po
   return 0;
 }
 
-int RGWRados::get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_pool *pool)
+int RGWRados::get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
 {
-  return get_raw_obj_ref(obj, ref, pool);
+  return get_raw_obj_ref(obj, ref);
 }
 
 /*
@@ -6148,6 +6593,21 @@ int RGWRados::BucketShard::init(const rgw_bucket& _bucket, int sid)
   return 0;
 }
 
+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+{
+  bucket = bucket_info.bucket;
+  shard_id = sid;
+
+  int ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
 
 /* Execute @handler on last item in bucket listing for bucket specified
  * in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
@@ -6271,7 +6731,6 @@ int RGWRados::swift_versioning_copy(RGWObjectCtx& obj_ctx,
                NULL, /* string *version_id */
                NULL, /* string *ptag */
                NULL, /* string *petag */
-               NULL, /* struct rgw_err *err */
                NULL, /* void (*progress_cb)(off_t, void *) */
                NULL); /* void *progress_data */
   if (r == -ECANCELED || r == -ENOENT) {
@@ -6361,7 +6820,6 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
                        nullptr,       /* string *version_id */
                        nullptr,       /* string *ptag */
                        nullptr,       /* string *petag */
-                       nullptr,       /* struct rgw_err *err */
                        nullptr,       /* void (*progress_cb)(off_t, void *) */
                        nullptr);      /* void *progress_data */
     if (ret == -ECANCELED || ret == -ENOENT) {
@@ -6402,12 +6860,11 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
  * Returns: 0 on success, -ERR# otherwise.
  */
 int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
-                                           map<string, bufferlist>& attrs, bool assume_noent,
+                                           map<string, bufferlist>& attrs,
+                                           bool assume_noent, bool modify_tail,
                                            void *_index_op)
 {
   RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
-  rgw_pool pool;
-  rgw_rados_ref ref;
   RGWRados *store = target->get_store();
 
   ObjectWriteOperation op;
@@ -6424,6 +6881,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
     return -EIO;
   }
 
+  rgw_rados_ref ref;
   r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);
   if (r < 0)
     return r;
@@ -6436,7 +6894,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   if (!ptag && !index_op->get_optag()->empty()) {
     ptag = index_op->get_optag();
   }
-  r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false);
+  r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);
   if (r < 0)
     return r;
 
@@ -6512,9 +6970,16 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
 
   uint64_t epoch;
   int64_t poolid;
-
-  bool orig_exists = state->exists;
-  uint64_t orig_size = state->accounted_size;
+  bool orig_exists;
+  uint64_t orig_size;
+  
+  if (!reset_obj) {    //Multipart upload, it has immutable head. 
+    orig_exists = false;
+    orig_size = 0;
+  } else {
+    orig_exists = state->exists;
+    orig_size = state->accounted_size;
+  }
 
   bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
 
@@ -6564,7 +7029,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   state = NULL;
 
   if (versioned_op) {
-    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false);
+    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
     if (r < 0) {
       return r;
     }
@@ -6584,8 +7049,14 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   meta.canceled = false;
 
   /* update quota cache */
-  store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
-                                     accounted_size, orig_size);
+  if (meta.completeMultipart){
+       store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+                                     0, orig_size);
+  }
+  else {
+    store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+                                     accounted_size, orig_size);  
+  }
   return 0;
 
 done_cancel:
@@ -6640,17 +7111,18 @@ int RGWRados::Object::Write::write_meta(uint64_t size, uint64_t accounted_size,
 
   RGWRados::Bucket bop(target->get_store(), bucket_info);
   RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());
-
+  index_op.set_zones_trace(meta.zones_trace);
+  
   bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
   int r;
   if (assume_noent) {
-    r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
     if (r == -EEXIST) {
       assume_noent = false;
     }
   }
   if (!assume_noent) {
-    r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
   }
   return r;
 }
@@ -6662,9 +7134,8 @@ int RGWRados::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mt
               RGWObjVersionTracker *objv_tracker,
               real_time set_mtime /* 0 for don't set */)
 {
-  rgw_pool pool;
   rgw_rados_ref ref;
-  int r = get_system_obj_ref(obj, &ref, &pool);
+  int r = get_system_obj_ref(obj, &ref);
   if (r < 0)
     return r;
 
@@ -6725,8 +7196,7 @@ int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
                                   RGWObjVersionTracker *objv_tracker)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_system_obj_ref(obj, &ref, &pool);
+  int r = get_system_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -6811,19 +7281,51 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size;
+  bufferlist buffer;
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(ISP2(buffer_size)); // must be power of 2
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0;
+    }
+    const auto count = P2ALIGN(buffer.length(), buffer_size);
+    buffer.splice(0, count, &bl);
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWGetDataCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl;
-  uint64_t extra_data_len;
+  uint64_t extra_data_left;
   uint64_t data_len;
   map<string, bufferlist> src_attrs;
 public:
@@ -6842,7 +7344,7 @@ public:
                        opstate(_ops),
                        progress_cb(_progress_cb),
                        progress_data(_progress_data),
-                       extra_data_len(0),
+                       extra_data_left(0),
                        data_len(0) {}
 
   int process_attrs(void) {
@@ -6862,7 +7364,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -6871,17 +7375,17 @@ public:
     if (progress_cb) {
       progress_cb(ofs, progress_data);
     }
-    if (extra_data_len) {
+    if (extra_data_left) {
       size_t extra_len = bl.length();
-      if (extra_len > extra_data_len)
-        extra_len = extra_data_len;
+      if (extra_len > extra_data_left)
+        extra_len = extra_data_left;
 
       bufferlist extra;
       bl.splice(0, extra_len, &extra);
       extra_data_bl.append(extra);
 
-      extra_data_len -= extra_len;
-      if (extra_data_len == 0) {
+      extra_data_left -= extra_len;
+      if (extra_data_left == 0) {
         int res = process_attrs();
         if (res < 0)
           return res;
@@ -6889,7 +7393,13 @@ public:
       if (bl.length() == 0) {
         return 0;
       }
+      ofs += extra_len;
     }
+    // adjust ofs based on extra_data_len, so the result is a logical offset
+    // into the object data
+    assert(uint64_t(ofs) >= extra_data_len);
+    ofs -= extra_data_len;
+
     data_len += bl.length();
     bool again = false;
 
@@ -6928,12 +7438,18 @@ public:
     return 0;
   }
 
+  int flush() {
+    bufferlist bl;
+    return put_data_and_throttle(filter, bl, 0, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map<string, bufferlist>& get_attrs() { return src_attrs; }
 
   void set_extra_data_len(uint64_t len) override {
-    extra_data_len = len;
+    extra_data_left = len;
+    RGWGetDataCB::set_extra_data_len(len);
   }
 
   uint64_t get_data_len() {
@@ -6941,8 +7457,8 @@ public:
   }
 
   int complete(const string& etag, real_time *mtime, real_time set_mtime,
-               map<string, bufferlist>& attrs, real_time delete_at) {
-    return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at);
+               map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
+    return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
   }
 
   bool is_canceled() {
@@ -6965,6 +7481,12 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs,
     if (!attrs[RGW_ATTR_ETAG].length()) {
       attrs[RGW_ATTR_ETAG] = src_attrs[RGW_ATTR_ETAG];
     }
+    if (!attrs[RGW_ATTR_TAIL_TAG].length()) {
+      auto ttiter = src_attrs.find(RGW_ATTR_TAIL_TAG);
+      if (ttiter != src_attrs.end()) {
+        attrs[RGW_ATTR_TAIL_TAG] = src_attrs[RGW_ATTR_TAIL_TAG];
+      }
+    }
     break;
   case RGWRados::ATTRSMOD_MERGE:
     for (map<string, bufferlist>::iterator it = src_attrs.begin(); it != src_attrs.end(); ++it) {
@@ -6996,6 +7518,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
     return ret;
 
   attrset.erase(RGW_ATTR_ID_TAG);
+  attrset.erase(RGW_ATTR_TAIL_TAG);
 
   uint64_t max_chunk_size;
 
@@ -7005,8 +7528,11 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
     return ret;
   }
 
-  return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj, max_chunk_size, NULL, mtime, attrset,
-                       RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL, NULL);
+  return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj,
+                                          max_chunk_size, NULL, mtime, attrset,
+                                          RGW_OBJ_CATEGORY_MAIN, 0, real_time(),
+                                          (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
+                                          NULL, NULL);
 }
 
 struct obj_time_weight {
@@ -7153,10 +7679,15 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
 
   obj_time_weight dest_mtime_weight;
 
+  constexpr bool prepend_meta = true;
+  constexpr bool get_op = true;
+  constexpr bool rgwx_stat = true;
+  constexpr bool sync_manifest = true;
+  constexpr bool skip_decrypt = true;
   int ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
                       dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
-                      true /* prepend_meta */, true /* GET */, true /* rgwx-stat */,
-                      true /* sync manifest */, &cb, &in_stream_req);
+                      prepend_meta, get_op, rgwx_stat,
+                      sync_manifest, skip_decrypt, &cb, &in_stream_req);
   if (ret < 0) {
     return ret;
   }
@@ -7225,9 +7756,9 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                string *version_id,
                string *ptag,
                ceph::buffer::list *petag,
-               struct rgw_err *err,
                void (*progress_cb)(off_t, void *),
-               void *progress_data)
+               void *progress_data,
+               rgw_zone_set *zones_trace)
 {
   /* source is in a different zonegroup, copy from there */
 
@@ -7324,10 +7855,15 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     }
   }
 
+  static constexpr bool prepend_meta = true;
+  static constexpr bool get_op = true;
+  static constexpr bool rgwx_stat = false;
+  static constexpr bool sync_manifest = true;
+  static constexpr bool skip_decrypt = true;
   ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
                       dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
-                      true /* prepend_meta */, true /* GET */, false /* rgwx-stat */,
-                      true /* sync manifest */, &cb, &in_stream_req);
+                      prepend_meta, get_op, rgwx_stat,
+                      sync_manifest, skip_decrypt, &cb, &in_stream_req);
   if (ret < 0) {
     goto set_err_state;
   }
@@ -7336,6 +7872,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
@@ -7393,7 +7933,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
 #define MAX_COMPLETE_RETRY 100
   for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
-    ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at);
+    ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
     if (ret < 0) {
       goto set_err_state;
     }
@@ -7467,13 +8007,14 @@ int RGWRados::copy_obj_to_remote_dest(RGWObjState *astate,
 
   int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
   if (ret < 0) {
-    delete out_stream_req;
     return ret;
   }
 
   ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb());
-  if (ret < 0)
+  if (ret < 0) {
+    delete out_stream_req;
     return ret;
+  }
 
   ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
   if (ret < 0)
@@ -7523,7 +8064,6 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
                string *version_id,
                string *ptag,
                ceph::buffer::list *petag,
-               struct rgw_err *err,
                void (*progress_cb)(off_t, void *),
                void *progress_data)
 {
@@ -7553,7 +8093,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
                dest_obj, src_obj, dest_bucket_info, src_bucket_info, src_mtime, mtime, mod_ptr,
                unmod_ptr, high_precision_time,
                if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
-               olh_epoch, delete_at, version_id, ptag, petag, err, progress_cb, progress_data);
+               olh_epoch, delete_at, version_id, ptag, petag, progress_cb, progress_data);
   }
 
   map<string, bufferlist> src_attrs;
@@ -7568,12 +8108,20 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   read_op.params.attrs = &src_attrs;
   read_op.params.lastmod = src_mtime;
   read_op.params.obj_size = &obj_size;
-  read_op.params.perr = err;
 
   ret = read_op.prepare();
   if (ret < 0) {
     return ret;
   }
+  if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
+    // Current implementation does not follow S3 spec and even
+    // may result in data corruption silently when copying
+    // multipart objects acorss pools. So reject COPY operations
+    //on encrypted objects before it is fully functional.
+    ldout(cct, 0) << "ERROR: copy op for encrypted object " << src_obj
+                  << " has not been implemented." << dendl;
+    return -ERR_NOT_IMPLEMENTED;
+  }
 
   src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
   src_attrs.erase(RGW_ATTR_DELETE_AT);
@@ -7648,7 +8196,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
     return copy_obj_data(obj_ctx, dest_bucket_info, read_op, obj_size - 1, dest_obj, src_obj,
                          max_chunk_size, mtime, real_time(), attrs, category, olh_epoch, delete_at,
-                         version_id, ptag, petag, err);
+                         version_id, ptag, petag);
   }
 
   RGWObjManifest::obj_iterator miter = astate->manifest.obj_begin();
@@ -7676,7 +8224,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
 
   bool copy_itself = (dest_obj == src_obj);
   RGWObjManifest *pmanifest; 
-  ldout(cct, 0) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
+  ldout(cct, 20) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
 
   RGWRados::Object dest_op_target(this, dest_bucket_info, obj_ctx, dest_obj);
   RGWRados::Object::Write write_op(&dest_op_target);
@@ -7692,15 +8240,17 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   }
 
   if (!copy_itself) {
+    attrs.erase(RGW_ATTR_TAIL_TAG);
     manifest = astate->manifest;
     const rgw_bucket_placement& tail_placement = manifest.get_tail_placement();
     if (tail_placement.bucket.name.empty()) {
       manifest.set_tail_placement(tail_placement.placement_rule, src_obj.bucket);
     }
-    string oid, key;
+    string ref_tag;
     for (; miter != astate->manifest.obj_end(); ++miter) {
       ObjectWriteOperation op;
-      cls_refcount_get(op, tag, true);
+      ref_tag = tag + '\0';
+      cls_refcount_get(op, ref_tag, true);
       const rgw_raw_obj& loc = miter.get_location().get_raw_obj(this);
       ref.ioctx.locator_set_key(loc.loc);
 
@@ -7739,6 +8289,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   write_op.meta.category = category;
   write_op.meta.olh_epoch = olh_epoch;
   write_op.meta.delete_at = delete_at;
+  write_op.meta.modify_tail = !copy_itself;
 
   ret = write_op.write_meta(obj_size, astate->accounted_size, attrs);
   if (ret < 0) {
@@ -7751,8 +8302,6 @@ done_ret:
   if (!copy_itself) {
     vector<rgw_raw_obj>::iterator riter;
 
-    string oid, key;
-
     /* rollback reference */
     for (riter = ref_objs.begin(); riter != ref_objs.end(); ++riter) {
       ObjectWriteOperation op;
@@ -7784,8 +8333,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
               real_time delete_at,
                string *version_id,
                string *ptag,
-               ceph::buffer::list *petag,
-               struct rgw_err *err)
+               ceph::buffer::list *petag)
 {
   bufferlist first_chunk;
   RGWObjManifest manifest;
@@ -7794,7 +8342,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_oid(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   if (version_id) {
     processor.set_version_id(*version_id);
@@ -7857,7 +8405,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
 
 bool RGWRados::is_meta_master()
 {
-  if (!get_zonegroup().is_master) {
+  if (!get_zonegroup().is_master_zonegroup()) {
     return false;
   }
 
@@ -7878,12 +8426,12 @@ bool RGWRados::is_syncing_bucket_meta(const rgw_bucket& bucket)
   }
 
   /* zonegroup is not master zonegroup */
-  if (!get_zonegroup().is_master) {
+  if (!get_zonegroup().is_master_zonegroup()) {
     return false;
   }
 
   /* single zonegroup and a single zone */
-  if (current_period.is_single_zonegroup(cct, this) && get_zonegroup().zones.size() == 1) {
+  if (current_period.is_single_zonegroup() && get_zonegroup().zones.size() == 1) {
     return false;
   }
 
@@ -7968,7 +8516,12 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner)
   RGWBucketInfo info;
   map<string, bufferlist> attrs;
   RGWObjectCtx obj_ctx(this);
-  int r = get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL, &attrs);
+  int r;
+  if (bucket.bucket_id.empty()) {
+    r = get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL, &attrs);
+  } else {
+    r = get_bucket_instance_info(obj_ctx, bucket, info, nullptr, &attrs);
+  }
   if (r < 0) {
     ldout(cct, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl;
     return r;
@@ -8049,7 +8602,7 @@ int RGWRados::Object::complete_atomic_modification()
     return 0;
   }
 
-  string tag = state->obj_tag.to_str();
+  string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str());
   return store->gc->send_chain(chain, tag, false);  // do it async
 }
 
@@ -8200,13 +8753,16 @@ int RGWRados::bucket_check_index(RGWBucketInfo& bucket_info,
   // value - bucket index check OP returned result with the given bucket index object (shard)
   map<int, string> oids;
   map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
+
   int ret = open_bucket_index(bucket_info, index_ctx, oids, bucket_objs_ret);
-  if (ret < 0)
-    return ret;
+  if (ret < 0) {
+      return ret;
+  }
 
   ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
-  if (ret < 0)
-    return ret;
+  if (ret < 0) {
+      return ret;
+  }
 
   // Aggregate results (from different shards if there is any)
   map<int, struct rgw_cls_check_index_ret>::iterator iter;
@@ -8222,13 +8778,27 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info)
 {
   librados::IoCtx index_ctx;
   map<int, string> bucket_objs;
+
   int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
 
   return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
+int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+{
+  librados::IoCtx index_ctx;
+  map<int, string> bucket_objs;
+
+  int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+  if (r < 0) {
+    return r;
+  }
+
+  return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+}
 
 int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
 {
@@ -8249,13 +8819,17 @@ int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_ob
     return -EINVAL;
   }
 
-  if (state->obj_tag.length() == 0) {// check for backward compatibility
+  string tag;
+
+  if (state->tail_tag.length() > 0) {
+    tag = state->tail_tag.c_str();
+  } else if (state->obj_tag.length() > 0) {
+    tag = state->obj_tag.c_str();
+  } else {
     ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl;
     return -EINVAL;
   }
 
-  string tag = state->obj_tag.c_str();
-
   ldout(cct, 0) << "defer chain tag=" << tag << dendl;
 
   return gc->defer_chain(tag, false);
@@ -8324,7 +8898,7 @@ int RGWRados::Object::Delete::delete_obj()
         meta.mtime = params.mtime;
       }
 
-      int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time);
+      int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time, params.zones_trace);
       if (r < 0) {
         return r;
       }
@@ -8336,7 +8910,7 @@ int RGWRados::Object::Delete::delete_obj()
         return r;
       }
       result.delete_marker = dirent.is_delete_marker();
-      r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch);
+      r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch, params.zones_trace);
       if (r < 0) {
         return r;
       }
@@ -8350,10 +8924,12 @@ int RGWRados::Object::Delete::delete_obj()
       return r;
     }
 
-    r = store->data_log->add_entry(bs->bucket, bs->shard_id);
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
-      return r;
+    if (target->bucket_info.datasync_flag_enabled()) {
+      r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+      if (r < 0) {
+        lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+        return r;
+      }
     }
 
     return 0;
@@ -8416,7 +8992,7 @@ int RGWRados::Object::Delete::delete_obj()
     return -ENOENT;
   }
 
-  r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true);
+  r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true, false);
   if (r < 0)
     return r;
 
@@ -8424,23 +9000,19 @@ int RGWRados::Object::Delete::delete_obj()
 
   RGWRados::Bucket bop(store, bucket_info);
   RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
-
+  
+  index_op.set_zones_trace(params.zones_trace);
   index_op.set_bilog_flags(params.bilog_flags);
 
-
   r = index_op.prepare(CLS_RGW_OP_DEL, &state->write_tag);
   if (r < 0)
     return r;
 
   store->remove_rgw_head_obj(op);
   r = ref.ioctx.operate(ref.oid, &op);
-  bool need_invalidate = false;
-  if (r == -ECANCELED) {
-    /* raced with another operation, we can regard it as removed */
-    need_invalidate = true;
-    r = 0;
-  }
-  bool removed = (r >= 0);
+
+  /* raced with another operation, object state is indeterminate */
+  const bool need_invalidate = (r == -ECANCELED);
 
   int64_t poolid = ref.ioctx.get_id();
   if (r >= 0) {
@@ -8450,18 +9022,17 @@ int RGWRados::Object::Delete::delete_obj()
       obj_tombstone_cache->add(obj, entry);
     }
     r = index_op.complete_del(poolid, ref.ioctx.get_last_version(), state->mtime, params.remove_objs);
-  } else {
-    int ret = index_op.cancel();
-    if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
-    }
-  }
-  if (removed) {
+    
     int ret = target->complete_atomic_modification();
     if (ret < 0) {
       ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
     }
     /* other than that, no need to propagate error */
+  } else {
+    int ret = index_op.cancel();
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
+    }
   }
 
   if (need_invalidate) {
@@ -8482,7 +9053,8 @@ int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
                          const rgw_obj& obj,
                          int versioning_status,
                          uint16_t bilog_flags,
-                         const real_time& expiration_time)
+                         const real_time& expiration_time,
+                         rgw_zone_set *zones_trace)
 {
   RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
   RGWRados::Object::Delete del_op(&del_target);
@@ -8491,6 +9063,7 @@ int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
   del_op.params.versioning_status = versioning_status;
   del_op.params.bilog_flags = bilog_flags;
   del_op.params.expiration_time = expiration_time;
+  del_op.params.zones_trace = zones_trace;
 
   return del_op.delete_obj();
 }
@@ -8498,8 +9071,7 @@ int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
 int RGWRados::delete_raw_obj(const rgw_raw_obj& obj)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -8522,8 +9094,7 @@ int RGWRados::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tra
     return -EINVAL;
   }
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -8657,8 +9228,8 @@ int RGWRados::get_system_obj_state_impl(RGWObjectCtx *rctx, rgw_raw_obj& obj, RG
   s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
 
   if (s->obj_tag.length())
-    ldout(cct, 20) << "get_system_obj_state: setting s->obj_tag to " 
-                   << string(s->obj_tag.c_str(), s->obj_tag.length()) << dendl;
+    ldout(cct, 20) << "get_system_obj_state: setting s->obj_tag to "
+                   << s->obj_tag.c_str() << dendl;
   else
     ldout(cct, 20) << "get_system_obj_state: s->obj_tag was set empty" << dendl;
 
@@ -8729,13 +9300,14 @@ int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket
   s->accounted_size = s->size;
 
   auto iter = s->attrset.find(RGW_ATTR_COMPRESSION);
-  if (iter != s->attrset.end()) {
+  const bool compressed = (iter != s->attrset.end());
+  if (compressed) {
     // use uncompressed size for accounted_size
     try {
       RGWCompressionInfo info;
       auto p = iter->second.begin();
       ::decode(info, p);
-      s->accounted_size = info.orig_size;
+      s->accounted_size = info.orig_size; 
     } catch (buffer::error&) {
       dout(0) << "ERROR: could not decode compression info for object: " << obj << dendl;
       return -EIO;
@@ -8750,6 +9322,10 @@ int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket
     s->shadow_obj[bl.length()] = '\0';
   }
   s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
+  auto ttiter = s->attrset.find(RGW_ATTR_TAIL_TAG);
+  if (ttiter != s->attrset.end()) {
+    s->tail_tag = s->attrset[RGW_ATTR_TAIL_TAG];
+  }
 
   bufferlist manifest_bl = s->attrset[RGW_ATTR_MANIFEST];
   if (manifest_bl.length()) {
@@ -8760,6 +9336,8 @@ int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket
       s->manifest.set_head(bucket_info.placement_rule, obj, s->size); /* patch manifest to reflect the head we just read, some manifests might be
                                              broken due to old bugs */
       s->size = s->manifest.get_obj_size();
+      if (!compressed)
+        s->accounted_size = s->size;
     } catch (buffer::error& err) {
       ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl;
       return -EIO;
@@ -8806,7 +9384,7 @@ int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket
     }
   }
   if (s->obj_tag.length())
-    ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << string(s->obj_tag.c_str(), s->obj_tag.length()) << dendl;
+    ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << s->obj_tag.c_str() << dendl;
   else
     ldout(cct, 20) << "get_obj_state: s->obj_tag was set empty" << dendl;
 
@@ -8952,9 +9530,8 @@ int RGWRados::Object::Stat::finish()
 }
 
 /**
- * Get the attributes for an object.
- * bucket: name of the bucket holding the object.
- * obj: name of the object
+ * Get an attribute for a system object.
+ * obj: the object to get attr
  * name: name of the attr to retrieve
  * dest: bufferlist to store the result in
  * Returns: 0 on success, -ERR# otherwise.
@@ -8962,8 +9539,7 @@ int RGWRados::Object::Stat::finish()
 int RGWRados::system_obj_get_attr(rgw_raw_obj& obj, const char *name, bufferlist& dest)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_system_obj_ref(obj, &ref, &pool);
+  int r = get_system_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -9022,7 +9598,8 @@ void RGWRados::SystemObject::invalidate_state()
 }
 
 int RGWRados::Object::prepare_atomic_modification(ObjectWriteOperation& op, bool reset_obj, const string *ptag,
-                                                  const char *if_match, const char *if_nomatch, bool removal_op)
+                                                  const char *if_match, const char *if_nomatch, bool removal_op,
+                                                  bool modify_tail)
 {
   int r = get_state(&state, false);
   if (r < 0)
@@ -9106,6 +9683,9 @@ int RGWRados::Object::prepare_atomic_modification(ObjectWriteOperation& op, bool
   ldout(store->ctx(), 10) << "setting object write_tag=" << state->write_tag << dendl;
 
   op.setxattr(RGW_ATTR_ID_TAG, bl);
+  if (modify_tail) {
+    op.setxattr(RGW_ATTR_TAIL_TAG, bl);
+  }
 
   return 0;
 }
@@ -9124,8 +9704,7 @@ int RGWRados::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
                         RGWObjVersionTracker *objv_tracker)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_system_obj_ref(obj, &ref, &pool);
+  int r = get_system_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -9251,10 +9830,13 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& ob
       return r;
 
     bl.append(tag.c_str(), tag.size() + 1);
-
     op.setxattr(RGW_ATTR_ID_TAG,  bl);
   }
 
+
+  real_time mtime = real_clock::now();
+  struct timespec mtime_ts = real_clock::to_timespec(mtime);
+  op.mtime2(&mtime_ts);
   r = ref.ioctx.operate(ref.oid, &op);
   if (state) {
     if (r >= 0) {
@@ -9265,7 +9847,6 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& ob
       string content_type(content_type_bl.c_str(), content_type_bl.length());
       uint64_t epoch = ref.ioctx.get_last_version();
       int64_t poolid = ref.ioctx.get_id();
-      real_time mtime = real_clock::now();
       r = index_op.complete(poolid, epoch, state->size, state->accounted_size,
                             mtime, etag, content_type, &acl_bl,
                             RGW_OBJ_CATEGORY_MAIN, NULL);
@@ -9294,28 +9875,6 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& ob
   return 0;
 }
 
-/**
- * Get data about an object out of RADOS and into memory.
- * bucket: name of the bucket the object is in.
- * obj: name/key of the object to read
- * data: if get_data==true, this pointer will be set
- *    to an address containing the object's data/value
- * attrs: if non-NULL, the pointed-to map will contain
- *    all the attrs of the object when this function returns
- * mod_ptr: if non-NULL, compares the object's mtime to *mod_ptr,
- *    and if mtime is smaller it fails.
- * unmod_ptr: if non-NULL, compares the object's mtime to *unmod_ptr,
- *    and if mtime is >= it fails.
- * if_match/nomatch: if non-NULL, compares the object's etag attr
- *    to the string and, if it doesn't/does match, fails out.
- * get_data: if true, the object's data/value will be read out, otherwise not
- * err: Many errors will result in this structure being filled
- *    with extra informatin on the error.
- * Returns: -ERR# on failure, otherwise
- *          (if get_data==true) length of read data,
- *          (if get_data==false) length of the object
- */
-// P3 XXX get_data is not seen used anywhere.
 int RGWRados::Object::Read::prepare()
 {
   RGWRados *store = source->get_store();
@@ -9470,6 +10029,54 @@ int RGWRados::stat_system_obj(RGWObjectCtx& obj_ctx,
   return 0;
 }
 
+
+int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call)
+{
+  RGWRados *store = target->get_store();
+  BucketShard *bs;
+  int r;
+
+#define NUM_RESHARD_RETRIES 10
+  for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+    int ret = get_bucket_shard(&bs);
+    if (ret < 0) {
+      ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
+      return ret;
+    }
+    r = call(bs);
+    if (r != -ERR_BUSY_RESHARDING) {
+      break;
+    }
+    ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+    string new_bucket_id;
+    r = store->block_while_resharding(bs, &new_bucket_id);
+    if (r == -ERR_BUSY_RESHARDING) {
+      continue;
+    }
+    if (r < 0) {
+      return r;
+    }
+    ldout(store->ctx(), 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+    i = 0; /* resharding is finished, make sure we can retry */
+    r = target->update_bucket_id(new_bucket_id);
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "ERROR: update_bucket_id() new_bucket_id=" << new_bucket_id << " returned r=" << r << dendl;
+      return r;
+    }
+    invalidate_bs();
+  }
+
+  if (r < 0) {
+    return r;
+  }
+
+  if (pbs) {
+    *pbs = bs;
+  }
+
+  return 0;
+}
+
 int RGWRados::SystemObject::Read::stat(RGWObjVersionTracker *objv_tracker)
 {
   RGWRados *store = source->get_store();
@@ -9485,12 +10092,6 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t
     return 0;
   }
   RGWRados *store = target->get_store();
-  BucketShard *bs;
-  int ret = get_bucket_shard(&bs);
-  if (ret < 0) {
-    ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    return ret;
-  }
 
   if (write_tag && write_tag->length()) {
     optag = string(write_tag->c_str(), write_tag->length());
@@ -9500,11 +10101,15 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t
     }
   }
 
-  int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags);
+  int r = guard_reshard(nullptr, [&](BucketShard *bs) -> int { 
+    return store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
+  });
+
   if (r < 0) {
     return r;
   }
   prepared = true;
+
   return 0;
 }
 
@@ -9521,6 +10126,7 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
   }
   RGWRados *store = target->get_store();
   BucketShard *bs;
+
   int ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
@@ -9547,11 +10153,13 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
   ent.meta.owner_display_name = owner.get_display_name();
   ent.meta.content_type = content_type;
 
-  ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags);
+  ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+  if (target->bucket_info.datasync_flag_enabled()) {
+    int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+    }
   }
 
   return ret;
@@ -9566,17 +10174,20 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch,
   }
   RGWRados *store = target->get_store();
   BucketShard *bs;
+
   int ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
     return ret;
   }
 
-  ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags);
+  ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+  if (target->bucket_info.datasync_flag_enabled()) {
+    int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+    }
   }
 
   return ret;
@@ -9590,22 +10201,21 @@ int RGWRados::Bucket::UpdateIndex::cancel()
   }
   RGWRados *store = target->get_store();
   BucketShard *bs;
-  int ret = get_bucket_shard(&bs);
-  if (ret < 0) {
-    ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    return ret;
-  }
 
-  ret = store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags);
+  int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int { 
+    return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
+  });
 
   /*
    * need to update data log anyhow, so that whoever follows needs to update its internal markers
    * for following the specific bucket shard log. Otherwise they end up staying behind, and users
    * have no way to tell that they're all caught up
    */
-  int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+  if (target->bucket_info.datasync_flag_enabled()) {
+    int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+    }
   }
 
   return ret;
@@ -9616,7 +10226,6 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl)
   RGWRados *store = source->get_store();
   CephContext *cct = store->ctx();
 
-  std::string oid, key;
   rgw_raw_obj read_obj;
   uint64_t read_ofs = ofs;
   uint64_t len, read_len;
@@ -9711,8 +10320,7 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl)
 int RGWRados::SystemObject::Read::GetObjState::get_ref(RGWRados *store, rgw_raw_obj& obj, rgw_rados_ref **pref)
 {
   if (!has_ref) {
-    rgw_pool pool;
-    int r = store->get_raw_obj_ref(obj, &ref, &pool);
+    int r = store->get_raw_obj_ref(obj, &ref);
     if (r < 0) {
       return r;
     }
@@ -9727,7 +10335,8 @@ int RGWRados::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read
                              RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
                              bufferlist& bl, off_t ofs, off_t end,
                              map<string, bufferlist> *attrs,
-                             rgw_cache_entry_info *cache_info)
+                             rgw_cache_entry_info *cache_info,
+                            boost::optional<obj_version>)
 {
   uint64_t len;
   ObjectReadOperation op;
@@ -9774,12 +10383,16 @@ int RGWRados::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read
   return bl.length();
 }
 
-int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker)
+int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl,
+                                      RGWObjVersionTracker *objv_tracker,
+                                      boost::optional<obj_version> refresh_version)
 {
   RGWRados *store = source->get_store();
   rgw_raw_obj& obj = source->get_obj();
 
-  return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl, ofs, end, read_params.attrs, read_params.cache_info);
+  return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl,
+                              ofs, end, read_params.attrs,
+                              read_params.cache_info, refresh_version);
 }
 
 int RGWRados::SystemObject::Read::get_attr(const char *name, bufferlist& dest)
@@ -10295,6 +10908,8 @@ int RGWRados::olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWOb
     op.create(true);
   } else {
     op.assert_exists();
+    struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+    op.mtime2(&mtime_ts);
   }
 
   /*
@@ -10394,12 +11009,59 @@ int RGWRados::olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjStat
   return ret;
 }
 
+int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
+{
+  rgw_obj obj;
+  const rgw_obj *pobj = &obj_instance;
+  int r;
+
+  for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+    r = bs->init(pobj->bucket, *pobj);
+    if (r < 0) {
+      ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
+      return r;
+    }
+    r = call(bs);
+    if (r != -ERR_BUSY_RESHARDING) {
+      break;
+    }
+    ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+    string new_bucket_id;
+    r = block_while_resharding(bs, &new_bucket_id);
+    if (r == -ERR_BUSY_RESHARDING) {
+      continue;
+    }
+    if (r < 0) {
+      return r;
+    }
+    ldout(cct, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+    i = 0; /* resharding is finished, make sure we can retry */
+
+    obj = *pobj;
+    obj.bucket.update_bucket_id(new_bucket_id);
+    pobj = &obj;
+  }
+
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+{
+  std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
+
+  return waiter->block_while_resharding(bs, new_bucket_id);
+}
+
 int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
                                     bool delete_marker,
                                     const string& op_tag,
                                     struct rgw_bucket_dir_entry_meta *meta,
                                     uint64_t olh_epoch,
-                                    real_time unmod_since, bool high_precision_time)
+                                    real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
 {
   rgw_rados_ref ref;
   int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
@@ -10407,19 +11069,27 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
     return r;
   }
 
-  BucketShard bs(this);
-  int ret = bs.init(obj_instance.bucket, obj_instance);
-  if (ret < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
-    return ret;
+  rgw_zone_set zones_trace;
+  if (_zones_trace) {
+    zones_trace = *_zones_trace;
+  } else {
+    zones_trace.insert(get_zone().id);
   }
 
+  BucketShard bs(this);
+
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
-  ret = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
-                                unmod_since, high_precision_time,
-                                get_zone().log_data);
-  if (ret < 0) {
-    return ret;
+  r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                    librados::ObjectWriteOperation op;
+                    cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                    return cls_rgw_bucket_link_olh(bs->index_ctx, op,
+                                                   bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+                                                   unmod_since, high_precision_time,
+                                                   get_zone().log_data, zones_trace);
+                    });
+  if (r < 0) {
+    ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+    return r;
   }
 
   return 0;
@@ -10432,7 +11102,7 @@ void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation
 }
 
 int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance,
-                                           const string& op_tag, const string& olh_tag, uint64_t olh_epoch)
+                                           const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *_zones_trace)
 {
   rgw_rados_ref ref;
   int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
@@ -10440,17 +11110,24 @@ int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, con
     return r;
   }
 
-  BucketShard bs(this);
-  int ret = bs.init(obj_instance.bucket, obj_instance);
-  if (ret < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
-    return ret;
+  rgw_zone_set zones_trace;
+  if (_zones_trace) {
+    zones_trace = *_zones_trace;
   }
+  zones_trace.insert(get_zone().id);
+
+  BucketShard bs(this);
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
-  ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data);
-  if (ret < 0) {
-    return ret;
+  r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                    librados::ObjectWriteOperation op;
+                    cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                    return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+                                                          olh_tag, olh_epoch, get_zone().log_data, zones_trace);
+                    });
+  if (r < 0) {
+    ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+    return r;
   }
 
   return 0;
@@ -10478,11 +11155,16 @@ int RGWRados::bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObj
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ObjectReadOperation op;
-
-  ret = cls_rgw_get_olh_log(bs.index_ctx, bs.bucket_obj, op, key, ver_marker, olh_tag, log, is_truncated);
-  if (ret < 0)
+  ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                      ObjectReadOperation op;
+                      cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                      return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+                                                 key, ver_marker, olh_tag, log, is_truncated);
+                    });
+  if (ret < 0) {
+    ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
     return ret;
+  }
 
   return 0;
 }
@@ -10506,13 +11188,16 @@ int RGWRados::bucket_index_trim_olh_log(const RGWBucketInfo& bucket_info, RGWObj
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ObjectWriteOperation op;
-
-  cls_rgw_trim_olh_log(op, key, ver, olh_tag);
-
-  ret = bs.index_ctx.operate(bs.bucket_obj, &op);
-  if (ret < 0)
+  ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int { 
+                      ObjectWriteOperation op;
+                      cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                      cls_rgw_trim_olh_log(op, key, ver, olh_tag);
+                      return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+                      });
+  if (ret < 0) {
+    ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
     return ret;
+  }
 
   return 0;
 }
@@ -10526,17 +11211,16 @@ int RGWRados::bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjSta
   }
 
   BucketShard bs(this);
-  int ret = bs.init(obj_instance.bucket, obj_instance);
-  if (ret < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
-    return ret;
-  }
 
   string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ret = cls_rgw_clear_olh(bs.index_ctx, bs.bucket_obj, key, olh_tag);
+  int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int { 
+                          ObjectWriteOperation op;
+                          cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                          return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+                          });
   if (ret < 0) {
     ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
     return ret;
@@ -10547,7 +11231,7 @@ int RGWRados::bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjSta
 
 int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
                             bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
-                            uint64_t *plast_ver)
+                            uint64_t *plast_ver, rgw_zone_set* zones_trace)
 {
   if (log.empty()) {
     return 0;
@@ -10563,6 +11247,9 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
   op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
   op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
 
+  struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+  op.mtime2(&mtime_ts);
+
   bool need_to_link = false;
   cls_rgw_obj_key key;
   bool delete_marker = false;
@@ -10624,7 +11311,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
        liter != remove_instances.end(); ++liter) {
     cls_rgw_obj_key& key = *liter;
     rgw_obj obj_instance(bucket, key);
-    int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP);
+    int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP, ceph::real_time(), zones_trace);
     if (ret < 0 && ret != -ENOENT) {
       ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
       return ret;
@@ -10676,7 +11363,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
 /*
  * read olh log and apply it
  */
-int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
+int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace)
 {
   map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
   bool is_truncated;
@@ -10687,7 +11374,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc
     if (ret < 0) {
       return ret;
     }
-    ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker);
+    ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace);
     if (ret < 0) {
       return ret;
     }
@@ -10697,7 +11384,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc
 }
 
 int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
-                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time)
+                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
 {
   string op_tag;
 
@@ -10708,7 +11395,7 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const r
 
   int ret = 0;
   int i;
-
+  
 #define MAX_ECANCELED_RETRY 100
   for (i = 0; i < MAX_ECANCELED_RETRY; i++) {
     if (ret == -ECANCELED) {
@@ -10728,7 +11415,7 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const r
       }
       return ret;
     }
-    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time);
+    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
     if (ret < 0) {
       ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
       if (ret == -ECANCELED) {
@@ -10757,7 +11444,7 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const r
 }
 
 int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
-                                  uint64_t olh_epoch)
+                                  uint64_t olh_epoch, rgw_zone_set *zones_trace)
 {
   string op_tag;
 
@@ -10789,7 +11476,7 @@ int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_i
 
     string olh_tag(state->olh_tag.c_str(), state->olh_tag.length());
 
-    ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch);
+    ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
     if (ret < 0) {
       ldout(cct, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
       if (ret == -ECANCELED) {
@@ -10805,7 +11492,7 @@ int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_i
     return -EIO;
   }
 
-  ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
+  ret = update_olh(obj_ctx, state, bucket_info, olh_obj, zones_trace);
   if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
     return 0;
   }
@@ -11028,7 +11715,7 @@ int RGWRados::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
 }
 
 int RGWRados::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string *bucket_ver, string *master_ver,
-    map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
+    map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker, bool *syncstopped)
 {
   map<string, rgw_bucket_dir_header> headers;
   map<int, string> bucket_instance_ids;
@@ -11044,7 +11731,6 @@ int RGWRados::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string
   BucketIndexShardsManager ver_mgr;
   BucketIndexShardsManager master_ver_mgr;
   BucketIndexShardsManager marker_mgr;
-  string shard_marker;
   char buf[64];
   for(; iter != headers.end(); ++iter, ++viter) {
     accumulate_raw_stats(iter->second, stats);
@@ -11057,6 +11743,8 @@ int RGWRados::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string
     } else {
       marker_mgr.add(viter->first, iter->second.max_marker);
     }
+    if (syncstopped != NULL)
+      *syncstopped = iter->second.syncstopped;
   }
   ver_mgr.to_string(bucket_ver);
   master_ver_mgr.to_string(master_ver);
@@ -11132,16 +11820,16 @@ public:
 int RGWRados::get_bucket_stats_async(RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *ctx)
 {
   int num_aio = 0;
-  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.num_shards);
+  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.num_shards ? : 1);
   assert(get_ctx);
   int r = cls_bucket_head_async(bucket_info, shard_id, get_ctx, &num_aio);
-  get_ctx->put();
   if (r < 0) {
     ctx->put();
     if (num_aio) {
       get_ctx->unset_cb();
     }
   }
+  get_ctx->put();
   return r;
 }
 
@@ -11245,15 +11933,18 @@ int RGWRados::get_bucket_instance_info(RGWObjectCtx& obj_ctx, const rgw_bucket&
   return get_bucket_instance_from_oid(obj_ctx, oid, info, pmtime, pattrs);
 }
 
-int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, string& oid, RGWBucketInfo& info,
+int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info,
                                            real_time *pmtime, map<string, bufferlist> *pattrs,
-                                           rgw_cache_entry_info *cache_info)
+                                           rgw_cache_entry_info *cache_info,
+                                          boost::optional<obj_version> refresh_version)
 {
   ldout(cct, 20) << "reading from " << get_zone_params().domain_root << ":" << oid << dendl;
 
   bufferlist epbl;
 
-  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, oid, epbl, &info.objv_tracker, pmtime, pattrs, cache_info);
+  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+                              oid, epbl, &info.objv_tracker, pmtime, pattrs,
+                              cache_info, refresh_version);
   if (ret < 0) {
     return ret;
   }
@@ -11276,13 +11967,16 @@ int RGWRados::get_bucket_entrypoint_info(RGWObjectCtx& obj_ctx,
                                          RGWObjVersionTracker *objv_tracker,
                                          real_time *pmtime,
                                          map<string, bufferlist> *pattrs,
-                                         rgw_cache_entry_info *cache_info)
+                                         rgw_cache_entry_info *cache_info,
+                                        boost::optional<obj_version> refresh_version)
 {
   bufferlist bl;
   string bucket_entry;
 
   rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
-  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, bucket_entry, bl, objv_tracker, pmtime, pattrs, cache_info);
+  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+                              bucket_entry, bl, objv_tracker, pmtime, pattrs,
+                              cache_info, refresh_version);
   if (ret < 0) {
     return ret;
   }
@@ -11335,15 +12029,27 @@ int RGWRados::convert_old_bucket_info(RGWObjectCtx& obj_ctx,
   return 0;
 }
 
-int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
-                              const string& tenant, const string& bucket_name, RGWBucketInfo& info,
-                              real_time *pmtime, map<string, bufferlist> *pattrs)
+int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
+                               const string& tenant,
+                               const string& bucket_name,
+                               RGWBucketInfo& info,
+                               real_time *pmtime,
+                               map<string, bufferlist> *pattrs,
+                               boost::optional<obj_version> refresh_version)
 {
   bucket_info_entry e;
   string bucket_entry;
   rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
 
+
   if (binfo_cache->find(bucket_entry, &e)) {
+    if (refresh_version &&
+        e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
+      lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
+                 << "a failure that should be debugged. I am a nice machine, "
+                 << "so I will try to recover." << dendl;
+      binfo_cache->invalidate(bucket_entry);
+    }
     info = e.info;
     if (pattrs)
       *pattrs = e.attrs;
@@ -11356,7 +12062,9 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
   real_time ep_mtime;
   RGWObjVersionTracker ot;
   rgw_cache_entry_info entry_cache_info;
-  int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name, entry_point, &ot, &ep_mtime, pattrs, &entry_cache_info);
+  int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name,
+                                      entry_point, &ot, &ep_mtime, pattrs,
+                                      &entry_cache_info, refresh_version);
   if (ret < 0) {
     /* only init these fields */
     info.bucket.tenant = tenant;
@@ -11390,10 +12098,12 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
 
   rgw_cache_entry_info cache_info;
 
-  ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs, &cache_info);
+  ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs,
+                                    &cache_info, refresh_version);
   e.info.ep_objv = ot.read_version;
   info = e.info;
   if (ret < 0) {
+    lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
     info.bucket.tenant = tenant;
     info.bucket.name = bucket_name;
     // XXX and why return anything in case of an error anyway?
@@ -11415,9 +12125,35 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
     ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
   }
 
+  if (refresh_version &&
+      refresh_version->compare(&info.objv_tracker.read_version)) {
+    lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
+               << "have gone squirrelly. An administrator may have forced a "
+               << "change; otherwise there is a problem somewhere." << dendl;
+  }
+
   return 0;
 }
 
+int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
+                              const string& tenant, const string& bucket_name,
+                              RGWBucketInfo& info,
+                              real_time *pmtime, map<string, bufferlist> *pattrs)
+{
+  return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
+                          pattrs, boost::none);
+}
+
+int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
+                                      ceph::real_time *pmtime,
+                                      map<string, bufferlist> *pattrs)
+{
+  RGWObjectCtx obj_ctx(this);
+
+  return _get_bucket_info(obj_ctx, info.bucket.tenant, info.bucket.name,
+                          info, pmtime, pattrs, info.objv_tracker.read_version);
+}
+
 int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
                                          bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
                                          map<string, bufferlist> *pattrs)
@@ -11611,6 +12347,10 @@ int RGWRados::update_containers_stats(map<string, RGWBucketEnt>& m)
         ent.size_rounded += stats.total_size_rounded;
       }
     }
+
+    // fill in placement_rule from the bucket instance for use in swift's
+    // per-storage policy statistics
+    ent.placement_rule = std::move(bucket_info.placement_rule);
   }
 
   return m.size();
@@ -11662,6 +12402,31 @@ int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
   return 0;
 }
 
+int RGWRados::pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx)
+{
+  librados::IoCtx& io_ctx = ctx.io_ctx;
+  librados::NObjectIterator& iter = ctx.iter;
+
+  int r = open_pool_ctx(pool, io_ctx);
+  if (r < 0)
+    return r;
+
+  librados::ObjectCursor oc;
+  if (!oc.from_str(cursor)) {
+    ldout(cct, 10) << "failed to parse cursor: " << cursor << dendl;
+    return -EINVAL;
+  }
+
+  iter = io_ctx.nobjects_begin(oc);
+
+  return 0;
+}
+
+string RGWRados::pool_iterate_get_cursor(RGWPoolIterCtx& ctx)
+{
+  return ctx.iter.get_cursor().to_str();
+}
+
 int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<rgw_bucket_dir_entry>& objs,
                            bool *is_truncated, RGWAccessListFilter *filter)
 {
@@ -11701,21 +12466,27 @@ struct RGWAccessListFilterPrefix : public RGWAccessListFilter {
   }
 };
 
-int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
-                              int max, RGWListRawObjsCtx& ctx, list<string>& oids,
-                              bool *is_truncated)
+int RGWRados::list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx)
 {
-  RGWAccessListFilterPrefix filter(prefix_filter);
-
-  if (!ctx.initialized) {
-    int r = pool_iterate_begin(pool, ctx.iter_ctx);
+  if (!ctx->initialized) {
+    int r = pool_iterate_begin(pool, marker, ctx->iter_ctx);
     if (r < 0) {
       ldout(cct, 10) << "failed to list objects pool_iterate_begin() returned r=" << r << dendl;
       return r;
     }
-    ctx.initialized = true;
+    ctx->initialized = true;
   }
+  return 0;
+}
 
+int RGWRados::list_raw_objects_next(const string& prefix_filter, int max,
+                                    RGWListRawObjsCtx& ctx, list<string>& oids,
+                                    bool *is_truncated)
+{
+  if (!ctx.initialized) {
+    return -EINVAL;
+  }
+  RGWAccessListFilterPrefix filter(prefix_filter);
   vector<rgw_bucket_dir_entry> objs;
   int r = pool_iterate(ctx.iter_ctx, max, objs, is_truncated, &filter);
   if (r < 0) {
@@ -11732,6 +12503,25 @@ int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter
   return oids.size();
 }
 
+int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
+                              int max, RGWListRawObjsCtx& ctx, list<string>& oids,
+                              bool *is_truncated)
+{
+  if (!ctx.initialized) {
+    int r = list_raw_objects_init(pool, string(), &ctx);
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  return list_raw_objects_next(prefix_filter, max, ctx, oids, is_truncated);
+}
+
+string RGWRados::list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx)
+{
+  return pool_iterate_get_cursor(ctx.iter_ctx);
+}
+
 int RGWRados::list_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max,
                                   std::list<rgw_bi_log_entry>& result, bool *truncated)
 {
@@ -11835,21 +12625,51 @@ int RGWRados::trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, stri
 {
   librados::IoCtx index_ctx;
   map<int, string> bucket_objs;
+
+  BucketIndexShardsManager start_marker_mgr;
+  BucketIndexShardsManager end_marker_mgr;
+
   int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
 
-  BucketIndexShardsManager start_marker_mgr;
   r = start_marker_mgr.from_string(start_marker, shard_id);
-  if (r < 0)
+  if (r < 0) {
     return r;
-  BucketIndexShardsManager end_marker_mgr;
+  }
+
   r = end_marker_mgr.from_string(end_marker, shard_id);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
 
   return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
-      cct->_conf->rgw_bucket_index_max_aio)();
+                             cct->_conf->rgw_bucket_index_max_aio)();
+
+  return r;
+}
+
+int RGWRados::resync_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id)
+{
+  librados::IoCtx index_ctx;
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
+  if (r < 0)
+    return r;
+
+  return CLSRGWIssueResyncBucketBILog(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
+int RGWRados::stop_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id)
+{
+  librados::IoCtx index_ctx;
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
+  if (r < 0)
+    return r;
+
+  return CLSRGWIssueBucketBILogStop(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 int RGWRados::bi_get_instance(const RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_bucket_dir_entry *dirent)
@@ -11934,6 +12754,9 @@ int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string&
   }
 
   ret = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, obj_name, marker, max, entries, is_truncated);
+  if (ret == -ENOENT) {
+    *is_truncated = false;
+  }
   if (ret < 0)
     return ret;
 
@@ -12027,29 +12850,28 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri
 }
 
 int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
-                                 rgw_obj& obj, uint16_t bilog_flags)
+                                 rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
 {
+  rgw_zone_set zones_trace;
+  if (_zones_trace) {
+    zones_trace = *_zones_trace;
+  }
+  else {
+    zones_trace.insert(get_zone().id);
+  }
+  
   ObjectWriteOperation o;
   cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
-  cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags);
+  cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+  cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags, zones_trace);
   return bs.index_ctx.operate(bs.bucket_obj, &o);
 }
 
-int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
                                   int64_t pool, uint64_t epoch,
                                   rgw_bucket_dir_entry& ent, RGWObjCategory category,
-                                 list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+                                 list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
 {
-  list<cls_rgw_obj_key> *pro = NULL;
-  list<cls_rgw_obj_key> ro;
-
-  if (remove_objs) {
-    for (auto iter = remove_objs->begin(); iter != remove_objs->end(); ++iter) {
-      ro.push_back(*iter);
-    }
-    pro = &ro;
-  }
-
   ObjectWriteOperation o;
   rgw_bucket_dir_entry_meta dir_meta;
   dir_meta = ent.meta;
@@ -12059,21 +12881,24 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
   ver.pool = pool;
   ver.epoch = epoch;
   cls_rgw_obj_key key(ent.key.name, ent.key.instance);
-  cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
-                             get_zone().log_data, bilog_flags);
-
-  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
-  c->release();
+  cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+  cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, remove_objs,
+                             get_zone().log_data, bilog_flags, _zones_trace);
+  complete_op_data *arg;
+  index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
+                                              get_zone().log_data, bilog_flags, _zones_trace, &arg);
+  librados::AioCompletion *completion = arg->rados_completion;
+  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
+  completion->release(); /* can't reference arg here, as it might have already been released */
   return ret;
 }
 
-int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag,
                                    int64_t pool, uint64_t epoch,
                                    rgw_bucket_dir_entry& ent, RGWObjCategory category,
-                                   list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+                                   list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace)
 {
-  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 }
 
 int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
@@ -12081,19 +12906,20 @@ int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
                                    rgw_obj& obj,
                                    real_time& removed_mtime,
                                    list<rgw_obj_index_key> *remove_objs,
-                                   uint16_t bilog_flags)
+                                   uint16_t bilog_flags,
+                                   rgw_zone_set *zones_trace)
 {
   rgw_bucket_dir_entry ent;
   ent.meta.mtime = removed_mtime;
   obj.key.get_index_key(&ent.key);
-  return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
 }
 
-int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace)
 {
   rgw_bucket_dir_entry ent;
   obj.key.get_index_key(&ent.key);
-  return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
 }
 
 int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)
@@ -12161,8 +12987,11 @@ int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_
     const string& name = vcurrents[pos]->first;
     struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
 
-    bool force_check = force_check_filter && force_check_filter(dirent.key.name);
-    if ((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty() || force_check) {
+    bool force_check = force_check_filter &&
+        force_check_filter(dirent.key.name);
+    if ((!dirent.exists && !dirent.is_delete_marker()) ||
+        !dirent.pending_map.empty() ||
+        force_check) {
       /* there are uncommitted ops. We need to check the current state,
        * and if the tags are old we need to do cleanup as well. */
       librados::IoCtx sub_ctx;
@@ -12215,8 +13044,7 @@ int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
   rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
 
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12234,8 +13062,7 @@ int RGWRados::cls_obj_usage_log_read(string& oid, string& user, uint64_t start_e
   rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
 
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12253,16 +13080,12 @@ int RGWRados::cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_e
   rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
 
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
 
-  ObjectWriteOperation op;
-  cls_rgw_usage_log_trim(op, user, start_epoch, end_epoch);
-
-  r = ref.ioctx.operate(ref.oid, &op);
+  r = cls_rgw_usage_log_trim(ref.ioctx, ref.oid, user, start_epoch, end_epoch);
   return r;
 }
 
@@ -12453,8 +13276,7 @@ int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header
   rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
 
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12472,6 +13294,23 @@ int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header
   return 0;
 }
 
+int RGWRados::cls_user_reset_stats(const string& user_id)
+{
+  string buckets_obj_id;
+  rgw_get_buckets_obj(user_id, buckets_obj_id);
+  rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
+
+  rgw_rados_ref ref;
+  int r = get_raw_obj_ref(obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  ::cls_user_reset_stats(op);
+  return ref.ioctx.operate(ref.oid, &op);
+}
+
 int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
 {
   string buckets_obj_id;
@@ -12479,8 +13318,7 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_
   rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
 
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12505,11 +13343,9 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_raw_obj& user_obj, const RGWBucketI
 
   bucket_info.bucket.convert(&entry.bucket);
 
-  map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
-  for (; hiter != headers.end(); ++hiter) {
-    map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
-    for (; iter != hiter->second.stats.end(); ++iter) {
-      struct rgw_bucket_category_stats& header_stats = iter->second;
+  for (const auto& hiter : headers) {
+    for (const auto& iter : hiter.second.stats) {
+      const struct rgw_bucket_category_stats& header_stats = iter.second;
       entry.size += header_stats.total_size;
       entry.size_rounded += header_stats.total_size_rounded;
       entry.count += header_stats.num_entries;
@@ -12528,6 +13364,36 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_raw_obj& user_obj, const RGWBucketI
   return 0;
 }
 
+int RGWRados::cls_user_get_bucket_stats(const rgw_bucket& bucket, cls_user_bucket_entry& entry)
+{
+  map<string, struct rgw_bucket_dir_header> headers;
+  RGWBucketInfo bucket_info;
+  RGWObjectCtx obj_ctx(this);
+  int ret = get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = cls_bucket_head(bucket_info, RGW_NO_SHARD, headers);
+  if (ret < 0) {
+    ldout(cct, 20) << "cls_bucket_header() returned " << ret << dendl;
+    return ret;
+  }
+
+  bucket.convert(&entry.bucket);
+
+  for (const auto& hiter : headers) {
+    for (const auto& iter : hiter.second.stats) {
+      const struct rgw_bucket_category_stats& header_stats = iter.second;
+      entry.size += header_stats.total_size;
+      entry.size_rounded += header_stats.total_size_rounded;
+      entry.count += header_stats.num_entries;
+    }
+  }
+
+  return 0;
+}
+
 int RGWRados::cls_user_list_buckets(rgw_raw_obj& obj,
                                     const string& in_marker,
                                     const string& end_marker,
@@ -12537,8 +13403,7 @@ int RGWRados::cls_user_list_buckets(rgw_raw_obj& obj,
                                     bool * const truncated)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12560,8 +13425,7 @@ int RGWRados::cls_user_list_buckets(rgw_raw_obj& obj,
 int RGWRados::cls_user_update_buckets(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries, bool add)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12586,8 +13450,7 @@ int RGWRados::complete_sync_user_stats(const rgw_user& user_id)
 int RGWRados::cls_user_complete_stats_sync(rgw_raw_obj& obj)
 {
   rgw_rados_ref ref;
-  rgw_pool pool;
-  int r = get_raw_obj_ref(obj, &ref, &pool);
+  int r = get_raw_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12611,9 +13474,8 @@ int RGWRados::cls_user_add_bucket(rgw_raw_obj& obj, const cls_user_bucket_entry&
 
 int RGWRados::cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bucket)
 {
-  rgw_pool p;
   rgw_rados_ref ref;
-  int r = get_system_obj_ref(obj, &ref, &p);
+  int r = get_system_obj_ref(obj, &ref);
   if (r < 0) {
     return r;
   }
@@ -12627,6 +13489,57 @@ int RGWRados::cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bu
   return 0;
 }
 
+int RGWRados::check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
+                                 RGWQuotaInfo& bucket_quota)
+{
+  if (!cct->_conf->rgw_dynamic_resharding) {
+      return 0;
+  }
+
+  bool need_resharding = false;
+  int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+  uint32_t suggested_num_shards;
+
+  int ret =  quota_handler->check_bucket_shards((uint64_t)cct->_conf->rgw_max_objs_per_shard,
+                                               num_source_shards,  bucket_info.owner, bucket, bucket_quota,
+                                               1, need_resharding, &suggested_num_shards);
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (need_resharding) {
+    ldout(cct, 20) << __func__ << " bucket " << bucket.name << " need resharding " <<
+      " old num shards " << bucket_info.num_shards << " new num shards " << suggested_num_shards <<
+      dendl;
+    return add_bucket_to_reshard(bucket_info, suggested_num_shards);
+  }
+
+  return ret;
+}
+
+int RGWRados::add_bucket_to_reshard(const RGWBucketInfo& bucket_info, uint32_t new_num_shards)
+{
+  RGWReshard reshard(this);
+
+  uint32_t num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+  new_num_shards = min(new_num_shards, get_max_bucket_shards());
+  if (new_num_shards <= num_source_shards) {
+    ldout(cct, 20) << "not resharding bucket name=" << bucket_info.bucket.name << ", orig_num=" << num_source_shards << ", new_num_shards=" << new_num_shards << dendl;
+    return 0;
+  }
+
+  cls_rgw_reshard_entry entry;
+  entry.time = real_clock::now();
+  entry.tenant = bucket_info.owner.tenant;
+  entry.bucket_name = bucket_info.bucket.name;
+  entry.bucket_id = bucket_info.bucket.bucket_id;
+  entry.old_num_shards = num_source_shards;
+  entry.new_num_shards = new_num_shards;
+
+  return reshard.add(entry);
+}
+
 int RGWRados::check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
                           RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size)
 {
@@ -12691,7 +13604,7 @@ int RGWRados::get_target_shard_id(const RGWBucketInfo& bucket_info, const string
       } else {
         uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
         uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-        sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % bucket_info.num_shards;
+        sid = rgw_shards_mod(sid2, bucket_info.num_shards);
         if (shard_id) {
           *shard_id = (int)sid;
         }
@@ -12731,7 +13644,7 @@ int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const strin
       } else {
         uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
         uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-        sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+        sid = rgw_shards_mod(sid2, num_shards);
         char buf[bucket_oid_base.size() + 32];
         snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
         (*bucket_obj) = buf;
@@ -13021,17 +13934,17 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
+                                                bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
 {
-  int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
   if (!use_cache) {
     store = new RGWRados;
   } else {
-    store = new RGWCache<RGWRados>; 
+    store = new RGWCache<RGWRados>;
   }
 
-  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) {
+  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
     delete store;
     return NULL;
   }
@@ -13185,3 +14098,71 @@ int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need
   }
 }
 
+bool RGWRados::call(std::string command, cmdmap_t& cmdmap, std::string format,
+                    bufferlist& out)
+{
+  if (command == "cache list") {
+    boost::optional<std::string> filter;
+    auto i = cmdmap.find("filter");
+    if (i != cmdmap.cend()) {
+      filter = boost::get<std::string>(i->second);
+    }
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "table"));
+    if (f) {
+      f->open_array_section("cache_entries");
+      call_list(filter, f.get());
+      f->close_section();
+      f->flush(out);
+      return true;
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache inspect") {
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "json-pretty"));
+    if (f) {
+      const auto& target = boost::get<std::string>(cmdmap["target"]);
+      if (call_inspect(target, f.get())) {
+        f->flush(out);
+        return true;
+      } else {
+        out.append(string("Unable to find entry ") + target + string(".\n"));
+        return false;
+      }
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache erase") {
+    const auto& target = boost::get<std::string>(cmdmap["target"]);
+    if (call_erase(target)) {
+      return true;
+    } else {
+      out.append(string("Unable to find entry ") + target + string(".\n"));
+      return false;
+    }
+  } else if (command == "cache zap") {
+    call_zap();
+    return true;
+  }
+  return false;
+}
+
+void RGWRados::call_list(const boost::optional<std::string>&,
+                         ceph::Formatter*)
+{
+  return;
+}
+
+bool RGWRados::call_inspect(const std::string&, Formatter*)
+{
+  return false;
+}
+
+bool RGWRados::call_erase(const std::string&) {
+  return false;
+}
+
+void RGWRados::call_zap() {
+  return;
+}