-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
int r = rados->ioctx_create(pool.name.c_str(), ioctx);
if (r == -ENOENT && create) {
r = rados->pool_create(pool.name.c_str());
+ if (r == -ERANGE) {
+ dout(0)
+ << __func__
+ << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+ << " (this can be due to a pool or placement group misconfiguration, e.g."
+ << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+ << dendl;
+ }
if (r < 0 && r != -EEXIST) {
return r;
}
/* try using default realm */
RGWRealm realm;
int ret = realm.init(cct, store);
+  // no default realm exists
if (ret < 0) {
- ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
- return -ENOENT;
+ return read_id(default_zonegroup_name, default_id);
}
realm_id = realm.get_id();
}
/* try using default realm */
RGWRealm realm;
int ret = realm.init(cct, store);
+  // no default realm exists
if (ret < 0) {
- ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
- return -ENOENT;
+ return read_id(default_zone_name, default_id);
}
realm_id = realm.get_id();
}
obj_op.meta.delete_at = delete_at;
obj_op.meta.user_data = user_data;
obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
r = obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0) {
return 0;
}
+// Admin socket ("asok") command table: each row is
+// { command name, command descriptor (argument spec), help text }.
+// Rows are registered with the admin socket in init_rados() and
+// unregistered again in finalize().
+const char* RGWRados::admin_commands[4][3] = {
+  { "cache list",
+    "cache list name=filter,type=CephString,req=false",
+    "cache list [filter_str]: list object cache, possibly matching substrings" },
+  { "cache inspect",
+    "cache inspect name=target,type=CephString,req=true",
+    "cache inspect target: print cache element" },
+  { "cache erase",
+    "cache erase name=target,type=CephString,req=true",
+    "cache erase target: erase element from cache" },
+  { "cache zap",
+    "cache zap",
+    "cache zap: erase all elements from cache" }
+};
+
+
int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
if (r < 0)
uint64_t interval_msec() override {
return cct->_conf->rgw_md_notify_interval_msec;
}
+ void stop_process() override {
+ notify_mgr.stop();
+ }
public:
RGWMetaNotifier(RGWRados *_store, RGWMetadataLog* log)
: RGWRadosThread(_store, "meta-notifier"), notify_mgr(_store), log(log) {}
uint64_t interval_msec() override {
return cct->_conf->get_val<int64_t>("rgw_data_notify_interval_msec");
}
+ void stop_process() override {
+ notify_mgr.stop();
+ }
public:
RGWDataNotifier(RGWRados *_store) : RGWRadosThread(_store, "data-notifier"), notify_mgr(_store) {}
public:
RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone)
- : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+ : RGWSyncProcessorThread(_store, "data-sync"),
+ sync(_store, async_rados, _source_zone),
initialized(false) {}
void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
{
RGWCoroutinesManager crs;
RGWRados *store;
+ rgw::BucketTrimManager *bucket_trim;
RGWHTTPManager http;
const utime_t trim_interval;
uint64_t interval_msec() override { return 0; }
void stop_process() override { crs.stop(); }
public:
- RGWSyncLogTrimThread(RGWRados *store, int interval)
+ RGWSyncLogTrimThread(RGWRados *store, rgw::BucketTrimManager *bucket_trim,
+ int interval)
: RGWSyncProcessorThread(store, "sync-log-trim"),
crs(store->ctx(), store->get_cr_registry()), store(store),
+ bucket_trim(bucket_trim),
http(store->ctx(), crs.get_completion_mgr()),
trim_interval(interval, 0)
{}
trim_interval));
stacks.push_back(data);
+ auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+ bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+ stacks.push_back(bucket);
+
crs.run(stacks);
return 0;
}
for (int i = 0; i < num_shards; ++i) {
Mutex::Locker l(*locks[i]);
for (auto c : completions[i]) {
- Mutex::Locker cl(c->lock);
c->stop();
}
}
void RGWRados::finalize()
{
+ auto admin_socket = cct->get_admin_socket();
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->unregister_command(cmd[0]);
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to unregister admin socket command (r=" << r
+ << ")" << dendl;
+ }
+ }
+
if (run_sync_thread) {
Mutex::Locker l(meta_sync_thread_lock);
meta_sync_processor_thread->stop();
data_sync_processor_threads.clear();
delete sync_log_trimmer;
sync_log_trimmer = nullptr;
+ bucket_trim = boost::none;
}
if (finisher) {
finisher->stop();
int RGWRados::init_rados()
{
int ret = 0;
+ auto admin_socket = cct->get_admin_socket();
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->register_command(cmd[0], cmd[1], this,
+ cmd[2]);
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to register admin socket command (r=" << r
+ << ")" << dendl;
+ return r;
+ }
+ }
+
auto handles = std::vector<librados::Rados>{cct->_conf->rgw_num_rados_handles};
for (auto& r : handles) {
obj_expirer->start_processor();
}
- if (run_sync_thread) {
- // initialize the log period history. we want to do this any time we're not
- // running under radosgw-admin, so we check run_sync_thread here before
- // disabling it based on the zone/zonegroup setup
- meta_mgr->init_oldest_log_period();
- }
-
/* no point of running sync thread if we don't have a master zone configured
or there is no rest_master_conn */
if (get_zonegroup().master_zone.empty() || !rest_master_conn
run_sync_thread = false;
}
+ if (run_sync_thread) {
+ // initialize the log period history
+ meta_mgr->init_oldest_log_period();
+ }
+
async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
async_rados->start();
}
meta_sync_processor_thread->start();
+ // configure the bucket trim manager
+ rgw::BucketTrimConfig config;
+ rgw::configure_bucket_trim(cct, config);
+
+ bucket_trim.emplace(this, config);
+ ret = bucket_trim->init();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
+ return ret;
+ }
+ data_log->set_observer(&*bucket_trim);
+
Mutex::Locker dl(data_sync_thread_lock);
for (auto iter : zone_data_sync_from_map) {
ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
- RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+ auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
}
auto interval = cct->_conf->rgw_sync_log_trim_interval;
if (interval > 0) {
- sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+ sync_log_trimmer = new RGWSyncLogTrimThread(this, &*bucket_trim, interval);
ret = sync_log_trimmer->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
notify_oid.append(buf);
}
-int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
+int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
{
-  librados::Rados *rad = get_rados_handle();
-  int r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r != -ENOENT)
-    return r;
-
-  if (!pools_initialized)
-    return r;
-
-  r = rad->pool_create(pool.name.c_str());
-  if (r < 0 && r != -EEXIST)
-    return r;
-
-  r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r < 0)
-    return r;
-
-  r = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (r < 0 && r != -EOPNOTSUPP)
-    return r;
-  return 0;
+  // NOTE(review): rgw_init_ioctx(..., create=true) is assumed to subsume the
+  // removed pool_create()/application_enable() sequence above -- confirm the
+  // helper's implementation handles both before merging.
+  constexpr bool create = true; // create the pool if it doesn't exist
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
}
void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
int RGWRados::open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx)
{
+ const rgw_pool& explicit_pool = bucket_info.bucket.explicit_placement.index_pool;
+
+ if (!explicit_pool.empty()) {
+ return open_pool_ctx(explicit_pool, index_ctx);
+ }
+
const string *rule = &bucket_info.placement_rule;
if (rule->empty()) {
rule = &zonegroup.default_placement;
usage_log_hash(cct, user_str, first_hash, index);
hash = first_hash;
-
do {
int ret = cls_obj_usage_log_trim(hash, user_str, start_epoch, end_epoch);
- if (ret == -ENOENT)
- goto next;
- if (ret < 0)
+ if (ret < 0 && ret != -ENOENT)
return ret;
-next:
usage_log_hash(cct, user_str, hash, ++index);
} while (hash != first_hash);
int RGWRados::key_to_shard_id(const string& key, int max_shards)
{
-  return rgw_shards_hash(key, max_shards);
+  // NOTE(review): rgw_shard_id() replaces rgw_shards_hash(); presumably the
+  // shared key->shard mapping helper -- confirm both produce the same layout
+  // for existing data.
+  return rgw_shard_id(key, max_shards);
}
void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
{
string obj_key = key.name + key.instance;
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, num_shards);
- return sid;
+ return rgw_bucket_shard_index(obj_key, num_shards);
}
static string objexp_hint_get_keyext(const string& tenant_name,
return 0;
}
-/**
- * get listing of the objects in a bucket.
+
+/**
+ * Get ordered listing of the objects in a bucket.
*
* max: maximum number of results to return
* bucket: bucket to list contents of
* common_prefixes: if delim is filled in, any matching prefixes are placed here.
* is_truncated: if number of objects in the bucket is bigger than max, then truncated.
*/
-int RGWRados::Bucket::List::list_objects(int64_t max,
- vector<rgw_bucket_dir_entry> *result,
- map<string, bool> *common_prefixes,
- bool *is_truncated)
+int RGWRados::Bucket::List::list_objects_ordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated)
{
RGWRados *store = target->get_store();
CephContext *cct = store->ctx();
result->clear();
rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
-
- rgw_obj_key end_marker_obj;
- rgw_obj_index_key cur_end_marker;
- if (!params.ns.empty()) {
- end_marker_obj = rgw_obj_key(params.end_marker.name, params.end_marker.instance, params.ns);
- end_marker_obj.ns = params.ns;
- end_marker_obj.get_index_key(&cur_end_marker);
- }
rgw_obj_index_key cur_marker;
marker_obj.get_index_key(&cur_marker);
+ rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+ params.ns);
+ rgw_obj_index_key cur_end_marker;
+ end_marker_obj.get_index_key(&cur_end_marker);
const bool cur_end_marker_valid = !params.end_marker.empty();
rgw_obj_key prefix_obj(params.prefix);
string bigger_than_delim;
if (!params.delim.empty()) {
- unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(), params.delim.size());
+ unsigned long val = decode_utf8((unsigned char *)params.delim.c_str(),
+ params.delim.size());
char buf[params.delim.size() + 16];
int r = encode_utf8(val + 1, (unsigned char *)buf);
if (r < 0) {
cur_marker = s;
}
}
-
+
string skip_after_delim;
while (truncated && count <= max) {
if (skip_after_delim > cur_marker.name) {
ldout(cct, 20) << "setting cur_marker=" << cur_marker.name << "[" << cur_marker.instance << "]" << dendl;
}
std::map<string, rgw_bucket_dir_entry> ent_map;
- int r = store->cls_bucket_list(target->get_bucket_info(), shard_id, cur_marker, cur_prefix,
- read_ahead + 1 - count, params.list_versions, ent_map,
- &truncated, &cur_marker);
+ int r = store->cls_bucket_list_ordered(target->get_bucket_info(),
+ shard_id,
+ cur_marker,
+ cur_prefix,
+ read_ahead + 1 - count,
+ params.list_versions,
+ ent_map,
+ &truncated,
+ &cur_marker);
if (r < 0)
return r;
- std::map<string, rgw_bucket_dir_entry>::iterator eiter;
- for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+ for (auto eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
rgw_bucket_dir_entry& entry = eiter->second;
rgw_obj_index_key index_key = entry.key;
rgw_obj_key obj(index_key);
- /* note that parse_raw_oid() here will not set the correct object's instance, as
- * rgw_obj_index_key encodes that separately. We don't need to set the instance because it's
- * not needed for the checks here and we end up using the raw entry for the return vector
+ /* note that parse_raw_oid() here will not set the correct
+ * object's instance, as rgw_obj_index_key encodes that
+ * separately. We don't need to set the instance because it's
+ * not needed for the checks here and we end up using the raw
+ * entry for the return vector
*/
bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
if (!valid) {
if (params.filter && !params.filter->filter(obj.name, index_key.name))
continue;
- if (params.prefix.size() && (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
+ if (params.prefix.size() &&
+ (obj.name.compare(0, params.prefix.size(), params.prefix) != 0))
continue;
if (!params.delim.empty()) {
result->emplace_back(std::move(entry));
count++;
}
-
- // Either the back-end telling us truncated, or we don't consume all
- // items returned per the amount caller request
- truncated = (truncated || eiter != ent_map.end());
}
done:
*is_truncated = truncated;
return 0;
-}
+} // list_objects_ordered
+
/**
- * create a rados pool, associated meta info
- * returns 0 on success, -ERR# otherwise.
+ * Get listing of the objects in a bucket and allow the results to be out
+ * of order.
+ *
+ * Even though there are key differences with the ordered counterpart,
+ * the parameters are the same to maintain some compatibility.
+ *
+ * max: maximum number of results to return
+ * bucket: bucket to list contents of
+ * prefix: only return results that match this prefix
+ * delim: should not be set; if it is we should have indicated an error
+ * marker: if filled in, begin the listing with this object.
+ * end_marker: if filled in, end the listing with this object.
+ * result: the objects are put in here.
+ * common_prefixes: this is never filled with an unordered list; the param
+ * is maintained for compatibility
+ * is_truncated: if number of objects in the bucket is bigger than max, then
+ * truncated.
*/
-int RGWRados::create_pool(const rgw_pool& pool)
+int RGWRados::Bucket::List::list_objects_unordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated)
{
- int ret = 0;
+ RGWRados *store = target->get_store();
+ CephContext *cct = store->ctx();
+ int shard_id = target->get_shard_id();
- librados::Rados *rad = get_rados_handle();
- ret = rad->pool_create(pool.name.c_str(), 0);
- if (ret == -EEXIST)
- ret = 0;
- else if (ret == -ERANGE) {
- ldout(cct, 0)
- << __func__
- << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
- << " (this can be due to a pool or placement group misconfiguration, e.g., pg_num < pgp_num)"
- << dendl;
- }
- if (ret < 0)
- return ret;
+ int count = 0;
+ bool truncated = true;
- librados::IoCtx io_ctx;
- ret = rad->ioctx_create(pool.name.c_str(), io_ctx);
- if (ret < 0)
- return ret;
+ // read a few extra in each call to cls_bucket_list_unordered in
+ // case some are filtered out due to namespace matching, versioning,
+ // filtering, etc.
+ const int64_t max_read_ahead = 100;
+ const uint32_t read_ahead = uint32_t(max + std::min(max, max_read_ahead));
+
+ result->clear();
+
+ rgw_obj_key marker_obj(params.marker.name, params.marker.instance, params.ns);
+ rgw_obj_index_key cur_marker;
+ marker_obj.get_index_key(&cur_marker);
+
+ rgw_obj_key end_marker_obj(params.end_marker.name, params.end_marker.instance,
+ params.ns);
+ rgw_obj_index_key cur_end_marker;
+ end_marker_obj.get_index_key(&cur_end_marker);
+ const bool cur_end_marker_valid = !params.end_marker.empty();
+
+ rgw_obj_key prefix_obj(params.prefix);
+ prefix_obj.ns = params.ns;
+ string cur_prefix = prefix_obj.get_index_key_name();
+
+ while (truncated && count <= max) {
+ std::vector<rgw_bucket_dir_entry> ent_list;
+ int r = store->cls_bucket_list_unordered(target->get_bucket_info(),
+ shard_id,
+ cur_marker,
+ cur_prefix,
+ read_ahead,
+ params.list_versions,
+ ent_list,
+ &truncated,
+ &cur_marker);
+ if (r < 0)
+ return r;
+
+ // NB: while regions of ent_list will be sorted, we have no
+ // guarantee that all items will be sorted since they can cross
+ // shard boundaries
+
+ for (auto& entry : ent_list) {
+ rgw_obj_index_key index_key = entry.key;
+ rgw_obj_key obj(index_key);
+
+ /* note that parse_raw_oid() here will not set the correct
+ * object's instance, as rgw_obj_index_key encodes that
+ * separately. We don't need to set the instance because it's
+ * not needed for the checks here and we end up using the raw
+ * entry for the return vector
+ */
+ bool valid = rgw_obj_key::parse_raw_oid(index_key.name, &obj);
+ if (!valid) {
+ ldout(cct, 0) << "ERROR: could not parse object name: " <<
+ obj.name << dendl;
+ continue;
+ }
+
+ if (!params.list_versions && !entry.is_visible()) {
+ continue;
+ }
+
+ if (params.enforce_ns && obj.ns != params.ns) {
+ continue;
+ }
+
+ if (cur_end_marker_valid && cur_end_marker <= index_key) {
+ // we're not guaranteed items will come in order, so we have
+ // to loop through all
+ continue;
+ }
+
+ if (count < max) {
+ params.marker = index_key;
+ next_marker = index_key;
+ }
+
+ if (params.filter && !params.filter->filter(obj.name, index_key.name))
+ continue;
+
+ if (params.prefix.size() &&
+ (0 != obj.name.compare(0, params.prefix.size(), params.prefix)))
+ continue;
+
+ if (count >= max) {
+ truncated = true;
+ goto done;
+ }
+
+ result->emplace_back(std::move(entry));
+ count++;
+ } // for (auto& entry : ent_list)
+ } // while (truncated && count <= max)
+
+done:
+ if (is_truncated)
+ *is_truncated = truncated;
- ret = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
- if (ret < 0 && ret != -EOPNOTSUPP)
- return ret;
return 0;
+} // list_objects_unordered
+
+
+/**
+ * create a rados pool, associated meta info
+ * returns 0 on success, -ERR# otherwise.
+ */
+int RGWRados::create_pool(const rgw_pool& pool)
+{
+ librados::IoCtx io_ctx;
+ constexpr bool create = true;
+ return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
}
int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
return 0;
}
+// Bind this BucketShard to the bucket/shard described by bucket_info and
+// sid, then open the backing bucket index shard object.
+// Returns 0 on success, a negative error code otherwise.
+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+{
+  bucket = bucket_info.bucket;
+  shard_id = sid;
+
+  const int r = store->open_bucket_index_shard(bucket_info, index_ctx,
+                                               shard_id, &bucket_obj);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << r << dendl;
+    return r;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
/* Execute @handler on last item in bucket listing for bucket specified
* in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
* Returns: 0 on success, -ERR# otherwise.
*/
int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
- map<string, bufferlist>& attrs, bool assume_noent,
+ map<string, bufferlist>& attrs,
+ bool assume_noent, bool modify_tail,
void *_index_op)
{
RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
if (!ptag && !index_op->get_optag()->empty()) {
ptag = index_op->get_optag();
}
- r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false);
+ r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);
if (r < 0)
return r;
orig_size = state->accounted_size;
}
- bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
+ bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||
+ !obj.key.instance.empty();
bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);
target->invalidate_state();
state = NULL;
- if (versioned_op) {
- r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
+ if (versioned_op && meta.olh_epoch) {
+ r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);
if (r < 0) {
return r;
}
meta.canceled = false;
/* update quota cache */
- store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
- accounted_size, orig_size);
+ if (meta.completeMultipart){
+ store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+ 0, orig_size);
+ }
+ else {
+ store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
+ accounted_size, orig_size);
+ }
return 0;
done_cancel:
bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
int r;
if (assume_noent) {
- r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+ r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
if (r == -EEXIST) {
assume_noent = false;
}
}
if (!assume_noent) {
- r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
+ r = _do_write_meta(size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op);
}
return r;
}
return c->is_safe();
}
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+ const unsigned buffer_size;
+ bufferlist buffer;
+ public:
+ RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+ : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+ assert(ISP2(buffer_size)); // must be power of 2
+ }
+
+ int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+ bool *again) override {
+ if (*again || !bl.length()) {
+ // flush buffered data
+ return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+ }
+ // transform offset to the beginning of the buffer
+ ofs = ofs - buffer.length();
+ buffer.claim_append(bl);
+ if (buffer.length() < buffer_size) {
+ *again = false; // don't come back until there's more data
+ return 0;
+ }
+ const auto count = P2ALIGN(buffer.length(), buffer_size);
+ buffer.splice(0, count, &bl);
+ return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+ }
+};
+
class RGWRadosPutObj : public RGWGetDataCB
{
CephContext* cct;
rgw_obj obj;
RGWPutObjDataProcessor *filter;
boost::optional<RGWPutObj_Compress>& compressor;
+ boost::optional<RGWPutObj_Buffer> buffering;
CompressorRef& plugin;
RGWPutObjProcessor_Atomic *processor;
RGWOpStateSingleOp *opstate;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
- uint64_t extra_data_len;
+ uint64_t extra_data_left;
uint64_t data_len;
map<string, bufferlist> src_attrs;
public:
opstate(_ops),
progress_cb(_progress_cb),
progress_data(_progress_data),
- extra_data_len(0),
+ extra_data_left(0),
data_len(0) {}
int process_attrs(void) {
if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
//do not compress if object is encrypted
compressor = boost::in_place(cct, plugin, filter);
- filter = &*compressor;
+ constexpr unsigned buffer_size = 512 * 1024;
+ buffering = boost::in_place(&*compressor, buffer_size);
+ filter = &*buffering;
}
return 0;
}
if (progress_cb) {
progress_cb(ofs, progress_data);
}
- if (extra_data_len) {
+ if (extra_data_left) {
size_t extra_len = bl.length();
- if (extra_len > extra_data_len)
- extra_len = extra_data_len;
+ if (extra_len > extra_data_left)
+ extra_len = extra_data_left;
bufferlist extra;
bl.splice(0, extra_len, &extra);
extra_data_bl.append(extra);
- extra_data_len -= extra_len;
- if (extra_data_len == 0) {
+ extra_data_left -= extra_len;
+ if (extra_data_left == 0) {
int res = process_attrs();
if (res < 0)
return res;
if (bl.length() == 0) {
return 0;
}
+ ofs += extra_len;
}
+ // adjust ofs based on extra_data_len, so the result is a logical offset
+ // into the object data
+ assert(uint64_t(ofs) >= extra_data_len);
+ ofs -= extra_data_len;
+
data_len += bl.length();
bool again = false;
return 0;
}
+ int flush() {
+ bufferlist bl;
+ return put_data_and_throttle(filter, bl, 0, false);
+ }
+
bufferlist& get_extra_data() { return extra_data_bl; }
map<string, bufferlist>& get_attrs() { return src_attrs; }
void set_extra_data_len(uint64_t len) override {
- extra_data_len = len;
+ extra_data_left = len;
+ RGWGetDataCB::set_extra_data_len(len);
}
uint64_t get_data_len() {
if (!attrs[RGW_ATTR_ETAG].length()) {
attrs[RGW_ATTR_ETAG] = src_attrs[RGW_ATTR_ETAG];
}
+ if (!attrs[RGW_ATTR_TAIL_TAG].length()) {
+ auto ttiter = src_attrs.find(RGW_ATTR_TAIL_TAG);
+ if (ttiter != src_attrs.end()) {
+ attrs[RGW_ATTR_TAIL_TAG] = src_attrs[RGW_ATTR_TAIL_TAG];
+ }
+ }
break;
case RGWRados::ATTRSMOD_MERGE:
for (map<string, bufferlist>::iterator it = src_attrs.begin(); it != src_attrs.end(); ++it) {
return ret;
attrset.erase(RGW_ATTR_ID_TAG);
+ attrset.erase(RGW_ATTR_TAIL_TAG);
uint64_t max_chunk_size;
return ret;
}
- return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj, max_chunk_size, NULL, mtime, attrset,
- RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL);
+ return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj,
+ max_chunk_size, NULL, mtime, attrset,
+ RGW_OBJ_CATEGORY_MAIN, 0, real_time(),
+ (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
+ NULL, NULL);
}
struct obj_time_weight {
obj_time_weight dest_mtime_weight;
+ constexpr bool prepend_meta = true;
+ constexpr bool get_op = true;
+ constexpr bool rgwx_stat = true;
+ constexpr bool sync_manifest = true;
+ constexpr bool skip_decrypt = true;
int ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
- true /* prepend_meta */, true /* GET */, true /* rgwx-stat */,
- true /* sync manifest */, &cb, &in_stream_req);
+ prepend_meta, get_op, rgwx_stat,
+ sync_manifest, skip_decrypt, &cb, &in_stream_req);
if (ret < 0) {
return ret;
}
bool copy_if_newer,
map<string, bufferlist>& attrs,
RGWObjCategory category,
- uint64_t olh_epoch,
+ boost::optional<uint64_t> olh_epoch,
real_time delete_at,
string *version_id,
string *ptag,
if (version_id && *version_id != "null") {
processor.set_version_id(*version_id);
}
- processor.set_olh_epoch(olh_epoch);
+ if (olh_epoch) {
+ processor.set_olh_epoch(*olh_epoch);
+ }
int ret = processor.prepare(this, NULL);
if (ret < 0) {
return ret;
}
}
+ static constexpr bool prepend_meta = true;
+ static constexpr bool get_op = true;
+ static constexpr bool rgwx_stat = false;
+ static constexpr bool sync_manifest = true;
+ static constexpr bool skip_decrypt = true;
ret = conn->get_obj(user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
- true /* prepend_meta */, true /* GET */, false /* rgwx-stat */,
- true /* sync manifest */, &cb, &in_stream_req);
+ prepend_meta, get_op, rgwx_stat,
+ sync_manifest, skip_decrypt, &cb, &in_stream_req);
if (ret < 0) {
goto set_err_state;
}
if (ret < 0) {
goto set_err_state;
}
+ ret = cb.flush();
+ if (ret < 0) {
+ goto set_err_state;
+ }
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
return 0;
set_err_state:
if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
- ret = 0;
+ // we may have already fetched during sync of OP_ADD, but were waiting
+ // for OP_LINK_OLH to call set_olh() with a real olh_epoch
+ if (olh_epoch && *olh_epoch > 0) {
+ constexpr bool log_data_change = true;
+ ret = set_olh(obj_ctx, dest_bucket_info, dest_obj, false, nullptr,
+ *olh_epoch, real_time(), false, zones_trace, log_data_change);
+ } else {
+ // we already have the latest copy
+ ret = 0;
+ }
}
if (opstate) {
RGWOpState::OpState state;
if (ret < 0) {
return ret;
}
+ if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
+ // Current implementation does not follow S3 spec and even
+ // may result in data corruption silently when copying
+    // multipart objects across pools. So reject COPY operations
+    // on encrypted objects before it is fully functional.
+ ldout(cct, 0) << "ERROR: copy op for encrypted object " << src_obj
+ << " has not been implemented." << dendl;
+ return -ERR_NOT_IMPLEMENTED;
+ }
src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
src_attrs.erase(RGW_ATTR_DELETE_AT);
}
if (!copy_itself) {
+ attrs.erase(RGW_ATTR_TAIL_TAG);
manifest = astate->manifest;
const rgw_bucket_placement& tail_placement = manifest.get_tail_placement();
if (tail_placement.bucket.name.empty()) {
manifest.set_tail_placement(tail_placement.placement_rule, src_obj.bucket);
}
+ string ref_tag;
for (; miter != astate->manifest.obj_end(); ++miter) {
ObjectWriteOperation op;
- cls_refcount_get(op, tag, true);
+ ref_tag = tag + '\0';
+ cls_refcount_get(op, ref_tag, true);
const rgw_raw_obj& loc = miter.get_location().get_raw_obj(this);
ref.ioctx.locator_set_key(loc.loc);
write_op.meta.category = category;
write_op.meta.olh_epoch = olh_epoch;
write_op.meta.delete_at = delete_at;
+ write_op.meta.modify_tail = !copy_itself;
ret = write_op.write_meta(obj_size, astate->accounted_size, attrs);
if (ret < 0) {
append_rand_alpha(cct, tag, tag, 32);
RGWPutObjProcessor_Atomic processor(obj_ctx,
- dest_bucket_info, dest_obj.bucket, dest_obj.get_oid(),
+ dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
if (version_id) {
processor.set_version_id(*version_id);
int RGWRados::check_bucket_empty(RGWBucketInfo& bucket_info)
{
-  std::map<string, rgw_bucket_dir_entry> ent_map;
+  std::vector<rgw_bucket_dir_entry> ent_list;
  rgw_obj_index_key marker;
  string prefix;
  bool is_truncated;
  do {
-#define NUM_ENTRIES 1000
-    int r = cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, NUM_ENTRIES, true, ent_map,
-			    &is_truncated, &marker);
+    constexpr uint NUM_ENTRIES = 1000u;
+    // NB: an unordered listing suffices here; we only need to know whether
+    // any entry exists at all, not the order of the entries
+    int r = cls_bucket_list_unordered(bucket_info,
+				      RGW_NO_SHARD,
+				      marker,
+				      prefix,
+				      NUM_ENTRIES,
+				      true,
+				      ent_list,
+				      &is_truncated,
+				      &marker);
    if (r < 0)
      return r;
    string ns;
-    std::map<string, rgw_bucket_dir_entry>::iterator eiter;
-    for (eiter = ent_map.begin(); eiter != ent_map.end(); ++eiter) {
+    for (auto const& dirent : ent_list) {
      rgw_obj_key obj;
-      if (rgw_obj_key::oid_to_key_in_ns(eiter->second.key.name, &obj, ns))
+      if (rgw_obj_key::oid_to_key_in_ns(dirent.key.name, &obj, ns))
        return -ENOTEMPTY;
    }
  } while (is_truncated);
+
  return 0;
}
return 0;
}
- string tag = state->obj_tag.to_str();
+ string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str());
return store->gc->send_chain(chain, tag, false); // do it async
}
return gc->send_chain(chain, tag, sync);
}
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx, string& bucket_oid)
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ string& bucket_oid)
{
const rgw_bucket& bucket = bucket_info.bucket;
int r = open_bucket_index_ctx(bucket_info, index_ctx);
return 0;
}
-int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
- string& bucket_oid_base) {
+int RGWRados::open_bucket_index_base(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ string& bucket_oid_base) {
const rgw_bucket& bucket = bucket_info.bucket;
int r = open_bucket_index_ctx(bucket_info, index_ctx);
if (r < 0)
}
-int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
- map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
+int RGWRados::open_bucket_index(const RGWBucketInfo& bucket_info,
+ librados::IoCtx& index_ctx,
+ map<int, string>& bucket_objs,
+ int shard_id,
+ map<int, string> *bucket_instance_ids) {
string bucket_oid_base;
int ret = open_bucket_index_base(bucket_info, index_ctx, bucket_oid_base);
if (ret < 0) {
return -EINVAL;
}
- if (state->obj_tag.length() == 0) {// check for backward compatibility
+ string tag;
+
+ if (state->tail_tag.length() > 0) {
+ tag = state->tail_tag.c_str();
+ } else if (state->obj_tag.length() > 0) {
+ tag = state->obj_tag.c_str();
+ } else {
ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl;
return -EINVAL;
}
- string tag = state->obj_tag.c_str();
-
ldout(cct, 0) << "defer chain tag=" << tag << dendl;
return gc->defer_chain(tag, false);
}
result.version_id = marker.key.instance;
+ if (result.version_id.empty())
+ result.version_id = "null";
result.delete_marker = true;
struct rgw_bucket_dir_entry_meta meta;
return -ENOENT;
}
- r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true);
+ r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true, false);
if (r < 0)
return r;
index_op.set_zones_trace(params.zones_trace);
index_op.set_bilog_flags(params.bilog_flags);
-
r = index_op.prepare(CLS_RGW_OP_DEL, &state->write_tag);
if (r < 0)
return r;
store->remove_rgw_head_obj(op);
r = ref.ioctx.operate(ref.oid, &op);
- bool need_invalidate = false;
- if (r == -ECANCELED) {
- /* raced with another operation, we can regard it as removed */
- need_invalidate = true;
- r = 0;
- }
+
+ /* raced with another operation, object state is indeterminate */
+ const bool need_invalidate = (r == -ECANCELED);
int64_t poolid = ref.ioctx.get_id();
if (r >= 0) {
s->shadow_obj[bl.length()] = '\0';
}
s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
+ auto ttiter = s->attrset.find(RGW_ATTR_TAIL_TAG);
+ if (ttiter != s->attrset.end()) {
+ s->tail_tag = s->attrset[RGW_ATTR_TAIL_TAG];
+ }
bufferlist manifest_bl = s->attrset[RGW_ATTR_MANIFEST];
if (manifest_bl.length()) {
}
int RGWRados::Object::prepare_atomic_modification(ObjectWriteOperation& op, bool reset_obj, const string *ptag,
- const char *if_match, const char *if_nomatch, bool removal_op)
+ const char *if_match, const char *if_nomatch, bool removal_op,
+ bool modify_tail)
{
int r = get_state(&state, false);
if (r < 0)
ldout(store->ctx(), 10) << "setting object write_tag=" << state->write_tag << dendl;
op.setxattr(RGW_ATTR_ID_TAG, bl);
+ if (modify_tail) {
+ op.setxattr(RGW_ATTR_TAIL_TAG, bl);
+ }
return 0;
}
return r;
bl.append(tag.c_str(), tag.size() + 1);
-
op.setxattr(RGW_ATTR_ID_TAG, bl);
}
+
+ real_time mtime = real_clock::now();
+ struct timespec mtime_ts = real_clock::to_timespec(mtime);
+ op.mtime2(&mtime_ts);
r = ref.ioctx.operate(ref.oid, &op);
if (state) {
if (r >= 0) {
string content_type(content_type_bl.c_str(), content_type_bl.length());
uint64_t epoch = ref.ioctx.get_last_version();
int64_t poolid = ref.ioctx.get_id();
- real_time mtime = real_clock::now();
r = index_op.complete(poolid, epoch, state->size, state->accounted_size,
mtime, etag, content_type, &acl_bl,
RGW_OBJ_CATEGORY_MAIN, NULL);
RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
bufferlist& bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version>)
{
uint64_t len;
ObjectReadOperation op;
return bl.length();
}
-int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker)
+int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl,
+ RGWObjVersionTracker *objv_tracker,
+ boost::optional<obj_version> refresh_version)
{
RGWRados *store = source->get_store();
rgw_raw_obj& obj = source->get_obj();
- return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl, ofs, end, read_params.attrs, read_params.cache_info);
+ return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl,
+ ofs, end, read_params.attrs,
+ read_params.cache_info, refresh_version);
}
int RGWRados::SystemObject::Read::get_attr(const char *name, bufferlist& dest)
op.create(true);
} else {
op.assert_exists();
+ struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+ op.mtime2(&mtime_ts);
}
/*
const string& op_tag,
struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
+ real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *_zones_trace, bool log_data_change)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
- } else {
- zones_trace.insert(get_zone().id);
}
+ zones_trace.insert(get_zone().id);
BucketShard bs(this);
return r;
}
+ if (log_data_change && bucket_info.datasync_flag_enabled()) {
+ data_log->add_entry(bs.bucket, bs.shard_id);
+ }
+
return 0;
}
op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
+ struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+ op.mtime2(&mtime_ts);
+
bool need_to_link = false;
cls_rgw_obj_key key;
bool delete_marker = false;
}
int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
+ uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *zones_trace, bool log_data_change)
{
string op_tag;
}
return ret;
}
- ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
+ ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker,
+ op_tag, meta, olh_epoch, unmod_since, high_precision_time,
+ zones_trace, log_data_change);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {
int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
{
ldout(cct, 20) << "reading from " << get_zone_params().domain_root << ":" << oid << dendl;
bufferlist epbl;
- int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, oid, epbl, &info.objv_tracker, pmtime, pattrs, cache_info);
+ int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+ oid, epbl, &info.objv_tracker, pmtime, pattrs,
+ cache_info, refresh_version);
if (ret < 0) {
return ret;
}
RGWObjVersionTracker *objv_tracker,
real_time *pmtime,
map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
{
bufferlist bl;
string bucket_entry;
rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
- int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, bucket_entry, bl, objv_tracker, pmtime, pattrs, cache_info);
+ int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+ bucket_entry, bl, objv_tracker, pmtime, pattrs,
+ cache_info, refresh_version);
if (ret < 0) {
return ret;
}
return 0;
}
-int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
- const string& tenant, const string& bucket_name, RGWBucketInfo& info,
- real_time *pmtime, map<string, bufferlist> *pattrs)
+int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
+ const string& tenant,
+ const string& bucket_name,
+ RGWBucketInfo& info,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ boost::optional<obj_version> refresh_version)
{
bucket_info_entry e;
string bucket_entry;
rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
+
if (binfo_cache->find(bucket_entry, &e)) {
+ if (refresh_version &&
+ e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
+ lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
+ << "a failure that should be debugged. I am a nice machine, "
+ << "so I will try to recover." << dendl;
+ binfo_cache->invalidate(bucket_entry);
+ }
info = e.info;
if (pattrs)
*pattrs = e.attrs;
real_time ep_mtime;
RGWObjVersionTracker ot;
rgw_cache_entry_info entry_cache_info;
- int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name, entry_point, &ot, &ep_mtime, pattrs, &entry_cache_info);
+ int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name,
+ entry_point, &ot, &ep_mtime, pattrs,
+ &entry_cache_info, refresh_version);
if (ret < 0) {
/* only init these fields */
info.bucket.tenant = tenant;
rgw_cache_entry_info cache_info;
- ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs, &cache_info);
+ ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs,
+ &cache_info, refresh_version);
e.info.ep_objv = ot.read_version;
info = e.info;
if (ret < 0) {
+ lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
info.bucket.tenant = tenant;
info.bucket.name = bucket_name;
// XXX and why return anything in case of an error anyway?
ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
}
+ if (refresh_version &&
+ refresh_version->compare(&info.objv_tracker.read_version)) {
+ lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
+ << "have gone squirrelly. An administrator may have forced a "
+ << "change; otherwise there is a problem somewhere." << dendl;
+ }
+
return 0;
}
+int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
+ const string& tenant, const string& bucket_name,
+ RGWBucketInfo& info,
+ real_time *pmtime, map<string, bufferlist> *pattrs)
+{
+ return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
+ pattrs, boost::none);
+}
+
+int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
+ ceph::real_time *pmtime,
+ map<string, bufferlist> *pattrs)
+{
+ RGWObjectCtx obj_ctx(this);
+
+ return _get_bucket_info(obj_ctx, info.bucket.tenant, info.bucket.name,
+ info, pmtime, pattrs, info.objv_tracker.read_version);
+}
+
int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
map<string, bufferlist> *pattrs)
ent.size_rounded += stats.total_size_rounded;
}
}
+
+ // fill in placement_rule from the bucket instance for use in swift's
+ // per-storage policy statistics
+ ent.placement_rule = std::move(bucket_info.placement_rule);
}
return m.size();
return 0;
}
+// Begin iterating over the objects of `pool`, resuming from `cursor` — an
+// opaque RADOS listing position previously obtained from
+// pool_iterate_get_cursor().  On success the open IoCtx and the positioned
+// NObjectIterator are stored in `ctx` for subsequent pool_iterate() calls.
+// Returns 0 on success, a negative error code on failure
+// (-EINVAL when the cursor string cannot be parsed).
+int RGWRados::pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx)
+{
+ librados::IoCtx& io_ctx = ctx.io_ctx;
+ librados::NObjectIterator& iter = ctx.iter;
+
+ int r = open_pool_ctx(pool, io_ctx);
+ if (r < 0)
+ return r;
+
+ librados::ObjectCursor oc;
+ if (!oc.from_str(cursor)) {
+ ldout(cct, 10) << "failed to parse cursor: " << cursor << dendl;
+ return -EINVAL;
+ }
+
+ iter = io_ctx.nobjects_begin(oc);
+
+ return 0;
+}
+
+// Return the current listing position of an active pool iteration as an
+// opaque string, suitable for passing back to pool_iterate_begin() to resume.
+string RGWRados::pool_iterate_get_cursor(RGWPoolIterCtx& ctx)
+{
+ return ctx.iter.get_cursor().to_str();
+}
+
int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<rgw_bucket_dir_entry>& objs,
bool *is_truncated, RGWAccessListFilter *filter)
{
}
};
-int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
- int max, RGWListRawObjsCtx& ctx, list<string>& oids,
- bool *is_truncated)
+// Initialize a raw-object listing context for `pool`, positioned at `marker`
+// (an opaque cursor string; empty string means "start of pool").  Idempotent:
+// if `ctx` was already initialized, this is a no-op.  Returns 0 on success or
+// a negative error code from pool_iterate_begin() (e.g. -EINVAL for a bad
+// marker).
+int RGWRados::list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx)
{
- RGWAccessListFilterPrefix filter(prefix_filter);
-
- if (!ctx.initialized) {
- int r = pool_iterate_begin(pool, ctx.iter_ctx);
+ if (!ctx->initialized) {
+ int r = pool_iterate_begin(pool, marker, ctx->iter_ctx);
if (r < 0) {
ldout(cct, 10) << "failed to list objects pool_iterate_begin() returned r=" << r << dendl;
return r;
}
- ctx.initialized = true;
+ ctx->initialized = true;
}
+ return 0;
+}
+int RGWRados::list_raw_objects_next(const string& prefix_filter, int max,
+ RGWListRawObjsCtx& ctx, list<string>& oids,
+ bool *is_truncated)
+{
+ if (!ctx.initialized) {
+ return -EINVAL;
+ }
+ RGWAccessListFilterPrefix filter(prefix_filter);
vector<rgw_bucket_dir_entry> objs;
int r = pool_iterate(ctx.iter_ctx, max, objs, is_truncated, &filter);
if (r < 0) {
return oids.size();
}
+// Backward-compatible wrapper kept for existing callers: lazily initializes
+// the listing context from the start of the pool (empty marker), then
+// delegates to list_raw_objects_next() to fetch up to `max` oids matching
+// `prefix_filter`.  Returns what the delegate returns (number of oids, or a
+// negative error code).
+int RGWRados::list_raw_objects(const rgw_pool& pool, const string& prefix_filter,
+ int max, RGWListRawObjsCtx& ctx, list<string>& oids,
+ bool *is_truncated)
+{
+ if (!ctx.initialized) {
+ int r = list_raw_objects_init(pool, string(), &ctx);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ return list_raw_objects_next(prefix_filter, max, ctx, oids, is_truncated);
+}
+
+// Return the opaque resume cursor of an in-progress raw-object listing,
+// for later use as the `marker` argument of list_raw_objects_init().
+string RGWRados::list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx)
+{
+ return pool_iterate_get_cursor(ctx.iter_ctx);
+}
+
int RGWRados::list_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
return lc->process();
}
-int RGWRados::process_expire_objects()
+// Run one pass of object-expiration processing across all expirer shards.
+// Return type changed from int (always 0) to bool so the caller can see
+// whether inspect_all_shards() actually did its work.
+bool RGWRados::process_expire_objects()
{
- obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
- return 0;
+ return obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now());
}
int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWriteOperation& op, string& oid)
if (_zones_trace) {
zones_trace = *_zones_trace;
}
- else {
- zones_trace.insert(get_zone().id);
- }
-
+ zones_trace.insert(get_zone().id);
+
ObjectWriteOperation o;
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
dir_meta = ent.meta;
dir_meta.category = category;
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ zones_trace.insert(get_zone().id);
+
rgw_bucket_entry_ver ver;
ver.pool = pool;
ver.epoch = epoch;
cls_rgw_obj_key key(ent.key.name, ent.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, remove_objs,
- get_zone().log_data, bilog_flags, _zones_trace);
+ get_zone().log_data, bilog_flags, &zones_trace);
complete_op_data *arg;
index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
- get_zone().log_data, bilog_flags, _zones_trace, &arg);
+ get_zone().log_data, bilog_flags, &zones_trace, &arg);
librados::AioCompletion *completion = arg->rados_completion;
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
completion->release(); /* can't reference arg here, as it might have already been released */
return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
-int RGWRados::cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
- uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
- bool *is_truncated, rgw_obj_index_key *last_entry,
- bool (*force_check_filter)(const string& name))
+
+int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
+ int shard_id,
+ rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries,
+ bool list_versions,
+ map<string, rgw_bucket_dir_entry>& m,
+ bool *is_truncated,
+ rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket_info.bucket << " start " << start.name << "[" << start.instance << "] num_entries " << num_entries << dendl;
+ ldout(cct, 10) << "cls_bucket_list_ordered " << bucket_info.bucket <<
+ " start " << start.name << "[" << start.instance << "] num_entries " <<
+ num_entries << dendl;
librados::IoCtx index_ctx;
// key - oid (for different shards if there is any)
- // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+ // value - list result for the corresponding oid (shard), it is filled by
+ // the AIO callback
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
return r;
cls_rgw_obj_key start_key(start.name, start.instance);
- r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries, list_versions,
- oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries,
+ list_versions, oids, list_results,
+ cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
const string& name = vcurrents[pos]->first;
struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
- bool force_check = force_check_filter && force_check_filter(dirent.key.name);
- if ((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty() || force_check) {
+ bool force_check = force_check_filter &&
+ force_check_filter(dirent.key.name);
+ if ((!dirent.exists && !dirent.is_delete_marker()) ||
+ !dirent.pending_map.empty() ||
+ force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[vnames[pos]]);
+ r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
+ updates[vnames[pos]]);
if (r < 0 && r != -ENOENT) {
return r;
}
}
if (r >= 0) {
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+ ldout(cct, 10) << "RGWRados::cls_bucket_list_ordered: got " <<
+ dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
m[name] = std::move(dirent);
++count;
}
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
index_ctx.aio_operate(miter->first, c, &o);
- c->release();
+ c->release();
}
}
// Check if all the returned entries are consumed or not
for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i])
+ if (vcurrents[i] != vends[i]) {
*is_truncated = true;
+ break;
+ }
}
if (!m.empty())
*last_entry = m.rbegin()->first;
return 0;
}
-int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
+
+// List up to `num_entries` bucket-index entries without imposing a global
+// ordering across shards: shards are drained one at a time
+// (starting at `shard_id` if >= 0, otherwise at the shard implied by
+// `start`), which avoids the merge cost of cls_bucket_list_ordered.
+// Results are appended to `ent_list`; `*is_truncated` is set when more
+// entries remain, and `*last_entry` (if non-null) receives the key of the
+// last entry added, to be used as the next call's `start`.
+// Entries with uncommitted ops are re-verified via check_disk_state(), and
+// any resulting index repairs are sent off as fire-and-forget suggested
+// changes.  Returns 0 on success or a negative error code.
+int RGWRados::cls_bucket_list_unordered(RGWBucketInfo& bucket_info,
+ int shard_id,
+ rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries,
+ bool list_versions,
+ std::vector<rgw_bucket_dir_entry>& ent_list,
+ bool *is_truncated,
+ rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name)) {
+ ldout(cct, 10) << "cls_bucket_list_unordered " << bucket_info.bucket <<
+ " start " << start.name << "[" << start.instance <<
+ "] num_entries " << num_entries << dendl;
+
+ *is_truncated = false;
+ librados::IoCtx index_ctx;
+
+ rgw_obj_index_key my_start = start;
+
+ map<int, string> oids;
+ int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
+ if (r < 0)
+ return r;
+ const uint32_t num_shards = oids.size();
+
+ // pick the first shard to drain: the explicit one, shard 0 for a fresh
+ // listing, or the shard that owns the resume key
+ uint32_t current_shard;
+ if (shard_id >= 0) {
+ current_shard = shard_id;
+ } else if (my_start.empty()) {
+ current_shard = 0u;
+ } else {
+ current_shard =
+ rgw_bucket_shard_index(my_start.name, num_shards);
+ }
+
+ uint32_t count = 0u;
+ map<string, bufferlist> updates;
+ std::string last_added_entry;
+ // `count <= num_entries` (not <): we try to read one entry beyond the
+ // requested amount so that *is_truncated can be set accurately below
+ while (count <= num_entries &&
+ ((shard_id >= 0 && current_shard == uint32_t(shard_id)) ||
+ current_shard < num_shards)) {
+ // key - oid (for different shards if there is any)
+ // value - list result for the corresponding oid (shard), it is filled by
+ // the AIO callback
+ // NOTE(review): this issues the list op against every oid in `oids` on
+ // each pass but only consumes list_results[current_shard] — looks
+ // wasteful for multi-shard buckets; confirm whether restricting the op
+ // to the current shard was intended
+ map<int, struct rgw_cls_list_ret> list_results;
+ r = CLSRGWIssueBucketList(index_ctx, my_start, prefix, num_entries,
+ list_versions, oids, list_results,
+ cct->_conf->rgw_bucket_index_max_aio)();
+ if (r < 0)
+ return r;
+
+ const std::string& oid = oids[current_shard];
+ assert(list_results.find(current_shard) != list_results.end());
+ auto& result = list_results[current_shard];
+ for (auto& entry : result.dir.m) {
+ rgw_bucket_dir_entry& dirent = entry.second;
+
+ bool force_check = force_check_filter &&
+ force_check_filter(dirent.key.name);
+ if ((!dirent.exists && !dirent.is_delete_marker()) ||
+ !dirent.pending_map.empty() ||
+ force_check) {
+ /* there are uncommitted ops. We need to check the current state,
+ * and if the tags are old we need to do cleanup as well. */
+ librados::IoCtx sub_ctx;
+ sub_ctx.dup(index_ctx);
+ r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[oid]);
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ }
+ }
+
+ // at this point either r >=0 or r == -ENOENT
+ if (r >= 0) { // i.e., if r != -ENOENT
+ ldout(cct, 10) << "RGWRados::cls_bucket_list_unordered: got " <<
+ dirent.key.name << "[" << dirent.key.instance << "]" << dendl;
+
+ if (count < num_entries) {
+ last_added_entry = entry.first;
+ my_start = dirent.key;
+ ent_list.emplace_back(std::move(dirent));
+ ++count;
+ } else {
+ // found the "one extra" entry: report truncation, don't return it
+ *is_truncated = true;
+ goto check_updates;
+ }
+ } else { // r == -ENOENT
+ // in the case of -ENOENT, make sure we're advancing marker
+ // for possible next call to CLSRGWIssueBucketList
+ my_start = dirent.key;
+ }
+ } // entry for loop
+
+ if (!result.is_truncated) {
+ // if we reached the end of the shard read next shard
+ ++current_shard;
+ my_start = rgw_obj_index_key();
+ }
+ } // shard loop
+
+check_updates:
+ // suggest updates if there is any
+ map<string, bufferlist>::iterator miter = updates.begin();
+ for (; miter != updates.end(); ++miter) {
+ if (miter->second.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, miter->second);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ index_ctx.aio_operate(miter->first, c, &o);
+ c->release();
+ }
+ }
+
+ if (last_entry && !ent_list.empty()) {
+ *last_entry = last_added_entry;
+ }
+
+ return 0;
+}
+
+
+int RGWRados::cls_obj_usage_log_add(const string& oid,
+ rgw_usage_log_info& info)
{
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
return r;
}
- ObjectWriteOperation op;
- cls_rgw_usage_log_trim(op, user, start_epoch, end_epoch);
-
- r = ref.ioctx.operate(ref.oid, &op);
+ r = cls_rgw_usage_log_trim(ref.ioctx, ref.oid, user, start_epoch, end_epoch);
return r;
}
return 0;
}
+// Reset the stored usage statistics of a user by issuing a
+// cls_user_reset_stats operation against the user's buckets object in the
+// user_uid pool.  Returns 0 on success or a negative error code from the
+// ref lookup / rados operate call.
+int RGWRados::cls_user_reset_stats(const string& user_id)
+{
+ string buckets_obj_id;
+ rgw_get_buckets_obj(user_id, buckets_obj_id);
+ rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
+
+ rgw_rados_ref ref;
+ int r = get_raw_obj_ref(obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ // ::-qualified to reach the free cls client function, not this method
+ ::cls_user_reset_stats(op);
+ return ref.ioctx.operate(ref.oid, &op);
+}
+
int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
{
string buckets_obj_id;
}
void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
- uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
-{
+ uint32_t num_shards,
+ map<int, string>& bucket_objects,
+ int shard_id) {
if (!num_shards) {
bucket_objects[0] = bucket_oid_base;
} else {
*shard_id = -1;
}
} else {
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, bucket_info.num_shards);
+ uint32_t sid = rgw_bucket_shard_index(obj_key, bucket_info.num_shards);
if (shard_id) {
*shard_id = (int)sid;
}
*shard_id = -1;
}
} else {
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = rgw_shards_mod(sid2, num_shards);
+ uint32_t sid = rgw_bucket_shard_index(obj_key, num_shards);
char buf[bucket_oid_base.size() + 32];
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
(*bucket_obj) = buf;
return ++max_bucket_id;
}
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
+ bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
{
- int use_cache = cct->_conf->rgw_cache_enabled;
RGWRados *store = NULL;
if (!use_cache) {
store = new RGWRados;
} else {
- store = new RGWCache<RGWRados>;
+ store = new RGWCache<RGWRados>;
}
if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
}
}
+// Admin-socket command dispatcher for the "cache ..." commands declared in
+// RGWRados::admin_commands.  Formats results into `out` using `format`;
+// returns true on success, false on failure (with a human-readable message
+// appended to `out`).  The call_list/call_inspect/call_erase/call_zap hooks
+// invoked here are no-op defaults in this class — presumably overridden by a
+// caching subclass; verify against RGWCache.
+bool RGWRados::call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out)
+{
+ if (command == "cache list") {
+ // optional substring filter (req=false in the command spec)
+ boost::optional<std::string> filter;
+ auto i = cmdmap.find("filter");
+ if (i != cmdmap.cend()) {
+ filter = boost::get<std::string>(i->second);
+ }
+ std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "table"));
+ if (f) {
+ f->open_array_section("cache_entries");
+ call_list(filter, f.get());
+ f->close_section();
+ f->flush(out);
+ return true;
+ } else {
+ out.append("Unable to create Formatter.\n");
+ return false;
+ }
+ } else if (command == "cache inspect") {
+ std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "json-pretty"));
+ if (f) {
+ // NOTE(review): cmdmap["target"] default-inserts if absent; the
+ // req=true command spec should guarantee presence — confirm
+ const auto& target = boost::get<std::string>(cmdmap["target"]);
+ if (call_inspect(target, f.get())) {
+ f->flush(out);
+ return true;
+ } else {
+ out.append(string("Unable to find entry ") + target + string(".\n"));
+ return false;
+ }
+ } else {
+ out.append("Unable to create Formatter.\n");
+ return false;
+ }
+ } else if (command == "cache erase") {
+ const auto& target = boost::get<std::string>(cmdmap["target"]);
+ if (call_erase(target)) {
+ return true;
+ } else {
+ out.append(string("Unable to find entry ") + target + string(".\n"));
+ return false;
+ }
+ } else if (command == "cache zap") {
+ call_zap();
+ return true;
+ }
+ // unknown command
+ return false;
+}
+
+// Default "cache list" hook: this class keeps no cache, so there is
+// nothing to emit into the formatter.
+void RGWRados::call_list(const boost::optional<std::string>&,
+ ceph::Formatter*)
+{
+ return;
+}
+
+// Default "cache inspect" hook: no cache here, so no entry can ever be
+// found — always reports failure.
+bool RGWRados::call_inspect(const std::string&, Formatter*)
+{
+ return false;
+}
+
+// Default "cache erase" hook: no cache here, so there is never an entry to
+// erase — always reports failure.
+bool RGWRados::call_erase(const std::string&) {
+ return false;
+}
+
+// Default "cache zap" hook: no cache here, so clearing is a no-op.
+void RGWRados::call_zap() {
+ return;
+}