]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | #include "rgw_pubsub_push.h" | |
5 | #include <string> | |
6 | #include <sstream> | |
7 | #include <algorithm> | |
8 | #include "include/buffer_fwd.h" | |
9 | #include "common/Formatter.h" | |
20effc67 | 10 | #include "common/iso_8601.h" |
eafe8130 | 11 | #include "common/async/completion.h" |
11fdf7f2 TL |
12 | #include "rgw_common.h" |
13 | #include "rgw_data_sync.h" | |
14 | #include "rgw_pubsub.h" | |
15 | #include "acconfig.h" | |
16 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
17 | #include "rgw_amqp.h" | |
18 | #endif | |
9f95a23c TL |
19 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
20 | #include "rgw_kafka.h" | |
21 | #endif | |
11fdf7f2 TL |
22 | #include <boost/asio/yield.hpp> |
23 | #include <boost/algorithm/string.hpp> | |
24 | #include <functional> | |
25 | #include "rgw_perf_counters.h" | |
26 | ||
27 | using namespace rgw; | |
28 | ||
eafe8130 TL |
29 | template<typename EventType> |
30 | std::string json_format_pubsub_event(const EventType& event) { | |
11fdf7f2 TL |
31 | std::stringstream ss; |
32 | JSONFormatter f(false); | |
92f5a8d4 TL |
33 | { |
34 | Formatter::ObjectSection s(f, EventType::json_type_plural); | |
35 | { | |
36 | Formatter::ArraySection s(f, EventType::json_type_plural); | |
37 | encode_json("", event, &f); | |
38 | } | |
39 | } | |
11fdf7f2 TL |
40 | f.flush(ss); |
41 | return ss.str(); | |
42 | } | |
20effc67 TL |
43 | |
44 | bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) { | |
45 | bool value; | |
46 | bool exists; | |
47 | if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) { | |
48 | throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name); | |
49 | } | |
50 | if (!exists) { | |
51 | return default_value; | |
52 | } | |
53 | return value; | |
54 | } | |
11fdf7f2 TL |
55 | |
56 | class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint { | |
57 | private: | |
58 | const std::string endpoint; | |
11fdf7f2 TL |
59 | typedef unsigned ack_level_t; |
60 | ack_level_t ack_level; // TODO: not used for now | |
20effc67 TL |
61 | const bool verify_ssl; |
62 | const bool cloudevents; | |
11fdf7f2 TL |
63 | static const ack_level_t ACK_LEVEL_ANY = 0; |
64 | static const ack_level_t ACK_LEVEL_NON_ERROR = 1; | |
65 | ||
66 | // PostCR implements async execution of RGWPostHTTPData via coroutine | |
67 | class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine { | |
68 | private: | |
69 | RGWDataSyncEnv* const sync_env; | |
70 | bufferlist read_bl; | |
71 | const ack_level_t ack_level; | |
72 | ||
73 | public: | |
74 | PostCR(const std::string& _post_data, | |
75 | RGWDataSyncEnv* _sync_env, | |
76 | const std::string& endpoint, | |
77 | ack_level_t _ack_level, | |
78 | bool verify_ssl) : | |
79 | RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl), | |
80 | RGWSimpleCoroutine(_sync_env->cct), | |
81 | sync_env(_sync_env), | |
82 | ack_level (_ack_level) { | |
83 | // ctor also set the data to send | |
84 | set_post_data(_post_data); | |
85 | set_send_length(_post_data.length()); | |
86 | } | |
87 | ||
88 | // send message to endpoint | |
b3b6e05e | 89 | int send_request(const DoutPrefixProvider *dpp) override { |
11fdf7f2 TL |
90 | init_new_io(this); |
91 | const auto rc = sync_env->http_manager->add_request(this); | |
92 | if (rc < 0) { | |
93 | return rc; | |
94 | } | |
95 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
96 | return 0; | |
97 | } | |
98 | ||
99 | // wait for reply | |
100 | int request_complete() override { | |
101 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
102 | if (ack_level == ACK_LEVEL_ANY) { | |
103 | return 0; | |
104 | } else if (ack_level == ACK_LEVEL_NON_ERROR) { | |
105 | // TODO check result code to be non-error | |
106 | } else { | |
107 | // TODO: check that result code == ack_level | |
108 | } | |
109 | return -1; | |
110 | } | |
111 | }; | |
112 | ||
113 | public: | |
20effc67 TL |
114 | RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : |
115 | endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) | |
116 | { | |
eafe8130 | 117 | bool exists; |
20effc67 | 118 | const auto& str_ack_level = args.get("http-ack-level", &exists); |
eafe8130 TL |
119 | if (!exists || str_ack_level == "any") { |
120 | // "any" is default | |
121 | ack_level = ACK_LEVEL_ANY; | |
122 | } else if (str_ack_level == "non-error") { | |
123 | ack_level = ACK_LEVEL_NON_ERROR; | |
124 | } else { | |
125 | ack_level = std::atoi(str_ack_level.c_str()); | |
126 | if (ack_level < 100 || ack_level >= 600) { | |
127 | throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level); | |
11fdf7f2 TL |
128 | } |
129 | } | |
eafe8130 TL |
130 | } |
131 | ||
11fdf7f2 TL |
132 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { |
133 | return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); | |
134 | } | |
135 | ||
f67539c2 TL |
136 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { |
137 | return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); | |
eafe8130 TL |
138 | } |
139 | ||
f67539c2 | 140 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
eafe8130 TL |
141 | bufferlist read_bl; |
142 | RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); | |
f67539c2 | 143 | const auto post_data = json_format_pubsub_event(event); |
20effc67 TL |
144 | if (cloudevents) { |
145 | // following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md | |
146 | // using "Binary Content Mode" | |
147 | request.append_header("ce-specversion", "1.0"); | |
148 | request.append_header("ce-type", "com.amazonaws." + event.eventName); | |
149 | request.append_header("ce-time", to_iso_8601(event.eventTime)); | |
150 | // default output of iso8601 is also RFC3339 compatible | |
151 | request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2); | |
152 | request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name); | |
153 | request.append_header("ce-subject", event.object_key); | |
154 | } | |
eafe8130 TL |
155 | request.set_post_data(post_data); |
156 | request.set_send_length(post_data.length()); | |
522d829b | 157 | request.append_header("Content-Type", "application/json"); |
eafe8130 TL |
158 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); |
159 | const auto rc = RGWHTTP::process(&request, y); | |
160 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
161 | // TODO: use read_bl to process return code and handle according to ack level | |
162 | return rc; | |
163 | } | |
164 | ||
11fdf7f2 | 165 | std::string to_str() const override { |
eafe8130 | 166 | std::string str("HTTP/S Endpoint"); |
11fdf7f2 | 167 | str += "\nURI: " + endpoint; |
11fdf7f2 TL |
168 | str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL"); |
169 | return str; | |
11fdf7f2 TL |
170 | } |
171 | }; | |
172 | ||
173 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
174 | class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { | |
eafe8130 | 175 | private: |
9f95a23c TL |
176 | enum class ack_level_t { |
177 | None, | |
178 | Broker, | |
179 | Routable | |
eafe8130 TL |
180 | }; |
181 | CephContext* const cct; | |
182 | const std::string endpoint; | |
183 | const std::string topic; | |
184 | const std::string exchange; | |
eafe8130 | 185 | ack_level_t ack_level; |
e306af50 | 186 | amqp::connection_ptr_t conn; |
eafe8130 | 187 | |
f67539c2 TL |
188 | bool get_verify_ssl(const RGWHTTPArgs& args) { |
189 | bool exists; | |
190 | auto str_verify_ssl = args.get("verify-ssl", &exists); | |
191 | if (!exists) { | |
192 | // verify server certificate by default | |
193 | return true; | |
194 | } | |
195 | boost::algorithm::to_lower(str_verify_ssl); | |
196 | if (str_verify_ssl == "true") { | |
197 | return true; | |
198 | } | |
199 | if (str_verify_ssl == "false") { | |
200 | return false; | |
201 | } | |
202 | throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); | |
203 | } | |
204 | ||
e306af50 | 205 | std::string get_exchange(const RGWHTTPArgs& args) { |
eafe8130 TL |
206 | bool exists; |
207 | const auto exchange = args.get("amqp-exchange", &exists); | |
208 | if (!exists) { | |
209 | throw configuration_error("AMQP: missing amqp-exchange"); | |
11fdf7f2 | 210 | } |
eafe8130 TL |
211 | return exchange; |
212 | } | |
11fdf7f2 | 213 | |
e306af50 TL |
214 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
215 | bool exists; | |
216 | const auto& str_ack_level = args.get("amqp-ack-level", &exists); | |
217 | if (!exists || str_ack_level == "broker") { | |
218 | // "broker" is default | |
219 | return ack_level_t::Broker; | |
220 | } | |
221 | if (str_ack_level == "none") { | |
222 | return ack_level_t::None; | |
223 | } | |
224 | if (str_ack_level == "routable") { | |
225 | return ack_level_t::Routable; | |
226 | } | |
227 | throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); | |
228 | } | |
229 | ||
11fdf7f2 TL |
230 | // NoAckPublishCR implements async amqp publishing via coroutine |
231 | // This coroutine ends when it send the message and does not wait for an ack | |
232 | class NoAckPublishCR : public RGWCoroutine { | |
233 | private: | |
11fdf7f2 TL |
234 | const std::string topic; |
235 | amqp::connection_ptr_t conn; | |
236 | const std::string message; | |
237 | ||
238 | public: | |
eafe8130 | 239 | NoAckPublishCR(CephContext* cct, |
11fdf7f2 TL |
240 | const std::string& _topic, |
241 | amqp::connection_ptr_t& _conn, | |
242 | const std::string& _message) : | |
eafe8130 | 243 | RGWCoroutine(cct), |
11fdf7f2 TL |
244 | topic(_topic), conn(_conn), message(_message) {} |
245 | ||
246 | // send message to endpoint, without waiting for reply | |
b3b6e05e | 247 | int operate(const DoutPrefixProvider *dpp) override { |
11fdf7f2 TL |
248 | reenter(this) { |
249 | const auto rc = amqp::publish(conn, topic, message); | |
250 | if (rc < 0) { | |
251 | return set_cr_error(rc); | |
252 | } | |
253 | return set_cr_done(); | |
254 | } | |
255 | return 0; | |
256 | } | |
257 | }; | |
258 | ||
259 | // AckPublishCR implements async amqp publishing via coroutine | |
260 | // This coroutine ends when an ack is received from the borker | |
261 | // note that it does not wait for an ack fron the end client | |
262 | class AckPublishCR : public RGWCoroutine, public RGWIOProvider { | |
263 | private: | |
11fdf7f2 TL |
264 | const std::string topic; |
265 | amqp::connection_ptr_t conn; | |
266 | const std::string message; | |
11fdf7f2 TL |
267 | |
268 | public: | |
eafe8130 | 269 | AckPublishCR(CephContext* cct, |
11fdf7f2 TL |
270 | const std::string& _topic, |
271 | amqp::connection_ptr_t& _conn, | |
e306af50 | 272 | const std::string& _message) : |
eafe8130 | 273 | RGWCoroutine(cct), |
e306af50 | 274 | topic(_topic), conn(_conn), message(_message) {} |
11fdf7f2 TL |
275 | |
276 | // send message to endpoint, waiting for reply | |
b3b6e05e | 277 | int operate(const DoutPrefixProvider *dpp) override { |
11fdf7f2 TL |
278 | reenter(this) { |
279 | yield { | |
280 | init_new_io(this); | |
281 | const auto rc = amqp::publish_with_confirm(conn, | |
282 | topic, | |
283 | message, | |
284 | std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); | |
285 | if (rc < 0) { | |
286 | // failed to publish, does not wait for reply | |
287 | return set_cr_error(rc); | |
288 | } | |
289 | // mark as blocked on the amqp answer | |
290 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
291 | io_block(); | |
292 | return 0; | |
293 | } | |
294 | return set_cr_done(); | |
295 | } | |
296 | return 0; | |
297 | } | |
298 | ||
299 | // callback invoked from the amqp manager thread when ack/nack is received | |
300 | void request_complete(int status) { | |
301 | ceph_assert(!is_done()); | |
302 | if (status != 0) { | |
303 | // server replied with a nack | |
304 | set_cr_error(status); | |
305 | } | |
306 | io_complete(); | |
307 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
308 | } | |
309 | ||
310 | // TODO: why are these mandatory in RGWIOProvider? | |
311 | void set_io_user_info(void *_user_info) override { | |
312 | } | |
313 | ||
314 | void *get_io_user_info() override { | |
315 | return nullptr; | |
316 | } | |
317 | }; | |
e306af50 | 318 | |
eafe8130 TL |
319 | public: |
320 | RGWPubSubAMQPEndpoint(const std::string& _endpoint, | |
321 | const std::string& _topic, | |
322 | const RGWHTTPArgs& args, | |
323 | CephContext* _cct) : | |
324 | cct(_cct), | |
325 | endpoint(_endpoint), | |
326 | topic(_topic), | |
327 | exchange(get_exchange(args)), | |
e306af50 | 328 | ack_level(get_ack_level(args)), |
f67539c2 | 329 | conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) { |
eafe8130 TL |
330 | if (!conn) { |
331 | throw configuration_error("AMQP: failed to create connection to: " + endpoint); | |
332 | } | |
eafe8130 TL |
333 | } |
334 | ||
335 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { | |
336 | ceph_assert(conn); | |
9f95a23c | 337 | if (ack_level == ack_level_t::None) { |
eafe8130 TL |
338 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
339 | } else { | |
e306af50 | 340 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
eafe8130 TL |
341 | } |
342 | } | |
343 | ||
f67539c2 | 344 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { |
eafe8130 | 345 | ceph_assert(conn); |
9f95a23c | 346 | if (ack_level == ack_level_t::None) { |
f67539c2 | 347 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
eafe8130 | 348 | } else { |
f67539c2 | 349 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
eafe8130 TL |
350 | } |
351 | } | |
352 | ||
353 | // this allows waiting untill "finish()" is called from a different thread | |
354 | // waiting could be blocking the waiting thread or yielding, depending | |
355 | // with compilation flag support and whether the optional_yield is set | |
356 | class Waiter { | |
357 | using Signature = void(boost::system::error_code); | |
358 | using Completion = ceph::async::Completion<Signature>; | |
359 | std::unique_ptr<Completion> completion = nullptr; | |
360 | int ret; | |
361 | ||
362 | mutable std::atomic<bool> done = false; | |
363 | mutable std::mutex lock; | |
364 | mutable std::condition_variable cond; | |
365 | ||
366 | template <typename ExecutionContext, typename CompletionToken> | |
367 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
368 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
369 | auto& handler = init.completion_handler; | |
370 | { | |
371 | std::unique_lock l{lock}; | |
372 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
373 | } | |
374 | return init.result.get(); | |
375 | } | |
376 | ||
11fdf7f2 | 377 | public: |
eafe8130 TL |
378 | int wait(optional_yield y) { |
379 | if (done) { | |
380 | return ret; | |
381 | } | |
eafe8130 | 382 | if (y) { |
f67539c2 | 383 | auto& io_ctx = y.get_io_context(); |
eafe8130 TL |
384 | auto& yield_ctx = y.get_yield_context(); |
385 | boost::system::error_code ec; | |
386 | async_wait(io_ctx, yield_ctx[ec]); | |
387 | return -ec.value(); | |
11fdf7f2 | 388 | } |
eafe8130 TL |
389 | std::unique_lock l(lock); |
390 | cond.wait(l, [this]{return (done==true);}); | |
391 | return ret; | |
11fdf7f2 TL |
392 | } |
393 | ||
eafe8130 TL |
394 | void finish(int r) { |
395 | std::unique_lock l{lock}; | |
396 | ret = r; | |
397 | done = true; | |
398 | if (completion) { | |
399 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
400 | Completion::post(std::move(completion), ec); | |
11fdf7f2 | 401 | } else { |
eafe8130 | 402 | cond.notify_all(); |
11fdf7f2 TL |
403 | } |
404 | } | |
eafe8130 | 405 | }; |
11fdf7f2 | 406 | |
f67539c2 | 407 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
eafe8130 | 408 | ceph_assert(conn); |
9f95a23c | 409 | if (ack_level == ack_level_t::None) { |
f67539c2 | 410 | return amqp::publish(conn, topic, json_format_pubsub_event(event)); |
eafe8130 TL |
411 | } else { |
412 | // TODO: currently broker and routable are the same - this will require different flags but the same mechanism | |
413 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
414 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
415 | const auto rc = amqp::publish_with_confirm(conn, | |
416 | topic, | |
f67539c2 | 417 | json_format_pubsub_event(event), |
eafe8130 TL |
418 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); |
419 | if (rc < 0) { | |
420 | // failed to publish, does not wait for reply | |
421 | return rc; | |
422 | } | |
423 | return w->wait(y); | |
11fdf7f2 | 424 | } |
eafe8130 TL |
425 | } |
426 | ||
427 | std::string to_str() const override { | |
428 | std::string str("AMQP(0.9.1) Endpoint"); | |
429 | str += "\nURI: " + endpoint; | |
430 | str += "\nTopic: " + topic; | |
431 | str += "\nExchange: " + exchange; | |
eafe8130 TL |
432 | return str; |
433 | } | |
11fdf7f2 TL |
434 | }; |
435 | ||
// values of the "amqp-version" endpoint argument
static const std::string AMQP_0_9_1{"0-9-1"};
static const std::string AMQP_1_0{"1-0"};
// schema name returned for AMQP endpoints
static const std::string AMQP_SCHEMA{"amqp"};
11fdf7f2 TL |
439 | #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT |
440 | ||
9f95a23c TL |
441 | |
442 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
443 | class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { | |
444 | private: | |
445 | enum class ack_level_t { | |
446 | None, | |
447 | Broker, | |
448 | }; | |
449 | CephContext* const cct; | |
450 | const std::string topic; | |
451 | kafka::connection_ptr_t conn; | |
452 | const ack_level_t ack_level; | |
453 | ||
9f95a23c | 454 | |
e306af50 | 455 | ack_level_t get_ack_level(const RGWHTTPArgs& args) { |
9f95a23c | 456 | bool exists; |
20effc67 | 457 | const auto& str_ack_level = args.get("kafka-ack-level", &exists); |
9f95a23c TL |
458 | if (!exists || str_ack_level == "broker") { |
459 | // "broker" is default | |
460 | return ack_level_t::Broker; | |
461 | } | |
462 | if (str_ack_level == "none") { | |
463 | return ack_level_t::None; | |
464 | } | |
465 | throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level); | |
466 | } | |
467 | ||
468 | // NoAckPublishCR implements async kafka publishing via coroutine | |
469 | // This coroutine ends when it send the message and does not wait for an ack | |
470 | class NoAckPublishCR : public RGWCoroutine { | |
471 | private: | |
472 | const std::string topic; | |
473 | kafka::connection_ptr_t conn; | |
474 | const std::string message; | |
475 | ||
476 | public: | |
477 | NoAckPublishCR(CephContext* cct, | |
478 | const std::string& _topic, | |
479 | kafka::connection_ptr_t& _conn, | |
480 | const std::string& _message) : | |
481 | RGWCoroutine(cct), | |
482 | topic(_topic), conn(_conn), message(_message) {} | |
483 | ||
484 | // send message to endpoint, without waiting for reply | |
b3b6e05e | 485 | int operate(const DoutPrefixProvider *dpp) override { |
9f95a23c TL |
486 | reenter(this) { |
487 | const auto rc = kafka::publish(conn, topic, message); | |
488 | if (rc < 0) { | |
489 | return set_cr_error(rc); | |
490 | } | |
491 | return set_cr_done(); | |
492 | } | |
493 | return 0; | |
494 | } | |
495 | }; | |
496 | ||
497 | // AckPublishCR implements async kafka publishing via coroutine | |
498 | // This coroutine ends when an ack is received from the borker | |
499 | // note that it does not wait for an ack fron the end client | |
500 | class AckPublishCR : public RGWCoroutine, public RGWIOProvider { | |
501 | private: | |
502 | const std::string topic; | |
503 | kafka::connection_ptr_t conn; | |
504 | const std::string message; | |
505 | ||
506 | public: | |
507 | AckPublishCR(CephContext* cct, | |
508 | const std::string& _topic, | |
509 | kafka::connection_ptr_t& _conn, | |
510 | const std::string& _message) : | |
511 | RGWCoroutine(cct), | |
512 | topic(_topic), conn(_conn), message(_message) {} | |
513 | ||
514 | // send message to endpoint, waiting for reply | |
b3b6e05e | 515 | int operate(const DoutPrefixProvider *dpp) override { |
9f95a23c TL |
516 | reenter(this) { |
517 | yield { | |
518 | init_new_io(this); | |
519 | const auto rc = kafka::publish_with_confirm(conn, | |
520 | topic, | |
521 | message, | |
522 | std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); | |
523 | if (rc < 0) { | |
524 | // failed to publish, does not wait for reply | |
525 | return set_cr_error(rc); | |
526 | } | |
527 | // mark as blocked on the kafka answer | |
528 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
529 | io_block(); | |
530 | return 0; | |
531 | } | |
532 | return set_cr_done(); | |
533 | } | |
534 | return 0; | |
535 | } | |
536 | ||
537 | // callback invoked from the kafka manager thread when ack/nack is received | |
538 | void request_complete(int status) { | |
539 | ceph_assert(!is_done()); | |
540 | if (status != 0) { | |
541 | // server replied with a nack | |
542 | set_cr_error(status); | |
543 | } | |
544 | io_complete(); | |
545 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
546 | } | |
547 | ||
548 | // TODO: why are these mandatory in RGWIOProvider? | |
549 | void set_io_user_info(void *_user_info) override { | |
550 | } | |
551 | ||
552 | void *get_io_user_info() override { | |
553 | return nullptr; | |
554 | } | |
555 | }; | |
556 | ||
557 | public: | |
558 | RGWPubSubKafkaEndpoint(const std::string& _endpoint, | |
559 | const std::string& _topic, | |
560 | const RGWHTTPArgs& args, | |
561 | CephContext* _cct) : | |
562 | cct(_cct), | |
563 | topic(_topic), | |
20effc67 | 564 | conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) , |
9f95a23c TL |
565 | ack_level(get_ack_level(args)) { |
566 | if (!conn) { | |
567 | throw configuration_error("Kafka: failed to create connection to: " + _endpoint); | |
568 | } | |
569 | } | |
570 | ||
571 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { | |
572 | ceph_assert(conn); | |
573 | if (ack_level == ack_level_t::None) { | |
574 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); | |
575 | } else { | |
576 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); | |
577 | } | |
578 | } | |
579 | ||
f67539c2 | 580 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { |
9f95a23c TL |
581 | ceph_assert(conn); |
582 | if (ack_level == ack_level_t::None) { | |
f67539c2 | 583 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
9f95a23c | 584 | } else { |
f67539c2 | 585 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); |
9f95a23c TL |
586 | } |
587 | } | |
588 | ||
589 | // this allows waiting untill "finish()" is called from a different thread | |
590 | // waiting could be blocking the waiting thread or yielding, depending | |
591 | // with compilation flag support and whether the optional_yield is set | |
592 | class Waiter { | |
593 | using Signature = void(boost::system::error_code); | |
594 | using Completion = ceph::async::Completion<Signature>; | |
595 | std::unique_ptr<Completion> completion = nullptr; | |
596 | int ret; | |
597 | ||
598 | mutable std::atomic<bool> done = false; | |
599 | mutable std::mutex lock; | |
600 | mutable std::condition_variable cond; | |
601 | ||
602 | template <typename ExecutionContext, typename CompletionToken> | |
603 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
604 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
605 | auto& handler = init.completion_handler; | |
606 | { | |
607 | std::unique_lock l{lock}; | |
608 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
609 | } | |
610 | return init.result.get(); | |
611 | } | |
612 | ||
613 | public: | |
614 | int wait(optional_yield y) { | |
615 | if (done) { | |
616 | return ret; | |
617 | } | |
9f95a23c TL |
618 | if (y) { |
619 | auto& io_ctx = y.get_io_context(); | |
620 | auto& yield_ctx = y.get_yield_context(); | |
621 | boost::system::error_code ec; | |
622 | async_wait(io_ctx, yield_ctx[ec]); | |
623 | return -ec.value(); | |
624 | } | |
9f95a23c TL |
625 | std::unique_lock l(lock); |
626 | cond.wait(l, [this]{return (done==true);}); | |
627 | return ret; | |
628 | } | |
629 | ||
630 | void finish(int r) { | |
631 | std::unique_lock l{lock}; | |
632 | ret = r; | |
633 | done = true; | |
634 | if (completion) { | |
635 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
636 | Completion::post(std::move(completion), ec); | |
637 | } else { | |
638 | cond.notify_all(); | |
639 | } | |
640 | } | |
641 | }; | |
642 | ||
f67539c2 | 643 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { |
9f95a23c TL |
644 | ceph_assert(conn); |
645 | if (ack_level == ack_level_t::None) { | |
f67539c2 | 646 | return kafka::publish(conn, topic, json_format_pubsub_event(event)); |
9f95a23c TL |
647 | } else { |
648 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
649 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
650 | const auto rc = kafka::publish_with_confirm(conn, | |
651 | topic, | |
f67539c2 | 652 | json_format_pubsub_event(event), |
9f95a23c TL |
653 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); |
654 | if (rc < 0) { | |
655 | // failed to publish, does not wait for reply | |
656 | return rc; | |
657 | } | |
658 | return w->wait(y); | |
659 | } | |
660 | } | |
661 | ||
662 | std::string to_str() const override { | |
663 | std::string str("Kafka Endpoint"); | |
664 | str += kafka::to_string(conn); | |
665 | str += "\nTopic: " + topic; | |
666 | return str; | |
667 | } | |
668 | }; | |
669 | ||
// schema name returned for Kafka endpoints
static const std::string KAFKA_SCHEMA{"kafka"};
671 | #endif // ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
672 | ||
eafe8130 TL |
// schema names returned by get_schema()
static const std::string WEBHOOK_SCHEMA("webhook");
static const std::string UNKNOWN_SCHEMA("unknown");
static const std::string NO_SCHEMA("");

