]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_quota.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_quota.cc
index f31abae145f1c2d1ce1f9025fb809acfa80ff85b..f1ae34f9368092440f2d9ec9882ae9e1e740f72f 100644 (file)
@@ -46,7 +46,7 @@ struct RGWQuotaCacheStats {
 template<class T>
 class RGWQuotaCache {
 protected:
-  rgw::sal::Store* store;
+  rgw::sal::Driver* driver;
   lru_map<T, RGWQuotaCacheStats> stats_map;
   RefCountedWaitObject *async_refcount;
 
@@ -75,7 +75,7 @@ protected:
 
   virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
 public:
-  RGWQuotaCache(rgw::sal::Store* _store, int size) : store(_store), stats_map(size) {
+  RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) {
     async_refcount = new RefCountedWaitObject;
   }
   virtual ~RGWQuotaCache() {
@@ -93,10 +93,10 @@ public:
 
   class AsyncRefreshHandler {
   protected:
-    rgw::sal::Store* store;
+    rgw::sal::Driver* driver;
     RGWQuotaCache<T> *cache;
   public:
-    AsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
+    AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {}
     virtual ~AsyncRefreshHandler() {}
 
     virtual int init_fetch() = 0;
@@ -134,7 +134,7 @@ int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& buck
 template<class T>
 void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
 {
-  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+  ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
 
   async_refcount->put();
 }
@@ -142,7 +142,7 @@ void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& buck
 template<class T>
 void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
 {
-  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+  ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
 
   RGWQuotaCacheStats qs;
 
@@ -159,8 +159,8 @@ void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket,
   qs.stats = stats;
   qs.expiration = ceph_clock_now();
   qs.async_refresh_time = qs.expiration;
-  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
-  qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
+  qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl;
+  qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2;
 
   map_add(user, bucket, qs);
 }
@@ -250,9 +250,9 @@ class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefresh
                                   public RGWGetBucketStats_CB {
   rgw_user user;
 public:
-  BucketAsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<rgw_bucket> *_cache,
+  BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
                             const rgw_user& _user, const rgw_bucket& _bucket) :
-                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
+                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
                                       RGWGetBucketStats_CB(_bucket), user(_user) {}
 
   void drop_reference() override { put(); }
@@ -264,8 +264,8 @@ int BucketAsyncRefreshHandler::init_fetch()
 {
   std::unique_ptr<rgw::sal::Bucket> rbucket;
 
-  const DoutPrefix dp(store->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
-  int r = store->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
+  const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
+  int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
   if (r < 0) {
     ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
     return r;
@@ -273,7 +273,12 @@ int BucketAsyncRefreshHandler::init_fetch()
 
   ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
 
-  r = rbucket->read_stats_async(&dp, RGW_NO_SHARD, this);
+  const auto& index = rbucket->get_info().get_current_index();
+  if (is_layout_indexless(index)) {
+    return 0;
+  }
+
+  r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
   if (r < 0) {
     ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
 
@@ -287,7 +292,7 @@ int BucketAsyncRefreshHandler::init_fetch()
 void BucketAsyncRefreshHandler::handle_response(const int r)
 {
   if (r < 0) {
-    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+    ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
     cache->async_refresh_fail(user, bucket);
     return;
   }
@@ -322,38 +327,44 @@ protected:
   int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
 
 public:
-  explicit RGWBucketStatsCache(rgw::sal::Store* _store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+  explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
   }
 
   AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
-    return new BucketAsyncRefreshHandler(store, this, user, bucket);
+    return new BucketAsyncRefreshHandler(driver, this, user, bucket);
   }
 };
 
 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
 {
-  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
   std::unique_ptr<rgw::sal::Bucket> bucket;
 
-  int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
+  int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
     return r;
   }
 
+  stats = RGWStorageStats();
+
+  const auto& index = bucket->get_info().get_current_index();
+  if (is_layout_indexless(index)) {
+    return 0;
+  }
+
   string bucket_ver;
   string master_ver;
 
   map<RGWObjCategory, RGWStorageStats> bucket_stats;
-  r = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, bucket_stats);
+  r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
+                        &master_ver, bucket_stats, nullptr);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
                            << _b.name << dendl;
     return r;
   }
 
-  stats = RGWStorageStats();
-
   for (const auto& pair : bucket_stats) {
     const RGWStorageStats& s = pair.second;
 
@@ -370,9 +381,9 @@ class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHand
   const DoutPrefixProvider *dpp;
   rgw_bucket bucket;
 public:
-  UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store, RGWQuotaCache<rgw_user> *_cache,
+  UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache,
                           const rgw_user& _user, const rgw_bucket& _bucket) :
-                          RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
+                          RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache),
                           RGWGetUserStats_CB(_user),
                           dpp(_dpp),
                           bucket(_bucket) {}
@@ -384,7 +395,7 @@ public:
 
 int UserAsyncRefreshHandler::init_fetch()
 {
-  std::unique_ptr<rgw::sal::User> ruser = store->get_user(user);
+  std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
 
   ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
   int r = ruser->read_stats_async(dpp, this);
@@ -401,7 +412,7 @@ int UserAsyncRefreshHandler::init_fetch()
 void UserAsyncRefreshHandler::handle_response(int r)
 {
   if (r < 0) {
-    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+    ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
     cache->async_refresh_fail(user, bucket);
     return;
   }
@@ -546,13 +557,13 @@ protected:
   }
 
 public:
-  RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads)
-    : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
+  RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
+    : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
   {
     if (quota_threads) {
-      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+      buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this);
       buckets_sync_thread->create("rgw_buck_st_syn");
-      user_sync_thread = new UserSyncThread(store->ctx(), this);
+      user_sync_thread = new UserSyncThread(driver->ctx(), this);
       user_sync_thread->create("rgw_user_st_syn");
     } else {
       buckets_sync_thread = NULL;
@@ -564,7 +575,7 @@ public:
   }
 
   AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
-    return new UserAsyncRefreshHandler(dpp, store, this, user, bucket);
+    return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket);
   }
 
   bool going_down() {
@@ -587,7 +598,7 @@ int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
                                                optional_yield y,
                                                 const DoutPrefixProvider *dpp)
 {
-  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
   int r = user->read_stats(dpp, y, &stats);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
@@ -599,10 +610,10 @@ int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
 
 int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
 {
-  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
   std::unique_ptr<rgw::sal::Bucket> bucket;
 
-  int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
+  int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
     return r;
@@ -622,7 +633,7 @@ int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user&
   RGWStorageStats stats;
   ceph::real_time last_stats_sync;
   ceph::real_time last_stats_update;
-  std::unique_ptr<rgw::sal::User> user = store->get_user(rgw_user(_u.to_str()));
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str()));
 
   int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
   if (ret < 0) {
@@ -630,19 +641,19 @@ int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user&
     return ret;
   }
 
-  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
+  if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users &&
       last_stats_update < last_stats_sync) {
     ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
     return 0;
   }
 
   real_time when_need_full_sync = last_stats_sync;
