]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_quota.cc
update source to 12.2.11
[ceph.git] / ceph / src / rgw / rgw_quota.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "include/utime.h"
17#include "common/lru_map.h"
18#include "common/RefCountedObj.h"
19#include "common/Thread.h"
20#include "common/Mutex.h"
21#include "common/RWLock.h"
22
23#include "rgw_common.h"
24#include "rgw_rados.h"
25#include "rgw_quota.h"
26#include "rgw_bucket.h"
27#include "rgw_user.h"
28
29#include <atomic>
30
31#define dout_context g_ceph_context
32#define dout_subsys ceph_subsys_rgw
33
34
35struct RGWQuotaCacheStats {
36 RGWStorageStats stats;
37 utime_t expiration;
38 utime_t async_refresh_time;
39};
40
41template<class T>
42class RGWQuotaCache {
43protected:
44 RGWRados *store;
45 lru_map<T, RGWQuotaCacheStats> stats_map;
46 RefCountedWaitObject *async_refcount;
47
48 class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
49 int objs_delta;
50 uint64_t added_bytes;
51 uint64_t removed_bytes;
52 public:
53 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
54 bool update(RGWQuotaCacheStats *entry) override {
55 if (entry->async_refresh_time.sec() == 0)
56 return false;
57
58 entry->async_refresh_time = utime_t(0, 0);
59
60 return true;
61 }
62 };
63
224ce89b 64 virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;
7c673cae 65
224ce89b 66 virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
7c673cae 67
224ce89b
WB
68 virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
69 virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
7c673cae
FG
70
71 virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
72public:
73 RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
74 async_refcount = new RefCountedWaitObject;
75 }
76 virtual ~RGWQuotaCache() {
77 async_refcount->put_wait(); /* wait for all pending async requests to complete */
78 }
79
224ce89b 80 int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
7c673cae
FG
81 void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
82
83 virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
84
224ce89b
WB
85 void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
86 int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
7c673cae 87 void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
c07f9fc5 88 void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
7c673cae
FG
89
90 class AsyncRefreshHandler {
91 protected:
92 RGWRados *store;
93 RGWQuotaCache<T> *cache;
94 public:
95 AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
96 virtual ~AsyncRefreshHandler() {}
97
98 virtual int init_fetch() = 0;
99 virtual void drop_reference() = 0;
100 };
101
224ce89b 102 virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
7c673cae
FG
103};
104
105template<class T>
106bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
107{
108 if (quota.max_size >= 0) {
109 if (quota.max_size_soft_threshold < 0) {
110 quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
111 }
112
3efd9988 113 if (cached_stats.size_rounded >= (uint64_t)quota.max_size_soft_threshold) {
7c673cae 114 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
3efd9988 115 << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl;
7c673cae
FG
116 return false;
117 }
118 }
119
120 if (quota.max_objects >= 0) {
121 if (quota.max_objs_soft_threshold < 0) {
122 quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
123 }
124
125 if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
126 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
127 << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
128 return false;
129 }
130 }
131
132 return true;
133}
134
135template<class T>
224ce89b 136int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
7c673cae
FG
137{
138 /* protect against multiple updates */
139 StatsAsyncTestSet test_update;
140 if (!map_find_and_update(user, bucket, &test_update)) {
141 /* most likely we just raced with another update */
142 return 0;
143 }
144
145 async_refcount->get();
146
147
148 AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
149
150 int ret = handler->init_fetch();
151 if (ret < 0) {
152 async_refcount->put();
153 handler->drop_reference();
154 return ret;
155 }
156
157 return 0;
158}
159
c07f9fc5
FG
160template<class T>
161void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
162{
163 ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
164
165 async_refcount->put();
166}
167
7c673cae
FG
168template<class T>
169void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
170{
171 ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
172
173 RGWQuotaCacheStats qs;
174
175 map_find(user, bucket, qs);
176
177 set_stats(user, bucket, qs, stats);
178
179 async_refcount->put();
180}
181
182template<class T>
224ce89b 183void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
7c673cae
FG
184{
185 qs.stats = stats;
186 qs.expiration = ceph_clock_now();
187 qs.async_refresh_time = qs.expiration;
188 qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
189 qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
190
191 map_add(user, bucket, qs);
192}
193
194template<class T>
224ce89b 195int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
7c673cae
FG
196 RGWQuotaCacheStats qs;
197 utime_t now = ceph_clock_now();
198 if (map_find(user, bucket, qs)) {
199 if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
200 int r = async_refresh(user, bucket, qs);
201 if (r < 0) {
202 ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
203
204 /* continue processing, might be a transient error, async refresh is just optimization */
205 }
206 }
207
208 if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
209 ceph_clock_now()) {
210 stats = qs.stats;
211 return 0;
212 }
213 }
214
215 int ret = fetch_stats_from_storage(user, bucket, stats);
216 if (ret < 0 && ret != -ENOENT)
217 return ret;
218
219 set_stats(user, bucket, qs, stats);
220
221 return 0;
222}
223
224
225template<class T>
226class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
227 const int objs_delta;
228 const uint64_t added_bytes;
229 const uint64_t removed_bytes;
230public:
231 RGWQuotaStatsUpdate(const int objs_delta,
232 const uint64_t added_bytes,
233 const uint64_t removed_bytes)
234 : objs_delta(objs_delta),
235 added_bytes(added_bytes),
236 removed_bytes(removed_bytes) {
237 }
238
239 bool update(RGWQuotaCacheStats * const entry) override {
240 const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
241 const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);
242
181888fb 243 if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
c07f9fc5
FG
244 entry->stats.size += added_bytes - removed_bytes;
245 } else {
246 entry->stats.size = 0;
247 }
248
181888fb 249 if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
c07f9fc5
FG
250 entry->stats.size_rounded += rounded_added - rounded_removed;
251 } else {
252 entry->stats.size_rounded = 0;
253 }
254
181888fb 255 if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
c07f9fc5
FG
256 entry->stats.num_objects += objs_delta;
257 } else {
258 entry->stats.num_objects = 0;
259 }
7c673cae
FG
260
261 return true;
262 }
263};
264
265
266template<class T>
267void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
268 uint64_t added_bytes, uint64_t removed_bytes)
269{
270 RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
271 map_find_and_update(user, bucket, &update);
272
273 data_modified(user, bucket);
274}
275
276class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
277 public RGWGetBucketStats_CB {
278 rgw_user user;
279public:
280 BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
224ce89b 281 const rgw_user& _user, const rgw_bucket& _bucket) :
7c673cae
FG
282 RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
283 RGWGetBucketStats_CB(_bucket), user(_user) {}
284
285 void drop_reference() override { put(); }
286 void handle_response(int r) override;
287 int init_fetch() override;
288};
289
290int BucketAsyncRefreshHandler::init_fetch()
291{
292 RGWBucketInfo bucket_info;
293
294 RGWObjectCtx obj_ctx(store);
295
296 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
297 if (r < 0) {
298 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
299 return r;
300 }
301
302 ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
303
304 r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
305 if (r < 0) {
306 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
307
308 /* get_bucket_stats_async() dropped our reference already */
309 return r;
310 }
311
312 return 0;
313}
314
315void BucketAsyncRefreshHandler::handle_response(const int r)
316{
317 if (r < 0) {
318 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
c07f9fc5
FG
319 cache->async_refresh_fail(user, bucket);
320 return;
7c673cae
FG
321 }
322
323 RGWStorageStats bs;
324
325 for (const auto& pair : *stats) {
326 const RGWStorageStats& s = pair.second;
327
328 bs.size += s.size;
329 bs.size_rounded += s.size_rounded;
330 bs.num_objects += s.num_objects;
331 }
332
333 cache->async_refresh_response(user, bucket, bs);
334}
335
336class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
337protected:
224ce89b 338 bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
7c673cae
FG
339 return stats_map.find(bucket, qs);
340 }
341
224ce89b 342 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
7c673cae
FG
343 return stats_map.find_and_update(bucket, NULL, ctx);
344 }
345
224ce89b 346 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
7c673cae
FG
347 stats_map.add(bucket, qs);
348 }
349
224ce89b 350 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
7c673cae
FG
351
352public:
353 explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
354 }
355
224ce89b 356 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
7c673cae
FG
357 return new BucketAsyncRefreshHandler(store, this, user, bucket);
358 }
359};
360
224ce89b 361int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
7c673cae
FG
362{
363 RGWBucketInfo bucket_info;
364
365 RGWObjectCtx obj_ctx(store);
366
367 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
368 if (r < 0) {
369 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
370 return r;
371 }
372
373 string bucket_ver;
374 string master_ver;
375
376 map<RGWObjCategory, RGWStorageStats> bucket_stats;
377 r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
378 &master_ver, bucket_stats, nullptr);
379 if (r < 0) {
380 ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
381 << bucket.name << dendl;
382 return r;
383 }
384
385 stats = RGWStorageStats();
386
387 for (const auto& pair : bucket_stats) {
388 const RGWStorageStats& s = pair.second;
389
390 stats.size += s.size;
391 stats.size_rounded += s.size_rounded;
392 stats.num_objects += s.num_objects;
393 }
394
395 return 0;
396}
397
398class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
399 public RGWGetUserStats_CB {
400 rgw_bucket bucket;
401public:
402 UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
224ce89b 403 const rgw_user& _user, const rgw_bucket& _bucket) :
7c673cae
FG
404 RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
405 RGWGetUserStats_CB(_user),
406 bucket(_bucket) {}
407
408 void drop_reference() override { put(); }
409 int init_fetch() override;
410 void handle_response(int r) override;
411};
412
413int UserAsyncRefreshHandler::init_fetch()
414{
415 ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
416 int r = store->get_user_stats_async(user, this);
417 if (r < 0) {
418 ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl;
419
420 /* get_bucket_stats_async() dropped our reference already */
421 return r;
422 }
423
424 return 0;
425}
426
427void UserAsyncRefreshHandler::handle_response(int r)
428{
429 if (r < 0) {
430 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
c07f9fc5
FG
431 cache->async_refresh_fail(user, bucket);
432 return;
7c673cae
FG
433 }
434
435 cache->async_refresh_response(user, bucket, stats);
436}
437
438class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
439 std::atomic<bool> down_flag = { false };
440 RWLock rwlock;
441 map<rgw_bucket, rgw_user> modified_buckets;
442
443 /* thread, sync recent modified buckets info */
444 class BucketsSyncThread : public Thread {
445 CephContext *cct;
446 RGWUserStatsCache *stats;
447
448 Mutex lock;
449 Cond cond;
450 public:
451
452 BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
453
454 void *entry() override {
455 ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
456 do {
457 map<rgw_bucket, rgw_user> buckets;
458
459 stats->swap_modified_buckets(buckets);
460
461 for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
462 rgw_bucket bucket = iter->first;
463 rgw_user& user = iter->second;
464 ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
465 int r = stats->sync_bucket(user, bucket);
466 if (r < 0) {
467 ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
468 }
469 }
470
471 if (stats->going_down())
472 break;
473
474 lock.Lock();
475 cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
476 lock.Unlock();
477 } while (!stats->going_down());
478 ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
479
480 return NULL;
481 }
482
483 void stop() {
484 Mutex::Locker l(lock);
485 cond.Signal();
486 }
487 };
488
489 /*
490 * thread, full sync all users stats periodically
491 *
492 * only sync non idle users or ones that never got synced before, this is needed so that
493 * users that didn't have quota turned on before (or existed before the user objclass
494 * tracked stats) need to get their backend stats up to date.
495 */
496 class UserSyncThread : public Thread {
497 CephContext *cct;
498 RGWUserStatsCache *stats;
499
500 Mutex lock;
501 Cond cond;
502 public:
503
504 UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
505
506 void *entry() override {
507 ldout(cct, 20) << "UserSyncThread: start" << dendl;
508 do {
509 int ret = stats->sync_all_users();
510 if (ret < 0) {
511 ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
512 }
513
b5b8bbf5
FG
514 if (stats->going_down())
515 break;
516
7c673cae
FG
517 lock.Lock();
518 cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
519 lock.Unlock();
520 } while (!stats->going_down());
521 ldout(cct, 20) << "UserSyncThread: done" << dendl;
522
523 return NULL;
524 }
525
526 void stop() {
527 Mutex::Locker l(lock);
528 cond.Signal();
529 }
530 };
531
532 BucketsSyncThread *buckets_sync_thread;
533 UserSyncThread *user_sync_thread;
534protected:
224ce89b 535 bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
7c673cae
FG
536 return stats_map.find(user, qs);
537 }
538
224ce89b 539 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
7c673cae
FG
540 return stats_map.find_and_update(user, NULL, ctx);
541 }
542
224ce89b 543 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
7c673cae
FG
544 stats_map.add(user, qs);
545 }
546
224ce89b 547 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
7c673cae
FG
548 int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
549 int sync_user(const rgw_user& user);
550 int sync_all_users();
551
552 void data_modified(const rgw_user& user, rgw_bucket& bucket) override;
553
554 void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
555 rwlock.get_write();
556 modified_buckets.swap(out);
557 rwlock.unlock();
558 }
559
560 template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
561 void stop_thread(T **pthr) {
562 T *thread = *pthr;
563 if (!thread)
564 return;
565
566 thread->stop();
567 thread->join();
568 delete thread;
569 *pthr = NULL;
570 }
571
572public:
573 RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
574 rwlock("RGWUserStatsCache::rwlock") {
575 if (quota_threads) {
576 buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
577 buckets_sync_thread->create("rgw_buck_st_syn");
578 user_sync_thread = new UserSyncThread(store->ctx(), this);
579 user_sync_thread->create("rgw_user_st_syn");
580 } else {
581 buckets_sync_thread = NULL;
582 user_sync_thread = NULL;
583 }
584 }
585 ~RGWUserStatsCache() override {
586 stop();
587 }
588
224ce89b 589 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
7c673cae
FG
590 return new UserAsyncRefreshHandler(store, this, user, bucket);
591 }
592
593 bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
594 /* in the user case, the cached stats may contain a better estimation of the totals, as
595 * the backend is only periodically getting updated.
596 */
597 return true;
598 }
599
600 bool going_down() {
601 return down_flag;
602 }
603
604 void stop() {
605 down_flag = true;
606 rwlock.get_write();
607 stop_thread(&buckets_sync_thread);
608 rwlock.unlock();
609 stop_thread(&user_sync_thread);
610 }
611};
612
224ce89b 613int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
7c673cae
FG
614{
615 int r = store->get_user_stats(user, stats);
616 if (r < 0) {
617 ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
618 return r;
619 }
620
621 return 0;
622}
623
624int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
625{
626 RGWBucketInfo bucket_info;
627
628 RGWObjectCtx obj_ctx(store);
629
630 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
631 if (r < 0) {
632 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
633 return r;
634 }
635
636 r = rgw_bucket_sync_user_stats(store, user, bucket_info);
637 if (r < 0) {
638 ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
639 return r;
640 }
641
642 return 0;
643}
644
645int RGWUserStatsCache::sync_user(const rgw_user& user)
646{
647 cls_user_header header;
648 string user_str = user.to_str();
649 int ret = store->cls_user_get_header(user_str, &header);
650 if (ret < 0) {
651 ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
652 return ret;
653 }
654
655 if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
656 header.last_stats_update < header.last_stats_sync) {
657 ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
658 return 0;
659 }
660
661 real_time when_need_full_sync = header.last_stats_sync;
662 when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
663
664 // check if enough time passed since last full sync
665 /* FIXME: missing check? */
666
667 ret = rgw_user_sync_all_stats(store, user);
668 if (ret < 0) {
669 ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
670 return ret;
671 }
672
673 return 0;
674}
675
676int RGWUserStatsCache::sync_all_users()
677{
678 string key = "user";
679 void *handle;
680
681 int ret = store->meta_mgr->list_keys_init(key, &handle);
682 if (ret < 0) {
683 ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
684 return ret;
685 }
686
687 bool truncated;
688 int max = 1000;
689
690 do {
691 list<string> keys;
692 ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
693 if (ret < 0) {
694 ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
695 goto done;
696 }
697 for (list<string>::iterator iter = keys.begin();
698 iter != keys.end() && !going_down();
699 ++iter) {
700 rgw_user user(*iter);
701 ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
702 int ret = sync_user(user);
703 if (ret < 0) {
704 ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
705
706 /* continuing to next user */
707 continue;
708 }
709 }
710 } while (truncated);
711
712 ret = 0;
713done:
714 store->meta_mgr->list_keys_complete(handle);
715 return ret;
716}
717
718void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
719{
720 /* racy, but it's ok */
721 rwlock.get_read();
722 bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
723 rwlock.unlock();
724
725 if (need_update) {
726 rwlock.get_write();
727 modified_buckets[bucket] = user;
728 rwlock.unlock();
729 }
730}
731
732
733class RGWQuotaInfoApplier {
734 /* NOTE: no non-static field allowed as instances are supposed to live in
735 * the static memory only. */
736protected:
737 RGWQuotaInfoApplier() = default;
738
739public:
740 virtual ~RGWQuotaInfoApplier() {}
741
742 virtual bool is_size_exceeded(const char * const entity,
743 const RGWQuotaInfo& qinfo,
744 const RGWStorageStats& stats,
745 const uint64_t size) const = 0;
746
747 virtual bool is_num_objs_exceeded(const char * const entity,
748 const RGWQuotaInfo& qinfo,
749 const RGWStorageStats& stats,
750 const uint64_t num_objs) const = 0;
751
752 static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
753};
754
755class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
756public:
757 bool is_size_exceeded(const char * const entity,
758 const RGWQuotaInfo& qinfo,
759 const RGWStorageStats& stats,
760 const uint64_t size) const override;
761
762 bool is_num_objs_exceeded(const char * const entity,
763 const RGWQuotaInfo& qinfo,
764 const RGWStorageStats& stats,
765 const uint64_t num_objs) const override;
766};
767
768class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
769public:
770 bool is_size_exceeded(const char * const entity,
771 const RGWQuotaInfo& qinfo,
772 const RGWStorageStats& stats,
773 const uint64_t size) const override;
774
775 bool is_num_objs_exceeded(const char * const entity,
776 const RGWQuotaInfo& qinfo,
777 const RGWStorageStats& stats,
778 const uint64_t num_objs) const override;
779};
780
781
782bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
783 const RGWQuotaInfo& qinfo,
784 const RGWStorageStats& stats,
785 const uint64_t size) const
786{
787 if (qinfo.max_size < 0) {
788 /* The limit is not enabled. */
789 return false;
790 }
791
792 const uint64_t cur_size = stats.size_rounded;
793 const uint64_t new_size = rgw_rounded_objsize(size);
794
795 if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
796 dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
797 << " size=" << new_size << " "
798 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
799 return true;
800 }
801
802 return false;
803}
804
805bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
806 const RGWQuotaInfo& qinfo,
807 const RGWStorageStats& stats,
808 const uint64_t num_objs) const
809{
810 if (qinfo.max_objects < 0) {
811 /* The limit is not enabled. */
812 return false;
813 }
814
815 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
816 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
817 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
818 << dendl;
819 return true;
820 }
821
822 return false;
823}
824
825bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
826 const RGWQuotaInfo& qinfo,
827 const RGWStorageStats& stats,
828 const uint64_t size) const
829{
830 if (qinfo.max_size < 0) {
831 /* The limit is not enabled. */
832 return false;
833 }
834
835 const uint64_t cur_size = stats.size;
836
837 if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
838 dout(10) << "quota exceeded: stats.size=" << stats.size
839 << " size=" << size << " "
840 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
841 return true;
842 }
843
844 return false;
845}
846
847bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
848 const RGWQuotaInfo& qinfo,
849 const RGWStorageStats& stats,
850 const uint64_t num_objs) const
851{
852 if (qinfo.max_objects < 0) {
853 /* The limit is not enabled. */
854 return false;
855 }
856
857 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
858 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
859 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
860 << dendl;
861 return true;
862 }
863
864 return false;
865}
866
867const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
868 const RGWQuotaInfo& qinfo)
869{
870 static RGWQuotaInfoDefApplier default_qapplier;
871 static RGWQuotaInfoRawApplier raw_qapplier;
872
873 if (qinfo.check_on_raw) {
874 return raw_qapplier;
875 } else {
876 return default_qapplier;
877 }
878}
879
880
881class RGWQuotaHandlerImpl : public RGWQuotaHandler {
882 RGWRados *store;
883 RGWBucketStatsCache bucket_stats_cache;
884 RGWUserStatsCache user_stats_cache;
885
886 int check_quota(const char * const entity,
887 const RGWQuotaInfo& quota,
888 const RGWStorageStats& stats,
889 const uint64_t num_objs,
890 const uint64_t size) {
891 if (!quota.enabled) {
892 return 0;
893 }
894
895 const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);
896
897 ldout(store->ctx(), 20) << entity
898 << " quota: max_objects=" << quota.max_objects
899 << " max_size=" << quota.max_size << dendl;
900
901
902 if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
903 return -ERR_QUOTA_EXCEEDED;
904 }
905
906 if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
907 return -ERR_QUOTA_EXCEEDED;
908 }
909
910 ldout(store->ctx(), 20) << entity << " quota OK:"
911 << " stats.num_objects=" << stats.num_objects
912 << " stats.size=" << stats.size << dendl;
913 return 0;
914 }
915public:
916 RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
917 bucket_stats_cache(_store),
918 user_stats_cache(_store, quota_threads) {}
919
920 int check_quota(const rgw_user& user,
921 rgw_bucket& bucket,
922 RGWQuotaInfo& user_quota,
923 RGWQuotaInfo& bucket_quota,
924 uint64_t num_objs,
925 uint64_t size) override {
926
927 if (!bucket_quota.enabled && !user_quota.enabled) {
928 return 0;
929 }
930
931 /*
932 * we need to fetch bucket stats if the user quota is enabled, because
933 * the whole system relies on us periodically updating the user's bucket
934 * stats in the user's header, this happens in get_stats() if we actually
935 * fetch that info and not rely on cached data
936 */
937
938 if (bucket_quota.enabled) {
939 RGWStorageStats bucket_stats;
940 int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
941 bucket_quota);
942 if (ret < 0) {
943 return ret;
944 }
945 ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
946 if (ret < 0) {
947 return ret;
948 }
949 }
950
951 if (user_quota.enabled) {
952 RGWStorageStats user_stats;
953 int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
954 if (ret < 0) {
955 return ret;
956 }
957 ret = check_quota("user", user_quota, user_stats, num_objs, size);
958 if (ret < 0) {
959 return ret;
960 }
961 }
962 return 0;
963 }
964
965 void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
966 bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
967 user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
968 }
31f18b77
FG
969
970 int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
224ce89b 971 const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
31f18b77
FG
972 uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
973 {
974 RGWStorageStats bucket_stats;
975 int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
976 bucket_quota);
977 if (ret < 0) {
978 return ret;
979 }
980
981 if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
982 ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
983 << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
984 need_resharding = true;
985 if (suggested_num_shards) {
986 *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
987 }
988 } else {
989 need_resharding = false;
990 }
991
992 return 0;
993 }
994
7c673cae
FG
995};
996
997
998RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
999{
1000 return new RGWQuotaHandlerImpl(store, quota_threads);
1001}
1002
1003void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
1004{
1005 delete handler;
1006}
31f18b77
FG
1007
1008
f64942e4
AA
1009void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const CephContext* cct)
1010{
1011 if (cct->_conf->rgw_bucket_default_quota_max_objects >= 0) {
1012 quota.max_objects = cct->_conf->rgw_bucket_default_quota_max_objects;
1013 quota.enabled = true;
1014 }
1015 if (cct->_conf->rgw_bucket_default_quota_max_size >= 0) {
1016 quota.max_size = cct->_conf->rgw_bucket_default_quota_max_size;
1017 quota.enabled = true;
1018 }
1019}
1020
1021void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const CephContext* cct)
1022{
1023 if (cct->_conf->rgw_user_default_quota_max_objects >= 0) {
1024 quota.max_objects = cct->_conf->rgw_user_default_quota_max_objects;
1025 quota.enabled = true;
1026 }
1027 if (cct->_conf->rgw_user_default_quota_max_size >= 0) {
1028 quota.max_size = cct->_conf->rgw_user_default_quota_max_size;
1029 quota.enabled = true;
1030 }
1031}