]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_pubsub_push.h" | |
5 | #include <string> | |
6 | #include <sstream> | |
7 | #include <algorithm> | |
8 | #include "include/buffer_fwd.h" | |
9 | #include "common/Formatter.h" | |
eafe8130 | 10 | #include "common/async/completion.h" |
11fdf7f2 TL |
11 | #include "rgw_common.h" |
12 | #include "rgw_data_sync.h" | |
13 | #include "rgw_pubsub.h" | |
14 | #include "acconfig.h" | |
15 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
16 | #include "rgw_amqp.h" | |
17 | #endif | |
18 | #include <boost/asio/yield.hpp> | |
19 | #include <boost/algorithm/string.hpp> | |
20 | #include <functional> | |
21 | #include "rgw_perf_counters.h" | |
22 | ||
23 | using namespace rgw; | |
24 | ||
eafe8130 TL |
25 | template<typename EventType> |
26 | std::string json_format_pubsub_event(const EventType& event) { | |
11fdf7f2 TL |
27 | std::stringstream ss; |
28 | JSONFormatter f(false); | |
eafe8130 | 29 | encode_json(EventType::json_type_single, event, &f); |
11fdf7f2 TL |
30 | f.flush(ss); |
31 | return ss.str(); | |
32 | } | |
33 | ||
34 | class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint { | |
35 | private: | |
36 | const std::string endpoint; | |
37 | std::string str_ack_level; | |
38 | typedef unsigned ack_level_t; | |
39 | ack_level_t ack_level; // TODO: not used for now | |
40 | bool verify_ssl; | |
41 | static const ack_level_t ACK_LEVEL_ANY = 0; | |
42 | static const ack_level_t ACK_LEVEL_NON_ERROR = 1; | |
43 | ||
44 | // PostCR implements async execution of RGWPostHTTPData via coroutine | |
45 | class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine { | |
46 | private: | |
47 | RGWDataSyncEnv* const sync_env; | |
48 | bufferlist read_bl; | |
49 | const ack_level_t ack_level; | |
50 | ||
51 | public: | |
52 | PostCR(const std::string& _post_data, | |
53 | RGWDataSyncEnv* _sync_env, | |
54 | const std::string& endpoint, | |
55 | ack_level_t _ack_level, | |
56 | bool verify_ssl) : | |
57 | RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl), | |
58 | RGWSimpleCoroutine(_sync_env->cct), | |
59 | sync_env(_sync_env), | |
60 | ack_level (_ack_level) { | |
61 | // ctor also set the data to send | |
62 | set_post_data(_post_data); | |
63 | set_send_length(_post_data.length()); | |
64 | } | |
65 | ||
66 | // send message to endpoint | |
67 | int send_request() override { | |
68 | init_new_io(this); | |
69 | const auto rc = sync_env->http_manager->add_request(this); | |
70 | if (rc < 0) { | |
71 | return rc; | |
72 | } | |
73 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
74 | return 0; | |
75 | } | |
76 | ||
77 | // wait for reply | |
78 | int request_complete() override { | |
79 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
80 | if (ack_level == ACK_LEVEL_ANY) { | |
81 | return 0; | |
82 | } else if (ack_level == ACK_LEVEL_NON_ERROR) { | |
83 | // TODO check result code to be non-error | |
84 | } else { | |
85 | // TODO: check that result code == ack_level | |
86 | } | |
87 | return -1; | |
88 | } | |
89 | }; | |
90 | ||
91 | public: | |
92 | RGWPubSubHTTPEndpoint(const std::string& _endpoint, | |
eafe8130 TL |
93 | const RGWHTTPArgs& args) : endpoint(_endpoint) { |
94 | bool exists; | |
11fdf7f2 | 95 | |
eafe8130 TL |
96 | str_ack_level = args.get("http-ack-level", &exists); |
97 | if (!exists || str_ack_level == "any") { | |
98 | // "any" is default | |
99 | ack_level = ACK_LEVEL_ANY; | |
100 | } else if (str_ack_level == "non-error") { | |
101 | ack_level = ACK_LEVEL_NON_ERROR; | |
102 | } else { | |
103 | ack_level = std::atoi(str_ack_level.c_str()); | |
104 | if (ack_level < 100 || ack_level >= 600) { | |
105 | throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level); | |
11fdf7f2 TL |
106 | } |
107 | } | |
108 | ||
eafe8130 TL |
109 | auto str_verify_ssl = args.get("verify-ssl", &exists); |
110 | boost::algorithm::to_lower(str_verify_ssl); | |
111 | // verify server certificate by default | |
112 | if (!exists || str_verify_ssl == "true") { | |
113 | verify_ssl = true; | |
114 | } else if (str_verify_ssl == "false") { | |
115 | verify_ssl = false; | |
116 | } else { | |
117 | throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl); | |
118 | } | |
119 | } | |
120 | ||
11fdf7f2 TL |
121 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { |
122 | return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); | |
123 | } | |
124 | ||
eafe8130 TL |
125 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { |
126 | return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); | |
127 | } | |
128 | ||
129 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { | |
130 | bufferlist read_bl; | |
131 | RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); | |
132 | const auto post_data = json_format_pubsub_event(record); | |
133 | request.set_post_data(post_data); | |
134 | request.set_send_length(post_data.length()); | |
135 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
136 | const auto rc = RGWHTTP::process(&request, y); | |
137 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
138 | // TODO: use read_bl to process return code and handle according to ack level | |
139 | return rc; | |
140 | } | |
141 | ||
11fdf7f2 | 142 | std::string to_str() const override { |
eafe8130 | 143 | std::string str("HTTP/S Endpoint"); |
11fdf7f2 TL |
144 | str += "\nURI: " + endpoint; |
145 | str += "\nAck Level: " + str_ack_level; | |
146 | str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL"); | |
147 | return str; | |
148 | ||
149 | } | |
150 | }; | |
151 | ||
152 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
153 | class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { | |
eafe8130 TL |
154 | private: |
155 | enum ack_level_t { | |
156 | ACK_LEVEL_NONE, | |
157 | ACK_LEVEL_BROKER, | |
158 | ACK_LEVEL_ROUTEABLE | |
159 | }; | |
160 | CephContext* const cct; | |
161 | const std::string endpoint; | |
162 | const std::string topic; | |
163 | const std::string exchange; | |
164 | amqp::connection_ptr_t conn; | |
165 | ack_level_t ack_level; | |
166 | std::string str_ack_level; | |
167 | ||
168 | static std::string get_exchange(const RGWHTTPArgs& args) { | |
169 | bool exists; | |
170 | const auto exchange = args.get("amqp-exchange", &exists); | |
171 | if (!exists) { | |
172 | throw configuration_error("AMQP: missing amqp-exchange"); | |
11fdf7f2 | 173 | } |
eafe8130 TL |
174 | return exchange; |
175 | } | |
11fdf7f2 TL |
176 | |
177 | // NoAckPublishCR implements async amqp publishing via coroutine | |
178 | // This coroutine ends when it send the message and does not wait for an ack | |
179 | class NoAckPublishCR : public RGWCoroutine { | |
180 | private: | |
11fdf7f2 TL |
181 | const std::string topic; |
182 | amqp::connection_ptr_t conn; | |
183 | const std::string message; | |
184 | ||
185 | public: | |
eafe8130 | 186 | NoAckPublishCR(CephContext* cct, |
11fdf7f2 TL |
187 | const std::string& _topic, |
188 | amqp::connection_ptr_t& _conn, | |
189 | const std::string& _message) : | |
eafe8130 | 190 | RGWCoroutine(cct), |
11fdf7f2 TL |
191 | topic(_topic), conn(_conn), message(_message) {} |
192 | ||
193 | // send message to endpoint, without waiting for reply | |
194 | int operate() override { | |
195 | reenter(this) { | |
196 | const auto rc = amqp::publish(conn, topic, message); | |
197 | if (rc < 0) { | |
198 | return set_cr_error(rc); | |
199 | } | |
200 | return set_cr_done(); | |
201 | } | |
202 | return 0; | |
203 | } | |
204 | }; | |
205 | ||
206 | // AckPublishCR implements async amqp publishing via coroutine | |
207 | // This coroutine ends when an ack is received from the borker | |
208 | // note that it does not wait for an ack fron the end client | |
209 | class AckPublishCR : public RGWCoroutine, public RGWIOProvider { | |
210 | private: | |
11fdf7f2 TL |
211 | const std::string topic; |
212 | amqp::connection_ptr_t conn; | |
213 | const std::string message; | |
214 | const ack_level_t ack_level; // TODO not used for now | |
215 | ||
216 | public: | |
eafe8130 | 217 | AckPublishCR(CephContext* cct, |
11fdf7f2 TL |
218 | const std::string& _topic, |
219 | amqp::connection_ptr_t& _conn, | |
220 | const std::string& _message, | |
221 | ack_level_t _ack_level) : | |
eafe8130 | 222 | RGWCoroutine(cct), |
11fdf7f2 TL |
223 | topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {} |
224 | ||
225 | // send message to endpoint, waiting for reply | |
226 | int operate() override { | |
227 | reenter(this) { | |
228 | yield { | |
229 | init_new_io(this); | |
230 | const auto rc = amqp::publish_with_confirm(conn, | |
231 | topic, | |
232 | message, | |
233 | std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1)); | |
234 | if (rc < 0) { | |
235 | // failed to publish, does not wait for reply | |
236 | return set_cr_error(rc); | |
237 | } | |
238 | // mark as blocked on the amqp answer | |
239 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); | |
240 | io_block(); | |
241 | return 0; | |
242 | } | |
243 | return set_cr_done(); | |
244 | } | |
245 | return 0; | |
246 | } | |
247 | ||
248 | // callback invoked from the amqp manager thread when ack/nack is received | |
249 | void request_complete(int status) { | |
250 | ceph_assert(!is_done()); | |
251 | if (status != 0) { | |
252 | // server replied with a nack | |
253 | set_cr_error(status); | |
254 | } | |
255 | io_complete(); | |
256 | if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); | |
257 | } | |
258 | ||
259 | // TODO: why are these mandatory in RGWIOProvider? | |
260 | void set_io_user_info(void *_user_info) override { | |
261 | } | |
262 | ||
263 | void *get_io_user_info() override { | |
264 | return nullptr; | |
265 | } | |
266 | }; | |
267 | ||
eafe8130 TL |
268 | public: |
269 | RGWPubSubAMQPEndpoint(const std::string& _endpoint, | |
270 | const std::string& _topic, | |
271 | const RGWHTTPArgs& args, | |
272 | CephContext* _cct) : | |
273 | cct(_cct), | |
274 | endpoint(_endpoint), | |
275 | topic(_topic), | |
276 | exchange(get_exchange(args)), | |
277 | conn(amqp::connect(endpoint, exchange)) { | |
278 | if (!conn) { | |
279 | throw configuration_error("AMQP: failed to create connection to: " + endpoint); | |
280 | } | |
281 | bool exists; | |
282 | // get ack level | |
283 | str_ack_level = args.get("amqp-ack-level", &exists); | |
284 | if (!exists || str_ack_level == "broker") { | |
285 | // "broker" is default | |
286 | ack_level = ACK_LEVEL_BROKER; | |
287 | } else if (str_ack_level == "none") { | |
288 | ack_level = ACK_LEVEL_NONE; | |
289 | } else if (str_ack_level == "routable") { | |
290 | ack_level = ACK_LEVEL_ROUTEABLE; | |
291 | } else { | |
292 | throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); | |
293 | } | |
294 | } | |
295 | ||
296 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { | |
297 | ceph_assert(conn); | |
298 | if (ack_level == ACK_LEVEL_NONE) { | |
299 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); | |
300 | } else { | |
301 | // TODO: currently broker and routable are the same - this will require different flags | |
302 | // but the same mechanism | |
303 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level); | |
304 | } | |
305 | } | |
306 | ||
307 | RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { | |
308 | ceph_assert(conn); | |
309 | if (ack_level == ACK_LEVEL_NONE) { | |
310 | return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); | |
311 | } else { | |
312 | // TODO: currently broker and routable are the same - this will require different flags | |
313 | // but the same mechanism | |
314 | return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level); | |
315 | } | |
316 | } | |
317 | ||
318 | // this allows waiting untill "finish()" is called from a different thread | |
319 | // waiting could be blocking the waiting thread or yielding, depending | |
320 | // with compilation flag support and whether the optional_yield is set | |
321 | class Waiter { | |
322 | using Signature = void(boost::system::error_code); | |
323 | using Completion = ceph::async::Completion<Signature>; | |
324 | std::unique_ptr<Completion> completion = nullptr; | |
325 | int ret; | |
326 | ||
327 | mutable std::atomic<bool> done = false; | |
328 | mutable std::mutex lock; | |
329 | mutable std::condition_variable cond; | |
330 | ||
331 | template <typename ExecutionContext, typename CompletionToken> | |
332 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
333 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
334 | auto& handler = init.completion_handler; | |
335 | { | |
336 | std::unique_lock l{lock}; | |
337 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
338 | } | |
339 | return init.result.get(); | |
340 | } | |
341 | ||
11fdf7f2 | 342 | public: |
eafe8130 TL |
343 | int wait(optional_yield y) { |
344 | if (done) { | |
345 | return ret; | |
346 | } | |
347 | #ifdef HAVE_BOOST_CONTEXT | |
348 | if (y) { | |
349 | auto& io_ctx = y.get_io_context(); | |
350 | auto& yield_ctx = y.get_yield_context(); | |
351 | boost::system::error_code ec; | |
352 | async_wait(io_ctx, yield_ctx[ec]); | |
353 | return -ec.value(); | |
11fdf7f2 | 354 | } |
eafe8130 TL |
355 | #endif |
356 | std::unique_lock l(lock); | |
357 | cond.wait(l, [this]{return (done==true);}); | |
358 | return ret; | |
11fdf7f2 TL |
359 | } |
360 | ||
eafe8130 TL |
361 | void finish(int r) { |
362 | std::unique_lock l{lock}; | |
363 | ret = r; | |
364 | done = true; | |
365 | if (completion) { | |
366 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
367 | Completion::post(std::move(completion), ec); | |
11fdf7f2 | 368 | } else { |
eafe8130 | 369 | cond.notify_all(); |
11fdf7f2 TL |
370 | } |
371 | } | |
eafe8130 | 372 | }; |
11fdf7f2 | 373 | |
eafe8130 TL |
374 | int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { |
375 | ceph_assert(conn); | |
376 | if (ack_level == ACK_LEVEL_NONE) { | |
377 | return amqp::publish(conn, topic, json_format_pubsub_event(record)); | |
378 | } else { | |
379 | // TODO: currently broker and routable are the same - this will require different flags but the same mechanism | |
380 | // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine | |
381 | auto w = std::unique_ptr<Waiter>(new Waiter); | |
382 | const auto rc = amqp::publish_with_confirm(conn, | |
383 | topic, | |
384 | json_format_pubsub_event(record), | |
385 | std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); | |
386 | if (rc < 0) { | |
387 | // failed to publish, does not wait for reply | |
388 | return rc; | |
389 | } | |
390 | return w->wait(y); | |
11fdf7f2 | 391 | } |
eafe8130 TL |
392 | } |
393 | ||
394 | std::string to_str() const override { | |
395 | std::string str("AMQP(0.9.1) Endpoint"); | |
396 | str += "\nURI: " + endpoint; | |
397 | str += "\nTopic: " + topic; | |
398 | str += "\nExchange: " + exchange; | |
399 | str += "\nAck Level: " + str_ack_level; | |
400 | return str; | |
401 | } | |
11fdf7f2 TL |
402 | }; |
403 | ||
// accepted values of the "amqp-version" argument,
// and the schema name used for "amqp" endpoints
static const std::string AMQP_0_9_1 = "0-9-1";
static const std::string AMQP_1_0 = "1-0";
static const std::string AMQP_SCHEMA = "amqp";
11fdf7f2 TL |
407 | #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT |
408 | ||
eafe8130 TL |
// schema names returned by get_schema()
static const std::string WEBHOOK_SCHEMA = "webhook";
static const std::string UNKNOWN_SCHEMA = "unknown";
static const std::string NO_SCHEMA = "";

