]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2013 Inktank, Inc | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include "include/utime.h" | |
17 | #include "common/lru_map.h" | |
18 | #include "common/RefCountedObj.h" | |
19 | #include "common/Thread.h" | |
20 | #include "common/Mutex.h" | |
21 | #include "common/RWLock.h" | |
22 | ||
23 | #include "rgw_common.h" | |
24 | #include "rgw_rados.h" | |
25 | #include "rgw_quota.h" | |
26 | #include "rgw_bucket.h" | |
27 | #include "rgw_user.h" | |
28 | ||
29 | #include <atomic> | |
30 | ||
31 | #define dout_context g_ceph_context | |
32 | #define dout_subsys ceph_subsys_rgw | |
33 | ||
34 | ||
/* A single cached stats entry plus its freshness bookkeeping. */
struct RGWQuotaCacheStats {
  RGWStorageStats stats;       // cached totals (size, size_rounded, num_objects)
  utime_t expiration;          // hard TTL: entry unusable after this time
  utime_t async_refresh_time;  // soft TTL: background refresh kicked off after
                               // this time; zeroed while a refresh is in flight
};
40 | ||
41 | template<class T> | |
42 | class RGWQuotaCache { | |
43 | protected: | |
44 | RGWRados *store; | |
45 | lru_map<T, RGWQuotaCacheStats> stats_map; | |
46 | RefCountedWaitObject *async_refcount; | |
47 | ||
48 | class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
49 | int objs_delta; | |
50 | uint64_t added_bytes; | |
51 | uint64_t removed_bytes; | |
52 | public: | |
53 | StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {} | |
54 | bool update(RGWQuotaCacheStats *entry) override { | |
55 | if (entry->async_refresh_time.sec() == 0) | |
56 | return false; | |
57 | ||
58 | entry->async_refresh_time = utime_t(0, 0); | |
59 | ||
60 | return true; | |
61 | } | |
62 | }; | |
63 | ||
224ce89b | 64 | virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0; |
7c673cae | 65 | |
224ce89b | 66 | virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; |
7c673cae | 67 | |
224ce89b WB |
68 | virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0; |
69 | virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; | |
7c673cae FG |
70 | |
71 | virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {} | |
72 | public: | |
73 | RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) { | |
74 | async_refcount = new RefCountedWaitObject; | |
75 | } | |
76 | virtual ~RGWQuotaCache() { | |
77 | async_refcount->put_wait(); /* wait for all pending async requests to complete */ | |
78 | } | |
79 | ||
224ce89b | 80 | int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota); |
7c673cae FG |
81 | void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); |
82 | ||
83 | virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats); | |
84 | ||
224ce89b WB |
85 | void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats); |
86 | int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs); | |
7c673cae | 87 | void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats); |
c07f9fc5 | 88 | void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket); |
7c673cae FG |
89 | |
90 | class AsyncRefreshHandler { | |
91 | protected: | |
92 | RGWRados *store; | |
93 | RGWQuotaCache<T> *cache; | |
94 | public: | |
95 | AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {} | |
96 | virtual ~AsyncRefreshHandler() {} | |
97 | ||
98 | virtual int init_fetch() = 0; | |
99 | virtual void drop_reference() = 0; | |
100 | }; | |
101 | ||
224ce89b | 102 | virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0; |
7c673cae FG |
103 | }; |
104 | ||
105 | template<class T> | |
106 | bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats) | |
107 | { | |
108 | if (quota.max_size >= 0) { | |
109 | if (quota.max_size_soft_threshold < 0) { | |
110 | quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; | |
111 | } | |
112 | ||
3efd9988 | 113 | if (cached_stats.size_rounded >= (uint64_t)quota.max_size_soft_threshold) { |
7c673cae | 114 | ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): " |
3efd9988 | 115 | << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl; |
7c673cae FG |
116 | return false; |
117 | } | |
118 | } | |
119 | ||
120 | if (quota.max_objects >= 0) { | |
121 | if (quota.max_objs_soft_threshold < 0) { | |
122 | quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; | |
123 | } | |
124 | ||
125 | if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) { | |
126 | ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): " | |
127 | << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl; | |
128 | return false; | |
129 | } | |
130 | } | |
131 | ||
132 | return true; | |
133 | } | |
134 | ||
/* Kick off a background refresh of the cached entry for (user, bucket).
 * Returns 0 both on success and when another refresh already claimed
 * the entry; a negative value only if init_fetch() itself failed. */
