uint64_t interval_msec() override {
return cct->_conf->rgw_md_notify_interval_msec;
}
+ void stop_process() override {
+ notify_mgr.stop();
+ }
public:
RGWMetaNotifier(RGWRados *_store, RGWMetadataLog* log)
: RGWRadosThread(_store, "meta-notifier"), notify_mgr(_store), log(log) {}
uint64_t interval_msec() override {
return cct->_conf->get_val<int64_t>("rgw_data_notify_interval_msec");
}
+ void stop_process() override {
+ notify_mgr.stop();
+ }
public:
RGWDataNotifier(RGWRados *_store) : RGWRadosThread(_store, "data-notifier"), notify_mgr(_store) {}
+// Map an arbitrary key to a shard id in [0, max_shards) using the
+// shared rgw hashing helper (rgw_shard_id replaces the older
+// rgw_shards_hash call here).
int RGWRados::key_to_shard_id(const string& key, int max_shards)
{
- return rgw_shards_hash(key, max_shards);
+ return rgw_shard_id(key, max_shards);
}
void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
{
string obj_key = key.name + key.instance;
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, num_shards);
- return sid;
+ return rgw_bucket_shard_index(obj_key, num_shards);
}
static string objexp_hint_get_keyext(const string& tenant_name,
return 0;
}
-/**
- * get listing of the objects in a bucket.
+
+/**
+ * Get ordered listing of the objects in a bucket.
*
* max: maximum number of results to return
* bucket: bucket to list contents of
* common_prefixes: if delim is filled in, any matching prefixes are placed here.
* is_truncated: if number of objects in the bucket is bigger than max, then truncated.
*/
-int RGWRados::Bucket::List::list_objects(int64_t max,
- vector<rgw_bucket_dir_entry> *result,
- map<string, bool> *common_prefixes,
- bool *is_truncated)
+int RGWRados::Bucket::List::list_objects_ordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated)
{
RGWRados *store = target->get_store();
CephContext *cct = store->ctx();
string bigger_than_delim;
if (!params.delim.empty()) {
- unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(), params.delim.size());
+ unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(),
+ params.delim.size());
char buf[params.delim.size() + 16];
int r = encode_utf8(val + 1, (unsigned char *)buf);
if (r < 0) {
cur_marker = s;
}
}
-
+
string skip_after_delim;
while (truncated && count <= max) {
if (skip_after_delim > cur_marker.name) {
ldout(cct, 20) << "setting cur_marker=" << cur_marker.name << "[" << cur_marker.instance << "]" << dendl;
}
std::map<string, rgw_bucket_dir_entry> ent_map;
- int r = store->cls_bucket_list(target->get_bucket_info(), shard_id, cur_marker, cur_prefix,
- read_ahead + 1 - count, params.list_versions, ent_map,
- &truncated, &cur_marker);
+ int r = store->cls_bucket_list_ordered(target->get_bucket_info(),
+ shard_id,
+ cur_marker,
+ cur_prefix,
+ read_ahead + 1 - count,
+ params.list_versions,
+ ent_map,
+ &truncated,
+ &cur_marker);
if (r < 0)
return r;
- std::map<string, rgw_bucket_dir_entry>::iterator eiter;
- for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+ for (auto eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
rgw_bucket_dir_entry& entry = eiter->second;
rgw_obj_index_key index_key = entry.key;
rgw_obj_key obj(index_key);
- /* note that parse_raw_oid() here will not set the correct object's instance, as
- * rgw_obj_index_key encodes that separately. We don't need to set the instance because it's
- * not needed for the checks here and we end up using the raw entry for the return vector
+ /* note that parse_raw_oid() here will not set the correct
+ * object's instance, as rgw_obj_index_key encodes that
+ * separately. We don't need to set the instance because it's
+ * not needed for the checks here and we end up using the raw
+ * entry for the return vector
*/
bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
if (!valid) {
if (params.filter && !params.filter->filter(obj.name, index_key.name))
continue;
- if (params.prefix.size() && (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
+ if (params.prefix.size() &&
+ (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
continue;
if (!params.delim.empty()) {
result->emplace_back(std::move(entry));
count++;
}
-
- // Either the back-end telling us truncated, or we don't consume all
- // items returned per the amount caller request
- truncated = (truncated || eiter != ent_map.end());
}
done:
*is_truncated = truncated;
return 0;
-}
+} // list_objects_ordered
+
+
+/**
+ * Get listing of the objects in a bucket and allow the results to be out
+ * of order.
+ *
+ * Even though there are key differences with the ordered counterpart,
+ * the parameters are the same to maintain some compatibility.
+ *
+ * max: maximum number of results to return
+ * bucket: bucket to list contents of
+ * prefix: only return results that match this prefix
+ * delim: should not be set; if it is we should have indicated an error
+ * marker: if filled in, begin the listing with this object.
+ * end_marker: if filled in, end the listing with this object.
+ * result: the objects are put in here.
+ * common_prefixes: this is never filled with an unordered list; the param
+ * is maintained for compatibility
+ * is_truncated: if number of objects in the bucket is bigger than max, then
+ * truncated.
+ */
+// See the header comment above: returns up to 'max' bucket entries in
+// 'result' with NO global ordering guarantee; common_prefixes is never
+// populated here and delimiter-based rollup is not performed.
+int RGWRados::Bucket::List::list_objects_unordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated)
+{
+ RGWRados *store = target->get_store();
+ CephContext *cct = store->ctx();
+ int shard_id = target->get_shard_id();
+
+ int count = 0;
+ bool truncated = true;
+
+ // read a few extra in each call to cls_bucket_list_unordered in
+ // case some are filtered out due to namespace matching, versioning,
+ // filtering, etc.
+ const int64_t max_read_ahead = 100;
+ const uint32_t read_ahead = uint32_t(max + std::min(max, max_read_ahead));
+
+ result->clear();
+
+ // convert caller markers/prefix into index-key form for the cls call
+ rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
+ rgw_obj_index_key cur_marker;
+ marker_obj.get_index_key(&cur_marker);
+
+ rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+ params.ns);
+ rgw_obj_index_key cur_end_marker;
+ end_marker_obj.get_index_key(&cur_end_marker);
+ const bool cur_end_marker_valid = !params.end_marker.empty();
+
+ rgw_obj_key prefix_obj(params.prefix);
+ prefix_obj.ns = params.ns;
+ string cur_prefix = prefix_obj.get_index_key_name();
+
+ // keep pulling batches until we have 'max' entries or the index is
+ // exhausted (truncated goes false)
+ while (truncated && count <= max) {
+ std::vector<rgw_bucket_dir_entry> ent_list;
+ int r = store->cls_bucket_list_unordered(target->get_bucket_info(),
+ shard_id,
+ cur_marker,
+ cur_prefix,
+ read_ahead,
+ params.list_versions,
+ ent_list,
+ &truncated,
+ &cur_marker);
+ if (r < 0)
+ return r;
+
+ // NB: while regions of ent_list will be sorted, we have no
+ // guarantee that all items will be sorted since they can cross
+ // shard boundaries
+
+ for (auto& entry : ent_list) {
+ rgw_obj_index_key index_key = entry.key;
+ rgw_obj_key obj(index_key);
+
+ /* note that parse_raw_oid() here will not set the correct
+ * object's instance, as rgw_obj_index_key encodes that
+ * separately. We don't need to set the instance because it's
+ * not needed for the checks here and we end up using the raw
+ * entry for the return vector
+ */
+ bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
+ if (!valid) {
+ ldout(cct, 0) << "ERROR: could not parse object name: " <<
+ obj.name << dendl;
+ continue;
+ }
+
+ if (!params.list_versions && !entry.is_visible()) {
+ continue;
+ }
+
+ if (params.enforce_ns && obj.ns != params.ns) {
+ continue;
+ }
+
+ if (cur_end_marker_valid && cur_end_marker <= index_key) {
+ // we're not guaranteed items will come in order, so we have
+ // to loop through all
+ continue;
+ }
+
+ // advance the resume markers for every entry we examine — even
+ // ones filtered out by the checks below — so the next call
+ // resumes after them rather than re-reading them
+ if (count < max) {
+ params.marker = index_key;
+ next_marker = index_key;
+ }
+
+ if (params.filter && !params.filter->filter(obj.name, index_key.name))
+ continue;
+
+ if (params.prefix.size() &&
+ (0 != obj.name.compare(0, params.prefix.size(), params.prefix)))
+ continue;
+
+ if (count >= max) {
+ truncated = true;
+ goto done;
+ }
+
+ result->emplace_back(std::move(entry));
+ count++;
+ } // for (auto& entry : ent_list)
+ } // while (truncated && count <= max)
+
+done:
+ if (is_truncated)
+ *is_truncated = truncated;
+
+ return 0;
+} // list_objects_unordered
+
/**
* create a rados pool, associated meta info
+// Check whether a bucket contains any objects (in any namespace).
+// Returns 0 when empty, -ENOTEMPTY as soon as one index entry maps to
+// an object key, or a negative error code from the index listing.
int RGWRados::check_bucket_empty(RGWBucketInfo& bucket_info)
{
- std::map<string, rgw_bucket_dir_entry> ent_map;
+ std::vector<rgw_bucket_dir_entry> ent_list;
 rgw_obj_index_key marker;
 string prefix;
 bool is_truncated;
 do {
-#define NUM_ENTRIES 1000
- int r = cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, NUM_ENTRIES, true, ent_map,
- &is_truncated, &marker);
+ constexpr uint NUM_ENTRIES = 1000u;
+ // unordered listing suffices (and is cheaper) here: any single
+ // surviving entry is enough to prove the bucket is non-empty
+ int r = cls_bucket_list_unordered(bucket_info,
+ RGW_NO_SHARD,
+ marker,
+ prefix,
+ NUM_ENTRIES,
+ true,
+ ent_list,
+ &is_truncated,
+ &marker);
 if (r < 0)
 return r;
 string ns;
- std::map<string, rgw_bucket_dir_entry>::iterator eiter;
- for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+ for (auto const& dirent : ent_list) {
 rgw_obj_key obj;
- if (rgw_obj_key::oid_to_key_in_ns(eiter->second.key.name, &obj, ns))
+ if (rgw_obj_key::oid_to_key_in_ns(dirent.key.name, &obj, ns))
 return -ENOTEMPTY;
 }
 } while (is_truncated);
+
 return 0;
}
return gc->send_chain(chain, tag, sync);
}
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx, string& bucket_oid)
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ string& bucket_oid)
{
const rgw_bucket& bucket = bucket_info.bucket;
int r = open_bucket_index_ctx(bucket_info, index_ctx);
return 0;
}
-int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
- string& bucket_oid_base) {
+int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ string& bucket_oid_base) {
const rgw_bucket& bucket = bucket_info.bucket;
int r = open_bucket_index_ctx(bucket_info, index_ctx);
if (r < 0)
}
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
- map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ map<int, string>& bucket_objs,
+ int shard_id,
+ map<int, string> *bucket_instance_ids) {
string bucket_oid_base;
int ret = open_bucket_index_base(bucket_info, index_ctx, bucket_oid_base);
if (ret < 0) {
rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
- } else {
- zones_trace.insert(get_zone().id);
}
+ zones_trace.insert(get_zone().id);
BucketShard bs(this);
return lc->process();
}
-int RGWRados::process_expire_objects()
+// Run one pass of object-expiration hint processing across all shards.
+// Return type changes from int (always 0) to bool, propagating the
+// result of RGWObjectExpirer::inspect_all_shards to the caller.
+bool RGWRados::process_expire_objects()
{
- obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
- return 0;
+ return obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
}
int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWriteOperation& op, string& oid)
if (_zones_trace) {
zones_trace = *_zones_trace;
}
- else {
- zones_trace.insert(get_zone().id);
- }
-
+ zones_trace.insert(get_zone().id);
+
ObjectWriteOperation o;
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
dir_meta = ent.meta;
dir_meta.category = category;
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ zones_trace.insert(get_zone().id);
+
rgw_bucket_entry_ver ver;
ver.pool = pool;
ver.epoch = epoch;
cls_rgw_obj_key key(ent.key.name, ent.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, remove_objs,
- get_zone().log_data, bilog_flags, _zones_trace);
+ get_zone().log_data, bilog_flags, &zones_trace);
complete_op_data *arg;
index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
- get_zone().log_data, bilog_flags, _zones_trace, &arg);
+ get_zone().log_data, bilog_flags, &zones_trace, &arg);
librados::AioCompletion *completion = arg->rados_completion;
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
completion->release(); /* can't reference arg here, as it might have already been released */
return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
-int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
- uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
- bool *is_truncated, rgw_obj_index_key *last_entry,
- bool (*force_check_filter)(const string& name))
+
+int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
+ int shard_id,
+ rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries,
+ bool list_versions,
+ map<string, rgw_bucket_dir_entry>& m,
+ bool *is_truncated,
+ rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket_info.bucket << " start " << start.name << "[" << start.instance << "] num_entries " << num_entries << dendl;
+ ldout(cct, 10) << "cls_bucket_list_ordered " << bucket_info.bucket <<
+ " start " << start.name << "[" << start.instance << "] num_entries " <<
+ num_entries << dendl;
librados::IoCtx index_ctx;
// key - oid (for different shards if there is any)
- // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+ // value - list result for the corresponding oid (shard), it is filled by
+ // the AIO callback
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
return r;
cls_rgw_obj_key start_key(start.name, start.instance);
- r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries, list_versions,
- oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries,
+ list_versions, oids, list_results,
+ cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[vnames[pos]]);
+ r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
+ updates[vnames[pos]]);
if (r < 0 && r != -ENOENT) {
return r;
}
}
if (r >= 0) {
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+ ldout(cct, 10) << "RGWRados::cls_bucket_list_ordered: got " <<
+ dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
m[name] = std::move(dirent);
++count;
}
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
index_ctx.aio_operate(miter->first, c, &o);
- c->release();
+ c->release();
}
}
// Check if all the returned entries are consumed or not
for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i])
+ if (vcurrents[i] != vends[i]) {
*is_truncated = true;
+ break;
+ }
}
if (!m.empty())
*last_entry = m.rbegin()->first;
return 0;
}
-int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
+
+// List bucket-index entries without a global ordering guarantee.
+// Scans index shards one at a time, starting from the shard the start
+// marker hashes to (or 'shard_id' when >= 0), appending up to
+// num_entries entries to ent_list.  Entries with pending/uncommitted
+// ops are re-checked against the head object via check_disk_state(),
+// and any suggested index repairs are flushed best-effort at the end.
+int RGWRados::cls_bucket_list_unordered(RGWBucketInfo& bucket_info,
+ int shard_id,
+ rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries,
+ bool list_versions,
+ std::vector<rgw_bucket_dir_entry>& ent_list,
+ bool *is_truncated,
+ rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name)) {
+ ldout(cct, 10) << "cls_bucket_list_unordered " << bucket_info.bucket <<
+ " start " << start.name << "[" << start.instance <<
+ "] num_entries " << num_entries << dendl;
+
+ *is_truncated = false;
+ librados::IoCtx index_ctx;
+
+ rgw_obj_index_key my_start = start;
+
+ map<int, string> oids;
+ int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
+ if (r < 0)
+ return r;
+ const uint32_t num_shards = oids.size();
+
+ // pick the first shard to scan; a non-empty marker tells us which
+ // shard the previous call stopped in
+ uint32_t current_shard;
+ if (shard_id >= 0) {
+ current_shard = shard_id;
+ } else if (my_start.empty()) {
+ current_shard = 0u;
+ } else {
+ current_shard =
+ rgw_bucket_shard_index(my_start.name, num_shards);
+ }
+
+ uint32_t count = 0u;
+ map<string, bufferlist> updates;
+ std::string last_added_entry;
+ while (count <= num_entries &&
+ ((shard_id >= 0 && current_shard == uint32_t(shard_id)) ||
+ current_shard < num_shards)) {
+ // key - oid (for different shards if there is any)
+ // value - list result for the corresponding oid (shard), it is filled by
+ // the AIO callback
+ map<int, struct rgw_cls_list_ret> list_results;
+ // NOTE(review): the list op is issued against every shard oid in
+ // 'oids', yet only list_results[current_shard] is consumed below —
+ // confirm whether restricting the call to the current shard would
+ // reduce index load
+ r = CLSRGWIssueBucketList(index_ctx, my_start, prefix, num_entries,
+ list_versions, oids, list_results,
+ cct->_conf->rgw_bucket_index_max_aio)();
+ if (r < 0)
+ return r;
+
+ const std::string& oid = oids[current_shard];
+ assert(list_results.find(current_shard) != list_results.end());
+ auto& result = list_results[current_shard];
+ for (auto& entry : result.dir.m) {
+ rgw_bucket_dir_entry& dirent = entry.second;
+
+ bool force_check = force_check_filter &&
+ force_check_filter(dirent.key.name);
+ if ((!dirent.exists && !dirent.is_delete_marker()) ||
+ !dirent.pending_map.empty() ||
+ force_check) {
+ /* there are uncommitted ops. We need to check the current state,
+ * and if the tags are old we need to do cleanup as well. */
+ librados::IoCtx sub_ctx;
+ sub_ctx.dup(index_ctx);
+ r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[oid]);
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ }
+ }
+
+ // at this point either r >=0 or r == -ENOENT
+ // NOTE(review): if check_disk_state() returned -ENOENT for a
+ // previous entry and this entry skipped the disk check, 'r'
+ // still holds the stale -ENOENT — confirm valid entries cannot
+ // be dropped here
+ if (r >= 0) { // i.e., if r != -ENOENT
+ ldout(cct, 10) << "RGWRados::cls_bucket_list_unordered: got " <<
+ dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+
+ if (count < num_entries) {
+ last_added_entry = entry.first;
+ my_start = dirent.key;
+ ent_list.emplace_back(std::move(dirent));
+ ++count;
+ } else {
+ *is_truncated = true;
+ goto check_updates;
+ }
+ } else { // r == -ENOENT
+ // in the case of -ENOENT, make sure we're advancing marker
+ // for possible next call to CLSRGWIssueBucketList
+ my_start = dirent.key;
+ }
+ } // entry for loop
+
+ if (!result.is_truncated) {
+ // if we reached the end of the shard read next shard
+ ++current_shard;
+ my_start = rgw_obj_index_key();
+ }
+ } // shard loop
+
+check_updates:
+ // suggest updates if there is any
+ map<string, bufferlist>::iterator miter = updates.begin();
+ for (; miter != updates.end(); ++miter) {
+ if (miter->second.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, miter->second);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ index_ctx.aio_operate(miter->first, c, &o);
+ c->release();
+ }
+ }
+
+ if (last_entry && !ent_list.empty()) {
+ *last_entry = last_added_entry;
+ }
+
+ return 0;
+}
+
+
+int RGWRados::cls_obj_usage_log_add(const string& oid,
+ rgw_usage_log_info& info)
{
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
}
void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
- uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
-{
+ uint32_t num_shards,
+ map<int, string>& bucket_objects,
+ int shard_id) {
if (!num_shards) {
bucket_objects[0] = bucket_oid_base;
} else {
*shard_id = -1;
}
} else {
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, bucket_info.num_shards);
+ uint32_t sid = rgw_bucket_shard_index(obj_key, bucket_info.num_shards);
if (shard_id) {
*shard_id = (int)sid;
}
*shard_id = -1;
}
} else {
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, num_shards);
+ uint32_t sid = rgw_bucket_shard_index(obj_key, num_shards);
char buf[bucket_oid_base.size() + 32];
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
(*bucket_obj) = buf;