]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub_rest.cc
c7feff500226b842726e8d5fdb11fb102025ce1f
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub_rest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <algorithm>
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13 #include "rgw_rest_s3.h"
14 #include "rgw_arn.h"
15 #include "rgw_zone.h"
16 #include "services/svc_zone.h"
17 #include "rgw_sal_rados.h"
18
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
21
22 // command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
23 class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
24 public:
25 int get_params() override {
26
27 topic_name = s->object->get_name();
28
29 opaque_data = s->info.args.get("OpaqueData");
30 dest.push_endpoint = s->info.args.get("push-endpoint");
31
32 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
33 return -EINVAL;
34 }
35 dest.push_endpoint_args = s->info.args.get_str();
36 // dest object only stores endpoint info
37 // bucket to store events/records will be set only when subscription is created
38 dest.bucket_name = "";
39 dest.oid_prefix = "";
40 dest.arn_topic = topic_name;
41 // the topic ARN will be sent in the reply
42 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
43 store->svc()->zone->get_zonegroup().get_name(),
44 s->user->get_tenant(), topic_name);
45 topic_arn = arn.to_string();
46 return 0;
47 }
48
49 void send_response() override {
50 if (op_ret) {
51 set_req_state_err(s, op_ret);
52 }
53 dump_errno(s);
54 end_header(s, this, "application/json");
55
56 if (op_ret < 0) {
57 return;
58 }
59
60 {
61 Formatter::ObjectSection section(*s->formatter, "result");
62 encode_json("arn", topic_arn, s->formatter);
63 }
64 rgw_flush_formatter_and_reset(s, s->formatter);
65 }
66 };
67
68 // command: GET /topics
69 class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
70 public:
71 void send_response() override {
72 if (op_ret) {
73 set_req_state_err(s, op_ret);
74 }
75 dump_errno(s);
76 end_header(s, this, "application/json");
77
78 if (op_ret < 0) {
79 return;
80 }
81
82 encode_json("result", result, s->formatter);
83 rgw_flush_formatter_and_reset(s, s->formatter);
84 }
85 };
86
87 // command: GET /topics/<topic-name>
88 class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
89 public:
90 int get_params() override {
91 topic_name = s->object->get_name();
92 return 0;
93 }
94
95 void send_response() override {
96 if (op_ret) {
97 set_req_state_err(s, op_ret);
98 }
99 dump_errno(s);
100 end_header(s, this, "application/json");
101
102 if (op_ret < 0) {
103 return;
104 }
105
106 encode_json("result", result, s->formatter);
107 rgw_flush_formatter_and_reset(s, s->formatter);
108 }
109 };
110
111 // command: DELETE /topics/<topic-name>
112 class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
113 public:
114 int get_params() override {
115 topic_name = s->object->get_name();
116 return 0;
117 }
118 };
119
120 // ceph specifc topics handler factory
121 class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
122 protected:
123 int init_permissions(RGWOp* op, optional_yield) override {
124 return 0;
125 }
126
127 int read_permissions(RGWOp* op, optional_yield) override {
128 return 0;
129 }
130
131 bool supports_quota() override {
132 return false;
133 }
134
135 RGWOp *op_get() override {
136 if (s->init_state.url_bucket.empty()) {
137 return nullptr;
138 }
139 if (s->object->empty()) {
140 return new RGWPSListTopics_ObjStore();
141 }
142 return new RGWPSGetTopic_ObjStore();
143 }
144 RGWOp *op_put() override {
145 if (!s->object->empty()) {
146 return new RGWPSCreateTopic_ObjStore();
147 }
148 return nullptr;
149 }
150 RGWOp *op_delete() override {
151 if (!s->object->empty()) {
152 return new RGWPSDeleteTopic_ObjStore();
153 }
154 return nullptr;
155 }
156 public:
157 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
158 virtual ~RGWHandler_REST_PSTopic() = default;
159 };
160
161 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
162 class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
163 public:
164 int get_params() override {
165 sub_name = s->object->get_name();
166
167 bool exists;
168 topic_name = s->info.args.get("topic", &exists);
169 if (!exists) {
170 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
171 return -EINVAL;
172 }
173
174 const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
175 const auto& conf = psmodule->get_effective_conf();
176
177 dest.push_endpoint = s->info.args.get("push-endpoint");
178 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
179 return -EINVAL;
180 }
181 dest.push_endpoint_args = s->info.args.get_str();
182 dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
183 dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
184 dest.arn_topic = topic_name;
185
186 return 0;
187 }
188 };
189
190 // command: GET /subscriptions/<sub-name>
191 class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
192 public:
193 int get_params() override {
194 sub_name = s->object->get_name();
195 return 0;
196 }
197 void send_response() override {
198 if (op_ret) {
199 set_req_state_err(s, op_ret);
200 }
201 dump_errno(s);
202 end_header(s, this, "application/json");
203
204 if (op_ret < 0) {
205 return;
206 }
207
208 encode_json("result", result, s->formatter);
209 rgw_flush_formatter_and_reset(s, s->formatter);
210 }
211 };
212
213 // command: DELETE /subscriptions/<sub-name>
214 class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
215 public:
216 int get_params() override {
217 sub_name = s->object->get_name();
218 topic_name = s->info.args.get("topic");
219 return 0;
220 }
221 };
222
223 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
224 class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
225 public:
226 explicit RGWPSAckSubEvent_ObjStore() {}
227
228 int get_params() override {
229 sub_name = s->object->get_name();
230
231 bool exists;
232
233 event_id = s->info.args.get("event-id", &exists);
234 if (!exists) {
235 ldout(s->cct, 1) << "missing required param 'event-id'" << dendl;
236 return -EINVAL;
237 }
238 return 0;
239 }
240 };
241
242 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
243 class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
244 public:
245 int get_params() override {
246 sub_name = s->object->get_name();
247 marker = s->info.args.get("marker");
248 const int ret = s->info.args.get_int("max-entries", &max_entries,
249 RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
250 if (ret < 0) {
251 ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
252 return -EINVAL;
253 }
254 return 0;
255 }
256
257 void send_response() override {
258 if (op_ret) {
259 set_req_state_err(s, op_ret);
260 }
261 dump_errno(s);
262 end_header(s, this, "application/json");
263
264 if (op_ret < 0) {
265 return;
266 }
267
268 encode_json("result", *sub, s->formatter);
269 rgw_flush_formatter_and_reset(s, s->formatter);
270 }
271 };
272
273 // subscriptions handler factory
274 class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
275 protected:
276 int init_permissions(RGWOp* op, optional_yield) override {
277 return 0;
278 }
279
280 int read_permissions(RGWOp* op, optional_yield) override {
281 return 0;
282 }
283 bool supports_quota() override {
284 return false;
285 }
286 RGWOp *op_get() override {
287 if (s->object->empty()) {
288 return nullptr;
289 }
290 if (s->info.args.exists("events")) {
291 return new RGWPSPullSubEvents_ObjStore();
292 }
293 return new RGWPSGetSub_ObjStore();
294 }
295 RGWOp *op_put() override {
296 if (!s->object->empty()) {
297 return new RGWPSCreateSub_ObjStore();
298 }
299 return nullptr;
300 }
301 RGWOp *op_delete() override {
302 if (!s->object->empty()) {
303 return new RGWPSDeleteSub_ObjStore();
304 }
305 return nullptr;
306 }
307 RGWOp *op_post() override {
308 if (s->info.args.exists("ack")) {
309 return new RGWPSAckSubEvent_ObjStore();
310 }
311 return nullptr;
312 }
313 public:
314 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
315 virtual ~RGWHandler_REST_PSSub() = default;
316 };
317
namespace {
// extract the bucket name from the object part of a ceph specific
// notification command. the expected format of "path" is:
//   bucket/<bucket-name>
// everything after the first '/' is taken as the bucket name.
//
// path:        the string to parse
// bucket_name: set to <bucket-name> on success, left untouched on failure
// returns 0 on success, or -EINVAL when "path" has no '/', when the part
// before the first '/' is not "bucket", or when the bucket name is empty
int notif_bucket_path(const std::string& path, std::string& bucket_name) {
  // an empty path has no separator, so it is rejected here as well
  const auto separator = path.find('/');
  if (separator == std::string::npos) {
    return -EINVAL;
  }

  if (path.compare(0, separator, "bucket") != 0) {
    return -EINVAL;
  }

  // a found position is always smaller than the size of the path, so the
  // check has to look one past the separator to catch a missing bucket name
  if (separator + 1 >= path.size()) {
    return -EINVAL;
  }

  bucket_name = path.substr(separator + 1);
  return 0;
}
}
342
343 // command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
344 class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
345 private:
346 std::string topic_name;
347 rgw::notify::EventTypeList events;
348
349 int get_params() override {
350 bool exists;
351 topic_name = s->info.args.get("topic", &exists);
352 if (!exists) {
353 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
354 return -EINVAL;
355 }
356
357 std::string events_str = s->info.args.get("events", &exists);
358 if (!exists) {
359 // if no events are provided, we notify on all of them
360 events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
361 }
362 rgw::notify::from_string_list(events_str, events);
363 if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
364 ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
365 return -EINVAL;
366 }
367 return notif_bucket_path(s->object->get_name(), bucket_name);
368 }
369
370 public:
371 const char* name() const override { return "pubsub_notification_create"; }
372 void execute(optional_yield y) override;
373 };
374
375 void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
376 {
377 ps.emplace(store, s->owner.get_id().tenant);
378
379 auto b = ps->get_bucket(bucket_info.bucket);
380 op_ret = b->create_notification(topic_name, events, y);
381 if (op_ret < 0) {
382 ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
383 return;
384 }
385 ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
386 }
387
388 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
389 class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
390 private:
391 std::string topic_name;
392
393 int get_params() override {
394 bool exists;
395 topic_name = s->info.args.get("topic", &exists);
396 if (!exists) {
397 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
398 return -EINVAL;
399 }
400 return notif_bucket_path(s->object->get_name(), bucket_name);
401 }
402
403 public:
404 void execute(optional_yield y) override;
405 const char* name() const override { return "pubsub_notification_delete"; }
406 };
407
408 void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
409 op_ret = get_params();
410 if (op_ret < 0) {
411 return;
412 }
413
414 ps.emplace(store, s->owner.get_id().tenant);
415 auto b = ps->get_bucket(bucket_info.bucket);
416 op_ret = b->remove_notification(topic_name, y);
417 if (op_ret < 0) {
418 ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
419 return;
420 }
421 ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
422 }
423
424 // command: GET /notifications/bucket/<bucket>
425 class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
426 private:
427 rgw_pubsub_bucket_topics result;
428
429 int get_params() override {
430 return notif_bucket_path(s->object->get_name(), bucket_name);
431 }
432
433 public:
434 void execute(optional_yield y) override;
435 void send_response() override {
436 if (op_ret) {
437 set_req_state_err(s, op_ret);
438 }
439 dump_errno(s);
440 end_header(s, this, "application/json");
441
442 if (op_ret < 0) {
443 return;
444 }
445 encode_json("result", result, s->formatter);
446 rgw_flush_formatter_and_reset(s, s->formatter);
447 }
448 const char* name() const override { return "pubsub_notifications_list"; }
449 };
450
451 void RGWPSListNotifs_ObjStore::execute(optional_yield y)
452 {
453 ps.emplace(store, s->owner.get_id().tenant);
454 auto b = ps->get_bucket(bucket_info.bucket);
455 op_ret = b->get_topics(&result);
456 if (op_ret < 0) {
457 ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
458 return;
459 }
460 }
461
462 // ceph specific notification handler factory
463 class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
464 protected:
465 int init_permissions(RGWOp* op, optional_yield) override {
466 return 0;
467 }
468
469 int read_permissions(RGWOp* op, optional_yield) override {
470 return 0;
471 }
472 bool supports_quota() override {
473 return false;
474 }
475 RGWOp *op_get() override {
476 if (s->object->empty()) {
477 return nullptr;
478 }
479 return new RGWPSListNotifs_ObjStore();
480 }
481 RGWOp *op_put() override {
482 if (!s->object->empty()) {
483 return new RGWPSCreateNotif_ObjStore();
484 }
485 return nullptr;
486 }
487 RGWOp *op_delete() override {
488 if (!s->object->empty()) {
489 return new RGWPSDeleteNotif_ObjStore();
490 }
491 return nullptr;
492 }
493 public:
494 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
495 virtual ~RGWHandler_REST_PSNotifs() = default;
496 };
497
498 // factory for ceph specific PubSub REST handlers
499 RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(rgw::sal::RGWRadosStore *store,
500 struct req_state* const s,
501 const rgw::auth::StrategyRegistry& auth_registry,
502 const std::string& frontend_prefix)
503 {
504 if (RGWHandler_REST_S3::init_from_header(store, s, RGW_FORMAT_JSON, true) < 0) {
505 return nullptr;
506 }
507
508 RGWHandler_REST* handler{nullptr};
509
510 // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
511 // this API is available only on RGW that belong to a pubsub zone
512 if (s->init_state.url_bucket == "topics") {
513 handler = new RGWHandler_REST_PSTopic(auth_registry);
514 } else if (s->init_state.url_bucket == "subscriptions") {
515 handler = new RGWHandler_REST_PSSub(auth_registry);
516 } else if (s->init_state.url_bucket == "notifications") {
517 handler = new RGWHandler_REST_PSNotifs(auth_registry);
518 } else if (s->info.args.exists("notification")) {
519 const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
520 if (ret == 0) {
521 handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
522 }
523 }
524
525 ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
526
527 return handler;
528 }
529