// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */


#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
#include "common/ceph_mutex.h"

#include "rgw_common.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"
#include "rgw_user.h"

#include "services/svc_sys_obj.h"
#include "services/svc_meta.h"

#include <atomic>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace std;

struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;
  utime_t async_refresh_time;
};

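/*
 * In-memory LRU cache of storage stats, keyed either by bucket or by user.
 * Each entry carries two timestamps: an expiration time (after which the
 * entry is refetched synchronously from storage) and an earlier async
 * refresh time (after which a background refresh is kicked off so callers
 * keep seeing cached data while it is being updated).
 */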
template<class T>
class RGWQuotaCache {
protected:
  rgw::sal::Driver* driver;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;

  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y,
                const DoutPrefixProvider* dpp);
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
  void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);

  class AsyncRefreshHandler {
  protected:
    rgw::sal::Driver* driver;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};

template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}

template<class T>
void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
{
  ldout(driver->ctx(), 20) << "async stats refresh failed for bucket=" << bucket << dendl;

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
  qs.stats = stats;
  qs.expiration = ceph_clock_now();
  qs.async_refresh_time = qs.expiration;
  qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl;
  qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2;

  map_add(user, bucket, qs);
}

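/*
 * Return cached stats if they are still valid. When the entry has passed
 * its async refresh time, schedule a background refresh (failures there are
 * only logged; the cached value remains usable). Once the entry expires, or
 * on a cache miss, fetch synchronously from storage and re-insert.
 */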
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (qs.expiration > ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}


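/*
 * lru_map update callback that applies an in-place delta to a cached entry
 * after an object write or delete. Each counter is clamped at zero so a
 * stale cache entry can never underflow into a huge unsigned value.
 */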
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
      entry->stats.size += added_bytes - removed_bytes;
    } else {
      entry->stats.size = 0;
    }

    if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
      entry->stats.size_rounded += rounded_added - rounded_removed;
    } else {
      entry->stats.size_rounded = 0;
    }

    if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
      entry->stats.num_objects += objs_delta;
    } else {
      entry->stats.num_objects = 0;
    }

    return true;
  }
};


template<class T>
void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
                                    uint64_t added_bytes, uint64_t removed_bytes)
{
  RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
  map_find_and_update(user, bucket, &update);

  data_modified(user, bucket);
}

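/*
 * Background refresh of a single bucket's stats. init_fetch() looks the
 * bucket up through the SAL driver and issues read_stats_async() against
 * the current index layout; handle_response() sums the per-category stats
 * and feeds the result back into the bucket stats cache.
 */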
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
      RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }
  void handle_response(int r) override;
  int init_fetch() override;
};

int BucketAsyncRefreshHandler::init_fetch()
{
  std::unique_ptr<rgw::sal::Bucket> rbucket;

  const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
  int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;

  const auto& index = rbucket->get_info().get_current_index();
  if (is_layout_indexless(index)) {
    return 0;
  }

  r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;

    /* read_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void BucketAsyncRefreshHandler::handle_response(const int r)
{
  if (r < 0) {
    ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  RGWStorageStats bs;

  for (const auto& pair : *stats) {
    const RGWStorageStats& s = pair.second;

    bs.size += s.size;
    bs.size_rounded += s.size_rounded;
    bs.num_objects += s.num_objects;
  }

  cache->async_refresh_response(user, bucket, bs);
}

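/*
 * Quota cache specialization keyed by rgw_bucket. The user argument is
 * ignored for lookups; only the bucket identifies the cache entry.
 */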
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;

public:
  explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(driver, this, user, bucket);
  }
};

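/*
 * Synchronous fetch of a bucket's usage: read the per-category stats from
 * the bucket index (indexless layouts are treated as empty) and aggregate
 * them into a single RGWStorageStats.
 */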
int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
  std::unique_ptr<rgw::sal::Bucket> bucket;

  int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
    return r;
  }

  stats = RGWStorageStats();

  const auto& index = bucket->get_info().get_current_index();
  if (is_layout_indexless(index)) {
    return 0;
  }

  string bucket_ver;
  string master_ver;

  map<RGWObjCategory, RGWStorageStats> bucket_stats;
  r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
                         &master_ver, bucket_stats, nullptr);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
                      << _b.name << dendl;
    return r;
  }

  for (const auto& pair : bucket_stats) {
    const RGWStorageStats& s = pair.second;

    stats.size += s.size;
    stats.size_rounded += s.size_rounded;
    stats.num_objects += s.num_objects;
  }

  return 0;
}

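/*
 * Background refresh of a user's aggregate stats, driven by
 * rgw::sal::User::read_stats_async(). The triggering bucket is carried
 * along only so the completion can be routed back to the user stats cache.
 */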
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  const DoutPrefixProvider *dpp;
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
      RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache),
      RGWGetUserStats_CB(_user),
      dpp(_dpp),
      bucket(_bucket) {}

  void drop_reference() override { put(); }
  int init_fetch() override;
  void handle_response(int r) override;
};

int UserAsyncRefreshHandler::init_fetch()
{
  std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);

  ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
  int r = ruser->read_stats_async(dpp, this);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;

    /* read_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void UserAsyncRefreshHandler::handle_response(int r)
{
  if (r < 0) {
    ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  cache->async_refresh_response(user, bucket, stats);
}

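/*
 * Quota cache specialization keyed by rgw_user. When quota_threads is set
 * it also runs two background workers: BucketsSyncThread flushes stats for
 * recently modified buckets into the owning user's header, and
 * UserSyncThread periodically walks all users to run a full stats sync.
 */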
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  const DoutPrefixProvider *dpp;
  std::atomic<bool> down_flag = { false };
  ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache");
  map<rgw_bucket, rgw_user> modified_buckets;

  /* thread: sync stats for recently modified buckets */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
    ceph::condition_variable cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: ");
          int r = stats->sync_bucket(user, bucket, null_yield, &dp);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        std::unique_lock locker{lock};
        cond.wait_for(
          locker,
          std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();
    }
  };

  /*
   * thread: periodically run a full stats sync for all users
   *
   * Only non-idle users, or users that were never synced before, are synced.
   * This is needed so that users who didn't have quota turned on before (or
   * existed before the user objclass tracked stats) get their backend stats
   * up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
    ceph::condition_variable cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: ");
        int ret = stats->sync_all_users(&dp, null_yield);
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        std::unique_lock l{lock};
        cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp);
  int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y);
  int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y);

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    std::unique_lock lock{mutex};
    modified_buckets.swap(out);
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
    : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
  {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(driver->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket);
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    {
      std::unique_lock lock{mutex};
      stop_thread(&buckets_sync_thread);
    }
    stop_thread(&user_sync_thread);
  }
};

int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
                                                const rgw_bucket& _b,
                                                RGWStorageStats& stats,
                                                optional_yield y,
                                                const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
  int r = user->read_stats(dpp, y, &stats);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
  std::unique_ptr<rgw::sal::Bucket> bucket;

  int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
    return r;
  }

  r = bucket->sync_user_stats(dpp, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl;
    return r;
  }

  return bucket->check_bucket_shards(dpp);
}

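/*
 * Full stats sync for a single user. Idle users (no stats update since the
 * last sync) are skipped unless rgw_user_quota_sync_idle_users is set;
 * otherwise rgw_user_sync_all_stats() recomputes the user's totals from the
 * backend.
 */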
int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y)
{
  RGWStorageStats stats;
  ceph::real_time last_stats_sync;
  ceph::real_time last_stats_update;
  std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str()));

  int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
  if (ret < 0) {
    ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      last_stats_update < last_stats_sync) {
    ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = last_stats_sync;
  when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */

  ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}

int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y)
{
  string key = "user";
  void *handle;

  int ret = driver->meta_list_keys_init(dpp, key, string(), &handle);
  if (ret < 0) {
    ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;

  do {
    list<string> keys;
    ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
      goto done;
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      int ret = sync_user(dpp, user, y);
      if (ret < 0) {
        ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  driver->meta_list_keys_complete(handle);
  return ret;
}

void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
{
  /* racy, but it's ok */
  mutex.lock_shared();
  bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
  mutex.unlock_shared();

  if (need_update) {
    std::unique_lock lock{mutex};
    modified_buckets[bucket] = user;
  }
}


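/*
 * Strategy objects for quota enforcement. RGWQuotaInfoDefApplier compares
 * against rounded (on-disk) sizes, while RGWQuotaInfoRawApplier compares
 * against raw object sizes; which one applies is chosen per quota via its
 * check_on_raw flag.
 */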
class RGWQuotaInfoApplier {
  /* NOTE: no non-static fields allowed, as instances are supposed to live
   * in static storage only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  virtual bool is_size_exceeded(const DoutPrefixProvider *dpp,
                                const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                    const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};

class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};

class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};


bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
                                              const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size_rounded;
  const uint64_t new_size = rgw_rounded_objsize(size);

  if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
                       << " size=" << new_size << " "
                       << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                                  const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
                       << " " << entity << "_quota.max_objects=" << qinfo.max_objects
                       << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
                                              const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size;

  if (std::cmp_greater(cur_size + size, qinfo.max_size)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
                       << " size=" << size << " "
                       << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                                  const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
                       << " " << entity << "_quota.max_objects=" << qinfo.max_objects
                       << dendl;
    return true;
  }

  return false;
}

const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
  const RGWQuotaInfo& qinfo)
{
  static RGWQuotaInfoDefApplier default_qapplier;
  static RGWQuotaInfoRawApplier raw_qapplier;

  if (qinfo.check_on_raw) {
    return raw_qapplier;
  } else {
    return default_qapplier;
  }
}


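/*
 * Concrete quota handler wired into the request path: it consults the
 * bucket and user stats caches, checks the incoming write against both
 * quotas, and pushes stat deltas back into the caches after operations.
 */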
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  rgw::sal::Driver* driver;
  RGWBucketStatsCache bucket_stats_cache;
  RGWUserStatsCache user_stats_cache;

  int check_quota(const DoutPrefixProvider *dpp,
                  const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldpp_dout(dpp, 20) << entity
                       << " quota: max_objects=" << quota.max_objects
                       << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldpp_dout(dpp, 20) << entity << " quota OK:"
                       << " stats.num_objects=" << stats.num_objects
                       << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
    : driver(_driver),
      bucket_stats_cache(_driver),
      user_stats_cache(dpp, _driver, quota_threads) {}

  int check_quota(const DoutPrefixProvider *dpp,
                  const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuota& quota,
                  uint64_t num_objs,
                  uint64_t size, optional_yield y) override {

    if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) {
      return 0;
    }

    /*
     * We need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header; this happens in get_stats() only when we
     * actually fetch that info rather than relying on cached data.
     */

    const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: ");
    if (quota.bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (quota.user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

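  /*
   * Decide whether a bucket needs resharding: once the object count exceeds
   * max_objs_per_shard * num_shards, suggest a new shard count. On multisite
   * deployments the suggestion is scaled up more aggressively (8x instead of
   * 2x) to reduce how often resharding, and the bilog churn it causes, has
   * to happen.
   */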
  void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard,
                           uint64_t num_shards, uint64_t num_objs, bool is_multisite,
                           bool& need_resharding, uint32_t *suggested_num_shards) override
  {
    if (num_objs > num_shards * max_objs_per_shard) {
      ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
                        << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        uint32_t obj_multiplier = 2;
        if (is_multisite) {
          // if we're maintaining bilogs for multisite, reshards are significantly
          // more expensive. scale up the shard count much faster to minimize the
          // number of reshard events during a write workload
          obj_multiplier = 8;
        }
        *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }
  }
};


RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads)
{
  return new RGWQuotaHandlerImpl(dpp, driver, quota_threads);
}

void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}


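/*
 * Seed a quota from the rgw_{bucket,user}_default_quota_max_* config
 * options. A negative default means "not configured"; setting either limit
 * also enables the quota.
 */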
void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
{
  if (conf->rgw_bucket_default_quota_max_objects >= 0) {
    quota.max_objects = conf->rgw_bucket_default_quota_max_objects;
    quota.enabled = true;
  }
  if (conf->rgw_bucket_default_quota_max_size >= 0) {
    quota.max_size = conf->rgw_bucket_default_quota_max_size;
    quota.enabled = true;
  }
}

void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
{
  if (conf->rgw_user_default_quota_max_objects >= 0) {
    quota.max_objects = conf->rgw_user_default_quota_max_objects;
    quota.enabled = true;
  }
  if (conf->rgw_user_default_quota_max_size >= 0) {
    quota.max_size = conf->rgw_user_default_quota_max_size;
    quota.enabled = true;
  }
}

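/*
 * JSON (de)serialization for RGWQuotaInfo. decode_json() still accepts the
 * legacy max_size_kb field when max_size is absent and converts it to
 * bytes, so older configurations keep working.
 */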
void RGWQuotaInfo::dump(Formatter *f) const
{
  f->dump_bool("enabled", enabled);
  f->dump_bool("check_on_raw", check_on_raw);

  f->dump_int("max_size", max_size);
  f->dump_int("max_size_kb", rgw_rounded_kb(max_size));
  f->dump_int("max_objects", max_objects);
}

void RGWQuotaInfo::decode_json(JSONObj *obj)
{
  if (false == JSONDecoder::decode_json("max_size", max_size, obj)) {
    /* We're parsing an older version of the struct. */
    int64_t max_size_kb = 0;

    JSONDecoder::decode_json("max_size_kb", max_size_kb, obj);
    max_size = max_size_kb * 1024;
  }
  JSONDecoder::decode_json("max_objects", max_objects, obj);

  JSONDecoder::decode_json("check_on_raw", check_on_raw, obj);
  JSONDecoder::decode_json("enabled", enabled, obj);
}