]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index 7a1fce857151b99077ef251e0afaa9ba02cfd1cd..f123514e470e29f3677cf63d4f394202cabfad7c 100644 (file)
@@ -1,4 +1,3 @@
-
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
@@ -171,6 +170,14 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, IoCtx& ioctx, b
   int r = rados->ioctx_create(pool.name.c_str(), ioctx);
   if (r == -ENOENT && create) {
     r = rados->pool_create(pool.name.c_str());
+    if (r == -ERANGE) {
+      dout(0)
+        << __func__
+        << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+        << " (this can be due to a pool or placement group misconfiguration, e.g."
+        << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+        << dendl;
+    }
     if (r < 0 && r != -EEXIST) {
       return r;
     }
@@ -481,9 +488,9 @@ int RGWZoneGroup::read_default_id(string& default_id, bool old_format)
     /* try using default realm */
     RGWRealm realm;
     int ret = realm.init(cct, store);
+    // no default realm exist
     if (ret < 0) {
-      ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
-      return -ENOENT;
+      return read_id(default_zonegroup_name, default_id);
     }
     realm_id = realm.get_id();
   }
@@ -1807,9 +1814,9 @@ int RGWZoneParams::read_default_id(string& default_id, bool old_format)
     /* try using default realm */
     RGWRealm realm;
     int ret = realm.init(cct, store);
+    //no default realm exist
     if (ret < 0) {
-      ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
-      return -ENOENT;
+      return read_id(default_zone_name, default_id);
     }
     realm_id = realm.get_id();
   }
@@ -2780,6 +2787,7 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
   obj_op.meta.delete_at = delete_at;
   obj_op.meta.user_data = user_data;
   obj_op.meta.zones_trace = zones_trace;
+  obj_op.meta.modify_tail = true;
 
   r = obj_op.write_meta(obj_len, accounted_size, attrs);
   if (r < 0) {
@@ -2791,6 +2799,22 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
   return 0;
 }
 
+const char* RGWRados::admin_commands[4][3] = {
+  { "cache list",
+    "cache list name=filter,type=CephString,req=false",
+    "cache list [filter_str]: list object cache, possibly matching substrings" },
+  { "cache inspect",
+    "cache inspect name=target,type=CephString,req=true",
+    "cache inspect target: print cache element" },
+  { "cache erase",
+    "cache erase name=target,type=CephString,req=true",
+    "cache erase target: erase element from cache" },
+  { "cache zap",
+    "cache zap",
+    "cache zap: erase all elements from cache" }
+};
+
+
 int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
   int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
   if (r < 0)
@@ -3085,6 +3109,9 @@ class RGWMetaNotifier : public RGWRadosThread {
   uint64_t interval_msec() override {
     return cct->_conf->rgw_md_notify_interval_msec;
   }
+  void stop_process() override {
+    notify_mgr.stop();
+  }
 public:
   RGWMetaNotifier(RGWRados *_store, RGWMetadataLog* log)
     : RGWRadosThread(_store, "meta-notifier"), notify_mgr(_store), log(log) {}
@@ -3117,6 +3144,9 @@ class RGWDataNotifier : public RGWRadosThread {
   uint64_t interval_msec() override {
     return cct->_conf->get_val<int64_t>("rgw_data_notify_interval_msec");
   }
+  void stop_process() override {
+    notify_mgr.stop();
+  }
 public:
   RGWDataNotifier(RGWRados *_store) : RGWRadosThread(_store, "data-notifier"), notify_mgr(_store) {}
 
@@ -3210,7 +3240,8 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
 public:
   RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
                              const string& _source_zone)
-    : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+    : RGWSyncProcessorThread(_store, "data-sync"),
+      sync(_store, async_rados, _source_zone),
       initialized(false) {}
 
   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
@@ -3246,15 +3277,18 @@ class RGWSyncLogTrimThread : public RGWSyncProcessorThread
 {
   RGWCoroutinesManager crs;
   RGWRados *store;
+  rgw::BucketTrimManager *bucket_trim;
   RGWHTTPManager http;
   const utime_t trim_interval;
 
   uint64_t interval_msec() override { return 0; }
   void stop_process() override { crs.stop(); }
 public:
-  RGWSyncLogTrimThread(RGWRados *store, int interval)
+  RGWSyncLogTrimThread(RGWRados *store, rgw::BucketTrimManager *bucket_trim,
+                       int interval)
     : RGWSyncProcessorThread(store, "sync-log-trim"),
       crs(store->ctx(), store->get_cr_registry()), store(store),
+      bucket_trim(bucket_trim),
       http(store->ctx(), crs.get_completion_mgr()),
       trim_interval(interval, 0)
   {}
@@ -3276,6 +3310,10 @@ public:
                                        trim_interval));
     stacks.push_back(data);
 
+    auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+    bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+    stacks.push_back(bucket);
+
     crs.run(stacks);
     return 0;
   }
@@ -3563,7 +3601,6 @@ public:
     for (int i = 0; i < num_shards; ++i) {
       Mutex::Locker l(*locks[i]);
       for (auto c : completions[i]) {
-        Mutex::Locker cl(c->lock);
         c->stop();
       }
     }
@@ -3659,6 +3696,15 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
 
 void RGWRados::finalize()
 {
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->unregister_command(cmd[0]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to unregister admin socket command (r=" << r
+                 << ")" << dendl;
+    }
+  }
+
   if (run_sync_thread) {
     Mutex::Locker l(meta_sync_thread_lock);
     meta_sync_processor_thread->stop();
@@ -3686,6 +3732,7 @@ void RGWRados::finalize()
     data_sync_processor_threads.clear();
     delete sync_log_trimmer;
     sync_log_trimmer = nullptr;
+    bucket_trim = boost::none;
   }
   if (finisher) {
     finisher->stop();
@@ -3762,6 +3809,17 @@ void RGWRados::finalize()
 int RGWRados::init_rados()
 {
   int ret = 0;
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->register_command(cmd[0], cmd[1], this,
+                                           cmd[2]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to register admin socket command (r=" << r
+                 << ")" << dendl;
+      return r;
+    }
+  }
+
   auto handles = std::vector<librados::Rados>{cct->_conf->rgw_num_rados_handles};
 
   for (auto& r : handles) {
@@ -4493,13 +4551,6 @@ int RGWRados::init_complete()
     obj_expirer->start_processor();
   }
 
-  if (run_sync_thread) {
-    // initialize the log period history. we want to do this any time we're not
-    // running under radosgw-admin, so we check run_sync_thread here before
-    // disabling it based on the zone/zonegroup setup
-    meta_mgr->init_oldest_log_period();
-  }
-
   /* no point of running sync thread if we don't have a master zone configured
     or there is no rest_master_conn */
   if (get_zonegroup().master_zone.empty() || !rest_master_conn
@@ -4507,6 +4558,11 @@ int RGWRados::init_complete()
     run_sync_thread = false;
   }
 
+  if (run_sync_thread) {
+    // initialize the log period history
+    meta_mgr->init_oldest_log_period();
+  }
+
   async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
   async_rados->start();
 
@@ -4533,10 +4589,22 @@ int RGWRados::init_complete()
     }
     meta_sync_processor_thread->start();
 
+    // configure the bucket trim manager
+    rgw::BucketTrimConfig config;
+    rgw::configure_bucket_trim(cct, config);
+
+    bucket_trim.emplace(this, config);
+    ret = bucket_trim->init();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
+      return ret;
+    }
+    data_log->set_observer(&*bucket_trim);
+
     Mutex::Locker dl(data_sync_thread_lock);
     for (auto iter : zone_data_sync_from_map) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
@@ -4547,7 +4615,7 @@ int RGWRados::init_complete()
     }
     auto interval = cct->_conf->rgw_sync_log_trim_interval;
     if (interval > 0) {
-      sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+      sync_log_trimmer = new RGWSyncLogTrimThread(this, &*bucket_trim, interval);
       ret = sync_log_trimmer->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
@@ -4805,28 +4873,10 @@ void RGWRados::pick_control_oid(const string& key, string& notify_oid)
   notify_oid.append(buf);
 }
 
