]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_user.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_user.cc
index 3d369cce0c4bfd942535e5675960426bf0750e61..ee1cded39357bfe5cb3bcc7ee638e9605a87f1d9 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <errno.h>
 
@@ -11,7 +11,7 @@
 #include "common/Formatter.h"
 #include "common/ceph_json.h"
 #include "common/RWLock.h"
-#include "rgw_rados.h"
+#include "rgw_sal.h"
 #include "rgw_zone.h"
 #include "rgw_acl.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_sys_obj.h"
 #include "services/svc_sys_obj_cache.h"
+#include "services/svc_user.h"
+#include "services/svc_meta.h"
 
 #define dout_subsys ceph_subsys_rgw
 
 
 
-static RGWMetadataHandler *user_meta_handler = NULL;
 extern void op_type_to_str(uint32_t mask, char *buf, int len);
 
 /**
@@ -46,52 +47,49 @@ void rgw_get_anon_user(RGWUserInfo& info)
   info.access_keys.clear();
 }
 
-int rgw_user_sync_all_stats(RGWRados *store, const rgw_user& user_id)
+int rgw_user_sync_all_stats(rgw::sal::RGWRadosStore *store, const rgw_user& user_id)
 {
+  rgw::sal::RGWBucketList user_buckets;
+  rgw::sal::RGWRadosUser user(store, user_id);
+
   CephContext *cct = store->ctx();
   size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
   bool is_truncated = false;
   string marker;
   int ret;
-  RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
 
   do {
-    RGWUserBuckets user_buckets;
-    ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
-                               string(), max_entries, false, &is_truncated);
+    ret = user.list_buckets(marker, string(), max_entries, false, user_buckets);
     if (ret < 0) {
       ldout(cct, 0) << "failed to read user buckets: ret=" << ret << dendl;
       return ret;
     }
-    map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
-    for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
+    map<string, rgw::sal::RGWBucket*>& buckets = user_buckets.get_buckets();
+    for (map<string, rgw::sal::RGWBucket*>::iterator i = buckets.begin();
          i != buckets.end();
          ++i) {
       marker = i->first;
 
-      RGWBucketEnt& bucket_ent = i->second;
-      RGWBucketInfo bucket_info;
+      rgw::sal::RGWBucket* bucket = i->second;
 
-      ret = store->get_bucket_info(obj_ctx, user_id.tenant, bucket_ent.bucket.name,
-                                   bucket_info, nullptr, nullptr);
+      ret = bucket->get_bucket_info(null_yield);
       if (ret < 0) {
-        ldout(cct, 0) << "ERROR: could not read bucket info: bucket=" << bucket_ent.bucket << " ret=" << ret << dendl;
+        ldout(cct, 0) << "ERROR: could not read bucket info: bucket=" << bucket << " ret=" << ret << dendl;
         continue;
       }
-      ret = rgw_bucket_sync_user_stats(store, user_id, bucket_info);
+      ret = bucket->sync_user_stats();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: could not sync bucket stats: ret=" << ret << dendl;
         return ret;
       }
-      RGWQuotaInfo bucket_quota;
-      ret = store->check_bucket_shards(bucket_info, bucket_info.bucket, bucket_quota);
+      ret = bucket->check_bucket_shards();
       if (ret < 0) {
        ldout(cct, 0) << "ERROR in check_bucket_shards: " << cpp_strerror(-ret)<< dendl;
       }
     }
   } while (is_truncated);
 
-  ret = store->complete_sync_user_stats(user_id);
+  ret = store->ctl()->user->complete_flush_stats(user.get_user());
   if (ret < 0) {
     cerr << "ERROR: failed to complete syncing user stats: ret=" << ret << std::endl;
     return ret;
@@ -100,37 +98,37 @@ int rgw_user_sync_all_stats(RGWRados *store, const rgw_user& user_id)
   return 0;
 }
 
-int rgw_user_get_all_buckets_stats(RGWRados *store, const rgw_user& user_id, map<string, cls_user_bucket_entry>&buckets_usage_map)
+int rgw_user_get_all_buckets_stats(rgw::sal::RGWRadosStore *store, const rgw_user& user_id, map<string, cls_user_bucket_entry>& buckets_usage_map)
 {
   CephContext *cct = store->ctx();
   size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
   bool done;
-  bool is_truncated;
   string marker;
   int ret;
 
   do {
-    RGWUserBuckets user_buckets;
-    ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
-                               string(), max_entries, false, &is_truncated);
+    rgw::sal::RGWBucketList buckets;
+    ret = rgw_read_user_buckets(store, user_id, buckets, marker,
+                               string(), max_entries, false);
     if (ret < 0) {
       ldout(cct, 0) << "failed to read user buckets: ret=" << ret << dendl;
       return ret;
     }
-    map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
-    for (const auto& i :  buckets) {
+    std::map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
+    for (const auto& i :  m) {
       marker = i.first;
 
-      const RGWBucketEnt& bucket_ent = i.second;
-      cls_user_bucket_entry entry;
-      ret = store->cls_user_get_bucket_stats(bucket_ent.bucket, entry);
+      rgw::sal::RGWBucket* bucket_ent = i.second;
+      ret = bucket_ent->read_bucket_stats(null_yield);
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: could not get bucket stats: ret=" << ret << dendl;
         return ret;
       }
-      buckets_usage_map.emplace(bucket_ent.bucket.name, entry);
+      cls_user_bucket_entry entry;
+      bucket_ent->convert(&entry);
+      buckets_usage_map.emplace(bucket_ent->get_name(), entry);
     }
-    done = (buckets.size() < max_entries);
+    done = (buckets.count() < max_entries);
   } while (!done);
 
   return 0;
@@ -140,7 +138,7 @@ int rgw_user_get_all_buckets_stats(RGWRados *store, const rgw_user& user_id, map
  * Save the given user information to storage.
  * Returns: 0 on success, -ERR# on failure.
  */
