]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_coroutine.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
index f84232875616236f1daa611a964b6dfa16379776..1a9fa62cd32147469fba74bcbe224687e5770162 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
 #ifndef CEPH_RGW_COROUTINE_H
 #define CEPH_RGW_COROUTINE_H
 
@@ -20,6 +23,8 @@
 #include "common/admin_socket.h"
 
 #include "rgw_common.h"
+#include "rgw_http_client_types.h"
+
 #include <boost/asio/coroutine.hpp>
 
 #include <atomic>
@@ -31,32 +36,40 @@ class RGWCoroutinesManager;
 class RGWAioCompletionNotifier;
 
 class RGWCompletionManager : public RefCountedObject {
+  friend class RGWCoroutinesManager;
+
   CephContext *cct;
-  list<void *> complete_reqs;
+
+  struct io_completion {
+    rgw_io_id io_id;
+    void *user_info;
+  };
+  std::list<io_completion> complete_reqs;
+  std::set<rgw_io_id> complete_reqs_set;
   using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
-  set<NotifierRef> cns;
+  std::set<NotifierRef> cns;
 
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
+  ceph::condition_variable cond;
 
   SafeTimer timer;
 
   std::atomic<bool> going_down = { false };
 
-  map<void *, void *> waiters;
+  std::map<void *, void *> waiters;
 
   class WaitContext;
 
 protected:
   void _wakeup(void *opaque);
-  void _complete(RGWAioCompletionNotifier *cn, void *user_info);
+  void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
 public:
-  RGWCompletionManager(CephContext *_cct);
+  explicit RGWCompletionManager(CephContext *_cct);
   ~RGWCompletionManager() override;
 
-  void complete(RGWAioCompletionNotifier *cn, void *user_info);
-  int get_next(void **user_info);
-  bool try_get_next(void **user_info);
+  void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
+  int get_next(io_completion *io);
+  bool try_get_next(io_completion *io);
 
   void go_down();
 
@@ -74,21 +87,22 @@ public:
 class RGWAioCompletionNotifier : public RefCountedObject {
   librados::AioCompletion *c;
   RGWCompletionManager *completion_mgr;
+  rgw_io_id io_id;
   void *user_data;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier");
   bool registered;
 
 public:
-  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
+  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
   ~RGWAioCompletionNotifier() override {
     c->release();
-    lock.Lock();
+    lock.lock();
     bool need_unregister = registered;
     if (registered) {
       completion_mgr->get();
     }
     registered = false;
-    lock.Unlock();
+    lock.unlock();
     if (need_unregister) {
       completion_mgr->unregister_completion_notifier(this);
       completion_mgr->put();
@@ -100,7 +114,7 @@ public:
   }
 
   void unregister() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (!registered) {
       return;
     }
@@ -108,25 +122,37 @@ public:
   }
 
   void cb() {
-    lock.Lock();
+    lock.lock();
     if (!registered) {
-      lock.Unlock();
+      lock.unlock();
       put();
       return;
     }
     completion_mgr->get();
     registered = false;
-    lock.Unlock();
-    completion_mgr->complete(this, user_data);
+    lock.unlock();
+    completion_mgr->complete(this, io_id, user_data);
     completion_mgr->put();
     put();
   }
 };
 
+// completion notifier with opaque payload (ie a reference-counted pointer)
+template <typename T>
+class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier {
+  T value;
+public:
+  RGWAioCompletionNotifierWith(RGWCompletionManager *mgr,
+                               const rgw_io_id& io_id, void *user_data,
+                               T value)
+    : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value))
+  {}
+};
+
 struct RGWCoroutinesEnv {
   uint64_t run_context;
   RGWCoroutinesManager *manager;
-  list<RGWCoroutinesStack *> *scheduled_stacks;
+  std::list<RGWCoroutinesStack *> *scheduled_stacks;
   RGWCoroutinesStack *stack;
 
   RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
@@ -139,7 +165,7 @@ enum RGWCoroutineState {
 };
 
 struct rgw_spawned_stacks {
-  vector<RGWCoroutinesStack *> entries;
+  std::vector<RGWCoroutinesStack *> entries;
 
   rgw_spawned_stacks() {}
 
@@ -148,9 +174,8 @@ struct rgw_spawned_stacks {
   }
 
   void inherit(rgw_spawned_stacks *source) {
-    for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
-         iter != source->entries.end(); ++iter) {
-      add_pending(*iter);
+    for (auto* entry : source->entries) {
+      add_pending(entry);
     }
     source->entries.clear();
   }
@@ -163,9 +188,9 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
 
   struct StatusItem {
     utime_t timestamp;
-    string status;
+    std::string status;
 
-    StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
+    StatusItem(utime_t& t, const std::string& s) : timestamp(t), status(s) {}
 
     void dump(Formatter *f) const;
   };
@@ -174,24 +199,36 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
 
   struct Status {
     CephContext *cct;
-    RWLock lock;
+    ceph::shared_mutex lock =
+      ceph::make_shared_mutex("RGWCoroutine::Status::lock");
     int max_history;
 
     utime_t timestamp;
-    stringstream status;
+    std::stringstream status;
 
-    Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
+    explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
 
-    deque<StatusItem> history;
+    std::deque<StatusItem> history;
 
-    stringstream& set_status();
+    std::stringstream& set_status();
   } status;
 
-  stringstream description;
+  std::stringstream description;
 
 protected:
   bool _yield_ret;
-  boost::asio::coroutine drain_cr;
+
+  struct {
+    boost::asio::coroutine cr;
+    bool should_exit{false};
+    int ret{0};
+
+    void init() {
+      cr = boost::asio::coroutine();
+      should_exit = false;
+      ret = 0;
+    }
+  } drain_status;
 
   CephContext *cct;
 
@@ -201,51 +238,52 @@ protected:
 
   rgw_spawned_stacks spawned;
 
-  stringstream error_stream;
+  std::stringstream error_stream;
 
   int set_state(int s, int ret = 0) {
+    retcode = ret;
     state = s;
     return ret;
   }
   int set_cr_error(int ret) {
-    state = RGWCoroutine_Error;
-    return ret;
+    return set_state(RGWCoroutine_Error, ret);
   }
   int set_cr_done() {
-    state = RGWCoroutine_Done;
-    return 0;
+    return set_state(RGWCoroutine_Done, 0);
   }
   void set_io_blocked(bool flag);
-  int io_block(int ret = 0);
 
   void reset_description() {
-    description.str(string());
+    description.str(std::string());
   }
 
-  stringstream& set_description() {
+  std::stringstream& set_description() {
     return description;
   }
-  stringstream& set_status() {
+  std::stringstream& set_status() {
     return status.set_status();
   }
 
-  stringstream& set_status(const string& s) {
-    stringstream& status = set_status();
+  std::stringstream& set_status(const std::string& s) {
+    std::stringstream& status = set_status();
     status << s;
     return status;
   }
 
+  virtual int operate_wrapper(const DoutPrefixProvider *dpp) {
+    return operate(dpp);
+  }
 public:
   RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
-  ~RGWCoroutine() override;
+  virtual ~RGWCoroutine() override;
 
-  virtual int operate() = 0;
+  virtual int operate(const DoutPrefixProvider *dpp) = 0;
 
   bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
   bool is_error() { return (state == RGWCoroutine_Error); }
 
-  stringstream& log_error() { return error_stream; }
-  string error_str() {
+  std::stringstream& log_error() { return error_stream; }
+  std::string error_str() {
     return error_stream.str();
   }
 
@@ -259,11 +297,23 @@ public:
 
   void call(RGWCoroutine *op); /* call at the same stack we're in */
   RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
-  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
 
+  RGWCoroutinesStack *prealloc_stack(); /* prepare a stack that will be used in the next spawn operation */
+  uint64_t prealloc_stack_id(); /* prepare a stack that will be used in the next spawn operation, return its id */
+
   int wait(const utime_t& interval);
-  bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
+  bool drain_children(int num_cr_left,
+                      RGWCoroutinesStack *skip_stack = nullptr,
+                      std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
+                                                                                                             cb will be called on completion of every
+                                                                                                             completion. */
+  bool drain_children(int num_cr_left,
+                      std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
+                                                                                             cb will be called on every completion, can filter errors.
+                                                                                             A negative return value from cb means that current cr
+                                                                                             will need to exit */
   void wakeup();
   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
 
@@ -273,16 +323,30 @@ public:
 
   void wait_for_child();
 
-  virtual string to_str() const;
+  virtual std::string to_str() const;
 
   RGWCoroutinesStack *get_stack() const {
     return stack;
   }
 
+  RGWCoroutinesEnv *get_env() const;
+
   void dump(Formatter *f) const;
+
+  void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
+
+  int io_block(int ret = 0) {
+    return io_block(ret, -1);
+  }
+  int io_block(int ret, int64_t io_id);
+  int io_block(int ret, const rgw_io_id& io_id);
+  void io_complete() {
+    io_complete(rgw_io_id{});
+  }
+  void io_complete(const rgw_io_id& io_id);
 };
 
-ostream& operator<<(ostream& out, const RGWCoroutine& cr);
+std::ostream& operator<<(std::ostream& out, const RGWCoroutine& cr);
 
 #define yield_until_true(x)     \
 do {                            \
@@ -293,23 +357,45 @@ do {                            \
 } while (0)
 
 #define drain_all() \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(0))
 
 #define drain_all_but(n) \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(n))
 
 #define drain_all_but_stack(stack) \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(1, stack))
 
+#define drain_all_but_stack_cb(stack, cb) \
+  drain_status.init(); \
+  yield_until_true(drain_children(1, stack, cb))
+
+#define drain_with_cb(n, cb) \
+  drain_status.init(); \
+  yield_until_true(drain_children(n, cb)); \
+  if (drain_status.should_exit) { \
+    return set_cr_error(drain_status.ret); \
+  }
+
+#define drain_all_cb(cb) \
+  drain_with_cb(0, cb)
+
+#define yield_spawn_window(cr, n, cb) \
+  do { \
+    spawn(cr, false); \
+    drain_with_cb(n, cb); /* this is guaranteed to yield */ \
+  } while (0)
+
+
+
 template <class T>
 class RGWConsumerCR : public RGWCoroutine {
-  list<T> product;
+  std::list<T> product;
 
 public:
-  RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
+  explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
 
   bool has_product() {
     return !product.empty();
@@ -331,7 +417,7 @@ public:
   }
 
   void receive(const T& p, bool wakeup = true);
-  void receive(list<T>& l, bool wakeup = true);
+  void receive(std::list<T>& l, bool wakeup = true);
 };
 
 class RGWCoroutinesStack : public RefCountedObject {
@@ -340,15 +426,22 @@ class RGWCoroutinesStack : public RefCountedObject {
 
   CephContext *cct;
 
+  int64_t id{-1};
+
   RGWCoroutinesManager *ops_mgr;
 
-  list<RGWCoroutine *> ops;
-  list<RGWCoroutine *>::iterator pos;
+  std::list<RGWCoroutine *> ops;
+  std::list<RGWCoroutine *>::iterator pos;
 
   rgw_spawned_stacks spawned;
 
-  set<RGWCoroutinesStack *> blocked_by_stack;
-  set<RGWCoroutinesStack *> blocking_stacks;
+  RGWCoroutinesStack *preallocated_stack{nullptr};
+
+  std::set<RGWCoroutinesStack *> blocked_by_stack;
+  std::set<RGWCoroutinesStack *> blocking_stacks;
+
+  std::map<int64_t, rgw_io_id> io_finish_ids;
+  rgw_io_id io_blocked_id;
 
   bool done_flag;
   bool error_flag;
@@ -369,13 +462,17 @@ protected:
   RGWCoroutinesStack *parent;
 
   RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
-  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
   bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
 public:
   RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
   ~RGWCoroutinesStack() override;
 
-  int operate(RGWCoroutinesEnv *env);
+  int64_t get_id() const {
+    return id;
+  }
+
+  int operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *env);
 
   bool is_done() {
     return done_flag;
@@ -389,9 +486,18 @@ public:
   void set_io_blocked(bool flag) {
     blocked_flag = flag;
   }
+  void set_io_blocked_id(const rgw_io_id& io_id) {
+    io_blocked_id = io_id;
+  }
   bool is_io_blocked() {
-    return blocked_flag;
+    return blocked_flag && !done_flag;
   }
+  bool can_io_unblock(const rgw_io_id& io_id) {
+    return ((io_blocked_id.id < 0) ||
+            io_blocked_id.intersects(io_id));
+  }
+  bool try_io_unblock(const rgw_io_id& io_id);
+  bool consume_io_finish(const rgw_io_id& io_id);
   void set_interval_wait(bool flag) {
     interval_wait_flag = flag;
   }
@@ -417,32 +523,34 @@ public:
           is_io_blocked() || waiting_for_child() ;
   }
 
-  void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
-    if (!stacks) {
-      stacks = env->scheduled_stacks;
-    }
-    if (!is_scheduled) {
-      stacks->push_back(this);
-      is_scheduled = true;
-    }
-  }
+  void schedule();
+  void _schedule();
 
   int get_ret_status() {
     return retcode;
   }
 
-  string error_str();
+  std::string error_str();
 
   void call(RGWCoroutine *next_op);
   RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
+  RGWCoroutinesStack *prealloc_stack();
   int unwind(int retcode);
 
   int wait(const utime_t& interval);
   void wakeup();
+  void io_complete() {
+    io_complete(rgw_io_id{});
+  }
+  void io_complete(const rgw_io_id& io_id);
 
-  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
+
+  void cancel();
 
   RGWAioCompletionNotifier *create_completion_notifier();
+  template <typename T>
+  RGWAioCompletionNotifier *create_completion_notifier(T value);
   RGWCompletionManager *get_completion_mgr();
 
   void set_blocked_by(RGWCoroutinesStack *s) {
@@ -460,13 +568,15 @@ public:
 
   bool unblock_stack(RGWCoroutinesStack **s);
 
-  RGWCoroutinesEnv *get_env() { return env; }
+  RGWCoroutinesEnv *get_env() const { return env; }
 
   void dump(Formatter *f) const;
+
+  void init_new_io(RGWIOProvider *io_provider);
 };
 
 template <class T>
-void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
+void RGWConsumerCR<T>::receive(std::list<T>& l, bool wakeup)
 {
   product.splice(product.end(), l);
   if (wakeup) {
@@ -487,22 +597,25 @@ void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
   CephContext *cct;
 
-  set<RGWCoroutinesManager *> managers;
-  RWLock lock;
+  std::set<RGWCoroutinesManager *> managers;
+  ceph::shared_mutex lock =
+    ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
 
-  string admin_command;
+  std::string admin_command;
 
 public:
-  RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
-  ~RGWCoroutinesManagerRegistry() override;
+  explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
+  virtual ~RGWCoroutinesManagerRegistry() override;
 
   void add(RGWCoroutinesManager *mgr);
   void remove(RGWCoroutinesManager *mgr);
 
-  int hook_to_admin_command(const string& command);
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-           bufferlist& out) override;
-    
+  int hook_to_admin_command(const std::string& command);
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          Formatter *f,
+          std::ostream& ss,
+          bufferlist& out) override;
+
   void dump(Formatter *f) const;
 };
 
@@ -511,38 +624,39 @@ class RGWCoroutinesManager {
   std::atomic<bool> going_down = { false };
 
   std::atomic<int64_t> run_context_count = { 0 };
-  map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
+  std::map<uint64_t, std::set<RGWCoroutinesStack *> > run_contexts;
+
+  std::atomic<int64_t> max_io_id = { 0 };
+  std::atomic<uint64_t> max_stack_id = { 0 };
+
+  mutable ceph::shared_mutex lock =
+    ceph::make_shared_mutex("RGWCoroutinesManager::lock");
 
-  RWLock lock;
+  RGWIOIDProvider io_id_provider;
 
-  void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
+  void handle_unblocked_stack(std::set<RGWCoroutinesStack *>& context_stacks, std::list<RGWCoroutinesStack *>& scheduled_stacks,
+                              RGWCompletionManager::io_completion& io, int *waiting_count);
 protected:
   RGWCompletionManager *completion_mgr;
   RGWCoroutinesManagerRegistry *cr_registry;
 
   int ops_window;
 
-  string id;
+  std::string id;
 
   void put_completion_notifier(RGWAioCompletionNotifier *cn);
 public:
-  RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
+  RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct),
                                                                                         cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
     completion_mgr = new RGWCompletionManager(cct);
     if (cr_registry) {
       cr_registry->add(this);
     }
   }
-  virtual ~RGWCoroutinesManager() {
-    stop();
-    completion_mgr->put();
-    if (cr_registry) {
-      cr_registry->remove(this);
-    }
-  }
+  virtual ~RGWCoroutinesManager();
 
-  int run(list<RGWCoroutinesStack *>& ops);
-  int run(RGWCoroutine *op);
+  int run(const DoutPrefixProvider *dpp, std::list<RGWCoroutinesStack *>& ops);
+  int run(const DoutPrefixProvider *dpp, RGWCoroutine *op);
   void stop() {
     bool expected = false;
     if (going_down.compare_exchange_strong(expected, true)) {
@@ -553,22 +667,50 @@ public:
   virtual void report_error(RGWCoroutinesStack *op);
 
   RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
+  template <typename T>
+  RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value);
   RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
 
   void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
+  void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
   RGWCoroutinesStack *allocate_stack();
 
-  virtual string get_id();
+  int64_t get_next_io_id();
+  uint64_t get_next_stack_id();
+
+  void set_sleeping(RGWCoroutine *cr, bool flag);
+  void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
+
+  virtual std::string get_id();
   void dump(Formatter *f) const;
+
+  RGWIOIDProvider& get_io_id_provider() {
+    return io_id_provider;
+  }
 };
 
+template <typename T>
+RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value)
+{
+  rgw_io_id io_id{get_next_io_id(), -1};
+  RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value));
+  completion_mgr->register_completion_notifier(cn);
+  return cn;
+}
+
+template <typename T>
+RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value)
+{
+  return ops_mgr->create_completion_notifier(this, std::move(value));
+}
+
 class RGWSimpleCoroutine : public RGWCoroutine {
   bool called_cleanup;
 
-  int operate() override;
+  int operate(const DoutPrefixProvider *dpp) override;
 
   int state_init();
-  int state_send_request();
+  int state_send_request(const DoutPrefixProvider *dpp);
   int state_request_complete();
   int state_all_complete();
 
@@ -579,7 +721,7 @@ public:
   ~RGWSimpleCoroutine() override;
 
   virtual int init() { return 0; }
-  virtual int send_request() = 0;
+  virtual int send_request(const DoutPrefixProvider *dpp) = 0;
   virtual int request_complete() = 0;
   virtual int finish() { return 0; }
   virtual void request_cleanup() {}