-int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx&  io_ctx)
+int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
 {
-  librados::Rados *rad = get_rados_handle();
-  int r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r != -ENOENT)
-    return r;
-
-  if (!pools_initialized)
-    return r;
-
-  r = rad->pool_create(pool.name.c_str());
-  if (r < 0 && r != -EEXIST)
-    return r;
-
-  r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r < 0)
-    return r;
-
-  r = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (r < 0 && r != -EOPNOTSUPP)
-    return r;
-  return 0;
+  constexpr bool create = true; // create the pool if it doesn't exist
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
@@ -4840,6 +4890,12 @@ void RGWRados::build_bucket_index_marker(const string& shard_id_str, const strin
 
 int RGWRados::open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx)
 {
+  const rgw_pool& explicit_pool = bucket_info.bucket.explicit_placement.index_pool;
+
+  if (!explicit_pool.empty()) {
+    return open_pool_ctx(explicit_pool, index_ctx);
+  }
+
   const string *rule = &bucket_info.placement_rule;
   if (rule->empty()) {
     rule = &zonegroup.default_placement;
@@ -5139,16 +5195,12 @@ int RGWRados::trim_usage(rgw_user& user, uint64_t start_epoch, uint64_t end_epoc
   usage_log_hash(cct, user_str, first_hash, index);
 
   hash = first_hash;
-
   do {
     int ret =  cls_obj_usage_log_trim(hash, user_str, start_epoch, end_epoch);
-    if (ret == -ENOENT)
-      goto next;
 
-    if (ret < 0)
+    if (ret < 0 && ret != -ENOENT)
       return ret;
 
-next:
     usage_log_hash(cct, user_str, hash, ++index);
   } while (hash != first_hash);
 
@@ -5157,7 +5209,7 @@ next:
 
 int RGWRados::key_to_shard_id(const string& key, int max_shards)
 {
-  return rgw_shards_hash(key, max_shards);
+  return rgw_shard_id(key, max_shards);
 }
 
 void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
@@ -5338,10 +5390,7 @@ int RGWRados::objexp_key_shard(const rgw_obj_index_key& key)
 {
   string obj_key = key.name + key.instance;
   int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
-  uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
-  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-  sid = rgw_shards_mod(sid2, num_shards);
-  return sid;
+  return rgw_bucket_shard_index(obj_key, num_shards);
 }
 
 static string objexp_hint_get_keyext(const string& tenant_name,
@@ -5525,8 +5574,9 @@ int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id)
   return 0;
 }
 
-/** 
- * get listing of the objects in a bucket.
+
+/**
+ * Get ordered listing of the objects in a bucket.
  *
  * max: maximum number of results to return
  * bucket: bucket to list contents of
@@ -5540,10 +5590,10 @@ int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id)
  * common_prefixes: if delim is filled in, any matching prefixes are placed here.
  * is_truncated: if number of objects in the bucket is bigger than max, then truncated.
  */
-int RGWRados::Bucket::List::list_objects(int64_t max,
-                                         vector<rgw_bucket_dir_entry> *result,
-                                         map<string, bool> *common_prefixes,
-                                         bool *is_truncated)
+int RGWRados::Bucket::List::list_objects_ordered(int64_t max,
+                                                vector<rgw_bucket_dir_entry> *result,
+                                                map<string, bool> *common_prefixes,
+                                                bool *is_truncated)
 {
   RGWRados *store = target->get_store();
   CephContext *cct = store->ctx();
@@ -5556,17 +5606,13 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
   result->clear();
 
   rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
-
-  rgw_obj_key end_marker_obj;
-  rgw_obj_index_key cur_end_marker;
-  if (!params.ns.empty()) {
-    end_marker_obj = rgw_obj_key(params.end_marker.name, params.end_marker.instance, params.ns);
-    end_marker_obj.ns = params.ns;
-    end_marker_obj.get_index_key(&cur_end_marker);
-  }
   rgw_obj_index_key cur_marker;
   marker_obj.get_index_key(&cur_marker);
 
+  rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+                             params.ns);
+  rgw_obj_index_key cur_end_marker;
+  end_marker_obj.get_index_key(&cur_end_marker);
   const bool cur_end_marker_valid = !params.end_marker.empty();
 
   rgw_obj_key prefix_obj(params.prefix);
@@ -5576,7 +5622,8 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
   string bigger_than_delim;
 
   if (!params.delim.empty()) {
-    unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(), params.delim.size());
+    unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(),
+                                   params.delim.size());
     char buf[params.delim.size() + 16];
     int r = encode_utf8(val + 1, (unsigned char *)buf);
     if (r < 0) {
@@ -5595,7 +5642,7 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
       cur_marker = s;
     }
   }
