]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2013 Inktank, Inc | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include "include/utime.h" | |
17 | #include "common/lru_map.h" | |
18 | #include "common/RefCountedObj.h" | |
19 | #include "common/Thread.h" | |
20 | #include "common/Mutex.h" | |
21 | #include "common/RWLock.h" | |
22 | ||
23 | #include "rgw_common.h" | |
24 | #include "rgw_rados.h" | |
25 | #include "rgw_quota.h" | |
26 | #include "rgw_bucket.h" | |
27 | #include "rgw_user.h" | |
28 | ||
29 | #include <atomic> | |
30 | ||
31 | #define dout_context g_ceph_context | |
32 | #define dout_subsys ceph_subsys_rgw | |
33 | ||
34 | ||
/* A single cache entry: the cached stats plus the two timestamps that
 * drive expiry and background refresh. */
struct RGWQuotaCacheStats {
  RGWStorageStats stats;
  utime_t expiration;         /* entry is stale past this; forces a synchronous re-fetch */
  utime_t async_refresh_time; /* half-TTL mark; reaching it triggers a background refresh
                               * while the cached value is still being served */
};
40 | ||
/* Generic LRU-backed quota stats cache, keyed either by bucket
 * (T = rgw_bucket) or by user (T = rgw_user).  Subclasses provide the
 * map accessors and the synchronous/asynchronous backend fetches. */
template<class T>
class RGWQuotaCache {
protected:
  RGWRados *store;
  lru_map<T, RGWQuotaCacheStats> stats_map;
  /* keeps the destructor blocked until every in-flight async refresh
   * has called async_refresh_response() */
  RefCountedWaitObject *async_refcount;

  /* Update functor that atomically "claims" an entry for async refresh
   * by zeroing async_refresh_time, so concurrent callers back off.
   * NOTE(review): objs_delta/added_bytes/removed_bytes are never read;
   * they look like copy-paste leftovers from RGWQuotaStatsUpdate. */
  class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
    int objs_delta;
    uint64_t added_bytes;
    uint64_t removed_bytes;
  public:
    StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
    bool update(RGWQuotaCacheStats *entry) override {
      if (entry->async_refresh_time.sec() == 0)
        return false; /* another caller already claimed this refresh */

      entry->async_refresh_time = utime_t(0, 0);

      return true;
    }
  };

  /* Synchronously read fresh stats from the backend. */
  virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;

  virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
  virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;

  /* Hook called after adjust_stats(); default is a no-op. */
  virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
  RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
    async_refcount = new RefCountedWaitObject;
  }
  virtual ~RGWQuotaCache() {
    async_refcount->put_wait(); /* wait for all pending async requests to complete */
  }

  int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
  void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);

  virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);

  void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
  int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
  void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);

  /* Base class for background-refresh callbacks; concrete handlers
   * manage their own reference counting. */
  class AsyncRefreshHandler {
  protected:
    RGWRados *store;
    RGWQuotaCache<T> *cache;
  public:
    AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
    virtual ~AsyncRefreshHandler() {}

    /* Start the async backend request; see implementations for the
     * reference-ownership rules on failure. */
    virtual int init_fetch() = 0;
    virtual void drop_reference() = 0;
  };

  virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};
103 | ||
/* Decide whether a cached sample is still trustworthy for quota
 * enforcement.  Once usage crosses the configured soft-threshold
 * fraction of the limit, the cache is bypassed so enforcement near the
 * limit uses fresh backend stats.  The soft thresholds are derived
 * lazily (negative means "not computed yet"). */
template<class T>
bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
{
  if (quota.max_size >= 0) {
    if (quota.max_size_soft_threshold < 0) {
      quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    /* NOTE(review): this compares a KiB-rounded usage value against a
     * threshold derived directly from max_size — confirm both sides
     * are in the same unit (bytes vs KiB). */
    const auto cached_stats_num_kb_rounded = rgw_rounded_kb(cached_stats.size_rounded);
    if (cached_stats_num_kb_rounded >= (uint64_t)quota.max_size_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
        << cached_stats_num_kb_rounded << " >= " << quota.max_size_soft_threshold << dendl;
      return false;
    }
  }

  if (quota.max_objects >= 0) {
    if (quota.max_objs_soft_threshold < 0) {
      quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
    }

    if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
      ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
        << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
      return false;
    }
  }

  return true;
}
134 | ||
/* Start a background refresh of the cached entry.  Returns 0 when the
 * refresh was started — or when another thread already claimed it —
 * and <0 when the backend request could not be initiated. */
template<class T>
int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
  /* protect against multiple updates */
  StatsAsyncTestSet test_update;
  if (!map_find_and_update(user, bucket, &test_update)) {
    /* most likely we just raced with another update */
    return 0;
  }

  async_refcount->get();


  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);

  int ret = handler->init_fetch();
  if (ret < 0) {
    async_refcount->put();
    /* NOTE(review): some init_fetch() implementations state that a
     * failing backend call already dropped the handler's reference —
     * confirm this drop_reference() cannot be a double release. */
    handler->drop_reference();
    return ret;
  }

  return 0;
}
159 | ||
160 | template<class T> | |
161 | void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) | |
162 | { | |
163 | ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; | |
164 | ||
165 | RGWQuotaCacheStats qs; | |
166 | ||
167 | map_find(user, bucket, qs); | |
168 | ||
169 | set_stats(user, bucket, qs, stats); | |
170 | ||
171 | async_refcount->put(); | |
172 | } | |
173 | ||
174 | template<class T> | |
224ce89b | 175 | void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) |
7c673cae FG |
176 | { |
177 | qs.stats = stats; | |
178 | qs.expiration = ceph_clock_now(); | |
179 | qs.async_refresh_time = qs.expiration; | |
180 | qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; | |
181 | qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; | |
182 | ||
183 | map_add(user, bucket, qs); | |
184 | } | |
185 | ||
/* Return stats for the given user/bucket, serving from cache when
 * possible.  A cache hit that has passed its half-TTL mark also
 * schedules a background refresh; a miss (or an entry rejected by
 * can_use_cached_stats) falls through to a synchronous backend fetch
 * that repopulates the cache.  -ENOENT from the backend is treated as
 * "no stats recorded yet", not as an error. */
template<class T>
int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
  RGWQuotaCacheStats qs;
  utime_t now = ceph_clock_now();
  if (map_find(user, bucket, qs)) {
    /* async_refresh_time == 0 means a refresh is already in flight */
    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
      int r = async_refresh(user, bucket, qs);
      if (r < 0) {
        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;

        /* continue processing, might be a transient error, async refresh is just optimization */
      }
    }

    if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
        ceph_clock_now()) {
      stats = qs.stats;
      return 0;
    }
  }

  /* cache miss or unusable entry: fetch synchronously */
  int ret = fetch_stats_from_storage(user, bucket, stats);
  if (ret < 0 && ret != -ENOENT)
    return ret;

  set_stats(user, bucket, qs, stats);

  return 0;
}
215 | ||
216 | ||
/* lru_map update functor that applies an in-place delta (object count
 * plus added/removed byte sizes) to a cached stats entry. */
template<class T>
class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
  const int objs_delta;         /* signed change in object count */
  const uint64_t added_bytes;
  const uint64_t removed_bytes;
