]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_pubsub_push.cc
bdb24ce9ad10326d2e4438d20e14c1ede6d9d6a2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_pubsub_push.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_pubsub_push.h"
5 #include <string>
6 #include <sstream>
7 #include <algorithm>
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/iso_8601.h"
11 #include "common/async/completion.h"
12 #include "rgw_common.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_pubsub.h"
15 #include "acconfig.h"
16 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
17 #include "rgw_amqp.h"
18 #endif
19 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20 #include "rgw_kafka.h"
21 #endif
22 #include <boost/asio/yield.hpp>
23 #include <boost/algorithm/string.hpp>
24 #include <functional>
25 #include "rgw_perf_counters.h"
26
27 using namespace rgw;
28
29 template<typename EventType>
30 std::string json_format_pubsub_event(const EventType& event) {
31 std::stringstream ss;
32 JSONFormatter f(false);
33 {
34 Formatter::ObjectSection s(f, EventType::json_type_plural);
35 {
36 Formatter::ArraySection s(f, EventType::json_type_plural);
37 encode_json("", event, &f);
38 }
39 }
40 f.flush(ss);
41 return ss.str();
42 }
43
44 bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
45 bool value;
46 bool exists;
47 if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
49 }
50 if (!exists) {
51 return default_value;
52 }
53 return value;
54 }
55
56 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
57 private:
58 const std::string endpoint;
59 typedef unsigned ack_level_t;
60 ack_level_t ack_level; // TODO: not used for now
61 const bool verify_ssl;
62 const bool cloudevents;
63 static const ack_level_t ACK_LEVEL_ANY = 0;
64 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
65
66 public:
67 RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
68 endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
69 {
70 bool exists;
71 const auto& str_ack_level = args.get("http-ack-level", &exists);
72 if (!exists || str_ack_level == "any") {
73 // "any" is default
74 ack_level = ACK_LEVEL_ANY;
75 } else if (str_ack_level == "non-error") {
76 ack_level = ACK_LEVEL_NON_ERROR;
77 } else {
78 ack_level = std::atoi(str_ack_level.c_str());
79 if (ack_level < 100 || ack_level >= 600) {
80 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
81 }
82 }
83 }
84
85 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
86 bufferlist read_bl;
87 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
88 const auto post_data = json_format_pubsub_event(event);
89 if (cloudevents) {
90 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
91 // using "Binary Content Mode"
92 request.append_header("ce-specversion", "1.0");
93 request.append_header("ce-type", "com.amazonaws." + event.eventName);
94 request.append_header("ce-time", to_iso_8601(event.eventTime));
95 // default output of iso8601 is also RFC3339 compatible
96 request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
97 request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
98 request.append_header("ce-subject", event.object_key);
99 }
100 request.set_post_data(post_data);
101 request.set_send_length(post_data.length());
102 request.append_header("Content-Type", "application/json");
103 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
104 const auto rc = RGWHTTP::process(&request, y);
105 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
106 // TODO: use read_bl to process return code and handle according to ack level
107 return rc;
108 }
109
110 std::string to_str() const override {
111 std::string str("HTTP/S Endpoint");
112 str += "\nURI: " + endpoint;
113 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
114 return str;
115 }
116 };
117
118 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
119 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
120 private:
121 enum class ack_level_t {
122 None,
123 Broker,
124 Routable
125 };
126 CephContext* const cct;
127 const std::string endpoint;
128 const std::string topic;
129 const std::string exchange;
130 ack_level_t ack_level;
131 amqp::connection_id_t conn_id;
132
133 bool get_verify_ssl(const RGWHTTPArgs& args) {
134 bool exists;
135 auto str_verify_ssl = args.get("verify-ssl", &exists);
136 if (!exists) {
137 // verify server certificate by default
138 return true;
139 }
140 boost::algorithm::to_lower(str_verify_ssl);
141 if (str_verify_ssl == "true") {
142 return true;
143 }
144 if (str_verify_ssl == "false") {
145 return false;
146 }
147 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
148 }
149
150 std::string get_exchange(const RGWHTTPArgs& args) {
151 bool exists;
152 const auto exchange = args.get("amqp-exchange", &exists);
153 if (!exists) {
154 throw configuration_error("AMQP: missing amqp-exchange");
155 }
156 return exchange;
157 }
158
159 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
160 bool exists;
161 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
162 if (!exists || str_ack_level == "broker") {
163 // "broker" is default
164 return ack_level_t::Broker;
165 }
166 if (str_ack_level == "none") {
167 return ack_level_t::None;
168 }
169 if (str_ack_level == "routable") {
170 return ack_level_t::Routable;
171 }
172 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
173 }
174
175 public:
176 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
177 const std::string& _topic,
178 const RGWHTTPArgs& args,
179 CephContext* _cct) :
180 cct(_cct),
181 endpoint(_endpoint),
182 topic(_topic),
183 exchange(get_exchange(args)),
184 ack_level(get_ack_level(args)) {
185 if (!amqp::connect(conn_id, endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
186 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
187 }
188 }
189
190 // this allows waiting untill "finish()" is called from a different thread
191 // waiting could be blocking the waiting thread or yielding, depending
192 // with compilation flag support and whether the optional_yield is set
193 class Waiter {
194 using Signature = void(boost::system::error_code);
195 using Completion = ceph::async::Completion<Signature>;
196 std::unique_ptr<Completion> completion = nullptr;
197 int ret;
198
199 mutable std::atomic<bool> done = false;
200 mutable std::mutex lock;
201 mutable std::condition_variable cond;
202
203 template <typename ExecutionContext, typename CompletionToken>
204 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
205 boost::asio::async_completion<CompletionToken, Signature> init(token);
206 auto& handler = init.completion_handler;
207 {
208 std::unique_lock l{lock};
209 completion = Completion::create(ctx.get_executor(), std::move(handler));
210 }
211 return init.result.get();
212 }
213
214 public:
215 int wait(optional_yield y) {
216 if (done) {
217 return ret;
218 }
219 if (y) {
220 auto& io_ctx = y.get_io_context();
221 auto& yield_ctx = y.get_yield_context();
222 boost::system::error_code ec;
223 async_wait(io_ctx, yield_ctx[ec]);
224 return -ec.value();
225 }
226 std::unique_lock l(lock);
227 cond.wait(l, [this]{return (done==true);});
228 return ret;
229 }
230
231 void finish(int r) {
232 std::unique_lock l{lock};
233 ret = r;
234 done = true;
235 if (completion) {
236 boost::system::error_code ec(-ret, boost::system::system_category());
237 Completion::post(std::move(completion), ec);
238 } else {
239 cond.notify_all();
240 }
241 }
242 };
243
244 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
245 if (ack_level == ack_level_t::None) {
246 return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
247 } else {
248 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
249 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
250 auto w = std::unique_ptr<Waiter>(new Waiter);
251 const auto rc = amqp::publish_with_confirm(conn_id,
252 topic,
253 json_format_pubsub_event(event),
254 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
255 if (rc < 0) {
256 // failed to publish, does not wait for reply
257 return rc;
258 }
259 return w->wait(y);
260 }
261 }
262
263 std::string to_str() const override {
264 std::string str("AMQP(0.9.1) Endpoint");
265 str += "\nURI: " + endpoint;
266 str += "\nTopic: " + topic;
267 str += "\nExchange: " + exchange;
268 return str;
269 }
270 };
271
// accepted values of the "amqp-version" argument
static const std::string AMQP_0_9_1{"0-9-1"};
static const std::string AMQP_1_0{"1-0"};
// schema name returned by get_schema() for amqp:// and amqps:// endpoints
static const std::string AMQP_SCHEMA{"amqp"};
275 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
276
277 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
278 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
279 private:
280 enum class ack_level_t {
281 None,
282 Broker,
283 };
284 CephContext* const cct;
285 const std::string topic;
286 const ack_level_t ack_level;
287 std::string conn_name;
288
289
290 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
291 bool exists;
292 const auto& str_ack_level = args.get("kafka-ack-level", &exists);
293 if (!exists || str_ack_level == "broker") {
294 // "broker" is default
295 return ack_level_t::Broker;
296 }
297 if (str_ack_level == "none") {
298 return ack_level_t::None;
299 }
300 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
301 }
302
303 public:
304 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
305 const std::string& _topic,
306 const RGWHTTPArgs& args,
307 CephContext* _cct) :
308 cct(_cct),
309 topic(_topic),
310 ack_level(get_ack_level(args)) {
311 if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
312 args.get_optional("ca-location"), args.get_optional("mechanism"))) {
313 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
314 }
315 }
316
317 // this allows waiting untill "finish()" is called from a different thread
318 // waiting could be blocking the waiting thread or yielding, depending
319 // with compilation flag support and whether the optional_yield is set
320 class Waiter {
321 using Signature = void(boost::system::error_code);
322 using Completion = ceph::async::Completion<Signature>;
323 std::unique_ptr<Completion> completion = nullptr;
324 int ret;
325
326 mutable std::atomic<bool> done = false;
327 mutable std::mutex lock;
328 mutable std::condition_variable cond;
329
330 template <typename ExecutionContext, typename CompletionToken>
331 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
332 boost::asio::async_completion<CompletionToken, Signature> init(token);
333 auto& handler = init.completion_handler;
334 {
335 std::unique_lock l{lock};
336 completion = Completion::create(ctx.get_executor(), std::move(handler));
337 }
338 return init.result.get();
339 }
340
341 public:
342 int wait(optional_yield y) {
343 if (done) {
344 return ret;
345 }
346 if (y) {
347 auto& io_ctx = y.get_io_context();
348 auto& yield_ctx = y.get_yield_context();
349 boost::system::error_code ec;
350 async_wait(io_ctx, yield_ctx[ec]);
351 return -ec.value();
352 }
353 std::unique_lock l(lock);
354 cond.wait(l, [this]{return (done==true);});
355 return ret;
356 }
357
358 void finish(int r) {
359 std::unique_lock l{lock};
360 ret = r;
361 done = true;
362 if (completion) {
363 boost::system::error_code ec(-ret, boost::system::system_category());
364 Completion::post(std::move(completion), ec);
365 } else {
366 cond.notify_all();
367 }
368 }
369 };
370
371 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
372 if (ack_level == ack_level_t::None) {
373 return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
374 } else {
375 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
376 auto w = std::unique_ptr<Waiter>(new Waiter);
377 const auto rc = kafka::publish_with_confirm(conn_name,
378 topic,
379 json_format_pubsub_event(event),
380 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
381 if (rc < 0) {
382 // failed to publish, does not wait for reply
383 return rc;
384 }
385 return w->wait(y);
386 }
387 }
388
389 std::string to_str() const override {
390 std::string str("Kafka Endpoint");
391 str += "\nBroker: " + conn_name;
392 str += "\nTopic: " + topic;
393 return str;
394 }
395 };
396
// schema name returned by get_schema() for kafka:// endpoints
static const std::string KAFKA_SCHEMA{"kafka"};
398 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
399
// schema names returned by get_schema()
static const std::string WEBHOOK_SCHEMA{"webhook"};
static const std::string UNKNOWN_SCHEMA{"unknown"};
static const std::string NO_SCHEMA{""};