-  
+
   string skip_after_delim;
   while (truncated && count <= max) {
     if (skip_after_delim > cur_marker.name) {
@@ -5603,22 +5650,29 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
       ldout(cct, 20) << "setting cur_marker=" << cur_marker.name << "[" << cur_marker.instance << "]" << dendl;
     }
     std::map<string, rgw_bucket_dir_entry> ent_map;
-    int r = store->cls_bucket_list(target->get_bucket_info(), shard_id, cur_marker, cur_prefix,
-                                   read_ahead + 1 - count, params.list_versions, ent_map,
-                                   &truncated, &cur_marker);
+    int r = store->cls_bucket_list_ordered(target->get_bucket_info(),
+                                          shard_id,
+                                          cur_marker,
+                                          cur_prefix,
+                                          read_ahead + 1 - count,
+                                          params.list_versions,
+                                          ent_map,
+                                          &truncated,
+                                          &cur_marker);
     if (r < 0)
       return r;
 
-    std::map<string, rgw_bucket_dir_entry>::iterator eiter;
-    for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+    for (auto eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
       rgw_bucket_dir_entry& entry = eiter->second;
       rgw_obj_index_key index_key = entry.key;
 
       rgw_obj_key obj(index_key);
 
-      /* note that parse_raw_oid() here will not set the correct object's instance, as
-       * rgw_obj_index_key encodes that separately. We don't need to set the instance because it's
-       * not needed for the checks here and we end up using the raw entry for the return vector
+      /* note that parse_raw_oid() here will not set the correct
+       * object's instance, as rgw_obj_index_key encodes that
+       * separately. We don't need to set the instance because it's
+       * not needed for the checks here and we end up using the raw
+       * entry for the return vector
        */
       bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
       if (!valid) {
@@ -5654,7 +5708,8 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
       if (params.filter && !params.filter->filter(obj.name, index_key.name))
         continue;
 
-      if (params.prefix.size() &&  (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
+      if (params.prefix.size() &&
+         (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
         continue;
 
       if (!params.delim.empty()) {
@@ -5694,10 +5749,6 @@ int RGWRados::Bucket::List::list_objects(int64_t max,
       result->emplace_back(std::move(entry));
       count++;
     }
-
-    // Either the back-end telling us truncated, or we don't consume all
-    // items returned per the amount caller request
-    truncated = (truncated || eiter != ent_map.end());
   }
 
 done:
@@ -5705,39 +5756,150 @@ done:
     *is_truncated = truncated;
 
   return 0;
-}
+} // list_objects_ordered
+
 
 /**
- * create a rados pool, associated meta info
- * returns 0 on success, -ERR# otherwise.
+ * Get listing of the objects in a bucket and allow the results to be out
+ * of order.
+ *
+ * Even though there are key differences with the ordered counterpart,
+ * the parameters are the same to maintain some compatability.
+ *
+ * max: maximum number of results to return
+ * bucket: bucket to list contents of
+ * prefix: only return results that match this prefix
+ * delim: should not be set; if it is we should have indicated an error
+ * marker: if filled in, begin the listing with this object.
+ * end_marker: if filled in, end the listing with this object.
+ * result: the objects are put in here.
+ * common_prefixes: this is never filled with an unordered list; the param
+ *                  is maintained for compatibility
+ * is_truncated: if number of objects in the bucket is bigger than max, then
+ *               truncated.
  */
-int RGWRados::create_pool(const rgw_pool& pool)
+int RGWRados::Bucket::List::list_objects_unordered(int64_t max,
+                                                  vector<rgw_bucket_dir_entry> *result,
+                                                  map<string, bool> *common_prefixes,
+                                                  bool *is_truncated)
 {
-  int ret = 0;
+  RGWRados *store = target->get_store();
+  CephContext *cct = store->ctx();
+  int shard_id = target->get_shard_id();
 
-  librados::Rados *rad = get_rados_handle();
-  ret = rad->pool_create(pool.name.c_str(), 0);
-  if (ret == -EEXIST)
-    ret = 0;
-  else if (ret == -ERANGE) {
-    ldout(cct, 0)
-      << __func__
-      << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
-      << " (this can be due to a pool or placement group misconfiguration, e.g., pg_num < pgp_num)"
-      << dendl;
-  }
-  if (ret < 0)
-    return ret;
+  int count = 0;
+  bool truncated = true;
 
-  librados::IoCtx io_ctx;
-  ret = rad->ioctx_create(pool.name.c_str(), io_ctx);
-  if (ret < 0)
-    return ret;
+  // read a few extra in each call to cls_bucket_list_unordered in
+  // case some are filtered out due to namespace matching, versioning,
+  // filtering, etc.
+  const int64_t max_read_ahead = 100;
+  const uint32_t read_ahead = uint32_t(max + std::min(max, max_read_ahead));
+
+  result->clear();
+
+  rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
+  rgw_obj_index_key cur_marker;
+  marker_obj.get_index_key(&cur_marker);
+
+  rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+                             params.ns);
+  rgw_obj_index_key cur_end_marker;
+  end_marker_obj.get_index_key(&cur_end_marker);
+  const bool cur_end_marker_valid = !params.end_marker.empty();
+
+  rgw_obj_key prefix_obj(params.prefix);
+  prefix_obj.ns = params.ns;
+  string cur_prefix = prefix_obj.get_index_key_name();
+
+  while (truncated && count <= max) {
+    std::vector<rgw_bucket_dir_entry> ent_list;
+    int r = store->cls_bucket_list_unordered(target->get_bucket_info(),
+                                            shard_id,
+                                            cur_marker,
+                                            cur_prefix,
+                                            read_ahead,
+                                            params.list_versions,
+                                            ent_list,
+                                            &truncated,
+                                            &cur_marker);
+    if (r < 0)
+      return r;
+
+    // NB: while regions of ent_list will be sorted, we have no
+    // guarantee that all items will be sorted since they can cross
+    // shard boundaries
+
+    for (auto& entry : ent_list) {
+      rgw_obj_index_key index_key = entry.key;
+      rgw_obj_key obj(index_key);
+
+      /* note that parse_raw_oid() here will not set the correct
+       * object's instance, as rgw_obj_index_key encodes that
+       * separately. We don't need to set the instance because it's
+       * not needed for the checks here and we end up using the raw
+       * entry for the return vector
+       */
+      bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
+      if (!valid) {
+        ldout(cct, 0) << "ERROR: could not parse object name: " <<
+         obj.name << dendl;
+        continue;
+      }
+
+      if (!params.list_versions && !entry.is_visible()) {
+        continue;
+      }
+
+      if (params.enforce_ns && obj.ns != params.ns) {
+        continue;
+      }
+
+      if (cur_end_marker_valid && cur_end_marker <= index_key) {
+       // we're not guaranteed items will come in order, so we have
+       // to loop through all
+       continue;
+      }
+
+      if (count < max) {
+        params.marker = index_key;
+        next_marker = index_key;
+      }
+
+      if (params.filter && !params.filter->filter(obj.name, index_key.name))
+        continue;
+
+      if (params.prefix.size() &&
+         (0 != obj.name.compare(0, params.prefix.size(), params.prefix)))
+        continue;
+
+      if (count >= max) {
+        truncated = true;
+        goto done;
+      }
+
+      result->emplace_back(std::move(entry));
+      count++;
+    } // for (auto& entry : ent_list)
+  } // while (truncated && count <= max)
+
+done:
+  if (is_truncated)
+    *is_truncated = truncated;
 
-  ret = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (ret < 0 && ret != -EOPNOTSUPP)
-    return ret;
   return 0;
+} // list_objects_unordered
+
+
+/**
+ * create a rados pool, associated meta info
+ * returns 0 on success, -ERR# otherwise.
+ */
+int RGWRados::create_pool(const rgw_pool& pool)
+{
+  librados::IoCtx io_ctx;
+  constexpr bool create = true;
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
@@ -6571,6 +6733,21 @@ int RGWRados::BucketShard::init(const rgw_bucket& _bucket, int sid)
   return 0;
 }
 
+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+{
+  bucket = bucket_info.bucket;
+  shard_id = sid;
+
+  int ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
 
 /* Execute @handler on last item in bucket listing for bucket specified
  * in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
@@ -6823,7 +7000,8 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
  * Returns: 0 on success, -ERR# otherwise.
  */
 int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
-                                           map<string, bufferlist>& attrs, bool assume_noent,
+                                           map<string, bufferlist>& attrs,
+                                           bool assume_noent, bool modify_tail,
                                            void *_index_op)
 {
   RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
@@ -6856,7 +7034,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   if (!ptag && !index_op->get_optag()->empty()) {
     ptag = index_op->get_optag();
   }
-  r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false);
+  r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);
   if (r < 0)
     return r;
 
@@ -6943,7 +7121,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
     orig_size = state->accounted_size;
   }
 
-  bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
+  bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||
+                          !obj.key.instance.empty();
 
   bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);
 
@@ -6990,8 +7169,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   target->invalidate_state();
   state = NULL;
 
-  if (versioned_op) {
-    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
+  if (versioned_op && meta.olh_epoch) {
+    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);
     if (r < 0) {
       return r;
     }
@@ -7011,8 +7190,14 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   meta.canceled = false;
 
   /* update quota cache */
-  store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
-                                     accounted_size, orig_size);
+  if (meta.completeMultipart){
+       store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+                                     0, orig_size);
+  }
+  else {
+    store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+                                     accounted_size, orig_size);  
+  }
   return 0;
 
 done_cancel:
@@ -7072,13 +7257,13 @@ int RGWRados::Object::Write::write_meta(uint64_t size, uint64_t accounted_size,
   bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
   int r;
   if (assume_noent) {
-    r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
     if (r == -EEXIST) {
       assume_noent = false;
     }
   }
   if (!assume_noent) {
-    r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+    r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
   }
   return r;
 }
@@ -7237,19 +7422,51 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size;
+  bufferlist buffer;
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(ISP2(buffer_size)); // must be power of 2
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0;
+    }
+    const auto count = P2ALIGN(buffer.length(), buffer_size);
+    buffer.splice(0, count, &bl);
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWGetDataCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl;
-  uint64_t extra_data_len;
+  uint64_t extra_data_left;
   uint64_t data_len;
   map<string, bufferlist> src_attrs;
 public:
@@ -7268,7 +7485,7 @@ public:
                        opstate(_ops),
                        progress_cb(_progress_cb),
                        progress_data(_progress_data),
