]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub_push.cc
6230330d4a6a32440a0bfe485b6b1c40670e6e7e
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_pubsub_push.h"
5 #include <string>
6 #include <sstream>
7 #include <algorithm>
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
14 #include "acconfig.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
16 #include "rgw_amqp.h"
17 #endif
18 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19 #include "rgw_kafka.h"
20 #endif
21 #include <boost/asio/yield.hpp>
22 #include <boost/algorithm/string.hpp>
23 #include <functional>
24 #include "rgw_perf_counters.h"
25
26 using namespace rgw;
27
28 template<typename EventType>
29 std::string json_format_pubsub_event(const EventType& event) {
30 std::stringstream ss;
31 JSONFormatter f(false);
32 {
33 Formatter::ObjectSection s(f, EventType::json_type_plural);
34 {
35 Formatter::ArraySection s(f, EventType::json_type_plural);
36 encode_json("", event, &f);
37 }
38 }
39 f.flush(ss);
40 return ss.str();
41 }
42
43 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
44 private:
45 const std::string endpoint;
46 std::string str_ack_level;
47 typedef unsigned ack_level_t;
48 ack_level_t ack_level; // TODO: not used for now
49 bool verify_ssl;
50 static const ack_level_t ACK_LEVEL_ANY = 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
52
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
54 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
55 private:
56 RGWDataSyncEnv* const sync_env;
57 bufferlist read_bl;
58 const ack_level_t ack_level;
59
60 public:
61 PostCR(const std::string& _post_data,
62 RGWDataSyncEnv* _sync_env,
63 const std::string& endpoint,
64 ack_level_t _ack_level,
65 bool verify_ssl) :
66 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
67 RGWSimpleCoroutine(_sync_env->cct),
68 sync_env(_sync_env),
69 ack_level (_ack_level) {
70 // ctor also set the data to send
71 set_post_data(_post_data);
72 set_send_length(_post_data.length());
73 }
74
75 // send message to endpoint
76 int send_request() override {
77 init_new_io(this);
78 const auto rc = sync_env->http_manager->add_request(this);
79 if (rc < 0) {
80 return rc;
81 }
82 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
83 return 0;
84 }
85
86 // wait for reply
87 int request_complete() override {
88 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
89 if (ack_level == ACK_LEVEL_ANY) {
90 return 0;
91 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
92 // TODO check result code to be non-error
93 } else {
94 // TODO: check that result code == ack_level
95 }
96 return -1;
97 }
98 };
99
100 public:
101 RGWPubSubHTTPEndpoint(const std::string& _endpoint,
102 const RGWHTTPArgs& args) : endpoint(_endpoint) {
103 bool exists;
104
105 str_ack_level = args.get("http-ack-level", &exists);
106 if (!exists || str_ack_level == "any") {
107 // "any" is default
108 ack_level = ACK_LEVEL_ANY;
109 } else if (str_ack_level == "non-error") {
110 ack_level = ACK_LEVEL_NON_ERROR;
111 } else {
112 ack_level = std::atoi(str_ack_level.c_str());
113 if (ack_level < 100 || ack_level >= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
115 }
116 }
117
118 auto str_verify_ssl = args.get("verify-ssl", &exists);
119 boost::algorithm::to_lower(str_verify_ssl);
120 // verify server certificate by default
121 if (!exists || str_verify_ssl == "true") {
122 verify_ssl = true;
123 } else if (str_verify_ssl == "false") {
124 verify_ssl = false;
125 } else {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
127 }
128 }
129
130 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
131 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
132 }
133
134 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
135 return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
136 }
137
138 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
139 bufferlist read_bl;
140 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
141 const auto post_data = json_format_pubsub_event(record);
142 request.set_post_data(post_data);
143 request.set_send_length(post_data.length());
144 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
145 const auto rc = RGWHTTP::process(&request, y);
146 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
147 // TODO: use read_bl to process return code and handle according to ack level
148 return rc;
149 }
150
151 std::string to_str() const override {
152 std::string str("HTTP/S Endpoint");
153 str += "\nURI: " + endpoint;
154 str += "\nAck Level: " + str_ack_level;
155 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
156 return str;
157
158 }
159 };
160
161 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
162 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
163 private:
164 enum class ack_level_t {
165 None,
166 Broker,
167 Routable
168 };
169 CephContext* const cct;
170 const std::string endpoint;
171 const std::string topic;
172 const std::string exchange;
173 amqp::connection_ptr_t conn;
174 ack_level_t ack_level;
175 std::string str_ack_level;
176
177 static std::string get_exchange(const RGWHTTPArgs& args) {
178 bool exists;
179 const auto exchange = args.get("amqp-exchange", &exists);
180 if (!exists) {
181 throw configuration_error("AMQP: missing amqp-exchange");
182 }
183 return exchange;
184 }
185
186 // NoAckPublishCR implements async amqp publishing via coroutine
187 // This coroutine ends when it send the message and does not wait for an ack
188 class NoAckPublishCR : public RGWCoroutine {
189 private:
190 const std::string topic;
191 amqp::connection_ptr_t conn;
192 const std::string message;
193
194 public:
195 NoAckPublishCR(CephContext* cct,
196 const std::string& _topic,
197 amqp::connection_ptr_t& _conn,
198 const std::string& _message) :
199 RGWCoroutine(cct),
200 topic(_topic), conn(_conn), message(_message) {}
201
202 // send message to endpoint, without waiting for reply
203 int operate() override {
204 reenter(this) {
205 const auto rc = amqp::publish(conn, topic, message);
206 if (rc < 0) {
207 return set_cr_error(rc);
208 }
209 return set_cr_done();
210 }
211 return 0;
212 }
213 };
214
215 // AckPublishCR implements async amqp publishing via coroutine
216 // This coroutine ends when an ack is received from the borker
217 // note that it does not wait for an ack fron the end client
218 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
219 private:
220 const std::string topic;
221 amqp::connection_ptr_t conn;
222 const std::string message;
223 [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now
224
225 public:
226 AckPublishCR(CephContext* cct,
227 const std::string& _topic,
228 amqp::connection_ptr_t& _conn,
229 const std::string& _message,
230 ack_level_t _ack_level) :
231 RGWCoroutine(cct),
232 topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
233
234 // send message to endpoint, waiting for reply
235 int operate() override {
236 reenter(this) {
237 yield {
238 init_new_io(this);
239 const auto rc = amqp::publish_with_confirm(conn,
240 topic,
241 message,
242 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
243 if (rc < 0) {
244 // failed to publish, does not wait for reply
245 return set_cr_error(rc);
246 }
247 // mark as blocked on the amqp answer
248 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
249 io_block();
250 return 0;
251 }
252 return set_cr_done();
253 }
254 return 0;
255 }
256
257 // callback invoked from the amqp manager thread when ack/nack is received
258 void request_complete(int status) {
259 ceph_assert(!is_done());
260 if (status != 0) {
261 // server replied with a nack
262 set_cr_error(status);
263 }
264 io_complete();
265 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
266 }
267
268 // TODO: why are these mandatory in RGWIOProvider?
269 void set_io_user_info(void *_user_info) override {
270 }
271
272 void *get_io_user_info() override {
273 return nullptr;
274 }
275 };
276
277 public:
278 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
279 const std::string& _topic,
280 const RGWHTTPArgs& args,
281 CephContext* _cct) :
282 cct(_cct),
283 endpoint(_endpoint),
284 topic(_topic),
285 exchange(get_exchange(args)),
286 conn(amqp::connect(endpoint, exchange)) {
287 if (!conn) {
288 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
289 }
290 bool exists;
291 // get ack level
292 str_ack_level = args.get("amqp-ack-level", &exists);
293 if (!exists || str_ack_level == "broker") {
294 // "broker" is default
295 ack_level = ack_level_t::Broker;
296 } else if (str_ack_level == "none") {
297 ack_level = ack_level_t::None;
298 } else if (str_ack_level == "routable") {
299 ack_level = ack_level_t::Routable;
300 } else {
301 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
302 }
303 }
304
305 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
306 ceph_assert(conn);
307 if (ack_level == ack_level_t::None) {
308 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
309 } else {
310 // TODO: currently broker and routable are the same - this will require different flags
311 // but the same mechanism
312 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
313 }
314 }
315
316 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
317 ceph_assert(conn);
318 if (ack_level == ack_level_t::None) {
319 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
320 } else {
321 // TODO: currently broker and routable are the same - this will require different flags
322 // but the same mechanism
323 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
324 }
325 }
326
327 // this allows waiting untill "finish()" is called from a different thread
328 // waiting could be blocking the waiting thread or yielding, depending
329 // with compilation flag support and whether the optional_yield is set
330 class Waiter {
331 using Signature = void(boost::system::error_code);
332 using Completion = ceph::async::Completion<Signature>;
333 std::unique_ptr<Completion> completion = nullptr;
334 int ret;
335
336 mutable std::atomic<bool> done = false;
337 mutable std::mutex lock;
338 mutable std::condition_variable cond;
339
340 template <typename ExecutionContext, typename CompletionToken>
341 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
342 boost::asio::async_completion<CompletionToken, Signature> init(token);
343 auto& handler = init.completion_handler;
344 {
345 std::unique_lock l{lock};
346 completion = Completion::create(ctx.get_executor(), std::move(handler));
347 }
348 return init.result.get();
349 }
350
351 public:
352 int wait(optional_yield y) {
353 if (done) {
354 return ret;
355 }
356 #ifdef HAVE_BOOST_CONTEXT
357 if (y) {
358 auto& io_ctx = y.get_io_context();
359 auto& yield_ctx = y.get_yield_context();
360 boost::system::error_code ec;
361 async_wait(io_ctx, yield_ctx[ec]);
362 return -ec.value();
363 }
364 #endif
365 std::unique_lock l(lock);
366 cond.wait(l, [this]{return (done==true);});
367 return ret;
368 }
369
370 void finish(int r) {
371 std::unique_lock l{lock};
372 ret = r;
373 done = true;
374 if (completion) {
375 boost::system::error_code ec(-ret, boost::system::system_category());
376 Completion::post(std::move(completion), ec);
377 } else {
378 cond.notify_all();
379 }
380 }
381 };
382
383 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
384 ceph_assert(conn);
385 if (ack_level == ack_level_t::None) {
386 return amqp::publish(conn, topic, json_format_pubsub_event(record));
387 } else {
388 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
389 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
390 auto w = std::unique_ptr<Waiter>(new Waiter);
391 const auto rc = amqp::publish_with_confirm(conn,
392 topic,
393 json_format_pubsub_event(record),
394 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
395 if (rc < 0) {
396 // failed to publish, does not wait for reply
397 return rc;
398 }
399 return w->wait(y);
400 }
401 }
402
403 std::string to_str() const override {
404 std::string str("AMQP(0.9.1) Endpoint");
405 str += "\nURI: " + endpoint;
406 str += "\nTopic: " + topic;
407 str += "\nExchange: " + exchange;
408 str += "\nAck Level: " + str_ack_level;
409 return str;
410 }
411 };
412
// accepted values of the "amqp-version" argument, and the AMQP schema name
static const std::string AMQP_0_9_1{"0-9-1"};
static const std::string AMQP_1_0{"1-0"};
static const std::string AMQP_SCHEMA{"amqp"};
416 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
417
418
419 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
420 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
421 private:
422 enum class ack_level_t {
423 None,
424 Broker,
425 };
426 CephContext* const cct;
427 const std::string topic;
428 kafka::connection_ptr_t conn;
429 const ack_level_t ack_level;
430
431 static bool get_verify_ssl(const RGWHTTPArgs& args) {
432 bool exists;
433 auto str_verify_ssl = args.get("verify-ssl", &exists);
434 if (!exists) {
435 // verify server certificate by default
436 return true;
437 }
438 boost::algorithm::to_lower(str_verify_ssl);
439 if (str_verify_ssl == "true") {
440 return true;
441 }
442 if (str_verify_ssl == "false") {
443 return false;
444 }
445 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
446 }
447
448 static bool get_use_ssl(const RGWHTTPArgs& args) {
449 bool exists;
450 auto str_use_ssl = args.get("use-ssl", &exists);
451 if (!exists) {
452 // by default ssl not used
453 return false;
454 }
455 boost::algorithm::to_lower(str_use_ssl);
456 if (str_use_ssl == "true") {
457 return true;
458 }
459 if (str_use_ssl == "false") {
460 return false;
461 }
462 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
463 }
464
465 static ack_level_t get_ack_level(const RGWHTTPArgs& args) {
466 bool exists;
467 // get ack level
468 const auto str_ack_level = args.get("kafka-ack-level", &exists);
469 if (!exists || str_ack_level == "broker") {
470 // "broker" is default
471 return ack_level_t::Broker;
472 }
473 if (str_ack_level == "none") {
474 return ack_level_t::None;
475 }
476 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
477 }
478
479 // NoAckPublishCR implements async kafka publishing via coroutine
480 // This coroutine ends when it send the message and does not wait for an ack
481 class NoAckPublishCR : public RGWCoroutine {
482 private:
483 const std::string topic;
484 kafka::connection_ptr_t conn;
485 const std::string message;
486
487 public:
488 NoAckPublishCR(CephContext* cct,
489 const std::string& _topic,
490 kafka::connection_ptr_t& _conn,
491 const std::string& _message) :
492 RGWCoroutine(cct),
493 topic(_topic), conn(_conn), message(_message) {}
494
495 // send message to endpoint, without waiting for reply
496 int operate() override {
497 reenter(this) {
498 const auto rc = kafka::publish(conn, topic, message);
499 if (rc < 0) {
500 return set_cr_error(rc);
501 }
502 return set_cr_done();
503 }
504 return 0;
505 }
506 };
507
508 // AckPublishCR implements async kafka publishing via coroutine
509 // This coroutine ends when an ack is received from the borker
510 // note that it does not wait for an ack fron the end client
511 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
512 private:
513 const std::string topic;
514 kafka::connection_ptr_t conn;
515 const std::string message;
516
517 public:
518 AckPublishCR(CephContext* cct,
519 const std::string& _topic,
520 kafka::connection_ptr_t& _conn,
521 const std::string& _message) :
522 RGWCoroutine(cct),
523 topic(_topic), conn(_conn), message(_message) {}
524
525 // send message to endpoint, waiting for reply
526 int operate() override {
527 reenter(this) {
528 yield {
529 init_new_io(this);
530 const auto rc = kafka::publish_with_confirm(conn,
531 topic,
532 message,
533 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
534 if (rc < 0) {
535 // failed to publish, does not wait for reply
536 return set_cr_error(rc);
537 }
538 // mark as blocked on the kafka answer
539 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
540 io_block();
541 return 0;
542 }
543 return set_cr_done();
544 }
545 return 0;
546 }
547
548 // callback invoked from the kafka manager thread when ack/nack is received
549 void request_complete(int status) {
550 ceph_assert(!is_done());
551 if (status != 0) {
552 // server replied with a nack
553 set_cr_error(status);
554 }
555 io_complete();
556 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
557 }
558
559 // TODO: why are these mandatory in RGWIOProvider?
560 void set_io_user_info(void *_user_info) override {
561 }
562
563 void *get_io_user_info() override {
564 return nullptr;
565 }
566 };
567
568 public:
569 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
570 const std::string& _topic,
571 const RGWHTTPArgs& args,
572 CephContext* _cct) :
573 cct(_cct),
574 topic(_topic),
575 conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
576 ack_level(get_ack_level(args)) {
577 if (!conn) {
578 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
579 }
580 }
581
582 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
583 ceph_assert(conn);
584 if (ack_level == ack_level_t::None) {
585 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
586 } else {
587 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
588 }
589 }
590
591 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
592 ceph_assert(conn);
593 if (ack_level == ack_level_t::None) {
594 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
595 } else {
596 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
597 }
598 }
599
600 // this allows waiting untill "finish()" is called from a different thread
601 // waiting could be blocking the waiting thread or yielding, depending
602 // with compilation flag support and whether the optional_yield is set
603 class Waiter {
604 using Signature = void(boost::system::error_code);
605 using Completion = ceph::async::Completion<Signature>;
606 std::unique_ptr<Completion> completion = nullptr;
607 int ret;
608
609 mutable std::atomic<bool> done = false;
610 mutable std::mutex lock;
611 mutable std::condition_variable cond;
612
613 template <typename ExecutionContext, typename CompletionToken>
614 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
615 boost::asio::async_completion<CompletionToken, Signature> init(token);
616 auto& handler = init.completion_handler;
617 {
618 std::unique_lock l{lock};
619 completion = Completion::create(ctx.get_executor(), std::move(handler));
620 }
621 return init.result.get();
622 }
623
624 public:
625 int wait(optional_yield y) {
626 if (done) {
627 return ret;
628 }
629 #ifdef HAVE_BOOST_CONTEXT
630 if (y) {
631 auto& io_ctx = y.get_io_context();
632 auto& yield_ctx = y.get_yield_context();
633 boost::system::error_code ec;
634 async_wait(io_ctx, yield_ctx[ec]);
635 return -ec.value();
636 }
637 #endif
638 std::unique_lock l(lock);
639 cond.wait(l, [this]{return (done==true);});
640 return ret;
641 }
642
643 void finish(int r) {
644 std::unique_lock l{lock};
645 ret = r;
646 done = true;
647 if (completion) {
648 boost::system::error_code ec(-ret, boost::system::system_category());
649 Completion::post(std::move(completion), ec);
650 } else {
651 cond.notify_all();
652 }
653 }
654 };
655
656 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
657 ceph_assert(conn);
658 if (ack_level == ack_level_t::None) {
659 return kafka::publish(conn, topic, json_format_pubsub_event(record));
660 } else {
661 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
662 auto w = std::unique_ptr<Waiter>(new Waiter);
663 const auto rc = kafka::publish_with_confirm(conn,
664 topic,
665 json_format_pubsub_event(record),
666 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
667 if (rc < 0) {
668 // failed to publish, does not wait for reply
669 return rc;
670 }
671 return w->wait(y);
672 }
673 }
674
675 std::string to_str() const override {
676 std::string str("Kafka Endpoint");
677 str += kafka::to_string(conn);
678 str += "\nTopic: " + topic;
679 return str;
680 }
681 };
682
// schema name returned by get_schema() for "kafka" endpoints
static const std::string KAFKA_SCHEMA{"kafka"};
684 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
685
static const std::string WEBHOOK_SCHEMA{"webhook"};
static const std::string UNKNOWN_SCHEMA{"unknown"};
static const std::string NO_SCHEMA{""};

