#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
#include "rgw_putobj_processor.h"
#include "cls/rgw/cls_rgw_ops.h"
#include "rgw_compression.h"
#include "rgw_etag_verifier.h"
#include "rgw_worker.h"
+#include "rgw_notify.h"
#undef fork // fails to compile RGWPeriod::fork() below
#include "services/svc_sys_obj_cache.h"
#include "services/svc_bucket.h"
#include "services/svc_mdlog.h"
-#include "services/svc_datalog_rados.h"
#include "compressor/Compressor.h"
return raw_obj;
}
-rgw_raw_obj rgw_obj_select::get_raw_obj(RGWRados *store) const
+rgw_raw_obj rgw_obj_select::get_raw_obj(rgw::sal::RGWStore* store) const
{
if (!is_raw) {
rgw_raw_obj r;
- store->obj_to_raw(placement_rule, obj, &r);
+ store->get_raw_obj(placement_rule, obj, &r);
return r;
}
return raw_obj;
http_manager.start();
}
- int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map, map<int, set<string> >& shards) {
+ int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map,
+ bc::flat_map<int, bc::flat_set<string> >& shards) {
rgw_http_param_pair pairs[] = { { "type", "data" },
{ "notify", NULL },
{ "source-zone", store->svc.zone->get_zone_params().get_id().c_str() },
for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) {
RGWRESTConn *conn = iter->second;
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- stack->call(new RGWPostRESTResourceCR<map<int, set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
+ stack->call(new RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
stacks.push_back(stack);
}
int RGWDataNotifier::process()
{
- auto data_log = store->svc.datalog_rados->get_log();
+ auto data_log = store->svc.datalog_rados;
if (!data_log) {
return 0;
}
- map<int, set<string> > shards;
-
- data_log->read_clear_modified(shards);
+ auto shards = data_log->read_clear_modified();
if (shards.empty()) {
return 0;
}
- for (map<int, set<string> >::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
- ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id=" << iter->first << ": " << iter->second << dendl;
+ for (const auto& [shard_id, keys] : shards) {
+ ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id="
+ << shard_id << ": " << keys << dendl;
}
notify_mgr.notify_all(store->svc.zone->get_zone_data_notify_to_map(), shards);
}
int process() override {
- sync.run();
+ sync.run(null_yield);
return 0;
}
};
}
delete reshard;
delete index_completion_manager;
+
+ rgw::notify::shutdown();
}
/**
int RGWRados::register_to_service_map(const string& daemon_type, const map<string, string>& meta)
{
+ string name = cct->_conf->name.get_id();
+ if (name.compare(0, 4, "rgw.") == 0) {
+ name = name.substr(4);
+ }
map<string,string> metadata = meta;
metadata["num_handles"] = "1"s;
metadata["zonegroup_id"] = svc.zone->get_zonegroup().get_id();
metadata["zonegroup_name"] = svc.zone->get_zonegroup().get_name();
metadata["zone_name"] = svc.zone->zone_name();
metadata["zone_id"] = svc.zone->zone_id().id;
- string name = cct->_conf->name.get_id();
- if (name.compare(0, 4, "rgw.") == 0) {
- name = name.substr(4);
- }
- int ret = rados.service_daemon_register(daemon_type, name, metadata);
+ metadata["id"] = name;
+ int ret = rados.service_daemon_register(
+ daemon_type,
+ stringify(rados.get_instance_id()),
+ metadata);
if (ret < 0) {
ldout(cct, 0) << "ERROR: service_daemon_register() returned ret=" << ret << ": " << cpp_strerror(-ret) << dendl;
return ret;
if (ret < 0)
return ret;
+ ret = open_notif_pool_ctx();
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
gc = new RGWGC();
index_completion_manager = new RGWIndexCompletionManager(this);
ret = index_completion_manager->start();
+ if (ret < 0) {
+ return ret;
+ }
+ ret = rgw::notify::init(cct, store);
+ if (ret < 0 ) {
+ ldout(cct, 1) << "ERROR: failed to initialize notification manager" << dendl;
+ }
return ret;
}
int RGWRados::init_svc(bool raw)
{
if (raw) {
- return svc.init_raw(cct, use_cache);
+ return svc.init_raw(cct, use_cache, null_yield);
}
- return svc.init(cct, use_cache, run_sync_thread);
+ return svc.init(cct, use_cache, run_sync_thread, null_yield);
}
int RGWRados::init_ctl()
return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().reshard_pool, reshard_pool_ctx, true, true);
}
+int RGWRados::open_notif_pool_ctx()
+{
+ return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
+}
+
int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap)
{
} catch (buffer::error& err) {
return -EINVAL;
}
- state->bl.clear();
- state->bl.claim(old);
+ state->bl = std::move(old);
state->bl.claim_append(more);
state->p = state->bl.cbegin();
if ((unsigned)r < chunk)
real_time creation_time,
rgw_bucket *pmaster_bucket,
uint32_t *pmaster_num_shards,
+ optional_yield y,
bool exclusive)
{
#define MAX_CREATE_RETRIES 20 /* need to bound retries */
for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
int ret = 0;
ret = svc.zone->select_bucket_placement(owner, zonegroup_id, placement_rule,
- &selected_placement_rule, &rule_info);
+ &selected_placement_rule, &rule_info, y);
if (ret < 0)
return ret;
info.owner = owner.user_id;
info.zonegroup = zonegroup_id;
info.placement_rule = selected_placement_rule;
- info.index_type = rule_info.index_type;
info.swift_ver_location = swift_ver_location;
info.swift_versioning = (!swift_ver_location.empty());
- if (pmaster_num_shards) {
- info.num_shards = *pmaster_num_shards;
- } else {
- info.num_shards = bucket_index_max_shards;
- }
- info.bucket_index_shard_hash_type = RGWBucketInfo::MOD;
+
+ init_default_bucket_layout(cct, info.layout, svc.zone->get_zone(),
+ pmaster_num_shards ?
+ std::optional{*pmaster_num_shards} :
+ std::nullopt,
+ rule_info.index_type);
+
info.requester_pays = false;
if (real_clock::is_zero(creation_time)) {
info.creation_time = ceph::real_clock::now();
RGWObjManifest::obj_iterator miter;
RGWObjManifest& manifest = *astate->manifest;
for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
- rgw_raw_obj raw_loc = miter.get_location().get_raw_obj(this);
+ rgw_raw_obj raw_loc = miter.get_location().get_raw_obj(store);
rgw_obj loc;
string oid;
string locator;
}
int RGWRados::BucketShard::init(const rgw_bucket& _bucket,
- int sid,
+ int sid, const rgw::bucket_index_layout_generation& idx_layout,
RGWBucketInfo* bucket_info_out)
{
bucket = _bucket;
auto obj_ctx = store->svc.sysobj->init_obj_ctx();
+
RGWBucketInfo bucket_info;
RGWBucketInfo* bucket_info_p =
bucket_info_out ? bucket_info_out : &bucket_info;
string oid;
- ret = store->svc.bi_rados->open_bucket_index_shard(*bucket_info_p, shard_id, &bucket_obj);
+ ret = store->svc.bi_rados->open_bucket_index_shard(*bucket_info_p, shard_id, idx_layout, &bucket_obj);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int sid)
{
bucket = bucket_info.bucket;
shard_id = sid;
- int ret = store->svc.bi_rados->open_bucket_index_shard(bucket_info, shard_id, &bucket_obj);
+ int ret = store->svc.bi_rados->open_bucket_index_shard(bucket_info, shard_id, idx_layout, &bucket_obj);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
return 0;
}
+bool RGWRados::swift_versioning_enabled(rgw::sal::RGWBucket* bucket) const
+{
+ return bucket->get_info().has_swift_versioning() &&
+ bucket->get_info().swift_ver_location.size();
+}
int RGWRados::swift_versioning_copy(RGWObjectCtx& obj_ctx,
const rgw_user& user,
- RGWBucketInfo& bucket_info,
- rgw_obj& obj,
+ rgw::sal::RGWBucket* bucket,
+ rgw::sal::RGWObject* obj,
const DoutPrefixProvider *dpp,
optional_yield y)
{
- if (! swift_versioning_enabled(bucket_info)) {
+ if (! swift_versioning_enabled(bucket)) {
return 0;
}
- obj_ctx.set_atomic(obj);
+ obj->set_atomic(&obj_ctx);
RGWObjState * state = nullptr;
- int r = get_obj_state(&obj_ctx, bucket_info, obj, &state, false, y);
+ int r = get_obj_state(&obj_ctx, bucket->get_info(), obj->get_obj(), &state, false, y);
if (r < 0) {
return r;
}
return 0;
}
- const string& src_name = obj.get_oid();
+ const string& src_name = obj->get_oid();
char buf[src_name.size() + 32];
struct timespec ts = ceph::real_clock::to_timespec(state->mtime);
snprintf(buf, sizeof(buf), "%03x%s/%lld.%06ld", (int)src_name.size(),
RGWBucketInfo dest_bucket_info;
- r = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, dest_bucket_info, NULL, null_yield, NULL);
+ r = get_bucket_info(&svc, bucket->get_tenant(), bucket->get_info().swift_ver_location, dest_bucket_info, NULL, null_yield, NULL);
if (r < 0) {
ldout(cct, 10) << "failed to read dest bucket info: r=" << r << dendl;
if (r == -ENOENT) {
return r;
}
- if (dest_bucket_info.owner != bucket_info.owner) {
+ if (dest_bucket_info.owner != bucket->get_info().owner) {
return -ERR_PRECONDITION_FAILED;
}
- rgw_obj dest_obj(dest_bucket_info.bucket, buf);
+ rgw::sal::RGWRadosBucket dest_bucket(store, dest_bucket_info);
+ rgw::sal::RGWRadosObject dest_obj(store, rgw_obj_key(buf), &dest_bucket);
if (dest_bucket_info.versioning_enabled()){
- gen_rand_obj_instance_name(&dest_obj);
+ dest_obj.gen_rand_obj_instance_name();
}
- obj_ctx.set_atomic(dest_obj);
+ dest_obj.set_atomic(&obj_ctx);
rgw_zone_id no_zone;
user,
NULL, /* req_info *info */
no_zone,
- dest_obj,
+ &dest_obj,
obj,
- dest_bucket_info,
- bucket_info,
- bucket_info.placement_rule,
+ &dest_bucket,
+ bucket,
+ bucket->get_placement_rule(),
NULL, /* time_t *src_mtime */
NULL, /* time_t *mtime */
NULL, /* const time_t *mod_ptr */
int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
const rgw_user& user,
- RGWBucketInfo& bucket_info,
- rgw_obj& obj,
+ rgw::sal::RGWBucket* bucket,
+ rgw::sal::RGWObject* obj,
bool& restored, /* out */
const DoutPrefixProvider *dpp)
{
- if (! swift_versioning_enabled(bucket_info)) {
+ if (! swift_versioning_enabled(bucket)) {
return 0;
}
/* Bucket info of the bucket that stores previous versions of our object. */
RGWBucketInfo archive_binfo;
- int ret = get_bucket_info(&svc, bucket_info.bucket.tenant,
- bucket_info.swift_ver_location, archive_binfo,
- nullptr, null_yield, nullptr);
+ int ret = get_bucket_info(&svc, bucket->get_tenant(),
+ bucket->get_info().swift_ver_location,
+ archive_binfo, nullptr, null_yield, nullptr);
if (ret < 0) {
return ret;
}
* into consideration. For we can live with that.
*
* TODO: delegate this check to un upper layer and compare with ACLs. */
- if (bucket_info.owner != archive_binfo.owner) {
+ if (bucket->get_info().owner != archive_binfo.owner) {
return -EPERM;
}
* irrelevant and may be safely skipped. */
std::map<std::string, ceph::bufferlist> no_attrs;
- rgw_obj archive_obj(archive_binfo.bucket, entry.key);
+ rgw::sal::RGWRadosBucket archive_bucket(store, archive_binfo);
+ rgw::sal::RGWRadosObject archive_obj(store, entry.key, &archive_bucket);
- if (bucket_info.versioning_enabled()){
- gen_rand_obj_instance_name(&obj);
+ if (bucket->versioning_enabled()){
+ obj->gen_rand_obj_instance_name();
}
- obj_ctx.set_atomic(archive_obj);
- obj_ctx.set_atomic(obj);
+ archive_obj.set_atomic(&obj_ctx);
+ obj->set_atomic(&obj_ctx);
int ret = copy_obj(obj_ctx,
user,
nullptr, /* req_info *info */
no_zone,
obj, /* dest obj */
- archive_obj, /* src obj */
- bucket_info, /* dest bucket info */
- archive_binfo, /* src bucket info */
- bucket_info.placement_rule, /* placement_rule */
+ &archive_obj, /* src obj */
+ bucket, /* dest bucket info */
+ &archive_bucket, /* src bucket info */
+ bucket->get_placement_rule(), /* placement_rule */
nullptr, /* time_t *src_mtime */
nullptr, /* time_t *mtime */
nullptr, /* const time_t *mod_ptr */
}
/* Need to remove the archived copy. */
- ret = delete_obj(obj_ctx, archive_binfo, archive_obj,
+ ret = delete_obj(obj_ctx, archive_binfo, archive_obj.get_obj(),
archive_binfo.versioning_status());
return ret;
};
- const std::string& obj_name = obj.get_oid();
+ const std::string& obj_name = obj->get_oid();
const auto prefix = boost::str(boost::format("%03x%s") % obj_name.size()
% obj_name);
}
}
-int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj, const DoutPrefixProvider *dpp, optional_yield y)
+int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::RGWObject* obj, const DoutPrefixProvider *dpp, optional_yield y)
{
- map<string, bufferlist> attrset;
-
- real_time mtime;
- uint64_t obj_size;
RGWObjectCtx rctx(this->store);
+ rgw::sal::RGWRadosBucket bucket(store, dest_bucket_info);
- RGWRados::Object op_target(this, dest_bucket_info, rctx, obj);
- RGWRados::Object::Read read_op(&op_target);
-
- read_op.params.attrs = &attrset;
- read_op.params.lastmod = &mtime;
- read_op.params.obj_size = &obj_size;
-
- int ret = read_op.prepare(y);
- if (ret < 0)
- return ret;
-
- attrset.erase(RGW_ATTR_ID_TAG);
- attrset.erase(RGW_ATTR_TAIL_TAG);
-
- return copy_obj_data(rctx, dest_bucket_info, dest_bucket_info.placement_rule,
- read_op, obj_size - 1, obj, NULL, mtime, attrset,
- 0, real_time(), NULL, dpp, y);
+ return obj->copy_obj_data(rctx, &bucket, obj, 0, NULL, dpp, y);
}
struct obj_time_weight {
const rgw_user& user_id,
req_info *info,
const rgw_zone_id& source_zone,
- rgw_obj& src_obj,
+ rgw::sal::RGWObject* src_obj,
const RGWBucketInfo *src_bucket_info,
real_time *src_mtime,
uint64_t *psize,
return ret;
}
- ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize, nullptr, pheaders);
+ ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
+ nullptr, pheaders, null_yield);
if (ret < 0) {
return ret;
}
const rgw_user& user_id,
req_info *info,
const rgw_zone_id& source_zone,
- const rgw_obj& dest_obj,
- const rgw_obj& src_obj,
- const RGWBucketInfo& dest_bucket_info,
- const RGWBucketInfo *src_bucket_info,
+ rgw::sal::RGWObject* dest_obj,
+ rgw::sal::RGWObject* src_obj,
+ rgw::sal::RGWBucket* dest_bucket,
+ rgw::sal::RGWBucket* src_bucket,
std::optional<rgw_placement_rule> dest_placement_rule,
real_time *src_mtime,
real_time *mtime,
const char *if_nomatch,
AttrsMod attrs_mod,
bool copy_if_newer,
- map<string, bufferlist>& attrs,
+ rgw::sal::RGWAttrs& attrs,
RGWObjCategory category,
std::optional<uint64_t> olh_epoch,
real_time delete_at,
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
- AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, nullptr, user_id,
- obj_ctx, dest_obj, olh_epoch, tag, dpp, null_yield);
+ AtomicObjectProcessor processor(&aio, this->store, dest_bucket, nullptr, user_id,
+ obj_ctx, dest_obj->clone(), olh_epoch,
+ tag, dpp, null_yield);
RGWRESTConn *conn;
auto& zone_conn_map = svc.zone->get_zone_conn_map();
auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
if (source_zone.empty()) {
- if (!src_bucket_info || src_bucket_info->zonegroup.empty()) {
+ if (!src_bucket || src_bucket->get_info().zonegroup.empty()) {
/* source is in the master zonegroup */
conn = svc.zone->get_master_conn();
} else {
- map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup);
+ map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket->get_info().zonegroup);
if (iter == zonegroup_conn_map.end()) {
ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl;
return -ENOENT;
const rgw_placement_rule *ptail_rule;
int ret = filter->filter(cct,
- src_obj.key,
- dest_bucket_info,
+ src_obj->get_key(),
+ dest_bucket->get_info(),
dest_placement_rule,
obj_attrs,
&override_owner,
if (copy_if_newer) {
/* need to get mtime for destination */
- ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false, null_yield);
+ ret = get_obj_state(&obj_ctx, dest_bucket->get_info(), dest_obj->get_obj(), &dest_state, false, null_yield);
if (ret < 0)
goto set_err_state;
}
ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
- &expected_size, nullptr, nullptr);
+ &expected_size, nullptr, nullptr, null_yield);
if (ret < 0) {
goto set_err_state;
}
RGWCompressionInfo cs_info;
cs_info.compression_type = plugin->get_type_name();
cs_info.orig_size = cb.get_data_len();
+ cs_info.compressor_message = compressor->get_compressor_message();
cs_info.blocks = move(compressor->get_compression_blocks());
encode(cs_info, tmp);
cb.get_attrs()[RGW_ATTR_COMPRESSION] = tmp;
if (copy_if_newer && canceled) {
ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
- obj_ctx.invalidate(dest_obj); /* object was overwritten */
- ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false, null_yield);
+ obj_ctx.invalidate(dest_obj->get_obj()); /* object was overwritten */
+ ret = get_obj_state(&obj_ctx, dest_bucket->get_info(), dest_obj->get_obj(), &dest_state, false, null_yield);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << ": get_err_state() returned ret=" << ret << dendl;
goto set_err_state;
// for OP_LINK_OLH to call set_olh() with a real olh_epoch
if (olh_epoch && *olh_epoch > 0) {
constexpr bool log_data_change = true;
- ret = set_olh(obj_ctx, dest_bucket_info, dest_obj, false, nullptr,
+ ret = set_olh(obj_ctx, dest_bucket->get_info(), dest_obj->get_obj(), false, nullptr,
*olh_epoch, real_time(), false, null_yield, zones_trace, log_data_change);
} else {
// we already have the latest copy
map<string, bufferlist>& src_attrs,
RGWRados::Object::Read& read_op,
const rgw_user& user_id,
- rgw_obj& dest_obj,
+ rgw::sal::RGWObject* dest_obj,
real_time *mtime)
{
string etag;
return ret;
}
- ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
+ ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, null_yield);
if (ret < 0)
return ret;
const rgw_user& user_id,
req_info *info,
const rgw_zone_id& source_zone,
- rgw_obj& dest_obj,
- rgw_obj& src_obj,
- RGWBucketInfo& dest_bucket_info,
- RGWBucketInfo& src_bucket_info,
+ rgw::sal::RGWObject* dest_obj,
+ rgw::sal::RGWObject* src_obj,
+ rgw::sal::RGWBucket* dest_bucket,
+ rgw::sal::RGWBucket* src_bucket,
const rgw_placement_rule& dest_placement,
real_time *src_mtime,
real_time *mtime,
const char *if_nomatch,
AttrsMod attrs_mod,
bool copy_if_newer,
- map<string, bufferlist>& attrs,
+ rgw::sal::RGWAttrs& attrs,
RGWObjCategory category,
uint64_t olh_epoch,
real_time delete_at,
{
int ret;
uint64_t obj_size;
- rgw_obj shadow_obj = dest_obj;
+ rgw_obj shadow_obj = dest_obj->get_obj();
string shadow_oid;
bool remote_src;
bool remote_dest;
- append_rand_alpha(cct, dest_obj.get_oid(), shadow_oid, 32);
- shadow_obj.init_ns(dest_obj.bucket, shadow_oid, shadow_ns);
+ append_rand_alpha(cct, dest_obj->get_oid(), shadow_oid, 32);
+ shadow_obj.init_ns(dest_obj->get_bucket()->get_key(), shadow_oid, shadow_ns);
auto& zonegroup = svc.zone->get_zonegroup();
- remote_dest = !zonegroup.equals(dest_bucket_info.zonegroup);
- remote_src = !zonegroup.equals(src_bucket_info.zonegroup);
+ remote_dest = !zonegroup.equals(dest_bucket->get_info().zonegroup);
+ remote_src = !zonegroup.equals(src_bucket->get_info().zonegroup);
if (remote_src && remote_dest) {
ldpp_dout(dpp, 0) << "ERROR: can't copy object when both src and dest buckets are remote" << dendl;
return -EINVAL;
}
- ldpp_dout(dpp, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.get_oid() << " => " << dest_obj.bucket << ":" << dest_obj.get_oid() << dendl;
+ ldpp_dout(dpp, 5) << "Copy object " << src_obj->get_bucket() << ":" << src_obj->get_oid() << " => " << dest_obj->get_bucket() << ":" << dest_obj->get_oid() << dendl;
if (remote_src || !source_zone.empty()) {
return fetch_remote_obj(obj_ctx, user_id, info, source_zone,
- dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
+ dest_obj, src_obj, dest_bucket, src_bucket,
dest_placement, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
}
map<string, bufferlist> src_attrs;
- RGWRados::Object src_op_target(this, src_bucket_info, obj_ctx, src_obj);
+ RGWRados::Object src_op_target(this, src_bucket->get_info(), obj_ctx, src_obj->get_obj());
RGWRados::Object::Read read_op(&src_op_target);
read_op.conds.mod_ptr = mod_ptr;
RGWObjManifest manifest;
RGWObjState *astate = NULL;
- ret = get_obj_state(&obj_ctx, src_bucket_info, src_obj, &astate, y);
+ ret = get_obj_state(&obj_ctx, src_bucket->get_info(), src_obj->get_obj(), &astate, y);
if (ret < 0) {
return ret;
}
}
uint64_t max_chunk_size;
- ret = get_max_chunk_size(dest_bucket_info.placement_rule, dest_obj, &max_chunk_size);
+ ret = get_max_chunk_size(dest_bucket->get_placement_rule(), dest_obj->get_obj(), &max_chunk_size);
if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to get max_chunk_size() for bucket " << dest_obj.bucket << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to get max_chunk_size() for bucket " << dest_obj->get_bucket() << dendl;
return ret;
}
}
if (!src_rule || src_rule->empty()) {
- src_rule = &src_bucket_info.placement_rule;
+ src_rule = &src_bucket->get_placement_rule();
}
- if (!get_obj_data_pool(*src_rule, src_obj, &src_pool)) {
+ if (!get_obj_data_pool(*src_rule, src_obj->get_obj(), &src_pool)) {
ldpp_dout(dpp, 0) << "ERROR: failed to locate data pool for " << src_obj << dendl;
return -EIO;
}
- if (!get_obj_data_pool(dest_placement, dest_obj, &dest_pool)) {
+ if (!get_obj_data_pool(dest_placement, dest_obj->get_obj(), &dest_pool)) {
ldpp_dout(dpp, 0) << "ERROR: failed to locate data pool for " << dest_obj << dendl;
return -EIO;
}
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
attrs.erase(RGW_ATTR_TAIL_TAG);
- return copy_obj_data(obj_ctx, dest_bucket_info, dest_placement, read_op, obj_size - 1, dest_obj,
+ return copy_obj_data(obj_ctx, dest_bucket, dest_placement, read_op, obj_size - 1, dest_obj,
mtime, real_time(), attrs, olh_epoch, delete_at, petag, dpp, y);
}
}
rgw_rados_ref ref;
- ret = get_raw_obj_ref(miter.get_location().get_raw_obj(this), &ref);
+ ret = get_raw_obj_ref(miter.get_location().get_raw_obj(store), &ref);
if (ret < 0) {
return ret;
}
RGWObjManifest *pmanifest;
ldpp_dout(dpp, 20) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
- RGWRados::Object dest_op_target(this, dest_bucket_info, obj_ctx, dest_obj);
+ RGWRados::Object dest_op_target(this, dest_bucket->get_info(), obj_ctx, dest_obj->get_obj());
RGWRados::Object::Write write_op(&dest_op_target);
string tag;
manifest = *astate->manifest;
const rgw_bucket_placement& tail_placement = manifest.get_tail_placement();
if (tail_placement.bucket.name.empty()) {
- manifest.set_tail_placement(tail_placement.placement_rule, src_obj.bucket);
+ manifest.set_tail_placement(tail_placement.placement_rule, src_obj->get_bucket()->get_key());
}
string ref_tag;
for (; miter != astate->manifest->obj_end(); ++miter) {
ObjectWriteOperation op;
ref_tag = tag + '\0';
cls_refcount_get(op, ref_tag, true);
- const rgw_raw_obj& loc = miter.get_location().get_raw_obj(this);
+ const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
auto& ioctx = ref.pool.ioctx();
ioctx.locator_set_key(loc.loc);
goto done_ret;
}
- pmanifest->set_head(dest_bucket_info.placement_rule, dest_obj, first_chunk.length());
+ pmanifest->set_head(dest_bucket->get_placement_rule(), dest_obj->get_obj(), first_chunk.length());
} else {
- pmanifest->set_head(dest_bucket_info.placement_rule, dest_obj, 0);
+ pmanifest->set_head(dest_bucket->get_placement_rule(), dest_obj->get_obj(), 0);
}
write_op.meta.data = &first_chunk;
write_op.meta.manifest = pmanifest;
write_op.meta.ptag = &tag;
- write_op.meta.owner = dest_bucket_info.owner;
+ write_op.meta.owner = dest_bucket->get_info().owner;
write_op.meta.mtime = mtime;
write_op.meta.flags = PUT_OBJ_CREATE;
write_op.meta.category = category;
int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
- RGWBucketInfo& dest_bucket_info,
+ rgw::sal::RGWBucket* bucket,
const rgw_placement_rule& dest_placement,
RGWRados::Object::Read& read_op, off_t end,
- const rgw_obj& dest_obj,
+ rgw::sal::RGWObject* dest_obj,
real_time *mtime,
real_time set_mtime,
- map<string, bufferlist>& attrs,
+ rgw::sal::RGWAttrs& attrs,
uint64_t olh_epoch,
real_time delete_at,
string *petag,
using namespace rgw::putobj;
// do not change the null_yield in the initialization of this AtomicObjectProcessor
// it causes crashes in the ragweed tests
- AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, &dest_placement,
- dest_bucket_info.owner, obj_ctx,
- dest_obj, olh_epoch, tag, dpp, null_yield);
+ AtomicObjectProcessor processor(&aio, this->store, bucket, &dest_placement,
+ bucket->get_info().owner, obj_ctx,
+ dest_obj->clone(), olh_epoch, tag,
+ dpp, null_yield);
int ret = processor.prepare(y);
if (ret < 0)
return ret;
}
int RGWRados::transition_obj(RGWObjectCtx& obj_ctx,
- RGWBucketInfo& bucket_info,
- rgw_obj& obj,
+ rgw::sal::RGWBucket* bucket,
+ rgw::sal::RGWObject& obj,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider *dpp,
optional_yield y)
{
- map<string, bufferlist> attrs;
+ rgw::sal::RGWAttrs attrs;
real_time read_mtime;
uint64_t obj_size;
- obj_ctx.set_atomic(obj);
-
- RGWRados::Object op_target(this, bucket_info, obj_ctx, obj);
+ obj.set_atomic(&obj_ctx);
+ RGWRados::Object op_target(this, bucket->get_info(), obj_ctx, obj.get_obj());
RGWRados::Object::Read read_op(&op_target);
read_op.params.attrs = &attrs;
attrs.erase(RGW_ATTR_TAIL_TAG);
ret = copy_obj_data(obj_ctx,
- bucket_info,
+ bucket,
placement_rule,
read_op,
obj_size - 1,
- obj,
+ &obj,
nullptr /* pmtime */,
mtime,
attrs,
rgw_raw_obj raw_head;
obj_to_raw(manifest.get_head_placement_rule(), head_obj, &raw_head);
for (iter = manifest.obj_begin(); iter != manifest.obj_end(); ++iter) {
- const rgw_raw_obj& mobj = iter.get_location().get_raw_obj(this);
+ const rgw_raw_obj& mobj = iter.get_location().get_raw_obj(store);
if (mobj == raw_head)
continue;
cls_rgw_obj_key key(mobj.oid);
int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag)
{
+ if (chain.empty()) {
+ return 0;
+ }
+
return gc->send_chain(chain, tag);
}
return index_op.complete_del(-1 /* pool */, 0, mtime, NULL);
}
-static void generate_fake_tag(RGWRados *store, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
+static void generate_fake_tag(rgw::sal::RGWStore* store, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
{
string tag;
if (bletag.length() > 0 && bletag[bletag.length() - 1] == '\0') {
bufferlist newbl;
bletag.splice(0, bletag.length() - 1, &newbl);
- bletag.claim(newbl);
+ bletag = std::move(newbl);
}
}
s->manifest->has_explicit_objs()) {
RGWObjManifest::obj_iterator mi;
for (mi = s->manifest->obj_begin(); mi != s->manifest->obj_end(); ++mi) {
- ldout(cct, 20) << "manifest: ofs=" << mi.get_ofs() << " loc=" << mi.get_location().get_raw_obj(this) << dendl;
+ ldout(cct, 20) << "manifest: ofs=" << mi.get_ofs() << " loc=" << mi.get_location().get_raw_obj(store) << dendl;
}
}
* Uh oh, something's wrong, object with manifest should have tag. Let's
* create one out of the manifest, would be unique
*/
- generate_fake_tag(this, s->attrset, *s->manifest, manifest_bl, s->obj_tag);
+ generate_fake_tag(store, s->attrset, *s->manifest, manifest_bl, s->obj_tag);
s->fake_tag = true;
}
}
RGWObjManifest::obj_iterator iter = astate->manifest->obj_find(ofs);
uint64_t stripe_ofs = iter.get_stripe_ofs();
- read_obj = iter.get_location().get_raw_obj(store);
+ read_obj = iter.get_location().get_raw_obj(store->store);
len = std::min(len, iter.get_stripe_size() - (ofs - stripe_ofs));
read_ofs = iter.location_ofs() + (ofs - stripe_ofs);
reading_from_head = (read_obj == state.head_obj);
off_t next_stripe_ofs = stripe_ofs + iter.get_stripe_size();
while (ofs < next_stripe_ofs && ofs <= end) {
- read_obj = iter.get_location().get_raw_obj(this);
+ read_obj = iter.get_location().get_raw_obj(store);
uint64_t read_len = std::min(len, iter.get_stripe_size() - (ofs - stripe_ofs));
read_ofs = iter.location_ofs() + (ofs - stripe_ofs);
int RGWRados::get_bucket_stats_async(RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *ctx)
{
int num_aio = 0;
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.num_shards ? : 1);
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.layout.current_index.layout.normal.num_shards ? : 1);
ceph_assert(get_ctx);
int r = cls_bucket_head_async(bucket_info, shard_id, get_ctx, &num_aio);
if (r < 0) {
return 0;
}
-int RGWRados::bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+int RGWRados::bi_list(const RGWBucketInfo& bucket_info, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
BucketShard bs(this);
- int ret = bs.init(bucket, shard_id, nullptr /* no RGWBucketInfo */);
+ int ret = bs.init(bucket_info.bucket, shard_id, bucket_info.layout.current_index, nullptr /* no RGWBucketInfo */);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
}
int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
- vector<cls_rgw_lc_entry>& progress_map,
+ vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
int& index)
{
return lc->list_lc_progress(marker, max_entries, progress_map, index);
}; // ShardTracker
// add the next unique candidate, or return false if we reach the end
- auto next_candidate = [] (ShardTracker& t,
+ auto next_candidate = [] (CephContext *cct, ShardTracker& t,
std::map<std::string, size_t>& candidates,
size_t tracker_idx) {
while (!t.at_end()) {
// it's important that the values in the map refer to the index
// into the results_trackers vector, which may not be the same
// as the shard number (i.e., when not all shards are requested)
- next_candidate(t, candidates, tracker_idx);
+ next_candidate(cct, t, candidates, tracker_idx);
++tracker_idx;
}
candidates.erase(candidates.begin());
tracker.advance();
- next_candidate(tracker, candidates, tracker_idx);
+ next_candidate(cct, tracker, candidates, tracker_idx);
if (tracker.at_end() && tracker.is_truncated()) {
// once we exhaust one shard that is truncated, we need to stop,
RGWObjManifest::obj_iterator miter;
RGWObjManifest& manifest = *astate->manifest;
for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
- const rgw_raw_obj& raw_loc = miter.get_location().get_raw_obj(this);
+ const rgw_raw_obj& raw_loc = miter.get_location().get_raw_obj(store);
rgw_obj loc;
RGWSI_Tier_RADOS::raw_obj_to_obj(manifest.get_obj().bucket, raw_loc, &loc);
bool need_resharding = false;
uint32_t num_source_shards =
- (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
const uint32_t max_dynamic_shards =
uint32_t(cct->_conf.get_val<uint64_t>("rgw_max_dynamic_shards"));
}
ldout(cct, 1) << "RGWRados::" << __func__ << " bucket " << bucket.name <<
- " needs resharding; current num shards " << bucket_info.num_shards <<
+ " needs resharding; current num shards " << bucket_info.layout.current_index.layout.normal.num_shards <<
"; new num shards " << final_num_shards << " (suggested " <<
suggested_num_shards << ")" << dendl;
{
RGWReshard reshard(this->store);
- uint32_t num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ uint32_t num_source_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
new_num_shards = std::min(new_num_shards, get_max_bucket_shards());
if (new_num_shards <= num_source_shards) {
}
int RGWRados::check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
- RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only)
+ RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota,
+ uint64_t obj_size, optional_yield y,
+ bool check_size_only)
{
// if we only check size, then num_objs will set to 0
if(check_size_only)
- return quota_handler->check_quota(bucket_owner, bucket, user_quota, bucket_quota, 0, obj_size);
+ return quota_handler->check_quota(bucket_owner, bucket, user_quota, bucket_quota, 0, obj_size, y);
- return quota_handler->check_quota(bucket_owner, bucket, user_quota, bucket_quota, 1, obj_size);
+ return quota_handler->check_quota(bucket_owner, bucket, user_quota, bucket_quota, 1, obj_size, y);
}
-int RGWRados::get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key,
+int RGWRados::get_target_shard_id(const rgw::bucket_index_normal_layout& layout, const string& obj_key,
int *shard_id)
{
int r = 0;
- switch (bucket_info.bucket_index_shard_hash_type) {
- case RGWBucketInfo::MOD:
- if (!bucket_info.num_shards) {
+ switch (layout.hash_type) {
+ case rgw::BucketHashType::Mod:
+ if (!layout.num_shards) {
if (shard_id) {
*shard_id = -1;
}
} else {
- uint32_t sid = svc.bi_rados->bucket_shard_index(obj_key, bucket_info.num_shards);
+ uint32_t sid = svc.bi_rados->bucket_shard_index(obj_key, layout.num_shards);
if (shard_id) {
*shard_id = (int)sid;
}