-                       extra_data_len(0),
+                       extra_data_left(0),
                        data_len(0) {}
 
   int process_attrs(void) {
@@ -7288,7 +7505,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -7297,17 +7516,17 @@ public:
     if (progress_cb) {
       progress_cb(ofs, progress_data);
     }
-    if (extra_data_len) {
+    if (extra_data_left) {
       size_t extra_len = bl.length();
-      if (extra_len > extra_data_len)
-        extra_len = extra_data_len;
+      if (extra_len > extra_data_left)
+        extra_len = extra_data_left;
 
       bufferlist extra;
       bl.splice(0, extra_len, &extra);
       extra_data_bl.append(extra);
 
-      extra_data_len -= extra_len;
-      if (extra_data_len == 0) {
+      extra_data_left -= extra_len;
+      if (extra_data_left == 0) {
         int res = process_attrs();
         if (res < 0)
           return res;
@@ -7315,7 +7534,13 @@ public:
       if (bl.length() == 0) {
         return 0;
       }
+      ofs += extra_len;
     }
+    // adjust ofs based on extra_data_len, so the result is a logical offset
+    // into the object data
+    assert(uint64_t(ofs) >= extra_data_len);
+    ofs -= extra_data_len;
+
     data_len += bl.length();
     bool again = false;
 
@@ -7354,12 +7579,18 @@ public:
     return 0;
   }
 
+  int flush() {
+    bufferlist bl;
+    return put_data_and_throttle(filter, bl, 0, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map<string, bufferlist>& get_attrs() { return src_attrs; }
 
   void set_extra_data_len(uint64_t len) override {
-    extra_data_len = len;
+    extra_data_left = len;
+    RGWGetDataCB::set_extra_data_len(len);
   }
 
   uint64_t get_data_len() {
@@ -7391,6 +7622,12 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs,
     if (!attrs[RGW_ATTR_ETAG].length()) {
       attrs[RGW_ATTR_ETAG] = src_attrs[RGW_ATTR_ETAG];
     }
+    if (!attrs[RGW_ATTR_TAIL_TAG].length()) {
+      auto ttiter = src_attrs.find(RGW_ATTR_TAIL_TAG);
+      if (ttiter != src_attrs.end()) {
+        attrs[RGW_ATTR_TAIL_TAG] = src_attrs[RGW_ATTR_TAIL_TAG];
+      }
+    }
     break;
   case RGWRados::ATTRSMOD_MERGE:
     for (map<string, bufferlist>::iterator it = src_attrs.begin(); it != src_attrs.end(); ++it) {
@@ -7422,6 +7659,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
     return ret;
 
   attrset.erase(RGW_ATTR_ID_TAG);
+  attrset.erase(RGW_ATTR_TAIL_TAG);
 
   uint64_t max_chunk_size;
 
@@ -7431,8 +7669,11 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
     return ret;
   }
 
-  return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj, max_chunk_size, NULL, mtime, attrset,
-                       RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL);
+  return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj,
+                                          max_chunk_size, NULL, mtime, attrset,
+                                          RGW_OBJ_CATEGORY_MAIN, 0, real_time(),
+                                          (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
+                                          NULL, NULL);
 }
 
 struct obj_time_weight {
@@ -7579,10 +7820,15 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
 
   obj_time_weight dest_mtime_weight;
 
+  constexpr bool prepend_meta = true;
+  constexpr bool get_op = true;
+  constexpr bool rgwx_stat = true;
+  constexpr bool sync_manifest = true;
+  constexpr bool skip_decrypt = true;
   int ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
                       dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
-                      true /* prepend_meta */, true /* GET */, true /* rgwx-stat */,
-                      true /* sync manifest */, &cb, &in_stream_req);
+                      prepend_meta, get_op, rgwx_stat,
+                      sync_manifest, skip_decrypt, &cb, &in_stream_req);
   if (ret < 0) {
     return ret;
   }
@@ -7646,7 +7892,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                bool copy_if_newer,
                map<string, bufferlist>& attrs,
                RGWObjCategory category,
-               uint64_t olh_epoch,
+               boost::optional<uint64_t> olh_epoch,
               real_time delete_at,
                string *version_id,
                string *ptag,
@@ -7670,7 +7916,9 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (version_id && *version_id != "null") {
     processor.set_version_id(*version_id);
   }
-  processor.set_olh_epoch(olh_epoch);
+  if (olh_epoch) {
+    processor.set_olh_epoch(*olh_epoch);
+  }
   int ret = processor.prepare(this, NULL);
   if (ret < 0) {
     return ret;
@@ -7750,10 +7998,15 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     }
   }
 
+  static constexpr bool prepend_meta = true;
+  static constexpr bool get_op = true;
+  static constexpr bool rgwx_stat = false;
+  static constexpr bool sync_manifest = true;
+  static constexpr bool skip_decrypt = true;
   ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
                       dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
-                      true /* prepend_meta */, true /* GET */, false /* rgwx-stat */,
-                      true /* sync manifest */, &cb, &in_stream_req);
+                      prepend_meta, get_op, rgwx_stat,
+                      sync_manifest, skip_decrypt, &cb, &in_stream_req);
   if (ret < 0) {
     goto set_err_state;
   }
@@ -7762,6 +8015,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
@@ -7861,7 +8118,16 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   return 0;
 set_err_state:
   if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
-    ret = 0;
+    // we may have already fetched during sync of OP_ADD, but were waiting
+    // for OP_LINK_OLH to call set_olh() with a real olh_epoch
+    if (olh_epoch && *olh_epoch > 0) {
+      constexpr bool log_data_change = true;
+      ret = set_olh(obj_ctx, dest_bucket_info, dest_obj, false, nullptr,
+                    *olh_epoch, real_time(), false, zones_trace, log_data_change);
+    } else {
+      // we already have the latest copy
+      ret = 0;
+    }
   }
   if (opstate) {
     RGWOpState::OpState state;
@@ -7999,6 +8265,15 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     return ret;
   }
+  if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
+    // Current implementation does not follow S3 spec and even
+    // may result in data corruption silently when copying
+    // multipart objects acorss pools. So reject COPY operations
+    //on encrypted objects before it is fully functional.
+    ldout(cct, 0) << "ERROR: copy op for encrypted object " << src_obj
+                  << " has not been implemented." << dendl;
+    return -ERR_NOT_IMPLEMENTED;
+  }
 
   src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
   src_attrs.erase(RGW_ATTR_DELETE_AT);
