// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */


#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
#include "common/Mutex.h"
#include "common/RWLock.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"
#include "rgw_user.h"

#include <atomic>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw


struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;
  utime_t async_refresh_time;
};

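/*
 * RGWQuotaCache<T> is an LRU cache of RGWQuotaCacheStats, keyed either by
 * bucket (RGWBucketStatsCache) or by user (RGWUserStatsCache).  Entries are
 * considered fresh for rgw_bucket_quota_ttl seconds; once half of that TTL
 * has passed, get_stats() kicks off an asynchronous refresh so callers keep
 * being served cached data while the backend is re-queried.
 */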
template<class T>
class RGWQuotaCache {
protected:
  RGWRados *store;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;

  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);

  class AsyncRefreshHandler {
  protected:
    RGWRados *store;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};

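/*
 * Decide whether the cached stats are still good enough for quota checks.
 * Once usage crosses rgw_bucket_quota_soft_threshold (a fraction of the
 * configured limit), the cached entry is rejected and fresh stats are
 * fetched, so that requests close to the limit are checked against
 * accurate numbers.
 */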
template<class T>
bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
{
  if (quota.max_size >= 0) {
    if (quota.max_size_soft_threshold < 0) {
      quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    const auto cached_stats_num_kb_rounded = rgw_rounded_kb(cached_stats.size_rounded);
    if (cached_stats_num_kb_rounded >= (uint64_t)quota.max_size_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
        << cached_stats_num_kb_rounded << " >= " << quota.max_size_soft_threshold << dendl;
      return false;
    }
  }

  if (quota.max_objects >= 0) {
    if (quota.max_objs_soft_threshold < 0) {
      quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
        << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
      return false;
    }
  }

  return true;
}

template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}

template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
  qs.stats = stats;
  qs.expiration = ceph_clock_now();
  qs.async_refresh_time = qs.expiration;
  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
  qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;

  map_add(user, bucket, qs);
}

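/*
 * Return stats for the given user/bucket.  Serve the cached entry when it
 * has not expired and can_use_cached_stats() accepts it; schedule a
 * background refresh once half the TTL has elapsed; otherwise fetch fresh
 * stats from the backend and repopulate the cache.
 */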
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
        ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}

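/*
 * UpdateContext used by adjust_stats() to apply in-place deltas (object
 * count and byte counts) to an existing cache entry after a write or
 * delete, without going back to the backend.
 */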
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    entry->stats.size += added_bytes - removed_bytes;
    entry->stats.size_rounded += rounded_added - rounded_removed;
    entry->stats.num_objects += objs_delta;

    return true;
  }
};


template<class T>
void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
                                    uint64_t added_bytes, uint64_t removed_bytes)
{
  RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
  map_find_and_update(user, bucket, &update);

  data_modified(user, bucket);
}

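/*
 * Async refresh handler for the bucket stats cache: fetches bucket index
 * stats via get_bucket_stats_async() and folds the per-category totals
 * back into the cache through async_refresh_response().
 */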
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
                            RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
                            RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }
  void handle_response(int r) override;
  int init_fetch() override;
};

int BucketAsyncRefreshHandler::init_fetch()
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;

  r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;

    /* get_bucket_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void BucketAsyncRefreshHandler::handle_response(const int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    return; /* nothing to do here */
  }

  RGWStorageStats bs;

  for (const auto& pair : *stats) {
    const RGWStorageStats& s = pair.second;

    bs.size += s.size;
    bs.size_rounded += s.size_rounded;
    bs.num_objects += s.num_objects;
  }

  cache->async_refresh_response(user, bucket, bs);
}

class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;

public:
  explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};

int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  string bucket_ver;
  string master_ver;

  map<RGWObjCategory, RGWStorageStats> bucket_stats;
  r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
                              &master_ver, bucket_stats, nullptr);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
                           << bucket.name << dendl;
    return r;
  }

  stats = RGWStorageStats();

  for (const auto& pair : bucket_stats) {
    const RGWStorageStats& s = pair.second;

    stats.size += s.size;
    stats.size_rounded += s.size_rounded;
    stats.num_objects += s.num_objects;
  }

  return 0;
}

class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
                          RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
                          RGWGetUserStats_CB(_user),
                          bucket(_bucket) {}

  void drop_reference() override { put(); }
  int init_fetch() override;
  void handle_response(int r) override;
};

int UserAsyncRefreshHandler::init_fetch()
{
  ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
  int r = store->get_user_stats_async(user, this);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;

    /* get_user_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void UserAsyncRefreshHandler::handle_response(int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    return; /* nothing to do here */
  }

  cache->async_refresh_response(user, bucket, stats);
}

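/*
 * Per-user stats cache.  Besides caching, it owns two background threads
 * (when quota_threads is enabled): BucketsSyncThread flushes recently
 * modified buckets' stats into the user header, and UserSyncThread
 * periodically runs a full stats sync for all users.
 */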
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  std::atomic<bool> down_flag = { false };
  RWLock rwlock;
  map<rgw_bucket, rgw_user> modified_buckets;

  /* thread, sync recent modified buckets info */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          int r = stats->sync_bucket(user, bucket);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  /*
   * Thread that periodically does a full sync of all users' stats.
   *
   * Only users that are not idle, or that have never been synced before, are
   * synced.  This is needed so that users who didn't have quota turned on
   * before (or who existed before the user objclass tracked stats) get their
   * backend stats brought up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        int ret = stats->sync_all_users();
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
  int sync_user(const rgw_user& user);
  int sync_all_users();

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    rwlock.get_write();
    modified_buckets.swap(out);
    rwlock.unlock();
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                                            rwlock("RGWUserStatsCache::rwlock") {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(store, this, user, bucket);
  }

  bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
    /* in the user case, the cached stats may contain a better estimation of the totals, as
     * the backend is only periodically getting updated.
     */
    return true;
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    rwlock.get_write();
    stop_thread(&buckets_sync_thread);
    rwlock.unlock();
    stop_thread(&user_sync_thread);
  }
};

int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
  int r = store->get_user_stats(user, stats);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  r = rgw_bucket_sync_user_stats(store, user, bucket_info);
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_user(const rgw_user& user)
{
  cls_user_header header;
  string user_str = user.to_str();
  int ret = store->cls_user_get_header(user_str, &header);
  if (ret < 0) {
    ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      header.last_stats_update < header.last_stats_sync) {
    ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = header.last_stats_sync;
  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */

  ret = rgw_user_sync_all_stats(store, user);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}

int RGWUserStatsCache::sync_all_users()
{
  string key = "user";
  void *handle;

  int ret = store->meta_mgr->list_keys_init(key, &handle);
  if (ret < 0) {
    ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: list_keys_next(): ret=" << ret << dendl;
      goto done;
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      int ret = sync_user(user);
      if (ret < 0) {
        ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  store->meta_mgr->list_keys_complete(handle);
  return ret;
}

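/*
 * Called after adjust_stats(): remember that this bucket changed so that
 * BucketsSyncThread will flush its stats into the user header on its next
 * pass.
 */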
void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
{
  /* racy, but it's ok */
  rwlock.get_read();
  bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
  rwlock.unlock();

  if (need_update) {
    rwlock.get_write();
    modified_buckets[bucket] = user;
    rwlock.unlock();
  }
}

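/*
 * RGWQuotaInfoApplier decides whether an operation would exceed a quota.
 * The default applier accounts object sizes rounded up to the allocation
 * unit (size_rounded / rgw_rounded_objsize), while the raw applier uses
 * exact byte counts; which one is used depends on RGWQuotaInfo::check_on_raw.
 */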
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  virtual bool is_size_exceeded(const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  virtual bool is_num_objs_exceeded(const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};

class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};

class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};


bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size_rounded;
  const uint64_t new_size = rgw_rounded_objsize(size);

  if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
    dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
             << " size=" << new_size << " "
             << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
             << " " << entity << "_quota.max_objects=" << qinfo.max_objects
             << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size;

  if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
    dout(10) << "quota exceeded: stats.size=" << stats.size
             << " size=" << size << " "
             << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
             << " " << entity << "_quota.max_objects=" << qinfo.max_objects
             << dendl;
    return true;
  }

  return false;
}

const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
  const RGWQuotaInfo& qinfo)
{
  static RGWQuotaInfoDefApplier default_qapplier;
  static RGWQuotaInfoRawApplier raw_qapplier;

  if (qinfo.check_on_raw) {
    return raw_qapplier;
  } else {
    return default_qapplier;
  }
}

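/*
 * Concrete quota handler: wires the bucket and user stats caches together
 * and checks an operation against both the bucket quota and the user quota
 * before it is allowed to proceed.
 */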
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  RGWRados *store;
  RGWBucketStatsCache bucket_stats_cache;
  RGWUserStatsCache user_stats_cache;

  int check_quota(const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldout(store->ctx(), 20) << entity
                            << " quota: max_objects=" << quota.max_objects
                            << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldout(store->ctx(), 20) << entity << " quota OK:"
                            << " stats.num_objects=" << stats.num_objects
                            << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(_store, quota_threads) {}

  int check_quota(const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * We need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header; this happens in get_stats() only when we
     * actually fetch that info rather than relying on cached data.
     */

    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                             bucket_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

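  /*
   * Flag the bucket for resharding when its projected object count would
   * exceed num_shards * max_objs_per_shard; the suggested shard count is
   * sized to leave roughly 2x headroom over the projected count.
   */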
  int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
                          const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
  {
    RGWStorageStats bucket_stats;
    int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                           bucket_quota);
    if (ret < 0) {
      return ret;
    }

    if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
      ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
                             << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }

    return 0;
  }

};


RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
{
  return new RGWQuotaHandlerImpl(store, quota_threads);
}

void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}

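/*
 * Illustrative usage sketch (not part of this file): callers such as the
 * RGW frontend code are expected to obtain a handler via generate_handler(),
 * check quotas before a write, and feed stat deltas back after it completes.
 * All variable names below (store, user, bucket, user_quota, bucket_quota,
 * obj_size) are hypothetical placeholders; only the RGWQuotaHandler calls
 * themselves come from this file.
 *
 *   RGWQuotaHandler *qh = RGWQuotaHandler::generate_handler(store, true);
 *
 *   // before accepting an object write of obj_size bytes:
 *   int ret = qh->check_quota(user, bucket, user_quota, bucket_quota,
 *                             1, obj_size);
 *   if (ret == -ERR_QUOTA_EXCEEDED) {
 *     // reject the request
 *   }
 *
 *   // after the write has been applied:
 *   qh->update_stats(user, bucket, 1, obj_size, 0);
 *
 *   RGWQuotaHandler::free_handler(qh);
 */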