template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  /* hold the wait-object until async_refresh_response/_fail runs,
   * so the destructor can block on outstanding requests */
  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    /* fetch never started: release both the wait-object ref and the
     * handler's own refcount */
    async_refcount->put();
    handler->drop_reference();
    return ret;
  }

  return 0;
}
159 | ||
c07f9fc5 FG |
160 | template<class T> |
161 | void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket) | |
162 | { | |
163 | ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; | |
164 | ||
165 | async_refcount->put(); | |
166 | } | |
167 | ||
7c673cae FG |
/* Completion path of an async refresh: store the freshly fetched stats
 * and release the wait-object reference taken in async_refresh(). */
template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  /* return value deliberately ignored: if the entry was evicted, qs is
   * simply default-constructed and set_stats() re-adds it */
  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}
181 | ||
182 | template<class T> | |
224ce89b | 183 | void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) |
7c673cae FG |
184 | { |
185 | qs.stats = stats; | |
186 | qs.expiration = ceph_clock_now(); | |
187 | qs.async_refresh_time = qs.expiration; | |
188 | qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; | |
189 | qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; | |
190 | ||
191 | map_add(user, bucket, qs); | |
192 | } | |
193 | ||
/* Return stats for (user, bucket), serving from cache when the entry
 * is fresh and safe to use, otherwise fetching from the backend and
 * re-caching. Returns 0 on success or a negative error from storage
 * (except -ENOENT, which is treated as "no stats yet"). */
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    /* soft TTL reached (and no refresh already in flight, signalled by
     * async_refresh_time == 0): start a background refresh */
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    /* serve from cache only if within the hard TTL and not too close
     * to a quota limit (see can_use_cached_stats) */
    if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
	ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  int ret = fetch_stats_from_storage(user, bucket, stats);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  /* NOTE(review): on -ENOENT, whatever the fetch left in 'stats' is
   * cached as-is — presumably fetch implementations zero/ignore it in
   * that case; verify against the subclasses */
  set_stats(user, bucket, qs, stats);

  return 0;
}
223 | ||
224 | ||
225 | template<class T> | |
226 | class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
227 | const int objs_delta; | |
228 | const uint64_t added_bytes; | |
229 | const uint64_t removed_bytes; | |
230 | public: | |
231 | RGWQuotaStatsUpdate(const int objs_delta, | |
232 | const uint64_t added_bytes, | |
233 | const uint64_t removed_bytes) | |
234 | : objs_delta(objs_delta), | |
235 | added_bytes(added_bytes), | |
236 | removed_bytes(removed_bytes) { | |
237 | } | |
238 | ||
239 | bool update(RGWQuotaCacheStats * const entry) override { | |
240 | const uint64_t rounded_added = rgw_rounded_objsize(added_bytes); | |
241 | const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes); | |
242 | ||
181888fb | 243 | if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) { |
c07f9fc5 FG |
244 | entry->stats.size += added_bytes - removed_bytes; |
245 | } else { | |
246 | entry->stats.size = 0; | |
247 | } | |
248 | ||
181888fb | 249 | if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) { |
c07f9fc5 FG |
250 | entry->stats.size_rounded += rounded_added - rounded_removed; |
251 | } else { | |
252 | entry->stats.size_rounded = 0; | |
253 | } | |
254 | ||
181888fb | 255 | if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) { |
c07f9fc5 FG |
256 | entry->stats.num_objects += objs_delta; |
257 | } else { | |
258 | entry->stats.num_objects = 0; | |
259 | } | |
7c673cae FG |
260 | |
261 | return true; | |
262 | } | |
263 | }; | |
264 | ||
265 | ||
266 | template<class T> | |
267 | void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, | |
268 | uint64_t added_bytes, uint64_t removed_bytes) | |
269 | { | |
270 | RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes); | |
271 | map_find_and_update(user, bucket, &update); | |
272 | ||
273 | data_modified(user, bucket); | |
274 | } | |
275 | ||
/* Async refresh handler for per-bucket stats: issues an async bucket
 * stats read and feeds the aggregated result back into the cache.
 * Also an RGWGetBucketStats_CB, so 'bucket' and 'stats' members come
 * from that base. */
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
                                      RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }  // release the CB refcount
  void handle_response(int r) override;
  int init_fetch() override;
};
289 | ||
290 | int BucketAsyncRefreshHandler::init_fetch() | |
291 | { | |
292 | RGWBucketInfo bucket_info; | |
293 | ||
294 | RGWObjectCtx obj_ctx(store); | |
295 | ||
296 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
297 | if (r < 0) { | |
298 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
299 | return r; | |
300 | } | |
301 | ||
302 | ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; | |
303 | ||
304 | r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this); | |
305 | if (r < 0) { | |
306 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; | |
307 | ||
308 | /* get_bucket_stats_async() dropped our reference already */ | |
309 | return r; | |
310 | } | |
311 | ||
312 | return 0; | |
313 | } | |
314 | ||
315 | void BucketAsyncRefreshHandler::handle_response(const int r) | |
316 | { | |
317 | if (r < 0) { | |
318 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
319 | cache->async_refresh_fail(user, bucket); |
320 | return; | |
7c673cae FG |
321 | } |
322 | ||
323 | RGWStorageStats bs; | |
324 | ||
325 | for (const auto& pair : *stats) { | |
326 | const RGWStorageStats& s = pair.second; | |
327 | ||
328 | bs.size += s.size; | |
329 | bs.size_rounded += s.size_rounded; | |
330 | bs.num_objects += s.num_objects; | |
331 | } | |
332 | ||
333 | cache->async_refresh_response(user, bucket, bs); | |
334 | } | |
335 | ||
/* Quota stats cache keyed by bucket: the map_* overrides ignore the
 * user argument and index purely on the bucket. */
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;

