#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+using namespace std;
+
const string reshard_oid_prefix = "reshard.";
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
};
class BucketReshardShard {
- rgw::sal::RGWRadosStore *store;
+ rgw::sal::RadosStore* store;
const RGWBucketInfo& bucket_info;
int num_shard;
const rgw::bucket_index_layout_generation& idx_layout;
}
public:
- BucketReshardShard(rgw::sal::RGWRadosStore *_store, const RGWBucketInfo& _bucket_info,
+ BucketReshardShard(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* _store, const RGWBucketInfo& _bucket_info,
int _num_shard, const rgw::bucket_index_layout_generation& _idx_layout,
deque<librados::AioCompletion *>& _completions) :
store(_store), bucket_info(_bucket_info), idx_layout(_idx_layout), bs(store->getRados()),
{
num_shard = (idx_layout.layout.normal.num_shards > 0 ? _num_shard : -1);
- bs.init(bucket_info.bucket, num_shard, idx_layout, nullptr /* no RGWBucketInfo */);
+ bs.init(bucket_info.bucket, num_shard, idx_layout, nullptr /* no RGWBucketInfo */, dpp);
max_aio_completions =
store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
class BucketReshardManager {
- rgw::sal::RGWRadosStore *store;
+ rgw::sal::RadosStore* store;
const RGWBucketInfo& target_bucket_info;
deque<librados::AioCompletion *> completions;
int num_target_shards;
vector<BucketReshardShard *> target_shards;
public:
- BucketReshardManager(rgw::sal::RGWRadosStore *_store,
+ BucketReshardManager(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* _store,
const RGWBucketInfo& _target_bucket_info,
int _num_target_shards) :
store(_store), target_bucket_info(_target_bucket_info),
const auto& idx_layout = target_bucket_info.layout.current_index;
target_shards.resize(num_target_shards);
for (int i = 0; i < num_target_shards; ++i) {
- target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, idx_layout, completions);
+ target_shards[i] = new BucketReshardShard(dpp, store, target_bucket_info, i, idx_layout, completions);
}
}
}
}; // class BucketReshardManager
-RGWBucketReshard::RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
+RGWBucketReshard::RGWBucketReshard(rgw::sal::RadosStore* _store,
const RGWBucketInfo& _bucket_info,
const map<string, bufferlist>& _bucket_attrs,
RGWBucketReshardLock* _outer_reshard_lock) :
outer_reshard_lock(_outer_reshard_lock)
{ }
-int RGWBucketReshard::set_resharding_status(rgw::sal::RGWRadosStore* store,
+int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info,
const string& new_instance_id,
int32_t num_shards,
cls_rgw_reshard_status status)
{
if (new_instance_id.empty()) {
- ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
+ ldpp_dout(dpp, 0) << __func__ << " missing new bucket instance id" << dendl;
return -EINVAL;
}
cls_rgw_bucket_instance_entry instance_entry;
instance_entry.set_status(new_instance_id, num_shards, status);
- int ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry);
+ int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
+ ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
<< cpp_strerror(-ret) << dendl;
return ret;
}
}
// reshard lock assumes lock is held
-int RGWBucketReshard::clear_resharding(rgw::sal::RGWRadosStore* store,
+int RGWBucketReshard::clear_resharding(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info)
{
- int ret = clear_index_shard_reshard_status(store, bucket_info);
+ int ret = clear_index_shard_reshard_status(dpp, store, bucket_info);
if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
+ ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
" ERROR: error clearing reshard status from index shard " <<
cpp_strerror(-ret) << dendl;
return ret;
}
cls_rgw_bucket_instance_entry instance_entry;
- ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry);
+ ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
+ ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
" ERROR: error setting bucket resharding flag on bucket index: " <<
cpp_strerror(-ret) << dendl;
return ret;
return 0;
}
-int RGWBucketReshard::clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store,
+int RGWBucketReshard::clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info)
{
uint32_t num_shards = bucket_info.layout.current_index.layout.normal.num_shards;
if (num_shards < std::numeric_limits<uint32_t>::max()) {
- int ret = set_resharding_status(store, bucket_info,
+ int ret = set_resharding_status(dpp, store, bucket_info,
bucket_info.bucket.bucket_id,
(num_shards < 1 ? 1 : num_shards),
cls_rgw_reshard_status::NOT_RESHARDING);
if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
+ ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
" ERROR: error clearing reshard status from index shard " <<
cpp_strerror(-ret) << dendl;
return ret;
return 0;
}
-static int create_new_bucket_instance(rgw::sal::RGWRadosStore *store,
+static int create_new_bucket_instance(rgw::sal::RadosStore* store,
int new_num_shards,
const RGWBucketInfo& bucket_info,
map<string, bufferlist>& attrs,
- RGWBucketInfo& new_bucket_info)
+ RGWBucketInfo& new_bucket_info,
+ const DoutPrefixProvider *dpp)
{
new_bucket_info = bucket_info;
new_bucket_info.new_bucket_instance_id.clear();
new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;
- int ret = store->svc()->bi->init_index(new_bucket_info);
+ int ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs, dpp);
if (ret < 0) {
- cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
return ret;
}
- ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
+ ret = store->svc()->bi->init_index(dpp, new_bucket_info);
if (ret < 0) {
- cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
return ret;
}
}
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
- RGWBucketInfo& new_bucket_info)
+ RGWBucketInfo& new_bucket_info,
+ const DoutPrefixProvider *dpp)
{
return ::create_new_bucket_instance(store, new_num_shards,
- bucket_info, bucket_attrs, new_bucket_info);
+ bucket_info, bucket_attrs, new_bucket_info, dpp);
}
-int RGWBucketReshard::cancel()
+int RGWBucketReshard::cancel(const DoutPrefixProvider *dpp)
{
- int ret = reshard_lock.lock();
+ int ret = reshard_lock.lock(dpp);
if (ret < 0) {
return ret;
}
- ret = clear_resharding();
+ ret = clear_resharding(dpp);
reshard_lock.unlock();
return ret;
class BucketInfoReshardUpdate
{
- rgw::sal::RGWRadosStore *store;
+ const DoutPrefixProvider *dpp;
+ rgw::sal::RadosStore* store;
RGWBucketInfo& bucket_info;
std::map<string, bufferlist> bucket_attrs;
bool in_progress{false};
- int set_status(cls_rgw_reshard_status s) {
+ int set_status(cls_rgw_reshard_status s, const DoutPrefixProvider *dpp) {
bucket_info.reshard_status = s;
- int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
+ int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
return ret;
}
return 0;
}
public:
- BucketInfoReshardUpdate(rgw::sal::RGWRadosStore *_store,
+ BucketInfoReshardUpdate(const DoutPrefixProvider *_dpp,
+ rgw::sal::RadosStore* _store,
RGWBucketInfo& _bucket_info,
map<string, bufferlist>& _bucket_attrs,
const string& new_bucket_id) :
+ dpp(_dpp),
store(_store),
bucket_info(_bucket_info),
bucket_attrs(_bucket_attrs)
if (in_progress) {
// resharding must not have ended correctly, clean up
int ret =
- RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
+ RGWBucketReshard::clear_index_shard_reshard_status(dpp, store, bucket_info);
if (ret < 0) {
- lderr(store->ctx()) << "Error: " << __func__ <<
+ ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" clear_index_shard_status returned " << ret << dendl;
}
bucket_info.new_bucket_instance_id.clear();
// clears new_bucket_instance as well
- set_status(cls_rgw_reshard_status::NOT_RESHARDING);
+ set_status(cls_rgw_reshard_status::NOT_RESHARDING, dpp);
}
}
int start() {
- int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS);
+ int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS, dpp);
if (ret < 0) {
return ret;
}
}
int complete() {
- int ret = set_status(cls_rgw_reshard_status::DONE);
+ int ret = set_status(cls_rgw_reshard_status::DONE, dpp);
if (ret < 0) {
return ret;
}
};
-RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
+RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RadosStore* _store,
const std::string& reshard_lock_oid,
bool _ephemeral) :
store(_store),
internal_lock.set_duration(duration);
}
-int RGWBucketReshardLock::lock() {
+int RGWBucketReshardLock::lock(const DoutPrefixProvider *dpp) {
internal_lock.set_must_renew(false);
+
int ret;
if (ephemeral) {
ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
} else {
ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
}
- if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
- " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
+
+ if (ret == -EBUSY) {
+ ldout(store->ctx(), 0) << "INFO: RGWReshardLock::" << __func__ <<
+ " found lock on " << lock_oid <<
+ " to be held by another RGW process; skipping for now" << dendl;
+ return ret;
+ } else if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: RGWReshardLock::" << __func__ <<
+ " failed to acquire lock on " << lock_oid << ": " <<
+ cpp_strerror(-ret) << dendl;
return ret;
}
+
reset_time(Clock::now());
return 0;
int max_entries,
bool verbose,
ostream *out,
- Formatter *formatter)
+ Formatter *formatter,
+ const DoutPrefixProvider *dpp)
{
if (out) {
const rgw_bucket& bucket = bucket_info.bucket;
list<rgw_cls_bi_entry> entries;
if (max_entries < 0) {
- ldout(store->ctx(), 0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": can't reshard, negative max_entries" << dendl;
return -EINVAL;
}
// NB: destructor cleans up sharding state if reshard does not
// complete successfully
- BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
+ BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
int ret = bucket_info_updater.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
return ret;
}
int num_target_shards = (new_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? new_bucket_info.layout.current_index.layout.normal.num_shards : 1);
- BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
+ BucketReshardManager target_shards_mgr(dpp, store, new_bucket_info, num_target_shards);
bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
for (int i = 0; i < num_source_shards; ++i) {
bool is_truncated = true;
marker.clear();
+ const std::string null_object_filter; // empty string since we're not filtering by object
while (is_truncated) {
entries.clear();
- ret = store->getRados()->bi_list(bucket_info, i, string(), marker, max_entries, &entries, &is_truncated);
+ ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, marker, max_entries, &entries, &is_truncated);
if (ret < 0 && ret != -ENOENT) {
derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
return ret;
}
int ret = store->getRados()->get_target_shard_id(new_bucket_info.layout.current_index.layout.normal, obj.get_hash_object(), &target_shard_id);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
return ret;
}
}
ret = reshard_lock.renew(now);
if (ret < 0) {
- lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+ ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
return ret;
}
}
ret = target_shards_mgr.finish();
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
return -EIO;
}
- ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield);
+ ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield, dpp);
if (ret < 0) {
- lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
+ ldpp_dout(dpp, -1) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
return ret;
}
ret = bucket_info_updater.complete();
if (ret < 0) {
- ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
/* don't error out, reshard process succeeded */
}
// NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
-int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
+int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
{
- return store->svc()->bi_rados->get_reshard_status(bucket_info, status);
+ return store->svc()->bi_rados->get_reshard_status(dpp, bucket_info, status);
}
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
+ const DoutPrefixProvider *dpp,
bool verbose, ostream *out, Formatter *formatter,
RGWReshard* reshard_log)
{
- int ret = reshard_lock.lock();
+ int ret = reshard_lock.lock(dpp);
if (ret < 0) {
return ret;
}
RGWBucketInfo new_bucket_info;
- ret = create_new_bucket_instance(num_shards, new_bucket_info);
+ ret = create_new_bucket_instance(num_shards, new_bucket_info, dpp);
if (ret < 0) {
// shard state is uncertain, but this will attempt to remove them anyway
goto error_out;
}
if (reshard_log) {
- ret = reshard_log->update(bucket_info, new_bucket_info);
+ ret = reshard_log->update(dpp, bucket_info, new_bucket_info);
if (ret < 0) {
goto error_out;
}
// set resharding status of current bucket_info & shards with
// information about planned resharding
- ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
+ ret = set_resharding_status(dpp, new_bucket_info.bucket.bucket_id,
num_shards, cls_rgw_reshard_status::IN_PROGRESS);
if (ret < 0) {
goto error_out;
ret = do_reshard(num_shards,
new_bucket_info,
max_op_entries,
- verbose, out, formatter);
+ verbose, out, formatter, dpp);
if (ret < 0) {
goto error_out;
}
// resharding successful, so remove old bucket index shards; use
// best effort and don't report out an error; the lock isn't needed
- // at this point since all we're using a best effor to to remove old
+ // at this point since all we're using a best effort to remove old
// shard objects
- ret = store->svc()->bi->clean_index(bucket_info);
+ ret = store->svc()->bi->clean_index(dpp, bucket_info);
if (ret < 0) {
- lderr(store->ctx()) << "Error: " << __func__ <<
+ ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" failed to clean up old shards; " <<
"RGWRados::clean_bucket_index returned " << ret << dendl;
}
ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
- bucket_info, null_yield);
+ bucket_info, null_yield, dpp);
if (ret < 0) {
- lderr(store->ctx()) << "Error: " << __func__ <<
+ ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" failed to clean old bucket info object \"" <<
bucket_info.bucket.get_key() <<
"\"created after successful resharding with error " << ret << dendl;
}
- ldout(store->ctx(), 1) << __func__ <<
+ ldpp_dout(dpp, 1) << __func__ <<
" INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
bucket_info.bucket.get_key() << "\" to \"" <<
new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
// since the real problem is the issue that led to this error code
// path, we won't touch ret and instead use another variable to
// temporarily error codes
- int ret2 = store->svc()->bi->clean_index(new_bucket_info);
+ int ret2 = store->svc()->bi->clean_index(dpp, new_bucket_info);
if (ret2 < 0) {
- lderr(store->ctx()) << "Error: " << __func__ <<
+ ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" failed to clean up shards from failed incomplete resharding; " <<
"RGWRados::clean_bucket_index returned " << ret2 << dendl;
}
ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
new_bucket_info,
- null_yield);
+ null_yield, dpp);
if (ret2 < 0) {
- lderr(store->ctx()) << "Error: " << __func__ <<
+ ldpp_dout(dpp, -1) << "Error: " << __func__ <<
" failed to clean bucket info object \"" <<
new_bucket_info.bucket.get_key() <<
"\"created during incomplete resharding with error " << ret2 << dendl;
} // execute
-RGWReshard::RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose, ostream *_out,
+RGWReshard::RGWReshard(rgw::sal::RadosStore* _store, bool _verbose, ostream *_out,
Formatter *_formatter) :
store(_store), instance_lock(bucket_instance_lock_name),
verbose(_verbose), out(_out), formatter(_formatter)
get_logshard_oid(int(sid), oid);
}
-int RGWReshard::add(cls_rgw_reshard_entry& entry)
+int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
{
if (!store->svc()->zone->can_reshard()) {
- ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
return 0;
}
librados::ObjectWriteOperation op;
cls_rgw_reshard_add(op, entry);
- int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
+ int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
return ret;
}
return 0;
}
-int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
+int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
{
cls_rgw_reshard_entry entry;
entry.bucket_name = bucket_info.bucket.name;
entry.bucket_id = bucket_info.bucket.bucket_id;
entry.tenant = bucket_info.owner.tenant;
- int ret = get(entry);
+ int ret = get(dpp, entry);
if (ret < 0) {
return ret;
}
entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
- ret = add(entry);
+ ret = add(dpp, entry);
if (ret < 0) {
- ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+ ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
cpp_strerror(-ret) << dendl;
}
}
-int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
+int RGWReshard::list(const DoutPrefixProvider *dpp, int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
{
string logshard_oid;
int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
- if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << " "
- << "marker=" << marker << " " << cpp_strerror(ret) << dendl;
- if (ret == -ENOENT) {
- *is_truncated = false;
- ret = 0;
- } else {
- if (ret == -EACCES) {
- lderr(store->ctx()) << "access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
- << ". Fix the pool access permissions of your client" << dendl;
- }
- }
+ if (ret == -ENOENT) {
+ // these shard objects aren't created until we actually write something to
+ // them, so treat ENOENT as a successful empty listing
+ *is_truncated = false;
+ ret = 0;
+ } else if (ret == -EACCES) {
+ ldpp_dout(dpp, -1) << "ERROR: access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
+ << ". Fix the pool access permissions of your client" << dendl;
+ } else if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: failed to list reshard log entries, oid="
+ << logshard_oid << " marker=" << marker << " " << cpp_strerror(ret) << dendl;
}
return ret;
}
-int RGWReshard::get(cls_rgw_reshard_entry& entry)
+int RGWReshard::get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
{
string logshard_oid;
int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry);
if (ret < 0) {
if (ret != -ENOENT) {
- lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
+ ldpp_dout(dpp, -1) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
" bucket=" << entry.bucket_name << dendl;
}
return ret;
return 0;
}
-int RGWReshard::remove(cls_rgw_reshard_entry& entry)
+int RGWReshard::remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
{
string logshard_oid;
librados::ObjectWriteOperation op;
cls_rgw_reshard_remove(op, entry);
- int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
+ int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
return ret;
}
return ret;
}
-int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
+int RGWReshard::clear_bucket_resharding(const DoutPrefixProvider *dpp, const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
int ret = cls_rgw_clear_bucket_resharding(store->getRados()->reshard_pool_ctx, bucket_instance_oid);
if (ret < 0) {
- lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
return ret;
}
}
}
-int RGWReshard::process_single_logshard(int logshard_num)
+int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp)
{
string marker;
bool truncated = true;
- CephContext *cct = store->ctx();
constexpr uint32_t max_entries = 1000;
string logshard_oid;
RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
- int ret = logshard_lock.lock();
+ int ret = logshard_lock.lock(dpp);
if (ret < 0) {
- ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
+ ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " <<
logshard_oid << ", ret = " << ret <<dendl;
return ret;
}
do {
std::list<cls_rgw_reshard_entry> entries;
- ret = list(logshard_num, marker, max_entries, entries, &truncated);
+ ret = list(dpp, logshard_num, marker, max_entries, entries, &truncated);
if (ret < 0) {
- ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
+ ldpp_dout(dpp, 10) << "cannot list all reshards in logshard oid=" <<
logshard_oid << dendl;
continue;
}
for(auto& entry: entries) { // logshard entries
if(entry.new_instance_id.empty()) {
- ldout(store->ctx(), 20) << __func__ << " resharding " <<
+ ldpp_dout(dpp, 20) << __func__ << " resharding " <<
entry.bucket_name << dendl;
rgw_bucket bucket;
ret = store->getRados()->get_bucket_info(store->svc(),
entry.tenant, entry.bucket_name,
bucket_info, nullptr,
- null_yield, &attrs);
+ null_yield, dpp, &attrs);
if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
if (ret < 0) {
- ldout(cct, 0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": Error in get_bucket_info for bucket " << entry.bucket_name <<
": " << cpp_strerror(-ret) << dendl;
if (ret != -ENOENT) {
return ret;
}
} else {
- ldout(cct,0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": Bucket: " << entry.bucket_name <<
" already resharded by someone, skipping " << dendl;
}
// we've encountered a reshard queue entry for an apparently
// non-existent bucket; let's try to recover by cleaning up
- ldout(cct, 0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": removing reshard queue entry for a resharded or non-existent bucket" <<
entry.bucket_name << dendl;
- ret = remove(entry);
+ ret = remove(dpp, entry);
if (ret < 0) {
- ldout(cct, 0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": Error removing non-existent bucket " <<
entry.bucket_name << " from resharding queue: " <<
cpp_strerror(-ret) << dendl;
}
RGWBucketReshard br(store, bucket_info, attrs, nullptr);
- ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
+ ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr,
nullptr, this);
if (ret < 0) {
- ldout(store->ctx(), 0) << __func__ <<
+ ldpp_dout(dpp, 0) << __func__ <<
": Error during resharding bucket " << entry.bucket_name << ":" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
- ldout(store->ctx(), 20) << __func__ <<
+ ldpp_dout(dpp, 20) << __func__ <<
" removing reshard queue entry for bucket " << entry.bucket_name <<
dendl;
- ret = remove(entry);
+ ret = remove(dpp, entry);
if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error removing bucket " <<
+ ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " <<
entry.bucket_name << " from resharding queue: " <<
cpp_strerror(-ret) << dendl;
return ret;
*logshard = objname + buf;
}
-int RGWReshard::process_all_logshards()
+int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
{
if (!store->svc()->zone->can_reshard()) {
- ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
return 0;
}
int ret = 0;
string logshard;
get_logshard_oid(i, &logshard);
- ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl;
+ ldpp_dout(dpp, 20) << "processing logshard = " << logshard << dendl;
- ret = process_single_logshard(i);
+ ret = process_single_logshard(i, dpp);
- ldout(store->ctx(), 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
+ ldpp_dout(dpp, 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
}
return 0;
void *RGWReshard::ReshardWorker::entry() {
do {
utime_t start = ceph_clock_now();
- reshard->process_all_logshards();
+ reshard->process_all_logshards(this);
if (reshard->going_down())
break;
std::lock_guard l{lock};
cond.notify_all();
}
+
+CephContext *RGWReshard::ReshardWorker::get_cct() const
+{
+ return cct;
+}
+
+unsigned RGWReshard::ReshardWorker::get_subsys() const
+{
+ return dout_subsys;
+}
+
+std::ostream& RGWReshard::ReshardWorker::gen_prefix(std::ostream& out) const
+{
+ return out << "rgw reshard worker thread: ";
+}