]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub_push.cc
import 15.2.4
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4#include "rgw_pubsub_push.h"
5#include <string>
6#include <sstream>
7#include <algorithm>
8#include "include/buffer_fwd.h"
9#include "common/Formatter.h"
eafe8130 10#include "common/async/completion.h"
11fdf7f2
TL
11#include "rgw_common.h"
12#include "rgw_data_sync.h"
13#include "rgw_pubsub.h"
14#include "acconfig.h"
15#ifdef WITH_RADOSGW_AMQP_ENDPOINT
16#include "rgw_amqp.h"
17#endif
9f95a23c
TL
18#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19#include "rgw_kafka.h"
20#endif
11fdf7f2
TL
21#include <boost/asio/yield.hpp>
22#include <boost/algorithm/string.hpp>
23#include <functional>
24#include "rgw_perf_counters.h"
25
26using namespace rgw;
27
eafe8130
TL
28template<typename EventType>
29std::string json_format_pubsub_event(const EventType& event) {
11fdf7f2
TL
30 std::stringstream ss;
31 JSONFormatter f(false);
92f5a8d4
TL
32 {
33 Formatter::ObjectSection s(f, EventType::json_type_plural);
34 {
35 Formatter::ArraySection s(f, EventType::json_type_plural);
36 encode_json("", event, &f);
37 }
38 }
11fdf7f2
TL
39 f.flush(ss);
40 return ss.str();
41}
42
43class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
44private:
45 const std::string endpoint;
46 std::string str_ack_level;
47 typedef unsigned ack_level_t;
48 ack_level_t ack_level; // TODO: not used for now
49 bool verify_ssl;
50 static const ack_level_t ACK_LEVEL_ANY = 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
52
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
54 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
55 private:
56 RGWDataSyncEnv* const sync_env;
57 bufferlist read_bl;
58 const ack_level_t ack_level;
59
60 public:
61 PostCR(const std::string& _post_data,
62 RGWDataSyncEnv* _sync_env,
63 const std::string& endpoint,
64 ack_level_t _ack_level,
65 bool verify_ssl) :
66 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
67 RGWSimpleCoroutine(_sync_env->cct),
68 sync_env(_sync_env),
69 ack_level (_ack_level) {
70 // ctor also set the data to send
71 set_post_data(_post_data);
72 set_send_length(_post_data.length());
73 }
74
75 // send message to endpoint
76 int send_request() override {
77 init_new_io(this);
78 const auto rc = sync_env->http_manager->add_request(this);
79 if (rc < 0) {
80 return rc;
81 }
82 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
83 return 0;
84 }
85
86 // wait for reply
87 int request_complete() override {
88 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
89 if (ack_level == ACK_LEVEL_ANY) {
90 return 0;
91 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
92 // TODO check result code to be non-error
93 } else {
94 // TODO: check that result code == ack_level
95 }
96 return -1;
97 }
98 };
99
100public:
101 RGWPubSubHTTPEndpoint(const std::string& _endpoint,
eafe8130
TL
102 const RGWHTTPArgs& args) : endpoint(_endpoint) {
103 bool exists;
11fdf7f2 104
eafe8130
TL
105 str_ack_level = args.get("http-ack-level", &exists);
106 if (!exists || str_ack_level == "any") {
107 // "any" is default
108 ack_level = ACK_LEVEL_ANY;
109 } else if (str_ack_level == "non-error") {
110 ack_level = ACK_LEVEL_NON_ERROR;
111 } else {
112 ack_level = std::atoi(str_ack_level.c_str());
113 if (ack_level < 100 || ack_level >= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
11fdf7f2
TL
115 }
116 }
117
eafe8130
TL
118 auto str_verify_ssl = args.get("verify-ssl", &exists);
119 boost::algorithm::to_lower(str_verify_ssl);
120 // verify server certificate by default
121 if (!exists || str_verify_ssl == "true") {
122 verify_ssl = true;
123 } else if (str_verify_ssl == "false") {
124 verify_ssl = false;
125 } else {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
127 }
128 }
129
11fdf7f2
TL
130 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
131 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
132 }
133
eafe8130
TL
134 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
135 return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
136 }
137
138 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
139 bufferlist read_bl;
140 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
141 const auto post_data = json_format_pubsub_event(record);
142 request.set_post_data(post_data);
143 request.set_send_length(post_data.length());
144 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
145 const auto rc = RGWHTTP::process(&request, y);
146 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
147 // TODO: use read_bl to process return code and handle according to ack level
148 return rc;
149 }
150
11fdf7f2 151 std::string to_str() const override {
eafe8130 152 std::string str("HTTP/S Endpoint");
11fdf7f2
TL
153 str += "\nURI: " + endpoint;
154 str += "\nAck Level: " + str_ack_level;
155 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
156 return str;
157
158 }
159};
160
161#ifdef WITH_RADOSGW_AMQP_ENDPOINT
162class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
eafe8130 163private:
9f95a23c
TL
164 enum class ack_level_t {
165 None,
166 Broker,
167 Routable
eafe8130
TL
168 };
169 CephContext* const cct;
170 const std::string endpoint;
171 const std::string topic;
172 const std::string exchange;
eafe8130 173 ack_level_t ack_level;
e306af50 174 amqp::connection_ptr_t conn;
eafe8130 175
e306af50 176 std::string get_exchange(const RGWHTTPArgs& args) {
eafe8130
TL
177 bool exists;
178 const auto exchange = args.get("amqp-exchange", &exists);
179 if (!exists) {
180 throw configuration_error("AMQP: missing amqp-exchange");
11fdf7f2 181 }
eafe8130
TL
182 return exchange;
183 }
11fdf7f2 184
e306af50
TL
185 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
186 bool exists;
187 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
188 if (!exists || str_ack_level == "broker") {
189 // "broker" is default
190 return ack_level_t::Broker;
191 }
192 if (str_ack_level == "none") {
193 return ack_level_t::None;
194 }
195 if (str_ack_level == "routable") {
196 return ack_level_t::Routable;
197 }
198 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
199 }
200
11fdf7f2
TL
201 // NoAckPublishCR implements async amqp publishing via coroutine
202 // This coroutine ends when it send the message and does not wait for an ack
203 class NoAckPublishCR : public RGWCoroutine {
204 private:
11fdf7f2
TL
205 const std::string topic;
206 amqp::connection_ptr_t conn;
207 const std::string message;
208
209 public:
eafe8130 210 NoAckPublishCR(CephContext* cct,
11fdf7f2
TL
211 const std::string& _topic,
212 amqp::connection_ptr_t& _conn,
213 const std::string& _message) :
eafe8130 214 RGWCoroutine(cct),
11fdf7f2
TL
215 topic(_topic), conn(_conn), message(_message) {}
216
217 // send message to endpoint, without waiting for reply
218 int operate() override {
219 reenter(this) {
220 const auto rc = amqp::publish(conn, topic, message);
221 if (rc < 0) {
222 return set_cr_error(rc);
223 }
224 return set_cr_done();
225 }
226 return 0;
227 }
228 };
229
230 // AckPublishCR implements async amqp publishing via coroutine
231 // This coroutine ends when an ack is received from the borker
232 // note that it does not wait for an ack fron the end client
233 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
234 private:
11fdf7f2
TL
235 const std::string topic;
236 amqp::connection_ptr_t conn;
237 const std::string message;
11fdf7f2
TL
238
239 public:
eafe8130 240 AckPublishCR(CephContext* cct,
11fdf7f2
TL
241 const std::string& _topic,
242 amqp::connection_ptr_t& _conn,
e306af50 243 const std::string& _message) :
eafe8130 244 RGWCoroutine(cct),
e306af50 245 topic(_topic), conn(_conn), message(_message) {}
11fdf7f2
TL
246
247 // send message to endpoint, waiting for reply
248 int operate() override {
249 reenter(this) {
250 yield {
251 init_new_io(this);
252 const auto rc = amqp::publish_with_confirm(conn,
253 topic,
254 message,
255 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
256 if (rc < 0) {
257 // failed to publish, does not wait for reply
258 return set_cr_error(rc);
259 }
260 // mark as blocked on the amqp answer
261 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
262 io_block();
263 return 0;
264 }
265 return set_cr_done();
266 }
267 return 0;
268 }
269
270 // callback invoked from the amqp manager thread when ack/nack is received
271 void request_complete(int status) {
272 ceph_assert(!is_done());
273 if (status != 0) {
274 // server replied with a nack
275 set_cr_error(status);
276 }
277 io_complete();
278 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
279 }
280
281 // TODO: why are these mandatory in RGWIOProvider?
282 void set_io_user_info(void *_user_info) override {
283 }
284
285 void *get_io_user_info() override {
286 return nullptr;
287 }
288 };
e306af50 289
eafe8130
TL
290public:
291 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
292 const std::string& _topic,
293 const RGWHTTPArgs& args,
294 CephContext* _cct) :
295 cct(_cct),
296 endpoint(_endpoint),
297 topic(_topic),
298 exchange(get_exchange(args)),
e306af50
TL
299 ack_level(get_ack_level(args)),
300 conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) {
eafe8130
TL
301 if (!conn) {
302 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
303 }
eafe8130
TL
304 }
305
306 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
307 ceph_assert(conn);
9f95a23c 308 if (ack_level == ack_level_t::None) {
eafe8130
TL
309 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
310 } else {
e306af50 311 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
eafe8130
TL
312 }
313 }
314
315 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
316 ceph_assert(conn);
9f95a23c 317 if (ack_level == ack_level_t::None) {
eafe8130
TL
318 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
319 } else {
e306af50 320 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
eafe8130
TL
321 }
322 }
323
324 // this allows waiting untill "finish()" is called from a different thread
325 // waiting could be blocking the waiting thread or yielding, depending
326 // with compilation flag support and whether the optional_yield is set
327 class Waiter {
328 using Signature = void(boost::system::error_code);
329 using Completion = ceph::async::Completion<Signature>;
330 std::unique_ptr<Completion> completion = nullptr;
331 int ret;
332
333 mutable std::atomic<bool> done = false;
334 mutable std::mutex lock;
335 mutable std::condition_variable cond;
336
337 template <typename ExecutionContext, typename CompletionToken>
338 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
339 boost::asio::async_completion<CompletionToken, Signature> init(token);
340 auto& handler = init.completion_handler;
341 {
342 std::unique_lock l{lock};
343 completion = Completion::create(ctx.get_executor(), std::move(handler));
344 }
345 return init.result.get();
346 }
347
11fdf7f2 348 public:
eafe8130
TL
349 int wait(optional_yield y) {
350 if (done) {
351 return ret;
352 }
353#ifdef HAVE_BOOST_CONTEXT
354 if (y) {
355 auto& io_ctx = y.get_io_context();
356 auto& yield_ctx = y.get_yield_context();
357 boost::system::error_code ec;
358 async_wait(io_ctx, yield_ctx[ec]);
359 return -ec.value();
11fdf7f2 360 }
eafe8130
TL
361#endif
362 std::unique_lock l(lock);
363 cond.wait(l, [this]{return (done==true);});
364 return ret;
11fdf7f2
TL
365 }
366
eafe8130
TL
367 void finish(int r) {
368 std::unique_lock l{lock};
369 ret = r;
370 done = true;
371 if (completion) {
372 boost::system::error_code ec(-ret, boost::system::system_category());
373 Completion::post(std::move(completion), ec);
11fdf7f2 374 } else {
eafe8130 375 cond.notify_all();
11fdf7f2
TL
376 }
377 }
eafe8130 378 };
11fdf7f2 379
eafe8130
TL
380 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
381 ceph_assert(conn);
9f95a23c 382 if (ack_level == ack_level_t::None) {
eafe8130
TL
383 return amqp::publish(conn, topic, json_format_pubsub_event(record));
384 } else {
385 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
386 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
387 auto w = std::unique_ptr<Waiter>(new Waiter);
388 const auto rc = amqp::publish_with_confirm(conn,
389 topic,
390 json_format_pubsub_event(record),
391 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
392 if (rc < 0) {
393 // failed to publish, does not wait for reply
394 return rc;
395 }
396 return w->wait(y);
11fdf7f2 397 }
eafe8130
TL
398 }
399
400 std::string to_str() const override {
401 std::string str("AMQP(0.9.1) Endpoint");
402 str += "\nURI: " + endpoint;
403 str += "\nTopic: " + topic;
404 str += "\nExchange: " + exchange;
eafe8130
TL
405 return str;
406 }
11fdf7f2
TL
407};
408
409static const std::string AMQP_0_9_1("0-9-1");
410static const std::string AMQP_1_0("1-0");
eafe8130 411static const std::string AMQP_SCHEMA("amqp");
11fdf7f2
TL
412#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
413
9f95a23c
TL
414
415#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
416class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
417private:
418 enum class ack_level_t {
419 None,
420 Broker,
421 };
422 CephContext* const cct;
423 const std::string topic;
424 kafka::connection_ptr_t conn;
425 const ack_level_t ack_level;
426
e306af50 427 bool get_verify_ssl(const RGWHTTPArgs& args) {
9f95a23c
TL
428 bool exists;
429 auto str_verify_ssl = args.get("verify-ssl", &exists);
430 if (!exists) {
431 // verify server certificate by default
432 return true;
433 }
434 boost::algorithm::to_lower(str_verify_ssl);
435 if (str_verify_ssl == "true") {
436 return true;
437 }
438 if (str_verify_ssl == "false") {
439 return false;
440 }
441 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
442 }
443
e306af50 444 bool get_use_ssl(const RGWHTTPArgs& args) {
9f95a23c
TL
445 bool exists;
446 auto str_use_ssl = args.get("use-ssl", &exists);
447 if (!exists) {
448 // by default ssl not used
449 return false;
450 }
451 boost::algorithm::to_lower(str_use_ssl);
452 if (str_use_ssl == "true") {
453 return true;
454 }
455 if (str_use_ssl == "false") {
456 return false;
457 }
458 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
459 }
460
e306af50 461 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
9f95a23c
TL
462 bool exists;
463 // get ack level
464 const auto str_ack_level = args.get("kafka-ack-level", &exists);
465 if (!exists || str_ack_level == "broker") {
466 // "broker" is default
467 return ack_level_t::Broker;
468 }
469 if (str_ack_level == "none") {
470 return ack_level_t::None;
471 }
472 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
473 }
474
475 // NoAckPublishCR implements async kafka publishing via coroutine
476 // This coroutine ends when it send the message and does not wait for an ack
477 class NoAckPublishCR : public RGWCoroutine {
478 private:
479 const std::string topic;
480 kafka::connection_ptr_t conn;
481 const std::string message;
482
483 public:
484 NoAckPublishCR(CephContext* cct,
485 const std::string& _topic,
486 kafka::connection_ptr_t& _conn,
487 const std::string& _message) :
488 RGWCoroutine(cct),
489 topic(_topic), conn(_conn), message(_message) {}
490
491 // send message to endpoint, without waiting for reply
492 int operate() override {
493 reenter(this) {
494 const auto rc = kafka::publish(conn, topic, message);
495 if (rc < 0) {
496 return set_cr_error(rc);
497 }
498 return set_cr_done();
499 }
500 return 0;
501 }
502 };
503
504 // AckPublishCR implements async kafka publishing via coroutine
505 // This coroutine ends when an ack is received from the borker
506 // note that it does not wait for an ack fron the end client
507 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
508 private:
509 const std::string topic;
510 kafka::connection_ptr_t conn;
511 const std::string message;
512
513 public:
514 AckPublishCR(CephContext* cct,
515 const std::string& _topic,
516 kafka::connection_ptr_t& _conn,
517 const std::string& _message) :
518 RGWCoroutine(cct),
519 topic(_topic), conn(_conn), message(_message) {}
520
521 // send message to endpoint, waiting for reply
522 int operate() override {
523 reenter(this) {
524 yield {
525 init_new_io(this);
526 const auto rc = kafka::publish_with_confirm(conn,
527 topic,
528 message,
529 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
530 if (rc < 0) {
531 // failed to publish, does not wait for reply
532 return set_cr_error(rc);
533 }
534 // mark as blocked on the kafka answer
535 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
536 io_block();
537 return 0;
538 }
539 return set_cr_done();
540 }
541 return 0;
542 }
543
544 // callback invoked from the kafka manager thread when ack/nack is received
545 void request_complete(int status) {
546 ceph_assert(!is_done());
547 if (status != 0) {
548 // server replied with a nack
549 set_cr_error(status);
550 }
551 io_complete();
552 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
553 }
554
555 // TODO: why are these mandatory in RGWIOProvider?
556 void set_io_user_info(void *_user_info) override {
557 }
558
559 void *get_io_user_info() override {
560 return nullptr;
561 }
562 };
563
564public:
565 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
566 const std::string& _topic,
567 const RGWHTTPArgs& args,
568 CephContext* _cct) :
569 cct(_cct),
570 topic(_topic),
571 conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
572 ack_level(get_ack_level(args)) {
573 if (!conn) {
574 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
575 }
576 }
577
578 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
579 ceph_assert(conn);
580 if (ack_level == ack_level_t::None) {
581 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
582 } else {
583 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
584 }
585 }
586
587 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
588 ceph_assert(conn);
589 if (ack_level == ack_level_t::None) {
590 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
591 } else {
592 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
593 }
594 }
595
596 // this allows waiting untill "finish()" is called from a different thread
597 // waiting could be blocking the waiting thread or yielding, depending
598 // with compilation flag support and whether the optional_yield is set
599 class Waiter {
600 using Signature = void(boost::system::error_code);
601 using Completion = ceph::async::Completion<Signature>;
602 std::unique_ptr<Completion> completion = nullptr;
603 int ret;
604
605 mutable std::atomic<bool> done = false;
606 mutable std::mutex lock;
607 mutable std::condition_variable cond;
608
609 template <typename ExecutionContext, typename CompletionToken>
610 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
611 boost::asio::async_completion<CompletionToken, Signature> init(token);
612 auto& handler = init.completion_handler;
613 {
614 std::unique_lock l{lock};
615 completion = Completion::create(ctx.get_executor(), std::move(handler));
616 }
617 return init.result.get();
618 }
619
620 public:
621 int wait(optional_yield y) {
622 if (done) {
623 return ret;
624 }
625#ifdef HAVE_BOOST_CONTEXT
626 if (y) {
627 auto& io_ctx = y.get_io_context();
628 auto& yield_ctx = y.get_yield_context();
629 boost::system::error_code ec;
630 async_wait(io_ctx, yield_ctx[ec]);
631 return -ec.value();
632 }
633#endif
634 std::unique_lock l(lock);
635 cond.wait(l, [this]{return (done==true);});
636 return ret;
637 }
638
639 void finish(int r) {
640 std::unique_lock l{lock};
641 ret = r;
642 done = true;
643 if (completion) {
644 boost::system::error_code ec(-ret, boost::system::system_category());
645 Completion::post(std::move(completion), ec);
646 } else {
647 cond.notify_all();
648 }
649 }
650 };
651
652 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
653 ceph_assert(conn);
654 if (ack_level == ack_level_t::None) {
655 return kafka::publish(conn, topic, json_format_pubsub_event(record));
656 } else {
657 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
658 auto w = std::unique_ptr<Waiter>(new Waiter);
659 const auto rc = kafka::publish_with_confirm(conn,
660 topic,
661 json_format_pubsub_event(record),
662 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
663 if (rc < 0) {
664 // failed to publish, does not wait for reply
665 return rc;
666 }
667 return w->wait(y);
668 }
669 }
670
671 std::string to_str() const override {
672 std::string str("Kafka Endpoint");
673 str += kafka::to_string(conn);
674 str += "\nTopic: " + topic;
675 return str;
676 }
677};
678
679static const std::string KAFKA_SCHEMA("kafka");
680#endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
681
eafe8130
TL
// schema names that are always available, regardless of compilation flags
static const std::string WEBHOOK_SCHEMA("webhook");
static const std::string UNKNOWN_SCHEMA("unknown");
static const std::string NO_SCHEMA("");

