]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index fd30c347b99664d8e13da90e9918a9170a655c3a..100f77a699bc0310cfb1b0684cfd7dc174f9dedf 100644 (file)
@@ -170,6 +170,14 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, IoCtx& ioctx, b
   int r = rados->ioctx_create(pool.name.c_str(), ioctx);
   if (r == -ENOENT && create) {
     r = rados->pool_create(pool.name.c_str());
+    if (r == -ERANGE) {
+      dout(0)
+        << __func__
+        << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+        << " (this can be due to a pool or placement group misconfiguration, e.g."
+        << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+        << dendl;
+    }
     if (r < 0 && r != -EEXIST) {
       return r;
     }
@@ -4861,28 +4869,10 @@ void RGWRados::pick_control_oid(const string& key, string& notify_oid)
   notify_oid.append(buf);
 }
 
-int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx&  io_ctx)
+int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
 {
-  librados::Rados *rad = get_rados_handle();
-  int r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r != -ENOENT)
-    return r;
-
-  if (!pools_initialized)
-    return r;
-
-  r = rad->pool_create(pool.name.c_str());
-  if (r < 0 && r != -EEXIST)
-    return r;
-
-  r = rgw_init_ioctx(rad, pool, io_ctx);
-  if (r < 0)
-    return r;
-
-  r = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (r < 0 && r != -EOPNOTSUPP)
-    return r;
-  return 0;
+  constexpr bool create = true; // create the pool if it doesn't exist
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
@@ -5767,32 +5757,9 @@ done:
  */
 int RGWRados::create_pool(const rgw_pool& pool)
 {
-  int ret = 0;
-
-  librados::Rados *rad = get_rados_handle();
-  ret = rad->pool_create(pool.name.c_str(), 0);
-  if (ret == -EEXIST)
-    ret = 0;
-  else if (ret == -ERANGE) {
-    ldout(cct, 0)
-      << __func__
-      << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-ret)
-      << " (this can be due to a pool or placement group misconfiguration, e.g."
-      << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
-      << dendl;
-  }
-  if (ret < 0)
-    return ret;
-
   librados::IoCtx io_ctx;
-  ret = rad->ioctx_create(pool.name.c_str(), io_ctx);
-  if (ret < 0)
-    return ret;
-
-  ret = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
-  if (ret < 0 && ret != -EOPNOTSUPP)
-    return ret;
-  return 0;
+  constexpr bool create = true;
+  return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create);
 }
 
 int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
@@ -7314,12 +7281,44 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size;
+  bufferlist buffer;
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(ISP2(buffer_size)); // must be power of 2
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0;
+    }
+    const auto count = P2ALIGN(buffer.length(), buffer_size);
+    buffer.splice(0, count, &bl);
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWGetDataCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
@@ -7365,7 +7364,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -7437,6 +7438,11 @@ public:
     return 0;
   }
 
+  int flush() {
+    bufferlist bl;
+    return put_data_and_throttle(filter, bl, 0, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map<string, bufferlist>& get_attrs() { return src_attrs; }
@@ -7866,6 +7872,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
@@ -13924,14 +13934,14 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
+                                                bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
 {
-  int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
   if (!use_cache) {
     store = new RGWRados;
   } else {
-    store = new RGWCache<RGWRados>; 
+    store = new RGWCache<RGWRados>;
   }
 
   if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {