1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "rgw_pubsub_push.h"
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
18 #include <boost/asio/yield.hpp>
19 #include <boost/algorithm/string.hpp>
21 #include "rgw_perf_counters.h"
// Formats a single pubsub event/record as JSON.
// EventType must expose a static `json_type_plural` name and be encodable
// through encode_json().
// NOTE(review): the statements that flush the formatter and return the string
// are not visible in this view.
25 template<typename EventType
>
26 std::string
json_format_pubsub_event(const EventType
& event
) {
// NOTE(review): the `false` argument presumably disables pretty-printing - confirm against JSONFormatter
28 JSONFormatter
f(false);
// outer section: an object named EventType::json_type_plural
30 Formatter::ObjectSection
s(f
, EventType::json_type_plural
);
// inner section: an array with the same name, holding the event itself
32 Formatter::ArraySection
s(f
, EventType::json_type_plural
);
// the event is encoded with an empty field name
33 encode_json("", event
, &f
);
// Pubsub push endpoint that delivers notifications with an HTTP "POST" request.
40 class RGWPubSubHTTPEndpoint
: public RGWPubSubEndpoint
{
// target URI, as handed to the constructor
42 const std::string endpoint
;
// ack level exactly as read from the "http-ack-level" argument; reported by to_str()
43 std::string str_ack_level
;
44 typedef unsigned ack_level_t
;
// parsed ack level: one of the ACK_LEVEL_* values below, or a numeric
// HTTP status in the range [100, 600) as validated by the constructor
45 ack_level_t ack_level
; // TODO: not used for now
// the symbolic levels are below 100, so they cannot collide with a
// numeric status accepted by the constructor
47 static const ack_level_t ACK_LEVEL_ANY
= 0;
48 static const ack_level_t ACK_LEVEL_NON_ERROR
= 1;
50 // PostCR implements async execution of RGWPostHTTPData via coroutine
// It is both the HTTP request (RGWPostHTTPData) and the coroutine
// (RGWSimpleCoroutine) that drives it.
51 class PostCR
: public RGWPostHTTPData
, public RGWSimpleCoroutine
{
// sync environment; supplies the http_manager used by send_request()
53 RGWDataSyncEnv
* const sync_env
;
// ack level requested by the endpoint configuration
55 const ack_level_t ack_level
;
// builds a "POST" request towards `endpoint` carrying `_post_data`
// NOTE(review): the declarations of read_bl and verify_ssl are not visible in this view
58 PostCR(const std::string
& _post_data
,
59 RGWDataSyncEnv
* _sync_env
,
60 const std::string
& endpoint
,
61 ack_level_t _ack_level
,
63 RGWPostHTTPData(_sync_env
->cct
, "POST", endpoint
, &read_bl
, verify_ssl
),
64 RGWSimpleCoroutine(_sync_env
->cct
),
66 ack_level (_ack_level
) {
67 // the ctor also sets the data to send
68 set_post_data(_post_data
);
69 set_send_length(_post_data
.length());
72 // send message to endpoint
// the request is queued on the sync environment's HTTP manager
// NOTE(review): the handling of `rc` and the return value are not visible in this view
73 int send_request() override
{
75 const auto rc
= sync_env
->http_manager
->add_request(this);
// one more push is now pending (counter is optional)
79 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
// invoked when the request completed: the pending counter is decremented,
// then the result is meant to be evaluated according to ack_level
84 int request_complete() override
{
85 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
86 if (ack_level
== ACK_LEVEL_ANY
) {
88 } else if (ack_level
== ACK_LEVEL_NON_ERROR
) {
89 // TODO check result code to be non-error
91 // TODO: check that result code == ack_level
// Builds the endpoint from its URI and the request arguments.
//   "http-ack-level": "any" (also the default when absent), "non-error",
//                     or a numeric HTTP status in [100, 600)
//   "verify-ssl":     "true" (also the default when absent) or "false",
//                     compared case-insensitively
// Throws configuration_error on any other value.
98 RGWPubSubHTTPEndpoint(const std::string
& _endpoint
,
99 const RGWHTTPArgs
& args
) : endpoint(_endpoint
) {
102 str_ack_level
= args
.get("http-ack-level", &exists
);
103 if (!exists
|| str_ack_level
== "any") {
105 ack_level
= ACK_LEVEL_ANY
;
106 } else if (str_ack_level
== "non-error") {
107 ack_level
= ACK_LEVEL_NON_ERROR
;
// anything else is treated as a numeric HTTP status
// NOTE(review): std::atoi yields 0 for non-numeric text (rejected by the range
// check below) but silently accepts trailing garbage such as "200abc"
109 ack_level
= std::atoi(str_ack_level
.c_str());
110 if (ack_level
< 100 || ack_level
>= 600) {
111 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level
);
115 auto str_verify_ssl
= args
.get("verify-ssl", &exists
);
// lower-cased so that e.g. "True"/"FALSE" are accepted as well
116 boost::algorithm::to_lower(str_verify_ssl
);
117 // verify server certificate by default
118 if (!exists
|| str_verify_ssl
== "true") {
120 } else if (str_verify_ssl
== "false") {
123 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl
);
// Creates a PostCR coroutine that POSTs the JSON form of `event` to this endpoint.
// The coroutine is allocated with `new` and returned as a raw pointer.
// NOTE(review): who releases it is not visible in this view.
127 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
128 return new PostCR(json_format_pubsub_event(event
), env
, endpoint
, ack_level
, verify_ssl
);
// Same as the rgw_pubsub_event overload, for an S3-style record: creates a
// PostCR coroutine that POSTs the JSON form of `record` to this endpoint.
// NOTE(review): who releases the returned raw pointer is not visible in this view.
131 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
132 return new PostCR(json_format_pubsub_event(record
), env
, endpoint
, ack_level
, verify_ssl
);
// Non-coroutine variant: POSTs the JSON form of `record` through
// RGWHTTP::process(), passing on the caller's optional_yield.
// The pending-push counter is incremented before and decremented after the call.
// NOTE(review): the declaration of read_bl and the return statement are not
// visible in this view.
135 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
137 RGWPostHTTPData
request(cct
, "POST", endpoint
, &read_bl
, verify_ssl
);
138 const auto post_data
= json_format_pubsub_event(record
);
139 request
.set_post_data(post_data
);
140 request
.set_send_length(post_data
.length());
141 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
142 const auto rc
= RGWHTTP::process(&request
, y
);
143 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
144 // TODO: use read_bl to process return code and handle according to ack level
// Builds a multi-line, human-readable description of the endpoint:
// a title line followed by the URI, the textual ack level and the SSL
// verification setting.
// NOTE(review): the return statement is not visible in this view.
148 std::string
to_str() const override
{
149 std::string
str("HTTP/S Endpoint");
150 str
+= "\nURI: " + endpoint
;
// the ack level is shown as it was configured, not the parsed number
151 str
+= "\nAck Level: " + str_ack_level
;
152 str
+= (verify_ssl
? "\nverify SSL" : "\ndon't verify SSL");
158 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
// Pubsub push endpoint that publishes notifications to an AMQP broker.
// NOTE(review): the ack_level_t type and the ACK_LEVEL_* constants used by
// this class are declared on lines not visible in this view.
159 class RGWPubSubAMQPEndpoint
: public RGWPubSubEndpoint
{
166 CephContext
* const cct
;
// broker URI, as handed to the constructor
167 const std::string endpoint
;
// topic that messages are published to
168 const std::string topic
;
// exchange name, taken from the "amqp-exchange" argument
169 const std::string exchange
;
// connection obtained from amqp::connect(endpoint, exchange); it is declared
// after both of them, which the constructor's initializer list relies on
170 amqp::connection_ptr_t conn
;
// parsed ack level (set by the constructor from "amqp-ack-level")
171 ack_level_t ack_level
;
// ack level exactly as configured; reported by to_str()
172 std::string str_ack_level
;
// Reads the mandatory "amqp-exchange" argument.
// Throws configuration_error("AMQP: missing amqp-exchange") when it is absent.
// NOTE(review): the condition guarding the throw and the return statement are
// not visible in this view.
174 static std::string
get_exchange(const RGWHTTPArgs
& args
) {
176 const auto exchange
= args
.get("amqp-exchange", &exists
);
178 throw configuration_error("AMQP: missing amqp-exchange");
183 // NoAckPublishCR implements async amqp publishing via coroutine
184 // This coroutine ends when it sends the message and does not wait for an ack
185 class NoAckPublishCR
: public RGWCoroutine
{
// topic and message are held by value (copied in the constructor)
187 const std::string topic
;
188 amqp::connection_ptr_t conn
;
189 const std::string message
;
// NOTE(review): the base-class initializer that consumes `cct` is not visible in this view
192 NoAckPublishCR(CephContext
* cct
,
193 const std::string
& _topic
,
194 amqp::connection_ptr_t
& _conn
,
195 const std::string
& _message
) :
197 topic(_topic
), conn(_conn
), message(_message
) {}
199 // send message to endpoint, without waiting for reply
// the result of amqp::publish() decides between the error and the done state
// NOTE(review): the condition on `rc` is not visible in this view
200 int operate() override
{
202 const auto rc
= amqp::publish(conn
, topic
, message
);
204 return set_cr_error(rc
);
206 return set_cr_done();
212 // AckPublishCR implements async amqp publishing via coroutine
213 // This coroutine ends when an ack is received from the broker
214 // note that it does not wait for an ack from the end client
215 class AckPublishCR
: public RGWCoroutine
, public RGWIOProvider
{
// topic and message are held by value (copied in the constructor)
217 const std::string topic
;
218 amqp::connection_ptr_t conn
;
219 const std::string message
;
220 const ack_level_t ack_level
; // TODO not used for now
// NOTE(review): the base-class initializer that consumes `cct` is not visible in this view
223 AckPublishCR(CephContext
* cct
,
224 const std::string
& _topic
,
225 amqp::connection_ptr_t
& _conn
,
226 const std::string
& _message
,
227 ack_level_t _ack_level
) :
229 topic(_topic
), conn(_conn
), message(_message
), ack_level(_ack_level
) {}
231 // send message to endpoint, waiting for reply
// NOTE(review): the coroutine re-entry logic and the checks on `rc` are not
// visible in this view
232 int operate() override
{
236 const auto rc
= amqp::publish_with_confirm(conn
,
// `this` is bound into the confirmation callback, so the coroutine has to
// outlive the pending confirmation - NOTE(review): confirm who guarantees that
239 std::bind(&AckPublishCR::request_complete
, this, std::placeholders::_1
));
241 // failed to publish, does not wait for reply
242 return set_cr_error(rc
);
244 // mark as blocked on the amqp answer
245 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_pending
);
249 return set_cr_done();
254 // callback invoked from the amqp manager thread when ack/nack is received
// the coroutine must not have finished yet (asserted below); the pending
// counter incremented in operate() is decremented here
255 void request_complete(int status
) {
256 ceph_assert(!is_done());
258 // server replied with a nack
259 set_cr_error(status
);
262 if (perfcounter
) perfcounter
->dec(l_rgw_pubsub_push_pending
);
265 // TODO: why are these mandatory in RGWIOProvider?
// NOTE(review): the bodies of the two overrides below are not visible in this view
266 void set_io_user_info(void *_user_info
) override
{
269 void *get_io_user_info() override
{
// Builds the endpoint: resolves the exchange, opens the broker connection and
// parses "amqp-ack-level" ("broker" - also the default when absent -, "none"
// or "routable").
// Throws configuration_error when the connection cannot be created or the
// ack level is not one of the accepted values.
// NOTE(review): the remaining parameter(s) and the first member initializers
// are not visible in this view.
275 RGWPubSubAMQPEndpoint(const std::string
& _endpoint
,
276 const std::string
& _topic
,
277 const RGWHTTPArgs
& args
,
// get_exchange() throws when "amqp-exchange" is missing
282 exchange(get_exchange(args
)),
// uses the `endpoint` and `exchange` members, which are declared (and thus
// initialised) before `conn`
283 conn(amqp::connect(endpoint
, exchange
)) {
// NOTE(review): the condition guarding this throw is not visible in this view
285 throw configuration_error("AMQP: failed to create connection to: " + endpoint
);
289 str_ack_level
= args
.get("amqp-ack-level", &exists
);
290 if (!exists
|| str_ack_level
== "broker") {
291 // "broker" is default
292 ack_level
= ACK_LEVEL_BROKER
;
293 } else if (str_ack_level
== "none") {
294 ack_level
= ACK_LEVEL_NONE
;
295 } else if (str_ack_level
== "routable") {
296 ack_level
= ACK_LEVEL_ROUTEABLE
;
298 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level
);
// Creates the coroutine that publishes the JSON form of `event`:
// NoAckPublishCR when the ack level is ACK_LEVEL_NONE, AckPublishCR otherwise.
// `env` is not used in the visible lines.
// NOTE(review): who releases the returned raw pointer is not visible in this view.
302 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_event
& event
, RGWDataSyncEnv
* env
) override
{
304 if (ack_level
== ACK_LEVEL_NONE
) {
305 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
));
307 // TODO: currently broker and routable are the same - this will require different flags
308 // but the same mechanism
309 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(event
), ack_level
);
// Same as the rgw_pubsub_event overload, for an S3-style record:
// NoAckPublishCR when the ack level is ACK_LEVEL_NONE, AckPublishCR otherwise.
// `env` is not used in the visible lines.
// NOTE(review): who releases the returned raw pointer is not visible in this view.
313 RGWCoroutine
* send_to_completion_async(const rgw_pubsub_s3_record
& record
, RGWDataSyncEnv
* env
) override
{
315 if (ack_level
== ACK_LEVEL_NONE
) {
316 return new NoAckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
));
318 // TODO: currently broker and routable are the same - this will require different flags
319 // but the same mechanism
320 return new AckPublishCR(cct
, topic
, conn
, json_format_pubsub_event(record
), ack_level
);
324 // this allows waiting until "finish()" is called from a different thread
325 // waiting could be blocking the waiting thread or yielding, depending
326 // on compilation flag support and whether the optional_yield is set
// NOTE(review): the enclosing type's header (referred to as `Waiter` by the
// caller) and the signature of finish() are not visible in this view.
// completion handler signature: a single error_code
328 using Signature
= void(boost::system::error_code
);
329 using Completion
= ceph::async::Completion
<Signature
>;
// set by async_wait(); consumed (moved from) when the completion is posted
330 std::unique_ptr
<Completion
> completion
= nullptr;
// predicate of the blocking wait below
333 mutable std::atomic
<bool> done
= false;
334 mutable std::mutex lock
;
335 mutable std::condition_variable cond
;
// yielding wait: stores a completion bound to ctx's executor and returns the
// result of the asio async_completion initiated for `token`
337 template <typename ExecutionContext
, typename CompletionToken
>
338 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
339 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
340 auto& handler
= init
.completion_handler
;
// `completion` is assigned under the lock
342 std::unique_lock l
{lock
};
343 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
345 return init
.result
.get();
// waits for completion: via async_wait() on the yield context when built with
// HAVE_BOOST_CONTEXT, or by blocking on the condition variable
// NOTE(review): the conditions selecting between the two paths are not visible in this view
349 int wait(optional_yield y
) {
353 #ifdef HAVE_BOOST_CONTEXT
355 auto& io_ctx
= y
.get_io_context();
356 auto& yield_ctx
= y
.get_yield_context();
357 boost::system::error_code ec
;
358 async_wait(io_ctx
, yield_ctx
[ec
]);
// blocking path: sleep until `done` becomes true
362 std::unique_lock
l(lock
);
363 cond
.wait(l
, [this]{return (done
==true);});
// NOTE(review): the lines below presumably belong to finish(); its header is not visible.
// A pending completion is posted with `-ret` mapped into the system error category.
368 std::unique_lock l
{lock
};
372 boost::system::error_code
ec(-ret
, boost::system::system_category());
373 Completion::post(std::move(completion
), ec
);
// Non-coroutine variant: publishes the JSON form of `record`.
// With ACK_LEVEL_NONE the result of amqp::publish() is returned directly;
// otherwise the message is published with a confirmation callback bound to a
// heap-allocated Waiter.
// NOTE(review): the handling of `rc`, the wait on the Waiter and the final
// return are not visible in this view.
380 int send_to_completion_async(CephContext
* cct
, const rgw_pubsub_s3_record
& record
, optional_yield y
) override
{
382 if (ack_level
== ACK_LEVEL_NONE
) {
383 return amqp::publish(conn
, topic
, json_format_pubsub_event(record
));
385 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
386 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
387 auto w
= std::unique_ptr
<Waiter
>(new Waiter
);
388 const auto rc
= amqp::publish_with_confirm(conn
,
390 json_format_pubsub_event(record
),
// the callback holds a raw pointer to the Waiter owned by `w` -
// NOTE(review): confirm the callback cannot fire after `w` is destroyed
391 std::bind(&Waiter::finish
, w
.get(), std::placeholders::_1
));
393 // failed to publish, does not wait for reply
// Builds a multi-line, human-readable description of the endpoint:
// a title line followed by the URI, topic, exchange and textual ack level.
// NOTE(review): the return statement is not visible in this view.
400 std::string
to_str() const override
{
401 std::string
str("AMQP(0.9.1) Endpoint");
402 str
+= "\nURI: " + endpoint
;
403 str
+= "\nTopic: " + topic
;
404 str
+= "\nExchange: " + exchange
;
// the ack level is shown as it was configured
405 str
+= "\nAck Level: " + str_ack_level
;
// values accepted for the "amqp-version" argument (compared in RGWPubSubEndpoint::create)
410 static const std::string
AMQP_0_9_1("0-9-1");
411 static const std::string
AMQP_1_0("1-0");
// schema name compared against get_schema()'s result in RGWPubSubEndpoint::create
412 static const std::string
AMQP_SCHEMA("amqp");
413 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
// schema names returned (by const reference) from get_schema()
// "webhook" covers both the "http" and "https" URI schemes
415 static const std::string
WEBHOOK_SCHEMA("webhook");
// returned when the URI has no ':' or an unrecognised scheme
416 static const std::string
UNKNOWN_SCHEMA("unknown");
// NOTE(review): presumably returned for an empty endpoint - the return is not visible in this view
417 static const std::string
NO_SCHEMA("");
// Maps an endpoint URI to one of the file-level schema name constants.
// The part of the URI before the first ':' is inspected: "http" and "https"
// map to WEBHOOK_SCHEMA; a URI without ':' or with an unrecognised scheme maps
// to UNKNOWN_SCHEMA.
// The result is a reference to a static string, so it stays valid after return.
// NOTE(review): the values returned for an empty endpoint and for "amqp" are
// not visible in this view.
419 const std::string
& get_schema(const std::string
& endpoint
) {
420 if (endpoint
.empty()) {
423 const auto pos
= endpoint
.find(':');
424 if (pos
== std::string::npos
) {
425 return UNKNOWN_SCHEMA
;
// binds a const reference to the temporary returned by substr(); the
// temporary's lifetime is extended to that of the reference
427 const auto& schema
= endpoint
.substr(0,pos
);
428 if (schema
== "http" || schema
== "https") {
429 return WEBHOOK_SCHEMA
;
430 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
431 } else if (schema
== "amqp") {
435 return UNKNOWN_SCHEMA
;
// Factory: creates the endpoint implementation matching the URI's schema.
//   webhook (http/https) -> RGWPubSubHTTPEndpoint
//   amqp                 -> RGWPubSubAMQPEndpoint (only when built with
//                           WITH_RADOSGW_AMQP_ENDPOINT and "amqp-version" is "0-9-1")
// Throws configuration_error for unsupported AMQP versions and for any other schema.
// NOTE(review): the last parameter (`cct`, used below) and the declaration of
// `exists` are not visible in this view.
438 RGWPubSubEndpoint::Ptr
RGWPubSubEndpoint::create(const std::string
& endpoint
,
439 const std::string
& topic
,
440 const RGWHTTPArgs
& args
,
442 const auto& schema
= get_schema(endpoint
);
443 if (schema
== WEBHOOK_SCHEMA
) {
444 return Ptr(new RGWPubSubHTTPEndpoint(endpoint
, args
));
445 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
446 } else if (schema
== AMQP_SCHEMA
) {
448 std::string version
= args
.get("amqp-version", &exists
);
// NOTE(review): presumably the default when "amqp-version" is absent - the guarding condition is not visible
450 version
= AMQP_0_9_1
;
452 if (version
== AMQP_0_9_1
) {
453 return Ptr(new RGWPubSubAMQPEndpoint(endpoint
, topic
, args
, cct
));
454 } else if (version
== AMQP_1_0
) {
455 throw configuration_error("AMQP: v1.0 not supported");
458 throw configuration_error("AMQP: unknown version: " + version
);
// NOTE(review): get_schema(), as far as visible, only returns the predefined
// schema constants and never "amqps", so this branch looks unreachable - confirm
461 } else if (schema
== "amqps") {
462 throw configuration_error("AMQP: ssl not supported");
467 throw configuration_error("unknown schema in: " + endpoint
);