]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_op.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_op.cc
index 0005c9ee30dbf81c95133ccd8f6740eec938a711..cb8e8a90d5333e71f6c476ebeb2085d04164f34d 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <errno.h>
 #include <stdlib.h>
@@ -85,14 +85,10 @@ using rgw::ARN;
 using rgw::IAM::Effect;
 using rgw::IAM::Policy;
 
-using rgw::IAM::Policy;
-
 static string mp_ns = RGW_OBJ_NS_MULTIPART;
 static string shadow_ns = RGW_OBJ_NS_SHADOW;
 
 static void forward_req_info(CephContext *cct, req_info& info, const std::string& bucket_name);
-static int forward_request_to_master(struct req_state *s, obj_version *objv, RGWRados *store,
-                                     bufferlist& in_data, JSONParser *jp, req_info *forward_info = nullptr);
 
 static MultipartMetaFilter mp_filter;
 
@@ -189,7 +185,7 @@ static int decode_policy(CephContext *cct,
 
 
 static int get_user_policy_from_attr(CephContext * const cct,
-                                    RGWRados * const store,
+                                    rgw::sal::RGWRadosStore * const store,
                                     map<string, bufferlist>& attrs,
                                     RGWAccessControlPolicy& policy    /* out */)
 {
@@ -206,11 +202,18 @@ static int get_user_policy_from_attr(CephContext * const cct,
   return 0;
 }
 
-static int get_bucket_instance_policy_from_attr(CephContext *cct,
-                                               RGWRados *store,
-                                               RGWBucketInfo& bucket_info,
-                                               map<string, bufferlist>& bucket_attrs,
-                                               RGWAccessControlPolicy *policy)
+/**
+ * Get the AccessControlPolicy for an object off of disk.
+ * policy: must point to a valid RGWACL, and will be filled upon return.
+ * bucket: name of the bucket containing the object.
+ * object: name of the object to get the ACL for.
+ * Returns: 0 on success, -ERR# otherwise.
+ */
+int rgw_op_get_bucket_policy_from_attr(CephContext *cct,
+                                      rgw::sal::RGWRadosStore *store,
+                                      RGWBucketInfo& bucket_info,
+                                      map<string, bufferlist>& bucket_attrs,
+                                      RGWAccessControlPolicy *policy)
 {
   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
 
@@ -220,33 +223,34 @@ static int get_bucket_instance_policy_from_attr(CephContext *cct,
       return ret;
   } else {
     ldout(cct, 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
-    RGWUserInfo uinfo;
+    rgw::sal::RGWRadosUser user(store);
     /* object exists, but policy is broken */
-    int r = rgw_get_user_info_by_uid(store, bucket_info.owner, uinfo);
+    int r = user.get_by_id(bucket_info.owner, null_yield);
     if (r < 0)
       return r;
 
-    policy->create_default(bucket_info.owner, uinfo.display_name);
+    policy->create_default(bucket_info.owner, user.get_display_name());
   }
   return 0;
 }
 
 static int get_obj_policy_from_attr(CephContext *cct,
-                                   RGWRados *store,
+                                   rgw::sal::RGWRadosStore *store,
                                    RGWObjectCtx& obj_ctx,
                                    RGWBucketInfo& bucket_info,
                                    map<string, bufferlist>& bucket_attrs,
                                    RGWAccessControlPolicy *policy,
                                     string *storage_class,
-                                   rgw_obj& obj)
+                                   rgw_obj& obj,
+                                    optional_yield y)
 {
   bufferlist bl;
   int ret = 0;
 
-  RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+  RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
   RGWRados::Object::Read rop(&op_target);
 
-  ret = rop.get_attr(RGW_ATTR_ACL, bl);
+  ret = rop.get_attr(RGW_ATTR_ACL, bl, y);
   if (ret >= 0) {
     ret = decode_policy(cct, bl, policy);
     if (ret < 0)
@@ -254,17 +258,17 @@ static int get_obj_policy_from_attr(CephContext *cct,
   } else if (ret == -ENODATA) {
     /* object exists, but policy is broken */
     ldout(cct, 0) << "WARNING: couldn't find acl header for object, generating default" << dendl;
-    RGWUserInfo uinfo;
-    ret = rgw_get_user_info_by_uid(store, bucket_info.owner, uinfo);
+    rgw::sal::RGWRadosUser user(store);
+    ret = user.get_by_id(bucket_info.owner, y);
     if (ret < 0)
       return ret;
 
-    policy->create_default(bucket_info.owner, uinfo.display_name);
+    policy->create_default(bucket_info.owner, user.get_display_name());
   }
 
   if (storage_class) {
     bufferlist scbl;
-    int r = rop.get_attr(RGW_ATTR_STORAGE_CLASS, scbl);
+    int r = rop.get_attr(RGW_ATTR_STORAGE_CLASS, scbl, y);
     if (r >= 0) {
       *storage_class = scbl.to_str();
     } else {
@@ -276,24 +280,8 @@ static int get_obj_policy_from_attr(CephContext *cct,
 }
 
 
-/**
- * Get the AccessControlPolicy for an object off of disk.
- * policy: must point to a valid RGWACL, and will be filled upon return.
- * bucket: name of the bucket containing the object.
- * object: name of the object to get the ACL for.
- * Returns: 0 on success, -ERR# otherwise.
- */
-int rgw_op_get_bucket_policy_from_attr(CephContext *cct,
-                                       RGWRados *store,
-                                       RGWBucketInfo& bucket_info,
-                                       map<string, bufferlist>& bucket_attrs,
-                                       RGWAccessControlPolicy *policy)
-{
-  return get_bucket_instance_policy_from_attr(cct, store, bucket_info, bucket_attrs, policy);
-}
-
 static boost::optional<Policy> get_iam_policy_from_attr(CephContext* cct,
-                                                       RGWRados* store,
+                                                       rgw::sal::RGWRadosStore* store,
                                                        map<string, bufferlist>& attrs,
                                                        const string& tenant) {
   auto i = attrs.find(RGW_ATTR_IAM_POLICY);
@@ -304,8 +292,25 @@ static boost::optional<Policy> get_iam_policy_from_attr(CephContext* cct,
   }
 }
 
+static boost::optional<PublicAccessBlockConfiguration>
+get_public_access_conf_from_attr(const map<string, bufferlist>& attrs)
+{
+  if (auto aiter = attrs.find(RGW_ATTR_PUBLIC_ACCESS);
+      aiter != attrs.end()) {
+    bufferlist::const_iterator iter{&aiter->second};
+    PublicAccessBlockConfiguration access_conf;
+    try {
+      access_conf.decode(iter);
+    } catch (const buffer::error& e) {
+      return boost::none;
+    }
+    return access_conf;
+  }
+  return boost::none;
+}
+
 vector<Policy> get_iam_user_policy_from_attr(CephContext* cct,
-                        RGWRados* store,
+                        rgw::sal::RGWRadosStore* store,
                         map<string, bufferlist>& attrs,
                         const string& tenant) {
   vector<Policy> policies;
@@ -322,30 +327,30 @@ vector<Policy> get_iam_user_policy_from_attr(CephContext* cct,
   return policies;
 }
 
-static int get_obj_attrs(RGWRados *store, struct req_state *s, const rgw_obj& obj, map<string, bufferlist>& attrs, rgw_obj *target_obj = nullptr)
+static int get_obj_attrs(rgw::sal::RGWRadosStore *store, struct req_state *s, const rgw_obj& obj, map<string, bufferlist>& attrs, rgw_obj *target_obj = nullptr)
 {
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
   read_op.params.target_obj = target_obj;
 
-  return read_op.prepare();
+  return read_op.prepare(s->yield);
 }
 
-static int get_obj_head(RGWRados *store, struct req_state *s,
+static int get_obj_head(rgw::sal::RGWRadosStore *store, struct req_state *s,
                         const rgw_obj& obj,
                         map<string, bufferlist> *attrs,
                          bufferlist *pbl)
 {
-  store->set_prefetch_data(s->obj_ctx, obj);
+  store->getRados()->set_prefetch_data(s->obj_ctx, obj);
 
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = attrs;
 
-  int ret = read_op.prepare();
+  int ret = read_op.prepare(s->yield);
   if (ret < 0) {
     return ret;
   }
@@ -354,7 +359,7 @@ static int get_obj_head(RGWRados *store, struct req_state *s,
     return 0;
   }
 
-  ret = read_op.read(0, s->cct->_conf->rgw_max_chunk_size, *pbl);
+  ret = read_op.read(0, s->cct->_conf->rgw_max_chunk_size, *pbl, s->yield);
 
   return 0;
 }
@@ -377,7 +382,7 @@ struct multipart_upload_info
 };
 WRITE_CLASS_ENCODER(multipart_upload_info)
 
-static int get_multipart_info(RGWRados *store, struct req_state *s,
+static int get_multipart_info(rgw::sal::RGWRadosStore *store, struct req_state *s,
                              const rgw_obj& obj,
                               RGWAccessControlPolicy *policy,
                              map<string, bufferlist> *attrs,
@@ -426,7 +431,7 @@ static int get_multipart_info(RGWRados *store, struct req_state *s,
   return 0;
 }
 
-static int get_multipart_info(RGWRados *store, struct req_state *s,
+static int get_multipart_info(rgw::sal::RGWRadosStore *store, struct req_state *s,
                              const string& meta_oid,
                               RGWAccessControlPolicy *policy,
                              map<string, bufferlist> *attrs,
@@ -442,24 +447,24 @@ static int get_multipart_info(RGWRados *store, struct req_state *s,
   return get_multipart_info(store, s, meta_obj, policy, attrs, upload_info);
 }
 
-static int modify_obj_attr(RGWRados *store, struct req_state *s, const rgw_obj& obj, const char* attr_name, bufferlist& attr_val)
+static int modify_obj_attr(rgw::sal::RGWRadosStore *store, struct req_state *s, const rgw_obj& obj, const char* attr_name, bufferlist& attr_val)
 {
   map<string, bufferlist> attrs;
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  
-  int r = read_op.prepare();
+
+  int r = read_op.prepare(s->yield);
   if (r < 0) {
     return r;
   }
-  store->set_atomic(s->obj_ctx, read_op.state.obj);
+  store->getRados()->set_atomic(s->obj_ctx, read_op.state.obj);
   attrs[attr_name] = attr_val;
-  return store->set_attrs(s->obj_ctx, s->bucket_info, read_op.state.obj, attrs, NULL);
+  return store->getRados()->set_attrs(s->obj_ctx, s->bucket_info, read_op.state.obj, attrs, NULL, s->yield);
 }
 
-static int read_bucket_policy(RGWRados *store,
+static int read_bucket_policy(rgw::sal::RGWRadosStore *store,
                               struct req_state *s,
                               RGWBucketInfo& bucket_info,
                               map<string, bufferlist>& bucket_attrs,
@@ -484,7 +489,7 @@ static int read_bucket_policy(RGWRados *store,
   return ret;
 }
 
-static int read_obj_policy(RGWRados *store,
+static int read_obj_policy(rgw::sal::RGWRadosStore *store,
                            struct req_state *s,
                            RGWBucketInfo& bucket_info,
                            map<string, bufferlist>& bucket_attrs,
@@ -517,7 +522,7 @@ static int read_obj_policy(RGWRados *store,
 
   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
   int ret = get_obj_policy_from_attr(s->cct, store, *obj_ctx,
-                                     bucket_info, bucket_attrs, acl, storage_class, obj);
+                                     bucket_info, bucket_attrs, acl, storage_class, obj, s->yield);
   if (ret == -ENOENT) {
     /* object does not exist checking the bucket's ACL to make sure
        that we send a proper error code */
@@ -527,7 +532,7 @@ static int read_obj_policy(RGWRados *store,
       return ret;
     }
     const rgw_user& bucket_owner = bucket_policy.get_owner().get_id();
-    if (bucket_owner.compare(s->user->user_id) != 0 &&
+    if (bucket_owner.compare(s->user->get_id()) != 0 &&
         ! s->auth.identity->is_admin_of(bucket_owner)) {
       if (policy) {
         auto r =  policy->eval(s->env, *s->auth.identity, rgw::IAM::s3ListBucket, ARN(bucket));
@@ -554,16 +559,16 @@ static int read_obj_policy(RGWRados *store,
  * only_bucket: If true, reads the user and bucket ACLs rather than the object ACL.
  * Returns: 0 on success, -ERR# otherwise.
  */
-int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
+int rgw_build_bucket_policies(rgw::sal::RGWRadosStore* store, struct req_state* s)
 {
   int ret = 0;
   rgw_obj_key obj;
-  RGWUserInfo bucket_owner_info;
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
+  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
 
   string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
   if (!bi.empty()) {
-    ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &s->bucket_instance_shard_id);
+    string bucket_name;
+    ret = rgw_bucket_parse_bucket_instance(bi, &bucket_name, &s->bucket_instance_id, &s->bucket_instance_shard_id);
     if (ret < 0) {
       return ret;
     }
@@ -575,7 +580,7 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     /* We aren't allocating the account policy for those operations using
      * the Swift's infrastructure that don't really need req_state::user.
      * Typical example here is the implementation of /info. */
-    if (!s->user->user_id.empty()) {
+    if (!s->user->get_id().empty()) {
       s->user_acl = std::make_unique<RGWAccessControlPolicy_SWIFTAcct>(s->cct);
     }
     s->bucket_acl = std::make_unique<RGWAccessControlPolicy_SWIFT>(s->cct);
@@ -588,13 +593,13 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     RGWBucketInfo source_info;
 
     if (s->bucket_instance_id.empty()) {
-      ret = store->get_bucket_info(obj_ctx, s->src_tenant_name, s->src_bucket_name, source_info, NULL);
+      ret = store->getRados()->get_bucket_info(store->svc(), s->src_tenant_name, s->src_bucket_name, source_info, NULL, s->yield);
     } else {
-      ret = store->get_bucket_instance_info(obj_ctx, s->bucket_instance_id, source_info, NULL, NULL);
+      ret = store->getRados()->get_bucket_instance_info(obj_ctx, s->bucket_instance_id, source_info, NULL, NULL, s->yield);
     }
     if (ret == 0) {
       string& zonegroup = source_info.zonegroup;
-      s->local_source = store->svc.zone->get_zonegroup().equals(zonegroup);
+      s->local_source = store->svc()->zone->get_zonegroup().equals(zonegroup);
     }
   }
 
@@ -602,31 +607,33 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     rgw_user uid;
     std::string display_name;
   } acct_acl_user = {
-    s->user->user_id,
-    s->user->display_name,
+    s->user->get_id(),
+    s->user->get_display_name(),
   };
 
   if (!s->bucket_name.empty()) {
     s->bucket_exists = true;
-    if (s->bucket_instance_id.empty()) {
-      ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, s->bucket_name,
-                                   s->bucket_info, &s->bucket_mtime,
-                                   &s->bucket_attrs);
-    } else {
-      ret = store->get_bucket_instance_info(obj_ctx, s->bucket_instance_id,
-                                            s->bucket_info, &s->bucket_mtime,
-                                            &s->bucket_attrs);
-    }
+
+    auto b = rgw_bucket(rgw_bucket_key(s->bucket_tenant, s->bucket_name, s->bucket_instance_id));
+
+    RGWObjVersionTracker ep_ot;
+    ret = store->ctl()->bucket->read_bucket_info(b, &s->bucket_info,
+                                              s->yield,
+                                             RGWBucketCtl::BucketInstance::GetParams()
+                                               .set_mtime(&s->bucket_mtime)
+                                               .set_attrs(&s->bucket_attrs),
+                                             &ep_ot);
     if (ret < 0) {
       if (ret != -ENOENT) {
         string bucket_log;
-        rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, bucket_log);
+        bucket_log = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
         ldpp_dout(s, 0) << "NOTICE: couldn't get bucket from bucket_name (name="
             << bucket_log << ")" << dendl;
         return ret;
       }
       s->bucket_exists = false;
     }
+    s->bucket_ep_objv = ep_ot.read_version;
     s->bucket = s->bucket_info.bucket;
 
     if (s->bucket_exists) {
@@ -637,14 +644,13 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
         s->bucket_acl->get_owner().get_display_name(),
       };
     } else {
-      s->bucket_acl->create_default(s->user->user_id, s->user->display_name);
-      ret = -ERR_NO_SUCH_BUCKET;
+      return -ERR_NO_SUCH_BUCKET;
     }
 
     s->bucket_owner = s->bucket_acl->get_owner();
 
     RGWZoneGroup zonegroup;
-    int r = store->svc.zone->get_zonegroup(s->bucket_info.zonegroup, zonegroup);
+    int r = store->svc()->zone->get_zonegroup(s->bucket_info.zonegroup, zonegroup);
     if (!r) {
       if (!zonegroup.endpoints.empty()) {
        s->zonegroup_endpoint = zonegroup.endpoints.front();
@@ -661,14 +667,14 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
       ret = r;
     }
 
-    if (s->bucket_exists && !store->svc.zone->get_zonegroup().equals(s->bucket_info.zonegroup)) {
+    if (s->bucket_exists && !store->svc()->zone->get_zonegroup().equals(s->bucket_info.zonegroup)) {
       ldpp_dout(s, 0) << "NOTICE: request for data in a different zonegroup ("
           << s->bucket_info.zonegroup << " != "
-          << store->svc.zone->get_zonegroup().get_id() << ")" << dendl;
+          << store->svc()->zone->get_zonegroup().get_id() << ")" << dendl;
       /* we now need to make sure that the operation actually requires copy source, that is
        * it's a copy operation
        */
-      if (store->svc.zone->get_zonegroup().is_master_zonegroup() && s->system_request) {
+      if (store->svc()->zone->get_zonegroup().is_master_zonegroup() && s->system_request) {
         /*If this is the master, don't redirect*/
       } else if (s->op_type == RGW_OP_GET_BUCKET_LOCATION ) {
         /* If op is get bucket location, don't redirect */
@@ -685,17 +691,21 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
       s->dest_placement.storage_class = s->info.storage_class;
       s->dest_placement.inherit_from(s->bucket_info.placement_rule);
 
-      if (!store->svc.zone->get_zone_params().valid_placement(s->dest_placement)) {
+      if (!store->svc()->zone->get_zone_params().valid_placement(s->dest_placement)) {
         ldpp_dout(s, 0) << "NOTICE: invalid dest placement: " << s->dest_placement.to_str() << dendl;
         return -EINVAL;
       }
     }
+
+    if(s->bucket_exists) {
+      s->bucket_access_conf = get_public_access_conf_from_attr(s->bucket_attrs);
+    }
   }
 
   /* handle user ACL only for those APIs which support it */
   if (s->user_acl) {
     map<string, bufferlist> uattrs;
-    ret = rgw_get_user_attrs_by_uid(store, acct_acl_user.uid, uattrs);
+    ret = store->ctl()->user->get_attrs_by_uid(acct_acl_user.uid, &uattrs, s->yield);
     if (!ret) {
       ret = get_user_policy_from_attr(s->cct, store, uattrs, *s->user_acl);
     }
@@ -712,22 +722,22 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
       ret = 0;
     } else if (ret < 0) {
       ldpp_dout(s, 0) << "NOTICE: couldn't get user attrs for handling ACL "
-          "(user_id=" << s->user->user_id << ", ret=" << ret << ")" << dendl;
+          "(user_id=" << s->user->get_id() << ", ret=" << ret << ")" << dendl;
       return ret;
     }
   }
   // We don't need user policies in case of STS token returned by AssumeRole,
   // hence the check for user type
-  if (! s->user->user_id.empty() && s->auth.identity->get_identity_type() != TYPE_ROLE) {
+  if (! s->user->get_id().empty() && s->auth.identity->get_identity_type() != TYPE_ROLE) {
     try {
       map<string, bufferlist> uattrs;
-      if (ret = rgw_get_user_attrs_by_uid(store, s->user->user_id, uattrs); ! ret) {
+      if (ret = store->ctl()->user->get_attrs_by_uid(s->user->get_id(), &uattrs, s->yield); ! ret) {
         if (s->iam_user_policies.empty()) {
-          s->iam_user_policies = get_iam_user_policy_from_attr(s->cct, store, uattrs, s->user->user_id.tenant);
+          s->iam_user_policies = get_iam_user_policy_from_attr(s->cct, store, uattrs, s->user->get_tenant());
         } else {
           // This scenario can happen when a STS token has a policy, then we need to append other user policies
           // to the existing ones. (e.g. token returned by GetSessionToken)
-          auto user_policies = get_iam_user_policy_from_attr(s->cct, store, uattrs, s->user->user_id.tenant);
+          auto user_policies = get_iam_user_policy_from_attr(s->cct, store, uattrs, s->user->get_tenant());
           s->iam_user_policies.insert(s->iam_user_policies.end(), user_policies.begin(), user_policies.end());
         }
       } else {
@@ -752,7 +762,7 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     ret = -EACCES;
   }
 
-  bool success = store->svc.zone->get_redirect_zone_endpoint(&s->redirect_zone_endpoint);
+  bool success = store->svc()->zone->get_redirect_zone_endpoint(&s->redirect_zone_endpoint);
   if (success) {
     ldpp_dout(s, 20) << "redirect_zone_endpoint=" << s->redirect_zone_endpoint << dendl;
   }
@@ -766,7 +776,7 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
  * only_bucket: If true, reads the bucket ACL rather than the object ACL.
  * Returns: 0 on success, -ERR# otherwise.
  */
-int rgw_build_object_policies(RGWRados *store, struct req_state *s,
+int rgw_build_object_policies(rgw::sal::RGWRadosStore *store, struct req_state *s,
                              bool prefetch_data)
 {
   int ret = 0;
@@ -778,9 +788,9 @@ int rgw_build_object_policies(RGWRados *store, struct req_state *s,
     s->object_acl = std::make_unique<RGWAccessControlPolicy>(s->cct);
     rgw_obj obj(s->bucket, s->object);
       
-    store->set_atomic(s->obj_ctx, obj);
+    store->getRados()->set_atomic(s->obj_ctx, obj);
     if (prefetch_data) {
-      store->set_prefetch_data(s->obj_ctx, obj);
+      store->getRados()->set_prefetch_data(s->obj_ctx, obj);
     }
     ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs,
                          s->object_acl.get(), nullptr, s->iam_policy, s->bucket,
@@ -798,7 +808,7 @@ void rgw_add_to_iam_environment(rgw::IAM::Environment& e, std::string_view key,
 }
 
 static int rgw_iam_add_tags_from_bl(struct req_state* s, bufferlist& bl){
-  RGWObjTags tagset;
+  RGWObjTags& tagset = s->tagset;
   try {
     auto bliter = bl.cbegin();
     tagset.decode(bliter);
@@ -813,9 +823,9 @@ static int rgw_iam_add_tags_from_bl(struct req_state* s, bufferlist& bl){
   return 0;
 }
 
-static int rgw_iam_add_existing_objtags(RGWRados* store, struct req_state* s, rgw_obj& obj, std::uint64_t action){
+static int rgw_iam_add_existing_objtags(rgw::sal::RGWRadosStore* store, struct req_state* s, rgw_obj& obj, std::uint64_t action){
   map <string, bufferlist> attrs;
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
   int op_ret = get_obj_attrs(store, s, obj, attrs);
   if (op_ret < 0)
     return op_ret;
@@ -847,7 +857,7 @@ static void rgw_add_grant_to_iam_environment(rgw::IAM::Environment& e, struct re
   }
 }
 
-void rgw_build_iam_environment(RGWRados* store,
+void rgw_build_iam_environment(rgw::sal::RGWRadosStore* store,
                                      struct req_state* s)
 {
   const auto& m = s->info.env->get_map();
@@ -896,7 +906,7 @@ void rgw_build_iam_environment(RGWRados* store,
     // What to do about aws::userid? One can have multiple access
     // keys so that isn't really suitable. Do we have a durable
     // identifier that can persist through name changes?
-    s->env.emplace("aws:username", s->user->user_id.id);
+    s->env.emplace("aws:username", s->user->get_id().id);
   }
 
   i = m.find("HTTP_X_AMZ_SECURITY_TOKEN");
@@ -949,9 +959,9 @@ int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
 int RGWGetObj::verify_permission()
 {
   obj = rgw_obj(s->bucket, s->object);
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
   if (get_data) {
-    store->set_prefetch_data(s->obj_ctx, obj);
+    store->getRados()->set_prefetch_data(s->obj_ctx, obj);
   }
 
   if (torrent.get_flag()) {
@@ -988,19 +998,39 @@ int RGWGetObj::verify_permission()
   return 0;
 }
 
+// cache the objects tags into the requests
+// use inside try/catch as "decode()" may throw
+void populate_tags_in_request(req_state* s, const std::map<std::string, bufferlist>& attrs) {
+  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
+  if (attr_iter != attrs.end()) {
+    auto bliter = attr_iter->second.cbegin();
+    decode(s->tagset, bliter);
+  }
+}
+
+// cache the objects metadata into the request
+void populate_metadata_in_request(req_state* s, std::map<std::string, bufferlist>& attrs) {
+  for (auto& attr : attrs) {
+    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
+      std::string_view key(attr.first);
+      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
+      s->info.x_meta_map.emplace(key, attr.second.c_str());
+    }
+  }
+}
 
 int RGWOp::verify_op_mask()
 {
   uint32_t required_mask = op_mask();
 
   ldpp_dout(this, 20) << "required_mask= " << required_mask
-      << " user.op_mask=" << s->user->op_mask << dendl;
+      << " user.op_mask=" << s->user->get_info().op_mask << dendl;
 
-  if ((s->user->op_mask & required_mask) != required_mask) {
+  if ((s->user->get_info().op_mask & required_mask) != required_mask) {
     return -EPERM;
   }
 
-  if (!s->system_request && (required_mask & RGW_OP_TYPE_MODIFY) && !store->svc.zone->zone_is_writeable()) {
+  if (!s->system_request && (required_mask & RGW_OP_TYPE_MODIFY) && !store->svc()->zone->zone_is_writeable()) {
     ldpp_dout(this, 5) << "NOTICE: modify request to a read-only zone by a "
         "non-system user, permission denied"  << dendl;
     return -EPERM;
@@ -1046,7 +1076,7 @@ void RGWGetObjTags::execute()
 
   obj = rgw_obj(s->bucket, s->object);
 
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
 
   op_ret = get_obj_attrs(store, s, obj, attrs);
   if (op_ret < 0) {
@@ -1099,7 +1129,7 @@ void RGWPutObjTags::execute()
 
   rgw_obj obj;
   obj = rgw_obj(s->bucket, s->object);
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
   op_ret = modify_obj_attr(store, s, obj, RGW_ATTR_TAGS, tags_bl);
   if (op_ret == -ECANCELED){
     op_ret = -ERR_TAG_CONFLICT;
@@ -1144,12 +1174,198 @@ void RGWDeleteObjTags::execute()
 
   rgw_obj obj;
   obj = rgw_obj(s->bucket, s->object);
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
   map <string, bufferlist> attrs;
   map <string, bufferlist> rmattr;
   bufferlist bl;
   rmattr[RGW_ATTR_TAGS] = bl;
-  op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, &rmattr);
+  op_ret = store->getRados()->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, &rmattr, s->yield);
+}
+
+int RGWGetBucketTags::verify_permission()
+{
+
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketTagging)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketTags::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWGetBucketTags::execute() 
+{
+  auto iter = s->bucket_attrs.find(RGW_ATTR_TAGS);
+  if (iter != s->bucket_attrs.end()) {
+    has_tags = true;
+    tags_bl.append(iter->second);
+  } else {
+    op_ret = -ERR_NO_SUCH_TAG_SET;
+  }
+  send_response_data(tags_bl);
+}
+
+int RGWPutBucketTags::verify_permission() {
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketTagging);
+}
+
+void RGWPutBucketTags::execute() {
+
+  op_ret = get_params();
+  if (op_ret < 0) 
+    return;
+
+  if (!store->svc()->zone->is_meta_master()) {
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+    map<string, bufferlist> attrs = s->bucket_attrs;
+    attrs[RGW_ATTR_TAGS] = tags_bl;
+    return store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs, &s->bucket_info.objv_tracker, s->yield);
+  });
+
+}
+
+void RGWDeleteBucketTags::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+int RGWDeleteBucketTags::verify_permission()
+{
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketTagging);
+}
+
+void RGWDeleteBucketTags::execute()
+{
+  if (!store->svc()->zone->is_meta_master()) {
+    bufferlist in_data;
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+    map<string, bufferlist> attrs = s->bucket_attrs;
+    attrs.erase(RGW_ATTR_TAGS);
+    op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs, &s->bucket_info.objv_tracker, s->yield);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "RGWDeleteBucketTags() failed to remove RGW_ATTR_TAGS on bucket="
+                        << s->bucket.name
+                        << " returned err= " << op_ret << dendl;
+    }
+    return op_ret;
+  });
+}
+
+int RGWGetBucketReplication::verify_permission()
+{
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3GetReplicationConfiguration)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketReplication::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWGetBucketReplication::execute()
+{
+  send_response_data();
+}
+
+int RGWPutBucketReplication::verify_permission() {
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutReplicationConfiguration);
+}
+
+void RGWPutBucketReplication::execute() {
+
+  op_ret = get_params();
+  if (op_ret < 0) 
+    return;
+
+  if (!store->svc()->zone->is_meta_master()) {
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+    auto sync_policy = (s->bucket_info.sync_policy ? *s->bucket_info.sync_policy : rgw_sync_policy_info());
+
+    for (auto& group : sync_policy_groups) {
+      sync_policy.groups[group.id] = group;
+    }
+
+    s->bucket_info.set_sync_policy(std::move(sync_policy));
+
+    int ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                                          &s->bucket_attrs);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "ERROR: put_bucket_instance_info (bucket=" << s->bucket_info.bucket.get_key() << ") returned ret=" << ret << dendl;
+      return ret;
+    }
+
+    return 0;
+  });
+}
+
+void RGWDeleteBucketReplication::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+int RGWDeleteBucketReplication::verify_permission()
+{
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3DeleteReplicationConfiguration);
+}
+
+void RGWDeleteBucketReplication::execute()
+{
+  if (!store->svc()->zone->is_meta_master()) {
+    bufferlist in_data;
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+    if (!s->bucket_info.sync_policy) {
+      return 0;
+    }
+
+    rgw_sync_policy_info sync_policy = *s->bucket_info.sync_policy;
+
+    update_sync_policy(&sync_policy);
+
+    s->bucket_info.set_sync_policy(std::move(sync_policy));
+
+    int ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                                          &s->bucket_attrs);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "ERROR: put_bucket_instance_info (bucket=" << s->bucket_info.bucket.get_key() << ") returned ret=" << ret << dendl;
+      return ret;
+    }
+
+    return 0;
+  });
 }
 
 int RGWOp::do_aws4_auth_completion()
@@ -1178,7 +1394,7 @@ int RGWOp::init_quota()
     return 0;
 
   /* init quota related stuff */
-  if (!(s->user->op_mask & RGW_OP_TYPE_MODIFY)) {
+  if (!(s->user->get_info().op_mask & RGW_OP_TYPE_MODIFY)) {
     return 0;
   }
 
@@ -1187,30 +1403,30 @@ int RGWOp::init_quota()
     return 0;
   }
 
-  RGWUserInfo owner_info;
-  RGWUserInfo *uinfo;
+  rgw::sal::RGWRadosUser owner_user(store);
+  rgw::sal::RGWUser *user;
 
-  if (s->user->user_id == s->bucket_owner.get_id()) {
-    uinfo = s->user;
+  if (s->user->get_id() == s->bucket_owner.get_id()) {
+    user = s->user;
   } else {
-    int r = rgw_get_user_info_by_uid(store, s->bucket_info.owner, owner_info);
+    int r = owner_user.get_by_id(s->bucket_info.owner, s->yield);
     if (r < 0)
       return r;
-    uinfo = &owner_info;
+    user = &owner_user;
   }
 
   if (s->bucket_info.quota.enabled) {
     bucket_quota = s->bucket_info.quota;
-  } else if (uinfo->bucket_quota.enabled) {
-    bucket_quota = uinfo->bucket_quota;
+  } else if (user->get_info().bucket_quota.enabled) {
+    bucket_quota = user->get_info().bucket_quota;
   } else {
-    bucket_quota = store->svc.quota->get_bucket_quota();
+    bucket_quota = store->svc()->quota->get_bucket_quota();
   }
 
-  if (uinfo->user_quota.enabled) {
-    user_quota = uinfo->user_quota;
+  if (user->get_info().user_quota.enabled) {
+    user_quota = user->get_info().user_quota;
   } else {
-    user_quota = store->svc.quota->get_user_quota();
+    user_quota = store->svc()->quota->get_user_quota();
   }
 
   return 0;
@@ -1322,8 +1538,9 @@ bool RGWOp::generate_cors_headers(string& origin, string& method, string& header
 
   /* Custom: */
   origin = orig;
-  op_ret = read_bucket_cors();
-  if (op_ret < 0) {
+  int temp_op_ret = read_bucket_cors();
+  if (temp_op_ret < 0) {
+    op_ret = temp_op_ret;
     return false;
   }
 
@@ -1401,9 +1618,9 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
       << " end=" << cur_end << dendl;
 
   obj_ctx.set_atomic(part);
-  store->set_prefetch_data(&obj_ctx, part);
+  store->getRados()->set_prefetch_data(&obj_ctx, part);
 
-  RGWRados::Object op_target(store, s->bucket_info, obj_ctx, part);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, obj_ctx, part);
   RGWRados::Object::Read read_op(&op_target);
 
   if (!swift_slo) {
@@ -1413,7 +1630,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
   read_op.params.attrs = &attrs;
   read_op.params.obj_size = &obj_size;
 
-  op_ret = read_op.prepare();
+  op_ret = read_op.prepare(s->yield);
   if (op_ret < 0)
     return op_ret;
   op_ret = read_op.range_to_ofs(ent.meta.accounted_size, cur_ofs, cur_end);
@@ -1455,7 +1672,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
    * stored inside different accounts. */
   if (s->system_request) {
     ldpp_dout(this, 2) << "overriding permissions due to system operation" << dendl;
-  } else if (s->auth.identity->is_admin_of(s->user->user_id)) {
+  } else if (s->auth.identity->is_admin_of(s->user->get_id())) {
     ldpp_dout(this, 2) << "overriding permissions due to admin operation" << dendl;
   } else if (!verify_object_permission(this, s, part, s->user_acl.get(), bucket_acl,
                                       &obj_policy, bucket_policy, s->iam_user_policies, action)) {
@@ -1467,14 +1684,14 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
 
   perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs);
   filter->fixup_range(cur_ofs, cur_end);
-  op_ret = read_op.iterate(cur_ofs, cur_end, filter);
+  op_ret = read_op.iterate(cur_ofs, cur_end, filter, s->yield);
   if (op_ret >= 0)
          op_ret = filter->flush();
   return op_ret;
 }
 
 static int iterate_user_manifest_parts(CephContext * const cct,
-                                       RGWRados * const store,
+                                       rgw::sal::RGWRadosStore * const store,
                                        const off_t ofs,
                                        const off_t end,
                                        RGWBucketInfo *pbucket_info,
@@ -1503,7 +1720,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
 
   utime_t start_time = ceph_clock_now();
 
-  RGWRados::Bucket target(store, *pbucket_info);
+  RGWRados::Bucket target(store->getRados(), *pbucket_info);
   RGWRados::Bucket::List list_op(&target);
 
   list_op.params.prefix = obj_prefix;
@@ -1512,7 +1729,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
   MD5 etag_sum;
   do {
 #define MAX_LIST_OBJS 100
-    int r = list_op.list_objects(MAX_LIST_OBJS, &objs, NULL, &is_truncated);
+    int r = list_op.list_objects(MAX_LIST_OBJS, &objs, NULL, &is_truncated, null_yield);
     if (r < 0) {
       return r;
     }
@@ -1581,7 +1798,7 @@ struct rgw_slo_part {
 };
 
 static int iterate_slo_parts(CephContext *cct,
-                             RGWRados *store,
+                             rgw::sal::RGWRadosStore *store,
                              off_t ofs,
                              off_t end,
                              map<uint64_t, rgw_slo_part>& slo_parts,
@@ -1699,10 +1916,10 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
 
   if (bucket_name.compare(s->bucket.name) != 0) {
     map<string, bufferlist> bucket_attrs;
-    auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-    int r = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
+    auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+    int r = store->getRados()->get_bucket_info(store->svc(), s->user->get_tenant(),
                                  bucket_name, bucket_info, NULL,
-                                 &bucket_attrs);
+                                 s->yield, &bucket_attrs);
     if (r < 0) {
       ldpp_dout(this, 0) << "could not get bucket info for bucket="
                       << bucket_name << dendl;
@@ -1833,10 +2050,10 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
 
         RGWBucketInfo bucket_info;
         map<string, bufferlist> bucket_attrs;
-        auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-        int r = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
+        auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+        int r = store->getRados()->get_bucket_info(store->svc(), s->user->get_tenant(),
                                        bucket_name, bucket_info, nullptr,
-                                       &bucket_attrs);
+                                       s->yield, &bucket_attrs);
         if (r < 0) {
           ldpp_dout(this, 0) << "could not get bucket info for bucket="
                           << bucket_name << dendl;
@@ -1913,7 +2130,7 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
   /* garbage collection related handling */
   utime_t start_time = ceph_clock_now();
   if (start_time > gc_invalidate_time) {
-    int r = store->defer_gc(s->obj_ctx, s->bucket_info, obj);
+    int r = store->getRados()->defer_gc(s->obj_ctx, s->bucket_info, obj, s->yield);
     if (r < 0) {
       ldpp_dout(this, 0) << "WARNING: could not defer gc entry for obj" << dendl;
     }
@@ -1972,6 +2189,22 @@ static bool object_is_expired(map<string, bufferlist>& attrs) {
   return false;
 }
 
+static inline void rgw_cond_decode_objtags(
+  struct req_state *s,
+  const std::map<std::string, buffer::list> &attrs)
+{
+  const auto& tags = attrs.find(RGW_ATTR_TAGS);
+  if (tags != attrs.end()) {
+    try {
+      bufferlist::const_iterator iter{&tags->second};
+      s->tagset.decode(iter);
+    } catch (buffer::error& err) {
+      ldout(s->cct, 0)
+       << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+    }
+  }
+}
+
 void RGWGetObj::execute()
 {
   bufferlist bl;
@@ -1989,7 +2222,7 @@ void RGWGetObj::execute()
 
   perfcounter->inc(l_rgw_get);
 
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
 
   op_ret = get_params();
@@ -2011,7 +2244,7 @@ void RGWGetObj::execute()
   read_op.params.lastmod = &lastmod;
   read_op.params.obj_size = &s->obj_size;
 
-  op_ret = read_op.prepare();
+  op_ret = read_op.prepare(s->yield);
   if (op_ret < 0)
     goto done_err;
   version_id = read_op.state.obj.key.instance;
@@ -2102,6 +2335,9 @@ void RGWGetObj::execute()
     goto done_err;
   }
 
+  /* Decode S3 objtags, if any */
+  rgw_cond_decode_objtags(s, attrs);
+
   start = ofs;
 
   attr_iter = attrs.find(RGW_ATTR_MANIFEST);
@@ -2124,7 +2360,7 @@ void RGWGetObj::execute()
   ofs_x = ofs;
   end_x = end;
   filter->fixup_range(ofs_x, end_x);
-  op_ret = read_op.iterate(ofs_x, end_x, filter);
+  op_ret = read_op.iterate(ofs_x, end_x, filter, s->yield);
 
   if (op_ret >= 0)
     op_ret = filter->flush();
@@ -2174,7 +2410,7 @@ int RGWListBuckets::verify_permission()
   rgw::Partition partition = rgw::Partition::aws;
   rgw::Service service = rgw::Service::s3;
 
-  if (!verify_user_permission(this, s, ARN(partition, service, "", s->user->user_id.tenant, "*"), rgw::IAM::s3ListAllMyBuckets)) {
+  if (!verify_user_permission(this, s, ARN(partition, service, "", s->user->get_tenant(), "*"), rgw::IAM::s3ListAllMyBuckets)) {
     return -EACCES;
   }
 
@@ -2204,7 +2440,7 @@ void RGWListBuckets::execute()
   }
 
   if (supports_account_metadata()) {
-    op_ret = rgw_get_user_attrs_by_uid(store, s->user->user_id, attrs);
+    op_ret = store->ctl()->user->get_attrs_by_uid(s->user->get_id(), &attrs, s->yield);
     if (op_ret < 0) {
       goto send_end;
     }
@@ -2212,7 +2448,7 @@ void RGWListBuckets::execute()
 
   is_truncated = false;
   do {
-    RGWUserBuckets buckets;
+    rgw::sal::RGWBucketList buckets;
     uint64_t read_count;
     if (limit >= 0) {
       read_count = min(limit - total_count, max_buckets);
@@ -2220,41 +2456,41 @@ void RGWListBuckets::execute()
       read_count = max_buckets;
     }
 
-    op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
-                                   marker, end_marker, read_count,
-                                   should_get_stats(), &is_truncated,
-                                   get_default_max());
+    rgw::sal::RGWRadosUser user(store, s->user->get_id());
+
+    op_ret = user.list_buckets(marker, end_marker, read_count, should_get_stats(), buckets);
+
     if (op_ret < 0) {
       /* hmm.. something wrong here.. the user was authenticated, so it
          should exist */
       ldpp_dout(this, 10) << "WARNING: failed on rgw_get_user_buckets uid="
-                       << s->user->user_id << dendl;
+                       << s->user->get_id() << dendl;
       break;
     }
 
     /* We need to have stats for all our policies - even if a given policy
      * isn't actually used in a given account. In such situation its usage
      * stats would be simply full of zeros. */
-    for (const auto& policy : store->svc.zone->get_zonegroup().placement_targets) {
+    for (const auto& policy : store->svc()->zone->get_zonegroup().placement_targets) {
       policies_stats.emplace(policy.second.name,
                              decltype(policies_stats)::mapped_type());
     }
 
-    std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+    std::map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
     for (const auto& kv : m) {
       const auto& bucket = kv.second;
 
-      global_stats.bytes_used += bucket.size;
-      global_stats.bytes_used_rounded += bucket.size_rounded;
-      global_stats.objects_count += bucket.count;
+      global_stats.bytes_used += bucket->get_size();
+      global_stats.bytes_used_rounded += bucket->get_size_rounded();
+      global_stats.objects_count += bucket->get_count();
 
       /* operator[] still can create a new entry for storage policy seen
        * for first time. */
-      auto& policy_stats = policies_stats[bucket.placement_rule.to_str()];
-      policy_stats.bytes_used += bucket.size;
-      policy_stats.bytes_used_rounded += bucket.size_rounded;
+      auto& policy_stats = policies_stats[bucket->get_placement_rule().to_str()];
+      policy_stats.bytes_used += bucket->get_size();
+      policy_stats.bytes_used_rounded += bucket->get_size_rounded();
       policy_stats.buckets_count++;
-      policy_stats.objects_count += bucket.count;
+      policy_stats.objects_count += bucket->get_count();
     }
     global_stats.buckets_count += m.size();
     total_count += m.size();
@@ -2266,8 +2502,9 @@ void RGWListBuckets::execute()
       started = true;
     }
 
-    if (!m.empty()) {
-      map<string, RGWBucketEnt>::reverse_iterator riter = m.rbegin();
+    if (read_count > 0 &&
+        !m.empty()) {
+      map<string, rgw::sal::RGWBucket*>::reverse_iterator riter = m.rbegin();
       marker = riter->first;
 
       handle_listing_chunk(std::move(buckets));
@@ -2312,7 +2549,7 @@ void RGWGetUsage::execute()
   RGWUsageIter usage_iter;
   
   while (is_truncated) {
-    op_ret = store->read_usage(s->user->user_id, s->bucket_name, start_epoch, end_epoch, max_entries,
+    op_ret = store->getRados()->read_usage(s->user->get_id(), s->bucket_name, start_epoch, end_epoch, max_entries,
                                 &is_truncated, usage_iter, usage);
 
     if (op_ret == -ENOENT) {
@@ -2325,20 +2562,19 @@ void RGWGetUsage::execute()
     }    
   }
 
-  op_ret = rgw_user_sync_all_stats(store, s->user->user_id);
+  op_ret = rgw_user_sync_all_stats(store, s->user->get_id());
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to sync user stats" << dendl;
     return;
   }
 
-  op_ret = rgw_user_get_all_buckets_stats(store, s->user->user_id, buckets_usage);
+  op_ret = rgw_user_get_all_buckets_stats(store, s->user->get_id(), buckets_usage);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to get user's buckets stats" << dendl;
     return;
   }
 
-  string user_str = s->user->user_id.to_str();
-  op_ret = store->cls_user_get_header(user_str, &header);
+  op_ret = store->ctl()->user->read_stats(s->user->get_id(), &stats);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: can't read user header"  << dendl;
     return;
@@ -2359,49 +2595,48 @@ int RGWStatAccount::verify_permission()
 void RGWStatAccount::execute()
 {
   string marker;
-  bool is_truncated = false;
+  rgw::sal::RGWBucketList buckets;
   uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
 
   do {
-    RGWUserBuckets buckets;
 
-    op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets, marker,
-                                  string(), max_buckets, true, &is_truncated);
+    op_ret = rgw_read_user_buckets(store, s->user->get_id(), buckets, marker,
+                                  string(), max_buckets, true);
     if (op_ret < 0) {
       /* hmm.. something wrong here.. the user was authenticated, so it
          should exist */
       ldpp_dout(this, 10) << "WARNING: failed on rgw_get_user_buckets uid="
-                       << s->user->user_id << dendl;
+                       << s->user->get_id() << dendl;
       break;
     } else {
       /* We need to have stats for all our policies - even if a given policy
        * isn't actually used in a given account. In such situation its usage
        * stats would be simply full of zeros. */
-      for (const auto& policy : store->svc.zone->get_zonegroup().placement_targets) {
+      for (const auto& policy : store->svc()->zone->get_zonegroup().placement_targets) {
         policies_stats.emplace(policy.second.name,
                                decltype(policies_stats)::mapped_type());
       }
 
-      std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+      std::map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
       for (const auto& kv : m) {
         const auto& bucket = kv.second;
 
-        global_stats.bytes_used += bucket.size;
-        global_stats.bytes_used_rounded += bucket.size_rounded;
-        global_stats.objects_count += bucket.count;
+        global_stats.bytes_used += bucket->get_size();
+        global_stats.bytes_used_rounded += bucket->get_size_rounded();
+        global_stats.objects_count += bucket->get_count();
 
         /* operator[] still can create a new entry for storage policy seen
          * for first time. */
-        auto& policy_stats = policies_stats[bucket.placement_rule.to_str()];
-        policy_stats.bytes_used += bucket.size;
-        policy_stats.bytes_used_rounded += bucket.size_rounded;
+        auto& policy_stats = policies_stats[bucket->get_placement_rule().to_str()];
+        policy_stats.bytes_used += bucket->get_size();
+        policy_stats.bytes_used_rounded += bucket->get_size_rounded();
         policy_stats.buckets_count++;
-        policy_stats.objects_count += bucket.count;
+        policy_stats.objects_count += bucket->get_count();
       }
       global_stats.buckets_count += m.size();
 
     }
-  } while (is_truncated);
+  } while (buckets.is_truncated());
 }
 
 int RGWGetBucketVersioning::verify_permission()
@@ -2466,7 +2701,7 @@ void RGWSetBucketVersioning::execute()
     }
   }
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -2476,7 +2711,7 @@ void RGWSetBucketVersioning::execute()
 
   bool modified = mfa_set_status;
 
-  op_ret = retry_raced_bucket_write(store, s, [&] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [&] {
       if (mfa_set_status) {
         if (mfa_status) {
           s->bucket_info.flags |= BUCKET_MFA_ENABLED;
@@ -2495,7 +2730,7 @@ void RGWSetBucketVersioning::execute()
       } else {
        return op_ret;
       }
-      return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+      return store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(),
                                              &s->bucket_attrs);
     });
 
@@ -2544,7 +2779,7 @@ void RGWSetBucketWebsite::execute()
   if (op_ret < 0)
     return;
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << " forward_request_to_master returned ret=" << op_ret << dendl;
@@ -2552,10 +2787,10 @@ void RGWSetBucketWebsite::execute()
     }
   }
 
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       s->bucket_info.has_website = true;
       s->bucket_info.website_conf = website_conf;
-      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+      op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false,
                                               real_time(), &s->bucket_attrs);
       return op_ret;
     });
@@ -2580,7 +2815,7 @@ void RGWDeleteBucketWebsite::pre_exec()
 void RGWDeleteBucketWebsite::execute()
 {
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     bufferlist in_data;
     op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
     if (op_ret < 0) {
@@ -2589,10 +2824,10 @@ void RGWDeleteBucketWebsite::execute()
       return;
     }
   }
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       s->bucket_info.has_website = false;
       s->bucket_info.website_conf = RGWBucketWebsiteConf();
-      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+      op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false,
                                               real_time(), &s->bucket_attrs);
       return op_ret;
     });
@@ -2625,22 +2860,9 @@ void RGWStatBucket::execute()
     return;
   }
 
-  RGWUserBuckets buckets;
-  bucket.bucket = s->bucket;
-  buckets.add(bucket);
-  map<string, RGWBucketEnt>& m = buckets.get_buckets();
-  op_ret = store->update_containers_stats(m);
-  if (! op_ret)
-    op_ret = -EEXIST;
-  if (op_ret > 0) {
-    op_ret = 0;
-    map<string, RGWBucketEnt>::iterator iter = m.find(bucket.bucket.name);
-    if (iter != m.end()) {
-      bucket = iter->second;
-    } else {
-      op_ret = -EINVAL;
-    }
-  }
+  rgw::sal::RGWRadosUser user(store, s->user->get_id());
+  bucket = new rgw::sal::RGWRadosBucket(store, user, s->bucket);
+  op_ret = bucket->update_container_stats();
 }
 
 int RGWListBucket::verify_permission()
@@ -2699,16 +2921,10 @@ void RGWListBucket::execute()
   }
 
   if (need_container_stats()) {
-    map<string, RGWBucketEnt> m;
-    m[s->bucket.name] = RGWBucketEnt();
-    m.begin()->second.bucket = s->bucket;
-    op_ret = store->update_containers_stats(m);
-    if (op_ret > 0) {
-      bucket = m.begin()->second;
-    }
+    op_ret = bucket->update_container_stats();
   }
 
-  RGWRados::Bucket target(store, s->bucket_info);
+  RGWRados::Bucket target(store->getRados(), s->bucket_info);
   if (shard_id >= 0) {
     target.set_shard_id(shard_id);
   }
@@ -2721,7 +2937,7 @@ void RGWListBucket::execute()
   list_op.params.list_versions = list_versions;
   list_op.params.allow_unordered = allow_unordered;
 
-  op_ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
+  op_ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated, s->yield);
   if (op_ret >= 0) {
     next_marker = list_op.get_next_marker();
   }
@@ -2754,29 +2970,28 @@ int RGWCreateBucket::verify_permission()
     return -EACCES;
   }
 
-  if (s->user->user_id.tenant != s->bucket_tenant) {
+  if (s->user->get_tenant() != s->bucket_tenant) {
     ldpp_dout(this, 10) << "user cannot create a bucket in a different tenant"
-                      << " (user_id.tenant=" << s->user->user_id.tenant
+                      << " (user_id.tenant=" << s->user->get_tenant()
                       << " requested=" << s->bucket_tenant << ")"
                       << dendl;
     return -EACCES;
   }
-  if (s->user->max_buckets < 0) {
+  if (s->user->get_max_buckets() < 0) {
     return -EPERM;
   }
 
-  if (s->user->max_buckets) {
-    RGWUserBuckets buckets;
+  if (s->user->get_max_buckets()) {
+    rgw::sal::RGWBucketList buckets;
     string marker;
-    bool is_truncated = false;
-    op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
-                                  marker, string(), s->user->max_buckets,
-                                  false, &is_truncated);
+    op_ret = rgw_read_user_buckets(store, s->user->get_id(), buckets,
+                                  marker, string(), s->user->get_max_buckets(),
+                                  false);
     if (op_ret < 0) {
       return op_ret;
     }
 
-    if ((int)buckets.count() >= s->user->max_buckets) {
+    if ((int)buckets.count() >= s->user->get_max_buckets()) {
       return -ERR_TOO_MANY_BUCKETS;
     }
   }
@@ -2784,19 +2999,19 @@ int RGWCreateBucket::verify_permission()
   return 0;
 }
 
-static int forward_request_to_master(struct req_state *s, obj_version *objv,
-                                   RGWRados *store, bufferlist& in_data,
-                                   JSONParser *jp, req_info *forward_info)
+int forward_request_to_master(struct req_state *s, obj_version *objv,
+                              rgw::sal::RGWRadosStore *store, bufferlist& in_data,
+                              JSONParser *jp, req_info *forward_info)
 {
-  if (!store->svc.zone->get_master_conn()) {
+  if (!store->svc()->zone->get_master_conn()) {
     ldpp_dout(s, 0) << "rest connection is invalid" << dendl;
     return -EINVAL;
   }
   ldpp_dout(s, 0) << "sending request to master zonegroup" << dendl;
   bufferlist response;
-  string uid_str = s->user->user_id.to_str();
+  string uid_str = s->user->get_id().to_str();
 #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
-  int ret = store->svc.zone->get_master_conn()->forward(uid_str, (forward_info ? *forward_info : s->info),
+  int ret = store->svc()->zone->get_master_conn()->forward(rgw_user(uid_str), (forward_info ? *forward_info : s->info),
                                                         objv, MAX_REST_RESPONSE, &in_data, &response);
   if (ret < 0)
     return ret;
@@ -2987,9 +3202,8 @@ void RGWCreateBucket::execute()
   buffer::list aclbl;
   buffer::list corsbl;
   bool existed;
-  string bucket_name;
-  rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, bucket_name);
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().domain_root, bucket_name);
+  string bucket_name = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name);
+  rgw_raw_obj obj(store->svc()->zone->get_zone_params().domain_root, bucket_name);
   obj_version objv, *pobjv = NULL;
 
   op_ret = get_params();
@@ -2998,7 +3212,7 @@ void RGWCreateBucket::execute()
 
   if (!relaxed_region_enforcement &&
       !location_constraint.empty() &&
-      !store->svc.zone->has_zonegroup_api(location_constraint)) {
+      !store->svc()->zone->has_zonegroup_api(location_constraint)) {
       ldpp_dout(this, 0) << "location constraint (" << location_constraint << ")"
                        << " can't be found." << dendl;
       op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
@@ -3006,22 +3220,22 @@ void RGWCreateBucket::execute()
       return;
   }
 
-  if (!relaxed_region_enforcement && !store->svc.zone->get_zonegroup().is_master_zonegroup() && !location_constraint.empty() &&
-      store->svc.zone->get_zonegroup().api_name != location_constraint) {
+  if (!relaxed_region_enforcement && !store->svc()->zone->get_zonegroup().is_master_zonegroup() && !location_constraint.empty() &&
+      store->svc()->zone->get_zonegroup().api_name != location_constraint) {
     ldpp_dout(this, 0) << "location constraint (" << location_constraint << ")"
-                     << " doesn't match zonegroup" << " (" << store->svc.zone->get_zonegroup().api_name << ")"
+                     << " doesn't match zonegroup" << " (" << store->svc()->zone->get_zonegroup().api_name << ")"
                      << dendl;
     op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
     s->err.message = "The specified location-constraint is not valid";
     return;
   }
 
-  const auto& zonegroup = store->svc.zone->get_zonegroup();
+  const auto& zonegroup = store->svc()->zone->get_zonegroup();
   if (!placement_rule.name.empty() &&
       !zonegroup.placement_targets.count(placement_rule.name)) {
     ldpp_dout(this, 0) << "placement target (" << placement_rule.name << ")"
                      << " doesn't exist in the placement targets of zonegroup"
-                     << " (" << store->svc.zone->get_zonegroup().api_name << ")" << dendl;
+                     << " (" << store->svc()->zone->get_zonegroup().api_name << ")" << dendl;
     op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
     s->err.message = "The specified placement target does not exist";
     return;
@@ -3029,19 +3243,24 @@ void RGWCreateBucket::execute()
 
   /* we need to make sure we read bucket info, it's not read before for this
    * specific request */
-  op_ret = store->get_bucket_info(*s->sysobj_ctx, s->bucket_tenant, s->bucket_name,
-                                 s->bucket_info, nullptr, &s->bucket_attrs);
+  s->bucket.tenant = s->bucket_tenant;
+  s->bucket.name = s->bucket_name;
+  rgw::sal::RGWBucket* bucket = NULL;
+  op_ret = store->get_bucket(*s->user, s->bucket, &bucket);
   if (op_ret < 0 && op_ret != -ENOENT)
     return;
   s->bucket_exists = (op_ret != -ENOENT);
 
-  s->bucket_owner.set_id(s->user->user_id);
-  s->bucket_owner.set_name(s->user->display_name);
+  s->bucket_owner.set_id(s->user->get_id());
+  s->bucket_owner.set_name(s->user->get_display_name());
   if (s->bucket_exists) {
+    s->bucket_info = bucket->get_info();
+    s->bucket_attrs = bucket->get_attrs();
+    delete bucket;
     int r = rgw_op_get_bucket_policy_from_attr(s->cct, store, s->bucket_info,
                                                s->bucket_attrs, &old_policy);
     if (r >= 0)  {
-      if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
+      if (old_policy.get_owner().get_id().compare(s->user->get_id()) != 0) {
         op_ret = -EEXIST;
         return;
       }
@@ -3053,7 +3272,7 @@ void RGWCreateBucket::execute()
   uint32_t *pmaster_num_shards;
   real_time creation_time;
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     JSONParser jp;
     op_ret = forward_request_to_master(s, NULL, store, in_data, &jp);
     if (op_ret < 0) {
@@ -3080,10 +3299,10 @@ void RGWCreateBucket::execute()
   if (s->system_request) {
     zonegroup_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "zonegroup");
     if (zonegroup_id.empty()) {
-      zonegroup_id = store->svc.zone->get_zonegroup().get_id();
+      zonegroup_id = store->svc()->zone->get_zonegroup().get_id();
     }
   } else {
-    zonegroup_id = store->svc.zone->get_zonegroup().get_id();
+    zonegroup_id = store->svc()->zone->get_zonegroup().get_id();
   }
 
   if (s->bucket_exists) {
@@ -3091,7 +3310,8 @@ void RGWCreateBucket::execute()
     rgw_bucket bucket;
     bucket.tenant = s->bucket_tenant;
     bucket.name = s->bucket_name;
-    op_ret = store->svc.zone->select_bucket_placement(*(s->user), zonegroup_id,
+    op_ret = store->svc()->zone->select_bucket_placement(s->user->get_info(),
+                                           zonegroup_id,
                                            placement_rule,
                                            &selected_placement_rule, nullptr);
     if (selected_placement_rule != s->bucket_info.placement_rule) {
@@ -3148,7 +3368,7 @@ void RGWCreateBucket::execute()
   }
 
 
-  op_ret = store->create_bucket(*(s->user), s->bucket, zonegroup_id,
+  op_ret = store->getRados()->create_bucket(s->user->get_info(), s->bucket, zonegroup_id,
                                 placement_rule, s->bucket_info.swift_ver_location,
                                 pquota_info, attrs,
                                 info, pobjv, &ep_objv, creation_time,
@@ -3169,19 +3389,18 @@ void RGWCreateBucket::execute()
      * If all is ok then update the user's list of buckets.
      * Otherwise inform client about a name conflict.
      */
-    if (info.owner.compare(s->user->user_id) != 0) {
+    if (info.owner.compare(s->user->get_id()) != 0) {
       op_ret = -EEXIST;
       return;
     }
     s->bucket = info.bucket;
   }
 
-  op_ret = rgw_link_bucket(store, s->user->user_id, s->bucket,
-                          info.creation_time, false);
+  op_ret = store->ctl()->bucket->link_bucket(s->user->get_id(), s->bucket,
+                                          info.creation_time, s->yield, false);
   if (op_ret && !existed && op_ret != -EEXIST) {
     /* if it exists (or previously existed), don't remove it! */
-    op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
-                              s->bucket.name);
+    op_ret = store->ctl()->bucket->unlink_bucket(s->user->get_id(), s->bucket, s->yield);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
                       << dendl;
@@ -3200,11 +3419,11 @@ void RGWCreateBucket::execute()
       RGWBucketInfo binfo;
       map<string, bufferlist> battrs;
 
-      op_ret = store->get_bucket_info(*s->sysobj_ctx, s->bucket_tenant, s->bucket_name,
-                                      binfo, nullptr, &battrs);
+      op_ret = store->getRados()->get_bucket_info(store->svc(), s->bucket_tenant, s->bucket_name,
+                                      binfo, nullptr, s->yield, &battrs);
       if (op_ret < 0) {
         return;
-      } else if (binfo.owner.compare(s->user->user_id) != 0) {
+      } else if (binfo.owner.compare(s->user->get_id()) != 0) {
         /* New bucket doesn't belong to the account we're operating on. */
         op_ret = -EEXIST;
         return;
@@ -3237,8 +3456,9 @@ void RGWCreateBucket::execute()
       s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
 
       /* This will also set the quota on the bucket. */
-      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                    &s->bucket_info.objv_tracker);
+      op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                           &s->bucket_info.objv_tracker,
+                                                           s->yield);
     } while (op_ret == -ECANCELED && tries++ < 20);
 
     /* Restore the proper return code. */
@@ -3275,7 +3495,7 @@ void RGWDeleteBucket::execute()
     return;
   }
   RGWObjVersionTracker ot;
-  ot.read_version = s->bucket_info.ep_objv;
+  ot.read_version = s->bucket_ep_objv;
 
   if (s->system_request) {
     string tag = s->info.args.get(RGW_SYS_PARAM_PREFIX "tag");
@@ -3294,17 +3514,17 @@ void RGWDeleteBucket::execute()
     }
   }
 
-  op_ret = rgw_bucket_sync_user_stats(store, s->user->user_id, s->bucket_info);
+  op_ret = store->ctl()->bucket->sync_user_stats(s->user->get_id(), s->bucket_info);
   if ( op_ret < 0) {
      ldpp_dout(this, 1) << "WARNING: failed to sync user stats before bucket delete: op_ret= " << op_ret << dendl;
   }
   
-  op_ret = store->check_bucket_empty(s->bucket_info);
+  op_ret = store->getRados()->check_bucket_empty(s->bucket_info, s->yield);
   if (op_ret < 0) {
     return;
   }
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     bufferlist in_data;
     op_ret = forward_request_to_master(s, &ot.read_version, store, in_data,
                                       NULL);
@@ -3339,18 +3559,18 @@ void RGWDeleteBucket::execute()
     return;
   }
 
-  op_ret = store->delete_bucket(s->bucket_info, ot, false);
+  op_ret = store->getRados()->delete_bucket(s->bucket_info, ot, s->yield, false);
 
   if (op_ret == -ECANCELED) {
     // lost a race, either with mdlog sync or another delete bucket operation.
-    // in either case, we've already called rgw_unlink_bucket()
+    // in either case, we've already called ctl.bucket->unlink_bucket()
     op_ret = 0;
     return;
   }
 
   if (op_ret == 0) {
-    op_ret = rgw_unlink_bucket(store, s->bucket_info.owner, s->bucket.tenant,
-                              s->bucket.name, false);
+    op_ret = store->ctl()->bucket->unlink_bucket(s->bucket_info.owner,
+                                              s->bucket, s->yield, false);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
                       << dendl;
@@ -3369,8 +3589,8 @@ int RGWPutObj::verify_permission()
     rgw_obj_key cs_object(copy_source_object_name, copy_source_version_id);
 
     rgw_obj obj(cs_bucket, cs_object);
-    store->set_atomic(s->obj_ctx, obj);
-    store->set_prefetch_data(s->obj_ctx, obj);
+    store->getRados()->set_atomic(s->obj_ctx, obj);
+    store->getRados()->set_prefetch_data(s->obj_ctx, obj);
 
     /* check source object permissions */
     if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl, nullptr,
@@ -3414,6 +3634,13 @@ int RGWPutObj::verify_permission()
     }
   }
 
+  if (s->bucket_access_conf && s->bucket_access_conf->block_public_acls()) {
+    if (s->canned_acl.compare("public-read") ||
+        s->canned_acl.compare("public-read-write") ||
+        s->canned_acl.compare("authenticated-read"))
+      return -EACCES;
+  }
+
   auto op_ret = get_params();
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "get_params() returned ret=" << op_ret << dendl;
@@ -3496,7 +3723,7 @@ public:
 int RGWPutObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   bufferlist bl_tmp;
-  bl.copy(bl_ofs, bl_len, bl_tmp);
+  bl.begin(bl_ofs).copy(bl_len, bl_tmp);
 
   bl_aux.append(bl_tmp);
 
@@ -3523,12 +3750,12 @@ int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
   rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
   rgw_obj obj(copy_source_bucket_info.bucket, obj_key);
 
-  RGWRados::Object op_target(store, copy_source_bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object op_target(store->getRados(), copy_source_bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
   read_op.params.obj_size = &obj_size;
   read_op.params.attrs = &attrs;
 
-  ret = read_op.prepare();
+  ret = read_op.prepare(s->yield);
   if (ret < 0)
     return ret;
 
@@ -3564,7 +3791,7 @@ int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
     return ret;
 
   filter->fixup_range(new_ofs, new_end);
-  ret = read_op.iterate(new_ofs, new_end, filter);
+  ret = read_op.iterate(new_ofs, new_end, filter, s->yield);
 
   if (ret >= 0)
     ret = filter->flush();
@@ -3653,17 +3880,12 @@ void RGWPutObj::execute()
 
   if (!chunked_upload) { /* with chunked upload we don't know how big is the upload.
                             we also check sizes at the end anyway */
-    op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
+    op_ret = store->getRados()->check_quota(s->bucket_owner.get_id(), s->bucket,
                                user_quota, bucket_quota, s->content_length);
     if (op_ret < 0) {
       ldpp_dout(this, 20) << "check_quota() returned ret=" << op_ret << dendl;
       return;
     }
-    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-    if (op_ret < 0) {
-      ldpp_dout(this, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
-      return;
-    }
   }
 
   if (supplied_etag) {
@@ -3677,17 +3899,20 @@ void RGWPutObj::execute()
 
   /* Handle object versioning of Swift API. */
   if (! multipart) {
-    op_ret = store->swift_versioning_copy(obj_ctx,
+    op_ret = store->getRados()->swift_versioning_copy(obj_ctx,
                                           s->bucket_owner.get_id(),
                                           s->bucket_info,
-                                          obj);
+                                          obj,
+                                          this,
+                                          s->yield);
     if (op_ret < 0) {
       return;
     }
   }
 
   // create the object processor
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
+                                s->yield);
   using namespace rgw::putobj;
   constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
                                                sizeof(AtomicObjectProcessor),
@@ -3712,9 +3937,10 @@ void RGWPutObj::execute()
     pdest_placement = &upload_info.dest_placement;
     ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
     processor.emplace<MultipartObjectProcessor>(
-        &aio, store, s->bucket_info, pdest_placement,
+        &*aio, store, s->bucket_info, pdest_placement,
         s->owner.get_id(), obj_ctx, obj,
-        multipart_upload_id, multipart_part_num, multipart_part_str);
+        multipart_upload_id, multipart_part_num, multipart_part_str,
+        this, s->yield);
   } else if(append) {
     if (s->bucket_info.versioned()) {
       op_ret = -ERR_INVALID_BUCKET_STATE;
@@ -3722,24 +3948,25 @@ void RGWPutObj::execute()
     }
     pdest_placement = &s->dest_placement;
     processor.emplace<AppendObjectProcessor>(
-            &aio, store, s->bucket_info, pdest_placement, s->bucket_owner.get_id(),obj_ctx, obj,
-            s->req_id, position, &cur_accounted_size);
+            &*aio, store, s->bucket_info, pdest_placement, s->bucket_owner.get_id(),obj_ctx, obj,
+            s->req_id, position, &cur_accounted_size, this, s->yield);
   } else {
     if (s->bucket_info.versioning_enabled()) {
       if (!version_id.empty()) {
         obj.key.set_instance(version_id);
       } else {
-        store->gen_rand_obj_instance_name(&obj);
+        store->getRados()->gen_rand_obj_instance_name(&obj);
         version_id = obj.key.instance;
       }
     }
     pdest_placement = &s->dest_placement;
     processor.emplace<AtomicObjectProcessor>(
-        &aio, store, s->bucket_info, pdest_placement,
-        s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
+        &*aio, store, s->bucket_info, pdest_placement,
+        s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch,
+        s->req_id, this, s->yield);
   }
 
-  op_ret = processor->prepare();
+  op_ret = processor->prepare(s->yield);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "processor->prepare() returned ret=" << op_ret
                      << dendl;
@@ -3751,8 +3978,8 @@ void RGWPutObj::execute()
     rgw_obj obj(copy_source_bucket_info.bucket, obj_key.name);
 
     RGWObjState *astate;
-    op_ret = store->get_obj_state(&obj_ctx, copy_source_bucket_info, obj,
-                                  &astate, true, false);
+    op_ret = store->getRados()->get_obj_state(&obj_ctx, copy_source_bucket_info, obj,
+                                  &astate, true, s->yield, false);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl;
       return;
@@ -3771,7 +3998,7 @@ void RGWPutObj::execute()
   // no filters by default
   DataProcessor *filter = processor.get();
 
-  const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(*pdest_placement);
+  const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(*pdest_placement);
   CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
 
@@ -3856,19 +4083,13 @@ void RGWPutObj::execute()
     return;
   }
 
-  op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
+  op_ret = store->getRados()->check_quota(s->bucket_owner.get_id(), s->bucket,
                               user_quota, bucket_quota, s->obj_size);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "second check_quota() returned op_ret=" << op_ret << dendl;
     return;
   }
 
-  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-  if (op_ret < 0) {
-    ldpp_dout(this, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
-    return;
-  }
-
   hash.Final(m);
 
   if (compressor && compressor->is_compressed()) {
@@ -3925,6 +4146,7 @@ void RGWPutObj::execute()
   }
   encode_delete_at_attr(delete_at, attrs);
   encode_obj_tags_attr(obj_tags.get(), attrs);
+  rgw_cond_decode_objtags(s, attrs);
 
   /* Add a custom metadata to expose the information whether an object
    * is an SLO or not. Appending the attribute must be performed AFTER
@@ -3948,7 +4170,8 @@ void RGWPutObj::execute()
   tracepoint(rgw_op, processor_complete_enter, s->req_id.c_str());
   op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
                                (delete_at ? *delete_at : real_time()), if_match, if_nomatch,
-                               (user_data.empty() ? nullptr : &user_data), nullptr, nullptr);
+                               (user_data.empty() ? nullptr : &user_data), nullptr, nullptr,
+                               s->yield);
   tracepoint(rgw_op, processor_complete_exit, s->req_id.c_str());
 
   /* produce torrent */
@@ -3965,7 +4188,7 @@ void RGWPutObj::execute()
   }
 
   // send request to notification manager
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4038,7 +4261,7 @@ void RGWPostObj::execute()
     ceph::buffer::list bl, aclbl;
     int len = 0;
 
-    op_ret = store->check_quota(s->bucket_owner.get_id(),
+    op_ret = store->getRados()->check_quota(s->bucket_owner.get_id(),
                                 s->bucket,
                                 user_quota,
                                 bucket_quota,
@@ -4047,11 +4270,6 @@ void RGWPostObj::execute()
       return;
     }
 
-    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-    if (op_ret < 0) {
-      return;
-    }
-
     if (supplied_md5_b64) {
       char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
       ldpp_dout(this, 15) << "supplied_md5_b64=" << supplied_md5_b64 << dendl;
@@ -4069,18 +4287,19 @@ void RGWPostObj::execute()
 
     rgw_obj obj(s->bucket, get_current_filename());
     if (s->bucket_info.versioning_enabled()) {
-      store->gen_rand_obj_instance_name(&obj);
+      store->getRados()->gen_rand_obj_instance_name(&obj);
     }
 
-    rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
+    auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
+                                  s->yield);
 
     using namespace rgw::putobj;
-    AtomicObjectProcessor processor(&aio, store, s->bucket_info,
+    AtomicObjectProcessor processor(&*aio, store, s->bucket_info,
                                     &s->dest_placement,
                                     s->bucket_owner.get_id(),
                                     *static_cast<RGWObjectCtx*>(s->obj_ctx),
-                                    obj, 0, s->req_id);
-    op_ret = processor.prepare();
+                                    obj, 0, s->req_id, this, s->yield);
+    op_ret = processor.prepare(s->yield);
     if (op_ret < 0) {
       return;
     }
@@ -4096,7 +4315,7 @@ void RGWPostObj::execute()
     if (encrypt != nullptr) {
       filter = encrypt.get();
     } else {
-      const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(
+      const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(
           s->dest_placement);
       if (compression_type != "none") {
         plugin = Compressor::create(s->cct, compression_type);
@@ -4149,17 +4368,12 @@ void RGWPostObj::execute()
     s->obj_size = ofs;
 
 
-    op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
+    op_ret = store->getRados()->check_quota(s->bucket_owner.get_id(), s->bucket,
                                 user_quota, bucket_quota, s->obj_size);
     if (op_ret < 0) {
       return;
     }
 
-    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-    if (op_ret < 0) {
-      return;
-    }
-
     hash.Final(m);
     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
@@ -4195,13 +4409,14 @@ void RGWPostObj::execute()
 
     op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs,
                                 (delete_at ? *delete_at : real_time()),
-                                nullptr, nullptr, nullptr, nullptr, nullptr);
+                                nullptr, nullptr, nullptr, nullptr, nullptr,
+                                s->yield);
     if (op_ret < 0) {
       return;
     }
   } while (is_next_file_to_upload());
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4255,8 +4470,9 @@ int RGWPutMetadataAccount::init_processing()
     return op_ret;
   }
 
-  op_ret = rgw_get_user_attrs_by_uid(store, s->user->user_id, orig_attrs,
-                                     &acct_op_tracker);
+  op_ret = store->ctl()->user->get_attrs_by_uid(s->user->get_id(), &orig_attrs,
+                                            s->yield,
+                                             &acct_op_tracker);
   if (op_ret < 0) {
     return op_ret;
   }
@@ -4317,8 +4533,9 @@ void RGWPutMetadataAccount::execute()
 {
   /* Params have been extracted earlier. See init_processing(). */
   RGWUserInfo new_uinfo;
-  op_ret = rgw_get_user_info_by_uid(store, s->user->user_id, new_uinfo,
-                                    &acct_op_tracker);
+  op_ret = store->ctl()->user->get_info_by_uid(s->user->get_id(), &new_uinfo, s->yield,
+                                            RGWUserCtl::GetParams()
+                                            .set_objv_tracker(&acct_op_tracker));
   if (op_ret < 0) {
     return;
   }
@@ -4337,8 +4554,11 @@ void RGWPutMetadataAccount::execute()
 
   /* We are passing here the current (old) user info to allow the function
    * optimize-out some operations. */
-  op_ret = rgw_store_user_info(store, new_uinfo, s->user,
-                               &acct_op_tracker, real_time(), false, &attrs);
+  op_ret = store->ctl()->user->store_info(new_uinfo, s->yield,
+                                       RGWUserCtl::PutParams()
+                                       .set_old_info(&s->user->get_info())
+                                       .set_objv_tracker(&acct_op_tracker)
+                                       .set_attrs(&attrs));
 }
 
 int RGWPutMetadataBucket::verify_permission()
@@ -4373,7 +4593,7 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       /* Encode special metadata first as we're using std::map::emplace under
        * the hood. This method will add the new items only if the map doesn't
        * contain such keys yet. */
@@ -4424,8 +4644,9 @@ void RGWPutMetadataBucket::execute()
       /* Setting attributes also stores the provided bucket info. Due
        * to this fact, the new quota settings can be serialized with
        * the same call. */
-      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                   &s->bucket_info.objv_tracker);
+      op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                           &s->bucket_info.objv_tracker,
+                                                           s->yield);
       return op_ret;
     });
 }
@@ -4452,7 +4673,7 @@ void RGWPutMetadataObject::execute()
   rgw_obj target_obj;
   map<string, bufferlist> attrs, orig_attrs, rmattrs;
 
-  store->set_atomic(s->obj_ctx, obj);
+  store->getRados()->set_atomic(s->obj_ctx, obj);
 
   op_ret = get_params();
   if (op_ret < 0) {
@@ -4490,7 +4711,8 @@ void RGWPutMetadataObject::execute()
     }
   }
 
-  op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, target_obj, attrs, &rmattrs);
+  op_ret = store->getRados()->set_attrs(s->obj_ctx, s->bucket_info, target_obj,
+    attrs, &rmattrs, s->yield);
 }
 
 int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
@@ -4620,9 +4842,10 @@ void RGWDeleteObj::execute()
   bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
 
   if (!s->object.empty()) {
+    op_ret = get_obj_attrs(store, s, obj, attrs);
+
     if (need_object_expiration() || multipart_delete) {
       /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
         return;
       }
@@ -4630,7 +4853,6 @@ void RGWDeleteObj::execute()
 
     if (check_obj_lock) {
       /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
         if (op_ret == -ENOENT) {
           /* object maybe delete_marker, skip check_obj_lock*/
@@ -4641,6 +4863,9 @@ void RGWDeleteObj::execute()
       }
     }
 
+    // ignore return value from get_obj_attrs in all other cases
+    op_ret = 0;
+
     if (check_obj_lock) {
       auto aiter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
       if (aiter != attrs.end()) {
@@ -4695,8 +4920,8 @@ void RGWDeleteObj::execute()
     obj_ctx->set_atomic(obj);
 
     bool ver_restored = false;
-    op_ret = store->swift_versioning_restore(*s->sysobj_ctx, *obj_ctx, s->bucket_owner.get_id(),
-                                             s->bucket_info, obj, ver_restored);
+    op_ret = store->getRados()->swift_versioning_restore(*obj_ctx, s->bucket_owner.get_id(),
+                                             s->bucket_info, obj, ver_restored, this);
     if (op_ret < 0) {
       return;
     }
@@ -4705,7 +4930,7 @@ void RGWDeleteObj::execute()
       /* Swift's versioning mechanism hasn't found any previous version of
        * the object that could be restored. This means we should proceed
        * with the regular delete path. */
-      RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, obj);
+      RGWRados::Object del_target(store->getRados(), s->bucket_info, *obj_ctx, obj);
       RGWRados::Object::Delete del_op(&del_target);
 
       op_ret = get_system_versioning_params(s, &del_op.params.olh_epoch,
@@ -4720,7 +4945,7 @@ void RGWDeleteObj::execute()
       del_op.params.unmod_since = unmod_since;
       del_op.params.high_precision_time = s->system_request; /* system request uses high precision time */
 
-      op_ret = del_op.delete_obj();
+      op_ret = del_op.delete_obj(s->yield);
       if (op_ret >= 0) {
         delete_marker = del_op.result.delete_marker;
         version_id = del_op.result.version_id;
@@ -4740,11 +4965,20 @@ void RGWDeleteObj::execute()
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
+
+    // cache the objects tags and metadata into the requests
+    // so it could be used in the notification mechanism
+    try {
+      populate_tags_in_request(s, attrs);
+    } catch (buffer::error& err) {
+      ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
+    }
+    populate_metadata_in_request(s, attrs);
   } else {
     op_ret = -EINVAL;
   }
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
           delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
           store);
   if (ret < 0) {
@@ -4812,10 +5046,11 @@ int RGWCopyObj::verify_permission()
   map<string, bufferlist> src_attrs;
 
   if (s->bucket_instance_id.empty()) {
-    op_ret = store->get_bucket_info(*s->sysobj_ctx, src_tenant_name, src_bucket_name, src_bucket_info, NULL, &src_attrs);
+    op_ret = store->getRados()->get_bucket_info(store->svc(), src_tenant_name, src_bucket_name, src_bucket_info, NULL, s->yield, &src_attrs);
   } else {
     /* will only happen in intra region sync where the source and dest bucket is the same */
-    op_ret = store->get_bucket_instance_info(*s->sysobj_ctx, s->bucket_instance_id, src_bucket_info, NULL, &src_attrs);
+    rgw_bucket b(rgw_bucket_key(src_tenant_name, src_bucket_name, s->bucket_instance_id));
+    op_ret = store->getRados()->get_bucket_instance_info(*s->sysobj_ctx, b, src_bucket_info, NULL, &src_attrs, s->yield);
   }
   if (op_ret < 0) {
     if (op_ret == -ENOENT) {
@@ -4829,8 +5064,8 @@ int RGWCopyObj::verify_permission()
   /* get buckets info (source and dest) */
   if (s->local_source &&  source_zone.empty()) {
     rgw_obj src_obj(src_bucket, src_object);
-    store->set_atomic(s->obj_ctx, src_obj);
-    store->set_prefetch_data(s->obj_ctx, src_obj);
+    store->getRados()->set_atomic(s->obj_ctx, src_obj);
+    store->getRados()->set_prefetch_data(s->obj_ctx, src_obj);
 
     rgw_placement_rule src_placement;
 
@@ -4882,8 +5117,8 @@ int RGWCopyObj::verify_permission()
     dest_bucket_info = src_bucket_info;
     dest_attrs = src_attrs;
   } else {
-    op_ret = store->get_bucket_info(*s->sysobj_ctx, dest_tenant_name, dest_bucket_name,
-                                    dest_bucket_info, nullptr, &dest_attrs);
+    op_ret = store->getRados()->get_bucket_info(store->svc(), dest_tenant_name, dest_bucket_name,
+                                    dest_bucket_info, nullptr, s->yield, &dest_attrs);
     if (op_ret < 0) {
       if (op_ret == -ENOENT) {
         op_ret = -ERR_NO_SUCH_BUCKET;
@@ -4895,7 +5130,7 @@ int RGWCopyObj::verify_permission()
   dest_bucket = dest_bucket_info.bucket;
 
   rgw_obj dest_obj(dest_bucket, dest_object);
-  store->set_atomic(s->obj_ctx, dest_obj);
+  store->getRados()->set_atomic(s->obj_ctx, dest_obj);
 
   /* check dest bucket permissions */
   op_ret = read_bucket_policy(store, s, dest_bucket_info, dest_attrs,
@@ -5007,7 +5242,7 @@ void RGWCopyObj::execute()
   if ( ! version_id.empty()) {
     dst_obj.key.set_instance(version_id);
   } else if (dest_bucket_info.versioning_enabled()) {
-    store->gen_rand_obj_instance_name(&dst_obj);
+    store->getRados()->gen_rand_obj_instance_name(&dst_obj);
   }
 
   obj_ctx.set_atomic(src_obj);
@@ -5019,42 +5254,45 @@ void RGWCopyObj::execute()
 
   /* Handle object versioning of Swift API. In case of copying to remote this
    * should fail gently (op_ret == 0) as the dst_obj will not exist here. */
-  op_ret = store->swift_versioning_copy(obj_ctx,
+  op_ret = store->getRados()->swift_versioning_copy(obj_ctx,
                                         dest_bucket_info.owner,
                                         dest_bucket_info,
-                                        dst_obj);
+                                        dst_obj,
+                                        this,
+                                        s->yield);
   if (op_ret < 0) {
     return;
   }
 
-  op_ret = store->copy_obj(obj_ctx,
-                          s->user->user_id,
-                          &s->info,
-                          source_zone,
-                          dst_obj,
-                          src_obj,
-                          dest_bucket_info,
-                          src_bucket_info,
-                           s->dest_placement,
-                          &src_mtime,
-                          &mtime,
-                          mod_ptr,
-                          unmod_ptr,
-                           high_precision_time,
-                          if_match,
-                          if_nomatch,
-                          attrs_mod,
-                           copy_if_newer,
-                          attrs, RGWObjCategory::Main,
-                          olh_epoch,
-                          (delete_at ? *delete_at : real_time()),
-                          (version_id.empty() ? NULL : &version_id),
-                          &s->req_id, /* use req_id as tag */
-                          &etag,
-                          copy_obj_progress_cb, (void *)this
-    );
-
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+  op_ret = store->getRados()->copy_obj(obj_ctx,
+          s->user->get_id(),
+          &s->info,
+          source_zone,
+          dst_obj,
+          src_obj,
+          dest_bucket_info,
+          src_bucket_info,
+          s->dest_placement,
+          &src_mtime,
+          &mtime,
+          mod_ptr,
+          unmod_ptr,
+          high_precision_time,
+          if_match,
+          if_nomatch,
+          attrs_mod,
+          copy_if_newer,
+          attrs, RGWObjCategory::Main,
+          olh_epoch,
+          (delete_at ? *delete_at : real_time()),
+          (version_id.empty() ? NULL : &version_id),
+          &s->req_id, /* use req_id as tag */
+          &etag,
+          copy_obj_progress_cb, (void *)this,
+          this,
+          s->yield);
+
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -5260,7 +5498,7 @@ void RGWPutACLs::execute()
   if (grants_num > max_num) {
     ldpp_dout(this, 4) << "An acl can have up to " << max_num
         << " grants, request acl grants num: " << grants_num << dendl;
-    op_ret = -ERR_MALFORMED_ACL_ERROR;
+    op_ret = -ERR_LIMIT_EXCEEDED;
     s->err.message = "The request is rejected, because the acl grants number you requested is larger than the maximum "
                      + std::to_string(max_num)
                      + " grants allowed in an acl.";
@@ -5268,7 +5506,7 @@ void RGWPutACLs::execute()
   }
 
   // forward bucket acl requests to meta master zone
-  if (s->object.empty() && !store->svc.zone->is_meta_master()) {
+  if (s->object.empty() && !store->svc()->zone->is_meta_master()) {
     bufferlist in_data;
     // include acl data unless it was generated from a canned_acl
     if (s->canned_acl.empty()) {
@@ -5287,7 +5525,7 @@ void RGWPutACLs::execute()
     *_dout << dendl;
   }
 
-  op_ret = policy->rebuild(store, &owner, new_policy);
+  op_ret = policy->rebuild(store->ctl()->user, &owner, new_policy, s->err.message);
   if (op_ret < 0)
     return;
 
@@ -5297,18 +5535,24 @@ void RGWPutACLs::execute()
     *_dout << dendl;
   }
 
+  if (s->bucket_access_conf &&
+      s->bucket_access_conf->block_public_acls() &&
+      new_policy.is_public()) {
+    op_ret = -EACCES;
+    return;
+  }
   new_policy.encode(bl);
-  map<string, bufferlist> attrs;
-
   if (!s->object.empty()) {
     obj = rgw_obj(s->bucket, s->object);
-    store->set_atomic(s->obj_ctx, obj);
+    store->getRados()->set_atomic(s->obj_ctx, obj);
     //if instance is empty, we should modify the latest object
     op_ret = modify_obj_attr(store, s, obj, RGW_ATTR_ACL, bl);
   } else {
-    attrs = s->bucket_attrs;
+    map<string,bufferlist> attrs = s->bucket_attrs;
     attrs[RGW_ATTR_ACL] = bl;
-    op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+    op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                         &s->bucket_info.objv_tracker,
+                                                         s->yield);
   }
   if (op_ret == -ECANCELED) {
     op_ret = 0; /* lost a race, but it's ok because acls are immutable */
@@ -5382,7 +5626,7 @@ void RGWPutLC::execute()
     return;
   }
 
-  op_ret = config.rebuild(store, new_config);
+  op_ret = config.rebuild(store->getRados(), new_config);
   if (op_ret < 0)
     return;
 
@@ -5394,7 +5638,7 @@ void RGWPutLC::execute()
     ldpp_dout(this, 15) << "New LifecycleConfiguration:" << ss.str() << dendl;
   }
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, nullptr, store, data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -5402,7 +5646,7 @@ void RGWPutLC::execute()
     }
   }
 
-  op_ret = store->get_lc()->set_bucket_config(s->bucket_info, s->bucket_attrs, &new_config);
+  op_ret = store->getRados()->get_lc()->set_bucket_config(s->bucket_info, s->bucket_attrs, &new_config);
   if (op_ret < 0) {
     return;
   }
@@ -5411,7 +5655,7 @@ void RGWPutLC::execute()
 
 void RGWDeleteLC::execute()
 {
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     bufferlist data;
     op_ret = forward_request_to_master(s, nullptr, store, data, nullptr);
     if (op_ret < 0) {
@@ -5419,17 +5663,8 @@ void RGWDeleteLC::execute()
       return;
     }
   }
-  map<string, bufferlist> attrs = s->bucket_attrs;
-  attrs.erase(RGW_ATTR_LC);
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
-  if (op_ret < 0) {
-    ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
-        << s->bucket.name << " returned err=" << op_ret << dendl;
-    return;
-  }
 
-  op_ret = store->get_lc()->remove_bucket_config(s->bucket_info, s->bucket_attrs);
+  op_ret = store->getRados()->get_lc()->remove_bucket_config(s->bucket_info, s->bucket_attrs);
   if (op_ret < 0) {
     return;
   }
@@ -5467,7 +5702,7 @@ void RGWPutCORS::execute()
   if (op_ret < 0)
     return;
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -5475,10 +5710,12 @@ void RGWPutCORS::execute()
     }
   }
 
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       map<string, bufferlist> attrs = s->bucket_attrs;
       attrs[RGW_ATTR_CORS] = cors_bl;
-      return rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+      return store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                         &s->bucket_info.objv_tracker,
+                                                         s->yield);
     });
 }
 
@@ -5490,7 +5727,7 @@ int RGWDeleteCORS::verify_permission()
 
 void RGWDeleteCORS::execute()
 {
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     bufferlist data;
     op_ret = forward_request_to_master(s, nullptr, store, data, nullptr);
     if (op_ret < 0) {
@@ -5499,7 +5736,7 @@ void RGWDeleteCORS::execute()
     }
   }
 
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       op_ret = read_bucket_cors();
       if (op_ret < 0)
        return op_ret;
@@ -5512,8 +5749,9 @@ void RGWDeleteCORS::execute()
 
       map<string, bufferlist> attrs = s->bucket_attrs;
       attrs.erase(RGW_ATTR_CORS);
-      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
+      op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                           &s->bucket_info.objv_tracker,
+                                                           s->yield);
       if (op_ret < 0) {
        ldpp_dout(this, 0) << "RGWLC::RGWDeleteCORS() failed to set attrs on bucket=" << s->bucket.name
                         << " returned err=" << op_ret << dendl;
@@ -5604,7 +5842,7 @@ void RGWSetRequestPayment::pre_exec()
 void RGWSetRequestPayment::execute()
 {
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -5618,7 +5856,7 @@ void RGWSetRequestPayment::execute()
     return;
 
   s->bucket_info.requester_pays = requester_pays;
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+  op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(),
                                           &s->bucket_attrs);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
@@ -5707,7 +5945,7 @@ void RGWInitMultipart::execute()
     obj.set_in_extra_data(true);
     obj.index_hash_source = s->object.name;
 
-    RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+    RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
     op_target.set_versioning_disabled(true); /* no versioning for multipart meta */
 
     RGWRados::Object::Write obj_op(&op_target);
@@ -5723,10 +5961,10 @@ void RGWInitMultipart::execute()
     encode(upload_info, bl);
     obj_op.meta.data = &bl;
 
-    op_ret = obj_op.write_meta(bl.length(), 0, attrs);
+    op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield);
   } while (op_ret == -EEXIST);
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -5860,10 +6098,10 @@ void RGWCompleteMultipart::execute()
     s->cct->_conf.get_val<int64_t>("rgw_mp_lock_max_time");
   utime_t dur(max_lock_secs_mp, 0);
 
-  store->obj_to_raw((s->bucket_info).placement_rule, meta_obj, &raw_obj);
-  store->get_obj_data_pool((s->bucket_info).placement_rule,
+  store->getRados()->obj_to_raw((s->bucket_info).placement_rule, meta_obj, &raw_obj);
+  store->getRados()->get_obj_data_pool((s->bucket_info).placement_rule,
                           meta_obj,&meta_pool);
-  store->open_pool_ctx(meta_pool, serializer.ioctx, true);
+  store->getRados()->open_pool_ctx(meta_pool, serializer.ioctx, true);
 
   op_ret = serializer.try_lock(raw_obj.oid, dur);
   if (op_ret < 0) {
@@ -5939,11 +6177,11 @@ void RGWCompleteMultipart::execute()
         op_ret = -ERR_INVALID_PART;
         return;
       } else {
-        manifest.append(obj_part.manifest, store->svc.zone);
+        manifest.append(obj_part.manifest, store->svc()->zone);
       }
 
       bool part_compressed = (obj_part.cs_info.compression_type != "none");
-      if ((obj_iter != obj_parts.begin()) &&
+      if ((handled_parts > 0) &&
           ((part_compressed != compressed) ||
             (cs_info.compression_type != obj_part.cs_info.compression_type))) {
           ldpp_dout(this, 0) << "ERROR: compression type was changed during multipart upload ("
@@ -6005,7 +6243,7 @@ void RGWCompleteMultipart::execute()
     if (!version_id.empty()) {
       target_obj.key.set_instance(version_id);
     } else {
-      store->gen_rand_obj_instance_name(&target_obj);
+      store->getRados()->gen_rand_obj_instance_name(&target_obj);
       version_id = target_obj.key.get_instance();
     }
   }
@@ -6014,7 +6252,7 @@ void RGWCompleteMultipart::execute()
 
   obj_ctx.set_atomic(target_obj);
 
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), target_obj);
+  RGWRados::Object op_target(store->getRados(), s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), target_obj);
   RGWRados::Object::Write obj_op(&op_target);
 
   obj_op.meta.manifest = &manifest;
@@ -6026,12 +6264,12 @@ void RGWCompleteMultipart::execute()
   obj_op.meta.modify_tail = true;
   obj_op.meta.completeMultipart = true;
   obj_op.meta.olh_epoch = olh_epoch;
-  op_ret = obj_op.write_meta(ofs, accounted_size, attrs);
+  op_ret = obj_op.write_meta(ofs, accounted_size, attrs, s->yield);
   if (op_ret < 0)
     return;
 
   // remove the upload obj
-  int r = store->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
+  int r = store->getRados()->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                            s->bucket_info, meta_obj, 0);
   if (r >= 0)  {
     /* serializer's exclusive lock is released */
@@ -6040,7 +6278,7 @@ void RGWCompleteMultipart::execute()
     ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6057,7 +6295,7 @@ int RGWCompleteMultipart::MPSerializer::try_lock(
   op.assert_exists();
   lock.set_duration(dur);
   lock.lock_exclusive(&op);
-  int ret = ioctx.operate(oid, &op);
+  int ret = rgw_rados_operate(ioctx, oid, &op, null_yield);
   if (! ret) {
     locked = true;
   }
@@ -6382,20 +6620,34 @@ void RGWDeleteMultiObj::execute()
 
     obj_ctx->set_atomic(obj);
 
-    RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, obj);
+    RGWRados::Object del_target(store->getRados(), s->bucket_info, *obj_ctx, obj);
     RGWRados::Object::Delete del_op(&del_target);
 
     del_op.params.bucket_owner = s->bucket_owner.get_id();
     del_op.params.versioning_status = s->bucket_info.versioning_status();
     del_op.params.obj_owner = s->owner;
 
-    op_ret = del_op.delete_obj();
+    op_ret = del_op.delete_obj(s->yield);
     if (op_ret == -ENOENT) {
       op_ret = 0;
     }
 
     send_partial_response(*iter, del_op.result.delete_marker,
                          del_op.result.version_id, op_ret);
+
+    const auto obj_state = obj_ctx->get_state(obj);
+    bufferlist etag_bl;
+    const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
+
+    const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, 
+            del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+            store);
+    if (ret < 0) {
+        ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+           // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+           // this should be global conf (probably returnign a different handler)
+        // so we don't need to read the configured values before we perform it
+    }
   }
 
   /*  set the return code to zero, errors at this point will be
@@ -6440,10 +6692,14 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
   RGWBucketInfo binfo;
   map<string, bufferlist> battrs;
   ACLOwner bowner;
+  RGWObjVersionTracker ot;
+
+  rgw_bucket b(rgw_bucket_key(s->user->get_tenant(), path.bucket_name));
 
-  int ret = store->get_bucket_info(*s->sysobj_ctx, s->user->user_id.tenant,
-                                   path.bucket_name, binfo, nullptr,
-                                   &battrs);
+  int ret = store->ctl()->bucket->read_bucket_info(b, &binfo, s->yield,
+                                               RGWBucketCtl::BucketInstance::GetParams()
+                                                 .set_attrs(&battrs),
+                                               &ot);
   if (ret < 0) {
     goto binfo_fail;
   }
@@ -6457,25 +6713,21 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
     rgw_obj obj(binfo.bucket, path.obj_key);
     obj_ctx.set_atomic(obj);
 
-    RGWRados::Object del_target(store, binfo, obj_ctx, obj);
+    RGWRados::Object del_target(store->getRados(), binfo, obj_ctx, obj);
     RGWRados::Object::Delete del_op(&del_target);
 
     del_op.params.bucket_owner = binfo.owner;
     del_op.params.versioning_status = binfo.versioning_status();
     del_op.params.obj_owner = bowner;
 
-    ret = del_op.delete_obj();
+    ret = del_op.delete_obj(s->yield);
     if (ret < 0) {
       goto delop_fail;
     }
   } else {
-    RGWObjVersionTracker ot;
-    ot.read_version = binfo.ep_objv;
-
-    ret = store->delete_bucket(binfo, ot);
+    ret = store->getRados()->delete_bucket(binfo, ot, s->yield);
     if (0 == ret) {
-      ret = rgw_unlink_bucket(store, binfo.owner, binfo.bucket.tenant,
-                              binfo.bucket.name, false);
+      ret = store->ctl()->bucket->unlink_bucket(binfo.owner, binfo.bucket, s->yield, false);
       if (ret < 0) {
         ldpp_dout(s, 0) << "WARNING: failed to unlink bucket: ret=" << ret << dendl;
       }
@@ -6484,7 +6736,7 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
       goto delop_fail;
     }
 
-    if (!store->svc.zone->is_meta_master()) {
+    if (!store->svc()->zone->is_meta_master()) {
       bufferlist in_data;
       ret = forward_request_to_master(s, &ot.read_version, store, in_data,
                                       nullptr);
@@ -6596,14 +6848,14 @@ int RGWBulkUploadOp::verify_permission()
     return -EACCES;
   }
 
-  if (s->user->user_id.tenant != s->bucket_tenant) {
+  if (s->user->get_tenant() != s->bucket_tenant) {
     ldpp_dout(this, 10) << "user cannot create a bucket in a different tenant"
-        << " (user_id.tenant=" << s->user->user_id.tenant
+        << " (user_id.tenant=" << s->user->get_tenant()
         << " requested=" << s->bucket_tenant << ")" << dendl;
     return -EACCES;
   }
 
-  if (s->user->max_buckets < 0) {
+  if (s->user->get_max_buckets() < 0) {
     return -EPERM;
   }
 
@@ -6666,18 +6918,17 @@ RGWBulkUploadOp::handle_upload_path(struct req_state *s)
 
 int RGWBulkUploadOp::handle_dir_verify_permission()
 {
-  if (s->user->max_buckets > 0) {
-    RGWUserBuckets buckets;
+  if (s->user->get_max_buckets() > 0) {
+    rgw::sal::RGWBucketList buckets;
     std::string marker;
-    bool is_truncated = false;
-    op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
-                                   marker, std::string(), s->user->max_buckets,
-                                   false, &is_truncated);
+    op_ret = rgw_read_user_buckets(store, s->user->get_user(), buckets,
+                                   marker, std::string(), s->user->get_max_buckets(),
+                                   false);
     if (op_ret < 0) {
       return op_ret;
     }
 
-    if (buckets.count() >= static_cast<size_t>(s->user->max_buckets)) {
+    if (buckets.count() >= static_cast<size_t>(s->user->get_max_buckets())) {
       return -ERR_TOO_MANY_BUCKETS;
     }
   }
@@ -6699,12 +6950,12 @@ static void forward_req_info(CephContext *cct, req_info& info, const std::string
   info.effective_uri = "/" + bucket_name;
 }
 
-void RGWBulkUploadOp::init(RGWRados* const store,
+void RGWBulkUploadOp::init(rgw::sal::RGWRadosStore* const store,
                            struct req_state* const s,
                            RGWHandler* const h)
 {
   RGWOp::init(store, s, h);
-  dir_ctx.emplace(store->svc.sysobj->init_obj_ctx());
+  dir_ctx.emplace(store->svc()->sysobj->init_obj_ctx());
 }
 
 int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
@@ -6720,15 +6971,15 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
   rgw_obj_key object_junk;
   std::tie(bucket_name, object_junk) =  *parse_path(path);
 
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().domain_root,
+  rgw_raw_obj obj(store->svc()->zone->get_zone_params().domain_root,
                   rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name));
 
   /* we need to make sure we read bucket info, it's not read before for this
    * specific request */
   RGWBucketInfo binfo;
   std::map<std::string, ceph::bufferlist> battrs;
-  op_ret = store->get_bucket_info(*dir_ctx, s->bucket_tenant, bucket_name,
-                                  binfo, nullptr, &battrs);
+  op_ret = store->getRados()->get_bucket_info(store->svc(), s->bucket_tenant, bucket_name,
+                                  binfo, nullptr, s->yield, &battrs);
   if (op_ret < 0 && op_ret != -ENOENT) {
     return op_ret;
   }
@@ -6739,7 +6990,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
     int r = rgw_op_get_bucket_policy_from_attr(s->cct, store, binfo,
                                                battrs, &old_policy);
     if (r >= 0)  {
-      if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
+      if (old_policy.get_owner().get_id().compare(s->user->get_user()) != 0) {
         op_ret = -EEXIST;
         return op_ret;
       }
@@ -6752,7 +7003,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
   real_time creation_time;
   obj_version objv, ep_objv, *pobjv = nullptr;
 
-  if (! store->svc.zone->is_meta_master()) {
+  if (! store->svc()->zone->is_meta_master()) {
     JSONParser jp;
     ceph::bufferlist in_data;
     req_info info = s->info;
@@ -6785,8 +7036,8 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
     rgw_bucket bucket;
     bucket.tenant = s->bucket_tenant;
     bucket.name = s->bucket_name;
-    op_ret = store->svc.zone->select_bucket_placement(*(s->user),
-                                            store->svc.zone->get_zonegroup().get_id(),
+    op_ret = store->svc()->zone->select_bucket_placement(s->user->get_info(),
+                                            store->svc()->zone->get_zonegroup().get_id(),
                                             placement_rule,
                                             &selected_placement_rule,
                                             nullptr);
@@ -6800,7 +7051,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
   /* Create metadata: ACLs. */
   std::map<std::string, ceph::bufferlist> attrs;
   RGWAccessControlPolicy policy;
-  policy.create_default(s->user->user_id, s->user->display_name);
+  policy.create_default(s->user->get_id(), s->user->get_display_name());
   ceph::bufferlist aclbl;
   policy.encode(aclbl);
   attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
@@ -6814,9 +7065,9 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
 
 
   RGWBucketInfo out_info;
-  op_ret = store->create_bucket(*(s->user),
+  op_ret = store->getRados()->create_bucket(s->user->get_info(),
                                 bucket,
-                                store->svc.zone->get_zonegroup().get_id(),
+                                store->svc()->zone->get_zonegroup().get_id(),
                                 placement_rule, binfo.swift_ver_location,
                                 pquota_info, attrs,
                                 out_info, pobjv, &ep_objv, creation_time,
@@ -6838,7 +7089,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
      * If all is ok then update the user's list of buckets.
      * Otherwise inform client about a name conflict.
      */
-    if (out_info.owner.compare(s->user->user_id) != 0) {
+    if (out_info.owner.compare(s->user->get_id()) != 0) {
       op_ret = -EEXIST;
       ldpp_dout(this, 20) << "conflicting bucket name" << dendl;
       return op_ret;
@@ -6846,12 +7097,12 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
     bucket = out_info.bucket;
   }
 
-  op_ret = rgw_link_bucket(store, s->user->user_id, bucket,
-                           out_info.creation_time, false);
+  op_ret = store->ctl()->bucket->link_bucket(s->user->get_id(), bucket,
+                                          out_info.creation_time,
+                                         s->yield, false);
   if (op_ret && !existed && op_ret != -EEXIST) {
     /* if it exists (or previously existed), don't remove it! */
-    op_ret = rgw_unlink_bucket(store, s->user->user_id,
-                               bucket.tenant, bucket.name);
+    op_ret = store->ctl()->bucket->unlink_bucket(s->user->get_id(), bucket, s->yield);
     if (op_ret < 0) {
       ldpp_dout(this, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret << dendl;
     }
@@ -6921,8 +7172,8 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
   RGWBucketInfo binfo;
   std::map<std::string, ceph::bufferlist> battrs;
   ACLOwner bowner;
-  op_ret = store->get_bucket_info(*s->sysobj_ctx, s->user->user_id.tenant,
-                                  bucket_name, binfo, nullptr, &battrs);
+  op_ret = store->getRados()->get_bucket_info(store->svc(), s->user->get_tenant(),
+                                  bucket_name, binfo, nullptr, s->yield, &battrs);
   if (op_ret == -ENOENT) {
     ldpp_dout(this, 20) << "non existent directory=" << bucket_name << dendl;
   } else if (op_ret < 0) {
@@ -6937,33 +7188,28 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
-  op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+  op_ret = store->getRados()->check_quota(bowner.get_id(), binfo.bucket,
                               user_quota, bucket_quota, size);
   if (op_ret < 0) {
     return op_ret;
   }
 
-  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-  if (op_ret < 0) {
-    return op_ret;
-  }
-
   rgw_obj obj(binfo.bucket, object);
   if (s->bucket_info.versioning_enabled()) {
-    store->gen_rand_obj_instance_name(&obj);
+    store->getRados()->gen_rand_obj_instance_name(&obj);
   }
 
   rgw_placement_rule dest_placement = s->dest_placement;
   dest_placement.inherit_from(binfo.placement_rule);
 
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
+                                s->yield);
 
   using namespace rgw::putobj;
+  AtomicObjectProcessor processor(&*aio, store, binfo, &s->dest_placement, bowner.get_id(),
+                                  obj_ctx, obj, 0, s->req_id, this, s->yield);
 
-  AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(),
-                                  obj_ctx, obj, 0, s->req_id);
-
-  op_ret = processor.prepare();
+  op_ret = processor.prepare(s->yield);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl;
     return op_ret;
@@ -6972,7 +7218,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
   /* No filters by default. */
   DataProcessor *filter = &processor;
 
-  const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(
+  const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(
       dest_placement);
   CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
@@ -7024,18 +7270,13 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
-  op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+  op_ret = store->getRados()->check_quota(bowner.get_id(), binfo.bucket,
                              user_quota, bucket_quota, size);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "quota exceeded for path=" << path << dendl;
     return op_ret;
   }
 
-  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
-  if (op_ret < 0) {
-    return op_ret;
-  }
-
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
   hash.Final(m);
@@ -7050,7 +7291,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
 
   /* Create metadata: ACLs. */
   RGWAccessControlPolicy policy;
-  policy.create_default(s->user->user_id, s->user->display_name);
+  policy.create_default(s->user->get_id(), s->user->get_display_name());
   ceph::bufferlist aclbl;
   policy.encode(aclbl);
   attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
@@ -7069,7 +7310,8 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
   /* Complete the transaction. */
   op_ret = processor.complete(size, etag, nullptr, ceph::real_time(),
                               attrs, ceph::real_time() /* delete_at */,
-                              nullptr, nullptr, nullptr, nullptr, nullptr);
+                              nullptr, nullptr, nullptr, nullptr, nullptr,
+                              s->yield);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "processor::complete returned op_ret=" << op_ret << dendl;
   }
@@ -7228,14 +7470,15 @@ void RGWSetAttrs::execute()
   rgw_obj obj(s->bucket, s->object);
 
   if (!s->object.empty()) {
-    store->set_atomic(s->obj_ctx, obj);
-    op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, nullptr);
+    store->getRados()->set_atomic(s->obj_ctx, obj);
+    op_ret = store->getRados()->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, nullptr, s->yield);
   } else {
     for (auto& iter : attrs) {
       s->bucket_attrs[iter.first] = std::move(iter.second);
     }
-    op_ret = rgw_bucket_set_attrs(store, s->bucket_info, s->bucket_attrs,
-                                 &s->bucket_info.objv_tracker);
+    op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                         &s->bucket_info.objv_tracker,
+                                                         s->yield);
   }
 }
 
@@ -7247,20 +7490,20 @@ void RGWGetObjLayout::pre_exec()
 void RGWGetObjLayout::execute()
 {
   rgw_obj obj(s->bucket, s->object);
-  RGWRados::Object target(store,
+  RGWRados::Object target(store->getRados(),
                           s->bucket_info,
                           *static_cast<RGWObjectCtx *>(s->obj_ctx),
                           rgw_obj(s->bucket, s->object));
   RGWRados::Object::Read stat_op(&target);
 
-  op_ret = stat_op.prepare();
+  op_ret = stat_op.prepare(s->yield);
   if (op_ret < 0) {
     return;
   }
 
   head_obj = stat_op.state.head_obj;
 
-  op_ret = target.get_manifest(&manifest);
+  op_ret = target.get_manifest(&manifest, s->yield);
 }
 
 
@@ -7288,7 +7531,7 @@ void RGWConfigBucketMetaSearch::execute()
 
   s->bucket_info.mdsearch_config = mdsearch_config;
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
         << " returned err=" << op_ret << dendl;
@@ -7328,7 +7571,7 @@ void RGWDelBucketMetaSearch::execute()
 {
   s->bucket_info.mdsearch_config.clear();
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
         << " returned err=" << op_ret << dendl;
@@ -7341,7 +7584,7 @@ RGWHandler::~RGWHandler()
 {
 }
 
-int RGWHandler::init(RGWRados *_store,
+int RGWHandler::init(rgw::sal::RGWRadosStore *_store,
                      struct req_state *_s,
                      rgw::io::BasicClient *cio)
 {
@@ -7442,7 +7685,7 @@ void RGWPutBucketPolicy::execute()
     return;
   }
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, NULL, store, data, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 20) << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -7452,12 +7695,20 @@ void RGWPutBucketPolicy::execute()
 
   try {
     const Policy p(s->cct, s->bucket_tenant, data);
-    op_ret = retry_raced_bucket_write(store, s, [&p, this] {
-       auto attrs = s->bucket_attrs;
+    auto attrs = s->bucket_attrs;
+    if (s->bucket_access_conf &&
+        s->bucket_access_conf->block_public_policy() &&
+        rgw::IAM::is_public(p)) {
+      op_ret = -EACCES;
+      return;
+    }
+
+    op_ret = retry_raced_bucket_write(store->getRados(), s, [&p, this, &attrs] {
        attrs[RGW_ATTR_IAM_POLICY].clear();
        attrs[RGW_ATTR_IAM_POLICY].append(p.text);
-       op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                     &s->bucket_info.objv_tracker);
+       op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                             &s->bucket_info.objv_tracker,
+                                                             s->yield);
        return op_ret;
       });
   } catch (rgw::IAM::PolicyParseException& e) {
@@ -7528,11 +7779,12 @@ int RGWDeleteBucketPolicy::verify_permission()
 
 void RGWDeleteBucketPolicy::execute()
 {
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
       auto attrs = s->bucket_attrs;
       attrs.erase(RGW_ATTR_IAM_POLICY);
-      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                   &s->bucket_info.objv_tracker);
+      op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                           &s->bucket_info.objv_tracker,
+                                                           s->yield);
       return op_ret;
     });
 }
@@ -7583,7 +7835,7 @@ void RGWPutBucketObjectLock::execute()
     return;
   }
 
-  if (!store->svc.zone->is_meta_master()) {
+  if (!store->svc()->zone->is_meta_master()) {
     op_ret = forward_request_to_master(s, NULL, store, data, nullptr);
     if (op_ret < 0) {
       ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
@@ -7591,9 +7843,9 @@ void RGWPutBucketObjectLock::execute()
     }
   }
 
-  op_ret = retry_raced_bucket_write(store, s, [this] {
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
     s->bucket_info.obj_lock = obj_lock;
-    op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+    op_ret = store->getRados()->put_bucket_instance_info(s->bucket_info, false,
                                              real_time(), &s->bucket_attrs);
     return op_ret;
   });
@@ -7848,7 +8100,142 @@ void RGWGetObjLegalHold::execute()
 
 void RGWGetClusterStat::execute()
 {
-  op_ret = this->store->get_rados_handle()->cluster_stat(stats_op);
+  op_ret = this->store->getRados()->get_rados_handle()->cluster_stat(stats_op);
 }
 
 
+int RGWGetBucketPolicyStatus::verify_permission()
+{
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketPolicyStatus)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketPolicyStatus::execute()
+{
+  isPublic = (s->iam_policy && rgw::IAM::is_public(*s->iam_policy)) || s->bucket_acl->is_public();
+}
+
+int RGWPutBucketPublicAccessBlock::verify_permission()
+{
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketPublicAccessBlock)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+int RGWPutBucketPublicAccessBlock::get_params()
+{
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  std::tie(op_ret, data) = rgw_rest_read_all_input(s, max_size, false);
+  return op_ret;
+}
+
+void RGWPutBucketPublicAccessBlock::execute()
+{
+  RGWXMLDecoder::XMLParser parser;
+  if (!parser.init()) {
+    ldpp_dout(this, 0) << "ERROR: failed to initialize parser" << dendl;
+    op_ret = -EINVAL;
+    return;
+  }
+
+  op_ret = get_params();
+  if (op_ret < 0)
+    return;
+
+  if (!parser.parse(data.c_str(), data.length(), 1)) {
+    ldpp_dout(this, 0) << "ERROR: malformed XML" << dendl;
+    op_ret = -ERR_MALFORMED_XML;
+    return;
+  }
+
+  try {
+    RGWXMLDecoder::decode_xml("PublicAccessBlockConfiguration", access_conf, &parser, true);
+  } catch (RGWXMLDecoder::err &err) {
+    ldpp_dout(this, 5) << "unexpected xml:" << err << dendl;
+    op_ret = -ERR_MALFORMED_XML;
+    return;
+  }
+
+  if (!store->svc()->zone->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  bufferlist bl;
+  access_conf.encode(bl);
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this, &bl] {
+      map<string, bufferlist> attrs = s->bucket_attrs;
+      attrs[RGW_ATTR_PUBLIC_ACCESS] = bl;
+      return store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs, &s->bucket_info.objv_tracker, s->yield);
+    });
+
+}
+
+int RGWGetBucketPublicAccessBlock::verify_permission()
+{
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketPublicAccessBlock::execute()
+{
+  auto attrs = s->bucket_attrs;
+  if (auto aiter = attrs.find(RGW_ATTR_PUBLIC_ACCESS);
+      aiter == attrs.end()) {
+    ldpp_dout(this, 0) << "can't find bucket IAM POLICY attr bucket_name = "
+                      << s->bucket_name << dendl;
+    // return the default;
+    return;
+  } else {
+    bufferlist::const_iterator iter{&aiter->second};
+    try {
+      access_conf.decode(iter);
+    } catch (const buffer::error& e) {
+      ldpp_dout(this, 0) << __func__ <<  "decode access_conf failed" << dendl;
+      op_ret = -EIO;
+      return;
+    }
+  }
+}
+
+
+void RGWDeleteBucketPublicAccessBlock::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+int RGWDeleteBucketPublicAccessBlock::verify_permission()
+{
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketPublicAccessBlock)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWDeleteBucketPublicAccessBlock::execute()
+{
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+      auto attrs = s->bucket_attrs;
+      attrs.erase(RGW_ATTR_PUBLIC_ACCESS);
+      op_ret = store->ctl()->bucket->set_bucket_instance_attrs(s->bucket_info, attrs,
+                                                              &s->bucket_info.objv_tracker,
+                                                              s->yield);
+      return op_ret;
+    });
+}