1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2013 Inktank, Inc
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "include/utime.h"
18 #include "common/lru_map.h"
19 #include "common/RefCountedObj.h"
20 #include "common/Thread.h"
21 #include "common/ceph_mutex.h"
23 #include "rgw_common.h"
25 #include "rgw_quota.h"
26 #include "rgw_bucket.h"
29 #include "services/svc_sys_obj.h"
30 #include "services/svc_meta.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rgw
38 struct RGWQuotaCacheStats
{
39 RGWStorageStats stats
;
41 utime_t async_refresh_time
;
47 rgw::sal::RGWRadosStore
*store
;
48 lru_map
<T
, RGWQuotaCacheStats
> stats_map
;
49 RefCountedWaitObject
*async_refcount
;
51 class StatsAsyncTestSet
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
54 uint64_t removed_bytes
;
56 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
57 bool update(RGWQuotaCacheStats
*entry
) override
{
58 if (entry
->async_refresh_time
.sec() == 0)
61 entry
->async_refresh_time
= utime_t(0, 0);
67 virtual int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) = 0;
69 virtual bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
71 virtual bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, typename lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) = 0;
72 virtual void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) = 0;
74 virtual void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) {}
76 RGWQuotaCache(rgw::sal::RGWRadosStore
*_store
, int size
) : store(_store
), stats_map(size
) {
77 async_refcount
= new RefCountedWaitObject
;
79 virtual ~RGWQuotaCache() {
80 async_refcount
->put_wait(); /* wait for all pending async requests to complete */
83 int get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, RGWQuotaInfo
& quota
);
84 void adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
, uint64_t added_bytes
, uint64_t removed_bytes
);
86 virtual bool can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& stats
);
88 void set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
);
89 int async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
);
90 void async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
);
91 void async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
);
93 class AsyncRefreshHandler
{
95 rgw::sal::RGWRadosStore
*store
;
96 RGWQuotaCache
<T
> *cache
;
98 AsyncRefreshHandler(rgw::sal::RGWRadosStore
*_store
, RGWQuotaCache
<T
> *_cache
) : store(_store
), cache(_cache
) {}
99 virtual ~AsyncRefreshHandler() {}
101 virtual int init_fetch() = 0;
102 virtual void drop_reference() = 0;
105 virtual AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) = 0;
109 bool RGWQuotaCache
<T
>::can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& cached_stats
)
111 if (quota
.max_size
>= 0) {
112 if (quota
.max_size_soft_threshold
< 0) {
113 quota
.max_size_soft_threshold
= quota
.max_size
* store
->ctx()->_conf
->rgw_bucket_quota_soft_threshold
;
116 if (cached_stats
.size_rounded
>= (uint64_t)quota
.max_size_soft_threshold
) {
117 ldout(store
->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
118 << cached_stats
.size_rounded
<< " >= " << quota
.max_size_soft_threshold
<< dendl
;
123 if (quota
.max_objects
>= 0) {
124 if (quota
.max_objs_soft_threshold
< 0) {
125 quota
.max_objs_soft_threshold
= quota
.max_objects
* store
->ctx()->_conf
->rgw_bucket_quota_soft_threshold
;
128 if (cached_stats
.num_objects
>= (uint64_t)quota
.max_objs_soft_threshold
) {
129 ldout(store
->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
130 << cached_stats
.num_objects
<< " >= " << quota
.max_objs_soft_threshold
<< dendl
;
139 int RGWQuotaCache
<T
>::async_refresh(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
)
141 /* protect against multiple updates */
142 StatsAsyncTestSet test_update
;
143 if (!map_find_and_update(user
, bucket
, &test_update
)) {
144 /* most likely we just raced with another update */
148 async_refcount
->get();
151 AsyncRefreshHandler
*handler
= allocate_refresh_handler(user
, bucket
);
153 int ret
= handler
->init_fetch();
155 async_refcount
->put();
156 handler
->drop_reference();
164 void RGWQuotaCache
<T
>::async_refresh_fail(const rgw_user
& user
, rgw_bucket
& bucket
)
166 ldout(store
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
168 async_refcount
->put();
172 void RGWQuotaCache
<T
>::async_refresh_response(const rgw_user
& user
, rgw_bucket
& bucket
, RGWStorageStats
& stats
)
174 ldout(store
->ctx(), 20) << "async stats refresh response for bucket=" << bucket
<< dendl
;
176 RGWQuotaCacheStats qs
;
178 map_find(user
, bucket
, qs
);
180 set_stats(user
, bucket
, qs
, stats
);
182 async_refcount
->put();
186 void RGWQuotaCache
<T
>::set_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
, RGWStorageStats
& stats
)
189 qs
.expiration
= ceph_clock_now();
190 qs
.async_refresh_time
= qs
.expiration
;
191 qs
.expiration
+= store
->ctx()->_conf
->rgw_bucket_quota_ttl
;
192 qs
.async_refresh_time
+= store
->ctx()->_conf
->rgw_bucket_quota_ttl
/ 2;
194 map_add(user
, bucket
, qs
);
198 int RGWQuotaCache
<T
>::get_stats(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
, RGWQuotaInfo
& quota
) {
199 RGWQuotaCacheStats qs
;
200 utime_t now
= ceph_clock_now();
201 if (map_find(user
, bucket
, qs
)) {
202 if (qs
.async_refresh_time
.sec() > 0 && now
>= qs
.async_refresh_time
) {
203 int r
= async_refresh(user
, bucket
, qs
);
205 ldout(store
->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r
<< dendl
;
207 /* continue processing, might be a transient error, async refresh is just optimization */
211 if (can_use_cached_stats(quota
, qs
.stats
) && qs
.expiration
>
218 int ret
= fetch_stats_from_storage(user
, bucket
, stats
);
219 if (ret
< 0 && ret
!= -ENOENT
)
222 set_stats(user
, bucket
, qs
, stats
);
229 class RGWQuotaStatsUpdate
: public lru_map
<T
, RGWQuotaCacheStats
>::UpdateContext
{
230 const int objs_delta
;
231 const uint64_t added_bytes
;
232 const uint64_t removed_bytes
;
234 RGWQuotaStatsUpdate(const int objs_delta
,
235 const uint64_t added_bytes
,
236 const uint64_t removed_bytes
)
237 : objs_delta(objs_delta
),
238 added_bytes(added_bytes
),
239 removed_bytes(removed_bytes
) {
242 bool update(RGWQuotaCacheStats
* const entry
) override
{
243 const uint64_t rounded_added
= rgw_rounded_objsize(added_bytes
);
244 const uint64_t rounded_removed
= rgw_rounded_objsize(removed_bytes
);
246 if (((int64_t)(entry
->stats
.size
+ added_bytes
- removed_bytes
)) >= 0) {
247 entry
->stats
.size
+= added_bytes
- removed_bytes
;
249 entry
->stats
.size
= 0;
252 if (((int64_t)(entry
->stats
.size_rounded
+ rounded_added
- rounded_removed
)) >= 0) {
253 entry
->stats
.size_rounded
+= rounded_added
- rounded_removed
;
255 entry
->stats
.size_rounded
= 0;
258 if (((int64_t)(entry
->stats
.num_objects
+ objs_delta
)) >= 0) {
259 entry
->stats
.num_objects
+= objs_delta
;
261 entry
->stats
.num_objects
= 0;
270 void RGWQuotaCache
<T
>::adjust_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int objs_delta
,
271 uint64_t added_bytes
, uint64_t removed_bytes
)
273 RGWQuotaStatsUpdate
<T
> update(objs_delta
, added_bytes
, removed_bytes
);
274 map_find_and_update(user
, bucket
, &update
);
276 data_modified(user
, bucket
);
279 class BucketAsyncRefreshHandler
: public RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler
,
280 public RGWGetBucketStats_CB
{
283 BucketAsyncRefreshHandler(rgw::sal::RGWRadosStore
*_store
, RGWQuotaCache
<rgw_bucket
> *_cache
,
284 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
285 RGWQuotaCache
<rgw_bucket
>::AsyncRefreshHandler(_store
, _cache
),
286 RGWGetBucketStats_CB(_bucket
), user(_user
) {}
288 void drop_reference() override
{ put(); }
289 void handle_response(int r
) override
;
290 int init_fetch() override
;
293 int BucketAsyncRefreshHandler::init_fetch()
295 RGWBucketInfo bucket_info
;
297 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
299 int r
= store
->getRados()->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, NULL
, NULL
, null_yield
);
301 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
305 ldout(store
->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket
<< dendl
;
307 r
= store
->getRados()->get_bucket_stats_async(bucket_info
, RGW_NO_SHARD
, this);
309 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
.name
<< dendl
;
311 /* get_bucket_stats_async() dropped our reference already */
318 void BucketAsyncRefreshHandler::handle_response(const int r
)
321 ldout(store
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
322 cache
->async_refresh_fail(user
, bucket
);
328 for (const auto& pair
: *stats
) {
329 const RGWStorageStats
& s
= pair
.second
;
332 bs
.size_rounded
+= s
.size_rounded
;
333 bs
.num_objects
+= s
.num_objects
;
336 cache
->async_refresh_response(user
, bucket
, bs
);
339 class RGWBucketStatsCache
: public RGWQuotaCache
<rgw_bucket
> {
341 bool map_find(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
342 return stats_map
.find(bucket
, qs
);
345 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_bucket
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
346 return stats_map
.find_and_update(bucket
, NULL
, ctx
);
349 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
350 stats_map
.add(bucket
, qs
);
353 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) override
;
356 explicit RGWBucketStatsCache(rgw::sal::RGWRadosStore
*_store
) : RGWQuotaCache
<rgw_bucket
>(_store
, _store
->ctx()->_conf
->rgw_bucket_quota_cache_size
) {
359 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
360 return new BucketAsyncRefreshHandler(store
, this, user
, bucket
);
364 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
)
366 RGWBucketInfo bucket_info
;
368 RGWSysObjectCtx obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
370 int r
= store
->getRados()->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, NULL
, NULL
, null_yield
);
372 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
379 map
<RGWObjCategory
, RGWStorageStats
> bucket_stats
;
380 r
= store
->getRados()->get_bucket_stats(bucket_info
, RGW_NO_SHARD
, &bucket_ver
,
381 &master_ver
, bucket_stats
, nullptr);
383 ldout(store
->ctx(), 0) << "could not get bucket stats for bucket="
384 << bucket
.name
<< dendl
;
388 stats
= RGWStorageStats();
390 for (const auto& pair
: bucket_stats
) {
391 const RGWStorageStats
& s
= pair
.second
;
393 stats
.size
+= s
.size
;
394 stats
.size_rounded
+= s
.size_rounded
;
395 stats
.num_objects
+= s
.num_objects
;
401 class UserAsyncRefreshHandler
: public RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler
,
402 public RGWGetUserStats_CB
{
405 UserAsyncRefreshHandler(rgw::sal::RGWRadosStore
*_store
, RGWQuotaCache
<rgw_user
> *_cache
,
406 const rgw_user
& _user
, const rgw_bucket
& _bucket
) :
407 RGWQuotaCache
<rgw_user
>::AsyncRefreshHandler(_store
, _cache
),
408 RGWGetUserStats_CB(_user
),
411 void drop_reference() override
{ put(); }
412 int init_fetch() override
;
413 void handle_response(int r
) override
;
416 int UserAsyncRefreshHandler::init_fetch()
418 ldout(store
->ctx(), 20) << "initiating async quota refresh for user=" << user
<< dendl
;
419 int r
= store
->ctl()->user
->read_stats_async(user
, this);
421 ldout(store
->ctx(), 0) << "could not get bucket info for user=" << user
<< dendl
;
423 /* get_bucket_stats_async() dropped our reference already */
430 void UserAsyncRefreshHandler::handle_response(int r
)
433 ldout(store
->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r
<< dendl
;
434 cache
->async_refresh_fail(user
, bucket
);
438 cache
->async_refresh_response(user
, bucket
, stats
);
441 class RGWUserStatsCache
: public RGWQuotaCache
<rgw_user
> {
442 std::atomic
<bool> down_flag
= { false };
443 ceph::shared_mutex mutex
= ceph::make_shared_mutex("RGWUserStatsCache");
444 map
<rgw_bucket
, rgw_user
> modified_buckets
;
446 /* thread, sync recent modified buckets info */
447 class BucketsSyncThread
: public Thread
{
449 RGWUserStatsCache
*stats
;
451 ceph::mutex lock
= ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
452 ceph::condition_variable cond
;
455 BucketsSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
) {}
457 void *entry() override
{
458 ldout(cct
, 20) << "BucketsSyncThread: start" << dendl
;
460 map
<rgw_bucket
, rgw_user
> buckets
;
462 stats
->swap_modified_buckets(buckets
);
464 for (map
<rgw_bucket
, rgw_user
>::iterator iter
= buckets
.begin(); iter
!= buckets
.end(); ++iter
) {
465 rgw_bucket bucket
= iter
->first
;
466 rgw_user
& user
= iter
->second
;
467 ldout(cct
, 20) << "BucketsSyncThread: sync user=" << user
<< " bucket=" << bucket
<< dendl
;
468 int r
= stats
->sync_bucket(user
, bucket
);
470 ldout(cct
, 0) << "WARNING: sync_bucket() returned r=" << r
<< dendl
;
474 if (stats
->going_down())
477 std::unique_lock locker
{lock
};
480 std::chrono::seconds(cct
->_conf
->rgw_user_quota_bucket_sync_interval
));
481 } while (!stats
->going_down());
482 ldout(cct
, 20) << "BucketsSyncThread: done" << dendl
;
488 std::lock_guard l
{lock
};
494 * thread, full sync all users stats periodically
496 * only sync non idle users or ones that never got synced before, this is needed so that
497 * users that didn't have quota turned on before (or existed before the user objclass
498 * tracked stats) need to get their backend stats up to date.
500 class UserSyncThread
: public Thread
{
502 RGWUserStatsCache
*stats
;
504 ceph::mutex lock
= ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
505 ceph::condition_variable cond
;
508 UserSyncThread(CephContext
*_cct
, RGWUserStatsCache
*_s
) : cct(_cct
), stats(_s
) {}
510 void *entry() override
{
511 ldout(cct
, 20) << "UserSyncThread: start" << dendl
;
513 int ret
= stats
->sync_all_users();
515 ldout(cct
, 5) << "ERROR: sync_all_users() returned ret=" << ret
<< dendl
;
518 if (stats
->going_down())
521 std::unique_lock l
{lock
};
522 cond
.wait_for(l
, std::chrono::seconds(cct
->_conf
->rgw_user_quota_sync_interval
));
523 } while (!stats
->going_down());
524 ldout(cct
, 20) << "UserSyncThread: done" << dendl
;
530 std::lock_guard l
{lock
};
535 BucketsSyncThread
*buckets_sync_thread
;
536 UserSyncThread
*user_sync_thread
;
538 bool map_find(const rgw_user
& user
,const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
539 return stats_map
.find(user
, qs
);
542 bool map_find_and_update(const rgw_user
& user
, const rgw_bucket
& bucket
, lru_map
<rgw_user
, RGWQuotaCacheStats
>::UpdateContext
*ctx
) override
{
543 return stats_map
.find_and_update(user
, NULL
, ctx
);
546 void map_add(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWQuotaCacheStats
& qs
) override
{
547 stats_map
.add(user
, qs
);
550 int fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
) override
;
551 int sync_bucket(const rgw_user
& rgw_user
, rgw_bucket
& bucket
);
552 int sync_user(const rgw_user
& user
);
553 int sync_all_users();
555 void data_modified(const rgw_user
& user
, rgw_bucket
& bucket
) override
;
557 void swap_modified_buckets(map
<rgw_bucket
, rgw_user
>& out
) {
558 std::unique_lock lock
{mutex
};
559 modified_buckets
.swap(out
);
562 template<class T
> /* easier doing it as a template, Thread doesn't have ->stop() */
563 void stop_thread(T
**pthr
) {
575 RGWUserStatsCache(rgw::sal::RGWRadosStore
*_store
, bool quota_threads
)
576 : RGWQuotaCache
<rgw_user
>(_store
, _store
->ctx()->_conf
->rgw_bucket_quota_cache_size
)
579 buckets_sync_thread
= new BucketsSyncThread(store
->ctx(), this);
580 buckets_sync_thread
->create("rgw_buck_st_syn");
581 user_sync_thread
= new UserSyncThread(store
->ctx(), this);
582 user_sync_thread
->create("rgw_user_st_syn");
584 buckets_sync_thread
= NULL
;
585 user_sync_thread
= NULL
;
588 ~RGWUserStatsCache() override
{
592 AsyncRefreshHandler
*allocate_refresh_handler(const rgw_user
& user
, const rgw_bucket
& bucket
) override
{
593 return new UserAsyncRefreshHandler(store
, this, user
, bucket
);
596 bool can_use_cached_stats(RGWQuotaInfo
& quota
, RGWStorageStats
& stats
) override
{
597 /* in the user case, the cached stats may contain a better estimation of the totals, as
598 * the backend is only periodically getting updated.
610 std::unique_lock lock
{mutex
};
611 stop_thread(&buckets_sync_thread
);
613 stop_thread(&user_sync_thread
);
617 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user
& user
, const rgw_bucket
& bucket
, RGWStorageStats
& stats
)
619 int r
= store
->ctl()->user
->read_stats(user
, &stats
);
621 ldout(store
->ctx(), 0) << "could not get user stats for user=" << user
<< dendl
;
628 int RGWUserStatsCache::sync_bucket(const rgw_user
& user
, rgw_bucket
& bucket
)
630 RGWBucketInfo bucket_info
;
632 int r
= store
->ctl()->bucket
->read_bucket_instance_info(bucket
, &bucket_info
, null_yield
);
634 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< " r=" << r
<< dendl
;
639 r
= store
->ctl()->bucket
->sync_user_stats(user
, bucket_info
, &ent
);
641 ldout(store
->ctx(), 0) << "ERROR: sync_user_stats() for user=" << user
<< ", bucket=" << bucket
<< " returned " << r
<< dendl
;
645 return store
->getRados()->check_bucket_shards(bucket_info
, bucket
, ent
.count
);
648 int RGWUserStatsCache::sync_user(const rgw_user
& user
)
650 string user_str
= user
.to_str();
651 RGWStorageStats stats
;
652 ceph::real_time last_stats_sync
;
653 ceph::real_time last_stats_update
;
655 int ret
= store
->ctl()->user
->read_stats(rgw_user(user_str
), &stats
, &last_stats_sync
, &last_stats_update
);
657 ldout(store
->ctx(), 5) << "ERROR: can't read user header: ret=" << ret
<< dendl
;
661 if (!store
->ctx()->_conf
->rgw_user_quota_sync_idle_users
&&
662 last_stats_update
< last_stats_sync
) {
663 ldout(store
->ctx(), 20) << "user is idle, not doing a full sync (user=" << user
<< ")" << dendl
;
667 real_time when_need_full_sync
= last_stats_sync
;
668 when_need_full_sync
+= make_timespan(store
->ctx()->_conf
->rgw_user_quota_sync_wait_time
);
670 // check if enough time passed since last full sync
671 /* FIXME: missing check? */
673 ret
= rgw_user_sync_all_stats(store
, user
);
675 ldout(store
->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret
<< dendl
;
682 int RGWUserStatsCache::sync_all_users()
687 int ret
= store
->ctl()->meta
.mgr
->list_keys_init(key
, &handle
);
689 ldout(store
->ctx(), 10) << "ERROR: can't get key: ret=" << ret
<< dendl
;
698 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, max
, keys
, &truncated
);
700 ldout(store
->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret
<< dendl
;
703 for (list
<string
>::iterator iter
= keys
.begin();
704 iter
!= keys
.end() && !going_down();
706 rgw_user
user(*iter
);
707 ldout(store
->ctx(), 20) << "RGWUserStatsCache: sync user=" << user
<< dendl
;
708 int ret
= sync_user(user
);
710 ldout(store
->ctx(), 5) << "ERROR: sync_user() failed, user=" << user
<< " ret=" << ret
<< dendl
;
712 /* continuing to next user */
720 store
->ctl()->meta
.mgr
->list_keys_complete(handle
);
724 void RGWUserStatsCache::data_modified(const rgw_user
& user
, rgw_bucket
& bucket
)
726 /* racy, but it's ok */
728 bool need_update
= modified_buckets
.find(bucket
) == modified_buckets
.end();
729 mutex
.unlock_shared();
732 std::unique_lock lock
{mutex
};
733 modified_buckets
[bucket
] = user
;
738 class RGWQuotaInfoApplier
{
739 /* NOTE: no non-static field allowed as instances are supposed to live in
740 * the static memory only. */
742 RGWQuotaInfoApplier() = default;
745 virtual ~RGWQuotaInfoApplier() {}
747 virtual bool is_size_exceeded(const char * const entity
,
748 const RGWQuotaInfo
& qinfo
,
749 const RGWStorageStats
& stats
,
750 const uint64_t size
) const = 0;
752 virtual bool is_num_objs_exceeded(const char * const entity
,
753 const RGWQuotaInfo
& qinfo
,
754 const RGWStorageStats
& stats
,
755 const uint64_t num_objs
) const = 0;
757 static const RGWQuotaInfoApplier
& get_instance(const RGWQuotaInfo
& qinfo
);
760 class RGWQuotaInfoDefApplier
: public RGWQuotaInfoApplier
{
762 bool is_size_exceeded(const char * const entity
,
763 const RGWQuotaInfo
& qinfo
,
764 const RGWStorageStats
& stats
,
765 const uint64_t size
) const override
;
767 bool is_num_objs_exceeded(const char * const entity
,
768 const RGWQuotaInfo
& qinfo
,
769 const RGWStorageStats
& stats
,
770 const uint64_t num_objs
) const override
;
773 class RGWQuotaInfoRawApplier
: public RGWQuotaInfoApplier
{
775 bool is_size_exceeded(const char * const entity
,
776 const RGWQuotaInfo
& qinfo
,
777 const RGWStorageStats
& stats
,
778 const uint64_t size
) const override
;
780 bool is_num_objs_exceeded(const char * const entity
,
781 const RGWQuotaInfo
& qinfo
,
782 const RGWStorageStats
& stats
,
783 const uint64_t num_objs
) const override
;
787 bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity
,
788 const RGWQuotaInfo
& qinfo
,
789 const RGWStorageStats
& stats
,
790 const uint64_t size
) const
792 if (qinfo
.max_size
< 0) {
793 /* The limit is not enabled. */
797 const uint64_t cur_size
= stats
.size_rounded
;
798 const uint64_t new_size
= rgw_rounded_objsize(size
);
800 if (cur_size
+ new_size
> static_cast<uint64_t>(qinfo
.max_size
)) {
801 dout(10) << "quota exceeded: stats.size_rounded=" << stats
.size_rounded
802 << " size=" << new_size
<< " "
803 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
810 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity
,
811 const RGWQuotaInfo
& qinfo
,
812 const RGWStorageStats
& stats
,
813 const uint64_t num_objs
) const
815 if (qinfo
.max_objects
< 0) {
816 /* The limit is not enabled. */
820 if (stats
.num_objects
+ num_objs
> static_cast<uint64_t>(qinfo
.max_objects
)) {
821 dout(10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
822 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
830 bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity
,
831 const RGWQuotaInfo
& qinfo
,
832 const RGWStorageStats
& stats
,
833 const uint64_t size
) const
835 if (qinfo
.max_size
< 0) {
836 /* The limit is not enabled. */
840 const uint64_t cur_size
= stats
.size
;
842 if (cur_size
+ size
> static_cast<uint64_t>(qinfo
.max_size
)) {
843 dout(10) << "quota exceeded: stats.size=" << stats
.size
844 << " size=" << size
<< " "
845 << entity
<< "_quota.max_size=" << qinfo
.max_size
<< dendl
;
852 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity
,
853 const RGWQuotaInfo
& qinfo
,
854 const RGWStorageStats
& stats
,
855 const uint64_t num_objs
) const
857 if (qinfo
.max_objects
< 0) {
858 /* The limit is not enabled. */
862 if (stats
.num_objects
+ num_objs
> static_cast<uint64_t>(qinfo
.max_objects
)) {
863 dout(10) << "quota exceeded: stats.num_objects=" << stats
.num_objects
864 << " " << entity
<< "_quota.max_objects=" << qinfo
.max_objects
872 const RGWQuotaInfoApplier
& RGWQuotaInfoApplier::get_instance(
873 const RGWQuotaInfo
& qinfo
)
875 static RGWQuotaInfoDefApplier default_qapplier
;
876 static RGWQuotaInfoRawApplier raw_qapplier
;
878 if (qinfo
.check_on_raw
) {
881 return default_qapplier
;
886 class RGWQuotaHandlerImpl
: public RGWQuotaHandler
{
887 rgw::sal::RGWRadosStore
*store
;
888 RGWBucketStatsCache bucket_stats_cache
;
889 RGWUserStatsCache user_stats_cache
;
891 int check_quota(const char * const entity
,
892 const RGWQuotaInfo
& quota
,
893 const RGWStorageStats
& stats
,
894 const uint64_t num_objs
,
895 const uint64_t size
) {
896 if (!quota
.enabled
) {
900 const auto& quota_applier
= RGWQuotaInfoApplier::get_instance(quota
);
902 ldout(store
->ctx(), 20) << entity
903 << " quota: max_objects=" << quota
.max_objects
904 << " max_size=" << quota
.max_size
<< dendl
;
907 if (quota_applier
.is_num_objs_exceeded(entity
, quota
, stats
, num_objs
)) {
908 return -ERR_QUOTA_EXCEEDED
;
911 if (quota_applier
.is_size_exceeded(entity
, quota
, stats
, size
)) {
912 return -ERR_QUOTA_EXCEEDED
;
915 ldout(store
->ctx(), 20) << entity
<< " quota OK:"
916 << " stats.num_objects=" << stats
.num_objects
917 << " stats.size=" << stats
.size
<< dendl
;
921 RGWQuotaHandlerImpl(rgw::sal::RGWRadosStore
*_store
, bool quota_threads
) : store(_store
),
922 bucket_stats_cache(_store
),
923 user_stats_cache(_store
, quota_threads
) {}
925 int check_quota(const rgw_user
& user
,
927 RGWQuotaInfo
& user_quota
,
928 RGWQuotaInfo
& bucket_quota
,
930 uint64_t size
) override
{
932 if (!bucket_quota
.enabled
&& !user_quota
.enabled
) {
937 * we need to fetch bucket stats if the user quota is enabled, because
938 * the whole system relies on us periodically updating the user's bucket
939 * stats in the user's header, this happens in get_stats() if we actually
940 * fetch that info and not rely on cached data
943 if (bucket_quota
.enabled
) {
944 RGWStorageStats bucket_stats
;
945 int ret
= bucket_stats_cache
.get_stats(user
, bucket
, bucket_stats
,
950 ret
= check_quota("bucket", bucket_quota
, bucket_stats
, num_objs
, size
);
956 if (user_quota
.enabled
) {
957 RGWStorageStats user_stats
;
958 int ret
= user_stats_cache
.get_stats(user
, bucket
, user_stats
, user_quota
);
962 ret
= check_quota("user", user_quota
, user_stats
, num_objs
, size
);
970 void update_stats(const rgw_user
& user
, rgw_bucket
& bucket
, int obj_delta
, uint64_t added_bytes
, uint64_t removed_bytes
) override
{
971 bucket_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
972 user_stats_cache
.adjust_stats(user
, bucket
, obj_delta
, added_bytes
, removed_bytes
);
975 void check_bucket_shards(uint64_t max_objs_per_shard
, uint64_t num_shards
,
976 uint64_t num_objs
, bool& need_resharding
, uint32_t *suggested_num_shards
) override
978 if (num_objs
> num_shards
* max_objs_per_shard
) {
979 ldout(store
->ctx(), 0) << __func__
<< ": resharding needed: stats.num_objects=" << num_objs
980 << " shard max_objects=" << max_objs_per_shard
* num_shards
<< dendl
;
981 need_resharding
= true;
982 if (suggested_num_shards
) {
983 *suggested_num_shards
= num_objs
* 2 / max_objs_per_shard
;
986 need_resharding
= false;
992 RGWQuotaHandler
*RGWQuotaHandler::generate_handler(rgw::sal::RGWRadosStore
*store
, bool quota_threads
)
994 return new RGWQuotaHandlerImpl(store
, quota_threads
);
997 void RGWQuotaHandler::free_handler(RGWQuotaHandler
*handler
)
1003 void rgw_apply_default_bucket_quota(RGWQuotaInfo
& quota
, const ConfigProxy
& conf
)
1005 if (conf
->rgw_bucket_default_quota_max_objects
>= 0) {
1006 quota
.max_objects
= conf
->rgw_bucket_default_quota_max_objects
;
1007 quota
.enabled
= true;
1009 if (conf
->rgw_bucket_default_quota_max_size
>= 0) {
1010 quota
.max_size
= conf
->rgw_bucket_default_quota_max_size
;
1011 quota
.enabled
= true;
1015 void rgw_apply_default_user_quota(RGWQuotaInfo
& quota
, const ConfigProxy
& conf
)
1017 if (conf
->rgw_user_default_quota_max_objects
>= 0) {
1018 quota
.max_objects
= conf
->rgw_user_default_quota_max_objects
;
1019 quota
.enabled
= true;
1021 if (conf
->rgw_user_default_quota_max_size
>= 0) {
1022 quota
.max_size
= conf
->rgw_user_default_quota_max_size
;
1023 quota
.enabled
= true;