public:
  RGWQuotaStatsUpdate(const int objs_delta,
                      const uint64_t added_bytes,
                      const uint64_t removed_bytes)
    : objs_delta(objs_delta),
      added_bytes(added_bytes),
      removed_bytes(removed_bytes) {
  }

  bool update(RGWQuotaCacheStats * const entry) override {
    /* keep the allocation-rounded size in step with the raw size */
    const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
    const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);

    /* net deltas may be negative; unsigned wrap-around cancels out
     * against the existing totals */
    entry->stats.size += added_bytes - removed_bytes;
    entry->stats.size_rounded += rounded_added - rounded_removed;
    entry->stats.num_objects += objs_delta;

    return true;
  }
};
242 | ||
243 | ||
244 | template<class T> | |
245 | void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, | |
246 | uint64_t added_bytes, uint64_t removed_bytes) | |
247 | { | |
248 | RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes); | |
249 | map_find_and_update(user, bucket, &update); | |
250 | ||
251 | data_modified(user, bucket); | |
252 | } | |
253 | ||
/* Background refresh handler for per-bucket stats; doubles as the
 * RGWGetBucketStats_CB completion callback (which owns the bucket). */
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
                                  public RGWGetBucketStats_CB {
  rgw_user user;
public:
  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
                            const rgw_user& _user, const rgw_bucket& _bucket) :
    RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
    RGWGetBucketStats_CB(_bucket), user(_user) {}

  void drop_reference() override { put(); }
  void handle_response(int r) override;
  int init_fetch() override;
};
267 | ||
268 | int BucketAsyncRefreshHandler::init_fetch() | |
269 | { | |
270 | RGWBucketInfo bucket_info; | |
271 | ||
272 | RGWObjectCtx obj_ctx(store); | |
273 | ||
274 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
275 | if (r < 0) { | |
276 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
277 | return r; | |
278 | } | |
279 | ||
280 | ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; | |
281 | ||
282 | r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this); | |
283 | if (r < 0) { | |
284 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; | |
285 | ||
286 | /* get_bucket_stats_async() dropped our reference already */ | |
287 | return r; | |
288 | } | |
289 | ||
290 | return 0; | |
291 | } | |
292 | ||
293 | void BucketAsyncRefreshHandler::handle_response(const int r) | |
294 | { | |
295 | if (r < 0) { | |
296 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
297 | return; /* nothing to do here */ | |
298 | } | |
299 | ||
300 | RGWStorageStats bs; | |
301 | ||
302 | for (const auto& pair : *stats) { | |
303 | const RGWStorageStats& s = pair.second; | |
304 | ||
305 | bs.size += s.size; | |
306 | bs.size_rounded += s.size_rounded; | |
307 | bs.num_objects += s.num_objects; | |
308 | } | |
309 | ||
310 | cache->async_refresh_response(user, bucket, bs); | |
311 | } | |
312 | ||
/* Bucket-keyed quota cache: the user argument required by the base
 * interface is ignored here, entries are keyed on the bucket alone. */
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(bucket, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(bucket, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(bucket, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;

public:
  explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new BucketAsyncRefreshHandler(store, this, user, bucket);
  }
};
337 | ||
224ce89b | 338 | int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) |
7c673cae FG |
339 | { |
340 | RGWBucketInfo bucket_info; | |
341 | ||
342 | RGWObjectCtx obj_ctx(store); | |
343 | ||
344 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
345 | if (r < 0) { | |
346 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
347 | return r; | |
348 | } | |
349 | ||
350 | string bucket_ver; | |
351 | string master_ver; | |
352 | ||
353 | map<RGWObjCategory, RGWStorageStats> bucket_stats; | |
354 | r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, | |
355 | &master_ver, bucket_stats, nullptr); | |
356 | if (r < 0) { | |
357 | ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" | |
358 | << bucket.name << dendl; | |
359 | return r; | |
360 | } | |
361 | ||
362 | stats = RGWStorageStats(); | |
363 | ||
364 | for (const auto& pair : bucket_stats) { | |
365 | const RGWStorageStats& s = pair.second; | |
366 | ||
367 | stats.size += s.size; | |
368 | stats.size_rounded += s.size_rounded; | |
369 | stats.num_objects += s.num_objects; | |
370 | } | |
371 | ||
372 | return 0; | |
373 | } | |
374 | ||
/* Background refresh handler for per-user stats; doubles as the
 * RGWGetUserStats_CB completion callback (which owns the user).  The
 * bucket is carried along only to satisfy the cache interface. */
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
                                public RGWGetUserStats_CB {
  rgw_bucket bucket;
public:
  UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
                          const rgw_user& _user, const rgw_bucket& _bucket) :
    RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
    RGWGetUserStats_CB(_user),
    bucket(_bucket) {}

  void drop_reference() override { put(); }
  int init_fetch() override;
  void handle_response(int r) override;
};
389 | ||
390 | int UserAsyncRefreshHandler::init_fetch() | |
391 | { | |
392 | ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl; | |
393 | int r = store->get_user_stats_async(user, this); | |
394 | if (r < 0) { | |
395 | ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl; | |
396 | ||
397 | /* get_bucket_stats_async() dropped our reference already */ | |
398 | return r; | |
399 | } | |
400 | ||
401 | return 0; | |
402 | } | |
403 | ||
404 | void UserAsyncRefreshHandler::handle_response(int r) | |
405 | { | |
406 | if (r < 0) { | |
407 | ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; | |
408 | return; /* nothing to do here */ | |
409 | } | |
410 | ||
411 | cache->async_refresh_response(user, bucket, stats); | |
412 | } | |
413 | ||
/* User-keyed quota cache.  Besides the LRU cache it owns two optional
 * background threads (started only when quota_threads is set):
 *  - BucketsSyncThread: periodically flushes the stats of recently
 *    modified buckets into their owners' user headers;
 *  - UserSyncThread: periodically runs a full stats sync over all
 *    users (see sync_all_users()). */
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
  std::atomic<bool> down_flag = { false };
  RWLock rwlock;                               /* guards modified_buckets */
  map<rgw_bucket, rgw_user> modified_buckets;  /* buckets awaiting a stats flush */

  /* thread, sync recent modified buckets info */
  class BucketsSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
      do {
        /* take ownership of the current batch of modified buckets */
        map<rgw_bucket, rgw_user> buckets;

        stats->swap_modified_buckets(buckets);

        for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
          rgw_bucket bucket = iter->first;
          rgw_user& user = iter->second;
          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
          int r = stats->sync_bucket(user, bucket);
          if (r < 0) {
            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
          }
        }

        if (stats->going_down())
          break;

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;

      return NULL;
    }

    /* wake the thread so it observes down_flag promptly */
    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  /*
   * thread, full sync all users stats periodically
   *
   * only sync non idle users or ones that never got synced before, this is needed so that
   * users that didn't have quota turned on before (or existed before the user objclass
   * tracked stats) need to get their backend stats up to date.
   */
  class UserSyncThread : public Thread {
    CephContext *cct;
    RGWUserStatsCache *stats;

    Mutex lock;
    Cond cond;
  public:

    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}

    void *entry() override {
      ldout(cct, 20) << "UserSyncThread: start" << dendl;
      do {
        int ret = stats->sync_all_users();
        if (ret < 0) {
          ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
        }

        lock.Lock();
        cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
        lock.Unlock();
      } while (!stats->going_down());
      ldout(cct, 20) << "UserSyncThread: done" << dendl;

      return NULL;
    }

    /* wake the thread so it observes down_flag promptly */
    void stop() {
      Mutex::Locker l(lock);
      cond.Signal();
    }
  };

  BucketsSyncThread *buckets_sync_thread;
  UserSyncThread *user_sync_thread;