// Map an endpoint URI to the name of its schema, based on the text before the first ':'.
// Returns a reference to one of the static schema names:
//  NO_SCHEMA for an empty endpoint
//  WEBHOOK_SCHEMA for "http" and "https"
//  AMQP_SCHEMA for "amqp" and KAFKA_SCHEMA for "kafka", when support is compiled in
//  UNKNOWN_SCHEMA for anything else, including an endpoint without ':'
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix(endpoint, 0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
708
709RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
710 const std::string& topic,
711 const RGWHTTPArgs& args,
712 CephContext* cct) {
713 const auto& schema = get_schema(endpoint);
714 if (schema == WEBHOOK_SCHEMA) {
715 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
716#ifdef WITH_RADOSGW_AMQP_ENDPOINT
717 } else if (schema == AMQP_SCHEMA) {
11fdf7f2
TL
718 bool exists;
719 std::string version = args.get("amqp-version", &exists);
720 if (!exists) {
721 version = AMQP_0_9_1;
722 }
723 if (version == AMQP_0_9_1) {
eafe8130 724 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
11fdf7f2 725 } else if (version == AMQP_1_0) {
eafe8130 726 throw configuration_error("AMQP: v1.0 not supported");
11fdf7f2
TL
727 return nullptr;
728 } else {
eafe8130 729 throw configuration_error("AMQP: unknown version: " + version);
11fdf7f2
TL
730 return nullptr;
731 }
732 } else if (schema == "amqps") {
eafe8130 733 throw configuration_error("AMQP: ssl not supported");
11fdf7f2 734 return nullptr;
9f95a23c
TL
735#endif
736#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
737 } else if (schema == KAFKA_SCHEMA) {
738 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
11fdf7f2
TL
739#endif
740 }
741
eafe8130 742 throw configuration_error("unknown schema in: " + endpoint);
11fdf7f2
TL
743 return nullptr;
744}
745