// Map the URI scheme of "endpoint" (the text before the first ':') to one of
// the schema name constants:
//   empty endpoint               -> NO_SCHEMA
//   "http" or "https"            -> WEBHOOK_SCHEMA
//   "amqp"  (if compiled in)     -> AMQP_SCHEMA
//   "kafka" (if compiled in)     -> KAFKA_SCHEMA
//   no ':' or anything else      -> UNKNOWN_SCHEMA
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string scheme(endpoint, 0, colon);
  if (scheme == "http" || scheme == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (scheme == "amqp") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (scheme == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
712
713 RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
714 const std::string& topic,
715 const RGWHTTPArgs& args,
716 CephContext* cct) {
717 const auto& schema = get_schema(endpoint);
718 if (schema == WEBHOOK_SCHEMA) {
719 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
720 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
721 } else if (schema == AMQP_SCHEMA) {
722 bool exists;
723 std::string version = args.get("amqp-version", &exists);
724 if (!exists) {
725 version = AMQP_0_9_1;
726 }
727 if (version == AMQP_0_9_1) {
728 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
729 } else if (version == AMQP_1_0) {
730 throw configuration_error("AMQP: v1.0 not supported");
731 return nullptr;
732 } else {
733 throw configuration_error("AMQP: unknown version: " + version);
734 return nullptr;
735 }
736 } else if (schema == "amqps") {
737 throw configuration_error("AMQP: ssl not supported");
738 return nullptr;
739 #endif
740 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
741 } else if (schema == KAFKA_SCHEMA) {
742 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
743 #endif
744 }
745
746 throw configuration_error("unknown schema in: " + endpoint);
747 return nullptr;
748 }
749