]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2013 Inktank, Inc | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include "include/utime.h" | |
17 | #include "common/lru_map.h" | |
18 | #include "common/RefCountedObj.h" | |
19 | #include "common/Thread.h" | |
20 | #include "common/Mutex.h" | |
21 | #include "common/RWLock.h" | |
22 | ||
23 | #include "rgw_common.h" | |
24 | #include "rgw_rados.h" | |
25 | #include "rgw_quota.h" | |
26 | #include "rgw_bucket.h" | |
27 | #include "rgw_user.h" | |
28 | ||
29 | #include <atomic> | |
30 | ||
31 | #define dout_context g_ceph_context | |
32 | #define dout_subsys ceph_subsys_rgw | |
33 | ||
34 | ||
/* A single cached stats entry: the last usage totals fetched from the
 * backend, plus the two deadlines that drive cache maintenance. */
struct RGWQuotaCacheStats {
  RGWStorageStats stats;       // cached usage totals for one user or bucket
  utime_t expiration;          // hard deadline: entry unusable after this time
  utime_t async_refresh_time;  // soft deadline: async refresh kicks in after this
};
40 | ||
/* Generic LRU cache of quota stats, keyed either by bucket or by user
 * (template parameter T).  Subclasses supply the map operations and the
 * backend fetch; this class implements the TTL / async-refresh policy.
 * In-flight async refreshes are tracked via async_refcount so that the
 * destructor can wait for them to drain. */
template<class T>
class RGWQuotaCache {
protected:
  RGWRados *store;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  RefCountedWaitObject *async_refcount;  // counts in-flight async refreshes

  /* UpdateContext used to atomically claim the right to refresh an entry:
   * clears async_refresh_time (the "refresh pending" marker) and reports
   * whether this caller won the race. */
  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      /* already claimed by someone else */
      if (entry->async_refresh_time.sec() == 0)
        return false;

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  /* synchronous fetch of stats from the backend */
  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;

  /* map operations; subclasses pick whether user or bucket is the key */
  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  /* hook invoked after adjust_stats(); default is a no-op */
  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  /* returns cached stats when fresh enough, otherwise fetches synchronously */
  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
  /* applies an in-memory delta after an object create/delete */
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  /* policy check: may cached stats be trusted given this quota? */
  virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
  void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);

  /* interface implemented by the per-bucket / per-user async fetchers */
  class AsyncRefreshHandler {
  protected:
    RGWRados *store;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};
104 | ||
105 | template<class T> | |
106 | bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats) | |
107 | { | |
108 | if (quota.max_size >= 0) { | |
109 | if (quota.max_size_soft_threshold < 0) { | |
110 | quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; | |
111 | } | |
112 | ||
113 | const auto cached_stats_num_kb_rounded = rgw_rounded_kb(cached_stats.size_rounded); | |
114 | if (cached_stats_num_kb_rounded >= (uint64_t)quota.max_size_soft_threshold) { | |
115 | ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): " | |
116 | << cached_stats_num_kb_rounded << " >= " << quota.max_size_soft_threshold << dendl; | |
117 | return false; | |
118 | } | |
119 | } | |
120 | ||
121 | if (quota.max_objects >= 0) { | |
122 | if (quota.max_objs_soft_threshold < 0) { | |
123 | quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; | |
124 | } | |
125 | ||
126 | if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) { | |
127 | ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): " | |
128 | << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl; | |
129 | return false; | |
130 | } | |
131 | } | |
132 | ||
133 | return true; | |
134 | } | |
135 | ||
/* Kick off a background refresh of the given entry.  Returns 0 both on
 * success and when another caller already claimed the refresh. */
template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  /* account the in-flight request; released in async_refresh_response(),
   * async_refresh_fail(), or the failure path below */
  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    /* NOTE(review): some init_fetch() implementations (see
     * BucketAsyncRefreshHandler::init_fetch) state the async call already
     * dropped the handler's reference on failure — verify this
     * drop_reference() cannot double-release in those paths. */
    handler->drop_reference();
    return ret;
  }

  return 0;
}
160 | ||
c07f9fc5 FG |
161 | template<class T> |
162 | void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket) | |
163 | { | |
164 | ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; | |
165 | ||
166 | async_refcount->put(); | |
167 | } | |
168 | ||
/* Success completion for an async refresh: store the freshly fetched stats
 * and release the in-flight reference taken by async_refresh(). */