public:
  explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};
360 | ||
/* Synchronously read a bucket's stats from the backend: resolve the
 * bucket instance, fetch per-category stats, and sum them into a
 * single RGWStorageStats. Returns 0 on success, negative error
 * otherwise. */
int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  string bucket_ver;
  string master_ver;

  map<RGWObjCategory, RGWStorageStats> bucket_stats;
  r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
                              &master_ver, bucket_stats, nullptr);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
                           << bucket.name << dendl;
    return r;
  }

  /* reset before accumulating — 'stats' is a caller-owned out-param */
  stats = RGWStorageStats();

  for (const auto& pair : bucket_stats) {
    const RGWStorageStats& s = pair.second;

    stats.size += s.size;
    stats.size_rounded += s.size_rounded;
    stats.num_objects += s.num_objects;
  }

  return 0;
}
397 | ||
/* Async refresh handler for per-user stats: issues an async user-stats
 * read and feeds the result back into the cache. 'user' and 'stats'
 * members come from the RGWGetUserStats_CB base; the bucket is kept
 * only so it can be passed through to the cache callbacks. */
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
                          RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
                          RGWGetUserStats_CB(_user),
                          bucket(_bucket) {}

  void drop_reference() override { put(); }  // release the CB refcount
  int init_fetch() override;
  void handle_response(int r) override;
};
412 | ||
413 | int UserAsyncRefreshHandler::init_fetch() | |
414 | { | |
415 | ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl; | |
416 | int r = store->get_user_stats_async(user, this); | |
417 | if (r < 0) { | |
418 | ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl; | |
419 | ||
420 | /* get_bucket_stats_async() dropped our reference already */ | |
421 | return r; | |
422 | } | |
423 | ||
424 | return 0; | |
425 | } | |
426 | ||
427 | void UserAsyncRefreshHandler::handle_response(int r) | |
428 | { | |
429 | if (r < 0) { | |
430 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
431 | cache->async_refresh_fail(user, bucket); |
432 | return; | |
7c673cae FG |
433 | } |
434 | ||
435 | cache->async_refresh_response(user, bucket, stats); | |
436 | } | |
437 | ||
/* Quota stats cache keyed by user. Optionally runs two background
 * threads: one syncing recently-modified buckets into the user stats
 * objclass, and one periodically doing a full sync of all users. */
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  std::atomic<bool> down_flag = { false };  // set by stop(); polled by threads
  RWLock rwlock;                            // guards modified_buckets
  map<rgw_bucket, rgw_user> modified_buckets;  // buckets touched since last sync

  /* thread, sync recent modified buckets info */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;  // only protects the wakeup condvar below
    Cond cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        /* grab-and-clear the pending set so producers aren't blocked
         * while we sync */
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          int r = stats->sync_bucket(user, bucket);
          if (r < 0) {
            /* per-bucket failure is logged and skipped; the bucket
             * will be re-queued on its next modification */
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        /* sleep until the next interval or an early stop() signal */
        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  /*
   * thread, full sync all users stats periodically
   *
   * only sync non idle users or ones that never got synced before, this is needed so that
   * users that didn't have quota turned on before (or existed before the user objclass
   * tracked stats) need to get their backend stats up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;  // only protects the wakeup condvar below
    Cond cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        int ret = stats->sync_all_users();
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  /* user-keyed overrides: the bucket argument is ignored */
  bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
  int sync_user(const rgw_user& user);
  int sync_all_users();

  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  /* hand the accumulated modified-bucket set to the sync thread,
   * leaving an empty map behind */
  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    rwlock.get_write();
    modified_buckets.swap(out);
    rwlock.unlock();
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                                            rwlock("RGWUserStatsCache::rwlock") {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(store, this, user, bucket);
  }

  bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
    /* in the user case, the cached stats may contain a better estimation of the totals, as
     * the backend is only periodically getting updated.
     */
    return true;
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    /* NOTE(review): holding the write lock while joining
     * buckets_sync_thread looks like it could deadlock if the thread
     * is blocked in swap_modified_buckets() waiting for this same
     * lock — confirm the intended ordering before changing anything */
    rwlock.get_write();
    stop_thread(&buckets_sync_thread);
    rwlock.unlock();
    stop_thread(&user_sync_thread);
  }
};
612 | ||
224ce89b | 613 | int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) |
7c673cae FG |
614 | { |
615 | int r = store->get_user_stats(user, stats); | |
616 | if (r < 0) { | |
617 | ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl; | |
618 | return r; | |
619 | } | |
620 | ||
621 | return 0; | |
622 | } | |
623 | ||
/* Flush a single bucket's stats into the owning user's stats objclass:
 * resolve the bucket instance, then run the user-stats sync helper.
 * Returns 0 on success, negative error otherwise. */
