using namespace std;
+// these values are copied from cls/rgw/cls_rgw.cc
+static const string BI_OLH_ENTRY_NS_START = "\x80" "1001_";
+static const string BI_INSTANCE_ENTRY_NS_START = "\x80" "1000_";
+
+// number of characters that we should allow to be buffered by the formatter
+// before flushing (used by index check methods with dump_keys=true)
+static constexpr int FORMATTER_LEN_FLUSH_THRESHOLD = 4 * 1024 * 1024;
+
// default number of entries to list with each bucket listing call
// (use marker to bridge between calls)
static constexpr size_t listing_max_entries = 1000;
return 0;
}
+/**
+ * Loops over all olh entries in a bucket shard and finds ones with
+ * exists=false and pending_removal=true. If the pending log is empty on
+ * these entries, they were left behind after the last remaining version of
+ * an object was deleted or after an incomplete upload. This was known to
+ * happen historically due to concurrency conflicts among requests referencing
+ * the same object key. If op_state.fix_index is true, we continue where the
+ * request left off by calling RGWRados::clear_olh. If the pending log is not
+ * empty, we attempt to apply it.
+ */
+static int check_index_olh(rgw::sal::RadosStore* const rados_store,
+ rgw::sal::Bucket* const bucket,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const int shard,
+ uint64_t* const count_out,
+ optional_yield y)
+{
+ string marker = BI_OLH_ENTRY_NS_START;
+ bool is_truncated = true;
+ list<rgw_cls_bi_entry> entries;
+
+ RGWObjectCtx obj_ctx(rados_store);
+ RGWRados* store = rados_store->getRados();
+ RGWRados::BucketShard bs(store);
+
+ int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ *count_out = 0;
+ do {
+ entries.clear();
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+ break;
+ }
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ marker = entry.idx;
+ if (entry.type != BIIndexType::OLH) {
+ is_truncated = false;
+ break;
+ }
+ rgw_bucket_olh_entry olh_entry;
+ auto iiter = entry.data.cbegin();
+ try {
+ decode(olh_entry, iiter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, -1) << "ERROR failed to decode olh entry for key: " << entry.idx << dendl;
+ continue;
+ }
+ if (olh_entry.exists || !olh_entry.pending_removal) {
+ continue;
+ }
+ if (op_state.will_fix_index()) {
+ rgw_obj obj(bucket->get_key(), olh_entry.key.name);
+ if (olh_entry.pending_log.empty()) {
+ ret = store->clear_olh(dpp, obj_ctx, obj, bucket->get_info(), olh_entry.tag, olh_entry.epoch, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to clear olh for: " << olh_entry.key.name << " clear_olh(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ } else {
+ std::unique_ptr<rgw::sal::Object> object = bucket->get_object({olh_entry.key.name});
+ RGWObjState *state;
+ ret = object->get_obj_state(dpp, &state, y, false);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to get state for: " << olh_entry.key.name << " get_obj_state(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ ret = store->update_olh(dpp, obj_ctx, state, bucket->get_info(), obj);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to update olh for: " << olh_entry.key.name << " update_olh(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ }
+ }
+ if (op_state.dump_keys) {
+ flusher.get_formatter()->dump_string("", olh_entry.key.name);
+ if (flusher.get_formatter()->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+ flusher.flush();
+ }
+ }
+ *count_out += 1;
+ }
+ } while (is_truncated);
+ flusher.flush();
+ return 0;
+}
+
+
+/**
+ * Spawns separate coroutines to check each bucket shard for leftover
+ * olh entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ const RGWBucketInfo& bucket_info = get_bucket_info();
+ if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+ ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+ return 0;
+ }
+
+ Formatter* formatter = flusher.get_formatter();
+ if (op_state.dump_keys) {
+ formatter->open_array_section("");
+ }
+
+ const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+ std::string verb = op_state.will_fix_index() ? "removed" : "found";
+ uint64_t count_out = 0;
+
+ boost::asio::io_context context;
+ int next_shard = 0;
+
+ const int max_aio = std::max(1, op_state.get_max_aio());
+
+ for (int i=0; i<max_aio; i++) {
+ spawn::spawn(context, [&](yield_context yield) {
+ while (true) {
+ int shard = next_shard;
+ next_shard += 1;
+ if (shard >= max_shards) {
+ return;
+ }
+ optional_yield y(context, yield);
+ uint64_t shard_count;
+ int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "NOTICE: error processing shard " << shard <<
+ " check_index_olh(): " << r << dendl;
+ }
+ count_out += shard_count;
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+ " entries " << verb << ")" << dendl;
+ }
+ }
+ });
+ }
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ return -e.code().value();
+ }
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+ " entries " << verb << ")" << dendl;
+ }
+ if (op_state.dump_keys) {
+ formatter->close_section();
+ flusher.flush();
+ }
+ return 0;
+}
+
+/**
+ * Indicates whether a versioned bucket instance entry is listable in the
+ * index. It does this by looping over all plain entries with prefix equal to
+ * the key name, and checking whether any have an instance ID matching the one
+ * on the specified key. The existence of an instance entry without a matching
+ * plain entry indicates that the object was uploaded successfully, but the
+ * request exited prior to linking the object into the index (via the creation
+ * of a plain entry).
+ */
+static int is_versioned_instance_listable(const DoutPrefixProvider *dpp,
+ RGWRados::BucketShard& bs,
+ const cls_rgw_obj_key& key,
+ bool& listable,
+ optional_yield y)
+{
+ const std::string empty_delim;
+ cls_rgw_obj_key marker;
+ rgw_cls_list_ret result;
+ listable = false;
+
+ do {
+ librados::ObjectReadOperation op;
+ cls_rgw_bucket_list_op(op, marker, key.name, empty_delim, 1000,
+ true, &result);
+ bufferlist ibl;
+ int r = bs.bucket_obj.operate(dpp, &op, &ibl, y);
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto const& entry : result.dir.m) {
+ if (entry.second.key == key) {
+ listable = true;
+ return 0;
+ }
+ marker = entry.second.key;
+ }
+ } while (result.is_truncated);
+ return 0;
+}
+
+/**
+ * Loops over all instance entries in a bucket shard and finds ones with
+ * versioned_epoch=0 and an mtime that is earlier than op_state.min_age
+ * relative to the current time. These entries represent objects that were
+ * uploaded successfully but were not successfully linked into the object
+ * index. As an extra precaution, we also verify that these entries are indeed
+ * non listable (have no corresponding plain entry in the index). We can assume
+ * that clients received an error response for the associated upload requests
+ * since the bucket index linking transaction did not complete. Therefore, if
+ * op_state.fix_index is true, we remove the object that is associated with the
+ * instance entry.
+ */
+static int check_index_unlinked(rgw::sal::RadosStore* const rados_store,
+ rgw::sal::Bucket* const bucket,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const int shard,
+ uint64_t* const count_out,
+ optional_yield y)
+{
+ string marker = BI_INSTANCE_ENTRY_NS_START;
+ bool is_truncated = true;
+ list<rgw_cls_bi_entry> entries;
+
+ RGWObjectCtx obj_ctx(rados_store);
+ RGWRados* store = rados_store->getRados();
+ RGWRados::BucketShard bs(store);
+
+ int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ ceph::real_clock::time_point now = ceph::real_clock::now();
+ ceph::real_clock::time_point not_after = now - op_state.min_age;
+
+ *count_out = 0;
+ do {
+ entries.clear();
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+ break;
+ }
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ marker = entry.idx;
+ if (entry.type != BIIndexType::Instance) {
+ is_truncated = false;
+ break;
+ }
+ rgw_bucket_dir_entry dir_entry;
+ auto iiter = entry.data.cbegin();
+ try {
+ decode(dir_entry, iiter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, -1) << "ERROR failed to decode instance entry for key: " <<
+ entry.idx << dendl;
+ continue;
+ }
+ if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) {
+ continue;
+ }
+ bool listable;
+ ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR is_versioned_instance_listable(key='" <<
+ dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ if (listable) {
+ continue;
+ }
+ if (op_state.will_fix_index()) {
+ rgw_obj_key key(dir_entry.key.name, dir_entry.key.instance);
+ ret = rgw_remove_object(dpp, rados_store, bucket, key);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR rgw_remove_obj(key='" <<
+ dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ }
+ if (op_state.dump_keys) {
+ Formatter* const formatter = flusher.get_formatter();
+ formatter->open_object_section("object_instance");
+ formatter->dump_string("name", dir_entry.key.name);
+ formatter->dump_string("instance", dir_entry.key.instance);
+ formatter->close_section();
+ if (formatter->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+ flusher.flush();
+ }
+ }
+ *count_out += 1;
+ }
+ } while (is_truncated);
+ flusher.flush();
+ return 0;
+}
+
+/**
+ * Spawns separate coroutines to check each bucket shard for unlinked
+ * instance entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ const RGWBucketInfo& bucket_info = get_bucket_info();
+ if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+ ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+ return 0;
+ }
+
+ Formatter* formatter = flusher.get_formatter();
+ if (op_state.dump_keys) {
+ formatter->open_array_section("");
+ }
+
+ const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+ std::string verb = op_state.will_fix_index() ? "removed" : "found";
+ uint64_t count_out = 0;
+
+ int max_aio = std::max(1, op_state.get_max_aio());
+ int next_shard = 0;
+ boost::asio::io_context context;
+ for (int i=0; i<max_aio; i++) {
+ spawn::spawn(context, [&](yield_context yield) {
+ while (true) {
+ int shard = next_shard;
+ next_shard += 1;
+ if (shard >= max_shards) {
+ return;
+ }
+ uint64_t shard_count;
+ optional_yield y {context, yield};
+ int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard <<
+ " check_index_unlinked(): " << r << dendl;
+ }
+ count_out += shard_count;
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+ " entries " << verb << ")" << dendl;
+ }
+ }
+ });
+ }
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ return -e.code().value();
+ }
+
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+ " entries " << verb << ")" << dendl;
+ }
+ if (op_state.dump_keys) {
+ formatter->close_section();
+ flusher.flush();
+ }
+ return 0;
+}
int RGWBucket::check_index(const DoutPrefixProvider *dpp,
RGWBucketAdminOpState& op_state,
}
+int RGWBucketAdminOp::check_index_olh(rgw::sal::RadosStore* store, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp)
+{
+ RGWBucket bucket;
+ int ret = bucket.init(store, op_state, null_yield, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+ return ret;
+ }
+ flusher.start(0);
+ ret = bucket.check_index_olh(store, dpp, op_state, flusher);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "check_index_olh(): " << ret << dendl;
+ return ret;
+ }
+ flusher.flush();
+ return 0;
+}
+
+int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RadosStore* store,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const DoutPrefixProvider *dpp)
+{
+ flusher.start(0);
+ RGWBucket bucket;
+ int ret = bucket.init(store, op_state, null_yield, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+ return ret;
+ }
+ ret = bucket.check_index_unlinked(store, dpp, op_state, flusher);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "check_index_unlinked(): " << ret << dendl;
+ return ret;
+ }
+ flusher.flush();
+ return 0;
+}
+
int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
{
Formatter *formatter = flusher.get_formatter();
flusher.start(0);
+ formatter->open_object_section("bucket_check");
ret = bucket.check_bad_index_multipart(op_state, flusher, dpp);
if (ret < 0)
return ret;
- ret = bucket.check_object_index(dpp, op_state, flusher, y);
- if (ret < 0)
- return ret;
+ if (op_state.will_check_objects()) {
+ ret = bucket.check_object_index(dpp, op_state, flusher, y);
+ if (ret < 0)
+ return ret;
+ }
ret = bucket.check_index(dpp, op_state, existing_stats, calculated_stats);
if (ret < 0)
return ret;
dump_index_check(existing_stats, calculated_stats, formatter);
+
+ formatter->close_section();
flusher.flush();
return 0;
return ret;
}
+ const RGWBucketInfo& bucket_info = bucket->get_info();
+
const auto& index = bucket->get_info().get_current_index();
if (is_layout_indexless(index)) {
cerr << "error, indexless buckets do not maintain stats; bucket=" <<
formatter->dump_string("id", bucket->get_bucket_id());
formatter->dump_string("marker", bucket->get_marker());
formatter->dump_stream("index_type") << bucket->get_info().layout.current_index.layout.type;
+ formatter->dump_bool("versioned", bucket_info.versioned());
+ formatter->dump_bool("versioning_enabled", bucket_info.versioning_enabled());
+ formatter->dump_bool("object_lock_enabled", bucket_info.obj_lock_enabled());
+ formatter->dump_bool("mfa_enabled", bucket_info.mfa_enabled());
::encode_json("owner", bucket->get_info().owner, formatter);
formatter->dump_string("ver", bucket_ver);
formatter->dump_string("master_ver", master_ver);
new_be.bucket.name = new_bucket_name;
- ret = ctl.bucket->store_bucket_instance_info(be.bucket, new_bi, y, dpp, RGWBucketCtl::BucketInstance::PutParams()
+ ret = ctl.bucket->store_bucket_instance_info(new_be.bucket, new_bi, y, dpp, RGWBucketCtl::BucketInstance::PutParams()
.set_exclusive(false)
.set_mtime(orig_mtime)
.set_attrs(&attrs_m)