// Classify an endpoint URI by the text before its first ':'.
// Returns NO_SCHEMA for an empty endpoint, and UNKNOWN_SCHEMA when there is
// no ':' or the scheme is not recognized. Recognition is case-sensitive.
// "amqp"/"amqps" and "kafka" are recognized only when the matching endpoint
// type is compiled in.
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  // compare the prefix [0, colon) in place instead of copying it out
  const auto scheme_is = [&endpoint, colon](const char* candidate) {
    return endpoint.compare(0, colon, candidate) == 0;
  };
  if (scheme_is("http") || scheme_is("https")) {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (scheme_is("amqp") || scheme_is("amqps")) {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (scheme_is("kafka")) {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
426
427 RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
428 const std::string& topic,
429 const RGWHTTPArgs& args,
430 CephContext* cct) {
431 const auto& schema = get_schema(endpoint);
432 if (schema == WEBHOOK_SCHEMA) {
433 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
434 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
435 } else if (schema == AMQP_SCHEMA) {
436 bool exists;
437 std::string version = args.get("amqp-version", &exists);
438 if (!exists) {
439 version = AMQP_0_9_1;
440 }
441 if (version == AMQP_0_9_1) {
442 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
443 } else if (version == AMQP_1_0) {
444 throw configuration_error("AMQP: v1.0 not supported");
445 return nullptr;
446 } else {
447 throw configuration_error("AMQP: unknown version: " + version);
448 return nullptr;
449 }
450 #endif
451 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
452 } else if (schema == KAFKA_SCHEMA) {
453 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
454 #endif
455 }
456
457 throw configuration_error("unknown schema in: " + endpoint);
458 return nullptr;
459 }
460