int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
{
  RGWBucketInfo bucket_info;

  RGWObjectCtx obj_ctx(store);

  int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
  if (r < 0) {
    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
    return r;
  }

  r = rgw_bucket_sync_user_stats(store, user, bucket_info);
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
    return r;
  }

  return 0;
}
644 | ||
/* Full stats sync for one user. Idle users (no stats updates since the
 * last sync) are skipped unless rgw_user_quota_sync_idle_users is set.
 * Returns 0 on success (including the skip case), negative error
 * otherwise. */
int RGWUserStatsCache::sync_user(const rgw_user& user)
{
  cls_user_header header;
  string user_str = user.to_str();
  int ret = store->cls_user_get_header(user_str, &header);
  if (ret < 0) {
    ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      header.last_stats_update < header.last_stats_sync) {
    ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = header.last_stats_sync;
  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */
  /* NOTE(review): when_need_full_sync is computed but never compared
   * against the current time, so the rgw_user_quota_sync_wait_time
   * throttle appears to have no effect — confirm whether the check
   * was intentionally dropped before adding it */

  ret = rgw_user_sync_all_stats(store, user);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}
675 | ||
/* Iterate over all user metadata keys and sync each user's stats.
 * Per-user failures are logged and skipped; iteration also stops early
 * when the cache is shutting down. Returns 0 on success, negative
 * error if listing itself fails. */
int RGWUserStatsCache::sync_all_users()
{
  string key = "user";
  void *handle;

  int ret = store->meta_mgr->list_keys_init(key, &handle);
  if (ret < 0) {
    ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;  // page size for key listing

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
      goto done;  // still need to release the listing handle
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      /* inner 'ret' intentionally shadows the outer one: a per-user
       * failure must not change the function's return value */
      int ret = sync_user(user);
      if (ret < 0) {
        ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  store->meta_mgr->list_keys_complete(handle);
  return ret;
}
717 | ||
718 | void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) | |
719 | { | |
720 | /* racy, but it's ok */ | |
721 | rwlock.get_read(); | |
722 | bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); | |
723 | rwlock.unlock(); | |
724 | ||
725 | if (need_update) { | |
726 | rwlock.get_write(); | |
727 | modified_buckets[bucket] = user; | |
728 | rwlock.unlock(); | |
729 | } | |
730 | } | |
731 | ||
732 | ||
/* Strategy interface for quota-limit checks; the Def variant compares
 * rounded sizes, the Raw variant raw byte counts (see get_instance). */
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  /* true if adding 'size' bytes would exceed qinfo.max_size */
  virtual bool is_size_exceeded(const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  /* true if adding 'num_objs' objects would exceed qinfo.max_objects */
  virtual bool is_num_objs_exceeded(const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  /* select the Def or Raw singleton based on qinfo.check_on_raw */
  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};
754 | ||
/* Default applier: size checks use 4KiB-rounded object sizes
 * (stats.size_rounded). */
class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
767 | ||
/* Raw applier: size checks use exact byte counts (stats.size), for
 * quotas configured with check_on_raw. */
class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
780 | ||
781 | ||
782 | bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity, | |
783 | const RGWQuotaInfo& qinfo, | |
784 | const RGWStorageStats& stats, | |
785 | const uint64_t size) const | |
786 | { | |
787 | if (qinfo.max_size < 0) { | |
788 | /* The limit is not enabled. */ | |
789 | return false; | |
790 | } | |
791 | ||
792 | const uint64_t cur_size = stats.size_rounded; | |
793 | const uint64_t new_size = rgw_rounded_objsize(size); | |
794 | ||
795 | if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) { | |
796 | dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded | |
797 | << " size=" << new_size << " " | |
798 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
799 | return true; | |
800 | } | |
801 | ||
802 | return false; | |
803 | } | |
804 | ||
805 | bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity, | |
806 | const RGWQuotaInfo& qinfo, | |
807 | const RGWStorageStats& stats, | |
808 | const uint64_t num_objs) const | |
809 | { | |
810 | if (qinfo.max_objects < 0) { | |
811 | /* The limit is not enabled. */ | |
812 | return false; | |
813 | } | |
814 | ||
815 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
816 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
817 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
818 | << dendl; | |
819 | return true; | |
820 | } | |
821 | ||
822 | return false; | |
823 | } | |
824 | ||
825 | bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity, | |
826 | const RGWQuotaInfo& qinfo, | |
827 | const RGWStorageStats& stats, | |
828 | const uint64_t size) const | |
829 | { | |
830 | if (qinfo.max_size < 0) { | |
831 | /* The limit is not enabled. */ | |
832 | return false; | |
833 | } | |
834 | ||
835 | const uint64_t cur_size = stats.size; | |
836 | ||
837 | if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) { | |
838 | dout(10) << "quota exceeded: stats.size=" << stats.size | |
839 | << " size=" << size << " " | |
840 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
841 | return true; | |
842 | } | |
843 | ||
844 | return false; | |
845 | } | |
846 | ||
847 | bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity, | |
848 | const RGWQuotaInfo& qinfo, | |
849 | const RGWStorageStats& stats, | |
850 | const uint64_t num_objs) const | |
851 | { | |
852 | if (qinfo.max_objects < 0) { | |
853 | /* The limit is not enabled. */ | |
854 | return false; | |
855 | } | |
856 | ||
857 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
858 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
859 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
860 | << dendl; | |
861 | return true; | |
862 | } | |
863 | ||
864 | return false; | |
865 | } | |
866 | ||
867 | const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( | |
868 | const RGWQuotaInfo& qinfo) | |
869 | { | |
870 | static RGWQuotaInfoDefApplier default_qapplier; | |
871 | static RGWQuotaInfoRawApplier raw_qapplier; | |
872 | ||
873 | if (qinfo.check_on_raw) { | |
874 | return raw_qapplier; | |
875 | } else { | |
876 | return default_qapplier; | |
877 | } | |
878 | } | |
879 | ||
880 | ||
/*
 * Default RGWQuotaHandler implementation: applies bucket- and
 * user-scope quotas using the two stats caches, folds operation deltas
 * back into both caches, and offers a shard-fullness check used by
 * bucket resharding.
 */
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  RGWRados *store;
  RGWBucketStatsCache bucket_stats_cache;  // cached per-bucket usage stats
  RGWUserStatsCache user_stats_cache;      // cached per-user usage stats

  /* Apply a single quota (entity is "bucket" or "user", used only in
   * log messages) to the given stats.  Returns 0 when the quota is
   * disabled or not exceeded; -ERR_QUOTA_EXCEEDED when adding num_objs
   * objects or size bytes would cross the object-count or size limit. */
  int check_quota(const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    /* check_on_raw in the quota info picks the raw- vs rounded-size
     * applier. */
    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldout(store->ctx(), 20) << entity
                            << " quota: max_objects=" << quota.max_objects
                            << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldout(store->ctx(), 20) << entity << " quota OK:"
                            << " stats.num_objects=" << stats.num_objects
                            << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(_store, quota_threads) {}

  /* Check both quota scopes for an operation that would add num_objs
   * objects / size bytes.  Returns 0 on success or a negative error
   * (e.g. -ERR_QUOTA_EXCEEDED, or an error from a stats fetch). */
  int check_quota(const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * we need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header, this happens in get_stats() if we actually
     * fetch that info and not rely on cached data
     */
    /* NOTE(review): the comment above says bucket stats must be fetched
     * when the *user* quota is enabled, but the guard below only fetches
     * them when the *bucket* quota is enabled — confirm whether a
     * user-quota-only configuration still refreshes bucket stats
     * somewhere else. */

    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                             bucket_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  /* Fold a completed operation's deltas into both stats caches. */
  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

  /* Decide whether the bucket index needs more shards: resharding is
   * flagged once the projected object count exceeds
   * num_shards * max_objs_per_shard; on a hit, *suggested_num_shards
   * (if non-null) is set to twice the shard count needed for the
   * projected objects.  Returns 0, or a negative error from the stats
   * fetch.
   * NOTE(review): max_objs_per_shard == 0 would divide by zero in the
   * suggestion below — presumably callers guarantee a positive value;
   * confirm. */
  int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
                          const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
  {
    RGWStorageStats bucket_stats;
    int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                           bucket_quota);
    if (ret < 0) {
      return ret;
    }

    if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
      ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
             << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }

    return 0;
  }

};
996 | ||
997 | ||
998 | RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads) | |
999 | { | |
1000 | return new RGWQuotaHandlerImpl(store, quota_threads); | |
1001 | } | |
1002 | ||
/* Destroy a handler obtained from generate_handler().
 * NOTE(review): deletes the impl through the base pointer — assumes
 * RGWQuotaHandler declares a virtual destructor in its header; confirm. */
void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}
31f18b77 FG |
1007 | |
1008 | ||
f64942e4 AA |
1009 | void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const CephContext* cct) |
1010 | { | |
1011 | if (cct->_conf->rgw_bucket_default_quota_max_objects >= 0) { | |
1012 | quota.max_objects = cct->_conf->rgw_bucket_default_quota_max_objects; | |
1013 | quota.enabled = true; | |
1014 | } | |
1015 | if (cct->_conf->rgw_bucket_default_quota_max_size >= 0) { | |
1016 | quota.max_size = cct->_conf->rgw_bucket_default_quota_max_size; | |
1017 | quota.enabled = true; | |
1018 | } | |
1019 | } | |
1020 | ||
1021 | void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const CephContext* cct) | |
1022 | { | |
1023 | if (cct->_conf->rgw_user_default_quota_max_objects >= 0) { | |
1024 | quota.max_objects = cct->_conf->rgw_user_default_quota_max_objects; | |
1025 | quota.enabled = true; | |
1026 | } | |
1027 | if (cct->_conf->rgw_user_default_quota_max_size >= 0) { | |
1028 | quota.max_size = cct->_conf->rgw_user_default_quota_max_size; | |
1029 | quota.enabled = true; | |
1030 | } | |
1031 | } |