]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cls/rgw/cls_rgw_client.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / cls / rgw / cls_rgw_client.h
index 0eca7345dc65044e971acfabbd54222bb19ae473..356b873a4f126c8777acee64ce8da2cb0f88027d 100644 (file)
@@ -1,8 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#ifndef CEPH_CLS_RGW_CLIENT_H
-#define CEPH_CLS_RGW_CLIENT_H
+#pragma once
 
 #include "include/str_list.h"
 #include "include/rados/librados.hpp"
@@ -14,6 +13,7 @@
 #include "common/ceph_time.h"
 #include "common/ceph_mutex.h"
 
+
 // Forward declaration
 class BucketIndexAioManager;
 /*
@@ -28,16 +28,33 @@ struct BucketIndexAioArg : public RefCountedObject {
 };
 
 /*
- * This class manages AIO completions. This class is not completely thread-safe,
- * methods like *get_next* is not thread-safe and is expected to be called from
- * within one thread.
+ * This class manages AIO completions. This class is not completely
+ * thread-safe, methods like *get_next_request_id* is not thread-safe
+ * and is expected to be called from within one thread.
  */
 class BucketIndexAioManager {
+public:
+
+  // allows us to reaccess the shard id and shard's oid during and
+  // after the asynchronous call is made
+  struct RequestObj {
+    int shard_id;
+    std::string oid;
+
+    RequestObj(int _shard_id, const std::string& _oid) :
+      shard_id(_shard_id), oid(_oid)
+    {/* empty */}
+  };
+
+
 private:
+  // NB: the following 4 maps use the request_id as the key; this
+  // is not the same as the shard_id!
   std::map<int, librados::AioCompletion*> pendings;
   std::map<int, librados::AioCompletion*> completions;
-  std::map<int, std::string> pending_objs;
-  std::map<int, std::string> completion_objs;
+  std::map<int, const RequestObj> pending_objs;
+  std::map<int, const RequestObj> completion_objs;
+
   int next = 0;
   ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock");
   ceph::condition_variable cond;
@@ -55,8 +72,8 @@ private:
    *
    * Return next request ID.
    */
-  int get_next() { return next++; }
-    
+  int get_next_request_id() { return next++; }
+
   /*
    * Add a new pending AIO completion instance.
    *
@@ -65,10 +82,11 @@ private:
    * @param oid        - the object id associated with the object, if it is NULL, we don't
    *                     track the object id per callback.
    */
-  void add_pending(int id, librados::AioCompletion* completion, const std::string& oid) {
-    pendings[id] = completion;
-    pending_objs[id] = oid;
+  void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) {
+    pendings[request_id] = completion;
+    pending_objs.emplace(request_id, RequestObj(shard_id, oid));
   }
+
 public:
   /*
    * Create a new instance.
@@ -78,7 +96,7 @@ public:
   /*
    * Do completion for the given AIO request.
    */
-  void do_completion(int id);
+  void do_completion(int request_id);
 
   /*
    * Wait for AIO completions.
@@ -90,19 +108,23 @@ public:
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
-      std::map<int, std::string> *objs);
+  bool wait_for_completions(int valid_ret_code,
+                           int *num_completions = nullptr,
+                           int *ret_code = nullptr,
+                           std::map<int, std::string> *completed_objs = nullptr,
+                           std::map<int, std::string> *retry_objs = nullptr);
 
   /**
    * Do aio read operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectReadOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -113,13 +135,14 @@ public:
   /**
    * Do aio write operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectWriteOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -243,8 +266,12 @@ void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
+
+  // map of shard # to oid; the shards that are remaining to be processed
   std::map<int, std::string>& objs_container;
+  // iterator to work through objs_container
   std::map<int, std::string>::iterator iter;
+
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
@@ -270,51 +297,9 @@ public:
   virtual ~CLSRGWConcurrentIO()
   {}
 
-  int operator()() {
-    int ret = 0;
-    iter = objs_container.begin();
-    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op(iter->first, iter->second);
-      if (ret < 0)
-        break;
-    }
+  int operator()();
+}; // class CLSRGWConcurrentIO
 
-    int num_completions = 0, r = 0;
-    std::map<int, std::string> objs;
-    std::map<int, std::string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
-    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
-      if (r >= 0 && ret >= 0) {
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      } else if (ret >= 0) {
-        ret = r;
-      }
-      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
-        // For those objects which need another round, use them to reset
-        // the container
-        reset_container(objs);
-        iter = objs_container.begin();
-        for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
-          int issue_ret = issue_op(iter->first, iter->second);
-          if (issue_ret < 0) {
-            ret = issue_ret;
-            break;
-          }
-        }
-      }
-    }
-
-    if (ret < 0) {
-      cleanup();
-    }
-    return ret;
-  }
-};
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
@@ -322,8 +307,9 @@ protected:
   int valid_ret_code() override { return -EEXIST; }
   void cleanup() override;
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
-                     uint32_t _max_aio) :
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                            std::map<int, std::string>& _bucket_objs,
+                            uint32_t _max_aio) :
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
@@ -358,16 +344,16 @@ void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
                                  bool absolute,
                                  const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats);
 
-void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, std::string& tag,
+void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, const std::string& tag,
                                const cls_rgw_obj_key& key, const std::string& locator, bool log_op,
-                               uint16_t bilog_op, rgw_zone_set& zones_trace);
+                               uint16_t bilog_op, const rgw_zone_set& zones_trace);
 
-void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, std::string& tag,
-                                rgw_bucket_entry_ver& ver,
+void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, const std::string& tag,
+                                const rgw_bucket_entry_ver& ver,
                                 const cls_rgw_obj_key& key,
-                                rgw_bucket_dir_entry_meta& dir_meta,
-                               std::list<cls_rgw_obj_key> *remove_objs, bool log_op,
-                                uint16_t bilog_op, rgw_zone_set *zones_trace);
+                                const rgw_bucket_dir_entry_meta& dir_meta,
+                               const std::list<cls_rgw_obj_key> *remove_objs, bool log_op,
+                                uint16_t bilog_op, const rgw_zone_set *zones_trace);
 
 void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, std::list<std::string>& keep_attr_prefixes);
 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const std::string& attr);
@@ -375,22 +361,22 @@ void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const std::str
 void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const ceph::real_time& mtime, bool high_precision_time, RGWCheckMTimeType type);
 
 int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid,
-                   BIIndexType index_type, cls_rgw_obj_key& key,
+                   BIIndexType index_type, const cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry);
-int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, rgw_cls_bi_entry& entry);
-void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, rgw_cls_bi_entry& entry);
-int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string oid,
+int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
+void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
+int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
                    const std::string& name, const std::string& marker, uint32_t max,
                    std::list<rgw_cls_bi_entry> *entries, bool *is_truncated);
 
 
 void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op,
-                            const cls_rgw_obj_key& key, ceph::buffer::list& olh_tag,
-                            bool delete_marker, const std::string& op_tag, rgw_bucket_dir_entry_meta *meta,
-                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace);
+                            const cls_rgw_obj_key& key, const ceph::buffer::list& olh_tag,
+                            bool delete_marker, const std::string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace);
 void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op,
                                    const cls_rgw_obj_key& key, const std::string& op_tag,
-                                   const std::string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace);
+                                   const std::string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace);
 void cls_rgw_get_olh_log(librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const std::string& olh_tag, rgw_cls_read_olh_log_ret& log_ret, int& op_ret);
 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const std::string& olh_tag);
 void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, const std::string& olh_tag);
@@ -399,12 +385,12 @@ void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key
 // rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
 #ifndef CLS_CLIENT_HIDE_IOCTX
 int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const std::string& oid,
-                            const cls_rgw_obj_key& key, ceph::buffer::list& olh_tag,
-                            bool delete_marker, const std::string& op_tag, rgw_bucket_dir_entry_meta *meta,
-                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace);
+                            const cls_rgw_obj_key& key, const ceph::buffer::list& olh_tag,
+                            bool delete_marker, const std::string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace);
 int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const std::string& oid,
                                    const cls_rgw_obj_key& key, const std::string& op_tag,
-                                   const std::string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace);
+                                   const std::string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace);
 int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, std::string& oid, const cls_rgw_obj_key& olh, uint64_t ver_marker,
                         const std::string& olh_tag, rgw_cls_read_olh_log_ret& log_ret);
 int cls_rgw_clear_olh(librados::IoCtx& io_ctx, std::string& oid, const cls_rgw_obj_key& olh, const std::string& olh_tag);
@@ -436,9 +422,12 @@ class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
   std::string delimiter;
   uint32_t num_entries;
   bool list_versions;
-  std::map<int, rgw_cls_list_ret>& result;
+  std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
+
 protected:
   int issue_op(int shard_id, const std::string& oid) override;
+  void reset_container(std::map<int, std::string>& objs) override;
+
 public:
   CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
                        const cls_rgw_obj_key& _start_obj,
@@ -446,7 +435,8 @@ public:
                        const std::string& _delimiter,
                        uint32_t _num_entries,
                         bool _list_versions,
-                        std::map<int, std::string>& oids,
+                        std::map<int, std::string>& oids, // shard_id -> shard_oid
+                       // shard_id -> return value
                         std::map<int, rgw_cls_list_ret>& list_results,
                         uint32_t max_aio) :
   CLSRGWConcurrentIO(io_ctx, oids, max_aio),
@@ -609,13 +599,13 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, std::string& oid, std::string& mark
 #ifndef CLS_CLIENT_HIDE_IOCTX
 int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw_lc_obj_head& head);
 int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw_lc_obj_head& head);
-int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const std::string& oid, string& marker, cls_rgw_lc_entry& entry);
+int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const std::string& oid, std::string& marker, cls_rgw_lc_entry& entry);
 int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, const std::string& oid, const cls_rgw_lc_entry& entry);
 int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, const std::string& oid, const cls_rgw_lc_entry& entry);
 int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, cls_rgw_lc_entry& entry);
 int cls_rgw_lc_list(librados::IoCtx& io_ctx, const std::string& oid,
                    const std::string& marker, uint32_t max_entries,
-                    vector<cls_rgw_lc_entry>& entries);
+                    std::vector<cls_rgw_lc_entry>& entries);
 #endif
 
 /* resharding */
@@ -640,5 +630,3 @@ int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string&
 int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
                                   cls_rgw_bucket_instance_entry *entry);
 #endif
-
-#endif