#include "rgw_tools.h"
#include "rgw_coroutine.h"
#include "rgw_compression.h"
+#include "rgw_crypt.h"
#include "rgw_etag_verifier.h"
#include "rgw_worker.h"
#include "rgw_notify.h"
*/
int RGWRados::init_begin(const DoutPrefixProvider *dpp)
{
- int ret;
-
- inject_notify_timeout_probability =
- cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
- max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
-
- ret = init_svc(false, dpp);
+ int ret = init_svc(false, dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
return ret;
}
}
}
+
/* We need the manifest to recompute the ETag for verification */
iter = src_attrs.find(RGW_ATTR_MANIFEST);
if (iter != src_attrs.end()) {
manifest_bl = std::move(iter->second);
src_attrs.erase(iter);
+
+ // if the source object was encrypted, preserve the part lengths from
+ // the original object's manifest in RGW_ATTR_CRYPT_PARTS. if the object
+ // was already replicated and carries RGW_ATTR_CRYPT_PARTS, preserve it
+ if (src_attrs.count(RGW_ATTR_CRYPT_MODE) &&
+ !src_attrs.count(RGW_ATTR_CRYPT_PARTS)) {
+ std::vector<size_t> parts_len;
+ int r = RGWGetObj_BlockDecrypt::read_manifest_parts(dpp, manifest_bl,
+ parts_len);
+ if (r < 0) {
+ ldpp_dout(dpp, 4) << "failed to read part lengths from the manifest" << dendl;
+ } else {
+ // store the encoded part lengths in RGW_ATTR_CRYPT_PARTS
+ bufferlist parts_bl;
+ encode(parts_len, parts_bl);
+ src_attrs[RGW_ATTR_CRYPT_PARTS] = std::move(parts_bl);
+ }
+ }
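+
+ // a sketch of the matching read path (illustrative only; names assumed):
+ // a peer zone's decrypt filter can recover the original part sizes with
+ // the symmetric decode of RGW_ATTR_CRYPT_PARTS:
+ //
+ //   std::vector<size_t> parts_len;
+ //   auto p = attrs.at(RGW_ATTR_CRYPT_PARTS).cbegin();
+ //   decode(parts_len, p); // throws buffer::error on malformed input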
}
// filter out olh attributes
string etag;
real_time set_mtime;
- uint64_t expected_size = 0;
+ uint64_t accounted_size = 0;
RGWObjState *dest_state = NULL;
RGWObjManifest *manifest = nullptr;
}
ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
- &expected_size, nullptr, nullptr, null_yield);
+ &accounted_size, nullptr, nullptr, null_yield);
if (ret < 0) {
goto set_err_state;
}
ret = cb.flush();
if (ret < 0) {
goto set_err_state;
}
- if (cb.get_data_len() != expected_size) {
+ if (cb.get_data_len() != accounted_size) {
ret = -EIO;
ldpp_dout(dpp, 0) << "ERROR: object truncated during fetching, expected "
- << expected_size << " bytes but received " << cb.get_data_len() << dendl;
+ << accounted_size << " bytes but received " << cb.get_data_len() << dendl;
goto set_err_state;
}
+
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = plugin->get_type_name();
- cs_info.orig_size = cb.get_data_len();
+ cs_info.orig_size = accounted_size;
cs_info.compressor_message = compressor->get_compressor_message();
cs_info.blocks = move(compressor->get_compression_blocks());
encode(cs_info, tmp);
cb.get_attrs()[RGW_ATTR_COMPRESSION] = tmp;
+ } else if (auto c = cb.get_attrs().find(RGW_ATTR_COMPRESSION);
+ c != cb.get_attrs().end()) {
+ // if the object was transferred in its compressed+encrypted form, use its
+ // original uncompressed size
+ try {
+ RGWCompressionInfo info;
+ auto p = c->second.cbegin();
+ decode(info, p);
+ accounted_size = info.orig_size;
+ } catch (const buffer::error&) {
+ ldpp_dout(dpp, 0) << "ERROR: could not decode compression attr for "
+ "replicated object " << dest_obj << dendl;
+ // decode error isn't fatal, but we might put the wrong size in the index
+ }
}
if (override_owner) {
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
bool canceled = false;
- ret = processor.complete(cb.get_data_len(), etag, mtime, set_mtime,
+ ret = processor.complete(accounted_size, etag, mtime, set_mtime,
attrs, delete_at, nullptr, nullptr, nullptr,
zones_trace, &canceled, null_yield);
if (ret < 0) {
return CLSRGWIssueBucketRebuild(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
+static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+ optional_yield y, RGWRados* store,
+ RGWBucketInfo& bucket_info,
+ RGWObjectCtx& obj_ctx,
+ const RGWObjState& state)
+{
+ // only overwrite if the tag hasn't changed
+ obj_ctx.set_atomic(state.obj);
+
+ // make a tiny adjustment to the existing mtime so that fetch_remote_obj()
+ // won't return ERR_NOT_MODIFIED when resyncing the object
+ const auto set_mtime = state.mtime + std::chrono::nanoseconds(1);
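+
+ // (for example, a stored mtime of 1700000000.000000000s becomes
+ // 1700000000.000000001s: different enough that the sync's modification
+ // check no longer treats the object as unchanged)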
+
+ // use set_attrs() to update the mtime in a bucket index transaction so the
+ // change is recorded in bilog and datalog entries. this will cause any peer
+ // zones to resync the object
+ auto add_attrs = std::map<std::string, bufferlist>{
+ { RGW_ATTR_PREFIX "resync-encrypted-multipart", bufferlist{} },
+ };
+
+ return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
+ add_attrs, nullptr, y, set_mtime);
+}
+
+static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+ optional_yield y, RGWRados* store,
+ RGWBucketInfo& bucket_info,
+ RGWObjectCtx& obj_ctx,
+ const rgw_bucket_dir_entry& dirent,
+ Formatter* f)
+{
+ const auto obj = rgw_obj{bucket_info.bucket, dirent.key};
+
+ RGWObjState* astate = nullptr;
+ RGWObjManifest* manifest = nullptr;
+ constexpr bool follow_olh = false; // dirent will have version ids
+ int ret = store->get_obj_state(dpp, &obj_ctx, bucket_info, obj,
+ &astate, &manifest, follow_olh, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 4) << obj << " does not exist" << dendl;
+ return;
+ }
+
+ // check whether the object is encrypted
+ if (auto i = astate->attrset.find(RGW_ATTR_CRYPT_MODE);
+ i == astate->attrset.end()) {
+ ldpp_dout(dpp, 4) << obj << " is not encrypted" << dendl;
+ return;
+ }
+
+ // check whether the object is multipart
+ if (!manifest) {
+ ldpp_dout(dpp, 4) << obj << " has no manifest so is not multipart" << dendl;
+ return;
+ }
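+
+ // part numbers in a multipart manifest start at 1, so a final part id of
+ // 0 means the manifest came from an atomic (non-multipart) upload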
+ const RGWObjManifest::obj_iterator end = manifest->obj_end(dpp);
+ if (end.get_cur_part_id() == 0) {
+ ldpp_dout(dpp, 4) << obj << " manifest is not multipart" << dendl;
+ return;
+ }
+
+ ret = resync_encrypted_multipart(dpp, y, store, bucket_info,
+ obj_ctx, *astate);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to update " << obj
+ << ": " << cpp_strerror(ret) << dendl;
+ return;
+ }
+
+ f->open_object_section("object");
+ encode_json("name", obj.key.name, f);
+ if (!obj.key.instance.empty()) {
+ encode_json("version", obj.key.instance, f);
+ }
+ encode_json("mtime", astate->mtime, f);
+ f->close_section(); // "object"
+}
+
+int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ rgw::sal::RadosStore* driver,
+ RGWBucketInfo& bucket_info,
+ const std::string& marker,
+ RGWFormatterFlusher& flusher)
+{
+ RGWRados::Bucket target(this, bucket_info);
+ RGWRados::Bucket::List list_op(&target);
+
+ list_op.params.marker.name = marker;
+ list_op.params.enforce_ns = true; // only empty ns
+ list_op.params.list_versions = true;
+ list_op.params.allow_unordered = true;
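+ // unordered listing avoids the cross-shard merge sort; each entry is
+ // still visited exactly once and ordering is irrelevant here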
+
+ /* List bucket entries in chunks. */
+ static constexpr int MAX_LIST_OBJS = 100;
+ std::vector<rgw_bucket_dir_entry> entries;
+ entries.reserve(MAX_LIST_OBJS);
+
+ int processed = 0;
+ bool is_truncated = true;
+
+ Formatter* f = flusher.get_formatter();
+ f->open_array_section("progress");
+
+ do {
+ int ret = list_op.list_objects(dpp, MAX_LIST_OBJS, &entries, nullptr,
+ &is_truncated, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ f->open_object_section("batch");
+ f->open_array_section("modified");
+
+ for (const auto& dirent : entries) {
+ RGWObjectCtx obj_ctx{driver};
+ try_resync_encrypted_multipart(dpp, y, this, bucket_info,
+ obj_ctx, dirent, f);
+ }
+
+ f->close_section(); // "modified"
+
+ processed += entries.size();
+ encode_json("total processed", processed, f);
+ encode_json("marker", list_op.get_next_marker().name, f);
+ f->close_section(); // "batch"
+
+ flusher.flush(); // flush after each 'chunk'
+ } while (is_truncated);
+
+ f->close_section(); // "progress" array
+ return 0;
+}
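+
+// usage sketch (assumptions: RGWStreamFlusher from rgw_formats.h and an
+// already-resolved bucket_info; the admin command wiring lives elsewhere):
+//
+//   JSONFormatter formatter;
+//   RGWStreamFlusher flusher(&formatter, std::cout);
+//   int r = rados->bucket_resync_encrypted_multipart(
+//       dpp, null_yield, driver, bucket_info, std::string{}, flusher);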
+
int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
{
RGWSI_RADOS::Pool index_pool;
int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* rctx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj,
map<string, bufferlist>& attrs,
map<string, bufferlist>* rmattrs,
- optional_yield y)
+ optional_yield y,
+ ceph::real_time set_mtime /* = zero() */)
{
rgw_obj obj = src_obj;
if (obj.key.instance == "null") {
* set the metadata.
* Hence do not update mtime for any other attr changes */
real_time mtime = state->mtime;
+ if (set_mtime != ceph::real_clock::zero()) {
+ mtime = set_mtime;
+ }
struct timespec mtime_ts = real_clock::to_timespec(mtime);
op.mtime2(&mtime_ts);
auto& ioctx = ref.pool.ioctx();
if (iter != state->attrset.end()) {
iter->second = state->obj_tag;
}
+
+ state->mtime = mtime;
}
return 0;
return r;
}
+
if (need_to_remove) {
string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
r = clear_olh(dpp, obj_ctx, obj, bucket_info, ref, olh_tag, last_ver, null_yield);
ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl;
return r;
}
- }
-
- r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
- if (r < 0 && r != -ECANCELED) {
- ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
- return r;
+ } else {
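+ // clear_olh() above removes the olh object together with its pending log,
+ // so an explicit trim is only needed when the olh object was kept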
+ r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
+ return r;
+ }
}
return 0;
return clear_olh(dpp, obj_ctx, obj, bucket_info, ref, tag, ver, y);
}
-
int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
RGWObjectCtx& obj_ctx,
const rgw_obj& obj,
}
continue;
}
+ // it's possible that the pending xattr from this op prevented the olh
+ // object from being cleaned by another thread that was deleting the last
+ // existing version. We invoke a best-effort update_olh here to handle this case.
+ int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ }
return ret;
}
break;
if (ret == -ECANCELED) {
continue;
}
+ // it's possible that the pending xattr from this op prevented the olh
+ // object from being cleaned by another thread that was deleting the last
+ // existing version. We invoke a best-effort update_olh here to handle this case.
+ int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ }
return ret;
}
break;