+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
for (map<string, RGWZone>::iterator iter = zones.begin(); iter != zones.end(); ++iter) {
RGWZone& zone = iter->second;
zone.log_data = log_data;
- zone.log_meta = (is_master && zone.id == master_zone);
RGWZoneParams zone_params(zone.id, zone.name);
int ret = zone_params.init(cct, store);
return -ENOENT;
}
-bool RGWPeriod::is_single_zonegroup(CephContext *cct, RGWRados *store)
-{
- return (period_map.zonegroups.size() == 1);
-}
-
const string& RGWPeriod::get_latest_epoch_oid()
{
if (cct->_conf->rgw_period_latest_epoch_info_oid.empty()) {
return oss.str();
}
-int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info)
+int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info,
+ RGWObjVersionTracker *objv)
{
string oid = get_period_oid_prefix() + get_latest_epoch_oid();
rgw_pool pool(get_pool(cct));
bufferlist bl;
RGWObjectCtx obj_ctx(store);
- int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, NULL, NULL);
+ int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, objv, nullptr);
if (ret < 0) {
ldout(cct, 1) << "error read_lastest_epoch " << pool << ":" << oid << dendl;
return ret;
return 0;
}
-int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive)
+int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive,
+ RGWObjVersionTracker *objv)
{
string oid = get_period_oid_prefix() + get_latest_epoch_oid();
::encode(info, bl);
return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
- exclusive, NULL, real_time(), NULL);
+ exclusive, objv, real_time(), nullptr);
+}
+
+int RGWPeriod::update_latest_epoch(epoch_t epoch)
+{
+ static constexpr int MAX_RETRIES = 20;
+
+ for (int i = 0; i < MAX_RETRIES; i++) {
+ RGWPeriodLatestEpochInfo info;
+ RGWObjVersionTracker objv;
+ bool exclusive = false;
+
+ // read existing epoch
+ int r = read_latest_epoch(info, &objv);
+ if (r == -ENOENT) {
+ // use an exclusive create to set the epoch atomically
+ exclusive = true;
+ ldout(cct, 20) << "creating initial latest_epoch=" << epoch
+ << " for period=" << id << dendl;
+ } else if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to read latest_epoch" << dendl;
+ return r;
+ } else if (epoch <= info.epoch) {
+ r = -EEXIST; // fail with EEXIST if epoch is not newer
+ ldout(cct, 1) << "found existing latest_epoch " << info.epoch
+ << " >= given epoch " << epoch << ", returning r=" << r << dendl;
+ return r;
+ } else {
+ ldout(cct, 20) << "updating latest_epoch from " << info.epoch
+ << " -> " << epoch << " on period=" << id << dendl;
+ }
+
+ r = set_latest_epoch(epoch, exclusive, &objv);
+ if (r == -EEXIST) {
+ continue; // exclusive create raced with another update, retry
+ } else if (r == -ECANCELED) {
+ continue; // write raced with a conflicting version, retry
+ }
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to write latest_epoch" << dendl;
+ return r;
+ }
+ return 0; // return success
+ }
+
+ return -ECANCELED; // fail after max retries
}
int RGWPeriod::delete_obj()
ret = store_info(exclusive);
if (ret < 0) {
ldout(cct, 0) << "ERROR: storing info for " << id << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
}
ret = set_latest_epoch(epoch);
int RGWPeriod::store_info(bool exclusive)
{
- epoch_t latest_epoch = FIRST_EPOCH - 1;
- int ret = get_latest_epoch(latest_epoch);
- if (ret < 0 && ret != -ENOENT) {
- ldout(cct, 0) << "ERROR: RGWPeriod::get_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
- return ret;
- }
-
rgw_pool pool(get_pool(cct));
string oid = get_period_oid();
bufferlist bl;
::encode(*this, bl);
- ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), exclusive, NULL, real_time(), NULL);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: rgw_put_system_obj(" << pool << ":" << oid << "): " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- if (latest_epoch < epoch) {
- ret = set_latest_epoch(epoch);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: RGWPeriod::set_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- }
- return 0;
+
+ return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
+ exclusive, NULL, real_time(), NULL);
}
rgw_pool RGWPeriod::get_pool(CephContext *cct)
return rgw_pool(cct->_conf->rgw_period_root_pool);
}
-int RGWPeriod::use_next_epoch()
-{
- epoch_t latest_epoch;
- int ret = get_latest_epoch(latest_epoch);
- if (ret < 0) {
- return ret;
- }
- epoch = latest_epoch + 1;
- ret = read_info();
- if (ret < 0 && ret != -ENOENT) {
- return ret;
- }
- if (ret == -ENOENT) {
- ret = create();
- if (ret < 0) {
- ldout(cct, 0) << "Error creating new epoch " << epoch << dendl;
- return ret;
- }
- }
- return 0;
-}
-
int RGWPeriod::add_zonegroup(const RGWZoneGroup& zonegroup)
{
if (zonegroup.realm_id != realm_id) {
return r;
}
// set as latest epoch
- r = set_latest_epoch(epoch);
+ r = update_latest_epoch(epoch);
+ if (r == -EEXIST) {
+ // already have this epoch (or a more recent one)
+ return 0;
+ }
if (r < 0) {
ldout(cct, 0) << "failed to set latest epoch: " << cpp_strerror(-r) << dendl;
return r;
*pobj = cur_obj;
- if (!bl.length())
+ if (!bl.length()) {
+ *phandle = nullptr;
return 0;
+ }
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
obj_len = (uint64_t)first_chunk.length();
}
while (pending_data_bl.length()) {
- void *handle;
+ void *handle = nullptr;
rgw_raw_obj obj;
uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
if (max_write_size > pending_data_bl.length()) {
int RGWRados::get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size)
{
- uint64_t alignment;
+ uint64_t alignment = 0;
int r = get_required_alignment(pool, &alignment);
if (r < 0) {
return r;
if (async_rados) {
delete async_rados;
}
- if (use_gc_thread) {
- gc->stop_processor();
- obj_expirer->stop_processor();
- }
+
delete gc;
gc = NULL;
- if (use_lc_thread) {
- lc->stop_processor();
- }
- delete lc;
- lc = NULL;
-
delete obj_expirer;
obj_expirer = NULL;
+
+ delete lc;
+ lc = NULL;
delete rest_master_conn;
if (ret < 0) {
return ret;
}
-
ret = r.connect();
if (ret < 0) {
return ret;
return ret;
}
+
+int RGWRados::register_to_service_map(const string& daemon_type, const map<string, string>& meta)
+{
+ map<string,string> metadata = meta;
+ metadata["num_handles"] = stringify(rados.size());
+ metadata["zonegroup_id"] = zonegroup.get_id();
+ metadata["zonegroup_name"] = zonegroup.get_name();
+ metadata["zone_name"] = zone_name();
+ metadata["zone_id"] = zone_id();;
+ string name = cct->_conf->name.get_id();
+ if (name.find("rgw.") == 0) {
+ name = name.substr(4);
+ }
+ int ret = rados[0].service_daemon_register(daemon_type, name, metadata);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: service_daemon_register() returned ret=" << ret << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
/**
* Add new connection to connections map
* @param zonegroup_conn_map map which new connection will be added to
bigger_than_delim = buf;
/* if marker points at a common prefix, fast forward it into its upperbound string */
- int delim_pos = cur_marker.name.find(params.delim, params.prefix.size());
+ int delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
if (delim_pos >= 0) {
string s = cur_marker.name.substr(0, delim_pos);
s.append(bigger_than_delim);
next_marker = prefix_key;
(*common_prefixes)[prefix_key] = true;
- skip_after_delim = obj.name.substr(0, delim_pos);
+ int marker_delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
+
+ skip_after_delim = cur_marker.name.substr(0, marker_delim_pos);
skip_after_delim.append(bigger_than_delim);
ldout(cct, 20) << "skip_after_delim=" << skip_after_delim << dendl;
map<string, RGWZonePlacementInfo>::iterator piter = get_zone_params().placement_pools.find(location_rule);
if (piter == get_zone_params().placement_pools.end()) {
/* couldn't find, means we cannot really place data for this bucket in this zone */
- if (get_zonegroup().equals(zonegroup_id)) {
+ if (get_zonegroup().equals(zonegroup.get_id())) {
/* that's a configuration error, zone should have that rule, as we're within the requested
* zonegroup */
return -EINVAL;
return 0;
}
-int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_pool *pool)
+int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
{
ref->oid = obj.oid;
ref->key = obj.loc;
} else {
ref->pool = obj.pool;
}
- if (pool) {
- *pool = ref->pool;
- }
r = open_pool_ctx(ref->pool, ref->ioctx);
if (r < 0)
return r;
return 0;
}
-int RGWRados::get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_pool *pool)
+int RGWRados::get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
{
- return get_raw_obj_ref(obj, ref, pool);
+ return get_raw_obj_ref(obj, ref);
}
/*
void *_index_op)
{
RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
- rgw_pool pool;
- rgw_rados_ref ref;
RGWRados *store = target->get_store();
ObjectWriteOperation op;
return -EIO;
}
+ rgw_rados_ref ref;
r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);
if (r < 0)
return r;
uint64_t epoch;
int64_t poolid;
-
- bool orig_exists = state->exists;
- uint64_t orig_size = state->accounted_size;
+ bool orig_exists;
+ uint64_t orig_size;
+
+ if (!reset_obj) { //Multipart upload, it has immutable head.
+ orig_exists = false;
+ orig_size = 0;
+ } else {
+ orig_exists = state->exists;
+ orig_size = state->accounted_size;
+ }
bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
RGWObjVersionTracker *objv_tracker,
real_time set_mtime /* 0 for don't set */)
{
- rgw_pool pool;
rgw_rados_ref ref;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0)
return r;
RGWObjVersionTracker *objv_tracker)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
}
ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb());
- if (ret < 0)
+ if (ret < 0) {
+ delete out_stream_req;
return ret;
+ }
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
if (ret < 0)
}
/* single zonegroup and a single zone */
- if (current_period.is_single_zonegroup(cct, this) && get_zonegroup().zones.size() == 1) {
+ if (current_period.is_single_zonegroup() && get_zonegroup().zones.size() == 1) {
return false;
}
need_invalidate = true;
r = 0;
}
- bool removed = (r >= 0);
int64_t poolid = ref.ioctx.get_id();
if (r >= 0) {
obj_tombstone_cache->add(obj, entry);
}
r = index_op.complete_del(poolid, ref.ioctx.get_last_version(), state->mtime, params.remove_objs);
- } else {
- int ret = index_op.cancel();
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
- }
- }
- if (removed) {
+
int ret = target->complete_atomic_modification();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
}
/* other than that, no need to propagate error */
+ } else {
+ int ret = index_op.cancel();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
+ }
}
if (need_invalidate) {
int RGWRados::delete_raw_obj(const rgw_raw_obj& obj)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return -EINVAL;
}
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::system_obj_get_attr(rgw_raw_obj& obj, const char *name, bufferlist& dest)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
RGWObjVersionTracker *objv_tracker)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::SystemObject::Read::GetObjState::get_ref(RGWRados *store, rgw_raw_obj& obj, rgw_rados_ref **pref)
{
if (!has_ref) {
- rgw_pool pool;
- int r = store->get_raw_obj_ref(obj, &ref, &pool);
+ int r = store->get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
bool * const truncated)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_update_buckets(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries, bool add)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_complete_stats_sync(rgw_raw_obj& obj)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bucket)
{
- rgw_pool p;
rgw_rados_ref ref;
- int r = get_system_obj_ref(obj, &ref, &p);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return 0;
}
-int RGWRados::check_bucket_shards(const RGWBucketInfo& bucket_info, rgw_bucket& bucket,
+int RGWRados::check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
RGWQuotaInfo& bucket_quota)
{
if (!cct->_conf->rgw_dynamic_resharding) {
}
if (need_resharding) {
+ ldout(cct, 20) << __func__ << " bucket " << bucket.name << " need resharding " <<
+ " old num shards " << bucket_info.num_shards << " new num shards " << suggested_num_shards <<
+ dendl;
return add_bucket_to_reshard(bucket_info, suggested_num_shards);
}