-  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
+  when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time);
   
   // check if enough time passed since last full sync
   /* FIXME: missing check? */
 
-  ret = rgw_user_sync_all_stats(dpp, store, user.get(), y);
+  ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
     return ret;
@@ -656,7 +667,7 @@ int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yi
   string key = "user";
   void *handle;
 
-  int ret = store->meta_list_keys_init(dpp, key, string(), &handle);
+  int ret = driver->meta_list_keys_init(dpp, key, string(), &handle);
   if (ret < 0) {
     ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl;
     return ret;
@@ -667,7 +678,7 @@ int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yi
 
   do {
     list<string> keys;
-    ret = store->meta_list_keys_next(dpp, handle, max, keys, &truncated);
+    ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
       goto done;
@@ -689,7 +700,7 @@ int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yi
 
   ret = 0;
 done:
-  store->meta_list_keys_complete(handle);
+  driver->meta_list_keys_complete(handle);
   return ret;
 }
 
@@ -772,7 +783,7 @@ bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
   const uint64_t cur_size = stats.size_rounded;
   const uint64_t new_size = rgw_rounded_objsize(size);
 
-  if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
+  if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) {
     ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
              << " size=" << new_size << " "
              << entity << "_quota.max_size=" << qinfo.max_size << dendl;
@@ -793,7 +804,7 @@ bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
     return false;
   }
 
