1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/iso_8601.h"
11 #include "common/async/completion.h"
12 #include "rgw_common.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_pubsub.h"
16 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
19 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20 #include "rgw_kafka.h"
22 #include <boost/asio/yield.hpp>
23 #include <boost/algorithm/string.hpp>
25 #include "rgw_perf_counters.h"
// Renders a pubsub event as JSON and returns it as a std::string.
// The event is encoded through a JSONFormatter (constructed with `false`)
// using sections named EventType::json_type_plural.
// NOTE(review): this listing omits the lines between the visible statements
// (scope braces, flushing of the formatter, the return statement), so the
// exact nesting of the two sections cannot be confirmed from here.
29 template<typename EventType
>
30 std::string
json_format_pubsub_event(const EventType
& event
) {
32 JSONFormatter
f(false);
// ObjectSection named after the event type's plural JSON name
34 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
// ArraySection with the same name
36 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
// the event itself is encoded with an empty name
37 encode_json("", event
, &f
);
// Reads the boolean argument `name` from `args`.
// Throws RGWPubSubEndpoint::configuration_error when RGWHTTPArgs::get_bool()
// reports -EINVAL for the argument.
// NOTE(review): the declarations of `value`/`exists` and the use of
// `default_value` are not visible in this listing - presumably the default is
// returned when the argument is absent; confirm against the full source.
44 bool get_bool(const RGWHTTPArgs
& args
, const std::string
& name
, bool default_value
) {
47 if (args
.get_bool(name
.c_str(), &value
, &exists
) == -EINVAL
) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name
);
// Push endpoint that delivers pubsub events with an HTTP POST (to_str()
// describes it as "HTTP/S Endpoint").
// NOTE(review): this listing omits some original lines (access specifiers,
// local declarations, closing braces, return statements); the comments below
// describe only what is visible.
56 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
// URI the events are POSTed to
58 const std::string endpoint
;
59 typedef unsigned ack_level_t
;
60 ack_level_t ack_level
; // TODO: not used for now
// handed to the HTTP request object in send_to_completion_async()
61 const bool verify_ssl
;
// set from the "cloudevents" argument (default: false)
62 const bool cloudevents
;
// symbolic ack levels; any other accepted value is a number in [100, 600)
63 static const ack_level_t ACK_LEVEL_ANY
= 0;
64 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
// Builds the endpoint from its URI and arguments:
//   "verify-ssl"     - boolean, defaults to true
//   "cloudevents"    - boolean, defaults to false
//   "http-ack-level" - "any" (also used when absent), "non-error", or a number
// Throws configuration_error when a numeric "http-ack-level" is outside
// [100, 600).
67 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
, const RGWHTTPArgs
& args
) :
68 endpoint(_endpoint
), verify_ssl(get_bool(args
, "verify-ssl", true)), cloudevents(get_bool(args
, "cloudevents", false))
71 const auto& str_ack_level
= args
.get("http-ack-level", &exists
);
72 if (!exists
|| str_ack_level
== "any") {
74 ack_level
= ACK_LEVEL_ANY
;
75 } else if (str_ack_level
== "non-error") {
76 ack_level
= ACK_LEVEL_NON_ERROR
;
// any other value is parsed as a number; std::atoi() yields 0 for non-numeric
// text, and because ack_level_t is unsigned a negative value wraps to a large
// one - both fail the range check below
78 ack_level
= std::atoi(str_ack_level
.c_str());
79 if (ack_level
< 100 || ack_level
>= 600) {
80 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
// POSTs the JSON rendering of `event` to `endpoint` via RGWHTTP::process().
// The l_rgw_pubsub_push_pending counter is incremented before the request is
// processed and decremented afterwards (when `perfcounter` is set).
// NOTE(review): the declaration of `read_bl`, the condition guarding the
// CloudEvents headers (presumably `cloudevents`) and the return statement are
// not visible in this listing.
85 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
87 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
88 const auto post_data
= json_format_pubsub_event(event
);
90 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
91 // using "Binary Content Mode"
92 request
.append_header("ce-specversion", "1.0");
93 request
.append_header("ce-type", "com.amazonaws." + event
.eventName
);
94 request
.append_header("ce-time", to_iso_8601(event
.eventTime
));
95 // default output of iso8601 is also RFC3339 compatible
96 request
.append_header("ce-id", event
.x_amz_request_id
+ "." + event
.x_amz_id_2
);
97 request
.append_header("ce-source", event
.eventSource
+ "." + event
.awsRegion
+ "." + event
.bucket_name
);
98 request
.append_header("ce-subject", event
.object_key
);
// the JSON body and its length are always set, together with the content type
100 request
.set_post_data(post_data
);
101 request
.set_send_length(post_data
.length());
102 request
.append_header("Content-Type", "application/json");
103 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
104 const auto rc
= RGWHTTP::process(&request
, y
);
105 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
106 // TODO: use read_bl to process return code and handle according to ack level
// Builds a multi-line, human-readable description of the endpoint: a title,
// the URI and whether SSL verification is enabled.
110 std::string
to_str() const override
{
111 std::string
str("HTTP/S Endpoint");
112 str
+= "\nURI: " + endpoint
;
113 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
118 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
// Push endpoint that publishes pubsub events to an AMQP broker (to_str()
// describes it as "AMQP(0.9.1) Endpoint").
// NOTE(review): this listing omits some original lines (access specifiers,
// enumerators, local declarations, closing braces, return statements); the
// comments below describe only what is visible.
119 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
// acknowledgement level; the enumerators are not visible in this listing, but
// None, Broker and Routable are referenced below
121 enum class ack_level_t
{
126 CephContext
* const cct
;
// broker URI, as passed to amqp::connect()
127 const std::string endpoint
;
// topic argument passed to amqp::publish()
128 const std::string topic
;
// result of get_exchange(), i.e. the "amqp-exchange" argument
129 const std::string exchange
;
130 ack_level_t ack_level
;
// connection identifier handed to amqp::connect() and to the publish calls
131 amqp::connection_id_t conn_id
;
// Parses the "verify-ssl" argument case-insensitively; a value other than
// "true"/"false" raises configuration_error.
// NOTE(review): the absent-argument branch and the return statements are not
// visible here; per the comment below, verification is the default.
133 bool get_verify_ssl(const RGWHTTPArgs
& args
) {
135 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
137 // verify server certificate by default
140 boost::algorithm::to_lower(str_verify_ssl
);
141 if (str_verify_ssl
== "true") {
144 if (str_verify_ssl
== "false") {
147 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
// Reads the "amqp-exchange" argument.
// Raises configuration_error("AMQP: missing amqp-exchange") - presumably when
// the argument is absent; the guarding condition and the return statement are
// not visible in this listing.
150 std::string
get_exchange(const RGWHTTPArgs
& args
) {
152 const auto exchange
= args
.get("amqp-exchange", &exists
);
154 throw configuration_error("AMQP: missing amqp-exchange");
// Maps the "amqp-ack-level" argument to an ack_level_t:
//   absent or "broker" -> Broker, "none" -> None, "routable" -> Routable.
// Any other value raises configuration_error.
159 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
161 const auto& str_ack_level
= args
.get("amqp-ack-level", &exists
);
162 if (!exists
|| str_ack_level
== "broker") {
163 // "broker" is default
164 return ack_level_t::Broker
;
166 if (str_ack_level
== "none") {
167 return ack_level_t::None
;
169 if (str_ack_level
== "routable") {
170 return ack_level_t::Routable
;
172 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
// Parses the arguments and opens the broker connection with amqp::connect().
// The fourth connect() argument is true only for ack level Broker; the
// optional "ca-location" argument is forwarded as well.
// Throws configuration_error when connect() fails, and whatever the argument
// parsers above throw.
// NOTE(review): the remaining parameter(s) and the first part of the
// initializer list are not visible in this listing.
176 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
177 const std::string
& _topic
,
178 const RGWHTTPArgs
& args
,
183 exchange(get_exchange(args
)),
184 ack_level(get_ack_level(args
)) {
185 if (!amqp::connect(conn_id
, endpoint
, exchange
, (ack_level
== ack_level_t::Broker
), get_verify_ssl(args
), args
.get_optional("ca-location"))) {
186 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
190 // this allows waiting until "finish()" is called from a different thread
191 // waiting could be blocking the waiting thread or yielding, depending
192 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the members below appear to belong to the Waiter type that
// send_to_completion_async() allocates; its `struct` line is not visible here.
194 using Signature
= void(boost::system::error_code
);
195 using Completion
= ceph::async::Completion
<Signature
>;
// set by async_wait(); consumed by the Completion::post() call further down
196 std::unique_ptr
<Completion
> completion
= nullptr;
// `done` is the predicate of the condition-variable wait in wait()
199 mutable std::atomic
<bool> done
= false;
200 mutable std::mutex lock
;
201 mutable std::condition_variable cond
;
// Creates `completion` from the token's handler on ctx's executor (while
// holding `lock`) and returns the result of the asio completion.
203 template <typename ExecutionContext
, typename CompletionToken
>
204 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
205 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
206 auto& handler
= init
.completion_handler
;
208 std::unique_lock l
{lock
};
209 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
211 return init
.result
.get();
// Waits for completion: either through async_wait() using the io context and
// yield context of `y`, or by blocking on `cond` until `done` is true.
// NOTE(review): the conditions selecting between the two paths and the return
// statements are not visible in this listing.
215 int wait(optional_yield y
) {
220 auto& io_ctx
= y
.get_io_context();
221 auto& yield_ctx
= y
.get_yield_context();
222 boost::system::error_code ec
;
223 async_wait(io_ctx
, yield_ctx
[ec
]);
226 std::unique_lock
l(lock
);
227 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): presumably the body of finish() (its signature line is not
// visible): takes `lock` and posts `completion` with an error code built
// from -ret in the system category.
232 std::unique_lock l
{lock
};
236 boost::system::error_code
ec(-ret
, boost::system::system_category());
237 Completion::post(std::move(completion
), ec
);
// Publishes the JSON rendering of `event`.
// With ack level None the result of amqp::publish() is returned directly.
// Otherwise the event goes through amqp::publish_with_confirm() with a
// callback bound to Waiter::finish on a heap-allocated Waiter.
// NOTE(review): the handling of `rc` and the wait on the Waiter are not
// visible in this listing.
244 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
245 if (ack_level
== ack_level_t::None
) {
246 return amqp::publish(conn_id
, topic
, json_format_pubsub_event(event
));
248 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
249 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
250 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
251 const auto rc
= amqp::publish_with_confirm(conn_id
,
253 json_format_pubsub_event(event
),
254 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
256 // failed to publish, does not wait for reply
// Builds a multi-line, human-readable description of the endpoint: a title,
// the URI, the topic and the exchange.
263 std::string
to_str() const override
{
264 std::string
str("AMQP(0.9.1) Endpoint");
265 str
+= "\nURI: " + endpoint
;
266 str
+= "\nTopic: " + topic
;
267 str
+= "\nExchange: " + exchange
;
// Version strings compared against the "amqp-version" argument in
// RGWPubSubEndpoint::create().
272 static const std::string
AMQP_0_9_1("0-9-1");
273 static const std::string
AMQP_1_0("1-0");
// Schema name that RGWPubSubEndpoint::create() compares with the result of
// get_schema() to select the AMQP endpoint.
274 static const std::string
AMQP_SCHEMA("amqp");
275 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
277 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
// Push endpoint that publishes pubsub events to a Kafka broker (to_str()
// describes it as "Kafka Endpoint").
// NOTE(review): this listing omits some original lines (access specifiers,
// enumerators, local declarations, closing braces, return statements); the
// comments below describe only what is visible.
278 class RGWPubSubKafkaEndpoint
: public RGWPubSubEndpoint
{
// acknowledgement level; the enumerators are not visible in this listing, but
// None and Broker are referenced below
280 enum class ack_level_t
{
284 CephContext
* const cct
;
// topic argument passed to kafka::publish()
285 const std::string topic
;
286 const ack_level_t ack_level
;
// connection name handed to kafka::connect() and to the publish calls;
// reported as "Broker" by to_str()
287 std::string conn_name
;
// Maps the "kafka-ack-level" argument to an ack_level_t:
//   absent or "broker" -> Broker, "none" -> None.
// Any other value raises configuration_error.
290 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
292 const auto& str_ack_level
= args
.get("kafka-ack-level", &exists
);
293 if (!exists
|| str_ack_level
== "broker") {
294 // "broker" is default
295 return ack_level_t::Broker
;
297 if (str_ack_level
== "none") {
298 return ack_level_t::None
;
300 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level
);
// Parses the arguments and opens the broker connection with kafka::connect(),
// forwarding "use-ssl" (default false), "verify-ssl" (default true) and the
// optional "ca-location" and "mechanism" arguments.
// Throws configuration_error when connect() fails or an argument is invalid.
// NOTE(review): the remaining parameter(s) and the first part of the
// initializer list are not visible in this listing.
304 RGWPubSubKafkaEndpoint(const std::string
& _endpoint
,
305 const std::string
& _topic
,
306 const RGWHTTPArgs
& args
,
310 ack_level(get_ack_level(args
)) {
311 if (!kafka::connect(conn_name
, _endpoint
, get_bool(args
, "use-ssl", false), get_bool(args
, "verify-ssl", true),
312 args
.get_optional("ca-location"), args
.get_optional("mechanism"))) {
313 throw configuration_error("Kafka: failed to create connection to: " + _endpoint
);
317 // this allows waiting until "finish()" is called from a different thread
318 // waiting could be blocking the waiting thread or yielding, depending
319 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the members below appear to belong to the Waiter type that
// send_to_completion_async() allocates; its `struct` line is not visible here.
321 using Signature
= void(boost::system::error_code
);
322 using Completion
= ceph::async::Completion
<Signature
>;
// set by async_wait(); consumed by the Completion::post() call further down
323 std::unique_ptr
<Completion
> completion
= nullptr;
// `done` is the predicate of the condition-variable wait in wait()
326 mutable std::atomic
<bool> done
= false;
327 mutable std::mutex lock
;
328 mutable std::condition_variable cond
;
// Creates `completion` from the token's handler on ctx's executor (while
// holding `lock`) and returns the result of the asio completion.
330 template <typename ExecutionContext
, typename CompletionToken
>
331 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
332 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
333 auto& handler
= init
.completion_handler
;
335 std::unique_lock l
{lock
};
336 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
338 return init
.result
.get();
// Waits for completion: either through async_wait() using the io context and
// yield context of `y`, or by blocking on `cond` until `done` is true.
// NOTE(review): the conditions selecting between the two paths and the return
// statements are not visible in this listing.
342 int wait(optional_yield y
) {
347 auto& io_ctx
= y
.get_io_context();
348 auto& yield_ctx
= y
.get_yield_context();
349 boost::system::error_code ec
;
350 async_wait(io_ctx
, yield_ctx
[ec
]);
353 std::unique_lock
l(lock
);
354 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): presumably the body of finish() (its signature line is not
// visible): takes `lock` and posts `completion` with an error code built
// from -ret in the system category.
359 std::unique_lock l
{lock
};
363 boost::system::error_code
ec(-ret
, boost::system::system_category());
364 Completion::post(std::move(completion
), ec
);
// Publishes the JSON rendering of `event`.
// With ack level None the result of kafka::publish() is returned directly.
// Otherwise the event goes through kafka::publish_with_confirm() with a
// callback bound to Waiter::finish on a heap-allocated Waiter.
// NOTE(review): the handling of `rc` and the wait on the Waiter are not
// visible in this listing.
371 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
372 if (ack_level
== ack_level_t::None
) {
373 return kafka::publish(conn_name
, topic
, json_format_pubsub_event(event
));
375 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
376 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
377 const auto rc
= kafka::publish_with_confirm(conn_name
,
379 json_format_pubsub_event(event
),
380 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
382 // failed to publish, does not wait for reply
// Builds a multi-line, human-readable description of the endpoint: a title,
// the broker (conn_name) and the topic.
389 std::string
to_str() const override
{
390 std::string
str("Kafka Endpoint");
391 str
+= "\nBroker: " + conn_name
;
392 str
+= "\nTopic: " + topic
;
// Schema name that RGWPubSubEndpoint::create() compares with the result of
// get_schema() to select the Kafka endpoint.
397 static const std::string
KAFKA_SCHEMA("kafka");
398 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
// Schema names returned by get_schema(): WEBHOOK_SCHEMA for "http"/"https"
// endpoints, UNKNOWN_SCHEMA when no supported schema is recognized.
400 static const std::string
WEBHOOK_SCHEMA("webhook");
401 static const std::string
UNKNOWN_SCHEMA("unknown");
// NOTE(review): presumably returned by get_schema() for an empty endpoint;
// that return statement is not visible in this listing.
402 static const std::string
NO_SCHEMA("");
// Classifies an endpoint URI by the text before its first ':' and returns a
// reference to one of the file-level schema-name constants:
//   no ':' in the endpoint  -> UNKNOWN_SCHEMA
//   "http" / "https"        -> WEBHOOK_SCHEMA
//   anything unrecognized   -> UNKNOWN_SCHEMA
// The "amqp"/"amqps" and "kafka" branches are compiled only when the matching
// WITH_RADOSGW_*_ENDPOINT macro is defined.
// NOTE(review): the return statements for the empty-endpoint, AMQP and Kafka
// branches are not visible in this listing (presumably NO_SCHEMA, AMQP_SCHEMA
// and KAFKA_SCHEMA respectively).
404 const std::string
& get_schema(const std::string
& endpoint
) {
405 if (endpoint
.empty()) {
408 const auto pos
= endpoint
.find(':');
409 if (pos
== std::string::npos
) {
410 return UNKNOWN_SCHEMA
;
// substr() returns a temporary; binding it to a const reference keeps it
// alive for the rest of this scope
412 const auto& schema
= endpoint
.substr(0,pos
);
413 if (schema
== "http" || schema
== "https") {
414 return WEBHOOK_SCHEMA
;
415 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
416 } else if (schema
== "amqp" || schema
== "amqps") {
419 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
420 } else if (schema
== "kafka") {
424 return UNKNOWN_SCHEMA
;
// Factory: creates the endpoint implementation matching the endpoint's schema
// (as classified by get_schema()) and returns it wrapped in a Ptr.
//   WEBHOOK_SCHEMA -> RGWPubSubHTTPEndpoint
//   AMQP_SCHEMA    -> RGWPubSubAMQPEndpoint, only for "amqp-version" 0-9-1
//   KAFKA_SCHEMA   -> RGWPubSubKafkaEndpoint
// Throws configuration_error for AMQP v1.0, for an unknown AMQP version and
// for an unknown schema; the endpoint constructors may throw it as well.
// NOTE(review): the line declaring the remaining parameter(s) (`cct` is used
// below) and the condition under which `version` falls back to AMQP_0_9_1
// (presumably when "amqp-version" is absent) are not visible in this listing.
427 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
428 const std::string
& topic
,
429 const RGWHTTPArgs
& args
,
431 const auto& schema
= get_schema(endpoint
);
432 if (schema
== WEBHOOK_SCHEMA
) {
433 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
434 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
435 } else if (schema
== AMQP_SCHEMA
) {
437 std::string version
= args
.get("amqp-version", &exists
);
439 version
= AMQP_0_9_1
;
441 if (version
== AMQP_0_9_1
) {
442 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
443 } else if (version
== AMQP_1_0
) {
444 throw configuration_error("AMQP: v1.0 not supported");
447 throw configuration_error("AMQP: unknown version: " + version
);
451 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
452 } else if (schema
== KAFKA_SCHEMA
) {
453 return Ptr(new RGWPubSubKafkaEndpoint(endpoint
, topic
, args
, cct
));
457 throw configuration_error("unknown schema in: " + endpoint
);