]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | #include "rgw_pubsub_push.h" | |
5 | #include <string> | |
6 | #include <sstream> | |
7 | #include <algorithm> | |
8 | #include "include/buffer_fwd.h" | |
9 | #include "common/Formatter.h" | |
20effc67 | 10 | #include "common/iso_8601.h" |
eafe8130 | 11 | #include "common/async/completion.h" |
11fdf7f2 TL |
12 | #include "rgw_common.h" |
13 | #include "rgw_data_sync.h" | |
14 | #include "rgw_pubsub.h" | |
15 | #include "acconfig.h" | |
16 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
17 | #include "rgw_amqp.h" | |
18 | #endif | |
9f95a23c TL |
19 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
20 | #include "rgw_kafka.h" | |
21 | #endif | |
11fdf7f2 TL |
22 | #include <boost/asio/yield.hpp> |
23 | #include <boost/algorithm/string.hpp> | |
24 | #include <functional> | |
25 | #include "rgw_perf_counters.h" | |
26 | ||
27 | using namespace rgw; | |
28 | ||
eafe8130 TL |
29 | template<typename EventType> |
30 | std::string json_format_pubsub_event(const EventType& event) { | |
11fdf7f2 TL |
31 | std::stringstream ss; |
32 | JSONFormatter f(false); | |
92f5a8d4 TL |
33 | { |
34 | Formatter::ObjectSection s(f, EventType::json_type_plural); | |
35 | { | |
36 | Formatter::ArraySection s(f, EventType::json_type_plural); | |
37 | encode_json("", event, &f); | |
38 | } | |
39 | } | |
11fdf7f2 TL |
40 | f.flush(ss); |
41 | return ss.str(); | |
42 | } | |
20effc67 TL |
43 | |
44 | bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) { | |
45 | bool value; | |
46 | bool exists; | |
47 | if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) { | |
48 | throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name); | |
49 | } | |
50 | if (!exists) { | |
51 | return default_value; | |
52 | } | |
53 | return value; | |
54 | } | |
11fdf7f2 TL |
55 | |
56 | class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint { | |
57 | private: | |
58 | const std::string endpoint; | |
11fdf7f2 TL |
59 | typedef unsigned ack_level_t; |
60 | ack_level_t ack_level; // TODO: not used for now | |
20effc67 TL |
61 | const bool verify_ssl; |
62 | const bool cloudevents; | |
11fdf7f2 TL |
63 | static const ack_level_t ACK_LEVEL_ANY = 0; |
64 | static const ack_level_t ACK_LEVEL_NON_ERROR = 1; | |
65 | ||
11fdf7f2 | 66 | public: |
20effc67 TL |
67 | RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : |
68 | endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) | |
69 | { | |
eafe8130 | 70 | bool exists; |
20effc67 | 71 | const auto& str_ack_level = args.get("http-ack-level", &exists); |
eafe8130 TL |
72 | if (!exists || str_ack_level == "any") { |
73 | // "any" is default | |
74 | ack_level = ACK_LEVEL_ANY; | |
75 | } else if (str_ack_level == "non-error") { | |
76 | ack_level = ACK_LEVEL_NON_ERROR; | |
77 | } else { | |
78 | ack_level = std::atoi(str_ack_level.c_str()); | |
79 | if (ack_level < 100 || ack_level >= 600) { | |
80 | throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level); | |
11fdf7f2 TL |
81 | } |
82 | } | |
eafe8130 TL |
83 | } |
84 | ||
f67539c2 | 85 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
eafe8130 TL |
86 | bufferlist read_bl; |
87 | RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); | |
f67539c2 | 88 | const auto post_data = json_format_pubsub_event(event); |
20effc67 TL |
89 | if (cloudevents) { |
90 | // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md | |
91 | // using "Binary Content Mode" | |
92 | request.append_header("ce-specversion", "1.0"); | |
93 | request.append_header("ce-type", "com.amazonaws." + event.eventName); | |
94 | request.append_header("ce-time", to_iso_8601(event.eventTime)); | |
95 | // default output of iso8601 is also RFC3339 compatible | |
96 | request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2); | |
97 | request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name); | |
98 | request.append_header("ce-subject", event.object_key); | |
99 | } | |
eafe8130 TL |
100 | request.set_post_data(post_data); |
101 | request.set_send_length(post_data.length()); | |
522d829b | 102 | request.append_header("Content-Type", "application/json"); |
eafe8130 TL |
103 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); |
104 | const auto rc = RGWHTTP::process(&request, y); | |
105 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
106 | // TODO: use read_bl to process return code and handle according to ack level | |
107 | return rc; | |
108 | } | |
109 | ||
11fdf7f2 | 110 | std::string to_str() const override { |
eafe8130 | 111 | std::string str("HTTP/S Endpoint"); |
11fdf7f2 | 112 | str += "\nURI: " + endpoint; |
11fdf7f2 TL |
113 | str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL"); |
114 | return str; | |
11fdf7f2 TL |
115 | } |
116 | }; | |
117 | ||
118 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
119 | class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { | |
eafe8130 | 120 | private: |
9f95a23c TL |
121 | enum class ack_level_t { |
122 | None, | |
123 | Broker, | |
124 | Routable | |
eafe8130 TL |
125 | }; |
126 | CephContext* const cct; | |
127 | const std::string endpoint; | |
128 | const std::string topic; | |
129 | const std::string exchange; | |
eafe8130 | 130 | ack_level_t ack_level; |
e306af50 | 131 | amqp::connection_ptr_t conn; |
eafe8130 | 132 | |
f67539c2 TL |
133 | bool get_verify_ssl(const RGWHTTPArgs& args) { |
134 | bool exists; | |
135 | auto str_verify_ssl = args.get("verify-ssl", &exists); | |
136 | if (!exists) { | |
137 | // verify server certificate by default | |
138 | return true; | |
139 | } | |
140 | boost::algorithm::to_lower(str_verify_ssl); | |
141 | if (str_verify_ssl == "true") { | |
142 | return true; | |
143 | } | |
144 | if (str_verify_ssl == "false") { | |
145 | return false; | |
146 | } | |
147 | throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); | |
148 | } | |
149 | ||
e306af50 | 150 | std::string get_exchange(const RGWHTTPArgs& args) { |
eafe8130 TL |
151 | bool exists; |
152 | const auto exchange = args.get("amqp-exchange", &exists); | |
153 | if (!exists) { | |
154 | throw configuration_error("AMQP: missing amqp-exchange"); | |
11fdf7f2 | 155 | } |
eafe8130 TL |
156 | return exchange; |
157 | } | |
11fdf7f2 | 158 | |
e306af50 TL |
159 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
160 | bool exists; | |
161 | const auto& str_ack_level = args.get("amqp-ack-level", &exists); | |
162 | if (!exists || str_ack_level == "broker") { | |
163 | // "broker" is default | |
164 | return ack_level_t::Broker; | |
165 | } | |
166 | if (str_ack_level == "none") { | |
167 | return ack_level_t::None; | |
168 | } | |
169 | if (str_ack_level == "routable") { | |
170 | return ack_level_t::Routable; | |
171 | } | |
172 | throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); | |
173 | } | |
e306af50 | 174 | |
eafe8130 TL |
175 | public: |
176 | RGWPubSubAMQPEndpoint(const std::string& _endpoint, | |
177 | const std::string& _topic, | |
178 | const RGWHTTPArgs& args, | |
179 | CephContext* _cct) : | |
180 | cct(_cct), | |
181 | endpoint(_endpoint), | |
182 | topic(_topic), | |
183 | exchange(get_exchange(args)), | |
e306af50 | 184 | ack_level(get_ack_level(args)), |
f67539c2 | 185 | conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) { |
eafe8130 TL |
186 | if (!conn) { |
187 | throw configuration_error("AMQP: failed to create connection to: " + endpoint); | |
188 | } | |
eafe8130 TL |
189 | } |
190 | ||
eafe8130 TL |
191 | // this allows waiting untill "finish()" is called from a different thread |
192 | // waiting could be blocking the waiting thread or yielding, depending | |
193 | // with compilation flag support and whether the optional_yield is set | |
194 | class Waiter { | |
195 | using Signature = void(boost::system::error_code); | |
196 | using Completion = ceph::async::Completion<Signature>; | |
197 | std::unique_ptr<Completion> completion = nullptr; | |
198 | int ret; | |
199 | ||
200 | mutable std::atomic<bool> done = false; | |
201 | mutable std::mutex lock; | |
202 | mutable std::condition_variable cond; | |
203 | ||
204 | template <typename ExecutionContext, typename CompletionToken> | |
205 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
206 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
207 | auto& handler = init.completion_handler; | |
208 | { | |
209 | std::unique_lock l{lock}; | |
210 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
211 | } | |
212 | return init.result.get(); | |
213 | } | |
214 | ||
11fdf7f2 | 215 | public: |
eafe8130 TL |
216 | int wait(optional_yield y) { |
217 | if (done) { | |
218 | return ret; | |
219 | } | |
eafe8130 | 220 | if (y) { |
f67539c2 | 221 | auto& io_ctx = y.get_io_context(); |
eafe8130 TL |
222 | auto& yield_ctx = y.get_yield_context(); |
223 | boost::system::error_code ec; | |
224 | async_wait(io_ctx, yield_ctx[ec]); | |
225 | return -ec.value(); | |
11fdf7f2 | 226 | } |
eafe8130 TL |
227 | std::unique_lock l(lock); |
228 | cond.wait(l, [this]{return (done==true);}); | |
229 | return ret; | |
11fdf7f2 TL |
230 | } |
231 | ||
eafe8130 TL |
232 | void finish(int r) { |
233 | std::unique_lock l{lock}; | |
234 | ret = r; | |
235 | done = true; | |
236 | if (completion) { | |
237 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
238 | Completion::post(std::move(completion), ec); | |
11fdf7f2 | 239 | } else { |
eafe8130 | 240 | cond.notify_all(); |
11fdf7f2 TL |
241 | } |
242 | } | |
eafe8130 | 243 | }; |
11fdf7f2 | 244 | |
f67539c2 | 245 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
eafe8130 | 246 | ceph_assert(conn); |
9f95a23c | 247 | if (ack_level == ack_level_t::None) { |
f67539c2 | 248 | return amqp::publish(conn, topic, json_format_pubsub_event(event)); |
eafe8130 TL |
249 | } else { |
250 | // TODO: currently broker and routable are the same - this will require different flags but the same mechanism | |
251 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
252 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
253 | const auto rc = amqp::publish_with_confirm(conn, | |
254 | topic, | |
f67539c2 | 255 | json_format_pubsub_event(event), |
eafe8130 TL |
256 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); |
257 | if (rc < 0) { | |
258 | // failed to publish, does not wait for reply | |
259 | return rc; | |
260 | } | |
261 | return w->wait(y); | |
11fdf7f2 | 262 | } |
eafe8130 TL |
263 | } |
264 | ||
265 | std::string to_str() const override { | |
266 | std::string str("AMQP(0.9.1) Endpoint"); | |
267 | str += "\nURI: " + endpoint; | |
268 | str += "\nTopic: " + topic; | |
269 | str += "\nExchange: " + exchange; | |
eafe8130 TL |
270 | return str; |
271 | } | |
11fdf7f2 TL |
272 | }; |
273 | ||
// values accepted for the "amqp-version" argument
static const std::string AMQP_0_9_1{"0-9-1"};
static const std::string AMQP_1_0{"1-0"};
// schema name used for AMQP endpoints
static const std::string AMQP_SCHEMA{"amqp"};
11fdf7f2 TL |
277 | #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT |
278 | ||
9f95a23c TL |
279 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
280 | class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { | |
281 | private: | |
282 | enum class ack_level_t { | |
283 | None, | |
284 | Broker, | |
285 | }; | |
286 | CephContext* const cct; | |
287 | const std::string topic; | |
9f95a23c | 288 | const ack_level_t ack_level; |
1e59de90 | 289 | std::string conn_name; |
9f95a23c | 290 | |
9f95a23c | 291 | |
e306af50 | 292 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
9f95a23c | 293 | bool exists; |
20effc67 | 294 | const auto& str_ack_level = args.get("kafka-ack-level", &exists); |
9f95a23c TL |
295 | if (!exists || str_ack_level == "broker") { |
296 | // "broker" is default | |
297 | return ack_level_t::Broker; | |
298 | } | |
299 | if (str_ack_level == "none") { | |
300 | return ack_level_t::None; | |
301 | } | |
302 | throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); | |
303 | } | |
304 | ||
9f95a23c TL |
305 | public: |
306 | RGWPubSubKafkaEndpoint(const std::string& _endpoint, | |
307 | const std::string& _topic, | |
308 | const RGWHTTPArgs& args, | |
309 | CephContext* _cct) : | |
310 | cct(_cct), | |
311 | topic(_topic), | |
9f95a23c | 312 | ack_level(get_ack_level(args)) { |
1e59de90 TL |
313 | if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), |
314 | args.get_optional("ca-location"), args.get_optional("mechanism"))) { | |
9f95a23c TL |
315 | throw configuration_error("Kafka: failed to create connection to: " + _endpoint); |
316 | } | |
317 | } | |
318 | ||
9f95a23c TL |
319 | // this allows waiting untill "finish()" is called from a different thread |
320 | // waiting could be blocking the waiting thread or yielding, depending | |
321 | // with compilation flag support and whether the optional_yield is set | |
322 | class Waiter { | |
323 | using Signature = void(boost::system::error_code); | |
324 | using Completion = ceph::async::Completion<Signature>; | |
325 | std::unique_ptr<Completion> completion = nullptr; | |
326 | int ret; | |
327 | ||
328 | mutable std::atomic<bool> done = false; | |
329 | mutable std::mutex lock; | |
330 | mutable std::condition_variable cond; | |
331 | ||
332 | template <typename ExecutionContext, typename CompletionToken> | |
333 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
334 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
335 | auto& handler = init.completion_handler; | |
336 | { | |
337 | std::unique_lock l{lock}; | |
338 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
339 | } | |
340 | return init.result.get(); | |
341 | } | |
342 | ||
343 | public: | |
344 | int wait(optional_yield y) { | |
345 | if (done) { | |
346 | return ret; | |
347 | } | |
9f95a23c TL |
348 | if (y) { |
349 | auto& io_ctx = y.get_io_context(); | |
350 | auto& yield_ctx = y.get_yield_context(); | |
351 | boost::system::error_code ec; | |
352 | async_wait(io_ctx, yield_ctx[ec]); | |
353 | return -ec.value(); | |
354 | } | |
9f95a23c TL |
355 | std::unique_lock l(lock); |
356 | cond.wait(l, [this]{return (done==true);}); | |
357 | return ret; | |
358 | } | |
359 | ||
360 | void finish(int r) { | |
361 | std::unique_lock l{lock}; | |
362 | ret = r; | |
363 | done = true; | |
364 | if (completion) { | |
365 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
366 | Completion::post(std::move(completion), ec); | |
367 | } else { | |
368 | cond.notify_all(); | |
369 | } | |
370 | } | |
371 | }; | |
372 | ||
f67539c2 | 373 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
9f95a23c | 374 | if (ack_level == ack_level_t::None) { |
1e59de90 | 375 | return kafka::publish(conn_name, topic, json_format_pubsub_event(event)); |
9f95a23c TL |
376 | } else { |
377 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
378 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
1e59de90 | 379 | const auto rc = kafka::publish_with_confirm(conn_name, |
9f95a23c | 380 | topic, |
f67539c2 | 381 | json_format_pubsub_event(event), |
9f95a23c TL |
382 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); |
383 | if (rc < 0) { | |
384 | // failed to publish, does not wait for reply | |
385 | return rc; | |
386 | } | |
387 | return w->wait(y); | |
388 | } | |
389 | } | |
390 | ||
391 | std::string to_str() const override { | |
392 | std::string str("Kafka Endpoint"); | |
1e59de90 | 393 | str += "\nBroker: " + conn_name; |
9f95a23c TL |
394 | str += "\nTopic: " + topic; |
395 | return str; | |
396 | } | |
397 | }; | |
398 | ||
// schema name used for Kafka endpoints
static const std::string KAFKA_SCHEMA{"kafka"};
400 | #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
401 | ||
eafe8130 TL |
// schema names that are available regardless of the compiled-in endpoints
static const std::string WEBHOOK_SCHEMA{"webhook"};
static const std::string UNKNOWN_SCHEMA{"unknown"};
static const std::string NO_SCHEMA{""};

