]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_quota.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_quota.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2013 Inktank, Inc
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16
17 #include "include/utime.h"
18 #include "common/lru_map.h"
19 #include "common/RefCountedObj.h"
20 #include "common/Thread.h"
21 #include "common/ceph_mutex.h"
22
23 #include "rgw_common.h"
24 #include "rgw_sal.h"
25 #include "rgw_sal_rados.h"
26 #include "rgw_quota.h"
27 #include "rgw_bucket.h"
28 #include "rgw_user.h"
29
30 #include "services/svc_sys_obj.h"
31 #include "services/svc_meta.h"
32
#include <atomic>
#include <shared_mutex>
34
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_rgw
37
38 using namespace std;
39
/* A cached stats entry plus its freshness bookkeeping. */
struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;          // entry is served from cache until this time
  utime_t async_refresh_time;  // past this time a background refresh is triggered; 0 means a refresh was already claimed
};
45
/* Generic LRU-backed cache of storage stats, keyed by T (rgw_bucket or
 * rgw_user).  Concrete subclasses choose which half of the (user, bucket)
 * pair is the map key and implement the backend fetch.  An entry is served
 * from cache until 'expiration'; once past 'async_refresh_time' a
 * background refresh is kicked off opportunistically. */
template<class T>
class RGWQuotaCache {
protected:
  rgw::sal::Driver* driver;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;  // counts in-flight async refreshes; the dtor waits on it

  /* lru_map functor that "claims" an entry for async refresh by zeroing
   * async_refresh_time; returns false when another thread got there first.
   * The delta members are unused here. */
  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  /* fetch fresh stats from the backend store (synchronous) */
  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  /* hook invoked after adjust_stats(); default is a no-op */
  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y,
                const DoutPrefixProvider* dpp);
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
  void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);

  /* interface for a single async refresh operation; implementations are
   * refcounted completion callbacks (hence drop_reference()) */
  class AsyncRefreshHandler {
  protected:
    rgw::sal::Driver* driver;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};
108
/* Kick off a background refresh of the cached entry for (user, bucket).
 * Returns 0 both when a refresh was started and when another thread had
 * already claimed it. */
template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  /* account the in-flight request; released by async_refresh_response() /
   * async_refresh_fail(), or right below on immediate failure */
  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}
133
134 template<class T>
135 void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
136 {
137 ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
138
139 async_refcount->put();
140 }
141
/* Success path of an async refresh: store the freshly fetched stats and
 * release the in-flight reference taken in async_refresh(). */
template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  /* lookup result is ignored on purpose: set_stats() below re-stamps and
   * (re)inserts the entry either way */
  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}
155
/* Store fresh stats in the cache and stamp the entry: it expires after
 * rgw_bucket_quota_ttl and becomes eligible for async refresh after half
 * that.  NOTE(review): the bucket TTL option is used even when T is
 * rgw_user (the user-stats cache) — confirm that is intended. */
template<class T>
void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
  qs.stats = stats;
  qs.expiration = ceph_clock_now();
  qs.async_refresh_time = qs.expiration;
  qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl;
  qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2;

  map_add(user, bucket, qs);
}
167
/* Return stats for (user, bucket), serving from cache while fresh.  A hit
 * past async_refresh_time additionally schedules a background refresh; a
 * miss or an expired entry falls through to a synchronous backend fetch.
 * -ENOENT from the backend is treated as success (no stats recorded yet). */
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    /* entry still valid — serve the cached copy */
    if (qs.expiration > ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}
196
197
/* lru_map update functor that applies an object-count / byte-size delta to
 * a cached entry, clamping each counter at zero instead of letting it wrap
 * negative. */
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    /* unsigned arithmetic wraps on underflow; the int64_t cast detects a
     * would-be-negative result so the counter can be clamped to zero */
    if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
      entry->stats.size += added_bytes - removed_bytes;
    } else {
      entry->stats.size = 0;
    }

    if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
      entry->stats.size_rounded += rounded_added - rounded_removed;
    } else {
      entry->stats.size_rounded = 0;
    }

    if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
      entry->stats.num_objects += objs_delta;
    } else {
      entry->stats.num_objects = 0;
    }

    return true;
  }
};
237
238
239 template<class T>
240 void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
241 uint64_t added_bytes, uint64_t removed_bytes)
242 {
243 RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
244 map_find_and_update(user, bucket, &update);
245
246 data_modified(user, bucket);
247 }
248
/* Completion handler for an async bucket-stats refresh.  The target bucket
 * comes from the RGWGetBucketStats_CB base; results are reported back to
 * the owning RGWQuotaCache<rgw_bucket>. */
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
    RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
    RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }  // RGWGetBucketStats_CB is refcounted
  void handle_response(int r) override;
  int init_fetch() override;
};
262
/* Start the asynchronous bucket-stats read.  On success handle_response()
 * will eventually fire; on failure the caller (async_refresh) releases the
 * refcount and this handler. */
int BucketAsyncRefreshHandler::init_fetch()
{
  std::unique_ptr<rgw::sal::Bucket> rbucket;

  const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
  int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;

  /* indexless buckets have no index to read stats from */
  const auto& index = rbucket->get_info().get_current_index();
  if (is_layout_indexless(index)) {
    /* NOTE(review): returning success here means read_stats_async() is never
     * called, so no completion callback will run — it looks like the
     * async_refcount reference and this handler are never released on this
     * path; confirm against RGWQuotaCache::async_refresh() cleanup
     * expectations. */
    return 0;
  }

  r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl;

    /* read_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}
291
292 void BucketAsyncRefreshHandler::handle_response(const int r)
293 {
294 if (r < 0) {
295 ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
296 cache->async_refresh_fail(user, bucket);
297 return;
298 }
299
300 RGWStorageStats bs;
301
302 for (const auto& pair : *stats) {
303 const RGWStorageStats& s = pair.second;
304
305 bs.size += s.size;
306 bs.size_rounded += s.size_rounded;
307 bs.num_objects += s.num_objects;
308 }
309
310 cache->async_refresh_response(user, bucket, bs);
311 }
312
/* Bucket-stats cache: the map key is the rgw_bucket; the user argument of
 * the generic interface is ignored for map operations. */
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;

public:
  explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(driver, this, user, bucket);
  }
};
337
338 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
339 {
340 std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
341 std::unique_ptr<rgw::sal::Bucket> bucket;
342
343 int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
344 if (r < 0) {
345 ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
346 return r;
347 }
348
349 stats = RGWStorageStats();
350
351 const auto& index = bucket->get_info().get_current_index();
352 if (is_layout_indexless(index)) {
353 return 0;
354 }
355
356 string bucket_ver;
357 string master_ver;
358
359 map<RGWObjCategory, RGWStorageStats> bucket_stats;
360 r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
361 &master_ver, bucket_stats, nullptr);
362 if (r < 0) {
363 ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
364 << _b.name << dendl;
365 return r;
366 }
367
368 for (const auto& pair : bucket_stats) {
369 const RGWStorageStats& s = pair.second;
370
371 stats.size += s.size;
372 stats.size_rounded += s.size_rounded;
373 stats.num_objects += s.num_objects;
374 }
375
376 return 0;
377 }
378
/* Completion handler for an async user-stats refresh.  The target user
 * comes from the RGWGetUserStats_CB base; 'bucket' is only carried along to
 * satisfy the cache callback signatures. */
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  const DoutPrefixProvider *dpp;
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
    RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache),
    RGWGetUserStats_CB(_user),
    dpp(_dpp),
    bucket(_bucket) {}

  void drop_reference() override { put(); }  // RGWGetUserStats_CB is refcounted
  int init_fetch() override;
  void handle_response(int r) override;
};
395
396 int UserAsyncRefreshHandler::init_fetch()
397 {
398 std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
399
400 ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
401 int r = ruser->read_stats_async(dpp, this);
402 if (r < 0) {
403 ldpp_dout(dpp, 0) << "could not get bucket info for user=" << user << dendl;
404
405 /* get_bucket_stats_async() dropped our reference already */
406 return r;
407 }
408
409 return 0;
410 }
411
412 void UserAsyncRefreshHandler::handle_response(int r)
413 {
414 if (r < 0) {
415 ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
416 cache->async_refresh_fail(user, bucket);
417 return;
418 }
419
420 cache->async_refresh_response(user, bucket, stats);
421 }
422
/* Per-user stats cache.  The map key is the rgw_user; the bucket argument
 * of the generic interface is ignored for lookups.  When quota_threads is
 * set it also runs two background threads: one that syncs recently modified
 * buckets into the user header, and one that periodically full-syncs all
 * users. */
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  const DoutPrefixProvider *dpp;
  std::atomic<bool> down_flag = { false };    // set by stop(); polled by both sync threads
  ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache");
  map<rgw_bucket, rgw_user> modified_buckets; // buckets touched since the last sync pass; guarded by 'mutex'

  /* thread, sync recent modified buckets info */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
    ceph::condition_variable cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        /* grab-and-clear the pending set so new modifications can keep
         * queueing while this batch is synced */
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: ");
          int r = stats->sync_bucket(user, bucket, null_yield, &dp);
          if (r < 0) {
            /* per-bucket failures are non-fatal; keep going */
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        std::unique_lock locker{lock};
        cond.wait_for(
          locker,
          std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();  // wake the thread so it re-checks going_down()
    }
  };

  /*
   * thread, full sync all users stats periodically
   *
   * only sync non idle users or ones that never got synced before, this is needed so that
   * users that didn't have quota turned on before (or existed before the user objclass
   * tracked stats) need to get their backend stats up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
    ceph::condition_variable cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: ");
        int ret = stats->sync_all_users(&dp, null_yield);
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        std::unique_lock l{lock};
        cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();  // wake the thread so it re-checks going_down()
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  /* the user-stats cache is keyed by user only; 'bucket' is ignored */
  bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp);
  int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y);
  int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y);

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  /* hand the accumulated modified-buckets set over to the sync thread */
  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    std::unique_lock lock{mutex};
    modified_buckets.swap(out);
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  /* NOTE(review): the cache size reuses rgw_bucket_quota_cache_size for the
   * user cache — confirm that is intended. */
  RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
    : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
  {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(driver->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket);
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    {
      std::unique_lock lock{mutex};
      stop_thread(&buckets_sync_thread);
    }
    stop_thread(&user_sync_thread);
  }
};
594
595 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
596 const rgw_bucket& _b,
597 RGWStorageStats& stats,
598 optional_yield y,
599 const DoutPrefixProvider *dpp)
600 {
601 std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
602 int r = user->read_stats(dpp, y, &stats);
603 if (r < 0) {
604 ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
605 return r;
606 }
607
608 return 0;
609 }
610
611 int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
612 {
613 std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
614 std::unique_ptr<rgw::sal::Bucket> bucket;
615
616 int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
617 if (r < 0) {
618 ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
619 return r;
620 }
621
622 r = bucket->sync_user_stats(dpp, y);
623 if (r < 0) {
624 ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl;
625 return r;
626 }
627
628 return bucket->check_bucket_shards(dpp);
629 }
630
631 int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y)
632 {
633 RGWStorageStats stats;
634 ceph::real_time last_stats_sync;
635 ceph::real_time last_stats_update;
636 std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str()));
637
638 int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
639 if (ret < 0) {
640 ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl;
641 return ret;
642 }
643
644 if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users &&
645 last_stats_update < last_stats_sync) {
646 ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
647 return 0;
648 }
649
650 real_time when_need_full_sync = last_stats_sync;
651 when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time);
652
653 // check if enough time passed since last full sync
654 /* FIXME: missing check? */
655
656 ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y);
657 if (ret < 0) {
658 ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
659 return ret;
660 }
661
662 return 0;
663 }
664
/* Iterate over all user metadata keys and sync_user() each one.  Per-user
 * failures are logged and skipped; listing failures abort the pass.  The
 * goto ensures meta_list_keys_complete() is always called. */