template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;

  RGWQuotaCacheStats qs;

  /* on a miss qs stays default-constructed; set_stats() below (re)inserts
   * a fresh entry either way, so the return value is intentionally ignored */
  map_find(user, bucket, qs);

  set_stats(user, bucket, qs, stats);

  async_refcount->put();
}
182 | ||
183 | template<class T> | |
224ce89b | 184 | void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) |
7c673cae FG |
185 | { |
186 | qs.stats = stats; | |
187 | qs.expiration = ceph_clock_now(); | |
188 | qs.async_refresh_time = qs.expiration; | |
189 | qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; | |
190 | qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; | |
191 | ||
192 | map_add(user, bucket, qs); | |
193 | } | |
194 | ||
/* Main read path: serve stats from cache when fresh enough, possibly
 * scheduling a background refresh; otherwise fetch synchronously and
 * repopulate the cache.  Returns 0 on success or a negative error from
 * the backend fetch (ENOENT is treated as "no stats yet" / zero usage). */
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    /* past the soft deadline: schedule a background refresh but keep
     * serving the cached entry in the meantime */
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
        ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  /* cache miss or entry no longer usable: synchronous backend fetch */
  int ret = fetch_stats_from_storage(user, bucket, stats);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}
224 | ||
225 | ||
226 | template<class T> | |
227 | class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext { | |
228 | const int objs_delta; | |
229 | const uint64_t added_bytes; | |
230 | const uint64_t removed_bytes; | |
231 | public: | |
232 | RGWQuotaStatsUpdate(const int objs_delta, | |
233 | const uint64_t added_bytes, | |
234 | const uint64_t removed_bytes) | |
235 | : objs_delta(objs_delta), | |
236 | added_bytes(added_bytes), | |
237 | removed_bytes(removed_bytes) { | |
238 | } | |
239 | ||
240 | bool update(RGWQuotaCacheStats * const entry) override { | |
241 | const uint64_t rounded_added = rgw_rounded_objsize(added_bytes); | |
242 | const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes); | |
243 | ||
c07f9fc5 FG |
244 | if ((entry->stats.size + added_bytes - removed_bytes) >= 0) { |
245 | entry->stats.size += added_bytes - removed_bytes; | |
246 | } else { | |
247 | entry->stats.size = 0; | |
248 | } | |
249 | ||
250 | if ((entry->stats.size_rounded + rounded_added - rounded_removed) >= 0) { | |
251 | entry->stats.size_rounded += rounded_added - rounded_removed; | |
252 | } else { | |
253 | entry->stats.size_rounded = 0; | |
254 | } | |
255 | ||
256 | if ((entry->stats.num_objects + objs_delta) >= 0) { | |
257 | entry->stats.num_objects += objs_delta; | |
258 | } else { | |
259 | entry->stats.num_objects = 0; | |
260 | } | |
7c673cae FG |
261 | |
262 | return true; | |
263 | } | |
264 | }; | |
265 | ||
266 | ||
267 | template<class T> | |
268 | void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, | |
269 | uint64_t added_bytes, uint64_t removed_bytes) | |
270 | { | |
271 | RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes); | |
272 | map_find_and_update(user, bucket, &update); | |
273 | ||
274 | data_modified(user, bucket); | |
275 | } | |
276 | ||
/* Async refresh handler for per-bucket stats.  Doubles as the RADOS
 * completion callback (RGWGetBucketStats_CB): init_fetch() starts the
 * async stats request and handle_response() feeds the aggregated result
 * back into the bucket cache. */
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
                                      RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }  // release the callback refcount
  void handle_response(int r) override;      // RADOS completion callback
  int init_fetch() override;                 // start the async stats fetch
};
290 | ||
291 | int BucketAsyncRefreshHandler::init_fetch() | |
292 | { | |
293 | RGWBucketInfo bucket_info; | |
294 | ||
295 | RGWObjectCtx obj_ctx(store); | |
296 | ||
297 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
298 | if (r < 0) { | |
299 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
300 | return r; | |
301 | } | |
302 | ||
303 | ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; | |
304 | ||
305 | r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this); | |
306 | if (r < 0) { | |
307 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; | |
308 | ||
309 | /* get_bucket_stats_async() dropped our reference already */ | |
310 | return r; | |
311 | } | |
312 | ||
313 | return 0; | |
314 | } | |
315 | ||
316 | void BucketAsyncRefreshHandler::handle_response(const int r) | |
317 | { | |
318 | if (r < 0) { | |
319 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
320 | cache->async_refresh_fail(user, bucket); |
321 | return; | |
7c673cae FG |
322 | } |
323 | ||
324 | RGWStorageStats bs; | |
325 | ||
326 | for (const auto& pair : *stats) { | |
327 | const RGWStorageStats& s = pair.second; | |
328 | ||
329 | bs.size += s.size; | |
330 | bs.size_rounded += s.size_rounded; | |
331 | bs.num_objects += s.num_objects; | |
332 | } | |
333 | ||
334 | cache->async_refresh_response(user, bucket, bs); | |
335 | } | |
336 | ||
/* Bucket-keyed stats cache: the LRU map is keyed by rgw_bucket and the
 * user argument is ignored by the map operations. */
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  /* synchronous per-bucket stats fetch; defined below */
  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;

