]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub_push.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_pubsub_push.h"
5 #include <string>
6 #include <sstream>
7 #include <algorithm>
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/iso_8601.h"
11 #include "common/async/completion.h"
12 #include "rgw_common.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_pubsub.h"
15 #include "acconfig.h"
16 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
17 #include "rgw_amqp.h"
18 #endif
19 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20 #include "rgw_kafka.h"
21 #endif
22 #include <boost/asio/yield.hpp>
23 #include <boost/algorithm/string.hpp>
24 #include <functional>
25 #include "rgw_perf_counters.h"
26
27 using namespace rgw;
28
29 template<typename EventType>
30 std::string json_format_pubsub_event(const EventType& event) {
31 std::stringstream ss;
32 JSONFormatter f(false);
33 {
34 Formatter::ObjectSection s(f, EventType::json_type_plural);
35 {
36 Formatter::ArraySection s(f, EventType::json_type_plural);
37 encode_json("", event, &f);
38 }
39 }
40 f.flush(ss);
41 return ss.str();
42 }
43
44 bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
45 bool value;
46 bool exists;
47 if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
49 }
50 if (!exists) {
51 return default_value;
52 }
53 return value;
54 }
55
56 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
57 private:
58 const std::string endpoint;
59 typedef unsigned ack_level_t;
60 ack_level_t ack_level; // TODO: not used for now
61 const bool verify_ssl;
62 const bool cloudevents;
63 static const ack_level_t ACK_LEVEL_ANY = 0;
64 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
65
66 // PostCR implements async execution of RGWPostHTTPData via coroutine
67 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
68 private:
69 RGWDataSyncEnv* const sync_env;
70 bufferlist read_bl;
71 const ack_level_t ack_level;
72
73 public:
74 PostCR(const std::string& _post_data,
75 RGWDataSyncEnv* _sync_env,
76 const std::string& endpoint,
77 ack_level_t _ack_level,
78 bool verify_ssl) :
79 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
80 RGWSimpleCoroutine(_sync_env->cct),
81 sync_env(_sync_env),
82 ack_level (_ack_level) {
83 // ctor also set the data to send
84 set_post_data(_post_data);
85 set_send_length(_post_data.length());
86 }
87
88 // send message to endpoint
89 int send_request(const DoutPrefixProvider *dpp) override {
90 init_new_io(this);
91 const auto rc = sync_env->http_manager->add_request(this);
92 if (rc < 0) {
93 return rc;
94 }
95 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
96 return 0;
97 }
98
99 // wait for reply
100 int request_complete() override {
101 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
102 if (ack_level == ACK_LEVEL_ANY) {
103 return 0;
104 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
105 // TODO check result code to be non-error
106 } else {
107 // TODO: check that result code == ack_level
108 }
109 return -1;
110 }
111 };
112
113 public:
114 RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
115 endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
116 {
117 bool exists;
118 const auto& str_ack_level = args.get("http-ack-level", &exists);
119 if (!exists || str_ack_level == "any") {
120 // "any" is default
121 ack_level = ACK_LEVEL_ANY;
122 } else if (str_ack_level == "non-error") {
123 ack_level = ACK_LEVEL_NON_ERROR;
124 } else {
125 ack_level = std::atoi(str_ack_level.c_str());
126 if (ack_level < 100 || ack_level >= 600) {
127 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
128 }
129 }
130 }
131
132 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
133 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
134 }
135
136 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
137 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
138 }
139
140 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
141 bufferlist read_bl;
142 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
143 const auto post_data = json_format_pubsub_event(event);
144 if (cloudevents) {
145 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
146 // using "Binary Content Mode"
147 request.append_header("ce-specversion", "1.0");
148 request.append_header("ce-type", "com.amazonaws." + event.eventName);
149 request.append_header("ce-time", to_iso_8601(event.eventTime));
150 // default output of iso8601 is also RFC3339 compatible
151 request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
152 request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
153 request.append_header("ce-subject", event.object_key);
154 }
155 request.set_post_data(post_data);
156 request.set_send_length(post_data.length());
157 request.append_header("Content-Type", "application/json");
158 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
159 const auto rc = RGWHTTP::process(&request, y);
160 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
161 // TODO: use read_bl to process return code and handle according to ack level
162 return rc;
163 }
164
165 std::string to_str() const override {
166 std::string str("HTTP/S Endpoint");
167 str += "\nURI: " + endpoint;
168 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
169 return str;
170 }
171 };
172
173 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
174 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
175 private:
176 enum class ack_level_t {
177 None,
178 Broker,
179 Routable
180 };
181 CephContext* const cct;
182 const std::string endpoint;
183 const std::string topic;
184 const std::string exchange;
185 ack_level_t ack_level;
186 amqp::connection_ptr_t conn;
187
188 bool get_verify_ssl(const RGWHTTPArgs& args) {
189 bool exists;
190 auto str_verify_ssl = args.get("verify-ssl", &exists);
191 if (!exists) {
192 // verify server certificate by default
193 return true;
194 }
195 boost::algorithm::to_lower(str_verify_ssl);
196 if (str_verify_ssl == "true") {
197 return true;
198 }
199 if (str_verify_ssl == "false") {
200 return false;
201 }
202 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
203 }
204
205 std::string get_exchange(const RGWHTTPArgs& args) {
206 bool exists;
207 const auto exchange = args.get("amqp-exchange", &exists);
208 if (!exists) {
209 throw configuration_error("AMQP: missing amqp-exchange");
210 }
211 return exchange;
212 }
213
214 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
215 bool exists;
216 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
217 if (!exists || str_ack_level == "broker") {
218 // "broker" is default
219 return ack_level_t::Broker;
220 }
221 if (str_ack_level == "none") {
222 return ack_level_t::None;
223 }
224 if (str_ack_level == "routable") {
225 return ack_level_t::Routable;
226 }
227 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
228 }
229
230 // NoAckPublishCR implements async amqp publishing via coroutine
231 // This coroutine ends when it send the message and does not wait for an ack
232 class NoAckPublishCR : public RGWCoroutine {
233 private:
234 const std::string topic;
235 amqp::connection_ptr_t conn;
236 const std::string message;
237
238 public:
239 NoAckPublishCR(CephContext* cct,
240 const std::string& _topic,
241 amqp::connection_ptr_t& _conn,
242 const std::string& _message) :
243 RGWCoroutine(cct),
244 topic(_topic), conn(_conn), message(_message) {}
245
246 // send message to endpoint, without waiting for reply
247 int operate(const DoutPrefixProvider *dpp) override {
248 reenter(this) {
249 const auto rc = amqp::publish(conn, topic, message);
250 if (rc < 0) {
251 return set_cr_error(rc);
252 }
253 return set_cr_done();
254 }
255 return 0;
256 }
257 };
258
259 // AckPublishCR implements async amqp publishing via coroutine
260 // This coroutine ends when an ack is received from the borker
261 // note that it does not wait for an ack fron the end client
262 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
263 private:
264 const std::string topic;
265 amqp::connection_ptr_t conn;
266 const std::string message;
267
268 public:
269 AckPublishCR(CephContext* cct,
270 const std::string& _topic,
271 amqp::connection_ptr_t& _conn,
272 const std::string& _message) :
273 RGWCoroutine(cct),
274 topic(_topic), conn(_conn), message(_message) {}
275
276 // send message to endpoint, waiting for reply
277 int operate(const DoutPrefixProvider *dpp) override {
278 reenter(this) {
279 yield {
280 init_new_io(this);
281 const auto rc = amqp::publish_with_confirm(conn,
282 topic,
283 message,
284 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
285 if (rc < 0) {
286 // failed to publish, does not wait for reply
287 return set_cr_error(rc);
288 }
289 // mark as blocked on the amqp answer
290 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
291 io_block();
292 return 0;
293 }
294 return set_cr_done();
295 }
296 return 0;
297 }
298
299 // callback invoked from the amqp manager thread when ack/nack is received
300 void request_complete(int status) {
301 ceph_assert(!is_done());
302 if (status != 0) {
303 // server replied with a nack
304 set_cr_error(status);
305 }
306 io_complete();
307 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
308 }
309
310 // TODO: why are these mandatory in RGWIOProvider?
311 void set_io_user_info(void *_user_info) override {
312 }
313
314 void *get_io_user_info() override {
315 return nullptr;
316 }
317 };
318
319 public:
320 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
321 const std::string& _topic,
322 const RGWHTTPArgs& args,
323 CephContext* _cct) :
324 cct(_cct),
325 endpoint(_endpoint),
326 topic(_topic),
327 exchange(get_exchange(args)),
328 ack_level(get_ack_level(args)),
329 conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
330 if (!conn) {
331 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
332 }
333 }
334
335 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
336 ceph_assert(conn);
337 if (ack_level == ack_level_t::None) {
338 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
339 } else {
340 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
341 }
342 }
343
344 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
345 ceph_assert(conn);
346 if (ack_level == ack_level_t::None) {
347 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
348 } else {
349 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
350 }
351 }
352
353 // this allows waiting untill "finish()" is called from a different thread
354 // waiting could be blocking the waiting thread or yielding, depending
355 // with compilation flag support and whether the optional_yield is set
356 class Waiter {
357 using Signature = void(boost::system::error_code);
358 using Completion = ceph::async::Completion<Signature>;
359 std::unique_ptr<Completion> completion = nullptr;
360 int ret;
361
362 mutable std::atomic<bool> done = false;
363 mutable std::mutex lock;
364 mutable std::condition_variable cond;
365
366 template <typename ExecutionContext, typename CompletionToken>
367 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
368 boost::asio::async_completion<CompletionToken, Signature> init(token);
369 auto& handler = init.completion_handler;
370 {
371 std::unique_lock l{lock};
372 completion = Completion::create(ctx.get_executor(), std::move(handler));
373 }
374 return init.result.get();
375 }
376
377 public:
378 int wait(optional_yield y) {
379 if (done) {
380 return ret;
381 }
382 if (y) {
383 auto& io_ctx = y.get_io_context();
384 auto& yield_ctx = y.get_yield_context();
385 boost::system::error_code ec;
386 async_wait(io_ctx, yield_ctx[ec]);
387 return -ec.value();
388 }
389 std::unique_lock l(lock);
390 cond.wait(l, [this]{return (done==true);});
391 return ret;
392 }
393
394 void finish(int r) {
395 std::unique_lock l{lock};
396 ret = r;
397 done = true;
398 if (completion) {
399 boost::system::error_code ec(-ret, boost::system::system_category());
400 Completion::post(std::move(completion), ec);
401 } else {
402 cond.notify_all();
403 }
404 }
405 };
406
407 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
408 ceph_assert(conn);
409 if (ack_level == ack_level_t::None) {
410 return amqp::publish(conn, topic, json_format_pubsub_event(event));
411 } else {
412 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
413 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
414 auto w = std::unique_ptr<Waiter>(new Waiter);
415 const auto rc = amqp::publish_with_confirm(conn,
416 topic,
417 json_format_pubsub_event(event),
418 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
419 if (rc < 0) {
420 // failed to publish, does not wait for reply
421 return rc;
422 }
423 return w->wait(y);
424 }
425 }
426
427 std::string to_str() const override {
428 std::string str("AMQP(0.9.1) Endpoint");
429 str += "\nURI: " + endpoint;
430 str += "\nTopic: " + topic;
431 str += "\nExchange: " + exchange;
432 return str;
433 }
434 };
435
// accepted values of the "amqp-version" argument
static const std::string AMQP_0_9_1{"0-9-1"};
static const std::string AMQP_1_0{"1-0"};
// schema name reported for AMQP endpoints
static const std::string AMQP_SCHEMA{"amqp"};
439 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
440
441
442 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
443 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
444 private:
445 enum class ack_level_t {
446 None,
447 Broker,
448 };
449 CephContext* const cct;
450 const std::string topic;
451 kafka::connection_ptr_t conn;
452 const ack_level_t ack_level;
453
454
455 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
456 bool exists;
457 const auto& str_ack_level = args.get("kafka-ack-level", &exists);
458 if (!exists || str_ack_level == "broker") {
459 // "broker" is default
460 return ack_level_t::Broker;
461 }
462 if (str_ack_level == "none") {
463 return ack_level_t::None;
464 }
465 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
466 }
467
468 // NoAckPublishCR implements async kafka publishing via coroutine
469 // This coroutine ends when it send the message and does not wait for an ack
470 class NoAckPublishCR : public RGWCoroutine {
471 private:
472 const std::string topic;
473 kafka::connection_ptr_t conn;
474 const std::string message;
475
476 public:
477 NoAckPublishCR(CephContext* cct,
478 const std::string& _topic,
479 kafka::connection_ptr_t& _conn,
480 const std::string& _message) :
481 RGWCoroutine(cct),
482 topic(_topic), conn(_conn), message(_message) {}
483
484 // send message to endpoint, without waiting for reply
485 int operate(const DoutPrefixProvider *dpp) override {
486 reenter(this) {
487 const auto rc = kafka::publish(conn, topic, message);
488 if (rc < 0) {
489 return set_cr_error(rc);
490 }
491 return set_cr_done();
492 }
493 return 0;
494 }
495 };
496
497 // AckPublishCR implements async kafka publishing via coroutine
498 // This coroutine ends when an ack is received from the borker
499 // note that it does not wait for an ack fron the end client
500 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
501 private:
502 const std::string topic;
503 kafka::connection_ptr_t conn;
504 const std::string message;
505
506 public:
507 AckPublishCR(CephContext* cct,
508 const std::string& _topic,
509 kafka::connection_ptr_t& _conn,
510 const std::string& _message) :
511 RGWCoroutine(cct),
512 topic(_topic), conn(_conn), message(_message) {}
513
514 // send message to endpoint, waiting for reply
515 int operate(const DoutPrefixProvider *dpp) override {
516 reenter(this) {
517 yield {
518 init_new_io(this);
519 const auto rc = kafka::publish_with_confirm(conn,
520 topic,
521 message,
522 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
523 if (rc < 0) {
524 // failed to publish, does not wait for reply
525 return set_cr_error(rc);
526 }
527 // mark as blocked on the kafka answer
528 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
529 io_block();
530 return 0;
531 }
532 return set_cr_done();
533 }
534 return 0;
535 }
536
537 // callback invoked from the kafka manager thread when ack/nack is received
538 void request_complete(int status) {
539 ceph_assert(!is_done());
540 if (status != 0) {
541 // server replied with a nack
542 set_cr_error(status);
543 }
544 io_complete();
545 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
546 }
547
548 // TODO: why are these mandatory in RGWIOProvider?
549 void set_io_user_info(void *_user_info) override {
550 }
551
552 void *get_io_user_info() override {
553 return nullptr;
554 }
555 };
556
557 public:
558 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
559 const std::string& _topic,
560 const RGWHTTPArgs& args,
561 CephContext* _cct) :
562 cct(_cct),
563 topic(_topic),
564 conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
565 ack_level(get_ack_level(args)) {
566 if (!conn) {
567 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
568 }
569 }
570
571 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
572 ceph_assert(conn);
573 if (ack_level == ack_level_t::None) {
574 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
575 } else {
576 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
577 }
578 }
579
580 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
581 ceph_assert(conn);
582 if (ack_level == ack_level_t::None) {
583 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
584 } else {
585 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
586 }
587 }
588
589 // this allows waiting untill "finish()" is called from a different thread
590 // waiting could be blocking the waiting thread or yielding, depending
591 // with compilation flag support and whether the optional_yield is set
592 class Waiter {
593 using Signature = void(boost::system::error_code);
594 using Completion = ceph::async::Completion<Signature>;
595 std::unique_ptr<Completion> completion = nullptr;
596 int ret;
597
598 mutable std::atomic<bool> done = false;
599 mutable std::mutex lock;
600 mutable std::condition_variable cond;
601
602 template <typename ExecutionContext, typename CompletionToken>
603 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
604 boost::asio::async_completion<CompletionToken, Signature> init(token);
605 auto& handler = init.completion_handler;
606 {
607 std::unique_lock l{lock};
608 completion = Completion::create(ctx.get_executor(), std::move(handler));
609 }
610 return init.result.get();
611 }
612
613 public:
614 int wait(optional_yield y) {
615 if (done) {
616 return ret;
617 }
618 if (y) {
619 auto& io_ctx = y.get_io_context();
620 auto& yield_ctx = y.get_yield_context();
621 boost::system::error_code ec;
622 async_wait(io_ctx, yield_ctx[ec]);
623 return -ec.value();
624 }
625 std::unique_lock l(lock);
626 cond.wait(l, [this]{return (done==true);});
627 return ret;
628 }
629
630 void finish(int r) {
631 std::unique_lock l{lock};
632 ret = r;
633 done = true;
634 if (completion) {
635 boost::system::error_code ec(-ret, boost::system::system_category());
636 Completion::post(std::move(completion), ec);
637 } else {
638 cond.notify_all();
639 }
640 }
641 };
642
643 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
644 ceph_assert(conn);
645 if (ack_level == ack_level_t::None) {
646 return kafka::publish(conn, topic, json_format_pubsub_event(event));
647 } else {
648 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
649 auto w = std::unique_ptr<Waiter>(new Waiter);
650 const auto rc = kafka::publish_with_confirm(conn,
651 topic,
652 json_format_pubsub_event(event),
653 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
654 if (rc < 0) {
655 // failed to publish, does not wait for reply
656 return rc;
657 }
658 return w->wait(y);
659 }
660 }
661
662 std::string to_str() const override {
663 std::string str("Kafka Endpoint");
664 str += kafka::to_string(conn);
665 str += "\nTopic: " + topic;
666 return str;
667 }
668 };
669
// schema name reported for Kafka endpoints
static const std::string KAFKA_SCHEMA{"kafka"};
671 #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
672
// schema names that are always available, regardless of the compiled-in endpoints
static const std::string WEBHOOK_SCHEMA{"webhook"};
static const std::string UNKNOWN_SCHEMA{"unknown"};
static const std::string NO_SCHEMA{""};

