]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub_push.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4#include "rgw_pubsub_push.h"
5#include <string>
6#include <sstream>
7#include <algorithm>
8#include "include/buffer_fwd.h"
9#include "common/Formatter.h"
20effc67 10#include "common/iso_8601.h"
eafe8130 11#include "common/async/completion.h"
11fdf7f2
TL
12#include "rgw_common.h"
13#include "rgw_data_sync.h"
14#include "rgw_pubsub.h"
15#include "acconfig.h"
16#ifdef WITH_RADOSGW_AMQP_ENDPOINT
17#include "rgw_amqp.h"
18#endif
9f95a23c
TL
19#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
20#include "rgw_kafka.h"
21#endif
11fdf7f2
TL
22#include <boost/asio/yield.hpp>
23#include <boost/algorithm/string.hpp>
24#include <functional>
25#include "rgw_perf_counters.h"
26
27using namespace rgw;
28
eafe8130
TL
29template<typename EventType>
30std::string json_format_pubsub_event(const EventType& event) {
11fdf7f2
TL
31 std::stringstream ss;
32 JSONFormatter f(false);
92f5a8d4
TL
33 {
34 Formatter::ObjectSection s(f, EventType::json_type_plural);
35 {
36 Formatter::ArraySection s(f, EventType::json_type_plural);
37 encode_json("", event, &f);
38 }
39 }
11fdf7f2
TL
40 f.flush(ss);
41 return ss.str();
42}
20effc67
TL
43
44bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
45 bool value;
46 bool exists;
47 if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
48 throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
49 }
50 if (!exists) {
51 return default_value;
52 }
53 return value;
54}
11fdf7f2
TL
55
56class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
57private:
58 const std::string endpoint;
11fdf7f2
TL
59 typedef unsigned ack_level_t;
60 ack_level_t ack_level; // TODO: not used for now
20effc67
TL
61 const bool verify_ssl;
62 const bool cloudevents;
11fdf7f2
TL
63 static const ack_level_t ACK_LEVEL_ANY = 0;
64 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
65
66 // PostCR implements async execution of RGWPostHTTPData via coroutine
67 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
68 private:
69 RGWDataSyncEnv* const sync_env;
70 bufferlist read_bl;
71 const ack_level_t ack_level;
72
73 public:
74 PostCR(const std::string& _post_data,
75 RGWDataSyncEnv* _sync_env,
76 const std::string& endpoint,
77 ack_level_t _ack_level,
78 bool verify_ssl) :
79 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
80 RGWSimpleCoroutine(_sync_env->cct),
81 sync_env(_sync_env),
82 ack_level (_ack_level) {
83 // ctor also set the data to send
84 set_post_data(_post_data);
85 set_send_length(_post_data.length());
86 }
87
88 // send message to endpoint
b3b6e05e 89 int send_request(const DoutPrefixProvider *dpp) override {
11fdf7f2
TL
90 init_new_io(this);
91 const auto rc = sync_env->http_manager->add_request(this);
92 if (rc < 0) {
93 return rc;
94 }
95 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
96 return 0;
97 }
98
99 // wait for reply
100 int request_complete() override {
101 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
102 if (ack_level == ACK_LEVEL_ANY) {
103 return 0;
104 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
105 // TODO check result code to be non-error
106 } else {
107 // TODO: check that result code == ack_level
108 }
109 return -1;
110 }
111 };
112
113public:
20effc67
TL
114 RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
115 endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
116 {
eafe8130 117 bool exists;
20effc67 118 const auto& str_ack_level = args.get("http-ack-level", &exists);
eafe8130
TL
119 if (!exists || str_ack_level == "any") {
120 // "any" is default
121 ack_level = ACK_LEVEL_ANY;
122 } else if (str_ack_level == "non-error") {
123 ack_level = ACK_LEVEL_NON_ERROR;
124 } else {
125 ack_level = std::atoi(str_ack_level.c_str());
126 if (ack_level < 100 || ack_level >= 600) {
127 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
11fdf7f2
TL
128 }
129 }
eafe8130
TL
130 }
131
11fdf7f2
TL
132 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
133 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
134 }
135
f67539c2
TL
136 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
137 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
eafe8130
TL
138 }
139
f67539c2 140 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
eafe8130
TL
141 bufferlist read_bl;
142 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
f67539c2 143 const auto post_data = json_format_pubsub_event(event);
20effc67
TL
144 if (cloudevents) {
145 // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
146 // using "Binary Content Mode"
147 request.append_header("ce-specversion", "1.0");
148 request.append_header("ce-type", "com.amazonaws." + event.eventName);
149 request.append_header("ce-time", to_iso_8601(event.eventTime));
150 // default output of iso8601 is also RFC3339 compatible
151 request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
152 request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
153 request.append_header("ce-subject", event.object_key);
154 }
eafe8130
TL
155 request.set_post_data(post_data);
156 request.set_send_length(post_data.length());
522d829b 157 request.append_header("Content-Type", "application/json");
eafe8130
TL
158 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
159 const auto rc = RGWHTTP::process(&request, y);
160 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
161 // TODO: use read_bl to process return code and handle according to ack level
162 return rc;
163 }
164
11fdf7f2 165 std::string to_str() const override {
eafe8130 166 std::string str("HTTP/S Endpoint");
11fdf7f2 167 str += "\nURI: " + endpoint;
11fdf7f2
TL
168 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
169 return str;
11fdf7f2
TL
170 }
171};
172
173#ifdef WITH_RADOSGW_AMQP_ENDPOINT
174class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
eafe8130 175private:
9f95a23c
TL
176 enum class ack_level_t {
177 None,
178 Broker,
179 Routable
eafe8130
TL
180 };
181 CephContext* const cct;
182 const std::string endpoint;
183 const std::string topic;
184 const std::string exchange;
eafe8130 185 ack_level_t ack_level;
e306af50 186 amqp::connection_ptr_t conn;
eafe8130 187
f67539c2
TL
188 bool get_verify_ssl(const RGWHTTPArgs& args) {
189 bool exists;
190 auto str_verify_ssl = args.get("verify-ssl", &exists);
191 if (!exists) {
192 // verify server certificate by default
193 return true;
194 }
195 boost::algorithm::to_lower(str_verify_ssl);
196 if (str_verify_ssl == "true") {
197 return true;
198 }
199 if (str_verify_ssl == "false") {
200 return false;
201 }
202 throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
203 }
204
e306af50 205 std::string get_exchange(const RGWHTTPArgs& args) {
eafe8130
TL
206 bool exists;
207 const auto exchange = args.get("amqp-exchange", &exists);
208 if (!exists) {
209 throw configuration_error("AMQP: missing amqp-exchange");
11fdf7f2 210 }
eafe8130
TL
211 return exchange;
212 }
11fdf7f2 213
e306af50
TL
214 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
215 bool exists;
216 const auto& str_ack_level = args.get("amqp-ack-level", &exists);
217 if (!exists || str_ack_level == "broker") {
218 // "broker" is default
219 return ack_level_t::Broker;
220 }
221 if (str_ack_level == "none") {
222 return ack_level_t::None;
223 }
224 if (str_ack_level == "routable") {
225 return ack_level_t::Routable;
226 }
227 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
228 }
229
11fdf7f2
TL
230 // NoAckPublishCR implements async amqp publishing via coroutine
231 // This coroutine ends when it send the message and does not wait for an ack
232 class NoAckPublishCR : public RGWCoroutine {
233 private:
11fdf7f2
TL
234 const std::string topic;
235 amqp::connection_ptr_t conn;
236 const std::string message;
237
238 public:
eafe8130 239 NoAckPublishCR(CephContext* cct,
11fdf7f2
TL
240 const std::string& _topic,
241 amqp::connection_ptr_t& _conn,
242 const std::string& _message) :
eafe8130 243 RGWCoroutine(cct),
11fdf7f2
TL
244 topic(_topic), conn(_conn), message(_message) {}
245
246 // send message to endpoint, without waiting for reply
b3b6e05e 247 int operate(const DoutPrefixProvider *dpp) override {
11fdf7f2
TL
248 reenter(this) {
249 const auto rc = amqp::publish(conn, topic, message);
250 if (rc < 0) {
251 return set_cr_error(rc);
252 }
253 return set_cr_done();
254 }
255 return 0;
256 }
257 };
258
259 // AckPublishCR implements async amqp publishing via coroutine
260 // This coroutine ends when an ack is received from the borker
261 // note that it does not wait for an ack fron the end client
262 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
263 private:
11fdf7f2
TL
264 const std::string topic;
265 amqp::connection_ptr_t conn;
266 const std::string message;
11fdf7f2
TL
267
268 public:
eafe8130 269 AckPublishCR(CephContext* cct,
11fdf7f2
TL
270 const std::string& _topic,
271 amqp::connection_ptr_t& _conn,
e306af50 272 const std::string& _message) :
eafe8130 273 RGWCoroutine(cct),
e306af50 274 topic(_topic), conn(_conn), message(_message) {}
11fdf7f2
TL
275
276 // send message to endpoint, waiting for reply
b3b6e05e 277 int operate(const DoutPrefixProvider *dpp) override {
11fdf7f2
TL
278 reenter(this) {
279 yield {
280 init_new_io(this);
281 const auto rc = amqp::publish_with_confirm(conn,
282 topic,
283 message,
284 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
285 if (rc < 0) {
286 // failed to publish, does not wait for reply
287 return set_cr_error(rc);
288 }
289 // mark as blocked on the amqp answer
290 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
291 io_block();
292 return 0;
293 }
294 return set_cr_done();
295 }
296 return 0;
297 }
298
299 // callback invoked from the amqp manager thread when ack/nack is received
300 void request_complete(int status) {
301 ceph_assert(!is_done());
302 if (status != 0) {
303 // server replied with a nack
304 set_cr_error(status);
305 }
306 io_complete();
307 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
308 }
309
310 // TODO: why are these mandatory in RGWIOProvider?
311 void set_io_user_info(void *_user_info) override {
312 }
313
314 void *get_io_user_info() override {
315 return nullptr;
316 }
317 };
e306af50 318
eafe8130
TL
319public:
320 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
321 const std::string& _topic,
322 const RGWHTTPArgs& args,
323 CephContext* _cct) :
324 cct(_cct),
325 endpoint(_endpoint),
326 topic(_topic),
327 exchange(get_exchange(args)),
e306af50 328 ack_level(get_ack_level(args)),
f67539c2 329 conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
eafe8130
TL
330 if (!conn) {
331 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
332 }
eafe8130
TL
333 }
334
335 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
336 ceph_assert(conn);
9f95a23c 337 if (ack_level == ack_level_t::None) {
eafe8130
TL
338 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
339 } else {
e306af50 340 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
eafe8130
TL
341 }
342 }
343
f67539c2 344 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
eafe8130 345 ceph_assert(conn);
9f95a23c 346 if (ack_level == ack_level_t::None) {
f67539c2 347 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
eafe8130 348 } else {
f67539c2 349 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
eafe8130
TL
350 }
351 }
352
353 // this allows waiting untill "finish()" is called from a different thread
354 // waiting could be blocking the waiting thread or yielding, depending
355 // with compilation flag support and whether the optional_yield is set
356 class Waiter {
357 using Signature = void(boost::system::error_code);
358 using Completion = ceph::async::Completion<Signature>;
359 std::unique_ptr<Completion> completion = nullptr;
360 int ret;
361
362 mutable std::atomic<bool> done = false;
363 mutable std::mutex lock;
364 mutable std::condition_variable cond;
365
366 template <typename ExecutionContext, typename CompletionToken>
367 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
368 boost::asio::async_completion<CompletionToken, Signature> init(token);
369 auto& handler = init.completion_handler;
370 {
371 std::unique_lock l{lock};
372 completion = Completion::create(ctx.get_executor(), std::move(handler));
373 }
374 return init.result.get();
375 }
376
11fdf7f2 377 public:
eafe8130
TL
378 int wait(optional_yield y) {
379 if (done) {
380 return ret;
381 }
eafe8130 382 if (y) {
f67539c2 383 auto& io_ctx = y.get_io_context();
eafe8130
TL
384 auto& yield_ctx = y.get_yield_context();
385 boost::system::error_code ec;
386 async_wait(io_ctx, yield_ctx[ec]);
387 return -ec.value();
11fdf7f2 388 }
eafe8130
TL
389 std::unique_lock l(lock);
390 cond.wait(l, [this]{return (done==true);});
391 return ret;
11fdf7f2
TL
392 }
393
eafe8130
TL
394 void finish(int r) {
395 std::unique_lock l{lock};
396 ret = r;
397 done = true;
398 if (completion) {
399 boost::system::error_code ec(-ret, boost::system::system_category());
400 Completion::post(std::move(completion), ec);
11fdf7f2 401 } else {
eafe8130 402 cond.notify_all();
11fdf7f2
TL
403 }
404 }
eafe8130 405 };
11fdf7f2 406
f67539c2 407 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
eafe8130 408 ceph_assert(conn);
9f95a23c 409 if (ack_level == ack_level_t::None) {
f67539c2 410 return amqp::publish(conn, topic, json_format_pubsub_event(event));
eafe8130
TL
411 } else {
412 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
413 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
414 auto w = std::unique_ptr<Waiter>(new Waiter);
415 const auto rc = amqp::publish_with_confirm(conn,
416 topic,
f67539c2 417 json_format_pubsub_event(event),
eafe8130
TL
418 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
419 if (rc < 0) {
420 // failed to publish, does not wait for reply
421 return rc;
422 }
423 return w->wait(y);
11fdf7f2 424 }
eafe8130
TL
425 }
426
427 std::string to_str() const override {
428 std::string str("AMQP(0.9.1) Endpoint");
429 str += "\nURI: " + endpoint;
430 str += "\nTopic: " + topic;
431 str += "\nExchange: " + exchange;
eafe8130
TL
432 return str;
433 }
11fdf7f2
TL
434};
435
436static const std::string AMQP_0_9_1("0-9-1");
437static const std::string AMQP_1_0("1-0");
eafe8130 438static const std::string AMQP_SCHEMA("amqp");
11fdf7f2
TL
439#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
440
9f95a23c
TL
441
442#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
443class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
444private:
445 enum class ack_level_t {
446 None,
447 Broker,
448 };
449 CephContext* const cct;
450 const std::string topic;
451 kafka::connection_ptr_t conn;
452 const ack_level_t ack_level;
453
9f95a23c 454
e306af50 455 ack_level_t get_ack_level(const RGWHTTPArgs& args) {
9f95a23c 456 bool exists;
20effc67 457 const auto& str_ack_level = args.get("kafka-ack-level", &exists);
9f95a23c
TL
458 if (!exists || str_ack_level == "broker") {
459 // "broker" is default
460 return ack_level_t::Broker;
461 }
462 if (str_ack_level == "none") {
463 return ack_level_t::None;
464 }
465 throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
466 }
467
468 // NoAckPublishCR implements async kafka publishing via coroutine
469 // This coroutine ends when it send the message and does not wait for an ack
470 class NoAckPublishCR : public RGWCoroutine {
471 private:
472 const std::string topic;
473 kafka::connection_ptr_t conn;
474 const std::string message;
475
476 public:
477 NoAckPublishCR(CephContext* cct,
478 const std::string& _topic,
479 kafka::connection_ptr_t& _conn,
480 const std::string& _message) :
481 RGWCoroutine(cct),
482 topic(_topic), conn(_conn), message(_message) {}
483
484 // send message to endpoint, without waiting for reply
b3b6e05e 485 int operate(const DoutPrefixProvider *dpp) override {
9f95a23c
TL
486 reenter(this) {
487 const auto rc = kafka::publish(conn, topic, message);
488 if (rc < 0) {
489 return set_cr_error(rc);
490 }
491 return set_cr_done();
492 }
493 return 0;
494 }
495 };
496
497 // AckPublishCR implements async kafka publishing via coroutine
498 // This coroutine ends when an ack is received from the borker
499 // note that it does not wait for an ack fron the end client
500 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
501 private:
502 const std::string topic;
503 kafka::connection_ptr_t conn;
504 const std::string message;
505
506 public:
507 AckPublishCR(CephContext* cct,
508 const std::string& _topic,
509 kafka::connection_ptr_t& _conn,
510 const std::string& _message) :
511 RGWCoroutine(cct),
512 topic(_topic), conn(_conn), message(_message) {}
513
514 // send message to endpoint, waiting for reply
b3b6e05e 515 int operate(const DoutPrefixProvider *dpp) override {
9f95a23c
TL
516 reenter(this) {
517 yield {
518 init_new_io(this);
519 const auto rc = kafka::publish_with_confirm(conn,
520 topic,
521 message,
522 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
523 if (rc < 0) {
524 // failed to publish, does not wait for reply
525 return set_cr_error(rc);
526 }
527 // mark as blocked on the kafka answer
528 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
529 io_block();
530 return 0;
531 }
532 return set_cr_done();
533 }
534 return 0;
535 }
536
537 // callback invoked from the kafka manager thread when ack/nack is received
538 void request_complete(int status) {
539 ceph_assert(!is_done());
540 if (status != 0) {
541 // server replied with a nack
542 set_cr_error(status);
543 }
544 io_complete();
545 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
546 }
547
548 // TODO: why are these mandatory in RGWIOProvider?
549 void set_io_user_info(void *_user_info) override {
550 }
551
552 void *get_io_user_info() override {
553 return nullptr;
554 }
555 };
556
557public:
558 RGWPubSubKafkaEndpoint(const std::string& _endpoint,
559 const std::string& _topic,
560 const RGWHTTPArgs& args,
561 CephContext* _cct) :
562 cct(_cct),
563 topic(_topic),
20effc67 564 conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
9f95a23c
TL
565 ack_level(get_ack_level(args)) {
566 if (!conn) {
567 throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
568 }
569 }
570
571 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
572 ceph_assert(conn);
573 if (ack_level == ack_level_t::None) {
574 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
575 } else {
576 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
577 }
578 }
579
f67539c2 580 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
9f95a23c
TL
581 ceph_assert(conn);
582 if (ack_level == ack_level_t::None) {
f67539c2 583 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
9f95a23c 584 } else {
f67539c2 585 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
9f95a23c
TL
586 }
587 }
588
589 // this allows waiting untill "finish()" is called from a different thread
590 // waiting could be blocking the waiting thread or yielding, depending
591 // with compilation flag support and whether the optional_yield is set
592 class Waiter {
593 using Signature = void(boost::system::error_code);
594 using Completion = ceph::async::Completion<Signature>;
595 std::unique_ptr<Completion> completion = nullptr;
596 int ret;
597
598 mutable std::atomic<bool> done = false;
599 mutable std::mutex lock;
600 mutable std::condition_variable cond;
601
602 template <typename ExecutionContext, typename CompletionToken>
603 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
604 boost::asio::async_completion<CompletionToken, Signature> init(token);
605 auto& handler = init.completion_handler;
606 {
607 std::unique_lock l{lock};
608 completion = Completion::create(ctx.get_executor(), std::move(handler));
609 }
610 return init.result.get();
611 }
612
613 public:
614 int wait(optional_yield y) {
615 if (done) {
616 return ret;
617 }
9f95a23c
TL
618 if (y) {
619 auto& io_ctx = y.get_io_context();
620 auto& yield_ctx = y.get_yield_context();
621 boost::system::error_code ec;
622 async_wait(io_ctx, yield_ctx[ec]);
623 return -ec.value();
624 }
9f95a23c
TL
625 std::unique_lock l(lock);
626 cond.wait(l, [this]{return (done==true);});
627 return ret;
628 }
629
630 void finish(int r) {
631 std::unique_lock l{lock};
632 ret = r;
633 done = true;
634 if (completion) {
635 boost::system::error_code ec(-ret, boost::system::system_category());
636 Completion::post(std::move(completion), ec);
637 } else {
638 cond.notify_all();
639 }
640 }
641 };
642
f67539c2 643 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
9f95a23c
TL
644 ceph_assert(conn);
645 if (ack_level == ack_level_t::None) {
f67539c2 646 return kafka::publish(conn, topic, json_format_pubsub_event(event));
9f95a23c
TL
647 } else {
648 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
649 auto w = std::unique_ptr<Waiter>(new Waiter);
650 const auto rc = kafka::publish_with_confirm(conn,
651 topic,
f67539c2 652 json_format_pubsub_event(event),
9f95a23c
TL
653 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
654 if (rc < 0) {
655 // failed to publish, does not wait for reply
656 return rc;
657 }
658 return w->wait(y);
659 }
660 }
661
662 std::string to_str() const override {
663 std::string str("Kafka Endpoint");
664 str += kafka::to_string(conn);
665 str += "\nTopic: " + topic;
666 return str;
667 }
668};
669
670static const std::string KAFKA_SCHEMA("kafka");
671#endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT
672
eafe8130
TL
673static const std::string WEBHOOK_SCHEMA("webhook");
674static const std::string UNKNOWN_SCHEMA("unknown");
675static const std::string NO_SCHEMA("");
676
677const std::string& get_schema(const std::string& endpoint) {
678 if (endpoint.empty()) {
679 return NO_SCHEMA;
680 }
11fdf7f2
TL
681 const auto pos = endpoint.find(':');
682 if (pos == std::string::npos) {
eafe8130 683 return UNKNOWN_SCHEMA;
11fdf7f2
TL
684 }
685 const auto& schema = endpoint.substr(0,pos);
686 if (schema == "http" || schema == "https") {
eafe8130 687 return WEBHOOK_SCHEMA;
11fdf7f2 688#ifdef WITH_RADOSGW_AMQP_ENDPOINT
f67539c2 689 } else if (schema == "amqp" || schema == "amqps") {
eafe8130 690 return AMQP_SCHEMA;
9f95a23c
TL
691#endif
692#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
693 } else if (schema == "kafka") {
694 return KAFKA_SCHEMA;
eafe8130
TL
695#endif
696 }
697 return UNKNOWN_SCHEMA;
698}
699
700RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
701 const std::string& topic,
702 const RGWHTTPArgs& args,
703 CephContext* cct) {
704 const auto& schema = get_schema(endpoint);
705 if (schema == WEBHOOK_SCHEMA) {
706 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
707#ifdef WITH_RADOSGW_AMQP_ENDPOINT
708 } else if (schema == AMQP_SCHEMA) {
11fdf7f2
TL
709 bool exists;
710 std::string version = args.get("amqp-version", &exists);
711 if (!exists) {
712 version = AMQP_0_9_1;
713 }
714 if (version == AMQP_0_9_1) {
eafe8130 715 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
11fdf7f2 716 } else if (version == AMQP_1_0) {
eafe8130 717 throw configuration_error("AMQP: v1.0 not supported");
11fdf7f2
TL
718 return nullptr;
719 } else {
eafe8130 720 throw configuration_error("AMQP: unknown version: " + version);
11fdf7f2
TL
721 return nullptr;
722 }
9f95a23c
TL
723#endif
724#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
725 } else if (schema == KAFKA_SCHEMA) {
726 return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
11fdf7f2
TL
727#endif
728 }
729
eafe8130 730 throw configuration_error("unknown schema in: " + endpoint);
11fdf7f2
TL
731 return nullptr;
732}
733