1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/iso_8601.h"
11 #include "common/async/completion.h"
12 #include "rgw_common.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_pubsub.h"
16 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
19 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20 #include "rgw_kafka.h"
22 #include <boost/asio/yield.hpp>
23 #include <boost/algorithm/string.hpp>
25 #include "rgw_perf_counters.h"
// Serializes a pubsub event to JSON text.
// Builds a JSONFormatter, opens an object section and an array section (both
// named EventType::json_type_plural) and encodes the event into the formatter
// under an empty name.
// NOTE(review): the statements that flush the formatter and produce the
// returned std::string are not visible in this view - confirm.
29 template<typename EventType
>
30 std::string
json_format_pubsub_event(const EventType
& event
) {
32 JSONFormatter
f(false);
34 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
36 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
37 encode_json("", event
, &f
);
// Reads the boolean argument `name` from `args`.
// Throws RGWPubSubEndpoint::configuration_error when args.get_bool() reports
// -EINVAL for the value.
// NOTE(review): how `default_value` is applied (presumably when the argument
// is absent) is not visible in this view - confirm.
44 bool get_bool(const RGWHTTPArgs
& args
, const std::string
& name
, bool default_value
) {
47 if (args
.get_bool(name
.c_str(), &value
, &exists
) == -EINVAL
) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name
);
// RGWPubSubEndpoint implementation that pushes events with an HTTP POST.
56 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
// URI the events are posted to
58 const std::string endpoint
;
// ack level is stored as an unsigned: either one of the two constants below
// or a numeric value that the constructor validates to be in [100, 600)
59 typedef unsigned ack_level_t
;
60 ack_level_t ack_level
; // TODO: not used for now
// passed on to RGWPostHTTPData when a request is created
61 const bool verify_ssl
;
// parsed from the "cloudevents" argument (default: false)
62 const bool cloudevents
;
// selected when "http-ack-level" is missing or equals "any"
63 static const ack_level_t ACK_LEVEL_ANY
= 0;
// selected when "http-ack-level" equals "non-error"
64 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
66 // PostCR implements async execution of RGWPostHTTPData via coroutine
// It is both the HTTP request (RGWPostHTTPData) and the coroutine
// (RGWSimpleCoroutine) that drives it.
67 class PostCR
: public RGWPostHTTPData
, public RGWSimpleCoroutine
{
// sync environment; its http_manager is used to submit the request
69 RGWDataSyncEnv
* const sync_env
;
71 const ack_level_t ack_level
;
// Creates a "POST" request to `endpoint` carrying `_post_data`.
// NOTE(review): one constructor parameter line (presumably verify_ssl, which
// is forwarded to RGWPostHTTPData below) is not visible in this view.
74 PostCR(const std::string
& _post_data
,
75 RGWDataSyncEnv
* _sync_env
,
76 const std::string
& endpoint
,
77 ack_level_t _ack_level
,
79 RGWPostHTTPData(_sync_env
->cct
, "POST", endpoint
, &read_bl
, verify_ssl
),
80 RGWSimpleCoroutine(_sync_env
->cct
),
82 ack_level (_ack_level
) {
83 // ctor also sets the data to send
84 set_post_data(_post_data
);
85 set_send_length(_post_data
.length());
88 // send message to endpoint
// the request is handed to the sync environment's http manager, and the
// pending-push perf counter is incremented when perf counters are available
89 int send_request(const DoutPrefixProvider
*dpp
) override
{
91 const auto rc
= sync_env
->http_manager
->add_request(this);
95 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
// invoked when the request completes: decrements the pending-push perf
// counter; the ack level handling below is still marked as TODO
100 int request_complete() override
{
101 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
102 if (ack_level
== ACK_LEVEL_ANY
) {
104 } else if (ack_level
== ACK_LEVEL_NON_ERROR
) {
105 // TODO check result code to be non-error
107 // TODO: check that result code == ack_level
// Builds an HTTP endpoint from its URI and the topic's arguments.
// "verify-ssl" defaults to true and "cloudevents" defaults to false.
// Throws configuration_error when "http-ack-level" is neither "any",
// "non-error" nor a number in the range [100, 600).
114 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
, const RGWHTTPArgs
& args
) :
115 endpoint(_endpoint
), verify_ssl(get_bool(args
, "verify-ssl", true)), cloudevents(get_bool(args
, "cloudevents", false))
118 const auto& str_ack_level
= args
.get("http-ack-level", &exists
);
// a missing argument is treated the same as "any"
119 if (!exists
|| str_ack_level
== "any") {
121 ack_level
= ACK_LEVEL_ANY
;
122 } else if (str_ack_level
== "non-error") {
123 ack_level
= ACK_LEVEL_NON_ERROR
;
// any other value is parsed as a number and range-checked
125 ack_level
= std::atoi(str_ack_level
.c_str());
126 if (ack_level
< 100 || ack_level
>= 600) {
127 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
// Returns a newly allocated PostCR coroutine that posts the JSON encoding of
// `event` to this endpoint using the sync environment `env`.
132 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
133 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
// Same as the rgw_pubsub_event overload, for S3-style events: returns a newly
// allocated PostCR coroutine that posts the JSON encoding of `event`.
136 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
137 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
// Posts the JSON encoding of an S3-style event to the endpoint without a
// coroutine: the request is executed through RGWHTTP::process() with the
// given optional_yield.
// NOTE(review): the return statement is not visible in this view; presumably
// the result of RGWHTTP::process() is returned - confirm.
140 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
142 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
143 const auto post_data
= json_format_pubsub_event(event
);
// NOTE(review): the "ce-*" headers below are presumably added only when
// `cloudevents` is set; the guarding condition is not visible - confirm.
145 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
146 // using "Binary Content Mode"
147 request
.append_header("ce-specversion", "1.0");
148 request
.append_header("ce-type", "com.amazonaws." + event
.eventName
);
149 request
.append_header("ce-time", to_iso_8601(event
.eventTime
));
150 // default output of iso8601 is also RFC3339 compatible
151 request
.append_header("ce-id", event
.x_amz_request_id
+ "." + event
.x_amz_id_2
);
152 request
.append_header("ce-source", event
.eventSource
+ "." + event
.awsRegion
+ "." + event
.bucket_name
);
153 request
.append_header("ce-subject", event
.object_key
);
// the JSON document is the request body
155 request
.set_post_data(post_data
);
156 request
.set_send_length(post_data
.length());
157 request
.append_header("Content-Type", "application/json");
// the pending-push counter is raised around the processing of the request
158 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
159 const auto rc
= RGWHTTP::process(&request
, y
);
160 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
161 // TODO: use read_bl to process return code and handle according to ack level
// Builds a multi-line, human readable description of the endpoint: a title
// line, the URI and whether SSL verification is enabled.
// NOTE(review): the return statement is not visible in this view.
165 std::string
to_str() const override
{
166 std::string
str("HTTP/S Endpoint");
167 str
+= "\nURI: " + endpoint
;
168 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
173 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
// RGWPubSubEndpoint implementation that publishes events to an AMQP broker.
174 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
// NOTE(review): the enumerators are not visible in this view; the rest of the
// class uses ack_level_t::None, ack_level_t::Broker and ack_level_t::Routable.
176 enum class ack_level_t
{
181 CephContext
* const cct
;
// broker URI, passed to amqp::connect()
182 const std::string endpoint
;
// passed as the topic argument of the amqp publish calls
183 const std::string topic
;
// value of the mandatory "amqp-exchange" argument
184 const std::string exchange
;
185 ack_level_t ack_level
;
// connection handle returned by amqp::connect()
186 amqp::connection_ptr_t conn
;
// Parses the "verify-ssl" argument; the value is compared case-insensitively
// (it is lower-cased first) against "true" and "false".
// Throws configuration_error for any other value.
// NOTE(review): the return statements are not visible in this view.
188 bool get_verify_ssl(const RGWHTTPArgs
& args
) {
190 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
192 // verify server certificate by default
195 boost::algorithm::to_lower(str_verify_ssl
);
196 if (str_verify_ssl
== "true") {
199 if (str_verify_ssl
== "false") {
202 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
// Reads the "amqp-exchange" argument.
// Throws configuration_error("AMQP: missing amqp-exchange").
// NOTE(review): the condition guarding the throw (presumably a missing
// argument) and the return statement are not visible in this view - confirm.
205 std::string
get_exchange(const RGWHTTPArgs
& args
) {
207 const auto exchange
= args
.get("amqp-exchange", &exists
);
209 throw configuration_error("AMQP: missing amqp-exchange");
// Maps the "amqp-ack-level" argument to an ack_level_t:
//   missing or "broker" -> Broker
//   "none"              -> None
//   "routable"          -> Routable
// Throws configuration_error for any other value.
214 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
216 const auto& str_ack_level
= args
.get("amqp-ack-level", &exists
);
217 if (!exists
|| str_ack_level
== "broker") {
218 // "broker" is default
219 return ack_level_t::Broker
;
221 if (str_ack_level
== "none") {
222 return ack_level_t::None
;
224 if (str_ack_level
== "routable") {
225 return ack_level_t::Routable
;
227 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
230 // NoAckPublishCR implements async amqp publishing via coroutine
231 // This coroutine ends when it sends the message and does not wait for an ack
232 class NoAckPublishCR
: public RGWCoroutine
{
234 const std::string topic
;
235 amqp::connection_ptr_t conn
;
// the already formatted message to publish
236 const std::string message
;
// stores copies of the topic, the connection handle and the message
// NOTE(review): the base class initializer is not visible in this view.
239 NoAckPublishCR(CephContext
* cct
,
240 const std::string
& _topic
,
241 amqp::connection_ptr_t
& _conn
,
242 const std::string
& _message
) :
244 topic(_topic
), conn(_conn
), message(_message
) {}
246 // send message to endpoint, without waiting for reply
// NOTE(review): the check on `rc` that selects between the error and the
// done result is not visible in this view.
247 int operate(const DoutPrefixProvider
*dpp
) override
{
249 const auto rc
= amqp::publish(conn
, topic
, message
);
251 return set_cr_error(rc
);
253 return set_cr_done();
259 // AckPublishCR implements async amqp publishing via coroutine
260 // This coroutine ends when an ack is received from the broker
261 // note that it does not wait for an ack from the end client
262 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
264 const std::string topic
;
265 amqp::connection_ptr_t conn
;
// the already formatted message to publish
266 const std::string message
;
// stores copies of the topic, the connection handle and the message
// NOTE(review): the base class initializer is not visible in this view.
269 AckPublishCR(CephContext
* cct
,
270 const std::string
& _topic
,
271 amqp::connection_ptr_t
& _conn
,
272 const std::string
& _message
) :
274 topic(_topic
), conn(_conn
), message(_message
) {}
276 // send message to endpoint, waiting for reply
// request_complete() is bound to this object and passed as the confirmation
// callback of the publish call
// NOTE(review): the coroutine control flow (reenter/yield and the checks on
// `rc`) and the remaining publish arguments are not visible in this view.
277 int operate(const DoutPrefixProvider
*dpp
) override
{
281 const auto rc
= amqp::publish_with_confirm(conn
,
284 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
286 // failed to publish, does not wait for reply
287 return set_cr_error(rc
);
289 // mark as blocked on the amqp answer
290 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
294 return set_cr_done();
299 // callback invoked from the amqp manager thread when ack/nack is received
// asserts that the coroutine is not done yet and decrements the
// pending-push perf counter
300 void request_complete(int status
) {
301 ceph_assert(!is_done());
303 // server replied with a nack
304 set_cr_error(status
);
307 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
310 // TODO: why are these mandatory in RGWIOProvider?
311 void set_io_user_info(void *_user_info
) override
{
314 void *get_io_user_info() override
{
// Builds an AMQP endpoint: reads the exchange and ack level from `args` and
// opens a connection with amqp::connect(). The third connect argument is true
// only for the Broker ack level; SSL verification and the optional
// "ca-location" argument are passed on as well.
// Throws configuration_error("AMQP: failed to create connection to: ...").
// NOTE(review): the last parameter (presumably the CephContext), the first
// member initializers and the condition guarding the throw are not visible in
// this view - confirm.
320 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
321 const std::string
& _topic
,
322 const RGWHTTPArgs
& args
,
327 exchange(get_exchange(args
)),
328 ack_level(get_ack_level(args
)),
329 conn(amqp::connect(endpoint
, exchange
, (ack_level
== ack_level_t::Broker
), get_verify_ssl(args
), args
.get_optional("ca-location"))) {
331 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
// Returns a newly allocated coroutine that publishes the JSON encoding of
// `event`: NoAckPublishCR when the ack level is None, AckPublishCR otherwise.
// `env` is not used by the visible statements.
335 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
337 if (ack_level
== ack_level_t::None
) {
338 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
340 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
// Same as the rgw_pubsub_event overload, for S3-style events: NoAckPublishCR
// when the ack level is None, AckPublishCR otherwise.
// `env` is not used by the visible statements.
344 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
346 if (ack_level
== ack_level_t::None
) {
347 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
349 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
353 // this allows waiting until "finish()" is called from a different thread
354 // waiting could be blocking the waiting thread or yielding, depending
355 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the type declaration itself (referred to as Waiter by the
// callers) and the signature of finish() are not visible in this view.
357 using Signature
= void(boost::system::error_code
);
358 using Completion
= ceph::async::Completion
<Signature
>;
// set by async_wait() and posted with the final error code
359 std::unique_ptr
<Completion
> completion
= nullptr;
// `done` is the predicate the blocking wait on `cond` checks
362 mutable std::atomic
<bool> done
= false;
363 mutable std::mutex lock
;
364 mutable std::condition_variable cond
;
// creates the completion from the executor of `ctx` and the handler derived
// from `token` (stored while holding `lock`), then returns the result of the
// asio async_completion
366 template <typename ExecutionContext
, typename CompletionToken
>
367 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
368 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
369 auto& handler
= init
.completion_handler
;
371 std::unique_lock l
{lock
};
372 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
374 return init
.result
.get();
// waits for completion, either through async_wait() with the yield context of
// `y` or by blocking on `cond` until `done` is true
// NOTE(review): the conditions selecting between the two paths and the return
// statements are not visible in this view.
378 int wait(optional_yield y
) {
383 auto& io_ctx
= y
.get_io_context();
384 auto& yield_ctx
= y
.get_yield_context();
385 boost::system::error_code ec
;
386 async_wait(io_ctx
, yield_ctx
[ec
]);
389 std::unique_lock
l(lock
);
390 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): the statements below presumably belong to finish(): under
// `lock`, the stored completion is posted with an error code built from -ret.
395 std::unique_lock l
{lock
};
399 boost::system::error_code
ec(-ret
, boost::system::system_category());
400 Completion::post(std::move(completion
), ec
);
// Publishes the JSON encoding of an S3-style event without a coroutine.
// With ack level None the result of amqp::publish() is returned directly;
// otherwise the message is published with a confirmation callback bound to
// Waiter::finish on a heap allocated Waiter.
// NOTE(review): the remaining publish arguments, the handling of `rc` and the
// wait on the Waiter are not visible in this view.
407 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
409 if (ack_level
== ack_level_t::None
) {
410 return amqp::publish(conn
, topic
, json_format_pubsub_event(event
));
412 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
413 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
414 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
415 const auto rc
= amqp::publish_with_confirm(conn
,
417 json_format_pubsub_event(event
),
418 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
420 // failed to publish, does not wait for reply
// Builds a multi-line, human readable description of the endpoint: a title
// line followed by the URI, the topic and the exchange.
// NOTE(review): the return statement is not visible in this view.
427 std::string
to_str() const override
{
428 std::string
str("AMQP(0.9.1) Endpoint");
429 str
+= "\nURI: " + endpoint
;
430 str
+= "\nTopic: " + topic
;
431 str
+= "\nExchange: " + exchange
;
// values compared against the "amqp-version" argument in
// RGWPubSubEndpoint::create()
436 static const std::string
AMQP_0_9_1("0-9-1");
437 static const std::string
AMQP_1_0("1-0");
// schema name that RGWPubSubEndpoint::create() matches for AMQP endpoints
438 static const std::string
AMQP_SCHEMA("amqp");
439 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
442 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
// RGWPubSubEndpoint implementation that publishes events to Kafka.
443 class RGWPubSubKafkaEndpoint
: public RGWPubSubEndpoint
{
// NOTE(review): the enumerators are not visible in this view; the rest of the
// class uses ack_level_t::None and ack_level_t::Broker.
445 enum class ack_level_t
{
449 CephContext
* const cct
;
// passed as the topic argument of the kafka publish calls
450 const std::string topic
;
// connection handle returned by kafka::connect()
451 kafka::connection_ptr_t conn
;
452 const ack_level_t ack_level
;
// Maps the "kafka-ack-level" argument to an ack_level_t:
//   missing or "broker" -> Broker
//   "none"              -> None
// Throws configuration_error for any other value.
455 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
457 const auto& str_ack_level
= args
.get("kafka-ack-level", &exists
);
458 if (!exists
|| str_ack_level
== "broker") {
459 // "broker" is default
460 return ack_level_t::Broker
;
462 if (str_ack_level
== "none") {
463 return ack_level_t::None
;
465 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level
);
468 // NoAckPublishCR implements async kafka publishing via coroutine
469 // This coroutine ends when it sends the message and does not wait for an ack
470 class NoAckPublishCR
: public RGWCoroutine
{
472 const std::string topic
;
473 kafka::connection_ptr_t conn
;
// the already formatted message to publish
474 const std::string message
;
// stores copies of the topic, the connection handle and the message
// NOTE(review): the base class initializer is not visible in this view.
477 NoAckPublishCR(CephContext
* cct
,
478 const std::string
& _topic
,
479 kafka::connection_ptr_t
& _conn
,
480 const std::string
& _message
) :
482 topic(_topic
), conn(_conn
), message(_message
) {}
484 // send message to endpoint, without waiting for reply
// NOTE(review): the check on `rc` that selects between the error and the
// done result is not visible in this view.
485 int operate(const DoutPrefixProvider
*dpp
) override
{
487 const auto rc
= kafka::publish(conn
, topic
, message
);
489 return set_cr_error(rc
);
491 return set_cr_done();
497 // AckPublishCR implements async kafka publishing via coroutine
498 // This coroutine ends when an ack is received from the broker
499 // note that it does not wait for an ack from the end client
500 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
502 const std::string topic
;
503 kafka::connection_ptr_t conn
;
// the already formatted message to publish
504 const std::string message
;
// stores copies of the topic, the connection handle and the message
// NOTE(review): the base class initializer is not visible in this view.
507 AckPublishCR(CephContext
* cct
,
508 const std::string
& _topic
,
509 kafka::connection_ptr_t
& _conn
,
510 const std::string
& _message
) :
512 topic(_topic
), conn(_conn
), message(_message
) {}
514 // send message to endpoint, waiting for reply
// request_complete() is bound to this object and passed as the confirmation
// callback of the publish call
// NOTE(review): the coroutine control flow (reenter/yield and the checks on
// `rc`) and the remaining publish arguments are not visible in this view.
515 int operate(const DoutPrefixProvider
*dpp
) override
{
519 const auto rc
= kafka::publish_with_confirm(conn
,
522 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
524 // failed to publish, does not wait for reply
525 return set_cr_error(rc
);
527 // mark as blocked on the kafka answer
528 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
532 return set_cr_done();
537 // callback invoked from the kafka manager thread when ack/nack is received
// asserts that the coroutine is not done yet and decrements the
// pending-push perf counter
538 void request_complete(int status
) {
539 ceph_assert(!is_done());
541 // server replied with a nack
542 set_cr_error(status
);
545 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
548 // TODO: why are these mandatory in RGWIOProvider?
549 void set_io_user_info(void *_user_info
) override
{
552 void *get_io_user_info() override
{
// Builds a Kafka endpoint: opens a connection with kafka::connect() using the
// "use-ssl" (default false), "verify-ssl" (default true) and optional
// "ca-location" arguments, and reads the ack level from `args`.
// Throws configuration_error("Kafka: failed to create connection to: ...").
// NOTE(review): the last parameter (presumably the CephContext), the first
// member initializers and the condition guarding the throw are not visible in
// this view - confirm.
558 RGWPubSubKafkaEndpoint(const std::string
& _endpoint
,
559 const std::string
& _topic
,
560 const RGWHTTPArgs
& args
,
564 conn(kafka::connect(_endpoint
, get_bool(args
, "use-ssl", false), get_bool(args
, "verify-ssl", true), args
.get_optional("ca-location"))) ,
565 ack_level(get_ack_level(args
)) {
567 throw configuration_error("Kafka: failed to create connection to: " + _endpoint
);
// Returns a newly allocated coroutine that publishes the JSON encoding of
// `event`: NoAckPublishCR when the ack level is None, AckPublishCR otherwise.
// `env` is not used by the visible statements.
571 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
573 if (ack_level
== ack_level_t::None
) {
574 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
576 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
// Same as the rgw_pubsub_event overload, for S3-style events: NoAckPublishCR
// when the ack level is None, AckPublishCR otherwise.
// `env` is not used by the visible statements.
580 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
582 if (ack_level
== ack_level_t::None
) {
583 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
585 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
589 // this allows waiting until "finish()" is called from a different thread
590 // waiting could be blocking the waiting thread or yielding, depending
591 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the type declaration itself (referred to as Waiter by the
// callers) and the signature of finish() are not visible in this view.
593 using Signature
= void(boost::system::error_code
);
594 using Completion
= ceph::async::Completion
<Signature
>;
// set by async_wait() and posted with the final error code
595 std::unique_ptr
<Completion
> completion
= nullptr;
// `done` is the predicate the blocking wait on `cond` checks
598 mutable std::atomic
<bool> done
= false;
599 mutable std::mutex lock
;
600 mutable std::condition_variable cond
;
// creates the completion from the executor of `ctx` and the handler derived
// from `token` (stored while holding `lock`), then returns the result of the
// asio async_completion
602 template <typename ExecutionContext
, typename CompletionToken
>
603 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
604 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
605 auto& handler
= init
.completion_handler
;
607 std::unique_lock l
{lock
};
608 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
610 return init
.result
.get();
// waits for completion, either through async_wait() with the yield context of
// `y` or by blocking on `cond` until `done` is true
// NOTE(review): the conditions selecting between the two paths and the return
// statements are not visible in this view.
614 int wait(optional_yield y
) {
619 auto& io_ctx
= y
.get_io_context();
620 auto& yield_ctx
= y
.get_yield_context();
621 boost::system::error_code ec
;
622 async_wait(io_ctx
, yield_ctx
[ec
]);
625 std::unique_lock
l(lock
);
626 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): the statements below presumably belong to finish(): under
// `lock`, the stored completion is posted with an error code built from -ret.
631 std::unique_lock l
{lock
};
635 boost::system::error_code
ec(-ret
, boost::system::system_category());
636 Completion::post(std::move(completion
), ec
);
// Publishes the JSON encoding of an S3-style event without a coroutine.
// With ack level None the result of kafka::publish() is returned directly;
// otherwise the message is published with a confirmation callback bound to
// Waiter::finish on a heap allocated Waiter.
// NOTE(review): the remaining publish arguments, the handling of `rc` and the
// wait on the Waiter are not visible in this view.
643 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
645 if (ack_level
== ack_level_t::None
) {
646 return kafka::publish(conn
, topic
, json_format_pubsub_event(event
));
648 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
649 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
650 const auto rc
= kafka::publish_with_confirm(conn
,
652 json_format_pubsub_event(event
),
653 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
655 // failed to publish, does not wait for reply
// Builds a human readable description of the endpoint: a title, the string
// form of the connection (kafka::to_string) and the topic.
// NOTE(review): the return statement is not visible in this view.
662 std::string
to_str() const override
{
663 std::string
str("Kafka Endpoint");
664 str
+= kafka::to_string(conn
);
665 str
+= "\nTopic: " + topic
;
// schema name that RGWPubSubEndpoint::create() matches for Kafka endpoints
670 static const std::string
KAFKA_SCHEMA("kafka");
671 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
// schema name returned by get_schema() for "http" and "https" endpoints
673 static const std::string
WEBHOOK_SCHEMA("webhook");
// schema name returned by get_schema() when no known schema is recognized
674 static const std::string
UNKNOWN_SCHEMA("unknown");
// empty schema name
675 static const std::string
NO_SCHEMA("");
// Classifies an endpoint URI by the text before its first ':' and returns a
// reference to one of the schema name constants.
// "http" and "https" map to WEBHOOK_SCHEMA; a URI without ':' or with an
// unrecognized prefix maps to UNKNOWN_SCHEMA.
// NOTE(review): the values returned for an empty endpoint (presumably
// NO_SCHEMA) and for the amqp/amqps and kafka prefixes are not visible in
// this view - confirm.
677 const std::string
& get_schema(const std::string
& endpoint
) {
678 if (endpoint
.empty()) {
681 const auto pos
= endpoint
.find(':');
682 if (pos
== std::string::npos
) {
683 return UNKNOWN_SCHEMA
;
// everything before the first ':' is the schema
685 const auto& schema
= endpoint
.substr(0,pos
);
686 if (schema
== "http" || schema
== "https") {
687 return WEBHOOK_SCHEMA
;
688 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
689 } else if (schema
== "amqp" || schema
== "amqps") {
692 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
693 } else if (schema
== "kafka") {
697 return UNKNOWN_SCHEMA
;
// Factory: creates the endpoint implementation that matches the schema of
// `endpoint` (see get_schema()) and returns it wrapped in a Ptr.
// Throws configuration_error for an unknown schema, for AMQP version "1-0"
// (not supported) and for any other unknown AMQP version. The AMQP and Kafka
// branches are only compiled when the corresponding WITH_RADOSGW_*_ENDPOINT
// macro is defined.
// NOTE(review): the last parameter (presumably the CephContext passed on as
// `cct`) is not visible in this view.
700 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
701 const std::string
& topic
,
702 const RGWHTTPArgs
& args
,
704 const auto& schema
= get_schema(endpoint
);
705 if (schema
== WEBHOOK_SCHEMA
) {
706 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
707 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
708 } else if (schema
== AMQP_SCHEMA
) {
710 std::string version
= args
.get("amqp-version", &exists
);
// NOTE(review): presumably the default version when "amqp-version" is not
// given; the guarding condition is not visible in this view.
712 version
= AMQP_0_9_1
;
714 if (version
== AMQP_0_9_1
) {
715 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
716 } else if (version
== AMQP_1_0
) {
717 throw configuration_error("AMQP: v1.0 not supported");
720 throw configuration_error("AMQP: unknown version: " + version
);
724 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
725 } else if (schema
== KAFKA_SCHEMA
) {
726 return Ptr(new RGWPubSubKafkaEndpoint(endpoint
, topic
, args
, cct
));
730 throw configuration_error("unknown schema in: " + endpoint
);