1 #include "rgw_sync_module_pubsub.h"
2 #include "rgw_sync_module_pubsub_rest.h"
3 #include "rgw_pubsub.h"
6 #include "rgw_rest_s3.h"
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_rgw
// Base operation for creating a pubsub topic. Owns the per-user pubsub
// handle (ups); execute() is defined out of line and calls get_params(),
// which concrete subclasses must provide.
11 class RGWPSCreateTopicOp
: public RGWDefaultResponseOp
{
13 std::unique_ptr
<RGWUserPubSub
> ups
;
18 RGWPSCreateTopicOp() {}
20 int verify_permission() override
{
23 void pre_exec() override
{
24 rgw_bucket_object_pre_exec(s
);
26 void execute() override
;
// Op identification: name string, op type and op mask (a write op).
28 const char* name() const override
{ return "pubsub_topic_create"; }
29 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_CREATE
; }
30 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
// Pure virtual: the result is stored in op_ret by execute().
31 virtual int get_params() = 0;
// Creates the topic: stores the get_params() result in op_ret, builds a
// RGWUserPubSub for the request owner and calls create_topic(topic_name).
// The level-20 log line reports a create failure together with op_ret.
34 void RGWPSCreateTopicOp::execute()
36 op_ret
= get_params();
41 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
42 op_ret
= ups
->create_topic(topic_name
);
44 ldout(s
->cct
, 20) << "failed to create topic, ret=" << op_ret
<< dendl
;
// REST front end for topic creation: get_params() takes the topic name from
// the object name of the request.
49 class RGWPSCreateTopic_ObjStore_S3
: public RGWPSCreateTopicOp
{
51 explicit RGWPSCreateTopic_ObjStore_S3() {}
53 int get_params() override
{
54 topic_name
= s
->object
.name
;
// Base operation for listing the topics of the request owner. The listing is
// kept in `result`; execute() is defined out of line.
59 class RGWPSListTopicsOp
: public RGWOp
{
61 std::unique_ptr
<RGWUserPubSub
> ups
;
62 rgw_pubsub_user_topics result
;
66 RGWPSListTopicsOp() {}
68 int verify_permission() override
{
71 void pre_exec() override
{
72 rgw_bucket_object_pre_exec(s
);
74 void execute() override
;
// Op identification: name string, op type and op mask (a read op).
76 const char* name() const override
{ return "pubsub_topics_list"; }
77 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPICS_LIST
; }
78 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
// Builds a RGWUserPubSub for the request owner and fills `result` through
// get_user_topics(); the return code is kept in op_ret and reported in the
// level-20 log line.
81 void RGWPSListTopicsOp::execute()
83 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
84 op_ret
= ups
->get_user_topics(&result
);
86 ldout(s
->cct
, 20) << "failed to get topics, ret=" << op_ret
<< dendl
;
// REST front end for the topic listing. send_response() records op_ret as
// the request error state, ends the header with content type
// "application/json", encodes `result` under the key "result" and flushes
// the formatter.
92 class RGWPSListTopics_ObjStore_S3
: public RGWPSListTopicsOp
{
94 explicit RGWPSListTopics_ObjStore_S3() {}
96 void send_response() override
{
98 set_req_state_err(s
, op_ret
);
101 end_header(s
, this, "application/json");
107 encode_json("result", result
, s
->formatter
);
108 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// Base operation for fetching a single topic. The topic and its
// subscriptions are kept in `result` (rgw_pubsub_topic_subs); subclasses
// provide get_params().
112 class RGWPSGetTopicOp
: public RGWOp
{
115 std::unique_ptr
<RGWUserPubSub
> ups
;
116 rgw_pubsub_topic_subs result
;
121 int verify_permission() override
{
124 void pre_exec() override
{
125 rgw_bucket_object_pre_exec(s
);
127 void execute() override
;
// Op identification: name string, op type and op mask (a read op).
129 const char* name() const override
{ return "pubsub_topic_get"; }
130 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_GET
; }
131 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
// Pure virtual: the result is stored in op_ret by execute().
132 virtual int get_params() = 0;
// Fetches one topic: stores the get_params() result in op_ret, builds a
// RGWUserPubSub for the request owner and calls get_topic(topic_name,
// &result). The level-20 log line reports a failure together with op_ret.
135 void RGWPSGetTopicOp::execute()
137 op_ret
= get_params();
141 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
142 op_ret
= ups
->get_topic(topic_name
, &result
);
144 ldout(s
->cct
, 20) << "failed to get topic, ret=" << op_ret
<< dendl
;
// REST front end for fetching a topic. get_params() takes the topic name
// from the object name of the request; send_response() emits `result` as
// JSON under the key "result".
149 class RGWPSGetTopic_ObjStore_S3
: public RGWPSGetTopicOp
{
151 explicit RGWPSGetTopic_ObjStore_S3() {}
153 int get_params() override
{
154 topic_name
= s
->object
.name
;
158 void send_response() override
{
160 set_req_state_err(s
, op_ret
);
163 end_header(s
, this, "application/json");
169 encode_json("result", result
, s
->formatter
);
170 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// Base operation for removing a pubsub topic. execute() is defined out of
// line; subclasses provide get_params().
174 class RGWPSDeleteTopicOp
: public RGWDefaultResponseOp
{
177 std::unique_ptr
<RGWUserPubSub
> ups
;
180 RGWPSDeleteTopicOp() {}
182 int verify_permission() override
{
185 void pre_exec() override
{
186 rgw_bucket_object_pre_exec(s
);
188 void execute() override
;
// Op identification: name string, op type and op mask (a delete op).
190 const char* name() const override
{ return "pubsub_topic_delete"; }
191 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_DELETE
; }
192 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_DELETE
; }
// Pure virtual: the result is stored in op_ret by execute().
193 virtual int get_params() = 0;
// Removes the topic: stores the get_params() result in op_ret, builds a
// RGWUserPubSub for the request owner and calls remove_topic(topic_name).
// The level-20 log line reports a failure together with op_ret.
196 void RGWPSDeleteTopicOp::execute()
198 op_ret
= get_params();
203 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
204 op_ret
= ups
->remove_topic(topic_name
);
206 ldout(s
->cct
, 20) << "failed to remove topic, ret=" << op_ret
<< dendl
;
// REST front end for topic removal: get_params() takes the topic name from
// the object name of the request.
211 class RGWPSDeleteTopic_ObjStore_S3
: public RGWPSDeleteTopicOp
{
213 explicit RGWPSDeleteTopic_ObjStore_S3() {}
215 int get_params() override
{
216 topic_name
= s
->object
.name
;
// REST handler for the topics resource. It overrides init_permissions(),
// read_permissions() and supports_quota(), and maps op_get/op_put/op_delete
// to the topic operations.
221 class RGWHandler_REST_PSTopic_S3
: public RGWHandler_REST_S3
{
223 int init_permissions(RGWOp
* op
) override
{
226 int read_permissions(RGWOp
* op
) override
{
229 bool supports_quota() override
{
// GET: with no object name the topic list op is returned, otherwise the
// single-topic get op.
232 RGWOp
*op_get() override
{
233 if (s
->init_state
.url_bucket
.empty()) {
236 if (s
->object
.empty()) {
237 return new RGWPSListTopics_ObjStore_S3();
239 return new RGWPSGetTopic_ObjStore_S3();
// PUT: a create op is returned only when an object (topic) name is present.
241 RGWOp
*op_put() override
{
242 if (!s
->object
.empty()) {
243 return new RGWPSCreateTopic_ObjStore_S3();
// DELETE: a delete op is returned only when an object (topic) name is present.
247 RGWOp
*op_delete() override
{
248 if (!s
->object
.empty()) {
249 return new RGWPSDeleteTopic_ObjStore_S3();
254 explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
255 virtual ~RGWHandler_REST_PSTopic_S3() {}
// Base operation for creating a subscription. `dest` carries the
// subscription destination that get_params() fills in; execute() is defined
// out of line.
259 class RGWPSCreateSubOp
: public RGWDefaultResponseOp
{
263 std::unique_ptr
<RGWUserPubSub
> ups
;
264 rgw_pubsub_sub_dest dest
;
267 RGWPSCreateSubOp() {}
269 int verify_permission() override
{
272 void pre_exec() override
{
273 rgw_bucket_object_pre_exec(s
);
275 void execute() override
;
// Op identification: name string, op type and op mask (a write op).
277 const char* name() const override
{ return "pubsub_subscription_create"; }
278 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_SUB_CREATE
; }
279 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
// Pure virtual: the result is stored in op_ret by execute().
280 virtual int get_params() = 0;
// Creates the subscription: stores the get_params() result in op_ret, builds
// a RGWUserPubSub for the request owner, obtains the subscription handle for
// sub_name and calls subscribe(topic_name, dest). The level-20 log line
// reports a failure together with op_ret.
283 void RGWPSCreateSubOp::execute()
285 op_ret
= get_params();
289 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
290 auto sub
= ups
->get_sub(sub_name
);
291 op_ret
= sub
->subscribe(topic_name
, dest
);
293 ldout(s
->cct
, 20) << "failed to create subscription, ret=" << op_ret
<< dendl
;
// REST front end for subscription creation. get_params() reads the
// subscription name from the object name and the topic from the "topic"
// request argument, then fills `dest` from the request arguments and the
// sync module's effective configuration.
298 class RGWPSCreateSub_ObjStore_S3
: public RGWPSCreateSubOp
{
300 explicit RGWPSCreateSub_ObjStore_S3() {}
302 int get_params() override
{
303 sub_name
= s
->object
.name
;
// "topic" is a required argument; `exists` records whether it was supplied.
307 topic_name
= s
->info
.args
.get("topic", &exists
);
309 ldout(s
->cct
, 20) << "ERROR: missing required param 'topic' for request" << dendl
;
// NOTE(review): unchecked static_cast - this assumes the store's sync module
// is a RGWPSSyncModuleInstance; confirm that holds for every caller.
313 auto psmodule
= static_cast<RGWPSSyncModuleInstance
*>(store
->get_sync_module().get());
314 auto conf
= psmodule
->get_effective_conf();
316 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// Data bucket name: <data_bucket_prefix><owner id>-<topic name>.
317 dest
.bucket_name
= string(conf
["data_bucket_prefix"]) + s
->owner
.get_id().to_str() + "-" + topic_name
;
// Object prefix: <data_oid_prefix><subscription name>/.
318 dest
.oid_prefix
= string(conf
["data_oid_prefix"]) + sub_name
+ "/";
319 dest
.push_endpoint_args
= s
->info
.args
.get_str();
// Base operation for fetching a subscription's configuration, kept in
// `result` (rgw_pubsub_sub_config); subclasses provide get_params().
325 class RGWPSGetSubOp
: public RGWOp
{
328 std::unique_ptr
<RGWUserPubSub
> ups
;
329 rgw_pubsub_sub_config result
;
334 int verify_permission() override
{
337 void pre_exec() override
{
338 rgw_bucket_object_pre_exec(s
);
340 void execute() override
;
// Op identification: name string, op type and op mask (a read op).
342 const char* name() const override
{ return "pubsub_subscription_get"; }
343 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_SUB_GET
; }
344 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
// Pure virtual: the result is stored in op_ret by execute().
345 virtual int get_params() = 0;
// Fetches the subscription configuration: stores the get_params() result in
// op_ret, builds a RGWUserPubSub for the request owner, obtains the handle
// for sub_name and calls get_conf(&result). The level-20 log line reports a
// failure together with op_ret.
348 void RGWPSGetSubOp::execute()
350 op_ret
= get_params();
354 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
355 auto sub
= ups
->get_sub(sub_name
);
356 op_ret
= sub
->get_conf(&result
);
358 ldout(s
->cct
, 20) << "failed to get subscription, ret=" << op_ret
<< dendl
;
// REST front end for fetching a subscription. get_params() takes the
// subscription name from the object name; send_response() emits a JSON
// object section named "result" with the fields "topic", "push_endpoint"
// and "args".
363 class RGWPSGetSub_ObjStore_S3
: public RGWPSGetSubOp
{
365 explicit RGWPSGetSub_ObjStore_S3() {}
367 int get_params() override
{
368 sub_name
= s
->object
.name
;
372 void send_response() override
{
374 set_req_state_err(s
, op_ret
);
377 end_header(s
, this, "application/json");
// Only selected fields of the configuration are exposed, not the whole
// `result` object.
384 Formatter::ObjectSection
section(*s
->formatter
, "result");
385 encode_json("topic", result
.topic
, s
->formatter
);
386 encode_json("push_endpoint", result
.dest
.push_endpoint
, s
->formatter
);
387 encode_json("args", result
.dest
.push_endpoint_args
, s
->formatter
);
389 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// Base operation for removing a subscription. execute() is defined out of
// line; subclasses provide get_params().
393 class RGWPSDeleteSubOp
: public RGWDefaultResponseOp
{
397 std::unique_ptr
<RGWUserPubSub
> ups
;
400 RGWPSDeleteSubOp() {}
402 int verify_permission() override
{
405 void pre_exec() override
{
406 rgw_bucket_object_pre_exec(s
);
408 void execute() override
;
// Op identification: name string, op type and op mask (a delete op).
410 const char* name() const override
{ return "pubsub_subscription_delete"; }
411 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_SUB_DELETE
; }
412 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_DELETE
; }
// Pure virtual: the result is stored in op_ret by execute().
413 virtual int get_params() = 0;
// Removes the subscription: stores the get_params() result in op_ret, builds
// a RGWUserPubSub for the request owner, obtains the handle for sub_name and
// calls unsubscribe(topic_name). The level-20 log line reports a failure
// together with op_ret.
416 void RGWPSDeleteSubOp::execute()
418 op_ret
= get_params();
422 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
423 auto sub
= ups
->get_sub(sub_name
);
424 op_ret
= sub
->unsubscribe(topic_name
);
426 ldout(s
->cct
, 20) << "failed to remove subscription, ret=" << op_ret
<< dendl
;
// REST front end for subscription removal. get_params() takes the
// subscription name from the object name and the topic from the "topic"
// request argument.
431 class RGWPSDeleteSub_ObjStore_S3
: public RGWPSDeleteSubOp
{
433 explicit RGWPSDeleteSub_ObjStore_S3() {}
435 int get_params() override
{
436 sub_name
= s
->object
.name
;
// NOTE(review): unlike the create path, "topic" is read here without an
// `exists` flag, so a missing argument is not detected at this point -
// confirm that unsubscribe() copes with whatever get() yields in that case.
437 topic_name
= s
->info
.args
.get("topic");
// Base operation for acknowledging an event of a subscription. execute() is
// defined out of line; subclasses provide get_params().
442 class RGWPSAckSubEventOp
: public RGWDefaultResponseOp
{
446 std::unique_ptr
<RGWUserPubSub
> ups
;
449 RGWPSAckSubEventOp() {}
451 int verify_permission() override
{
454 void pre_exec() override
{
455 rgw_bucket_object_pre_exec(s
);
457 void execute() override
;
// Op identification: name string, op type and op mask (a write op).
459 const char* name() const override
{ return "pubsub_subscription_ack"; }
460 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_SUB_ACK
; }
461 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
// Pure virtual: the result is stored in op_ret by execute().
462 virtual int get_params() = 0;
// Acknowledges an event: stores the get_params() result in op_ret, builds a
// RGWUserPubSub for the request owner, obtains the handle for sub_name and
// calls remove_event(event_id). The level-20 log line reports a failure
// together with op_ret.
465 void RGWPSAckSubEventOp::execute()
467 op_ret
= get_params();
471 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
472 auto sub
= ups
->get_sub(sub_name
);
473 op_ret
= sub
->remove_event(event_id
);
475 ldout(s
->cct
, 20) << "failed to ack event, ret=" << op_ret
<< dendl
;
// REST front end for event acknowledgement. get_params() takes the
// subscription name from the object name and the event id from the
// "event-id" request argument.
480 class RGWPSAckSubEvent_ObjStore_S3
: public RGWPSAckSubEventOp
{
482 explicit RGWPSAckSubEvent_ObjStore_S3() {}
484 int get_params() override
{
485 sub_name
= s
->object
.name
;
// "event-id" is a required argument; `exists` records whether it was supplied.
489 event_id
= s
->info
.args
.get("event-id", &exists
);
491 ldout(s
->cct
, 20) << "ERROR: missing required param 'event-id' for request" << dendl
;
// Base operation for pulling (listing) the events of a subscription. The
// listing is kept in `result`; subclasses provide get_params().
498 class RGWPSPullSubEventsOp
: public RGWOp
{
503 std::unique_ptr
<RGWUserPubSub
> ups
;
504 RGWUserPubSub::Sub::list_events_result result
;
507 RGWPSPullSubEventsOp() {}
509 int verify_permission() override
{
512 void pre_exec() override
{
513 rgw_bucket_object_pre_exec(s
);
515 void execute() override
;
// Op identification: name string, op type and op mask (a read op).
517 const char* name() const override
{ return "pubsub_subscription_pull"; }
518 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_SUB_PULL
; }
519 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
// Pure virtual: the result is stored in op_ret by execute().
520 virtual int get_params() = 0;
// Pulls events: stores the get_params() result in op_ret, builds a
// RGWUserPubSub for the request owner, obtains the handle for sub_name and
// calls list_events(marker, max_entries, &result).
523 void RGWPSPullSubEventsOp::execute()
525 op_ret
= get_params();
529 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
530 auto sub
= ups
->get_sub(sub_name
);
531 op_ret
= sub
->list_events(marker
, max_entries
, &result
);
// The message names the call that failed (list_events), so it can be told
// apart from the get-subscription op in the logs.
533 ldout(s
->cct
, 20) << "failed to get events from subscription, ret=" << op_ret
<< dendl
;
// REST front end for pulling subscription events. get_params() reads the
// subscription name from the object name, the "marker" argument, and the
// "max-entries" argument as an integer; send_response() emits `result` as
// JSON under the key "result".
538 class RGWPSPullSubEvents_ObjStore_S3
: public RGWPSPullSubEventsOp
{
540 explicit RGWPSPullSubEvents_ObjStore_S3() {}
542 int get_params() override
{
543 sub_name
= s
->object
.name
;
544 marker
= s
->info
.args
.get("marker");
// Value passed to get_int() as the default for "max-entries".
// NOTE(review): a #define inside a function body is not scoped to it; the
// macro stays defined for the rest of the translation unit.
545 #define DEFAULT_MAX_ENTRIES 100
546 int ret
= s
->info
.args
.get_int("max-entries", &max_entries
, DEFAULT_MAX_ENTRIES
);
548 ldout(s
->cct
, 20) << "failed to parse 'max-entries' param" << dendl
;
554 void send_response() override
{
556 set_req_state_err(s
, op_ret
);
559 end_header(s
, this, "application/json");
565 encode_json("result", result
, s
->formatter
);
566 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// REST handler for the subscriptions resource. It overrides
// init_permissions(), read_permissions() and supports_quota(), and maps
// op_get/op_put/op_delete/op_post to the subscription operations.
570 class RGWHandler_REST_PSSub_S3
: public RGWHandler_REST_S3
{
572 int init_permissions(RGWOp
* op
) override
{
576 int read_permissions(RGWOp
* op
) override
{
579 bool supports_quota() override
{
// GET: the "events" request argument selects the pull-events op; without it
// the get-subscription op is returned.
582 RGWOp
*op_get() override
{
583 if (s
->object
.empty()) {
586 if (s
->info
.args
.exists("events")) {
587 return new RGWPSPullSubEvents_ObjStore_S3();
589 return new RGWPSGetSub_ObjStore_S3();
// PUT: a create op is returned only when an object (subscription) name is
// present.
591 RGWOp
*op_put() override
{
592 if (!s
->object
.empty()) {
593 return new RGWPSCreateSub_ObjStore_S3();
// DELETE: a delete op is returned only when an object (subscription) name is
// present.
597 RGWOp
*op_delete() override
{
598 if (!s
->object
.empty()) {
599 return new RGWPSDeleteSub_ObjStore_S3();
// POST: the "ack" request argument selects the ack-event op.
603 RGWOp
*op_post() override
{
604 if (s
->info
.args
.exists("ack")) {
605 return new RGWPSAckSubEvent_ObjStore_S3();
610 explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
611 virtual ~RGWHandler_REST_PSSub_S3() {}
// Splits a notification path of the form "<type>/<bucket name>" at the first
// '/'. The type part is compared against "bucket"; the remainder after the
// '/' is written to *bucket_name.
615 static int notif_bucket_path(const string
& path
, string
*bucket_name
)
620 size_t pos
= path
.find('/');
621 if (pos
== string::npos
) {
// NOTE(review): find() yields either npos (handled above) or an index below
// path.size(), so this comparison looks unable to be true - confirm intent.
624 if (pos
>= path
.size()) {
628 string type
= path
.substr(0, pos
);
629 if (type
!= "bucket") {
// NOTE(review): for a path ending in '/' (e.g. "bucket/") the substring is
// empty, so *bucket_name may be set to an empty string - verify callers.
633 *bucket_name
= path
.substr(pos
+ 1);
// Base operation for creating a bucket notification. `events` holds the
// event names (compared case-insensitively via ltstr_nocase) and
// `bucket_info` the bucket loaded by verify_permission(); execute() is
// defined out of line and subclasses provide get_params().
637 class RGWPSCreateNotifOp
: public RGWDefaultResponseOp
{
639 std::unique_ptr
<RGWUserPubSub
> ups
;
641 set
<string
, ltstr_nocase
> events
;
644 RGWBucketInfo bucket_info
;
647 RGWPSCreateNotifOp() {}
// Resolves the request parameters, loads bucket_info for bucket_name in the
// owner's tenant and compares the bucket owner with the request owner.
649 int verify_permission() override
{
650 int ret
= get_params();
655 ret
= store
->get_bucket_info(*s
->sysobj_ctx
, s
->owner
.get_id().tenant
, bucket_name
,
656 bucket_info
, nullptr, nullptr);
661 if (bucket_info
.owner
!= s
->owner
.get_id()) {
// The message names the object this op creates (a notification, not a topic).
662 ldout(s
->cct
, 20) << "user doesn't own bucket, cannot create notification" << dendl
;
667 void pre_exec() override
{
668 rgw_bucket_object_pre_exec(s
);
670 void execute() override
;
// Op identification: name string, op type and op mask (a write op).
672 const char* name() const override
{ return "pubsub_notification_create"; }
673 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_CREATE
; }
674 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
// Pure virtual: called by both verify_permission() and execute().
675 virtual int get_params() = 0;
// Creates the notification: stores the get_params() result in op_ret, builds
// a RGWUserPubSub for the request owner, obtains the pubsub handle for
// bucket_info.bucket and calls create_notification(topic_name, events). The
// level-20 log line reports a failure together with op_ret.
678 void RGWPSCreateNotifOp::execute()
680 op_ret
= get_params();
685 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
686 auto b
= ups
->get_bucket(bucket_info
.bucket
);
687 op_ret
= b
->create_notification(topic_name
, events
);
689 ldout(s
->cct
, 20) << "failed to create notification, ret=" << op_ret
<< dendl
;
// REST front end for notification creation. get_params() reads the "topic"
// and "events" request arguments, splits the events string on ',' into
// `events`, and derives bucket_name from the object name via
// notif_bucket_path().
694 class RGWPSCreateNotif_ObjStore_S3
: public RGWPSCreateNotifOp
{
696 explicit RGWPSCreateNotif_ObjStore_S3() {}
698 int get_params() override
{
// `exists` records whether the "topic" argument was supplied.
700 topic_name
= s
->info
.args
.get("topic", &exists
);
702 ldout(s
->cct
, 20) << "param 'topic' not provided" << dendl
;
706 string events_str
= s
->info
.args
.get("events", &exists
);
708 get_str_set(events_str
, ",", events
);
// The return value of notif_bucket_path() is the result of get_params().
710 return notif_bucket_path(s
->object
.name
, &bucket_name
);
// Base operation for removing a bucket notification. `bucket_info` holds the
// bucket loaded by verify_permission(); execute() is defined out of line and
// subclasses provide get_params().
714 class RGWPSDeleteNotifOp
: public RGWDefaultResponseOp
{
716 std::unique_ptr
<RGWUserPubSub
> ups
;
719 RGWBucketInfo bucket_info
;
722 RGWPSDeleteNotifOp() {}
// Resolves the request parameters, loads bucket_info for bucket_name in the
// owner's tenant and compares the bucket owner with the request owner.
724 int verify_permission() override
{
725 int ret
= get_params();
730 ret
= store
->get_bucket_info(*s
->sysobj_ctx
, s
->owner
.get_id().tenant
, bucket_name
,
731 bucket_info
, nullptr, nullptr);
736 if (bucket_info
.owner
!= s
->owner
.get_id()) {
// The message describes this op (notification removal) rather than repeating
// the text of the create path.
737 ldout(s
->cct
, 20) << "user doesn't own bucket, cannot remove notification" << dendl
;
742 void pre_exec() override
{
743 rgw_bucket_object_pre_exec(s
);
745 void execute() override
;
// Op identification: name string, op type and op mask (a delete op).
747 const char* name() const override
{ return "pubsub_notification_delete"; }
748 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_DELETE
; }
749 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_DELETE
; }
// Pure virtual: called by both verify_permission() and execute().
750 virtual int get_params() = 0;
// Removes the notification: stores the get_params() result in op_ret, builds
// a RGWUserPubSub for the request owner, obtains the pubsub handle for
// bucket_info.bucket and calls remove_notification(topic_name). The level-20
// log line reports a failure together with op_ret.
753 void RGWPSDeleteNotifOp::execute()
755 op_ret
= get_params();
760 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
761 auto b
= ups
->get_bucket(bucket_info
.bucket
);
762 op_ret
= b
->remove_notification(topic_name
);
764 ldout(s
->cct
, 20) << "failed to remove notification, ret=" << op_ret
<< dendl
;
// REST front end for notification removal. It derives from
// RGWPSDeleteNotifOp so that the op handed out for DELETE requests runs
// remove_notification() and reports the delete op name/type/mask; deriving
// from the create op would make a DELETE request create a notification.
// get_params() reads the "topic" request argument and derives bucket_name
// from the object name via notif_bucket_path().
769 class RGWPSDeleteNotif_ObjStore_S3
: public RGWPSDeleteNotifOp
{
771 explicit RGWPSDeleteNotif_ObjStore_S3() {}
773 int get_params() override
{
// `exists` records whether the "topic" argument was supplied.
775 topic_name
= s
->info
.args
.get("topic", &exists
);
777 ldout(s
->cct
, 20) << "param 'topic' not provided" << dendl
;
// The return value of notif_bucket_path() is the result of get_params().
780 return notif_bucket_path(s
->object
.name
, &bucket_name
);
// Base operation for listing the notifications (topics) of a bucket. The
// listing is kept in `result` and `bucket_info` holds the bucket loaded by
// verify_permission(); execute() is defined out of line and subclasses
// provide get_params().
784 class RGWPSListNotifsOp
: public RGWOp
{
787 RGWBucketInfo bucket_info
;
788 std::unique_ptr
<RGWUserPubSub
> ups
;
789 rgw_pubsub_bucket_topics result
;
793 RGWPSListNotifsOp() {}
// Resolves the request parameters, loads bucket_info for bucket_name in the
// owner's tenant and compares the bucket owner with the request owner.
795 int verify_permission() override
{
796 int ret
= get_params();
801 ret
= store
->get_bucket_info(*s
->sysobj_ctx
, s
->owner
.get_id().tenant
, bucket_name
,
802 bucket_info
, nullptr, nullptr);
807 if (bucket_info
.owner
!= s
->owner
.get_id()) {
// The message describes this op (listing) rather than repeating the text of
// the create path.
808 ldout(s
->cct
, 20) << "user doesn't own bucket, cannot list notifications" << dendl
;
814 void pre_exec() override
{
815 rgw_bucket_object_pre_exec(s
);
817 void execute() override
;
// Op identification: name string, op type and op mask (a read op).
819 const char* name() const override
{ return "pubsub_notifications_list"; }
820 virtual RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_LIST
; }
821 virtual uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
// Pure virtual: called by verify_permission().
822 virtual int get_params() = 0;
// Builds a RGWUserPubSub for the request owner, obtains the pubsub handle
// for bucket_info.bucket and fills `result` through get_topics(); the return
// code is kept in op_ret and reported in the level-20 log line.
825 void RGWPSListNotifsOp::execute()
827 ups
= make_unique
<RGWUserPubSub
>(store
, s
->owner
.get_id());
828 auto b
= ups
->get_bucket(bucket_info
.bucket
);
829 op_ret
= b
->get_topics(&result
);
831 ldout(s
->cct
, 20) << "failed to get topics, ret=" << op_ret
<< dendl
;
// REST front end for the notification listing. get_params() derives
// bucket_name from the object name via notif_bucket_path(); send_response()
// emits `result` as JSON under the key "result".
837 class RGWPSListNotifs_ObjStore_S3
: public RGWPSListNotifsOp
{
839 explicit RGWPSListNotifs_ObjStore_S3() {}
841 int get_params() override
{
842 return notif_bucket_path(s
->object
.name
, &bucket_name
);
845 void send_response() override
{
847 set_req_state_err(s
, op_ret
);
850 end_header(s
, this, "application/json");
856 encode_json("result", result
, s
->formatter
);
857 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// REST handler for the notifications resource. It overrides
// init_permissions(), read_permissions() and supports_quota(), and maps
// op_get/op_put/op_delete to the notification operations.
862 class RGWHandler_REST_PSNotifs_S3
: public RGWHandler_REST_S3
{
864 int init_permissions(RGWOp
* op
) override
{
868 int read_permissions(RGWOp
* op
) override
{
871 bool supports_quota() override
{
// GET: returns the list-notifications op.
874 RGWOp
*op_get() override
{
875 if (s
->object
.empty()) {
878 return new RGWPSListNotifs_ObjStore_S3();
// PUT: a create op is returned only when an object name is present.
880 RGWOp
*op_put() override
{
881 if (!s
->object
.empty()) {
882 return new RGWPSCreateNotif_ObjStore_S3();
// DELETE: a delete op is returned only when an object name is present.
886 RGWOp
*op_delete() override
{
887 if (!s
->object
.empty()) {
888 return new RGWPSDeleteNotif_ObjStore_S3();
893 explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
894 virtual ~RGWHandler_REST_PSNotifs_S3() {}
// Selects the REST handler for a pubsub request. The request state is first
// initialized through RGWHandler_REST_S3::init_from_header() with
// RGW_FORMAT_JSON; the handler is then chosen by comparing
// s->init_state.url_bucket with "topics", "subscriptions" and
// "notifications". `handler` starts out as nullptr, and the chosen handler
// type (or "<null>") is logged at level 20.
898 RGWHandler_REST
* RGWRESTMgr_PubSub_S3::get_handler(struct req_state
* const s
,
899 const rgw::auth::StrategyRegistry
& auth_registry
,
900 const std::string
& frontend_prefix
)
903 RGWHandler_REST_S3::init_from_header(s
,
904 RGW_FORMAT_JSON
, true);
909 RGWHandler_REST
*handler
= nullptr;
911 if (s
->init_state
.url_bucket
== "topics") {
912 handler
= new RGWHandler_REST_PSTopic_S3(auth_registry
);
915 if (s
->init_state
.url_bucket
== "subscriptions") {
916 handler
= new RGWHandler_REST_PSSub_S3(auth_registry
);
919 if (s
->init_state
.url_bucket
== "notifications") {
920 handler
= new RGWHandler_REST_PSNotifs_S3(auth_registry
);
// typeid is applied to the dereferenced pointer only when handler is non-null.
923 ldout(s
->cct
, 20) << __func__
<< " handler=" << (handler
? typeid(*handler
).name() : "<null>") << dendl
;