]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | #include "rgw_pubsub_push.h" | |
5 | #include <string> | |
6 | #include <sstream> | |
7 | #include <algorithm> | |
8 | #include "include/buffer_fwd.h" | |
9 | #include "common/Formatter.h" | |
eafe8130 | 10 | #include "common/async/completion.h" |
11fdf7f2 TL |
11 | #include "rgw_common.h" |
12 | #include "rgw_data_sync.h" | |
13 | #include "rgw_pubsub.h" | |
14 | #include "acconfig.h" | |
15 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
16 | #include "rgw_amqp.h" | |
17 | #endif | |
9f95a23c TL |
18 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
19 | #include "rgw_kafka.h" | |
20 | #endif | |
11fdf7f2 TL |
21 | #include <boost/asio/yield.hpp> |
22 | #include <boost/algorithm/string.hpp> | |
23 | #include <functional> | |
24 | #include "rgw_perf_counters.h" | |
25 | ||
26 | using namespace rgw; | |
27 | ||
eafe8130 TL |
28 | template<typename EventType> |
29 | std::string json_format_pubsub_event(const EventType& event) { | |
11fdf7f2 TL |
30 | std::stringstream ss; |
31 | JSONFormatter f(false); | |
92f5a8d4 TL |
32 | { |
33 | Formatter::ObjectSection s(f, EventType::json_type_plural); | |
34 | { | |
35 | Formatter::ArraySection s(f, EventType::json_type_plural); | |
36 | encode_json("", event, &f); | |
37 | } | |
38 | } | |
11fdf7f2 TL |
39 | f.flush(ss); |
40 | return ss.str(); | |
41 | } | |
42 | ||
43 | class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint { | |
44 | private: | |
45 | const std::string endpoint; | |
46 | std::string str_ack_level; | |
47 | typedef unsigned ack_level_t; | |
48 | ack_level_t ack_level; // TODO: not used for now | |
49 | bool verify_ssl; | |
50 | static const ack_level_t ACK_LEVEL_ANY = 0; | |
51 | static const ack_level_t ACK_LEVEL_NON_ERROR = 1; | |
52 | ||
53 | // PostCR implements async execution of RGWPostHTTPData via coroutine | |
54 | class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine { | |
55 | private: | |
56 | RGWDataSyncEnv* const sync_env; | |
57 | bufferlist read_bl; | |
58 | const ack_level_t ack_level; | |
59 | ||
60 | public: | |
61 | PostCR(const std::string& _post_data, | |
62 | RGWDataSyncEnv* _sync_env, | |
63 | const std::string& endpoint, | |
64 | ack_level_t _ack_level, | |
65 | bool verify_ssl) : | |
66 | RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl), | |
67 | RGWSimpleCoroutine(_sync_env->cct), | |
68 | sync_env(_sync_env), | |
69 | ack_level (_ack_level) { | |
70 | // ctor also set the data to send | |
71 | set_post_data(_post_data); | |
72 | set_send_length(_post_data.length()); | |
73 | } | |
74 | ||
75 | // send message to endpoint | |
76 | int send_request() override { | |
77 | init_new_io(this); | |
78 | const auto rc = sync_env->http_manager->add_request(this); | |
79 | if (rc < 0) { | |
80 | return rc; | |
81 | } | |
82 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
83 | return 0; | |
84 | } | |
85 | ||
86 | // wait for reply | |
87 | int request_complete() override { | |
88 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
89 | if (ack_level == ACK_LEVEL_ANY) { | |
90 | return 0; | |
91 | } else if (ack_level == ACK_LEVEL_NON_ERROR) { | |
92 | // TODO check result code to be non-error | |
93 | } else { | |
94 | // TODO: check that result code == ack_level | |
95 | } | |
96 | return -1; | |
97 | } | |
98 | }; | |
99 | ||
100 | public: | |
101 | RGWPubSubHTTPEndpoint(const std::string& _endpoint, | |
eafe8130 TL |
102 | const RGWHTTPArgs& args) : endpoint(_endpoint) { |
103 | bool exists; | |
11fdf7f2 | 104 | |
eafe8130 TL |
105 | str_ack_level = args.get("http-ack-level", &exists); |
106 | if (!exists || str_ack_level == "any") { | |
107 | // "any" is default | |
108 | ack_level = ACK_LEVEL_ANY; | |
109 | } else if (str_ack_level == "non-error") { | |
110 | ack_level = ACK_LEVEL_NON_ERROR; | |
111 | } else { | |
112 | ack_level = std::atoi(str_ack_level.c_str()); | |
113 | if (ack_level < 100 || ack_level >= 600) { | |
114 | throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level); | |
11fdf7f2 TL |
115 | } |
116 | } | |
117 | ||
eafe8130 TL |
118 | auto str_verify_ssl = args.get("verify-ssl", &exists); |
119 | boost::algorithm::to_lower(str_verify_ssl); | |
120 | // verify server certificate by default | |
121 | if (!exists || str_verify_ssl == "true") { | |
122 | verify_ssl = true; | |
123 | } else if (str_verify_ssl == "false") { | |
124 | verify_ssl = false; | |
125 | } else { | |
126 | throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl); | |
127 | } | |
128 | } | |
129 | ||
11fdf7f2 TL |
130 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { |
131 | return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); | |
132 | } | |
133 | ||
eafe8130 TL |
134 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { |
135 | return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); | |
136 | } | |
137 | ||
138 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { | |
139 | bufferlist read_bl; | |
140 | RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); | |
141 | const auto post_data = json_format_pubsub_event(record); | |
142 | request.set_post_data(post_data); | |
143 | request.set_send_length(post_data.length()); | |
144 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
145 | const auto rc = RGWHTTP::process(&request, y); | |
146 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
147 | // TODO: use read_bl to process return code and handle according to ack level | |
148 | return rc; | |
149 | } | |
150 | ||
11fdf7f2 | 151 | std::string to_str() const override { |
eafe8130 | 152 | std::string str("HTTP/S Endpoint"); |
11fdf7f2 TL |
153 | str += "\nURI: " + endpoint; |
154 | str += "\nAck Level: " + str_ack_level; | |
155 | str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL"); | |
156 | return str; | |
157 | ||
158 | } | |
159 | }; | |
160 | ||
161 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
162 | class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { | |
eafe8130 | 163 | private: |
9f95a23c TL |
164 | enum class ack_level_t { |
165 | None, | |
166 | Broker, | |
167 | Routable | |
eafe8130 TL |
168 | }; |
169 | CephContext* const cct; | |
170 | const std::string endpoint; | |
171 | const std::string topic; | |
172 | const std::string exchange; | |
eafe8130 | 173 | ack_level_t ack_level; |
e306af50 | 174 | amqp::connection_ptr_t conn; |
eafe8130 | 175 | |
e306af50 | 176 | std::string get_exchange(const RGWHTTPArgs& args) { |
eafe8130 TL |
177 | bool exists; |
178 | const auto exchange = args.get("amqp-exchange", &exists); | |
179 | if (!exists) { | |
180 | throw configuration_error("AMQP: missing amqp-exchange"); | |
11fdf7f2 | 181 | } |
eafe8130 TL |
182 | return exchange; |
183 | } | |
11fdf7f2 | 184 | |
e306af50 TL |
185 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
186 | bool exists; | |
187 | const auto& str_ack_level = args.get("amqp-ack-level", &exists); | |
188 | if (!exists || str_ack_level == "broker") { | |
189 | // "broker" is default | |
190 | return ack_level_t::Broker; | |
191 | } | |
192 | if (str_ack_level == "none") { | |
193 | return ack_level_t::None; | |
194 | } | |
195 | if (str_ack_level == "routable") { | |
196 | return ack_level_t::Routable; | |
197 | } | |
198 | throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); | |
199 | } | |
200 | ||
11fdf7f2 TL |
201 | // NoAckPublishCR implements async amqp publishing via coroutine |
202 | // This coroutine ends when it send the message and does not wait for an ack | |
203 | class NoAckPublishCR : public RGWCoroutine { | |
204 | private: | |
11fdf7f2 TL |
205 | const std::string topic; |
206 | amqp::connection_ptr_t conn; | |
207 | const std::string message; | |
208 | ||
209 | public: | |
eafe8130 | 210 | NoAckPublishCR(CephContext* cct, |
11fdf7f2 TL |
211 | const std::string& _topic, |
212 | amqp::connection_ptr_t& _conn, | |
213 | const std::string& _message) : | |
eafe8130 | 214 | RGWCoroutine(cct), |
11fdf7f2 TL |
215 | topic(_topic), conn(_conn), message(_message) {} |
216 | ||
217 | // send message to endpoint, without waiting for reply | |
218 | int operate() override { | |
219 | reenter(this) { | |
220 | const auto rc = amqp::publish(conn, topic, message); | |
221 | if (rc < 0) { | |
222 | return set_cr_error(rc); | |
223 | } | |
224 | return set_cr_done(); | |
225 | } | |
226 | return 0; | |
227 | } | |
228 | }; | |
229 | ||
230 | // AckPublishCR implements async amqp publishing via coroutine | |
231 | // This coroutine ends when an ack is received from the borker | |
232 | // note that it does not wait for an ack fron the end client | |
233 | class AckPublishCR : public RGWCoroutine, public RGWIOProvider { | |
234 | private: | |
11fdf7f2 TL |
235 | const std::string topic; |
236 | amqp::connection_ptr_t conn; | |
237 | const std::string message; | |
11fdf7f2 TL |
238 | |
239 | public: | |
eafe8130 | 240 | AckPublishCR(CephContext* cct, |
11fdf7f2 TL |
241 | const std::string& _topic, |
242 | amqp::connection_ptr_t& _conn, | |
e306af50 | 243 | const std::string& _message) : |
eafe8130 | 244 | RGWCoroutine(cct), |
e306af50 | 245 | topic(_topic), conn(_conn), message(_message) {} |
11fdf7f2 TL |
246 | |
247 | // send message to endpoint, waiting for reply | |
248 | int operate() override { | |
249 | reenter(this) { | |
250 | yield { | |
251 | init_new_io(this); | |
252 | const auto rc = amqp::publish_with_confirm(conn, | |
253 | topic, | |
254 | message, | |
255 | std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); | |
256 | if (rc < 0) { | |
257 | // failed to publish, does not wait for reply | |
258 | return set_cr_error(rc); | |
259 | } | |
260 | // mark as blocked on the amqp answer | |
261 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
262 | io_block(); | |
263 | return 0; | |
264 | } | |
265 | return set_cr_done(); | |
266 | } | |
267 | return 0; | |
268 | } | |
269 | ||
270 | // callback invoked from the amqp manager thread when ack/nack is received | |
271 | void request_complete(int status) { | |
272 | ceph_assert(!is_done()); | |
273 | if (status != 0) { | |
274 | // server replied with a nack | |
275 | set_cr_error(status); | |
276 | } | |
277 | io_complete(); | |
278 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
279 | } | |
280 | ||
281 | // TODO: why are these mandatory in RGWIOProvider? | |
282 | void set_io_user_info(void *_user_info) override { | |
283 | } | |
284 | ||
285 | void *get_io_user_info() override { | |
286 | return nullptr; | |
287 | } | |
288 | }; | |
e306af50 | 289 | |
eafe8130 TL |
290 | public: |
291 | RGWPubSubAMQPEndpoint(const std::string& _endpoint, | |
292 | const std::string& _topic, | |
293 | const RGWHTTPArgs& args, | |
294 | CephContext* _cct) : | |
295 | cct(_cct), | |
296 | endpoint(_endpoint), | |
297 | topic(_topic), | |
298 | exchange(get_exchange(args)), | |
e306af50 TL |
299 | ack_level(get_ack_level(args)), |
300 | conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) { | |
eafe8130 TL |
301 | if (!conn) { |
302 | throw configuration_error("AMQP: failed to create connection to: " + endpoint); | |
303 | } | |
eafe8130 TL |
304 | } |
305 | ||
306 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { | |
307 | ceph_assert(conn); | |
9f95a23c | 308 | if (ack_level == ack_level_t::None) { |
eafe8130 TL |
309 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
310 | } else { | |
e306af50 | 311 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
eafe8130 TL |
312 | } |
313 | } | |
314 | ||
315 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { | |
316 | ceph_assert(conn); | |
9f95a23c | 317 | if (ack_level == ack_level_t::None) { |
eafe8130 TL |
318 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); |
319 | } else { | |
e306af50 | 320 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); |
eafe8130 TL |
321 | } |
322 | } | |
323 | ||
324 | // this allows waiting untill "finish()" is called from a different thread | |
325 | // waiting could be blocking the waiting thread or yielding, depending | |
326 | // with compilation flag support and whether the optional_yield is set | |
327 | class Waiter { | |
328 | using Signature = void(boost::system::error_code); | |
329 | using Completion = ceph::async::Completion<Signature>; | |
330 | std::unique_ptr<Completion> completion = nullptr; | |
331 | int ret; | |
332 | ||
333 | mutable std::atomic<bool> done = false; | |
334 | mutable std::mutex lock; | |
335 | mutable std::condition_variable cond; | |
336 | ||
337 | template <typename ExecutionContext, typename CompletionToken> | |
338 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
339 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
340 | auto& handler = init.completion_handler; | |
341 | { | |
342 | std::unique_lock l{lock}; | |
343 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
344 | } | |
345 | return init.result.get(); | |
346 | } | |
347 | ||
11fdf7f2 | 348 | public: |
eafe8130 TL |
349 | int wait(optional_yield y) { |
350 | if (done) { | |
351 | return ret; | |
352 | } | |
353 | #ifdef HAVE_BOOST_CONTEXT | |
354 | if (y) { | |
355 | auto& io_ctx = y.get_io_context(); | |
356 | auto& yield_ctx = y.get_yield_context(); | |
357 | boost::system::error_code ec; | |
358 | async_wait(io_ctx, yield_ctx[ec]); | |
359 | return -ec.value(); | |
11fdf7f2 | 360 | } |
eafe8130 TL |
361 | #endif |
362 | std::unique_lock l(lock); | |
363 | cond.wait(l, [this]{return (done==true);}); | |
364 | return ret; | |
11fdf7f2 TL |
365 | } |
366 | ||
eafe8130 TL |
367 | void finish(int r) { |
368 | std::unique_lock l{lock}; | |
369 | ret = r; | |
370 | done = true; | |
371 | if (completion) { | |
372 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
373 | Completion::post(std::move(completion), ec); | |
11fdf7f2 | 374 | } else { |
eafe8130 | 375 | cond.notify_all(); |
11fdf7f2 TL |
376 | } |
377 | } | |
eafe8130 | 378 | }; |
11fdf7f2 | 379 | |
eafe8130 TL |
380 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { |
381 | ceph_assert(conn); | |
9f95a23c | 382 | if (ack_level == ack_level_t::None) { |
eafe8130 TL |
383 | return amqp::publish(conn, topic, json_format_pubsub_event(record)); |
384 | } else { | |
385 | // TODO: currently broker and routable are the same - this will require different flags but the same mechanism | |
386 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
387 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
388 | const auto rc = amqp::publish_with_confirm(conn, | |
389 | topic, | |
390 | json_format_pubsub_event(record), | |
391 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); | |
392 | if (rc < 0) { | |
393 | // failed to publish, does not wait for reply | |
394 | return rc; | |
395 | } | |
396 | return w->wait(y); | |
11fdf7f2 | 397 | } |
eafe8130 TL |
398 | } |
399 | ||
400 | std::string to_str() const override { | |
401 | std::string str("AMQP(0.9.1) Endpoint"); | |
402 | str += "\nURI: " + endpoint; | |
403 | str += "\nTopic: " + topic; | |
404 | str += "\nExchange: " + exchange; | |
eafe8130 TL |
405 | return str; |
406 | } | |
11fdf7f2 TL |
407 | }; |
408 | ||
// values accepted for the "amqp-version" argument
static const std::string AMQP_0_9_1 = "0-9-1";
static const std::string AMQP_1_0 = "1-0";
// schema name reported for "amqp:" endpoints
static const std::string AMQP_SCHEMA = "amqp";
11fdf7f2 TL |
412 | #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT |
413 | ||
9f95a23c TL |
414 | |
415 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
416 | class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { | |
417 | private: | |
418 | enum class ack_level_t { | |
419 | None, | |
420 | Broker, | |
421 | }; | |
422 | CephContext* const cct; | |
423 | const std::string topic; | |
424 | kafka::connection_ptr_t conn; | |
425 | const ack_level_t ack_level; | |
426 | ||
e306af50 | 427 | bool get_verify_ssl(const RGWHTTPArgs& args) { |
9f95a23c TL |
428 | bool exists; |
429 | auto str_verify_ssl = args.get("verify-ssl", &exists); | |
430 | if (!exists) { | |
431 | // verify server certificate by default | |
432 | return true; | |
433 | } | |
434 | boost::algorithm::to_lower(str_verify_ssl); | |
435 | if (str_verify_ssl == "true") { | |
436 | return true; | |
437 | } | |
438 | if (str_verify_ssl == "false") { | |
439 | return false; | |
440 | } | |
441 | throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); | |
442 | } | |
443 | ||
e306af50 | 444 | bool get_use_ssl(const RGWHTTPArgs& args) { |
9f95a23c TL |
445 | bool exists; |
446 | auto str_use_ssl = args.get("use-ssl", &exists); | |
447 | if (!exists) { | |
448 | // by default ssl not used | |
449 | return false; | |
450 | } | |
451 | boost::algorithm::to_lower(str_use_ssl); | |
452 | if (str_use_ssl == "true") { | |
453 | return true; | |
454 | } | |
455 | if (str_use_ssl == "false") { | |
456 | return false; | |
457 | } | |
458 | throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl); | |
459 | } | |
460 | ||
e306af50 | 461 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
9f95a23c TL |
462 | bool exists; |
463 | // get ack level | |
464 | const auto str_ack_level = args.get("kafka-ack-level", &exists); | |
465 | if (!exists || str_ack_level == "broker") { | |
466 | // "broker" is default | |
467 | return ack_level_t::Broker; | |
468 | } | |
469 | if (str_ack_level == "none") { | |
470 | return ack_level_t::None; | |
471 | } | |
472 | throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); | |
473 | } | |
474 | ||
475 | // NoAckPublishCR implements async kafka publishing via coroutine | |
476 | // This coroutine ends when it send the message and does not wait for an ack | |
477 | class NoAckPublishCR : public RGWCoroutine { | |
478 | private: | |
479 | const std::string topic; | |
480 | kafka::connection_ptr_t conn; | |
481 | const std::string message; | |
482 | ||
483 | public: | |
484 | NoAckPublishCR(CephContext* cct, | |
485 | const std::string& _topic, | |
486 | kafka::connection_ptr_t& _conn, | |
487 | const std::string& _message) : | |
488 | RGWCoroutine(cct), | |
489 | topic(_topic), conn(_conn), message(_message) {} | |
490 | ||
491 | // send message to endpoint, without waiting for reply | |
492 | int operate() override { | |
493 | reenter(this) { | |
494 | const auto rc = kafka::publish(conn, topic, message); | |
495 | if (rc < 0) { | |
496 | return set_cr_error(rc); | |
497 | } | |
498 | return set_cr_done(); | |
499 | } | |
500 | return 0; | |
501 | } | |
502 | }; | |
503 | ||
504 | // AckPublishCR implements async kafka publishing via coroutine | |
505 | // This coroutine ends when an ack is received from the borker | |
506 | // note that it does not wait for an ack fron the end client | |
507 | class AckPublishCR : public RGWCoroutine, public RGWIOProvider { | |
508 | private: | |
509 | const std::string topic; | |
510 | kafka::connection_ptr_t conn; | |
511 | const std::string message; | |
512 | ||
513 | public: | |
514 | AckPublishCR(CephContext* cct, | |
515 | const std::string& _topic, | |
516 | kafka::connection_ptr_t& _conn, | |
517 | const std::string& _message) : | |
518 | RGWCoroutine(cct), | |
519 | topic(_topic), conn(_conn), message(_message) {} | |
520 | ||
521 | // send message to endpoint, waiting for reply | |
522 | int operate() override { | |
523 | reenter(this) { | |
524 | yield { | |
525 | init_new_io(this); | |
526 | const auto rc = kafka::publish_with_confirm(conn, | |
527 | topic, | |
528 | message, | |
529 | std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); | |
530 | if (rc < 0) { | |
531 | // failed to publish, does not wait for reply | |
532 | return set_cr_error(rc); | |
533 | } | |
534 | // mark as blocked on the kafka answer | |
535 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
536 | io_block(); | |
537 | return 0; | |
538 | } | |
539 | return set_cr_done(); | |
540 | } | |
541 | return 0; | |
542 | } | |
543 | ||
544 | // callback invoked from the kafka manager thread when ack/nack is received | |
545 | void request_complete(int status) { | |
546 | ceph_assert(!is_done()); | |
547 | if (status != 0) { | |
548 | // server replied with a nack | |
549 | set_cr_error(status); | |
550 | } | |
551 | io_complete(); | |
552 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
553 | } | |
554 | ||
555 | // TODO: why are these mandatory in RGWIOProvider? | |
556 | void set_io_user_info(void *_user_info) override { | |
557 | } | |
558 | ||
559 | void *get_io_user_info() override { | |
560 | return nullptr; | |
561 | } | |
562 | }; | |
563 | ||
564 | public: | |
565 | RGWPubSubKafkaEndpoint(const std::string& _endpoint, | |
566 | const std::string& _topic, | |
567 | const RGWHTTPArgs& args, | |
568 | CephContext* _cct) : | |
569 | cct(_cct), | |
570 | topic(_topic), | |
571 | conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) , | |
572 | ack_level(get_ack_level(args)) { | |
573 | if (!conn) { | |
574 | throw configuration_error("Kafka: failed to create connection to: " + _endpoint); | |
575 | } | |
576 | } | |
577 | ||
578 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { | |
579 | ceph_assert(conn); | |
580 | if (ack_level == ack_level_t::None) { | |
581 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); | |
582 | } else { | |
583 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); | |
584 | } | |
585 | } | |
586 | ||
587 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { | |
588 | ceph_assert(conn); | |
589 | if (ack_level == ack_level_t::None) { | |
590 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); | |
591 | } else { | |
592 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); | |
593 | } | |
594 | } | |
595 | ||
596 | // this allows waiting untill "finish()" is called from a different thread | |
597 | // waiting could be blocking the waiting thread or yielding, depending | |
598 | // with compilation flag support and whether the optional_yield is set | |
599 | class Waiter { | |
600 | using Signature = void(boost::system::error_code); | |
601 | using Completion = ceph::async::Completion<Signature>; | |
602 | std::unique_ptr<Completion> completion = nullptr; | |
603 | int ret; | |
604 | ||
605 | mutable std::atomic<bool> done = false; | |
606 | mutable std::mutex lock; | |
607 | mutable std::condition_variable cond; | |
608 | ||
609 | template <typename ExecutionContext, typename CompletionToken> | |
610 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
611 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
612 | auto& handler = init.completion_handler; | |
613 | { | |
614 | std::unique_lock l{lock}; | |
615 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
616 | } | |
617 | return init.result.get(); | |
618 | } | |
619 | ||
620 | public: | |
621 | int wait(optional_yield y) { | |
622 | if (done) { | |
623 | return ret; | |
624 | } | |
625 | #ifdef HAVE_BOOST_CONTEXT | |
626 | if (y) { | |
627 | auto& io_ctx = y.get_io_context(); | |
628 | auto& yield_ctx = y.get_yield_context(); | |
629 | boost::system::error_code ec; | |
630 | async_wait(io_ctx, yield_ctx[ec]); | |
631 | return -ec.value(); | |
632 | } | |
633 | #endif | |
634 | std::unique_lock l(lock); | |
635 | cond.wait(l, [this]{return (done==true);}); | |
636 | return ret; | |
637 | } | |
638 | ||
639 | void finish(int r) { | |
640 | std::unique_lock l{lock}; | |
641 | ret = r; | |
642 | done = true; | |
643 | if (completion) { | |
644 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
645 | Completion::post(std::move(completion), ec); | |
646 | } else { | |
647 | cond.notify_all(); | |
648 | } | |
649 | } | |
650 | }; | |
651 | ||
652 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { | |
653 | ceph_assert(conn); | |
654 | if (ack_level == ack_level_t::None) { | |
655 | return kafka::publish(conn, topic, json_format_pubsub_event(record)); | |
656 | } else { | |
657 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
658 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
659 | const auto rc = kafka::publish_with_confirm(conn, | |
660 | topic, | |
661 | json_format_pubsub_event(record), | |
662 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); | |
663 | if (rc < 0) { | |
664 | // failed to publish, does not wait for reply | |
665 | return rc; | |
666 | } | |
667 | return w->wait(y); | |
668 | } | |
669 | } | |
670 | ||
671 | std::string to_str() const override { | |
672 | std::string str("Kafka Endpoint"); | |
673 | str += kafka::to_string(conn); | |
674 | str += "\nTopic: " + topic; | |
675 | return str; | |
676 | } | |
677 | }; | |
678 | ||
// schema name reported for "kafka:" endpoints
static const std::string KAFKA_SCHEMA = "kafka";
680 | #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
681 | ||
eafe8130 TL |
// schema names that are always available, regardless of compilation flags
static const std::string WEBHOOK_SCHEMA = "webhook";
static const std::string UNKNOWN_SCHEMA = "unknown";
static const std::string NO_SCHEMA = "";

