-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
#include <string>
-using namespace std;
#include "common/config.h"
#include "common/Formatter.h"
#include "common/errno.h"
#include "rgw_rados.h"
+#include "rgw_op.h"
+#include "rgw_multi.h"
#include "rgw_orphan.h"
+#include "rgw_zone.h"
+#include "rgw_bucket.h"
+
+#include "services/svc_zone.h"
+#include "services/svc_sys_obj.h"
#define dout_subsys ceph_subsys_rgw
try {
bufferlist& bl = iter->second;
- ::decode(state, bl);
+ decode(state, bl);
} catch (buffer::error& err) {
lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
return -EIO;
{
map<string, bufferlist> vals;
bufferlist bl;
- ::encode(state, bl);
+ encode(state, bl);
vals[job_name] = bl;
int r = ioctx.omap_set(oid, vals);
if (r < 0) {
RGWOrphanSearchState state;
try {
bufferlist bl = it.second;
- ::decode(state, bl);
+ decode(state, bl);
} catch (buffer::error& err) {
lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
return -EIO;
int RGWOrphanStore::init()
{
- rgw_pool& log_pool = store->get_zone_params().log_pool;
- int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx);
+ const rgw_pool& log_pool = store->svc()->zone->get_zone_params().log_pool;
+ int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), log_pool, ioctx);
if (r < 0) {
cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
return r;
for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
ldout(store->ctx(), 20) << " > " << iter->first << dendl;
}
- int ret = ioctx.operate(oid, &op);
+ int ret = rgw_rados_operate(ioctx, oid, &op, null_yield);
if (ret < 0) {
lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
}
return 0;
}
-int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
+int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode)
+{
int r = orphan_store.init();
if (r < 0) {
return r;
}
+ constexpr int64_t MAX_LIST_OBJS_ENTRIES=100;
+
+ max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead,
+ MAX_LIST_OBJS_ENTRIES);
+
+ detailed_mode = _detailed_mode;
RGWOrphanSearchState state;
r = orphan_store.read_job(job_name, state);
if (r < 0 && r != -ENOENT) {
{
librados::IoCtx ioctx;
- int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx);
+ int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, ioctx);
if (ret < 0) {
lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
return ret;
void *handle;
int max = 1000;
string section = "bucket.instance";
- int ret = store->meta_mgr->list_keys_init(section, &handle);
+ int ret = store->ctl()->meta.mgr->list_keys_init(section, &handle);
if (ret < 0) {
lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
- return -ret;
+ return ret;
}
map<int, list<string> > instances;
do {
list<string> keys;
- ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ ret = store->ctl()->meta.mgr->list_keys_next(handle, max, keys, &truncated);
if (ret < 0) {
lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
- return -ret;
+ return ret;
}
for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
return ret;
}
- store->meta_mgr->list_keys_complete(handle);
+ store->ctl()->meta.mgr->list_keys_complete(handle);
return 0;
}
{
set<string> obj_oids;
rgw_bucket& bucket = result.obj.bucket;
- if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */
+ if (!result.manifest) { /* a very very old object, or part of a multipart upload during upload */
const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
obj_oids.insert(obj_fingerprint(loc));
*/
obj_oids.insert(obj_fingerprint(loc, "shadow"));
} else {
- RGWObjManifest& manifest = result.manifest;
+ RGWObjManifest& manifest = *result.manifest;
+
+ if (!detailed_mode &&
+ manifest.get_obj_size() <= manifest.get_head_size()) {
+ ldout(store->ctx(), 5) << "skipping object as it fits in a head" << dendl;
+ return 0;
+ }
RGWObjManifest::obj_iterator miter;
for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
- const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
+ const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store->getRados());
string s = loc.oid;
obj_oids.insert(obj_fingerprint(s));
}
int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
{
- ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
- RGWBucketInfo bucket_info;
RGWObjectCtx obj_ctx(store);
- int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+ auto sysobj_ctx = store->svc()->sysobj->init_obj_ctx();
+
+ rgw_bucket orphan_bucket;
+ int shard_id;
+ int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id,
+ &orphan_bucket, &shard_id);
+ if (ret < 0) {
+ ldout(store->ctx(),0) << __func__ << " failed to parse bucket instance: "
+ << bucket_instance_id << " skipping" << dendl;
+ return ret;
+ }
+
+ RGWBucketInfo cur_bucket_info;
+ ret = store->getRados()->get_bucket_info(store->svc(), orphan_bucket.tenant,
+ orphan_bucket.name, cur_bucket_info, nullptr, null_yield);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ /* probably raced with bucket removal */
+ return 0;
+ }
+ lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (cur_bucket_info.bucket.bucket_id != orphan_bucket.bucket_id) {
+ ldout(store->ctx(), 0) << __func__ << ": Skipping stale bucket instance: "
+ << orphan_bucket.name << ": "
+ << orphan_bucket.bucket_id << dendl;
+ return 0;
+ }
+
+ if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS) {
+ ldout(store->ctx(), 0) << __func__ << ": reshard in progress. Skipping "
+ << orphan_bucket.name << ": "
+ << orphan_bucket.bucket_id << dendl;
+ return 0;
+ }
+
+ RGWBucketInfo bucket_info;
+ ret = store->getRados()->get_bucket_instance_info(sysobj_ctx, bucket_instance_id, bucket_info, nullptr, nullptr, null_yield);
if (ret < 0) {
if (ret == -ENOENT) {
/* probably raced with bucket removal */
return ret;
}
- RGWRados::Bucket target(store, bucket_info);
+ ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
+ RGWRados::Bucket target(store->getRados(), bucket_info);
RGWRados::Bucket::List list_op(&target);
string marker;
deque<RGWRados::Object::Stat> stat_ops;
- int count = 0;
-
do {
vector<rgw_bucket_dir_entry> result;
-#define MAX_LIST_OBJS_ENTRIES 100
- ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
+ ret = list_op.list_objects(max_list_bucket_entries,
+ &result, nullptr, &truncated, null_yield);
if (ret < 0) {
cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
- return -ret;
+ return ret;
}
for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
}
ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
+
+ if (!detailed_mode &&
+ entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) {
+ ldout(store->ctx(),5) << __func__ << "skipping stat as the object " << entry.key.name
+ << "fits in a head" << dendl;
+ continue;
+ }
+
rgw_obj obj(bucket_info.bucket, entry.key);
- RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+ RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
stat_ops.push_back(RGWRados::Object::Stat(&op_target));
RGWRados::Object::Stat& op = stat_ops.back();
}
}
}
- if (++count >= COUNT_BEFORE_FLUSH) {
+ if (oids.size() >= COUNT_BEFORE_FLUSH) {
ret = log_oids(linked_objs_index, oids);
if (ret < 0) {
cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
return ret;
}
- count = 0;
oids.clear();
}
}
int RGWOrphanSearch::compare_oid_indexes()
{
- assert(linked_objs_index.size() == all_objs_index.size());
+ ceph_assert(linked_objs_index.size() == all_objs_index.size());
librados::IoCtx& ioctx = orphan_store.get_ioctx();
librados::IoCtx data_ioctx;
- int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx);
+ int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, data_ioctx);
if (ret < 0) {
lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
return ret;
return r;
}
+
+
+/**
+ * Translate the stat result of one RGW object into the rados object
+ * names ("oids") that belong to it.
+ *
+ * @param result    stat result; its obj, attrs and optional manifest are
+ *                  read
+ * @param obj_oids  output; cleared first, then filled with the oids found
+ *                  for this object
+ * @return 0 on success (also when the object was already visited and is
+ *         skipped), -EINVAL when a DLO/SLO path has no bucket/object
+ *         separator, -EIO when the SLO manifest cannot be decoded
+ *
+ * DLO and SLO objects contribute only their own oid here; the bucket
+ * prefix (DLO) or object keys (SLO) they reference are handed to
+ * add_bucket_prefix() / add_bucket_filter().
+ */
+int RGWRadosList::handle_stat_result(RGWRados::Object::Stat::Result& result,
+                                     std::set<string>& obj_oids)
+{
+  obj_oids.clear();
+
+  rgw_bucket& bucket = result.obj.bucket;
+
+  ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+    " bucket=" << bucket <<
+    ", has_manifest=" << result.manifest.has_value() <<
+    dendl;
+
+  // iterator to store result of dlo/slo attribute find
+  decltype(result.attrs)::iterator attr_it = result.attrs.end();
+  const std::string oid = bucket.marker + "_" + result.obj.get_oid();
+  ldout(store->ctx(), 20) << "radoslist processing object=\"" <<
+    oid << "\"" << dendl;
+  if (visited_oids.find(oid) != visited_oids.end()) {
+    // apparently we hit a loop; don't continue with this oid
+    ldout(store->ctx(), 15) <<
+      "radoslist stopped loop at already visited object=\"" <<
+      oid << "\"" << dendl;
+    return 0;
+  }
+
+  if (!result.manifest) {
+    /* a very very old object, or part of a multipart upload during upload */
+    obj_oids.insert(oid);
+
+    /*
+     * multipart parts don't have manifest on them, it's in the meta
+     * object; we'll process them in
+     * RGWRadosList::do_incomplete_multipart
+     */
+  } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) !=
+             result.attrs.end()) {
+    // *** handle DLO object ***
+
+    obj_oids.insert(oid);
+    visited_oids.insert(oid); // prevent dlo loops
+    ldout(store->ctx(), 15) << "radoslist added to visited list DLO=\"" <<
+      oid << "\"" << dendl;
+
+    // Copy the attribute using its stored length instead of treating
+    // c_str() as a NUL-terminated C string, so an empty or unterminated
+    // attribute value cannot cause an out-of-bounds read. Anything from
+    // the first NUL onwards is dropped, which yields the same string the
+    // C-string conversion produced for well-formed values.
+    std::string prefix_path = attr_it->second.to_str();
+    const size_t nul_pos = prefix_path.find('\0');
+    if (string::npos != nul_pos) {
+      prefix_path.erase(nul_pos);
+    }
+
+    const size_t sep_pos = prefix_path.find('/');
+    if (string::npos == sep_pos) {
+      return -EINVAL;
+    }
+
+    const std::string bucket_name = prefix_path.substr(0, sep_pos);
+    const std::string prefix = prefix_path.substr(sep_pos + 1);
+
+    add_bucket_prefix(bucket_name, prefix);
+    ldout(store->ctx(), 25) << "radoslist DLO oid=\"" << oid <<
+      "\" added bucket=\"" << bucket_name << "\" prefix=\"" <<
+      prefix << "\" to process list" << dendl;
+  } else if ((attr_it = result.attrs.find(RGW_ATTR_SLO_MANIFEST)) !=
+             result.attrs.end()) {
+    // *** handle SLO object ***
+
+    obj_oids.insert(oid);
+    visited_oids.insert(oid); // prevent slo loops
+    ldout(store->ctx(), 15) << "radoslist added to visited list SLO=\"" <<
+      oid << "\"" << dendl;
+
+    RGWSLOInfo slo_info;
+    bufferlist::const_iterator bliter = attr_it->second.begin();
+    try {
+      // unqualified, like every other decode/encode call in this file
+      decode(slo_info, bliter);
+    } catch (buffer::error& err) {
+      ldout(store->ctx(), 0) <<
+        "ERROR: failed to decode slo manifest for " << oid << dendl;
+      return -EIO;
+    }
+
+    for (const auto& iter : slo_info.entries) {
+      const string& path_str = iter.path;
+
+      // entry paths have the form "/<bucket>/<object>"
+      const size_t sep_pos = path_str.find('/', 1 /* skip initial slash */);
+      if (string::npos == sep_pos) {
+        return -EINVAL;
+      }
+
+      const std::string bucket_name =
+        url_decode(path_str.substr(1, sep_pos - 1));
+      const std::string obj_name = url_decode(path_str.substr(sep_pos + 1));
+
+      const rgw_obj_key obj_key(obj_name);
+      add_bucket_filter(bucket_name, obj_key);
+      ldout(store->ctx(), 25) << "radoslist SLO oid=\"" << oid <<
+        "\" added bucket=\"" << bucket_name << "\" obj_key=\"" <<
+        obj_key << "\" to process list" << dendl;
+    }
+  } else {
+    RGWObjManifest& manifest = *result.manifest;
+
+    // in multipart, the head object contains no data and just has the
+    // manifest AND empty objects have no manifest, but they're
+    // realized as empty rados objects
+    if (0 == manifest.get_max_head_size() ||
+        manifest.obj_begin() == manifest.obj_end()) {
+      obj_oids.insert(oid);
+    }
+
+    // every location the manifest enumerates is a rados object of its own
+    RGWObjManifest::obj_iterator miter;
+    for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
+      const rgw_raw_obj& loc =
+        miter.get_location().get_raw_obj(store->getRados());
+      obj_oids.insert(loc.oid);
+    }
+  }
+
+  return 0;
+} // RGWRadosList::handle_stat_result
+
+// Completes the stat operation at the front of `ops`: waits for it,
+// converts its result into rados oids via handle_stat_result(), prints
+// each oid on its own line to stdout, and finally removes the operation
+// from the queue. Returns the wait()/handle_stat_result() status; the
+// operation is removed from the queue whatever that status is.
+int RGWRadosList::pop_and_handle_stat_op(
+  RGWObjectCtx& obj_ctx,
+  std::deque<RGWRados::Object::Stat>& ops)
+{
+  RGWRados::Object::Stat& oldest = ops.front();
+
+  int rc = oldest.wait();
+  if (rc >= 0) {
+    std::set<string> oids;
+
+    rc = handle_stat_result(oldest.result, oids);
+    if (rc < 0) {
+      lderr(store->ctx()) << "ERROR: handle_stat_result() returned error: " <<
+        cpp_strerror(-rc) << dendl;
+    }
+
+    // whatever was collected is emitted, even when the handler failed
+    for (auto it = oids.cbegin(); it != oids.cend(); ++it) {
+      std::cout << *it << std::endl;
+    }
+  } else if (rc != -ENOENT) {
+    lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
+      cpp_strerror(-rc) << dendl;
+  }
+
+  // invalidate object context for this object to avoid memory leak
+  // (see pr https://github.com/ceph/ceph/pull/30174)
+  obj_ctx.invalidate(oldest.result.obj);
+
+  ops.pop_front();
+  return rc;
+}
+
+
+#if 0 // compiled out: code that may be the basis for expansion; lists all "bucket.instance" metadata keys and passes them, grouped by orphan_shard(), to log_oids()
+int RGWRadosList::build_buckets_instance_index()
+{
+ void *handle;
+ int max = 1000;
+ string section = "bucket.instance";
+ int ret = store->meta_mgr->list_keys_init(section, &handle);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ // NOTE(review): the error returns further down leave without calling list_keys_complete(handle)
+ map<int, list<string> > instances;
+ // instances: shard number (from orphan_shard()) -> keys gathered for the next log_oids() call
+ bool truncated;
+ // NOTE(review): obj_ctx below is not referenced anywhere else in this function
+ RGWObjectCtx obj_ctx(store);
+ // count: keys gathered since the last log_oids() call; total: keys seen overall (used for logging only)
+ int count = 0;
+ uint64_t total = 0;
+ // NOTE(review): the enabled code in this file reaches the metadata manager through store->ctl()->meta.mgr rather than store->meta_mgr
+ do {
+ list<string> keys;
+ ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
+ ++total;
+ ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
+ int shard = orphan_shard(*iter);
+ instances[shard].push_back(*iter);
+ // pass the batch to log_oids() once COUNT_BEFORE_FLUSH keys have been gathered, then start a new batch
+ if (++count >= COUNT_BEFORE_FLUSH) {
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ count = 0;
+ instances.clear();
+ }
+ }
+ } while (truncated);
+ // pass along whatever was gathered after the last full batch
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ store->meta_mgr->list_keys_complete(handle);
+
+ return 0;
+}
+#endif
+
+
+/**
+ * List one bucket instance and issue an asynchronous stat for every
+ * listed entry that passes the filter; completed stats are consumed by
+ * pop_and_handle_stat_op(), which prints the resulting rados oids.
+ *
+ * @param bucket_instance_id  key passed to get_bucket_instance_info()
+ * @param prefix              listing prefix; empty lists everything
+ * @param entries_filter      when non-empty, only entries whose key is in
+ *                            this set are stat'ed
+ * @return 0 on success or when the bucket instance / listing reports
+ *         -ENOENT (treated as a removal race); otherwise the negative
+ *         error from get_bucket_instance_info(), list_objects() or
+ *         stat_async(). Failures while consuming individual stat results
+ *         are logged and do not abort the listing.
+ */
+int RGWRadosList::process_bucket(
+  const std::string& bucket_instance_id,
+  const std::string& prefix,
+  const std::set<rgw_obj_key>& entries_filter)
+{
+  ldout(store->ctx(), 10) << "RGWRadosList::" << __func__ <<
+    " bucket_instance_id=" << bucket_instance_id <<
+    ", prefix=" << prefix <<
+    ", entries_filter.size=" << entries_filter.size() << dendl;
+
+  RGWBucketInfo bucket_info;
+  RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx();
+  int ret = store->getRados()->get_bucket_instance_info(sys_obj_ctx,
+                                                        bucket_instance_id,
+                                                        bucket_info,
+                                                        nullptr,
+                                                        nullptr,
+                                                        null_yield);
+  if (ret < 0) {
+    if (ret == -ENOENT) {
+      // probably raced with bucket removal
+      return 0;
+    }
+    lderr(store->ctx()) << __func__ <<
+      ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" <<
+      ret << dendl;
+    return ret;
+  }
+
+  RGWRados::Bucket target(store->getRados(), bucket_info);
+  RGWRados::Bucket::List list_op(&target);
+
+  std::string marker;
+  list_op.params.marker = rgw_obj_key(marker);
+  list_op.params.list_versions = true;
+  list_op.params.enforce_ns = false;
+  list_op.params.allow_unordered = false;
+  list_op.params.prefix = prefix;
+
+  bool truncated;
+
+  std::deque<RGWRados::Object::Stat> stat_ops;
+  std::string prev_versioned_key_name = "";
+
+  RGWObjectCtx obj_ctx(store);
+
+  do {
+    std::vector<rgw_bucket_dir_entry> result;
+
+    constexpr int64_t LIST_OBJS_MAX_ENTRIES = 100;
+    ret = list_op.list_objects(LIST_OBJS_MAX_ENTRIES, &result,
+                               nullptr, &truncated, null_yield);
+    if (ret == -ENOENT) {
+      // race with bucket delete?
+      ret = 0;
+      break;
+    } else if (ret < 0) {
+      std::cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) <<
+        std::endl;
+      return ret;
+    }
+
+    for (std::vector<rgw_bucket_dir_entry>::iterator iter = result.begin();
+         iter != result.end();
+         ++iter) {
+      rgw_bucket_dir_entry& entry = *iter;
+
+      if (entry.key.instance.empty()) {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+      } else {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name <<
+          " [" << entry.key.instance << "]" << dendl;
+      }
+
+      ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" <<
+        entry.key.name << " entry.key.instance=" << entry.key.instance <<
+        dendl;
+
+      // ignore entries that are not in the filter if there is a filter
+      if (!entries_filter.empty() &&
+          entries_filter.find(entry.key) == entries_filter.cend()) {
+        continue;
+      }
+
+      // we need to do this in two cases below, so use a lambda; it
+      // queues an async stat for `key` and, once max_concurrent_ios
+      // stats are outstanding, consumes the oldest one
+      auto do_stat_key =
+        [&](const rgw_obj_key& key) -> int {
+          // named `r` so it does not shadow the enclosing function's `ret`
+          int r;
+
+          rgw_obj obj(bucket_info.bucket, key);
+
+          RGWRados::Object op_target(store->getRados(), bucket_info,
+                                     obj_ctx, obj);
+
+          stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+          RGWRados::Object::Stat& op = stat_ops.back();
+
+          r = op.stat_async();
+          if (r < 0) {
+            lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
+              cpp_strerror(-r) << dendl;
+            return r;
+          }
+
+          if (stat_ops.size() >= max_concurrent_ios) {
+            r = pop_and_handle_stat_op(obj_ctx, stat_ops);
+            if (r < 0) {
+              if (r != -ENOENT) {
+                lderr(store->ctx()) <<
+                  "ERROR: pop_and_handle_stat_op() returned error: " <<
+                  cpp_strerror(-r) << dendl;
+              }
+
+              // clear error, so we'll continue processing directory
+              r = 0;
+            }
+          }
+
+          return r;
+        }; // do_stat_key lambda
+
+      // for versioned objects, make sure the head object is handled
+      // as well by ignoring the instance identifier
+      if (!entry.key.instance.empty() &&
+          entry.key.name != prev_versioned_key_name) {
+        // don't do the same key twice; the listing above is ordered
+        // (allow_unordered is false), so remembering only the previous
+        // name is enough as long as all versions of one object are
+        // returned next to each other
+        prev_versioned_key_name = entry.key.name;
+
+        rgw_obj_key uninstanced(entry.key.name);
+
+        ret = do_stat_key(uninstanced);
+        if (ret < 0) {
+          return ret;
+        }
+      }
+
+      ret = do_stat_key(entry.key);
+      if (ret < 0) {
+        return ret;
+      }
+    } // for iter loop
+  } while (truncated);
+
+  // consume the stats that are still outstanding; failures are logged
+  // but do not change the return value
+  while (!stat_ops.empty()) {
+    ret = pop_and_handle_stat_op(obj_ctx, stat_ops);
+    if (ret < 0) {
+      if (ret != -ENOENT) {
+        lderr(store->ctx()) <<
+          "ERROR: pop_and_handle_stat_op() returned error: " <<
+          cpp_strerror(-ret) << dendl;
+      }
+    }
+  }
+
+  return 0;
+}
+
+
+/**
+ * Run the rados listing over every key in the "bucket" metadata section.
+ *
+ * Each listed key is passed to run(const std::string&). A -ENOENT from
+ * that call is skipped; any other negative result stops the iteration
+ * and is returned.
+ *
+ * @return 0 on success, otherwise the negative error from
+ *         list_keys_init(), list_keys_next() or run(bucket)
+ */
+int RGWRadosList::run()
+{
+  void* handle = nullptr;
+
+  int ret = store->ctl()->meta.mgr->list_keys_init("bucket", &handle);
+  if (ret < 0) {
+    lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+      " ERROR: list_keys_init returned " <<
+      cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+
+  const int max_keys = 1000;
+  bool truncated = true;
+
+  do {
+    std::list<std::string> buckets;
+    ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys,
+                                                 buckets, &truncated);
+    if (ret < 0) {
+      // without this check a failing listing would be retried forever,
+      // since `truncated` starts out true and the result was overwritten
+      lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+        " ERROR: list_keys_next returned " <<
+        cpp_strerror(-ret) << dendl;
+      break;
+    }
+
+    for (const std::string& bucket_id : buckets) {
+      ret = run(bucket_id);
+      if (ret == -ENOENT) {
+        // skip this bucket and keep going
+        ret = 0;
+        continue;
+      } else if (ret < 0) {
+        break;
+      }
+    }
+  } while (ret >= 0 && truncated);
+
+  // release the listing handle on every path once it has been initialised
+  store->ctl()->meta.mgr->list_keys_complete(handle);
+
+  return (ret < 0) ? ret : 0;
+} // RGWRadosList::run()
+
+
+/**
+ * Run the rados listing starting from one bucket.
+ *
+ * The start bucket is registered with add_bucket_entire(); the loop then
+ * keeps taking entries out of bucket_process_map until it is empty,
+ * calling process_bucket() for the whole bucket, for its filter keys
+ * and/or for each of its prefixes. Afterwards incomplete multipart
+ * uploads of the start bucket are handled by do_incomplete_multipart().
+ *
+ * @param start_bucket_name  name of the bucket to start from; looked up
+ *                           together with the member tenant_name
+ * @return 0 on success, or when the start bucket is gone by the time
+ *         multipart uploads are handled; otherwise the negative error
+ *         from get_bucket_info(), process_bucket() or
+ *         do_incomplete_multipart()
+ */
+int RGWRadosList::run(const std::string& start_bucket_name)
+{
+  // NOTE(review): neither context object is referenced below; kept as in
+  // the original pending confirmation that constructing them is not
+  // relied upon
+  RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx();
+  RGWObjectCtx obj_ctx(store);
+  int ret;
+
+  // __func__ evaluated inside the lambda below would name the lambda's
+  // call operator, so capture this function's name for its log message
+  const char* const func_name = __func__;
+
+  add_bucket_entire(start_bucket_name);
+
+  while (! bucket_process_map.empty()) {
+    // pop item from map and capture its key data
+    auto front = bucket_process_map.begin();
+    std::string bucket_name = front->first;
+    process_t process;
+    std::swap(process, front->second);
+    bucket_process_map.erase(front);
+
+    RGWBucketInfo bucket_info;
+    ret = store->getRados()->get_bucket_info(store->svc(),
+                                             tenant_name,
+                                             bucket_name,
+                                             bucket_info,
+                                             nullptr,
+                                             null_yield);
+    if (ret == -ENOENT) {
+      std::cerr << "WARNING: bucket " << bucket_name <<
+        " does not exist; could it have been deleted very recently?" <<
+        std::endl;
+      continue;
+    } else if (ret < 0) {
+      std::cerr << "ERROR: could not get info for bucket " << bucket_name <<
+        " -- " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    const std::string bucket_id = bucket_info.bucket.get_key();
+
+    static const std::set<rgw_obj_key> empty_filter;
+    static const std::string empty_prefix;
+
+    // wraps process_bucket() for this bucket: -ENOENT counts as success,
+    // any other failure is logged and passed back
+    auto do_process_bucket =
+      [&bucket_id, func_name, this]
+      (const std::string& prefix,
+       const std::set<rgw_obj_key>& entries_filter) -> int {
+        int ret = process_bucket(bucket_id, prefix, entries_filter);
+        if (ret == -ENOENT) {
+          // bucket deletion race?
+          return 0;
+        } else if (ret < 0) {
+          lderr(store->ctx()) << "RGWRadosList::" << func_name <<
+            ": ERROR: process_bucket(); bucket_id=" <<
+            bucket_id << " returned ret=" << ret << dendl;
+        }
+
+        return ret;
+      };
+
+    // either process the whole bucket *or* process the filters and/or
+    // the prefixes
+    if (process.entire_container) {
+      ret = do_process_bucket(empty_prefix, empty_filter);
+      if (ret < 0) {
+        return ret;
+      }
+    } else {
+      if (! process.filter_keys.empty()) {
+        ret = do_process_bucket(empty_prefix, process.filter_keys);
+        if (ret < 0) {
+          return ret;
+        }
+      }
+      for (const auto& p : process.prefixes) {
+        ret = do_process_bucket(p, empty_filter);
+        if (ret < 0) {
+          return ret;
+        }
+      }
+    }
+  } // while (! bucket_process_map.empty())
+
+  // now handle incomplete multipart uploads by going back to the
+  // initial bucket
+
+  RGWBucketInfo bucket_info;
+  ret = store->getRados()->get_bucket_info(store->svc(),
+                                           tenant_name,
+                                           start_bucket_name,
+                                           bucket_info,
+                                           nullptr,
+                                           null_yield);
+  if (ret == -ENOENT) {
+    // bucket deletion race?
+    return 0;
+  } else if (ret < 0) {
+    lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+      ": ERROR: get_bucket_info returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  ret = do_incomplete_multipart(store, bucket_info);
+  if (ret < 0) {
+    lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+      ": ERROR: do_incomplete_multipart returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+} // RGWRadosList::run(string)
+
+
+/**
+ * Print the rados oids that belong to the parts of incomplete multipart
+ * uploads in a bucket.
+ *
+ * The multipart namespace of the bucket is listed with a
+ * MultipartMetaFilter; for every upload meta entry found, its parts are
+ * listed page by page and each location enumerated by a part's manifest
+ * is written to stdout, one oid per line.
+ *
+ * @param store        store used for listing and for resolving raw
+ *                     object locations
+ * @param bucket_info  bucket whose incomplete uploads are examined
+ * @return 0 on success or when the bucket listing reports -ENOENT
+ *         (treated as a removal race); otherwise the negative error from
+ *         list_objects() or list_multipart_parts()
+ */
+int RGWRadosList::do_incomplete_multipart(
+  rgw::sal::RGWRadosStore* store,
+  RGWBucketInfo& bucket_info)
+{
+  constexpr int max_uploads = 1000;
+  constexpr int max_parts = 1000;
+  static const std::string mp_ns = RGW_OBJ_NS_MULTIPART;
+  static MultipartMetaFilter mp_filter;
+
+  int ret;
+
+  RGWRados::Bucket target(store->getRados(), bucket_info);
+  RGWRados::Bucket::List list_op(&target);
+  list_op.params.ns = mp_ns;
+  list_op.params.filter = &mp_filter;
+  // use empty string for initial list_op.params.marker
+  // use empty strings for list_op.params.{prefix,delim}
+
+  bool is_listing_truncated;
+
+  do {
+    std::vector<rgw_bucket_dir_entry> objs;
+    std::map<string, bool> common_prefixes;
+    ret = list_op.list_objects(max_uploads, &objs, &common_prefixes,
+                               &is_listing_truncated, null_yield);
+    if (ret == -ENOENT) {
+      // could bucket have been removed while this is running?
+      ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+        ": WARNING: call to list_objects of multipart namespace got ENOENT; "
+        "assuming bucket removal race" << dendl;
+      break;
+    } else if (ret < 0) {
+      lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+        ": ERROR: list_objects op returned ret=" << ret << dendl;
+      return ret;
+    }
+
+    if (!objs.empty()) {
+      std::vector<RGWMultipartUploadEntry> uploads;
+      RGWMultipartUploadEntry entry;
+      for (const rgw_bucket_dir_entry& obj : objs) {
+        const rgw_obj_key& key = obj.key;
+        if (!entry.mp.from_meta(key.name)) {
+          // we only want the meta objects, so skip all the components
+          continue;
+        }
+        entry.obj = obj;
+        uploads.push_back(entry);
+        ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+          " processing incomplete multipart entry " <<
+          entry << dendl;
+      }
+
+      // now process the uploads vector; the parts of each upload are
+      // paged through on their own, so marker and truncation state are
+      // kept per upload
+      for (const auto& upload : uploads) {
+        const RGWMPObj& mp = upload.mp;
+        int parts_marker = 0;
+        bool is_parts_truncated = false;
+
+        do {
+          // fresh map per page so that no part is printed twice
+          map<uint32_t, RGWUploadPartInfo> parts;
+          const int prev_marker = parts_marker;
+
+          // &parts_marker receives the marker for the following page;
+          // passing no next-marker pointer would re-list the first page
+          // for as long as the listing reports truncation
+          ret = list_multipart_parts(store, bucket_info, store->ctx(),
+                                     mp.get_upload_id(), mp.get_meta(),
+                                     max_parts,
+                                     parts_marker, parts,
+                                     &parts_marker, &is_parts_truncated);
+          if (ret == -ENOENT) {
+            // nothing to list for this upload; go on with the next one
+            break;
+          } else if (ret < 0) {
+            lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+              ": ERROR: list_multipart_parts returned ret=" << ret << dendl;
+            return ret;
+          }
+
+          for (auto& p : parts) {
+            RGWObjManifest& manifest = p.second.manifest;
+            for (auto obj_it = manifest.obj_begin();
+                 obj_it != manifest.obj_end();
+                 ++obj_it) {
+              const rgw_raw_obj& loc =
+                obj_it.get_location().get_raw_obj(store->getRados());
+              std::cout << loc.oid << std::endl;
+            }
+          }
+
+          // defensive: never loop on a page that made no progress
+          if (is_parts_truncated && parts_marker == prev_marker) {
+            break;
+          }
+        } while (is_parts_truncated);
+      } // for each upload
+    } // if objs not empty
+  } while (is_listing_truncated);
+
+  return 0;
+} // RGWRadosList::do_incomplete_multipart