1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2013 Inktank, Inc
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "include/utime.h"
18 #include "common/lru_map.h"
19 #include "common/RefCountedObj.h"
20 #include "common/Thread.h"
21 #include "common/ceph_mutex.h"
23 #include "rgw_common.h"
25 #include "rgw_sal_rados.h"
26 #include "rgw_quota.h"
27 #include "rgw_bucket.h"
30 #include "services/svc_sys_obj.h"
31 #include "services/svc_meta.h"
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_rgw
// Cached quota stats entry plus the cache's core data members.
// NOTE(review): extraction-mangled region — stray embedded line numbers and
// statements split across physical lines; an `expiration` field and the
// struct/class framing appear to be missing from this view. Tokens preserved
// verbatim; comments only added.
40 struct RGWQuotaCacheStats
{
// Last-known stats for one user/bucket.
41 RGWStorageStats stats
;
// When an async (background) refresh should be kicked off for this entry.
43 utime_t async_refresh_time
;
// Non-owning backend driver handle (lifetime managed by the caller).
49 rgw::sal::Driver
* driver
;
// LRU cache keyed by T (rgw_user or rgw_bucket in the concrete caches).
50 lru_map
<T
, RGWQuotaCacheStats
> stats_map
;
// Ref-counted waiter: tracks in-flight async refreshes so the destructor
// can block until they all complete.
51 RefCountedWaitObject
*async_refcount
;
// Test-and-set UpdateContext: atomically (under the lru_map lock) checks
// whether an async refresh is already pending for an entry and, if not,
// claims it by zeroing async_refresh_time. Used to avoid duplicate refreshes.
// NOTE(review): extraction-mangled; return statements / closing braces are
// missing from this view. Tokens preserved verbatim.
53 class StatsAsyncTestSet
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
56 uint64_t removed_bytes
;
58 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
59 bool update(RGWQuotaCacheStats
*entry
) override
{
// async_refresh_time.sec() == 0 means a refresh was already claimed.
60 if (entry
->async_refresh_time
.sec() == 0)
// Claim the refresh by clearing the scheduled time.
63 entry
->async_refresh_time
= utime_t(0, 0);
// Pure-virtual customization points implemented by the bucket- and user-
// keyed concrete caches below. Tokens preserved verbatim (mangled extraction).
// Fetch authoritative stats from the backend (cache miss / expiry path).
69 virtual int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, optional_yield y
, const DoutPrefixProvider
*dpp
) = 0;
// Look up a cached entry; the concrete cache decides which of user/bucket
// is the actual map key.
71 virtual bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
// Find an entry and mutate it under the lru_map lock via ctx.
73 virtual bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, typename lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) = 0;
// Insert/replace a cached entry.
74 virtual void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
// Hook called after adjust_stats(); default is a no-op (the user cache
// overrides it to queue the bucket for background sync).
76 virtual void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) {}
// Constructor, destructor and the public cache API. Tokens preserved
// verbatim (mangled extraction; braces/sections missing from this view).
78 RGWQuotaCache(rgw::sal::Driver
* _driver
, int size
) : driver(_driver
), stats_map(size
) {
// Raw new: ownership released via put_wait() in the destructor.
79 async_refcount
= new RefCountedWaitObject
;
81 virtual ~RGWQuotaCache() {
82 async_refcount
->put_wait(); /* wait for all pending async requests to complete */
// Main entry point: serve stats from cache, refreshing as needed.
85 int get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, optional_yield y
,
86 const DoutPrefixProvider
* dpp
);
// Apply an in-memory delta after an object write/delete.
87 void adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
, uint64_t added_bytes
, uint64_t removed_bytes
);
// Store fresh stats and stamp expiration/refresh times.
89 void set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
);
// Kick off a background refresh for an entry nearing expiry.
90 int async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
);
// Completion callbacks for the background refresh.
91 void async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
);
92 void async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
);
// Abstract handle for one in-flight async stats fetch; concrete subclasses
// (bucket/user) double as the rados completion callbacks. Tokens preserved
// verbatim (mangled extraction).
94 class AsyncRefreshHandler
{
96 rgw::sal::Driver
* driver
;
// Back-pointer to the owning cache; non-owning.
97 RGWQuotaCache
<T
> *cache
;
99 AsyncRefreshHandler(rgw::sal::Driver
* _driver
, RGWQuotaCache
<T
> *_cache
) : driver(_driver
), cache(_cache
) {}
100 virtual ~AsyncRefreshHandler() {}
// Start the async fetch; on success the callback owns the reference.
102 virtual int init_fetch() = 0;
// Release the handler's refcount (used on init_fetch() failure).
103 virtual void drop_reference() = 0;
// Factory for the concrete handler type; implemented by each cache.
106 virtual AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) = 0;
// Launch a background refresh for (user, bucket), guarding against duplicate
// launches via the test-and-set UpdateContext. Tokens preserved verbatim;
// the template header, return statements and error branch are missing from
// this truncated view.
110 int RGWQuotaCache
<T
>::async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
)
112 /* protect against multiple updates */
113 StatsAsyncTestSet test_update
;
114 if (!map_find_and_update(user
, bucket
, &test_update
)) {
115 /* most likely we just raced with another update */
// Take a waitable reference so ~RGWQuotaCache blocks until this completes.
119 async_refcount
->get();
122 AsyncRefreshHandler
*handler
= allocate_refresh_handler(user
, bucket
);
124 int ret
= handler
->init_fetch();
// On init_fetch() failure (branch truncated here): undo the refcount and
// drop the handler's own reference.
126 async_refcount
->put();
127 handler
->drop_reference();
135 void RGWQuotaCache
<T
>::async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
)
137 ldout(driver
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
139 async_refcount
->put();
// Success callback for a background refresh: store the fetched stats into
// the cache and release the waitable reference. Tokens preserved verbatim
// (template header/braces truncated in this view).
143 void RGWQuotaCache
<T
>::async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
)
145 ldout(driver
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
147 RGWQuotaCacheStats qs
;
// Best-effort lookup of the existing entry; return value is deliberately
// ignored — set_stats() overwrites the timing fields either way.
149 map_find(user
, bucket
, qs
);
151 set_stats(user
, bucket
, qs
, stats
);
// Balance the async_refcount->get() taken in async_refresh().
153 async_refcount
->put();
// Store fresh stats in the cache and stamp the timing fields:
// expiration = now + rgw_bucket_quota_ttl, async refresh at half the TTL
// so a background refresh can land before the entry expires.
// Tokens preserved verbatim (mangled extraction).
157 void RGWQuotaCache
<T
>::set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
)
160 qs
.expiration
= ceph_clock_now();
161 qs
.async_refresh_time
= qs
.expiration
;
162 qs
.expiration
+= driver
->ctx()->_conf
->rgw_bucket_quota_ttl
;
// Refresh at TTL/2 so stale data is usually replaced before expiry.
163 qs
.async_refresh_time
+= driver
->ctx()->_conf
->rgw_bucket_quota_ttl
/ 2;
165 map_add(user
, bucket
, qs
);
// Serve stats from cache. If the cached entry is past its async-refresh
// time, fire a background refresh (errors there are non-fatal); if the
// entry is still unexpired, use it. Otherwise fall through to a synchronous
// backend fetch and re-cache the result. Tokens preserved verbatim; return
// statements and several braces are missing from this truncated view.
169 int RGWQuotaCache
<T
>::get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, optional_yield y
, const DoutPrefixProvider
* dpp
) {
170 RGWQuotaCacheStats qs
;
171 utime_t now
= ceph_clock_now();
172 if (map_find(user
, bucket
, qs
)) {
// sec() > 0 means no refresh is currently claimed for this entry.
173 if (qs
.async_refresh_time
.sec() > 0 && now
>= qs
.async_refresh_time
) {
174 int r
= async_refresh(user
, bucket
, qs
);
176 ldpp_dout(dpp
, 0) << "ERROR: quota async refresh returned ret=" << r
<< dendl
;
178 /* continue processing, might be a transient error, async refresh is just optimization */
// Cache hit that has not expired yet — serve it (body truncated here).
182 if (qs
.expiration
> ceph_clock_now()) {
// Cache miss / expired: synchronous fetch from the backend.
188 int ret
= fetch_stats_from_storage(user
, bucket
, stats
, y
, dpp
);
// ENOENT is treated as "no stats yet", not an error.
189 if (ret
< 0 && ret
!= -ENOENT
)
192 set_stats(user
, bucket
, qs
, stats
);
// UpdateContext that applies an in-place delta (object count, raw bytes,
// rounded bytes) to a cached entry under the lru_map lock. Each counter is
// clamped at zero rather than being allowed to wrap negative.
// Tokens preserved verbatim (mangled extraction; else-branches' framing and
// the final return are missing from this view).
199 class RGWQuotaStatsUpdate
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
200 const int objs_delta
;
201 const uint64_t added_bytes
;
202 const uint64_t removed_bytes
;
204 RGWQuotaStatsUpdate(const int objs_delta
,
205 const uint64_t added_bytes
,
206 const uint64_t removed_bytes
)
207 : objs_delta(objs_delta
),
208 added_bytes(added_bytes
),
209 removed_bytes(removed_bytes
) {
212 bool update(RGWQuotaCacheStats
* const entry
) override
{
// size_rounded tracks allocation-unit-rounded sizes, so the deltas are
// rounded the same way before being applied.
213 const uint64_t rounded_added
= rgw_rounded_objsize(added_bytes
);
214 const uint64_t rounded_removed
= rgw_rounded_objsize(removed_bytes
);
// Apply the delta only if the result stays non-negative (signed check
// guards the unsigned arithmetic); otherwise clamp to 0.
216 if (((int64_t)(entry
->stats
.size
+ added_bytes
- removed_bytes
)) >= 0) {
217 entry
->stats
.size
+= added_bytes
- removed_bytes
;
219 entry
->stats
.size
= 0;
222 if (((int64_t)(entry
->stats
.size_rounded
+ rounded_added
- rounded_removed
)) >= 0) {
223 entry
->stats
.size_rounded
+= rounded_added
- rounded_removed
;
225 entry
->stats
.size_rounded
= 0;
228 if (((int64_t)(entry
->stats
.num_objects
+ objs_delta
)) >= 0) {
229 entry
->stats
.num_objects
+= objs_delta
;
231 entry
->stats
.num_objects
= 0;
// Apply a write/delete delta to the cached entry (if present) and notify
// the subclass via data_modified() so it can schedule a backend sync.
// Tokens preserved verbatim (template header/braces truncated).
240 void RGWQuotaCache
<T
>::adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
,
241 uint64_t added_bytes
, uint64_t removed_bytes
)
243 RGWQuotaStatsUpdate
<T
> update(objs_delta
, added_bytes
, removed_bytes
);
// Best-effort: if the entry isn't cached there is nothing to adjust.
244 map_find_and_update(user
, bucket
, &update
);
246 data_modified(user
, bucket
);
// Async refresh handler for the bucket-keyed cache; doubles as the
// RGWGetBucketStats_CB completion callback. Tokens preserved verbatim
// (mangled extraction; access specifiers/member decls missing from view).
249 class BucketAsyncRefreshHandler
: public RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler
,
250 public RGWGetBucketStats_CB
{
253 BucketAsyncRefreshHandler(rgw::sal::Driver
* _driver
, RGWQuotaCache
<rgw_bucket
> *_cache
,
254 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
255 RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler(_driver
, _cache
),
256 RGWGetBucketStats_CB(_bucket
), user(_user
) {}
// put() releases the RefCountedObject reference from the CB base class.
258 void drop_reference() override
{ put(); }
259 void handle_response(int r
) override
;
260 int init_fetch() override
;
// Start the async bucket-stats fetch: resolve the bucket, skip indexless
// layouts (no stats to read), then issue read_stats_async with this object
// as the completion callback. Tokens preserved verbatim; braces, returns
// and error branches are missing from this truncated view.
263 int BucketAsyncRefreshHandler::init_fetch()
265 std::unique_ptr
<rgw::sal::Bucket
> rbucket
;
267 const DoutPrefix
dp(driver
->ctx(), dout_subsys
, "rgw bucket async refresh handler: ");
268 int r
= driver
->get_bucket(&dp
, nullptr, bucket
, &rbucket
, null_yield
);
270 ldpp_dout(&dp
, 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
274 ldpp_dout(&dp
, 20) << "initiating async quota refresh for bucket=" << bucket
<< dendl
;
276 const auto& index
= rbucket
->get_info().get_current_index();
// Indexless buckets have no bucket index to read stats from.
277 if (is_layout_indexless(index
)) {
281 r
= rbucket
->read_stats_async(&dp
, index
, RGW_NO_SHARD
, this);
// NOTE(review): message says "bucket info" but this is the stats read —
// consider rewording when untangling this region.
283 ldpp_dout(&dp
, 0) << "could not get bucket info for bucket=" << bucket
.name
<< dendl
;
285 /* read_stats_async() dropped our reference already */
// Completion callback: on error notify the cache of failure; on success
// aggregate the per-category stats into a single RGWStorageStats and hand
// it to the cache. Tokens preserved verbatim; the error-check framing, the
// `bs` declaration and `bs.size += s.size` appear truncated from this view.
292 void BucketAsyncRefreshHandler::handle_response(const int r
)
295 ldout(driver
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
// Failure path (condition truncated): releases the cache's async ref.
296 cache
->async_refresh_fail(user
, bucket
);
// Sum stats across all RGWObjCategory buckets.
302 for (const auto& pair
: *stats
) {
303 const RGWStorageStats
& s
= pair
.second
;
306 bs
.size_rounded
+= s
.size_rounded
;
307 bs
.num_objects
+= s
.num_objects
;
310 cache
->async_refresh_response(user
, bucket
, bs
);
// Concrete quota cache keyed by bucket: the `user` argument of the map_*
// hooks is ignored and the bucket is the lru_map key. Tokens preserved
// verbatim (mangled extraction; access specifiers/braces missing from view).
313 class RGWBucketStatsCache
: public RGWQuotaCache
<rgw_bucket
> {
315 bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
316 return stats_map
.find(bucket
, qs
);
319 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_bucket
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
320 return stats_map
.find_and_update(bucket
, NULL
, ctx
);
323 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
324 stats_map
.add(bucket
, qs
);
327 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, optional_yield y
, const DoutPrefixProvider
*dpp
) override
;
// Cache size comes from rgw_bucket_quota_cache_size.
330 explicit RGWBucketStatsCache(rgw::sal::Driver
* _driver
) : RGWQuotaCache
<rgw_bucket
>(_driver
, _driver
->ctx()->_conf
->rgw_bucket_quota_cache_size
) {
333 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
334 return new BucketAsyncRefreshHandler(driver
, this, user
, bucket
);
// Synchronous backend fetch for bucket quota stats: resolve user+bucket,
// read per-category bucket-index stats, and sum them into `stats`.
// Tokens preserved verbatim; returns, error branches, and the declarations
// of bucket_ver/master_ver are missing from this truncated view.
338 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user
& _u
, const rgw_bucket
& _b
, RGWStorageStats
& stats
, optional_yield y
, const DoutPrefixProvider
*dpp
)
340 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(_u
);
341 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
343 int r
= driver
->get_bucket(dpp
, user
.get(), _b
, &bucket
, y
);
345 ldpp_dout(dpp
, 0) << "could not get bucket info for bucket=" << _b
<< " r=" << r
<< dendl
;
// Reset the output accumulator before summing.
349 stats
= RGWStorageStats();
351 const auto& index
= bucket
->get_info().get_current_index();
// Indexless layout: no bucket index, so no stats to read.
352 if (is_layout_indexless(index
)) {
359 map
<RGWObjCategory
, RGWStorageStats
> bucket_stats
;
// NOTE(review): bucket_ver/master_ver are declared outside this view.
360 r
= bucket
->read_stats(dpp
, index
, RGW_NO_SHARD
, &bucket_ver
,
361 &master_ver
, bucket_stats
, nullptr);
363 ldpp_dout(dpp
, 0) << "could not get bucket stats for bucket="
// Aggregate all object categories into one total.
368 for (const auto& pair
: bucket_stats
) {
369 const RGWStorageStats
& s
= pair
.second
;
371 stats
.size
+= s
.size
;
372 stats
.size_rounded
+= s
.size_rounded
;
373 stats
.num_objects
+= s
.num_objects
;
// Async refresh handler for the user-keyed cache; doubles as the
// RGWGetUserStats_CB completion callback. Tokens preserved verbatim
// (mangled extraction; the bucket member init and access specifiers are
// missing from this view).
379 class UserAsyncRefreshHandler
: public RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler
,
380 public RGWGetUserStats_CB
{
381 const DoutPrefixProvider
*dpp
;
384 UserAsyncRefreshHandler(const DoutPrefixProvider
*_dpp
, rgw::sal::Driver
* _driver
, RGWQuotaCache
<rgw_user
> *_cache
,
385 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
386 RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler(_driver
, _cache
),
387 RGWGetUserStats_CB(_user
),
391 void drop_reference() override
{ put(); }
392 int init_fetch() override
;
393 void handle_response(int r
) override
;
// Start the async user-stats fetch with this object as the completion
// callback. Tokens preserved verbatim; braces/returns truncated from view.
396 int UserAsyncRefreshHandler::init_fetch()
398 std::unique_ptr
<rgw::sal::User
> ruser
= driver
->get_user(user
);
400 ldpp_dout(dpp
, 20) << "initiating async quota refresh for user=" << user
<< dendl
;
401 int r
= ruser
->read_stats_async(dpp
, this);
// NOTE(review): message says "bucket info" on a user-stats failure —
// misleading; consider rewording when untangling this region.
403 ldpp_dout(dpp
, 0) << "could not get bucket info for user=" << user
<< dendl
;
405 /* get_bucket_stats_async() dropped our reference already */
// Completion callback for the user-stats fetch: failure notifies the cache
// and releases its async ref; success hands the fetched stats over.
// Tokens preserved verbatim (error-check framing truncated from view).
412 void UserAsyncRefreshHandler::handle_response(int r
)
415 ldout(driver
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
416 cache
->async_refresh_fail(user
, bucket
);
420 cache
->async_refresh_response(user
, bucket
, stats
);
// User-keyed quota cache. Also owns two background threads: one syncing
// recently-modified buckets' stats into user headers, one periodically
// full-syncing all users. Tokens preserved verbatim (mangled extraction).
423 class RGWUserStatsCache
: public RGWQuotaCache
<rgw_user
> {
424 const DoutPrefixProvider
*dpp
;
// Set when shutting down; polled by both sync threads.
425 std::atomic
<bool> down_flag
= { false };
// Guards modified_buckets (shared for the racy pre-check in data_modified).
426 ceph::shared_mutex mutex
= ceph::make_shared_mutex("RGWUserStatsCache");
// Buckets touched since the last bucket-sync pass, with the owning user.
427 map
<rgw_bucket
, rgw_user
> modified_buckets
;
429 /* thread, sync recent modified buckets info */
430 class BucketsSyncThread
: public Thread
{
432 RGWUserStatsCache
*stats
;
434 ceph::mutex lock
= ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
435 ceph::condition_variable cond
;
438 BucketsSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
) {}
440 void *entry() override
{
441 ldout(cct
, 20) << "BucketsSyncThread: start" << dendl
;
443 map
<rgw_bucket
, rgw_user
> buckets
;
// Atomically take the pending set so syncing happens outside the lock.
445 stats
->swap_modified_buckets(buckets
);
447 for (map
<rgw_bucket
, rgw_user
>::iterator iter
= buckets
.begin(); iter
!= buckets
.end(); ++iter
) {
448 rgw_bucket bucket
= iter
->first
;
449 rgw_user
& user
= iter
->second
;
450 ldout(cct
, 20) << "BucketsSyncThread: sync user=" << user
<< " bucket=" << bucket
<< dendl
;
451 const DoutPrefix
dp(cct
, dout_subsys
, "rgw bucket sync thread: ");
452 int r
= stats
->sync_bucket(user
, bucket
, null_yield
, &dp
);
// Sync failures are logged but do not stop the pass.
454 ldout(cct
, 0) << "WARNING: sync_bucket() returned r=" << r
<< dendl
;
458 if (stats
->going_down())
// Interruptible sleep until the next pass (wait call truncated in view).
461 std::unique_lock locker
{lock
};
464 std::chrono::seconds(cct
->_conf
->rgw_user_quota_bucket_sync_interval
));
465 } while (!stats
->going_down());
466 ldout(cct
, 20) << "BucketsSyncThread: done" << dendl
;
// stop(): wake the thread so it notices going_down() (body truncated).
472 std::lock_guard l
{lock
};
// Periodic full user-stats sync thread. Tokens preserved verbatim
// (mangled extraction; comment/brace framing truncated from view).
478 * thread, full sync all users stats periodically
480 * only sync non idle users or ones that never got synced before, this is needed so that
481 * users that didn't have quota turned on before (or existed before the user objclass
482 * tracked stats) need to get their backend stats up to date.
484 class UserSyncThread
: public Thread
{
486 RGWUserStatsCache
*stats
;
488 ceph::mutex lock
= ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
489 ceph::condition_variable cond
;
492 UserSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
) {}
494 void *entry() override
{
495 ldout(cct
, 20) << "UserSyncThread: start" << dendl
;
497 const DoutPrefix
dp(cct
, dout_subsys
, "rgw user sync thread: ");
498 int ret
= stats
->sync_all_users(&dp
, null_yield
);
500 ldout(cct
, 5) << "ERROR: sync_all_users() returned ret=" << ret
<< dendl
;
503 if (stats
->going_down())
// Interruptible sleep between full-sync passes.
506 std::unique_lock l
{lock
};
507 cond
.wait_for(l
, std::chrono::seconds(cct
->_conf
->rgw_user_quota_sync_interval
));
508 } while (!stats
->going_down());
509 ldout(cct
, 20) << "UserSyncThread: done" << dendl
;
// stop(): wake the thread so it notices going_down() (body truncated).
515 std::lock_guard l
{lock
};
// Thread handles plus the user-keyed map_* overrides (the `bucket` argument
// is ignored; the user is the lru_map key) and sync entry points.
// Tokens preserved verbatim (mangled extraction).
520 BucketsSyncThread
*buckets_sync_thread
;
521 UserSyncThread
*user_sync_thread
;
523 bool map_find(const rgw_user
& user
,const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
524 return stats_map
.find(user
, qs
);
527 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_user
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
528 return stats_map
.find_and_update(user
, NULL
, ctx
);
531 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
532 stats_map
.add(user
, qs
);
535 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, optional_yield y
, const DoutPrefixProvider
*dpp
) override
;
// NOTE(review): parameter named `rgw_user` shadows the type name — legal
// but confusing; worth renaming when this region is untangled.
536 int sync_bucket(const rgw_user
& rgw_user
, rgw_bucket
& bucket
, optional_yield y
, const DoutPrefixProvider
*dpp
);
537 int sync_user(const DoutPrefixProvider
*dpp
, const rgw_user
& user
, optional_yield y
);
538 int sync_all_users(const DoutPrefixProvider
*dpp
, optional_yield y
);
// Queues the bucket for the background bucket-sync thread.
540 void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) override
;
// Hand the pending-bucket set to the sync thread under the lock.
542 void swap_modified_buckets(map
<rgw_bucket
, rgw_user
>& out
) {
543 std::unique_lock lock
{mutex
};
544 modified_buckets
.swap(out
);
547 template<class T
> /* easier doing it as a template, Thread doesn't have ->stop() */
548 void stop_thread(T
**pthr
) {
// Constructor (threads started only when quota_threads is set — condition
// truncated from this view), destructor, handler factory and stop().
// Tokens preserved verbatim (mangled extraction).
560 RGWUserStatsCache(const DoutPrefixProvider
*dpp
, rgw::sal::Driver
* _driver
, bool quota_threads
)
// NOTE(review): sizes the user cache from rgw_bucket_quota_cache_size —
// appears intentional upstream, but verify; there is no user-specific knob
// visible here.
561 : RGWQuotaCache
<rgw_user
>(_driver
, _driver
->ctx()->_conf
->rgw_bucket_quota_cache_size
), dpp(dpp
)
564 buckets_sync_thread
= new BucketsSyncThread(driver
->ctx(), this);
565 buckets_sync_thread
->create("rgw_buck_st_syn");
566 user_sync_thread
= new UserSyncThread(driver
->ctx(), this);
567 user_sync_thread
->create("rgw_user_st_syn");
// quota_threads disabled: leave both thread pointers null.
569 buckets_sync_thread
= NULL
;
570 user_sync_thread
= NULL
;
573 ~RGWUserStatsCache() override
{
577 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
578 return new UserAsyncRefreshHandler(dpp
, driver
, this, user
, bucket
);
// stop(): set down_flag (truncated from view) and join both threads.
588 std::unique_lock lock
{mutex
};
589 stop_thread(&buckets_sync_thread
);
591 stop_thread(&user_sync_thread
);
// Synchronous backend fetch of a user's aggregate stats from the user
// header. Tokens preserved verbatim; the optional_yield parameter line,
// braces and returns are missing from this truncated view.
595 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user
& _u
,
596 const rgw_bucket
& _b
,
597 RGWStorageStats
& stats
,
599 const DoutPrefixProvider
*dpp
)
601 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(_u
);
602 int r
= user
->read_stats(dpp
, y
, &stats
);
604 ldpp_dout(dpp
, 0) << "could not get user stats for user=" << user
<< dendl
;
// Sync one bucket's stats into the owning user's header, then check whether
// the bucket needs resharding. Tokens preserved verbatim; braces, returns
// and error branches are missing from this truncated view.
611 int RGWUserStatsCache::sync_bucket(const rgw_user
& _u
, rgw_bucket
& _b
, optional_yield y
, const DoutPrefixProvider
*dpp
)
613 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(_u
);
614 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
616 int r
= driver
->get_bucket(dpp
, user
.get(), _b
, &bucket
, y
);
618 ldpp_dout(dpp
, 0) << "could not get bucket info for bucket=" << _b
<< " r=" << r
<< dendl
;
// Push the bucket's current stats into the user's header object.
622 r
= bucket
->sync_user_stats(dpp
, y
);
624 ldpp_dout(dpp
, 0) << "ERROR: sync_user_stats() for user=" << _u
<< ", bucket=" << bucket
<< " returned " << r
<< dendl
;
628 return bucket
->check_bucket_shards(dpp
);
// Full-sync one user's stats: read the user header, skip idle users (no
// updates since the last sync) unless configured otherwise, then run the
// all-buckets stats sync. Tokens preserved verbatim; braces, returns and
// the when_need_full_sync comparison are missing from this truncated view.
631 int RGWUserStatsCache::sync_user(const DoutPrefixProvider
*dpp
, const rgw_user
& _u
, optional_yield y
)
633 RGWStorageStats stats
;
634 ceph::real_time last_stats_sync
;
635 ceph::real_time last_stats_update
;
636 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(rgw_user(_u
.to_str()));
638 int ret
= user
->read_stats(dpp
, y
, &stats
, &last_stats_sync
, &last_stats_update
);
640 ldpp_dout(dpp
, 5) << "ERROR: can't read user header: ret=" << ret
<< dendl
;
// Idle user: nothing written since the last sync; skip unless the
// rgw_user_quota_sync_idle_users knob forces a sync anyway.
644 if (!driver
->ctx()->_conf
->rgw_user_quota_sync_idle_users
&&
645 last_stats_update
< last_stats_sync
) {
646 ldpp_dout(dpp
, 20) << "user is idle, not doing a full sync (user=" << user
<< ")" << dendl
;
650 real_time when_need_full_sync
= last_stats_sync
;
651 when_need_full_sync
+= make_timespan(driver
->ctx()->_conf
->rgw_user_quota_sync_wait_time
);
653 // check if enough time passed since last full sync
654 /* FIXME: missing check? */
656 ret
= rgw_user_sync_all_stats(dpp
, driver
, user
.get(), y
);
658 ldpp_dout(dpp
, 0) << "ERROR: failed user stats sync, ret=" << ret
<< dendl
;
// Iterate all user metadata keys (paged via meta_list_keys_next) and run
// sync_user() on each, stopping early on shutdown. Tokens preserved
// verbatim; the `key`/`handle`/`keys`/`max`/`truncated` declarations, loop
// framing and returns are missing from this truncated view.
665 int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider
*dpp
, optional_yield y
)
670 int ret
= driver
->meta_list_keys_init(dpp
, key
, string(), &handle
);
672 ldpp_dout(dpp
, 10) << "ERROR: can't get key: ret=" << ret
<< dendl
;
681 ret
= driver
->meta_list_keys_next(dpp
, handle
, max
, keys
, &truncated
);
683 ldpp_dout(dpp
, 0) << "ERROR: lists_keys_next(): ret=" << ret
<< dendl
;
686 for (list
<string
>::iterator iter
= keys
.begin();
// Bail out of the pass promptly when shutting down.
687 iter
!= keys
.end() && !going_down();
689 rgw_user
user(*iter
);
690 ldpp_dout(dpp
, 20) << "RGWUserStatsCache: sync user=" << user
<< dendl
;
691 int ret
= sync_user(dpp
, user
, y
);
// Per-user failures are logged but do not abort the pass.
693 ldpp_dout(dpp
, 5) << "ERROR: sync_user() failed, user=" << user
<< " ret=" << ret
<< dendl
;
695 /* continuing to next user */
703 driver
->meta_list_keys_complete(handle
);
// Record that a bucket was modified so the bucket-sync thread will push its
// stats into the user header. Uses a shared-lock pre-check, then re-checks
// under the exclusive lock (re-check truncated from this view).
// Tokens preserved verbatim; the lock_shared() call is also missing here.
707 void RGWUserStatsCache::data_modified(const rgw_user
& user
, rgw_bucket
& bucket
)
709 /* racy, but it's ok */
711 bool need_update
= modified_buckets
.find(bucket
) == modified_buckets
.end();
712 mutex
.unlock_shared();
715 std::unique_lock lock
{mutex
};
716 modified_buckets
[bucket
] = user
;
// Strategy interface for quota-limit checks; concrete singletons below
// differ in whether they compare raw or rounded sizes. Tokens preserved
// verbatim (mangled extraction).
721 class RGWQuotaInfoApplier
{
722 /* NOTE: no non-static field allowed as instances are supposed to live in
723 * the static memory only. */
725 RGWQuotaInfoApplier() = default;
728 virtual ~RGWQuotaInfoApplier() {}
730 virtual bool is_size_exceeded(const DoutPrefixProvider
*dpp
,
731 const char * const entity
,
732 const RGWQuotaInfo
& qinfo
,
733 const RGWStorageStats
& stats
,
734 const uint64_t size
) const = 0;
736 virtual bool is_num_objs_exceeded(const DoutPrefixProvider
*dpp
,
737 const char * const entity
,
738 const RGWQuotaInfo
& qinfo
,
739 const RGWStorageStats
& stats
,
740 const uint64_t num_objs
) const = 0;
// Selects the raw or default applier based on qinfo.check_on_raw.
742 static const RGWQuotaInfoApplier
& get_instance(const RGWQuotaInfo
& qinfo
);
// Default applier: size checks use allocation-unit-rounded sizes.
// Tokens preserved verbatim (mangled extraction).
745 class RGWQuotaInfoDefApplier
: public RGWQuotaInfoApplier
{
747 bool is_size_exceeded(const DoutPrefixProvider
*dpp
, const char * const entity
,
748 const RGWQuotaInfo
& qinfo
,
749 const RGWStorageStats
& stats
,
750 const uint64_t size
) const override
;
752 bool is_num_objs_exceeded(const DoutPrefixProvider
*dpp
, const char * const entity
,
753 const RGWQuotaInfo
& qinfo
,
754 const RGWStorageStats
& stats
,
755 const uint64_t num_objs
) const override
;
// Raw applier: size checks use exact byte counts (check_on_raw == true).
// Tokens preserved verbatim (mangled extraction).
758 class RGWQuotaInfoRawApplier
: public RGWQuotaInfoApplier
{
760 bool is_size_exceeded(const DoutPrefixProvider
*dpp
, const char * const entity
,
761 const RGWQuotaInfo
& qinfo
,
762 const RGWStorageStats
& stats
,
763 const uint64_t size
) const override
;
765 bool is_num_objs_exceeded(const DoutPrefixProvider
*dpp
, const char * const entity
,
766 const RGWQuotaInfo
& qinfo
,
767 const RGWStorageStats
& stats
,
768 const uint64_t num_objs
) const override
;
// True when adding `size` bytes (rounded to the allocation unit) would
// exceed max_size, comparing against the rounded usage. A negative
// max_size disables the limit. std::cmp_greater handles the unsigned-vs-
// signed comparison safely. Tokens preserved verbatim; braces and return
// statements are missing from this truncated view.
772 bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider
*dpp
,
773 const char * const entity
,
774 const RGWQuotaInfo
& qinfo
,
775 const RGWStorageStats
& stats
,
776 const uint64_t size
) const
778 if (qinfo
.max_size
< 0) {
779 /* The limit is not enabled. */
783 const uint64_t cur_size
= stats
.size_rounded
;
784 const uint64_t new_size
= rgw_rounded_objsize(size
);
786 if (std::cmp_greater(cur_size
+ new_size
, qinfo
.max_size
)) {
787 ldpp_dout(dpp
, 10) << "quota exceeded: stats.size_rounded=" << stats
.size_rounded
788 << " size=" << new_size
<< " "
789 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
// True when adding `num_objs` objects would exceed max_objects; negative
// max_objects disables the limit. Tokens preserved verbatim; braces and
// returns are missing from this truncated view.
796 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider
*dpp
,
797 const char * const entity
,
798 const RGWQuotaInfo
& qinfo
,
799 const RGWStorageStats
& stats
,
800 const uint64_t num_objs
) const
802 if (qinfo
.max_objects
< 0) {
803 /* The limit is not enabled. */
807 if (std::cmp_greater(stats
.num_objects
+ num_objs
, qinfo
.max_objects
)) {
808 ldpp_dout(dpp
, 10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
809 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
// Raw variant: compares exact byte counts (stats.size, unrounded `size`)
// against max_size. Tokens preserved verbatim; braces and returns missing
// from this truncated view.
817 bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider
*dpp
,
818 const char * const entity
,
819 const RGWQuotaInfo
& qinfo
,
820 const RGWStorageStats
& stats
,
821 const uint64_t size
) const
823 if (qinfo
.max_size
< 0) {
824 /* The limit is not enabled. */
828 const uint64_t cur_size
= stats
.size
;
830 if (std::cmp_greater(cur_size
+ size
, qinfo
.max_size
)) {
831 ldpp_dout(dpp
, 10) << "quota exceeded: stats.size=" << stats
.size
832 << " size=" << size
<< " "
833 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
// Object-count check for the raw applier; identical logic to the default
// applier's version. Tokens preserved verbatim; braces and returns missing
// from this truncated view.
840 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider
*dpp
,
841 const char * const entity
,
842 const RGWQuotaInfo
& qinfo
,
843 const RGWStorageStats
& stats
,
844 const uint64_t num_objs
) const
846 if (qinfo
.max_objects
< 0) {
847 /* The limit is not enabled. */
851 if (std::cmp_greater(stats
.num_objects
+ num_objs
, qinfo
.max_objects
)) {
852 ldpp_dout(dpp
, 10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
853 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
// Return the raw applier when check_on_raw is set, otherwise the default
// applier. Both are function-local statics (instances live in static
// storage, per the base-class note). Tokens preserved verbatim; the raw
// return branch is missing from this truncated view.
861 const RGWQuotaInfoApplier
& RGWQuotaInfoApplier::get_instance(
862 const RGWQuotaInfo
& qinfo
)
864 static RGWQuotaInfoDefApplier default_qapplier
;
865 static RGWQuotaInfoRawApplier raw_qapplier
;
867 if (qinfo
.check_on_raw
) {
870 return default_qapplier
;
// Concrete RGWQuotaHandler: owns the bucket and user stats caches and
// implements the quota checks. This private check_quota() overload tests
// one quota (bucket or user) against one stats snapshot. Tokens preserved
// verbatim (mangled extraction; returns/braces missing from this view).
875 class RGWQuotaHandlerImpl
: public RGWQuotaHandler
{
876 rgw::sal::Driver
* driver
;
877 RGWBucketStatsCache bucket_stats_cache
;
878 RGWUserStatsCache user_stats_cache
;
880 int check_quota(const DoutPrefixProvider
*dpp
,
881 const char * const entity
,
882 const RGWQuotaInfo
& quota
,
883 const RGWStorageStats
& stats
,
884 const uint64_t num_objs
,
885 const uint64_t size
) {
886 if (!quota
.enabled
) {
// Raw vs rounded comparison strategy chosen by quota.check_on_raw.
890 const auto& quota_applier
= RGWQuotaInfoApplier::get_instance(quota
);
892 ldpp_dout(dpp
, 20) << entity
893 << " quota: max_objects=" << quota
.max_objects
894 << " max_size=" << quota
.max_size
<< dendl
;
897 if (quota_applier
.is_num_objs_exceeded(dpp
, entity
, quota
, stats
, num_objs
)) {
898 return -ERR_QUOTA_EXCEEDED
;
901 if (quota_applier
.is_size_exceeded(dpp
, entity
, quota
, stats
, size
)) {
902 return -ERR_QUOTA_EXCEEDED
;
905 ldpp_dout(dpp
, 20) << entity
<< " quota OK:"
906 << " stats.num_objects=" << stats
.num_objects
907 << " stats.size=" << stats
.size
<< dendl
;
// Public constructor: wires both caches to the same driver; quota_threads
// controls whether the user cache starts its background sync threads.
911 RGWQuotaHandlerImpl(const DoutPrefixProvider
*dpp
, rgw::sal::Driver
* _driver
, bool quota_threads
) : driver(_driver
),
912 bucket_stats_cache(_driver
),
913 user_stats_cache(dpp
, _driver
, quota_threads
) {}
// Public check_quota override: evaluates bucket quota then user quota for
// one pending write of num_objs/size. Tokens preserved verbatim; the
// quota/bucket/num_objs parameter lines, returns and error branches are
// missing from this truncated view.
915 int check_quota(const DoutPrefixProvider
*dpp
,
916 const rgw_user
& user
,
920 uint64_t size
, optional_yield y
) override
{
922 if (!quota
.bucket_quota
.enabled
&& !quota
.user_quota
.enabled
) {
927 * we need to fetch bucket stats if the user quota is enabled, because
928 * the whole system relies on us periodically updating the user's bucket
929 * stats in the user's header, this happens in get_stats() if we actually
930 * fetch that info and not rely on cached data
933 const DoutPrefix
dp(driver
->ctx(), dout_subsys
, "rgw quota handler: ");
934 if (quota
.bucket_quota
.enabled
) {
935 RGWStorageStats bucket_stats
;
936 int ret
= bucket_stats_cache
.get_stats(user
, bucket
, bucket_stats
, y
, &dp
);
940 ret
= check_quota(dpp
, "bucket", quota
.bucket_quota
, bucket_stats
, num_objs
, size
);
946 if (quota
.user_quota
.enabled
) {
947 RGWStorageStats user_stats
;
948 int ret
= user_stats_cache
.get_stats(user
, bucket
, user_stats
, y
, &dp
);
952 ret
= check_quota(dpp
, "user", quota
.user_quota
, user_stats
, num_objs
, size
);
// Propagate a write/delete delta into both caches so quota decisions stay
// current without waiting for a backend refresh. Tokens preserved verbatim.
960 void update_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int obj_delta
, uint64_t added_bytes
, uint64_t removed_bytes
) override
{
961 bucket_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
962 user_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
// Decide whether a bucket needs resharding: true when the object count
// exceeds num_shards * max_objs_per_shard, with a suggested new shard
// count scaled by obj_multiplier (raised for multisite — that branch is
// truncated from this view). Tokens preserved verbatim.
965 void check_bucket_shards(const DoutPrefixProvider
*dpp
, uint64_t max_objs_per_shard
,
966 uint64_t num_shards
, uint64_t num_objs
, bool is_multisite
,
967 bool& need_resharding
, uint32_t *suggested_num_shards
) override
969 if (num_objs
> num_shards
* max_objs_per_shard
) {
970 ldpp_dout(dpp
, 0) << __func__
<< ": resharding needed: stats.num_objects=" << num_objs
971 << " shard max_objects=" << max_objs_per_shard
* num_shards
<< dendl
;
972 need_resharding
= true;
973 if (suggested_num_shards
) {
974 uint32_t obj_multiplier
= 2;
976 // if we're maintaining bilogs for multisite, reshards are significantly
977 // more expensive. scale up the shard count much faster to minimize the
978 // number of reshard events during a write workload
// Overshoot the current count so the next reshard is far off.
981 *suggested_num_shards
= num_objs
* obj_multiplier
/ max_objs_per_shard
;
984 need_resharding
= false;
// Factory / disposal pair for the quota handler. free_handler()'s body is
// missing from this truncated view (presumably deletes the handler — TODO
// confirm against the full source). Tokens preserved verbatim.
990 RGWQuotaHandler
*RGWQuotaHandler::generate_handler(const DoutPrefixProvider
*dpp
, rgw::sal::Driver
* driver
, bool quota_threads
)
992 return new RGWQuotaHandlerImpl(dpp
, driver
, quota_threads
);
995 void RGWQuotaHandler::free_handler(RGWQuotaHandler
*handler
)
1001 void rgw_apply_default_bucket_quota(RGWQuotaInfo
& quota
, const ConfigProxy
& conf
)
1003 if (conf
->rgw_bucket_default_quota_max_objects
>= 0) {
1004 quota
.max_objects
= conf
->rgw_bucket_default_quota_max_objects
;
1005 quota
.enabled
= true;
1007 if (conf
->rgw_bucket_default_quota_max_size
>= 0) {
1008 quota
.max_size
= conf
->rgw_bucket_default_quota_max_size
;
1009 quota
.enabled
= true;
1013 void rgw_apply_default_user_quota(RGWQuotaInfo
& quota
, const ConfigProxy
& conf
)
1015 if (conf
->rgw_user_default_quota_max_objects
>= 0) {
1016 quota
.max_objects
= conf
->rgw_user_default_quota_max_objects
;
1017 quota
.enabled
= true;
1019 if (conf
->rgw_user_default_quota_max_size
>= 0) {
1020 quota
.max_size
= conf
->rgw_user_default_quota_max_size
;
1021 quota
.enabled
= true;
// Formatter dump of the quota settings. Emits both max_size (bytes) and
// the legacy max_size_kb for backward compatibility of the output format.
// Tokens preserved verbatim (mangled extraction; braces missing from view).
1025 void RGWQuotaInfo::dump(Formatter
*f
) const
1027 f
->dump_bool("enabled", enabled
);
1028 f
->dump_bool("check_on_raw", check_on_raw
);
1030 f
->dump_int("max_size", max_size
);
1031 f
->dump_int("max_size_kb", rgw_rounded_kb(max_size
));
1032 f
->dump_int("max_objects", max_objects
);
1035 void RGWQuotaInfo::decode_json(JSONObj
*obj
)
1037 if (false == JSONDecoder::decode_json("max_size", max_size
, obj
)) {
1038 /* We're parsing an older version of the struct. */
1039 int64_t max_size_kb
= 0;
1041 JSONDecoder::decode_json("max_size_kb", max_size_kb
, obj
);
1042 max_size
= max_size_kb
* 1024;
1044 JSONDecoder::decode_json("max_objects", max_objects
, obj
);
1046 JSONDecoder::decode_json("check_on_raw", check_on_raw
, obj
);
1047 JSONDecoder::decode_json("enabled", enabled
, obj
);