]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_tools.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_tools.cc
index 057535e411f506cf501cb612ac5cd330872cfd02..b05fe0a0cc307c519fcf257042f7aa8159c6f7bf 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <errno.h>
 
@@ -23,6 +23,7 @@
 #include "osd/osd_types.h"
 
 #include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -67,7 +68,7 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
       float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
       int r = rados->mon_command(
        "{\"prefix\": \"osd pool set\", \"pool\": \"" +
-       pool.name + "\", \"var\": \"pg_autoscale_bias\": \"" +
+       pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
        stringify(bias) + "\"}",
        inbl, NULL, NULL);
       if (r < 0) {
@@ -78,12 +79,23 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
       int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
       r = rados->mon_command(
        "{\"prefix\": \"osd pool set\", \"pool\": \"" +
-       pool.name + "\", \"var\": \"pg_num_min\": \"" +
+       pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
        stringify(min) + "\"}",
        inbl, NULL, NULL);
-     if (r < 0) {
-       dout(10) << __func__ << " warning: failed to set pg_num_min on "
-               << pool.name << dendl;
+      if (r < 0) {
+       dout(10) << __func__ << " warning: failed to set pg_num_min on "
+                << pool.name << dendl;
+      }
+      // set recovery_priority
+      int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
+      r = rados->mon_command(
+       "{\"prefix\": \"osd pool set\", \"pool\": \"" +
+       pool.name + "\", \"var\": \"recovery_priority\": \"" +
+       stringify(p) + "\"}",
+       inbl, NULL, NULL);
+      if (r < 0) {
+       dout(10) << __func__ << " warning: failed to set recovery_priority on "
+                << pool.name << dendl;
       }
     }
   } else if (r < 0) {
@@ -95,8 +107,54 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
   return 0;
 }
 
-int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
-                       RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
+{
+  uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+  char buf[16];
+  if (shard_id) {
+    *shard_id = val % max_shards;
+  }
+  snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+  name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
+{
+  uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+  val ^= ceph_str_hash_linux(section.c_str(), section.size());
+  char buf[16];
+  snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+  name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
+{
+  char buf[16];
+  snprintf(buf, sizeof(buf), "%u", shard_id);
+  name = prefix + buf;
+}
+
+int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
+                           const string& str, uint32_t *perm)
+{
+  list<string> strs;
+  get_str_list(str, strs);
+  list<string>::iterator iter;
+  uint32_t v = 0;
+  for (iter = strs.begin(); iter != strs.end(); ++iter) {
+    string& s = *iter;
+    for (int i = 0; mapping[i].type_name; i++) {
+      if (s.compare(mapping[i].type_name) == 0)
+        v |= mapping[i].flag;
+    }
+  }
+
+  *perm = v;
+  return 0;
+}
+
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+                       RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
 {
   map<string,bufferlist> no_attrs;
   if (!pattrs) {
@@ -105,33 +163,28 @@ int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& o
 
   rgw_raw_obj obj(pool, oid);
 
-  auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
   auto sysobj = obj_ctx.get_obj(obj);
   int ret = sysobj.wop()
                   .set_objv_tracker(objv_tracker)
                   .set_exclusive(exclusive)
                   .set_mtime(set_mtime)
                   .set_attrs(*pattrs)
-                  .write(data);
-
-  if (ret == -ENOENT) {
-    ret = rgwstore->create_pool(pool);
-    if (ret >= 0) {
-      ret = sysobj.wop()
-                  .set_objv_tracker(objv_tracker)
-                  .set_exclusive(exclusive)
-                  .set_mtime(set_mtime)
-                  .set_attrs(*pattrs)
-                  .write(data);
-    }
-  }
+                  .write(data, y);
 
   return ret;
 }
 
-int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
-                       RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs,
-                       rgw_cache_entry_info *cache_info, boost::optional<obj_version> refresh_version)
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+                       RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
+{
+  return rgw_put_system_obj(obj_ctx, pool, oid, data, exclusive,
+                            objv_tracker, set_mtime, null_yield, pattrs);
+}
+
+int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
+                       RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs,
+                       rgw_cache_entry_info *cache_info,
+                      boost::optional<obj_version> refresh_version)
 {
   bufferlist::iterator iter;
   int request_len = READ_CHUNK_LEN;
@@ -149,13 +202,13 @@ int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_p
     int ret = rop.set_attrs(pattrs)
                  .set_last_mod(pmtime)
                  .set_objv_tracker(objv_tracker)
-                 .stat();
+                 .stat(y);
     if (ret < 0)
       return ret;
 
     ret = rop.set_cache_info(cache_info)
              .set_refresh_version(refresh_version)
-             .read(&bl);
+             .read(&bl, y);
     if (ret == -ECANCELED) {
       /* raced, restart */
       if (!original_readv.empty()) {
@@ -180,15 +233,15 @@ int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_p
   return 0;
 }
 
-int rgw_delete_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid,
+int rgw_delete_system_obj(RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
                           RGWObjVersionTracker *objv_tracker)
 {
-  auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
+  auto obj_ctx = sysobj_svc->init_obj_ctx();
   auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
   rgw_raw_obj obj(pool, oid);
   return sysobj.wop()
                .set_objv_tracker(objv_tracker)
-               .remove();
+               .remove(null_yield);
 }
 
 thread_local bool is_asio_thread = false;
@@ -236,6 +289,29 @@ int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
   return ioctx.operate(oid, op);
 }
 
+int rgw_rados_notify(librados::IoCtx& ioctx, const std::string& oid,
+                     bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
+                     optional_yield y)
+{
+#ifdef HAVE_BOOST_CONTEXT
+  if (y) {
+    auto& context = y.get_io_context();
+    auto& yield = y.get_yield_context();
+    boost::system::error_code ec;
+    auto reply = librados::async_notify(context, ioctx, oid,
+                                        bl, timeout_ms, yield[ec]);
+    if (pbl) {
+      *pbl = std::move(reply);
+    }
+    return -ec.value();
+  }
+  if (is_asio_thread) {
+    dout(20) << "WARNING: blocking librados call" << dendl;
+  }
+#endif
+  return ioctx.notify2(oid, bl, timeout_ms, pbl);
+}
+
 void parse_mime_map_line(const char *start, const char *end)
 {
   char line[end - start + 1];
@@ -342,9 +418,9 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin
   }
 }
 
