]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index 460029be925414fd0b65fbd40729ca78ddaae47a..100f77a699bc0310cfb1b0684cfd7dc174f9dedf 100644 (file)
@@ -1,4 +1,3 @@
-
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
@@ -171,6 +170,14 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, IoCtx& ioctx, b
   int r = rados->ioctx_create(pool.name.c_str(), ioctx);
   if (r == -ENOENT && create) {
     r = rados->pool_create(pool.name.c_str());
+    if (r == -ERANGE) {
+      dout(0)
+        << __func__
+        << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+        << " (this can be due to a pool or placement group misconfiguration, e.g."
+        << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+        << dendl;
+    }
     if (r < 0 && r != -EEXIST) {
       return r;
     }
@@ -2792,6 +2799,22 @@ int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string&
   return 0;
 }
 
+const char* RGWRados::admin_commands[4][3] = {
+  { "cache list",
+    "cache list name=filter,type=CephString,req=false",
+    "cache list [filter_str]: list object cache, possibly matching substrings" },
+  { "cache inspect",
+    "cache inspect name=target,type=CephString,req=true",
+    "cache inspect target: print cache element" },
+  { "cache erase",
+    "cache erase name=target,type=CephString,req=true",
+    "cache erase target: erase element from cache" },
+  { "cache zap",
+    "cache zap",
+    "cache zap: erase all elements from cache" }
+};
+
+
 int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
   int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
   if (r < 0)
