]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/driver/rados/rgw_pubsub_push.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_pubsub_push.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4#include "rgw_pubsub_push.h"
5#include <string>
6#include <sstream>
7#include <algorithm>
8#include "include/buffer_fwd.h"
9#include "common/Formatter.h"
20effc67 10#include "common/iso_8601.h"
eafe8130 11#include "common/async/completion.h"
11fdf7f2
TL
12#include "rgw_common.h"
13#include "rgw_data_sync.h"
14#include "rgw_pubsub.h"
15#include "acconfig.h"
16#ifdef WITH_RADOSGW_AMQP_ENDPOINT
17#include "rgw_amqp.h"
18#endif
9f95a23c
TL
19#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20#include "rgw_kafka.h"
21#endif
11fdf7f2
TL
22#include <boost/asio/yield.hpp>
23#include <boost/algorithm/string.hpp>
24#include <functional>
25#include "rgw_perf_counters.h"
26
27using namespace rgw;
28
eafe8130
TL
29template<typename EventType>
30std::string json_format_pubsub_event(const EventType& event) {
11fdf7f2
TL
31 std::stringstream ss;
32 JSONFormatter f(false);
92f5a8d4
TL
33 {
34 Formatter::ObjectSection s(f, EventType::json_type_plural);
35 {
36 Formatter::ArraySection s(f, EventType::json_type_plural);
37 encode_json("", event, &f);
38 }
39 }
11fdf7f2
TL
40 f.flush(ss);
41 return ss.str();
42}
20effc67
TL
43
44bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
45 bool value;
46 bool exists;
47 if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
49 }
50 if (!exists) {
51 return default_value;
52 }
53 return value;
54}
11fdf7f2
TL
55
56class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
57private:
58 const std::string endpoint;
11fdf7f2
TL
59 typedef unsigned ack_level_t;
60 ack_level_t ack_level; // TODO: not used for now
20effc67
TL
61 const bool verify_ssl;
62 const bool cloudevents;
11fdf7f2
TL
63 static const ack_level_t ACK_LEVEL_ANY = 0;
64 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
65
11fdf7f2 66public:
20effc67
TL
67 RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
68 endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
69 {
eafe8130 70 bool exists;
20effc67 71 const auto& str_ack_level = args.get("http-ack-level", &exists);
eafe8130
TL
72 if (!exists || str_ack_level == "any") {
73 // "any" is default
74 ack_level = ACK_LEVEL_ANY;
75 } else if (str_ack_level == "non-error") {
76 ack_level = ACK_LEVEL_NON_ERROR;
77 } else {
78 ack_level = std::atoi(str_ack_level.c_str());
79 if (ack_level < 100 || ack_level >= 600) {
80 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
11fdf7f2
TL
81 }
82 }
eafe8130
TL
83 }
84
f67539c2 85 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
eafe8130
TL
86 bufferlist read_bl;
87 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
f67539c2 88 const auto post_data = json_format_pubsub_event(event);
20effc67
TL
89 if (cloudevents) {
90 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
91 // using "Binary Content Mode"
92 request.append_header("ce-specversion", "1.0");
93 request.append_header("ce-type", "com.amazonaws." + event.eventName);
94 request.append_header("ce-time", to_iso_8601(event.eventTime));
95 // default output of iso8601 is also RFC3339 compatible
96 request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
97 request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
98 request.append_header("ce-subject", event.object_key);
99 }
eafe8130
TL
100 request.set_post_data(post_data);
101 request.set_send_length(post_data.length());
522d829b 102 request.append_header("Content-Type", "application/json");
eafe8130
TL
103 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
104 const auto rc = RGWHTTP::process(&request, y);
105 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
106 // TODO: use read_bl to process return code and handle according to ack level
107 return rc;
108 }
109
11fdf7f2 110 std::string to_str() const override {
eafe8130 111 std::string str("HTTP/S Endpoint");
11fdf7f2 112 str += "\nURI: " + endpoint;
11fdf7f2
TL
113 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
114 return str;
11fdf7f2
TL
115 }
116};
117
118#ifdef WITH_RADOSGW_AMQP_ENDPOINT
119class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
eafe8130 120private:
9f95a23c
TL
121 enum class ack_level_t {
122 None,
123 Broker,
124 Routable
eafe8130
TL
125 };
126 CephContext* const cct;
127 const std::string endpoint;
128 const std::string topic;
129 const std::string exchange;
eafe8130 130 ack_level_t ack_level;
e306af50 131 amqp::connection_ptr_t conn;
eafe8130 132
f67539c2
TL
133 bool get_verify_ssl(const RGWHTTPArgs& args) {
134 bool exists;
135 auto str_verify_ssl = args.get("verify-ssl", &exists);
136 if (!exists) {
137 // verify server certificate by default
138 return true;
139 }
140 boost::algorithm::to_lower(str_verify_ssl);
141 if (str_verify_ssl == "true") {
142 return true;
143 }
144 if (str_verify_ssl == "false") {
145 return false;
146 }
147 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
148 }
149
e306af50 150 std::string get_exchange(const RGWHTTPArgs& args) {
eafe8130
TL
151 bool exists;
152 const auto exchange = args.get("amqp-exchange", &exists);
153 if (!exists) {
154 throw configuration_error("AMQP: missing amqp-exchange");
11fdf7f2 155 }
eafe8130
TL
156 return exchange;
157 }
11fdf7f2 158
e306af50
TL
159 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
160 bool exists;
161 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
162 if (!exists || str_ack_level == "broker") {
163 // "broker" is default
164 return ack_level_t::Broker;
165 }
166 if (str_ack_level == "none") {
167 return ack_level_t::None;
168 }
169 if (str_ack_level == "routable") {
170 return ack_level_t::Routable;
171 }
172 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
173 }
e306af50 174
eafe8130
TL
175public:
176 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
177 const std::string& _topic,
178 const RGWHTTPArgs& args,
179 CephContext* _cct) :
180 cct(_cct),
181 endpoint(_endpoint),
182 topic(_topic),
183 exchange(get_exchange(args)),
e306af50 184 ack_level(get_ack_level(args)),
f67539c2 185 conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
eafe8130
TL
186 if (!conn) {
187 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
188 }
eafe8130
TL
189 }
190
eafe8130
TL
191 // this allows waiting untill "finish()" is called from a different thread
192 // waiting could be blocking the waiting thread or yielding, depending
193 // with compilation flag support and whether the optional_yield is set
194 class Waiter {
195 using Signature = void(boost::system::error_code);
196 using Completion = ceph::async::Completion<Signature>;
197 std::unique_ptr<Completion> completion = nullptr;
198 int ret;
199
200 mutable std::atomic<bool> done = false;
201 mutable std::mutex lock;
202 mutable std::condition_variable cond;
203
204 template <typename ExecutionContext, typename CompletionToken>
205 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
206 boost::asio::async_completion<CompletionToken, Signature> init(token);
207 auto& handler = init.completion_handler;
208 {
209 std::unique_lock l{lock};
210 completion = Completion::create(ctx.get_executor(), std::move(handler));
211 }
212 return init.result.get();
213 }
214
11fdf7f2 215 public:
eafe8130
TL
216 int wait(optional_yield y) {
217 if (done) {
218 return ret;
219 }
eafe8130 220 if (y) {
f67539c2 221 auto& io_ctx = y.get_io_context();
eafe8130
TL
222 auto& yield_ctx = y.get_yield_context();
223 boost::system::error_code ec;
224 async_wait(io_ctx, yield_ctx[ec]);
225 return -ec.value();
11fdf7f2 226 }
eafe8130
TL
227 std::unique_lock l(lock);
228 cond.wait(l, [this]{return (done==true);});
229 return ret;
11fdf7f2
TL
230 }
231
eafe8130
TL
232 void finish(int r) {
233 std::unique_lock l{lock};
234 ret = r;
235 done = true;
236 if (completion) {
237 boost::system::error_code ec(-ret, boost::system::system_category());
238 Completion::post(std::move(completion), ec);
11fdf7f2 239 } else {
eafe8130 240 cond.notify_all();
11fdf7f2
TL
241 }
242 }
eafe8130 243 };
11fdf7f2 244
f67539c2 245 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
eafe8130 246 ceph_assert(conn);
9f95a23c 247 if (ack_level == ack_level_t::None) {
f67539c2 248 return amqp::publish(conn, topic, json_format_pubsub_event(event));
eafe8130
TL
249 } else {
250 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
251 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
252 auto w = std::unique_ptr<Waiter>(new Waiter);
253 const auto rc = amqp::publish_with_confirm(conn,
254 topic,
f67539c2 255 json_format_pubsub_event(event),
eafe8130
TL
256 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
257 if (rc < 0) {
258 // failed to publish, does not wait for reply
259 return rc;
260 }
261 return w->wait(y);
11fdf7f2 262 }
eafe8130
TL
263 }
264
265 std::string to_str() const override {
266 std::string str("AMQP(0.9.1) Endpoint");
267 str += "\nURI: " + endpoint;
268 str += "\nTopic: " + topic;
269 str += "\nExchange: " + exchange;
eafe8130
TL
270 return str;
271 }
11fdf7f2
TL
272};
273
// values of the "amqp-version" argument that are recognized when creating an endpoint
static const std::string AMQP_0_9_1 = "0-9-1";
static const std::string AMQP_1_0 = "1-0";
// schema name returned by get_schema() for "amqp" and "amqps" endpoints
static const std::string AMQP_SCHEMA = "amqp";
11fdf7f2
TL
277#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
278
9f95a23c
TL
279#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
280class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
281private:
282 enum class ack_level_t {
283 None,
284 Broker,
285 };
286 CephContext* const cct;
287 const std::string topic;
9f95a23c 288 const ack_level_t ack_level;
1e59de90 289 std::string conn_name;
9f95a23c 290
9f95a23c 291
e306af50 292 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
9f95a23c 293 bool exists;
20effc67 294 const auto& str_ack_level = args.get("kafka-ack-level", &exists);
9f95a23c
TL
295 if (!exists || str_ack_level == "broker") {
296 // "broker" is default
297 return ack_level_t::Broker;
298 }
299 if (str_ack_level == "none") {
300 return ack_level_t::None;
301 }
302 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
303 }
304
9f95a23c
TL
305public:
306 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
307 const std::string& _topic,
308 const RGWHTTPArgs& args,
309 CephContext* _cct) :
310 cct(_cct),
311 topic(_topic),
9f95a23c 312 ack_level(get_ack_level(args)) {
1e59de90
TL
313 if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
314 args.get_optional("ca-location"), args.get_optional("mechanism"))) {
9f95a23c
TL
315 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
316 }
317 }
318
9f95a23c
TL
319 // this allows waiting untill "finish()" is called from a different thread
320 // waiting could be blocking the waiting thread or yielding, depending
321 // with compilation flag support and whether the optional_yield is set
322 class Waiter {
323 using Signature = void(boost::system::error_code);
324 using Completion = ceph::async::Completion<Signature>;
325 std::unique_ptr<Completion> completion = nullptr;
326 int ret;
327
328 mutable std::atomic<bool> done = false;
329 mutable std::mutex lock;
330 mutable std::condition_variable cond;
331
332 template <typename ExecutionContext, typename CompletionToken>
333 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
334 boost::asio::async_completion<CompletionToken, Signature> init(token);
335 auto& handler = init.completion_handler;
336 {
337 std::unique_lock l{lock};
338 completion = Completion::create(ctx.get_executor(), std::move(handler));
339 }
340 return init.result.get();
341 }
342
343 public:
344 int wait(optional_yield y) {
345 if (done) {
346 return ret;
347 }
9f95a23c
TL
348 if (y) {
349 auto& io_ctx = y.get_io_context();
350 auto& yield_ctx = y.get_yield_context();
351 boost::system::error_code ec;
352 async_wait(io_ctx, yield_ctx[ec]);
353 return -ec.value();
354 }
9f95a23c
TL
355 std::unique_lock l(lock);
356 cond.wait(l, [this]{return (done==true);});
357 return ret;
358 }
359
360 void finish(int r) {
361 std::unique_lock l{lock};
362 ret = r;
363 done = true;
364 if (completion) {
365 boost::system::error_code ec(-ret, boost::system::system_category());
366 Completion::post(std::move(completion), ec);
367 } else {
368 cond.notify_all();
369 }
370 }
371 };
372
f67539c2 373 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
9f95a23c 374 if (ack_level == ack_level_t::None) {
1e59de90 375 return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
9f95a23c
TL
376 } else {
377 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
378 auto w = std::unique_ptr<Waiter>(new Waiter);
1e59de90 379 const auto rc = kafka::publish_with_confirm(conn_name,
9f95a23c 380 topic,
f67539c2 381 json_format_pubsub_event(event),
9f95a23c
TL
382 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
383 if (rc < 0) {
384 // failed to publish, does not wait for reply
385 return rc;
386 }
387 return w->wait(y);
388 }
389 }
390
391 std::string to_str() const override {
392 std::string str("Kafka Endpoint");
1e59de90 393 str += "\nBroker: " + conn_name;
9f95a23c
TL
394 str += "\nTopic: " + topic;
395 return str;
396 }
397};
398
// schema name returned by get_schema() for "kafka" endpoints
static const std::string KAFKA_SCHEMA = "kafka";
400#endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
401
eafe8130
TL
// schema names that do not depend on optional (compile time) endpoint support
static const std::string WEBHOOK_SCHEMA = "webhook";
static const std::string UNKNOWN_SCHEMA = "unknown";
static const std::string NO_SCHEMA = "";

