1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "include/utime.h"
17 #include "common/lru_map.h"
18 #include "common/RefCountedObj.h"
19 #include "common/Thread.h"
20 #include "common/Mutex.h"
21 #include "common/RWLock.h"
23 #include "rgw_common.h"
24 #include "rgw_rados.h"
25 #include "rgw_quota.h"
26 #include "rgw_bucket.h"
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_rgw
35 struct RGWQuotaCacheStats
{
36 RGWStorageStats stats
;
38 utime_t async_refresh_time
;
45 lru_map
<T
, RGWQuotaCacheStats
> stats_map
;
46 RefCountedWaitObject
*async_refcount
;
48 class StatsAsyncTestSet
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
51 uint64_t removed_bytes
;
53 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
54 bool update(RGWQuotaCacheStats
*entry
) override
{
55 if (entry
->async_refresh_time
.sec() == 0)
58 entry
->async_refresh_time
= utime_t(0, 0);
64 virtual int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) = 0;
66 virtual bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
68 virtual bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, typename lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) = 0;
69 virtual void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
71 virtual void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) {}
73 RGWQuotaCache(RGWRados
*_store
, int size
) : store(_store
), stats_map(size
) {
74 async_refcount
= new RefCountedWaitObject
;
76 virtual ~RGWQuotaCache() {
77 async_refcount
->put_wait(); /* wait for all pending async requests to complete */
80 int get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, RGWQuotaInfo
& quota
);
81 void adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
, uint64_t added_bytes
, uint64_t removed_bytes
);
83 virtual bool can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& stats
);
85 void set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
);
86 int async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
);
87 void async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
);
88 void async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
);
90 class AsyncRefreshHandler
{
93 RGWQuotaCache
<T
> *cache
;
95 AsyncRefreshHandler(RGWRados
*_store
, RGWQuotaCache
<T
> *_cache
) : store(_store
), cache(_cache
) {}
96 virtual ~AsyncRefreshHandler() {}
98 virtual int init_fetch() = 0;
99 virtual void drop_reference() = 0;
102 virtual AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) = 0;
106 bool RGWQuotaCache
<T
>::can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& cached_stats
)
108 if (quota
.max_size
>= 0) {
109 if (quota
.max_size_soft_threshold
< 0) {
110 quota
.max_size_soft_threshold
= quota
.max_size
* store
->ctx()->_conf
->rgw_bucket_quota_soft_threshold
;
113 const auto cached_stats_num_kb_rounded
= rgw_rounded_kb(cached_stats
.size_rounded
);
114 if (cached_stats_num_kb_rounded
>= (uint64_t)quota
.max_size_soft_threshold
) {
115 ldout(store
->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
116 << cached_stats_num_kb_rounded
<< " >= " << quota
.max_size_soft_threshold
<< dendl
;
121 if (quota
.max_objects
>= 0) {
122 if (quota
.max_objs_soft_threshold
< 0) {
123 quota
.max_objs_soft_threshold
= quota
.max_objects
* store
->ctx()->_conf
->rgw_bucket_quota_soft_threshold
;
126 if (cached_stats
.num_objects
>= (uint64_t)quota
.max_objs_soft_threshold
) {
127 ldout(store
->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
128 << cached_stats
.num_objects
<< " >= " << quota
.max_objs_soft_threshold
<< dendl
;
137 int RGWQuotaCache
<T
>::async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
)
139 /* protect against multiple updates */
140 StatsAsyncTestSet test_update
;
141 if (!map_find_and_update(user
, bucket
, &test_update
)) {
142 /* most likely we just raced with another update */
146 async_refcount
->get();
149 AsyncRefreshHandler
*handler
= allocate_refresh_handler(user
, bucket
);
151 int ret
= handler
->init_fetch();
153 async_refcount
->put();
154 handler
->drop_reference();
162 void RGWQuotaCache
<T
>::async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
)
164 ldout(store
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
166 async_refcount
->put();
170 void RGWQuotaCache
<T
>::async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
)
172 ldout(store
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
174 RGWQuotaCacheStats qs
;
176 map_find(user
, bucket
, qs
);
178 set_stats(user
, bucket
, qs
, stats
);
180 async_refcount
->put();
184 void RGWQuotaCache
<T
>::set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
)
187 qs
.expiration
= ceph_clock_now();
188 qs
.async_refresh_time
= qs
.expiration
;
189 qs
.expiration
+= store
->ctx()->_conf
->rgw_bucket_quota_ttl
;
190 qs
.async_refresh_time
+= store
->ctx()->_conf
->rgw_bucket_quota_ttl
/ 2;
192 map_add(user
, bucket
, qs
);
196 int RGWQuotaCache
<T
>::get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, RGWQuotaInfo
& quota
) {
197 RGWQuotaCacheStats qs
;
198 utime_t now
= ceph_clock_now();
199 if (map_find(user
, bucket
, qs
)) {
200 if (qs
.async_refresh_time
.sec() > 0 && now
>= qs
.async_refresh_time
) {
201 int r
= async_refresh(user
, bucket
, qs
);
203 ldout(store
->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r
<< dendl
;
205 /* continue processing, might be a transient error, async refresh is just optimization */
209 if (can_use_cached_stats(quota
, qs
.stats
) && qs
.expiration
>
216 int ret
= fetch_stats_from_storage(user
, bucket
, stats
);
217 if (ret
< 0 && ret
!= -ENOENT
)
220 set_stats(user
, bucket
, qs
, stats
);
227 class RGWQuotaStatsUpdate
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
228 const int objs_delta
;
229 const uint64_t added_bytes
;
230 const uint64_t removed_bytes
;
232 RGWQuotaStatsUpdate(const int objs_delta
,
233 const uint64_t added_bytes
,
234 const uint64_t removed_bytes
)
235 : objs_delta(objs_delta
),
236 added_bytes(added_bytes
),
237 removed_bytes(removed_bytes
) {
240 bool update(RGWQuotaCacheStats
* const entry
) override
{
241 const uint64_t rounded_added
= rgw_rounded_objsize(added_bytes
);
242 const uint64_t rounded_removed
= rgw_rounded_objsize(removed_bytes
);
244 if (((int64_t)(entry
->stats
.size
+ added_bytes
- removed_bytes
)) >= 0) {
245 entry
->stats
.size
+= added_bytes
- removed_bytes
;
247 entry
->stats
.size
= 0;
250 if (((int64_t)(entry
->stats
.size_rounded
+ rounded_added
- rounded_removed
)) >= 0) {
251 entry
->stats
.size_rounded
+= rounded_added
- rounded_removed
;
253 entry
->stats
.size_rounded
= 0;
256 if (((int64_t)(entry
->stats
.num_objects
+ objs_delta
)) >= 0) {
257 entry
->stats
.num_objects
+= objs_delta
;
259 entry
->stats
.num_objects
= 0;
268 void RGWQuotaCache
<T
>::adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
,
269 uint64_t added_bytes
, uint64_t removed_bytes
)
271 RGWQuotaStatsUpdate
<T
> update(objs_delta
, added_bytes
, removed_bytes
);
272 map_find_and_update(user
, bucket
, &update
);
274 data_modified(user
, bucket
);
277 class BucketAsyncRefreshHandler
: public RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler
,
278 public RGWGetBucketStats_CB
{
281 BucketAsyncRefreshHandler(RGWRados
*_store
, RGWQuotaCache
<rgw_bucket
> *_cache
,
282 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
283 RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler(_store
, _cache
),
284 RGWGetBucketStats_CB(_bucket
), user(_user
) {}
286 void drop_reference() override
{ put(); }
287 void handle_response(int r
) override
;
288 int init_fetch() override
;
291 int BucketAsyncRefreshHandler::init_fetch()
293 RGWBucketInfo bucket_info
;
295 RGWObjectCtx
obj_ctx(store
);
297 int r
= store
->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, NULL
, NULL
);
299 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
303 ldout(store
->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket
<< dendl
;
305 r
= store
->get_bucket_stats_async(bucket_info
, RGW_NO_SHARD
, this);
307 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
.name
<< dendl
;
309 /* get_bucket_stats_async() dropped our reference already */
316 void BucketAsyncRefreshHandler::handle_response(const int r
)
319 ldout(store
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
320 cache
->async_refresh_fail(user
, bucket
);
326 for (const auto& pair
: *stats
) {
327 const RGWStorageStats
& s
= pair
.second
;
330 bs
.size_rounded
+= s
.size_rounded
;
331 bs
.num_objects
+= s
.num_objects
;
334 cache
->async_refresh_response(user
, bucket
, bs
);
337 class RGWBucketStatsCache
: public RGWQuotaCache
<rgw_bucket
> {
339 bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
340 return stats_map
.find(bucket
, qs
);
343 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_bucket
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
344 return stats_map
.find_and_update(bucket
, NULL
, ctx
);
347 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
348 stats_map
.add(bucket
, qs
);
351 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) override
;
354 explicit RGWBucketStatsCache(RGWRados
*_store
) : RGWQuotaCache
<rgw_bucket
>(_store
, _store
->ctx()->_conf
->rgw_bucket_quota_cache_size
) {
357 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
358 return new BucketAsyncRefreshHandler(store
, this, user
, bucket
);
362 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
)
364 RGWBucketInfo bucket_info
;
366 RGWObjectCtx
obj_ctx(store
);
368 int r
= store
->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, NULL
, NULL
);
370 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
377 map
<RGWObjCategory
, RGWStorageStats
> bucket_stats
;
378 r
= store
->get_bucket_stats(bucket_info
, RGW_NO_SHARD
, &bucket_ver
,
379 &master_ver
, bucket_stats
, nullptr);
381 ldout(store
->ctx(), 0) << "could not get bucket stats for bucket="
382 << bucket
.name
<< dendl
;
386 stats
= RGWStorageStats();
388 for (const auto& pair
: bucket_stats
) {
389 const RGWStorageStats
& s
= pair
.second
;
391 stats
.size
+= s
.size
;
392 stats
.size_rounded
+= s
.size_rounded
;
393 stats
.num_objects
+= s
.num_objects
;
399 class UserAsyncRefreshHandler
: public RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler
,
400 public RGWGetUserStats_CB
{
403 UserAsyncRefreshHandler(RGWRados
*_store
, RGWQuotaCache
<rgw_user
> *_cache
,
404 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
405 RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler(_store
, _cache
),
406 RGWGetUserStats_CB(_user
),
409 void drop_reference() override
{ put(); }
410 int init_fetch() override
;
411 void handle_response(int r
) override
;
414 int UserAsyncRefreshHandler::init_fetch()
416 ldout(store
->ctx(), 20) << "initiating async quota refresh for user=" << user
<< dendl
;
417 int r
= store
->get_user_stats_async(user
, this);
419 ldout(store
->ctx(), 0) << "could not get bucket info for user=" << user
<< dendl
;
421 /* get_bucket_stats_async() dropped our reference already */
428 void UserAsyncRefreshHandler::handle_response(int r
)
431 ldout(store
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
432 cache
->async_refresh_fail(user
, bucket
);
436 cache
->async_refresh_response(user
, bucket
, stats
);
439 class RGWUserStatsCache
: public RGWQuotaCache
<rgw_user
> {
440 std::atomic
<bool> down_flag
= { false };
442 map
<rgw_bucket
, rgw_user
> modified_buckets
;
444 /* thread, sync recent modified buckets info */
445 class BucketsSyncThread
: public Thread
{
447 RGWUserStatsCache
*stats
;
453 BucketsSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
), lock("RGWUserStatsCache::BucketsSyncThread") {}
455 void *entry() override
{
456 ldout(cct
, 20) << "BucketsSyncThread: start" << dendl
;
458 map
<rgw_bucket
, rgw_user
> buckets
;
460 stats
->swap_modified_buckets(buckets
);
462 for (map
<rgw_bucket
, rgw_user
>::iterator iter
= buckets
.begin(); iter
!= buckets
.end(); ++iter
) {
463 rgw_bucket bucket
= iter
->first
;
464 rgw_user
& user
= iter
->second
;
465 ldout(cct
, 20) << "BucketsSyncThread: sync user=" << user
<< " bucket=" << bucket
<< dendl
;
466 int r
= stats
->sync_bucket(user
, bucket
);
468 ldout(cct
, 0) << "WARNING: sync_bucket() returned r=" << r
<< dendl
;
472 if (stats
->going_down())
476 cond
.WaitInterval(lock
, utime_t(cct
->_conf
->rgw_user_quota_bucket_sync_interval
, 0));
478 } while (!stats
->going_down());
479 ldout(cct
, 20) << "BucketsSyncThread: done" << dendl
;
485 Mutex::Locker
l(lock
);
491 * thread, full sync all users stats periodically
493 * only sync non idle users or ones that never got synced before, this is needed so that
494 * users that didn't have quota turned on before (or existed before the user objclass
495 * tracked stats) need to get their backend stats up to date.
497 class UserSyncThread
: public Thread
{
499 RGWUserStatsCache
*stats
;
505 UserSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
), lock("RGWUserStatsCache::UserSyncThread") {}
507 void *entry() override
{
508 ldout(cct
, 20) << "UserSyncThread: start" << dendl
;
510 int ret
= stats
->sync_all_users();
512 ldout(cct
, 5) << "ERROR: sync_all_users() returned ret=" << ret
<< dendl
;
515 if (stats
->going_down())
519 cond
.WaitInterval(lock
, utime_t(cct
->_conf
->rgw_user_quota_sync_interval
, 0));
521 } while (!stats
->going_down());
522 ldout(cct
, 20) << "UserSyncThread: done" << dendl
;
528 Mutex::Locker
l(lock
);
533 BucketsSyncThread
*buckets_sync_thread
;
534 UserSyncThread
*user_sync_thread
;
536 bool map_find(const rgw_user
& user
,const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
537 return stats_map
.find(user
, qs
);
540 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_user
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
541 return stats_map
.find_and_update(user
, NULL
, ctx
);
544 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
545 stats_map
.add(user
, qs
);
548 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) override
;
549 int sync_bucket(const rgw_user
& rgw_user
, rgw_bucket
& bucket
);
550 int sync_user(const rgw_user
& user
);
551 int sync_all_users();
553 void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) override
;
555 void swap_modified_buckets(map
<rgw_bucket
, rgw_user
>& out
) {
557 modified_buckets
.swap(out
);
561 template<class T
> /* easier doing it as a template, Thread doesn't have ->stop() */
562 void stop_thread(T
**pthr
) {
574 RGWUserStatsCache(RGWRados
*_store
, bool quota_threads
) : RGWQuotaCache
<rgw_user
>(_store
, _store
->ctx()->_conf
->rgw_bucket_quota_cache_size
),
575 rwlock("RGWUserStatsCache::rwlock") {
577 buckets_sync_thread
= new BucketsSyncThread(store
->ctx(), this);
578 buckets_sync_thread
->create("rgw_buck_st_syn");
579 user_sync_thread
= new UserSyncThread(store
->ctx(), this);
580 user_sync_thread
->create("rgw_user_st_syn");
582 buckets_sync_thread
= NULL
;
583 user_sync_thread
= NULL
;
586 ~RGWUserStatsCache() override
{
590 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
591 return new UserAsyncRefreshHandler(store
, this, user
, bucket
);
594 bool can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& stats
) override
{
595 /* in the user case, the cached stats may contain a better estimation of the totals, as
596 * the backend is only periodically getting updated.
608 stop_thread(&buckets_sync_thread
);
610 stop_thread(&user_sync_thread
);
614 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
)
616 int r
= store
->get_user_stats(user
, stats
);
618 ldout(store
->ctx(), 0) << "could not get user stats for user=" << user
<< dendl
;
625 int RGWUserStatsCache::sync_bucket(const rgw_user
& user
, rgw_bucket
& bucket
)
627 RGWBucketInfo bucket_info
;
629 RGWObjectCtx
obj_ctx(store
);
631 int r
= store
->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, NULL
, NULL
);
633 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
637 r
= rgw_bucket_sync_user_stats(store
, user
, bucket_info
);
639 ldout(store
->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user
<< ", bucket=" << bucket
<< " returned " << r
<< dendl
;
646 int RGWUserStatsCache::sync_user(const rgw_user
& user
)
648 cls_user_header header
;
649 string user_str
= user
.to_str();
650 int ret
= store
->cls_user_get_header(user_str
, &header
);
652 ldout(store
->ctx(), 5) << "ERROR: can't read user header: ret=" << ret
<< dendl
;
656 if (!store
->ctx()->_conf
->rgw_user_quota_sync_idle_users
&&
657 header
.last_stats_update
< header
.last_stats_sync
) {
658 ldout(store
->ctx(), 20) << "user is idle, not doing a full sync (user=" << user
<< ")" << dendl
;
662 real_time when_need_full_sync
= header
.last_stats_sync
;
663 when_need_full_sync
+= make_timespan(store
->ctx()->_conf
->rgw_user_quota_sync_wait_time
);
665 // check if enough time passed since last full sync
666 /* FIXME: missing check? */
668 ret
= rgw_user_sync_all_stats(store
, user
);
670 ldout(store
->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret
<< dendl
;
677 int RGWUserStatsCache::sync_all_users()
682 int ret
= store
->meta_mgr
->list_keys_init(key
, &handle
);
684 ldout(store
->ctx(), 10) << "ERROR: can't get key: ret=" << ret
<< dendl
;
693 ret
= store
->meta_mgr
->list_keys_next(handle
, max
, keys
, &truncated
);
695 ldout(store
->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret
<< dendl
;
698 for (list
<string
>::iterator iter
= keys
.begin();
699 iter
!= keys
.end() && !going_down();
701 rgw_user
user(*iter
);
702 ldout(store
->ctx(), 20) << "RGWUserStatsCache: sync user=" << user
<< dendl
;
703 int ret
= sync_user(user
);
705 ldout(store
->ctx(), 5) << "ERROR: sync_user() failed, user=" << user
<< " ret=" << ret
<< dendl
;
707 /* continuing to next user */
715 store
->meta_mgr
->list_keys_complete(handle
);
719 void RGWUserStatsCache::data_modified(const rgw_user
& user
, rgw_bucket
& bucket
)
721 /* racy, but it's ok */
723 bool need_update
= modified_buckets
.find(bucket
) == modified_buckets
.end();
728 modified_buckets
[bucket
] = user
;
734 class RGWQuotaInfoApplier
{
735 /* NOTE: no non-static field allowed as instances are supposed to live in
736 * the static memory only. */
738 RGWQuotaInfoApplier() = default;
741 virtual ~RGWQuotaInfoApplier() {}
743 virtual bool is_size_exceeded(const char * const entity
,
744 const RGWQuotaInfo
& qinfo
,
745 const RGWStorageStats
& stats
,
746 const uint64_t size
) const = 0;
748 virtual bool is_num_objs_exceeded(const char * const entity
,
749 const RGWQuotaInfo
& qinfo
,
750 const RGWStorageStats
& stats
,
751 const uint64_t num_objs
) const = 0;
753 static const RGWQuotaInfoApplier
& get_instance(const RGWQuotaInfo
& qinfo
);
756 class RGWQuotaInfoDefApplier
: public RGWQuotaInfoApplier
{
758 bool is_size_exceeded(const char * const entity
,
759 const RGWQuotaInfo
& qinfo
,
760 const RGWStorageStats
& stats
,
761 const uint64_t size
) const override
;
763 bool is_num_objs_exceeded(const char * const entity
,
764 const RGWQuotaInfo
& qinfo
,
765 const RGWStorageStats
& stats
,
766 const uint64_t num_objs
) const override
;
769 class RGWQuotaInfoRawApplier
: public RGWQuotaInfoApplier
{
771 bool is_size_exceeded(const char * const entity
,
772 const RGWQuotaInfo
& qinfo
,
773 const RGWStorageStats
& stats
,
774 const uint64_t size
) const override
;
776 bool is_num_objs_exceeded(const char * const entity
,
777 const RGWQuotaInfo
& qinfo
,
778 const RGWStorageStats
& stats
,
779 const uint64_t num_objs
) const override
;
783 bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity
,
784 const RGWQuotaInfo
& qinfo
,
785 const RGWStorageStats
& stats
,
786 const uint64_t size
) const
788 if (qinfo
.max_size
< 0) {
789 /* The limit is not enabled. */
793 const uint64_t cur_size
= stats
.size_rounded
;
794 const uint64_t new_size
= rgw_rounded_objsize(size
);
796 if (cur_size
+ new_size
> static_cast<uint64_t>(qinfo
.max_size
)) {
797 dout(10) << "quota exceeded: stats.size_rounded=" << stats
.size_rounded
798 << " size=" << new_size
<< " "
799 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
806 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity
,
807 const RGWQuotaInfo
& qinfo
,
808 const RGWStorageStats
& stats
,
809 const uint64_t num_objs
) const
811 if (qinfo
.max_objects
< 0) {
812 /* The limit is not enabled. */
816 if (stats
.num_objects
+ num_objs
> static_cast<uint64_t>(qinfo
.max_objects
)) {
817 dout(10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
818 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
826 bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity
,
827 const RGWQuotaInfo
& qinfo
,
828 const RGWStorageStats
& stats
,
829 const uint64_t size
) const
831 if (qinfo
.max_size
< 0) {
832 /* The limit is not enabled. */
836 const uint64_t cur_size
= stats
.size
;
838 if (cur_size
+ size
> static_cast<uint64_t>(qinfo
.max_size
)) {
839 dout(10) << "quota exceeded: stats.size=" << stats
.size
840 << " size=" << size
<< " "
841 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
848 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity
,
849 const RGWQuotaInfo
& qinfo
,
850 const RGWStorageStats
& stats
,
851 const uint64_t num_objs
) const
853 if (qinfo
.max_objects
< 0) {
854 /* The limit is not enabled. */
858 if (stats
.num_objects
+ num_objs
> static_cast<uint64_t>(qinfo
.max_objects
)) {
859 dout(10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
860 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
868 const RGWQuotaInfoApplier
& RGWQuotaInfoApplier::get_instance(
869 const RGWQuotaInfo
& qinfo
)
871 static RGWQuotaInfoDefApplier default_qapplier
;
872 static RGWQuotaInfoRawApplier raw_qapplier
;
874 if (qinfo
.check_on_raw
) {
877 return default_qapplier
;
882 class RGWQuotaHandlerImpl
: public RGWQuotaHandler
{
884 RGWBucketStatsCache bucket_stats_cache
;
885 RGWUserStatsCache user_stats_cache
;
887 int check_quota(const char * const entity
,
888 const RGWQuotaInfo
& quota
,
889 const RGWStorageStats
& stats
,
890 const uint64_t num_objs
,
891 const uint64_t size
) {
892 if (!quota
.enabled
) {
896 const auto& quota_applier
= RGWQuotaInfoApplier::get_instance(quota
);
898 ldout(store
->ctx(), 20) << entity
899 << " quota: max_objects=" << quota
.max_objects
900 << " max_size=" << quota
.max_size
<< dendl
;
903 if (quota_applier
.is_num_objs_exceeded(entity
, quota
, stats
, num_objs
)) {
904 return -ERR_QUOTA_EXCEEDED
;
907 if (quota_applier
.is_size_exceeded(entity
, quota
, stats
, size
)) {
908 return -ERR_QUOTA_EXCEEDED
;
911 ldout(store
->ctx(), 20) << entity
<< " quota OK:"
912 << " stats.num_objects=" << stats
.num_objects
913 << " stats.size=" << stats
.size
<< dendl
;
917 RGWQuotaHandlerImpl(RGWRados
*_store
, bool quota_threads
) : store(_store
),
918 bucket_stats_cache(_store
),
919 user_stats_cache(_store
, quota_threads
) {}
921 int check_quota(const rgw_user
& user
,
923 RGWQuotaInfo
& user_quota
,
924 RGWQuotaInfo
& bucket_quota
,
926 uint64_t size
) override
{
928 if (!bucket_quota
.enabled
&& !user_quota
.enabled
) {
933 * we need to fetch bucket stats if the user quota is enabled, because
934 * the whole system relies on us periodically updating the user's bucket
935 * stats in the user's header, this happens in get_stats() if we actually
936 * fetch that info and not rely on cached data
939 if (bucket_quota
.enabled
) {
940 RGWStorageStats bucket_stats
;
941 int ret
= bucket_stats_cache
.get_stats(user
, bucket
, bucket_stats
,
946 ret
= check_quota("bucket", bucket_quota
, bucket_stats
, num_objs
, size
);
952 if (user_quota
.enabled
) {
953 RGWStorageStats user_stats
;
954 int ret
= user_stats_cache
.get_stats(user
, bucket
, user_stats
, user_quota
);
958 ret
= check_quota("user", user_quota
, user_stats
, num_objs
, size
);
966 void update_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int obj_delta
, uint64_t added_bytes
, uint64_t removed_bytes
) override
{
967 bucket_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
968 user_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
971 int check_bucket_shards(uint64_t max_objs_per_shard
, uint64_t num_shards
,
972 const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaInfo
& bucket_quota
,
973 uint64_t num_objs
, bool& need_resharding
, uint32_t *suggested_num_shards
)
975 RGWStorageStats bucket_stats
;
976 int ret
= bucket_stats_cache
.get_stats(user
, bucket
, bucket_stats
,
982 if (bucket_stats
.num_objects
+ num_objs
> num_shards
* max_objs_per_shard
) {
983 ldout(store
->ctx(), 0) << __func__
<< ": resharding needed: stats.num_objects=" << bucket_stats
.num_objects
984 << " shard max_objects=" << max_objs_per_shard
* num_shards
<< dendl
;
985 need_resharding
= true;
986 if (suggested_num_shards
) {
987 *suggested_num_shards
= (bucket_stats
.num_objects
+ num_objs
) * 2 / max_objs_per_shard
;
990 need_resharding
= false;
999 RGWQuotaHandler
*RGWQuotaHandler::generate_handler(RGWRados
*store
, bool quota_threads
)
1001 return new RGWQuotaHandlerImpl(store
, quota_threads
);
1004 void RGWQuotaHandler::free_handler(RGWQuotaHandler
*handler
)