git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/driver/rados/rgw_bucket.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / driver / rados / rgw_bucket.cc
index 70e5581b1f122ba1a5414edda7fc3af7cdccf7fa..32cd1ccf95166c63dd45bd376a74d30bb9672aff 100644 (file)
@@ -28,6 +28,14 @@ constexpr uint64_t BUCKET_TAG_QUICK_TIMEOUT = 30;
 
 using namespace std;
 
+// copied from cls/rgw/cls_rgw.cc; each literal is split after "\x80" so the digits that follow are not parsed as part of the hex escape
+static const string BI_OLH_ENTRY_NS_START = "\x80" "1001_";
+static const string BI_INSTANCE_ENTRY_NS_START = "\x80" "1000_";
+
+// number of characters that we should allow to be buffered by the formatter
+// before flushing (compared against Formatter::get_len() by the index check methods when dump_keys=true)
+static constexpr int FORMATTER_LEN_FLUSH_THRESHOLD = 4 * 1024 * 1024;
+
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
@@ -463,6 +471,380 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
   return 0;
 }
 
+/**
+ * Loops over all olh entries in a bucket shard and finds ones with
+ * exists=false and pending_removal=true. If the pending log is empty on
+ * these entries, they were left behind after the last remaining version of
+ * an object was deleted or after an incomplete upload. This was known to
+ * happen historically due to concurrency conflicts among requests referencing
+ * the same object key. If op_state.fix_index is true, we continue where the
+ * request left off by calling RGWRados::clear_olh. If the pending log is not
+ * empty, we attempt to apply it.
+ */
+static int check_index_olh(rgw::sal::RadosStore* const rados_store,
+                           rgw::sal::Bucket* const bucket,
+                           const DoutPrefixProvider *dpp,
+                           RGWBucketAdminOpState& op_state,
+                           RGWFormatterFlusher& flusher,
+                           const int shard, 
+                           uint64_t* const count_out,
+                           optional_yield y)
+{
+  string marker = BI_OLH_ENTRY_NS_START;
+  bool is_truncated = true;
+  list<rgw_cls_bi_entry> entries;
+
+  RGWObjectCtx obj_ctx(rados_store);
+  RGWRados* store = rados_store->getRados();
+  RGWRados::BucketShard bs(store);
+
+  int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  
+  *count_out = 0;
+  do {
+    entries.clear();
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+      break;
+    }
+    list<rgw_cls_bi_entry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      rgw_cls_bi_entry& entry = *iter;
+      marker = entry.idx;
+      if (entry.type != BIIndexType::OLH) {
+        is_truncated = false;
+        break;
+      }
+      rgw_bucket_olh_entry olh_entry;
+      auto iiter = entry.data.cbegin();
+      try {
+        decode(olh_entry, iiter);
+      } catch (buffer::error& err) {
+        ldpp_dout(dpp, -1) << "ERROR failed to decode olh entry for key: " << entry.idx << dendl;
+        continue;
+      }
+      if (olh_entry.exists || !olh_entry.pending_removal) {
+        continue;
+      }
+      if (op_state.will_fix_index()) {
+        rgw_obj obj(bucket->get_key(), olh_entry.key.name);
+        if (olh_entry.pending_log.empty()) {
+          ret = store->clear_olh(dpp, obj_ctx, obj, bucket->get_info(), olh_entry.tag, olh_entry.epoch, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to clear olh for: " << olh_entry.key.name << " clear_olh(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+        } else {
+          std::unique_ptr<rgw::sal::Object> object = bucket->get_object({olh_entry.key.name});
+          RGWObjState *state;
+          ret = object->get_obj_state(dpp, &state, y, false);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to get state for: " << olh_entry.key.name << " get_obj_state(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+          ret = store->update_olh(dpp, obj_ctx, state, bucket->get_info(), obj);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to update olh for: " << olh_entry.key.name << " update_olh(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+        }
+      }
+      if (op_state.dump_keys) {
+        flusher.get_formatter()->dump_string("", olh_entry.key.name);
+        if (flusher.get_formatter()->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+          flusher.flush();
+        }
+      }
+      *count_out += 1;
+    }
+  } while (is_truncated);
+  flusher.flush();
+  return 0;
+}
+
+
+/**
+ * Spawns separate coroutines to check each bucket shard for leftover
+ * olh entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
+                               const DoutPrefixProvider *dpp,
+                               RGWBucketAdminOpState& op_state,
+                               RGWFormatterFlusher& flusher)
+{
+  const RGWBucketInfo& bucket_info = get_bucket_info();
+  if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+    ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+    return 0;
+  }
+  
+  Formatter* formatter = flusher.get_formatter();
+  if (op_state.dump_keys) {
+    formatter->open_array_section("");
+  }
+
+  const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+  std::string verb = op_state.will_fix_index() ? "removed" : "found";
+  uint64_t count_out = 0;
+  
+  boost::asio::io_context context;
+  int next_shard = 0;
+  
+  const int max_aio = std::max(1, op_state.get_max_aio());
+
+  for (int i=0; i<max_aio; i++) {
+    spawn::spawn(context, [&](yield_context yield) {
+      while (true) {
+        int shard = next_shard;
+        next_shard += 1;
+        if (shard >= max_shards) {
+          return;
+        }
+        optional_yield y(context, yield);
+        uint64_t shard_count;
+        int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        if (r < 0) {
+          ldpp_dout(dpp, -1) << "NOTICE: error processing shard " << shard << 
+            " check_index_olh(): " << r << dendl;
+        }
+        count_out += shard_count;
+        if (!op_state.hide_progress) {
+          ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+            " entries " << verb << ")" << dendl;
+        }
+      }
+    });
+  }
+  try {
+    context.run();
+  } catch (const std::system_error& e) {
+    return -e.code().value();
+  }
+  if (!op_state.hide_progress) {
+    ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+      " entries " << verb << ")" << dendl;
+  }
+  if (op_state.dump_keys) {
+    formatter->close_section();
+    flusher.flush();
+  }
+  return 0;
+}
+
+/**
+ * Indicates whether a versioned bucket instance entry is listable in the
+ * index. It does this by looping over all plain entries with prefix equal to
+ * the key name, and checking whether any have an instance ID matching the one
+ * on the specified key. The existence of an instance entry without a matching
+ * plain entry indicates that the object was uploaded successfully, but the
+ * request exited prior to linking the object into the index (via the creation
+ * of a plain entry).
+ */
+static int is_versioned_instance_listable(const DoutPrefixProvider *dpp,
+                                          RGWRados::BucketShard& bs,
+                                          const cls_rgw_obj_key& key,
+                                          bool& listable,
+                                          optional_yield y)
+{
+  const std::string empty_delim;
+  cls_rgw_obj_key marker;
+  rgw_cls_list_ret result;
+  listable = false;
+
+  do {
+    librados::ObjectReadOperation op;
+    cls_rgw_bucket_list_op(op, marker, key.name, empty_delim, 1000,
+                           true, &result);
+    bufferlist ibl;
+    int r = bs.bucket_obj.operate(dpp, &op, &ibl, y);
+    if (r < 0) {
+      return r;
+    }
+
+    for (auto const& entry : result.dir.m) {
+      if (entry.second.key == key) {
+        listable = true;
+        return 0;
+      }
+      marker = entry.second.key;
+    }
+  } while (result.is_truncated);
+  return 0;
+}
+
+/**
+ * Loops over all instance entries in a bucket shard and finds ones with
+ * versioned_epoch=0 and an mtime that is earlier than op_state.min_age
+ * relative to the current time. These entries represent objects that were
+ * uploaded successfully but were not successfully linked into the object
+ * index. As an extra precaution, we also verify that these entries are indeed
+ * non listable (have no corresponding plain entry in the index). We can assume
+ * that clients received an error response for the associated upload requests
+ * since the bucket index linking transaction did not complete. Therefore, if
+ * op_state.fix_index is true, we remove the object that is associated with the
+ * instance entry.
+ */
+static int check_index_unlinked(rgw::sal::RadosStore* const rados_store,
+                                rgw::sal::Bucket* const bucket,
+                                const DoutPrefixProvider *dpp,
+                                RGWBucketAdminOpState& op_state,
+                                RGWFormatterFlusher& flusher,
+                                const int shard, 
+                                uint64_t* const count_out,
+                                optional_yield y)
+{
+  string marker = BI_INSTANCE_ENTRY_NS_START;
+  bool is_truncated = true;
+  list<rgw_cls_bi_entry> entries;
+
+  RGWObjectCtx obj_ctx(rados_store);
+  RGWRados* store = rados_store->getRados();
+  RGWRados::BucketShard bs(store);
+
+  int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+
+  ceph::real_clock::time_point now = ceph::real_clock::now();
+  ceph::real_clock::time_point not_after = now - op_state.min_age;
+  
+  *count_out = 0;
+  do {
+    entries.clear();
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+      break;
+    }
+    list<rgw_cls_bi_entry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      rgw_cls_bi_entry& entry = *iter;
+      marker = entry.idx;
+      if (entry.type != BIIndexType::Instance) {
+        is_truncated = false;
+        break;
+      }
+      rgw_bucket_dir_entry dir_entry;
+      auto iiter = entry.data.cbegin();
+      try {
+        decode(dir_entry, iiter);
+      } catch (buffer::error& err) {
+        ldpp_dout(dpp, -1) << "ERROR failed to decode instance entry for key: " <<
+          entry.idx << dendl;
+        continue;
+      }
+      if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) {
+        continue;
+      }
+      bool listable;
+      ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, -1) << "ERROR is_versioned_instance_listable(key='" <<
+          dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+        continue;
+      }
+      if (listable) {
+        continue;
+      }
+      if (op_state.will_fix_index()) {
+        rgw_obj_key key(dir_entry.key.name, dir_entry.key.instance);
+        ret = rgw_remove_object(dpp, rados_store, bucket, key);
+        if (ret < 0) {
+          ldpp_dout(dpp, -1) << "ERROR rgw_remove_obj(key='" <<
+            dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+          continue;
+        }
+      }
+      if (op_state.dump_keys) {
+        Formatter* const formatter = flusher.get_formatter();
+        formatter->open_object_section("object_instance");
+        formatter->dump_string("name", dir_entry.key.name);
+        formatter->dump_string("instance", dir_entry.key.instance);
+        formatter->close_section();
+        if (formatter->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+          flusher.flush();
+        }
+      }
+      *count_out += 1;
+    }
+  } while (is_truncated);
+  flusher.flush();
+  return 0;
+}
+
+/**
+ * Spawns separate coroutines to check each bucket shard for unlinked
+ * instance entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
+                                    const DoutPrefixProvider *dpp,
+                                    RGWBucketAdminOpState& op_state,
+                                    RGWFormatterFlusher& flusher)
+{
+  const RGWBucketInfo& bucket_info = get_bucket_info();
+  if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+    ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+    return 0;
+  }
+  
+  Formatter* formatter = flusher.get_formatter();
+  if (op_state.dump_keys) {
+    formatter->open_array_section("");
+  }
+
+  const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+  std::string verb = op_state.will_fix_index() ? "removed" : "found";
+  uint64_t count_out = 0;
+  
+  int max_aio = std::max(1, op_state.get_max_aio());
+  int next_shard = 0;
+  boost::asio::io_context context;
+  for (int i=0; i<max_aio; i++) {
+    spawn::spawn(context, [&](yield_context yield) {
+      while (true) {
+        int shard = next_shard;
+        next_shard += 1;
+        if (shard >= max_shards) {
+          return;
+        }
+        uint64_t shard_count;
+        optional_yield y {context, yield};
+        int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        if (r < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard << 
+            " check_index_unlinked(): " << r << dendl;
+        }
+        count_out += shard_count;
+        if (!op_state.hide_progress) {
+          ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+            " entries " << verb << ")" << dendl;
+        }
+      }
+    });
+  }
+  try {
+    context.run();
+  } catch (const std::system_error& e) {
+    return -e.code().value();
+  }
+
+  if (!op_state.hide_progress) {
+    ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+      " entries " << verb << ")" << dendl;
+  }
+  if (op_state.dump_keys) {
+    formatter->close_section();
+    flusher.flush();
+  }
+  return 0;
+}
 
 int RGWBucket::check_index(const DoutPrefixProvider *dpp,
         RGWBucketAdminOpState& op_state,
@@ -783,6 +1165,46 @@ int RGWBucketAdminOp::chown(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_
 
 }
 
+int RGWBucketAdminOp::check_index_olh(rgw::sal::RadosStore* store, RGWBucketAdminOpState& op_state,
+                  RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp)
+{
+  RGWBucket bucket;
+  int ret = bucket.init(store, op_state, null_yield, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+    return ret;
+  }
+  flusher.start(0);
+  ret = bucket.check_index_olh(store, dpp, op_state, flusher);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "check_index_olh(): " << ret << dendl;
+    return ret;
+  }
+  flusher.flush();
+  return 0;
+}
+
+int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RadosStore* store,
+                                           RGWBucketAdminOpState& op_state,
+                                           RGWFormatterFlusher& flusher,
+                                           const DoutPrefixProvider *dpp)
+{
+  flusher.start(0);
+  RGWBucket bucket;
+  int ret = bucket.init(store, op_state, null_yield, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+    return ret;
+  }
+  ret = bucket.check_index_unlinked(store, dpp, op_state, flusher);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "check_index_unlinked(): " << ret << dendl;
+    return ret;
+  }
+  flusher.flush();
+  return 0;
+}
+
 int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
                   RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
 {
@@ -799,20 +1221,25 @@ int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpStat
 
   Formatter *formatter = flusher.get_formatter();
   flusher.start(0);
+  formatter->open_object_section("bucket_check");
 
   ret = bucket.check_bad_index_multipart(op_state, flusher, dpp);
   if (ret < 0)
     return ret;
 
-  ret = bucket.check_object_index(dpp, op_state, flusher, y);
-  if (ret < 0)
-    return ret;
+  if (op_state.will_check_objects()) {
+    ret = bucket.check_object_index(dpp, op_state, flusher, y);
+    if (ret < 0)
+      return ret;
+  }
 
   ret = bucket.check_index(dpp, op_state, existing_stats, calculated_stats);
   if (ret < 0)
     return ret;
 
   dump_index_check(existing_stats, calculated_stats, formatter);
+  
+  formatter->close_section();
   flusher.flush();
 
   return 0;
@@ -875,6 +1302,8 @@ static int bucket_stats(rgw::sal::Driver* driver,
     return ret;
   }
 
+  const RGWBucketInfo& bucket_info = bucket->get_info();
+
   const auto& index = bucket->get_info().get_current_index();
   if (is_layout_indexless(index)) {
     cerr << "error, indexless buckets do not maintain stats; bucket=" <<
@@ -904,6 +1333,10 @@ static int bucket_stats(rgw::sal::Driver* driver,
   formatter->dump_string("id", bucket->get_bucket_id());
   formatter->dump_string("marker", bucket->get_marker());
   formatter->dump_stream("index_type") << bucket->get_info().layout.current_index.layout.type;
+  formatter->dump_bool("versioned", bucket_info.versioned());
+  formatter->dump_bool("versioning_enabled", bucket_info.versioning_enabled());
+  formatter->dump_bool("object_lock_enabled", bucket_info.obj_lock_enabled());
+  formatter->dump_bool("mfa_enabled", bucket_info.mfa_enabled());
   ::encode_json("owner", bucket->get_info().owner, formatter);
   formatter->dump_string("ver", bucket_ver);
   formatter->dump_string("master_ver", master_ver);
@@ -1863,7 +2296,7 @@ public:
 
     new_be.bucket.name = new_bucket_name;
 
-    ret = ctl.bucket->store_bucket_instance_info(be.bucket, new_bi, y, dpp, RGWBucketCtl::BucketInstance::PutParams()
+    ret = ctl.bucket->store_bucket_instance_info(new_be.bucket, new_bi, y, dpp, RGWBucketCtl::BucketInstance::PutParams()
                                                                     .set_exclusive(false)
                                                                     .set_mtime(orig_mtime)
                                                                     .set_attrs(&attrs_m)