]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cls/rgw/cls_rgw_client.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / cls / rgw / cls_rgw_client.cc
index 95d6ffbc16a84ee00590a6054a1ba350e6ff454d..4667de8994d5a70d98348ddc497065d9e2d7d22f 100644 (file)
@@ -21,6 +21,67 @@ using namespace librados;
 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
 
+
+int CLSRGWConcurrentIO::operator()() {
+  int ret = 0;
+  iter = objs_container.begin();
+  for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+    ret = issue_op(iter->first, iter->second);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions = 0, r = 0;
+  std::map<int, std::string> completed_objs;
+  std::map<int, std::string> retry_objs;
+  while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+                                     need_multiple_rounds() ? &completed_objs : nullptr,
+                                     !need_multiple_rounds() ? &retry_objs : nullptr)) {
+    if (r >= 0 && ret >= 0) {
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+
+    // if we're at the end with this round, see if another round is needed
+    if (iter == objs_container.end()) {
+      if (need_multiple_rounds() && !completed_objs.empty()) {
+       // For those objects which need another round, use them to reset
+       // the container
+       reset_container(completed_objs);
+       iter = objs_container.begin();
+      } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+       reset_container(retry_objs);
+       iter = objs_container.begin();
+      }
+
+      // re-issue ops if container was reset above (i.e., iter !=
+      // objs_container.end()); if it was not reset above (i.e., iter
+      // == objs_container.end()) the loop will exit immediately
+      // without iterating
+      for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+       int issue_ret = issue_op(iter->first, iter->second);
+       if (issue_ret < 0) {
+         ret = issue_ret;
+         break;
+       }
+      }
+    }
+  }
+
+  if (ret < 0) {
+    cleanup();
+  }
+  return ret;
+} // CLSRGWConcurrintIO::operator()()
+
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -33,7 +94,9 @@ public:
   ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
   ~ClsBucketIndexOpCtx() override {}
   void handle_completion(int r, bufferlist& outbl) override {
-    if (r >= 0) {
+    // if successful, or we're asked for a retry, copy result into
+    // destination (*data)
+    if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
       try {
         auto iter = outbl.cbegin();
         decode((*data), iter);
@@ -47,19 +110,19 @@ public:
   }
 };
 
-void BucketIndexAioManager::do_completion(int id) {
+void BucketIndexAioManager::do_completion(const int request_id) {
   std::lock_guard l{lock};
 
-  auto iter = pendings.find(id);
+  auto iter = pendings.find(request_id);
   ceph_assert(iter != pendings.end());
-  completions[id] = iter->second;
+  completions[request_id] = iter->second;
   pendings.erase(iter);
 
   // If the caller needs a list of finished objects, store them
   // for further processing
-  auto miter = pending_objs.find(id);
+  auto miter = pending_objs.find(request_id);
   if (miter != pending_objs.end()) {
-    completion_objs[id] = miter->second;
+    completion_objs.emplace(request_id, miter->second);
     pending_objs.erase(miter);
   }
 
@@ -67,7 +130,11 @@ void BucketIndexAioManager::do_completion(int id) {
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code, map<int, string> *objs) {
+                                                int *num_completions,
+                                                int *ret_code,
+                                                std::map<int, std::string> *completed_objs,
+                                                std::map<int, std::string> *retry_objs)
+{
   std::unique_lock locker{lock};
   if (pendings.empty() && completions.empty()) {
     return false;
@@ -82,18 +149,38 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   auto iter = completions.begin();
   for (; iter != completions.end(); ++iter) {
     int r = iter->second->get_return_value();
-    if (objs && r == 0) { /* update list of successfully completed objs */
+
+    // see if we may need to copy completions or retries
+    if (completed_objs || retry_objs) {
       auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-        (*objs)[liter->first] = liter->second;
+       if (completed_objs && r == 0) { /* update list of successfully completed objs */
+         (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+       }
+
+       if (r == RGWBIAdvanceAndRetryError) {
+         r = 0;
+         if (retry_objs) {
+           (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+         }
+       }
+      } else {
+       // NB: should we log an error here; currently no logging
+       // context to use
       }
     }
-    if (ret_code && (r < 0 && r != valid_ret_code))
+
+    if (ret_code && (r < 0 && r != valid_ret_code)) {
       (*ret_code) = r;
+    }
+
     iter->second->release();
   }
-  if (num_completions)
+
+  if (num_completions) {
     (*num_completions) = completions.size();
+  }
+
   completions.clear();
 
   return true;
@@ -107,38 +194,43 @@ void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
 }
 
 static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+                                      const int shard_id,
                                       const string& oid,
                                       BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.create(true);
   op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+                                       const int shard_id,
                                        const string& oid,
                                        BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.remove();
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
-    const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+                                           const int shard_id,
+                                           const string& oid,
+                                           uint64_t timeout,
+                                           BucketIndexAioManager *manager) {
   bufferlist in;
   rgw_cls_tag_timeout_op call;
   call.tag_timeout = timeout;
   encode(call, in);
   ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_init_op(io_ctx, oid, &manager);
+  return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
 }
 
 void CLSRGWIssueBucketIndexInit::cleanup()
@@ -149,14 +241,14 @@ void CLSRGWIssueBucketIndexInit::cleanup()
   }
 }
 
-int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+  return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
 }
 
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+  return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
 }
 
 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
@@ -171,9 +263,9 @@ void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
   o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
 }
 