-RGWDataAccess::RGWDataAccess(RGWRados *_store) : store(_store)
+RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
 {
-  sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc.sysobj->init_obj_ctx());
+  sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
 }
 
 
@@ -367,10 +443,11 @@ int RGWDataAccess::Bucket::finish_init()
 
 int RGWDataAccess::Bucket::init()
 {
-  int ret = sd->store->get_bucket_info(*sd->sysobj_ctx,
+  int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
                                       tenant, name,
                                       bucket_info,
                                       &mtime,
+                                       null_yield,
                                       &attrs);
   if (ret < 0) {
     return ret;
@@ -395,9 +472,11 @@ int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
 }
 
 int RGWDataAccess::Object::put(bufferlist& data,
-                              map<string, bufferlist>& attrs)
+                              map<string, bufferlist>& attrs,
+                               const DoutPrefixProvider *dpp,
+                               optional_yield y)
 {
-  RGWRados *store = sd->store;
+  rgw::sal::RGWRadosStore *store = sd->store;
   CephContext *cct = store->ctx();
 
   string tag;
@@ -405,33 +484,30 @@ int RGWDataAccess::Object::put(bufferlist& data,
 
   RGWBucketInfo& bucket_info = bucket->bucket_info;
 
-  using namespace rgw::putobj;
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
 
   RGWObjectCtx obj_ctx(store);
   rgw_obj obj(bucket_info.bucket, key);
 
   auto& owner = bucket->policy.get_owner();
 
-  string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
+  string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
 
-  AtomicObjectProcessor processor(&aio, store, bucket_info,
-                                  nullptr,
-                                  owner.get_id(),
-                                  obj_ctx, obj, olh_epoch, req_id);
+  using namespace rgw::putobj;
+  AtomicObjectProcessor processor(&aio, store, bucket_info, nullptr,
+                                  owner.get_id(), obj_ctx, obj, olh_epoch,
+                                  req_id, dpp, y);
 
-  int ret = processor.prepare();
+  int ret = processor.prepare(y);
   if (ret < 0)
     return ret;
 
-  using namespace rgw::putobj;
-
   DataProcessor *filter = &processor;
 
   CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
 
-  const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
+  const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
   if (compression_type != "none") {
     plugin = Compressor::create(store->ctx(), compression_type);
     if (!plugin) {
@@ -504,7 +580,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
                            attrs, delete_at,
                             nullptr, nullptr,
                             puser_data,
-                            nullptr, nullptr);
+                            nullptr, nullptr, y);
 }
 
 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)