]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_bucket.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
index 725df23a1400a9003a8f51d48923ab360c204799..d13c856ab31b5b7c0a61a5bb765b316abe784138 100644 (file)
@@ -50,7 +50,6 @@
 
 #include "cls/user/cls_user_types.h"
 
-#include "rgw_sal.h"
 #include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 
 #define BUCKET_TAG_TIMEOUT 30
 
+using namespace std;
+
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
 
+void init_bucket(rgw_bucket *b, const char *t, const char *n, const char *dp, const char *ip, const char *m, const char *id)
+{
+  b->tenant = t;
+  b->name = n;
+  b->marker = m;
+  b->bucket_id = id;
+  b->explicit_placement.data_pool = rgw_pool(dp);
+  b->explicit_placement.index_pool = rgw_pool(ip);
+}
 
 /*
  * The tenant_name is always returned on purpose. May be empty, of course.
@@ -148,24 +158,6 @@ void rgw_parse_url_bucket(const string &bucket, const string& auth_tenant,
   }
 }
 
-/**
- * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
- * Returns: 0 on success, -ERR# on failure.
- */
-int rgw_read_user_buckets(const DoutPrefixProvider *dpp, 
-                          rgw::sal::RGWRadosStore * store,
-                          const rgw_user& user_id,
-                          rgw::sal::RGWBucketList& buckets,
-                          const string& marker,
-                          const string& end_marker,
-                          uint64_t max,
-                          bool need_stats,
-                         optional_yield y)
-{
-  rgw::sal::RGWRadosUser user(store, user_id);
-  return user.list_buckets(dpp, marker, end_marker, max, need_stats, buckets, y);
-}
-
 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *bucket_name, string *bucket_id, int *shard_id)
 {
   auto pos = bucket_instance.rfind(':');
@@ -260,13 +252,12 @@ static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
   }
 }
 
-void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_user& user_id,
+void check_bad_user_bucket_mapping(rgw::sal::Store* store, rgw::sal::User* user,
                                   bool fix,
                                   optional_yield y,
                                    const DoutPrefixProvider *dpp)
 {
-  rgw::sal::RGWBucketList user_buckets;
-  rgw::sal::RGWRadosUser user(store, user_id);
+  rgw::sal::BucketList user_buckets;
   string marker;
 
   CephContext *cct = store->ctx();
@@ -274,14 +265,14 @@ void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_use
   size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
 
   do {
-    int ret = user.list_buckets(dpp, marker, string(), max_entries, false, user_buckets, y);
+    int ret = user->list_buckets(dpp, marker, string(), max_entries, false, user_buckets, y);
     if (ret < 0) {
       ldout(store->ctx(), 0) << "failed to read user buckets: "
                             << cpp_strerror(-ret) << dendl;
       return;
     }
 
-    map<string, std::unique_ptr<rgw::sal::RGWBucket>>& buckets = user_buckets.get_buckets();
+    map<string, std::unique_ptr<rgw::sal::Bucket>>& buckets = user_buckets.get_buckets();
     for (auto i = buckets.begin();
          i != buckets.end();
          ++i) {
@@ -289,26 +280,21 @@ void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_use
 
       auto& bucket = i->second;
 
-      RGWBucketInfo bucket_info;
-      real_time mtime;
-      int r = store->getRados()->get_bucket_info(store->svc(), user_id.tenant, bucket->get_name(), bucket_info, &mtime, null_yield, dpp);
+      std::unique_ptr<rgw::sal::Bucket> actual_bucket;
+      int r = store->get_bucket(dpp, user, user->get_tenant(), bucket->get_name(), &actual_bucket, null_yield);
       if (r < 0) {
         ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
         continue;
       }
 
-      rgw_bucket& actual_bucket = bucket_info.bucket;
-
-      if (actual_bucket.name.compare(bucket->get_name()) != 0 ||
-          actual_bucket.tenant.compare(bucket->get_tenant()) != 0 ||
-          actual_bucket.marker.compare(bucket->get_marker()) != 0 ||
-          actual_bucket.bucket_id.compare(bucket->get_bucket_id()) != 0) {
+      if (actual_bucket->get_name().compare(bucket->get_name()) != 0 ||
+          actual_bucket->get_tenant().compare(bucket->get_tenant()) != 0 ||
+          actual_bucket->get_marker().compare(bucket->get_marker()) != 0 ||
+          actual_bucket->get_bucket_id().compare(bucket->get_bucket_id()) != 0) {
         cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
         if (fix) {
           cout << "fixing" << std::endl;
-          r = store->ctl()->bucket->link_bucket(user_id, actual_bucket,
-                                             bucket_info.creation_time,
-                                            null_yield, dpp);
+         r = actual_bucket->chown(dpp, user, nullptr, null_yield);
           if (r < 0) {
             cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
           }
@@ -318,15 +304,16 @@ void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_use
   } while (user_buckets.is_truncated());
 }
 
-// note: function type conforms to RGWRados::check_filter_t
-bool rgw_bucket_object_check_filter(const string& oid)
+// returns true if entry is in the empty namespace. note: function
+// type conforms to type RGWBucketListNameFilter
+bool rgw_bucket_object_check_filter(const std::string& oid)
 {
-  rgw_obj_key key;
-  string ns;
-  return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
+  const static std::string empty_ns;
+  rgw_obj_key key; // thrown away but needed for parsing
+  return rgw_obj_key::oid_to_key_in_ns(oid, &key, empty_ns);
 }
 
-int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, const rgw_bucket& bucket, rgw_obj_key& key)
+int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Store* store, rgw::sal::Bucket* bucket, rgw_obj_key& key)
 {
   RGWObjectCtx rctx(store);
 
@@ -334,173 +321,9 @@ int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *st
     key.instance = "null";
   }
 
-  rgw_obj obj(bucket, key);
-
-  return store->getRados()->delete_obj(dpp, rctx, bucket_info, obj, bucket_info.versioning_status());
-}
-
-static int aio_wait(librados::AioCompletion *handle)
-{
-  librados::AioCompletion *c = (librados::AioCompletion *)handle;
-  c->wait_for_complete();
-  int ret = c->get_return_value();
-  c->release();
-  return ret;
-}
-
-static int drain_handles(list<librados::AioCompletion *>& pending)
-{
-  int ret = 0;
-  while (!pending.empty()) {
-    librados::AioCompletion *handle = pending.front();
-    pending.pop_front();
-    int r = aio_wait(handle);
-    if (r < 0) {
-      ret = r;
-    }
-  }
-  return ret;
-}
-
-int rgw_remove_bucket_bypass_gc(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
-                                int concurrent_max, bool keep_index_consistent,
-                                optional_yield y,
-                                const DoutPrefixProvider *dpp)
-{
-  int ret;
-  map<RGWObjCategory, RGWStorageStats> stats;
-  std::vector<rgw_bucket_dir_entry> objs;
-  map<string, bool> common_prefixes;
-  RGWBucketInfo info;
-  RGWObjectCtx obj_ctx(store);
-  CephContext *cct = store->ctx();
-
-  string bucket_ver, master_ver;
-
-  ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, info, NULL, null_yield, dpp);
-  if (ret < 0)
-    return ret;
-
-  ret = store->getRados()->get_bucket_stats(dpp, info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
-  if (ret < 0)
-    return ret;
-
-  ret = abort_bucket_multiparts(dpp, store, cct, info);
-  if (ret < 0) {
-    return ret;
-  }
-
-  RGWRados::Bucket target(store->getRados(), info);
-  RGWRados::Bucket::List list_op(&target);
-
-  list_op.params.list_versions = true;
-  list_op.params.allow_unordered = true;
-
-  std::list<librados::AioCompletion*> handles;
-
-  int max_aio = concurrent_max;
-  bool is_truncated = true;
-
-  while (is_truncated) {
-    objs.clear();
-    ret = list_op.list_objects(dpp, listing_max_entries, &objs, &common_prefixes,
-                              &is_truncated, null_yield);
-    if (ret < 0)
-      return ret;
-
-    std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
-    for (; it != objs.end(); ++it) {
-      RGWObjState *astate = NULL;
-      rgw_obj obj(bucket, (*it).key);
-
-      ret = store->getRados()->get_obj_state(dpp, &obj_ctx, info, obj, &astate, false, y);
-      if (ret == -ENOENT) {
-        ldpp_dout(dpp, 1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
-        continue;
-      }
-      if (ret < 0) {
-        ldpp_dout(dpp, -1) << "ERROR: get obj state returned with error " << ret << dendl;
-        return ret;
-      }
-
-      if (astate->manifest) {
-        RGWObjManifest& manifest = *astate->manifest;
-        RGWObjManifest::obj_iterator miter = manifest.obj_begin(dpp);
-        rgw_obj head_obj = manifest.get_obj();
-        rgw_raw_obj raw_head_obj;
-        store->getRados()->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
-
-
-        for (; miter != manifest.obj_end(dpp) && max_aio--; ++miter) {
-          if (!max_aio) {
-            ret = drain_handles(handles);
-            if (ret < 0) {
-              ldpp_dout(dpp, -1) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
-              return ret;
-            }
-            max_aio = concurrent_max;
-          }
-
-          rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
-          if (last_obj == raw_head_obj) {
-            // have the head obj deleted at the end
-            continue;
-          }
-
-          ret = store->getRados()->delete_raw_obj_aio(dpp, last_obj, handles);
-          if (ret < 0) {
-            ldpp_dout(dpp, -1) << "ERROR: delete obj aio failed with " << ret << dendl;
-            return ret;
-          }
-        } // for all shadow objs
-
-        ret = store->getRados()->delete_obj_aio(dpp, head_obj, info, astate, handles, keep_index_consistent, null_yield);
-        if (ret < 0) {
-          ldpp_dout(dpp, -1) << "ERROR: delete obj aio failed with " << ret << dendl;
-          return ret;
-        }
-      }
-
-      if (!max_aio) {
-        ret = drain_handles(handles);
-        if (ret < 0) {
-          ldpp_dout(dpp, -1) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
-          return ret;
-        }
-        max_aio = concurrent_max;
-      }
-      obj_ctx.invalidate(obj);
-    } // for all RGW objects
-  }
-
-  ret = drain_handles(handles);
-  if (ret < 0) {
-    ldpp_dout(dpp, -1) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
-    return ret;
-  }
-
-  ret = store->ctl()->bucket->sync_user_stats(dpp, info.owner, info, y);
-  if (ret < 0) {
-     ldpp_dout(dpp, 1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
-  }
-
-  RGWObjVersionTracker objv_tracker;
-
-  // this function can only be run if caller wanted children to be
-  // deleted, so we can ignore the check for children as any that
-  // remain are detritus from a prior bug
-  ret = store->getRados()->delete_bucket(info, objv_tracker, y, dpp, false);
-  if (ret < 0) {
-    ldpp_dout(dpp, -1) << "ERROR: could not remove bucket " << bucket.name << dendl;
-    return ret;
-  }
-
-  ret = store->ctl()->bucket->unlink_bucket(info.owner, bucket, null_yield, dpp, false);
-  if (ret < 0) {
-    ldpp_dout(dpp, -1) << "ERROR: unable to remove user bucket information" << dendl;
-  }
+  std::unique_ptr<rgw::sal::Object> object = bucket->get_object(key);
 
-  return ret;
+  return object->delete_object(dpp, &rctx, null_yield);
 }
 
 static void set_err_msg(std::string *sink, std::string msg)
@@ -509,77 +332,72 @@ static void set_err_msg(std::string *sink, std::string msg)
     *sink = msg;
 }
 
-int RGWBucket::init(rgw::sal::RGWRadosStore *storage, RGWBucketAdminOpState& op_state,
-                    optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg,
-                    map<string, bufferlist> *pattrs)
+int RGWBucket::init(rgw::sal::Store* _store, RGWBucketAdminOpState& op_state,
+                    optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
-  if (!storage) {
+  if (!_store) {
     set_err_msg(err_msg, "no storage!");
     return -EINVAL;
   }
 
-  store = storage;
+  store = _store;
 
-  rgw_user user_id = op_state.get_user_id();
-  bucket.tenant = user_id.tenant;
-  bucket.name = op_state.get_bucket_name();
+  std::string bucket_name = op_state.get_bucket_name();
 
-  if (bucket.name.empty() && user_id.empty())
+  if (bucket_name.empty() && op_state.get_user_id().empty())
     return -EINVAL;
-  
+
+  user = store->get_user(op_state.get_user_id());
+  std::string tenant = user->get_tenant();
+
   // split possible tenant/name
-  auto pos = bucket.name.find('/');
+  auto pos = bucket_name.find('/');
   if (pos != string::npos) {
-    bucket.tenant = bucket.name.substr(0, pos);
-    bucket.name = bucket.name.substr(pos + 1);
+    tenant = bucket_name.substr(0, pos);
+    bucket_name = bucket_name.substr(pos + 1);
   }
 
-  if (!bucket.name.empty()) {
-    int r = store->ctl()->bucket->read_bucket_info(
-        bucket, &bucket_info, y, dpp,
-        RGWBucketCtl::BucketInstance::GetParams().set_attrs(pattrs),
-        &ep_objv);
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket.name);
+  int r = store->get_bucket(dpp, user.get(), tenant, bucket_name, &bucket, y);
+  if (r < 0) {
+      set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket_name);
       return r;
-    }
-
-    op_state.set_bucket(bucket_info.bucket);
   }
 
-  if (!user_id.empty()) {
-    int r = store->ctl()->user->get_info_by_uid(dpp, user_id, &user_info, y);
+  op_state.set_bucket(bucket->clone());
+
+  if (!rgw::sal::User::empty(user.get())) {
+    r = user->load_user(dpp, y);
     if (r < 0) {
       set_err_msg(err_msg, "failed to fetch user info");
       return r;
     }
-
-    op_state.display_name = user_info.display_name;
   }
 
+  op_state.display_name = user->get_display_name();
+
   clear_failure();
   return 0;
 }
 
-bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, RGWMetadataManager *mgr,
+bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, rgw::sal::Store* store,
                            const string& marker, const string& bucket_id, rgw_bucket* bucket_out)
 {
   void *handle = NULL;
   bool truncated = false;
   string s;
 
-  int ret = mgr->list_keys_init(dpp, "bucket.instance", marker, &handle);
+  int ret = store->meta_list_keys_init(dpp, "bucket.instance", marker, &handle);
   if (ret < 0) {
     cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
-    mgr->list_keys_complete(handle);
+    store->meta_list_keys_complete(handle);
     return -ret;
   }
   do {
       list<string> keys;
-      ret = mgr->list_keys_next(handle, 1000, keys, &truncated);
+      ret = store->meta_list_keys_next(dpp, handle, 1000, keys, &truncated);
       if (ret < 0) {
         cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
-        mgr->list_keys_complete(handle);
+        store->meta_list_keys_complete(handle);
         return -ret;
       }
       for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
@@ -589,148 +407,19 @@ bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, RGWM
           continue;
         }
         if (bucket_id == bucket_out->bucket_id) {
-          mgr->list_keys_complete(handle);
+          store->meta_list_keys_complete(handle);
           return true;
         }
       }
   } while (truncated);
