]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
7c673cae FG |
4 | /* |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2013 Inktank, Inc | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | ||
17 | #include "include/utime.h" | |
18 | #include "common/lru_map.h" | |
19 | #include "common/RefCountedObj.h" | |
20 | #include "common/Thread.h" | |
9f95a23c | 21 | #include "common/ceph_mutex.h" |
7c673cae FG |
22 | |
23 | #include "rgw_common.h" | |
9f95a23c | 24 | #include "rgw_sal.h" |
f67539c2 | 25 | #include "rgw_sal_rados.h" |
7c673cae FG |
26 | #include "rgw_quota.h" |
27 | #include "rgw_bucket.h" | |
28 | #include "rgw_user.h" | |
29 | ||
11fdf7f2 | 30 | #include "services/svc_sys_obj.h" |
9f95a23c | 31 | #include "services/svc_meta.h" |
11fdf7f2 | 32 | |
7c673cae FG |
33 | #include <atomic> |
34 | ||
35 | #define dout_context g_ceph_context | |
36 | #define dout_subsys ceph_subsys_rgw | |
37 | ||
20effc67 | 38 | using namespace std; |
7c673cae FG |
39 | |
40 | struct RGWQuotaCacheStats { | |
41 | RGWStorageStats stats; | |
42 | utime_t expiration; | |
43 | utime_t async_refresh_time; | |
44 | }; | |
45 | ||
46 | template<class T> | |
47 | class RGWQuotaCache { | |
48 | protected: | |
1e59de90 | 49 | rgw::sal::Driver* driver; |
7c673cae FG |
50 | lru_map<T, RGWQuotaCacheStats> stats_map; |
51 | RefCountedWaitObject *async_refcount; | |
52 | ||
53 | class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
54 | int objs_delta; | |
55 | uint64_t added_bytes; | |
56 | uint64_t removed_bytes; | |
57 | public: | |
58 | StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {} | |
59 | bool update(RGWQuotaCacheStats *entry) override { | |
60 | if (entry->async_refresh_time.sec() == 0) | |
61 | return false; | |
62 | ||
63 | entry->async_refresh_time = utime_t(0, 0); | |
64 | ||
65 | return true; | |
66 | } | |
67 | }; | |
68 | ||
b3b6e05e | 69 | virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0; |
7c673cae | 70 | |
224ce89b | 71 | virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; |
7c673cae | 72 | |
224ce89b WB |
73 | virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0; |
74 | virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; | |
7c673cae FG |
75 | |
76 | virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {} | |
77 | public: | |
1e59de90 | 78 | RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) { |
7c673cae FG |
79 | async_refcount = new RefCountedWaitObject; |
80 | } | |
81 | virtual ~RGWQuotaCache() { | |
82 | async_refcount->put_wait(); /* wait for all pending async requests to complete */ | |
83 | } | |
84 | ||
522d829b TL |
85 | int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, |
86 | const DoutPrefixProvider* dpp); | |
7c673cae FG |
87 | void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); |
88 | ||
224ce89b WB |
89 | void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats); |
90 | int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs); | |
7c673cae | 91 | void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats); |
c07f9fc5 | 92 | void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket); |
7c673cae FG |
93 | |
94 | class AsyncRefreshHandler { | |
95 | protected: | |
1e59de90 | 96 | rgw::sal::Driver* driver; |
7c673cae FG |
97 | RGWQuotaCache<T> *cache; |
98 | public: | |
1e59de90 | 99 | AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {} |
7c673cae FG |
100 | virtual ~AsyncRefreshHandler() {} |
101 | ||
102 | virtual int init_fetch() = 0; | |
103 | virtual void drop_reference() = 0; | |
104 | }; | |
105 | ||
224ce89b | 106 | virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0; |
7c673cae FG |
107 | }; |
108 | ||
7c673cae | 109 | template<class T> |
224ce89b | 110 | int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) |
7c673cae FG |
111 | { |
112 | /* protect against multiple updates */ | |
113 | StatsAsyncTestSet test_update; | |
114 | if (!map_find_and_update(user, bucket, &test_update)) { | |
115 | /* most likely we just raced with another update */ | |
116 | return 0; | |
117 | } | |
118 | ||
119 | async_refcount->get(); | |
120 | ||
121 | ||
122 | AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket); | |
123 | ||
124 | int ret = handler->init_fetch(); | |
125 | if (ret < 0) { | |
126 | async_refcount->put(); | |
127 | handler->drop_reference(); | |
128 | return ret; | |
129 | } | |
130 | ||
131 | return 0; | |
132 | } | |
133 | ||
c07f9fc5 FG |
134 | template<class T> |
135 | void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket) | |
136 | { | |
1e59de90 | 137 | ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; |
c07f9fc5 FG |
138 | |
139 | async_refcount->put(); | |
140 | } | |
141 | ||
7c673cae FG |
142 | template<class T> |
143 | void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) | |
144 | { | |
1e59de90 | 145 | ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; |
7c673cae FG |
146 | |
147 | RGWQuotaCacheStats qs; | |
148 | ||
149 | map_find(user, bucket, qs); | |
150 | ||
151 | set_stats(user, bucket, qs, stats); | |
152 | ||
153 | async_refcount->put(); | |
154 | } | |
155 | ||
156 | template<class T> | |
224ce89b | 157 | void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) |
7c673cae FG |
158 | { |
159 | qs.stats = stats; | |
160 | qs.expiration = ceph_clock_now(); | |
161 | qs.async_refresh_time = qs.expiration; | |
1e59de90 TL |
162 | qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl; |
163 | qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2; | |
7c673cae FG |
164 | |
165 | map_add(user, bucket, qs); | |
166 | } | |
167 | ||
168 | template<class T> | |
522d829b | 169 | int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) { |
7c673cae FG |
170 | RGWQuotaCacheStats qs; |
171 | utime_t now = ceph_clock_now(); | |
172 | if (map_find(user, bucket, qs)) { | |
173 | if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) { | |
174 | int r = async_refresh(user, bucket, qs); | |
175 | if (r < 0) { | |
20effc67 | 176 | ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl; |
7c673cae FG |
177 | |
178 | /* continue processing, might be a transient error, async refresh is just optimization */ | |
179 | } | |
180 | } | |
181 | ||
522d829b | 182 | if (qs.expiration > ceph_clock_now()) { |
7c673cae FG |
183 | stats = qs.stats; |
184 | return 0; | |
185 | } | |
186 | } | |
187 | ||
b3b6e05e | 188 | int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp); |
7c673cae FG |
189 | if (ret < 0 && ret != -ENOENT) |
190 | return ret; | |
191 | ||
192 | set_stats(user, bucket, qs, stats); | |
193 | ||
194 | return 0; | |
195 | } | |
196 | ||
197 | ||
198 | template<class T> | |
199 | class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
200 | const int objs_delta; | |
201 | const uint64_t added_bytes; | |
202 | const uint64_t removed_bytes; | |
203 | public: | |
204 | RGWQuotaStatsUpdate(const int objs_delta, | |
205 | const uint64_t added_bytes, | |
206 | const uint64_t removed_bytes) | |
207 | : objs_delta(objs_delta), | |
208 | added_bytes(added_bytes), | |
209 | removed_bytes(removed_bytes) { | |
210 | } | |
211 | ||
212 | bool update(RGWQuotaCacheStats * const entry) override { | |
213 | const uint64_t rounded_added = rgw_rounded_objsize(added_bytes); | |
214 | const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes); | |
215 | ||
181888fb | 216 | if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) { |
c07f9fc5 FG |
217 | entry->stats.size += added_bytes - removed_bytes; |
218 | } else { | |
219 | entry->stats.size = 0; | |
220 | } | |
221 | ||
181888fb | 222 | if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) { |
c07f9fc5 FG |
223 | entry->stats.size_rounded += rounded_added - rounded_removed; |
224 | } else { | |
225 | entry->stats.size_rounded = 0; | |
226 | } | |
227 | ||
181888fb | 228 | if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) { |
c07f9fc5 FG |
229 | entry->stats.num_objects += objs_delta; |
230 | } else { | |
231 | entry->stats.num_objects = 0; | |
232 | } | |
7c673cae FG |
233 | |
234 | return true; | |
235 | } | |
236 | }; | |
237 | ||
238 | ||
239 | template<class T> | |
240 | void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, | |
241 | uint64_t added_bytes, uint64_t removed_bytes) | |
242 | { | |
243 | RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes); | |
244 | map_find_and_update(user, bucket, &update); | |
245 | ||
246 | data_modified(user, bucket); | |
247 | } | |
248 | ||
249 | class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler, | |
250 | public RGWGetBucketStats_CB { | |
251 | rgw_user user; | |
252 | public: | |
1e59de90 | 253 | BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache, |
224ce89b | 254 | const rgw_user& _user, const rgw_bucket& _bucket) : |
1e59de90 | 255 | RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache), |
7c673cae FG |
256 | RGWGetBucketStats_CB(_bucket), user(_user) {} |
257 | ||
258 | void drop_reference() override { put(); } | |
259 | void handle_response(int r) override; | |
260 | int init_fetch() override; | |
261 | }; | |
262 | ||
263 | int BucketAsyncRefreshHandler::init_fetch() | |
264 | { | |
20effc67 | 265 | std::unique_ptr<rgw::sal::Bucket> rbucket; |
7c673cae | 266 | |
1e59de90 TL |
267 | const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: "); |
268 | int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield); | |
7c673cae | 269 | if (r < 0) { |
b3b6e05e | 270 | ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; |
7c673cae FG |
271 | return r; |
272 | } | |
273 | ||
b3b6e05e | 274 | ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl; |
7c673cae | 275 | |
1e59de90 TL |
276 | const auto& index = rbucket->get_info().get_current_index(); |
277 | if (is_layout_indexless(index)) { | |
278 | return 0; | |
279 | } | |
280 | ||
281 | r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this); | |
7c673cae | 282 | if (r < 0) { |
b3b6e05e | 283 | ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl; |
7c673cae | 284 | |
20effc67 | 285 | /* read_stats_async() dropped our reference already */ |
7c673cae FG |
286 | return r; |
287 | } | |
288 | ||
289 | return 0; | |
290 | } | |
291 | ||
292 | void BucketAsyncRefreshHandler::handle_response(const int r) | |
293 | { | |
294 | if (r < 0) { | |
1e59de90 | 295 | ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; |
c07f9fc5 FG |
296 | cache->async_refresh_fail(user, bucket); |
297 | return; | |
7c673cae FG |
298 | } |
299 | ||
300 | RGWStorageStats bs; | |
301 | ||
302 | for (const auto& pair : *stats) { | |
303 | const RGWStorageStats& s = pair.second; | |
304 | ||
305 | bs.size += s.size; | |
306 | bs.size_rounded += s.size_rounded; | |
307 | bs.num_objects += s.num_objects; | |
308 | } | |
309 | ||
310 | cache->async_refresh_response(user, bucket, bs); | |
311 | } | |
312 | ||
313 | class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> { | |
314 | protected: | |
224ce89b | 315 | bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
316 | return stats_map.find(bucket, qs); |
317 | } | |
318 | ||
224ce89b | 319 | bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override { |
7c673cae FG |
320 | return stats_map.find_and_update(bucket, NULL, ctx); |
321 | } | |
322 | ||
224ce89b | 323 | void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
324 | stats_map.add(bucket, qs); |
325 | } | |
326 | ||
b3b6e05e | 327 | int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override; |
7c673cae FG |
328 | |
329 | public: | |
1e59de90 | 330 | explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) { |
7c673cae FG |
331 | } |
332 | ||
224ce89b | 333 | AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { |
1e59de90 | 334 | return new BucketAsyncRefreshHandler(driver, this, user, bucket); |
7c673cae FG |
335 | } |
336 | }; | |
337 | ||
20effc67 | 338 | int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) |
7c673cae | 339 | { |
1e59de90 | 340 | std::unique_ptr<rgw::sal::User> user = driver->get_user(_u); |
20effc67 | 341 | std::unique_ptr<rgw::sal::Bucket> bucket; |
7c673cae | 342 | |
1e59de90 | 343 | int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y); |
7c673cae | 344 | if (r < 0) { |
20effc67 | 345 | ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl; |
7c673cae FG |
346 | return r; |
347 | } | |
348 | ||
1e59de90 TL |
349 | stats = RGWStorageStats(); |
350 | ||
351 | const auto& index = bucket->get_info().get_current_index(); | |
352 | if (is_layout_indexless(index)) { | |
353 | return 0; | |
354 | } | |
355 | ||
7c673cae FG |
356 | string bucket_ver; |
357 | string master_ver; | |
358 | ||
359 | map<RGWObjCategory, RGWStorageStats> bucket_stats; | |
1e59de90 TL |
360 | r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver, |
361 | &master_ver, bucket_stats, nullptr); | |
7c673cae | 362 | if (r < 0) { |
b3b6e05e | 363 | ldpp_dout(dpp, 0) << "could not get bucket stats for bucket=" |
20effc67 | 364 | << _b.name << dendl; |
7c673cae FG |
365 | return r; |
366 | } | |
367 | ||
7c673cae FG |
368 | for (const auto& pair : bucket_stats) { |
369 | const RGWStorageStats& s = pair.second; | |
370 | ||
371 | stats.size += s.size; | |
372 | stats.size_rounded += s.size_rounded; | |
373 | stats.num_objects += s.num_objects; | |
374 | } | |
375 | ||
376 | return 0; | |
377 | } | |
378 | ||
379 | class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler, | |
380 | public RGWGetUserStats_CB { | |
b3b6e05e | 381 | const DoutPrefixProvider *dpp; |
7c673cae FG |
382 | rgw_bucket bucket; |
383 | public: | |
1e59de90 | 384 | UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache, |
224ce89b | 385 | const rgw_user& _user, const rgw_bucket& _bucket) : |
1e59de90 | 386 | RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache), |
7c673cae | 387 | RGWGetUserStats_CB(_user), |
b3b6e05e | 388 | dpp(_dpp), |
7c673cae FG |
389 | bucket(_bucket) {} |
390 | ||
391 | void drop_reference() override { put(); } | |
392 | int init_fetch() override; | |
393 | void handle_response(int r) override; | |
394 | }; | |
395 | ||
396 | int UserAsyncRefreshHandler::init_fetch() | |
397 | { | |
1e59de90 | 398 | std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user); |
20effc67 | 399 | |
b3b6e05e | 400 | ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl; |
20effc67 | 401 | int r = ruser->read_stats_async(dpp, this); |
7c673cae | 402 | if (r < 0) { |
b3b6e05e | 403 | ldpp_dout(dpp, 0) << "could not get bucket info for user=" << user << dendl; |
7c673cae FG |
404 | |
405 | /* get_bucket_stats_async() dropped our reference already */ | |
406 | return r; | |
407 | } | |
408 | ||
409 | return 0; | |
410 | } | |
411 | ||
412 | void UserAsyncRefreshHandler::handle_response(int r) | |
413 | { | |
414 | if (r < 0) { | |
1e59de90 | 415 | ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; |
c07f9fc5 FG |
416 | cache->async_refresh_fail(user, bucket); |
417 | return; | |
7c673cae FG |
418 | } |
419 | ||
420 | cache->async_refresh_response(user, bucket, stats); | |
421 | } | |
422 | ||
423 | class RGWUserStatsCache : public RGWQuotaCache<rgw_user> { | |
b3b6e05e | 424 | const DoutPrefixProvider *dpp; |
7c673cae | 425 | std::atomic<bool> down_flag = { false }; |
9f95a23c | 426 | ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache"); |
7c673cae FG |
427 | map<rgw_bucket, rgw_user> modified_buckets; |
428 | ||
429 | /* thread, sync recent modified buckets info */ | |
430 | class BucketsSyncThread : public Thread { | |
431 | CephContext *cct; | |
432 | RGWUserStatsCache *stats; | |
433 | ||
9f95a23c TL |
434 | ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread"); |
435 | ceph::condition_variable cond; | |
7c673cae FG |
436 | public: |
437 | ||
9f95a23c | 438 | BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} |
7c673cae FG |
439 | |
440 | void *entry() override { | |
441 | ldout(cct, 20) << "BucketsSyncThread: start" << dendl; | |
442 | do { | |
443 | map<rgw_bucket, rgw_user> buckets; | |
444 | ||
445 | stats->swap_modified_buckets(buckets); | |
446 | ||
447 | for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) { | |
448 | rgw_bucket bucket = iter->first; | |
449 | rgw_user& user = iter->second; | |
450 | ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl; | |
b3b6e05e TL |
451 | const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: "); |
452 | int r = stats->sync_bucket(user, bucket, null_yield, &dp); | |
7c673cae FG |
453 | if (r < 0) { |
454 | ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl; | |
455 | } | |
456 | } | |
457 | ||
458 | if (stats->going_down()) | |
459 | break; | |
460 | ||
9f95a23c TL |
461 | std::unique_lock locker{lock}; |
462 | cond.wait_for( | |
463 | locker, | |
464 | std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval)); | |
7c673cae FG |
465 | } while (!stats->going_down()); |
466 | ldout(cct, 20) << "BucketsSyncThread: done" << dendl; | |
467 | ||
468 | return NULL; | |
469 | } | |
470 | ||
471 | void stop() { | |
9f95a23c TL |
472 | std::lock_guard l{lock}; |
473 | cond.notify_all(); | |
7c673cae FG |
474 | } |
475 | }; | |
476 | ||
477 | /* | |
478 | * thread, full sync all users stats periodically | |
479 | * | |
480 | * only sync non idle users or ones that never got synced before, this is needed so that | |
481 | * users that didn't have quota turned on before (or existed before the user objclass | |
482 | * tracked stats) need to get their backend stats up to date. | |
483 | */ | |
484 | class UserSyncThread : public Thread { | |
485 | CephContext *cct; | |
486 | RGWUserStatsCache *stats; | |
487 | ||
9f95a23c TL |
488 | ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread"); |
489 | ceph::condition_variable cond; | |
7c673cae FG |
490 | public: |
491 | ||
9f95a23c | 492 | UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {} |
7c673cae FG |
493 | |
494 | void *entry() override { | |
495 | ldout(cct, 20) << "UserSyncThread: start" << dendl; | |
496 | do { | |
b3b6e05e TL |
497 | const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: "); |
498 | int ret = stats->sync_all_users(&dp, null_yield); | |
7c673cae FG |
499 | if (ret < 0) { |
500 | ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl; | |
501 | } | |
502 | ||
b5b8bbf5 FG |
503 | if (stats->going_down()) |
504 | break; | |
505 | ||
9f95a23c TL |
506 | std::unique_lock l{lock}; |
507 | cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval)); | |
7c673cae FG |
508 | } while (!stats->going_down()); |
509 | ldout(cct, 20) << "UserSyncThread: done" << dendl; | |
510 | ||
511 | return NULL; | |
512 | } | |
513 | ||
514 | void stop() { | |
9f95a23c TL |
515 | std::lock_guard l{lock}; |
516 | cond.notify_all(); | |
7c673cae FG |
517 | } |
518 | }; | |
519 | ||
520 | BucketsSyncThread *buckets_sync_thread; | |
521 | UserSyncThread *user_sync_thread; | |
522 | protected: | |
224ce89b | 523 | bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
524 | return stats_map.find(user, qs); |
525 | } | |
526 | ||
224ce89b | 527 | bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override { |
7c673cae FG |
528 | return stats_map.find_and_update(user, NULL, ctx); |
529 | } | |
530 | ||
224ce89b | 531 | void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { |
7c673cae FG |
532 | stats_map.add(user, qs); |
533 | } | |
534 | ||
b3b6e05e TL |
535 | int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override; |
536 | int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp); | |
537 | int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y); | |
538 | int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y); | |
7c673cae FG |
539 | |
540 | void data_modified(const rgw_user& user, rgw_bucket& bucket) override; | |
541 | ||
542 | void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) { | |
9f95a23c | 543 | std::unique_lock lock{mutex}; |
7c673cae | 544 | modified_buckets.swap(out); |
7c673cae FG |
545 | } |
546 | ||
547 | template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */ | |
548 | void stop_thread(T **pthr) { | |
549 | T *thread = *pthr; | |
550 | if (!thread) | |
551 | return; | |
552 | ||
553 | thread->stop(); | |
554 | thread->join(); | |
555 | delete thread; | |
556 | *pthr = NULL; | |
557 | } | |
558 | ||
559 | public: | |
1e59de90 TL |
560 | RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) |
561 | : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp) | |
9f95a23c | 562 | { |
7c673cae | 563 | if (quota_threads) { |
1e59de90 | 564 | buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this); |
7c673cae | 565 | buckets_sync_thread->create("rgw_buck_st_syn"); |
1e59de90 | 566 | user_sync_thread = new UserSyncThread(driver->ctx(), this); |
7c673cae FG |
567 | user_sync_thread->create("rgw_user_st_syn"); |
568 | } else { | |
569 | buckets_sync_thread = NULL; | |
570 | user_sync_thread = NULL; | |
571 | } | |
572 | } | |
573 | ~RGWUserStatsCache() override { | |
574 | stop(); | |
575 | } | |
576 | ||
224ce89b | 577 | AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { |
1e59de90 | 578 | return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket); |
7c673cae FG |
579 | } |
580 | ||
7c673cae FG |
581 | bool going_down() { |
582 | return down_flag; | |
583 | } | |
584 | ||
585 | void stop() { | |
586 | down_flag = true; | |
9f95a23c TL |
587 | { |
588 | std::unique_lock lock{mutex}; | |
589 | stop_thread(&buckets_sync_thread); | |
590 | } | |
7c673cae FG |
591 | stop_thread(&user_sync_thread); |
592 | } | |
593 | }; | |
594 | ||
20effc67 TL |
595 | int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u, |
596 | const rgw_bucket& _b, | |
f67539c2 | 597 | RGWStorageStats& stats, |
b3b6e05e TL |
598 | optional_yield y, |
599 | const DoutPrefixProvider *dpp) | |
7c673cae | 600 | { |
1e59de90 | 601 | std::unique_ptr<rgw::sal::User> user = driver->get_user(_u); |
20effc67 | 602 | int r = user->read_stats(dpp, y, &stats); |
7c673cae | 603 | if (r < 0) { |
20effc67 | 604 | ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl; |
7c673cae FG |
605 | return r; |
606 | } | |
607 | ||
608 | return 0; | |
609 | } | |
610 | ||
20effc67 | 611 | int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp) |
7c673cae | 612 | { |
1e59de90 | 613 | std::unique_ptr<rgw::sal::User> user = driver->get_user(_u); |
20effc67 | 614 | std::unique_ptr<rgw::sal::Bucket> bucket; |
7c673cae | 615 | |
1e59de90 | 616 | int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y); |
7c673cae | 617 | if (r < 0) { |
20effc67 | 618 | ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl; |
7c673cae FG |
619 | return r; |
620 | } | |
621 | ||
20effc67 | 622 | r = bucket->sync_user_stats(dpp, y); |
7c673cae | 623 | if (r < 0) { |
20effc67 | 624 | ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl; |
7c673cae FG |
625 | return r; |
626 | } | |
627 | ||
20effc67 | 628 | return bucket->check_bucket_shards(dpp); |
7c673cae FG |
629 | } |
630 | ||
20effc67 | 631 | int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y) |
7c673cae | 632 | { |
9f95a23c TL |
633 | RGWStorageStats stats; |
634 | ceph::real_time last_stats_sync; | |
635 | ceph::real_time last_stats_update; | |
1e59de90 | 636 | std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str())); |
9f95a23c | 637 | |
20effc67 | 638 | int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update); |
7c673cae | 639 | if (ret < 0) { |
20effc67 | 640 | ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl; |
7c673cae FG |
641 | return ret; |
642 | } | |
643 | ||
1e59de90 | 644 | if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users && |
9f95a23c | 645 | last_stats_update < last_stats_sync) { |
20effc67 | 646 | ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl; |
7c673cae FG |
647 | return 0; |
648 | } | |
649 | ||
9f95a23c | 650 | real_time when_need_full_sync = last_stats_sync; |
1e59de90 | 651 | when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time); |
7c673cae FG |
652 | |
653 | // check if enough time passed since last full sync | |
654 | /* FIXME: missing check? */ | |
655 | ||
1e59de90 | 656 | ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y); |
7c673cae | 657 | if (ret < 0) { |
20effc67 | 658 | ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl; |
7c673cae FG |
659 | return ret; |
660 | } | |
661 | ||
662 | return 0; | |
663 | } | |
664 | ||
b3b6e05e | 665 | int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y) |
7c673cae FG |
666 | { |
667 | string key = "user"; | |
668 | void *handle; | |
669 | ||
1e59de90 | 670 | int ret = driver->meta_list_keys_init(dpp, key, string(), &handle); |
7c673cae | 671 | if (ret < 0) { |
b3b6e05e | 672 | ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl; |
7c673cae FG |
673 | return ret; |
674 | } | |
675 | ||
676 | bool truncated; | |
677 | int max = 1000; | |
678 | ||
679 | do { | |
680 | list<string> keys; | |
1e59de90 | 681 | ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated); |
7c673cae | 682 | if (ret < 0) { |
b3b6e05e | 683 | ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl; |
7c673cae FG |
684 | goto done; |
685 | } | |
686 | for (list<string>::iterator iter = keys.begin(); | |
687 | iter != keys.end() && !going_down(); | |
688 | ++iter) { | |
689 | rgw_user user(*iter); | |
b3b6e05e TL |
690 | ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl; |
691 | int ret = sync_user(dpp, user, y); | |
7c673cae | 692 | if (ret < 0) { |
b3b6e05e | 693 | ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl; |
7c673cae FG |
694 | |
695 | /* continuing to next user */ | |
696 | continue; | |
697 | } | |
698 | } | |
699 | } while (truncated); | |
700 | ||
701 | ret = 0; | |
702 | done: | |
1e59de90 | 703 | driver->meta_list_keys_complete(handle); |
7c673cae FG |
704 | return ret; |
705 | } | |
706 | ||
707 | void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) | |
708 | { | |
709 | /* racy, but it's ok */ | |
9f95a23c | 710 | mutex.lock_shared(); |
7c673cae | 711 | bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); |
9f95a23c | 712 | mutex.unlock_shared(); |
7c673cae FG |
713 | |
714 | if (need_update) { | |
9f95a23c | 715 | std::unique_lock lock{mutex}; |
7c673cae | 716 | modified_buckets[bucket] = user; |
7c673cae FG |
717 | } |
718 | } | |
719 | ||
720 | ||
721 | class RGWQuotaInfoApplier { | |
722 | /* NOTE: no non-static field allowed as instances are supposed to live in | |
723 | * the static memory only. */ | |
724 | protected: | |
725 | RGWQuotaInfoApplier() = default; | |
726 | ||
727 | public: | |
728 | virtual ~RGWQuotaInfoApplier() {} | |
729 | ||
20effc67 TL |
730 | virtual bool is_size_exceeded(const DoutPrefixProvider *dpp, |
731 | const char * const entity, | |
7c673cae FG |
732 | const RGWQuotaInfo& qinfo, |
733 | const RGWStorageStats& stats, | |
734 | const uint64_t size) const = 0; | |
735 | ||
20effc67 TL |
736 | virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
737 | const char * const entity, | |
7c673cae FG |
738 | const RGWQuotaInfo& qinfo, |
739 | const RGWStorageStats& stats, | |
740 | const uint64_t num_objs) const = 0; | |
741 | ||
742 | static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo); | |
743 | }; | |
744 | ||
745 | class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier { | |
746 | public: | |
20effc67 | 747 | bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
748 | const RGWQuotaInfo& qinfo, |
749 | const RGWStorageStats& stats, | |
750 | const uint64_t size) const override; | |
751 | ||
20effc67 | 752 | bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
753 | const RGWQuotaInfo& qinfo, |
754 | const RGWStorageStats& stats, | |
755 | const uint64_t num_objs) const override; | |
756 | }; | |
757 | ||
758 | class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier { | |
759 | public: | |
20effc67 | 760 | bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
761 | const RGWQuotaInfo& qinfo, |
762 | const RGWStorageStats& stats, | |
763 | const uint64_t size) const override; | |
764 | ||
20effc67 | 765 | bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity, |
7c673cae FG |
766 | const RGWQuotaInfo& qinfo, |
767 | const RGWStorageStats& stats, | |
768 | const uint64_t num_objs) const override; | |
769 | }; | |
770 | ||
771 | ||
20effc67 TL |
772 | bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp, |
773 | const char * const entity, | |
7c673cae FG |
774 | const RGWQuotaInfo& qinfo, |
775 | const RGWStorageStats& stats, | |
776 | const uint64_t size) const | |
777 | { | |
778 | if (qinfo.max_size < 0) { | |
779 | /* The limit is not enabled. */ | |
780 | return false; | |
781 | } | |
782 | ||
783 | const uint64_t cur_size = stats.size_rounded; | |
784 | const uint64_t new_size = rgw_rounded_objsize(size); | |
785 | ||
1e59de90 | 786 | if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) { |
20effc67 | 787 | ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded |
7c673cae FG |
788 | << " size=" << new_size << " " |
789 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
790 | return true; | |
791 | } | |
792 | ||
793 | return false; | |
794 | } | |
795 | ||
20effc67 TL |
796 | bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
797 | const char * const entity, | |
7c673cae FG |
798 | const RGWQuotaInfo& qinfo, |
799 | const RGWStorageStats& stats, | |
800 | const uint64_t num_objs) const | |
801 | { | |
802 | if (qinfo.max_objects < 0) { | |
803 | /* The limit is not enabled. */ | |
804 | return false; | |
805 | } | |
806 | ||
1e59de90 | 807 | if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) { |
20effc67 | 808 | ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects |
7c673cae FG |
809 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects |
810 | << dendl; | |
811 | return true; | |
812 | } | |
813 | ||
814 | return false; | |
815 | } | |
816 | ||
20effc67 TL |
817 | bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp, |
818 | const char * const entity, | |
7c673cae FG |
819 | const RGWQuotaInfo& qinfo, |
820 | const RGWStorageStats& stats, | |
821 | const uint64_t size) const | |
822 | { | |
823 | if (qinfo.max_size < 0) { | |
824 | /* The limit is not enabled. */ | |
825 | return false; | |
826 | } | |
827 | ||
828 | const uint64_t cur_size = stats.size; | |
829 | ||
1e59de90 | 830 | if (std::cmp_greater(cur_size + size, qinfo.max_size)) { |
20effc67 | 831 | ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size |
7c673cae FG |
832 | << " size=" << size << " " |
833 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
834 | return true; | |
835 | } | |
836 | ||
837 | return false; | |
838 | } | |
839 | ||
20effc67 TL |
840 | bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp, |
841 | const char * const entity, | |
7c673cae FG |
842 | const RGWQuotaInfo& qinfo, |
843 | const RGWStorageStats& stats, | |
844 | const uint64_t num_objs) const | |
845 | { | |
846 | if (qinfo.max_objects < 0) { | |
847 | /* The limit is not enabled. */ | |
848 | return false; | |
849 | } | |
850 | ||
1e59de90 | 851 | if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) { |
20effc67 | 852 | ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects |
7c673cae FG |
853 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects |
854 | << dendl; | |
855 | return true; | |
856 | } | |
857 | ||
858 | return false; | |
859 | } | |
860 | ||
861 | const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( | |
862 | const RGWQuotaInfo& qinfo) | |
863 | { | |
864 | static RGWQuotaInfoDefApplier default_qapplier; | |
865 | static RGWQuotaInfoRawApplier raw_qapplier; | |
866 | ||
867 | if (qinfo.check_on_raw) { | |
868 | return raw_qapplier; | |
869 | } else { | |
870 | return default_qapplier; | |
871 | } | |
872 | } | |
873 | ||
874 | ||
875 | class RGWQuotaHandlerImpl : public RGWQuotaHandler { | |
1e59de90 | 876 | rgw::sal::Driver* driver; |
7c673cae FG |
877 | RGWBucketStatsCache bucket_stats_cache; |
878 | RGWUserStatsCache user_stats_cache; | |
879 | ||
20effc67 TL |
880 | int check_quota(const DoutPrefixProvider *dpp, |
881 | const char * const entity, | |
7c673cae FG |
882 | const RGWQuotaInfo& quota, |
883 | const RGWStorageStats& stats, | |
884 | const uint64_t num_objs, | |
885 | const uint64_t size) { | |
886 | if (!quota.enabled) { | |
887 | return 0; | |
888 | } | |
889 | ||
890 | const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota); | |
891 | ||
20effc67 | 892 | ldpp_dout(dpp, 20) << entity |
7c673cae FG |
893 | << " quota: max_objects=" << quota.max_objects |
894 | << " max_size=" << quota.max_size << dendl; | |
895 | ||
896 | ||
20effc67 | 897 | if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) { |
7c673cae FG |
898 | return -ERR_QUOTA_EXCEEDED; |
899 | } | |
900 | ||
20effc67 | 901 | if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) { |
7c673cae FG |
902 | return -ERR_QUOTA_EXCEEDED; |
903 | } | |
904 | ||
20effc67 | 905 | ldpp_dout(dpp, 20) << entity << " quota OK:" |
7c673cae FG |
906 | << " stats.num_objects=" << stats.num_objects |
907 | << " stats.size=" << stats.size << dendl; | |
908 | return 0; | |
909 | } | |
910 | public: | |
1e59de90 TL |
911 | RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) : driver(_driver), |
912 | bucket_stats_cache(_driver), | |
913 | user_stats_cache(dpp, _driver, quota_threads) {} | |
7c673cae | 914 | |
20effc67 TL |
915 | int check_quota(const DoutPrefixProvider *dpp, |
916 | const rgw_user& user, | |
1e59de90 TL |
917 | rgw_bucket& bucket, |
918 | RGWQuota& quota, | |
919 | uint64_t num_objs, | |
920 | uint64_t size, optional_yield y) override { | |
7c673cae | 921 | |
1e59de90 | 922 | if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) { |
7c673cae FG |
923 | return 0; |
924 | } | |
925 | ||
926 | /* | |
927 | * we need to fetch bucket stats if the user quota is enabled, because | |
928 | * the whole system relies on us periodically updating the user's bucket | |
929 | * stats in the user's header, this happens in get_stats() if we actually | |
930 | * fetch that info and not rely on cached data | |
931 | */ | |
932 | ||
1e59de90 TL |
933 | const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: "); |
934 | if (quota.bucket_quota.enabled) { | |
7c673cae | 935 | RGWStorageStats bucket_stats; |
522d829b | 936 | int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp); |
7c673cae FG |
937 | if (ret < 0) { |
938 | return ret; | |
939 | } | |
1e59de90 | 940 | ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size); |
7c673cae FG |
941 | if (ret < 0) { |
942 | return ret; | |
943 | } | |
944 | } | |
945 | ||
1e59de90 | 946 | if (quota.user_quota.enabled) { |
7c673cae | 947 | RGWStorageStats user_stats; |
522d829b | 948 | int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp); |
7c673cae FG |
949 | if (ret < 0) { |
950 | return ret; | |
951 | } | |
1e59de90 | 952 | ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size); |
7c673cae FG |
953 | if (ret < 0) { |
954 | return ret; | |
955 | } | |
956 | } | |
957 | return 0; | |
958 | } | |
959 | ||
960 | void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override { | |
961 | bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); | |
962 | user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); | |
963 | } | |
31f18b77 | 964 | |
1e59de90 TL |
965 | void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard, |
966 | uint64_t num_shards, uint64_t num_objs, bool is_multisite, | |
967 | bool& need_resharding, uint32_t *suggested_num_shards) override | |
31f18b77 | 968 | { |
9f95a23c | 969 | if (num_objs > num_shards * max_objs_per_shard) { |
20effc67 | 970 | ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs |
31f18b77 FG |
971 | << " shard max_objects=" << max_objs_per_shard * num_shards << dendl; |
972 | need_resharding = true; | |
973 | if (suggested_num_shards) { | |
1e59de90 TL |
974 | uint32_t obj_multiplier = 2; |
975 | if (is_multisite) { | |
976 | // if we're maintaining bilogs for multisite, reshards are significantly | |
977 | // more expensive. scale up the shard count much faster to minimize the | |
978 | // number of reshard events during a write workload | |
979 | obj_multiplier = 8; | |
980 | } | |
981 | *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard; | |
31f18b77 FG |
982 | } |
983 | } else { | |
984 | need_resharding = false; | |
985 | } | |
31f18b77 | 986 | } |
7c673cae FG |
987 | }; |
988 | ||
989 | ||
1e59de90 | 990 | RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads) |
7c673cae | 991 | { |
1e59de90 | 992 | return new RGWQuotaHandlerImpl(dpp, driver, quota_threads); |
7c673cae FG |
993 | } |
994 | ||
995 | void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler) | |
996 | { | |
997 | delete handler; | |
998 | } | |
31f18b77 FG |
999 | |
1000 | ||
11fdf7f2 | 1001 | void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf) |
f64942e4 | 1002 | { |
11fdf7f2 TL |
1003 | if (conf->rgw_bucket_default_quota_max_objects >= 0) { |
1004 | quota.max_objects = conf->rgw_bucket_default_quota_max_objects; | |
f64942e4 AA |
1005 | quota.enabled = true; |
1006 | } | |
11fdf7f2 TL |
1007 | if (conf->rgw_bucket_default_quota_max_size >= 0) { |
1008 | quota.max_size = conf->rgw_bucket_default_quota_max_size; | |
f64942e4 AA |
1009 | quota.enabled = true; |
1010 | } | |
1011 | } | |
1012 | ||
11fdf7f2 | 1013 | void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf) |
f64942e4 | 1014 | { |
11fdf7f2 TL |
1015 | if (conf->rgw_user_default_quota_max_objects >= 0) { |
1016 | quota.max_objects = conf->rgw_user_default_quota_max_objects; | |
f64942e4 AA |
1017 | quota.enabled = true; |
1018 | } | |
11fdf7f2 TL |
1019 | if (conf->rgw_user_default_quota_max_size >= 0) { |
1020 | quota.max_size = conf->rgw_user_default_quota_max_size; | |
f64942e4 AA |
1021 | quota.enabled = true; |
1022 | } | |
1023 | } | |
20effc67 TL |
1024 | |
1025 | void RGWQuotaInfo::dump(Formatter *f) const | |
1026 | { | |
1027 | f->dump_bool("enabled", enabled); | |
1028 | f->dump_bool("check_on_raw", check_on_raw); | |
1029 | ||
1030 | f->dump_int("max_size", max_size); | |
1031 | f->dump_int("max_size_kb", rgw_rounded_kb(max_size)); | |
1032 | f->dump_int("max_objects", max_objects); | |
1033 | } | |
1034 | ||
1035 | void RGWQuotaInfo::decode_json(JSONObj *obj) | |
1036 | { | |
1037 | if (false == JSONDecoder::decode_json("max_size", max_size, obj)) { | |
1038 | /* We're parsing an older version of the struct. */ | |
1039 | int64_t max_size_kb = 0; | |
1040 | ||
1041 | JSONDecoder::decode_json("max_size_kb", max_size_kb, obj); | |
1042 | max_size = max_size_kb * 1024; | |
1043 | } | |
1044 | JSONDecoder::decode_json("max_objects", max_objects, obj); | |
1045 | ||
1046 | JSONDecoder::decode_json("check_on_raw", check_on_raw, obj); | |
1047 | JSONDecoder::decode_json("enabled", enabled, obj); | |
1048 | } | |
1049 |