]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_user.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_user.cc
index ad8cfc88d134e92e73f0c04871f29fe613636226..3db58af7e9501fc6084ede03601060ad85f0da23 100644 (file)
@@ -10,8 +10,6 @@
 #include "common/errno.h"
 #include "common/Formatter.h"
 #include "common/ceph_json.h"
-#include "common/RWLock.h"
-#include "rgw_sal.h"
 #include "rgw_sal_rados.h"
 #include "rgw_zone.h"
 #include "rgw_acl.h"
@@ -34,7 +32,7 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-
+using namespace std;
 
 extern void op_type_to_str(uint32_t mask, char *buf, int len);
 
@@ -48,11 +46,10 @@ void rgw_get_anon_user(RGWUserInfo& info)
   info.access_keys.clear();
 }
 
-int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
-                           const rgw_user& user_id, optional_yield y)
+int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::Store* store,
+                           rgw::sal::User* user, optional_yield y)
 {
-  rgw::sal::RGWBucketList user_buckets;
-  rgw::sal::RGWRadosUser user(store, user_id);
+  rgw::sal::BucketList user_buckets;
 
   CephContext *cct = store->ctx();
   size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
@@ -60,7 +57,7 @@ int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosSto
   int ret;
 
   do {
-    ret = user.list_buckets(dpp, marker, string(), max_entries, false, user_buckets, y);
+    ret = user->list_buckets(dpp, marker, string(), max_entries, false, user_buckets, y);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "failed to read user buckets: ret=" << ret << dendl;
       return ret;
@@ -71,7 +68,7 @@ int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosSto
 
       auto& bucket = i->second;
 
-      ret = bucket->get_bucket_info(dpp, y);
+      ret = bucket->load_bucket(dpp, y);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: could not read bucket info: bucket=" << bucket << " ret=" << ret << dendl;
         continue;
@@ -88,7 +85,7 @@ int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosSto
     }
   } while (user_buckets.is_truncated());
 