@@ -8117,14 +8392,17 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   }
 
   if (!copy_itself) {
+    attrs.erase(RGW_ATTR_TAIL_TAG);
     manifest = astate->manifest;
     const rgw_bucket_placement& tail_placement = manifest.get_tail_placement();
     if (tail_placement.bucket.name.empty()) {
       manifest.set_tail_placement(tail_placement.placement_rule, src_obj.bucket);
     }
+    string ref_tag;
     for (; miter != astate->manifest.obj_end(); ++miter) {
       ObjectWriteOperation op;
-      cls_refcount_get(op, tag, true);
+      ref_tag = tag + '\0';
+      cls_refcount_get(op, ref_tag, true);
       const rgw_raw_obj& loc = miter.get_location().get_raw_obj(this);
       ref.ioctx.locator_set_key(loc.loc);
 
@@ -8163,6 +8441,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   write_op.meta.category = category;
   write_op.meta.olh_epoch = olh_epoch;
   write_op.meta.delete_at = delete_at;
+  write_op.meta.modify_tail = !copy_itself;
 
   ret = write_op.write_meta(obj_size, astate->accounted_size, attrs);
   if (ret < 0) {
@@ -8215,7 +8494,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_oid(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   if (version_id) {
     processor.set_version_id(*version_id);
@@ -8318,27 +8597,34 @@ bool RGWRados::is_syncing_bucket_meta(const rgw_bucket& bucket)
 
 int RGWRados::check_bucket_empty(RGWBucketInfo& bucket_info)
 {
-  std::map<string, rgw_bucket_dir_entry> ent_map;
+  std::vector<rgw_bucket_dir_entry> ent_list;
   rgw_obj_index_key marker;
   string prefix;
   bool is_truncated;
 
   do {
-#define NUM_ENTRIES 1000
-    int r = cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, NUM_ENTRIES, true, ent_map,
-                        &is_truncated, &marker);
+    constexpr uint NUM_ENTRIES = 1000u;
+    int r = cls_bucket_list_unordered(bucket_info,
+                                     RGW_NO_SHARD,
+                                     marker,
+                                     prefix,
+                                     NUM_ENTRIES,
+                                     true,
+                                     ent_list,
+                                     &is_truncated,
+                                     &marker);
     if (r < 0)
       return r;
 
     string ns;
-    std::map<string, rgw_bucket_dir_entry>::iterator eiter;
-    for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+    for (auto const& dirent : ent_list) {
       rgw_obj_key obj;
 
-      if (rgw_obj_key::oid_to_key_in_ns(eiter->second.key.name, &obj, ns))
+      if (rgw_obj_key::oid_to_key_in_ns(dirent.key.name, &obj, ns))
         return -ENOTEMPTY;
     }
   } while (is_truncated);
+
   return 0;
 }
   
@@ -8475,7 +8761,7 @@ int RGWRados::Object::complete_atomic_modification()
     return 0;
   }
 
-  string tag = state->obj_tag.to_str();
+  string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str());
   return store->gc->send_chain(chain, tag, false);  // do it async
 }
 
@@ -8498,7 +8784,9 @@ int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool
   return gc->send_chain(chain, tag, sync);
 }
 
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx, string& bucket_oid)
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+                               librados::IoCtx& index_ctx,
+                               string& bucket_oid)
 {
   const rgw_bucket& bucket = bucket_info.bucket;
   int r = open_bucket_index_ctx(bucket_info, index_ctx);
@@ -8516,8 +8804,9 @@ int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCt
   return 0;
 }
 
-int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
-    string& bucket_oid_base) {
+int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info,
+                                    librados::IoCtx& index_ctx,
+                                    string& bucket_oid_base) {
   const rgw_bucket& bucket = bucket_info.bucket;
   int r = open_bucket_index_ctx(bucket_info, index_ctx);
   if (r < 0)
@@ -8535,8 +8824,11 @@ int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info, librados:
 
 }
 
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
-    map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+                               librados::IoCtx& index_ctx,
+                               map<int, string>& bucket_objs,
+                               int shard_id,
+                               map<int, string> *bucket_instance_ids) {
   string bucket_oid_base;
   int ret = open_bucket_index_base(bucket_info, index_ctx, bucket_oid_base);
   if (ret < 0) {
@@ -8692,13 +8984,17 @@ int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_ob
     return -EINVAL;
   }
 
-  if (state->obj_tag.length() == 0) {// check for backward compatibility
+  string tag;
+
+  if (state->tail_tag.length() > 0) {
+    tag = state->tail_tag.c_str();
+  } else if (state->obj_tag.length() > 0) {
+    tag = state->obj_tag.c_str();
+  } else {
     ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl;
     return -EINVAL;
   }
 
-  string tag = state->obj_tag.c_str();
-
   ldout(cct, 0) << "defer chain tag=" << tag << dendl;
 
   return gc->defer_chain(tag, false);
@@ -8754,6 +9050,8 @@ int RGWRados::Object::Delete::delete_obj()
       }
 
       result.version_id = marker.key.instance;
+      if (result.version_id.empty())
+        result.version_id = "null";
       result.delete_marker = true;
 
       struct rgw_bucket_dir_entry_meta meta;
@@ -8861,7 +9159,7 @@ int RGWRados::Object::Delete::delete_obj()
     return -ENOENT;
   }
 
-  r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true);
+  r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true, false);
   if (r < 0)
     return r;
 
@@ -8873,19 +9171,15 @@ int RGWRados::Object::Delete::delete_obj()
   index_op.set_zones_trace(params.zones_trace);
   index_op.set_bilog_flags(params.bilog_flags);
 
-
   r = index_op.prepare(CLS_RGW_OP_DEL, &state->write_tag);
   if (r < 0)
     return r;
 
   store->remove_rgw_head_obj(op);
   r = ref.ioctx.operate(ref.oid, &op);
-  bool need_invalidate = false;
-  if (r == -ECANCELED) {
-    /* raced with another operation, we can regard it as removed */
-    need_invalidate = true;
-    r = 0;
-  }
+
+  /* raced with another operation, object state is indeterminate */
+  const bool need_invalidate = (r == -ECANCELED);
 
   int64_t poolid = ref.ioctx.get_id();
   if (r >= 0) {
@@ -9195,6 +9489,10 @@ int RGWRados::get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket
     s->shadow_obj[bl.length()] = '\0';
   }
   s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
+  auto ttiter = s->attrset.find(RGW_ATTR_TAIL_TAG);
+  if (ttiter != s->attrset.end()) {
+    s->tail_tag = s->attrset[RGW_ATTR_TAIL_TAG];
+  }
 
   bufferlist manifest_bl = s->attrset[RGW_ATTR_MANIFEST];
   if (manifest_bl.length()) {
@@ -9467,7 +9765,8 @@ void RGWRados::SystemObject::invalidate_state()
 }
 
 int RGWRados::Object::prepare_atomic_modification(ObjectWriteOperation& op, bool reset_obj, const string *ptag,
-                                                  const char *if_match, const char *if_nomatch, bool removal_op)
+                                                  const char *if_match, const char *if_nomatch, bool removal_op,
+                                                  bool modify_tail)
 {
   int r = get_state(&state, false);
   if (r < 0)
@@ -9551,6 +9850,9 @@ int RGWRados::Object::prepare_atomic_modification(ObjectWriteOperation& op, bool
   ldout(store->ctx(), 10) << "setting object write_tag=" << state->write_tag << dendl;
 
   op.setxattr(RGW_ATTR_ID_TAG, bl);
+  if (modify_tail) {
+    op.setxattr(RGW_ATTR_TAIL_TAG, bl);
+  }
 
   return 0;
 }
@@ -9695,10 +9997,13 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& ob
       return r;
 
     bl.append(tag.c_str(), tag.size() + 1);
-
     op.setxattr(RGW_ATTR_ID_TAG,  bl);
   }
 
+
+  real_time mtime = real_clock::now();
+  struct timespec mtime_ts = real_clock::to_timespec(mtime);
+  op.mtime2(&mtime_ts);
   r = ref.ioctx.operate(ref.oid, &op);
   if (state) {
     if (r >= 0) {
@@ -9709,7 +10014,6 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& ob
       string content_type(content_type_bl.c_str(), content_type_bl.length());
       uint64_t epoch = ref.ioctx.get_last_version();
       int64_t poolid = ref.ioctx.get_id();
-      real_time mtime = real_clock::now();
       r = index_op.complete(poolid, epoch, state->size, state->accounted_size,
                             mtime, etag, content_type, &acl_bl,
                             RGW_OBJ_CATEGORY_MAIN, NULL);
@@ -10198,7 +10502,8 @@ int RGWRados::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read
                              RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
                              bufferlist& bl, off_t ofs, off_t end,
                              map<string, bufferlist> *attrs,
-                             rgw_cache_entry_info *cache_info)
+                             rgw_cache_entry_info *cache_info,
+                            boost::optional<obj_version>)
 {
   uint64_t len;
   ObjectReadOperation op;
@@ -10245,12 +10550,16 @@ int RGWRados::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read
   return bl.length();
 }
 