// Map an endpoint URI to the name of the schema that handles it.
// The part of the URI before the first ':' is inspected:
//   empty endpoint    -> NO_SCHEMA
//   "http" or "https" -> WEBHOOK_SCHEMA
//   "amqp"            -> AMQP_SCHEMA (only when built with AMQP endpoint support)
//   no ':' in the endpoint, or anything else -> UNKNOWN_SCHEMA
const std::string& get_schema(const std::string& endpoint) {
  if (endpoint.empty()) {
    return NO_SCHEMA;
  }
  const auto colon = endpoint.find(':');
  if (colon == std::string::npos) {
    return UNKNOWN_SCHEMA;
  }
  const std::string scheme(endpoint, 0, colon);
  if (scheme == "http" || scheme == "https") {
    return WEBHOOK_SCHEMA;
  }
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  if (scheme == "amqp") {
    return AMQP_SCHEMA;
  }
#endif
  return UNKNOWN_SCHEMA;
}
431 | ||
432 | RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, | |
433 | const std::string& topic, | |
434 | const RGWHTTPArgs& args, | |
435 | CephContext* cct) { | |
436 | const auto& schema = get_schema(endpoint); | |
437 | if (schema == WEBHOOK_SCHEMA) { | |
438 | return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args)); | |
439 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
440 | } else if (schema == AMQP_SCHEMA) { | |
11fdf7f2 TL |
441 | bool exists; |
442 | std::string version = args.get("amqp-version", &exists); | |
443 | if (!exists) { | |
444 | version = AMQP_0_9_1; | |
445 | } | |
446 | if (version == AMQP_0_9_1) { | |
eafe8130 | 447 | return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); |
11fdf7f2 | 448 | } else if (version == AMQP_1_0) { |
eafe8130 | 449 | throw configuration_error("AMQP: v1.0 not supported"); |
11fdf7f2 TL |
450 | return nullptr; |
451 | } else { | |
eafe8130 | 452 | throw configuration_error("AMQP: unknown version: " + version); |
11fdf7f2 TL |
453 | return nullptr; |
454 | } | |
455 | } else if (schema == "amqps") { | |
eafe8130 | 456 | throw configuration_error("AMQP: ssl not supported"); |
11fdf7f2 TL |
457 | return nullptr; |
458 | #endif | |
459 | } | |
460 | ||
eafe8130 | 461 | throw configuration_error("unknown schema in: " + endpoint); |
11fdf7f2 TL |
462 | return nullptr; |
463 | } | |
464 |