public:
  explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};
361 | ||
224ce89b | 362 | int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) |
7c673cae FG |
363 | { |
364 | RGWBucketInfo bucket_info; | |
365 | ||
366 | RGWObjectCtx obj_ctx(store); | |
367 | ||
368 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
369 | if (r < 0) { | |
370 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
371 | return r; | |
372 | } | |
373 | ||
374 | string bucket_ver; | |
375 | string master_ver; | |
376 | ||
377 | map<RGWObjCategory, RGWStorageStats> bucket_stats; | |
378 | r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, | |
379 | &master_ver, bucket_stats, nullptr); | |
380 | if (r < 0) { | |
381 | ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" | |
382 | << bucket.name << dendl; | |
383 | return r; | |
384 | } | |
385 | ||
386 | stats = RGWStorageStats(); | |
387 | ||
388 | for (const auto& pair : bucket_stats) { | |
389 | const RGWStorageStats& s = pair.second; | |
390 | ||
391 | stats.size += s.size; | |
392 | stats.size_rounded += s.size_rounded; | |
393 | stats.num_objects += s.num_objects; | |
394 | } | |
395 | ||
396 | return 0; | |
397 | } | |
398 | ||
/* Async refresh handler for per-user stats.  Doubles as the RADOS
 * completion callback (RGWGetUserStats_CB); carries the bucket only so it
 * can be passed back to the cache callbacks. */
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
                                      RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
                                      RGWGetUserStats_CB(_user),
                                      bucket(_bucket) {}

  void drop_reference() override { put(); }  // release the callback refcount
  int init_fetch() override;                 // start the async stats fetch
  void handle_response(int r) override;      // RADOS completion callback
};
413 | ||
414 | int UserAsyncRefreshHandler::init_fetch() | |
415 | { | |
416 | ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl; | |
417 | int r = store->get_user_stats_async(user, this); | |
418 | if (r < 0) { | |
419 | ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl; | |
420 | ||
421 | /* get_bucket_stats_async() dropped our reference already */ | |
422 | return r; | |
423 | } | |
424 | ||
425 | return 0; | |
426 | } | |
427 | ||
428 | void UserAsyncRefreshHandler::handle_response(int r) | |
429 | { | |
430 | if (r < 0) { | |
431 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
c07f9fc5 FG |
432 | cache->async_refresh_fail(user, bucket); |
433 | return; | |
7c673cae FG |
434 | } |
435 | ||
436 | cache->async_refresh_response(user, bucket, stats); | |
437 | } | |
438 | ||
/* User-keyed stats cache.  On top of the generic cache it optionally runs
 * two background threads: BucketsSyncThread flushes recently modified
 * buckets' stats into the user stats objclass, and UserSyncThread
 * periodically full-syncs all users.  modified_buckets (guarded by rwlock)
 * is the dirty set feeding the first thread. */
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  std::atomic<bool> down_flag = { false };     // set once by stop()
  RWLock rwlock;                               // guards modified_buckets
  map<rgw_bucket, rgw_user> modified_buckets;  // buckets dirtied since last flush

  /* thread, sync recent modified buckets info */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;   // protects cond for the interruptible sleep below
    Cond cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        map<rgw_bucket, rgw_user> buckets;

        /* grab-and-clear the dirty set so syncing happens outside the lock */
        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          int r = stats->sync_bucket(user, bucket);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        /* interruptible sleep; stop() signals cond to wake us early */
        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  /*
   * thread, full sync all users stats periodically
   *
   * only sync non idle users or ones that never got synced before, this is needed so that
   * users that didn't have quota turned on before (or existed before the user objclass
   * tracked stats) need to get their backend stats up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;   // protects cond for the interruptible sleep below
    Cond cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        int ret = stats->sync_all_users();
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        if (stats->going_down())
          break;

        /* interruptible sleep; stop() signals cond to wake us early */
        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  BucketsSyncThread *buckets_sync_thread;  // NULL when quota_threads is off
  UserSyncThread *user_sync_thread;        // NULL when quota_threads is off
