const std::string topic;
const std::string exchange;
ack_level_t ack_level;
- amqp::connection_ptr_t conn;
+ amqp::connection_id_t conn_id;
bool get_verify_ssl(const RGWHTTPArgs& args) {
bool exists;
endpoint(_endpoint),
topic(_topic),
exchange(get_exchange(args)),
- ack_level(get_ack_level(args)),
- conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
- if (!conn) {
+ ack_level(get_ack_level(args)) {
+ if (!amqp::connect(conn_id, endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
throw configuration_error("AMQP: failed to create connection to: " + endpoint);
}
}
};
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
- ceph_assert(conn);
if (ack_level == ack_level_t::None) {
- return amqp::publish(conn, topic, json_format_pubsub_event(event));
+ return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
// note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
auto w = std::unique_ptr<Waiter>(new Waiter);
- const auto rc = amqp::publish_with_confirm(conn,
+ const auto rc = amqp::publish_with_confirm(conn_id,
topic,
json_format_pubsub_event(event),
std::bind(&Waiter::finish, w.get(), std::placeholders::_1));