#include "common/errno.h"
#include "common/ceph_json.h"
#include "common/backport14.h"
+#include "include/scope_guard.h"
#include "rgw_rados.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h"
#include "include/rados/librados.hpp"
// until everything is moved from rgw_common
#include "rgw_common.h"
-
+#include "rgw_reshard.h"
#include "cls/user/cls_user_types.h"
#define dout_context g_ceph_context
return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
}
-int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) {
+int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry, // entry names the bucket-instance metadata entry to remove; it is taken by const reference
+ RGWObjVersionTracker *objv_tracker) { // objv_tracker is handed to remove_entry() unchanged
return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker); // delegates to the metadata manager with the bucket-instance handler and returns its result as-is
}
return bucket.set_quota(op_state);
}
+// Removes the bucket index object of every shard of the given bucket
+// instance. When bucket_info.num_shards is 0 the loop runs exactly once and
+// uses shard id -1.
+//
+// Returns 0 when every shard was removed; otherwise the first negative error
+// code reported by BucketShard::init() or bi_remove(). Processing stops at
+// the first failure, so later shards are left untouched.
+static int purge_bucket_instance(RGWRados *store, const RGWBucketInfo& bucket_info)
+{
+  const bool sharded = (bucket_info.num_shards > 0);
+  const int shard_count = (sharded ? bucket_info.num_shards : 1);
+
+  for (int idx = 0; idx < shard_count; ++idx) {
+    int shard_id = (sharded ? idx : -1);
+
+    RGWRados::BucketShard bs(store);
+    int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
+    if (ret < 0) {
+      cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
+           << "): " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    ret = store->bi_remove(bs);
+    if (ret < 0) {
+      cerr << "ERROR: failed to remove bucket index object: "
+           << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+  }
+
+  return 0;
+}
+
+// Splits a "tenant/bucket" name at the first '/'.
+// Returns {tenant, bucket}; when the name contains no '/', the tenant is
+// empty and the whole input is returned as the bucket.
+inline auto split_tenant(const std::string& bucket_name) ->
+  std::pair<std::string,std::string>
+{
+  const auto slash = bucket_name.find('/');
+  if (slash == std::string::npos) {
+    return {std::string(), bucket_name};
+  }
+  return {bucket_name.substr(0, slash), bucket_name.substr(slash + 1)};
+}
+
+using bucket_instance_ls = std::vector<RGWBucketInfo>;
+
+// Appends to `stale_instances` those entries of `lst` (bucket instance keys
+// that all belong to `bucket_name`) that are considered stale.
+//
+//  - Instances whose reshard_status is CLS_RGW_RESHARD_DONE are always
+//    appended.
+//  - If the bucket itself no longer exists (-ENOENT), every readable
+//    instance is appended.
+//  - If the bucket cannot be read for another reason, or it is currently
+//    resharding, only the DONE instances are reported.
+//  - Otherwise the instance currently in use and the one named by
+//    new_bucket_instance_id are excluded, and the remainder is appended
+//    while the bucket's reshard lock is held. If the lock cannot be taken,
+//    the remainder is not reported.
+//
+// Instances whose info cannot be read are logged and skipped.
+void get_stale_instances(RGWRados *store, const std::string& bucket_name,
+                         const vector<std::string>& lst,
+                         bucket_instance_ls& stale_instances)
+{
+  RGWObjectCtx obj_ctx(store);
+
+  bucket_instance_ls other_instances;
+  // first iterate over the entries, and pick up the done buckets; these
+  // are guaranteed to be stale
+  for (const auto& bucket_instance : lst) {
+    RGWBucketInfo binfo;
+    int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
+                                            binfo, nullptr, nullptr);
+    if (r < 0) {
+      // this can only happen if someone deletes us right when we're processing
+      lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
+                          << ": " << cpp_strerror(-r) << dendl;
+      continue;
+    }
+    if (binfo.reshard_status == CLS_RGW_RESHARD_DONE) {
+      stale_instances.emplace_back(std::move(binfo));
+    } else {
+      other_instances.emplace_back(std::move(binfo));
+    }
+  }
+
+  // Read the cur bucket info, if the bucket doesn't exist we can simply return
+  // all the instances
+  std::string tenant, bucket;
+  std::tie(tenant, bucket) = split_tenant(bucket_name);
+  RGWBucketInfo cur_bucket_info;
+  int r = store->get_bucket_info(obj_ctx, tenant, bucket, cur_bucket_info, nullptr);
+  if (r < 0) {
+    if (r == -ENOENT) {
+      // bucket doesn't exist, everything is stale then
+      stale_instances.insert(std::end(stale_instances),
+                             std::make_move_iterator(other_instances.begin()),
+                             std::make_move_iterator(other_instances.end()));
+    } else {
+      // all bets are off if we can't read the bucket, just return the sureshot stale instances
+      lderr(store->ctx()) << "error: reading bucket info for bucket: "
+                          << bucket << ": " << cpp_strerror(-r) << dendl;
+    }
+    return;
+  }
+
+  // Don't process further in this round if bucket is resharding
+  if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
+    return;
+
+  // drop the instance that is currently in use and the one a reshard is
+  // targeting; neither of them may be reported as stale
+  other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
+                                       [&cur_bucket_info](const RGWBucketInfo& b){
+                                         return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
+                                                 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
+                                       }),
+                        other_instances.end());
+
+  // check if there are still instances left
+  if (other_instances.empty()) {
+    return;
+  }
+
+  // Now we have a bucket with instances where the reshard status is none, this
+  // usually happens when the reshard process couldn't complete, lockdown the
+  // bucket and walk through these instances to make sure no one else interferes
+  // with these
+  {
+    RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
+    r = reshard_lock.lock();
+    if (r < 0) {
+      // most likely bucket is under reshard, return the sureshot stale instances
+      ldout(store->ctx(), 5) << __func__
+                             << ": failed to take reshard lock; reshard underway likely" << dendl;
+      return;
+    }
+    // make sure the lock is dropped however this scope is left
+    auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
+    // this should be fast enough that we may not need to renew locks and check
+    // exit status?, should we read the values of the instances again?
+    stale_instances.insert(std::end(stale_instances),
+                           std::make_move_iterator(other_instances.begin()),
+                           std::make_move_iterator(other_instances.end()));
+  }
+}
+
+// Walks all keys of the "bucket.instance" metadata section in batches of
+// default_max_keys, groups the keys of each batch by the part in front of the
+// first ':' (the bucket name), asks get_stale_instances() which instances of
+// each bucket are stale and passes every resulting list to `process_f`
+// together with the formatter and the store.
+//
+// The output produced by `process_f` is wrapped in a "keys" array section
+// that is flushed to cout on success. Keys without a ':' are ignored.
+// `op_state` is not used here.
+//
+// Returns 0 on success or the negative error code of list_keys_init() /
+// list_keys_next(); -ENOENT from list_keys_next() is not treated as an error.
+static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state,
+                                   RGWFormatterFlusher& flusher,
+                                   std::function<void(const bucket_instance_ls&,
+                                                      Formatter *,
+                                                      RGWRados*)> process_f)
+{
+  std::string marker;
+  void *handle = nullptr;
+  Formatter *formatter = flusher.get_formatter();
+  static constexpr auto default_max_keys = 1000;
+
+  int ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
+  if (ret < 0) {
+    cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
+    return ret;
+  }
+  // the listing handle has to be handed back on every exit path, including
+  // the early error return inside the loop
+  auto listing_guard = make_scope_guard([store, handle] {
+    store->meta_mgr->list_keys_complete(handle);
+  });
+
+  // start from a defined value: the loop condition reads this flag even when
+  // list_keys_next() bails out with -ENOENT, possibly without setting it
+  bool truncated = false;
+
+  formatter->open_array_section("keys");
+
+  do {
+    list<std::string> keys;
+
+    ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    if (ret != -ENOENT) {
+      // partition the list of buckets by buckets as the listing is un sorted,
+      // since it would minimize the reads to bucket_info
+      std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
+      for (auto &key: keys) {
+        auto pos = key.find(':');
+        if (pos != std::string::npos)
+          bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
+      }
+      for (const auto& kv: bucket_instance_map) {
+        bucket_instance_ls stale_lst;
+        get_stale_instances(store, kv.first, kv.second, stale_lst);
+        process_f(stale_lst, formatter, store);
+      }
+    }
+  } while (truncated);
+
+  formatter->close_section(); // keys
+  formatter->flush(cout);
+  return 0;
+}
+
+// Lists stale bucket instances: every stale instance found by
+// process_stale_instances() is dumped through the formatter as a "key"
+// string holding the instance's bucket key. Nothing is modified.
+// Returns whatever process_stale_instances() returns.
+int RGWBucketAdminOp::list_stale_instances(RGWRados *store,
+                                           RGWBucketAdminOpState& op_state,
+                                           RGWFormatterFlusher& flusher)
+{
+  // the store argument is not needed for a plain listing
+  auto dump_keys = [](const bucket_instance_ls& instances,
+                      Formatter *f,
+                      RGWRados*) {
+    for (const auto& instance : instances) {
+      f->dump_string("key", instance.bucket.get_key());
+    }
+  };
+
+  return process_stale_instances(store, op_state, flusher, dump_keys);
+}
+
+
+// Deletes stale bucket instances: for every stale instance found by
+// process_stale_instances() the bucket index objects are purged and, only if
+// that succeeded, the "bucket.instance:<key>" metadata entry is removed.
+// One "delete_status" object per instance is written to the formatter; its
+// "status" field holds the negated result code (0 on success).
+// Returns whatever process_stale_instances() returns; per-instance failures
+// are only visible in the formatter output.
+int RGWBucketAdminOp::clear_stale_instances(RGWRados *store,
+                                            RGWBucketAdminOpState& op_state,
+                                            RGWFormatterFlusher& flusher)
+{
+  auto purge_and_report = [](const bucket_instance_ls& instances,
+                             Formatter *f,
+                             RGWRados *rados) {
+    for (const auto& instance : instances) {
+      int status = purge_bucket_instance(rados, instance);
+      if (status == 0) {
+        // index objects are gone, now drop the metadata entry as well
+        auto md_key = "bucket.instance:" + instance.bucket.get_key();
+        status = rados->meta_mgr->remove(md_key);
+      }
+
+      f->open_object_section("delete_status");
+      f->dump_string("bucket_instance", instance.bucket.get_key());
+      f->dump_int("status", -status);
+      f->close_section();
+    }
+  };
+
+  return process_stale_instances(store, op_state, flusher, purge_and_report);
+}
+
void rgw_data_change::dump(Formatter *f) const
{
string type;
RGWListRawObjsCtx ctx;
};
- int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
+ int remove(RGWRados *store, string& entry,
+ RGWObjVersionTracker& objv_tracker) override {
RGWBucketInfo info;
RGWObjectCtx obj_ctx(store);
if (ret < 0 && ret != -ENOENT)
return ret;
- return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker);
+ return rgw_bucket_instance_remove_entry(store, entry,
+ &info.objv_tracker);
}
void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {