]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub_push.cc
3b5b926661051ecd1923f8f2e5a911e99fdea0a4
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_pubsub_push.h"
5 #include <string>
6 #include <sstream>
7 #include <algorithm>
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
14 #include "acconfig.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
16 #include "rgw_amqp.h"
17 #endif
18 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
19 #include "rgw_kafka.h"
20 #endif
21 #include <boost/asio/yield.hpp>
22 #include <boost/algorithm/string.hpp>
23 #include <functional>
24 #include "rgw_perf_counters.h"
25
26 using namespace rgw;
27
28 template<typename EventType>
29 std::string json_format_pubsub_event(const EventType& event) {
30 std::stringstream ss;
31 JSONFormatter f(false);
32 {
33 Formatter::ObjectSection s(f, EventType::json_type_plural);
34 {
35 Formatter::ArraySection s(f, EventType::json_type_plural);
36 encode_json("", event, &f);
37 }
38 }
39 f.flush(ss);
40 return ss.str();
41 }
42
43 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
44 private:
45 const std::string endpoint;
46 std::string str_ack_level;
47 typedef unsigned ack_level_t;
48 ack_level_t ack_level; // TODO: not used for now
49 bool verify_ssl;
50 static const ack_level_t ACK_LEVEL_ANY = 0;
51 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
52
53 // PostCR implements async execution of RGWPostHTTPData via coroutine
54 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
55 private:
56 RGWDataSyncEnv* const sync_env;
57 bufferlist read_bl;
58 const ack_level_t ack_level;
59
60 public:
61 PostCR(const std::string& _post_data,
62 RGWDataSyncEnv* _sync_env,
63 const std::string& endpoint,
64 ack_level_t _ack_level,
65 bool verify_ssl) :
66 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
67 RGWSimpleCoroutine(_sync_env->cct),
68 sync_env(_sync_env),
69 ack_level (_ack_level) {
70 // ctor also set the data to send
71 set_post_data(_post_data);
72 set_send_length(_post_data.length());
73 }
74
75 // send message to endpoint
76 int send_request() override {
77 init_new_io(this);
78 const auto rc = sync_env->http_manager->add_request(this);
79 if (rc < 0) {
80 return rc;
81 }
82 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
83 return 0;
84 }
85
86 // wait for reply
87 int request_complete() override {
88 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
89 if (ack_level == ACK_LEVEL_ANY) {
90 return 0;
91 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
92 // TODO check result code to be non-error
93 } else {
94 // TODO: check that result code == ack_level
95 }
96 return -1;
97 }
98 };
99
100 public:
101 RGWPubSubHTTPEndpoint(const std::string& _endpoint,
102 const RGWHTTPArgs& args) : endpoint(_endpoint) {
103 bool exists;
104
105 str_ack_level = args.get("http-ack-level", &exists);
106 if (!exists || str_ack_level == "any") {
107 // "any" is default
108 ack_level = ACK_LEVEL_ANY;
109 } else if (str_ack_level == "non-error") {
110 ack_level = ACK_LEVEL_NON_ERROR;
111 } else {
112 ack_level = std::atoi(str_ack_level.c_str());
113 if (ack_level < 100 || ack_level >= 600) {
114 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
115 }
116 }
117
118 auto str_verify_ssl = args.get("verify-ssl", &exists);
119 boost::algorithm::to_lower(str_verify_ssl);
120 // verify server certificate by default
121 if (!exists || str_verify_ssl == "true") {
122 verify_ssl = true;
123 } else if (str_verify_ssl == "false") {
124 verify_ssl = false;
125 } else {
126 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
127 }
128 }
129
130 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
131 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
132 }
133
134 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
135 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
136 }
137
138 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
139 bufferlist read_bl;
140 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
141 const auto post_data = json_format_pubsub_event(event);
142 request.set_post_data(post_data);
143 request.set_send_length(post_data.length());
144 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
145 const auto rc = RGWHTTP::process(&request, y);
146 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
147 // TODO: use read_bl to process return code and handle according to ack level
148 return rc;
149 }
150
151 std::string to_str() const override {
152 std::string str("HTTP/S Endpoint");
153 str += "\nURI: " + endpoint;
154 str += "\nAck Level: " + str_ack_level;
155 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
156 return str;
157
158 }
159 };
160
161 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
162 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
163 private:
164 enum class ack_level_t {
165 None,
166 Broker,
167 Routable
168 };
169 CephContext* const cct;
170 const std::string endpoint;
171 const std::string topic;
172 const std::string exchange;
173 ack_level_t ack_level;
174 amqp::connection_ptr_t conn;
175
176 bool get_verify_ssl(const RGWHTTPArgs& args) {
177 bool exists;
178 auto str_verify_ssl = args.get("verify-ssl", &exists);
179 if (!exists) {
180 // verify server certificate by default
181 return true;
182 }
183 boost::algorithm::to_lower(str_verify_ssl);
184 if (str_verify_ssl == "true") {
185 return true;
186 }
187 if (str_verify_ssl == "false") {
188 return false;
189 }
190 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
191 }
192
193 std::string get_exchange(const RGWHTTPArgs& args) {
194 bool exists;
195 const auto exchange = args.get("amqp-exchange", &exists);
196 if (!exists) {
197 throw configuration_error("AMQP: missing amqp-exchange");
198 }
199 return exchange;
200 }
201
202 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
203 bool exists;
204 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
205 if (!exists || str_ack_level == "broker") {
206 // "broker" is default
207 return ack_level_t::Broker;
208 }
209 if (str_ack_level == "none") {
210 return ack_level_t::None;
211 }
212 if (str_ack_level == "routable") {
213 return ack_level_t::Routable;
214 }
215 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
216 }
217
218 // NoAckPublishCR implements async amqp publishing via coroutine
219 // This coroutine ends when it send the message and does not wait for an ack
220 class NoAckPublishCR : public RGWCoroutine {
221 private:
222 const std::string topic;
223 amqp::connection_ptr_t conn;
224 const std::string message;
225
226 public:
227 NoAckPublishCR(CephContext* cct,
228 const std::string& _topic,
229 amqp::connection_ptr_t& _conn,
230 const std::string& _message) :
231 RGWCoroutine(cct),
232 topic(_topic), conn(_conn), message(_message) {}
233
234 // send message to endpoint, without waiting for reply
235 int operate() override {
236 reenter(this) {
237 const auto rc = amqp::publish(conn, topic, message);
238 if (rc < 0) {
239 return set_cr_error(rc);
240 }
241 return set_cr_done();
242 }
243 return 0;
244 }
245 };
246
247 // AckPublishCR implements async amqp publishing via coroutine
248 // This coroutine ends when an ack is received from the borker
249 // note that it does not wait for an ack fron the end client
250 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
251 private:
252 const std::string topic;
253 amqp::connection_ptr_t conn;
254 const std::string message;
255
256 public:
257 AckPublishCR(CephContext* cct,
258 const std::string& _topic,
259 amqp::connection_ptr_t& _conn,
260 const std::string& _message) :
261 RGWCoroutine(cct),
262 topic(_topic), conn(_conn), message(_message) {}
263
264 // send message to endpoint, waiting for reply
265 int operate() override {
266 reenter(this) {
267 yield {
268 init_new_io(this);
269 const auto rc = amqp::publish_with_confirm(conn,
270 topic,
271 message,
272 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
273 if (rc < 0) {
274 // failed to publish, does not wait for reply
275 return set_cr_error(rc);
276 }
277 // mark as blocked on the amqp answer
278 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
279 io_block();
280 return 0;
281 }
282 return set_cr_done();
283 }
284 return 0;
285 }
286
287 // callback invoked from the amqp manager thread when ack/nack is received
288 void request_complete(int status) {
289 ceph_assert(!is_done());
290 if (status != 0) {
291 // server replied with a nack
292 set_cr_error(status);
293 }
294 io_complete();
295 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
296 }
297
298 // TODO: why are these mandatory in RGWIOProvider?
299 void set_io_user_info(void *_user_info) override {
300 }
301
302 void *get_io_user_info() override {
303 return nullptr;
304 }
305 };
306
307 public:
308 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
309 const std::string& _topic,
310 const RGWHTTPArgs& args,
311 CephContext* _cct) :
312 cct(_cct),
313 endpoint(_endpoint),
314 topic(_topic),
315 exchange(get_exchange(args)),
316 ack_level(get_ack_level(args)),
317 conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
318 if (!conn) {
319 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
320 }
321 }
322
323 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
324 ceph_assert(conn);
325 if (ack_level == ack_level_t::None) {
326 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
327 } else {
328 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
329 }
330 }
331
332 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
333 ceph_assert(conn);
334 if (ack_level == ack_level_t::None) {
335 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
336 } else {
337 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
338 }
339 }
340
341 // this allows waiting untill "finish()" is called from a different thread
342 // waiting could be blocking the waiting thread or yielding, depending
343 // with compilation flag support and whether the optional_yield is set
344 class Waiter {
345 using Signature = void(boost::system::error_code);
346 using Completion = ceph::async::Completion<Signature>;
347 std::unique_ptr<Completion> completion = nullptr;
348 int ret;
349
350 mutable std::atomic<bool> done = false;
351 mutable std::mutex lock;
352 mutable std::condition_variable cond;
353
354 template <typename ExecutionContext, typename CompletionToken>
355 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
356 boost::asio::async_completion<CompletionToken, Signature> init(token);
357 auto& handler = init.completion_handler;
358 {
359 std::unique_lock l{lock};
360 completion = Completion::create(ctx.get_executor(), std::move(handler));
361 }
362 return init.result.get();
363 }
364
365 public:
366 int wait(optional_yield y) {
367 if (done) {
368 return ret;
369 }
370 if (y) {
371 auto& io_ctx = y.get_io_context();
372 auto& yield_ctx = y.get_yield_context();
373 boost::system::error_code ec;
374 async_wait(io_ctx, yield_ctx[ec]);
375 return -ec.value();
376 }
377 std::unique_lock l(lock);
378 cond.wait(l, [this]{return (done==true);});
379 return ret;
380 }
381
382 void finish(int r) {
383 std::unique_lock l{lock};
384 ret = r;
385 done = true;
386 if (completion) {
387 boost::system::error_code ec(-ret, boost::system::system_category());
388 Completion::post(std::move(completion), ec);
389 } else {
390 cond.notify_all();
391 }
392 }
393 };
394
395 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
396 ceph_assert(conn);
397 if (ack_level == ack_level_t::None) {
398 return amqp::publish(conn, topic, json_format_pubsub_event(event));
399 } else {
400 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
401 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
402 auto w = std::unique_ptr<Waiter>(new Waiter);
403 const auto rc = amqp::publish_with_confirm(conn,
404 topic,
405 json_format_pubsub_event(event),
406 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
407 if (rc < 0) {
408 // failed to publish, does not wait for reply
409 return rc;
410 }
411 return w->wait(y);
412 }
413 }
414
415 std::string to_str() const override {
416 std::string str("AMQP(0.9.1) Endpoint");
417 str += "\nURI: " + endpoint;
418 str += "\nTopic: " + topic;
419 str += "\nExchange: " + exchange;
420 return str;
421 }
422 };
423
424 static const std::string AMQP_0_9_1("0-9-1");
425 static const std::string AMQP_1_0("1-0");
426 static const std::string AMQP_SCHEMA("amqp");
427 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
428
429
430 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
431 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
432 private:
433 enum class ack_level_t {
434 None,
435 Broker,
436 };
437 CephContext* const cct;
438 const std::string topic;
439 kafka::connection_ptr_t conn;
440 const ack_level_t ack_level;
441
442 bool get_verify_ssl(const RGWHTTPArgs& args) {
443 bool exists;
444 auto str_verify_ssl = args.get("verify-ssl", &exists);
445 if (!exists) {
446 // verify server certificate by default
447 return true;
448 }
449 boost::algorithm::to_lower(str_verify_ssl);
450 if (str_verify_ssl == "true") {
451 return true;
452 }
453 if (str_verify_ssl == "false") {
454 return false;
455 }
456 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
457 }
458
459 bool get_use_ssl(const RGWHTTPArgs& args) {
460 bool exists;
461 auto str_use_ssl = args.get("use-ssl", &exists);
462 if (!exists) {
463 // by default ssl not used
464 return false;
465 }
466 boost::algorithm::to_lower(str_use_ssl);
467 if (str_use_ssl == "true") {
468 return true;
469 }
470 if (str_use_ssl == "false") {
471 return false;
472 }
473 throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
474 }
475
476 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
477 bool exists;
478 // get ack level
479 const auto str_ack_level = args.get("kafka-ack-level", &exists);
480 if (!exists || str_ack_level == "broker") {
481 // "broker" is default
482 return ack_level_t::Broker;
483 }
484 if (str_ack_level == "none") {
485 return ack_level_t::None;
486 }
487 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
488 }
489
490 // NoAckPublishCR implements async kafka publishing via coroutine
491 // This coroutine ends when it send the message and does not wait for an ack
492 class NoAckPublishCR : public RGWCoroutine {
493 private:
494 const std::string topic;
495 kafka::connection_ptr_t conn;
496 const std::string message;
497
498 public:
499 NoAckPublishCR(CephContext* cct,
500 const std::string& _topic,
501 kafka::connection_ptr_t& _conn,
502 const std::string& _message) :
503 RGWCoroutine(cct),
504 topic(_topic), conn(_conn), message(_message) {}
505
506 // send message to endpoint, without waiting for reply
507 int operate() override {
508 reenter(this) {
509 const auto rc = kafka::publish(conn, topic, message);
510 if (rc < 0) {
511 return set_cr_error(rc);
512 }
513 return set_cr_done();
514 }
515 return 0;
516 }
517 };
518
519 // AckPublishCR implements async kafka publishing via coroutine
520 // This coroutine ends when an ack is received from the borker
521 // note that it does not wait for an ack fron the end client
522 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
523 private:
524 const std::string topic;
525 kafka::connection_ptr_t conn;
526 const std::string message;
527
528 public:
529 AckPublishCR(CephContext* cct,
530 const std::string& _topic,
531 kafka::connection_ptr_t& _conn,
532 const std::string& _message) :
533 RGWCoroutine(cct),
534 topic(_topic), conn(_conn), message(_message) {}
535
536 // send message to endpoint, waiting for reply
537 int operate() override {
538 reenter(this) {
539 yield {
540 init_new_io(this);
541 const auto rc = kafka::publish_with_confirm(conn,
542 topic,
543 message,
544 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
545 if (rc < 0) {
546 // failed to publish, does not wait for reply
547 return set_cr_error(rc);
548 }
549 // mark as blocked on the kafka answer
550 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
551 io_block();
552 return 0;
553 }
554 return set_cr_done();
555 }
556 return 0;
557 }
558
559 // callback invoked from the kafka manager thread when ack/nack is received
560 void request_complete(int status) {
561 ceph_assert(!is_done());
562 if (status != 0) {
563 // server replied with a nack
564 set_cr_error(status);
565 }
566 io_complete();
567 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
568 }
569
570 // TODO: why are these mandatory in RGWIOProvider?
571 void set_io_user_info(void *_user_info) override {
572 }
573
574 void *get_io_user_info() override {
575 return nullptr;
576 }
577 };
578
579 public:
580 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
581 const std::string& _topic,
582 const RGWHTTPArgs& args,
583 CephContext* _cct) :
584 cct(_cct),
585 topic(_topic),
586 conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
587 ack_level(get_ack_level(args)) {
588 if (!conn) {
589 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
590 }
591 }
592
593 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
594 ceph_assert(conn);
595 if (ack_level == ack_level_t::None) {
596 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
597 } else {
598 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
599 }
600 }
601
602 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
603 ceph_assert(conn);
604 if (ack_level == ack_level_t::None) {
605 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
606 } else {
607 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
608 }
609 }
610
611 // this allows waiting untill "finish()" is called from a different thread
612 // waiting could be blocking the waiting thread or yielding, depending
613 // with compilation flag support and whether the optional_yield is set
614 class Waiter {
615 using Signature = void(boost::system::error_code);
616 using Completion = ceph::async::Completion<Signature>;
617 std::unique_ptr<Completion> completion = nullptr;
618 int ret;
619
620 mutable std::atomic<bool> done = false;
621 mutable std::mutex lock;
622 mutable std::condition_variable cond;
623
624 template <typename ExecutionContext, typename CompletionToken>
625 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
626 boost::asio::async_completion<CompletionToken, Signature> init(token);
627 auto& handler = init.completion_handler;
628 {
629 std::unique_lock l{lock};
630 completion = Completion::create(ctx.get_executor(), std::move(handler));
631 }
632 return init.result.get();
633 }
634
635 public:
636 int wait(optional_yield y) {
637 if (done) {
638 return ret;
639 }
640 if (y) {
641 auto& io_ctx = y.get_io_context();
642 auto& yield_ctx = y.get_yield_context();
643 boost::system::error_code ec;
644 async_wait(io_ctx, yield_ctx[ec]);
645 return -ec.value();
646 }
647 std::unique_lock l(lock);
648 cond.wait(l, [this]{return (done==true);});
649 return ret;
650 }
651
652 void finish(int r) {
653 std::unique_lock l{lock};
654 ret = r;
655 done = true;
656 if (completion) {
657 boost::system::error_code ec(-ret, boost::system::system_category());
658 Completion::post(std::move(completion), ec);
659 } else {
660 cond.notify_all();
661 }
662 }
663 };
664
665 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
666 ceph_assert(conn);
667 if (ack_level == ack_level_t::None) {
668 return kafka::publish(conn, topic, json_format_pubsub_event(event));
669 } else {
670 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
671 auto w = std::unique_ptr<Waiter>(new Waiter);
672 const auto rc = kafka::publish_with_confirm(conn,
673 topic,
674 json_format_pubsub_event(event),
675 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
676 if (rc < 0) {
677 // failed to publish, does not wait for reply
678 return rc;
679 }
680 return w->wait(y);
681 }
682 }
683
684 std::string to_str() const override {
685 std::string str("Kafka Endpoint");
686 str += kafka::to_string(conn);
687 str += "\nTopic: " + topic;
688 return str;
689 }
690 };
691
692 static const std::string KAFKA_SCHEMA("kafka");
693 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
694
695 static const std::string WEBHOOK_SCHEMA("webhook");
696 static const std::string UNKNOWN_SCHEMA("unknown");
697 static const std::string NO_SCHEMA("");
698
699 const std::string& get_schema(const std::string& endpoint) {
700 if (endpoint.empty()) {
701 return NO_SCHEMA;
702 }
703 const auto pos = endpoint.find(':');
704 if (pos == std::string::npos) {
705 return UNKNOWN_SCHEMA;
706 }
707 const auto& schema = endpoint.substr(0,pos);
708 if (schema == "http" || schema == "https") {
709 return WEBHOOK_SCHEMA;
710 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
711 } else if (schema == "amqp" || schema == "amqps") {
712 return AMQP_SCHEMA;
713 #endif
714 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
715 } else if (schema == "kafka") {
716 return KAFKA_SCHEMA;
717 #endif
718 }
719 return UNKNOWN_SCHEMA;
720 }
721
722 RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
723 const std::string& topic,
724 const RGWHTTPArgs& args,
725 CephContext* cct) {
726 const auto& schema = get_schema(endpoint);
727 if (schema == WEBHOOK_SCHEMA) {
728 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
729 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
730 } else if (schema == AMQP_SCHEMA) {
731 bool exists;
732 std::string version = args.get("amqp-version", &exists);
733 if (!exists) {
734 version = AMQP_0_9_1;
735 }
736 if (version == AMQP_0_9_1) {
737 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
738 } else if (version == AMQP_1_0) {
739 throw configuration_error("AMQP: v1.0 not supported");
740 return nullptr;
741 } else {
742 throw configuration_error("AMQP: unknown version: " + version);
743 return nullptr;
744 }
745 #endif
746 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
747 } else if (schema == KAFKA_SCHEMA) {
748 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
749 #endif
750 }
751
752 throw configuration_error("unknown schema in: " + endpoint);
753 return nullptr;
754 }
755