-void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
+void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
                                const cls_rgw_obj_key& key, const string& locator, bool log_op,
-                               uint16_t bilog_flags, rgw_zone_set& zones_trace)
+                               uint16_t bilog_flags, const rgw_zone_set& zones_trace)
 {
   rgw_cls_obj_prepare_op call;
   call.op = op;
@@ -188,13 +280,13 @@ void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string&
   o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in);
 }
 
-void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
-                                rgw_bucket_entry_ver& ver,
+void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
+                                const rgw_bucket_entry_ver& ver,
                                 const cls_rgw_obj_key& key,
-                                rgw_bucket_dir_entry_meta& dir_meta,
-                               list<cls_rgw_obj_key> *remove_objs, bool log_op,
+                                const rgw_bucket_dir_entry_meta& dir_meta,
+                               const list<cls_rgw_obj_key> *remove_objs, bool log_op,
                                 uint16_t bilog_flags,
-                                rgw_zone_set *zones_trace)
+                                const rgw_zone_set *zones_trace)
 {
 
   bufferlist in;
@@ -237,10 +329,11 @@ void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
 }
 
 static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
-                                const string& oid,
+                                const int shard_id,
+                                const std::string& oid,
                                 const cls_rgw_obj_key& start_obj,
-                                const string& filter_prefix,
-                                const string& delimiter,
+                                const std::string& filter_prefix,
+                                const std::string& delimiter,
                                 uint32_t num_entries,
                                 bool list_versions,
                                 BucketIndexAioManager *manager,
@@ -250,17 +343,39 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
   cls_rgw_bucket_list_op(op,
                         start_obj, filter_prefix, delimiter,
                          num_entries, list_versions, pdata);
-  return manager->aio_operate(io_ctx, oid, &op);
-}
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
+int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
+{
+  // set the marker depending on whether we've already queried this
+  // shard and gotten a RGWBIAdvanceAndRetryError (defined
+  // constant) return value; if we have use the marker in the return
+  // to advance the search, otherwise use the marker passed in by the
+  // caller
+  cls_rgw_obj_key marker;
+  auto iter = result.find(shard_id);
+  if (iter != result.end()) {
+    marker = iter->second.marker;
+  } else {
+    marker = start_obj;
+  }
 
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
-{
-  return issue_bucket_list_op(io_ctx, oid,
-                             start_obj, filter_prefix, delimiter,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
+                             marker, filter_prefix, delimiter,
                              num_entries, list_versions, &manager,
                              &result[shard_id]);
 }
 
+
+void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs)
+{
+  objs_container.swap(objs);
+  iter = objs_container.begin();
+  objs.clear();
+}
+
+
 void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
 {
   bufferlist in;
@@ -301,7 +416,7 @@ void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const real_time& mtim
 }
 
 int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
-                   BIIndexType index_type, cls_rgw_obj_key& key,
+                   BIIndexType index_type, const cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry)
 {
   bufferlist in, out;
@@ -326,7 +441,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
   return 0;
 }
 