-  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
+  if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
     ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
              << " " << entity << "_quota.max_objects=" << qinfo.max_objects
              << dendl;
@@ -816,7 +827,7 @@ bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
 
   const uint64_t cur_size = stats.size;
 
-  if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
+  if (std::cmp_greater(cur_size + size, qinfo.max_size)) {
     ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
              << " size=" << size << " "
              << entity << "_quota.max_size=" << qinfo.max_size << dendl;
@@ -837,7 +848,7 @@ bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
     return false;
   }
 
-  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
+  if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
     ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
              << " " << entity << "_quota.max_objects=" << qinfo.max_objects
              << dendl;
@@ -862,7 +873,7 @@ const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
 
 
 class RGWQuotaHandlerImpl : public RGWQuotaHandler {
-  rgw::sal::Store* store;
+  rgw::sal::Driver* driver;
   RGWBucketStatsCache bucket_stats_cache;
   RGWUserStatsCache user_stats_cache;
 
@@ -897,19 +908,18 @@ class RGWQuotaHandlerImpl : public RGWQuotaHandler {
     return 0;
   }
 public:
-  RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads) : store(_store),
-                                    bucket_stats_cache(_store),
-                                    user_stats_cache(dpp, _store, quota_threads) {}
+  RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) : driver(_driver),
+                                    bucket_stats_cache(_driver),
+                                    user_stats_cache(dpp, _driver, quota_threads) {}
 
   int check_quota(const DoutPrefixProvider *dpp,
                   const rgw_user& user,
-                 rgw_bucket& bucket,
-                 RGWQuotaInfo& user_quota,
-                 RGWQuotaInfo& bucket_quota,
-                 uint64_t num_objs,
-                 uint64_t size, optional_yield y) override {
+                  rgw_bucket& bucket,
+                  RGWQuota& quota,
+                  uint64_t num_objs,
+                  uint64_t size, optional_yield y) override {
 
-    if (!bucket_quota.enabled && !user_quota.enabled) {
+    if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) {
       return 0;
     }
 
@@ -920,26 +930,26 @@ public:
      * fetch that info and not rely on cached data
      */
 
-    const DoutPrefix dp(store->ctx(), dout_subsys, "rgw quota handler: ");
-    if (bucket_quota.enabled) {
+    const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: ");
+    if (quota.bucket_quota.enabled) {
       RGWStorageStats bucket_stats;
       int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
       if (ret < 0) {
         return ret;
       }
-      ret = check_quota(dpp, "bucket", bucket_quota, bucket_stats, num_objs, size);
+      ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size);
       if (ret < 0) {
         return ret;
       }
     }
 
-    if (user_quota.enabled) {
+    if (quota.user_quota.enabled) {
       RGWStorageStats user_stats;
       int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
       if (ret < 0) {
         return ret;
       }
-      ret = check_quota(dpp, "user", user_quota, user_stats, num_objs, size);
+      ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size);
       if (ret < 0) {
         return ret;
       }
@@ -952,15 +962,23 @@ public:
     user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
   }
 
-  void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard, uint64_t num_shards,
-                          uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) override
+  void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard,
+                           uint64_t num_shards, uint64_t num_objs, bool is_multisite,
+                           bool& need_resharding, uint32_t *suggested_num_shards) override
   {
     if (num_objs > num_shards * max_objs_per_shard) {
       ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
              << " shard max_objects=" <<  max_objs_per_shard * num_shards << dendl;
       need_resharding = true;
       if (suggested_num_shards) {
-        *suggested_num_shards = num_objs * 2 / max_objs_per_shard;
+        uint32_t obj_multiplier = 2;
+        if (is_multisite) {
+          // if we're maintaining bilogs for multisite, reshards are significantly
+          // more expensive. scale up the shard count much faster to minimize the
+          // number of reshard events during a write workload
+          obj_multiplier = 8;
+        }
+        *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard;
       }
     } else {
       need_resharding = false;
@@ -969,9 +987,9 @@ public:
 };
 
 
-RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Store* store, bool quota_threads)
+RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads)
 {
-  return new RGWQuotaHandlerImpl(dpp, store, quota_threads);
+  return new RGWQuotaHandlerImpl(dpp, driver, quota_threads);
 }
 
 void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)