]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/driver/rados/rgw_rados.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / driver / rados / rgw_rados.cc
index df334e99e3998a350be490f0007cac501c95b500..10018d4a68a0c30acab2c7dab624d49742897c17 100644 (file)
@@ -45,6 +45,7 @@
 #include "rgw_tools.h"
 #include "rgw_coroutine.h"
 #include "rgw_compression.h"
+#include "rgw_crypt.h"
 #include "rgw_etag_verifier.h"
 #include "rgw_worker.h"
 #include "rgw_notify.h"
@@ -1342,13 +1343,7 @@ int RGWRados::init_ctl(const DoutPrefixProvider *dpp)
  */
 int RGWRados::init_begin(const DoutPrefixProvider *dpp)
 {
-  int ret;
-
-  inject_notify_timeout_probability =
-    cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
-  max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
-
-  ret = init_svc(false, dpp);
+  int ret = init_svc(false, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
     return ret;
@@ -3417,11 +3412,30 @@ public:
           }
         }
       }
+
       /* We need the manifest to recompute the ETag for verification */
       iter = src_attrs.find(RGW_ATTR_MANIFEST);
       if (iter != src_attrs.end()) {
         manifest_bl = std::move(iter->second);
         src_attrs.erase(iter);
+
+        // if the source object was encrypted, preserve the part lengths from
+        // the original object's manifest in RGW_ATTR_CRYPT_PARTS. if the object
+        // already replicated and has the RGW_ATTR_CRYPT_PARTS attr, preserve it
+        if (src_attrs.count(RGW_ATTR_CRYPT_MODE) &&
+            !src_attrs.count(RGW_ATTR_CRYPT_PARTS)) {
+          std::vector<size_t> parts_len;
+          int r = RGWGetObj_BlockDecrypt::read_manifest_parts(dpp, manifest_bl,
+                                                              parts_len);
+          if (r < 0) {
+            ldpp_dout(dpp, 4) << "failed to read part lengths from the manifest" << dendl;
+          } else {
+            // store the encoded part lenghts in RGW_ATTR_CRYPT_PARTS
+            bufferlist parts_bl;
+            encode(parts_len, parts_bl);
+            src_attrs[RGW_ATTR_CRYPT_PARTS] = std::move(parts_bl);
+          }
+        }
       }
 
       // filter out olh attributes
@@ -3976,7 +3990,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
   string etag;
   real_time set_mtime;
-  uint64_t expected_size = 0;
+  uint64_t accounted_size = 0;
 
   RGWObjState *dest_state = NULL;
   RGWObjManifest *manifest = nullptr;
@@ -4015,7 +4029,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   }
 
   ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
-                               &expected_size, nullptr, nullptr, null_yield);
+                               &accounted_size, nullptr, nullptr, null_yield);
   if (ret < 0) {
     goto set_err_state;
   }
