]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub_push.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_pubsub_push.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_pubsub_push.h"
5 #include <string>
6 #include <sstream>
7 #include <algorithm>
8 #include "include/buffer_fwd.h"
9 #include "common/Formatter.h"
10 #include "common/async/completion.h"
11 #include "rgw_common.h"
12 #include "rgw_data_sync.h"
13 #include "rgw_pubsub.h"
14 #include "acconfig.h"
15 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
16 #include "rgw_amqp.h"
17 #endif
18 #include <boost/asio/yield.hpp>
19 #include <boost/algorithm/string.hpp>
20 #include <functional>
21 #include "rgw_perf_counters.h"
22
23 using namespace rgw;
24
25 template<typename EventType>
26 std::string json_format_pubsub_event(const EventType& event) {
27 std::stringstream ss;
28 JSONFormatter f(false);
29 {
30 Formatter::ObjectSection s(f, EventType::json_type_plural);
31 {
32 Formatter::ArraySection s(f, EventType::json_type_plural);
33 encode_json("", event, &f);
34 }
35 }
36 f.flush(ss);
37 return ss.str();
38 }
39
40 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
41 private:
42 const std::string endpoint;
43 std::string str_ack_level;
44 typedef unsigned ack_level_t;
45 ack_level_t ack_level; // TODO: not used for now
46 bool verify_ssl;
47 static const ack_level_t ACK_LEVEL_ANY = 0;
48 static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
49
50 // PostCR implements async execution of RGWPostHTTPData via coroutine
51 class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
52 private:
53 RGWDataSyncEnv* const sync_env;
54 bufferlist read_bl;
55 const ack_level_t ack_level;
56
57 public:
58 PostCR(const std::string& _post_data,
59 RGWDataSyncEnv* _sync_env,
60 const std::string& endpoint,
61 ack_level_t _ack_level,
62 bool verify_ssl) :
63 RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
64 RGWSimpleCoroutine(_sync_env->cct),
65 sync_env(_sync_env),
66 ack_level (_ack_level) {
67 // ctor also set the data to send
68 set_post_data(_post_data);
69 set_send_length(_post_data.length());
70 }
71
72 // send message to endpoint
73 int send_request() override {
74 init_new_io(this);
75 const auto rc = sync_env->http_manager->add_request(this);
76 if (rc < 0) {
77 return rc;
78 }
79 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
80 return 0;
81 }
82
83 // wait for reply
84 int request_complete() override {
85 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
86 if (ack_level == ACK_LEVEL_ANY) {
87 return 0;
88 } else if (ack_level == ACK_LEVEL_NON_ERROR) {
89 // TODO check result code to be non-error
90 } else {
91 // TODO: check that result code == ack_level
92 }
93 return -1;
94 }
95 };
96
97 public:
98 RGWPubSubHTTPEndpoint(const std::string& _endpoint,
99 const RGWHTTPArgs& args) : endpoint(_endpoint) {
100 bool exists;
101
102 str_ack_level = args.get("http-ack-level", &exists);
103 if (!exists || str_ack_level == "any") {
104 // "any" is default
105 ack_level = ACK_LEVEL_ANY;
106 } else if (str_ack_level == "non-error") {
107 ack_level = ACK_LEVEL_NON_ERROR;
108 } else {
109 ack_level = std::atoi(str_ack_level.c_str());
110 if (ack_level < 100 || ack_level >= 600) {
111 throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
112 }
113 }
114
115 auto str_verify_ssl = args.get("verify-ssl", &exists);
116 boost::algorithm::to_lower(str_verify_ssl);
117 // verify server certificate by default
118 if (!exists || str_verify_ssl == "true") {
119 verify_ssl = true;
120 } else if (str_verify_ssl == "false") {
121 verify_ssl = false;
122 } else {
123 throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
124 }
125 }
126
127 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
128 return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
129 }
130
131 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
132 return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
133 }
134
135 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
136 bufferlist read_bl;
137 RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
138 const auto post_data = json_format_pubsub_event(record);
139 request.set_post_data(post_data);
140 request.set_send_length(post_data.length());
141 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
142 const auto rc = RGWHTTP::process(&request, y);
143 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
144 // TODO: use read_bl to process return code and handle according to ack level
145 return rc;
146 }
147
148 std::string to_str() const override {
149 std::string str("HTTP/S Endpoint");
150 str += "\nURI: " + endpoint;
151 str += "\nAck Level: " + str_ack_level;
152 str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
153 return str;
154
155 }
156 };
157
158 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
159 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
160 private:
161 enum ack_level_t {
162 ACK_LEVEL_NONE,
163 ACK_LEVEL_BROKER,
164 ACK_LEVEL_ROUTEABLE
165 };
166 CephContext* const cct;
167 const std::string endpoint;
168 const std::string topic;
169 const std::string exchange;
170 amqp::connection_ptr_t conn;
171 ack_level_t ack_level;
172 std::string str_ack_level;
173
174 static std::string get_exchange(const RGWHTTPArgs& args) {
175 bool exists;
176 const auto exchange = args.get("amqp-exchange", &exists);
177 if (!exists) {
178 throw configuration_error("AMQP: missing amqp-exchange");
179 }
180 return exchange;
181 }
182
183 // NoAckPublishCR implements async amqp publishing via coroutine
184 // This coroutine ends when it send the message and does not wait for an ack
185 class NoAckPublishCR : public RGWCoroutine {
186 private:
187 const std::string topic;
188 amqp::connection_ptr_t conn;
189 const std::string message;
190
191 public:
192 NoAckPublishCR(CephContext* cct,
193 const std::string& _topic,
194 amqp::connection_ptr_t& _conn,
195 const std::string& _message) :
196 RGWCoroutine(cct),
197 topic(_topic), conn(_conn), message(_message) {}
198
199 // send message to endpoint, without waiting for reply
200 int operate() override {
201 reenter(this) {
202 const auto rc = amqp::publish(conn, topic, message);
203 if (rc < 0) {
204 return set_cr_error(rc);
205 }
206 return set_cr_done();
207 }
208 return 0;
209 }
210 };
211
212 // AckPublishCR implements async amqp publishing via coroutine
213 // This coroutine ends when an ack is received from the borker
214 // note that it does not wait for an ack fron the end client
215 class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
216 private:
217 const std::string topic;
218 amqp::connection_ptr_t conn;
219 const std::string message;
220 const ack_level_t ack_level; // TODO not used for now
221
222 public:
223 AckPublishCR(CephContext* cct,
224 const std::string& _topic,
225 amqp::connection_ptr_t& _conn,
226 const std::string& _message,
227 ack_level_t _ack_level) :
228 RGWCoroutine(cct),
229 topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
230
231 // send message to endpoint, waiting for reply
232 int operate() override {
233 reenter(this) {
234 yield {
235 init_new_io(this);
236 const auto rc = amqp::publish_with_confirm(conn,
237 topic,
238 message,
239 std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
240 if (rc < 0) {
241 // failed to publish, does not wait for reply
242 return set_cr_error(rc);
243 }
244 // mark as blocked on the amqp answer
245 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
246 io_block();
247 return 0;
248 }
249 return set_cr_done();
250 }
251 return 0;
252 }
253
254 // callback invoked from the amqp manager thread when ack/nack is received
255 void request_complete(int status) {
256 ceph_assert(!is_done());
257 if (status != 0) {
258 // server replied with a nack
259 set_cr_error(status);
260 }
261 io_complete();
262 if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
263 }
264
265 // TODO: why are these mandatory in RGWIOProvider?
266 void set_io_user_info(void *_user_info) override {
267 }
268
269 void *get_io_user_info() override {
270 return nullptr;
271 }
272 };
273
274 public:
275 RGWPubSubAMQPEndpoint(const std::string& _endpoint,
276 const std::string& _topic,
277 const RGWHTTPArgs& args,
278 CephContext* _cct) :
279 cct(_cct),
280 endpoint(_endpoint),
281 topic(_topic),
282 exchange(get_exchange(args)),
283 conn(amqp::connect(endpoint, exchange)) {
284 if (!conn) {
285 throw configuration_error("AMQP: failed to create connection to: " + endpoint);
286 }
287 bool exists;
288 // get ack level
289 str_ack_level = args.get("amqp-ack-level", &exists);
290 if (!exists || str_ack_level == "broker") {
291 // "broker" is default
292 ack_level = ACK_LEVEL_BROKER;
293 } else if (str_ack_level == "none") {
294 ack_level = ACK_LEVEL_NONE;
295 } else if (str_ack_level == "routable") {
296 ack_level = ACK_LEVEL_ROUTEABLE;
297 } else {
298 throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
299 }
300 }
301
302 RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
303 ceph_assert(conn);
304 if (ack_level == ACK_LEVEL_NONE) {
305 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
306 } else {
307 // TODO: currently broker and routable are the same - this will require different flags
308 // but the same mechanism
309 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
310 }
311 }
312
313 RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
314 ceph_assert(conn);
315 if (ack_level == ACK_LEVEL_NONE) {
316 return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
317 } else {
318 // TODO: currently broker and routable are the same - this will require different flags
319 // but the same mechanism
320 return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
321 }
322 }
323
324 // this allows waiting untill "finish()" is called from a different thread
325 // waiting could be blocking the waiting thread or yielding, depending
326 // with compilation flag support and whether the optional_yield is set
327 class Waiter {
328 using Signature = void(boost::system::error_code);
329 using Completion = ceph::async::Completion<Signature>;
330 std::unique_ptr<Completion> completion = nullptr;
331 int ret;
332
333 mutable std::atomic<bool> done = false;
334 mutable std::mutex lock;
335 mutable std::condition_variable cond;
336
337 template <typename ExecutionContext, typename CompletionToken>
338 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
339 boost::asio::async_completion<CompletionToken, Signature> init(token);
340 auto& handler = init.completion_handler;
341 {
342 std::unique_lock l{lock};
343 completion = Completion::create(ctx.get_executor(), std::move(handler));
344 }
345 return init.result.get();
346 }
347
348 public:
349 int wait(optional_yield y) {
350 if (done) {
351 return ret;
352 }
353 #ifdef HAVE_BOOST_CONTEXT
354 if (y) {
355 auto& io_ctx = y.get_io_context();
356 auto& yield_ctx = y.get_yield_context();
357 boost::system::error_code ec;
358 async_wait(io_ctx, yield_ctx[ec]);
359 return -ec.value();
360 }
361 #endif
362 std::unique_lock l(lock);
363 cond.wait(l, [this]{return (done==true);});
364 return ret;
365 }
366
367 void finish(int r) {
368 std::unique_lock l{lock};
369 ret = r;
370 done = true;
371 if (completion) {
372 boost::system::error_code ec(-ret, boost::system::system_category());
373 Completion::post(std::move(completion), ec);
374 } else {
375 cond.notify_all();
376 }
377 }
378 };
379
380 int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
381 ceph_assert(conn);
382 if (ack_level == ACK_LEVEL_NONE) {
383 return amqp::publish(conn, topic, json_format_pubsub_event(record));
384 } else {
385 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
386 // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
387 auto w = std::unique_ptr<Waiter>(new Waiter);
388 const auto rc = amqp::publish_with_confirm(conn,
389 topic,
390 json_format_pubsub_event(record),
391 std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
392 if (rc < 0) {
393 // failed to publish, does not wait for reply
394 return rc;
395 }
396 return w->wait(y);
397 }
398 }
399
400 std::string to_str() const override {
401 std::string str("AMQP(0.9.1) Endpoint");
402 str += "\nURI: " + endpoint;
403 str += "\nTopic: " + topic;
404 str += "\nExchange: " + exchange;
405 str += "\nAck Level: " + str_ack_level;
406 return str;
407 }
408 };
409
410 static const std::string AMQP_0_9_1("0-9-1");
411 static const std::string AMQP_1_0("1-0");
412 static const std::string AMQP_SCHEMA("amqp");
413 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
414
415 static const std::string WEBHOOK_SCHEMA("webhook");
416 static const std::string UNKNOWN_SCHEMA("unknown");
417 static const std::string NO_SCHEMA("");
418
419 const std::string& get_schema(const std::string& endpoint) {
420 if (endpoint.empty()) {
421 return NO_SCHEMA;
422 }
423 const auto pos = endpoint.find(':');
424 if (pos == std::string::npos) {
425 return UNKNOWN_SCHEMA;
426 }
427 const auto& schema = endpoint.substr(0,pos);
428 if (schema == "http" || schema == "https") {
429 return WEBHOOK_SCHEMA;
430 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
431 } else if (schema == "amqp") {
432 return AMQP_SCHEMA;
433 #endif
434 }
435 return UNKNOWN_SCHEMA;
436 }
437
438 RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
439 const std::string& topic,
440 const RGWHTTPArgs& args,
441 CephContext* cct) {
442 const auto& schema = get_schema(endpoint);
443 if (schema == WEBHOOK_SCHEMA) {
444 return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
445 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
446 } else if (schema == AMQP_SCHEMA) {
447 bool exists;
448 std::string version = args.get("amqp-version", &exists);
449 if (!exists) {
450 version = AMQP_0_9_1;
451 }
452 if (version == AMQP_0_9_1) {
453 return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
454 } else if (version == AMQP_1_0) {
455 throw configuration_error("AMQP: v1.0 not supported");
456 return nullptr;
457 } else {
458 throw configuration_error("AMQP: unknown version: " + version);
459 return nullptr;
460 }
461 } else if (schema == "amqps") {
462 throw configuration_error("AMQP: ssl not supported");
463 return nullptr;
464 #endif
465 }
466
467 throw configuration_error("unknown schema in: " + endpoint);
468 return nullptr;
469 }
470