// Map an endpoint URI to the name of its schema, based on the text that
// precedes the first ':' in the URI:
//   empty endpoint    -> NO_SCHEMA
//   "http" / "https"  -> WEBHOOK_SCHEMA
//   "amqp"            -> AMQP_SCHEMA  (only when compiled with AMQP support)
//   "kafka"           -> KAFKA_SCHEMA (only when compiled with Kafka support)
//   anything else, or an endpoint without ':' -> UNKNOWN_SCHEMA
// The returned reference is to one of the file level constants.
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix(endpoint, 0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
708 | ||
709 | RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, | |
710 | const std::string& topic, | |
711 | const RGWHTTPArgs& args, | |
712 | CephContext* cct) { | |
713 | const auto& schema = get_schema(endpoint); | |
714 | if (schema == WEBHOOK_SCHEMA) { | |
715 | return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args)); | |
716 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
717 | } else if (schema == AMQP_SCHEMA) { | |
11fdf7f2 TL |
718 | bool exists; |
719 | std::string version = args.get("amqp-version", &exists); | |
720 | if (!exists) { | |
721 | version = AMQP_0_9_1; | |
722 | } | |
723 | if (version == AMQP_0_9_1) { | |
eafe8130 | 724 | return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); |
11fdf7f2 | 725 | } else if (version == AMQP_1_0) { |
eafe8130 | 726 | throw configuration_error("AMQP: v1.0 not supported"); |
11fdf7f2 TL |
727 | return nullptr; |
728 | } else { | |
eafe8130 | 729 | throw configuration_error("AMQP: unknown version: " + version); |
11fdf7f2 TL |
730 | return nullptr; |
731 | } | |
732 | } else if (schema == "amqps") { | |
eafe8130 | 733 | throw configuration_error("AMQP: ssl not supported"); |
11fdf7f2 | 734 | return nullptr; |
9f95a23c TL |
735 | #endif |
736 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
737 | } else if (schema == KAFKA_SCHEMA) { | |
738 | return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct)); | |
11fdf7f2 TL |
739 | #endif |
740 | } | |
741 | ||
eafe8130 | 742 | throw configuration_error("unknown schema in: " + endpoint); |
11fdf7f2 TL |
743 | return nullptr; |
744 | } | |
745 |