@@ -3669,6 +3692,15 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
 
 void RGWRados::finalize()
 {
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->unregister_command(cmd[0]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to unregister admin socket command (r=" << r
+                 << ")" << dendl;
+    }
+  }
+
   if (run_sync_thread) {
     Mutex::Locker l(meta_sync_thread_lock);
     meta_sync_processor_thread->stop();
@@ -3773,6 +3805,17 @@ void RGWRados::finalize()
 int RGWRados::init_rados()
 {
   int ret = 0;
+  auto admin_socket = cct->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->register_command(cmd[0], cmd[1], this,
+                                           cmd[2]);
+    if (r < 0) {
+      lderr(cct) << "ERROR: fail to register admin socket command (r=" << r
+                 << ")" << dendl;
+      return r;
+    }
+  }
+
   auto handles = std::vector<librados::Rados>{cct->_conf->rgw_num_rados_handles};
 
   for (auto& r : handles) {
@@ -4826,28 +4869,10 @@ void RGWRados::pick_control_oid(const string& key, string& notify_oid)
   notify_oid.append(buf);
 }
 
-int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx&  io_ctx)
+int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
 {
-  librados::Rados *rad = get_rados_handle();
-  int r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r != -ENOENT)
-    return r;
-
-  if (!pools_initialized)
-    return r;
-
-  r = rad->pool_create(pool.name.c_str());
-  if (r < 0 && r != -EEXIST)
-    return r;
-
-  r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r < 0)
-    return r;
-
-  r = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (r < 0 && r != -EOPNOTSUPP)
-    return r;
-  return 0;
+  constexpr bool create = true; // create the pool if it doesn't exist
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
@@ -4861,6 +4886,12 @@ void RGWRados::build_bucket_index_marker(const string& shard_id_str, const strin
 
 int RGWRados::open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx)
 {
+  const rgw_pool& explicit_pool = bucket_info.bucket.explicit_placement.index_pool;
+
+  if (!explicit_pool.empty()) {
+    return open_pool_ctx(explicit_pool, index_ctx);
+  }
+
   const string *rule = &bucket_info.placement_rule;
   if (rule->empty()) {
     rule = &zonegroup.default_placement;
@@ -5726,31 +5757,9 @@ done:
  */
 int RGWRados::create_pool(const rgw_pool& pool)
 {
-  int ret = 0;
-
-  librados::Rados *rad = get_rados_handle();
-  ret = rad->pool_create(pool.name.c_str(), 0);
-  if (ret == -EEXIST)
-    ret = 0;
-  else if (ret == -ERANGE) {
-    ldout(cct, 0)
-      << __func__
-      << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
-      << " (this can be due to a pool or placement group misconfiguration, e.g., pg_num < pgp_num)"
-      << dendl;
-  }
-  if (ret < 0)
-    return ret;
-
   librados::IoCtx io_ctx;
-  ret = rad->ioctx_create(pool.name.c_str(), io_ctx);
-  if (ret < 0)
-    return ret;
-
-  ret = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (ret < 0 && ret != -EOPNOTSUPP)
-    return ret;
-  return 0;
+  constexpr bool create = true;
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
@@ -7272,12 +7281,44 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size;
+  bufferlist buffer;
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(ISP2(buffer_size)); // must be power of 2
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0;
+    }
+    const auto count = P2ALIGN(buffer.length(), buffer_size);
+    buffer.splice(0, count, &bl);
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWGetDataCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
@@ -7323,7 +7364,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -7395,6 +7438,11 @@ public:
     return 0;
   }
 
+  int flush() {
+    bufferlist bl;
+    return put_data_and_throttle(filter, bl, 0, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map<string, bufferlist>& get_attrs() { return src_attrs; }
@@ -7824,6 +7872,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
@@ -8061,6 +8113,15 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     return ret;
   }
+  if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
+    // Current implementation does not follow S3 spec and even
+    // may result in data corruption silently when copying
+    // multipart objects acorss pools. So reject COPY operations
+    //on encrypted objects before it is fully functional.
+    ldout(cct, 0) << "ERROR: copy op for encrypted object " << src_obj
+                  << " has not been implemented." << dendl;
+    return -ERR_NOT_IMPLEMENTED;
+  }
 
   src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
   src_attrs.erase(RGW_ATTR_DELETE_AT);
@@ -8949,12 +9010,9 @@ int RGWRados::Object::Delete::delete_obj()
 
   store->remove_rgw_head_obj(op);
   r = ref.ioctx.operate(ref.oid, &op);
-  bool need_invalidate = false;
-  if (r == -ECANCELED) {
-    /* raced with another operation, we can regard it as removed */
-    need_invalidate = true;
-    r = 0;
-  }
+
+  /* raced with another operation, object state is indeterminate */
+  const bool need_invalidate = (r == -ECANCELED);
 
   int64_t poolid = ref.ioctx.get_id();
   if (r >= 0) {
@@ -13236,6 +13294,23 @@ int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header
   return 0;
 }
 
+int RGWRados::cls_user_reset_stats(const string& user_id)
+{
+  string buckets_obj_id;
+  rgw_get_buckets_obj(user_id, buckets_obj_id);
+  rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
+
+  rgw_rados_ref ref;
+  int r = get_raw_obj_ref(obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  ::cls_user_reset_stats(op);
+  return ref.ioctx.operate(ref.oid, &op);
+}
+
 int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
 {
   string buckets_obj_id;
@@ -13859,14 +13934,14 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
+                                                bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
 {
-  int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
   if (!use_cache) {
     store = new RGWRados;
   } else {
-    store = new RGWCache<RGWRados>; 
+    store = new RGWCache<RGWRados>;
   }
 
   if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
@@ -14023,3 +14098,71 @@ int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need
   }
 }
 
+bool RGWRados::call(std::string command, cmdmap_t& cmdmap, std::string format,
+                    bufferlist& out)
+{
+  if (command == "cache list") {
+    boost::optional<std::string> filter;
+    auto i = cmdmap.find("filter");
+    if (i != cmdmap.cend()) {
+      filter = boost::get<std::string>(i->second);
+    }
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "table"));
+    if (f) {
+      f->open_array_section("cache_entries");
+      call_list(filter, f.get());
+      f->close_section();
+      f->flush(out);
+      return true;
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache inspect") {
+    std::unique_ptr<Formatter> f(ceph::Formatter::create(format, "json-pretty"));
+    if (f) {
+      const auto& target = boost::get<std::string>(cmdmap["target"]);
+      if (call_inspect(target, f.get())) {
+        f->flush(out);
+        return true;
+      } else {
+        out.append(string("Unable to find entry ") + target + string(".\n"));
+        return false;
+      }
+    } else {
+      out.append("Unable to create Formatter.\n");
+      return false;
+    }
+  } else if (command == "cache erase") {
+    const auto& target = boost::get<std::string>(cmdmap["target"]);
+    if (call_erase(target)) {
+      return true;
+    } else {
+      out.append(string("Unable to find entry ") + target + string(".\n"));
+      return false;
+    }
+  } else if (command == "cache zap") {
+    call_zap();
+    return true;
+  }
+  return false;
+}
+
+void RGWRados::call_list(const boost::optional<std::string>&,
+                         ceph::Formatter*)
+{
+  return;
+}
+
+bool RGWRados::call_inspect(const std::string&, Formatter*)
+{
+  return false;
+}
+
+bool RGWRados::call_erase(const std::string&) {
+  return false;
+}
+
+void RGWRados::call_zap() {
+  return;
+}