int RGWRados::get_required_alignment(const DoutPrefixProvider *dpp, const rgw_pool& pool, uint64_t *alignment)
{
IoCtx ioctx;
- int r = open_pool_ctx(dpp, pool, ioctx, false);
+ int r = open_pool_ctx(dpp, pool, ioctx, false, true);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: open_pool_ctx() returned " << r << dendl;
return r;
}
int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
- bool mostly_omap)
+ bool mostly_omap, bool bulk)
{
constexpr bool create = true; // create the pool if it doesn't exist
- return rgw_init_ioctx(dpp, get_rados_handle(), pool, io_ctx, create, mostly_omap);
+ return rgw_init_ioctx(dpp, get_rados_handle(), pool, io_ctx, create, mostly_omap, bulk);
}
/**** logs ****/
return -EIO;
}
- int r = open_pool_ctx(dpp, pool, *ioctx, false);
+ int r = open_pool_ctx(dpp, pool, *ioctx, false, true);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: unable to open data-pool=" << pool.to_str() <<
" for obj=" << obj << " with error-code=" << r << dendl;
int process_attrs(void) {
+ bool encrypted = false;
if (extra_data_bl.length()) {
JSONParser jp;
if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) {
JSONDecoder::decode_json("attrs", src_attrs, &jp);
+ encrypted = src_attrs.count(RGW_ATTR_CRYPT_MODE);
+ if (encrypted) {
+ // we won't have access to the decrypted data for checksumming
+ try_etag_verify = false;
+ }
+
+ // if the object is both compressed and encrypted, it was transferred
+ // in its encrypted+compressed form. we need to preserve the original
+ // RGW_ATTR_COMPRESSION instead of falling back to default compression
+ // settings
auto iter = src_attrs.find(RGW_ATTR_COMPRESSION);
- if (iter != src_attrs.end()) {
+ if (iter != src_attrs.end() && !encrypted) {
const bufferlist bl = std::move(iter->second);
src_attrs.erase(iter); // don't preserve source compression info
return ret;
}
- if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
- //do not compress if object is encrypted
+ // do not compress if object is encrypted
+ if (plugin && !encrypted) {
compressor = boost::in_place(cct, plugin, filter);
// add a filter that buffers data so we don't try to compress tiny blocks.
// libcurl reads in 16k at a time, and we need at least 64k to get a good
filter = &*buffering;
}
- /*
- * Presently we don't support ETag based verification if encryption is
- * requested. We can enable simultaneous support once we have a mechanism
- * to know the sequence in which the filters must be applied.
- */
- if (try_etag_verify && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
+ if (try_etag_verify) {
ret = rgw::putobj::create_etag_verifier(dpp, cct, filter, manifest_bl,
compression_info,
etag_verifier);
if (read_mtime != mtime) {
/* raced */
+ ldpp_dout(dpp, 0) << __func__ << " ERROR: failed to transition obj(" << obj.key << ") read_mtime = " << read_mtime << " doesn't match mtime = " << mtime << dendl;
return -ECANCELED;
}
static bool is_olh(map<string, bufferlist>& attrs)
{
- map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_OLH_INFO);
+ map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_OLH_VER);
return (iter != attrs.end());
}
}
- real_time mtime = real_clock::now();
+ /* As per https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html,
+ * the only way for users to modify object metadata is to make a copy of the object and
+ * set the metadata.
+ * Hence do not update mtime for any other attr changes */
+ real_time mtime = state->mtime;
struct timespec mtime_ts = real_clock::to_timespec(mtime);
op.mtime2(&mtime_ts);
auto& ioctx = ref.pool.ioctx();
auto iter = state.io_ctxs.find(read_obj.pool);
if (iter == state.io_ctxs.end()) {
state.cur_ioctx = &state.io_ctxs[read_obj.pool];
- r = store->open_pool_ctx(dpp, read_obj.pool, *state.cur_ioctx, false);
+ r = store->open_pool_ctx(dpp, read_obj.pool, *state.cur_ioctx, false, true);
if (r < 0) {
ldpp_dout(dpp, 20) << "ERROR: failed to open pool context for pool=" << read_obj.pool << " r=" << r << dendl;
return r;
return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, &outbl, null_yield);
}
+void RGWRados::olh_cancel_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+ RGWObjState& state, const rgw_obj& olh_obj,
+ const std::string& op_tag, optional_yield y)
+{
+ if (cct->_conf->rgw_debug_inject_olh_cancel_modification_err) {
+ // simulate the scenario where we fail to remove the pending xattr
+ return;
+ }
+
+ rgw_rados_ref ref;
+ int r = get_obj_head_ref(dpp, bucket_info, olh_obj, &ref);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " get_obj_head_ref() returned " << r << dendl;
+ return;
+ }
+ string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
+ attr_name.append(op_tag);
+
+ // first remove the relevant pending prefix
+ ObjectWriteOperation op;
+ bucket_index_guard_olh_op(dpp, state, op);
+ op.rmxattr(attr_name.c_str());
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
+ if (r < 0) {
+ if (r != -ENOENT && r != -ECANCELED) {
+ ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " rmxattr rgw_rados_operate() returned " << r << dendl;
+ }
+ return;
+ }
+
+ if (auto iter = state.attrset.find(RGW_ATTR_OLH_INFO); iter == state.attrset.end()) {
+ // attempt to remove the OLH object if there are no pending ops,
+ // its olh info attr is empty, and its tag hasn't changed
+ ObjectWriteOperation rm_op;
+ bucket_index_guard_olh_op(dpp, state, rm_op);
+ rm_op.cmpxattr(RGW_ATTR_OLH_INFO, CEPH_OSD_CMPXATTR_OP_EQ, bufferlist());
+ cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true);
+ rm_op.remove();
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, y);
+ }
+ if (r < 0 && (r != -ENOENT && r != -ECANCELED)) {
+ ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " olh rm rgw_rados_operate() returned " << r << dendl;
+ }
+}
+
int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag)
{
ObjectWriteOperation op;
if (has_tag) {
/* guard against racing writes */
bucket_index_guard_olh_op(dpp, state, op);
+ } else if (state.exists) {
+ // This is the case where a null versioned object already exists for this key
+ // but it hasn't been initialized as an OLH object yet. We immediately add
+ // the RGW_ATTR_OLH_INFO attr so that the OLH points back to itself and
+ // therefore effectively makes this an unobservable modification.
+ op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, bufferlist());
+ RGWOLHInfo info;
+ info.target = olh_obj;
+ info.removed = false;
+ bufferlist bl;
+ encode(info, bl);
+ op.setxattr(RGW_ATTR_OLH_INFO, bl);
}
if (!has_tag) {
int RGWRados::bucket_index_clear_olh(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
- RGWObjState& state,
+ const std::string& olh_tag,
const rgw_obj& obj_instance)
{
rgw_rados_ref ref;
BucketShard bs(this);
- string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
-
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
int ret = guard_reshard(dpp, &bs, obj_instance, bucket_info,
return r;
}
+ if (need_to_remove) {
+ string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
+ r = clear_olh(dpp, obj_ctx, obj, bucket_info, ref, olh_tag, last_ver, null_yield);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl;
+ return r;
+ }
+ }
+
r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
- if (r < 0) {
+ if (r < 0 && r != -ECANCELED) {
ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
return r;
}
- if (need_to_remove) {
- ObjectWriteOperation rm_op;
+ return 0;
+}
- rm_op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
- rm_op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_EQ, last_ver);
- cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
- rm_op.remove();
+int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
+ RGWObjectCtx& obj_ctx,
+ const rgw_obj& obj,
+ RGWBucketInfo& bucket_info,
+ const std::string& tag,
+ const uint64_t ver,
+ optional_yield y) {
+ rgw_rados_ref ref;
+ int r = get_obj_head_ref(dpp, bucket_info, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+ return clear_olh(dpp, obj_ctx, obj, bucket_info, ref, tag, ver, y);
+}
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, null_yield);
- if (r == -ECANCELED) {
- return 0; /* someone else won this race */
- } else {
- /*
- * only clear if was successful, otherwise we might clobber pending operations on this object
- */
- r = bucket_index_clear_olh(dpp, bucket_info, state, obj);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: could not clear bucket index olh entries r=" << r << dendl;
- return r;
- }
+
+int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
+ RGWObjectCtx& obj_ctx,
+ const rgw_obj& obj,
+ RGWBucketInfo& bucket_info,
+ rgw_rados_ref& ref,
+ const std::string& tag,
+ const uint64_t ver,
+ optional_yield y) {
+ ObjectWriteOperation rm_op;
+
+ RGWObjManifest *manifest = nullptr;
+ RGWObjState *s = nullptr;
+
+ int r = get_obj_state(dpp, &obj_ctx, bucket_info, obj, &s, &manifest, false, y);
+ if (r < 0) {
+ return r;
+ }
+ map<string, bufferlist> pending_entries;
+ rgw_filter_attrset(s->attrset, RGW_ATTR_OLH_PENDING_PREFIX, &pending_entries);
+
+ map<string, bufferlist> rm_pending_entries;
+ check_pending_olh_entries(dpp, pending_entries, &rm_pending_entries);
+
+ if (!rm_pending_entries.empty()) {
+ r = remove_olh_pending_entries(dpp, bucket_info, *s, obj, rm_pending_entries);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: rm_pending_entries returned ret=" << r << dendl;
+ return r;
}
}
+ bufferlist tag_bl;
+ tag_bl.append(tag.c_str(), tag.length());
+ rm_op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, tag_bl);
+ rm_op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_EQ, ver);
+ cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
+ rm_op.remove();
+
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, y);
+ if (r == -ECANCELED) {
+ return r; /* someone else made a modification in the meantime */
+ }
+ /*
+ * only clear if was successful, otherwise we might clobber pending operations on this object
+ */
+ r = bucket_index_clear_olh(dpp, bucket_info, tag, obj);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: could not clear bucket index olh entries r=" << r << dendl;
+ return r;
+ }
return 0;
}
}
return ret;
}
- ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj,
- delete_marker, op_tag, meta, olh_epoch, unmod_since,
- high_precision_time, y, zones_trace, log_data_change);
+ if (cct->_conf->rgw_debug_inject_set_olh_err) {
+ // fail here to simulate the scenario of an unlinked object instance
+ ret = -cct->_conf->rgw_debug_inject_set_olh_err;
+ } else {
+ ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj,
+ delete_marker, op_tag, meta, olh_epoch, unmod_since,
+ high_precision_time, y, zones_trace, log_data_change);
+ }
if (ret < 0) {
ldpp_dout(dpp, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
+ olh_cancel_modification(dpp, bucket_info, *state, olh_obj, op_tag, y);
if (ret == -ECANCELED) {
// the bucket index rejected the link_olh() due to olh tag mismatch;
// attempt to reconstruct olh head attributes based on the bucket index
ret = bucket_index_unlink_instance(dpp, bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
if (ret < 0) {
+ olh_cancel_modification(dpp, bucket_info, *state, olh_obj, op_tag, y);
ldpp_dout(dpp, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
continue;
return r;
}
- auto iter = attrset.find(RGW_ATTR_OLH_INFO);
+ auto iter = attrset.find(RGW_ATTR_OLH_VER);
if (iter == attrset.end()) { /* not an olh */
return -EINVAL;
}
}
}
-int RGWRados::remove_olh_pending_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs)
+int RGWRados::remove_olh_pending_entries(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, olh_obj, &ref);
}
}
- auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
+ auto iter = state->attrset.find(RGW_ATTR_OLH_VER);
if (iter == state->attrset.end()) {
return -EINVAL;
}
+ iter = state->attrset.find(RGW_ATTR_OLH_INFO);
+ if (iter == state->attrset.end()) {
+ return -ENOENT;
+ }
RGWOLHInfo olh;
int ret = decode_olh_info(dpp, cct, iter->second, &olh);
librados::IoCtx& io_ctx = ctx.io_ctx;
librados::NObjectIterator& iter = ctx.iter;
- int r = open_pool_ctx(dpp, pool, io_ctx, false);
+ int r = open_pool_ctx(dpp, pool, io_ctx, false, false);
if (r < 0)
return r;
librados::IoCtx& io_ctx = ctx.io_ctx;
librados::NObjectIterator& iter = ctx.iter;
- int r = open_pool_ctx(dpp, pool, io_ctx, false);
+ int r = open_pool_ctx(dpp, pool, io_ctx, false, false);
if (r < 0)
return r;