]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub_rest.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub_rest.cc
1 #include "rgw_sync_module_pubsub.h"
2 #include "rgw_sync_module_pubsub_rest.h"
3 #include "rgw_pubsub.h"
4 #include "rgw_op.h"
5 #include "rgw_rest.h"
6 #include "rgw_rest_s3.h"
7
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_rgw
10
11 class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
12 protected:
13 std::unique_ptr<RGWUserPubSub> ups;
14 string topic_name;
15 string bucket_name;
16
17 public:
18 RGWPSCreateTopicOp() {}
19
20 int verify_permission() override {
21 return 0;
22 }
23 void pre_exec() override {
24 rgw_bucket_object_pre_exec(s);
25 }
26 void execute() override;
27
28 const char* name() const override { return "pubsub_topic_create"; }
29 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
30 virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
31 virtual int get_params() = 0;
32 };
33
34 void RGWPSCreateTopicOp::execute()
35 {
36 op_ret = get_params();
37 if (op_ret < 0) {
38 return;
39 }
40
41 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
42 op_ret = ups->create_topic(topic_name);
43 if (op_ret < 0) {
44 ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
45 return;
46 }
47 }
48
49 class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
50 public:
51 explicit RGWPSCreateTopic_ObjStore_S3() {}
52
53 int get_params() override {
54 topic_name = s->object.name;
55 return 0;
56 }
57 };
58
59 class RGWPSListTopicsOp : public RGWOp {
60 protected:
61 std::unique_ptr<RGWUserPubSub> ups;
62 rgw_pubsub_user_topics result;
63
64
65 public:
66 RGWPSListTopicsOp() {}
67
68 int verify_permission() override {
69 return 0;
70 }
71 void pre_exec() override {
72 rgw_bucket_object_pre_exec(s);
73 }
74 void execute() override;
75
76 const char* name() const override { return "pubsub_topics_list"; }
77 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
78 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
79 };
80
81 void RGWPSListTopicsOp::execute()
82 {
83 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
84 op_ret = ups->get_user_topics(&result);
85 if (op_ret < 0) {
86 ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
87 return;
88 }
89
90 }
91
92 class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
93 public:
94 explicit RGWPSListTopics_ObjStore_S3() {}
95
96 void send_response() override {
97 if (op_ret) {
98 set_req_state_err(s, op_ret);
99 }
100 dump_errno(s);
101 end_header(s, this, "application/json");
102
103 if (op_ret < 0) {
104 return;
105 }
106
107 encode_json("result", result, s->formatter);
108 rgw_flush_formatter_and_reset(s, s->formatter);
109 }
110 };
111
112 class RGWPSGetTopicOp : public RGWOp {
113 protected:
114 string topic_name;
115 std::unique_ptr<RGWUserPubSub> ups;
116 rgw_pubsub_topic_subs result;
117
118 public:
119 RGWPSGetTopicOp() {}
120
121 int verify_permission() override {
122 return 0;
123 }
124 void pre_exec() override {
125 rgw_bucket_object_pre_exec(s);
126 }
127 void execute() override;
128
129 const char* name() const override { return "pubsub_topic_get"; }
130 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
131 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
132 virtual int get_params() = 0;
133 };
134
135 void RGWPSGetTopicOp::execute()
136 {
137 op_ret = get_params();
138 if (op_ret < 0) {
139 return;
140 }
141 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
142 op_ret = ups->get_topic(topic_name, &result);
143 if (op_ret < 0) {
144 ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
145 return;
146 }
147 }
148
149 class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
150 public:
151 explicit RGWPSGetTopic_ObjStore_S3() {}
152
153 int get_params() override {
154 topic_name = s->object.name;
155 return 0;
156 }
157
158 void send_response() override {
159 if (op_ret) {
160 set_req_state_err(s, op_ret);
161 }
162 dump_errno(s);
163 end_header(s, this, "application/json");
164
165 if (op_ret < 0) {
166 return;
167 }
168
169 encode_json("result", result, s->formatter);
170 rgw_flush_formatter_and_reset(s, s->formatter);
171 }
172 };
173
174 class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
175 protected:
176 string topic_name;
177 std::unique_ptr<RGWUserPubSub> ups;
178
179 public:
180 RGWPSDeleteTopicOp() {}
181
182 int verify_permission() override {
183 return 0;
184 }
185 void pre_exec() override {
186 rgw_bucket_object_pre_exec(s);
187 }
188 void execute() override;
189
190 const char* name() const override { return "pubsub_topic_delete"; }
191 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
192 virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
193 virtual int get_params() = 0;
194 };
195
196 void RGWPSDeleteTopicOp::execute()
197 {
198 op_ret = get_params();
199 if (op_ret < 0) {
200 return;
201 }
202
203 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
204 op_ret = ups->remove_topic(topic_name);
205 if (op_ret < 0) {
206 ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl;
207 return;
208 }
209 }
210
211 class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
212 public:
213 explicit RGWPSDeleteTopic_ObjStore_S3() {}
214
215 int get_params() override {
216 topic_name = s->object.name;
217 return 0;
218 }
219 };
220
221 class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
222 protected:
223 int init_permissions(RGWOp* op) override {
224 return 0;
225 }
226 int read_permissions(RGWOp* op) override {
227 return 0;
228 }
229 bool supports_quota() override {
230 return false;
231 }
232 RGWOp *op_get() override {
233 if (s->init_state.url_bucket.empty()) {
234 return nullptr;
235 }
236 if (s->object.empty()) {
237 return new RGWPSListTopics_ObjStore_S3();
238 }
239 return new RGWPSGetTopic_ObjStore_S3();
240 }
241 RGWOp *op_put() override {
242 if (!s->object.empty()) {
243 return new RGWPSCreateTopic_ObjStore_S3();
244 }
245 return nullptr;
246 }
247 RGWOp *op_delete() override {
248 if (!s->object.empty()) {
249 return new RGWPSDeleteTopic_ObjStore_S3();
250 }
251 return nullptr;
252 }
253 public:
254 explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
255 virtual ~RGWHandler_REST_PSTopic_S3() {}
256 };
257
258
259 class RGWPSCreateSubOp : public RGWDefaultResponseOp {
260 protected:
261 string sub_name;
262 string topic_name;
263 std::unique_ptr<RGWUserPubSub> ups;
264 rgw_pubsub_sub_dest dest;
265
266 public:
267 RGWPSCreateSubOp() {}
268
269 int verify_permission() override {
270 return 0;
271 }
272 void pre_exec() override {
273 rgw_bucket_object_pre_exec(s);
274 }
275 void execute() override;
276
277 const char* name() const override { return "pubsub_subscription_create"; }
278 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
279 virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
280 virtual int get_params() = 0;
281 };
282
283 void RGWPSCreateSubOp::execute()
284 {
285 op_ret = get_params();
286 if (op_ret < 0) {
287 return;
288 }
289 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
290 auto sub = ups->get_sub(sub_name);
291 op_ret = sub->subscribe(topic_name, dest);
292 if (op_ret < 0) {
293 ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
294 return;
295 }
296 }
297
298 class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
299 public:
300 explicit RGWPSCreateSub_ObjStore_S3() {}
301
302 int get_params() override {
303 sub_name = s->object.name;
304
305 bool exists;
306
307 topic_name = s->info.args.get("topic", &exists);
308 if (!exists) {
309 ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
310 return -EINVAL;
311 }
312
313 auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->get_sync_module().get());
314 auto conf = psmodule->get_effective_conf();
315
316 dest.push_endpoint = s->info.args.get("push-endpoint");
317 dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
318 dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
319 dest.push_endpoint_args = s->info.args.get_str();
320
321 return 0;
322 }
323 };
324
325 class RGWPSGetSubOp : public RGWOp {
326 protected:
327 string sub_name;
328 std::unique_ptr<RGWUserPubSub> ups;
329 rgw_pubsub_sub_config result;
330
331 public:
332 RGWPSGetSubOp() {}
333
334 int verify_permission() override {
335 return 0;
336 }
337 void pre_exec() override {
338 rgw_bucket_object_pre_exec(s);
339 }
340 void execute() override;
341
342 const char* name() const override { return "pubsub_subscription_get"; }
343 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
344 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
345 virtual int get_params() = 0;
346 };
347
348 void RGWPSGetSubOp::execute()
349 {
350 op_ret = get_params();
351 if (op_ret < 0) {
352 return;
353 }
354 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
355 auto sub = ups->get_sub(sub_name);
356 op_ret = sub->get_conf(&result);
357 if (op_ret < 0) {
358 ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
359 return;
360 }
361 }
362
363 class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
364 public:
365 explicit RGWPSGetSub_ObjStore_S3() {}
366
367 int get_params() override {
368 sub_name = s->object.name;
369 return 0;
370 }
371
372 void send_response() override {
373 if (op_ret) {
374 set_req_state_err(s, op_ret);
375 }
376 dump_errno(s);
377 end_header(s, this, "application/json");
378
379 if (op_ret < 0) {
380 return;
381 }
382
383 {
384 Formatter::ObjectSection section(*s->formatter, "result");
385 encode_json("topic", result.topic, s->formatter);
386 encode_json("push_endpoint", result.dest.push_endpoint, s->formatter);
387 encode_json("args", result.dest.push_endpoint_args, s->formatter);
388 }
389 rgw_flush_formatter_and_reset(s, s->formatter);
390 }
391 };
392
393 class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
394 protected:
395 string sub_name;
396 string topic_name;
397 std::unique_ptr<RGWUserPubSub> ups;
398
399 public:
400 RGWPSDeleteSubOp() {}
401
402 int verify_permission() override {
403 return 0;
404 }
405 void pre_exec() override {
406 rgw_bucket_object_pre_exec(s);
407 }
408 void execute() override;
409
410 const char* name() const override { return "pubsub_subscription_delete"; }
411 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
412 virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
413 virtual int get_params() = 0;
414 };
415
416 void RGWPSDeleteSubOp::execute()
417 {
418 op_ret = get_params();
419 if (op_ret < 0) {
420 return;
421 }
422 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
423 auto sub = ups->get_sub(sub_name);
424 op_ret = sub->unsubscribe(topic_name);
425 if (op_ret < 0) {
426 ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
427 return;
428 }
429 }
430
431 class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
432 public:
433 explicit RGWPSDeleteSub_ObjStore_S3() {}
434
435 int get_params() override {
436 sub_name = s->object.name;
437 topic_name = s->info.args.get("topic");
438 return 0;
439 }
440 };
441
442 class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
443 protected:
444 string sub_name;
445 string event_id;
446 std::unique_ptr<RGWUserPubSub> ups;
447
448 public:
449 RGWPSAckSubEventOp() {}
450
451 int verify_permission() override {
452 return 0;
453 }
454 void pre_exec() override {
455 rgw_bucket_object_pre_exec(s);
456 }
457 void execute() override;
458
459 const char* name() const override { return "pubsub_subscription_ack"; }
460 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
461 virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
462 virtual int get_params() = 0;
463 };
464
465 void RGWPSAckSubEventOp::execute()
466 {
467 op_ret = get_params();
468 if (op_ret < 0) {
469 return;
470 }
471 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
472 auto sub = ups->get_sub(sub_name);
473 op_ret = sub->remove_event(event_id);
474 if (op_ret < 0) {
475 ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl;
476 return;
477 }
478 }
479
480 class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
481 public:
482 explicit RGWPSAckSubEvent_ObjStore_S3() {}
483
484 int get_params() override {
485 sub_name = s->object.name;
486
487 bool exists;
488
489 event_id = s->info.args.get("event-id", &exists);
490 if (!exists) {
491 ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
492 return -EINVAL;
493 }
494 return 0;
495 }
496 };
497
498 class RGWPSPullSubEventsOp : public RGWOp {
499 protected:
500 int max_entries{0};
501 string sub_name;
502 string marker;
503 std::unique_ptr<RGWUserPubSub> ups;
504 RGWUserPubSub::Sub::list_events_result result;
505
506 public:
507 RGWPSPullSubEventsOp() {}
508
509 int verify_permission() override {
510 return 0;
511 }
512 void pre_exec() override {
513 rgw_bucket_object_pre_exec(s);
514 }
515 void execute() override;
516
517 const char* name() const override { return "pubsub_subscription_pull"; }
518 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
519 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
520 virtual int get_params() = 0;
521 };
522
523 void RGWPSPullSubEventsOp::execute()
524 {
525 op_ret = get_params();
526 if (op_ret < 0) {
527 return;
528 }
529 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
530 auto sub = ups->get_sub(sub_name);
531 op_ret = sub->list_events(marker, max_entries, &result);
532 if (op_ret < 0) {
533 ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
534 return;
535 }
536 }
537
538 class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
539 public:
540 explicit RGWPSPullSubEvents_ObjStore_S3() {}
541
542 int get_params() override {
543 sub_name = s->object.name;
544 marker = s->info.args.get("marker");
545 #define DEFAULT_MAX_ENTRIES 100
546 int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
547 if (ret < 0) {
548 ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
549 return -EINVAL;
550 }
551 return 0;
552 }
553
554 void send_response() override {
555 if (op_ret) {
556 set_req_state_err(s, op_ret);
557 }
558 dump_errno(s);
559 end_header(s, this, "application/json");
560
561 if (op_ret < 0) {
562 return;
563 }
564
565 encode_json("result", result, s->formatter);
566 rgw_flush_formatter_and_reset(s, s->formatter);
567 }
568 };
569
570 class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
571 protected:
572 int init_permissions(RGWOp* op) override {
573 return 0;
574 }
575
576 int read_permissions(RGWOp* op) override {
577 return 0;
578 }
579 bool supports_quota() override {
580 return false;
581 }
582 RGWOp *op_get() override {
583 if (s->object.empty()) {
584 return nullptr;
585 }
586 if (s->info.args.exists("events")) {
587 return new RGWPSPullSubEvents_ObjStore_S3();
588 }
589 return new RGWPSGetSub_ObjStore_S3();
590 }
591 RGWOp *op_put() override {
592 if (!s->object.empty()) {
593 return new RGWPSCreateSub_ObjStore_S3();
594 }
595 return nullptr;
596 }
597 RGWOp *op_delete() override {
598 if (!s->object.empty()) {
599 return new RGWPSDeleteSub_ObjStore_S3();
600 }
601 return nullptr;
602 }
603 RGWOp *op_post() override {
604 if (s->info.args.exists("ack")) {
605 return new RGWPSAckSubEvent_ObjStore_S3();
606 }
607 return nullptr;
608 }
609 public:
610 explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
611 virtual ~RGWHandler_REST_PSSub_S3() {}
612 };
613
614
// Extracts the bucket name from a notification resource path of the form
// "bucket/<bucket-name>".
//
//   path         resource path to parse
//   bucket_name  out: receives everything after the first '/' on success;
//                left untouched on failure
//
// Returns 0 on success, or -EINVAL when the path is empty, contains no '/',
// does not start with the "bucket" component, or has nothing after the '/'.
static int notif_bucket_path(const std::string& path, std::string *bucket_name)
{
  // An empty path has no '/', so this check covers it as well.
  const auto pos = path.find('/');
  if (pos == std::string::npos) {
    return -EINVAL;
  }

  // The component before the first '/' must be exactly "bucket".
  if (path.compare(0, pos, "bucket") != 0) {
    return -EINVAL;
  }

  // find() never returns an index past the end, so comparing `pos` itself
  // against size() can never fire; test the position after the separator
  // to reject "bucket/" with an empty bucket name.
  if (pos + 1 >= path.size()) {
    return -EINVAL;
  }

  *bucket_name = path.substr(pos + 1);
  return 0;
}
636
637 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
638 protected:
639 std::unique_ptr<RGWUserPubSub> ups;
640 string topic_name;
641 set<string, ltstr_nocase> events;
642
643 string bucket_name;
644 RGWBucketInfo bucket_info;
645
646 public:
647 RGWPSCreateNotifOp() {}
648
649 int verify_permission() override {
650 int ret = get_params();
651 if (ret < 0) {
652 return ret;
653 }
654
655 ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
656 bucket_info, nullptr, nullptr);
657 if (ret < 0) {
658 return ret;
659 }
660
661 if (bucket_info.owner != s->owner.get_id()) {
662 ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
663 return -EPERM;
664 }
665 return 0;
666 }
667 void pre_exec() override {
668 rgw_bucket_object_pre_exec(s);
669 }
670 void execute() override;
671
672 const char* name() const override { return "pubsub_notification_create"; }
673 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
674 virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
675 virtual int get_params() = 0;
676 };
677
678 void RGWPSCreateNotifOp::execute()
679 {
680 op_ret = get_params();
681 if (op_ret < 0) {
682 return;
683 }
684
685 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
686 auto b = ups->get_bucket(bucket_info.bucket);
687 op_ret = b->create_notification(topic_name, events);
688 if (op_ret < 0) {
689 ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
690 return;
691 }
692 }
693
694 class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
695 public:
696 explicit RGWPSCreateNotif_ObjStore_S3() {}
697
698 int get_params() override {
699 bool exists;
700 topic_name = s->info.args.get("topic", &exists);
701 if (!exists) {
702 ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
703 return -EINVAL;
704 }
705
706 string events_str = s->info.args.get("events", &exists);
707 if (exists) {
708 get_str_set(events_str, ",", events);
709 }
710 return notif_bucket_path(s->object.name, &bucket_name);
711 }
712 };
713
714 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
715 protected:
716 std::unique_ptr<RGWUserPubSub> ups;
717 string topic_name;
718 string bucket_name;
719 RGWBucketInfo bucket_info;
720
721 public:
722 RGWPSDeleteNotifOp() {}
723
724 int verify_permission() override {
725 int ret = get_params();
726 if (ret < 0) {
727 return ret;
728 }
729
730 ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
731 bucket_info, nullptr, nullptr);
732 if (ret < 0) {
733 return ret;
734 }
735
736 if (bucket_info.owner != s->owner.get_id()) {
737 ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
738 return -EPERM;
739 }
740 return 0;
741 }
742 void pre_exec() override {
743 rgw_bucket_object_pre_exec(s);
744 }
745 void execute() override;
746
747 const char* name() const override { return "pubsub_notification_delete"; }
748 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
749 virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
750 virtual int get_params() = 0;
751 };
752
753 void RGWPSDeleteNotifOp::execute()
754 {
755 op_ret = get_params();
756 if (op_ret < 0) {
757 return;
758 }
759
760 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
761 auto b = ups->get_bucket(bucket_info.bucket);
762 op_ret = b->remove_notification(topic_name);
763 if (op_ret < 0) {
764 ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
765 return;
766 }
767 }
768
769 class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
770 public:
771 explicit RGWPSDeleteNotif_ObjStore_S3() {}
772
773 int get_params() override {
774 bool exists;
775 topic_name = s->info.args.get("topic", &exists);
776 if (!exists) {
777 ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
778 return -EINVAL;
779 }
780 return notif_bucket_path(s->object.name, &bucket_name);
781 }
782 };
783
784 class RGWPSListNotifsOp : public RGWOp {
785 protected:
786 string bucket_name;
787 RGWBucketInfo bucket_info;
788 std::unique_ptr<RGWUserPubSub> ups;
789 rgw_pubsub_bucket_topics result;
790
791
792 public:
793 RGWPSListNotifsOp() {}
794
795 int verify_permission() override {
796 int ret = get_params();
797 if (ret < 0) {
798 return ret;
799 }
800
801 ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
802 bucket_info, nullptr, nullptr);
803 if (ret < 0) {
804 return ret;
805 }
806
807 if (bucket_info.owner != s->owner.get_id()) {
808 ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
809 return -EPERM;
810 }
811
812 return 0;
813 }
814 void pre_exec() override {
815 rgw_bucket_object_pre_exec(s);
816 }
817 void execute() override;
818
819 const char* name() const override { return "pubsub_notifications_list"; }
820 virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
821 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
822 virtual int get_params() = 0;
823 };
824
825 void RGWPSListNotifsOp::execute()
826 {
827 ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
828 auto b = ups->get_bucket(bucket_info.bucket);
829 op_ret = b->get_topics(&result);
830 if (op_ret < 0) {
831 ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
832 return;
833 }
834
835 }
836
837 class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
838 public:
839 explicit RGWPSListNotifs_ObjStore_S3() {}
840
841 int get_params() override {
842 return notif_bucket_path(s->object.name, &bucket_name);
843 }
844
845 void send_response() override {
846 if (op_ret) {
847 set_req_state_err(s, op_ret);
848 }
849 dump_errno(s);
850 end_header(s, this, "application/json");
851
852 if (op_ret < 0) {
853 return;
854 }
855
856 encode_json("result", result, s->formatter);
857 rgw_flush_formatter_and_reset(s, s->formatter);
858 }
859 };
860
861
862 class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
863 protected:
864 int init_permissions(RGWOp* op) override {
865 return 0;
866 }
867
868 int read_permissions(RGWOp* op) override {
869 return 0;
870 }
871 bool supports_quota() override {
872 return false;
873 }
874 RGWOp *op_get() override {
875 if (s->object.empty()) {
876 return nullptr;
877 }
878 return new RGWPSListNotifs_ObjStore_S3();
879 }
880 RGWOp *op_put() override {
881 if (!s->object.empty()) {
882 return new RGWPSCreateNotif_ObjStore_S3();
883 }
884 return nullptr;
885 }
886 RGWOp *op_delete() override {
887 if (!s->object.empty()) {
888 return new RGWPSDeleteNotif_ObjStore_S3();
889 }
890 return nullptr;
891 }
892 public:
893 explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
894 virtual ~RGWHandler_REST_PSNotifs_S3() {}
895 };
896
897
898 RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
899 const rgw::auth::StrategyRegistry& auth_registry,
900 const std::string& frontend_prefix)
901 {
902 int ret =
903 RGWHandler_REST_S3::init_from_header(s,
904 RGW_FORMAT_JSON, true);
905 if (ret < 0) {
906 return nullptr;
907 }
908
909 RGWHandler_REST *handler = nullptr;;
910
911 if (s->init_state.url_bucket == "topics") {
912 handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
913 }
914
915 if (s->init_state.url_bucket == "subscriptions") {
916 handler = new RGWHandler_REST_PSSub_S3(auth_registry);
917 }
918
919 if (s->init_state.url_bucket == "notifications") {
920 handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
921 }
922
923 ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
924 return handler;
925 }
926