return boost::context::protected_fixedsize_stack{128*1024};
}
-class Manager {
+class Manager : public DoutPrefixProvider {
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queues_update_retry_ms;
const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+ CephContext *get_cct() const override { return cct; }
+ unsigned get_subsys() const override { return dout_subsys; }
+ std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
+
// read the list of queues from the queue list object
int read_queue_list(queues_t& queues, optional_yield y) {
constexpr auto max_chunk = 1024U;
librados::ObjectReadOperation op;
queues_t queues_chunk;
op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
- const auto ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+ const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
if (ret == -ENOENT) {
// queue list object was not created - nothing to do
return 0;
}
if (ret < 0) {
// TODO: do we need to check on rval as well as ret?
- ldout(cct, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
return ret;
}
queues.merge(queues_chunk);
try {
decode(event_entry, iter);
} catch (buffer::error& err) {
- ldout(cct, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+ ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
return false;
}
try {
// TODO move endpoint creation to queue level
const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
- RGWHTTPArgs(event_entry.push_endpoint_args),
+ RGWHTTPArgs(event_entry.push_endpoint_args, this),
cct);
- ldout(cct, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
+ ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
" for entry: " << entry.marker << dendl;
const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
if (ret < 0) {
- ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
+ ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " failed. error: " << ret << " (will retry)" << dendl;
return false;
} else {
- ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
+ ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " ok" << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
return true;
}
} catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldout(cct, 5) << "WARNING: failed to create push endpoint: "
+ ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
<< event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
return false;
}
// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
while (true) {
- ldout(cct, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
+ ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
librados::ObjectWriteOperation op;
"" /*no tag*/);
cls_2pc_queue_expire_reservations(op, stale_time);
// check ownership and do reservation cleanup in one batch
- auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldout(cct, 5) << "INFO: queue: "
+ ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. cleanup will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
return;
}
if (ret < 0) {
- ldout(cct, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
+ ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
<< ". error: " << ret << dendl;
}
Timer timer(io_context);
"" /*no tag*/);
cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
// check ownership and list entries in one batch
- auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldout(cct, 5) << "INFO: queue: "
+ ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
return;
}
if (ret < 0) {
- ldout(cct, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
+ ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: "
<< queue_name << ". error: " << ret << " (will retry)" << dendl;
continue;
}
ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
if (ret < 0) {
- ldout(cct, 5) << "WARNING: failed to parse list of entries in queue: "
+ ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: "
<< queue_name << ". error: " << ret << " (will retry)" << dendl;
continue;
}
continue;
}
// log when queue is not idle
- ldout(cct, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
+ ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
". end marker is: " << end_marker << dendl;
is_idle = false;
spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
const auto token = waiter.make_token();
if (process_entry(entry, yield)) {
- ldout(cct, 20) << "INFO: processing of entry: " <<
+ ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
remove_entries = true;
} else {
if (set_min_marker(end_marker, entry.marker) < 0) {
- ldout(cct, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+ ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
} else {
- ldout(cct, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
+ ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
}
has_error = true;
- ldout(cct, 20) << "INFO: processing of entry: " <<
+ ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
}
}, make_stack_allocator());
"" /*no tag*/);
cls_2pc_queue_remove_entries(op, end_marker);
// check ownership and deleted entries in one batch
- const auto ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldout(cct, 5) << "INFO: queue: "
+ ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldout(cct, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
return;
}
if (ret < 0) {
- ldout(cct, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
+ ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
<< queue_name << ". error: " << ret << dendl;
} else {
- ldout(cct, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
+ ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
<< queue_name << dendl;
}
}
std::chrono::milliseconds(duration_jitter(rnd_gen));
timer.expires_from_now(duration);
const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
- ldout(cct, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
+ ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
boost::system::error_code ec;
timer.async_wait(yield[ec]);
failover_time,
LOCK_FLAG_MAY_RENEW);
- ret = rgw_rados_operate(rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -EBUSY) {
// lock is already taken by another RGW
- ldout(cct, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
// if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
continue;
}
if (ret == -ENOENT) {
// queue is deleted - processing will stop the next time we try to read from the queue
- ldout(cct, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
continue;
}
if (ret < 0) {
// failed to lock for another reason, continue to process other queues
- ldout(cct, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
has_error = true;
continue;
}
// add queue to list of owned queues
if (owned_queues.insert(queue_name).second) {
- ldout(cct, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
// start processing this queue
spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
process_queue(queue_name, yield);
// mark it for deletion
std::lock_guard lock_guard(queue_gc_lock);
queue_gc.push_back(queue_name);
- ldout(cct, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
}, make_stack_allocator());
} else {
- ldout(cct, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
}
}
// erase all queue that were deleted
std::lock_guard lock_guard(queue_gc_lock);
std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
owned_queues.erase(queue_name);
- ldout(cct, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
});
queue_gc.clear();
}
(WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
ceph_assert(rc == 0);
}
- ldout(cct, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
+ ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
}
int add_persistent_topic(const std::string& topic_name, optional_yield y) {
if (topic_name == Q_LIST_OBJECT_NAME) {
- ldout(cct, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
+ ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
return -EINVAL;
}
librados::ObjectWriteOperation op;
op.create(true);
cls_2pc_queue_init(op, topic_name, max_queue_size);
- auto ret = rgw_rados_operate(rados_ioctx, topic_name, &op, y);
+ auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
if (ret == -EEXIST) {
// queue already exists - nothing to do
- ldout(cct, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to create queue
- ldout(cct, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
return ret;
}
bufferlist empty_bl;
std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
op.omap_set(new_topic);
- ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+ ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
- ldout(cct, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
return ret;
}
- ldout(cct, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
return 0;
}
int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
librados::ObjectWriteOperation op;
op.remove();
- auto ret = rgw_rados_operate(rados_ioctx, topic_name, &op, y);
+ auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
if (ret == -ENOENT) {
// queue already removed - nothing to do
- ldout(cct, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to remove queue
- ldout(cct, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
return ret;
}
std::set<std::string> topic_to_remove{{topic_name}};
op.omap_rm_keys(topic_to_remove);
- ret = rgw_rados_operate(rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+ ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
- ldout(cct, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
return ret;
}
- ldout(cct, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
return 0;
}
};
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
-bool init(CephContext* cct, rgw::sal::RGWRadosStore* store) {
+bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp) {
if (s_manager) {
return false;
}
if (!src_obj->get_bucket()) {
src_obj->set_bucket(s->bucket.get());
}
- if (src_obj->get_obj_attrs(s->obj_ctx, s->yield) < 0) {
+ if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
return nullptr;
}
}
return true;
}
-int publish_reserve(EventType event_type,
+int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
reservation_t& res,
const RGWObjTags* req_tags)
{
// notification does not apply to req_state
continue;
}
- ldout(res.s->cct, 20) << "INFO: notification: '" << topic_filter.s3_id <<
+ ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
"' on topic: '" << topic_cfg.dest.arn_topic <<
"' and bucket: '" << res.s->bucket->get_name() <<
"' (unique topic: '" << topic_cfg.name <<
int rval;
const auto& queue_name = topic_cfg.dest.arn_topic;
cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
- auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+ auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
+ ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
<< ". error: " << ret << dendl;
// if no space is left in queue we ask client to slow down
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, res_id);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
return ret;
}
}
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
- reservation_t& res)
+ reservation_t& res,
+ const DoutPrefixProvider *dpp)
{
for (auto& topic : res.topics) {
if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
const auto& queue_name = topic.cfg.dest.arn_topic;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
- ldout(res.s->cct, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
+ ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
" . trying to make a larger reservation on queue:" << queue_name << dendl;
// first cancel the existing reservation
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
- auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+ auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
topic.cfg.dest.arn_topic, &op,
res.s->yield);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
+ ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
" when trying to make a larger reservation on queue: " << queue_name
<< ". error: " << ret << dendl;
return ret;
bufferlist obl;
int rval;
cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
- ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+ ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
+ ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
<< ". error: " << ret << dendl;
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
return ret;
}
}
std::vector<bufferlist> bl_data_vec{std::move(bl)};
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
- const auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+ const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op,
res.s->yield);
topic.res_id = cls_2pc_reservation::NO_ID;
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
+ ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
<< ". error: " << ret << dendl;
return ret;
}
// TODO add endpoint LRU cache
const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint,
topic.cfg.dest.arn_topic,
- RGWHTTPArgs(topic.cfg.dest.push_endpoint_args),
+ RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
res.s->cct);
- ldout(res.s->cct, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
+ ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
} catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldout(res.s->cct, 1) << "ERROR: failed to create push endpoint: "
+ ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
<< topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return -EINVAL;
return 0;
}
-int publish_abort(reservation_t& res) {
+int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
for (auto& topic : res.topics) {
if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
// nothing to abort or already committed/aborted
const auto& queue_name = topic.cfg.dest.arn_topic;
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
- const auto ret = rgw_rados_operate(res.store->getRados()->get_notif_pool_ctx(),
+ const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
queue_name, &op,
res.s->yield);
if (ret < 0) {
- ldout(res.s->cct, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
+ ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
" from queue: " << queue_name << ". error: " << ret << dendl;
return ret;
}
}
reservation_t::~reservation_t() {
- publish_abort(*this);
+ publish_abort(dpp, *this);
}
}