// Map an endpoint URI to the name of its schema, based on the text that
// precedes the first ':' in the URI:
//  - empty endpoint: NO_SCHEMA
//  - "http" or "https": WEBHOOK_SCHEMA
//  - "amqp" or "amqps": AMQP_SCHEMA (only if compiled with AMQP support)
//  - "kafka": KAFKA_SCHEMA (only if compiled with Kafka support)
//  - anything else, or no ':' at all: UNKNOWN_SCHEMA
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix = endpoint.substr(0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp" || prefix == "amqps") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
428 | ||
429 | RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, | |
430 | const std::string& topic, | |
431 | const RGWHTTPArgs& args, | |
432 | CephContext* cct) { | |
433 | const auto& schema = get_schema(endpoint); | |
434 | if (schema == WEBHOOK_SCHEMA) { | |
435 | return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args)); | |
436 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
437 | } else if (schema == AMQP_SCHEMA) { | |
11fdf7f2 TL |
438 | bool exists; |
439 | std::string version = args.get("amqp-version", &exists); | |
440 | if (!exists) { | |
441 | version = AMQP_0_9_1; | |
442 | } | |
443 | if (version == AMQP_0_9_1) { | |
eafe8130 | 444 | return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); |
11fdf7f2 | 445 | } else if (version == AMQP_1_0) { |
eafe8130 | 446 | throw configuration_error("AMQP: v1.0 not supported"); |
11fdf7f2 TL |
447 | return nullptr; |
448 | } else { | |
eafe8130 | 449 | throw configuration_error("AMQP: unknown version: " + version); |
11fdf7f2 TL |
450 | return nullptr; |
451 | } | |
9f95a23c TL |
452 | #endif |
453 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
454 | } else if (schema == KAFKA_SCHEMA) { | |
455 | return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct)); | |
11fdf7f2 TL |
456 | #endif |
457 | } | |
458 | ||
eafe8130 | 459 | throw configuration_error("unknown schema in: " + endpoint); |
11fdf7f2 TL |
460 | return nullptr; |
461 | } | |
462 |