]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
7c673cae FG |
4 | /* |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2013 Inktank, Inc | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | ||
17 | #include "include/utime.h" | |
18 | #include "common/lru_map.h" | |
19 | #include "common/RefCountedObj.h" | |
20 | #include "common/Thread.h" | |
9f95a23c | 21 | #include "common/ceph_mutex.h" |
7c673cae FG |
22 | |
23 | #include "rgw_common.h" | |
9f95a23c | 24 | #include "rgw_sal.h" |
f67539c2 | 25 | #include "rgw_sal_rados.h" |
7c673cae FG |
26 | #include "rgw_quota.h" |
27 | #include "rgw_bucket.h" | |
28 | #include "rgw_user.h" | |
29 | ||
11fdf7f2 | 30 | #include "services/svc_sys_obj.h" |
9f95a23c | 31 | #include "services/svc_meta.h" |
11fdf7f2 | 32 | |
7c673cae FG |
33 | #include <atomic> |
34 | ||
35 | #define dout_context g_ceph_context | |
36 | #define dout_subsys ceph_subsys_rgw | |
37 | ||
20effc67 | 38 | using namespace std; |
7c673cae FG |
39 | |
40 | struct RGWQuotaCacheStats { | |
41 | RGWStorageStats stats; | |
42 | utime_t expiration; | |
43 | utime_t async_refresh_time; | |
44 | }; | |
45 | ||
46 | template<class T> | |
47 | class RGWQuotaCache { | |
48 | protected: | |
20effc67 | 49 | rgw::sal::Store* store; |
7c673cae FG |
50 | lru_map<T, RGWQuotaCacheStats> stats_map; |
51 | RefCountedWaitObject *async_refcount; | |
52 | ||
53 | class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
54 | int objs_delta; | |
55 | uint64_t added_bytes; | |
56 | uint64_t removed_bytes; | |
57 | public: | |
58 | StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {} | |
59 | bool update(RGWQuotaCacheStats *entry) override { | |
60 | if (entry->async_refresh_time.sec() == 0) | |
61 | return false; | |
62 | ||
63 | entry->async_refresh_time = utime_t(0, 0); | |
64 | ||
65 | return true; | |
66 | } | |
67 | }; | |
68 | ||
b3b6e05e | 69 | virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0; |
7c673cae | 70 | |
224ce89b | 71 | virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; |
7c673cae | 72 | |
224ce89b WB |
73 | virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0; |
74 | virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; | |
7c673cae FG |
75 | |
76 | virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {} | |
77 | public: | |
20effc67 | 78 | RGWQuotaCache(rgw::sal::Store* _store, int size) : store(_store), stats_map(size) { |
7c673cae FG |
79 | async_refcount = new RefCountedWaitObject; |
80 | } | |
81 | virtual ~RGWQuotaCache() { | |
82 | async_refcount->put_wait(); /* wait for all pending async requests to complete */ | |
83 | } | |
84 | ||
522d829b TL |
85 | int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, |
86 | const DoutPrefixProvider* dpp); | |
7c673cae FG |
87 | void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); |
88 | ||
224ce89b WB |
89 | void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats); |
90 | int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs); | |
7c673cae | 91 | void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats); |
c07f9fc5 | 92 | void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket); |
7c673cae FG |
93 | |
94 | class AsyncRefreshHandler { | |
95 | protected: | |
20effc67 | 96 | rgw::sal::Store* store; |
7c673cae FG |
97 | RGWQuotaCache<T> *cache; |
98 | public: | |
20effc67 | 99 | AsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {} |
7c673cae FG |
100 | virtual ~AsyncRefreshHandler() {} |
101 | ||
102 | virtual int init_fetch() = 0; | |
103 | virtual void drop_reference() = 0; | |
104 | }; | |
105 | ||
224ce89b | 106 | virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0; |
7c673cae FG |
107 | }; |
108 | ||
7c673cae | 109 | template<class T> |
224ce89b | 110 | int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) |
7c673cae FG |
111 | { |
112 | /* protect against multiple updates */ | |
113 | StatsAsyncTestSet test_update; | |
114 | if (!map_find_and_update(user, bucket, &test_update)) { | |
115 | /* most likely we just raced with another update */ | |
116 | return 0; | |
117 | } | |
118 | ||
119 | async_refcount->get(); | |
120 | ||
121 | ||
122 | AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket); | |
123 | ||
124 | int ret = handler->init_fetch(); | |
125 | if (ret < 0) { | |
126 | async_refcount->put(); | |
127 | handler->drop_reference(); | |
128 | return ret; | |
129 | } | |
130 | ||
131 | return 0; | |
132 | } | |
133 | ||
c07f9fc5 FG |
134 | template<class T> |
135 | void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket) | |
136 | { | |
137 | ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; | |
138 | ||
139 | async_refcount->put(); | |
140 | } | |
141 | ||
7c673cae FG |
142 | template<class T> |
143 | void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) | |
144 | { | |
145 | ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; | |
146 | ||
147 | RGWQuotaCacheStats qs; | |
148 | ||
149 | map_find(user, bucket, qs); | |
150 | ||
151 | set_stats(user, bucket, qs, stats); | |
152 | ||
153 | async_refcount->put(); | |
154 | } | |
155 | ||
156 | template<class T> | |
224ce89b | 157 | void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) |
7c673cae FG |
158 | { |
159 | qs.stats = stats; | |
160 | qs.expiration = ceph_clock_now(); | |
161 | qs.async_refresh_time = qs.expiration; | |
162 | qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; | |
163 | qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; | |
164 | ||
165 | map_add(user, bucket, qs); | |
166 | } | |
167 | ||
168 | template<class T> | |
522d829b | 169 | int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) { |
7c673cae FG |
170 | RGWQuotaCacheStats qs; |
171 | utime_t now = ceph_clock_now(); | |
172 | if (map_find(user, bucket, qs)) { | |
173 | if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) { | |
174 | int r = async_refresh(user, bucket, qs); | |
175 | if (r < 0) { | |
20effc67 | 176 | ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl; |
7c673cae FG |
177 | |
178 | /* continue processing, might be a transient error, async refresh is just optimization */ | |
179 | } | |
180 | } | |
181 | ||
522d829b | 182 | if (qs.expiration > ceph_clock_now()) { |
7c673cae FG |
183 | stats = qs.stats; |
184 | return 0; | |
185 | } | |
186 | } | |
187 | ||
b3b6e05e | 188 | int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp); |
7c673cae FG |
189 | if (ret < 0 && ret != -ENOENT) |
190 | return ret; | |
191 | ||
192 | set_stats(user, bucket, qs, stats); | |
193 | ||
194 | return 0; | |
195 | } | |
196 | ||
197 | ||
198 | template<class T> | |
199 | class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
200 | const int objs_delta; | |
201 | const uint64_t added_bytes; | |
202 | const uint64_t removed_bytes; | |
203 | public: | |
204 | RGWQuotaStatsUpdate(const int objs_delta, | |
205 | const uint64_t added_bytes, | |
206 | const uint64_t removed_bytes) | |
207 | : objs_delta(objs_delta), | |
208 | added_bytes(added_bytes), | |
209 | removed_bytes(removed_bytes) { | |
210 | } | |
211 | ||
212 | bool update(RGWQuotaCacheStats * const entry) override { | |
213 | const uint64_t rounded_added = rgw_rounded_objsize(added_bytes); | |
214 | const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes); | |
215 | ||
181888fb | 216 | if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) { |
c07f9fc5 FG |
217 | entry->stats.size += added_bytes - removed_bytes; |
218 | } else { | |
219 | entry->stats.size = 0; | |
220 | } | |
221 | ||
181888fb | 222 | if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) { |
c07f9fc5 FG |
223 | entry->stats.size_rounded += rounded_added - rounded_removed; |
224 | } else { | |
225 | entry->stats.size_rounded = 0; | |
226 | } | |
227 | ||
181888fb | 228 | if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) { |
c07f9fc5 FG |
229 | entry->stats.num_objects += objs_delta; |
230 | } else { | |
231 | entry->stats.num_objects = 0; | |
232 | } | |
7c673cae FG |
233 | |
234 | return true; | |
235 | } | |
236 | }; | |
237 | ||
238 | ||
239 | template<class T> | |
240 | void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, | |
241 | uint64_t added_bytes, uint64_t removed_bytes) | |
242 | { | |
243 | RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes); | |
244 | map_find_and_update(user, bucket, &update); | |
245 | ||
246 | data_modified(user, bucket); | |
247 | } | |
248 | ||
249 | class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler, | |
250 | public RGWGetBucketStats_CB { | |
251 | rgw_user user; | |
252 | public: | |
20effc67 | 253 | BucketAsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<rgw_bucket> *_cache, |
224ce89b | 254 | const rgw_user& _user, const rgw_bucket& _bucket) : |
7c673cae FG |
255 | RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache), |
256 | RGWGetBucketStats_CB(_bucket), user(_user) {} | |
257 | ||
258 | void drop_reference() override { put(); } | |
259 | void handle_response(int r) override; | |
260 | int init_fetch() override; | |
261 | }; | |
262 | ||
263 | int BucketAsyncRefreshHandler::init_fetch() | |
264 | { | |
20effc67 | 265 | std::unique_ptr<rgw::sal::Bucket> rbucket; |
7c673cae | 266 | |
b3b6e05e | 267 | const DoutPrefix dp(store->ctx(), dout_subsys, "rgw bucket async refresh handler: "); |
20effc67 | 268 | int r = store->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield); |
7c673cae | 269 | if (r < 0) { |
b3b6e05e | 270 | ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; |
7c673cae FG |
271 | return r; |
272 | } | |
273 | ||
b3b6e05e | 274 | ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl; |
7c673cae | 275 | |
20effc67 | 276 | r = rbucket->read_stats_async(&dp, RGW_NO_SHARD, this); |
7c673cae | 277 | if (r < 0) { |
b3b6e05e | 278 | ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl; |
7c673cae | 279 | |
20effc67 | 280 | /* read_stats_async() dropped our reference already */ |
7c673cae FG |
281 | return r; |
282 | } | |
283 | ||
284 | return 0; | |
285 | } | |
286 | ||
287 | void BucketAsyncRefreshHandler::handle_response(const int r) | |
288 | { | |
289 | if (r < 0) { | |
290 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
291 | cache->async_refresh_fail(user, bucket); |
292 | return; | |
7c673cae FG |
293 | } |
294 | ||
295 | RGWStorageStats bs; | |
296 | ||
297 | for (const auto& pair : *stats) { | |
298 | const RGWStorageStats& s = pair.second; | |
299 | ||
300 | bs.size += s.size; | |
301 | bs.size_rounded += s.size_rounded; | |
302 | bs.num_objects += s.num_objects; | |
303 | } | |
304 | ||
305 | cache->async_refresh_response(user, bucket, bs); | |
306 | } | |
307 | ||
308 | class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> { | |
309 | protected: | |
224ce89b | 310 | bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
311 | return stats_map.find(bucket, qs); |
312 | } | |
313 | ||
224ce89b | 314 | bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override { |
7c673cae FG |
315 | return stats_map.find_and_update(bucket, NULL, ctx); |
316 | } | |
317 | ||
224ce89b | 318 | void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
319 | stats_map.add(bucket, qs); |
320 | } | |
321 | ||
b3b6e05e | 322 | int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override; |
7c673cae FG |
323 | |
324 | public: | |
20effc67 | 325 | explicit RGWBucketStatsCache(rgw::sal::Store* _store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) { |
7c673cae FG |
326 | } |
327 | ||
224ce89b | 328 | AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { |
7c673cae FG |
329 | return new BucketAsyncRefreshHandler(store, this, user, bucket); |
330 | } | |
331 | }; | |
332 | ||
20effc67 | 333 | int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) |
7c673cae | 334 | { |
20effc67 TL |
335 | std::unique_ptr<rgw::sal::User> user = store->get_user(_u); |
336 | std::unique_ptr<rgw::sal::Bucket> bucket; | |
7c673cae | 337 | |
20effc67 | 338 | int r = store->get_bucket(dpp, user.get(), _b, &bucket, y); |
7c673cae | 339 | if (r < 0) { |
20effc67 | 340 | ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl; |
7c673cae FG |
341 | return r; |
342 | } | |
343 | ||
344 | string bucket_ver; | |
345 | string master_ver; | |
346 | ||
347 | map<RGWObjCategory, RGWStorageStats> bucket_stats; | |
20effc67 | 348 | r = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, bucket_stats); |
7c673cae | 349 | if (r < 0) { |
b3b6e05e | 350 | ldpp_dout(dpp, 0) << "could not get bucket stats for bucket=" |
20effc67 | 351 | << _b.name << dendl; |
7c673cae FG |
352 | return r; |
353 | } | |
354 | ||
355 | stats = RGWStorageStats(); | |
356 | ||
357 | for (const auto& pair : bucket_stats) { | |
358 | const RGWStorageStats& s = pair.second; | |
359 | ||
360 | stats.size += s.size; | |
361 | stats.size_rounded += s.size_rounded; | |
362 | stats.num_objects += s.num_objects; | |
363 | } | |
364 | ||
365 | return 0; | |
366 | } | |
367 | ||
368 | class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler, | |
369 | public RGWGetUserStats_CB { | |
b3b6e05e | 370 | const DoutPrefixProvider *dpp; |
7c673cae FG |
371 | rgw_bucket bucket; |
372 | public: | |
20effc67 | 373 | UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store, RGWQuotaCache<rgw_user> *_cache, |
224ce89b | 374 | const rgw_user& _user, const rgw_bucket& _bucket) : |
7c673cae FG |
375 | RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache), |
376 | RGWGetUserStats_CB(_user), | |
b3b6e05e | 377 | dpp(_dpp), |
7c673cae FG |
378 | bucket(_bucket) {} |
379 | ||
380 | void drop_reference() override { put(); } | |
381 | int init_fetch() override; | |
382 | void handle_response(int r) override; | |
383 | }; | |
384 | ||
385 | int UserAsyncRefreshHandler::init_fetch() | |
386 | { | |
20effc67 TL |
387 | std::unique_ptr<rgw::sal::User> ruser = store->get_user(user); |
388 | ||
b3b6e05e | 389 | ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl; |
20effc67 | 390 | int r = ruser->read_stats_async(dpp, this); |
7c673cae | 391 | if (r < 0) { |
b3b6e05e | 392 | ldpp_dout(dpp, 0) << "could not get bucket info for user=" << user << dendl; |
7c673cae FG |
393 | |
394 | /* get_bucket_stats_async() dropped our reference already */ | |
395 | return r; | |
396 | } | |
397 | ||
398 | return 0; | |
399 | } | |
400 | ||
401 | void UserAsyncRefreshHandler::handle_response(int r) | |
402 | { | |
403 | if (r < 0) { | |
404 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
405 | cache->async_refresh_fail(user, bucket); |
406 | return; | |
7c673cae FG |
407 | } |
408 | ||
409 | cache->async_refresh_response(user, bucket, stats); | |
410 | } | |
411 | ||
412 | class RGWUserStatsCache : public RGWQuotaCache<rgw_user> { | |
b3b6e05e | 413 | const DoutPrefixProvider *dpp; |
7c673cae | 414 | std::atomic<bool> down_flag = { false }; |
9f95a23c | 415 | ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache"); |
7c673cae FG |
416 | map<rgw_bucket, rgw_user> modified_buckets; |
417 | ||
418 | /* thread, sync recent modified buckets info */ | |
419 | class BucketsSyncThread : public Thread { | |
420 | CephContext *cct; | |
421 | RGWUserStatsCache *stats; | |
422 | ||
9f95a23c TL |
423 | ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread"); |
424 | ceph::condition_variable cond; | |
7c673cae FG |
425 | public: |
426 | ||
9f95a23c | 427 | BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} |
7c673cae FG |
428 | |
429 | void *entry() override { | |
430 | ldout(cct, 20) << "BucketsSyncThread: start" << dendl; | |
431 | do { | |
432 | map<rgw_bucket, rgw_user> buckets; | |
433 | ||
434 | stats->swap_modified_buckets(buckets); | |
435 | ||
436 | for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) { | |
437 | rgw_bucket bucket = iter->first; | |
438 | rgw_user& user = iter->second; | |
439 | ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl; | |
b3b6e05e TL |
440 | const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: "); |
441 | int r = stats->sync_bucket(user, bucket, null_yield, &dp); | |
7c673cae FG |
442 | if (r < 0) { |
443 | ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl; | |
444 | } | |
445 | } | |
446 | ||
447 | if (stats->going_down()) | |
448 | break; | |
449 | ||
9f95a23c TL |
450 | std::unique_lock locker{lock}; |
451 | cond.wait_for( | |
452 | locker, | |
453 | std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval)); | |
7c673cae FG |
454 | } while (!stats->going_down()); |
455 | ldout(cct, 20) << "BucketsSyncThread: done" << dendl; | |
456 | ||
457 | return NULL; | |
458 | } | |
459 | ||
460 | void stop() { | |
9f95a23c TL |
461 | std::lock_guard l{lock}; |
462 | cond.notify_all(); | |
7c673cae FG |
463 | } |
464 | }; | |
465 | ||
466 | /* | |
467 | * thread, full sync all users stats periodically | |
468 | * | |
469 | * only sync non idle users or ones that never got synced before, this is needed so that | |
470 | * users that didn't have quota turned on before (or existed before the user objclass | |
471 | * tracked stats) need to get their backend stats up to date. | |
472 | */ | |
473 | class UserSyncThread : public Thread { | |
474 | CephContext *cct; | |
475 | RGWUserStatsCache *stats; | |
476 | ||
9f95a23c TL |
477 | ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread"); |
478 | ceph::condition_variable cond; | |
7c673cae FG |
479 | public: |
480 | ||
9f95a23c | 481 | UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} |
7c673cae FG |
482 | |
483 | void *entry() override { | |
484 | ldout(cct, 20) << "UserSyncThread: start" << dendl; | |
485 | do { | |
b3b6e05e TL |
486 | const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: "); |
487 | int ret = stats->sync_all_users(&dp, null_yield); | |
7c673cae FG |
488 | if (ret < 0) { |
489 | ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl; | |
490 | } | |
491 | ||
b5b8bbf5 FG |
492 | if (stats->going_down()) |
493 | break; | |
494 | ||
9f95a23c TL |
495 | std::unique_lock l{lock}; |
496 | cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval)); | |
7c673cae FG |
497 | } while (!stats->going_down()); |
498 | ldout(cct, 20) << "UserSyncThread: done" << dendl; | |
499 | ||
500 | return NULL; | |
501 | } | |
502 | ||
503 | void stop() { | |
9f95a23c TL |
504 | std::lock_guard l{lock}; |
505 | cond.notify_all(); | |
7c673cae FG |
506 | } |
507 | }; | |
508 | ||
509 | BucketsSyncThread *buckets_sync_thread; | |
510 | UserSyncThread *user_sync_thread; | |
511 | protected: | |
224ce89b | 512 | bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
513 | return stats_map.find(user, qs); |
514 | } | |
515 | ||
224ce89b | 516 | bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override { |
7c673cae FG |
517 | return stats_map.find_and_update(user, NULL, ctx); |
518 | } | |
519 | ||
224ce89b | 520 | void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
521 | stats_map.add(user, qs); |
522 | } | |
523 | ||
b3b6e05e TL |
524 | int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override; |
525 | int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp); | |
526 | int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y); | |
527 | int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y); | |
7c673cae FG |
528 | |
529 | void data_modified(const rgw_user& user, rgw_bucket& bucket) override; | |
530 | ||
531 | void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) { | |
9f95a23c | 532 | std::unique_lock lock{mutex}; |
7c673cae | 533 | modified_buckets.swap(out); |
7c673cae FG |
534 | } |
535 | ||
536 | template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */ | |
537 | void stop_thread(T **pthr) { | |
538 | T *thread = *pthr; | |
539 | if (!thread) | |
540 | return; | |
541 | ||
542 | thread->stop(); | |
543 | thread->join(); | |
544 | delete thread; | |
545 | *pthr = NULL; | |
546 | } | |
547 | ||
548 | public: | |
20effc67 | 549 | RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads) |
b3b6e05e | 550 | : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp) |
9f95a23c | 551 | { |
7c673cae FG |
552 | if (quota_threads) { |
553 | buckets_sync_thread = new BucketsSyncThread(store->ctx(), this); | |
554 | buckets_sync_thread->create("rgw_buck_st_syn"); | |
555 | user_sync_thread = new UserSyncThread(store->ctx(), this); | |
556 | user_sync_thread->create("rgw_user_st_syn"); | |
557 | } else { | |
558 | buckets_sync_thread = NULL; | |
559 | user_sync_thread = NULL; | |
560 | } | |
561 | } | |
562 | ~RGWUserStatsCache() override { | |
563 | stop(); | |
564 | } | |
565 | ||
224ce89b | 566 | AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { |
b3b6e05e | 567 | return new UserAsyncRefreshHandler(dpp, store, this, user, bucket); |
7c673cae FG |
568 | } |
569 | ||
7c673cae FG |
570 | bool going_down() { |
571 | return down_flag; | |
572 | } | |
573 | ||
574 | void stop() { | |
575 | down_flag = true; | |
9f95a23c TL |
576 | { |
577 | std::unique_lock lock{mutex}; | |
578 | stop_thread(&buckets_sync_thread); | |
579 | } | |
7c673cae FG |
580 | stop_thread(&user_sync_thread); |
581 | } | |
582 | }; | |
583 | ||
20effc67 TL |
584 | int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u, |
585 | const rgw_bucket& _b, | |
f67539c2 | 586 | RGWStorageStats& stats, |
b3b6e05e TL |
587 | optional_yield y, |
588 | const DoutPrefixProvider *dpp) | |
7c673cae | 589 | { |
20effc67 TL |
590 | std::unique_ptr<rgw::sal::User> user = store->get_user(_u); |
591 | int r = user->read_stats(dpp, y, &stats); | |
7c673cae | 592 | if (r < 0) { |
20effc67 | 593 | ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl; |
7c673cae FG |
594 | return r; |
595 | } | |
596 | ||
597 | return 0; | |
598 | } | |
599 | ||
20effc67 | 600 | int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp) |
7c673cae | 601 | { |
20effc67 TL |
602 | std::unique_ptr<rgw::sal::User> user = store->get_user(_u); |
603 | std::unique_ptr<rgw::sal::Bucket> bucket; | |
7c673cae | 604 | |
20effc67 | 605 | int r = store->get_bucket(dpp, user.get(), _b, &bucket, y); |
7c673cae | 606 | if (r < 0) { |
20effc67 | 607 | ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl; |
7c673cae FG |
608 | return r; |
609 | } | |
610 | ||
20effc67 | 611 | r = bucket->sync_user_stats(dpp, y); |
7c673cae | 612 | if (r < 0) { |
20effc67 | 613 | ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl; |
7c673cae FG |
614 | return r; |
615 | } | |
616 | ||
20effc67 | 617 | return bucket->check_bucket_shards(dpp); |
7c673cae FG |
618 | } |
619 | ||
20effc67 | 620 | int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y) |
7c673cae | 621 | { |
9f95a23c TL |
622 | RGWStorageStats stats; |
623 | ceph::real_time last_stats_sync; | |
624 | ceph::real_time last_stats_update; | |
20effc67 | 625 | std::unique_ptr<rgw::sal::User> user = store->get_user(rgw_user(_u.to_str())); |
9f95a23c | 626 | |
20effc67 | 627 | int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update); |
7c673cae | 628 | if (ret < 0) { |
20effc67 | 629 | ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl; |
7c673cae FG |
630 | return ret; |
631 | } | |
632 | ||
633 | if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users && | |
9f95a23c | 634 | last_stats_update < last_stats_sync) { |
20effc67 | 635 | ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl; |
7c673cae FG |
636 | return 0; |
637 | } | |
638 | ||
9f95a23c | 639 | real_time when_need_full_sync = last_stats_sync; |
7c673cae FG |
640 | when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time); |
641 | ||
642 | // check if enough time passed since last full sync | |
643 | /* FIXME: missing check? */ | |
644 | ||
20effc67 | 645 | ret = rgw_user_sync_all_stats(dpp, store, user.get(), y); |
7c673cae | 646 | if (ret < 0) { |
20effc67 | 647 | ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl; |
7c673cae FG |
648 | return ret; |
649 | } | |
650 | ||
651 | return 0; | |
652 | } | |
653 | ||
b3b6e05e | 654 | int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y) |
7c673cae FG |
655 | { |
656 | string key = "user"; | |
657 | void *handle; | |
658 | ||
20effc67 | 659 | int ret = store->meta_list_keys_init(dpp, key, string(), &handle); |
7c673cae | 660 | if (ret < 0) { |
b3b6e05e | 661 | ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl; |
7c673cae FG |
662 | return ret; |
663 | } | |
664 | ||
665 | bool truncated; | |
666 | int max = 1000; | |
667 | ||
668 | do { | |
669 | list<string> keys; | |
20effc67 | 670 | ret = store->meta_list_keys_next(dpp, handle, max, keys, &truncated); |
7c673cae | 671 | if (ret < 0) { |
b3b6e05e | 672 | ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl; |
7c673cae FG |
673 | goto done; |
674 | } | |
675 | for (list<string>::iterator iter = keys.begin(); | |
676 | iter != keys.end() && !going_down(); | |
677 | ++iter) { | |
678 | rgw_user user(*iter); | |
b3b6e05e TL |
679 | ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl; |
680 | int ret = sync_user(dpp, user, y); | |
7c673cae | 681 | if (ret < 0) { |
b3b6e05e | 682 | ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl; |
7c673cae FG |
683 | |
684 | /* continuing to next user */ | |
685 | continue; | |
686 | } | |
687 | } | |
688 | } while (truncated); | |
689 | ||
690 | ret = 0; | |
691 | done: | |
20effc67 | 692 | store->meta_list_keys_complete(handle); |
7c673cae FG |
693 | return ret; |
694 | } | |
695 | ||
696 | void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) | |
697 | { | |
698 | /* racy, but it's ok */ | |
9f95a23c | 699 | mutex.lock_shared(); |
7c673cae | 700 | bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); |
9f95a23c | 701 | mutex.unlock_shared(); |
7c673cae FG |
702 | |
703 | if (need_update) { | |
9f95a23c | 704 | std::unique_lock lock{mutex}; |
7c673cae | 705 | modified_buckets[bucket] = user; |
7c673cae FG |
706 | } |
707 | } | |
708 | ||
709 | ||
710 | class RGWQuotaInfoApplier { | |
711 | /* NOTE: no non-static field allowed as instances are supposed to live in | |
712 | * the static memory only. */ | |
713 | protected: | |
714 | RGWQuotaInfoApplier() = default; | |
715 | ||
716 | public: | |
717 | virtual ~RGWQuotaInfoApplier() {} | |
718 | ||
20effc67 TL |
719 | virtual bool is_size_exceeded(const DoutPrefixProvider *dpp, |
720 | const char * const entity, | |
7c673cae FG |
721 | const RGWQuotaInfo& qinfo, |
722 | const RGWStorageStats& stats, | |
723 | const uint64_t size) const = 0; | |
724 | ||
20effc67 TL |
725 | virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
726 | const char * const entity, | |
7c673cae FG |
727 | const RGWQuotaInfo& qinfo, |
728 | const RGWStorageStats& stats, | |
729 | const uint64_t num_objs) const = 0; | |
730 | ||
731 | static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo); | |
732 | }; | |
733 | ||
734 | class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier { | |
735 | public: | |
20effc67 | 736 | bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
737 | const RGWQuotaInfo& qinfo, |
738 | const RGWStorageStats& stats, | |
739 | const uint64_t size) const override; | |
740 | ||
20effc67 | 741 | bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
742 | const RGWQuotaInfo& qinfo, |
743 | const RGWStorageStats& stats, | |
744 | const uint64_t num_objs) const override; | |
745 | }; | |
746 | ||
747 | class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier { | |
748 | public: | |
20effc67 | 749 | bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
750 | const RGWQuotaInfo& qinfo, |
751 | const RGWStorageStats& stats, | |
752 | const uint64_t size) const override; | |
753 | ||
20effc67 | 754 | bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
755 | const RGWQuotaInfo& qinfo, |
756 | const RGWStorageStats& stats, | |
757 | const uint64_t num_objs) const override; | |
758 | }; | |
759 | ||
760 | ||
20effc67 TL |
761 | bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp, |
762 | const char * const entity, | |
7c673cae FG |
763 | const RGWQuotaInfo& qinfo, |
764 | const RGWStorageStats& stats, | |
765 | const uint64_t size) const | |
766 | { | |
767 | if (qinfo.max_size < 0) { | |
768 | /* The limit is not enabled. */ | |
769 | return false; | |
770 | } | |
771 | ||
772 | const uint64_t cur_size = stats.size_rounded; | |
773 | const uint64_t new_size = rgw_rounded_objsize(size); | |
774 | ||
775 | if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) { | |
20effc67 | 776 | ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded |
7c673cae FG |
777 | << " size=" << new_size << " " |
778 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
779 | return true; | |
780 | } | |
781 | ||
782 | return false; | |
783 | } | |
784 | ||
20effc67 TL |
785 | bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
786 | const char * const entity, | |
7c673cae FG |
787 | const RGWQuotaInfo& qinfo, |
788 | const RGWStorageStats& stats, | |
789 | const uint64_t num_objs) const | |
790 | { | |
791 | if (qinfo.max_objects < 0) { | |
792 | /* The limit is not enabled. */ | |
793 | return false; | |
794 | } | |
795 | ||
796 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
20effc67 | 797 | ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects |
7c673cae FG |
798 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects |
799 | << dendl; | |
800 | return true; | |
801 | } | |
802 | ||
803 | return false; | |
804 | } | |
805 | ||
20effc67 TL |
806 | bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp, |
807 | const char * const entity, | |
7c673cae FG |
808 | const RGWQuotaInfo& qinfo, |
809 | const RGWStorageStats& stats, | |
810 | const uint64_t size) const | |
811 | { | |
812 | if (qinfo.max_size < 0) { | |
813 | /* The limit is not enabled. */ | |
814 | return false; | |
815 | } | |
816 | ||
817 | const uint64_t cur_size = stats.size; | |
818 | ||
819 | if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) { | |
20effc67 | 820 | ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size |
7c673cae FG |
821 | << " size=" << size << " " |
822 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
823 | return true; | |
824 | } | |
825 | ||
826 | return false; | |
827 | } | |
828 | ||
20effc67 TL |
829 | bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
830 | const char * const entity, | |
7c673cae FG |
831 | const RGWQuotaInfo& qinfo, |
832 | const RGWStorageStats& stats, | |
833 | const uint64_t num_objs) const | |
834 | { | |
835 | if (qinfo.max_objects < 0) { | |
836 | /* The limit is not enabled. */ | |
837 | return false; | |
838 | } | |
839 | ||
840 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
20effc67 | 841 | ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects |
7c673cae FG |
842 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects |
843 | << dendl; | |
844 | return true; | |
845 | } | |
846 | ||
847 | return false; | |
848 | } | |
849 | ||
850 | const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( | |
851 | const RGWQuotaInfo& qinfo) | |
852 | { | |
853 | static RGWQuotaInfoDefApplier default_qapplier; | |
854 | static RGWQuotaInfoRawApplier raw_qapplier; | |
855 | ||
856 | if (qinfo.check_on_raw) { | |
857 | return raw_qapplier; | |
858 | } else { | |
859 | return default_qapplier; | |
860 | } | |
861 | } | |
862 | ||
863 | ||
864 | class RGWQuotaHandlerImpl : public RGWQuotaHandler { | |
20effc67 | 865 | rgw::sal::Store* store; |
7c673cae FG |
866 | RGWBucketStatsCache bucket_stats_cache; |
867 | RGWUserStatsCache user_stats_cache; | |
868 | ||
20effc67 TL |
869 | int check_quota(const DoutPrefixProvider *dpp, |
870 | const char * const entity, | |
7c673cae FG |
871 | const RGWQuotaInfo& quota, |
872 | const RGWStorageStats& stats, | |
873 | const uint64_t num_objs, | |
874 | const uint64_t size) { | |
875 | if (!quota.enabled) { | |
876 | return 0; | |
877 | } | |
878 | ||
879 | const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota); | |
880 | ||
20effc67 | 881 | ldpp_dout(dpp, 20) << entity |
7c673cae FG |
882 | << " quota: max_objects=" << quota.max_objects |
883 | << " max_size=" << quota.max_size << dendl; | |
884 | ||
885 | ||
20effc67 | 886 | if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) { |
7c673cae FG |
887 | return -ERR_QUOTA_EXCEEDED; |
888 | } | |
889 | ||
20effc67 | 890 | if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) { |
7c673cae FG |
891 | return -ERR_QUOTA_EXCEEDED; |
892 | } | |
893 | ||
20effc67 | 894 | ldpp_dout(dpp, 20) << entity << " quota OK:" |
7c673cae FG |
895 | << " stats.num_objects=" << stats.num_objects |
896 | << " stats.size=" << stats.size << dendl; | |
897 | return 0; | |
898 | } | |
899 | public: | |
20effc67 | 900 | RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads) : store(_store), |
7c673cae | 901 | bucket_stats_cache(_store), |
b3b6e05e | 902 | user_stats_cache(dpp, _store, quota_threads) {} |
7c673cae | 903 | |
20effc67 TL |
904 | int check_quota(const DoutPrefixProvider *dpp, |
905 | const rgw_user& user, | |
f67539c2 TL |
906 | rgw_bucket& bucket, |
907 | RGWQuotaInfo& user_quota, | |
908 | RGWQuotaInfo& bucket_quota, | |
909 | uint64_t num_objs, | |
910 | uint64_t size, optional_yield y) override { | |
7c673cae FG |
911 | |
912 | if (!bucket_quota.enabled && !user_quota.enabled) { | |
913 | return 0; | |
914 | } | |
915 | ||
916 | /* | |
917 | * we need to fetch bucket stats if the user quota is enabled, because | |
918 | * the whole system relies on us periodically updating the user's bucket | |
919 | * stats in the user's header, this happens in get_stats() if we actually | |
920 | * fetch that info and not rely on cached data | |
921 | */ | |
922 | ||
b3b6e05e | 923 | const DoutPrefix dp(store->ctx(), dout_subsys, "rgw quota handler: "); |
7c673cae FG |
924 | if (bucket_quota.enabled) { |
925 | RGWStorageStats bucket_stats; | |
522d829b | 926 | int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp); |
7c673cae FG |
927 | if (ret < 0) { |
928 | return ret; | |
929 | } | |
20effc67 | 930 | ret = check_quota(dpp, "bucket", bucket_quota, bucket_stats, num_objs, size); |
7c673cae FG |
931 | if (ret < 0) { |
932 | return ret; | |
933 | } | |
934 | } | |
935 | ||
936 | if (user_quota.enabled) { | |
937 | RGWStorageStats user_stats; | |
522d829b | 938 | int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp); |
7c673cae FG |
939 | if (ret < 0) { |
940 | return ret; | |
941 | } | |
20effc67 | 942 | ret = check_quota(dpp, "user", user_quota, user_stats, num_objs, size); |
7c673cae FG |
943 | if (ret < 0) { |
944 | return ret; | |
945 | } | |
946 | } | |
947 | return 0; | |
948 | } | |
949 | ||
950 | void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override { | |
951 | bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); | |
952 | user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); | |
953 | } | |
31f18b77 | 954 | |
20effc67 | 955 | void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard, uint64_t num_shards, |
9f95a23c | 956 | uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) override |
31f18b77 | 957 | { |
9f95a23c | 958 | if (num_objs > num_shards * max_objs_per_shard) { |
20effc67 | 959 | ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs |
31f18b77 FG |
960 | << " shard max_objects=" << max_objs_per_shard * num_shards << dendl; |
961 | need_resharding = true; | |
962 | if (suggested_num_shards) { | |
9f95a23c | 963 | *suggested_num_shards = num_objs * 2 / max_objs_per_shard; |
31f18b77 FG |
964 | } |
965 | } else { | |
966 | need_resharding = false; | |
967 | } | |
31f18b77 | 968 | } |
7c673cae FG |
969 | }; |
970 | ||
971 | ||
20effc67 | 972 | RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Store* store, bool quota_threads) |
7c673cae | 973 | { |
b3b6e05e | 974 | return new RGWQuotaHandlerImpl(dpp, store, quota_threads); |
7c673cae FG |
975 | } |
976 | ||
977 | void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler) | |
978 | { | |
979 | delete handler; | |
980 | } | |
31f18b77 FG |
981 | |
982 | ||
11fdf7f2 | 983 | void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf) |
f64942e4 | 984 | { |
11fdf7f2 TL |
985 | if (conf->rgw_bucket_default_quota_max_objects >= 0) { |
986 | quota.max_objects = conf->rgw_bucket_default_quota_max_objects; | |
f64942e4 AA |
987 | quota.enabled = true; |
988 | } | |
11fdf7f2 TL |
989 | if (conf->rgw_bucket_default_quota_max_size >= 0) { |
990 | quota.max_size = conf->rgw_bucket_default_quota_max_size; | |
f64942e4 AA |
991 | quota.enabled = true; |
992 | } | |
993 | } | |
994 | ||
11fdf7f2 | 995 | void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf) |
f64942e4 | 996 | { |
11fdf7f2 TL |
997 | if (conf->rgw_user_default_quota_max_objects >= 0) { |
998 | quota.max_objects = conf->rgw_user_default_quota_max_objects; | |
f64942e4 AA |
999 | quota.enabled = true; |
1000 | } | |
11fdf7f2 TL |
1001 | if (conf->rgw_user_default_quota_max_size >= 0) { |
1002 | quota.max_size = conf->rgw_user_default_quota_max_size; | |
f64942e4 AA |
1003 | quota.enabled = true; |
1004 | } | |
1005 | } | |
20effc67 TL |
1006 | |
1007 | void RGWQuotaInfo::dump(Formatter *f) const | |
1008 | { | |
1009 | f->dump_bool("enabled", enabled); | |
1010 | f->dump_bool("check_on_raw", check_on_raw); | |
1011 | ||
1012 | f->dump_int("max_size", max_size); | |
1013 | f->dump_int("max_size_kb", rgw_rounded_kb(max_size)); | |
1014 | f->dump_int("max_objects", max_objects); | |
1015 | } | |
1016 | ||
1017 | void RGWQuotaInfo::decode_json(JSONObj *obj) | |
1018 | { | |
1019 | if (false == JSONDecoder::decode_json("max_size", max_size, obj)) { | |
1020 | /* We're parsing an older version of the struct. */ | |
1021 | int64_t max_size_kb = 0; | |
1022 | ||
1023 | JSONDecoder::decode_json("max_size_kb", max_size_kb, obj); | |
1024 | max_size = max_size_kb * 1024; | |
1025 | } | |
1026 | JSONDecoder::decode_json("max_objects", max_objects, obj); | |
1027 | ||
1028 | JSONDecoder::decode_json("check_on_raw", check_on_raw, obj); | |
1029 | JSONDecoder::decode_json("enabled", enabled, obj); | |
1030 | } | |
1031 |