1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
18 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19 #include "rgw_kafka.h"
21 #include <boost/asio/yield.hpp>
22 #include <boost/algorithm/string.hpp>
24 #include "rgw_perf_counters.h"
28 template<typename EventType
>
29 std::string
json_format_pubsub_event(const EventType
& event
) {
31 JSONFormatter
f(false);
33 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
35 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
36 encode_json("", event
, &f
);
43 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
45 const std::string endpoint
;
46 std::string str_ack_level
;
47 typedef unsigned ack_level_t
;
48 ack_level_t ack_level
; // TODO: not used for now
50 static const ack_level_t ACK_LEVEL_ANY
= 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
54 class PostCR
: public RGWPostHTTPData
, public RGWSimpleCoroutine
{
56 RGWDataSyncEnv
* const sync_env
;
58 const ack_level_t ack_level
;
61 PostCR(const std::string
& _post_data
,
62 RGWDataSyncEnv
* _sync_env
,
63 const std::string
& endpoint
,
64 ack_level_t _ack_level
,
66 RGWPostHTTPData(_sync_env
->cct
, "POST", endpoint
, &read_bl
, verify_ssl
),
67 RGWSimpleCoroutine(_sync_env
->cct
),
69 ack_level (_ack_level
) {
70 // ctor also set the data to send
71 set_post_data(_post_data
);
72 set_send_length(_post_data
.length());
75 // send message to endpoint
76 int send_request() override
{
78 const auto rc
= sync_env
->http_manager
->add_request(this);
82 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
87 int request_complete() override
{
88 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
89 if (ack_level
== ACK_LEVEL_ANY
) {
91 } else if (ack_level
== ACK_LEVEL_NON_ERROR
) {
92 // TODO check result code to be non-error
94 // TODO: check that result code == ack_level
101 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
,
102 const RGWHTTPArgs
& args
) : endpoint(_endpoint
) {
105 str_ack_level
= args
.get("http-ack-level", &exists
);
106 if (!exists
|| str_ack_level
== "any") {
108 ack_level
= ACK_LEVEL_ANY
;
109 } else if (str_ack_level
== "non-error") {
110 ack_level
= ACK_LEVEL_NON_ERROR
;
112 ack_level
= std::atoi(str_ack_level
.c_str());
113 if (ack_level
< 100 || ack_level
>= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
118 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
119 boost::algorithm::to_lower(str_verify_ssl
);
120 // verify server certificate by default
121 if (!exists
|| str_verify_ssl
== "true") {
123 } else if (str_verify_ssl
== "false") {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl
);
130 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
131 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
134 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
135 return new PostCR(json_format_pubsub_event(record
), env
, endpoint
, ack_level
, verify_ssl
);
138 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
140 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
141 const auto post_data
= json_format_pubsub_event(record
);
142 request
.set_post_data(post_data
);
143 request
.set_send_length(post_data
.length());
144 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
145 const auto rc
= RGWHTTP::process(&request
, y
);
146 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
147 // TODO: use read_bl to process return code and handle according to ack level
151 std::string
to_str() const override
{
152 std::string
str("HTTP/S Endpoint");
153 str
+= "\nURI: " + endpoint
;
154 str
+= "\nAck Level: " + str_ack_level
;
155 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
161 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
162 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
164 enum class ack_level_t
{
169 CephContext
* const cct
;
170 const std::string endpoint
;
171 const std::string topic
;
172 const std::string exchange
;
173 ack_level_t ack_level
;
174 amqp::connection_ptr_t conn
;
176 std::string
get_exchange(const RGWHTTPArgs
& args
) {
178 const auto exchange
= args
.get("amqp-exchange", &exists
);
180 throw configuration_error("AMQP: missing amqp-exchange");
185 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
187 const auto& str_ack_level
= args
.get("amqp-ack-level", &exists
);
188 if (!exists
|| str_ack_level
== "broker") {
189 // "broker" is default
190 return ack_level_t::Broker
;
192 if (str_ack_level
== "none") {
193 return ack_level_t::None
;
195 if (str_ack_level
== "routable") {
196 return ack_level_t::Routable
;
198 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
201 // NoAckPublishCR implements async amqp publishing via coroutine
202 // This coroutine ends when it send the message and does not wait for an ack
203 class NoAckPublishCR
: public RGWCoroutine
{
205 const std::string topic
;
206 amqp::connection_ptr_t conn
;
207 const std::string message
;
210 NoAckPublishCR(CephContext
* cct
,
211 const std::string
& _topic
,
212 amqp::connection_ptr_t
& _conn
,
213 const std::string
& _message
) :
215 topic(_topic
), conn(_conn
), message(_message
) {}
217 // send message to endpoint, without waiting for reply
218 int operate() override
{
220 const auto rc
= amqp::publish(conn
, topic
, message
);
222 return set_cr_error(rc
);
224 return set_cr_done();
230 // AckPublishCR implements async amqp publishing via coroutine
231 // This coroutine ends when an ack is received from the borker
232 // note that it does not wait for an ack fron the end client
233 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
235 const std::string topic
;
236 amqp::connection_ptr_t conn
;
237 const std::string message
;
240 AckPublishCR(CephContext
* cct
,
241 const std::string
& _topic
,
242 amqp::connection_ptr_t
& _conn
,
243 const std::string
& _message
) :
245 topic(_topic
), conn(_conn
), message(_message
) {}
247 // send message to endpoint, waiting for reply
248 int operate() override
{
252 const auto rc
= amqp::publish_with_confirm(conn
,
255 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
257 // failed to publish, does not wait for reply
258 return set_cr_error(rc
);
260 // mark as blocked on the amqp answer
261 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
265 return set_cr_done();
270 // callback invoked from the amqp manager thread when ack/nack is received
271 void request_complete(int status
) {
272 ceph_assert(!is_done());
274 // server replied with a nack
275 set_cr_error(status
);
278 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
281 // TODO: why are these mandatory in RGWIOProvider?
282 void set_io_user_info(void *_user_info
) override
{
285 void *get_io_user_info() override
{
291 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
292 const std::string
& _topic
,
293 const RGWHTTPArgs
& args
,
298 exchange(get_exchange(args
)),
299 ack_level(get_ack_level(args
)),
300 conn(amqp::connect(endpoint
, exchange
, (ack_level
== ack_level_t::Broker
))) {
302 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
306 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
308 if (ack_level
== ack_level_t::None
) {
309 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
311 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
315 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
317 if (ack_level
== ack_level_t::None
) {
318 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
320 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
324 // this allows waiting untill "finish()" is called from a different thread
325 // waiting could be blocking the waiting thread or yielding, depending
326 // with compilation flag support and whether the optional_yield is set
328 using Signature
= void(boost::system::error_code
);
329 using Completion
= ceph::async::Completion
<Signature
>;
330 std::unique_ptr
<Completion
> completion
= nullptr;
333 mutable std::atomic
<bool> done
= false;
334 mutable std::mutex lock
;
335 mutable std::condition_variable cond
;
337 template <typename ExecutionContext
, typename CompletionToken
>
338 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
339 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
340 auto& handler
= init
.completion_handler
;
342 std::unique_lock l
{lock
};
343 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
345 return init
.result
.get();
349 int wait(optional_yield y
) {
353 #ifdef HAVE_BOOST_CONTEXT
355 auto& io_ctx
= y
.get_io_context();
356 auto& yield_ctx
= y
.get_yield_context();
357 boost::system::error_code ec
;
358 async_wait(io_ctx
, yield_ctx
[ec
]);
362 std::unique_lock
l(lock
);
363 cond
.wait(l
, [this]{return (done
==true);});
368 std::unique_lock l
{lock
};
372 boost::system::error_code
ec(-ret
, boost::system::system_category());
373 Completion::post(std::move(completion
), ec
);
380 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
382 if (ack_level
== ack_level_t::None
) {
383 return amqp::publish(conn
, topic
, json_format_pubsub_event(record
));
385 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
386 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
387 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
388 const auto rc
= amqp::publish_with_confirm(conn
,
390 json_format_pubsub_event(record
),
391 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
393 // failed to publish, does not wait for reply
400 std::string
to_str() const override
{
401 std::string
str("AMQP(0.9.1) Endpoint");
402 str
+= "\nURI: " + endpoint
;
403 str
+= "\nTopic: " + topic
;
404 str
+= "\nExchange: " + exchange
;
409 static const std::string
AMQP_0_9_1("0-9-1");
410 static const std::string
AMQP_1_0("1-0");
411 static const std::string
AMQP_SCHEMA("amqp");
412 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
415 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
416 class RGWPubSubKafkaEndpoint
: public RGWPubSubEndpoint
{
418 enum class ack_level_t
{
422 CephContext
* const cct
;
423 const std::string topic
;
424 kafka::connection_ptr_t conn
;
425 const ack_level_t ack_level
;
427 bool get_verify_ssl(const RGWHTTPArgs
& args
) {
429 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
431 // verify server certificate by default
434 boost::algorithm::to_lower(str_verify_ssl
);
435 if (str_verify_ssl
== "true") {
438 if (str_verify_ssl
== "false") {
441 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
444 bool get_use_ssl(const RGWHTTPArgs
& args
) {
446 auto str_use_ssl
= args
.get("use-ssl", &exists
);
448 // by default ssl not used
451 boost::algorithm::to_lower(str_use_ssl
);
452 if (str_use_ssl
== "true") {
455 if (str_use_ssl
== "false") {
458 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl
);
461 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
464 const auto str_ack_level
= args
.get("kafka-ack-level", &exists
);
465 if (!exists
|| str_ack_level
== "broker") {
466 // "broker" is default
467 return ack_level_t::Broker
;
469 if (str_ack_level
== "none") {
470 return ack_level_t::None
;
472 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level
);
475 // NoAckPublishCR implements async kafka publishing via coroutine
476 // This coroutine ends when it send the message and does not wait for an ack
477 class NoAckPublishCR
: public RGWCoroutine
{
479 const std::string topic
;
480 kafka::connection_ptr_t conn
;
481 const std::string message
;
484 NoAckPublishCR(CephContext
* cct
,
485 const std::string
& _topic
,
486 kafka::connection_ptr_t
& _conn
,
487 const std::string
& _message
) :
489 topic(_topic
), conn(_conn
), message(_message
) {}
491 // send message to endpoint, without waiting for reply
492 int operate() override
{
494 const auto rc
= kafka::publish(conn
, topic
, message
);
496 return set_cr_error(rc
);
498 return set_cr_done();
504 // AckPublishCR implements async kafka publishing via coroutine
505 // This coroutine ends when an ack is received from the borker
506 // note that it does not wait for an ack fron the end client
507 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
509 const std::string topic
;
510 kafka::connection_ptr_t conn
;
511 const std::string message
;
514 AckPublishCR(CephContext
* cct
,
515 const std::string
& _topic
,
516 kafka::connection_ptr_t
& _conn
,
517 const std::string
& _message
) :
519 topic(_topic
), conn(_conn
), message(_message
) {}
521 // send message to endpoint, waiting for reply
522 int operate() override
{
526 const auto rc
= kafka::publish_with_confirm(conn
,
529 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
531 // failed to publish, does not wait for reply
532 return set_cr_error(rc
);
534 // mark as blocked on the kafka answer
535 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
539 return set_cr_done();
544 // callback invoked from the kafka manager thread when ack/nack is received
545 void request_complete(int status
) {
546 ceph_assert(!is_done());
548 // server replied with a nack
549 set_cr_error(status
);
552 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
555 // TODO: why are these mandatory in RGWIOProvider?
556 void set_io_user_info(void *_user_info
) override
{
559 void *get_io_user_info() override
{
565 RGWPubSubKafkaEndpoint(const std::string
& _endpoint
,
566 const std::string
& _topic
,
567 const RGWHTTPArgs
& args
,
571 conn(kafka::connect(_endpoint
, get_use_ssl(args
), get_verify_ssl(args
), args
.get_optional("ca-location"))) ,
572 ack_level(get_ack_level(args
)) {
574 throw configuration_error("Kafka: failed to create connection to: " + _endpoint
);
578 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
580 if (ack_level
== ack_level_t::None
) {
581 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
583 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
587 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
589 if (ack_level
== ack_level_t::None
) {
590 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
592 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
596 // this allows waiting untill "finish()" is called from a different thread
597 // waiting could be blocking the waiting thread or yielding, depending
598 // with compilation flag support and whether the optional_yield is set
600 using Signature
= void(boost::system::error_code
);
601 using Completion
= ceph::async::Completion
<Signature
>;
602 std::unique_ptr
<Completion
> completion
= nullptr;
605 mutable std::atomic
<bool> done
= false;
606 mutable std::mutex lock
;
607 mutable std::condition_variable cond
;
609 template <typename ExecutionContext
, typename CompletionToken
>
610 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
611 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
612 auto& handler
= init
.completion_handler
;
614 std::unique_lock l
{lock
};
615 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
617 return init
.result
.get();
621 int wait(optional_yield y
) {
625 #ifdef HAVE_BOOST_CONTEXT
627 auto& io_ctx
= y
.get_io_context();
628 auto& yield_ctx
= y
.get_yield_context();
629 boost::system::error_code ec
;
630 async_wait(io_ctx
, yield_ctx
[ec
]);
634 std::unique_lock
l(lock
);
635 cond
.wait(l
, [this]{return (done
==true);});
640 std::unique_lock l
{lock
};
644 boost::system::error_code
ec(-ret
, boost::system::system_category());
645 Completion::post(std::move(completion
), ec
);
652 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
654 if (ack_level
== ack_level_t::None
) {
655 return kafka::publish(conn
, topic
, json_format_pubsub_event(record
));
657 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
658 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
659 const auto rc
= kafka::publish_with_confirm(conn
,
661 json_format_pubsub_event(record
),
662 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
664 // failed to publish, does not wait for reply
671 std::string
to_str() const override
{
672 std::string
str("Kafka Endpoint");
673 str
+= kafka::to_string(conn
);
674 str
+= "\nTopic: " + topic
;
679 static const std::string
KAFKA_SCHEMA("kafka");
680 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
682 static const std::string
WEBHOOK_SCHEMA("webhook");
683 static const std::string
UNKNOWN_SCHEMA("unknown");
684 static const std::string
NO_SCHEMA("");
686 const std::string
& get_schema(const std::string
& endpoint
) {
687 if (endpoint
.empty()) {
690 const auto pos
= endpoint
.find(':');
691 if (pos
== std::string::npos
) {
692 return UNKNOWN_SCHEMA
;
694 const auto& schema
= endpoint
.substr(0,pos
);
695 if (schema
== "http" || schema
== "https") {
696 return WEBHOOK_SCHEMA
;
697 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
698 } else if (schema
== "amqp") {
701 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
702 } else if (schema
== "kafka") {
706 return UNKNOWN_SCHEMA
;
709 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
710 const std::string
& topic
,
711 const RGWHTTPArgs
& args
,
713 const auto& schema
= get_schema(endpoint
);
714 if (schema
== WEBHOOK_SCHEMA
) {
715 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
716 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
717 } else if (schema
== AMQP_SCHEMA
) {
719 std::string version
= args
.get("amqp-version", &exists
);
721 version
= AMQP_0_9_1
;
723 if (version
== AMQP_0_9_1
) {
724 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
725 } else if (version
== AMQP_1_0
) {
726 throw configuration_error("AMQP: v1.0 not supported");
729 throw configuration_error("AMQP: unknown version: " + version
);
732 } else if (schema
== "amqps") {
733 throw configuration_error("AMQP: ssl not supported");
736 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
737 } else if (schema
== KAFKA_SCHEMA
) {
738 return Ptr(new RGWPubSubKafkaEndpoint(endpoint
, topic
, args
, cct
));
742 throw configuration_error("unknown schema in: " + endpoint
);