ceph/src/rgw/rgw_quota.cc (15.2.0 Octopus)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2013 Inktank, Inc
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16
17 #include "include/utime.h"
18 #include "common/lru_map.h"
19 #include "common/RefCountedObj.h"
20 #include "common/Thread.h"
21 #include "common/ceph_mutex.h"
22
23 #include "rgw_common.h"
24 #include "rgw_sal.h"
25 #include "rgw_quota.h"
26 #include "rgw_bucket.h"
27 #include "rgw_user.h"
28
29 #include "services/svc_sys_obj.h"
30 #include "services/svc_meta.h"
31
32 #include <atomic>
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rgw
36
37
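/* A single cache entry: the cached storage stats plus two timestamps. 'expiration'
 * is the hard TTL after which the entry is refetched synchronously in get_stats();
 * 'async_refresh_time' (set to half the TTL in set_stats()) is when a background
 * refresh is kicked off, so callers normally keep hitting warm data. */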
38 struct RGWQuotaCacheStats {
39 RGWStorageStats stats;
40 utime_t expiration;
41 utime_t async_refresh_time;
42 };
43
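/* Generic LRU-backed quota stats cache, keyed by T (rgw_bucket for the bucket cache,
 * rgw_user for the user cache). Subclasses supply the map_* primitives and the
 * synchronous fetch_stats_from_storage(); async refreshes go through a per-request
 * AsyncRefreshHandler, and async_refcount lets the destructor wait for any refresh
 * that is still in flight. */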
44 template<class T>
45 class RGWQuotaCache {
46 protected:
47 rgw::sal::RGWRadosStore *store;
48 lru_map<T, RGWQuotaCacheStats> stats_map;
49 RefCountedWaitObject *async_refcount;
50
51 class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
52 int objs_delta;
53 uint64_t added_bytes;
54 uint64_t removed_bytes;
55 public:
56 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
57 bool update(RGWQuotaCacheStats *entry) override {
58 if (entry->async_refresh_time.sec() == 0)
59 return false;
60
61 entry->async_refresh_time = utime_t(0, 0);
62
63 return true;
64 }
65 };
66
67 virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;
68
69 virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
70
71 virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
72 virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
73
74 virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
75 public:
76 RGWQuotaCache(rgw::sal::RGWRadosStore *_store, int size) : store(_store), stats_map(size) {
77 async_refcount = new RefCountedWaitObject;
78 }
79 virtual ~RGWQuotaCache() {
80 async_refcount->put_wait(); /* wait for all pending async requests to complete */
81 }
82
83 int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
84 void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
85
86 virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
87
88 void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
89 int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
90 void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
91 void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
92
93 class AsyncRefreshHandler {
94 protected:
95 rgw::sal::RGWRadosStore *store;
96 RGWQuotaCache<T> *cache;
97 public:
98 AsyncRefreshHandler(rgw::sal::RGWRadosStore *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
99 virtual ~AsyncRefreshHandler() {}
100
101 virtual int init_fetch() = 0;
102 virtual void drop_reference() = 0;
103 };
104
105 virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
106 };
107
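/* Cached stats are only trusted while they are comfortably below the configured
 * limits. A minimal worked example, assuming rgw_bucket_quota_soft_threshold=0.95
 * (a value chosen purely for illustration): with max_size=1000000 bytes the soft
 * threshold becomes 950000, so once the cached size_rounded reaches 950000 this
 * function returns false and get_stats() falls back to a synchronous fetch, which
 * keeps a stale cache entry from letting writes sail past the hard limit. */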
108 template<class T>
109 bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
110 {
111 if (quota.max_size >= 0) {
112 if (quota.max_size_soft_threshold < 0) {
113 quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
114 }
115
116 if (cached_stats.size_rounded >= (uint64_t)quota.max_size_soft_threshold) {
117 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
118 << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl;
119 return false;
120 }
121 }
122
123 if (quota.max_objects >= 0) {
124 if (quota.max_objs_soft_threshold < 0) {
125 quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
126 }
127
128 if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
129 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
130 << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
131 return false;
132 }
133 }
134
135 return true;
136 }
137
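/* Only one caller should launch a refresh for a given entry. StatsAsyncTestSet
 * clears async_refresh_time under the map's lock; if the field was already zero,
 * another refresh is presumably in flight and async_refresh() bails out early. */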
138 template<class T>
139 int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
140 {
141 /* protect against multiple updates */
142 StatsAsyncTestSet test_update;
143 if (!map_find_and_update(user, bucket, &test_update)) {
144 /* most likely we just raced with another update */
145 return 0;
146 }
147
148 async_refcount->get();
149
150
151 AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
152
153 int ret = handler->init_fetch();
154 if (ret < 0) {
155 async_refcount->put();
156 handler->drop_reference();
157 return ret;
158 }
159
160 return 0;
161 }
162
163 template<class T>
164 void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
165 {
166   ldout(store->ctx(), 20) << "async stats refresh failed for bucket=" << bucket << dendl;
167
168 async_refcount->put();
169 }
170
171 template<class T>
172 void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
173 {
174 ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
175
176 RGWQuotaCacheStats qs;
177
178 map_find(user, bucket, qs);
179
180 set_stats(user, bucket, qs, stats);
181
182 async_refcount->put();
183 }
184
185 template<class T>
186 void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
187 {
188 qs.stats = stats;
189 qs.expiration = ceph_clock_now();
190 qs.async_refresh_time = qs.expiration;
191 qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
192 qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
193
194 map_add(user, bucket, qs);
195 }
196
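/* Cache lookup path: on a hit we opportunistically kick off a background refresh once
 * the entry is past its async_refresh_time, and still return the cached value as long
 * as it is within its TTL and below the soft thresholds; otherwise (miss, expiry or
 * threshold crossed) fall through to a synchronous fetch and repopulate the entry. */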
197 template<class T>
198 int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
199 RGWQuotaCacheStats qs;
200 utime_t now = ceph_clock_now();
201 if (map_find(user, bucket, qs)) {
202 if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
203 int r = async_refresh(user, bucket, qs);
204 if (r < 0) {
205 ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
206
207 /* continue processing, might be a transient error, async refresh is just optimization */
208 }
209 }
210
211 if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
212 ceph_clock_now()) {
213 stats = qs.stats;
214 return 0;
215 }
216 }
217
218 int ret = fetch_stats_from_storage(user, bucket, stats);
219 if (ret < 0 && ret != -ENOENT)
220 return ret;
221
222 set_stats(user, bucket, qs, stats);
223
224 return 0;
225 }
226
227
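/* UpdateContext used by adjust_stats(): applies an object-count delta plus added and
 * removed byte counts to a cached entry in place, clamping each field at zero so a
 * removal applied against a stale entry cannot underflow the unsigned counters. */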
228 template<class T>
229 class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
230 const int objs_delta;
231 const uint64_t added_bytes;
232 const uint64_t removed_bytes;
233 public:
234 RGWQuotaStatsUpdate(const int objs_delta,
235 const uint64_t added_bytes,
236 const uint64_t removed_bytes)
237 : objs_delta(objs_delta),
238 added_bytes(added_bytes),
239 removed_bytes(removed_bytes) {
240 }
241
242 bool update(RGWQuotaCacheStats * const entry) override {
243 const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
244 const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);
245
246 if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
247 entry->stats.size += added_bytes - removed_bytes;
248 } else {
249 entry->stats.size = 0;
250 }
251
252 if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
253 entry->stats.size_rounded += rounded_added - rounded_removed;
254 } else {
255 entry->stats.size_rounded = 0;
256 }
257
258 if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
259 entry->stats.num_objects += objs_delta;
260 } else {
261 entry->stats.num_objects = 0;
262 }
263
264 return true;
265 }
266 };
267
268
269 template<class T>
270 void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
271 uint64_t added_bytes, uint64_t removed_bytes)
272 {
273 RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
274 map_find_and_update(user, bucket, &update);
275
276 data_modified(user, bucket);
277 }
278
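/* Async refresh handler for the bucket cache: init_fetch() looks up the bucket
 * instance info and issues get_bucket_stats_async(); handle_response() sums the
 * per-category stats into a single RGWStorageStats and hands it back to the cache
 * (or reports failure so the cache can drop its refcount). */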
279 class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
280 public RGWGetBucketStats_CB {
281 rgw_user user;
282 public:
283 BucketAsyncRefreshHandler(rgw::sal::RGWRadosStore *_store, RGWQuotaCache<rgw_bucket> *_cache,
284 const rgw_user& _user, const rgw_bucket& _bucket) :
285 RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
286 RGWGetBucketStats_CB(_bucket), user(_user) {}
287
288 void drop_reference() override { put(); }
289 void handle_response(int r) override;
290 int init_fetch() override;
291 };
292
293 int BucketAsyncRefreshHandler::init_fetch()
294 {
295 RGWBucketInfo bucket_info;
296
297 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
298
299 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL, null_yield);
300 if (r < 0) {
301 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
302 return r;
303 }
304
305 ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
306
307 r = store->getRados()->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
308 if (r < 0) {
309     ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;
310
311 /* get_bucket_stats_async() dropped our reference already */
312 return r;
313 }
314
315 return 0;
316 }
317
318 void BucketAsyncRefreshHandler::handle_response(const int r)
319 {
320 if (r < 0) {
321 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
322 cache->async_refresh_fail(user, bucket);
323 return;
324 }
325
326 RGWStorageStats bs;
327
328 for (const auto& pair : *stats) {
329 const RGWStorageStats& s = pair.second;
330
331 bs.size += s.size;
332 bs.size_rounded += s.size_rounded;
333 bs.num_objects += s.num_objects;
334 }
335
336 cache->async_refresh_response(user, bucket, bs);
337 }
338
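/* Bucket-keyed specialization: the LRU map is keyed by rgw_bucket, so the user
 * argument of the map_* hooks is ignored here. Synchronous fetches read the bucket
 * index stats across all shards and collapse them into one RGWStorageStats. */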
339 class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
340 protected:
341 bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
342 return stats_map.find(bucket, qs);
343 }
344
345 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
346 return stats_map.find_and_update(bucket, NULL, ctx);
347 }
348
349 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
350 stats_map.add(bucket, qs);
351 }
352
353 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
354
355 public:
356 explicit RGWBucketStatsCache(rgw::sal::RGWRadosStore *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
357 }
358
359 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
360 return new BucketAsyncRefreshHandler(store, this, user, bucket);
361 }
362 };
363
364 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
365 {
366 RGWBucketInfo bucket_info;
367
368 RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx();
369
370 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL, null_yield);
371 if (r < 0) {
372 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
373 return r;
374 }
375
376 string bucket_ver;
377 string master_ver;
378
379 map<RGWObjCategory, RGWStorageStats> bucket_stats;
380 r = store->getRados()->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
381 &master_ver, bucket_stats, nullptr);
382 if (r < 0) {
383 ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
384 << bucket.name << dendl;
385 return r;
386 }
387
388 stats = RGWStorageStats();
389
390 for (const auto& pair : bucket_stats) {
391 const RGWStorageStats& s = pair.second;
392
393 stats.size += s.size;
394 stats.size_rounded += s.size_rounded;
395 stats.num_objects += s.num_objects;
396 }
397
398 return 0;
399 }
400
401 class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
402 public RGWGetUserStats_CB {
403 rgw_bucket bucket;
404 public:
405 UserAsyncRefreshHandler(rgw::sal::RGWRadosStore *_store, RGWQuotaCache<rgw_user> *_cache,
406 const rgw_user& _user, const rgw_bucket& _bucket) :
407 RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
408 RGWGetUserStats_CB(_user),
409 bucket(_bucket) {}
410
411 void drop_reference() override { put(); }
412 int init_fetch() override;
413 void handle_response(int r) override;
414 };
415
416 int UserAsyncRefreshHandler::init_fetch()
417 {
418 ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
419 int r = store->ctl()->user->read_stats_async(user, this);
420 if (r < 0) {
421     ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
422
423     /* read_stats_async() dropped our reference already */
424 return r;
425 }
426
427 return 0;
428 }
429
430 void UserAsyncRefreshHandler::handle_response(int r)
431 {
432 if (r < 0) {
433 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
434 cache->async_refresh_fail(user, bucket);
435 return;
436 }
437
438 cache->async_refresh_response(user, bucket, stats);
439 }
440
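/* User-keyed specialization. Besides caching per-user totals, it can run two
 * background threads (when quota_threads is set): BucketsSyncThread flushes the
 * buckets recorded by data_modified() into the user's stats header, and
 * UserSyncThread periodically walks all user metadata keys to do a full sync. */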
441 class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
442 std::atomic<bool> down_flag = { false };
443 ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache");
444 map<rgw_bucket, rgw_user> modified_buckets;
445
446 /* thread, sync recent modified buckets info */
447 class BucketsSyncThread : public Thread {
448 CephContext *cct;
449 RGWUserStatsCache *stats;
450
451 ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
452 ceph::condition_variable cond;
453 public:
454
455 BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
456
457 void *entry() override {
458 ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
459 do {
460 map<rgw_bucket, rgw_user> buckets;
461
462 stats->swap_modified_buckets(buckets);
463
464 for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
465 rgw_bucket bucket = iter->first;
466 rgw_user& user = iter->second;
467 ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
468 int r = stats->sync_bucket(user, bucket);
469 if (r < 0) {
470 ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
471 }
472 }
473
474 if (stats->going_down())
475 break;
476
477 std::unique_lock locker{lock};
478 cond.wait_for(
479 locker,
480 std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
481 } while (!stats->going_down());
482 ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
483
484 return NULL;
485 }
486
487 void stop() {
488 std::lock_guard l{lock};
489 cond.notify_all();
490 }
491 };
492
493 /*
494 * thread, full sync all users stats periodically
495 *
496  * only sync users that are non-idle or that have never been synced before; this is
497  * needed so that users who didn't have quota turned on before (or who existed before
498  * the user objclass tracked stats) get their backend stats brought up to date.
499 */
500 class UserSyncThread : public Thread {
501 CephContext *cct;
502 RGWUserStatsCache *stats;
503
504 ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
505 ceph::condition_variable cond;
506 public:
507
508 UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
509
510 void *entry() override {
511 ldout(cct, 20) << "UserSyncThread: start" << dendl;
512 do {
513 int ret = stats->sync_all_users();
514 if (ret < 0) {
515 ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
516 }
517
518 if (stats->going_down())
519 break;
520
521 std::unique_lock l{lock};
522 cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
523 } while (!stats->going_down());
524 ldout(cct, 20) << "UserSyncThread: done" << dendl;
525
526 return NULL;
527 }
528
529 void stop() {
530 std::lock_guard l{lock};
531 cond.notify_all();
532 }
533 };
534
535 BucketsSyncThread *buckets_sync_thread;
536 UserSyncThread *user_sync_thread;
537 protected:
538 bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
539 return stats_map.find(user, qs);
540 }
541
542 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
543 return stats_map.find_and_update(user, NULL, ctx);
544 }
545
546 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
547 stats_map.add(user, qs);
548 }
549
550 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
551 int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
552 int sync_user(const rgw_user& user);
553 int sync_all_users();
554
555 void data_modified(const rgw_user& user, rgw_bucket& bucket) override;
556
557 void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
558 std::unique_lock lock{mutex};
559 modified_buckets.swap(out);
560 }
561
562   template<class T> /* easier to do this as a template; Thread doesn't have a ->stop() */
563 void stop_thread(T **pthr) {
564 T *thread = *pthr;
565 if (!thread)
566 return;
567
568 thread->stop();
569 thread->join();
570 delete thread;
571 *pthr = NULL;
572 }
573
574 public:
575 RGWUserStatsCache(rgw::sal::RGWRadosStore *_store, bool quota_threads)
576 : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size)
577 {
578 if (quota_threads) {
579 buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
580 buckets_sync_thread->create("rgw_buck_st_syn");
581 user_sync_thread = new UserSyncThread(store->ctx(), this);
582 user_sync_thread->create("rgw_user_st_syn");
583 } else {
584 buckets_sync_thread = NULL;
585 user_sync_thread = NULL;
586 }
587 }
588 ~RGWUserStatsCache() override {
589 stop();
590 }
591
592 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
593 return new UserAsyncRefreshHandler(store, this, user, bucket);
594 }
595
596 bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
597     /* in the user case, the cached stats may contain a better estimate of the totals,
598      * as the backend is only updated periodically.
599      */
600 return true;
601 }
602
603 bool going_down() {
604 return down_flag;
605 }
606
607 void stop() {
608 down_flag = true;
609 {
610 std::unique_lock lock{mutex};
611 stop_thread(&buckets_sync_thread);
612 }
613 stop_thread(&user_sync_thread);
614 }
615 };
616
617 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
618 {
619 int r = store->ctl()->user->read_stats(user, &stats);
620 if (r < 0) {
621 ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
622 return r;
623 }
624
625 return 0;
626 }
627
628 int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
629 {
630 RGWBucketInfo bucket_info;
631
632 int r = store->ctl()->bucket->read_bucket_instance_info(bucket, &bucket_info, null_yield);
633 if (r < 0) {
634 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
635 return r;
636 }
637
638 RGWBucketEnt ent;
639 r = store->ctl()->bucket->sync_user_stats(user, bucket_info, &ent);
640 if (r < 0) {
641 ldout(store->ctx(), 0) << "ERROR: sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
642 return r;
643 }
644
645 return store->getRados()->check_bucket_shards(bucket_info, bucket, ent.count);
646 }
647
648 int RGWUserStatsCache::sync_user(const rgw_user& user)
649 {
650 string user_str = user.to_str();
651 RGWStorageStats stats;
652 ceph::real_time last_stats_sync;
653 ceph::real_time last_stats_update;
654
655 int ret = store->ctl()->user->read_stats(rgw_user(user_str), &stats, &last_stats_sync, &last_stats_update);
656 if (ret < 0) {
657 ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
658 return ret;
659 }
660
661 if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
662 last_stats_update < last_stats_sync) {
663 ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
664 return 0;
665 }
666
667 real_time when_need_full_sync = last_stats_sync;
668 when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
669
670 // check if enough time passed since last full sync
671 /* FIXME: missing check? */
672
673 ret = rgw_user_sync_all_stats(store, user);
674 if (ret < 0) {
675 ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
676 return ret;
677 }
678
679 return 0;
680 }
681
682 int RGWUserStatsCache::sync_all_users()
683 {
684 string key = "user";
685 void *handle;
686
687 int ret = store->ctl()->meta.mgr->list_keys_init(key, &handle);
688 if (ret < 0) {
689 ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
690 return ret;
691 }
692
693 bool truncated;
694 int max = 1000;
695
696 do {
697 list<string> keys;
698 ret = store->ctl()->meta.mgr->list_keys_next(handle, max, keys, &truncated);
699 if (ret < 0) {
700       ldout(store->ctx(), 0) << "ERROR: list_keys_next(): ret=" << ret << dendl;
701 goto done;
702 }
703 for (list<string>::iterator iter = keys.begin();
704 iter != keys.end() && !going_down();
705 ++iter) {
706 rgw_user user(*iter);
707 ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
708 int ret = sync_user(user);
709 if (ret < 0) {
710 ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
711
712 /* continuing to next user */
713 continue;
714 }
715 }
716 } while (truncated);
717
718 ret = 0;
719 done:
720 store->ctl()->meta.mgr->list_keys_complete(handle);
721 return ret;
722 }
723
724 void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
725 {
726   /* racy check outside the write lock, but that's ok: at worst two threads both insert the same bucket key, which is harmless */
727 mutex.lock_shared();
728 bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
729 mutex.unlock_shared();
730
731 if (need_update) {
732 std::unique_lock lock{mutex};
733 modified_buckets[bucket] = user;
734 }
735 }
736
737
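/* Strategy objects for the actual limit checks. The "def" applier compares rounded
 * sizes (what the objects actually consume on disk), the "raw" applier compares raw
 * byte sizes; which one is used depends on RGWQuotaInfo::check_on_raw. Instances are
 * stateless singletons returned by get_instance(). */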
738 class RGWQuotaInfoApplier {
739   /* NOTE: no non-static fields are allowed, as instances are supposed to live
740    * in static memory only. */
741 protected:
742 RGWQuotaInfoApplier() = default;
743
744 public:
745 virtual ~RGWQuotaInfoApplier() {}
746
747 virtual bool is_size_exceeded(const char * const entity,
748 const RGWQuotaInfo& qinfo,
749 const RGWStorageStats& stats,
750 const uint64_t size) const = 0;
751
752 virtual bool is_num_objs_exceeded(const char * const entity,
753 const RGWQuotaInfo& qinfo,
754 const RGWStorageStats& stats,
755 const uint64_t num_objs) const = 0;
756
757 static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
758 };
759
760 class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
761 public:
762 bool is_size_exceeded(const char * const entity,
763 const RGWQuotaInfo& qinfo,
764 const RGWStorageStats& stats,
765 const uint64_t size) const override;
766
767 bool is_num_objs_exceeded(const char * const entity,
768 const RGWQuotaInfo& qinfo,
769 const RGWStorageStats& stats,
770 const uint64_t num_objs) const override;
771 };
772
773 class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
774 public:
775 bool is_size_exceeded(const char * const entity,
776 const RGWQuotaInfo& qinfo,
777 const RGWStorageStats& stats,
778 const uint64_t size) const override;
779
780 bool is_num_objs_exceeded(const char * const entity,
781 const RGWQuotaInfo& qinfo,
782 const RGWStorageStats& stats,
783 const uint64_t num_objs) const override;
784 };
785
786
787 bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
788 const RGWQuotaInfo& qinfo,
789 const RGWStorageStats& stats,
790 const uint64_t size) const
791 {
792 if (qinfo.max_size < 0) {
793 /* The limit is not enabled. */
794 return false;
795 }
796
797 const uint64_t cur_size = stats.size_rounded;
798 const uint64_t new_size = rgw_rounded_objsize(size);
799
800 if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
801 dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
802 << " size=" << new_size << " "
803 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
804 return true;
805 }
806
807 return false;
808 }
809
810 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
811 const RGWQuotaInfo& qinfo,
812 const RGWStorageStats& stats,
813 const uint64_t num_objs) const
814 {
815 if (qinfo.max_objects < 0) {
816 /* The limit is not enabled. */
817 return false;
818 }
819
820 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
821 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
822 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
823 << dendl;
824 return true;
825 }
826
827 return false;
828 }
829
830 bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
831 const RGWQuotaInfo& qinfo,
832 const RGWStorageStats& stats,
833 const uint64_t size) const
834 {
835 if (qinfo.max_size < 0) {
836 /* The limit is not enabled. */
837 return false;
838 }
839
840 const uint64_t cur_size = stats.size;
841
842 if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
843 dout(10) << "quota exceeded: stats.size=" << stats.size
844 << " size=" << size << " "
845 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
846 return true;
847 }
848
849 return false;
850 }
851
852 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
853 const RGWQuotaInfo& qinfo,
854 const RGWStorageStats& stats,
855 const uint64_t num_objs) const
856 {
857 if (qinfo.max_objects < 0) {
858 /* The limit is not enabled. */
859 return false;
860 }
861
862 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
863 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
864 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
865 << dendl;
866 return true;
867 }
868
869 return false;
870 }
871
872 const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
873 const RGWQuotaInfo& qinfo)
874 {
875 static RGWQuotaInfoDefApplier default_qapplier;
876 static RGWQuotaInfoRawApplier raw_qapplier;
877
878 if (qinfo.check_on_raw) {
879 return raw_qapplier;
880 } else {
881 return default_qapplier;
882 }
883 }
884
885
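/* Concrete quota handler: owns one bucket stats cache and one user stats cache,
 * checks the bucket quota first and then the user quota, and translates any
 * violation into -ERR_QUOTA_EXCEEDED. update_stats() pushes write deltas into
 * both caches. */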
886 class RGWQuotaHandlerImpl : public RGWQuotaHandler {
887 rgw::sal::RGWRadosStore *store;
888 RGWBucketStatsCache bucket_stats_cache;
889 RGWUserStatsCache user_stats_cache;
890
891 int check_quota(const char * const entity,
892 const RGWQuotaInfo& quota,
893 const RGWStorageStats& stats,
894 const uint64_t num_objs,
895 const uint64_t size) {
896 if (!quota.enabled) {
897 return 0;
898 }
899
900 const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);
901
902 ldout(store->ctx(), 20) << entity
903 << " quota: max_objects=" << quota.max_objects
904 << " max_size=" << quota.max_size << dendl;
905
906
907 if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
908 return -ERR_QUOTA_EXCEEDED;
909 }
910
911 if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
912 return -ERR_QUOTA_EXCEEDED;
913 }
914
915 ldout(store->ctx(), 20) << entity << " quota OK:"
916 << " stats.num_objects=" << stats.num_objects
917 << " stats.size=" << stats.size << dendl;
918 return 0;
919 }
920 public:
921 RGWQuotaHandlerImpl(rgw::sal::RGWRadosStore *_store, bool quota_threads) : store(_store),
922 bucket_stats_cache(_store),
923 user_stats_cache(_store, quota_threads) {}
924
925 int check_quota(const rgw_user& user,
926 rgw_bucket& bucket,
927 RGWQuotaInfo& user_quota,
928 RGWQuotaInfo& bucket_quota,
929 uint64_t num_objs,
930 uint64_t size) override {
931
932 if (!bucket_quota.enabled && !user_quota.enabled) {
933 return 0;
934 }
935
936     /*
937      * we need to fetch bucket stats if the user quota is enabled, because the
938      * whole system relies on us periodically updating the user's bucket stats
939      * in the user's header; this happens in get_stats() only when we actually
940      * fetch that info rather than rely on cached data
941      */
942
943 if (bucket_quota.enabled) {
944 RGWStorageStats bucket_stats;
945 int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
946 bucket_quota);
947 if (ret < 0) {
948 return ret;
949 }
950 ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
951 if (ret < 0) {
952 return ret;
953 }
954 }
955
956 if (user_quota.enabled) {
957 RGWStorageStats user_stats;
958 int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
959 if (ret < 0) {
960 return ret;
961 }
962 ret = check_quota("user", user_quota, user_stats, num_objs, size);
963 if (ret < 0) {
964 return ret;
965 }
966 }
967 return 0;
968 }
969
970 void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
971 bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
972 user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
973 }
974
975 void check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
976 uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) override
977 {
978 if (num_objs > num_shards * max_objs_per_shard) {
979 ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
980 << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
981 need_resharding = true;
982 if (suggested_num_shards) {
983 *suggested_num_shards = num_objs * 2 / max_objs_per_shard;
984 }
985 } else {
986 need_resharding = false;
987 }
988 }
989 };
990
991
992 RGWQuotaHandler *RGWQuotaHandler::generate_handler(rgw::sal::RGWRadosStore *store, bool quota_threads)
993 {
994 return new RGWQuotaHandlerImpl(store, quota_threads);
995 }
996
997 void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
998 {
999 delete handler;
1000 }
1001
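/* A minimal usage sketch (hypothetical caller shown for illustration; the real call
 * sites live in the op-handling code, not in this file):
 *
 *   RGWQuotaHandler *qh = RGWQuotaHandler::generate_handler(store, true);
 *   int ret = qh->check_quota(user, bucket, user_quota, bucket_quota, 1, obj_size);
 *   if (ret == -ERR_QUOTA_EXCEEDED) {
 *     // reject the write
 *   } else if (ret == 0) {
 *     // write succeeded: feed the delta (1 object, obj_size bytes added) back
 *     qh->update_stats(user, bucket, 1, obj_size, 0);
 *   }
 *   RGWQuotaHandler::free_handler(qh);
 */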
1002
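/* Apply operator-configured default quotas. A negative config value leaves the
 * corresponding limit untouched; any non-negative value sets the limit and enables
 * the quota. */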
1003 void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
1004 {
1005 if (conf->rgw_bucket_default_quota_max_objects >= 0) {
1006 quota.max_objects = conf->rgw_bucket_default_quota_max_objects;
1007 quota.enabled = true;
1008 }
1009 if (conf->rgw_bucket_default_quota_max_size >= 0) {
1010 quota.max_size = conf->rgw_bucket_default_quota_max_size;
1011 quota.enabled = true;
1012 }
1013 }
1014
1015 void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
1016 {
1017 if (conf->rgw_user_default_quota_max_objects >= 0) {
1018 quota.max_objects = conf->rgw_user_default_quota_max_objects;
1019 quota.enabled = true;
1020 }
1021 if (conf->rgw_user_default_quota_max_size >= 0) {
1022 quota.max_size = conf->rgw_user_default_quota_max_size;
1023 quota.enabled = true;
1024 }
1025 }