-  mgr->list_keys_complete(handle);
+  store->meta_list_keys_complete(handle);
   return false;
 }
 
-int RGWBucket::link(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp,
-                    map<string, bufferlist>& attrs, std::string *err_msg)
-{
-  if (!op_state.is_user_op()) {
-    set_err_msg(err_msg, "empty user id");
-    return -EINVAL;
-  }
-
-  string bucket_id = op_state.get_bucket_id();
-
-  std::string display_name = op_state.get_user_display_name();
-  rgw_bucket& bucket = op_state.get_bucket();
-  if (!bucket_id.empty() && bucket_id != bucket.bucket_id) {
-    set_err_msg(err_msg,
-       "specified bucket id does not match " + bucket.bucket_id);
-    return -EINVAL;
-  }
-  rgw_bucket old_bucket = bucket;
-  rgw_user user_id = op_state.get_user_id();
-  bucket.tenant = user_id.tenant;
-  if (!op_state.new_bucket_name.empty()) {
-    auto pos = op_state.new_bucket_name.find('/');
-    if (pos != string::npos) {
-      bucket.tenant = op_state.new_bucket_name.substr(0, pos);
-      bucket.name = op_state.new_bucket_name.substr(pos + 1);
-    } else {
-      bucket.name = op_state.new_bucket_name;
-    }
-  }
-
-  RGWObjVersionTracker objv_tracker;
-  RGWObjVersionTracker old_version = bucket_info.objv_tracker;
-
-  map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
-  if (aiter == attrs.end()) {
-       // should never happen; only pre-argonaut buckets lacked this.
-    ldpp_dout(dpp, 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket.name << dendl;
-    set_err_msg(err_msg,
-       "While crossing the Anavros you have displeased the goddess Hera."
-       "  You must sacrifice your ancient bucket " + bucket.bucket_id);
-    return -EINVAL;
-  }
-  bufferlist& aclbl = aiter->second;
-  RGWAccessControlPolicy policy;
-  ACLOwner owner;
-  try {
-   auto iter = aclbl.cbegin();
-   decode(policy, iter);
-   owner = policy.get_owner();
-  } catch (buffer::error& err) {
-    set_err_msg(err_msg, "couldn't decode policy");
-    return -EIO;
-  }
-
-  auto bucket_ctl = store->ctl()->bucket;
-  int r = bucket_ctl->unlink_bucket(owner.get_id(), old_bucket, y, dpp, false);
-  if (r < 0) {
-    set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
-    return r;
-  }
-
-  // now update the user for the bucket...
-  if (display_name.empty()) {
-    ldpp_dout(dpp, 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
-  }
-
-  RGWAccessControlPolicy policy_instance;
-  policy_instance.create_default(user_info.user_id, display_name);
-  owner = policy_instance.get_owner();
-
-  aclbl.clear();
-  policy_instance.encode(aclbl);
-
-  auto instance_params = RGWBucketCtl::BucketInstance::PutParams().set_attrs(&attrs);
-
-  bucket_info.owner = user_info.user_id;
-  if (bucket != old_bucket) {
-    bucket_info.bucket = bucket;
-    bucket_info.objv_tracker.version_for_read()->ver = 0;
-    instance_params.set_exclusive(true);
-  }
-
-  r = bucket_ctl->store_bucket_instance_info(bucket, bucket_info, y, dpp, instance_params);
-  if (r < 0) {
-    set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
-    return r;
-  }
-
-  RGWBucketEntryPoint ep;
-  ep.bucket = bucket_info.bucket;
-  ep.owner = user_info.user_id;
-  ep.creation_time = bucket_info.creation_time;
-  ep.linked = true;
-  map<string, bufferlist> ep_attrs;
-  rgw_ep_info ep_data{ep, ep_attrs};
-
-  /* link to user */
-  r = store->ctl()->bucket->link_bucket(user_info.user_id,
-                                     bucket_info.bucket,
-                                     ep.creation_time,
-                                     y, dpp, true, &ep_data);
-  if (r < 0) {
-    set_err_msg(err_msg, "failed to relink bucket");
-    return r;
-  }
-
-  if (bucket != old_bucket) {
-    // like RGWRados::delete_bucket -- excepting no bucket_index work.
-    r = bucket_ctl->remove_bucket_entrypoint_info(old_bucket, y, dpp,
-                                                  RGWBucketCtl::Bucket::RemoveParams()
-                                                  .set_objv_tracker(&ep_data.ep_objv));
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to unlink old bucket endpoint " + old_bucket.tenant + "/" + old_bucket.name);
-      return r;
-    }
-
-    r = bucket_ctl->remove_bucket_instance_info(old_bucket, bucket_info, y, dpp,
-                                                RGWBucketCtl::BucketInstance::RemoveParams()
-                                                .set_objv_tracker(&old_version));
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to unlink old bucket info");
-      return r;
-    }
-  }
-
-  return 0;
-}
-
 int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
                      optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