protected:
  bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    return stats_map.find(user, qs);
  }

  bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
    return stats_map.find_and_update(user, NULL, ctx);
  }

  void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
    stats_map.add(user, qs);
  }

  int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
  int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
  int sync_user(const rgw_user& user);
  int sync_all_users();

  /* queue the bucket for the BucketsSyncThread */
  void data_modified(const rgw_user& user, rgw_bucket& bucket) override;

  /* hand the pending set to the sync thread, leaving it empty */
  void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
    rwlock.get_write();
    modified_buckets.swap(out);
    rwlock.unlock();
  }

  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
  void stop_thread(T **pthr) {
    T *thread = *pthr;
    if (!thread)
      return;

    thread->stop();
    thread->join();
    delete thread;
    *pthr = NULL;
  }

public:
  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                                            rwlock("RGWUserStatsCache::rwlock") {
    if (quota_threads) {
      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
      buckets_sync_thread->create("rgw_buck_st_syn");
      user_sync_thread = new UserSyncThread(store->ctx(), this);
      user_sync_thread->create("rgw_user_st_syn");
    } else {
      buckets_sync_thread = NULL;
      user_sync_thread = NULL;
    }
  }
  ~RGWUserStatsCache() override {
    stop();
  }

  AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
    return new UserAsyncRefreshHandler(store, this, user, bucket);
  }

  bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
    /* in the user case, the cached stats may contain a better estimation of the totals, as
     * the backend is only periodically getting updated.
     */
    return true;
  }

  bool going_down() {
    return down_flag;
  }

  void stop() {
    down_flag = true;
    /* NOTE(review): holding rwlock for write while joining
     * BucketsSyncThread looks like it could deadlock if that thread is
     * blocked in swap_modified_buckets() — confirm the wakeup ordering
     * makes this impossible. */
    rwlock.get_write();
    stop_thread(&buckets_sync_thread);
    rwlock.unlock();
    stop_thread(&user_sync_thread);
  }
};
585 | ||
224ce89b | 586 | int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) |
7c673cae FG |
587 | { |
588 | int r = store->get_user_stats(user, stats); | |
589 | if (r < 0) { | |
590 | ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl; | |
591 | return r; | |
592 | } | |
593 | ||
594 | return 0; | |
595 | } | |
596 | ||
597 | int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket) | |
598 | { | |
599 | RGWBucketInfo bucket_info; | |
600 | ||
601 | RGWObjectCtx obj_ctx(store); | |
602 | ||
603 | int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); | |
604 | if (r < 0) { | |
605 | ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; | |
606 | return r; | |
607 | } | |
608 | ||
609 | r = rgw_bucket_sync_user_stats(store, user, bucket_info); | |
610 | if (r < 0) { | |
611 | ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl; | |
612 | return r; | |
613 | } | |
614 | ||
615 | return 0; | |
616 | } | |
617 | ||
618 | int RGWUserStatsCache::sync_user(const rgw_user& user) | |
619 | { | |
620 | cls_user_header header; | |
621 | string user_str = user.to_str(); | |
622 | int ret = store->cls_user_get_header(user_str, &header); | |
623 | if (ret < 0) { | |
624 | ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl; | |
625 | return ret; | |
626 | } | |
627 | ||
628 | if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users && | |
629 | header.last_stats_update < header.last_stats_sync) { | |
630 | ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl; | |
631 | return 0; | |
632 | } | |
633 | ||
634 | real_time when_need_full_sync = header.last_stats_sync; | |
635 | when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time); | |
636 | ||
637 | // check if enough time passed since last full sync | |
638 | /* FIXME: missing check? */ | |
639 | ||
640 | ret = rgw_user_sync_all_stats(store, user); | |
641 | if (ret < 0) { | |
642 | ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl; | |
643 | return ret; | |
644 | } | |
645 | ||
646 | return 0; | |
647 | } | |
648 | ||
/* Iterate over all "user" metadata keys and run sync_user() on each.
 * Per-user failures are logged and skipped; listing failures abort the
 * pass (the listing handle is always released via the done: label). */
