]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub_rest.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub_rest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <algorithm>
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_op.h"
12 #include "rgw_rest.h"
13 #include "rgw_rest_s3.h"
14 #include "rgw_arn.h"
15 #include "rgw_zone.h"
16
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
19
20 // command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
21 class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
22 public:
23 int get_params() override {
24
25 topic_name = s->object.name;
26
27 dest.push_endpoint = s->info.args.get("push-endpoint");
28 dest.push_endpoint_args = s->info.args.get_str();
29 // dest object only stores endpoint info
30 // bucket to store events/records will be set only when subscription is created
31 dest.bucket_name = "";
32 dest.oid_prefix = "";
33 dest.arn_topic = topic_name;
34 // the topic ARN will be sent in the reply
35 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
36 store->svc.zone->get_zonegroup().get_name(),
37 s->user->user_id.tenant, topic_name);
38 topic_arn = arn.to_string();
39 return 0;
40 }
41
42 void send_response() override {
43 if (op_ret) {
44 set_req_state_err(s, op_ret);
45 }
46 dump_errno(s);
47 end_header(s, this, "application/json");
48
49 if (op_ret < 0) {
50 return;
51 }
52
53 {
54 Formatter::ObjectSection section(*s->formatter, "result");
55 encode_json("arn", topic_arn, s->formatter);
56 }
57 rgw_flush_formatter_and_reset(s, s->formatter);
58 }
59 };
60
61 // command: GET /topics
62 class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
63 public:
64 void send_response() override {
65 if (op_ret) {
66 set_req_state_err(s, op_ret);
67 }
68 dump_errno(s);
69 end_header(s, this, "application/json");
70
71 if (op_ret < 0) {
72 return;
73 }
74
75 encode_json("result", result, s->formatter);
76 rgw_flush_formatter_and_reset(s, s->formatter);
77 }
78 };
79
80 // command: GET /topics/<topic-name>
81 class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
82 public:
83 int get_params() override {
84 topic_name = s->object.name;
85 return 0;
86 }
87
88 void send_response() override {
89 if (op_ret) {
90 set_req_state_err(s, op_ret);
91 }
92 dump_errno(s);
93 end_header(s, this, "application/json");
94
95 if (op_ret < 0) {
96 return;
97 }
98
99 encode_json("result", result, s->formatter);
100 rgw_flush_formatter_and_reset(s, s->formatter);
101 }
102 };
103
104 // command: DELETE /topics/<topic-name>
105 class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
106 public:
107 int get_params() override {
108 topic_name = s->object.name;
109 return 0;
110 }
111 };
112
113 // ceph specific topics handler factory
114 class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
115 protected:
116 int init_permissions(RGWOp* op) override {
117 return 0;
118 }
119
120 int read_permissions(RGWOp* op) override {
121 return 0;
122 }
123
124 bool supports_quota() override {
125 return false;
126 }
127
128 RGWOp *op_get() override {
129 if (s->init_state.url_bucket.empty()) {
130 return nullptr;
131 }
132 if (s->object.empty()) {
133 return new RGWPSListTopics_ObjStore();
134 }
135 return new RGWPSGetTopic_ObjStore();
136 }
137 RGWOp *op_put() override {
138 if (!s->object.empty()) {
139 return new RGWPSCreateTopic_ObjStore();
140 }
141 return nullptr;
142 }
143 RGWOp *op_delete() override {
144 if (!s->object.empty()) {
145 return new RGWPSDeleteTopic_ObjStore();
146 }
147 return nullptr;
148 }
149 public:
150 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
151 virtual ~RGWHandler_REST_PSTopic() = default;
152 };
153
154 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
155 class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
156 public:
157 int get_params() override {
158 sub_name = s->object.name;
159
160 bool exists;
161 topic_name = s->info.args.get("topic", &exists);
162 if (!exists) {
163 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
164 return -EINVAL;
165 }
166
167 const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
168 const auto& conf = psmodule->get_effective_conf();
169
170 dest.push_endpoint = s->info.args.get("push-endpoint");
171 dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
172 dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
173 dest.push_endpoint_args = s->info.args.get_str();
174 dest.arn_topic = topic_name;
175
176 return 0;
177 }
178 };
179
180 // command: GET /subscriptions/<sub-name>
181 class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
182 public:
183 int get_params() override {
184 sub_name = s->object.name;
185 return 0;
186 }
187 void send_response() override {
188 if (op_ret) {
189 set_req_state_err(s, op_ret);
190 }
191 dump_errno(s);
192 end_header(s, this, "application/json");
193
194 if (op_ret < 0) {
195 return;
196 }
197
198 encode_json("result", result, s->formatter);
199 rgw_flush_formatter_and_reset(s, s->formatter);
200 }
201 };
202
203 // command: DELETE /subscriptions/<sub-name>
204 class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
205 public:
206 int get_params() override {
207 sub_name = s->object.name;
208 topic_name = s->info.args.get("topic");
209 return 0;
210 }
211 };
212
213 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
214 class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
215 public:
216 explicit RGWPSAckSubEvent_ObjStore() {}
217
218 int get_params() override {
219 sub_name = s->object.name;
220
221 bool exists;
222
223 event_id = s->info.args.get("event-id", &exists);
224 if (!exists) {
225 ldout(s->cct, 1) << "missing required param 'event-id'" << dendl;
226 return -EINVAL;
227 }
228 return 0;
229 }
230 };
231
232 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
233 class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
234 public:
235 int get_params() override {
236 sub_name = s->object.name;
237 marker = s->info.args.get("marker");
238 const int ret = s->info.args.get_int("max-entries", &max_entries,
239 RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
240 if (ret < 0) {
241 ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
242 return -EINVAL;
243 }
244 return 0;
245 }
246
247 void send_response() override {
248 if (op_ret) {
249 set_req_state_err(s, op_ret);
250 }
251 dump_errno(s);
252 end_header(s, this, "application/json");
253
254 if (op_ret < 0) {
255 return;
256 }
257
258 encode_json("result", *sub, s->formatter);
259 rgw_flush_formatter_and_reset(s, s->formatter);
260 }
261 };
262
263 // subscriptions handler factory
264 class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
265 protected:
266 int init_permissions(RGWOp* op) override {
267 return 0;
268 }
269
270 int read_permissions(RGWOp* op) override {
271 return 0;
272 }
273 bool supports_quota() override {
274 return false;
275 }
276 RGWOp *op_get() override {
277 if (s->object.empty()) {
278 return nullptr;
279 }
280 if (s->info.args.exists("events")) {
281 return new RGWPSPullSubEvents_ObjStore();
282 }
283 return new RGWPSGetSub_ObjStore();
284 }
285 RGWOp *op_put() override {
286 if (!s->object.empty()) {
287 return new RGWPSCreateSub_ObjStore();
288 }
289 return nullptr;
290 }
291 RGWOp *op_delete() override {
292 if (!s->object.empty()) {
293 return new RGWPSDeleteSub_ObjStore();
294 }
295 return nullptr;
296 }
297 RGWOp *op_post() override {
298 if (s->info.args.exists("ack")) {
299 return new RGWPSAckSubEvent_ObjStore();
300 }
301 return nullptr;
302 }
303 public:
304 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
305 virtual ~RGWHandler_REST_PSSub() = default;
306 };
307
namespace {
// Extract the bucket name from the object part of a ceph specific notification
// command. Callers in this file pass the request's object name, which for
//   /notifications/bucket/<bucket-name>
// is expected to be "bucket/<bucket-name>".
//
// path:        text to parse
// bucket_name: receives everything after the first '/'; written only on success
// returns:     0 on success; -EINVAL when "path" is empty, contains no '/',
//              does not start with "bucket/", or has nothing after the '/'
int notif_bucket_path(const std::string& path, std::string& bucket_name) {
  // an empty path has no '/', so this also rejects empty input
  const auto pos = path.find('/');
  if (pos == std::string::npos) {
    return -EINVAL;
  }

  // the component before the first '/' must be exactly "bucket"
  if (path.compare(0, pos, "bucket") != 0) {
    return -EINVAL;
  }

  // reject "bucket/": there must be at least one character after the '/'
  if (pos + 1 >= path.size()) {
    return -EINVAL;
  }

  bucket_name = path.substr(pos + 1);
  return 0;
}
}
332
333 // command (ceph specific): PUT /notifications/bucket/<bucket-name>?topic=<topic-name>[&events=<event>[,<event>...]]
334 class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
335 private:
336 std::string topic_name;
337 rgw::notify::EventTypeList events;
338
339 int get_params() override {
340 bool exists;
341 topic_name = s->info.args.get("topic", &exists);
342 if (!exists) {
343 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
344 return -EINVAL;
345 }
346
347 std::string events_str = s->info.args.get("events", &exists);
348 if (!exists) {
349 // if no events are provided, we notify on all of them
350 events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
351 }
352 rgw::notify::from_string_list(events_str, events);
353 if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
354 ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
355 return -EINVAL;
356 }
357 return notif_bucket_path(s->object.name, bucket_name);
358 }
359
360 public:
361 const char* name() const override { return "pubsub_notification_create"; }
362 void execute() override;
363 };
364
365 void RGWPSCreateNotif_ObjStore::execute()
366 {
367 ups.emplace(store, s->owner.get_id());
368
369 auto b = ups->get_bucket(bucket_info.bucket);
370 op_ret = b->create_notification(topic_name, events);
371 if (op_ret < 0) {
372 ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
373 return;
374 }
375 ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
376 }
377
378 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
379 class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
380 private:
381 std::string topic_name;
382
383 int get_params() override {
384 bool exists;
385 topic_name = s->info.args.get("topic", &exists);
386 if (!exists) {
387 ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
388 return -EINVAL;
389 }
390 return notif_bucket_path(s->object.name, bucket_name);
391 }
392
393 public:
394 void execute() override;
395 const char* name() const override { return "pubsub_notification_delete"; }
396 };
397
398 void RGWPSDeleteNotif_ObjStore::execute() {
399 op_ret = get_params();
400 if (op_ret < 0) {
401 return;
402 }
403
404 ups.emplace(store, s->owner.get_id());
405 auto b = ups->get_bucket(bucket_info.bucket);
406 op_ret = b->remove_notification(topic_name);
407 if (op_ret < 0) {
408 ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
409 return;
410 }
411 ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
412 }
413
414 // command: GET /notifications/bucket/<bucket>
415 class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
416 private:
417 rgw_pubsub_bucket_topics result;
418
419 int get_params() override {
420 return notif_bucket_path(s->object.name, bucket_name);
421 }
422
423 public:
424 void execute() override;
425 void send_response() override {
426 if (op_ret) {
427 set_req_state_err(s, op_ret);
428 }
429 dump_errno(s);
430 end_header(s, this, "application/json");
431
432 if (op_ret < 0) {
433 return;
434 }
435 encode_json("result", result, s->formatter);
436 rgw_flush_formatter_and_reset(s, s->formatter);
437 }
438 const char* name() const override { return "pubsub_notifications_list"; }
439 };
440
441 void RGWPSListNotifs_ObjStore::execute()
442 {
443 ups.emplace(store, s->owner.get_id());
444 auto b = ups->get_bucket(bucket_info.bucket);
445 op_ret = b->get_topics(&result);
446 if (op_ret < 0) {
447 ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
448 return;
449 }
450 }
451
452 // ceph specific notification handler factory
453 class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
454 protected:
455 int init_permissions(RGWOp* op) override {
456 return 0;
457 }
458
459 int read_permissions(RGWOp* op) override {
460 return 0;
461 }
462 bool supports_quota() override {
463 return false;
464 }
465 RGWOp *op_get() override {
466 if (s->object.empty()) {
467 return nullptr;
468 }
469 return new RGWPSListNotifs_ObjStore();
470 }
471 RGWOp *op_put() override {
472 if (!s->object.empty()) {
473 return new RGWPSCreateNotif_ObjStore();
474 }
475 return nullptr;
476 }
477 RGWOp *op_delete() override {
478 if (!s->object.empty()) {
479 return new RGWPSDeleteNotif_ObjStore();
480 }
481 return nullptr;
482 }
483 public:
484 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
485 virtual ~RGWHandler_REST_PSNotifs() = default;
486 };
487
488 // factory for ceph specific PubSub REST handlers
489 RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
490 const rgw::auth::StrategyRegistry& auth_registry,
491 const std::string& frontend_prefix)
492 {
493 if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
494 return nullptr;
495 }
496
497 RGWHandler_REST* handler{nullptr};
498
499 // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
500 // this API is available only on RGW that belong to a pubsub zone
501 if (s->init_state.url_bucket == "topics") {
502 handler = new RGWHandler_REST_PSTopic(auth_registry);
503 } else if (s->init_state.url_bucket == "subscriptions") {
504 handler = new RGWHandler_REST_PSSub(auth_registry);
505 } else if (s->init_state.url_bucket == "notifications") {
506 handler = new RGWHandler_REST_PSNotifs(auth_registry);
507 } else if (s->info.args.exists("notification")) {
508 const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
509 if (ret == 0) {
510 handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
511 }
512 }
513
514 ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
515
516 return handler;
517 }
518