protected:
  bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
  int sync_user(const rgw_user& user);
  int sync_all_users();

  /* records the bucket in the dirty set for BucketsSyncThread */
  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  /* hand the dirty set to the caller and clear it atomically */
  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    rwlock.get_write();
    modified_buckets.swap(out);
    rwlock.unlock();
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  /* NOTE(review): the user cache is sized with rgw_bucket_quota_cache_size
   * (not a user-specific option) — confirm this reuse is intentional. */
  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                                            rwlock("RGWUserStatsCache::rwlock") {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(store, this, user, bucket);
  }

  bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
    /* in the user case, the cached stats may contain a better estimation of the totals, as
     * the backend is only periodically getting updated.
     */
    return true;
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    /* NOTE(review): the write lock is held only while stopping
     * buckets_sync_thread, not user_sync_thread — confirm the asymmetry
     * is intentional (it serializes against swap_modified_buckets()). */
    rwlock.get_write();
    stop_thread(&buckets_sync_thread);
    rwlock.unlock();
    stop_thread(&user_sync_thread);
  }
};
613 | ||
224ce89b | 614 | int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) |
7c673cae FG |
615 | { |
616 | int r = store->get_user_stats(user, stats); | |
617 | if (r < 0) { | |
618 | ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl; | |
619 | return r; | |
620 | } | |
621 | ||
622 | return 0; | |
623 | } | |
624 | ||
625 | int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket) | |
626 | { | |
627 | RGWBucketInfo bucket_info; | |
628 | ||
629 | RGWObjectCtx obj_ctx(store); | |
630 | ||
631 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
632 | if (r < 0) { | |
633 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
634 | return r; | |
635 | } | |
636 | ||
637 | r = rgw_bucket_sync_user_stats(store, user, bucket_info); | |
638 | if (r < 0) { | |
639 | ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl; | |
640 | return r; | |
641 | } | |
642 | ||
643 | return 0; | |
644 | } | |
645 | ||
/* Full-sync one user's stats into the backend, skipping users that have
 * been idle since their last sync (unless configured to sync idle users). */