int RGWUserStatsCache::sync_all_users()
{
  string key = "user";
  void *handle;

  int ret = store->meta_mgr->list_keys_init(key, &handle);
  if (ret < 0) {
    ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
    return ret;
  }

  bool truncated;
  int max = 1000;  /* keys fetched per listing round */

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
      goto done;
    }
    for (list<string>::iterator iter = keys.begin();
         iter != keys.end() && !going_down();
         ++iter) {
      rgw_user user(*iter);
      ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
      /* deliberately shadows the outer ret: a single user failing must
       * not abort the whole pass */
      int ret = sync_user(user);
      if (ret < 0) {
        ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;

        /* continuing to next user */
        continue;
      }
    }
  } while (truncated);

  ret = 0;
done:
  store->meta_mgr->list_keys_complete(handle);
  return ret;
}
690 | ||
691 | void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) | |
692 | { | |
693 | /* racy, but it's ok */ | |
694 | rwlock.get_read(); | |
695 | bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); | |
696 | rwlock.unlock(); | |
697 | ||
698 | if (need_update) { | |
699 | rwlock.get_write(); | |
700 | modified_buckets[bucket] = user; | |
701 | rwlock.unlock(); | |
702 | } | |
703 | } | |
704 | ||
705 | ||
/* Strategy interface for quota-limit checks: "default" accounting uses
 * rounded sizes, "raw" accounting uses exact byte counts. */
class RGWQuotaInfoApplier {
  /* NOTE: no non-static field allowed as instances are supposed to live in
   * the static memory only. */
protected:
  RGWQuotaInfoApplier() = default;

public:
  virtual ~RGWQuotaInfoApplier() {}