-int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker)
+int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl,
+                                      RGWObjVersionTracker *objv_tracker,
+                                      boost::optional<obj_version> refresh_version)
 {
   RGWRados *store = source->get_store();
   rgw_raw_obj& obj = source->get_obj();
 
-  return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl, ofs, end, read_params.attrs, read_params.cache_info);
+  return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl,
+                              ofs, end, read_params.attrs,
+                              read_params.cache_info, refresh_version);
 }
 
 int RGWRados::SystemObject::Read::get_attr(const char *name, bufferlist& dest)
@@ -10766,6 +11075,8 @@ int RGWRados::olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWOb
     op.create(true);
   } else {
     op.assert_exists();
+    struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+    op.mtime2(&mtime_ts);
   }
 
   /*
@@ -10917,7 +11228,8 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
                                     const string& op_tag,
                                     struct rgw_bucket_dir_entry_meta *meta,
                                     uint64_t olh_epoch,
-                                    real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
+                                    real_time unmod_since, bool high_precision_time,
+                                    rgw_zone_set *_zones_trace, bool log_data_change)
 {
   rgw_rados_ref ref;
   int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
@@ -10928,9 +11240,8 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
   rgw_zone_set zones_trace;
   if (_zones_trace) {
     zones_trace = *_zones_trace;
-  } else {
-    zones_trace.insert(get_zone().id);
   }
+  zones_trace.insert(get_zone().id);
 
   BucketShard bs(this);
 
@@ -10948,6 +11259,10 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
     return r;
   }
 
+  if (log_data_change && bucket_info.datasync_flag_enabled()) {
+    data_log->add_entry(bs.bucket, bs.shard_id);
+  }
+
   return 0;
 }
 
@@ -11103,6 +11418,9 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
   op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
   op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
 
+  struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+  op.mtime2(&mtime_ts);
+
   bool need_to_link = false;
   cls_rgw_obj_key key;
   bool delete_marker = false;
@@ -11237,7 +11555,8 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc
 }
 
 int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
-                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
+                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
+                      rgw_zone_set *zones_trace, bool log_data_change)
 {
   string op_tag;
 
@@ -11268,7 +11587,9 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const r
       }
       return ret;
     }
-    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
+    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker,
+                                op_tag, meta, olh_epoch, unmod_since, high_precision_time,
+                                zones_trace, log_data_change);
     if (ret < 0) {
       ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
       if (ret == -ECANCELED) {
@@ -11788,13 +12109,16 @@ int RGWRados::get_bucket_instance_info(RGWObjectCtx& obj_ctx, const rgw_bucket&
 
 int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info,
                                            real_time *pmtime, map<string, bufferlist> *pattrs,
-                                           rgw_cache_entry_info *cache_info)
+                                           rgw_cache_entry_info *cache_info,
+                                          boost::optional<obj_version> refresh_version)
 {
   ldout(cct, 20) << "reading from " << get_zone_params().domain_root << ":" << oid << dendl;
 
   bufferlist epbl;
 
-  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, oid, epbl, &info.objv_tracker, pmtime, pattrs, cache_info);
+  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+                              oid, epbl, &info.objv_tracker, pmtime, pattrs,
+                              cache_info, refresh_version);
   if (ret < 0) {
     return ret;
   }
@@ -11817,13 +12141,16 @@ int RGWRados::get_bucket_entrypoint_info(RGWObjectCtx& obj_ctx,
                                          RGWObjVersionTracker *objv_tracker,
                                          real_time *pmtime,
                                          map<string, bufferlist> *pattrs,
-                                         rgw_cache_entry_info *cache_info)
+                                         rgw_cache_entry_info *cache_info,
+                                        boost::optional<obj_version> refresh_version)
 {
   bufferlist bl;
   string bucket_entry;
 
   rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
-  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, bucket_entry, bl, objv_tracker, pmtime, pattrs, cache_info);
+  int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+                              bucket_entry, bl, objv_tracker, pmtime, pattrs,
+                              cache_info, refresh_version);
   if (ret < 0) {
     return ret;
   }
@@ -11876,15 +12203,27 @@ int RGWRados::convert_old_bucket_info(RGWObjectCtx& obj_ctx,
   return 0;
 }
 
-int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
-                              const string& tenant, const string& bucket_name, RGWBucketInfo& info,
-                              real_time *pmtime, map<string, bufferlist> *pattrs)
+int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
+                               const string& tenant,
+                               const string& bucket_name,
+                               RGWBucketInfo& info,
+                               real_time *pmtime,
+                               map<string, bufferlist> *pattrs,
+                               boost::optional<obj_version> refresh_version)
 {
   bucket_info_entry e;
   string bucket_entry;
   rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
 
+
   if (binfo_cache->find(bucket_entry, &e)) {
+    if (refresh_version &&
+        e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
+      lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
+                 << "a failure that should be debugged. I am a nice machine, "
+                 << "so I will try to recover." << dendl;
+      binfo_cache->invalidate(bucket_entry);
+    }
     info = e.info;
     if (pattrs)
       *pattrs = e.attrs;
@@ -11897,7 +12236,9 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
   real_time ep_mtime;
   RGWObjVersionTracker ot;
   rgw_cache_entry_info entry_cache_info;
-  int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name, entry_point, &ot, &ep_mtime, pattrs, &entry_cache_info);
+  int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name,
+                                      entry_point, &ot, &ep_mtime, pattrs,
+                                      &entry_cache_info, refresh_version);
   if (ret < 0) {
     /* only init these fields */
     info.bucket.tenant = tenant;
@@ -11931,10 +12272,12 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
 
   rgw_cache_entry_info cache_info;
 
-  ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs, &cache_info);
+  ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs,
+                                    &cache_info, refresh_version);
   e.info.ep_objv = ot.read_version;
   info = e.info;
   if (ret < 0) {
+    lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
     info.bucket.tenant = tenant;
     info.bucket.name = bucket_name;
     // XXX and why return anything in case of an error anyway?
@@ -11956,9 +12299,35 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
     ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
   }
 
+  if (refresh_version &&
+      refresh_version->compare(&info.objv_tracker.read_version)) {
+    lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
+               << "have gone squirrelly. An administrator may have forced a "
+               << "change; otherwise there is a problem somewhere." << dendl;
+  }
+
   return 0;
 }
 
+int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
+                              const string& tenant, const string& bucket_name,
+                              RGWBucketInfo& info,
+                              real_time *pmtime, map<string, bufferlist> *pattrs)
+{
+  return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
+                          pattrs, boost::none);
+}
+
+int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
+                                      ceph::real_time *pmtime,
+                                      map<string, bufferlist> *pattrs)
+{
+  RGWObjectCtx obj_ctx(this);
+
+  return _get_bucket_info(obj_ctx, info.bucket.tenant, info.bucket.name,
+                          info, pmtime, pattrs, info.objv_tracker.read_version);
+}
+
 int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
                                          bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
                                          map<string, bufferlist> *pattrs)
@@ -12152,6 +12521,10 @@ int RGWRados::update_containers_stats(map<string, RGWBucketEnt>& m)
         ent.size_rounded += stats.total_size_rounded;
       }
     }
+
+    // fill in placement_rule from the bucket instance for use in swift's
+    // per-storage policy statistics
+    ent.placement_rule = std::move(bucket_info.placement_rule);
   }
 
   return m.size();
@@ -12203,6 +12576,31 @@ int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
   return 0;
 }
 
+int RGWRados::pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx)
+{
+  librados::IoCtx& io_ctx = ctx.io_ctx;
+  librados::NObjectIterator& iter = ctx.iter;
+
+  int r = open_pool_ctx(pool, io_ctx);
+  if (r < 0)
+    return r;
+
+  librados::ObjectCursor oc;
+  if (!oc.from_str(cursor)) {
+    ldout(cct, 10) << "failed to parse cursor: " << cursor << dendl;
+    return -EINVAL;
+  }
+
+  iter = io_ctx.nobjects_begin(oc);
+
+  return 0;
+}
+
+string RGWRados::pool_iterate_get_cursor(RGWPoolIterCtx& ctx)
+{
+  return ctx.iter.get_cursor().to_str();
+}
+
 int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<rgw_bucket_dir_entry>& objs,
                            bool *is_truncated, RGWAccessListFilter *filter)
 {
@@ -12242,21 +12640,27 @@ struct RGWAccessListFilterPrefix : public RGWAccessListFilter {
   }
 };
 