// Map the prefix of an endpoint URI (everything before the first ':') to a schema name.
// - an empty endpoint maps to NO_SCHEMA
// - "http" and "https" map to WEBHOOK_SCHEMA
// - "amqp" and "amqps" map to AMQP_SCHEMA (only when built with AMQP support)
// - "kafka" maps to KAFKA_SCHEMA (only when built with Kafka support)
// - anything else, including an endpoint without ':', maps to UNKNOWN_SCHEMA
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix(endpoint, 0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp" || prefix == "amqps") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
428
429RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
430 const std::string& topic,
431 const RGWHTTPArgs& args,
432 CephContext* cct) {
433 const auto& schema = get_schema(endpoint);
434 if (schema == WEBHOOK_SCHEMA) {
435 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
436#ifdef WITH_RADOSGW_AMQP_ENDPOINT
437 } else if (schema == AMQP_SCHEMA) {
11fdf7f2
TL
438 bool exists;
439 std::string version = args.get("amqp-version", &exists);
440 if (!exists) {
441 version = AMQP_0_9_1;
442 }
443 if (version == AMQP_0_9_1) {
eafe8130 444 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
11fdf7f2 445 } else if (version == AMQP_1_0) {
eafe8130 446 throw configuration_error("AMQP: v1.0 not supported");
11fdf7f2
TL
447 return nullptr;
448 } else {
eafe8130 449 throw configuration_error("AMQP: unknown version: " + version);
11fdf7f2
TL
450 return nullptr;
451 }
9f95a23c
TL
452#endif
453#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
454 } else if (schema == KAFKA_SCHEMA) {
455 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
11fdf7f2
TL
456#endif
457 }
458
eafe8130 459 throw configuration_error("unknown schema in: " + endpoint);
11fdf7f2
TL
460 return nullptr;
461}
462