-  int ret = store->ctl()->bucket->chown(store, bucket_info, user_info.user_id,
-                                     user_info.display_name, marker, y, dpp);
+  int ret = bucket->chown(dpp, user.get(), user.get(), y, &marker);
   if (ret < 0) {
     set_err_msg(err_msg, "Failed to change object ownership: " + cpp_strerror(-ret));
   }
@@ -738,36 +427,12 @@ int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
   return ret;
 }
 
-int RGWBucket::unlink(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg)
-{
-  rgw_bucket bucket = op_state.get_bucket();
-
-  if (!op_state.is_user_op()) {
-    set_err_msg(err_msg, "could not fetch user or user bucket info");
-    return -EINVAL;
-  }
-
-  int r = store->ctl()->bucket->unlink_bucket(user_info.user_id, bucket, y, dpp);
-  if (r < 0) {
-    set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
-  }
-
-  return r;
-}
-
 int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
-  rgw_bucket bucket = op_state.get_bucket();
-  RGWBucketInfo bucket_info;
-  map<string, bufferlist> attrs;
-  int r = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, dpp, &attrs);
-  if (r < 0) {
-    set_err_msg(err_msg, "could not get bucket info for bucket=" + bucket.name + ": " + cpp_strerror(-r));
-    return r;
-  }
+  bucket = op_state.get_bucket()->clone();
 
