const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+// Drives the concurrent I/O: issues an initial batch of ops, then issues further ops as wait_for_completions() reports finished ones, until it returns false.
+int CLSRGWConcurrentIO::operator()() {
+ int ret = 0; // value returned to the caller; a negative value also triggers cleanup() at the end
+ iter = objs_container.begin();
+ for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { // initial batch: at most max_aio ops (max_aio itself is decremented)
+ ret = issue_op(iter->first, iter->second); // NOTE(review): issue_op definitions in this file forward `static bool` helpers, so the int is 0 or 1, never negative -- confirm the `< 0` checks in this function can fire
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions = 0, r = 0; // out-parameters handed to wait_for_completions() on every pass
+ std::map<int, std::string> completed_objs;
+ std::map<int, std::string> retry_objs;
+ while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+ need_multiple_rounds() ? &completed_objs : nullptr, // exactly one of the two maps is passed, selected by need_multiple_rounds()
+ !need_multiple_rounds() ? &retry_objs : nullptr)) {
+ if (r >= 0 && ret >= 0) { // no error recorded so far: issue up to one new op per reported completion
+ for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+ int issue_ret = issue_op(iter->first, iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) { // r is negative and no earlier error was recorded: keep r as the result
+ ret = r;
+ }
+
+ // once every entry of the current container has been issued, see if another round is needed
+ if (iter == objs_container.end()) {
+ if (need_multiple_rounds() && !completed_objs.empty()) {
+ // multi-round case: the objects collected in completed_objs
+ // become the container for the next round
+ reset_container(completed_objs);
+ iter = objs_container.begin();
+ } else if (! need_multiple_rounds() && !retry_objs.empty()) { // otherwise go again with the objects collected in retry_objs
+ reset_container(retry_objs);
+ iter = objs_container.begin();
+ }
+
+ // re-issue ops if the container was reset above (i.e., iter !=
+ // objs_container.end()), using whatever remains of num_completions;
+ // if it was not reset above (i.e., iter == objs_container.end())
+ // the loop will exit immediately without iterating
+ for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+ int issue_ret = issue_op(iter->first, iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ }
+ }
+
+ if (ret < 0) {
+ cleanup();
+ }
+ return ret;
+} // CLSRGWConcurrentIO::operator()()
+
+
/**
* This class represents the bucket index object operation callback context.
*/
ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
~ClsBucketIndexOpCtx() override {}
void handle_completion(int r, bufferlist& outbl) override {
- if (r >= 0) {
+ // if successful, or we're asked for a retry, copy result into
+ // destination (*data)
+ if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
try {
auto iter = outbl.cbegin();
decode((*data), iter);
}
};
-void BucketIndexAioManager::do_completion(int id) {
+void BucketIndexAioManager::do_completion(const int request_id) { // moves request_id's entry from pendings to completions while holding lock
std::lock_guard l{lock};
- auto iter = pendings.find(id);
+ auto iter = pendings.find(request_id); // the id must be pending; asserted on the next line
ceph_assert(iter != pendings.end());
- completions[id] = iter->second;
+ completions[request_id] = iter->second;
pendings.erase(iter);
// If the caller needs a list of finished objects, store them
// for further processing
- auto miter = pending_objs.find(request_id);
+ auto miter = pending_objs.find(request_id); // unlike pendings, a missing entry here is tolerated
if (miter != pending_objs.end()) {
- completion_objs[id] = miter->second;
+ completion_objs.emplace(request_id, miter->second); // hand the object info over to completion_objs before dropping it from pending_objs
pending_objs.erase(miter);
}
}
bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
- int *num_completions, int *ret_code, map<int, string> *objs) {
+ int *num_completions,
+ int *ret_code,
+ std::map<int, std::string> *completed_objs,
+ std::map<int, std::string> *retry_objs)
+{
std::unique_lock locker{lock};
if (pendings.empty() && completions.empty()) {
return false;
auto iter = completions.begin();
for (; iter != completions.end(); ++iter) {
int r = iter->second->get_return_value();
- if (objs && r == 0) { /* update list of successfully completed objs */
+
+ // see if we may need to copy completions or retries
+ if (completed_objs || retry_objs) {
auto liter = completion_objs.find(iter->first);
if (liter != completion_objs.end()) {
- (*objs)[liter->first] = liter->second;
+ if (completed_objs && r == 0) { /* update list of successfully completed objs */
+ (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+ }
+
+ if (r == RGWBIAdvanceAndRetryError) {
+ r = 0;
+ if (retry_objs) {
+ (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+ }
+ }
+ } else {
+ // NB: should we log an error here? There is currently no logging
+ // context to use.
}
}
- if (ret_code && (r < 0 && r != valid_ret_code))
+
+ if (ret_code && (r < 0 && r != valid_ret_code)) {
(*ret_code) = r;
+ }
+
iter->second->release();
}
- if (num_completions)
+
+ if (num_completions) {
(*num_completions) = completions.size();
+ }
+
completions.clear();
return true;
}
static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+ const int shard_id, // passed straight through to aio_operate() together with oid
const string& oid,
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.create(true);
op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the write op (create + RGW_BUCKET_INIT_INDEX exec) to the manager
}
static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+ const int shard_id, // passed straight through to aio_operate() together with oid
const string& oid,
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.remove();
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the remove() op to the manager; NOTE(review): the local `in` is never used in this function
}
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
- const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+ const int shard_id, // passed straight through to aio_operate() together with oid
+ const string& oid,
+ uint64_t timeout, // becomes rgw_cls_tag_timeout_op::tag_timeout in the encoded request
+ BucketIndexAioManager *manager) {
bufferlist in;
rgw_cls_tag_timeout_op call;
call.tag_timeout = timeout;
encode(call, in);
ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the RGW_BUCKET_SET_TAG_TIMEOUT exec op to the manager
}
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_index_init_op(io_ctx, oid, &manager);
+ return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager); // forwards to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
void CLSRGWIssueBucketIndexInit::cleanup()
}
}
-int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+ return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager); // forwards to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+ return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager); // forwards tag_timeout to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
}
-void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
+void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
const cls_rgw_obj_key& key, const string& locator, bool log_op,
- uint16_t bilog_flags, rgw_zone_set& zones_trace)
+ uint16_t bilog_flags, const rgw_zone_set& zones_trace)
{
rgw_cls_obj_prepare_op call;
call.op = op;
o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in);
}
-void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
- rgw_bucket_entry_ver& ver,
+void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
+ const rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
- rgw_bucket_dir_entry_meta& dir_meta,
- list<cls_rgw_obj_key> *remove_objs, bool log_op,
+ const rgw_bucket_dir_entry_meta& dir_meta,
+ const list<cls_rgw_obj_key> *remove_objs, bool log_op,
uint16_t bilog_flags,
- rgw_zone_set *zones_trace)
+ const rgw_zone_set *zones_trace)
{
bufferlist in;
}
static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
- const string& oid,
+ const int shard_id,
+ const std::string& oid,
const cls_rgw_obj_key& start_obj,
- const string& filter_prefix,
- const string& delimiter,
+ const std::string& filter_prefix,
+ const std::string& delimiter,
uint32_t num_entries,
bool list_versions,
BucketIndexAioManager *manager,
cls_rgw_bucket_list_op(op,
start_obj, filter_prefix, delimiter,
num_entries, list_versions, pdata);
- return manager->aio_operate(io_ctx, oid, &op);
-}
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
+int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
+{
+ // choose the marker for this request: if result already has an
+ // entry for this shard (it was queried before, e.g. it returned
+ // the RGWBIAdvanceAndRetryError defined constant), use the marker
+ // stored in that entry to advance the search; otherwise use the
+ // marker passed in by the caller
+ cls_rgw_obj_key marker;
+ auto iter = result.find(shard_id); // NOTE(review): this local `iter` hides the `iter` that reset_container() assigns -- rename candidate
+ if (iter != result.end()) {
+ marker = iter->second.marker;
+ } else {
+ marker = start_obj;
+ }
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_list_op(io_ctx, oid,
- start_obj, filter_prefix, delimiter,
+ return issue_bucket_list_op(io_ctx, shard_id, oid,
+ marker, filter_prefix, delimiter, // the helper returns bool, so the int produced here is 0 or 1
num_entries, list_versions, &manager,
&result[shard_id]);
}
+
+void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs) // makes objs the new objs_container and leaves the caller's map empty
+{
+ objs_container.swap(objs); // after the swap, objs holds the previous container contents
+ iter = objs_container.begin(); // restart iteration at the first entry of the new container
+ objs.clear(); // drop the previous contents so the caller's map starts empty for the next round
+}
+
+
void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
{
bufferlist in;
}
int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
- BIIndexType index_type, cls_rgw_obj_key& key,
+ BIIndexType index_type, const cls_rgw_obj_key& key,
rgw_cls_bi_entry *entry)
{
bufferlist in, out;
return 0;
}
-int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry)
+int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
{
bufferlist in, out;
rgw_cls_bi_put_op call;
return 0;
}
-void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry)
+void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi_entry& entry)
{
bufferlist in, out;
rgw_cls_bi_put_op call;
op.exec(RGW_CLASS, RGW_BI_PUT, in);
}
-int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
- const string& name, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+/* nb: any entries passed in are replaced with the results of the cls
+ * call, so caller does not need to clear entries between calls
+ */
+int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
+ const std::string& name_filter, const std::string& marker, uint32_t max,
+ std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
bufferlist in, out;
rgw_cls_bi_list_op call;
- call.name = name;
+ call.name_filter = name_filter;
call.marker = marker;
call.max = max;
encode(call, in);
return 0;
}
-int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
- const cls_rgw_obj_key& key, bufferlist& olh_tag,
- bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
+ const cls_rgw_obj_key& key, const bufferlist& olh_tag,
+ bool delete_marker, const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
{
librados::ObjectWriteOperation op;
cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta,
void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key,
- bufferlist& olh_tag, bool delete_marker,
- const string& op_tag, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
+ const bufferlist& olh_tag, bool delete_marker,
+ const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
{
bufferlist in, out;
rgw_cls_link_olh_op call;
call.key = key;
- call.olh_tag = string(olh_tag.c_str(), olh_tag.length());
+ call.olh_tag = olh_tag.to_str();
call.op_tag = op_tag;
call.delete_marker = delete_marker;
if (meta) {
int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
const cls_rgw_obj_key& key, const string& op_tag,
- const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace)
+ const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
{
librados::ObjectWriteOperation op;
cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace);
void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op,
const cls_rgw_obj_key& key, const string& op_tag,
- const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace)
+ const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
{
bufferlist in, out;
rgw_cls_unlink_instance_op call;
op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
}
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id, // note the (oid, shard_id) order, the reverse of the other issue_* helpers in this file
BucketIndexShardsManager& marker_mgr, uint32_t max,
BucketIndexAioManager *manager,
cls_rgw_bi_log_list_ret *pdata)
{
librados::ObjectReadOperation op;
cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the read op built by cls_rgw_bilog_list() (marker looked up per shard, result into pdata) to the manager
}
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid) // forwards to issue_bi_log_list_op() with result[shard_id] as the destination; the helper returns bool, so the int is 0 or 1
{
return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
}
op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
}
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id, // note the (oid, shard_id) order, the reverse of most issue_* helpers in this file
BucketIndexShardsManager& start_marker_mgr,
BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
cls_rgw_bi_log_trim_op call;
librados::ObjectWriteOperation op;
cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
end_marker_mgr.get(shard_id, ""));
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the trim op (per-shard start/end markers) to the manager; NOTE(review): the local `call` declared above is never used in this function
}
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid) // forwards to issue_bi_log_trim() with both marker managers; the helper returns bool, so the int is 0 or 1
{
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
}
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager, // shard_id is passed straight through to aio_operate()
rgw_cls_check_index_ret *pdata) {
bufferlist in;
librados::ObjectReadOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
- pdata, NULL));
+ pdata, nullptr)); // the reply is decoded into pdata; no ret_code pointer is supplied (nullptr, matching the rest of this file)
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the RGW_BUCKET_CHECK_INDEX read op to the manager
}
int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+ return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
}
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, // shard_id is passed straight through to aio_operate()
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the RGW_BUCKET_REBUILD_INDEX exec op (empty input buffer) to the manager
}
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+ return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager); // forwards to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
}
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
{
cls_rgw_obj_key empty_key;
string empty_prefix;
string empty_delimiter;
- return issue_bucket_list_op(io_ctx, oid,
+ return issue_bucket_list_op(io_ctx, shard_id, oid, // reuses the bucket-list helper with an empty key/prefix/delimiter and 0 entries requested; the reply goes to result[shard_id]
empty_key, empty_prefix, empty_delimiter,
0, false, &manager, &result[shard_id]);
}
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) // shard_id is passed straight through to aio_operate()
{
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the RGW_BI_LOG_RESYNC exec op (empty input buffer) to the manager
}
-int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
{
- return issue_resync_bi_log(io_ctx, oid, &manager);
+ return issue_resync_bi_log(io_ctx, shard_id, oid, &manager); // forwards to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) // shard_id is passed straight through to aio_operate()
{
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op); // hands the RGW_BI_LOG_STOP exec op (empty input buffer) to the manager
}
-int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
{
- return issue_bi_log_stop(io_ctx, oid, &manager);
+ return issue_bi_log_stop(io_ctx, shard_id, oid, &manager); // forwards to the file-local helper; it returns bool, so the int produced here is 0 or 1
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
}
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+ const int shard_id, const string& oid,
const cls_rgw_bucket_instance_entry& entry,
BucketIndexAioManager *manager) {
bufferlist in;
encode(call, in);
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
{
- return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+ return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager); // forwards entry to the file-local helper; it returns bool, so the int produced here is 0 or 1
}