-  ret = store->ctl()->user->complete_flush_stats(dpp, user.get_user(), y);
+  ret = user->complete_flush_stats(dpp, y);
   if (ret < 0) {
     cerr << "ERROR: failed to complete syncing user stats: ret=" << ret << std::endl;
     return ret;
@@ -97,10 +94,10 @@ int rgw_user_sync_all_stats(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosSto
   return 0;
 }
 
-int rgw_user_get_all_buckets_stats(const DoutPrefixProvider *dpp, 
-                                   rgw::sal::RGWRadosStore *store,
-                                  const rgw_user& user_id,
-                                  map<string, cls_user_bucket_entry>& buckets_usage_map,
+int rgw_user_get_all_buckets_stats(const DoutPrefixProvider *dpp,
+                                  rgw::sal::Store* store,
+                                  rgw::sal::User* user,
+                                  map<string, bucket_meta_entry>& buckets_usage_map,
                                   optional_yield y)
 {
   CephContext *cct = store->ctx();
@@ -110,9 +107,8 @@ int rgw_user_get_all_buckets_stats(const DoutPrefixProvider *dpp,
   int ret;
 
   do {
-    rgw::sal::RGWBucketList buckets;
-    ret = rgw_read_user_buckets(dpp, store, user_id, buckets, marker,
-                               string(), max_entries, false, y);
+    rgw::sal::BucketList buckets;
+    ret = user->list_buckets(dpp, marker, string(), max_entries, false, buckets, y);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "failed to read user buckets: ret=" << ret << dendl;
       return ret;
@@ -122,13 +118,16 @@ int rgw_user_get_all_buckets_stats(const DoutPrefixProvider *dpp,
       marker = i.first;
 
       auto& bucket_ent = i.second;
-      ret = bucket_ent->read_bucket_stats(dpp, y);
+      ret = bucket_ent->load_bucket(dpp, y, true /* load user stats */);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: could not get bucket stats: ret=" << ret << dendl;
         return ret;
       }
-      cls_user_bucket_entry entry;
-      bucket_ent->convert(&entry);
+      bucket_meta_entry entry;
+      entry.size = bucket_ent->get_size();
+      entry.size_rounded = bucket_ent->get_size_rounded();
+      entry.creation_time = bucket_ent->get_creation_time();
+      entry.count = bucket_ent->get_count();
       buckets_usage_map.emplace(bucket_ent->get_name(), entry);
     }
     done = (buckets.count() < max_entries);
@@ -137,103 +136,6 @@ int rgw_user_get_all_buckets_stats(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-/**
- * Save the given user information to storage.
- * Returns: 0 on success, -ERR# on failure.
- */
-int rgw_store_user_info(const DoutPrefixProvider *dpp, 
-                        RGWUserCtl *user_ctl,
-                        RGWUserInfo& info,
-                        RGWUserInfo *old_info,
-                        RGWObjVersionTracker *objv_tracker,
-                        real_time mtime,
-                        bool exclusive,
-                       optional_yield y,
-                        map<string, bufferlist> *pattrs)
-{
-  return user_ctl->store_info(dpp, info, y,
-                              RGWUserCtl::PutParams()
-                              .set_old_info(old_info)
-                              .set_objv_tracker(objv_tracker)
-                              .set_mtime(mtime)
-                              .set_exclusive(exclusive)
-                              .set_attrs(pattrs));
-}
-
-/**
- * Given a uid, finds the user info associated with it.
- * returns: 0 on success, -ERR# on failure (including nonexistence)
- */
-int rgw_get_user_info_by_uid(const DoutPrefixProvider *dpp, 
-                             RGWUserCtl *user_ctl,
-                             const rgw_user& uid,
-                             RGWUserInfo& info,
-                            optional_yield y,
-                             RGWObjVersionTracker * const objv_tracker,
-                             real_time * const pmtime,
-                             rgw_cache_entry_info * const cache_info,
-                             map<string, bufferlist> * const pattrs)
-{
-  return user_ctl->get_info_by_uid(dpp, uid, &info, y,
-                                   RGWUserCtl::GetParams()
-                                   .set_objv_tracker(objv_tracker)
-                                   .set_mtime(pmtime)
-                                   .set_cache_info(cache_info)
-                                   .set_attrs(pattrs));
-}
-
-/**
- * Given an email, finds the user info associated with it.
- * returns: 0 on success, -ERR# on failure (including nonexistence)
- */
-int rgw_get_user_info_by_email(const DoutPrefixProvider *dpp, 
-                               RGWUserCtl *user_ctl, string& email,
-                              RGWUserInfo& info, optional_yield y,
-                               RGWObjVersionTracker *objv_tracker,
-                              real_time *pmtime)
-{
-  return user_ctl->get_info_by_email(dpp, email, &info, y,
-                                     RGWUserCtl::GetParams()
-                                     .set_objv_tracker(objv_tracker)
-                                     .set_mtime(pmtime));
-}
-
-/**
- * Given an swift username, finds the user_info associated with it.
- * returns: 0 on success, -ERR# on failure (including nonexistence)
- */
-int rgw_get_user_info_by_swift(const DoutPrefixProvider *dpp, 
-                               RGWUserCtl *user_ctl,
-                              const string& swift_name,
-                              RGWUserInfo& info,        /* out */
-                              optional_yield y,
-                              RGWObjVersionTracker * const objv_tracker,
-                              real_time * const pmtime)
-{
-  return user_ctl->get_info_by_swift(dpp, swift_name, &info, y,
-                                     RGWUserCtl::GetParams()
-                                     .set_objv_tracker(objv_tracker)
-                                     .set_mtime(pmtime));
-}
-
-/**
- * Given an access key, finds the user info associated with it.
- * returns: 0 on success, -ERR# on failure (including nonexistence)
- */
-extern int rgw_get_user_info_by_access_key(const DoutPrefixProvider *dpp, 
-                                           RGWUserCtl *user_ctl,
-                                           const std::string& access_key,
-                                           RGWUserInfo& info,
-                                          optional_yield y,
-                                          RGWObjVersionTracker* objv_tracker,
-                                           real_time *pmtime)
-{
-  return user_ctl->get_info_by_access_key(dpp, access_key, &info, y,
-                                          RGWUserCtl::GetParams()
-                                          .set_objv_tracker(objv_tracker)
-                                          .set_mtime(pmtime));
-}
-
 static bool char_is_unreserved_url(char c)
 {
   if (isalnum(c))
@@ -250,64 +152,6 @@ static bool char_is_unreserved_url(char c)
   }
 }
 
-struct rgw_flags_desc {
-  uint32_t mask;
-  const char *str;
-};
-
-static struct rgw_flags_desc rgw_perms[] = {
- { RGW_PERM_FULL_CONTROL, "full-control" },
- { RGW_PERM_READ | RGW_PERM_WRITE, "read-write" },
- { RGW_PERM_READ, "read" },
- { RGW_PERM_WRITE, "write" },
- { RGW_PERM_READ_ACP, "read-acp" },
- { RGW_PERM_WRITE_ACP, "write-acp" },
- { 0, NULL }
-};
-
-void rgw_perm_to_str(uint32_t mask, char *buf, int len)
-{
-  const char *sep = "";
-  int pos = 0;
-  if (!mask) {
-    snprintf(buf, len, "<none>");
-    return;
-  }
-  while (mask) {
-    uint32_t orig_mask = mask;
-    for (int i = 0; rgw_perms[i].mask; i++) {
-      struct rgw_flags_desc *desc = &rgw_perms[i];
-      if ((mask & desc->mask) == desc->mask) {
-        pos += snprintf(buf + pos, len - pos, "%s%s", sep, desc->str);
-        if (pos == len)
-          return;
-        sep = ", ";
-        mask &= ~desc->mask;
-        if (!mask)
-          return;
-      }
-    }
-    if (mask == orig_mask) // no change
-      break;
-  }
-}
-
-uint32_t rgw_str_to_perm(const char *str)
-{
-  if (strcasecmp(str, "") == 0)
-    return RGW_PERM_NONE;
-  else if (strcasecmp(str, "read") == 0)
-    return RGW_PERM_READ;
-  else if (strcasecmp(str, "write") == 0)
-    return RGW_PERM_WRITE;
-  else if (strcasecmp(str, "readwrite") == 0)
-    return RGW_PERM_READ | RGW_PERM_WRITE;
-  else if (strcasecmp(str, "full") == 0)
-    return RGW_PERM_FULL_CONTROL;
-
-  return RGW_PERM_INVALID;
-}
-
 int rgw_validate_tenant_name(const string& t)
 {
   struct tench {
@@ -502,7 +346,6 @@ RGWAccessKeyPool::RGWAccessKeyPool(RGWUser* usr)
   user = usr;
 
   store = user->get_store();
-  user_ctl = user->get_user_ctl();
 }
 
 int RGWAccessKeyPool::init(RGWUserAdminOpState& op_state)
@@ -512,7 +355,7 @@ int RGWAccessKeyPool::init(RGWUserAdminOpState& op_state)
     return -EINVAL;
   }
 
-  rgw_user& uid = op_state.get_user_id();
+  const rgw_user& uid = op_state.get_user_id();
   if (uid.compare(RGW_USER_ANON_ID) == 0) {
     keys_allowed = false;
     return -EACCES;
@@ -526,6 +369,112 @@ int RGWAccessKeyPool::init(RGWUserAdminOpState& op_state)
   return 0;
 }
 
+RGWUserAdminOpState::RGWUserAdminOpState(rgw::sal::Store* store)
+{
+  user = store->get_user(rgw_user(RGW_USER_ANON_ID));
+}
+
+void RGWUserAdminOpState::set_user_id(const rgw_user& id)
+{
+  if (id.empty())
+    return;
+
+  user->get_info().user_id = id;
+}
+
+void RGWUserAdminOpState::set_subuser(std::string& _subuser)
+{
+  if (_subuser.empty())
+    return;
+
+  size_t pos = _subuser.find(":");
+  if (pos != string::npos) {
+    rgw_user tmp_id;
+    tmp_id.from_str(_subuser.substr(0, pos));
+    if (tmp_id.tenant.empty()) {
+      user->get_info().user_id.id = tmp_id.id;
+    } else {
+      user->get_info().user_id = tmp_id;
+    }
+    subuser = _subuser.substr(pos+1);
+  } else {
+    subuser = _subuser;
+  }
+
+  subuser_specified = true;
+}
+
+void RGWUserAdminOpState::set_user_info(RGWUserInfo& user_info)
+{
+  user->get_info() = user_info;
+}
+
+const rgw_user& RGWUserAdminOpState::get_user_id()
+{
+  return user->get_id();
+}
+
+RGWUserInfo& RGWUserAdminOpState::get_user_info()
+{
+  return user->get_info();
+}
+
+map<std::string, RGWAccessKey>* RGWUserAdminOpState::get_swift_keys()
+{
+  return &user->get_info().swift_keys;
+}
+
+map<std::string, RGWAccessKey>* RGWUserAdminOpState::get_access_keys()
+{
+  return &user->get_info().access_keys;
+}
+
+map<std::string, RGWSubUser>* RGWUserAdminOpState::get_subusers()
+{
+  return &user->get_info().subusers;
+}
+
+RGWUserCaps *RGWUserAdminOpState::get_caps_obj()
+{
+  return &user->get_info().caps;
+}
+
+std::string RGWUserAdminOpState::build_default_swift_kid()
+{
+  if (user->get_id().empty() || subuser.empty())
+    return "";
+
+  std::string kid;
+  user->get_id().to_str(kid);
+  kid.append(":");
+  kid.append(subuser);
+
+  return kid;
+}
+
+std::string RGWUserAdminOpState::generate_subuser() {
+  if (user->get_id().empty())
+    return "";
+
+  std::string generated_subuser;
+  user->get_id().to_str(generated_subuser);
+  std::string rand_suffix;
+
+  int sub_buf_size = RAND_SUBUSER_LEN + 1;
+  char sub_buf[RAND_SUBUSER_LEN + 1];
+
+  gen_rand_alphanumeric_upper(g_ceph_context, sub_buf, sub_buf_size);
+
+  rand_suffix = sub_buf;
+  if (rand_suffix.empty())
+    return "";
+
+  generated_subuser.append(rand_suffix);
+  subuser = generated_subuser;
+
+  return generated_subuser;
+}
+
 /*
  * Do a fairly exhaustive search for an existing key matching the parameters
  * given. Also handles the case where no key type was specified and updates
@@ -632,8 +581,9 @@ int RGWAccessKeyPool::check_op(RGWUserAdminOpState& op_state,
 
   // don't check for secret key because we may be doing a removal
 
-  check_existing_key(op_state);
-
+  if (check_existing_key(op_state)) {
+    op_state.set_access_key_exist();
+  }
   return 0;
 }
 
@@ -646,7 +596,7 @@ int RGWAccessKeyPool::generate_key(const DoutPrefixProvider *dpp, RGWUserAdminOp
 
   std::pair<std::string, RGWAccessKey> key_pair;
   RGWAccessKey new_key;
-  RGWUserInfo duplicate_check;
+  std::unique_ptr<rgw::sal::User> duplicate_check;
 
   int key_type = op_state.get_key_type();
   bool gen_access = op_state.will_gen_access();
@@ -669,13 +619,13 @@ int RGWAccessKeyPool::generate_key(const DoutPrefixProvider *dpp, RGWUserAdminOp
   if (!id.empty()) {
     switch (key_type) {
     case KEY_TYPE_SWIFT:
-      if (rgw_get_user_info_by_swift(dpp, user_ctl, id, duplicate_check, y) >= 0) {
+      if (store->get_user_by_swift(dpp, id, y, &duplicate_check) >= 0) {
         set_err_msg(err_msg, "existing swift key in RGW system:" + id);
         return -ERR_KEY_EXIST;
       }
       break;
     case KEY_TYPE_S3:
-      if (rgw_get_user_info_by_access_key(dpp, user_ctl, id, duplicate_check, y) >= 0) {
+      if (store->get_user_by_access_key(dpp, id, y, &duplicate_check) >= 0) {
         set_err_msg(err_msg, "existing S3 key in RGW system:" + id);
         return -ERR_KEY_EXIST;
       }
@@ -715,7 +665,7 @@ int RGWAccessKeyPool::generate_key(const DoutPrefixProvider *dpp, RGWUserAdminOp
       if (!validate_access_key(id))
         continue;
 
-    } while (!rgw_get_user_info_by_access_key(dpp, user_ctl, id, duplicate_check, y));
+    } while (!store->get_user_by_access_key(dpp, id, y, &duplicate_check));
   }
 
   if (key_type == KEY_TYPE_SWIFT) {
@@ -726,7 +676,7 @@ int RGWAccessKeyPool::generate_key(const DoutPrefixProvider *dpp, RGWUserAdminOp
     }
 
     // check that the access key doesn't exist
-    if (rgw_get_user_info_by_swift(dpp, user_ctl, id, duplicate_check, y) >= 0) {
+    if (store->get_user_by_swift(dpp, id, y, &duplicate_check) >= 0) {
       set_err_msg(err_msg, "cannot create existing swift key");
       return -ERR_KEY_EXIST;
     }
@@ -1034,7 +984,6 @@ RGWSubUserPool::RGWSubUserPool(RGWUser *usr)
 
   subusers_allowed = true;
   store = user->get_store();
-  user_ctl = user->get_user_ctl();
 }
 
 int RGWSubUserPool::init(RGWUserAdminOpState& op_state)
@@ -1044,7 +993,7 @@ int RGWSubUserPool::init(RGWUserAdminOpState& op_state)
     return -EINVAL;
   }
 
-  rgw_user& uid = op_state.get_user_id();
+  const rgw_user& uid = op_state.get_user_id();
   if (uid.compare(RGW_USER_ANON_ID) == 0) {
     subusers_allowed = false;
     return -EACCES;
@@ -1177,6 +1126,11 @@ int RGWSubUserPool::add(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_s
     return ret;
   }
 
+  if (op_state.get_access_key_exist()) {
+    set_err_msg(err_msg, "cannot create existing key");
+    return -ERR_KEY_EXIST;
+  }
+
   if (key_type == KEY_TYPE_S3 && op_state.get_access_key().empty()) {
     op_state.set_gen_access();
   }
@@ -1347,7 +1301,7 @@ int RGWUserCapPool::init(RGWUserAdminOpState& op_state)
     return -EINVAL;
   }
 
-  rgw_user& uid = op_state.get_user_id();
+  const rgw_user& uid = op_state.get_user_id();
   if (uid.compare(RGW_USER_ANON_ID) == 0) {
     caps_allowed = false;
     return -EACCES;
@@ -1454,7 +1408,7 @@ RGWUser::RGWUser() : caps(this), keys(this), subusers(this)
   init_default();
 }
 
-int RGWUser::init(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *storage,
+int RGWUser::init(const DoutPrefixProvider *dpp, rgw::sal::Store* storage,
                  RGWUserAdminOpState& op_state, optional_yield y)
 {
   init_default();
@@ -1478,14 +1432,13 @@ void RGWUser::init_default()
   clear_populated();
 }
 
-int RGWUser::init_storage(rgw::sal::RGWRadosStore *storage)
+int RGWUser::init_storage(rgw::sal::Store* storage)
 {
   if (!storage) {
     return -EINVAL;
   }
 
   store = storage;
-  user_ctl = store->ctl()->user;
 
   clear_populated();
 
@@ -1512,7 +1465,7 @@ int RGWUser::init(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state,
     access_key.clear();
   }
 
-  RGWUserInfo user_info;
+  std::unique_ptr<rgw::sal::User> user;
 
   clear_populated();
 
@@ -1525,35 +1478,37 @@ int RGWUser::init(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state,
   }
 
   if (!user_id.empty() && (user_id.compare(RGW_USER_ANON_ID) != 0)) {
-    found = (rgw_get_user_info_by_uid(dpp, user_ctl, user_id, user_info, y, &op_state.objv) >= 0);
+    user = store->get_user(user_id);
+    found = (user->load_user(dpp, y) >= 0);
     op_state.found_by_uid = found;
   }
   if (store->ctx()->_conf.get_val<bool>("rgw_user_unique_email")) {
     if (!user_email.empty() && !found) {
-      found = (rgw_get_user_info_by_email(dpp, user_ctl, user_email, user_info, y, &op_state.objv) >= 0);
+      found = (store->get_user_by_email(dpp, user_email, y, &user) >= 0);
       op_state.found_by_email = found;
     }
   }
   if (!swift_user.empty() && !found) {
-    found = (rgw_get_user_info_by_swift(dpp, user_ctl, swift_user, user_info, y, &op_state.objv) >= 0);
+    found = (store->get_user_by_swift(dpp, swift_user, y, &user) >= 0);
     op_state.found_by_key = found;
   }
   if (!access_key.empty() && !found) {
-    found = (rgw_get_user_info_by_access_key(dpp, user_ctl, access_key, user_info, y, &op_state.objv) >= 0);
+    found = (store->get_user_by_access_key(dpp, access_key, y, &user) >= 0);
     op_state.found_by_key = found;
   }
   
   op_state.set_existing_user(found);
   if (found) {
-    op_state.set_user_info(user_info);
+    op_state.set_user_info(user->get_info());
     op_state.set_populated();
+    op_state.objv = user->get_version_tracker();
 
-    old_info = user_info;
+    old_info = user->get_info();
     set_populated();
   }
 
   if (user_id.empty()) {
-    user_id = user_info.user_id;
+    user_id = user->get_id();
   }
   op_state.set_initialized();
 
@@ -1589,7 +1544,7 @@ int RGWUser::update(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state
 {
   int ret;
   std::string subprocess_msg;
-  RGWUserInfo user_info = op_state.get_user_info();
+  rgw::sal::User* user = op_state.get_user();
 
   if (!store) {
     set_err_msg(err_msg, "couldn't initialize storage");
@@ -1598,14 +1553,14 @@ int RGWUser::update(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state
 
   RGWUserInfo *pold_info = (is_populated() ? &old_info : nullptr);
 
-  ret = rgw_store_user_info(dpp, user_ctl, user_info, pold_info, &op_state.objv,
-                           real_time(), false, y);
+  ret = user->store_user(dpp, y, false, pold_info);
+  op_state.objv = user->get_version_tracker();
   if (ret < 0) {
     set_err_msg(err_msg, "unable to store user info");
     return ret;
   }
 
-  old_info = user_info;
+  old_info = user->get_info();
   set_populated();
 
   return 0;
@@ -1677,25 +1632,21 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
     }
   }
 
-  rgw::sal::RGWRadosUser old_user(store, op_state.get_user_info());
-  rgw::sal::RGWRadosUser new_user(store, op_state.get_new_uid());
-  if (old_user.get_tenant() != new_user.get_tenant()) {
+  std::unique_ptr<rgw::sal::User> old_user = store->get_user(op_state.get_user_info().user_id);
+  std::unique_ptr<rgw::sal::User> new_user = store->get_user(op_state.get_new_uid());
+  if (old_user->get_tenant() != new_user->get_tenant()) {
     set_err_msg(err_msg, "users have to be under the same tenant namespace "
-                + old_user.get_tenant() + " != " + new_user.get_tenant());
+                + old_user->get_tenant() + " != " + new_user->get_tenant());
     return -EINVAL;
   }
 
   // create a stub user and write only the uid index and buckets object
-  RGWUserInfo stub_user_info;
-  stub_user_info.user_id = new_user.get_user();
+  std::unique_ptr<rgw::sal::User> user;
+  user = store->get_user(new_user->get_id());
 
-  RGWObjVersionTracker objv;
   const bool exclusive = !op_state.get_overwrite_new_user(); // overwrite if requested
 
-  ret = user_ctl->store_info(dpp, stub_user_info, y,
-                             RGWUserCtl::PutParams()
-                             .set_objv_tracker(&objv)
-                             .set_exclusive(exclusive));
+  ret = user->store_user(dpp, y, exclusive);
   if (ret == -EEXIST) {
     set_err_msg(err_msg, "user name given by --new-uid already exists");
     return ret;
@@ -1706,16 +1657,16 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
   }
 
   RGWAccessControlPolicy policy_instance;
-  policy_instance.create_default(new_user.get_user(), old_user.get_display_name());
+  policy_instance.create_default(new_user->get_id(), old_user->get_display_name());
 
   //unlink and link buckets to new user
   string marker;
   CephContext *cct = store->ctx();
   size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
-  rgw::sal::RGWBucketList buckets;
+  rgw::sal::BucketList buckets;
 
   do {
-    ret = old_user.list_buckets(dpp, marker, "", max_buckets, false, buckets, y);
+    ret = old_user->list_buckets(dpp, marker, "", max_buckets, false, buckets, y);
     if (ret < 0) {
       set_err_msg(err_msg, "unable to list user buckets");
       return ret;
@@ -1727,7 +1678,7 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
       auto& bucket = it->second;
       marker = it->first;
 
-      ret = bucket->get_bucket_info(dpp, y);
+      ret = bucket->load_bucket(dpp, y);
       if (ret < 0) {
         set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket->get_name());
         return ret;
@@ -1739,13 +1690,7 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
         return ret;
       }
 
-      ret = bucket->link(dpp, &new_user, y);
-      if (ret < 0) {
-        set_err_msg(err_msg, "failed to link bucket " + bucket->get_name());
-        return ret;
-      }
-
-      ret = bucket->chown(&new_user, &old_user, y, dpp);
+      ret = bucket->chown(dpp, new_user.get(), old_user.get(), y);
       if (ret < 0) {
         set_err_msg(err_msg, "failed to run bucket chown" + cpp_strerror(-ret));
         return ret;
@@ -1757,10 +1702,10 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
   // update the 'stub user' with all of the other fields and rewrite all of the
   // associated index objects
   RGWUserInfo& user_info = op_state.get_user_info();
-  user_info.user_id = new_user.get_user();
-  op_state.objv = objv;
+  user_info.user_id = new_user->get_id();
+  op_state.objv = user->get_version_tracker();
 
-  rename_swift_keys(new_user.get_user(), user_info.swift_keys);
+  rename_swift_keys(new_user->get_id(), user_info.swift_keys);
 
   return update(dpp, op_state, err_msg, y);
 }
@@ -1912,21 +1857,19 @@ int RGWUser::execute_remove(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
   int ret;
 
   bool purge_data = op_state.will_purge_data();
-  rgw_user& uid = op_state.get_user_id();
-  RGWUserInfo user_info = op_state.get_user_info();
+  rgw::sal::User* user = op_state.get_user();
 
   if (!op_state.has_existing_user()) {
     set_err_msg(err_msg, "user does not exist");
     return -ENOENT;
   }
 
-  rgw::sal::RGWBucketList buckets;
+  rgw::sal::BucketList buckets;
   string marker;
   CephContext *cct = store->ctx();
   size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
   do {
-    ret = rgw_read_user_buckets(dpp, store, uid, buckets, marker, string(),
-                               max_buckets, false, y);
+    ret = user->list_buckets(dpp, marker, string(), max_buckets, false, buckets, y);
     if (ret < 0) {
       set_err_msg(err_msg, "unable to read user bucket info");
       return ret;
@@ -1950,8 +1893,7 @@ int RGWUser::execute_remove(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
 
   } while (buckets.is_truncated());
 
-  ret = user_ctl->remove_info(dpp, user_info, y, RGWUserCtl::RemoveParams()
-                                            .set_objv_tracker(&op_state.objv));
+  ret = user->remove_user(dpp, y);
   if (ret < 0) {
     set_err_msg(err_msg, "unable to remove user from RADOS");
     return ret;
@@ -1992,7 +1934,7 @@ int RGWUser::execute_modify(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
   std::string display_name = op_state.get_display_name();
 
   RGWUserInfo user_info;
-  RGWUserInfo duplicate_check;
+  std::unique_ptr<rgw::sal::User> duplicate_check;
 
   // ensure that the user info has been populated or is populate-able
   if (!op_state.has_existing_user() && !populated) {
@@ -2021,8 +1963,8 @@ int RGWUser::execute_modify(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
   if (!op_email.empty()) {
     // make sure we are not adding a duplicate email
     if (old_email != op_email) {
-      ret = rgw_get_user_info_by_email(dpp, user_ctl, op_email, duplicate_check,y );
-      if (ret >= 0 && duplicate_check.user_id.compare(user_id) != 0) {
+      ret = store->get_user_by_email(dpp, op_email, y, &duplicate_check);
+      if (ret >= 0 && duplicate_check->get_id().compare(user_id) != 0) {
         set_err_msg(err_msg, "cannot add duplicate email");
         return -ERR_EMAIL_EXIST;
       }
@@ -2068,7 +2010,7 @@ int RGWUser::execute_modify(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
     __u8 suspended = op_state.get_suspension_status();
     user_info.suspended = suspended;
 
-    rgw::sal::RGWBucketList buckets;
+    rgw::sal::BucketList buckets;
 
     if (user_id.empty()) {
       set_err_msg(err_msg, "empty user id passed...aborting");
@@ -2078,9 +2020,9 @@ int RGWUser::execute_modify(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
     string marker;
     CephContext *cct = store->ctx();
     size_t max_buckets = cct->_conf->rgw_list_buckets_max_chunk;
+    std::unique_ptr<rgw::sal::User> user = store->get_user(user_id);
     do {
-      ret = rgw_read_user_buckets(dpp, store, user_id, buckets, marker, string(),
-                                 max_buckets, false, y);
+      ret = user->list_buckets(dpp, marker, string(), max_buckets, false, buckets, y);
       if (ret < 0) {
         set_err_msg(err_msg, "could not get buckets for uid:  " + user_id.to_str());
         return ret;
@@ -2096,7 +2038,7 @@ int RGWUser::execute_modify(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
         marker = iter->first;
       }
 
-      ret = store->getRados()->set_buckets_enabled(bucket_names, !suspended, dpp);
+      ret = store->set_buckets_enabled(dpp, bucket_names, !suspended);
       if (ret < 0) {
         set_err_msg(err_msg, "failed to modify bucket");
         return ret;
@@ -2190,9 +2132,7 @@ int RGWUser::list(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state,
     op_state.max_entries = 1000;
   }
 
-  auto meta_mgr = store->ctl()->meta.mgr;
-
-  int ret = meta_mgr->list_keys_init(dpp, metadata_key, op_state.marker, &handle);
+  int ret = store->meta_list_keys_init(dpp, metadata_key, op_state.marker, &handle);
   if (ret < 0) {
     return ret;
   }
@@ -2210,7 +2150,7 @@ int RGWUser::list(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state,
   do {
     std::list<std::string> keys;
     left = op_state.max_entries - count;
-    ret = meta_mgr->list_keys_next(handle, left, keys, &truncated);
+    ret = store->meta_list_keys_next(dpp, handle, left, keys, &truncated);
     if (ret < 0 && ret != -ENOENT) {
       return ret;
     } if (ret != -ENOENT) {
@@ -2226,19 +2166,19 @@ int RGWUser::list(const DoutPrefixProvider *dpp, RGWUserAdminOpState& op_state,
   formatter->dump_bool("truncated", truncated);
   formatter->dump_int("count", count);
   if (truncated) {
-    formatter->dump_string("marker", meta_mgr->get_marker(handle));
+    formatter->dump_string("marker", store->meta_get_marker(handle));
   }
 
   // close result object section
   formatter->close_section();
 
-  meta_mgr->list_keys_complete(handle);
+  store->meta_list_keys_complete(handle);
 
   flusher.flush();
   return 0;
 }
 
-int RGWUserAdminOp_User::list(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::list(const DoutPrefixProvider *dpp, rgw::sal::Store* store, RGWUserAdminOpState& op_state,
                   RGWFormatterFlusher& flusher)
 {
   RGWUser user;
@@ -2254,13 +2194,14 @@ int RGWUserAdminOp_User::list(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosS
   return 0;
 }
 
-int RGWUserAdminOp_User::info(const DoutPrefixProvider *dpp, 
-                              rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::info(const DoutPrefixProvider *dpp,
+                             rgw::sal::Store* store, RGWUserAdminOpState& op_state,
                              RGWFormatterFlusher& flusher,
                              optional_yield y)
 {
   RGWUserInfo info;
   RGWUser user;
+  std::unique_ptr<rgw::sal::User> ruser;
 
   int ret = user.init(dpp, store, op_state, y);
   if (ret < 0)
@@ -2275,8 +2216,10 @@ int RGWUserAdminOp_User::info(const DoutPrefixProvider *dpp,
   if (ret < 0)
     return ret;
 
+  ruser = store->get_user(info.user_id);
+
   if (op_state.sync_stats) {
-    ret = rgw_user_sync_all_stats(dpp, store, info.user_id, y);
+    ret = rgw_user_sync_all_stats(dpp, store, ruser.get(), y);
     if (ret < 0) {
       return ret;
     }
@@ -2285,7 +2228,7 @@ int RGWUserAdminOp_User::info(const DoutPrefixProvider *dpp,
   RGWStorageStats stats;
   RGWStorageStats *arg_stats = NULL;
   if (op_state.fetch_stats) {
-    int ret = store->ctl()->user->read_stats(dpp, info.user_id, &stats, y);
+    int ret = ruser->read_stats(dpp, y, &stats);
     if (ret < 0 && ret != -ENOENT) {
       return ret;
     }
@@ -2303,8 +2246,8 @@ int RGWUserAdminOp_User::info(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_User::create(const DoutPrefixProvider *dpp, 
-                                rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_User::create(const DoutPrefixProvider *dpp,
+                               rgw::sal::Store* store,
                                RGWUserAdminOpState& op_state,
                                RGWFormatterFlusher& flusher, optional_yield y)
 {
@@ -2337,8 +2280,8 @@ int RGWUserAdminOp_User::create(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_User::modify(const DoutPrefixProvider *dpp, 
-                                rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_User::modify(const DoutPrefixProvider *dpp,
+                               rgw::sal::Store* store,
                                RGWUserAdminOpState& op_state,
                                RGWFormatterFlusher& flusher, optional_yield y)
 {
@@ -2370,7 +2313,8 @@ int RGWUserAdminOp_User::modify(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_User::remove(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_User::remove(const DoutPrefixProvider *dpp,
+                               rgw::sal::Store* store, RGWUserAdminOpState& op_state,
                                RGWFormatterFlusher& flusher, optional_yield y)
 {
   RGWUserInfo info;
@@ -2387,8 +2331,8 @@ int RGWUserAdminOp_User::remove(const DoutPrefixProvider *dpp, rgw::sal::RGWRado
   return ret;
 }
 
-int RGWUserAdminOp_Subuser::create(const DoutPrefixProvider *dpp, 
-                                   rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_Subuser::create(const DoutPrefixProvider *dpp,
+                                  rgw::sal::Store* store,
                                   RGWUserAdminOpState& op_state,
                                   RGWFormatterFlusher& flusher,
                                   optional_yield y)
@@ -2422,8 +2366,8 @@ int RGWUserAdminOp_Subuser::create(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_Subuser::modify(const DoutPrefixProvider *dpp, 
-                                   rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Subuser::modify(const DoutPrefixProvider *dpp,
+                                  rgw::sal::Store* store, RGWUserAdminOpState& op_state,
                                   RGWFormatterFlusher& flusher, optional_yield y)
 {
   RGWUserInfo info;
@@ -2455,8 +2399,8 @@ int RGWUserAdminOp_Subuser::modify(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_Subuser::remove(const DoutPrefixProvider *dpp, 
-                                   rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_Subuser::remove(const DoutPrefixProvider *dpp,
+                                  rgw::sal::Store* store,
                                   RGWUserAdminOpState& op_state,
                                   RGWFormatterFlusher& flusher,
                                   optional_yield y)
@@ -2478,8 +2422,8 @@ int RGWUserAdminOp_Subuser::remove(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_Key::create(const DoutPrefixProvider *dpp, 
-                               rgw::sal::RGWRadosStore *store, RGWUserAdminOpState& op_state,
+int RGWUserAdminOp_Key::create(const DoutPrefixProvider *dpp,
+                              rgw::sal::Store* store, RGWUserAdminOpState& op_state,
                               RGWFormatterFlusher& flusher,
                               optional_yield y)
 {
@@ -2519,8 +2463,8 @@ int RGWUserAdminOp_Key::create(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_Key::remove(const DoutPrefixProvider *dpp, 
-                               rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_Key::remove(const DoutPrefixProvider *dpp,
+                              rgw::sal::Store* store,
                               RGWUserAdminOpState& op_state,
                               RGWFormatterFlusher& flusher,
                               optional_yield y)
@@ -2542,8 +2486,8 @@ int RGWUserAdminOp_Key::remove(const DoutPrefixProvider *dpp,
   return 0;
 }
 
-int RGWUserAdminOp_Caps::add(const DoutPrefixProvider *dpp, 
-                             rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_Caps::add(const DoutPrefixProvider *dpp,
+                            rgw::sal::Store* store,
                             RGWUserAdminOpState& op_state,
                             RGWFormatterFlusher& flusher, optional_yield y)
 {
@@ -2577,8 +2521,8 @@ int RGWUserAdminOp_Caps::add(const DoutPrefixProvider *dpp,
 }
 
 
-int RGWUserAdminOp_Caps::remove(const DoutPrefixProvider *dpp, 
-                                rgw::sal::RGWRadosStore *store,
+int RGWUserAdminOp_Caps::remove(const DoutPrefixProvider *dpp,
+                               rgw::sal::Store* store,
                                RGWUserAdminOpState& op_state,
                                RGWFormatterFlusher& flusher, optional_yield y)
 {
@@ -2981,3 +2925,8 @@ RGWMetadataHandler *RGWUserMetaHandlerAllocator::alloc(RGWSI_User *user_svc) {
   return new RGWUserMetadataHandler(user_svc);
 }
 
+void rgw_user::dump(Formatter *f) const
+{
+  ::encode_json("user", *this, f);
+}
+