// Map the prefix of an endpoint URI (everything before the first ':') to a
// schema name:
// - empty endpoint: NO_SCHEMA
// - "http"/"https": WEBHOOK_SCHEMA
// - "amqp"/"amqps": AMQP_SCHEMA (only when built with WITH_RADOSGW_AMQP_ENDPOINT)
// - "kafka": KAFKA_SCHEMA (only when built with WITH_RADOSGW_KAFKA_ENDPOINT)
// - no ':' in the endpoint, or any other prefix: UNKNOWN_SCHEMA
// The returned reference is to one of the file level constants.
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string prefix(endpoint, 0, colon);
  if (prefix == "http" || prefix == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (prefix == "amqp" || prefix == "amqps") {
    return AMQP_SCHEMA;
  }
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  if (prefix == "kafka") {
    return KAFKA_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
699 | ||
700 | RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, | |
701 | const std::string& topic, | |
702 | const RGWHTTPArgs& args, | |
703 | CephContext* cct) { | |
704 | const auto& schema = get_schema(endpoint); | |
705 | if (schema == WEBHOOK_SCHEMA) { | |
706 | return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args)); | |
707 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
708 | } else if (schema == AMQP_SCHEMA) { | |
11fdf7f2 TL |
709 | bool exists; |
710 | std::string version = args.get("amqp-version", &exists); | |
711 | if (!exists) { | |
712 | version = AMQP_0_9_1; | |
713 | } | |
714 | if (version == AMQP_0_9_1) { | |
eafe8130 | 715 | return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); |
11fdf7f2 | 716 | } else if (version == AMQP_1_0) { |
eafe8130 | 717 | throw configuration_error("AMQP: v1.0 not supported"); |
11fdf7f2 TL |
718 | return nullptr; |
719 | } else { | |
eafe8130 | 720 | throw configuration_error("AMQP: unknown version: " + version); |
11fdf7f2 TL |
721 | return nullptr; |
722 | } | |
9f95a23c TL |
723 | #endif |
724 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT | |
725 | } else if (schema == KAFKA_SCHEMA) { | |
726 | return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct)); | |
11fdf7f2 TL |
727 | #endif |
728 | } | |
729 | ||
eafe8130 | 730 | throw configuration_error("unknown schema in: " + endpoint); |
11fdf7f2 TL |
731 | return nullptr; |
732 | } | |
733 |