  /* true if adding `size` bytes would exceed qinfo.max_size */
  virtual bool is_size_exceeded(const char * const entity,
                                const RGWQuotaInfo& qinfo,
                                const RGWStorageStats& stats,
                                const uint64_t size) const = 0;

  /* true if adding `num_objs` objects would exceed qinfo.max_objects */
  virtual bool is_num_objs_exceeded(const char * const entity,
                                    const RGWQuotaInfo& qinfo,
                                    const RGWStorageStats& stats,
                                    const uint64_t num_objs) const = 0;

  /* returns the applier matching qinfo.check_on_raw */
  static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
};
727 | ||
/* Default applier: size checks operate on allocation-rounded sizes. */
class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
740 | ||
/* Raw applier: size checks operate on exact (unrounded) byte counts. */
class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
public:
  bool is_size_exceeded(const char * const entity,
                        const RGWQuotaInfo& qinfo,
                        const RGWStorageStats& stats,
                        const uint64_t size) const override;

  bool is_num_objs_exceeded(const char * const entity,
                            const RGWQuotaInfo& qinfo,
                            const RGWStorageStats& stats,
                            const uint64_t num_objs) const override;
};
753 | ||
754 | ||
755 | bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity, | |
756 | const RGWQuotaInfo& qinfo, | |
757 | const RGWStorageStats& stats, | |
758 | const uint64_t size) const | |
759 | { | |
760 | if (qinfo.max_size < 0) { | |
761 | /* The limit is not enabled. */ | |
762 | return false; | |
763 | } | |
764 | ||
765 | const uint64_t cur_size = stats.size_rounded; | |
766 | const uint64_t new_size = rgw_rounded_objsize(size); | |
767 | ||
768 | if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) { | |
769 | dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded | |
770 | << " size=" << new_size << " " | |
771 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
772 | return true; | |
773 | } | |
774 | ||
775 | return false; | |
776 | } | |
777 | ||
778 | bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity, | |
779 | const RGWQuotaInfo& qinfo, | |
780 | const RGWStorageStats& stats, | |
781 | const uint64_t num_objs) const | |
782 | { | |
783 | if (qinfo.max_objects < 0) { | |
784 | /* The limit is not enabled. */ | |
785 | return false; | |
786 | } | |
787 | ||
788 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
789 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
790 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
791 | << dendl; | |
792 | return true; | |
793 | } | |
794 | ||
795 | return false; | |
796 | } | |
797 | ||
798 | bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity, | |
799 | const RGWQuotaInfo& qinfo, | |
800 | const RGWStorageStats& stats, | |
801 | const uint64_t size) const | |
802 | { | |
803 | if (qinfo.max_size < 0) { | |
804 | /* The limit is not enabled. */ | |
805 | return false; | |
806 | } | |
807 | ||
808 | const uint64_t cur_size = stats.size; | |
809 | ||
810 | if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) { | |
811 | dout(10) << "quota exceeded: stats.size=" << stats.size | |
812 | << " size=" << size << " " | |
813 | << entity << "_quota.max_size=" << qinfo.max_size << dendl; | |
814 | return true; | |
815 | } | |
816 | ||
817 | return false; | |
818 | } | |
819 | ||
820 | bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity, | |
821 | const RGWQuotaInfo& qinfo, | |
822 | const RGWStorageStats& stats, | |
823 | const uint64_t num_objs) const | |
824 | { | |
825 | if (qinfo.max_objects < 0) { | |
826 | /* The limit is not enabled. */ | |
827 | return false; | |
828 | } | |
829 | ||
830 | if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) { | |
831 | dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects | |
832 | << " " << entity << "_quota.max_objects=" << qinfo.max_objects | |
833 | << dendl; | |
834 | return true; | |
835 | } | |
836 | ||
837 | return false; | |
838 | } | |
839 | ||
840 | const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( | |
841 | const RGWQuotaInfo& qinfo) | |
842 | { | |
843 | static RGWQuotaInfoDefApplier default_qapplier; | |
844 | static RGWQuotaInfoRawApplier raw_qapplier; | |
845 | ||
846 | if (qinfo.check_on_raw) { | |
847 | return raw_qapplier; | |
848 | } else { | |
849 | return default_qapplier; | |
850 | } | |
851 | } | |
852 | ||
853 | ||
/* Concrete quota handler: enforces bucket-level and user-level quotas by
 * consulting two stats caches, and also answers bucket-resharding queries
 * based on cached bucket stats. */
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
  RGWRados *store;
  RGWBucketStatsCache bucket_stats_cache;  // cached per-bucket usage
  RGWUserStatsCache user_stats_cache;      // cached per-user usage

  /* Check one quota (bucket or user) against current stats plus the pending
   * operation's deltas.  `entity` is only used in log messages.  Returns 0
   * when within quota (or quota disabled), -ERR_QUOTA_EXCEEDED otherwise. */
  int check_quota(const char * const entity,
                  const RGWQuotaInfo& quota,
                  const RGWStorageStats& stats,
                  const uint64_t num_objs,
                  const uint64_t size) {
    if (!quota.enabled) {
      return 0;
    }

    /* Pick raw vs. default (rounded) accounting based on quota.check_on_raw. */
    const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);

    ldout(store->ctx(), 20) << entity
                            << " quota: max_objects=" << quota.max_objects
                            << " max_size=" << quota.max_size << dendl;


    if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
      return -ERR_QUOTA_EXCEEDED;
    }

    ldout(store->ctx(), 20) << entity << " quota OK:"
                            << " stats.num_objects=" << stats.num_objects
                            << " stats.size=" << stats.size << dendl;
    return 0;
  }