-int rgw_store_user_info(RGWRados *store,
+int rgw_store_user_info(RGWUserCtl *user_ctl,
                         RGWUserInfo& info,
                         RGWUserInfo *old_info,
                         RGWObjVersionTracker *objv_tracker,
@@ -148,172 +146,20 @@ int rgw_store_user_info(RGWRados *store,
                         bool exclusive,
                         map<string, bufferlist> *pattrs)
 {
-  int ret;
-  RGWObjVersionTracker ot;
-
-  if (objv_tracker) {
-    ot = *objv_tracker;
-  }
-
-  if (ot.write_version.tag.empty()) {
-    if (ot.read_version.tag.empty()) {
-      ot.generate_new_write_ver(store->ctx());
-    } else {
-      ot.write_version = ot.read_version;
-      ot.write_version.ver++;
-    }
-  }
-
-  map<string, RGWAccessKey>::iterator iter;
-  for (iter = info.swift_keys.begin(); iter != info.swift_keys.end(); ++iter) {
-    if (old_info && old_info->swift_keys.count(iter->first) != 0)
-      continue;
-    RGWAccessKey& k = iter->second;
-    /* check if swift mapping exists */
-    RGWUserInfo inf;
-    int r = rgw_get_user_info_by_swift(store, k.id, inf);
-    if (r >= 0 && inf.user_id.compare(info.user_id) != 0) {
-      ldout(store->ctx(), 0) << "WARNING: can't store user info, swift id (" << k.id
-        << ") already mapped to another user (" << info.user_id << ")" << dendl;
-      return -EEXIST;
-    }
-  }
-
-  if (!info.access_keys.empty()) {
-    /* check if access keys already exist */
-    RGWUserInfo inf;
-    map<string, RGWAccessKey>::iterator iter = info.access_keys.begin();
-    for (; iter != info.access_keys.end(); ++iter) {
-      RGWAccessKey& k = iter->second;
-      if (old_info && old_info->access_keys.count(iter->first) != 0)
-        continue;
-      int r = rgw_get_user_info_by_access_key(store, k.id, inf);
-      if (r >= 0 && inf.user_id.compare(info.user_id) != 0) {
-        ldout(store->ctx(), 0) << "WARNING: can't store user info, access key already mapped to another user" << dendl;
-        return -EEXIST;
-      }
-    }
-  }
-
-  RGWUID ui;
-  ui.user_id = info.user_id;
-
-  bufferlist link_bl;
-  encode(ui, link_bl);
-
-  bufferlist data_bl;
-  encode(ui, data_bl);
-  encode(info, data_bl);
-
-  string key;
-  info.user_id.to_str(key);
-
-  ret = store->meta_mgr->put_entry(user_meta_handler, key, data_bl, exclusive, &ot, mtime, pattrs);
-  if (ret < 0)
-    return ret;
-
-  if (!info.user_email.empty()) {
-    if (!old_info ||
-        old_info->user_email.compare(info.user_email) != 0) { /* only if new index changed */
-      ret = rgw_put_system_obj(store, store->svc.zone->get_zone_params().user_email_pool, info.user_email,
-                               link_bl, exclusive, NULL, real_time());
-      if (ret < 0)
-        return ret;
-    }
-  }
-
-  if (!info.access_keys.empty()) {
-    map<string, RGWAccessKey>::iterator iter = info.access_keys.begin();
-    for (; iter != info.access_keys.end(); ++iter) {
-      RGWAccessKey& k = iter->second;
-      if (old_info && old_info->access_keys.count(iter->first) != 0)
-       continue;
-
-      ret = rgw_put_system_obj(store, store->svc.zone->get_zone_params().user_keys_pool, k.id,
-                               link_bl, exclusive, NULL, real_time());
-      if (ret < 0)
-        return ret;
-    }
-  }
-
-  map<string, RGWAccessKey>::iterator siter;
-  for (siter = info.swift_keys.begin(); siter != info.swift_keys.end(); ++siter) {
-    RGWAccessKey& k = siter->second;
-    if (old_info && old_info->swift_keys.count(siter->first) != 0)
-      continue;
-
-    ret = rgw_put_system_obj(store, store->svc.zone->get_zone_params().user_swift_pool, k.id,
-                             link_bl, exclusive, NULL, real_time());
-    if (ret < 0)
-      return ret;
-  }
-
-  return ret;
-}
-
-struct user_info_entry {
-  RGWUserInfo info;
-  RGWObjVersionTracker objv_tracker;
-  real_time mtime;
-};
-
-static RGWChainedCacheImpl<user_info_entry> uinfo_cache;
-
-int rgw_get_user_info_from_index(RGWRados * const store,
-                                 const string& key,
-                                 const rgw_pool& pool,
-                                 RGWUserInfo& info,
-                                 RGWObjVersionTracker * const objv_tracker,
-                                 real_time * const pmtime)
-{
-  if (auto e = uinfo_cache.find(key)) {
-    info = e->info;
-    if (objv_tracker)
-      *objv_tracker = e->objv_tracker;
-    if (pmtime)
-      *pmtime = e->mtime;
-    return 0;
-  }
-
-  user_info_entry e;
-  bufferlist bl;
-  RGWUID uid;
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-
-  int ret = rgw_get_system_obj(store, obj_ctx, pool, key, bl, NULL, &e.mtime);
-  if (ret < 0)
-    return ret;
-
-  rgw_cache_entry_info cache_info;
-
-  auto iter = bl.cbegin();
-  try {
-    decode(uid, iter);
-    int ret = rgw_get_user_info_by_uid(store, uid.user_id, e.info, &e.objv_tracker, NULL, &cache_info);
-    if (ret < 0) {
-      return ret;
-    }
-  } catch (buffer::error& err) {
-    ldout(store->ctx(), 0) << "ERROR: failed to decode user info, caught buffer::error" << dendl;
-    return -EIO;
-  }
-
-  uinfo_cache.put(store->svc.cache, key, &e, { &cache_info });
-
-  info = e.info;
-  if (objv_tracker)
-    *objv_tracker = e.objv_tracker;
-  if (pmtime)
-    *pmtime = e.mtime;
-
-  return 0;
+  return user_ctl->store_info(info, null_yield,
+                              RGWUserCtl::PutParams()
+                              .set_old_info(old_info)
+                              .set_objv_tracker(objv_tracker)
+                              .set_mtime(mtime)
+                              .set_exclusive(exclusive)
+                              .set_attrs(pattrs));
 }
 
 /**
  * Given a uid, finds the user info associated with it.
  * returns: 0 on success, -ERR# on failure (including nonexistence)
  */
-int rgw_get_user_info_by_uid(RGWRados *store,
+int rgw_get_user_info_by_uid(RGWUserCtl *user_ctl,
                              const rgw_user& uid,
                              RGWUserInfo& info,
                              RGWObjVersionTracker * const objv_tracker,
@@ -321,195 +167,57 @@ int rgw_get_user_info_by_uid(RGWRados *store,
                              rgw_cache_entry_info * const cache_info,
                              map<string, bufferlist> * const pattrs)
 {
-  bufferlist bl;
-  RGWUID user_id;
-
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  string oid = uid.to_str();
-  int ret = rgw_get_system_obj(store, obj_ctx, store->svc.zone->get_zone_params().user_uid_pool, oid, bl, objv_tracker, pmtime, pattrs, cache_info);
-  if (ret < 0) {
-    return ret;
-  }
-
-  auto iter = bl.cbegin();
-  try {
-    decode(user_id, iter);
-    if (user_id.user_id.compare(uid) != 0) {
-      lderr(store->ctx())  << "ERROR: rgw_get_user_info_by_uid(): user id mismatch: " << user_id.user_id << " != " << uid << dendl;
-      return -EIO;
-    }
-    if (!iter.end()) {
-      decode(info, iter);
-    }
-  } catch (buffer::error& err) {
-    ldout(store->ctx(), 0) << "ERROR: failed to decode user info, caught buffer::error" << dendl;
-    return -EIO;
-  }
-
-  return 0;
+  return user_ctl->get_info_by_uid(uid, &info, null_yield,
+                                   RGWUserCtl::GetParams()
+                                   .set_objv_tracker(objv_tracker)
+                                   .set_mtime(pmtime)
+                                   .set_cache_info(cache_info)
+                                   .set_attrs(pattrs));
 }
 
 /**
  * Given an email, finds the user info associated with it.
  * returns: 0 on success, -ERR# on failure (including nonexistence)
  */
-int rgw_get_user_info_by_email(RGWRados *store, string& email, RGWUserInfo& info,
+int rgw_get_user_info_by_email(RGWUserCtl *user_ctl, string& email, RGWUserInfo& info,
                                RGWObjVersionTracker *objv_tracker, real_time *pmtime)
 {
-  return rgw_get_user_info_from_index(store, email, store->svc.zone->get_zone_params().user_email_pool, info, objv_tracker, pmtime);
+  return user_ctl->get_info_by_email(email, &info, null_yield,
+                                     RGWUserCtl::GetParams()
+                                     .set_objv_tracker(objv_tracker)
+                                     .set_mtime(pmtime));
 }
 
 /**
  * Given an swift username, finds the user_info associated with it.
  * returns: 0 on success, -ERR# on failure (including nonexistence)
  */
-extern int rgw_get_user_info_by_swift(RGWRados * const store,
+extern int rgw_get_user_info_by_swift(RGWUserCtl *user_ctl,
                                       const string& swift_name,
                                       RGWUserInfo& info,        /* out */
                                       RGWObjVersionTracker * const objv_tracker,
                                       real_time * const pmtime)
 {
-  return rgw_get_user_info_from_index(store, swift_name,
-                                      store->svc.zone->get_zone_params().user_swift_pool,
-                                      info, objv_tracker, pmtime);
+  return user_ctl->get_info_by_swift(swift_name, &info, null_yield,
+                                     RGWUserCtl::GetParams()
+                                     .set_objv_tracker(objv_tracker)
+                                     .set_mtime(pmtime));
 }
 
 /**
  * Given an access key, finds the user info associated with it.
  * returns: 0 on success, -ERR# on failure (including nonexistence)
  */
-extern int rgw_get_user_info_by_access_key(RGWRados* store,
+extern int rgw_get_user_info_by_access_key(RGWUserCtl *user_ctl,
                                            const std::string& access_key,
                                            RGWUserInfo& info,
                                            RGWObjVersionTracker* objv_tracker,
                                            real_time *pmtime)
 {
-  return rgw_get_user_info_from_index(store, access_key,
-                                      store->svc.zone->get_zone_params().user_keys_pool,
-                                      info, objv_tracker, pmtime);
-}
-
-int rgw_get_user_attrs_by_uid(RGWRados *store,
-                              const rgw_user& user_id,
-                              map<string, bufferlist>& attrs,
-                              RGWObjVersionTracker *objv_tracker)
-{
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().user_uid_pool, user_id.to_str());
-  auto src = obj_ctx.get_obj(obj);
-
-  return src.rop()
-            .set_attrs(&attrs)
-            .set_objv_tracker(objv_tracker)
-            .stat();
-}
-
-int rgw_remove_key_index(RGWRados *store, RGWAccessKey& access_key)
-{
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().user_keys_pool, access_key.id);
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  auto sysobj = obj_ctx.get_obj(obj);
-  return sysobj.wop().remove();
-}
-
-int rgw_remove_uid_index(RGWRados *store, rgw_user& uid)
-{
-  RGWObjVersionTracker objv_tracker;
-  RGWUserInfo info;
-  int ret = rgw_get_user_info_by_uid(store, uid, info, &objv_tracker, NULL);
-  if (ret < 0)
-    return ret;
-
-  string oid = uid.to_str();
-  ret = store->meta_mgr->remove_entry(user_meta_handler, oid, &objv_tracker);
-  if (ret < 0)
-    return ret;
-
-  return 0;
-}
-
-int rgw_remove_email_index(RGWRados *store, string& email)
-{
-  if (email.empty()) {
-    return 0;
-  }
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().user_email_pool, email);
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  auto sysobj = obj_ctx.get_obj(obj);
-  return sysobj.wop().remove();
-}
-
-int rgw_remove_swift_name_index(RGWRados *store, string& swift_name)
-{
-  rgw_raw_obj obj(store->svc.zone->get_zone_params().user_swift_pool, swift_name);
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  auto sysobj = obj_ctx.get_obj(obj);
-  return sysobj.wop().remove();
-}
-
-/**
- * delete a user's presence from the RGW system.
- * First remove their bucket ACLs, then delete them
- * from the user and user email pools. This leaves the pools
- * themselves alone, as well as any ACLs embedded in object xattrs.
- */
-int rgw_delete_user(RGWRados *store, RGWUserInfo& info, RGWObjVersionTracker& objv_tracker) {
-  int ret;
-
-  map<string, RGWAccessKey>::iterator kiter = info.access_keys.begin();
-  for (; kiter != info.access_keys.end(); ++kiter) {
-    ldout(store->ctx(), 10) << "removing key index: " << kiter->first << dendl;
-    ret = rgw_remove_key_index(store, kiter->second);
-    if (ret < 0 && ret != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: could not remove " << kiter->first << " (access key object), should be fixed (err=" << ret << ")" << dendl;
-      return ret;
-    }
-  }
-
-  map<string, RGWAccessKey>::iterator siter = info.swift_keys.begin();
-  for (; siter != info.swift_keys.end(); ++siter) {
-    RGWAccessKey& k = siter->second;
-    ldout(store->ctx(), 10) << "removing swift subuser index: " << k.id << dendl;
-    /* check if swift mapping exists */
-    ret = rgw_remove_swift_name_index(store, k.id);
-    if (ret < 0 && ret != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: could not remove " << k.id << " (swift name object), should be fixed (err=" << ret << ")" << dendl;
-      return ret;
-    }
-  }
-
-  ldout(store->ctx(), 10) << "removing email index: " << info.user_email << dendl;
-  ret = rgw_remove_email_index(store, info.user_email);
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: could not remove email index object for "
-        << info.user_email << ", should be fixed (err=" << ret << ")" << dendl;
-    return ret;
-  }
-
-  string buckets_obj_id;
-  rgw_get_buckets_obj(info.user_id, buckets_obj_id);
-  rgw_raw_obj uid_bucks(store->svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
-  ldout(store->ctx(), 10) << "removing user buckets index" << dendl;
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
-  auto sysobj = obj_ctx.get_obj(uid_bucks);
-  ret = sysobj.wop().remove();
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: could not remove " << info.user_id << ":" << uid_bucks << ", should be fixed (err=" << ret << ")" << dendl;
-    return ret;
-  }
-
-  string key;
-  info.user_id.to_str(key);
-  
-  rgw_raw_obj uid_obj(store->svc.zone->get_zone_params().user_uid_pool, key);
-  ldout(store->ctx(), 10) << "removing user index: " << info.user_id << dendl;
-  ret = store->meta_mgr->remove_entry(user_meta_handler, key, &objv_tracker);
-  if (ret < 0 && ret != -ENOENT && ret  != -ECANCELED) {
-    ldout(store->ctx(), 0) << "ERROR: could not remove " << info.user_id << ":" << uid_obj << ", should be fixed (err=" << ret << ")" << dendl;
-    return ret;
-  }
-
-  return 0;
+  return user_ctl->get_info_by_access_key(access_key, &info, null_yield,
+                                          RGWUserCtl::GetParams()
+                                          .set_objv_tracker(objv_tracker)
+                                          .set_mtime(pmtime));
 }
 
 static bool char_is_unreserved_url(char c)
@@ -615,50 +323,6 @@ static void set_err_msg(std::string *sink, std::string msg)
     *sink = msg;
 }
 
-static bool remove_old_indexes(RGWRados *store,
-         RGWUserInfo& old_info, RGWUserInfo& new_info, std::string *err_msg)
-{
-  int ret;
-  bool success = true;
-
-  if (!old_info.user_id.empty() &&
-      old_info.user_id.compare(new_info.user_id) != 0) {
-    if (old_info.user_id.tenant != new_info.user_id.tenant) {
-      ldout(store->ctx(), 0) << "ERROR: tenant mismatch: " << old_info.user_id.tenant << " != " << new_info.user_id.tenant << dendl;
-      return false;
-    }
-    ret = rgw_remove_uid_index(store, old_info.user_id);
-    if (ret < 0 && ret != -ENOENT) {
-      set_err_msg(err_msg, "ERROR: could not remove index for uid " + old_info.user_id.to_str());
-      success = false;
-    }
-  }
-
-  if (!old_info.user_email.empty() &&
-      old_info.user_email.compare(new_info.user_email) != 0) {
-    ret = rgw_remove_email_index(store, old_info.user_email);
-  if (ret < 0 && ret != -ENOENT) {
-      set_err_msg(err_msg, "ERROR: could not remove index for email " + old_info.user_email);
-      success = false;
-    }
-  }
-
-  map<string, RGWAccessKey>::iterator old_iter;
-  for (old_iter = old_info.swift_keys.begin(); old_iter != old_info.swift_keys.end(); ++old_iter) {
-    RGWAccessKey& swift_key = old_iter->second;
-    map<string, RGWAccessKey>::iterator new_iter = new_info.swift_keys.find(swift_key.id);
-    if (new_iter == new_info.swift_keys.end()) {
-      ret = rgw_remove_swift_name_index(store, swift_key.id);
-      if (ret < 0 && ret != -ENOENT) {
-        set_err_msg(err_msg, "ERROR: could not remove index for swift_name " + swift_key.id);
-        success = false;
-      }
-    }
-  }
-
-  return success;
-}
-
 /*
  * Dump either the full user info or a subset to a formatter.
  *
@@ -780,24 +444,14 @@ static void dump_user_info(Formatter *f, RGWUserInfo &info,
 
 RGWAccessKeyPool::RGWAccessKeyPool(RGWUser* usr)
 {
-  user = usr;
-  swift_keys = NULL;
-  access_keys = NULL;
-
-  if (!user) {
-    keys_allowed = false;
-    store = NULL;
+  if (!usr) {
     return;
   }
 
-  keys_allowed = true;
+  user = usr;
 
   store = user->get_store();
-}
-
-RGWAccessKeyPool::~RGWAccessKeyPool()
-{
-
+  user_ctl = user->get_user_ctl();
 }
 
 int RGWAccessKeyPool::init(RGWUserAdminOpState& op_state)
@@ -963,13 +617,13 @@ int RGWAccessKeyPool::generate_key(RGWUserAdminOpState& op_state, std::string *e
   if (!id.empty()) {
     switch (key_type) {
     case KEY_TYPE_SWIFT:
-      if (rgw_get_user_info_by_swift(store, id, duplicate_check) >= 0) {
+      if (rgw_get_user_info_by_swift(user_ctl, id, duplicate_check) >= 0) {
         set_err_msg(err_msg, "existing swift key in RGW system:" + id);
         return -ERR_KEY_EXIST;
       }
       break;
     case KEY_TYPE_S3:
-      if (rgw_get_user_info_by_access_key(store, id, duplicate_check) >= 0) {
+      if (rgw_get_user_info_by_access_key(user_ctl, id, duplicate_check) >= 0) {
         set_err_msg(err_msg, "existing S3 key in RGW system:" + id);
         return -ERR_KEY_EXIST;
       }
@@ -1009,7 +663,7 @@ int RGWAccessKeyPool::generate_key(RGWUserAdminOpState& op_state, std::string *e
       if (!validate_access_key(id))
         continue;
 
-    } while (!rgw_get_user_info_by_access_key(store, id, duplicate_check));
+    } while (!rgw_get_user_info_by_access_key(user_ctl, id, duplicate_check));
   }
 
   if (key_type == KEY_TYPE_SWIFT) {
@@ -1020,7 +674,7 @@ int RGWAccessKeyPool::generate_key(RGWUserAdminOpState& op_state, std::string *e
     }
 
     // check that the access key doesn't exist
-    if (rgw_get_user_info_by_swift(store, id, duplicate_check) >= 0) {
+    if (rgw_get_user_info_by_swift(user_ctl, id, duplicate_check) >= 0) {
       set_err_msg(err_msg, "cannot create existing swift key");
       return -ERR_KEY_EXIST;
     }
@@ -1209,7 +863,6 @@ int RGWAccessKeyPool::execute_remove(RGWUserAdminOpState& op_state, std::string
     return -ERR_INVALID_ACCESS_KEY;
   }
 
-  rgw_remove_key_index(store, kiter->second);
   keys_map->erase(kiter);
 
   if (!defer_user_update)
@@ -1276,7 +929,6 @@ int RGWAccessKeyPool::remove_subuser_keys(RGWUserAdminOpState& op_state,
   keys_map = swift_keys;
   kiter = keys_map->find(swift_kid);
   if (kiter != keys_map->end()) {
-    rgw_remove_key_index(store, kiter->second);
     keys_map->erase(kiter);
   }
 
@@ -1284,12 +936,11 @@ int RGWAccessKeyPool::remove_subuser_keys(RGWUserAdminOpState& op_state,
   std::string subuser_str = op_state.get_subuser();
   keys_map = access_keys;
   RGWUserInfo user_info = op_state.get_user_info();
-  map<std::string, RGWAccessKey>::iterator user_kiter = user_info.access_keys.begin();
+  auto user_kiter = user_info.access_keys.begin();
   for (; user_kiter != user_info.access_keys.end(); ++user_kiter) {
     if (user_kiter->second.subuser == subuser_str) {
       kiter = keys_map->find(user_kiter->first);
       if (kiter != keys_map->end()) {
-        rgw_remove_key_index(store, kiter->second);
         keys_map->erase(kiter);
       }
     }
@@ -1306,18 +957,15 @@ int RGWAccessKeyPool::remove_subuser_keys(RGWUserAdminOpState& op_state,
 
 RGWSubUserPool::RGWSubUserPool(RGWUser *usr)
 {
-  subusers_allowed = (usr != NULL);
-  if (usr)
-    store = usr->get_store();
-  else
-    store = NULL;
-  user = usr;
-  subuser_map = NULL;
-}
+  if (!usr) {
+    return;
+  }
 
-RGWSubUserPool::~RGWSubUserPool()
-{
+  user = usr;
 
+  subusers_allowed = true;
+  store = user->get_store();
+  user_ctl = user->get_user_ctl();
 }
 
 int RGWSubUserPool::init(RGWUserAdminOpState& op_state)
@@ -1609,14 +1257,11 @@ int RGWSubUserPool::modify(RGWUserAdminOpState& op_state, std::string *err_msg,
 
 RGWUserCapPool::RGWUserCapPool(RGWUser *usr)
 {
+  if (!usr) {
+    return;
+  }
   user = usr;
-  caps = NULL;
-  caps_allowed = (user != NULL);
-}
-
-RGWUserCapPool::~RGWUserCapPool()
-{
-
+  caps_allowed = true;
 }
 
 int RGWUserCapPool::init(RGWUserAdminOpState& op_state)
@@ -1724,12 +1369,12 @@ int RGWUserCapPool::remove(RGWUserAdminOpState& op_state, std::string *err_msg,
   return 0;
 }
 
-RGWUser::RGWUser() : store(NULL), info_stored(false), caps(this), keys(this), subusers(this)
+RGWUser::RGWUser() : caps(this), keys(this), subusers(this)
 {
   init_default();
 }
 
-int RGWUser::init(RGWRados *storage, RGWUserAdminOpState& op_state)
+int RGWUser::init(rgw::sal::RGWRadosStore *storage, RGWUserAdminOpState& op_state)
 {
   init_default();
   int ret = init_storage(storage);
@@ -1743,10 +1388,6 @@ int RGWUser::init(RGWRados *storage, RGWUserAdminOpState& op_state)
   return 0;
 }
 
-RGWUser::~RGWUser()
-{
-}
-
 void RGWUser::init_default()
 {
   // use anonymous user info as a placeholder
@@ -1756,13 +1397,14 @@ void RGWUser::init_default()
   clear_populated();
 }
 
-int RGWUser::init_storage(RGWRados *storage)
+int RGWUser::init_storage(rgw::sal::RGWRadosStore *storage)
 {
   if (!storage) {
     return -EINVAL;
   }
 
   store = storage;
+  user_ctl = store->ctl()->user;
 
   clear_populated();
 
@@ -1802,21 +1444,21 @@ int RGWUser::init(RGWUserAdminOpState& op_state)
   }
 
   if (!user_id.empty() && (user_id.compare(RGW_USER_ANON_ID) != 0)) {
-    found = (rgw_get_user_info_by_uid(store, user_id, user_info, &op_state.objv) >= 0);
+    found = (rgw_get_user_info_by_uid(user_ctl, user_id, user_info, &op_state.objv) >= 0);
     op_state.found_by_uid = found;
   }
   if (store->ctx()->_conf.get_val<bool>("rgw_user_unique_email")) {
     if (!user_email.empty() && !found) {
-      found = (rgw_get_user_info_by_email(store, user_email, user_info, &op_state.objv) >= 0);
+      found = (rgw_get_user_info_by_email(user_ctl, user_email, user_info, &op_state.objv) >= 0);
       op_state.found_by_email = found;
     }
   }
   if (!swift_user.empty() && !found) {
-    found = (rgw_get_user_info_by_swift(store, swift_user, user_info, &op_state.objv) >= 0);
+    found = (rgw_get_user_info_by_swift(user_ctl, swift_user, user_info, &op_state.objv) >= 0);
     op_state.found_by_key = found;
   }
   if (!access_key.empty() && !found) {
-    found = (rgw_get_user_info_by_access_key(store, access_key, user_info, &op_state.objv) >= 0);
+    found = (rgw_get_user_info_by_access_key(user_ctl, access_key, user_info, &op_state.objv) >= 0);
     op_state.found_by_key = found;
   }
   
@@ -1872,24 +1514,12 @@ int RGWUser::update(RGWUserAdminOpState& op_state, std::string *err_msg)
     return -EINVAL;
   }
 
-  if (is_populated()) {
-    ret = rgw_store_user_info(store, user_info, &old_info, &op_state.objv, real_time(), false);
-    if (ret < 0) {
-      set_err_msg(err_msg, "unable to store user info");
-      return ret;
-    }
+  RGWUserInfo *pold_info = (is_populated() ? &old_info : nullptr);
 
-    ret = remove_old_indexes(store, old_info, user_info, &subprocess_msg);
-    if (ret < 0) {
-      set_err_msg(err_msg, "unable to remove old user info, " + subprocess_msg);
-      return ret;
-    }
-  } else {
-    ret = rgw_store_user_info(store, user_info, NULL, &op_state.objv, real_time(), false);
-    if (ret < 0) {
-      set_err_msg(err_msg, "unable to store user info");
-      return ret;
-    }
+  ret = rgw_store_user_info(user_ctl, user_info, pold_info, &op_state.objv, real_time(), false);
+  if (ret < 0) {
+    set_err_msg(err_msg, "unable to store user info");
+    return ret;
   }
 
   old_info = user_info;
@@ -1937,6 +1567,128 @@ int RGWUser::check_op(RGWUserAdminOpState& op_state, std::string *err_msg)
   return 0;
 }
 
+// update swift_keys with new user id
+static void rename_swift_keys(const rgw_user& user,
+                              std::map<std::string, RGWAccessKey>& keys)
+{
+  std::string user_id;
+  user.to_str(user_id);
+
+  auto modify_keys = std::move(keys);
+  for ([[maybe_unused]] auto& [k, key] : modify_keys) {
+    std::string id = user_id + ":" + key.subuser;
+    key.id = id;
+    keys[id] = std::move(key);
+  }
+}
+
+int RGWUser::execute_rename(RGWUserAdminOpState& op_state, std::string *err_msg)
+{
+  int ret;
+  bool populated = op_state.is_populated();
+
+  if (!op_state.has_existing_user() && !populated) {
+    set_err_msg(err_msg, "user not found");
+    return -ENOENT;
+  }
+
+  if (!populated) {
+    ret = init(op_state);
+    if (ret < 0) {
+      set_err_msg(err_msg, "unable to retrieve user info");
+      return ret;
+    }
+  }
+
+  rgw::sal::RGWRadosUser old_user(store, op_state.get_user_info());
+  rgw::sal::RGWRadosUser new_user(store, op_state.get_new_uid());
+  if (old_user.get_tenant() != new_user.get_tenant()) {
+    set_err_msg(err_msg, "users have to be under the same tenant namespace "
+                + old_user.get_tenant() + " != " + new_user.get_tenant());
+    return -EINVAL;
+  }
+
+  // create a stub user and write only the uid index and buckets object
+  RGWUserInfo stub_user_info;
+  stub_user_info.user_id = new_user.get_user();
+
+  RGWObjVersionTracker objv;
+  const bool exclusive = !op_state.get_overwrite_new_user(); // overwrite if requested
+
+  ret = user_ctl->store_info(stub_user_info, null_yield,
+                             RGWUserCtl::PutParams()
+                             .set_objv_tracker(&objv)
+                             .set_exclusive(exclusive));
+  if (ret == -EEXIST) {
+    set_err_msg(err_msg, "user name given by --new-uid already exists");
+    return ret;
+  }
+  if (ret < 0) {
+    set_err_msg(err_msg, "unable to store new user info");
+    return ret;
+  }
+
+  RGWAccessControlPolicy policy_instance;
+  policy_instance.create_default(new_user.get_user(), old_user.get_display_name());
+
+  //unlink and link buckets to new user
+  string marker;
+  CephContext *cct = store->ctx();
+  size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
+  rgw::sal::RGWBucketList buckets;
+
+  do {
+    ret = old_user.list_buckets(marker, "", max_buckets, false, buckets);
+    if (ret < 0) {
+      set_err_msg(err_msg, "unable to list user buckets");
+      return ret;
+    }
+
+    map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
+    std::map<std::string, rgw::sal::RGWBucket*>::iterator it;
+
+    for (it = m.begin(); it != m.end(); ++it) {
+      rgw::sal::RGWBucket* bucket = it->second;
+      marker = it->first;
+
+      ret = bucket->get_bucket_info(null_yield);
+      if (ret < 0) {
+        set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket->get_name());
+        return ret;
+      }
+
+      ret = bucket->set_acl(policy_instance, null_yield);
+      if (ret < 0) {
+        set_err_msg(err_msg, "failed to set acl on bucket " + bucket->get_name());
+        return ret;
+      }
+
+      ret = bucket->link(&new_user, null_yield);
+      if (ret < 0) {
+        set_err_msg(err_msg, "failed to link bucket " + bucket->get_name());
+        return ret;
+      }
+
+      ret = bucket->chown(&new_user, &old_user, null_yield);
+      if (ret < 0) {
+        set_err_msg(err_msg, "failed to run bucket chown" + cpp_strerror(-ret));
+        return ret;
+      }
+    }
+
+  } while (buckets.is_truncated());
+
+  // update the 'stub user' with all of the other fields and rewrite all of the
+  // associated index objects
+  RGWUserInfo& user_info = op_state.get_user_info();
+  user_info.user_id = new_user.get_user();
+  op_state.objv = objv;
+
+  rename_swift_keys(new_user.get_user(), user_info.swift_keys);
+
+  return update(op_state, err_msg);
+}
+
 int RGWUser::execute_add(RGWUserAdminOpState& op_state, std::string *err_msg)
 {
   std::string subprocess_msg;
@@ -1951,13 +1703,6 @@ int RGWUser::execute_add(RGWUserAdminOpState& op_state, std::string *err_msg)
 
   // fail if the user exists already
   if (op_state.has_existing_user()) {
-    if (!op_state.exclusive &&
-        (user_email.empty() ||
-        boost::iequals(user_email, old_info.user_email)) &&
-        old_info.display_name == display_name) {
-      return execute_modify(op_state, err_msg);
-    }
-
     if (op_state.found_by_email) {
       set_err_msg(err_msg, "email: " + user_email +
                  " is the email address an existing user");
@@ -2028,6 +1773,14 @@ int RGWUser::execute_add(RGWUserAdminOpState& op_state, std::string *err_msg)
     rgw_apply_default_user_quota(user_info.user_quota, cct->_conf);
   }
 
+  if (op_state.default_placement_specified) {
+    user_info.default_placement = op_state.default_placement;
+  }
+
+  if (op_state.placement_tags_specified) {
+    user_info.placement_tags = op_state.placement_tags;
+  }
+
   // update the request
   op_state.set_user_info(user_info);
   op_state.set_populated();
@@ -2064,6 +1817,7 @@ int RGWUser::execute_add(RGWUserAdminOpState& op_state, std::string *err_msg)
   return 0;
 }
 
+
 int RGWUser::add(RGWUserAdminOpState& op_state, std::string *err_msg)
 {
   std::string subprocess_msg;
@@ -2084,7 +1838,27 @@ int RGWUser::add(RGWUserAdminOpState& op_state, std::string *err_msg)
   return 0;
 }
 
-int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg)
+int RGWUser::rename(RGWUserAdminOpState& op_state, std::string *err_msg)
+{
+  std::string subprocess_msg;
+  int ret;
+
+  ret = check_op(op_state, &subprocess_msg);
+  if (ret < 0) {
+    set_err_msg(err_msg, "unable to parse parameters, " + subprocess_msg);
+    return ret;
+  }
+
+  ret = execute_rename(op_state, &subprocess_msg);
+  if (ret < 0) {
+    set_err_msg(err_msg, "unable to rename user, " + subprocess_msg);
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg, optional_yield y)
 {
   int ret;
 
@@ -2097,28 +1871,27 @@ int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg)
     return -ENOENT;
   }
 
-  bool is_truncated = false;
+  rgw::sal::RGWBucketList buckets;
   string marker;
   CephContext *cct = store->ctx();
   size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
   do {
-    RGWUserBuckets buckets;
     ret = rgw_read_user_buckets(store, uid, buckets, marker, string(),
-                               max_buckets, false, &is_truncated);
+                               max_buckets, false);
     if (ret < 0) {
       set_err_msg(err_msg, "unable to read user bucket info");
       return ret;
     }
 
-    map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+    std::map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
     if (!m.empty() && !purge_data) {
       set_err_msg(err_msg, "must specify purge data to remove user with buckets");
       return -EEXIST; // change to code that maps to 409: conflict
     }
 
-    std::map<std::string, RGWBucketEnt>::iterator it;
+    std::map<std::string, rgw::sal::RGWBucket*>::iterator it;
     for (it = m.begin(); it != m.end(); ++it) {
-      ret = rgw_remove_bucket(store, ((*it).second).bucket, true);
+      ret = it->second->remove_bucket(true, y);
       if (ret < 0) {
         set_err_msg(err_msg, "unable to delete user data");
         return ret;
@@ -2127,9 +1900,10 @@ int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg)
       marker = it->first;
     }
 
-  } while (is_truncated);
+  } while (buckets.is_truncated());
 
-  ret = rgw_delete_user(store, user_info, op_state.objv);
+  ret = user_ctl->remove_info(user_info, y, RGWUserCtl::RemoveParams()
+                                            .set_objv_tracker(&op_state.objv));
   if (ret < 0) {
     set_err_msg(err_msg, "unable to remove user from RADOS");
     return ret;
@@ -2141,7 +1915,7 @@ int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg)
   return 0;
 }
 
-int RGWUser::remove(RGWUserAdminOpState& op_state, std::string *err_msg)
+int RGWUser::remove(RGWUserAdminOpState& op_state, optional_yield y, std::string *err_msg)
 {
   std::string subprocess_msg;
   int ret;
@@ -2152,7 +1926,7 @@ int RGWUser::remove(RGWUserAdminOpState& op_state, std::string *err_msg)
     return ret;
   }
 
-  ret = execute_remove(op_state, &subprocess_msg);
+  ret = execute_remove(op_state, &subprocess_msg, y);
   if (ret < 0) {
     set_err_msg(err_msg, "unable to remove user, " + subprocess_msg);
     return ret;
@@ -2198,8 +1972,8 @@ int RGWUser::execute_modify(RGWUserAdminOpState& op_state, std::string *err_msg)
   std::string old_email = old_info.user_email;
   if (!op_email.empty()) {
     // make sure we are not adding a duplicate email
-    if (old_email.compare(op_email) != 0) {
-      ret = rgw_get_user_info_by_email(store, op_email, duplicate_check);
+    if (old_email != op_email) {
+      ret = rgw_get_user_info_by_email(user_ctl, op_email, duplicate_check);
       if (ret >= 0 && duplicate_check.user_id.compare(user_id) != 0) {
         set_err_msg(err_msg, "cannot add duplicate email");
         return -ERR_EMAIL_EXIST;
@@ -2207,14 +1981,9 @@ int RGWUser::execute_modify(RGWUserAdminOpState& op_state, std::string *err_msg)
     }
     user_info.user_email = op_email;
   } else if (op_email.empty() && op_state.user_email_specified) {
-
     ldout(store->ctx(), 10) << "removing email index: " << user_info.user_email << dendl;
-    ret = rgw_remove_email_index(store, user_info.user_email);
-    if (ret < 0 && ret != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: could not remove " << user_info.user_id << " index (err=" << ret << ")" << dendl;
-      return ret;
-    }
-    user_info.user_email = "";
+    /* will be physically removed later when calling update() */
+    user_info.user_email.clear();
   }
 
   // update the remaining user info
@@ -2251,48 +2020,56 @@ int RGWUser::execute_modify(RGWUserAdminOpState& op_state, std::string *err_msg)
     __u8 suspended = op_state.get_suspension_status();
     user_info.suspended = suspended;
 
-    RGWUserBuckets buckets;
+    rgw::sal::RGWBucketList buckets;
 
     if (user_id.empty()) {
       set_err_msg(err_msg, "empty user id passed...aborting");
       return -EINVAL;
     }
 
-    bool is_truncated = false;
     string marker;
     CephContext *cct = store->ctx();
     size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
     do {
       ret = rgw_read_user_buckets(store, user_id, buckets, marker, string(),
-                                 max_buckets, false, &is_truncated);
+                                 max_buckets, false);
       if (ret < 0) {
         set_err_msg(err_msg, "could not get buckets for uid:  " + user_id.to_str());
         return ret;
       }
 
-      map<string, RGWBucketEnt>& m = buckets.get_buckets();
-      map<string, RGWBucketEnt>::iterator iter;
+      std::map<std::string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
+      std::map<std::string, rgw::sal::RGWBucket*>::iterator iter;
 
       vector<rgw_bucket> bucket_names;
       for (iter = m.begin(); iter != m.end(); ++iter) {
-        RGWBucketEnt obj = iter->second;
-        bucket_names.push_back(obj.bucket);
+       rgw::sal::RGWBucket* obj = iter->second;
+        bucket_names.push_back(obj->get_bi());
 
         marker = iter->first;
       }
 
-      ret = store->set_buckets_enabled(bucket_names, !suspended);
+      ret = store->getRados()->set_buckets_enabled(bucket_names, !suspended);
       if (ret < 0) {
         set_err_msg(err_msg, "failed to modify bucket");
         return ret;
       }
 
-    } while (is_truncated);
+    } while (buckets.is_truncated());
   }
 
   if (op_state.mfa_ids_specified) {
     user_info.mfa_ids = op_state.mfa_ids;
   }
