}
};
- virtual int fetch_stats_from_storage(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) = 0;
+ virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;
- virtual bool map_find(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+ virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
- virtual bool map_find_and_update(const rgw_user& user, rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
- virtual void map_add(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+ virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
+ virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
async_refcount->put_wait(); /* wait for all pending async requests to complete */
}
- int get_stats(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
+ int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
- void set_stats(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
- int async_refresh(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs);
+ void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
+ int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
class AsyncRefreshHandler {
virtual void drop_reference() = 0;
};
- virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, rgw_bucket& bucket) = 0;
+ virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
};
template<class T>
}
template<class T>
-int RGWQuotaCache<T>::async_refresh(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs)
+int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
/* protect against multiple updates */
StatsAsyncTestSet test_update;
}
template<class T>
-void RGWQuotaCache<T>::set_stats(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
+void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
qs.stats = stats;
qs.expiration = ceph_clock_now();
}
template<class T>
-int RGWQuotaCache<T>::get_stats(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
+int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
RGWQuotaCacheStats qs;
utime_t now = ceph_clock_now();
if (map_find(user, bucket, qs)) {
rgw_user user;
public:
BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
- const rgw_user& _user, rgw_bucket& _bucket) :
+ const rgw_user& _user, const rgw_bucket& _bucket) :
RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
RGWGetBucketStats_CB(_bucket), user(_user) {}
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
- bool map_find(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
return stats_map.find(bucket, qs);
}
- bool map_find_and_update(const rgw_user& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
+ bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
return stats_map.find_and_update(bucket, NULL, ctx);
}
- void map_add(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
stats_map.add(bucket, qs);
}
- int fetch_stats_from_storage(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) override;
+ int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
public:
explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
}
- AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, rgw_bucket& bucket) override {
+ AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
return new BucketAsyncRefreshHandler(store, this, user, bucket);
}
};
-int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
+int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
RGWBucketInfo bucket_info;
rgw_bucket bucket;
public:
UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
- const rgw_user& _user, rgw_bucket& _bucket) :
+ const rgw_user& _user, const rgw_bucket& _bucket) :
RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
RGWGetUserStats_CB(_user),
bucket(_bucket) {}
BucketsSyncThread *buckets_sync_thread;
UserSyncThread *user_sync_thread;
protected:
- bool map_find(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
return stats_map.find(user, qs);
}
- bool map_find_and_update(const rgw_user& user, rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
+ bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
return stats_map.find_and_update(user, NULL, ctx);
}
- void map_add(const rgw_user& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
stats_map.add(user, qs);
}
- int fetch_stats_from_storage(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) override;
+ int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
int sync_user(const rgw_user& user);
int sync_all_users();
stop();
}
- AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, rgw_bucket& bucket) override {
+ AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
return new UserAsyncRefreshHandler(store, this, user, bucket);
}
}
};
-int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
+int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
int r = store->get_user_stats(user, stats);
if (r < 0) {
bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
}
+
+ int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
+ const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
+ uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
+ {
+ RGWStorageStats bucket_stats;
+ int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
+ bucket_quota);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
+ ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
+ << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
+ need_resharding = true;
+ if (suggested_num_shards) {
+ *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
+ }
+ } else {
+ need_resharding = false;
+ }
+
+ return 0;
+ }
+
};
{
delete handler;
}
+
+