-  bucket_info.quota = op_state.quota;
-  r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &attrs, dpp);
+  bucket->get_info().quota = op_state.quota;
+  int r = bucket->put_info(dpp, false, real_time());
   if (r < 0) {
     set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
     return r;
@@ -777,12 +442,13 @@ int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, const DoutPrefixProvid
 
 int RGWBucket::remove_object(const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, std::string *err_msg)
 {
-  rgw_bucket bucket = op_state.get_bucket();
   std::string object_name = op_state.get_object_name();
 
   rgw_obj_key key(object_name);
 
-  int ret = rgw_remove_object(dpp, store, bucket_info, bucket, key);
+  bucket = op_state.get_bucket()->clone();
+
+  int ret = rgw_remove_object(dpp, store, bucket.get(), key);
   if (ret < 0) {
     set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
     return ret;
@@ -791,10 +457,10 @@ int RGWBucket::remove_object(const DoutPrefixProvider *dpp, RGWBucketAdminOpStat
   return 0;
 }
 
-static void dump_bucket_index(const RGWRados::ent_map_t& result,  Formatter *f)
+static void dump_bucket_index(const vector<rgw_bucket_dir_entry>& objs,  Formatter *f)
 {
-  for (auto iter = result.begin(); iter != result.end(); ++iter) {
-    f->dump_string("object", iter->first);
+  for (auto iter = objs.begin(); iter != objs.end(); ++iter) {
+    f->dump_string("object", iter->key.name);
   }
 }
 
@@ -805,8 +471,7 @@ static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Forma
   formatter->open_object_section("usage");
   for (iter = stats.begin(); iter != stats.end(); ++iter) {
     RGWStorageStats& s = iter->second;
-    const char *cat_name = rgw_obj_category_name(iter->first);
-    formatter->open_object_section(cat_name);
+    formatter->open_object_section(to_string(iter->first));
     s.dump(formatter);
     formatter->close_section();
   }
@@ -832,43 +497,33 @@ int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
                const DoutPrefixProvider *dpp, std::string *err_msg)
 {
   bool fix_index = op_state.will_fix_index();
-  rgw_bucket bucket = op_state.get_bucket();
-
-  map<string, bool> common_prefixes;
 
   bool is_truncated;
   map<string, bool> meta_objs;
   map<rgw_obj_index_key, string> all_objs;
 
-  RGWBucketInfo bucket_info;
-  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
-  int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield, dpp);
-  if (r < 0) {
-    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
-    return r;
-  }
+  bucket = op_state.get_bucket()->clone();
 
-  RGWRados::Bucket target(store->getRados(), bucket_info);
-  RGWRados::Bucket::List list_op(&target);
+  rgw::sal::Bucket::ListParams params;
 
-  list_op.params.list_versions = true;
-  list_op.params.ns = RGW_OBJ_NS_MULTIPART;
+  params.list_versions = true;
+  params.ns = RGW_OBJ_NS_MULTIPART;
 
   do {
-    vector<rgw_bucket_dir_entry> result;
-    int r = list_op.list_objects(dpp, listing_max_entries, &result,
-                                &common_prefixes, &is_truncated, null_yield);
+    rgw::sal::Bucket::ListResults results;
+    int r = bucket->list(dpp, params, listing_max_entries, results, null_yield);
     if (r < 0) {
-      set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
+      set_err_msg(err_msg, "failed to list objects in bucket=" + bucket->get_name() +
               " err=" +  cpp_strerror(-r));
 
       return r;
     }
+    is_truncated = results.is_truncated;
 
     vector<rgw_bucket_dir_entry>::iterator iter;
-    for (iter = result.begin(); iter != result.end(); ++iter) {
+    for (iter = results.objs.begin(); iter != results.objs.end(); ++iter) {
       rgw_obj_index_key key = iter->key;
-      rgw_obj obj(bucket, key);
+      rgw_obj obj(bucket->get_key(), key);
       string oid = obj.get_oid();
 
       int pos = oid.find_last_of('.');
@@ -903,7 +558,7 @@ int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
 
     if (objs_to_unlink.size() > listing_max_entries) {
       if (fix_index) {
-       int r = store->getRados()->remove_objs_from_index(dpp, bucket_info, objs_to_unlink);
+       int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
        if (r < 0) {
          set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
                      cpp_strerror(-r));
@@ -918,7 +573,7 @@ int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
   }
 
   if (fix_index) {
-    int r = store->getRados()->remove_objs_from_index(dpp, bucket_info, objs_to_unlink);
+    int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
               cpp_strerror(-r));
@@ -948,46 +603,33 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
     return -EINVAL;
   }
 
-  store->getRados()->cls_obj_set_bucket_tag_timeout(dpp, bucket_info, BUCKET_TAG_TIMEOUT);
+  bucket->set_tag_timeout(dpp, BUCKET_TAG_TIMEOUT);
 
-  string prefix;
-  string empty_delimiter;
-  rgw_obj_index_key marker;
-  bool is_truncated = true;
-  bool cls_filtered = true;
+  rgw::sal::Bucket::ListResults results;
+  results.is_truncated = true;
 
   Formatter *formatter = flusher.get_formatter();
   formatter->open_object_section("objects");
-  uint16_t expansion_factor = 1;
-  while (is_truncated) {
-    RGWRados::ent_map_t result;
-    result.reserve(listing_max_entries);
-
-    int r = store->getRados()->cls_bucket_list_ordered(
-      dpp, bucket_info, RGW_NO_SHARD, marker, prefix, empty_delimiter,
-      listing_max_entries, true, expansion_factor,
-      result, &is_truncated, &cls_filtered, &marker,
-      y, rgw_bucket_object_check_filter);
+  while (results.is_truncated) {
+    rgw::sal::Bucket::ListParams params;
+    params.marker = results.next_marker;
+    params.force_check_filter = rgw_bucket_object_check_filter;
+
+    int r = bucket->list(dpp, params, listing_max_entries, results, y);
+
     if (r == -ENOENT) {
       break;
-    } else if (r < 0 && r != -ENOENT) {
+    } else if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
     }
 
-    if (result.size() < listing_max_entries / 8) {
-      ++expansion_factor;
-    } else if (result.size() > listing_max_entries * 7 / 8 &&
-              expansion_factor > 1) {
-      --expansion_factor;
-    }
-
-    dump_bucket_index(result, formatter);
+    dump_bucket_index(results.objs, formatter);
     flusher.flush();
   }
 
   formatter->close_section();
 
-  store->getRados()->cls_obj_set_bucket_tag_timeout(dpp, bucket_info, 0);
+  bucket->set_tag_timeout(dpp, 0);
 
   return 0;
 }
@@ -1001,14 +643,14 @@ int RGWBucket::check_index(const DoutPrefixProvider *dpp,
 {
   bool fix_index = op_state.will_fix_index();
 
-  int r = store->getRados()->bucket_check_index(dpp, bucket_info, &existing_stats, &calculated_stats);
+  int r = bucket->check_index(dpp, existing_stats, calculated_stats);
   if (r < 0) {
     set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
     return r;
   }
 
   if (fix_index) {
-    r = store->getRados()->bucket_rebuild_index(dpp, bucket_info);
+    r = bucket->rebuild_index(dpp);
     if (r < 0) {
       set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
       return r;
@@ -1018,36 +660,36 @@ int RGWBucket::check_index(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *attrs, const DoutPrefixProvider *dpp, std::string *err_msg)
+int RGWBucket::sync(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
-  if (!store->svc()->zone->is_meta_master()) {
+  if (!store->is_meta_master()) {
     set_err_msg(err_msg, "ERROR: failed to update bucket sync: only allowed on meta master zone");
     return -EINVAL;
   }
   bool sync = op_state.will_sync_bucket();
   if (sync) {
-    bucket_info.flags &= ~BUCKET_DATASYNC_DISABLED;
+    bucket->get_info().flags &= ~BUCKET_DATASYNC_DISABLED;
   } else {
-    bucket_info.flags |= BUCKET_DATASYNC_DISABLED;
+    bucket->get_info().flags |= BUCKET_DATASYNC_DISABLED;
   }
 
-  int r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), attrs, dpp);
+  int r = bucket->put_info(dpp, false, real_time());
   if (r < 0) {
     set_err_msg(err_msg, "ERROR: failed writing bucket instance info:" + cpp_strerror(-r));
     return r;
   }
 
-  int shards_num = bucket_info.layout.current_index.layout.normal.num_shards? bucket_info.layout.current_index.layout.normal.num_shards : 1;
-  int shard_id = bucket_info.layout.current_index.layout.normal.num_shards? 0 : -1;
+  int shards_num = bucket->get_info().layout.current_index.layout.normal.num_shards? bucket->get_info().layout.current_index.layout.normal.num_shards : 1;
+  int shard_id = bucket->get_info().layout.current_index.layout.normal.num_shards? 0 : -1;
 
   if (!sync) {
-    r = store->svc()->bilog_rados->log_stop(dpp, bucket_info, -1);
+    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_stop(dpp, bucket->get_info(), -1);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing stop bilog:" + cpp_strerror(-r));
       return r;
     }
   } else {
-    r = store->svc()->bilog_rados->log_start(dpp, bucket_info, -1);
+    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_start(dpp, bucket->get_info(), -1);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing resync bilog:" + cpp_strerror(-r));
       return r;
@@ -1055,7 +697,7 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *at
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = store->svc()->datalog_rados->add_entry(dpp, bucket_info, shard_id);
+    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->datalog_rados->add_entry(dpp, bucket->get_info(), shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
@@ -1077,35 +719,28 @@ int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
   return 0;
 }
 
-int rgw_object_get_attr(const DoutPrefixProvider *dpp, 
-                        rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
-                       const rgw_obj& obj, const char* attr_name,
-                       bufferlist& out_bl, optional_yield y)
+int rgw_object_get_attr(const DoutPrefixProvider *dpp,
+                       rgw::sal::Store* store, rgw::sal::Object* obj,
+                       const char* attr_name, bufferlist& out_bl, optional_yield y)
 {
   RGWObjectCtx obj_ctx(store);
-  RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
-  RGWRados::Object::Read rop(&op_target);
+  std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op(&obj_ctx);
 
-  return rop.get_attr(dpp, attr_name, out_bl, y);
+  return rop->get_attr(dpp, attr_name, out_bl, y);
 }
 
 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy, optional_yield y, const DoutPrefixProvider *dpp)
 {
+  int ret;
   std::string object_name = op_state.get_object_name();
-  rgw_bucket bucket = op_state.get_bucket();
 
-  RGWBucketInfo bucket_info;
-  map<string, bufferlist> attrs;
-  int ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, dpp, &attrs);
-  if (ret < 0) {
-    return ret;
-  }
+  bucket = op_state.get_bucket()->clone();
 
   if (!object_name.empty()) {
     bufferlist bl;
-    rgw_obj obj(bucket, object_name);
+    std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(rgw_obj_key(object_name));
 
-    ret = rgw_object_get_attr(dpp, store, bucket_info, obj, RGW_ATTR_ACL, bl, y);
+    ret = rgw_object_get_attr(dpp, store, obj.get(), RGW_ATTR_ACL, bl, y);
     if (ret < 0){
       return ret;
     }
@@ -1117,8 +752,8 @@ int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolic
     return ret;
   }
 
-  map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
-  if (aiter == attrs.end()) {
+  map<string, bufferlist>::iterator aiter = bucket->get_attrs().find(RGW_ATTR_ACL);
+  if (aiter == bucket->get_attrs().end()) {
     return -ENOENT;
   }
 
@@ -1131,7 +766,7 @@ int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolic
 }
 
 
-int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+int RGWBucketAdminOp::get_policy(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                   RGWAccessControlPolicy& policy, const DoutPrefixProvider *dpp)
 {
   RGWBucket bucket;
@@ -1150,7 +785,7 @@ int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminO
 /* Wrappers to facilitate RESTful interface */
 
 
-int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+int RGWBucketAdminOp::get_policy(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                   RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp)
 {
   RGWAccessControlPolicy policy(store->ctx());
@@ -1172,7 +807,7 @@ int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminO
   return 0;
 }
 
-int RGWBucketAdminOp::dump_s3_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+int RGWBucketAdminOp::dump_s3_policy(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                   ostream& os, const DoutPrefixProvider *dpp)
 {
   RGWAccessControlPolicy_S3 policy(store->ctx());
@@ -1186,7 +821,7 @@ int RGWBucketAdminOp::dump_s3_policy(rgw::sal::RGWRadosStore *store, RGWBucketAd
   return 0;
 }
 
-int RGWBucketAdminOp::unlink(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
+int RGWBucketAdminOp::unlink(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
 {
   RGWBucket bucket;
 
@@ -1194,32 +829,148 @@ int RGWBucketAdminOp::unlink(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpSta
   if (ret < 0)
     return ret;
 
-  return bucket.unlink(op_state, null_yield, dpp);
+  return static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->unlink_bucket(op_state.get_user_id(), op_state.get_bucket()->get_info().bucket, null_yield, dpp, true);
 }
 
-int RGWBucketAdminOp::link(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, string *err)
+int RGWBucketAdminOp::link(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, string *err)
 {
-  RGWBucket bucket;
-  map<string, bufferlist> attrs;
+  if (!op_state.is_user_op()) {
+    set_err_msg(err, "empty user id");
+    return -EINVAL;
+  }
 
-  int ret = bucket.init(store, op_state, null_yield, dpp, err, &attrs);
+  RGWBucket bucket;
+  int ret = bucket.init(store, op_state, null_yield, dpp, err);
   if (ret < 0)
     return ret;
 
-  return bucket.link(op_state, null_yield, dpp, attrs, err);
+  string bucket_id = op_state.get_bucket_id();
+  std::string display_name = op_state.get_user_display_name();
+  std::unique_ptr<rgw::sal::Bucket> loc_bucket;
+  std::unique_ptr<rgw::sal::Bucket> old_bucket;
+
+  loc_bucket = op_state.get_bucket()->clone();
+
+  if (!bucket_id.empty() && bucket_id != loc_bucket->get_bucket_id()) {
+    set_err_msg(err,
+       "specified bucket id does not match " + loc_bucket->get_bucket_id());
+    return -EINVAL;
+  }
+
+  old_bucket = loc_bucket->clone();
+
+  loc_bucket->get_key().tenant = op_state.get_user_id().tenant;
+
+  if (!op_state.new_bucket_name.empty()) {
+    auto pos = op_state.new_bucket_name.find('/');
+    if (pos != string::npos) {
+      loc_bucket->get_key().tenant = op_state.new_bucket_name.substr(0, pos);
+      loc_bucket->get_key().name = op_state.new_bucket_name.substr(pos + 1);
+    } else {
+      loc_bucket->get_key().name = op_state.new_bucket_name;
+    }
+  }
+
+  RGWObjVersionTracker objv_tracker;
+  RGWObjVersionTracker old_version = loc_bucket->get_info().objv_tracker;
+
+  map<string, bufferlist>::iterator aiter = loc_bucket->get_attrs().find(RGW_ATTR_ACL);
+  if (aiter == loc_bucket->get_attrs().end()) {
+       // should never happen; only pre-argonaut buckets lacked this.
+    ldpp_dout(dpp, 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket << dendl;
+    set_err_msg(err,
+       "While crossing the Anavros you have displeased the goddess Hera."
+       "  You must sacrifice your ancient bucket " + loc_bucket->get_bucket_id());
+    return -EINVAL;
+  }
+  bufferlist& aclbl = aiter->second;
+  RGWAccessControlPolicy policy;
+  ACLOwner owner;
+  try {
+   auto iter = aclbl.cbegin();
+   decode(policy, iter);
+   owner = policy.get_owner();
+  } catch (buffer::error& e) {
+    set_err_msg(err, "couldn't decode policy");
+    return -EIO;
+  }
+
+  int r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->unlink_bucket(owner.get_id(), old_bucket->get_info().bucket, null_yield, dpp, false);
+  if (r < 0) {
+    set_err_msg(err, "could not unlink policy from user " + owner.get_id().to_str());
+    return r;
+  }
+
+  // now update the user for the bucket...
+  if (display_name.empty()) {
+    ldpp_dout(dpp, 0) << "WARNING: user " << op_state.get_user_id() << " has no display name set" << dendl;
+  }
+
+  RGWAccessControlPolicy policy_instance;
+  policy_instance.create_default(op_state.get_user_id(), display_name);
+  owner = policy_instance.get_owner();
+
+  aclbl.clear();
+  policy_instance.encode(aclbl);
+
+  bool exclusive = false;
+  loc_bucket->get_info().owner = op_state.get_user_id();
+  if (*loc_bucket != *old_bucket) {
+    loc_bucket->get_info().bucket = loc_bucket->get_key();
+    loc_bucket->get_info().objv_tracker.version_for_read()->ver = 0;
+    exclusive = true;
+  }
+
+  r = loc_bucket->put_info(dpp, exclusive, ceph::real_time());
+  if (r < 0) {
+    set_err_msg(err, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
+    return r;
+  }
+
+  /* link to user */
+  RGWBucketEntryPoint ep;
+  ep.bucket = loc_bucket->get_info().bucket;
+  ep.owner = op_state.get_user_id();
+  ep.creation_time = loc_bucket->get_info().creation_time;
+  ep.linked = true;
+  rgw::sal::Attrs ep_attrs;
+  rgw_ep_info ep_data{ep, ep_attrs};
+
+  r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->link_bucket(op_state.get_user_id(), loc_bucket->get_info().bucket, loc_bucket->get_info().creation_time, null_yield, dpp, true, &ep_data);
+  if (r < 0) {
+    set_err_msg(err, "failed to relink bucket");
+    return r;
+  }
+
+  if (*loc_bucket != *old_bucket) {
+    // like RGWRados::delete_bucket -- excepting no bucket_index work.
+    r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->remove_bucket_entrypoint_info(
+                                       old_bucket->get_key(), null_yield, dpp,
+                                       RGWBucketCtl::Bucket::RemoveParams()
+                                       .set_objv_tracker(&ep_data.ep_objv));
+    if (r < 0) {
+      set_err_msg(err, "failed to unlink old bucket " + old_bucket->get_tenant() + "/" + old_bucket->get_name());
+      return r;
+    }
+    r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->remove_bucket_instance_info(
+                                       old_bucket->get_key(), old_bucket->get_info(),
+                                       null_yield, dpp,
+                                       RGWBucketCtl::BucketInstance::RemoveParams()
+                                       .set_objv_tracker(&ep_data.ep_objv));
+    if (r < 0) {
+      set_err_msg(err, "failed to unlink old bucket " + old_bucket->get_tenant() + "/" + old_bucket->get_name());
+      return r;
+    }
+  }
 
+  return 0;
 }
 
-int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const string& marker, const DoutPrefixProvider *dpp, string *err)
+int RGWBucketAdminOp::chown(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const string& marker, const DoutPrefixProvider *dpp, string *err)
 {
   RGWBucket bucket;
-  map<string, bufferlist> attrs;
-
-  int ret = bucket.init(store, op_state, null_yield, dpp, err, &attrs);
-  if (ret < 0)
-    return ret;
 
-  ret = bucket.link(op_state, null_yield, dpp, attrs, err);
+  int ret = bucket.init(store, op_state, null_yield, dpp, err);
   if (ret < 0)
     return ret;
 
@@ -1227,7 +978,7 @@ int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpStat
 
 }
 
-int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+int RGWBucketAdminOp::check_index(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                   RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
 {
   int ret;
@@ -1262,12 +1013,12 @@ int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdmin
   return 0;
 }
 
-int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+int RGWBucketAdminOp::remove_bucket(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                                    optional_yield y, const DoutPrefixProvider *dpp, 
                                     bool bypass_gc, bool keep_index_consistent)
 {
-  std::unique_ptr<rgw::sal::RGWBucket> bucket;
-  std::unique_ptr<rgw::sal::RGWUser> user = store->get_user(op_state.get_user_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  std::unique_ptr<rgw::sal::User> user = store->get_user(op_state.get_user_id());
 
   int ret = store->get_bucket(dpp, user.get(), user->get_tenant(), op_state.get_bucket_name(),
                              &bucket, y);
@@ -1275,7 +1026,7 @@ int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdm
     return ret;
 
   if (bypass_gc)
-    ret = rgw_remove_bucket_bypass_gc(store, bucket->get_key(), op_state.get_max_aio(), keep_index_consistent, y, dpp);
+    ret = bucket->remove_bucket_bypass_gc(op_state.get_max_aio(), keep_index_consistent, y, dpp);
   else
     ret = bucket->remove_bucket(dpp, op_state.will_delete_children(),
                                false, nullptr, y);
@@ -1283,7 +1034,7 @@ int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdm
   return ret;
 }
 
-int RGWBucketAdminOp::remove_object(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
+int RGWBucketAdminOp::remove_object(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
 {
   RGWBucket bucket;
 
@@ -1294,74 +1045,66 @@ int RGWBucketAdminOp::remove_object(rgw::sal::RGWRadosStore *store, RGWBucketAdm
   return bucket.remove_object(dpp, op_state);
 }
 
-int RGWBucketAdminOp::sync_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, string *err_msg)
+int RGWBucketAdminOp::sync_bucket(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, string *err_msg)
 {
   RGWBucket bucket;
-  map<string, bufferlist> attrs;
-  int ret = bucket.init(store, op_state, null_yield, dpp, err_msg, &attrs);
+  int ret = bucket.init(store, op_state, null_yield, dpp, err_msg);
   if (ret < 0)
   {
     return ret;
   }
-  return bucket.sync(op_state, &attrs, dpp, err_msg);
+  return bucket.sync(op_state, dpp, err_msg);
 }
 
-static int bucket_stats(rgw::sal::RGWRadosStore *store,
+static int bucket_stats(rgw::sal::Store* store,
                        const std::string& tenant_name,
                        const std::string& bucket_name,
                        Formatter *formatter,
                         const DoutPrefixProvider *dpp)
 {
-  RGWBucketInfo bucket_info;
+  std::unique_ptr<rgw::sal::Bucket> bucket;
   map<RGWObjCategory, RGWStorageStats> stats;
-  map<string, bufferlist> attrs;
 
   real_time mtime;
-  int r = store->getRados()->get_bucket_info(store->svc(),
-                                            tenant_name, bucket_name, bucket_info,
-                                            &mtime, null_yield, dpp, &attrs);
-  if (r < 0) {
-    return r;
+  int ret = store->get_bucket(dpp, nullptr, tenant_name, bucket_name, &bucket, null_yield);
+  if (ret < 0) {
+    return ret;
   }
 
-  rgw_bucket& bucket = bucket_info.bucket;
-
   string bucket_ver, master_ver;
   string max_marker;
-  int ret = store->getRados()->get_bucket_stats(dpp, bucket_info, RGW_NO_SHARD,
-                                               &bucket_ver, &master_ver, stats,
-                                               &max_marker);
+  ret = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
   if (ret < 0) {
-    cerr << "error getting bucket stats bucket=" << bucket.name << " ret=" << ret << std::endl;
+    cerr << "error getting bucket stats bucket=" << bucket->get_name() << " ret=" << ret << std::endl;
     return ret;
   }
 
   utime_t ut(mtime);
-  utime_t ctime_ut(bucket_info.creation_time);
+  utime_t ctime_ut(bucket->get_creation_time());
 
   formatter->open_object_section("stats");
-  formatter->dump_string("bucket", bucket.name);
+  formatter->dump_string("bucket", bucket->get_name());
   formatter->dump_int("num_shards",
-                     bucket_info.layout.current_index.layout.normal.num_shards);
-  formatter->dump_string("tenant", bucket.tenant);
-  formatter->dump_string("zonegroup", bucket_info.zonegroup);
-  formatter->dump_string("placement_rule", bucket_info.placement_rule.to_str());
-  ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
-  formatter->dump_string("id", bucket.bucket_id);
-  formatter->dump_string("marker", bucket.marker);
-  formatter->dump_stream("index_type") << bucket_info.layout.current_index.layout.type;
-  ::encode_json("owner", bucket_info.owner, formatter);
+                     bucket->get_info().layout.current_index.layout.normal.num_shards);
+  formatter->dump_string("tenant", bucket->get_tenant());
+  formatter->dump_string("zonegroup", bucket->get_info().zonegroup);
+  formatter->dump_string("placement_rule", bucket->get_info().placement_rule.to_str());
+  ::encode_json("explicit_placement", bucket->get_key().explicit_placement, formatter);
+  formatter->dump_string("id", bucket->get_bucket_id());
+  formatter->dump_string("marker", bucket->get_marker());
+  formatter->dump_stream("index_type") << bucket->get_info().layout.current_index.layout.type;
+  ::encode_json("owner", bucket->get_info().owner, formatter);
   formatter->dump_string("ver", bucket_ver);
   formatter->dump_string("master_ver", master_ver);
   ut.gmtime(formatter->dump_stream("mtime"));
   ctime_ut.gmtime(formatter->dump_stream("creation_time"));
   formatter->dump_string("max_marker", max_marker);
   dump_bucket_usage(stats, formatter);
-  encode_json("bucket_quota", bucket_info.quota, formatter);
+  encode_json("bucket_quota", bucket->get_info().quota, formatter);
 
   // bucket tags
-  auto iter = attrs.find(RGW_ATTR_TAGS);
-  if (iter != attrs.end()) {
+  auto iter = bucket->get_attrs().find(RGW_ATTR_TAGS);
+  if (iter != bucket->get_attrs().end()) {
     RGWObjTagSet_S3 tagset;
     bufferlist::const_iterator piter{&iter->second};
     try {
@@ -1379,7 +1122,7 @@ static int bucket_stats(rgw::sal::RGWRadosStore *store,
   return 0;
 }
 
-int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::limit_check(rgw::sal::Store* store,
                                  RGWBucketAdminOpState& op_state,
                                  const std::list<std::string>& user_ids,
                                  RGWFormatterFlusher& flusher, optional_yield y,
@@ -1410,40 +1153,34 @@ int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
     formatter->open_array_section("buckets");
 
     string marker;
-    rgw::sal::RGWBucketList buckets;
+    rgw::sal::BucketList buckets;
     do {
-      rgw::sal::RGWRadosUser user(store, rgw_user(user_id));
+      std::unique_ptr<rgw::sal::User> user = store->get_user(rgw_user(user_id));
 
-      ret = user.list_buckets(dpp, marker, string(), max_entries, false, buckets, y);
+      ret = user->list_buckets(dpp, marker, string(), max_entries, false, buckets, y);
 
       if (ret < 0)
         return ret;
 
-      map<string, std::unique_ptr<rgw::sal::RGWBucket>>& m_buckets = buckets.get_buckets();
+      map<string, std::unique_ptr<rgw::sal::Bucket>>& m_buckets = buckets.get_buckets();
 
       for (const auto& iter : m_buckets) {
        auto& bucket = iter.second;
        uint32_t num_shards = 1;
        uint64_t num_objects = 0;
 
-       /* need info for num_shards */
-       RGWBucketInfo info;
-
        marker = bucket->get_name(); /* Casey's location for marker update,
                                     * as we may now not reach the end of
                                     * the loop body */
 
-       ret = store->getRados()->get_bucket_info(store->svc(), bucket->get_tenant(),
-                                                bucket->get_name(), info, nullptr,
-                                                null_yield, dpp);
+       ret = bucket->load_bucket(dpp, null_yield);
        if (ret < 0)
          continue;
 
        /* need stats for num_entries */
        string bucket_ver, master_ver;
        std::map<RGWObjCategory, RGWStorageStats> stats;
-       ret = store->getRados()->get_bucket_stats(dpp, info, RGW_NO_SHARD, &bucket_ver,
-                                     &master_ver, stats, nullptr);
+       ret = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, nullptr);
 
        if (ret < 0)
          continue;
@@ -1452,7 +1189,7 @@ int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
          num_objects += s.second.num_objects;
        }
 
-       num_shards = info.layout.current_index.layout.normal.num_shards;
+       num_shards = bucket->get_info().layout.current_index.layout.normal.num_shards;
        uint64_t objs_per_shard =
          (num_shards) ? num_objects/num_shards : num_objects;
        {
@@ -1497,7 +1234,7 @@ int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
   return ret;
 } /* RGWBucketAdminOp::limit_check */
 
-int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::info(rgw::sal::Store* store,
                           RGWBucketAdminOpState& op_state,
                           RGWFormatterFlusher& flusher,
                           optional_yield y,
@@ -1526,21 +1263,21 @@ int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
   if (op_state.is_user_op()) {
     formatter->open_array_section("buckets");
 
-    rgw::sal::RGWBucketList buckets;
-    rgw::sal::RGWRadosUser user(store, op_state.get_user_id());
+    rgw::sal::BucketList buckets;
+    std::unique_ptr<rgw::sal::User> user = store->get_user(op_state.get_user_id());
     std::string marker;
     const std::string empty_end_marker;
     constexpr bool no_need_stats = false; // set need_stats to false
 
     do {
-      ret = user.list_buckets(dpp, marker, empty_end_marker, max_entries,
+      ret = user->list_buckets(dpp, marker, empty_end_marker, max_entries,
                              no_need_stats, buckets, y);
       if (ret < 0) {
         return ret;
       }
 
       const std::string* marker_cursor = nullptr;
-      map<string, std::unique_ptr<rgw::sal::RGWBucket>>& m = buckets.get_buckets();
+      map<string, std::unique_ptr<rgw::sal::Bucket>>& m = buckets.get_buckets();
 
       for (const auto& i : m) {
         const std::string& obj_name = i.first;
@@ -1574,11 +1311,11 @@ int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
     bool truncated = true;
 
     formatter->open_array_section("buckets");
-    ret = store->ctl()->meta.mgr->list_keys_init(dpp, "bucket", &handle);
+    ret = store->meta_list_keys_init(dpp, "bucket", string(), &handle);
     while (ret == 0 && truncated) {
       std::list<std::string> buckets;
       constexpr int max_keys = 1000;
-      ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys, buckets,
+      ret = store->meta_list_keys_next(dpp, handle, max_keys, buckets,
                                                   &truncated);
       for (auto& bucket_name : buckets) {
         if (show_stats) {
@@ -1588,7 +1325,7 @@ int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
        }
       }
     }
-    store->ctl()->meta.mgr->list_keys_complete(handle);
+    store->meta_list_keys_complete(handle);
 
     formatter->close_section();
   }
@@ -1598,7 +1335,7 @@ int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
   return 0;
 }
 
-int RGWBucketAdminOp::set_quota(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
+int RGWBucketAdminOp::set_quota(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp)
 {
   RGWBucket bucket;
 
@@ -1608,26 +1345,14 @@ int RGWBucketAdminOp::set_quota(rgw::sal::RGWRadosStore *store, RGWBucketAdminOp
   return bucket.set_quota(op_state, dpp);
 }
 
-static int purge_bucket_instance(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, const DoutPrefixProvider *dpp)
+static int purge_bucket_instance(rgw::sal::Store* store, const RGWBucketInfo& bucket_info, const DoutPrefixProvider *dpp)
 {
-  int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
-  for (int i = 0; i < max_shards; i++) {
-    RGWRados::BucketShard bs(store->getRados());
-    int shard_id = (bucket_info.layout.current_index.layout.normal.num_shards > 0  ? i : -1);
-    int ret = bs.init(bucket_info.bucket, shard_id, bucket_info.layout.current_index, nullptr, dpp);
-    if (ret < 0) {
-      cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
-           << "): " << cpp_strerror(-ret) << std::endl;
-      return ret;
-    }
-    ret = store->getRados()->bi_remove(bs);
-    if (ret < 0) {
-      cerr << "ERROR: failed to remove bucket index object: "
-           << cpp_strerror(-ret) << std::endl;
-      return ret;
-    }
-  }
-  return 0;
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int ret = store->get_bucket(nullptr, bucket_info, &bucket);
+  if (ret < 0)
+    return ret;
+
+  return bucket->purge_instance(dpp);
 }
 
 inline auto split_tenant(const std::string& bucket_name){
@@ -1639,27 +1364,28 @@ inline auto split_tenant(const std::string& bucket_name){
 }
 
 using bucket_instance_ls = std::vector<RGWBucketInfo>;
-void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& bucket_name,
+void get_stale_instances(rgw::sal::Store* store, const std::string& bucket_name,
                          const vector<std::string>& lst,
                          bucket_instance_ls& stale_instances,
                          const DoutPrefixProvider *dpp)
 {
 
-  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
-
   bucket_instance_ls other_instances;
 // first iterate over the entries, and pick up the done buckets; these
 // are guaranteed to be stale
   for (const auto& bucket_instance : lst){
     RGWBucketInfo binfo;
-    int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket_instance,
-                                            binfo, nullptr,nullptr, null_yield, dpp);
+    std::unique_ptr<rgw::sal::Bucket> bucket;
+    rgw_bucket rbucket;
+    rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance, &rbucket, nullptr);
+    int r = store->get_bucket(dpp, nullptr, rbucket, &bucket, null_yield);
     if (r < 0){
       // this can only happen if someone deletes us right when we're processing
       ldpp_dout(dpp, -1) << "Bucket instance is invalid: " << bucket_instance
                           << cpp_strerror(-r) << dendl;
       continue;
     }
+    binfo = bucket->get_info();
     if (binfo.reshard_status == cls_rgw_reshard_status::DONE)
       stale_instances.emplace_back(std::move(binfo));
     else {
@@ -1669,9 +1395,10 @@ void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& buck
 
   // Read the cur bucket info, if the bucket doesn't exist we can simply return
   // all the instances
-  auto [tenant, bucket] = split_tenant(bucket_name);
+  auto [tenant, bname] = split_tenant(bucket_name);
   RGWBucketInfo cur_bucket_info;
-  int r = store->getRados()->get_bucket_info(store->svc(), tenant, bucket, cur_bucket_info, nullptr, null_yield, dpp);
+  std::unique_ptr<rgw::sal::Bucket> cur_bucket;
+  int r = store->get_bucket(dpp, nullptr, tenant, bname, &cur_bucket, null_yield);
   if (r < 0) {
     if (r == -ENOENT) {
       // bucket doesn't exist, everything is stale then
@@ -1681,12 +1408,13 @@ void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& buck
     } else {
       // all bets are off if we can't read the bucket, just return the sureshot stale instances
       ldpp_dout(dpp, -1) << "error: reading bucket info for bucket: "
-                          << bucket << cpp_strerror(-r) << dendl;
+                          << bname << cpp_strerror(-r) << dendl;
     }
     return;
   }
 
   // Don't process further in this round if bucket is resharding
+  cur_bucket_info = cur_bucket->get_info();
   if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS)
     return;
 
@@ -1707,8 +1435,8 @@ void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& buck
   // bucket and walk through these instances to make sure no one else interferes
   // with these
   {
-    RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
-    r = reshard_lock.lock();
+    RGWBucketReshardLock reshard_lock(static_cast<rgw::sal::RadosStore*>(store), cur_bucket->get_info(), true);
+    r = reshard_lock.lock(dpp);
     if (r < 0) {
       // most likely bucket is under reshard, return the sureshot stale instances
       ldpp_dout(dpp, 5) << __func__
@@ -1726,19 +1454,19 @@ void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& buck
   return;
 }
 
-static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
+static int process_stale_instances(rgw::sal::Store* store, RGWBucketAdminOpState& op_state,
                                    RGWFormatterFlusher& flusher,
                                    const DoutPrefixProvider *dpp,
                                    std::function<void(const bucket_instance_ls&,
                                                       Formatter *,
-                                                      rgw::sal::RGWRadosStore*)> process_f)
+                                                      rgw::sal::Store*)> process_f)
 {
   std::string marker;
   void *handle;
   Formatter *formatter = flusher.get_formatter();
   static constexpr auto default_max_keys = 1000;
 
-  int ret = store->ctl()->meta.mgr->list_keys_init(dpp, "bucket.instance", marker, &handle);
+  int ret = store->meta_list_keys_init(dpp, "bucket.instance", marker, &handle);
   if (ret < 0) {
     cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
     return ret;
@@ -1748,7 +1476,7 @@ static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdmi
 
   formatter->open_array_section("keys");
   auto g = make_scope_guard([&store, &handle, &formatter]() {
-                              store->ctl()->meta.mgr->list_keys_complete(handle);
+                              store->meta_list_keys_complete(handle);
                               formatter->close_section(); // keys
                               formatter->flush(cout);
                             });
@@ -1756,7 +1484,7 @@ static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdmi
   do {
     list<std::string> keys;
 
-    ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
+    ret = store->meta_list_keys_next(dpp, handle, default_max_keys, keys, &truncated);
     if (ret < 0 && ret != -ENOENT) {
       cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
       return ret;
@@ -1780,14 +1508,14 @@ static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdmi
   return 0;
 }
 
-int RGWBucketAdminOp::list_stale_instances(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::list_stale_instances(rgw::sal::Store* store,
                                            RGWBucketAdminOpState& op_state,
                                            RGWFormatterFlusher& flusher,
                                            const DoutPrefixProvider *dpp)
 {
   auto process_f = [](const bucket_instance_ls& lst,
                       Formatter *formatter,
-                      rgw::sal::RGWRadosStore*){
+                      rgw::sal::Store*){
                      for (const auto& binfo: lst)
                        formatter->dump_string("key", binfo.bucket.get_key());
                    };
@@ -1795,19 +1523,19 @@ int RGWBucketAdminOp::list_stale_instances(rgw::sal::RGWRadosStore *store,
 }
 
 
-int RGWBucketAdminOp::clear_stale_instances(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::clear_stale_instances(rgw::sal::Store* store,
                                             RGWBucketAdminOpState& op_state,
                                             RGWFormatterFlusher& flusher,
                                             const DoutPrefixProvider *dpp)
 {
   auto process_f = [dpp](const bucket_instance_ls& lst,
                       Formatter *formatter,
-                      rgw::sal::RGWRadosStore *store) {
+                      rgw::sal::Store* store){
                      for (const auto &binfo: lst) {
                        int ret = purge_bucket_instance(store, binfo, dpp);
                        if (ret == 0){
                          auto md_key = "bucket.instance:" + binfo.bucket.get_key();
-                         ret = store->ctl()->meta.mgr->remove(md_key, null_yield, dpp);
+                         ret = store->meta_remove(dpp, md_key, null_yield);
                        }
                        formatter->open_object_section("delete_status");
                        formatter->dump_string("bucket_instance", binfo.bucket.get_key());
@@ -1819,23 +1547,20 @@ int RGWBucketAdminOp::clear_stale_instances(rgw::sal::RGWRadosStore *store,
   return process_stale_instances(store, op_state, flusher, dpp, process_f);
 }
 
-static int fix_single_bucket_lc(rgw::sal::RGWRadosStore *store,
+static int fix_single_bucket_lc(rgw::sal::Store* store,
                                 const std::string& tenant_name,
                                 const std::string& bucket_name,
                                 const DoutPrefixProvider *dpp)
 {
-  RGWBucketInfo bucket_info;
-  map <std::string, bufferlist> bucket_attrs;
-  int ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name,
-                                   bucket_info, nullptr, null_yield, dpp, &bucket_attrs);
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int ret = store->get_bucket(dpp, nullptr, tenant_name, bucket_name, &bucket, null_yield);
   if (ret < 0) {
     // TODO: Should we handle the case where the bucket could've been removed between
     // listing and fetching?
     return ret;
   }
 
-  return rgw::lc::fix_lc_shard_entry(dpp, store, store->get_rgwlc()->get_lc(), bucket_info,
-                                    bucket_attrs);
+  return rgw::lc::fix_lc_shard_entry(dpp, store, store->get_rgwlc()->get_lc(), bucket.get());
 }
 
 static void format_lc_status(Formatter* formatter,
@@ -1850,7 +1575,7 @@ static void format_lc_status(Formatter* formatter,
   formatter->close_section(); // bucket_entry
 }
 
-static void process_single_lc_entry(rgw::sal::RGWRadosStore *store,
+static void process_single_lc_entry(rgw::sal::Store* store,
                                    Formatter *formatter,
                                     const std::string& tenant_name,
                                     const std::string& bucket_name,
@@ -1860,7 +1585,7 @@ static void process_single_lc_entry(rgw::sal::RGWRadosStore *store,
   format_lc_status(formatter, tenant_name, bucket_name, -ret);
 }
 
-int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::fix_lc_shards(rgw::sal::Store* store,
                                     RGWBucketAdminOpState& op_state,
                                     RGWFormatterFlusher& flusher,
                                     const DoutPrefixProvider *dpp)
@@ -1877,7 +1602,7 @@ int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
     process_single_lc_entry(store, formatter, user_id.tenant, bucket_name, dpp);
     formatter->flush(cout);
   } else {
-    int ret = store->ctl()->meta.mgr->list_keys_init(dpp, "bucket", marker, &handle);
+    int ret = store->meta_list_keys_init(dpp, "bucket", marker, &handle);
     if (ret < 0) {
       std::cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
       return ret;
@@ -1886,13 +1611,13 @@ int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
     {
       formatter->open_array_section("lc_fix_status");
       auto sg = make_scope_guard([&store, &handle, &formatter](){
-                                   store->ctl()->meta.mgr->list_keys_complete(handle);
+                                   store->meta_list_keys_complete(handle);
                                    formatter->close_section(); // lc_fix_status
                                    formatter->flush(cout);
                                  });
       do {
         list<std::string> keys;
-        ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
+        ret = store->meta_list_keys_next(dpp, handle, default_max_keys, keys, &truncated);
         if (ret < 0 && ret != -ENOENT) {
           std::cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
           return ret;
@@ -1911,15 +1636,15 @@ int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
 
 }
 
-static bool has_object_expired(const DoutPrefixProvider *dpp, 
-                               rgw::sal::RGWRadosStore *store,
-                              const RGWBucketInfo& bucket_info,
+static bool has_object_expired(const DoutPrefixProvider *dpp,
+                              rgw::sal::Store* store,
+                              rgw::sal::Bucket* bucket,
                               const rgw_obj_key& key, utime_t& delete_at)
 {
-  rgw_obj obj(bucket_info.bucket, key);
+  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key);
   bufferlist delete_at_bl;
 
-  int ret = rgw_object_get_attr(dpp, store, bucket_info, obj, RGW_ATTR_DELETE_AT, delete_at_bl, null_yield);
+  int ret = rgw_object_get_attr(dpp, store, obj.get(), RGW_ATTR_DELETE_AT, delete_at_bl, null_yield);
   if (ret < 0) {
     return false;  // no delete at attr, proceed
   }
@@ -1936,12 +1661,12 @@ static bool has_object_expired(const DoutPrefixProvider *dpp,
   return false;
 }
 
-static int fix_bucket_obj_expiry(const DoutPrefixProvider *dpp, 
-                                 rgw::sal::RGWRadosStore *store,
-                                const RGWBucketInfo& bucket_info,
+static int fix_bucket_obj_expiry(const DoutPrefixProvider *dpp,
+                                rgw::sal::Store* store,
+                                rgw::sal::Bucket* bucket,
                                 RGWFormatterFlusher& flusher, bool dry_run)
 {
-  if (bucket_info.bucket.bucket_id == bucket_info.bucket.marker) {
+  if (bucket->get_key().bucket_id == bucket->get_key().marker) {
     ldpp_dout(dpp, -1) << "Not a resharded bucket skipping" << dendl;
     return 0;  // not a resharded bucket, move along
   }
@@ -1953,32 +1678,28 @@ static int fix_bucket_obj_expiry(const DoutPrefixProvider *dpp,
                               formatter->flush(std::cout);
                             });
 
-  RGWRados::Bucket target(store->getRados(), bucket_info);
-  RGWRados::Bucket::List list_op(&target);
+  rgw::sal::Bucket::ListParams params;
+  rgw::sal::Bucket::ListResults results;
 
-  list_op.params.list_versions = bucket_info.versioned();
-  list_op.params.allow_unordered = true;
+  params.list_versions = bucket->versioned();
+  params.allow_unordered = true;
 
-  bool is_truncated {false};
   do {
-    std::vector<rgw_bucket_dir_entry> objs;
-
-    int ret = list_op.list_objects(dpp, listing_max_entries, &objs, nullptr,
-                                  &is_truncated, null_yield);
+    int ret = bucket->list(dpp, params, listing_max_entries, results, null_yield);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR failed to list objects in the bucket" << dendl;
       return ret;
     }
-    for (const auto& obj : objs) {
+    for (const auto& obj : results.objs) {
       rgw_obj_key key(obj.key);
       utime_t delete_at;
-      if (has_object_expired(dpp, store, bucket_info, key, delete_at)) {
+      if (has_object_expired(dpp, store, bucket, key, delete_at)) {
        formatter->open_object_section("object_status");
        formatter->dump_string("object", key.name);
        formatter->dump_stream("delete_at") << delete_at;
 
        if (!dry_run) {
-         ret = rgw_remove_object(dpp, store, bucket_info, bucket_info.bucket, key);
+         ret = rgw_remove_object(dpp, store, bucket, key);
          formatter->dump_int("status", ret);
        }
 
@@ -1986,12 +1707,12 @@ static int fix_bucket_obj_expiry(const DoutPrefixProvider *dpp,
       }
     }
     formatter->flush(cout); // regularly flush every 1k entries
-  } while (is_truncated);
+  } while (results.is_truncated);
 
   return 0;
 }
 
-int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore *store,
+int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::Store* store,
                                     RGWBucketAdminOpState& op_state,
                                     RGWFormatterFlusher& flusher,
                                      const DoutPrefixProvider *dpp, bool dry_run)
@@ -2002,8 +1723,13 @@ int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore *store,
     ldpp_dout(dpp, -1) << "failed to initialize bucket" << dendl;
     return ret;
   }
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  ret = store->get_bucket(nullptr, admin_bucket.get_bucket_info(), &bucket);
+  if (ret < 0) {
+    return ret;
+  }
 
-  return fix_bucket_obj_expiry(dpp, store, admin_bucket.get_bucket_info(), flusher, dry_run);
+  return fix_bucket_obj_expiry(dpp, store, bucket.get(), flusher, dry_run);
 }
 
 void RGWBucketCompleteInfo::dump(Formatter *f) const {
@@ -2205,6 +1931,8 @@ static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
    f->flush(bl);
 
    MD5 hash;
+   // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
+   hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
    hash.Update((const unsigned char *)bl.c_str(), bl.length());
    hash.Final(m);
 
@@ -3160,73 +2888,47 @@ int RGWBucketCtl::do_unlink_bucket(RGWSI_Bucket_EP_Ctx& ctx,
   return svc.bucket->store_bucket_entrypoint_info(ctx, meta_key, ep, false, real_time(), &attrs, &ot, y, dpp);
 }
 
-int RGWBucketCtl::set_acl(ACLOwner& owner, rgw_bucket& bucket,
-                          RGWBucketInfo& bucket_info, bufferlist& bl,
-                          optional_yield y,
-                          const DoutPrefixProvider *dpp)
-{
-  // set owner and acl
-  bucket_info.owner = owner.get_id();
-  std::map<std::string, bufferlist> attrs{{RGW_ATTR_ACL, bl}};
-
-  int r = store_bucket_instance_info(bucket, bucket_info, y, dpp,
-                                     BucketInstance::PutParams().set_attrs(&attrs));
-  if (r < 0) {
-    cerr << "ERROR: failed to set bucket owner: " << cpp_strerror(-r) << std::endl;
-    return r;
-  }
-  
-  return 0;
-}
-
 // TODO: remove RGWRados dependency for bucket listing
-int RGWBucketCtl::chown(rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info,
+int RGWBucketCtl::chown(rgw::sal::Store* store, rgw::sal::Bucket* bucket,
                         const rgw_user& user_id, const std::string& display_name,
                         const std::string& marker, optional_yield y, const DoutPrefixProvider *dpp)
 {
   RGWObjectCtx obj_ctx(store);
-  std::vector<rgw_bucket_dir_entry> objs;
   map<string, bool> common_prefixes;
 
-  RGWRados::Bucket target(store->getRados(), bucket_info);
-  RGWRados::Bucket::List list_op(&target);
+  rgw::sal::Bucket::ListParams params;
+  rgw::sal::Bucket::ListResults results;
 
-  list_op.params.list_versions = true;
-  list_op.params.allow_unordered = true;
-  list_op.params.marker = marker;
+  params.list_versions = true;
+  params.allow_unordered = true;
+  params.marker = marker;
 
-  bool is_truncated = false;
   int count = 0;
   int max_entries = 1000;
 
   //Loop through objects and update object acls to point to bucket owner
 
   do {
-    objs.clear();
-    int ret = list_op.list_objects(dpp, max_entries, &objs, &common_prefixes, &is_truncated, y);
+    results.objs.clear();
+    int ret = bucket->list(dpp, params, max_entries, results, y);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "ERROR: list objects failed: " << cpp_strerror(-ret) << dendl;
       return ret;
     }
 
-    list_op.params.marker = list_op.get_next_marker();
-    count += objs.size();
-
-    for (const auto& obj : objs) {
+    params.marker = results.next_marker;
+    count += results.objs.size();
 
-      rgw_obj r_obj(bucket_info.bucket, obj.key);
-      RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, r_obj);
-      RGWRados::Object::Read read_op(&op_target);
+    for (const auto& obj : results.objs) {
+      std::unique_ptr<rgw::sal::Object> r_obj = bucket->get_object(obj.key);
 
-      map<string, bufferlist> attrs;
-      read_op.params.attrs = &attrs;
-      ret = read_op.prepare(y, dpp);
+      ret = r_obj->get_obj_attrs(&obj_ctx, y, dpp);
       if (ret < 0){
         ldpp_dout(dpp, 0) << "ERROR: failed to read object " << obj.key.name << cpp_strerror(-ret) << dendl;
         continue;
       }
-      const auto& aiter = attrs.find(RGW_ATTR_ACL);
-      if (aiter == attrs.end()) {
+      const auto& aiter = r_obj->get_attrs().find(RGW_ATTR_ACL);
+      if (aiter == r_obj->get_attrs().end()) {
         ldpp_dout(dpp, 0) << "ERROR: no acls found for object " << obj.key.name << " .Continuing with next object." << dendl;
         continue;
       } else {
@@ -3261,17 +2963,19 @@ int RGWBucketCtl::chown(rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_in
         bl.clear();
         encode(policy, bl);
 
-        obj_ctx.set_atomic(r_obj);
-        ret = store->getRados()->set_attr(dpp, &obj_ctx, bucket_info, r_obj, RGW_ATTR_ACL, bl);
+       r_obj->set_atomic(&obj_ctx);
+       map<string, bufferlist> attrs;
+       attrs[RGW_ATTR_ACL] = bl;
+       ret = r_obj->set_obj_attrs(dpp, &obj_ctx, &attrs, nullptr, y);
         if (ret < 0) {
           ldpp_dout(dpp, 0) << "ERROR: modify attr failed " << cpp_strerror(-ret) << dendl;
           return ret;
         }
       }
     }
-    cerr << count << " objects processed in " << bucket_info.bucket.name
-        << ". Next marker " << list_op.params.marker.name << std::endl;
-  } while(is_truncated);
+    cerr << count << " objects processed in " << bucket
+        << ". Next marker " << params.marker.name << std::endl;
+  } while(results.is_truncated);
   return 0;
 }
 
@@ -3377,3 +3081,41 @@ RGWBucketInstanceMetadataHandlerBase *RGWArchiveBucketInstanceMetaHandlerAllocat
   return new RGWArchiveBucketInstanceMetadataHandler();
 }
 
+
+void RGWBucketEntryPoint::generate_test_instances(list<RGWBucketEntryPoint*>& o)
+{
+  RGWBucketEntryPoint *bp = new RGWBucketEntryPoint();
+  init_bucket(&bp->bucket, "tenant", "bucket", "pool", ".index.pool", "marker", "10");
+  bp->owner = "owner";
+  bp->creation_time = ceph::real_clock::from_ceph_timespec({ceph_le32(2), ceph_le32(3)});
+
+  o.push_back(bp);
+  o.push_back(new RGWBucketEntryPoint);
+}
+
+void RGWBucketEntryPoint::dump(Formatter *f) const
+{
+  encode_json("bucket", bucket, f);
+  encode_json("owner", owner, f);
+  utime_t ut(creation_time);
+  encode_json("creation_time", ut, f);
+  encode_json("linked", linked, f);
+  encode_json("has_bucket_info", has_bucket_info, f);
+  if (has_bucket_info) {
+    encode_json("old_bucket_info", old_bucket_info, f);
+  }
+}
+
+void RGWBucketEntryPoint::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("bucket", bucket, obj);
+  JSONDecoder::decode_json("owner", owner, obj);
+  utime_t ut;
+  JSONDecoder::decode_json("creation_time", ut, obj);
+  creation_time = ut.to_real_time();
+  JSONDecoder::decode_json("linked", linked, obj);
+  JSONDecoder::decode_json("has_bucket_info", has_bucket_info, obj);
+  if (has_bucket_info) {
+    JSONDecoder::decode_json("old_bucket_info", old_bucket_info, obj);
+  }
+}
+