+
+  if (op_state.default_placement_specified) {
+    user_info.default_placement = op_state.default_placement;
+  }
+
+  if (op_state.placement_tags_specified) {
+    user_info.placement_tags = op_state.placement_tags;
+  }
+
   op_state.set_user_info(user_info);
 
   // if we're supposed to modify keys, do so
@@ -2365,7 +2142,9 @@ int RGWUser::list(RGWUserAdminOpState& op_state, RGWFormatterFlusher& flusher)
     op_state.max_entries = 1000;
   }
 
-  int ret = store->meta_mgr->list_keys_init(metadata_key, op_state.marker, &handle);
+  auto meta_mgr = store->ctl()->meta.mgr;
+
+  int ret = meta_mgr->list_keys_init(metadata_key, op_state.marker, &handle);
   if (ret < 0) {
     return ret;
   }
@@ -2383,7 +2162,7 @@ int RGWUser::list(RGWUserAdminOpState& op_state, RGWFormatterFlusher& flusher)
   do {
     std::list<std::string> keys;
     left = op_state.max_entries - count;
-    ret = store->meta_mgr->list_keys_next(handle, left, keys, &truncated);
+    ret = meta_mgr->list_keys_next(handle, left, keys, &truncated);
     if (ret < 0 && ret != -ENOENT) {
       return ret;
     } if (ret != -ENOENT) {
@@ -2399,19 +2178,19 @@ int RGWUser::list(RGWUserAdminOpState& op_state, RGWFormatterFlusher& flusher)
   formatter->dump_bool("truncated", truncated);
   formatter->dump_int("count", count);
   if (truncated) {
-    formatter->dump_string("marker", store->meta_mgr->get_marker(handle));
+    formatter->dump_string("marker", meta_mgr->get_marker(handle));
   }
 
   // close result object section
   formatter->close_section();
 
-  store->meta_mgr->list_keys_complete(handle);
+  meta_mgr->list_keys_complete(handle);
 
   flusher.flush();
   return 0;
 }
 
-int RGWUserAdminOp_User::list(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::list(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUser user;
@@ -2427,7 +2206,7 @@ int RGWUserAdminOp_User::list(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_User::info(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::info(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2456,7 +2235,7 @@ int RGWUserAdminOp_User::info(RGWRados *store, RGWUserAdminOpState& op_state,
   RGWStorageStats stats;
   RGWStorageStats *arg_stats = NULL;
   if (op_state.fetch_stats) {
-    int ret = store->get_user_stats(info.user_id, stats);
+    int ret = store->ctl()->user->read_stats(info.user_id, &stats);
     if (ret < 0 && ret != -ENOENT) {
       return ret;
     }
@@ -2474,7 +2253,7 @@ int RGWUserAdminOp_User::info(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_User::create(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::create(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2506,7 +2285,7 @@ int RGWUserAdminOp_User::create(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_User::modify(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::modify(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2537,8 +2316,8 @@ int RGWUserAdminOp_User::modify(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_User::remove(RGWRados *store, RGWUserAdminOpState& op_state,
-                  RGWFormatterFlusher& flusher)
+int RGWUserAdminOp_User::remove(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+                  RGWFormatterFlusher& flusher, optional_yield y)
 {
   RGWUserInfo info;
   RGWUser user;
@@ -2547,14 +2326,14 @@ int RGWUserAdminOp_User::remove(RGWRados *store, RGWUserAdminOpState& op_state,
     return ret;
 
 
-  ret = user.remove(op_state, NULL);
+  ret = user.remove(op_state, y, NULL);
 
   if (ret == -ENOENT)
     ret = -ERR_NO_SUCH_USER;
   return ret;
 }
 
-int RGWUserAdminOp_Subuser::create(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Subuser::create(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2586,7 +2365,7 @@ int RGWUserAdminOp_Subuser::create(RGWRados *store, RGWUserAdminOpState& op_stat
   return 0;
 }
 
-int RGWUserAdminOp_Subuser::modify(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Subuser::modify(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2618,7 +2397,7 @@ int RGWUserAdminOp_Subuser::modify(RGWRados *store, RGWUserAdminOpState& op_stat
   return 0;
 }
 
-int RGWUserAdminOp_Subuser::remove(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Subuser::remove(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2638,7 +2417,7 @@ int RGWUserAdminOp_Subuser::remove(RGWRados *store, RGWUserAdminOpState& op_stat
   return 0;
 }
 
-int RGWUserAdminOp_Key::create(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Key::create(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2677,7 +2456,7 @@ int RGWUserAdminOp_Key::create(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_Key::remove(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Key::remove(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2697,7 +2476,7 @@ int RGWUserAdminOp_Key::remove(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-int RGWUserAdminOp_Caps::add(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Caps::add(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2730,7 +2509,7 @@ int RGWUserAdminOp_Caps::add(RGWRados *store, RGWUserAdminOpState& op_state,
 }
 
 
-int RGWUserAdminOp_Caps::remove(RGWRados *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Caps::remove(rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUserInfo info;
@@ -2762,53 +2541,29 @@ int RGWUserAdminOp_Caps::remove(RGWRados *store, RGWUserAdminOpState& op_state,
   return 0;
 }
 
-struct RGWUserCompleteInfo {
-  RGWUserInfo info;
-  map<string, bufferlist> attrs;
-  bool has_attrs;
-
-  RGWUserCompleteInfo()
-    : has_attrs(false)
-  {}
-
-  void dump(Formatter * const f) const {
-    info.dump(f);
-    encode_json("attrs", attrs, f);
-  }
-
-  void decode_json(JSONObj *obj) {
-    decode_json_obj(info, obj);
-    has_attrs = JSONDecoder::decode_json("attrs", attrs, obj);
-  }
-};
-
-class RGWUserMetadataObject : public RGWMetadataObject {
-  RGWUserCompleteInfo uci;
+class RGWUserMetadataHandler : public RGWMetadataHandler_GenericMetaBE {
 public:
-  RGWUserMetadataObject(const RGWUserCompleteInfo& _uci, obj_version& v, real_time m)
-      : uci(_uci) {
-    objv = v;
-    mtime = m;
-  }
+  struct Svc {
+    RGWSI_User *user{nullptr};
+  } svc;
 
-  void dump(Formatter *f) const override {
-    uci.dump(f);
+  RGWUserMetadataHandler(RGWSI_User *user_svc) {
+    base_init(user_svc->ctx(), user_svc->get_be_handler());
+    svc.user = user_svc;
   }
-};
 
-class RGWUserMetadataHandler : public RGWMetadataHandler {
-public:
   string get_type() override { return "user"; }
 
-  int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
+  int do_get(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWMetadataObject **obj, optional_yield y) override {
     RGWUserCompleteInfo uci;
     RGWObjVersionTracker objv_tracker;
     real_time mtime;
 
-    rgw_user uid(entry);
+    rgw_user user = RGWSI_User::user_from_meta_key(entry);
 
-    int ret = rgw_get_user_info_by_uid(store, uid, uci.info, &objv_tracker,
-                                       &mtime, NULL, &uci.attrs);
+    int ret = svc.user->read_user_info(op->ctx(), user, &uci.info, &objv_tracker,
+                                       &mtime, nullptr, &uci.attrs,
+                                       y);
     if (ret < 0) {
       return ret;
     }
@@ -2819,132 +2574,318 @@ public:
     return 0;
   }
 
-  int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
-          real_time mtime, JSONObj *obj, sync_type_t sync_mode) override {
+  RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
     RGWUserCompleteInfo uci;
 
     try {
-      decode_json_obj(uci, obj);
+      decode_json_obj(uci, jo);
     } catch (JSONDecoder::err& e) {
-      return -EINVAL;
+      return nullptr;
     }
 
-    map<string, bufferlist> *pattrs = NULL;
-    if (uci.has_attrs) {
-      pattrs = &uci.attrs;
-    }
+    return new RGWUserMetadataObject(uci, objv, mtime);
+  }
 
-    rgw_user uid(entry);
+  int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
+             RGWMetadataObject *obj,
+             RGWObjVersionTracker& objv_tracker,
+             optional_yield y,
+             RGWMDLogSyncType type) override;
 
-    RGWUserInfo old_info;
-    real_time orig_mtime;
-    int ret = rgw_get_user_info_by_uid(store, uid, old_info, &objv_tracker, &orig_mtime);
-    if (ret < 0 && ret != -ENOENT)
-      return ret;
+  int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
+                optional_yield y) {
+    RGWUserInfo info;
 
-    // are we actually going to perform this put, or is it too old?
-    if (ret != -ENOENT &&
-        !check_versions(objv_tracker.read_version, orig_mtime,
-                       objv_tracker.write_version, mtime, sync_mode)) {
-      return STATUS_NO_APPLY;
-    }
+    rgw_user user = RGWSI_User::user_from_meta_key(entry);
 
-    ret = rgw_store_user_info(store, uci.info, &old_info, &objv_tracker, mtime, false, pattrs);
+    int ret = svc.user->read_user_info(op->ctx(), user, &info, nullptr,
+                                       nullptr, nullptr, nullptr,
+                                       y);
     if (ret < 0) {
       return ret;
     }
 
-    return STATUS_APPLIED;
+    return svc.user->remove_user_info(op->ctx(), info, &objv_tracker,
+                                      y);
   }
+};
 
-  struct list_keys_info {
-    RGWRados *store;
-    RGWListRawObjsCtx ctx;
-  };
+class RGWMetadataHandlerPut_User : public RGWMetadataHandlerPut_SObj
+{
+  RGWUserMetadataHandler *uhandler;
+  RGWUserMetadataObject *uobj;
+public:
+  RGWMetadataHandlerPut_User(RGWUserMetadataHandler *_handler,
+                             RGWSI_MetaBackend_Handler::Op *op, string& entry,
+                             RGWMetadataObject *obj, RGWObjVersionTracker& objv_tracker,
+                             optional_yield y,
+                             RGWMDLogSyncType type) : RGWMetadataHandlerPut_SObj(_handler, op, entry, obj, objv_tracker, y, type),
+                                                                uhandler(_handler) {
+    uobj = static_cast<RGWUserMetadataObject *>(obj);
+  }
 
-  int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
-    RGWUserInfo info;
+  int put_checked() override;
+};
 
-    rgw_user uid(entry);
+int RGWUserMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
+                                   RGWMetadataObject *obj,
+                                   RGWObjVersionTracker& objv_tracker,
+                                   optional_yield y,
+                                   RGWMDLogSyncType type)
+{
+  RGWMetadataHandlerPut_User put_op(this, op, entry, obj, objv_tracker, y, type);
+  return do_put_operate(&put_op);
+}
 
-    int ret = rgw_get_user_info_by_uid(store, uid, info, &objv_tracker);
-    if (ret < 0)
-      return ret;
+int RGWMetadataHandlerPut_User::put_checked()
+{
+  RGWUserMetadataObject *orig_obj = static_cast<RGWUserMetadataObject *>(old_obj);
+  RGWUserCompleteInfo& uci = uobj->get_uci();
 
-    return rgw_delete_user(store, info, objv_tracker);
+  map<string, bufferlist> *pattrs{nullptr};
+  if (uci.has_attrs) {
+    pattrs = &uci.attrs;
   }
 
-  void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
-    oid = key;
-    pool = store->svc.zone->get_zone_params().user_uid_pool;
+  RGWUserInfo *pold_info = (orig_obj ? &orig_obj->get_uci().info : nullptr);
+
+  auto mtime = obj->get_mtime();
+
+  int ret = uhandler->svc.user->store_user_info(op->ctx(), uci.info, pold_info,
+                                               &objv_tracker, mtime,
+                                               false, pattrs, y);
+  if (ret < 0) {
+    return ret;
   }
 
-  int list_keys_init(RGWRados *store, const string& marker, void **phandle) override
-  {
-    auto info = std::make_unique<list_keys_info>();
+  return STATUS_APPLIED;
+}
 
-    info->store = store;
 
-    int ret = store->list_raw_objects_init(store->svc.zone->get_zone_params().user_uid_pool, marker,
-                                           &info->ctx);
-    if (ret < 0) {
-      return ret;
+RGWUserCtl::RGWUserCtl(RGWSI_Zone *zone_svc,
+                       RGWSI_User *user_svc,
+                       RGWUserMetadataHandler *_umhandler) : umhandler(_umhandler) {
+  svc.zone = zone_svc;
+  svc.user = user_svc;
+  be_handler = umhandler->get_be_handler();
+}
+
+template <class T>
+class optional_default
+{
+  const std::optional<T>& opt;
+  std::optional<T> def;
+  const T *p;
+public:
+  optional_default(const std::optional<T>& _o) : opt(_o) {
+    if (opt) {
+      p = &(*opt);
+    } else {
+      def = T();
+      p = &(*def);
     }
+  }
 
-    *phandle = (void *)info.release();
+  const T *operator->() {
+    return p;
+  }
 
-    return 0;
+  const T& operator*() {
+    return *p;
   }
+};
 
-  int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
-    list_keys_info *info = static_cast<list_keys_info *>(handle);
+int RGWUserCtl::get_info_by_uid(const rgw_user& uid,
+                                RGWUserInfo *info,
+                                optional_yield y,
+                                const GetParams& params)
+
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->read_user_info(op->ctx(),
+                                    uid,
+                                    info,
+                                    params.objv_tracker,
+                                    params.mtime,
+                                    params.cache_info,
+                                    params.attrs,
+                                    y);
+  });
+}
+
+int RGWUserCtl::get_info_by_email(const string& email,
+                                  RGWUserInfo *info,
+                                  optional_yield y,
+                                  const GetParams& params)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->get_user_info_by_email(op->ctx(), email,
+                                            info,
+                                            params.objv_tracker,
+                                            params.mtime,
+                                            y);
+  });
+}
+
+int RGWUserCtl::get_info_by_swift(const string& swift_name,
+                                  RGWUserInfo *info,
+                                  optional_yield y,
+                                  const GetParams& params)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->get_user_info_by_swift(op->ctx(), swift_name,
+                                            info,
+                                            params.objv_tracker,
+                                            params.mtime,
+                                            y);
+  });
+}
+
+int RGWUserCtl::get_info_by_access_key(const string& access_key,
+                                       RGWUserInfo *info,
+                                       optional_yield y,
+                                       const GetParams& params)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->get_user_info_by_access_key(op->ctx(), access_key,
+                                                 info,
+                                                 params.objv_tracker,
+                                                 params.mtime,
+                                                 y);
+  });
+}
+
+int RGWUserCtl::get_attrs_by_uid(const rgw_user& user_id,
+                                 map<string, bufferlist> *pattrs,
+                                 optional_yield y,
+                                 RGWObjVersionTracker *objv_tracker)
+{
+  RGWUserInfo user_info;
 
-    string no_filter;
+  return get_info_by_uid(user_id, &user_info, y, RGWUserCtl::GetParams()
+                         .set_attrs(pattrs)
+                         .set_objv_tracker(objv_tracker));
+}
 
-    keys.clear();
+int RGWUserCtl::store_info(const RGWUserInfo& info, optional_yield y,
+                           const PutParams& params)
+{
+  string key = RGWSI_User::get_meta_key(info.user_id);
 
-    RGWRados *store = info->store;
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->store_user_info(op->ctx(), info,
+                                     params.old_info,
+                                     params.objv_tracker,
+                                     params.mtime,
+                                     params.exclusive,
+                                     params.attrs,
+                                     y);
+  });
+}
 
-    list<string> unfiltered_keys;
+int RGWUserCtl::remove_info(const RGWUserInfo& info, optional_yield y,
+                            const RemoveParams& params)
 
-    int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
-                                           unfiltered_keys, truncated);
-    if (ret < 0 && ret != -ENOENT)
-      return ret;                      
-    if (ret == -ENOENT) {
-      if (truncated)
-        *truncated = false;
-      return 0;
-    }
+{
+  string key = RGWSI_User::get_meta_key(info.user_id);
 
-    // now filter out the buckets entries
-    list<string>::iterator iter;
-    for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
-      string& k = *iter;
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->remove_user_info(op->ctx(), info,
+                                      params.objv_tracker,
+                                      y);
+  });
+}
+
+int RGWUserCtl::add_bucket(const rgw_user& user,
+                           const rgw_bucket& bucket,
+                           ceph::real_time creation_time)
+
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->add_bucket(op->ctx(), user, bucket, creation_time);
+  });
+}
+
+int RGWUserCtl::remove_bucket(const rgw_user& user,
+                              const rgw_bucket& bucket)
+
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->remove_bucket(op->ctx(), user, bucket);
+  });
+}
+
+int RGWUserCtl::list_buckets(const rgw_user& user,
+                             const string& marker,
+                             const string& end_marker,
+                             uint64_t max,
+                             bool need_stats,
+                             RGWUserBuckets *buckets,
+                             bool *is_truncated,
+                             uint64_t default_max)
+{
+  if (!max) {
+    max = default_max;
+  }
 
-      if (k.find(".buckets") == string::npos) {
-        keys.push_back(k);
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    int ret = svc.user->list_buckets(op->ctx(), user, marker, end_marker,
+                                     max, buckets, is_truncated);
+    if (ret < 0) {
+      return ret;
+    }
+    if (need_stats) {
+      map<string, RGWBucketEnt>& m = buckets->get_buckets();
+      ret = ctl.bucket->read_buckets_stats(m, null_yield);
+      if (ret < 0 && ret != -ENOENT) {
+        ldout(svc.user->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
+        return ret;
       }
     }
-
     return 0;
-  }
+  });
+}
 
-  void list_keys_complete(void *handle) override {
-    list_keys_info *info = static_cast<list_keys_info *>(handle);
-    delete info;
-  }
+int RGWUserCtl::flush_bucket_stats(const rgw_user& user,
+                                   const RGWBucketEnt& ent)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->flush_bucket_stats(op->ctx(), user, ent);
+  });
+}
 
-  string get_marker(void *handle) override {
-    list_keys_info *info = static_cast<list_keys_info *>(handle);
-    return info->store->list_raw_objs_get_cursor(info->ctx);
-  }
-};
+int RGWUserCtl::complete_flush_stats(const rgw_user& user)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->complete_flush_stats(op->ctx(), user);
+  });
+}
 
-void rgw_user_init(RGWRados *store)
+int RGWUserCtl::reset_stats(const rgw_user& user)
 {
-  uinfo_cache.init(store->svc.cache);
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->reset_bucket_stats(op->ctx(), user);
+  });
+}
 
-  user_meta_handler = new RGWUserMetadataHandler;
-  store->meta_mgr->register_handler(user_meta_handler);
+int RGWUserCtl::read_stats(const rgw_user& user, RGWStorageStats *stats,
+                          ceph::real_time *last_stats_sync,
+                          ceph::real_time *last_stats_update)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->read_stats(op->ctx(), user, stats,
+                               last_stats_sync, last_stats_update);
+  });
 }
+
+int RGWUserCtl::read_stats_async(const rgw_user& user, RGWGetUserStats_CB *cb)
+{
+  return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
+    return svc.user->read_stats_async(op->ctx(), user, cb);
+  });
+}
+
+RGWMetadataHandler *RGWUserMetaHandlerAllocator::alloc(RGWSI_User *user_svc) {
+  return new RGWUserMetadataHandler(user_svc);
+}
+