-int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
-                              int max, RGWListRawObjsCtx& ctx, list<string>& oids,
-                              bool *is_truncated)
+int RGWRados::list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx)
 {
-  RGWAccessListFilterPrefix filter(prefix_filter);
-
-  if (!ctx.initialized) {
-    int r = pool_iterate_begin(pool, ctx.iter_ctx);
+  if (!ctx->initialized) {
+    int r = pool_iterate_begin(pool, marker, ctx->iter_ctx);
     if (r < 0) {
       ldout(cct, 10) << "failed to list objects pool_iterate_begin() returned r=" << r << dendl;
       return r;
     }
-    ctx.initialized = true;
+    ctx->initialized = true;
   }
+  return 0;
+}
 
+int RGWRados::list_raw_objects_next(const string& prefix_filter, int max,
+                                    RGWListRawObjsCtx& ctx, list<string>& oids,
+                                    bool *is_truncated)
+{
+  if (!ctx.initialized) {
+    return -EINVAL;
+  }
+  RGWAccessListFilterPrefix filter(prefix_filter);
   vector<rgw_bucket_dir_entry> objs;
   int r = pool_iterate(ctx.iter_ctx, max, objs, is_truncated, &filter);
   if (r < 0) {
@@ -12273,6 +12677,25 @@ int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter
   return oids.size();
 }
 
+int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
+                              int max, RGWListRawObjsCtx& ctx, list<string>& oids,
+                              bool *is_truncated)
+{
+  if (!ctx.initialized) {
+    int r = list_raw_objects_init(pool, string(), &ctx);
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  return list_raw_objects_next(prefix_filter, max, ctx, oids, is_truncated);
+}
+
+string RGWRados::list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx)
+{
+  return pool_iterate_get_cursor(ctx.iter_ctx);
+}
+
 int RGWRados::list_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max,
                                   std::list<rgw_bi_log_entry>& result, bool *truncated)
 {
@@ -12587,10 +13010,9 @@ int RGWRados::process_lc()
   return lc->process();
 }
 
-int RGWRados::process_expire_objects()
+bool RGWRados::process_expire_objects()
 {
-  obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
-  return 0;
+  return obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
 }
 
 int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWriteOperation& op, string& oid)
@@ -12607,10 +13029,8 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
   if (_zones_trace) {
     zones_trace = *_zones_trace;
   }
-  else {
-    zones_trace.insert(get_zone().id);
-  }
-  
+  zones_trace.insert(get_zone().id);
+
   ObjectWriteOperation o;
   cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
   cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
@@ -12628,16 +13048,22 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
   dir_meta = ent.meta;
   dir_meta.category = category;
 
+  rgw_zone_set zones_trace;
+  if (_zones_trace) {
+    zones_trace = *_zones_trace;
+  }
+  zones_trace.insert(get_zone().id);
+
   rgw_bucket_entry_ver ver;
   ver.pool = pool;
   ver.epoch = epoch;
   cls_rgw_obj_key key(ent.key.name, ent.key.instance);
   cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
   cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, remove_objs,
-                             get_zone().log_data, bilog_flags, _zones_trace);
+                             get_zone().log_data, bilog_flags, &zones_trace);
   complete_op_data *arg;
   index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
-                                              get_zone().log_data, bilog_flags, _zones_trace, &arg);
+                                              get_zone().log_data, bilog_flags, &zones_trace, &arg);
   librados::AioCompletion *completion = arg->rados_completion;
   int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
   completion->release(); /* can't reference arg here, as it might have already been released */
@@ -12684,16 +13110,26 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_
   return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
 }
 
-int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
-                             uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
-                             bool *is_truncated, rgw_obj_index_key *last_entry,
-                             bool (*force_check_filter)(const string&  name))
+
+int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
+                                     int shard_id,
+                                     rgw_obj_index_key& start,
+                                     const string& prefix,
+                                     uint32_t num_entries,
+                                     bool list_versions,
+                                     map<string, rgw_bucket_dir_entry>& m,
+                                     bool *is_truncated,
+                                     rgw_obj_index_key *last_entry,
+                                     bool (*force_check_filter)(const string& name))
 {
-  ldout(cct, 10) << "cls_bucket_list " << bucket_info.bucket << " start " << start.name << "[" << start.instance << "] num_entries " << num_entries << dendl;
+  ldout(cct, 10) << "cls_bucket_list_ordered " << bucket_info.bucket <<
+    " start " << start.name << "[" << start.instance << "] num_entries " <<
+    num_entries << dendl;
 
   librados::IoCtx index_ctx;
   // key   - oid (for different shards if there is any)
-  // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+  // value - list result for the corresponding oid (shard), it is filled by
+  //         the AIO callback
   map<int, string> oids;
   map<int, struct rgw_cls_list_ret> list_results;
   int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
@@ -12701,8 +13137,9 @@ int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_
     return r;
 
   cls_rgw_obj_key start_key(start.name, start.instance);
-  r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries, list_versions,
-                            oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+  r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries,
+                           list_versions, oids, list_results,
+                           cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0)
     return r;
 
@@ -12738,19 +13175,24 @@ int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_
     const string& name = vcurrents[pos]->first;
     struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
 
-    bool force_check = force_check_filter && force_check_filter(dirent.key.name);
-    if ((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty() || force_check) {
+    bool force_check = force_check_filter &&
+        force_check_filter(dirent.key.name);
+    if ((!dirent.exists && !dirent.is_delete_marker()) ||
+        !dirent.pending_map.empty() ||
+        force_check) {
       /* there are uncommitted ops. We need to check the current state,
        * and if the tags are old we need to do cleanup as well. */
       librados::IoCtx sub_ctx;
       sub_ctx.dup(index_ctx);
-      r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[vnames[pos]]);
+      r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
+                          updates[vnames[pos]]);
       if (r < 0 && r != -ENOENT) {
           return r;
       }
     }
     if (r >= 0) {
-      ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+      ldout(cct, 10) << "RGWRados::cls_bucket_list_ordered: got " <<
+       dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
       m[name] = std::move(dirent);
       ++count;
     }
@@ -12772,14 +13214,16 @@ int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_
       // we don't care if we lose suggested updates, send them off blindly
       AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
       index_ctx.aio_operate(miter->first, c, &o);
-        c->release();
+      c->release();
     }
   }
 
   // Check if all the returned entries are consumed or not
   for (size_t i = 0; i < vcurrents.size(); ++i) {
-    if (vcurrents[i] != vends[i])
+    if (vcurrents[i] != vends[i]) {
       *is_truncated = true;
+      break;
+    }
   }
   if (!m.empty())
     *last_entry = m.rbegin()->first;
@@ -12787,7 +13231,131 @@ int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_
   return 0;
 }
 
