1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
18 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19 #include "rgw_kafka.h"
21 #include <boost/asio/yield.hpp>
22 #include <boost/algorithm/string.hpp>
24 #include "rgw_perf_counters.h"
// Formats a pubsub event/record as JSON using a JSONFormatter.
// An object section and an array section are opened on the formatter, both named
// EventType::json_type_plural, and the event is encoded with an empty name.
// NOTE(review): the declared return type is std::string, but the statements that
// flush the formatter and return the text are not visible in this listing - confirm.
28 template<typename EventType
>
29 std::string
json_format_pubsub_event(const EventType
& event
) {
31 JSONFormatter
f(false);
33 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
35 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
36 encode_json("", event
, &f
);
// HTTP/S push endpoint, derived from RGWPubSubEndpoint.
// Holds the target endpoint string and the configured ack level, kept both as the
// original text (str_ack_level) and as a number (ack_level).
// ack_level is either one of the two symbolic constants below (0 or 1) or, as set by
// the constructor, a numeric value validated to lie in [100, 600).
43 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
45 const std::string endpoint
;
46 std::string str_ack_level
;
47 typedef unsigned ack_level_t
;
48 ack_level_t ack_level
; // TODO: not used for now
50 static const ack_level_t ACK_LEVEL_ANY
= 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
// It is both the HTTP POST request (RGWPostHTTPData) and the coroutine
// (RGWSimpleCoroutine) that drives it.
54 class PostCR
: public RGWPostHTTPData
, public RGWSimpleCoroutine
{
56 RGWDataSyncEnv
* const sync_env
;
58 const ack_level_t ack_level
;
// Builds a "POST" request to `endpoint` using the CephContext taken from _sync_env,
// and records the ack level.
// NOTE(review): `read_bl` and `verify_ssl` are used in the base-class initializer but
// their declarations (and the remaining constructor parameters/initializers) are not
// visible in this listing - confirm.
61 PostCR(const std::string
& _post_data
,
62 RGWDataSyncEnv
* _sync_env
,
63 const std::string
& endpoint
,
64 ack_level_t _ack_level
,
66 RGWPostHTTPData(_sync_env
->cct
, "POST", endpoint
, &read_bl
, verify_ssl
),
67 RGWSimpleCoroutine(_sync_env
->cct
),
69 ack_level (_ack_level
) {
70 // ctor also sets the data to send, with the send length equal to the payload length
71 set_post_data(_post_data
);
72 set_send_length(_post_data
.length());
75 // send message to endpoint
// Registers this request with the sync environment's HTTP manager via add_request(this)
// and, when the global perfcounter is set, increments l_rgw_pubsub_push_pending.
// NOTE(review): how `rc` is checked and what is returned is not visible in this listing.
76 int send_request() override
{
78 const auto rc
= sync_env
->http_manager
->add_request(this);
82 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
// Completion hook of the POST coroutine.
// Decrements l_rgw_pubsub_push_pending (when perfcounter is set), then branches on the
// configured ack level. Per the TODOs below, checking the HTTP result code against the
// ack level is not implemented yet.
// NOTE(review): the value returned from each branch is not visible in this listing.
87 int request_complete() override
{
88 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
89 if (ack_level
== ACK_LEVEL_ANY
) {
91 } else if (ack_level
== ACK_LEVEL_NON_ERROR
) {
92 // TODO check result code to be non-error
94 // TODO: check that result code == ack_level
// Constructs the HTTP/S endpoint from its URI and the topic arguments.
//
// "http-ack-level": absent or "any" selects ACK_LEVEL_ANY; "non-error" selects
// ACK_LEVEL_NON_ERROR; the numeric path parses the text with std::atoi and throws
// configuration_error unless the value lies in [100, 600).
// "verify-ssl": compared case-insensitively (lower-cased first) against "true" and
// "false"; the visible throw reports any other value as a configuration_error.
// NOTE(review): the `exists` declaration, the final `else` lines and the assignments to
// verify_ssl are not visible in this listing - confirm.
101 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
,
102 const RGWHTTPArgs
& args
) : endpoint(_endpoint
) {
105 str_ack_level
= args
.get("http-ack-level", &exists
);
106 if (!exists
|| str_ack_level
== "any") {
108 ack_level
= ACK_LEVEL_ANY
;
109 } else if (str_ack_level
== "non-error") {
110 ack_level
= ACK_LEVEL_NON_ERROR
;
// ack_level_t is unsigned: a non-numeric string yields 0 from atoi (rejected by "< 100"),
// and a negative number wraps to a large value (rejected by ">= 600").
112 ack_level
= std::atoi(str_ack_level
.c_str());
113 if (ack_level
< 100 || ack_level
>= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
118 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
119 boost::algorithm::to_lower(str_verify_ssl
);
120 // verify server certificate by default
121 if (!exists
|| str_verify_ssl
== "true") {
123 } else if (str_verify_ssl
== "false") {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl
);
// Coroutine-based send of an rgw_pubsub_event: returns a newly allocated PostCR that
// posts the JSON form of `event` to this endpoint using the configured ack level and
// verify_ssl setting.
130 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
131 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
// Coroutine-based send of an rgw_pubsub_s3_record: returns a newly allocated PostCR that
// posts the JSON form of `record` to this endpoint using the configured ack level and
// verify_ssl setting.
134 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
135 return new PostCR(json_format_pubsub_event(record
), env
, endpoint
, ack_level
, verify_ssl
);
// Sends an rgw_pubsub_s3_record without a coroutine: builds a "POST" RGWPostHTTPData
// request to the endpoint, attaches the JSON form of the record as the payload, and runs
// it through RGWHTTP::process() with the supplied optional_yield.
// l_rgw_pubsub_push_pending is incremented before and decremented after the call when
// perfcounter is set.
// NOTE(review): the declaration of `read_bl` and the return statement are not visible in
// this listing; per the TODO the response body is not yet examined.
138 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
140 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
141 const auto post_data
= json_format_pubsub_event(record
);
142 request
.set_post_data(post_data
);
143 request
.set_send_length(post_data
.length());
144 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
145 const auto rc
= RGWHTTP::process(&request
, y
);
146 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
147 // TODO: use read_bl to process return code and handle according to ack level
// Builds a multi-line, human-readable description of this endpoint: a fixed header, the
// URI, the ack level as originally supplied (str_ack_level), and whether SSL
// verification is enabled.
151 std::string
to_str() const override
{
152 std::string
str("HTTP/S Endpoint");
153 str
+= "\nURI: " + endpoint
;
154 str
+= "\nAck Level: " + str_ack_level
;
155 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
161 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
// AMQP push endpoint, derived from RGWPubSubEndpoint.
// Holds the CephContext, the endpoint URI, the topic and exchange names, the amqp
// connection handle, and the ack level (as an enum and as the original text).
// NOTE(review): the ack_level_t enumerators are not visible in this listing; the rest of
// the class refers to None, Broker and Routable.
162 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
164 enum class ack_level_t
{
169 CephContext
* const cct
;
170 const std::string endpoint
;
171 const std::string topic
;
172 const std::string exchange
;
173 amqp::connection_ptr_t conn
;
174 ack_level_t ack_level
;
175 std::string str_ack_level
;
// Reads the "amqp-exchange" argument from `args`.
// Throws configuration_error("AMQP: missing amqp-exchange"); presumably when the
// argument does not exist - the guarding condition, the `exists` declaration and the
// return statement are not visible in this listing.
177 static std::string
get_exchange(const RGWHTTPArgs
& args
) {
179 const auto exchange
= args
.get("amqp-exchange", &exists
);
181 throw configuration_error("AMQP: missing amqp-exchange");
186 // NoAckPublishCR implements async amqp publishing via coroutine
187 // This coroutine ends when it sends the message and does not wait for an ack
188 class NoAckPublishCR
: public RGWCoroutine
{
190 const std::string topic
;
191 amqp::connection_ptr_t conn
;
192 const std::string message
;
// Stores copies of the topic, the connection handle and the message to publish.
// NOTE(review): the base-class initializer (presumably using `cct`) is not visible here.
195 NoAckPublishCR(CephContext
* cct
,
196 const std::string
& _topic
,
197 amqp::connection_ptr_t
& _conn
,
198 const std::string
& _message
) :
200 topic(_topic
), conn(_conn
), message(_message
) {}
202 // send message to endpoint, without waiting for reply
// Publishes through amqp::publish(); the coroutine finishes with set_cr_error(rc) or
// set_cr_done() (the condition selecting between them is not visible in this listing).
203 int operate() override
{
205 const auto rc
= amqp::publish(conn
, topic
, message
);
207 return set_cr_error(rc
);
209 return set_cr_done();
215 // AckPublishCR implements async amqp publishing via coroutine
216 // This coroutine ends when an ack is received from the broker
217 // note that it does not wait for an ack from the end client
218 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
220 const std::string topic
;
221 amqp::connection_ptr_t conn
;
222 const std::string message
;
223 [[maybe_unused
]] const ack_level_t ack_level
; // TODO not used for now
// Stores copies of the topic, connection handle, message and ack level.
// NOTE(review): the base-class initializer (presumably using `cct`) is not visible here.
226 AckPublishCR(CephContext
* cct
,
227 const std::string
& _topic
,
228 amqp::connection_ptr_t
& _conn
,
229 const std::string
& _message
,
230 ack_level_t _ack_level
) :
232 topic(_topic
), conn(_conn
), message(_message
), ack_level(_ack_level
) {}
234 // send message to endpoint, waiting for reply
// Publishes through amqp::publish_with_confirm(), passing request_complete (bound to
// this object) as the confirmation callback.
// NOTE(review): the remaining publish arguments and the coroutine state handling between
// the visible statements are not shown in this listing.
235 int operate() override
{
239 const auto rc
= amqp::publish_with_confirm(conn
,
242 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
244 // failed to publish, does not wait for reply
245 return set_cr_error(rc
);
247 // mark as blocked on the amqp answer
248 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
252 return set_cr_done();
257 // callback invoked from the amqp manager thread when ack/nack is received
// Asserts the coroutine is not already done, records an error status on nack, and
// decrements l_rgw_pubsub_push_pending when perfcounter is set.
258 void request_complete(int status
) {
259 ceph_assert(!is_done());
261 // server replied with a nack
262 set_cr_error(status
);
265 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
268 // TODO: why are these mandatory in RGWIOProvider?
269 void set_io_user_info(void *_user_info
) override
{
272 void *get_io_user_info() override
{
// Constructs the AMQP endpoint: resolves the exchange via get_exchange(args), opens the
// connection with amqp::connect(endpoint, exchange), and parses "amqp-ack-level".
// Ack level mapping: absent or "broker" -> Broker, "none" -> None, "routable" -> Routable;
// the visible throw reports an invalid level as configuration_error.
// A configuration_error is also thrown for a failed connection (the check guarding that
// throw is not visible in this listing).
// NOTE(review): the last constructor parameter, the initializers for cct/endpoint/topic
// and the `exists` declaration are not visible here.
278 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
279 const std::string
& _topic
,
280 const RGWHTTPArgs
& args
,
285 exchange(get_exchange(args
)),
286 conn(amqp::connect(endpoint
, exchange
)) {
288 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
292 str_ack_level
= args
.get("amqp-ack-level", &exists
);
293 if (!exists
|| str_ack_level
== "broker") {
294 // "broker" is default
295 ack_level
= ack_level_t::Broker
;
296 } else if (str_ack_level
== "none") {
297 ack_level
= ack_level_t::None
;
298 } else if (str_ack_level
== "routable") {
299 ack_level
= ack_level_t::Routable
;
301 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
// Coroutine-based send of an rgw_pubsub_event over AMQP.
// Returns a newly allocated NoAckPublishCR when the ack level is None; the other visible
// return creates an AckPublishCR. Both carry the JSON form of `event`.
305 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
307 if (ack_level
== ack_level_t::None
) {
308 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
310 // TODO: currently broker and routable are the same - this will require different flags
311 // but the same mechanism
312 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
), ack_level
);
// Coroutine-based send of an rgw_pubsub_s3_record over AMQP.
// Returns a newly allocated NoAckPublishCR when the ack level is None; the other visible
// return creates an AckPublishCR. Both carry the JSON form of `record`.
316 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
318 if (ack_level
== ack_level_t::None
) {
319 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
321 // TODO: currently broker and routable are the same - this will require different flags
322 // but the same mechanism
323 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
), ack_level
);
327 // this allows waiting until "finish()" is called from a different thread
328 // waiting could be blocking the waiting thread or yielding, depending
329 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the declaration line of the enclosing type is not visible in this
// listing; it is referred to as `Waiter` where it is instantiated.
// Signature/Completion describe a completion handler that receives an error_code.
331 using Signature
= void(boost::system::error_code
);
332 using Completion
= ceph::async::Completion
<Signature
>;
// Set by async_wait() and consumed (moved from) when the result is posted.
333 std::unique_ptr
<Completion
> completion
= nullptr;
// `done`, `lock` and `cond` are used together by the condition-variable wait in wait().
336 mutable std::atomic
<bool> done
= false;
337 mutable std::mutex lock
;
338 mutable std::condition_variable cond
;
// Registers an asynchronous wait: turns `token` into a completion handler with
// boost::asio::async_completion, and, holding `lock`, stores a Completion created on
// ctx's executor that wraps that handler. Returns the async result for the token.
// NOTE(review): any statements between taking the lock and creating the completion are
// not visible in this listing.
340 template <typename ExecutionContext
, typename CompletionToken
>
341 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
342 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
343 auto& handler
= init
.completion_handler
;
345 std::unique_lock l
{lock
};
346 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
348 return init
.result
.get();
// Waits for completion. Two mechanisms are visible:
//  - under HAVE_BOOST_CONTEXT: async_wait() on the io_context and yield_context taken
//    from `y`, with the result captured in `ec`;
//  - a condition-variable wait on `cond` under `lock` until `done` is true.
// NOTE(review): the conditions selecting between these paths and the return statements
// are not visible in this listing.
352 int wait(optional_yield y
) {
356 #ifdef HAVE_BOOST_CONTEXT
358 auto& io_ctx
= y
.get_io_context();
359 auto& yield_ctx
= y
.get_yield_context();
360 boost::system::error_code ec
;
361 async_wait(io_ctx
, yield_ctx
[ec
]);
365 std::unique_lock
l(lock
);
366 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): fragment of the completion path - presumably the body of Waiter::finish,
// whose signature is not visible in this listing.
// Takes `lock`, builds an error_code with value -ret in the system category, and posts it
// to the stored Completion (which is moved from).
371 std::unique_lock l
{lock
};
375 boost::system::error_code
ec(-ret
, boost::system::system_category());
376 Completion::post(std::move(completion
), ec
);
// Sends an rgw_pubsub_s3_record over AMQP without a coroutine.
// With ack level None the record is published via amqp::publish() and its result is
// returned directly. Otherwise a heap-allocated Waiter is created and its finish()
// method is passed as the confirmation callback to amqp::publish_with_confirm().
// NOTE(review): the handling of `rc` and the final wait/return are not visible in this
// listing.
383 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
385 if (ack_level
== ack_level_t::None
) {
386 return amqp::publish(conn
, topic
, json_format_pubsub_event(record
));
388 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
389 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
390 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
// the callback holds a raw pointer to the Waiter (w.get()); `w` keeps ownership
391 const auto rc
= amqp::publish_with_confirm(conn
,
393 json_format_pubsub_event(record
),
394 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
396 // failed to publish, does not wait for reply
// Builds a multi-line, human-readable description of this endpoint: a fixed header, the
// URI, the topic, the exchange, and the ack level as originally supplied.
403 std::string
to_str() const override
{
404 std::string
str("AMQP(0.9.1) Endpoint");
405 str
+= "\nURI: " + endpoint
;
406 str
+= "\nTopic: " + topic
;
407 str
+= "\nExchange: " + exchange
;
408 str
+= "\nAck Level: " + str_ack_level
;
// File-local AMQP constants: the two protocol version strings compared against the
// "amqp-version" argument in RGWPubSubEndpoint::create(), and the AMQP schema name.
413 static const std::string
AMQP_0_9_1("0-9-1");
414 static const std::string
AMQP_1_0("1-0");
415 static const std::string
AMQP_SCHEMA("amqp");
416 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
419 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
// Kafka push endpoint, derived from RGWPubSubEndpoint.
// Holds the CephContext, the topic name, the kafka connection handle and the ack level.
// NOTE(review): the ack_level_t enumerators are not visible in this listing; the rest of
// the class refers to None and Broker.
420 class RGWPubSubKafkaEndpoint
: public RGWPubSubEndpoint
{
422 enum class ack_level_t
{
426 CephContext
* const cct
;
427 const std::string topic
;
428 kafka::connection_ptr_t conn
;
429 const ack_level_t ack_level
;
// Reads the "verify-ssl" argument and interprets it as a boolean.
// The value is lower-cased and compared with "true" and "false"; the visible throw
// reports any other value as configuration_error. Per the comment below, verification is
// the default.
// NOTE(review): the `exists` check and the return statements are not visible in this
// listing.
431 static bool get_verify_ssl(const RGWHTTPArgs
& args
) {
433 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
435 // verify server certificate by default
438 boost::algorithm::to_lower(str_verify_ssl
);
439 if (str_verify_ssl
== "true") {
442 if (str_verify_ssl
== "false") {
445 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl
);
// Reads the "use-ssl" argument and interprets it as a boolean.
// The value is lower-cased and compared with "true" and "false"; the visible throw
// reports any other value as configuration_error. Per the comment below, SSL is not used
// by default.
// NOTE(review): the `exists` check and the return statements are not visible in this
// listing.
448 static bool get_use_ssl(const RGWHTTPArgs
& args
) {
450 auto str_use_ssl
= args
.get("use-ssl", &exists
);
452 // by default ssl not used
455 boost::algorithm::to_lower(str_use_ssl
);
456 if (str_use_ssl
== "true") {
459 if (str_use_ssl
== "false") {
462 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl
);
// Parses the "kafka-ack-level" argument.
// Absent or "broker" -> ack_level_t::Broker; "none" -> ack_level_t::None; the visible
// throw reports any other value as configuration_error.
// Unlike the HTTP and AMQP level parsing, the comparison here is case-sensitive and has
// no third level.
468 static ack_level_t
get_ack_level(const RGWHTTPArgs
& args
) {
468 const auto str_ack_level
= args
.get("kafka-ack-level", &exists
);
469 if (!exists
|| str_ack_level
== "broker") {
470 // "broker" is default
471 return ack_level_t::Broker
;
473 if (str_ack_level
== "none") {
474 return ack_level_t::None
;
476 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level
);
479 // NoAckPublishCR implements async kafka publishing via coroutine
480 // This coroutine ends when it sends the message and does not wait for an ack
481 class NoAckPublishCR
: public RGWCoroutine
{
483 const std::string topic
;
484 kafka::connection_ptr_t conn
;
485 const std::string message
;
// Stores copies of the topic, the connection handle and the message to publish.
// NOTE(review): the base-class initializer (presumably using `cct`) is not visible here.
488 NoAckPublishCR(CephContext
* cct
,
489 const std::string
& _topic
,
490 kafka::connection_ptr_t
& _conn
,
491 const std::string
& _message
) :
493 topic(_topic
), conn(_conn
), message(_message
) {}
495 // send message to endpoint, without waiting for reply
// Publishes through kafka::publish(); the coroutine finishes with set_cr_error(rc) or
// set_cr_done() (the condition selecting between them is not visible in this listing).
496 int operate() override
{
498 const auto rc
= kafka::publish(conn
, topic
, message
);
500 return set_cr_error(rc
);
502 return set_cr_done();
508 // AckPublishCR implements async kafka publishing via coroutine
509 // This coroutine ends when an ack is received from the broker
510 // note that it does not wait for an ack from the end client
511 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
513 const std::string topic
;
514 kafka::connection_ptr_t conn
;
515 const std::string message
;
// Stores copies of the topic, the connection handle and the message to publish.
// NOTE(review): the base-class initializer (presumably using `cct`) is not visible here.
518 AckPublishCR(CephContext
* cct
,
519 const std::string
& _topic
,
520 kafka::connection_ptr_t
& _conn
,
521 const std::string
& _message
) :
523 topic(_topic
), conn(_conn
), message(_message
) {}
525 // send message to endpoint, waiting for reply
// Publishes through kafka::publish_with_confirm(), passing request_complete (bound to
// this object) as the confirmation callback.
// NOTE(review): the remaining publish arguments and the coroutine state handling between
// the visible statements are not shown in this listing.
526 int operate() override
{
530 const auto rc
= kafka::publish_with_confirm(conn
,
533 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
535 // failed to publish, does not wait for reply
536 return set_cr_error(rc
);
538 // mark as blocked on the kafka answer
539 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
543 return set_cr_done();
548 // callback invoked from the kafka manager thread when ack/nack is received
// Asserts the coroutine is not already done, records an error status on nack, and
// decrements l_rgw_pubsub_push_pending when perfcounter is set.
549 void request_complete(int status
) {
550 ceph_assert(!is_done());
552 // server replied with a nack
553 set_cr_error(status
);
556 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
559 // TODO: why are these mandatory in RGWIOProvider?
560 void set_io_user_info(void *_user_info
) override
{
563 void *get_io_user_info() override
{
// Constructs the Kafka endpoint: opens the connection with kafka::connect(), passing the
// endpoint URI, the parsed "use-ssl" and "verify-ssl" flags and the optional
// "ca-location" argument, then resolves the ack level via get_ack_level(args).
// The argument helpers throw configuration_error on invalid values; a
// configuration_error is also thrown for a failed connection (the check guarding that
// throw is not visible in this listing).
// NOTE(review): the last constructor parameter and the initializers for cct/topic are not
// visible here.
569 RGWPubSubKafkaEndpoint(const std::string
& _endpoint
,
570 const std::string
& _topic
,
571 const RGWHTTPArgs
& args
,
575 conn(kafka::connect(_endpoint
, get_use_ssl(args
), get_verify_ssl(args
), args
.get_optional("ca-location"))) ,
576 ack_level(get_ack_level(args
)) {
578 throw configuration_error("Kafka: failed to create connection to: " + _endpoint
);
// Coroutine-based send of an rgw_pubsub_event over Kafka.
// Returns a newly allocated NoAckPublishCR when the ack level is None; the other visible
// return creates an AckPublishCR. Both carry the JSON form of `event`.
582 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
584 if (ack_level
== ack_level_t::None
) {
585 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
587 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
// Coroutine-based send of an rgw_pubsub_s3_record over Kafka.
// Returns a newly allocated NoAckPublishCR when the ack level is None; the other visible
// return creates an AckPublishCR. Both carry the JSON form of `record`.
591 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
593 if (ack_level
== ack_level_t::None
) {
594 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
596 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
600 // this allows waiting until "finish()" is called from a different thread
601 // waiting could be blocking the waiting thread or yielding, depending
602 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the declaration line of the enclosing type is not visible in this
// listing; it is referred to as `Waiter` where it is instantiated.
// Signature/Completion describe a completion handler that receives an error_code.
604 using Signature
= void(boost::system::error_code
);
605 using Completion
= ceph::async::Completion
<Signature
>;
// Set by async_wait() and consumed (moved from) when the result is posted.
606 std::unique_ptr
<Completion
> completion
= nullptr;
// `done`, `lock` and `cond` are used together by the condition-variable wait in wait().
609 mutable std::atomic
<bool> done
= false;
610 mutable std::mutex lock
;
611 mutable std::condition_variable cond
;
// Registers an asynchronous wait: turns `token` into a completion handler with
// boost::asio::async_completion, and, holding `lock`, stores a Completion created on
// ctx's executor that wraps that handler. Returns the async result for the token.
// NOTE(review): any statements between taking the lock and creating the completion are
// not visible in this listing.
613 template <typename ExecutionContext
, typename CompletionToken
>
614 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
615 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
616 auto& handler
= init
.completion_handler
;
618 std::unique_lock l
{lock
};
619 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
621 return init
.result
.get();
// Waits for completion. Two mechanisms are visible:
//  - under HAVE_BOOST_CONTEXT: async_wait() on the io_context and yield_context taken
//    from `y`, with the result captured in `ec`;
//  - a condition-variable wait on `cond` under `lock` until `done` is true.
// NOTE(review): the conditions selecting between these paths and the return statements
// are not visible in this listing.
625 int wait(optional_yield y
) {
629 #ifdef HAVE_BOOST_CONTEXT
631 auto& io_ctx
= y
.get_io_context();
632 auto& yield_ctx
= y
.get_yield_context();
633 boost::system::error_code ec
;
634 async_wait(io_ctx
, yield_ctx
[ec
]);
638 std::unique_lock
l(lock
);
639 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): fragment of the completion path - presumably the body of Waiter::finish,
// whose signature is not visible in this listing.
// Takes `lock`, builds an error_code with value -ret in the system category, and posts it
// to the stored Completion (which is moved from).
644 std::unique_lock l
{lock
};
648 boost::system::error_code
ec(-ret
, boost::system::system_category());
649 Completion::post(std::move(completion
), ec
);
// Sends an rgw_pubsub_s3_record over Kafka without a coroutine.
// With ack level None the record is published via kafka::publish() and its result is
// returned directly. Otherwise a heap-allocated Waiter is created and its finish()
// method is passed as the confirmation callback to kafka::publish_with_confirm().
// NOTE(review): the handling of `rc` and the final wait/return are not visible in this
// listing.
656 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
658 if (ack_level
== ack_level_t::None
) {
659 return kafka::publish(conn
, topic
, json_format_pubsub_event(record
));
661 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
662 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
// the callback holds a raw pointer to the Waiter (w.get()); `w` keeps ownership
663 const auto rc
= kafka::publish_with_confirm(conn
,
665 json_format_pubsub_event(record
),
666 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
668 // failed to publish, does not wait for reply
// Builds a human-readable description of this endpoint: a fixed header, the connection
// description produced by kafka::to_string(conn), and the topic.
675 std::string
to_str() const override
{
676 std::string
str("Kafka Endpoint");
677 str
+= kafka::to_string(conn
);
678 str
+= "\nTopic: " + topic
;
// File-local schema names that RGWPubSubEndpoint::create() compares against the result
// of get_schema(). KAFKA_SCHEMA is only defined when WITH_RADOSGW_KAFKA_ENDPOINT is set.
683 static const std::string
KAFKA_SCHEMA("kafka");
684 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
686 static const std::string
WEBHOOK_SCHEMA("webhook");
687 static const std::string
UNKNOWN_SCHEMA("unknown");
688 static const std::string
NO_SCHEMA("");
// Classifies an endpoint URI by the text before its first ':'.
// "http" and "https" map to WEBHOOK_SCHEMA; a URI with no ':' or with an unrecognised
// prefix maps to UNKNOWN_SCHEMA. "amqp" and "kafka" are recognised only when the
// corresponding WITH_RADOSGW_*_ENDPOINT macro is defined.
// The function returns a const reference, and the visible returns refer to the
// file-level constants.
// NOTE(review): the values returned for the empty, "amqp" and "kafka" cases are not
// visible in this listing (presumably NO_SCHEMA, AMQP_SCHEMA and KAFKA_SCHEMA).
690 const std::string
& get_schema(const std::string
& endpoint
) {
691 if (endpoint
.empty()) {
694 const auto pos
= endpoint
.find(':');
695 if (pos
== std::string::npos
) {
696 return UNKNOWN_SCHEMA
;
// substr() returns a temporary; binding it to a const reference keeps it alive for the
// rest of the scope
698 const auto& schema
= endpoint
.substr(0,pos
);
699 if (schema
== "http" || schema
== "https") {
700 return WEBHOOK_SCHEMA
;
701 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
702 } else if (schema
== "amqp") {
705 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
706 } else if (schema
== "kafka") {
710 return UNKNOWN_SCHEMA
;
// Factory: creates the endpoint implementation matching the schema of `endpoint`.
//  - WEBHOOK_SCHEMA -> RGWPubSubHTTPEndpoint
//  - AMQP_SCHEMA (when built with AMQP support) -> RGWPubSubAMQPEndpoint for version
//    "0-9-1"; version "1-0" and any other version throw configuration_error
//  - KAFKA_SCHEMA (when built with Kafka support) -> RGWPubSubKafkaEndpoint
//  - the final visible throw reports an unknown schema as configuration_error
// The endpoint constructors may themselves throw configuration_error.
// NOTE(review): the last parameter (used as `cct`), the `exists` declaration and the
// condition guarding the default AMQP version are not visible in this listing.
713 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
714 const std::string
& topic
,
715 const RGWHTTPArgs
& args
,
717 const auto& schema
= get_schema(endpoint
);
718 if (schema
== WEBHOOK_SCHEMA
) {
719 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
720 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
721 } else if (schema
== AMQP_SCHEMA
) {
723 std::string version
= args
.get("amqp-version", &exists
);
// presumably the default applied when "amqp-version" is absent - confirm
725 version
= AMQP_0_9_1
;
727 if (version
== AMQP_0_9_1
) {
728 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
729 } else if (version
== AMQP_1_0
) {
730 throw configuration_error("AMQP: v1.0 not supported");
733 throw configuration_error("AMQP: unknown version: " + version
);
// NOTE(review): the visible branches of get_schema() do not recognise "amqps", so such a
// URI would yield UNKNOWN_SCHEMA and this comparison would likely never match - confirm.
736 } else if (schema
== "amqps") {
737 throw configuration_error("AMQP: ssl not supported");
740 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
741 } else if (schema
== KAFKA_SCHEMA
) {
742 return Ptr(new RGWPubSubKafkaEndpoint(endpoint
, topic
, args
, cct
));
746 throw configuration_error("unknown schema in: " + endpoint
);