]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_notify.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_notify.cc
index 5fa753b88830fb1e7faac7e55f8d3d2e702ad2c9..a4ad062005e70b607fab90f84cf25bf108be6e0b 100644 (file)
@@ -52,7 +52,7 @@ auto make_stack_allocator() {
   return boost::context::protected_fixedsize_stack{128*1024};
 }
 
-class Manager {
+class Manager : public DoutPrefixProvider {
   const size_t max_queue_size;
   const uint32_t queues_update_period_ms;
   const uint32_t queues_update_retry_ms;
@@ -71,6 +71,10 @@ class Manager {
  
   const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
 
+  CephContext *get_cct() const override { return cct; }
+  unsigned get_subsys() const override { return dout_subsys; }
+  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
+
   // read the list of queues from the queue list object
   int read_queue_list(queues_t& queues, optional_yield y) {
     constexpr auto max_chunk = 1024U;
@@ -81,14 +85,14 @@ class Manager {
       librados::ObjectReadOperation op;
       queues_t queues_chunk;
       op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
-      const auto ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
       if (ret == -ENOENT) {
         // queue list object was not created - nothing to do
         return 0;
       }
       if (ret < 0) {
         // TODO: do we need to check on rval as well as ret?
-        ldout(cct, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
+        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
         return ret;
       }
       queues.merge(queues_chunk);
@@ -160,29 +164,29 @@ class Manager {
     try {
       decode(event_entry, iter);
     } catch (buffer::error& err) {
-      ldout(cct, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
       return false;
     }
     try {
       // TODO move endpoint creation to queue level
       const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
-          RGWHTTPArgs(event_entry.push_endpoint_args), 
+          RGWHTTPArgs(event_entry.push_endpoint_args, this), 
           cct);
-      ldout(cct, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
+      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
         " for entry: " << entry.marker << dendl;
       const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
       if (ret < 0) {
-        ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
+        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " failed. error: " << ret << " (will retry)" << dendl;
         return false;
       } else {
-        ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
+        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " ok" <<  dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
         return true;
       }
     } catch (const RGWPubSubEndpoint::configuration_error& e) {
-      ldout(cct, 5) << "WARNING: failed to create push endpoint: " 
+      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " 
           << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
       return false;
     }
@@ -191,7 +195,7 @@ class Manager {
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
     while (true) {
-      ldout(cct, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
+      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
       const auto now = ceph::coarse_real_time::clock::now();
       const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
       librados::ObjectWriteOperation op;
@@ -202,19 +206,19 @@ class Manager {
         "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
       if (ret == -ENOENT) {
         // queue was deleted
-        ldout(cct, 5) << "INFO: queue: " 
+        ldpp_dout(this, 5) << "INFO: queue: " 
           << queue_name << ". was removed. cleanup will stop" << dendl;
         return;
       }
       if (ret == -EBUSY) {
-        ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
         return;
       }
       if (ret < 0) {
-        ldout(cct, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
+        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
           << ". error: " << ret << dendl;
       }
       Timer timer(io_context);
@@ -261,25 +265,25 @@ class Manager {
           "" /*no tag*/);
         cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
         // check ownership and list entries in one batch
-        auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
         if (ret == -ENOENT) {
           // queue was deleted
-          ldout(cct, 5) << "INFO: queue: " 
+          ldpp_dout(this, 5) << "INFO: queue: " 
             << queue_name << ". was removed. processing will stop" << dendl;
           return;
         }
         if (ret == -EBUSY) {
-          ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
           return;
         }
         if (ret < 0) {
-          ldout(cct, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " 
+          ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " 
             << queue_name << ". error: " << ret << " (will retry)" << dendl;
           continue;
         }
         ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
         if (ret < 0) {
-          ldout(cct, 5) << "WARNING: failed to parse list of entries in queue: " 
+          ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: " 
             << queue_name << ". error: " << ret << " (will retry)" << dendl;
           continue;
         }
@@ -290,7 +294,7 @@ class Manager {
         continue;
       }
       // log when queue is not idle
-      ldout(cct, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
+      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
         ". end marker is: " << end_marker << dendl;
       
       is_idle = false;
@@ -307,17 +311,17 @@ class Manager {
         spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
             const auto token = waiter.make_token();
             if (process_entry(entry, yield)) {
-              ldout(cct, 20) << "INFO: processing of entry: " << 
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
               remove_entries = true;
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
-                ldout(cct, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+                ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
               } else {
-                ldout(cct, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
+                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
               }
               has_error = true;
-              ldout(cct, 20) << "INFO: processing of entry: " << 
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
             } 
         }, make_stack_allocator());
@@ -337,22 +341,22 @@ class Manager {
           "" /*no tag*/);
         cls_2pc_queue_remove_entries(op, end_marker); 
         // check ownership and deleted entries in one batch
-        const auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
+        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
         if (ret == -ENOENT) {
           // queue was deleted
-          ldout(cct, 5) << "INFO: queue: " 
+          ldpp_dout(this, 5) << "INFO: queue: " 
             << queue_name << ". was removed. processing will stop" << dendl;
           return;
         }
         if (ret == -EBUSY) {
-          ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
           return;
         }
         if (ret < 0) {
-          ldout(cct, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
+          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
             << queue_name << ". error: " << ret << dendl;
         } else {
-          ldout(cct, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
+          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
           << queue_name << dendl;
         }
       }
@@ -384,7 +388,7 @@ class Manager {
         std::chrono::milliseconds(duration_jitter(rnd_gen));
       timer.expires_from_now(duration);
       const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
-      ldout(cct, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp)  << dendl;
+      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp)  << dendl;
       boost::system::error_code ec;
       timer.async_wait(yield[ec]);
 
@@ -410,27 +414,27 @@ class Manager {
               failover_time,
               LOCK_FLAG_MAY_RENEW);
 
-        ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
         if (ret == -EBUSY) {
           // lock is already taken by another RGW
-          ldout(cct, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
+          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
           // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
           continue;
         }
         if (ret == -ENOENT) {
           // queue is deleted - processing will stop the next time we try to read from the queue
-          ldout(cct, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
+          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
           continue;
         }
         if (ret < 0) {
           // failed to lock for another reason, continue to process other queues
-          ldout(cct, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
+          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
           has_error = true;
           continue;
         }
         // add queue to list of owned queues
         if (owned_queues.insert(queue_name).second) {
-          ldout(cct, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
+          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
           // start processing this queue
           spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
             process_queue(queue_name, yield);
@@ -438,10 +442,10 @@ class Manager {
             // mark it for deletion
             std::lock_guard lock_guard(queue_gc_lock);
             queue_gc.push_back(queue_name);
-            ldout(cct, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
+            ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
           }, make_stack_allocator());
         } else {
-          ldout(cct, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
+          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
         }
       }
       // erase all queue that were deleted
@@ -449,7 +453,7 @@ class Manager {
         std::lock_guard lock_guard(queue_gc_lock);
         std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
           owned_queues.erase(queue_name);
-          ldout(cct, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
         });
         queue_gc.clear();
       }
@@ -494,64 +498,64 @@ public:
             (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
         ceph_assert(rc == 0);
       }
-      ldout(cct, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
+      ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
     }
 
   int add_persistent_topic(const std::string& topic_name, optional_yield y) {
     if (topic_name == Q_LIST_OBJECT_NAME) {
-      ldout(cct, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
+      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
       return -EINVAL;
     }
     librados::ObjectWriteOperation op;
     op.create(true);
     cls_2pc_queue_init(op, topic_name, max_queue_size);
-    auto ret = rgw_rados_operate(rados_ioctx, topic_name, &op, y);
+    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
     if (ret == -EEXIST) {
       // queue already exists - nothing to do
-      ldout(cct, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
+      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
       return 0;
     }
     if (ret < 0) {
       // failed to create queue
-      ldout(cct, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
       return ret;
     }
    
     bufferlist empty_bl;
     std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
     op.omap_set(new_topic);
-    ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
     if (ret < 0) {
-      ldout(cct, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
       return ret;
     } 
-    ldout(cct, 20) << "INFO: queue: " << topic_name << " added to queue list"  << dendl;
+    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list"  << dendl;
     return 0;
   }
   
   int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
     librados::ObjectWriteOperation op;
     op.remove();
-    auto ret = rgw_rados_operate(rados_ioctx, topic_name, &op, y);
+    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
     if (ret == -ENOENT) {
       // queue already removed - nothing to do
-      ldout(cct, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
       return 0;
     }
     if (ret < 0) {
       // failed to remove queue
-      ldout(cct, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
       return ret;
     }
   
     std::set<std::string> topic_to_remove{{topic_name}};
     op.omap_rm_keys(topic_to_remove);
-    ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
     if (ret < 0) {
-      ldout(cct, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
       return ret;
     } 
-    ldout(cct, 20) << "INFO: queue: " << topic_name << " removed from queue list"  << dendl;
+    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list"  << dendl;
     return 0;
   }
 };
@@ -570,7 +574,7 @@ constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
 
-bool init(CephContext* cct, rgw::sal::RGWRadosStore* store) {
+bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp) {
   if (s_manager) {
     return false;
   }
@@ -610,7 +614,7 @@ rgw::sal::RGWObject* get_object_with_atttributes(const req_state* s, rgw::sal::R
     if (!src_obj->get_bucket()) {
       src_obj->set_bucket(s->bucket.get());
     }
-    if (src_obj->get_obj_attrs(s->obj_ctx, s->yield) < 0) {
+    if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
       return nullptr;
     }
   }
@@ -748,7 +752,7 @@ bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state*
   return true;
 }
 
-int publish_reserve(EventType event_type,
+int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
       reservation_t& res,
       const RGWObjTags* req_tags)
 {
@@ -767,7 +771,7 @@ int publish_reserve(EventType event_type,
       // notification does not apply to req_state
       continue;
     }
-    ldout(res.s->cct, 20) << "INFO: notification: '" << topic_filter.s3_id << 
+    ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << 
         "' on topic: '" << topic_cfg.dest.arn_topic << 
         "' and bucket: '" << res.s->bucket->get_name() << 
         "' (unique topic: '" << topic_cfg.name <<
@@ -783,17 +787,17 @@ int publish_reserve(EventType event_type,
       int rval;
       const auto& queue_name = topic_cfg.dest.arn_topic;
       cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
-      auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(), 
+      auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), 
           queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
       if (ret < 0) {
-        ldout(res.s->cct, 1) << "ERROR: failed to reserve notification on queue: " << queue_name 
+        ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name 
           << ". error: " << ret << dendl;
         // if no space is left in queue we ask client to slow down
         return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
       }
       ret = cls_2pc_queue_reserve_result(obl, res_id);
       if (ret < 0) {
-        ldout(res.s->cct, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
+        ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
         return ret;
       }
     }
@@ -807,7 +811,8 @@ int publish_commit(rgw::sal::RGWObject* obj,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
-        reservation_t& res) 
+        reservation_t& res,
+        const DoutPrefixProvider *dpp) 
 {
   for (auto& topic : res.topics) {
     if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
@@ -827,16 +832,16 @@ int publish_commit(rgw::sal::RGWObject* obj,
       const auto& queue_name = topic.cfg.dest.arn_topic;
       if (bl.length() > res.size) {
         // try to make a larger reservation, fail only if this is not possible
-        ldout(res.s->cct, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
+        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
           " . trying to make a larger reservation on queue:" << queue_name << dendl;
         // first cancel the existing reservation
         librados::ObjectWriteOperation op;
         cls_2pc_queue_abort(op, topic.res_id);
-        auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+        auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
             topic.cfg.dest.arn_topic, &op,
             res.s->yield);
         if (ret < 0) {
-          ldout(res.s->cct, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
+          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
             " when trying to make a larger reservation on queue: " << queue_name
             << ". error: " << ret << dendl;
           return ret;
@@ -845,28 +850,28 @@ int publish_commit(rgw::sal::RGWObject* obj,
         bufferlist obl;
         int rval;
         cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
-        ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(), 
+        ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), 
           queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
         if (ret < 0) {
-          ldout(res.s->cct, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
+          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
             << ". error: " << ret << dendl;
           return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
         }
         ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
         if (ret < 0) {
-          ldout(res.s->cct, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
+          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
           return ret;
         }
       }
       std::vector<bufferlist> bl_data_vec{std::move(bl)};
       librados::ObjectWriteOperation op;
       cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
-      const auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+      const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
             queue_name, &op,
             res.s->yield);
       topic.res_id = cls_2pc_reservation::NO_ID;
       if (ret < 0) {
-        ldout(res.s->cct, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
+        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
           << ". error: " << ret << dendl;
         return ret;
       }
@@ -875,18 +880,18 @@ int publish_commit(rgw::sal::RGWObject* obj,
         // TODO add endpoint LRU cache
         const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint, 
                 topic.cfg.dest.arn_topic,
-                RGWHTTPArgs(topic.cfg.dest.push_endpoint_args), 
+                RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp), 
                 res.s->cct);
-        ldout(res.s->cct, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
+        ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
         const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
         if (ret < 0) {
-          ldout(res.s->cct, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
+          ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
           if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
           return ret;
         }
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
       } catch (const RGWPubSubEndpoint::configuration_error& e) {
-        ldout(res.s->cct, 1) << "ERROR: failed to create push endpoint: " 
+        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: " 
             << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return -EINVAL;
@@ -896,7 +901,7 @@ int publish_commit(rgw::sal::RGWObject* obj,
   return 0;
 }
 
-int publish_abort(reservation_t& res) {
+int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
   for (auto& topic : res.topics) {
     if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
       // nothing to abort or already committed/aborted
@@ -905,11 +910,11 @@ int publish_abort(reservation_t& res) {
     const auto& queue_name = topic.cfg.dest.arn_topic;
     librados::ObjectWriteOperation op;
     cls_2pc_queue_abort(op, topic.res_id);
-    const auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+    const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
       queue_name, &op,
       res.s->yield);
     if (ret < 0) {
-      ldout(res.s->cct, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
+      ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
         " from queue: " << queue_name << ". error: " << ret << dendl;
       return ret;
     }
@@ -919,7 +924,7 @@ int publish_abort(reservation_t& res) {
 }
 
 reservation_t::~reservation_t() {
-  publish_abort(*this);
+  publish_abort(dpp, *this);
 }
 
 }