-int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
+
+int RGWRados::cls_bucket_list_unordered(RGWBucketInfo& bucket_info,
+                                       int shard_id,
+                                       rgw_obj_index_key& start,
+                                       const string& prefix,
+                                       uint32_t num_entries,
+                                       bool list_versions,
+                                       std::vector<rgw_bucket_dir_entry>& ent_list,
+                                       bool *is_truncated,
+                                       rgw_obj_index_key *last_entry,
+                                       bool (*force_check_filter)(const string& name)) {
+  ldout(cct, 10) << "cls_bucket_list_unordered " << bucket_info.bucket <<
+    " start " << start.name << "[" << start.instance <<
+    "] num_entries " << num_entries << dendl;
+
+  *is_truncated = false;
+  librados::IoCtx index_ctx;
+
+  rgw_obj_index_key my_start = start;
+
+  map<int, string> oids;
+  int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
+  if (r < 0)
+    return r;
+  const uint32_t num_shards = oids.size();
+
+  uint32_t current_shard;
+  if (shard_id >= 0) {
+    current_shard = shard_id;
+  } else if (my_start.empty()) {
+    current_shard = 0u;
+  } else {
+    current_shard =
+      rgw_bucket_shard_index(my_start.name, num_shards);
+  }
+
+  uint32_t count = 0u;
+  map<string, bufferlist> updates;
+  std::string last_added_entry;
+  while (count <= num_entries &&
+        ((shard_id >= 0 && current_shard == uint32_t(shard_id)) ||
+         current_shard < num_shards)) {
+    // key   - oid (for different shards if there is any)
+    // value - list result for the corresponding oid (shard), it is filled by
+    //         the AIO callback
+    map<int, struct rgw_cls_list_ret> list_results;
+    r = CLSRGWIssueBucketList(index_ctx, my_start, prefix, num_entries,
+                             list_versions, oids, list_results,
+                             cct->_conf->rgw_bucket_index_max_aio)();
+    if (r < 0)
+      return r;
+
+    const std::string& oid = oids[current_shard];
+    assert(list_results.find(current_shard) != list_results.end());
+    auto& result = list_results[current_shard];
+    for (auto& entry : result.dir.m) {
+      rgw_bucket_dir_entry& dirent = entry.second;
+
+      bool force_check = force_check_filter &&
+       force_check_filter(dirent.key.name);
+      if ((!dirent.exists && !dirent.is_delete_marker()) ||
+         !dirent.pending_map.empty() ||
+         force_check) {
+       /* there are uncommitted ops. We need to check the current state,
+        * and if the tags are old we need to do cleanup as well. */
+       librados::IoCtx sub_ctx;
+       sub_ctx.dup(index_ctx);
+       r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[oid]);
+       if (r < 0 && r != -ENOENT) {
+         return r;
+       }
+      }
+
+      // at this point either r >=0 or r == -ENOENT
+      if (r >= 0) { // i.e., if r != -ENOENT
+       ldout(cct, 10) << "RGWRados::cls_bucket_list_unordered: got " <<
+         dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+
+       if (count < num_entries) {
+         last_added_entry = entry.first;
+         my_start = dirent.key;
+         ent_list.emplace_back(std::move(dirent));
+         ++count;
+       } else {
+         *is_truncated = true;
+         goto check_updates;
+       }
+      } else { // r == -ENOENT
+       // in the case of -ENOENT, make sure we're advancing marker
+       // for possible next call to CLSRGWIssueBucketList
+       my_start = dirent.key;
+      }
+    } // entry for loop
+
+    if (!result.is_truncated) {
+      // if we reached the end of the shard read next shard
+      ++current_shard;
+      my_start = rgw_obj_index_key();
+    }
+  } // shard loop
+
+check_updates:
+  // suggest updates if there is any
+  map<string, bufferlist>::iterator miter = updates.begin();
+  for (; miter != updates.end(); ++miter) {
+    if (miter->second.length()) {
+      ObjectWriteOperation o;
+      cls_rgw_suggest_changes(o, miter->second);
+      // we don't care if we lose suggested updates, send them off blindly
+      AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+      index_ctx.aio_operate(miter->first, c, &o);
+      c->release();
+    }
+  }
+
+  if (last_entry && !ent_list.empty()) {
+    *last_entry = last_added_entry;
+  }
+
+  return 0;
+}
+
+
+int RGWRados::cls_obj_usage_log_add(const string& oid,
+                                   rgw_usage_log_info& info)
 {
   rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
 
@@ -12833,10 +13401,7 @@ int RGWRados::cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_e
     return r;
   }
 
-  ObjectWriteOperation op;
-  cls_rgw_usage_log_trim(op, user, start_epoch, end_epoch);
-
-  r = ref.ioctx.operate(ref.oid, &op);
+  r = cls_rgw_usage_log_trim(ref.ioctx, ref.oid, user, start_epoch, end_epoch);
   return r;
 }
 
@@ -13045,6 +13610,23 @@ int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header
   return 0;
 }
 
+int RGWRados::cls_user_reset_stats(const string& user_id)
+{
+  string buckets_obj_id;
+  rgw_get_buckets_obj(user_id, buckets_obj_id);
+  rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
+
+  rgw_rados_ref ref;
+  int r = get_raw_obj_ref(obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  ::cls_user_reset_stats(op);
+  return ref.ioctx.operate(ref.oid, &op);
+}
+
 int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
 {
   string buckets_obj_id;
@@ -13281,8 +13863,9 @@ int RGWRados::check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
 }
 
 void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
-    uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
-{
+                                       uint32_t num_shards,
+                                       map<int, string>& bucket_objects,
+                                       int shard_id) {
   if (!num_shards) {
     bucket_objects[0] = bucket_oid_base;
   } else {
@@ -13336,9 +13919,7 @@ int RGWRados::get_target_shard_id(const RGWBucketInfo& bucket_info, const string
           *shard_id = -1;
         }
       } else {
-        uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
-        uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-        sid = rgw_shards_mod(sid2, bucket_info.num_shards);
+        uint32_t sid = rgw_bucket_shard_index(obj_key, bucket_info.num_shards);
         if (shard_id) {
           *shard_id = (int)sid;
         }
@@ -13376,9 +13957,7 @@ int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const strin
           *shard_id = -1;
         }
       } else {
-        uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
-        uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
-        sid = rgw_shards_mod(sid2, num_shards);
+        uint32_t sid = rgw_bucket_shard_index(obj_key, num_shards);
         char buf[bucket_oid_base.size() + 32];
         snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
         (*bucket_obj) = buf;
@@ -13668,14 +14247,14 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
+                                                bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
 {
-  int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
   if (!use_cache) {
     store = new RGWRados;
   } else {
-    store = new RGWCache<RGWRados>; 
+    store = new RGWCache<RGWRados>;
   }
 
   if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
@@ -13832,3 +14411,71 @@ int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need
   }
 }
 
+bool RGWRados::call(std::string command, cmdmap_t& cmdmap, std::string format,
+                    bufferlist& out)
+{
+  if (command == "cache list") {
+    boost::optional<std::string> filter;
+    auto i = cmdmap.find("filter");
+    if (i != cmdmap.cend()) {
+      filter = boost::get<std::string>(i->second);
+    }
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "table"));
+    if (f) {
+      f->open_array_section("cache_entries");
+      call_list(filter, f.get());
+      f->close_section();
+      f->flush(out);
+      return true;
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache inspect") {
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "json-pretty"));
+    if (f) {
+      const auto& target = boost::get<std::string>(cmdmap["target"]);
+      if (call_inspect(target, f.get())) {
+        f->flush(out);
+        return true;
+      } else {
+        out.append(string("Unable to find entry ") + target + string(".\n"));
+        return false;
+      }
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache erase") {
+    const auto& target = boost::get<std::string>(cmdmap["target"]);
+    if (call_erase(target)) {
+      return true;
+    } else {
+      out.append(string("Unable to find entry ") + target + string(".\n"));
+      return false;
+    }
+  } else if (command == "cache zap") {
+    call_zap();
+    return true;
+  }
+  return false;
+}
+
+void RGWRados::call_list(const boost::optional<std::string>&,
+                         ceph::Formatter*)
+{
+  return;
+}
+
+bool RGWRados::call_inspect(const std::string&, Formatter*)
+{
+  return false;
+}
+
+bool RGWRados::call_erase(const std::string&) {
+  return false;
+}
+
+void RGWRados::call_zap() {
+  return;
+}