// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */


#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
#include "common/Mutex.h"
#include "common/RWLock.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"
#include "rgw_user.h"

#include <atomic>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

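/* A single cached stats entry: the last stats snapshot read for a user or
 * bucket, the time at which the entry stops being usable (expiration), and
 * the earlier time at which a background refresh should be kicked off so the
 * entry can be renewed before it actually expires. */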
struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;
  utime_t async_refresh_time;
};

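/* LRU-backed cache of storage stats, templated on the key type (rgw_bucket
 * for per-bucket quota, rgw_user for per-user quota).  Entries are refreshed
 * in the background via AsyncRefreshHandler; async_refcount keeps the cache
 * alive until all in-flight refreshes have completed. */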
template<class T>
class RGWQuotaCache {
protected:
  RGWRados *store;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;

  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);

  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
  void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);

  class AsyncRefreshHandler {
  protected:
    RGWRados *store;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};

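/* Cached stats are trusted only while usage stays comfortably below the
 * quota.  Once it crosses rgw_bucket_quota_soft_threshold (a fraction of the
 * configured limit), return false so callers re-read stats from the backend
 * before admitting more writes near the limit. */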
template<class T>
bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
{
  if (quota.max_size >= 0) {
    if (quota.max_size_soft_threshold < 0) {
      quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    const auto cached_stats_num_kb_rounded = rgw_rounded_kb(cached_stats.size_rounded);
    if (cached_stats_num_kb_rounded >= (uint64_t)quota.max_size_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
        << cached_stats_num_kb_rounded << " >= " << quota.max_size_soft_threshold << dendl;
      return false;
    }
  }

  if (quota.max_objects >= 0) {
    if (quota.max_objs_soft_threshold < 0) {
      quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
        << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
      return false;
    }
  }

  return true;
}

template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  async_refcount->get();

  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}

template<class T>
void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
{
  ldout(store->ctx(), 20) << "async stats refresh failed for bucket=" << bucket << dendl;

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
  qs.stats = stats;
  qs.expiration = ceph_clock_now();
  qs.async_refresh_time = qs.expiration;
  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
  qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;

  map_add(user, bucket, qs);
}

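/* Return stats for the given key, preferring the cache: if the cached entry
 * has reached its async refresh time, kick off a background refresh; if it
 * has expired, or usage is too close to the quota (see can_use_cached_stats()),
 * fall through to a synchronous fetch from the backend and re-populate the
 * cache. */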
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (can_use_cached_stats(quota, qs.stats) && qs.expiration > ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}

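/* LRU update context that applies the delta from a completed object write or
 * delete to a cached entry in place, adjusting size, size_rounded and
 * num_objects. */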
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    if ((entry->stats.size + added_bytes - removed_bytes) >= 0) {
      entry->stats.size += added_bytes - removed_bytes;
    } else {
      entry->stats.size = 0;
    }

    if ((entry->stats.size_rounded + rounded_added - rounded_removed) >= 0) {
      entry->stats.size_rounded += rounded_added - rounded_removed;
    } else {
      entry->stats.size_rounded = 0;
    }

    if ((entry->stats.num_objects + objs_delta) >= 0) {
      entry->stats.num_objects += objs_delta;
    } else {
      entry->stats.num_objects = 0;
    }

    return true;
  }
};


template<class T>
void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
                                    uint64_t added_bytes, uint64_t removed_bytes)
{
  RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
  map_find_and_update(user, bucket, &update);

  data_modified(user, bucket);
}

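/* Async refresh handler for per-bucket stats: looks up the bucket instance,
 * requests its stats asynchronously, and on completion folds the per-category
 * stats into a single RGWStorageStats before handing it back to the cache. */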
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
      RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }
  void handle_response(int r) override;
  int init_fetch() override;
};

int BucketAsyncRefreshHandler::init_fetch()
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;

  r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;

    /* get_bucket_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void BucketAsyncRefreshHandler::handle_response(const int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  RGWStorageStats bs;

  for (const auto& pair : *stats) {
    const RGWStorageStats& s = pair.second;

    bs.size += s.size;
    bs.size_rounded += s.size_rounded;
    bs.num_objects += s.num_objects;
  }

  cache->async_refresh_response(user, bucket, bs);
}

class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;

public:
  explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};

int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  string bucket_ver;
  string master_ver;

  map<RGWObjCategory, RGWStorageStats> bucket_stats;
  r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
                              &master_ver, bucket_stats, nullptr);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
                           << bucket.name << dendl;
    return r;
  }

  stats = RGWStorageStats();

  for (const auto& pair : bucket_stats) {
    const RGWStorageStats& s = pair.second;

    stats.size += s.size;
    stats.size_rounded += s.size_rounded;
    stats.num_objects += s.num_objects;
  }

  return 0;
}

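/* Async refresh handler for per-user stats: requests the user's aggregated
 * stats via get_user_stats_async() and hands the result back to the user
 * stats cache. */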
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
      RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
      RGWGetUserStats_CB(_user),
      bucket(_bucket) {}

  void drop_reference() override { put(); }
  int init_fetch() override;
  void handle_response(int r) override;
};

int UserAsyncRefreshHandler::init_fetch()
{
  ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
  int r = store->get_user_stats_async(user, this);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;

    /* get_user_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void UserAsyncRefreshHandler::handle_response(int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  cache->async_refresh_response(user, bucket, stats);
}

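/* Per-user stats cache.  Besides the LRU map it owns two background threads
 * (created only when quota_threads is set): BucketsSyncThread pushes stats of
 * recently modified buckets into the owning user's stats header, and
 * UserSyncThread periodically runs a full stats sync for all users. */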
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  std::atomic<bool> down_flag = { false };
  RWLock rwlock;
  map<rgw_bucket, rgw_user> modified_buckets;

  /* thread: syncs stats of recently modified buckets */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          int r = stats->sync_bucket(user, bucket);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  /*
   * thread: periodically does a full sync of all users' stats
   *
   * Only non-idle users, or users that have never been synced before, are
   * handled.  This is needed so that users that didn't have quota enabled
   * before (or that existed before the user objclass tracked stats) get
   * their backend stats brought up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        int ret = stats->sync_all_users();
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
  int sync_user(const rgw_user& user);
  int sync_all_users();

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    rwlock.get_write();
    modified_buckets.swap(out);
    rwlock.unlock();
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                                            rwlock("RGWUserStatsCache::rwlock") {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(store, this, user, bucket);
  }

  bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
    /* in the user case, the cached stats may contain a better estimation of the totals, as
     * the backend is only periodically getting updated.
     */
    return true;
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    rwlock.get_write();
    stop_thread(&buckets_sync_thread);
    rwlock.unlock();
    stop_thread(&user_sync_thread);
  }
};

int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
  int r = store->get_user_stats(user, stats);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  r = rgw_bucket_sync_user_stats(store, user, bucket_info);
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_user(const rgw_user& user)
{
  cls_user_header header;
  string user_str = user.to_str();
  int ret = store->cls_user_get_header(user_str, &header);
  if (ret < 0) {
    ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      header.last_stats_update < header.last_stats_sync) {
    ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = header.last_stats_sync;
  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */

  ret = rgw_user_sync_all_stats(store, user);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}

int RGWUserStatsCache::sync_all_users()
{
  string key = "user";
  void *handle;

  int ret = store->meta_mgr->list_keys_init(key, &handle);
  if (ret < 0) {
    ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: list_keys_next(): ret=" << ret << dendl;
      goto done;
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      int ret = sync_user(user);
      if (ret < 0) {
        ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continue to the next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  store->meta_mgr->list_keys_complete(handle);
  return ret;
}

void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
{
  /* racy, but it's ok */
  rwlock.get_read();
  bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
  rwlock.unlock();

  if (need_update) {
    rwlock.get_write();
    modified_buckets[bucket] = user;
    rwlock.unlock();
  }
}


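/* Strategy classes used to check a quota against storage stats.  The default
 * applier compares rounded-up object sizes (size_rounded), matching how the
 * backend accounts for usage; the raw applier compares raw byte counts.
 * Which one applies is selected by RGWQuotaInfo::check_on_raw in
 * get_instance(). */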
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  virtual bool is_size_exceeded(const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  virtual bool is_num_objs_exceeded(const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};

class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};

class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};


bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size_rounded;
  const uint64_t new_size = rgw_rounded_objsize(size);

  if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
    dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
             << " size=" << new_size << " "
             << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
             << " " << entity << "_quota.max_objects=" << qinfo.max_objects
             << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size;

  if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
    dout(10) << "quota exceeded: stats.size=" << stats.size
             << " size=" << size << " "
             << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
             << " " << entity << "_quota.max_objects=" << qinfo.max_objects
             << dendl;
    return true;
  }

  return false;
}

const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
  const RGWQuotaInfo& qinfo)
{
  static RGWQuotaInfoDefApplier default_qapplier;
  static RGWQuotaInfoRawApplier raw_qapplier;

  if (qinfo.check_on_raw) {
    return raw_qapplier;
  } else {
    return default_qapplier;
  }
}


class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  RGWRados *store;
  RGWBucketStatsCache bucket_stats_cache;
  RGWUserStatsCache user_stats_cache;

  int check_quota(const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldout(store->ctx(), 20) << entity
                            << " quota: max_objects=" << quota.max_objects
                            << " max_size=" << quota.max_size << dendl;

    if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldout(store->ctx(), 20) << entity << " quota OK:"
                            << " stats.num_objects=" << stats.num_objects
                            << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(_store, quota_threads) {}

  int check_quota(const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * We need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header; this happens in get_stats() if we actually
     * fetch that info rather than relying on cached data.
     */

    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                             bucket_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

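  /* Decide whether the bucket index needs resharding: if the projected object
   * count exceeds num_shards * max_objs_per_shard, suggest a shard count that
   * leaves roughly 50% headroom.  For example (illustrative numbers, not
   * defaults): with max_objs_per_shard = 100000, num_shards = 3 and a
   * projected 350000 objects, 350000 > 300000, so
   * suggested_num_shards = 350000 * 2 / 100000 = 7. */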
  int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
                          const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
  {
    RGWStorageStats bucket_stats;
    int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                           bucket_quota);
    if (ret < 0) {
      return ret;
    }

    if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
      ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
                             << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }

    return 0;
  }

};


RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
{
  return new RGWQuotaHandlerImpl(store, quota_threads);
}

void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}