@@ -4023,21 +4037,36 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
-  if (cb.get_data_len() != expected_size) {
+  if (cb.get_data_len() != accounted_size) {
     ret = -EIO;
     ldpp_dout(dpp, 0) << "ERROR: object truncated during fetching, expected "
-        << expected_size << " bytes but received " << cb.get_data_len() << dendl;
+        << accounted_size << " bytes but received " << cb.get_data_len() << dendl;
     goto set_err_state;
   }
+
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
-    cs_info.orig_size = cb.get_data_len();
+    cs_info.orig_size = accounted_size;
     cs_info.compressor_message = compressor->get_compressor_message();
     cs_info.blocks = move(compressor->get_compression_blocks());
     encode(cs_info, tmp);
     cb.get_attrs()[RGW_ATTR_COMPRESSION] = tmp;
+  } else if (auto c = cb.get_attrs().find(RGW_ATTR_COMPRESSION);
+             c != cb.get_attrs().end()) {
+    // if the object was transferred in its compressed+encrypted form, use its
+    // original uncompressed size
+    try {
+      RGWCompressionInfo info;
+      auto p = c->second.cbegin();
+      decode(info, p);
+      accounted_size = info.orig_size;
+    } catch (const buffer::error&) {
+      ldpp_dout(dpp, 0) << "ERROR: could not decode compression attr for "
+          "replicated object " << dest_obj << dendl;
+      // decode error isn't fatal, but we might put the wrong size in the index
+    }
   }
 
   if (override_owner) {
@@ -4167,7 +4196,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 #define MAX_COMPLETE_RETRY 100
   for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
     bool canceled = false;
-    ret = processor.complete(cb.get_data_len(), etag, mtime, set_mtime,
+    ret = processor.complete(accounted_size, etag, mtime, set_mtime,
                              attrs, delete_at, nullptr, nullptr, nullptr,
                              zones_trace, &canceled, null_yield);
     if (ret < 0) {
@@ -5084,6 +5113,140 @@ int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo&
   return CLSRGWIssueBucketRebuild(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
+static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+                                      optional_yield y, RGWRados* store,
+                                      RGWBucketInfo& bucket_info,
+                                      RGWObjectCtx& obj_ctx,
+                                      const RGWObjState& state)
+{
+  // only overwrite if the tag hasn't changed
+  obj_ctx.set_atomic(state.obj);
+
+  // make a tiny adjustment to the existing mtime so that fetch_remote_obj()
+  // won't return ERR_NOT_MODIFIED when resyncing the object
+  const auto set_mtime = state.mtime + std::chrono::nanoseconds(1);
+
+  // use set_attrs() to update the mtime in a bucket index transaction so the
+  // change is recorded in bilog and datalog entries. this will cause any peer
+  // zones to resync the object
+  auto add_attrs = std::map<std::string, bufferlist>{
+    { RGW_ATTR_PREFIX "resync-encrypted-multipart", bufferlist{} },
+  };
+
+  return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
+                          add_attrs, nullptr, y, set_mtime);
+}
+
+static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+                                           optional_yield y, RGWRados* store,
+                                           RGWBucketInfo& bucket_info,
+                                           RGWObjectCtx& obj_ctx,
+                                           const rgw_bucket_dir_entry& dirent,
+                                           Formatter* f)
+{
+  const auto obj = rgw_obj{bucket_info.bucket, dirent.key};
+
+  RGWObjState* astate = nullptr;
+  RGWObjManifest* manifest = nullptr;
+  constexpr bool follow_olh = false; // dirent will have version ids
+  int ret = store->get_obj_state(dpp, &obj_ctx, bucket_info, obj,
+                                 &astate, &manifest, follow_olh, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 4) << obj << " does not exist" << dendl;
+    return;
+  }
+
+  // check whether the object is encrypted
+  if (auto i = astate->attrset.find(RGW_ATTR_CRYPT_MODE);
+      i == astate->attrset.end()) {
+    ldpp_dout(dpp, 4) << obj << " is not encrypted" << dendl;
+    return;
+  }
+
+  // check whether the object is multipart
+  if (!manifest) {
+    ldpp_dout(dpp, 4) << obj << " has no manifest so is not multipart" << dendl;
+    return;
+  }
+  const RGWObjManifest::obj_iterator end = manifest->obj_end(dpp);
+  if (end.get_cur_part_id() == 0) {
+    ldpp_dout(dpp, 4) << obj << " manifest is not multipart" << dendl;
+    return;
+  }
+
+  ret = resync_encrypted_multipart(dpp, y, store, bucket_info,
+                                   obj_ctx, *astate);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to update " << obj
+        << ": " << cpp_strerror(ret) << dendl;
+    return;
+  }
+
+  f->open_object_section("object");
+  encode_json("name", obj.key.name, f);
+  if (!obj.key.instance.empty()) {
+    encode_json("version", obj.key.instance, f);
+  }
+  encode_json("mtime", astate->mtime, f);
+  f->close_section(); // "object"
+}
+
+int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
+                                                optional_yield y,
+                                                rgw::sal::RadosStore* driver,
+                                                RGWBucketInfo& bucket_info,
+                                                const std::string& marker,
+                                                RGWFormatterFlusher& flusher)
+{
+  RGWRados::Bucket target(this, bucket_info);
+  RGWRados::Bucket::List list_op(&target);
+
+  list_op.params.marker.name = marker;
+  list_op.params.enforce_ns = true; // only empty ns
+  list_op.params.list_versions = true;
+  list_op.params.allow_unordered = true;
+
+  /* List bucket entries in chunks. */
+  static constexpr int MAX_LIST_OBJS = 100;
+  std::vector<rgw_bucket_dir_entry> entries;
+  entries.reserve(MAX_LIST_OBJS);
+
+  int processed = 0;
+  bool is_truncated = true;
+
+  Formatter* f = flusher.get_formatter();
+  f->open_array_section("progress");
+
+  do {
+    int ret = list_op.list_objects(dpp, MAX_LIST_OBJS, &entries, nullptr,
+                                   &is_truncated, y);
+    if (ret < 0) {
+      return ret;
+    }
+
+    f->open_object_section("batch");
+    f->open_array_section("modified");
+
+    for (const auto& dirent : entries) {
+      RGWObjectCtx obj_ctx{driver};
+      try_resync_encrypted_multipart(dpp, y, this, bucket_info,
+                                     obj_ctx, dirent, f);
+    }
+
+    f->close_section(); // "modified"
+
+    processed += entries.size();
+    encode_json("total processed", processed, f);
+    encode_json("marker", list_op.get_next_marker().name, f);
+    f->close_section(); // "batch"
+
+    flusher.flush(); // flush after each 'chunk'
+  } while (is_truncated);
+
+  f->close_section(); // "progress" array
+  return 0;
+}
+
 int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
 {
   RGWSI_RADOS::Pool index_pool;
@@ -5952,7 +6115,8 @@ int RGWRados::set_attr(const DoutPrefixProvider *dpp, RGWObjectCtx* rctx, RGWBuc
 int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* rctx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj,
                         map<string, bufferlist>& attrs,
                         map<string, bufferlist>* rmattrs,
-                        optional_yield y)
+                        optional_yield y,
+                        ceph::real_time set_mtime /* = zero() */)
 {
   rgw_obj obj = src_obj;
   if (obj.key.instance == "null") {
@@ -6038,6 +6202,9 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* rctx, RGWBu
    * set the metadata.
    * Hence do not update mtime for any other attr changes */
   real_time mtime = state->mtime;
+  if (set_mtime != ceph::real_clock::zero()) {
+    mtime = set_mtime;
+  }
   struct timespec mtime_ts = real_clock::to_timespec(mtime);
   op.mtime2(&mtime_ts);
   auto& ioctx = ref.pool.ioctx();
@@ -6091,6 +6258,8 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* rctx, RGWBu
     if (iter != state->attrset.end()) {
       iter->second = state->obj_tag;
     }
+
+    state->mtime = mtime;
   }
 
   return 0;
@@ -7514,6 +7683,7 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
     return r;
   }
 