int RGWUserStatsCache::sync_user(const rgw_user& user)
{
  cls_user_header header;
  string user_str = user.to_str();
  int ret = store->cls_user_get_header(user_str, &header);
  if (ret < 0) {
    ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
    return ret;
  }

  /* no writes since the last sync — nothing new to push */
  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
      header.last_stats_update < header.last_stats_sync) {
    ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
    return 0;
  }

  real_time when_need_full_sync = header.last_stats_sync;
  when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);

  // check if enough time passed since last full sync
  /* FIXME: missing check? */
  /* NOTE(review): when_need_full_sync is computed but never compared against
   * the current time, so the rgw_user_quota_sync_wait_time throttle has no
   * effect here — confirm intent before adding the check. */

  ret = rgw_user_sync_all_stats(store, user);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
    return ret;
  }

  return 0;
}
676 | ||
677 | int RGWUserStatsCache::sync_all_users() | |
678 | { | |
679 | string key = "user"; | |
680 | void *handle; | |
681 | ||
682 | int ret = store->meta_mgr->list_keys_init(key, &handle); | |
683 | if (ret < 0) { | |
684 | ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl; | |
685 | return ret; | |
686 | } | |
687 | ||
688 | bool truncated; | |
689 | int max = 1000; | |
690 | ||
691 | do { | |
692 | list<string> keys; | |
693 | ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated); | |
694 | if (ret < 0) { | |
695 | ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl; | |
696 | goto done; | |
697 | } | |
698 | for (list<string>::iterator iter = keys.begin(); | |
699 | iter != keys.end() && !going_down(); | |
700 | ++iter) { | |
701 | rgw_user user(*iter); | |
702 | ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl; | |
703 | int ret = sync_user(user); | |
704 | if (ret < 0) { | |
705 | ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl; | |
706 | ||
707 | /* continuing to next user */ | |
708 | continue; | |
709 | } | |
710 | } | |
711 | } while (truncated); | |
712 | ||
713 | ret = 0; | |
714 | done: | |
715 | store->meta_mgr->list_keys_complete(handle); | |
716 | return ret; | |
717 | } | |
718 | ||
719 | void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) | |
720 | { | |
721 | /* racy, but it's ok */ | |
722 | rwlock.get_read(); | |
723 | bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); | |
724 | rwlock.unlock(); | |
725 | ||
726 | if (need_update) { | |
727 | rwlock.get_write(); | |
728 | modified_buckets[bucket] = user; | |
729 | rwlock.unlock(); | |
730 | } | |
731 | } | |
732 | ||
733 | ||
/* Strategy interface that decides whether a pending write would exceed a
 * quota.  Two stateless singletons exist (see get_instance()): the default
 * applier charges rounded object sizes, the "raw" applier exact bytes. */
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  /* would writing `size` more bytes exceed the size quota? */
  virtual bool is_size_exceeded(const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  /* would adding `num_objs` more objects exceed the object-count quota? */
  virtual bool is_num_objs_exceeded(const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  /* selects the applier matching qinfo.check_on_raw */
  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};
755 | ||
/* Default quota applier: size checks are done on rounded object sizes
 * (stats.size_rounded), matching how cached usage is accounted. */
class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
768 | ||
/* Raw quota applier: size checks use exact byte counts (stats.size),
 * selected when qinfo.check_on_raw is set. */
class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
781 | ||
782 | ||
783 | bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity, | |
784 | const RGWQuotaInfo& qinfo, | |
785 | const RGWStorageStats& stats, | |
786 | const uint64_t size) const | |
787 | { | |
788 | if (qinfo.max_size < 0) { | |
789 | /* The limit is not enabled. */ | |
790 | return false; | |
791 | } | |
792 | ||
793 | const uint64_t cur_size = stats.size_rounded; | |
794 | const uint64_t new_size = rgw_rounded_objsize(size); | |
795 | ||
796 | if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) { | |
797 | dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded | |
798 | << " size=" << new_size << " " | |
799 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
800 | return true; | |
801 | } | |
802 | ||
803 | return false; | |
804 | } | |
805 | ||
806 | bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity, | |
807 | const RGWQuotaInfo& qinfo, | |
808 | const RGWStorageStats& stats, | |
809 | const uint64_t num_objs) const | |
810 | { | |
811 | if (qinfo.max_objects < 0) { | |
812 | /* The limit is not enabled. */ | |
813 | return false; | |
814 | } | |
815 | ||
816 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
817 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
818 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
819 | << dendl; | |
820 | return true; | |
821 | } | |
822 | ||
823 | return false; | |
824 | } | |
825 | ||
826 | bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity, | |
827 | const RGWQuotaInfo& qinfo, | |
828 | const RGWStorageStats& stats, | |
829 | const uint64_t size) const | |
830 | { | |
831 | if (qinfo.max_size < 0) { | |
832 | /* The limit is not enabled. */ | |
833 | return false; | |
834 | } | |
835 | ||
836 | const uint64_t cur_size = stats.size; | |
837 | ||
838 | if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) { | |
839 | dout(10) << "quota exceeded: stats.size=" << stats.size | |
840 | << " size=" << size << " " | |
841 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
842 | return true; | |
843 | } | |
844 | ||
845 | return false; | |
846 | } | |
847 | ||
848 | bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity, | |
849 | const RGWQuotaInfo& qinfo, | |
850 | const RGWStorageStats& stats, | |
851 | const uint64_t num_objs) const | |
852 | { | |
853 | if (qinfo.max_objects < 0) { | |
854 | /* The limit is not enabled. */ | |
855 | return false; | |
856 | } | |
857 | ||
858 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
859 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
860 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
861 | << dendl; | |
862 | return true; | |
863 | } | |
864 | ||
865 | return false; | |
866 | } | |
867 | ||
868 | const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( | |
869 | const RGWQuotaInfo& qinfo) | |
870 | { | |
871 | static RGWQuotaInfoDefApplier default_qapplier; | |
872 | static RGWQuotaInfoRawApplier raw_qapplier; | |
873 | ||
874 | if (qinfo.check_on_raw) { | |
875 | return raw_qapplier; | |
876 | } else { | |
877 | return default_qapplier; | |
878 | } | |
879 | } | |
880 | ||
881 | ||
/* Concrete RGWQuotaHandler: enforces bucket- and user-level quotas using
 * cached storage stats, and decides when a bucket index needs resharding. */
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  RGWRados *store;
  RGWBucketStatsCache bucket_stats_cache;  // per-bucket cached stats
  RGWUserStatsCache user_stats_cache;      // per-user cached stats

  /* Check one quota (bucket or user) against cached stats plus the
   * pending operation's deltas.
   *
   * entity:   "bucket" or "user" — used only for log messages.
   * quota:    the quota limits to enforce; a disabled quota passes.
   * stats:    current cached usage for this entity.
   * num_objs: objects the pending operation would add.
   * size:     bytes the pending operation would add.
   *
   * Returns 0 if within quota, -ERR_QUOTA_EXCEEDED otherwise. */
  int check_quota(const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    /* Raw vs. rounded-size comparison is selected by quota.check_on_raw. */
    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldout(store->ctx(), 20) << entity
                            << " quota: max_objects=" << quota.max_objects
                            << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldout(store->ctx(), 20) << entity << " quota OK:"
                            << " stats.num_objects=" << stats.num_objects
                            << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(_store, quota_threads) {}

  /* Check the pending write of num_objs objects / size bytes against both
   * the bucket quota and the user quota. Returns 0 on success, a negative
   * error (e.g. -ERR_QUOTA_EXCEEDED) on failure. */
  int check_quota(const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * we need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header, this happens in get_stats() if we actually
     * fetch that info and not rely on cached data
     */

    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                             bucket_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  /* Apply a completed operation's deltas to both stats caches so later
   * quota checks see up-to-date (approximate) usage. */
  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

  /* Decide whether adding num_objs objects would push the bucket past
   * max_objs_per_shard per index shard; if so set need_resharding and,
   * when suggested_num_shards is non-null, propose a new shard count
   * (projected object count * 2 / per-shard cap, i.e. ~50% headroom).
   *
   * NOTE(review): divides by max_objs_per_shard — looks like callers are
   * expected to pass a non-zero value; confirm upstream guards this. */
  int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
                          const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
  {
    RGWStorageStats bucket_stats;
    int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                           bucket_quota);
    if (ret < 0) {
      return ret;
    }

    if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
      ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
             << " shard max_objects=" <<  max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = (bucket_stats.num_objects  + num_objs) * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }

    return 0;
  }

};
997 | ||
998 | ||
999 | RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads) | |
1000 | { | |
1001 | return new RGWQuotaHandlerImpl(store, quota_threads); | |
1002 | } | |
1003 | ||
/* Release a handler obtained from generate_handler(). Safe to call with
 * nullptr (delete on a null pointer is a no-op). */
void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
{
  delete handler;
}
31f18b77 FG |
1008 | |
1009 |