int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y)
{
  string key = "user";
  void *handle;

  int ret = driver->meta_list_keys_init(dpp, key, string(), &handle);
  if (ret < 0) {
    ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;  // listing page size

  do {
    list<string> keys;
    ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
      goto done;  // still need meta_list_keys_complete()
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();  // bail out mid-page on shutdown
         ++iter) {
      rgw_user user(*iter);
      ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      /* inner 'ret' intentionally shadows the outer one: per-user failures
       * must not fail the whole pass */
      int ret = sync_user(dpp, user, y);
      if (ret < 0) {
        ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  driver->meta_list_keys_complete(handle);
  return ret;
}
706
707 void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
708 {
709 /* racy, but it's ok */
710 mutex.lock_shared();
711 bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
712 mutex.unlock_shared();
713
714 if (need_update) {
715 std::unique_lock lock{mutex};
716 modified_buckets[bucket] = user;
717 }
718 }
719
720
/* Strategy interface deciding whether a quota limit would be exceeded.
 * Two stateless singleton implementations exist (see get_instance()): the
 * default applier accounts rounded sizes, the raw applier exact sizes. */
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  /* true if adding 'size' more bytes would exceed qinfo.max_size */
  virtual bool is_size_exceeded(const DoutPrefixProvider *dpp,
                                const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  /* true if adding 'num_objs' more objects would exceed qinfo.max_objects */
  virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                    const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};
744
/* Default quota applier: size checks use rounded (allocated) sizes. */
class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
757
/* Raw quota applier: size checks use exact byte sizes (check_on_raw). */
class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
770
771
772 bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
773 const char * const entity,
774 const RGWQuotaInfo& qinfo,
775 const RGWStorageStats& stats,
776 const uint64_t size) const
777 {
778 if (qinfo.max_size < 0) {
779 /* The limit is not enabled. */
780 return false;
781 }
782
783 const uint64_t cur_size = stats.size_rounded;
784 const uint64_t new_size = rgw_rounded_objsize(size);
785
786 if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) {
787 ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
788 << " size=" << new_size << " "
789 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
790 return true;
791 }
792
793 return false;
794 }
795
796 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
797 const char * const entity,
798 const RGWQuotaInfo& qinfo,
799 const RGWStorageStats& stats,
800 const uint64_t num_objs) const
801 {
802 if (qinfo.max_objects < 0) {
803 /* The limit is not enabled. */
804 return false;
805 }
806
807 if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
808 ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
809 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
810 << dendl;
811 return true;
812 }
813
814 return false;
815 }
816
817 bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
818 const char * const entity,
819 const RGWQuotaInfo& qinfo,
820 const RGWStorageStats& stats,
821 const uint64_t size) const
822 {
823 if (qinfo.max_size < 0) {
824 /* The limit is not enabled. */
825 return false;
826 }
827
828 const uint64_t cur_size = stats.size;
829
830 if (std::cmp_greater(cur_size + size, qinfo.max_size)) {
831 ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
832 << " size=" << size << " "
833 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
834 return true;
835 }
836
837 return false;
838 }
839
840 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
841 const char * const entity,
842 const RGWQuotaInfo& qinfo,
843 const RGWStorageStats& stats,
844 const uint64_t num_objs) const
845 {
846 if (qinfo.max_objects < 0) {
847 /* The limit is not enabled. */
848 return false;
849 }
850
851 if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
852 ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
853 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
854 << dendl;
855 return true;
856 }
857
858 return false;
859 }
860
861 const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
862 const RGWQuotaInfo& qinfo)
863 {
864 static RGWQuotaInfoDefApplier default_qapplier;
865 static RGWQuotaInfoRawApplier raw_qapplier;
866
867 if (qinfo.check_on_raw) {
868 return raw_qapplier;
869 } else {
870 return default_qapplier;
871 }
872 }
873
874
/* Concrete quota handler: owns one bucket-stats cache and one user-stats
 * cache, and enforces both quota sets on the write path. */
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  rgw::sal::Driver* driver;
  RGWBucketStatsCache bucket_stats_cache;
  RGWUserStatsCache user_stats_cache;

  /* shared limit check for one entity ("bucket" or "user"); returns
   * -ERR_QUOTA_EXCEEDED when either limit would be crossed */
  int check_quota(const DoutPrefixProvider *dpp,
                  const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldpp_dout(dpp, 20) << entity
                       << " quota: max_objects=" << quota.max_objects
                       << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldpp_dout(dpp, 20) << entity << " quota OK:"
                       << " stats.num_objects=" << stats.num_objects
                       << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) : driver(_driver),
                                    bucket_stats_cache(_driver),
                                    user_stats_cache(dpp, _driver, quota_threads) {}

  /* Check an intended write of 'num_objs' objects / 'size' bytes against
   * the bucket quota and then the user quota. */
  int check_quota(const DoutPrefixProvider *dpp,
                  const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuota& quota,
                  uint64_t num_objs,
                  uint64_t size, optional_yield y) override {

    if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) {
      return 0;
    }

    /*
     * we need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header, this happens in get_stats() if we actually
     * fetch that info and not rely on cached data
     */

    const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: ");
    if (quota.bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (quota.user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  /* propagate an object create/delete delta into both caches */
  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

  /* decide whether the bucket index needs resharding given the current
   * object count and shard layout, optionally suggesting a new shard count */
  void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard,
                           uint64_t num_shards, uint64_t num_objs, bool is_multisite,
                           bool& need_resharding, uint32_t *suggested_num_shards) override
  {
    if (num_objs > num_shards * max_objs_per_shard) {
      ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
                        << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        uint32_t obj_multiplier = 2;
        if (is_multisite) {
          // if we're maintaining bilogs for multisite, reshards are significantly
          // more expensive. scale up the shard count much faster to minimize the
          // number of reshard events during a write workload
          obj_multiplier = 8;
        }
        *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }
  }
};
988
989
/* Factory for the concrete quota handler; release with free_handler(). */
RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads)
{
  return new RGWQuotaHandlerImpl(dpp, driver, quota_threads);
}
994
/* Destroy a handler obtained from generate_handler(). */
void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}
999
1000
1001 void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
1002 {
1003 if (conf->rgw_bucket_default_quota_max_objects >= 0) {
1004 quota.max_objects = conf->rgw_bucket_default_quota_max_objects;
1005 quota.enabled = true;
1006 }
1007 if (conf->rgw_bucket_default_quota_max_size >= 0) {
1008 quota.max_size = conf->rgw_bucket_default_quota_max_size;
1009 quota.enabled = true;
1010 }
1011 }
1012
1013 void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
1014 {
1015 if (conf->rgw_user_default_quota_max_objects >= 0) {
1016 quota.max_objects = conf->rgw_user_default_quota_max_objects;
1017 quota.enabled = true;
1018 }
1019 if (conf->rgw_user_default_quota_max_size >= 0) {
1020 quota.max_size = conf->rgw_user_default_quota_max_size;
1021 quota.enabled = true;
1022 }
1023 }
1024
/* Serialize the quota to a Formatter.  max_size_kb is derived output kept
 * for backward compatibility with older consumers. */
void RGWQuotaInfo::dump(Formatter *f) const
{
  f->dump_bool("enabled", enabled);
  f->dump_bool("check_on_raw", check_on_raw);

  f->dump_int("max_size", max_size);
  f->dump_int("max_size_kb", rgw_rounded_kb(max_size));
  f->dump_int("max_objects", max_objects);
}
1034
/* Decode a quota from JSON, falling back to the legacy "max_size_kb" field
 * (expressed in KiB) when the newer byte-granular "max_size" is absent. */
void RGWQuotaInfo::decode_json(JSONObj *obj)
{
  if (false == JSONDecoder::decode_json("max_size", max_size, obj)) {
    /* We're parsing an older version of the struct. */
    int64_t max_size_kb = 0;

    JSONDecoder::decode_json("max_size_kb", max_size_kb, obj);
    max_size = max_size_kb * 1024;  // legacy field was in KiB
  }
  JSONDecoder::decode_json("max_objects", max_objects, obj);

  JSONDecoder::decode_json("check_on_raw", check_on_raw, obj);
  JSONDecoder::decode_json("enabled", enabled, obj);
}
1049