-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/* try using default realm */
RGWRealm realm;
int ret = realm.init(cct, store);
+ // no default realm exist
if (ret < 0) {
- ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
- return -ENOENT;
+ return read_id(default_zonegroup_name, default_id);
}
realm_id = realm.get_id();
}
/* try using default realm */
RGWRealm realm;
int ret = realm.init(cct, store);
+ //no default realm exist
if (ret < 0) {
- ldout(cct, 10) << "could not read realm id: " << cpp_strerror(-ret) << dendl;
- return -ENOENT;
+ return read_id(default_zone_name, default_id);
}
realm_id = realm.get_id();
}
return 0;
}
// Admin socket command table: each row is { command name, argument spec,
// help text }. The entries are registered in init_rados(), unregistered in
// finalize(), and dispatched by RGWRados::call().
const char* RGWRados::admin_commands[4][3] = {
  { "cache list",
    "cache list name=filter,type=CephString,req=false",
    "cache list [filter_str]: list object cache, possibly matching substrings" },
  { "cache inspect",
    "cache inspect name=target,type=CephString,req=true",
    "cache inspect target: print cache element" },
  { "cache erase",
    "cache erase name=target,type=CephString,req=true",
    "cache erase target: erase element from cache" },
  { "cache zap",
    "cache zap",
    "cache zap: erase all elements from cache" }
};
+
+
int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
if (r < 0)
}
public:
RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone)
- : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+ const string& _source_zone,
+ rgw::BucketChangeObserver *observer)
+ : RGWSyncProcessorThread(_store, "data-sync"),
+ sync(_store, async_rados, _source_zone, observer),
initialized(false) {}
void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
{
RGWCoroutinesManager crs;
RGWRados *store;
+ rgw::BucketTrimManager *bucket_trim;
RGWHTTPManager http;
const utime_t trim_interval;
uint64_t interval_msec() override { return 0; }
void stop_process() override { crs.stop(); }
public:
- RGWSyncLogTrimThread(RGWRados *store, int interval)
+ RGWSyncLogTrimThread(RGWRados *store, rgw::BucketTrimManager *bucket_trim,
+ int interval)
: RGWSyncProcessorThread(store, "sync-log-trim"),
crs(store->ctx(), store->get_cr_registry()), store(store),
+ bucket_trim(bucket_trim),
http(store->ctx(), crs.get_completion_mgr()),
trim_interval(interval, 0)
{}
trim_interval));
stacks.push_back(data);
+ auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+ bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+ stacks.push_back(bucket);
+
crs.run(stacks);
return 0;
}
void RGWRados::finalize()
{
+ auto admin_socket = cct->get_admin_socket();
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->unregister_command(cmd[0]);
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to unregister admin socket command (r=" << r
+ << ")" << dendl;
+ }
+ }
+
if (run_sync_thread) {
Mutex::Locker l(meta_sync_thread_lock);
meta_sync_processor_thread->stop();
data_sync_processor_threads.clear();
delete sync_log_trimmer;
sync_log_trimmer = nullptr;
+ bucket_trim = boost::none;
}
if (finisher) {
finisher->stop();
int RGWRados::init_rados()
{
int ret = 0;
+ auto admin_socket = cct->get_admin_socket();
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->register_command(cmd[0], cmd[1], this,
+ cmd[2]);
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to register admin socket command (r=" << r
+ << ")" << dendl;
+ return r;
+ }
+ }
+
auto handles = std::vector<librados::Rados>{cct->_conf->rgw_num_rados_handles};
for (auto& r : handles) {
obj_expirer->start_processor();
}
- if (run_sync_thread) {
- // initialize the log period history. we want to do this any time we're not
- // running under radosgw-admin, so we check run_sync_thread here before
- // disabling it based on the zone/zonegroup setup
- meta_mgr->init_oldest_log_period();
- }
-
/* no point of running sync thread if we don't have a master zone configured
or there is no rest_master_conn */
if (get_zonegroup().master_zone.empty() || !rest_master_conn
run_sync_thread = false;
}
+ if (run_sync_thread) {
+ // initialize the log period history
+ meta_mgr->init_oldest_log_period();
+ }
+
async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
async_rados->start();
}
meta_sync_processor_thread->start();
+ // configure the bucket trim manager
+ rgw::BucketTrimConfig config;
+ rgw::configure_bucket_trim(cct, config);
+
+ bucket_trim.emplace(this, config);
+ ret = bucket_trim->init();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
+ return ret;
+ }
+
Mutex::Locker dl(data_sync_thread_lock);
for (auto iter : zone_data_sync_from_map) {
ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
- RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+ auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
+ &*bucket_trim);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
}
auto interval = cct->_conf->rgw_sync_log_trim_interval;
if (interval > 0) {
- sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+ sync_log_trimmer = new RGWSyncLogTrimThread(this, &*bucket_trim, interval);
ret = sync_log_trimmer->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
int RGWRados::open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx)
{
+ const rgw_pool& explicit_pool = bucket_info.bucket.explicit_placement.index_pool;
+
+ if (!explicit_pool.empty()) {
+ return open_pool_ctx(explicit_pool, index_ctx);
+ }
+
const string *rule = &bucket_info.placement_rule;
if (rule->empty()) {
rule = &zonegroup.default_placement;
usage_log_hash(cct, user_str, first_hash, index);
hash = first_hash;
-
do {
int ret = cls_obj_usage_log_trim(hash, user_str, start_epoch, end_epoch);
- if (ret == -ENOENT)
- goto next;
- if (ret < 0)
+ if (ret < 0 && ret != -ENOENT)
return ret;
-next:
usage_log_hash(cct, user_str, hash, ++index);
} while (hash != first_hash);
ldout(cct, 0)
<< __func__
<< " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
- << " (this can be due to a pool or placement group misconfiguration, e.g., pg_num < pgp_num)"
+ << " (this can be due to a pool or placement group misconfiguration, e.g."
+ << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
<< dendl;
}
if (ret < 0)
return 0;
}
+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+{
+ bucket = bucket_info.bucket;
+ shard_id = sid;
+
+ int ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+ return ret;
+ }
+ ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+ return 0;
+}
+
/* Execute @handler on last item in bucket listing for bucket specified
* in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
- uint64_t extra_data_len;
+ uint64_t extra_data_left;
uint64_t data_len;
map<string, bufferlist> src_attrs;
public:
opstate(_ops),
progress_cb(_progress_cb),
progress_data(_progress_data),
- extra_data_len(0),
+ extra_data_left(0),
data_len(0) {}
int process_attrs(void) {
if (progress_cb) {
progress_cb(ofs, progress_data);
}
- if (extra_data_len) {
+ if (extra_data_left) {
size_t extra_len = bl.length();
- if (extra_len > extra_data_len)
- extra_len = extra_data_len;
+ if (extra_len > extra_data_left)
+ extra_len = extra_data_left;
bufferlist extra;
bl.splice(0, extra_len, &extra);
extra_data_bl.append(extra);
- extra_data_len -= extra_len;
- if (extra_data_len == 0) {
+ extra_data_left -= extra_len;
+ if (extra_data_left == 0) {
int res = process_attrs();
if (res < 0)
return res;
if (bl.length() == 0) {
return 0;
}
+ ofs += extra_len;
}
+ // adjust ofs based on extra_data_len, so the result is a logical offset
+ // into the object data
+ assert(uint64_t(ofs) >= extra_data_len);
+ ofs -= extra_data_len;
+
data_len += bl.length();
bool again = false;
map<string, bufferlist>& get_attrs() { return src_attrs; }
void set_extra_data_len(uint64_t len) override {
- extra_data_len = len;
+ extra_data_left = len;
+ RGWGetDataCB::set_extra_data_len(len);
}
uint64_t get_data_len() {
return ret;
}
- return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj, max_chunk_size, NULL, mtime, attrset,
- RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL);
+ return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj,
+ max_chunk_size, NULL, mtime, attrset,
+ RGW_OBJ_CATEGORY_MAIN, 0, real_time(),
+ (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
+ NULL, NULL);
}
struct obj_time_weight {
if (ret < 0) {
return ret;
}
+ if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
+ // Current implementation does not follow S3 spec and even
+ // may result in data corruption silently when copying
+ // multipart objects acorss pools. So reject COPY operations
+ //on encrypted objects before it is fully functional.
+ ldout(cct, 0) << "ERROR: copy op for encrypted object " << src_obj
+ << " has not been implemented." << dendl;
+ return -ERR_NOT_IMPLEMENTED;
+ }
src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
src_attrs.erase(RGW_ATTR_DELETE_AT);
append_rand_alpha(cct, tag, tag, 32);
RGWPutObjProcessor_Atomic processor(obj_ctx,
- dest_bucket_info, dest_obj.bucket, dest_obj.get_oid(),
+ dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
if (version_id) {
processor.set_version_id(*version_id);
store->remove_rgw_head_obj(op);
r = ref.ioctx.operate(ref.oid, &op);
- bool need_invalidate = false;
- if (r == -ECANCELED) {
- /* raced with another operation, we can regard it as removed */
- need_invalidate = true;
- r = 0;
- }
+
+ /* raced with another operation, object state is indeterminate */
+ const bool need_invalidate = (r == -ECANCELED);
int64_t poolid = ref.ioctx.get_id();
if (r >= 0) {
RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
bufferlist& bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version>)
{
uint64_t len;
ObjectReadOperation op;
return bl.length();
}
-int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker)
+int RGWRados::SystemObject::Read::read(int64_t ofs, int64_t end, bufferlist& bl,
+ RGWObjVersionTracker *objv_tracker,
+ boost::optional<obj_version> refresh_version)
{
RGWRados *store = source->get_store();
rgw_raw_obj& obj = source->get_obj();
- return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl, ofs, end, read_params.attrs, read_params.cache_info);
+ return store->get_system_obj(source->get_ctx(), state, objv_tracker, obj, bl,
+ ofs, end, read_params.attrs,
+ read_params.cache_info, refresh_version);
}
int RGWRados::SystemObject::Read::get_attr(const char *name, bufferlist& dest)
op.create(true);
} else {
op.assert_exists();
+ struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+ op.mtime2(&mtime_ts);
}
/*
op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
+ struct timespec mtime_ts = real_clock::to_timespec(state.mtime);
+ op.mtime2(&mtime_ts);
+
bool need_to_link = false;
cls_rgw_obj_key key;
bool delete_marker = false;
int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
{
ldout(cct, 20) << "reading from " << get_zone_params().domain_root << ":" << oid << dendl;
bufferlist epbl;
- int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, oid, epbl, &info.objv_tracker, pmtime, pattrs, cache_info);
+ int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+ oid, epbl, &info.objv_tracker, pmtime, pattrs,
+ cache_info, refresh_version);
if (ret < 0) {
return ret;
}
RGWObjVersionTracker *objv_tracker,
real_time *pmtime,
map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info)
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
{
bufferlist bl;
string bucket_entry;
rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
- int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root, bucket_entry, bl, objv_tracker, pmtime, pattrs, cache_info);
+ int ret = rgw_get_system_obj(this, obj_ctx, get_zone_params().domain_root,
+ bucket_entry, bl, objv_tracker, pmtime, pattrs,
+ cache_info, refresh_version);
if (ret < 0) {
return ret;
}
return 0;
}
-int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
- const string& tenant, const string& bucket_name, RGWBucketInfo& info,
- real_time *pmtime, map<string, bufferlist> *pattrs)
+int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
+ const string& tenant,
+ const string& bucket_name,
+ RGWBucketInfo& info,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ boost::optional<obj_version> refresh_version)
{
bucket_info_entry e;
string bucket_entry;
rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
+
if (binfo_cache->find(bucket_entry, &e)) {
+ if (refresh_version &&
+ e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
+ lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
+ << "a failure that should be debugged. I am a nice machine, "
+ << "so I will try to recover." << dendl;
+ binfo_cache->invalidate(bucket_entry);
+ }
info = e.info;
if (pattrs)
*pattrs = e.attrs;
real_time ep_mtime;
RGWObjVersionTracker ot;
rgw_cache_entry_info entry_cache_info;
- int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name, entry_point, &ot, &ep_mtime, pattrs, &entry_cache_info);
+ int ret = get_bucket_entrypoint_info(obj_ctx, tenant, bucket_name,
+ entry_point, &ot, &ep_mtime, pattrs,
+ &entry_cache_info, refresh_version);
if (ret < 0) {
/* only init these fields */
info.bucket.tenant = tenant;
rgw_cache_entry_info cache_info;
- ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs, &cache_info);
+ ret = get_bucket_instance_from_oid(obj_ctx, oid, e.info, &e.mtime, &e.attrs,
+ &cache_info, refresh_version);
e.info.ep_objv = ot.read_version;
info = e.info;
if (ret < 0) {
+ lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
info.bucket.tenant = tenant;
info.bucket.name = bucket_name;
// XXX and why return anything in case of an error anyway?
ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
}
+ if (refresh_version &&
+ refresh_version->compare(&info.objv_tracker.read_version)) {
+ lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
+ << "have gone squirrelly. An administrator may have forced a "
+ << "change; otherwise there is a problem somewhere." << dendl;
+ }
+
return 0;
}
+int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
+ const string& tenant, const string& bucket_name,
+ RGWBucketInfo& info,
+ real_time *pmtime, map<string, bufferlist> *pattrs)
+{
+ return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
+ pattrs, boost::none);
+}
+
+int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
+ ceph::real_time *pmtime,
+ map<string, bufferlist> *pattrs)
+{
+ RGWObjectCtx obj_ctx(this);
+
+ return _get_bucket_info(obj_ctx, info.bucket.tenant, info.bucket.name,
+ info, pmtime, pattrs, info.objv_tracker.read_version);
+}
+
int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
map<string, bufferlist> *pattrs)
return r;
}
- ObjectWriteOperation op;
- cls_rgw_usage_log_trim(op, user, start_epoch, end_epoch);
-
- r = ref.ioctx.operate(ref.oid, &op);
+ r = cls_rgw_usage_log_trim(ref.ioctx, ref.oid, user, start_epoch, end_epoch);
return r;
}
return 0;
}
+int RGWRados::cls_user_reset_stats(const string& user_id)
+{
+ string buckets_obj_id;
+ rgw_get_buckets_obj(user_id, buckets_obj_id);
+ rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
+
+ rgw_rados_ref ref;
+ int r = get_raw_obj_ref(obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ ::cls_user_reset_stats(op);
+ return ref.ioctx.operate(ref.oid, &op);
+}
+
int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
{
string buckets_obj_id;
}
}
+bool RGWRados::call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out)
+{
+ if (command == "cache list") {
+ boost::optional<std::string> filter;
+ auto i = cmdmap.find("filter");
+ if (i != cmdmap.cend()) {
+ filter = boost::get<std::string>(i->second);
+ }
+ std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "table"));
+ if (f) {
+ f->open_array_section("cache_entries");
+ call_list(filter, f.get());
+ f->close_section();
+ f->flush(out);
+ return true;
+ } else {
+ out.append("Unable to create Formatter.\n");
+ return false;
+ }
+ } else if (command == "cache inspect") {
+ std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "json-pretty"));
+ if (f) {
+ const auto& target = boost::get<std::string>(cmdmap["target"]);
+ if (call_inspect(target, f.get())) {
+ f->flush(out);
+ return true;
+ } else {
+ out.append(string("Unable to find entry ") + target + string(".\n"));
+ return false;
+ }
+ } else {
+ out.append("Unable to create Formatter.\n");
+ return false;
+ }
+ } else if (command == "cache erase") {
+ const auto& target = boost::get<std::string>(cmdmap["target"]);
+ if (call_erase(target)) {
+ return true;
+ } else {
+ out.append(string("Unable to find entry ") + target + string(".\n"));
+ return false;
+ }
+ } else if (command == "cache zap") {
+ call_zap();
+ return true;
+ }
+ return false;
+}
+
+void RGWRados::call_list(const boost::optional<std::string>&,
+ ceph::Formatter*)
+{
+ return;
+}
+
+bool RGWRados::call_inspect(const std::string&, Formatter*)
+{
+ return false;
+}
+
+bool RGWRados::call_erase(const std::string&) {
+ return false;
+}
+
+void RGWRados::call_zap() {
+ return;
+}