1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
18 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19 #include "rgw_kafka.h"
21 #include <boost/asio/yield.hpp>
22 #include <boost/algorithm/string.hpp>
24 #include "rgw_perf_counters.h"
28 template<typename EventType
>
29 std::string
json_format_pubsub_event(const EventType
& event
) {
31 JSONFormatter
f(false);
33 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
35 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
36 encode_json("", event
, &f
);
43 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
45 const std::string endpoint
;
46 std::string str_ack_level
;
47 typedef unsigned ack_level_t
;
48 ack_level_t ack_level
; // TODO: not used for now
50 static const ack_level_t ACK_LEVEL_ANY
= 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
54 class PostCR
: public RGWPostHTTPData
, public RGWSimpleCoroutine
{
56 RGWDataSyncEnv
* const sync_env
;
58 const ack_level_t ack_level
;
61 PostCR(const std::string
& _post_data
,
62 RGWDataSyncEnv
* _sync_env
,
63 const std::string
& endpoint
,
64 ack_level_t _ack_level
,
66 RGWPostHTTPData(_sync_env
->cct
, "POST", endpoint
, &read_bl
, verify_ssl
),
67 RGWSimpleCoroutine(_sync_env
->cct
),
69 ack_level (_ack_level
) {
70 // ctor also set the data to send
71 set_post_data(_post_data
);
72 set_send_length(_post_data
.length());
75 // send message to endpoint
76 int send_request() override
{
78 const auto rc
= sync_env
->http_manager
->add_request(this);
82 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
87 int request_complete() override
{
88 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
89 if (ack_level
== ACK_LEVEL_ANY
) {
91 } else if (ack_level
== ACK_LEVEL_NON_ERROR
) {
92 // TODO check result code to be non-error
94 // TODO: check that result code == ack_level
101 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
,
102 const RGWHTTPArgs
& args
) : endpoint(_endpoint
) {
105 str_ack_level
= args
.get("http-ack-level", &exists
);
106 if (!exists
|| str_ack_level
== "any") {
108 ack_level
= ACK_LEVEL_ANY
;
109 } else if (str_ack_level
== "non-error") {
110 ack_level
= ACK_LEVEL_NON_ERROR
;
112 ack_level
= std::atoi(str_ack_level
.c_str());
113 if (ack_level
< 100 || ack_level
>= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
118 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
119 boost::algorithm::to_lower(str_verify_ssl
);
120 // verify server certificate by default
121 if (!exists
|| str_verify_ssl
== "true") {
123 } else if (str_verify_ssl
== "false") {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl
);
130 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
131 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
134 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
135 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
138 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
140 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
141 const auto post_data
= json_format_pubsub_event(event
);
142 request
.set_post_data(post_data
);
143 request
.set_send_length(post_data
.length());
144 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
145 const auto rc
= RGWHTTP::process(&request
, y
);
146 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
147 // TODO: use read_bl to process return code and handle according to ack level
151 std::string
to_str() const override
{
152 std::string
str("HTTP/S Endpoint");
153 str
+= "\nURI: " + endpoint
;
154 str
+= "\nAck Level: " + str_ack_level
;
155 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
161 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
162 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
164 enum class ack_level_t
{
169 CephContext
* const cct
;
170 const std::string endpoint
;
171 const std::string topic
;
172 const std::string exchange
;
173 ack_level_t ack_level
;
174 amqp::connection_ptr_t conn
;
176 bool get_verify_ssl(const RGWHTTPArgs
& args
) {
178 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
180 // verify server certificate by default
183 boost::algorithm::to_lower(str_verify_ssl
);
184 if (str_verify_ssl
== "true") {
187 if (str_verify_ssl
== "false") {
190 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
193 std::string
get_exchange(const RGWHTTPArgs
& args
) {
195 const auto exchange
= args
.get("amqp-exchange", &exists
);
197 throw configuration_error("AMQP: missing amqp-exchange");
202 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
204 const auto& str_ack_level
= args
.get("amqp-ack-level", &exists
);
205 if (!exists
|| str_ack_level
== "broker") {
206 // "broker" is default
207 return ack_level_t::Broker
;
209 if (str_ack_level
== "none") {
210 return ack_level_t::None
;
212 if (str_ack_level
== "routable") {
213 return ack_level_t::Routable
;
215 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
218 // NoAckPublishCR implements async amqp publishing via coroutine
219 // This coroutine ends when it send the message and does not wait for an ack
220 class NoAckPublishCR
: public RGWCoroutine
{
222 const std::string topic
;
223 amqp::connection_ptr_t conn
;
224 const std::string message
;
227 NoAckPublishCR(CephContext
* cct
,
228 const std::string
& _topic
,
229 amqp::connection_ptr_t
& _conn
,
230 const std::string
& _message
) :
232 topic(_topic
), conn(_conn
), message(_message
) {}
234 // send message to endpoint, without waiting for reply
235 int operate() override
{
237 const auto rc
= amqp::publish(conn
, topic
, message
);
239 return set_cr_error(rc
);
241 return set_cr_done();
247 // AckPublishCR implements async amqp publishing via coroutine
248 // This coroutine ends when an ack is received from the borker
249 // note that it does not wait for an ack fron the end client
250 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
252 const std::string topic
;
253 amqp::connection_ptr_t conn
;
254 const std::string message
;
257 AckPublishCR(CephContext
* cct
,
258 const std::string
& _topic
,
259 amqp::connection_ptr_t
& _conn
,
260 const std::string
& _message
) :
262 topic(_topic
), conn(_conn
), message(_message
) {}
264 // send message to endpoint, waiting for reply
265 int operate() override
{
269 const auto rc
= amqp::publish_with_confirm(conn
,
272 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
274 // failed to publish, does not wait for reply
275 return set_cr_error(rc
);
277 // mark as blocked on the amqp answer
278 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
282 return set_cr_done();
287 // callback invoked from the amqp manager thread when ack/nack is received
288 void request_complete(int status
) {
289 ceph_assert(!is_done());
291 // server replied with a nack
292 set_cr_error(status
);
295 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
298 // TODO: why are these mandatory in RGWIOProvider?
299 void set_io_user_info(void *_user_info
) override
{
302 void *get_io_user_info() override
{
308 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
309 const std::string
& _topic
,
310 const RGWHTTPArgs
& args
,
315 exchange(get_exchange(args
)),
316 ack_level(get_ack_level(args
)),
317 conn(amqp::connect(endpoint
, exchange
, (ack_level
== ack_level_t::Broker
), get_verify_ssl(args
), args
.get_optional("ca-location"))) {
319 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
323 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
325 if (ack_level
== ack_level_t::None
) {
326 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
328 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
332 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
334 if (ack_level
== ack_level_t::None
) {
335 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
337 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
341 // this allows waiting untill "finish()" is called from a different thread
342 // waiting could be blocking the waiting thread or yielding, depending
343 // with compilation flag support and whether the optional_yield is set
345 using Signature
= void(boost::system::error_code
);
346 using Completion
= ceph::async::Completion
<Signature
>;
347 std::unique_ptr
<Completion
> completion
= nullptr;
350 mutable std::atomic
<bool> done
= false;
351 mutable std::mutex lock
;
352 mutable std::condition_variable cond
;
354 template <typename ExecutionContext
, typename CompletionToken
>
355 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
356 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
357 auto& handler
= init
.completion_handler
;
359 std::unique_lock l
{lock
};
360 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
362 return init
.result
.get();
366 int wait(optional_yield y
) {
371 auto& io_ctx
= y
.get_io_context();
372 auto& yield_ctx
= y
.get_yield_context();
373 boost::system::error_code ec
;
374 async_wait(io_ctx
, yield_ctx
[ec
]);
377 std::unique_lock
l(lock
);
378 cond
.wait(l
, [this]{return (done
==true);});
383 std::unique_lock l
{lock
};
387 boost::system::error_code
ec(-ret
, boost::system::system_category());
388 Completion::post(std::move(completion
), ec
);
395 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
397 if (ack_level
== ack_level_t::None
) {
398 return amqp::publish(conn
, topic
, json_format_pubsub_event(event
));
400 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
401 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
402 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
403 const auto rc
= amqp::publish_with_confirm(conn
,
405 json_format_pubsub_event(event
),
406 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
408 // failed to publish, does not wait for reply
415 std::string
to_str() const override
{
416 std::string
str("AMQP(0.9.1) Endpoint");
417 str
+= "\nURI: " + endpoint
;
418 str
+= "\nTopic: " + topic
;
419 str
+= "\nExchange: " + exchange
;
424 static const std::string
AMQP_0_9_1("0-9-1");
425 static const std::string
AMQP_1_0("1-0");
426 static const std::string
AMQP_SCHEMA("amqp");
427 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
430 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
431 class RGWPubSubKafkaEndpoint
: public RGWPubSubEndpoint
{
433 enum class ack_level_t
{
437 CephContext
* const cct
;
438 const std::string topic
;
439 kafka::connection_ptr_t conn
;
440 const ack_level_t ack_level
;
442 bool get_verify_ssl(const RGWHTTPArgs
& args
) {
444 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
446 // verify server certificate by default
449 boost::algorithm::to_lower(str_verify_ssl
);
450 if (str_verify_ssl
== "true") {
453 if (str_verify_ssl
== "false") {
456 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
459 bool get_use_ssl(const RGWHTTPArgs
& args
) {
461 auto str_use_ssl
= args
.get("use-ssl", &exists
);
463 // by default ssl not used
466 boost::algorithm::to_lower(str_use_ssl
);
467 if (str_use_ssl
== "true") {
470 if (str_use_ssl
== "false") {
473 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl
);
476 ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
479 const auto str_ack_level
= args
.get("kafka-ack-level", &exists
);
480 if (!exists
|| str_ack_level
== "broker") {
481 // "broker" is default
482 return ack_level_t::Broker
;
484 if (str_ack_level
== "none") {
485 return ack_level_t::None
;
487 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level
);
490 // NoAckPublishCR implements async kafka publishing via coroutine
491 // This coroutine ends when it send the message and does not wait for an ack
492 class NoAckPublishCR
: public RGWCoroutine
{
494 const std::string topic
;
495 kafka::connection_ptr_t conn
;
496 const std::string message
;
499 NoAckPublishCR(CephContext
* cct
,
500 const std::string
& _topic
,
501 kafka::connection_ptr_t
& _conn
,
502 const std::string
& _message
) :
504 topic(_topic
), conn(_conn
), message(_message
) {}
506 // send message to endpoint, without waiting for reply
507 int operate() override
{
509 const auto rc
= kafka::publish(conn
, topic
, message
);
511 return set_cr_error(rc
);
513 return set_cr_done();
519 // AckPublishCR implements async kafka publishing via coroutine
520 // This coroutine ends when an ack is received from the borker
521 // note that it does not wait for an ack fron the end client
522 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
524 const std::string topic
;
525 kafka::connection_ptr_t conn
;
526 const std::string message
;
529 AckPublishCR(CephContext
* cct
,
530 const std::string
& _topic
,
531 kafka::connection_ptr_t
& _conn
,
532 const std::string
& _message
) :
534 topic(_topic
), conn(_conn
), message(_message
) {}
536 // send message to endpoint, waiting for reply
537 int operate() override
{
541 const auto rc
= kafka::publish_with_confirm(conn
,
544 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
546 // failed to publish, does not wait for reply
547 return set_cr_error(rc
);
549 // mark as blocked on the kafka answer
550 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
554 return set_cr_done();
559 // callback invoked from the kafka manager thread when ack/nack is received
560 void request_complete(int status
) {
561 ceph_assert(!is_done());
563 // server replied with a nack
564 set_cr_error(status
);
567 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
570 // TODO: why are these mandatory in RGWIOProvider?
571 void set_io_user_info(void *_user_info
) override
{
574 void *get_io_user_info() override
{
580 RGWPubSubKafkaEndpoint(const std::string
& _endpoint
,
581 const std::string
& _topic
,
582 const RGWHTTPArgs
& args
,
586 conn(kafka::connect(_endpoint
, get_use_ssl(args
), get_verify_ssl(args
), args
.get_optional("ca-location"))) ,
587 ack_level(get_ack_level(args
)) {
589 throw configuration_error("Kafka: failed to create connection to: " + _endpoint
);
593 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
595 if (ack_level
== ack_level_t::None
) {
596 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
598 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
602 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_event
& event
, RGWDataSyncEnv
* env
) override
{
604 if (ack_level
== ack_level_t::None
) {
605 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
607 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
611 // this allows waiting untill "finish()" is called from a different thread
612 // waiting could be blocking the waiting thread or yielding, depending
613 // with compilation flag support and whether the optional_yield is set
615 using Signature
= void(boost::system::error_code
);
616 using Completion
= ceph::async::Completion
<Signature
>;
617 std::unique_ptr
<Completion
> completion
= nullptr;
620 mutable std::atomic
<bool> done
= false;
621 mutable std::mutex lock
;
622 mutable std::condition_variable cond
;
624 template <typename ExecutionContext
, typename CompletionToken
>
625 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
626 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
627 auto& handler
= init
.completion_handler
;
629 std::unique_lock l
{lock
};
630 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
632 return init
.result
.get();
636 int wait(optional_yield y
) {
641 auto& io_ctx
= y
.get_io_context();
642 auto& yield_ctx
= y
.get_yield_context();
643 boost::system::error_code ec
;
644 async_wait(io_ctx
, yield_ctx
[ec
]);
647 std::unique_lock
l(lock
);
648 cond
.wait(l
, [this]{return (done
==true);});
653 std::unique_lock l
{lock
};
657 boost::system::error_code
ec(-ret
, boost::system::system_category());
658 Completion::post(std::move(completion
), ec
);
665 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_event
& event
, optional_yield y
) override
{
667 if (ack_level
== ack_level_t::None
) {
668 return kafka::publish(conn
, topic
, json_format_pubsub_event(event
));
670 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
671 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
672 const auto rc
= kafka::publish_with_confirm(conn
,
674 json_format_pubsub_event(event
),
675 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
677 // failed to publish, does not wait for reply
684 std::string
to_str() const override
{
685 std::string
str("Kafka Endpoint");
686 str
+= kafka::to_string(conn
);
687 str
+= "\nTopic: " + topic
;
692 static const std::string
KAFKA_SCHEMA("kafka");
693 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
695 static const std::string
WEBHOOK_SCHEMA("webhook");
696 static const std::string
UNKNOWN_SCHEMA("unknown");
697 static const std::string
NO_SCHEMA("");
699 const std::string
& get_schema(const std::string
& endpoint
) {
700 if (endpoint
.empty()) {
703 const auto pos
= endpoint
.find(':');
704 if (pos
== std::string::npos
) {
705 return UNKNOWN_SCHEMA
;
707 const auto& schema
= endpoint
.substr(0,pos
);
708 if (schema
== "http" || schema
== "https") {
709 return WEBHOOK_SCHEMA
;
710 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
711 } else if (schema
== "amqp" || schema
== "amqps") {
714 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
715 } else if (schema
== "kafka") {
719 return UNKNOWN_SCHEMA
;
722 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
723 const std::string
& topic
,
724 const RGWHTTPArgs
& args
,
726 const auto& schema
= get_schema(endpoint
);
727 if (schema
== WEBHOOK_SCHEMA
) {
728 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
729 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
730 } else if (schema
== AMQP_SCHEMA
) {
732 std::string version
= args
.get("amqp-version", &exists
);
734 version
= AMQP_0_9_1
;
736 if (version
== AMQP_0_9_1
) {
737 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
738 } else if (version
== AMQP_1_0
) {
739 throw configuration_error("AMQP: v1.0 not supported");
742 throw configuration_error("AMQP: unknown version: " + version
);
746 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
747 } else if (schema
== KAFKA_SCHEMA
) {
748 return Ptr(new RGWPubSubKafkaEndpoint(endpoint
, topic
, args
, cct
));
752 throw configuration_error("unknown schema in: " + endpoint
);