]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub_rest.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub_rest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <algorithm>
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13 #include "rgw_rest_s3.h"
14 #include "rgw_arn.h"
15 #include "rgw_zone.h"
16 #include "services/svc_zone.h"
17 #include "rgw_sal_rados.h"
18
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
21
22 using namespace std;
23
24 // command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
25 class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
26 public:
27 int get_params() override {
28
29 topic_name = s->object->get_name();
30
31 opaque_data = s->info.args.get("OpaqueData");
32 dest.push_endpoint = s->info.args.get("push-endpoint");
33
34 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
35 return -EINVAL;
36 }
37 dest.push_endpoint_args = s->info.args.get_str();
38 // dest object only stores endpoint info
39 // bucket to store events/records will be set only when subscription is created
40 dest.bucket_name = "";
41 dest.oid_prefix = "";
42 dest.arn_topic = topic_name;
43 // the topic ARN will be sent in the reply
44 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
45 store->get_zone()->get_zonegroup().get_name(),
46 s->user->get_tenant(), topic_name);
47 topic_arn = arn.to_string();
48 return 0;
49 }
50
51 void send_response() override {
52 if (op_ret) {
53 set_req_state_err(s, op_ret);
54 }
55 dump_errno(s);
56 end_header(s, this, "application/json");
57
58 if (op_ret < 0) {
59 return;
60 }
61
62 {
63 Formatter::ObjectSection section(*s->formatter, "result");
64 encode_json("arn", topic_arn, s->formatter);
65 }
66 rgw_flush_formatter_and_reset(s, s->formatter);
67 }
68 };
69
70 // command: GET /topics
71 class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
72 public:
73 void send_response() override {
74 if (op_ret) {
75 set_req_state_err(s, op_ret);
76 }
77 dump_errno(s);
78 end_header(s, this, "application/json");
79
80 if (op_ret < 0) {
81 return;
82 }
83
84 encode_json("result", result, s->formatter);
85 rgw_flush_formatter_and_reset(s, s->formatter);
86 }
87 };
88
89 // command: GET /topics/<topic-name>
90 class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
91 public:
92 int get_params() override {
93 topic_name = s->object->get_name();
94 return 0;
95 }
96
97 void send_response() override {
98 if (op_ret) {
99 set_req_state_err(s, op_ret);
100 }
101 dump_errno(s);
102 end_header(s, this, "application/json");
103
104 if (op_ret < 0) {
105 return;
106 }
107
108 encode_json("result", result, s->formatter);
109 rgw_flush_formatter_and_reset(s, s->formatter);
110 }
111 };
112
113 // command: DELETE /topics/<topic-name>
114 class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
115 public:
116 int get_params() override {
117 topic_name = s->object->get_name();
118 return 0;
119 }
120 };
121
122 // ceph specifc topics handler factory
123 class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
124 protected:
125 int init_permissions(RGWOp* op, optional_yield) override {
126 return 0;
127 }
128
129 int read_permissions(RGWOp* op, optional_yield) override {
130 return 0;
131 }
132
133 bool supports_quota() override {
134 return false;
135 }
136
137 RGWOp *op_get() override {
138 if (s->init_state.url_bucket.empty()) {
139 return nullptr;
140 }
141 if (s->object == nullptr || s->object->empty()) {
142 return new RGWPSListTopics_ObjStore();
143 }
144 return new RGWPSGetTopic_ObjStore();
145 }
146 RGWOp *op_put() override {
147 if (!s->object->empty()) {
148 return new RGWPSCreateTopic_ObjStore();
149 }
150 return nullptr;
151 }
152 RGWOp *op_delete() override {
153 if (!s->object->empty()) {
154 return new RGWPSDeleteTopic_ObjStore();
155 }
156 return nullptr;
157 }
158 public:
159 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
160 virtual ~RGWHandler_REST_PSTopic() = default;
161 };
162
163 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
164 class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
165 public:
166 int get_params() override {
167 sub_name = s->object->get_name();
168
169 bool exists;
170 topic_name = s->info.args.get("topic", &exists);
171 if (!exists) {
172 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
173 return -EINVAL;
174 }
175
176 const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
177 const auto& conf = psmodule->get_effective_conf();
178
179 dest.push_endpoint = s->info.args.get("push-endpoint");
180 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
181 return -EINVAL;
182 }
183 dest.push_endpoint_args = s->info.args.get_str();
184 dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
185 dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
186 dest.arn_topic = topic_name;
187
188 return 0;
189 }
190 };
191
192 // command: GET /subscriptions/<sub-name>
193 class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
194 public:
195 int get_params() override {
196 sub_name = s->object->get_name();
197 return 0;
198 }
199 void send_response() override {
200 if (op_ret) {
201 set_req_state_err(s, op_ret);
202 }
203 dump_errno(s);
204 end_header(s, this, "application/json");
205
206 if (op_ret < 0) {
207 return;
208 }
209
210 encode_json("result", result, s->formatter);
211 rgw_flush_formatter_and_reset(s, s->formatter);
212 }
213 };
214
215 // command: DELETE /subscriptions/<sub-name>
216 class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
217 public:
218 int get_params() override {
219 sub_name = s->object->get_name();
220 topic_name = s->info.args.get("topic");
221 return 0;
222 }
223 };
224
225 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
226 class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
227 public:
228 explicit RGWPSAckSubEvent_ObjStore() {}
229
230 int get_params() override {
231 sub_name = s->object->get_name();
232
233 bool exists;
234
235 event_id = s->info.args.get("event-id", &exists);
236 if (!exists) {
237 ldpp_dout(this, 1) << "missing required param 'event-id'" << dendl;
238 return -EINVAL;
239 }
240 return 0;
241 }
242 };
243
244 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
245 class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
246 public:
247 int get_params() override {
248 sub_name = s->object->get_name();
249 marker = s->info.args.get("marker");
250 const int ret = s->info.args.get_int("max-entries", &max_entries,
251 RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
252 if (ret < 0) {
253 ldpp_dout(this, 1) << "failed to parse 'max-entries' param" << dendl;
254 return -EINVAL;
255 }
256 return 0;
257 }
258
259 void send_response() override {
260 if (op_ret) {
261 set_req_state_err(s, op_ret);
262 }
263 dump_errno(s);
264 end_header(s, this, "application/json");
265
266 if (op_ret < 0) {
267 return;
268 }
269
270 encode_json("result", *sub, s->formatter);
271 rgw_flush_formatter_and_reset(s, s->formatter);
272 }
273 };
274
275 // subscriptions handler factory
276 class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
277 protected:
278 int init_permissions(RGWOp* op, optional_yield) override {
279 return 0;
280 }
281
282 int read_permissions(RGWOp* op, optional_yield) override {
283 return 0;
284 }
285 bool supports_quota() override {
286 return false;
287 }
288 RGWOp *op_get() override {
289 if (s->object->empty()) {
290 return nullptr;
291 }
292 if (s->info.args.exists("events")) {
293 return new RGWPSPullSubEvents_ObjStore();
294 }
295 return new RGWPSGetSub_ObjStore();
296 }
297 RGWOp *op_put() override {
298 if (!s->object->empty()) {
299 return new RGWPSCreateSub_ObjStore();
300 }
301 return nullptr;
302 }
303 RGWOp *op_delete() override {
304 if (!s->object->empty()) {
305 return new RGWPSDeleteSub_ObjStore();
306 }
307 return nullptr;
308 }
309 RGWOp *op_post() override {
310 if (s->info.args.exists("ack")) {
311 return new RGWPSAckSubEvent_ObjStore();
312 }
313 return nullptr;
314 }
315 public:
316 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
317 virtual ~RGWHandler_REST_PSSub() = default;
318 };
319
320 namespace {
321 // extract bucket name from ceph specific notification command, with the format:
322 // /notifications/<bucket-name>
323 int notif_bucket_path(const string& path, std::string& bucket_name) {
324 if (path.empty()) {
325 return -EINVAL;
326 }
327 size_t pos = path.find('/');
328 if (pos == string::npos) {
329 return -EINVAL;
330 }
331 if (pos >= path.size()) {
332 return -EINVAL;
333 }
334
335 string type = path.substr(0, pos);
336 if (type != "bucket") {
337 return -EINVAL;
338 }
339
340 bucket_name = path.substr(pos + 1);
341 return 0;
342 }
343 }
344
345 // command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
346 class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
347 private:
348 std::string topic_name;
349 rgw::notify::EventTypeList events;
350
351 int get_params() override {
352 bool exists;
353 topic_name = s->info.args.get("topic", &exists);
354 if (!exists) {
355 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
356 return -EINVAL;
357 }
358
359 std::string events_str = s->info.args.get("events", &exists);
360 if (!exists) {
361 // if no events are provided, we notify on all of them
362 events_str =
363 "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
364 }
365 rgw::notify::from_string_list(events_str, events);
366 if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
367 ldpp_dout(this, 1) << "invalid event type in list: " << events_str << dendl;
368 return -EINVAL;
369 }
370 return notif_bucket_path(s->object->get_name(), bucket_name);
371 }
372
373 public:
374 const char* name() const override { return "pubsub_notification_create"; }
375 void execute(optional_yield y) override;
376 };
377
378 void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
379 {
380 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
381
382 auto b = ps->get_bucket(bucket_info.bucket);
383 op_ret = b->create_notification(this, topic_name, events, y);
384 if (op_ret < 0) {
385 ldpp_dout(this, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
386 return;
387 }
388 ldpp_dout(this, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
389 }
390
391 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
392 class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
393 private:
394 std::string topic_name;
395
396 int get_params() override {
397 bool exists;
398 topic_name = s->info.args.get("topic", &exists);
399 if (!exists) {
400 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
401 return -EINVAL;
402 }
403 return notif_bucket_path(s->object->get_name(), bucket_name);
404 }
405
406 public:
407 void execute(optional_yield y) override;
408 const char* name() const override { return "pubsub_notification_delete"; }
409 };
410
411 void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
412 op_ret = get_params();
413 if (op_ret < 0) {
414 return;
415 }
416
417 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
418 auto b = ps->get_bucket(bucket_info.bucket);
419 op_ret = b->remove_notification(this, topic_name, y);
420 if (op_ret < 0) {
421 ldpp_dout(s, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
422 return;
423 }
424 ldpp_dout(this, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
425 }
426
427 // command: GET /notifications/bucket/<bucket>
428 class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
429 private:
430 rgw_pubsub_bucket_topics result;
431
432 int get_params() override {
433 return notif_bucket_path(s->object->get_name(), bucket_name);
434 }
435
436 public:
437 void execute(optional_yield y) override;
438 void send_response() override {
439 if (op_ret) {
440 set_req_state_err(s, op_ret);
441 }
442 dump_errno(s);
443 end_header(s, this, "application/json");
444
445 if (op_ret < 0) {
446 return;
447 }
448 encode_json("result", result, s->formatter);
449 rgw_flush_formatter_and_reset(s, s->formatter);
450 }
451 const char* name() const override { return "pubsub_notifications_list"; }
452 };
453
454 void RGWPSListNotifs_ObjStore::execute(optional_yield y)
455 {
456 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
457 auto b = ps->get_bucket(bucket_info.bucket);
458 op_ret = b->get_topics(&result);
459 if (op_ret < 0) {
460 ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
461 return;
462 }
463 }
464
465 // ceph specific notification handler factory
466 class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
467 protected:
468 int init_permissions(RGWOp* op, optional_yield) override {
469 return 0;
470 }
471
472 int read_permissions(RGWOp* op, optional_yield) override {
473 return 0;
474 }
475 bool supports_quota() override {
476 return false;
477 }
478 RGWOp *op_get() override {
479 if (s->object->empty()) {
480 return nullptr;
481 }
482 return new RGWPSListNotifs_ObjStore();
483 }
484 RGWOp *op_put() override {
485 if (!s->object->empty()) {
486 return new RGWPSCreateNotif_ObjStore();
487 }
488 return nullptr;
489 }
490 RGWOp *op_delete() override {
491 if (!s->object->empty()) {
492 return new RGWPSDeleteNotif_ObjStore();
493 }
494 return nullptr;
495 }
496 public:
497 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
498 virtual ~RGWHandler_REST_PSNotifs() = default;
499 };
500
501 // factory for ceph specific PubSub REST handlers
502 RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(rgw::sal::Store* store,
503 struct req_state* const s,
504 const rgw::auth::StrategyRegistry& auth_registry,
505 const std::string& frontend_prefix)
506 {
507 if (RGWHandler_REST_S3::init_from_header(store, s, RGW_FORMAT_JSON, true) < 0) {
508 return nullptr;
509 }
510
511 RGWHandler_REST* handler{nullptr};
512
513 // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
514 // this API is available only on RGW that belong to a pubsub zone
515 if (s->init_state.url_bucket == "topics") {
516 handler = new RGWHandler_REST_PSTopic(auth_registry);
517 } else if (s->init_state.url_bucket == "subscriptions") {
518 handler = new RGWHandler_REST_PSSub(auth_registry);
519 } else if (s->init_state.url_bucket == "notifications") {
520 handler = new RGWHandler_REST_PSNotifs(auth_registry);
521 } else if (s->info.args.exists("notification")) {
522 const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
523 if (ret == 0) {
524 handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
525 }
526 }
527
528 ldpp_dout(s, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
529
530 return handler;
531 }
532