-int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry)
+int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
 {
   bufferlist in, out;
   rgw_cls_bi_put_op call;
@@ -339,7 +454,7 @@ int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry&
   return 0;
 }
 
-void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry)
+void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi_entry& entry)
 {
   bufferlist in, out;
   rgw_cls_bi_put_op call;
@@ -348,13 +463,16 @@ void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry
   op.exec(RGW_CLASS, RGW_BI_PUT, in);
 }
 
-int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
-                   const string& name, const string& marker, uint32_t max,
-                   list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+/* nb: any entries passed in are replaced with the results of the cls
+ * call, so caller does not need to clear entries between calls
+ */
+int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
+                   const std::string& name_filter, const std::string& marker, uint32_t max,
+                   std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
 {
   bufferlist in, out;
   rgw_cls_bi_list_op call;
-  call.name = name;
+  call.name_filter = name_filter;
   call.marker = marker;
   call.max = max;
   encode(call, in);
@@ -376,10 +494,10 @@ int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
   return 0;
 }
 
-int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, 
-                            const cls_rgw_obj_key& key, bufferlist& olh_tag,
-                            bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta,
-                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
+                            const cls_rgw_obj_key& key, const bufferlist& olh_tag,
+                            bool delete_marker, const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
 {
   librados::ObjectWriteOperation op;
   cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta,
@@ -391,14 +509,14 @@ int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
 
 
 void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key,
-                            bufferlist& olh_tag, bool delete_marker,
-                            const string& op_tag, rgw_bucket_dir_entry_meta *meta,
-                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
+                            const bufferlist& olh_tag, bool delete_marker,
+                            const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
+                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
 {
   bufferlist in, out;
   rgw_cls_link_olh_op call;
   call.key = key;
-  call.olh_tag = string(olh_tag.c_str(), olh_tag.length());
+  call.olh_tag = olh_tag.to_str();
   call.op_tag = op_tag;
   call.delete_marker = delete_marker;
   if (meta) {
@@ -415,7 +533,7 @@ void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_o
 
 int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
                                    const cls_rgw_obj_key& key, const string& op_tag,
-                                   const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace)
+                                   const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
 {
   librados::ObjectWriteOperation op;
   cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace);
@@ -428,7 +546,7 @@ int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
 
 void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op,
                                    const cls_rgw_obj_key& key, const string& op_tag,
-                                   const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace)
+                                   const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
 {
   bufferlist in, out;
   rgw_cls_unlink_instance_op call;
@@ -513,17 +631,17 @@ void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
 }
 
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                                  BucketIndexShardsManager& marker_mgr, uint32_t max,
                                  BucketIndexAioManager *manager,
                                  cls_rgw_bi_log_list_ret *pdata)
 {
   librados::ObjectReadOperation op;
   cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
 }
@@ -541,46 +659,46 @@ void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
 }
 
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                               BucketIndexShardsManager& start_marker_mgr,
                               BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
   cls_rgw_bi_log_trim_op call;
   librados::ObjectWriteOperation op;
   cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
                      end_marker_mgr.get(shard_id, ""));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
     rgw_cls_check_index_ret *pdata) {
   bufferlist in;
   librados::ObjectReadOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
         pdata, NULL));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+  return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
 }
 
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
     BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+  return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -594,40 +712,40 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
 }
 
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
 {
   cls_rgw_obj_key empty_key;
   string empty_prefix;
   string empty_delimiter;
-  return issue_bucket_list_op(io_ctx, oid,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
                              empty_key, empty_prefix, empty_delimiter,
                              0, false, &manager, &result[shard_id]);
 }
 
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
 {
-  return issue_resync_bi_log(io_ctx, oid, &manager);
+  return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
 }
 
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
-  return manager->aio_operate(io_ctx, oid, &op); 
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bi_log_stop(io_ctx, oid, &manager);
+  return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
@@ -1069,7 +1187,8 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
   op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
 }
 
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+                                       const int shard_id, const string& oid,
                                         const cls_rgw_bucket_instance_entry& entry,
                                         BucketIndexAioManager *manager) {
   bufferlist in;
@@ -1078,10 +1197,10 @@ static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& o
   encode(call, in);
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
 {
-  return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+  return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
 }