]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub_push.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_pubsub_push.h"
5#include <string>
6#include <sstream>
7#include <algorithm>
8#include "include/buffer_fwd.h"
9#include "common/Formatter.h"
eafe8130 10#include "common/async/completion.h"
11fdf7f2
TL
11#include "rgw_common.h"
12#include "rgw_data_sync.h"
13#include "rgw_pubsub.h"
14#include "acconfig.h"
15#ifdef WITH_RADOSGW_AMQP_ENDPOINT
16#include "rgw_amqp.h"
17#endif
18#include <boost/asio/yield.hpp>
19#include <boost/algorithm/string.hpp>
20#include <functional>
21#include "rgw_perf_counters.h"
22
23using namespace rgw;
24
eafe8130
TL
25template<typename EventType>
26std::string json_format_pubsub_event(const EventType& event) {
11fdf7f2
TL
27 std::stringstream ss;
28 JSONFormatter f(false);
eafe8130 29 encode_json(EventType::json_type_single, event, &f);
11fdf7f2
TL
30 f.flush(ss);
31 return ss.str();
32}
33
34class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
35private:
36 const std::string endpoint;
37 std::string str_ack_level;
38 typedef unsigned ack_level_t;
39 ack_level_t ack_level; // TODO: not used for now
40 bool verify_ssl;
41 static const ack_level_t ACK_LEVEL_ANY = 0;
42 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
43
44 // PostCR implements async execution of RGWPostHTTPData via coroutine
45 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
46 private:
47 RGWDataSyncEnv* const sync_env;
48 bufferlist read_bl;
49 const ack_level_t ack_level;
50
51 public:
52 PostCR(const std::string& _post_data,
53 RGWDataSyncEnv* _sync_env,
54 const std::string& endpoint,
55 ack_level_t _ack_level,
56 bool verify_ssl) :
57 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
58 RGWSimpleCoroutine(_sync_env->cct),
59 sync_env(_sync_env),
60 ack_level (_ack_level) {
61 // ctor also set the data to send
62 set_post_data(_post_data);
63 set_send_length(_post_data.length());
64 }
65
66 // send message to endpoint
67 int send_request() override {
68 init_new_io(this);
69 const auto rc = sync_env->http_manager->add_request(this);
70 if (rc < 0) {
71 return rc;
72 }
73 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
74 return 0;
75 }
76
77 // wait for reply
78 int request_complete() override {
79 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
80 if (ack_level == ACK_LEVEL_ANY) {
81 return 0;
82 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
83 // TODO check result code to be non-error
84 } else {
85 // TODO: check that result code == ack_level
86 }
87 return -1;
88 }
89 };
90
91public:
92 RGWPubSubHTTPEndpoint(const std::string& _endpoint,
eafe8130
TL
93 const RGWHTTPArgs& args) : endpoint(_endpoint) {
94 bool exists;
11fdf7f2 95
eafe8130
TL
96 str_ack_level = args.get("http-ack-level", &exists);
97 if (!exists || str_ack_level == "any") {
98 // "any" is default
99 ack_level = ACK_LEVEL_ANY;
100 } else if (str_ack_level == "non-error") {
101 ack_level = ACK_LEVEL_NON_ERROR;
102 } else {
103 ack_level = std::atoi(str_ack_level.c_str());
104 if (ack_level < 100 || ack_level >= 600) {
105 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
11fdf7f2
TL
106 }
107 }
108
eafe8130
TL
109 auto str_verify_ssl = args.get("verify-ssl", &exists);
110 boost::algorithm::to_lower(str_verify_ssl);
111 // verify server certificate by default
112 if (!exists || str_verify_ssl == "true") {
113 verify_ssl = true;
114 } else if (str_verify_ssl == "false") {
115 verify_ssl = false;
116 } else {
117 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
118 }
119 }
120
11fdf7f2
TL
121 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
122 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
123 }
124
eafe8130
TL
125 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
126 return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
127 }
128
129 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
130 bufferlist read_bl;
131 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
132 const auto post_data = json_format_pubsub_event(record);
133 request.set_post_data(post_data);
134 request.set_send_length(post_data.length());
135 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
136 const auto rc = RGWHTTP::process(&request, y);
137 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
138 // TODO: use read_bl to process return code and handle according to ack level
139 return rc;
140 }
141
11fdf7f2 142 std::string to_str() const override {
eafe8130 143 std::string str("HTTP/S Endpoint");
11fdf7f2
TL
144 str += "\nURI: " + endpoint;
145 str += "\nAck Level: " + str_ack_level;
146 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
147 return str;
148
149 }
150};
151
152#ifdef WITH_RADOSGW_AMQP_ENDPOINT
153class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
eafe8130
TL
154private:
155 enum ack_level_t {
156 ACK_LEVEL_NONE,
157 ACK_LEVEL_BROKER,
158 ACK_LEVEL_ROUTEABLE
159 };
160 CephContext* const cct;
161 const std::string endpoint;
162 const std::string topic;
163 const std::string exchange;
164 amqp::connection_ptr_t conn;
165 ack_level_t ack_level;
166 std::string str_ack_level;
167
168 static std::string get_exchange(const RGWHTTPArgs& args) {
169 bool exists;
170 const auto exchange = args.get("amqp-exchange", &exists);
171 if (!exists) {
172 throw configuration_error("AMQP: missing amqp-exchange");
11fdf7f2 173 }
eafe8130
TL
174 return exchange;
175 }
11fdf7f2
TL
176
177 // NoAckPublishCR implements async amqp publishing via coroutine
178 // This coroutine ends when it send the message and does not wait for an ack
179 class NoAckPublishCR : public RGWCoroutine {
180 private:
11fdf7f2
TL
181 const std::string topic;
182 amqp::connection_ptr_t conn;
183 const std::string message;
184
185 public:
eafe8130 186 NoAckPublishCR(CephContext* cct,
11fdf7f2
TL
187 const std::string& _topic,
188 amqp::connection_ptr_t& _conn,
189 const std::string& _message) :
eafe8130 190 RGWCoroutine(cct),
11fdf7f2
TL
191 topic(_topic), conn(_conn), message(_message) {}
192
193 // send message to endpoint, without waiting for reply
194 int operate() override {
195 reenter(this) {
196 const auto rc = amqp::publish(conn, topic, message);
197 if (rc < 0) {
198 return set_cr_error(rc);
199 }
200 return set_cr_done();
201 }
202 return 0;
203 }
204 };
205
206 // AckPublishCR implements async amqp publishing via coroutine
207 // This coroutine ends when an ack is received from the borker
208 // note that it does not wait for an ack fron the end client
209 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
210 private:
11fdf7f2
TL
211 const std::string topic;
212 amqp::connection_ptr_t conn;
213 const std::string message;
214 const ack_level_t ack_level; // TODO not used for now
215
216 public:
eafe8130 217 AckPublishCR(CephContext* cct,
11fdf7f2
TL
218 const std::string& _topic,
219 amqp::connection_ptr_t& _conn,
220 const std::string& _message,
221 ack_level_t _ack_level) :
eafe8130 222 RGWCoroutine(cct),
11fdf7f2
TL
223 topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
224
225 // send message to endpoint, waiting for reply
226 int operate() override {
227 reenter(this) {
228 yield {
229 init_new_io(this);
230 const auto rc = amqp::publish_with_confirm(conn,
231 topic,
232 message,
233 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
234 if (rc < 0) {
235 // failed to publish, does not wait for reply
236 return set_cr_error(rc);
237 }
238 // mark as blocked on the amqp answer
239 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
240 io_block();
241 return 0;
242 }
243 return set_cr_done();
244 }
245 return 0;
246 }
247
248 // callback invoked from the amqp manager thread when ack/nack is received
249 void request_complete(int status) {
250 ceph_assert(!is_done());
251 if (status != 0) {
252 // server replied with a nack
253 set_cr_error(status);
254 }
255 io_complete();
256 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
257 }
258
259 // TODO: why are these mandatory in RGWIOProvider?
260 void set_io_user_info(void *_user_info) override {
261 }
262
263 void *get_io_user_info() override {
264 return nullptr;
265 }
266 };
267
eafe8130
TL
268public:
269 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
270 const std::string& _topic,
271 const RGWHTTPArgs& args,
272 CephContext* _cct) :
273 cct(_cct),
274 endpoint(_endpoint),
275 topic(_topic),
276 exchange(get_exchange(args)),
277 conn(amqp::connect(endpoint, exchange)) {
278 if (!conn) {
279 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
280 }
281 bool exists;
282 // get ack level
283 str_ack_level = args.get("amqp-ack-level", &exists);
284 if (!exists || str_ack_level == "broker") {
285 // "broker" is default
286 ack_level = ACK_LEVEL_BROKER;
287 } else if (str_ack_level == "none") {
288 ack_level = ACK_LEVEL_NONE;
289 } else if (str_ack_level == "routable") {
290 ack_level = ACK_LEVEL_ROUTEABLE;
291 } else {
292 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
293 }
294 }
295
296 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
297 ceph_assert(conn);
298 if (ack_level == ACK_LEVEL_NONE) {
299 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
300 } else {
301 // TODO: currently broker and routable are the same - this will require different flags
302 // but the same mechanism
303 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
304 }
305 }
306
307 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
308 ceph_assert(conn);
309 if (ack_level == ACK_LEVEL_NONE) {
310 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
311 } else {
312 // TODO: currently broker and routable are the same - this will require different flags
313 // but the same mechanism
314 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
315 }
316 }
317
318 // this allows waiting untill "finish()" is called from a different thread
319 // waiting could be blocking the waiting thread or yielding, depending
320 // with compilation flag support and whether the optional_yield is set
321 class Waiter {
322 using Signature = void(boost::system::error_code);
323 using Completion = ceph::async::Completion<Signature>;
324 std::unique_ptr<Completion> completion = nullptr;
325 int ret;
326
327 mutable std::atomic<bool> done = false;
328 mutable std::mutex lock;
329 mutable std::condition_variable cond;
330
331 template <typename ExecutionContext, typename CompletionToken>
332 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
333 boost::asio::async_completion<CompletionToken, Signature> init(token);
334 auto& handler = init.completion_handler;
335 {
336 std::unique_lock l{lock};
337 completion = Completion::create(ctx.get_executor(), std::move(handler));
338 }
339 return init.result.get();
340 }
341
11fdf7f2 342 public:
eafe8130
TL
343 int wait(optional_yield y) {
344 if (done) {
345 return ret;
346 }
347#ifdef HAVE_BOOST_CONTEXT
348 if (y) {
349 auto& io_ctx = y.get_io_context();
350 auto& yield_ctx = y.get_yield_context();
351 boost::system::error_code ec;
352 async_wait(io_ctx, yield_ctx[ec]);
353 return -ec.value();
11fdf7f2 354 }
eafe8130
TL
355#endif
356 std::unique_lock l(lock);
357 cond.wait(l, [this]{return (done==true);});
358 return ret;
11fdf7f2
TL
359 }
360
eafe8130
TL
361 void finish(int r) {
362 std::unique_lock l{lock};
363 ret = r;
364 done = true;
365 if (completion) {
366 boost::system::error_code ec(-ret, boost::system::system_category());
367 Completion::post(std::move(completion), ec);
11fdf7f2 368 } else {
eafe8130 369 cond.notify_all();
11fdf7f2
TL
370 }
371 }
eafe8130 372 };
11fdf7f2 373
eafe8130
TL
374 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
375 ceph_assert(conn);
376 if (ack_level == ACK_LEVEL_NONE) {
377 return amqp::publish(conn, topic, json_format_pubsub_event(record));
378 } else {
379 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
380 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
381 auto w = std::unique_ptr<Waiter>(new Waiter);
382 const auto rc = amqp::publish_with_confirm(conn,
383 topic,
384 json_format_pubsub_event(record),
385 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
386 if (rc < 0) {
387 // failed to publish, does not wait for reply
388 return rc;
389 }
390 return w->wait(y);
11fdf7f2 391 }
eafe8130
TL
392 }
393
394 std::string to_str() const override {
395 std::string str("AMQP(0.9.1) Endpoint");
396 str += "\nURI: " + endpoint;
397 str += "\nTopic: " + topic;
398 str += "\nExchange: " + exchange;
399 str += "\nAck Level: " + str_ack_level;
400 return str;
401 }
11fdf7f2
TL
402};
403
404static const std::string AMQP_0_9_1("0-9-1");
405static const std::string AMQP_1_0("1-0");
eafe8130 406static const std::string AMQP_SCHEMA("amqp");
11fdf7f2
TL
407#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
408
eafe8130
TL
409static const std::string WEBHOOK_SCHEMA("webhook");
410static const std::string UNKNOWN_SCHEMA("unknown");
411static const std::string NO_SCHEMA("");
412
413const std::string& get_schema(const std::string& endpoint) {
414 if (endpoint.empty()) {
415 return NO_SCHEMA;
416 }
11fdf7f2
TL
417 const auto pos = endpoint.find(':');
418 if (pos == std::string::npos) {
eafe8130 419 return UNKNOWN_SCHEMA;
11fdf7f2
TL
420 }
421 const auto& schema = endpoint.substr(0,pos);
422 if (schema == "http" || schema == "https") {
eafe8130 423 return WEBHOOK_SCHEMA;
11fdf7f2
TL
424#ifdef WITH_RADOSGW_AMQP_ENDPOINT
425 } else if (schema == "amqp") {
eafe8130
TL
426 return AMQP_SCHEMA;
427#endif
428 }
429 return UNKNOWN_SCHEMA;
430}
431
432RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
433 const std::string& topic,
434 const RGWHTTPArgs& args,
435 CephContext* cct) {
436 const auto& schema = get_schema(endpoint);
437 if (schema == WEBHOOK_SCHEMA) {
438 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
439#ifdef WITH_RADOSGW_AMQP_ENDPOINT
440 } else if (schema == AMQP_SCHEMA) {
11fdf7f2
TL
441 bool exists;
442 std::string version = args.get("amqp-version", &exists);
443 if (!exists) {
444 version = AMQP_0_9_1;
445 }
446 if (version == AMQP_0_9_1) {
eafe8130 447 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
11fdf7f2 448 } else if (version == AMQP_1_0) {
eafe8130 449 throw configuration_error("AMQP: v1.0 not supported");
11fdf7f2
TL
450 return nullptr;
451 } else {
eafe8130 452 throw configuration_error("AMQP: unknown version: " + version);
11fdf7f2
TL
453 return nullptr;
454 }
455 } else if (schema == "amqps") {
eafe8130 456 throw configuration_error("AMQP: ssl not supported");
11fdf7f2
TL
457 return nullptr;
458#endif
459 }
460
eafe8130 461 throw configuration_error("unknown schema in: " + endpoint);
11fdf7f2
TL
462 return nullptr;
463}
464