+
   if (need_to_remove) {
     string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
     r = clear_olh(dpp, obj_ctx, obj, bucket_info, ref, olh_tag, last_ver, null_yield);
@@ -7521,12 +7691,12 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
       ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl;
       return r;
     }
-  }
-
-  r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
-  if (r < 0 && r != -ECANCELED) {
-    ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
-    return r;
+  } else {
+    r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
+    if (r < 0 && r != -ECANCELED) {
+      ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
+      return r;
+    }
   }
 
   return 0;
@@ -7547,7 +7717,6 @@ int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
   return clear_olh(dpp, obj_ctx, obj, bucket_info, ref, tag, ver, y);
 }
 
-
 int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
                         RGWObjectCtx& obj_ctx,
                         const rgw_obj& obj,
@@ -7681,6 +7850,13 @@ int RGWRados::set_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx,
         }
         continue;
       }
+      // it's possible that the pending xattr from this op prevented the olh
+      // object from being cleaned by another thread that was deleting the last
+      // existing version. We invoke a best-effort update_olh here to handle this case.
+      int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj);
+      if (r < 0 && r != -ECANCELED) {
+        ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+      }
       return ret;
     }
     break;
@@ -7744,6 +7920,13 @@ int RGWRados::unlink_obj_instance(const DoutPrefixProvider *dpp, RGWObjectCtx& o
       if (ret == -ECANCELED) {
         continue;
       }
+      // it's possible that the pending xattr from this op prevented the olh
+      // object from being cleaned by another thread that was deleting the last
+      // existing version. We invoke a best-effort update_olh here to handle this case.
+      int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace);
+      if (r < 0 && r != -ECANCELED) {
+        ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+      }
       return ret;
     }
     break;