#include <algorithm>
#include "include/buffer_fwd.h"
#include "common/Formatter.h"
+#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
#include "rgw_pubsub.h"
using namespace rgw;
-std::string json_format_pubsub_event(const rgw_pubsub_event& event) {
+template<typename EventType>
+std::string json_format_pubsub_event(const EventType& event) {
std::stringstream ss;
JSONFormatter f(false);
- encode_json("event", event, &f);
+ encode_json(EventType::json_type_single, event, &f);
f.flush(ss);
return ss.str();
}
public:
RGWPubSubHTTPEndpoint(const std::string& _endpoint,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint) {
- bool exists;
-
- str_ack_level = args.get("http-ack-level", &exists);
- if (!exists || str_ack_level == "any") {
- // "any" is default
- ack_level = ACK_LEVEL_ANY;
- } else if (str_ack_level == "non-error") {
- ack_level = ACK_LEVEL_NON_ERROR;
- } else {
- ack_level = std::atoi(str_ack_level.c_str());
- if (ack_level < 100 || ack_level >= 600) {
- throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
- }
- }
+ const RGWHTTPArgs& args) : endpoint(_endpoint) {
+ bool exists;
- auto str_verify_ssl = args.get("verify-ssl", &exists);
- boost::algorithm::to_lower(str_verify_ssl);
- // verify server certificate by default
- if (!exists || str_verify_ssl == "true") {
- verify_ssl = true;
- } else if (str_verify_ssl == "false") {
- verify_ssl = false;
- } else {
- throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+ str_ack_level = args.get("http-ack-level", &exists);
+ if (!exists || str_ack_level == "any") {
+ // "any" is default
+ ack_level = ACK_LEVEL_ANY;
+ } else if (str_ack_level == "non-error") {
+ ack_level = ACK_LEVEL_NON_ERROR;
+ } else {
+ ack_level = std::atoi(str_ack_level.c_str());
+ if (ack_level < 100 || ack_level >= 600) {
+ throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
}
}
+ auto str_verify_ssl = args.get("verify-ssl", &exists);
+ boost::algorithm::to_lower(str_verify_ssl);
+ // verify server certificate by default
+ if (!exists || str_verify_ssl == "true") {
+ verify_ssl = true;
+ } else if (str_verify_ssl == "false") {
+ verify_ssl = false;
+ } else {
+ throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
+ }
+ }
+
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
}
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+ }
+
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ bufferlist read_bl;
+ RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
+ const auto post_data = json_format_pubsub_event(record);
+ request.set_post_data(post_data);
+ request.set_send_length(post_data.length());
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ const auto rc = RGWHTTP::process(&request, y);
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ // TODO: use read_bl to process return code and handle according to ack level
+ return rc;
+ }
+
std::string to_str() const override {
- std::string str("HTTP Endpoint");
+ std::string str("HTTP/S Endpoint");
str += "\nURI: " + endpoint;
str += "\nAck Level: " + str_ack_level;
str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
- private:
- enum ack_level_t {
- ACK_LEVEL_NONE,
- ACK_LEVEL_BROKER,
- ACK_LEVEL_ROUTEABLE
- };
- const std::string endpoint;
- const std::string topic;
- amqp::connection_ptr_t conn;
- ack_level_t ack_level;
- std::string str_ack_level;
-
- static std::string get_exchange(const RGWHTTPArgs& args) {
- bool exists;
- const auto exchange = args.get("amqp-exchange", &exists);
- if (!exists) {
- throw configuration_error("AMQP: missing amqp-exchange");
- }
- return exchange;
+private:
+ enum ack_level_t {
+ ACK_LEVEL_NONE,
+ ACK_LEVEL_BROKER,
+ ACK_LEVEL_ROUTEABLE
+ };
+ CephContext* const cct;
+ const std::string endpoint;
+ const std::string topic;
+ const std::string exchange;
+ amqp::connection_ptr_t conn;
+ ack_level_t ack_level;
+ std::string str_ack_level;
+
+ static std::string get_exchange(const RGWHTTPArgs& args) {
+ bool exists;
+ const auto exchange = args.get("amqp-exchange", &exists);
+ if (!exists) {
+ throw configuration_error("AMQP: missing amqp-exchange");
}
+ return exchange;
+ }
// NoAckPublishCR implements async amqp publishing via coroutine
// This coroutine ends when it send the message and does not wait for an ack
class NoAckPublishCR : public RGWCoroutine {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
public:
- NoAckPublishCR(RGWDataSyncEnv* _sync_env,
+ NoAckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message) {}
// send message to endpoint, without waiting for reply
// note that it does not wait for an ack fron the end client
class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
const ack_level_t ack_level; // TODO not used for now
public:
- AckPublishCR(RGWDataSyncEnv* _sync_env,
+ AckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message,
ack_level_t _ack_level) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
// send message to endpoint, waiting for reply
}
};
+public:
+ RGWPubSubAMQPEndpoint(const std::string& _endpoint,
+ const std::string& _topic,
+ const RGWHTTPArgs& args,
+ CephContext* _cct) :
+ cct(_cct),
+ endpoint(_endpoint),
+ topic(_topic),
+ exchange(get_exchange(args)),
+ conn(amqp::connect(endpoint, exchange)) {
+ if (!conn) {
+ throw configuration_error("AMQP: failed to create connection to: " + endpoint);
+ }
+ bool exists;
+ // get ack level
+ str_ack_level = args.get("amqp-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ ack_level = ACK_LEVEL_BROKER;
+ } else if (str_ack_level == "none") {
+ ack_level = ACK_LEVEL_NONE;
+ } else if (str_ack_level == "routable") {
+ ack_level = ACK_LEVEL_ROUTEABLE;
+ } else {
+ throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
+ }
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
+ }
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+ }
+ }
+
+ // this allows waiting untill "finish()" is called from a different thread
+ // waiting could be blocking the waiting thread or yielding, depending
+ // with compilation flag support and whether the optional_yield is set
+ class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ mutable std::atomic<bool> done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ {
+ std::unique_lock l{lock};
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ }
+ return init.result.get();
+ }
+
public:
- RGWPubSubAMQPEndpoint(const std::string& _endpoint,
- const std::string& _topic,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint),
- topic(_topic),
- conn(amqp::connect(endpoint, get_exchange(args))) {
- bool exists;
- // get ack level
- str_ack_level = args.get("amqp-ack-level", &exists);
- if (!exists || str_ack_level == "broker") {
- // "broker" is default
- ack_level = ACK_LEVEL_BROKER;
- } else if (str_ack_level == "none") {
- ack_level = ACK_LEVEL_NONE;
- } else if (str_ack_level == "routable") {
- ack_level = ACK_LEVEL_ROUTEABLE;
- } else {
- throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
+ int wait(optional_yield y) {
+ if (done) {
+ return ret;
+ }
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& io_ctx = y.get_io_context();
+ auto& yield_ctx = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(io_ctx, yield_ctx[ec]);
+ return -ec.value();
}
+#endif
+ std::unique_lock l(lock);
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
} else {
- // TODO: currently broker and routable are the same - this will require different flags
- // but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
+ cond.notify_all();
}
}
+ };
- std::string to_str() const override {
- std::string str("AMQP(0.9.1) Endpoint");
- str += "\nURI: " + endpoint;
- str += "\nTopic: " + topic;
- str += "\nAck Level: " + str_ack_level;
- return str;
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ ceph_assert(conn);
+ if (ack_level == ACK_LEVEL_NONE) {
+ return amqp::publish(conn, topic, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
+ // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+ auto w = std::unique_ptr<Waiter>(new Waiter);
+ const auto rc = amqp::publish_with_confirm(conn,
+ topic,
+ json_format_pubsub_event(record),
+ std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ return rc;
+ }
+ return w->wait(y);
}
+ }
+
+ std::string to_str() const override {
+ std::string str("AMQP(0.9.1) Endpoint");
+ str += "\nURI: " + endpoint;
+ str += "\nTopic: " + topic;
+ str += "\nExchange: " + exchange;
+ str += "\nAck Level: " + str_ack_level;
+ return str;
+ }
};
static const std::string AMQP_0_9_1("0-9-1");
static const std::string AMQP_1_0("1-0");
+static const std::string AMQP_SCHEMA("amqp");
#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
-RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
- const std::string& topic,
- const RGWHTTPArgs& args) {
- //fetch the schema from the endpoint
+static const std::string WEBHOOK_SCHEMA("webhook");
+static const std::string UNKNOWN_SCHEMA("unknown");
+static const std::string NO_SCHEMA("");
+
+const std::string& get_schema(const std::string& endpoint) {
+ if (endpoint.empty()) {
+ return NO_SCHEMA;
+ }
const auto pos = endpoint.find(':');
if (pos == std::string::npos) {
- throw configuration_error("malformed endpoint " + endpoint);
- return nullptr;
+ return UNKNOWN_SCHEMA;
}
const auto& schema = endpoint.substr(0,pos);
if (schema == "http" || schema == "https") {
- return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+ return WEBHOOK_SCHEMA;
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
} else if (schema == "amqp") {
+ return AMQP_SCHEMA;
+#endif
+ }
+ return UNKNOWN_SCHEMA;
+}
+
+RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
+ const std::string& topic,
+ const RGWHTTPArgs& args,
+ CephContext* cct) {
+ const auto& schema = get_schema(endpoint);
+ if (schema == WEBHOOK_SCHEMA) {
+ return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ } else if (schema == AMQP_SCHEMA) {
bool exists;
std::string version = args.get("amqp-version", &exists);
if (!exists) {
version = AMQP_0_9_1;
}
if (version == AMQP_0_9_1) {
- return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args));
+ return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
} else if (version == AMQP_1_0) {
- throw configuration_error("amqp v1.0 not supported");
+ throw configuration_error("AMQP: v1.0 not supported");
return nullptr;
} else {
- throw configuration_error("unknown amqp version " + version);
+ throw configuration_error("AMQP: unknown version: " + version);
return nullptr;
}
} else if (schema == "amqps") {
- throw configuration_error("amqps not supported");
+ throw configuration_error("AMQP: ssl not supported");
return nullptr;
#endif
}
- throw configuration_error("unknown schema " + schema);
+ throw configuration_error("unknown schema in: " + endpoint);
return nullptr;
}