// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */


#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
#include "common/ceph_mutex.h"

#include "rgw_common.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"
#include "rgw_user.h"

#include "services/svc_sys_obj.h"
#include "services/svc_meta.h"

#include <atomic>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace std;

struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;
  utime_t async_refresh_time;
};

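/*
 * RGWQuotaCache keeps per-user or per-bucket storage stats in an LRU map.
 * set_stats() stamps each entry with a hard expiration of
 * rgw_bucket_quota_ttl and an async refresh point at half that TTL, so
 * readers normally hit warm data while a background fetch keeps it current.
 */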
template<class T>
class RGWQuotaCache {
protected:
  rgw::sal::Store* store;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;

  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(rgw::sal::Store* _store, int size) : store(_store), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y,
                const DoutPrefixProvider* dpp);
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
  void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);

  class AsyncRefreshHandler {
  protected:
    rgw::sal::Store* store;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};

template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}

template<class T>
void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
{
  ldout(store->ctx(), 20) << "async stats refresh failed for bucket=" << bucket << dendl;

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}

template<class T>
void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
  qs.stats = stats;
  qs.expiration = ceph_clock_now();
  qs.async_refresh_time = qs.expiration;
  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
  qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;

  map_add(user, bucket, qs);
}

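/*
 * get_stats(): serve from the cache while the entry is within its TTL;
 * once past async_refresh_time, kick off a non-blocking refresh and keep
 * serving the cached value until the entry actually expires, at which
 * point we fall back to a synchronous fetch_stats_from_storage().
 */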
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (qs.expiration > ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}


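/*
 * In-cache delta applier used by adjust_stats(): bumps the cached size,
 * rounded size and object count by the given deltas, clamping each
 * counter at zero so a stale cache entry can never go negative.
 */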
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
      entry->stats.size += added_bytes - removed_bytes;
    } else {
      entry->stats.size = 0;
    }

    if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
      entry->stats.size_rounded += rounded_added - rounded_removed;
    } else {
      entry->stats.size_rounded = 0;
    }

    if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
      entry->stats.num_objects += objs_delta;
    } else {
      entry->stats.num_objects = 0;
    }

    return true;
  }
};


template<class T>
void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
                                    uint64_t added_bytes, uint64_t removed_bytes)
{
  RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
  map_find_and_update(user, bucket, &update);

  data_modified(user, bucket);
}

class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
                                      RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }
  void handle_response(int r) override;
  int init_fetch() override;
};

int BucketAsyncRefreshHandler::init_fetch()
{
  std::unique_ptr<rgw::sal::Bucket> rbucket;

  const DoutPrefix dp(store->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
  int r = store->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;

  r = rbucket->read_stats_async(&dp, RGW_NO_SHARD, this);
  if (r < 0) {
    ldpp_dout(&dp, 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;

    /* read_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void BucketAsyncRefreshHandler::handle_response(const int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  RGWStorageStats bs;

  for (const auto& pair : *stats) {
    const RGWStorageStats& s = pair.second;

    bs.size += s.size;
    bs.size_rounded += s.size_rounded;
    bs.num_objects += s.num_objects;
  }

  cache->async_refresh_response(user, bucket, bs);
}

class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;

public:
  explicit RGWBucketStatsCache(rgw::sal::Store* _store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};

int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
  std::unique_ptr<rgw::sal::Bucket> bucket;

  int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
    return r;
  }

  string bucket_ver;
  string master_ver;

  map<RGWObjCategory, RGWStorageStats> bucket_stats;
  r = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, bucket_stats);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
                      << _b.name << dendl;
    return r;
  }

  stats = RGWStorageStats();

  for (const auto& pair : bucket_stats) {
    const RGWStorageStats& s = pair.second;

    stats.size += s.size;
    stats.size_rounded += s.size_rounded;
    stats.num_objects += s.num_objects;
  }

  return 0;
}

class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  const DoutPrefixProvider *dpp;
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
    RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
    RGWGetUserStats_CB(_user),
    dpp(_dpp),
    bucket(_bucket) {}

  void drop_reference() override { put(); }
  int init_fetch() override;
  void handle_response(int r) override;
};

int UserAsyncRefreshHandler::init_fetch()
{
  std::unique_ptr<rgw::sal::User> ruser = store->get_user(user);

  ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
  int r = ruser->read_stats_async(dpp, this);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not read stats for user=" << user << dendl;

    /* read_stats_async() dropped our reference already */
    return r;
  }

  return 0;
}

void UserAsyncRefreshHandler::handle_response(int r)
{
  if (r < 0) {
    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
    cache->async_refresh_fail(user, bucket);
    return;
  }

  cache->async_refresh_response(user, bucket, stats);
}

class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  const DoutPrefixProvider *dpp;
  std::atomic<bool> down_flag = { false };
  ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache");
  map<rgw_bucket, rgw_user> modified_buckets;

  /* thread: sync stats of recently modified buckets */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
    ceph::condition_variable cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: ");
          int r = stats->sync_bucket(user, bucket, null_yield, &dp);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        std::unique_lock locker{lock};
        cond.wait_for(
          locker,
          std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();
    }
  };

  /*
   * thread: periodically run a full stats sync for all users
   *
   * Only sync non-idle users, or users that were never synced before; this is
   * needed so that users that didn't have quota turned on before (or that
   * existed before the user objclass tracked stats) get their backend stats
   * up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
    ceph::condition_variable cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: ");
        int ret = stats->sync_all_users(&dp, null_yield);
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        std::unique_lock l{lock};
        cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      std::lock_guard l{lock};
      cond.notify_all();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp);
  int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y);
  int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y);

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    std::unique_lock lock{mutex};
    modified_buckets.swap(out);
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads)
    : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
  {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(dpp, store, this, user, bucket);
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    {
      std::unique_lock lock{mutex};
      stop_thread(&buckets_sync_thread);
    }
    stop_thread(&user_sync_thread);
  }
};

int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
                                                const rgw_bucket& _b,
                                                RGWStorageStats& stats,
                                                optional_yield y,
                                                const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
  int r = user->read_stats(dpp, y, &stats);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
    return r;
  }

  return 0;
}

int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
{
  std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
  std::unique_ptr<rgw::sal::Bucket> bucket;

  int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
    return r;
  }

  r = bucket->sync_user_stats(dpp, y);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl;
    return r;
  }

  return bucket->check_bucket_shards(dpp);
}

int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y)
{
  RGWStorageStats stats;
  ceph::real_time last_stats_sync;
  ceph::real_time last_stats_update;
  std::unique_ptr<rgw::sal::User> user = store->get_user(rgw_user(_u.to_str()));

  int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
  if (ret < 0) {
    ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      last_stats_update < last_stats_sync) {
    ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = last_stats_sync;
  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */

  ret = rgw_user_sync_all_stats(dpp, store, user.get(), y);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}

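/*
 * Walk all user metadata keys (in batches of up to 1000) and run
 * sync_user() on each one; errors on individual users are logged and
 * skipped so one bad entry doesn't stall the whole pass.
 */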
int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y)
{
  string key = "user";
  void *handle;

  int ret = store->meta_list_keys_init(dpp, key, string(), &handle);
  if (ret < 0) {
    ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;

  do {
    list<string> keys;
    ret = store->meta_list_keys_next(dpp, handle, max, keys, &truncated);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: meta_list_keys_next(): ret=" << ret << dendl;
      goto done;
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      int ret = sync_user(dpp, user, y);
      if (ret < 0) {
        ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  store->meta_list_keys_complete(handle);
  return ret;
}

void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
{
  /* racy, but it's ok */
  mutex.lock_shared();
  bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
  mutex.unlock_shared();

  if (need_update) {
    std::unique_lock lock{mutex};
    modified_buckets[bucket] = user;
  }
}


class RGWQuotaInfoApplier {
  /* NOTE: no non-static fields allowed, as instances are supposed to live in
   * static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  virtual bool is_size_exceeded(const DoutPrefixProvider *dpp,
                                const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                    const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};

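/*
 * Two quota accounting flavours: RGWQuotaInfoDefApplier checks the rounded
 * (on-disk) sizes, while RGWQuotaInfoRawApplier checks the raw object sizes;
 * get_instance() picks one based on the quota's check_on_raw flag.
 */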
class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};

class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};


bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
                                              const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size_rounded;
  const uint64_t new_size = rgw_rounded_objsize(size);

  if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
                       << " size=" << new_size << " "
                       << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                                  const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
                       << " " << entity << "_quota.max_objects=" << qinfo.max_objects
                       << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
                                              const char * const entity,
                                              const RGWQuotaInfo& qinfo,
                                              const RGWStorageStats& stats,
                                              const uint64_t size) const
{
  if (qinfo.max_size < 0) {
    /* The limit is not enabled. */
    return false;
  }

  const uint64_t cur_size = stats.size;

  if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
                       << " size=" << size << " "
                       << entity << "_quota.max_size=" << qinfo.max_size << dendl;
    return true;
  }

  return false;
}

bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
                                                  const char * const entity,
                                                  const RGWQuotaInfo& qinfo,
                                                  const RGWStorageStats& stats,
                                                  const uint64_t num_objs) const
{
  if (qinfo.max_objects < 0) {
    /* The limit is not enabled. */
    return false;
  }

  if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
    ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
                       << " " << entity << "_quota.max_objects=" << qinfo.max_objects
                       << dendl;
    return true;
  }

  return false;
}

const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
  const RGWQuotaInfo& qinfo)
{
  static RGWQuotaInfoDefApplier default_qapplier;
  static RGWQuotaInfoRawApplier raw_qapplier;

  if (qinfo.check_on_raw) {
    return raw_qapplier;
  } else {
    return default_qapplier;
  }
}


class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  rgw::sal::Store* store;
  RGWBucketStatsCache bucket_stats_cache;
  RGWUserStatsCache user_stats_cache;

  int check_quota(const DoutPrefixProvider *dpp,
                  const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldpp_dout(dpp, 20) << entity
                       << " quota: max_objects=" << quota.max_objects
                       << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldpp_dout(dpp, 20) << entity << " quota OK:"
                       << " stats.num_objects=" << stats.num_objects
                       << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(dpp, _store, quota_threads) {}

  int check_quota(const DoutPrefixProvider *dpp,
                  const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size, optional_yield y) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * We need to fetch bucket stats if the user quota is enabled, because the
     * whole system relies on us periodically updating the user's bucket stats
     * in the user's header; this happens in get_stats() when we actually fetch
     * that info instead of relying on cached data.
     */

    const DoutPrefix dp(store->ctx(), dout_subsys, "rgw quota handler: ");
    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota(dpp, "user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

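  /*
   * Flag a bucket for resharding once it holds more than
   * max_objs_per_shard * num_shards objects; the suggested shard count
   * doubles the headroom (num_objs * 2 / max_objs_per_shard).
   */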
  void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard, uint64_t num_shards,
                           uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) override
  {
    if (num_objs > num_shards * max_objs_per_shard) {
      ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
                        << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = num_objs * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }
  }
};


RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Store* store, bool quota_threads)
{
  return new RGWQuotaHandlerImpl(dpp, store, quota_threads);
}

void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}


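/*
 * Seed a quota from the rgw_{bucket,user}_default_quota_* config options;
 * a negative config value leaves that limit disabled.
 */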
void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
{
  if (conf->rgw_bucket_default_quota_max_objects >= 0) {
    quota.max_objects = conf->rgw_bucket_default_quota_max_objects;
    quota.enabled = true;
  }
  if (conf->rgw_bucket_default_quota_max_size >= 0) {
    quota.max_size = conf->rgw_bucket_default_quota_max_size;
    quota.enabled = true;
  }
}

void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
{
  if (conf->rgw_user_default_quota_max_objects >= 0) {
    quota.max_objects = conf->rgw_user_default_quota_max_objects;
    quota.enabled = true;
  }
  if (conf->rgw_user_default_quota_max_size >= 0) {
    quota.max_size = conf->rgw_user_default_quota_max_size;
    quota.enabled = true;
  }
}

void RGWQuotaInfo::dump(Formatter *f) const
{
  f->dump_bool("enabled", enabled);
  f->dump_bool("check_on_raw", check_on_raw);

  f->dump_int("max_size", max_size);
  f->dump_int("max_size_kb", rgw_rounded_kb(max_size));
  f->dump_int("max_objects", max_objects);
}

void RGWQuotaInfo::decode_json(JSONObj *obj)
{
  if (false == JSONDecoder::decode_json("max_size", max_size, obj)) {
    /* We're parsing an older version of the struct. */
    int64_t max_size_kb = 0;

    JSONDecoder::decode_json("max_size_kb", max_size_kb, obj);
    max_size = max_size_kb * 1024;
  }
  JSONDecoder::decode_json("max_objects", max_objects, obj);

  JSONDecoder::decode_json("check_on_raw", check_on_raw, obj);
  JSONDecoder::decode_json("enabled", enabled, obj);
}