public:
  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
                                    bucket_stats_cache(_store),
                                    user_stats_cache(_store, quota_threads) {}

  /* Check both bucket and user quotas for a pending write of `num_objs`
   * objects totalling `size` bytes.  Returns 0 if the write is allowed,
   * -ERR_QUOTA_EXCEEDED if either quota would be exceeded, or a negative
   * error code from the stats caches. */
  int check_quota(const rgw_user& user,
                  rgw_bucket& bucket,
                  RGWQuotaInfo& user_quota,
                  RGWQuotaInfo& bucket_quota,
                  uint64_t num_objs,
                  uint64_t size) override {

    if (!bucket_quota.enabled && !user_quota.enabled) {
      return 0;
    }

    /*
     * we need to fetch bucket stats if the user quota is enabled, because
     * the whole system relies on us periodically updating the user's bucket
     * stats in the user's header, this happens in get_stats() if we actually
     * fetch that info and not rely on cached data
     */

    if (bucket_quota.enabled) {
      RGWStorageStats bucket_stats;
      int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
                                             bucket_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }

    if (user_quota.enabled) {
      RGWStorageStats user_stats;
      int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
      if (ret < 0) {
        return ret;
      }
      ret = check_quota("user", user_quota, user_stats, num_objs, size);
      if (ret < 0) {
        return ret;
      }
    }
    return 0;
  }

  /* Propagate a completed operation's deltas into both stats caches so the
   * cached numbers stay close to reality between full refreshes. */
  void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
    bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
    user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
  }

  /* Decide whether the bucket needs resharding: flags need_resharding when
   * the projected object count exceeds num_shards * max_objs_per_shard, and
   * optionally suggests a new shard count sized at twice the projected load.
   * NOTE(review): assumes max_objs_per_shard > 0 — the division below would
   * fault otherwise; confirm callers guarantee this. */
  int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
			  const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
			  uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
  {
    RGWStorageStats bucket_stats;
    int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
					   bucket_quota);
    if (ret < 0) {
      return ret;
    }

    if (bucket_stats.num_objects  + num_objs > num_shards * max_objs_per_shard) {
      ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
             << " shard max_objects=" <<  max_objs_per_shard * num_shards << dendl;
      need_resharding = true;
      if (suggested_num_shards) {
        *suggested_num_shards = (bucket_stats.num_objects  + num_objs) * 2 / max_objs_per_shard;
      }
    } else {
      need_resharding = false;
    }

    return 0;
  }

};
969 | ||
970 | ||
971 | RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads) | |
972 | { | |
973 | return new RGWQuotaHandlerImpl(store, quota_threads); | |
974 | } | |
975 | ||
976 | void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler) | |
977 | { | |
978 | delete handler; | |
979 | } | |
31f18b77 FG |
980 | |
981 |