template<class T>
class RGWQuotaCache {
protected:
- rgw::sal::Store* store;
+ rgw::sal::Driver* driver;
lru_map<T, RGWQuotaCacheStats> stats_map;
RefCountedWaitObject *async_refcount;
virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
public:
- RGWQuotaCache(rgw::sal::Store* _store, int size) : store(_store), stats_map(size) {
+ RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) {
async_refcount = new RefCountedWaitObject;
}
virtual ~RGWQuotaCache() {
class AsyncRefreshHandler {
protected:
- rgw::sal::Store* store;
+ rgw::sal::Driver* driver;
RGWQuotaCache<T> *cache;
public:
- AsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
+ AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {}
virtual ~AsyncRefreshHandler() {}
virtual int init_fetch() = 0;
template<class T>
void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
{
- ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+ ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
async_refcount->put();
}
template<class T>
void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
- ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+ ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
RGWQuotaCacheStats qs;
qs.stats = stats;
qs.expiration = ceph_clock_now();
qs.async_refresh_time = qs.expiration;
- qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
- qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
+ qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl;
+ qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2;
map_add(user, bucket, qs);
}
public RGWGetBucketStats_CB {
rgw_user user;
public:
- BucketAsyncRefreshHandler(rgw::sal::Store* _store, RGWQuotaCache<rgw_bucket> *_cache,
+ BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
const rgw_user& _user, const rgw_bucket& _bucket) :
- RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
+ RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
RGWGetBucketStats_CB(_bucket), user(_user) {}
void drop_reference() override { put(); }
{
std::unique_ptr<rgw::sal::Bucket> rbucket;
- const DoutPrefix dp(store->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
- int r = store->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
+ const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
+ int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
if (r < 0) {
ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
return r;
ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
- r = rbucket->read_stats_async(&dp, RGW_NO_SHARD, this);
+ const auto& index = rbucket->get_info().get_current_index();
+ if (is_layout_indexless(index)) {
+ return 0;
+ }
+
+ r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
if (r < 0) {
ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
void BucketAsyncRefreshHandler::handle_response(const int r)
{
if (r < 0) {
- ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+ ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
cache->async_refresh_fail(user, bucket);
return;
}
int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
public:
- explicit RGWBucketStatsCache(rgw::sal::Store* _store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+ explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
}
AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
- return new BucketAsyncRefreshHandler(store, this, user, bucket);
+ return new BucketAsyncRefreshHandler(driver, this, user, bucket);
}
};
int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
{
- std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
std::unique_ptr<rgw::sal::Bucket> bucket;
- int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
+ int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
return r;
}
+ stats = RGWStorageStats();
+
+ const auto& index = bucket->get_info().get_current_index();
+ if (is_layout_indexless(index)) {
+ return 0;
+ }
+
string bucket_ver;
string master_ver;
map<RGWObjCategory, RGWStorageStats> bucket_stats;
- r = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, bucket_stats);
+ r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
+ &master_ver, bucket_stats, nullptr);
if (r < 0) {
ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
<< _b.name << dendl;
return r;
}
- stats = RGWStorageStats();
-
for (const auto& pair : bucket_stats) {
const RGWStorageStats& s = pair.second;
const DoutPrefixProvider *dpp;
rgw_bucket bucket;
public:
- UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store, RGWQuotaCache<rgw_user> *_cache,
+ UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache,
const rgw_user& _user, const rgw_bucket& _bucket) :
- RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
+ RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache),
RGWGetUserStats_CB(_user),
dpp(_dpp),
bucket(_bucket) {}
int UserAsyncRefreshHandler::init_fetch()
{
- std::unique_ptr<rgw::sal::User> ruser = store->get_user(user);
+ std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
int r = ruser->read_stats_async(dpp, this);
void UserAsyncRefreshHandler::handle_response(int r)
{
if (r < 0) {
- ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+ ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
cache->async_refresh_fail(user, bucket);
return;
}
}
public:
- RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads)
- : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
+ RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
+ : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
{
if (quota_threads) {
- buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+ buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this);
buckets_sync_thread->create("rgw_buck_st_syn");
- user_sync_thread = new UserSyncThread(store->ctx(), this);
+ user_sync_thread = new UserSyncThread(driver->ctx(), this);
user_sync_thread->create("rgw_user_st_syn");
} else {
buckets_sync_thread = NULL;
}
AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
- return new UserAsyncRefreshHandler(dpp, store, this, user, bucket);
+ return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket);
}
bool going_down() {
optional_yield y,
const DoutPrefixProvider *dpp)
{
- std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
int r = user->read_stats(dpp, y, &stats);
if (r < 0) {
ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
{
- std::unique_ptr<rgw::sal::User> user = store->get_user(_u);
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
std::unique_ptr<rgw::sal::Bucket> bucket;
- int r = store->get_bucket(dpp, user.get(), _b, &bucket, y);
+ int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
return r;
RGWStorageStats stats;
ceph::real_time last_stats_sync;
ceph::real_time last_stats_update;
- std::unique_ptr<rgw::sal::User> user = store->get_user(rgw_user(_u.to_str()));
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str()));
int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
if (ret < 0) {
return ret;
}
- if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
+ if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users &&
last_stats_update < last_stats_sync) {
ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
return 0;
}
real_time when_need_full_sync = last_stats_sync;
- when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
+ when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time);
// check if enough time passed since last full sync
/* FIXME: missing check? */
- ret = rgw_user_sync_all_stats(dpp, store, user.get(), y);
+ ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
return ret;
string key = "user";
void *handle;
- int ret = store->meta_list_keys_init(dpp, key, string(), &handle);
+ int ret = driver->meta_list_keys_init(dpp, key, string(), &handle);
if (ret < 0) {
ldpp_dout(dpp, 10) << "ERROR: can't get key: ret=" << ret << dendl;
return ret;
do {
list<string> keys;
- ret = store->meta_list_keys_next(dpp, handle, max, keys, &truncated);
+ ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
goto done;
ret = 0;
done:
- store->meta_list_keys_complete(handle);
+ driver->meta_list_keys_complete(handle);
return ret;
}
const uint64_t cur_size = stats.size_rounded;
const uint64_t new_size = rgw_rounded_objsize(size);
- if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
+ if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) {
ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
<< " size=" << new_size << " "
<< entity << "_quota.max_size=" << qinfo.max_size << dendl;
return false;
}
- if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
+ if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
<< " " << entity << "_quota.max_objects=" << qinfo.max_objects
<< dendl;
const uint64_t cur_size = stats.size;
- if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
+ if (std::cmp_greater(cur_size + size, qinfo.max_size)) {
ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
<< " size=" << size << " "
<< entity << "_quota.max_size=" << qinfo.max_size << dendl;
return false;
}
- if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
+ if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
<< " " << entity << "_quota.max_objects=" << qinfo.max_objects
<< dendl;
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
- rgw::sal::Store* store;
+ rgw::sal::Driver* driver;
RGWBucketStatsCache bucket_stats_cache;
RGWUserStatsCache user_stats_cache;
return 0;
}
public:
- RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Store* _store, bool quota_threads) : store(_store),
- bucket_stats_cache(_store),
- user_stats_cache(dpp, _store, quota_threads) {}
+ RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) : driver(_driver),
+ bucket_stats_cache(_driver),
+ user_stats_cache(dpp, _driver, quota_threads) {}
int check_quota(const DoutPrefixProvider *dpp,
const rgw_user& user,
- rgw_bucket& bucket,
- RGWQuotaInfo& user_quota,
- RGWQuotaInfo& bucket_quota,
- uint64_t num_objs,
- uint64_t size, optional_yield y) override {
+ rgw_bucket& bucket,
+ RGWQuota& quota,
+ uint64_t num_objs,
+ uint64_t size, optional_yield y) override {
- if (!bucket_quota.enabled && !user_quota.enabled) {
+ if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) {
return 0;
}
* fetch that info and not rely on cached data
*/
- const DoutPrefix dp(store->ctx(), dout_subsys, "rgw quota handler: ");
- if (bucket_quota.enabled) {
+ const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: ");
+ if (quota.bucket_quota.enabled) {
RGWStorageStats bucket_stats;
int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
if (ret < 0) {
return ret;
}
- ret = check_quota(dpp, "bucket", bucket_quota, bucket_stats, num_objs, size);
+ ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size);
if (ret < 0) {
return ret;
}
}
- if (user_quota.enabled) {
+ if (quota.user_quota.enabled) {
RGWStorageStats user_stats;
int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
if (ret < 0) {
return ret;
}
- ret = check_quota(dpp, "user", user_quota, user_stats, num_objs, size);
+ ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size);
if (ret < 0) {
return ret;
}
user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
}
- void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard, uint64_t num_shards,
- uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) override
+ void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard,
+ uint64_t num_shards, uint64_t num_objs, bool is_multisite,
+ bool& need_resharding, uint32_t *suggested_num_shards) override
{
if (num_objs > num_shards * max_objs_per_shard) {
ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
<< " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
need_resharding = true;
if (suggested_num_shards) {
- *suggested_num_shards = num_objs * 2 / max_objs_per_shard;
+ uint32_t obj_multiplier = 2;
+ if (is_multisite) {
+ // if we're maintaining bilogs for multisite, reshards are significantly
+ // more expensive. scale up the shard count much faster to minimize the
+ // number of reshard events during a write workload
+ obj_multiplier = 8;
+ }
+ *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard;
}
} else {
need_resharding = false;
};
-RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Store* store, bool quota_threads)
+RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads)
{
- return new RGWQuotaHandlerImpl(dpp, store, quota_threads);
+ return new RGWQuotaHandlerImpl(dpp, driver, quota_threads);
}
void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)