// Map the URI prefix of an endpoint (the text before the first ':') to one of
// the schema name constants:
//   empty endpoint                 -> NO_SCHEMA
//   "http" / "https"               -> WEBHOOK_SCHEMA
//   "amqp" / "amqps"               -> AMQP_SCHEMA  (only with WITH_RADOSGW_AMQP_ENDPOINT)
//   "kafka"                        -> KAFKA_SCHEMA (only with WITH_RADOSGW_KAFKA_ENDPOINT)
//   no ':' or any other prefix     -> UNKNOWN_SCHEMA
// The comparison is case sensitive.
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix(endpoint, 0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp" || prefix == "amqps") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
699
700 RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
701 const std::string& topic,
702 const RGWHTTPArgs& args,
703 CephContext* cct) {
704 const auto& schema = get_schema(endpoint);
705 if (schema == WEBHOOK_SCHEMA) {
706 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
707 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
708 } else if (schema == AMQP_SCHEMA) {
709 bool exists;
710 std::string version = args.get("amqp-version", &exists);
711 if (!exists) {
712 version = AMQP_0_9_1;
713 }
714 if (version == AMQP_0_9_1) {
715 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
716 } else if (version == AMQP_1_0) {
717 throw configuration_error("AMQP: v1.0 not supported");
718 return nullptr;
719 } else {
720 throw configuration_error("AMQP: unknown version: " + version);
721 return nullptr;
722 }
723 #endif
724 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
725 } else if (schema == KAFKA_SCHEMA) {
726 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
727 #endif
728 }
729
730 throw configuration_error("unknown schema in: " + endpoint);
731 return nullptr;
732 }
733