1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
13 #include "rgw_rest_s3.h"
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
20 // command: PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&<arg1>=<value1>]]
// REST flavor of the create-topic op: takes the topic name and the push
// destination from the request, builds the topic ARN and sends it back as JSON
21 class RGWPSCreateTopic_ObjStore
: public RGWPSCreateTopicOp
{
// fill topic_name, dest and topic_arn from the request
23 int get_params() override
{
// the topic name is the object part of the request
25 topic_name
= s
->object
.name
;
// the endpoint comes from the "push-endpoint" request argument
27 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// whatever get_str() returns for the request arguments is kept alongside the endpoint
28 dest
.push_endpoint_args
= s
->info
.args
.get_str();
29 // dest object only stores endpoint info
30 // bucket to store events/records will be set only when subscription is created
31 dest
.bucket_name
= "";
33 dest
.arn_topic
= topic_name
;
34 // the topic ARN will be sent in the reply
// built from: partition aws, service sns, the zonegroup name,
// the tenant of the requesting user and the topic name
35 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
36 store
->svc
.zone
->get_zonegroup().get_name(),
37 s
->user
->user_id
.tenant
, topic_name
);
38 topic_arn
= arn
.to_string();
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then "arn" is encoded inside a "result" section
42 void send_response() override
{
44 set_req_state_err(s
, op_ret
);
47 end_header(s
, this, "application/json");
// `section` is an ObjectSection named "result" wrapping the encoded ARN
54 Formatter::ObjectSection
section(*s
->formatter
, "result");
55 encode_json("arn", topic_arn
, s
->formatter
);
57 rgw_flush_formatter_and_reset(s
, s
->formatter
);
61 // command: GET /topics
// REST flavor of the list-topics op: only the reply formatting is defined here
62 class RGWPSListTopics_ObjStore
: public RGWPSListTopicsOp
{
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then `result` is encoded under the key "result"
64 void send_response() override
{
66 set_req_state_err(s
, op_ret
);
69 end_header(s
, this, "application/json");
75 encode_json("result", result
, s
->formatter
);
76 rgw_flush_formatter_and_reset(s
, s
->formatter
);
80 // command: GET /topics/<topic-name>
// REST flavor of the get-topic op: reads the topic name from the request and
// formats the reply
81 class RGWPSGetTopic_ObjStore
: public RGWPSGetTopicOp
{
// the topic name is the object part of the request
83 int get_params() override
{
84 topic_name
= s
->object
.name
;
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then `result` is encoded under the key "result"
88 void send_response() override
{
90 set_req_state_err(s
, op_ret
);
93 end_header(s
, this, "application/json");
99 encode_json("result", result
, s
->formatter
);
100 rgw_flush_formatter_and_reset(s
, s
->formatter
);
104 // command: DELETE /topics/<topic-name>
// REST flavor of the delete-topic op: only parameter extraction is defined here
105 class RGWPSDeleteTopic_ObjStore
: public RGWPSDeleteTopicOp
{
// the topic name is the object part of the request
107 int get_params() override
{
108 topic_name
= s
->object
.name
;
113 // ceph specific topics handler factory
// maps the HTTP verb of a /topics request to the matching topic op
114 class RGWHandler_REST_PSTopic
: public RGWHandler_REST_S3
{
// permission and quota hooks of the S3 handler are overridden for this API
116 int init_permissions(RGWOp
* op
) override
{
120 int read_permissions(RGWOp
* op
) override
{
124 bool supports_quota() override
{
// GET: list all topics when no object name is given, otherwise get the named topic
128 RGWOp
*op_get() override
{
129 if (s
->init_state
.url_bucket
.empty()) {
132 if (s
->object
.empty()) {
133 return new RGWPSListTopics_ObjStore();
135 return new RGWPSGetTopic_ObjStore();
// PUT: creating a topic requires an object (topic) name
137 RGWOp
*op_put() override
{
138 if (!s
->object
.empty()) {
139 return new RGWPSCreateTopic_ObjStore();
// DELETE: deleting a topic requires an object (topic) name
143 RGWOp
*op_delete() override
{
144 if (!s
->object
.empty()) {
145 return new RGWPSDeleteTopic_ObjStore();
// the auth registry is passed through to the S3 handler base
150 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
151 virtual ~RGWHandler_REST_PSTopic() = default;
154 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
// REST flavor of the create-subscription op: reads the subscription and topic
// names from the request and derives the destination from the pubsub module conf
155 class RGWPSCreateSub_ObjStore
: public RGWPSCreateSubOp
{
// fill sub_name, topic_name and dest from the request
157 int get_params() override
{
// the subscription name is the object part of the request
158 sub_name
= s
->object
.name
;
// "topic" is a required argument (see the log message below);
// `exists` is handed to get() as an out-parameter
161 topic_name
= s
->info
.args
.get("topic", &exists
);
163 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
// NOTE(review): the sync module is downcast with an unchecked static_cast,
// i.e. this assumes the store's sync module is the pubsub one - confirm
167 const auto psmodule
= static_cast<RGWPSSyncModuleInstance
*>(store
->get_sync_module().get());
168 const auto& conf
= psmodule
->get_effective_conf();
170 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// bucket name: <data_bucket_prefix><owner id>-<topic name>
171 dest
.bucket_name
= string(conf
["data_bucket_prefix"]) + s
->owner
.get_id().to_str() + "-" + topic_name
;
// oid prefix: <data_oid_prefix><subscription name>/
172 dest
.oid_prefix
= string(conf
["data_oid_prefix"]) + sub_name
+ "/";
173 dest
.push_endpoint_args
= s
->info
.args
.get_str();
174 dest
.arn_topic
= topic_name
;
180 // command: GET /subscriptions/<sub-name>
// REST flavor of the get-subscription op: reads the subscription name from the
// request and formats the reply
181 class RGWPSGetSub_ObjStore
: public RGWPSGetSubOp
{
// the subscription name is the object part of the request
183 int get_params() override
{
184 sub_name
= s
->object
.name
;
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then `result` is encoded under the key "result"
187 void send_response() override
{
189 set_req_state_err(s
, op_ret
);
192 end_header(s
, this, "application/json");
198 encode_json("result", result
, s
->formatter
);
199 rgw_flush_formatter_and_reset(s
, s
->formatter
);
203 // command: DELETE /subscriptions/<sub-name>
// REST flavor of the delete-subscription op: only parameter extraction is defined here
204 class RGWPSDeleteSub_ObjStore
: public RGWPSDeleteSubOp
{
// the subscription name is the object part of the request; the "topic"
// argument is read as well, without checking here whether it was supplied
206 int get_params() override
{
207 sub_name
= s
->object
.name
;
208 topic_name
= s
->info
.args
.get("topic");
213 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
// REST flavor of the ack-event op: reads the subscription name and the id of
// the acknowledged event from the request
214 class RGWPSAckSubEvent_ObjStore
: public RGWPSAckSubEventOp
{
216 explicit RGWPSAckSubEvent_ObjStore() {}
// fill sub_name and event_id from the request
218 int get_params() override
{
// the subscription name is the object part of the request
219 sub_name
= s
->object
.name
;
// "event-id" is a required argument (see the log message below);
// `exists` is handed to get() as an out-parameter
223 event_id
= s
->info
.args
.get("event-id", &exists
);
225 ldout(s
->cct
, 1) << "missing required param 'event-id'" << dendl
;
232 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
// REST flavor of the pull-events op: reads the subscription name and the
// paging arguments from the request and formats the reply
233 class RGWPSPullSubEvents_ObjStore
: public RGWPSPullSubEventsOp
{
// fill sub_name, marker and max_entries from the request
235 int get_params() override
{
// the subscription name is the object part of the request
236 sub_name
= s
->object
.name
;
237 marker
= s
->info
.args
.get("marker");
// "max-entries" is parsed as an integer into max_entries, with
// DEFAULT_MAX_EVENTS passed as the default; `ret` holds the parse status
238 const int ret
= s
->info
.args
.get_int("max-entries", &max_entries
,
239 RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS
);
241 ldout(s
->cct
, 1) << "failed to parse 'max-entries' param" << dendl
;
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then the object `sub` points to is
// encoded under the key "result"
247 void send_response() override
{
249 set_req_state_err(s
, op_ret
);
252 end_header(s
, this, "application/json");
258 encode_json("result", *sub
, s
->formatter
);
259 rgw_flush_formatter_and_reset(s
, s
->formatter
);
263 // subscriptions handler factory
// maps the HTTP verb (and arguments) of a /subscriptions request to the matching op
264 class RGWHandler_REST_PSSub
: public RGWHandler_REST_S3
{
// permission and quota hooks of the S3 handler are overridden for this API
266 int init_permissions(RGWOp
* op
) override
{
270 int read_permissions(RGWOp
* op
) override
{
273 bool supports_quota() override
{
// GET: with the "events" argument events are pulled from the subscription,
// otherwise the subscription itself is fetched
276 RGWOp
*op_get() override
{
277 if (s
->object
.empty()) {
280 if (s
->info
.args
.exists("events")) {
281 return new RGWPSPullSubEvents_ObjStore();
283 return new RGWPSGetSub_ObjStore();
// PUT: creating a subscription requires an object (subscription) name
285 RGWOp
*op_put() override
{
286 if (!s
->object
.empty()) {
287 return new RGWPSCreateSub_ObjStore();
// DELETE: deleting a subscription requires an object (subscription) name
291 RGWOp
*op_delete() override
{
292 if (!s
->object
.empty()) {
293 return new RGWPSDeleteSub_ObjStore();
// POST: only acking an event (the "ack" argument) is mapped to an op here
297 RGWOp
*op_post() override
{
298 if (s
->info
.args
.exists("ack")) {
299 return new RGWPSAckSubEvent_ObjStore();
// the auth registry is passed through to the S3 handler base
304 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
305 virtual ~RGWHandler_REST_PSSub() = default;
// extract the bucket name from a ceph specific notification command.
// the request has the format:
//   /notifications/bucket/<bucket-name>
// and "path" is the part that follows "/notifications/", i.e.:
//   bucket/<bucket-name>
//
// path:        object part of the request
// bucket_name: receives everything after the first '/'; written only on success
// returns 0 on success, or -EINVAL when "path" holds no '/', has nothing
// after the first '/', or does not start with "bucket/"
int notif_bucket_path(const std::string& path, std::string& bucket_name) {
  const size_t pos = path.find('/');
  if (pos == std::string::npos) {
    // also covers an empty path
    return -EINVAL;
  }
  // nothing after the slash means there is no bucket name to extract.
  // note that "pos >= path.size()" can never hold after a successful find()
  if (pos + 1 >= path.size()) {
    return -EINVAL;
  }

  // only the "bucket" resource type is supported
  if (path.compare(0, pos, "bucket") != 0) {
    return -EINVAL;
  }

  bucket_name = path.substr(pos + 1);
  return 0;
}
333 // command (ceph specific): PUT /notifications/bucket/<bucket-name>?topic=<topic-name>[&events=<event>[,<event>]]
// REST flavor of the create-notification op: reads the topic, the event types
// and the bucket name from the request
334 class RGWPSCreateNotif_ObjStore
: public RGWPSCreateNotifOp
{
// topic to notify on, and the event types that trigger a notification
336 std::string topic_name
;
337 rgw::notify::EventTypeList events
;
// fill topic_name, events and bucket_name from the request
339 int get_params() override
{
// "topic" is a required argument (see the log message below);
// `exists` is handed to get() as an out-parameter
341 topic_name
= s
->info
.args
.get("topic", &exists
);
343 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
// "events" is optional, `exists` is reused as its out-parameter
347 std::string events_str
= s
->info
.args
.get("events", &exists
);
349 // if no events are provided, we notify on all of them
350 events_str
= "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
// convert the string into `events`; an UnknownEvent entry in the
// converted list is reported as an invalid event type
352 rgw::notify::from_string_list(events_str
, events
);
353 if (std::find(events
.begin(), events
.end(), rgw::notify::UnknownEvent
) != events
.end()) {
354 ldout(s
->cct
, 1) << "invalid event type in list: " << events_str
<< dendl
;
// the bucket name is parsed out of the object part of the request,
// the parser's return value is the result of get_params()
357 return notif_bucket_path(s
->object
.name
, bucket_name
);
361 const char* name() const override
{ return "pubsub_notification_create"; }
// defined out of line
362 void execute() override
;
// create a notification for topic_name/events on the bucket in bucket_info,
// the outcome is stored in op_ret and logged
365 void RGWPSCreateNotif_ObjStore::execute()
// construct `ups` in place for the owner of the request
367 ups
.emplace(store
, s
->owner
.get_id());
369 auto b
= ups
->get_bucket(bucket_info
.bucket
);
370 op_ret
= b
->create_notification(topic_name
, events
);
// failure is logged at level 1 together with op_ret, success at level 20
372 ldout(s
->cct
, 1) << "failed to create notification for topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
375 ldout(s
->cct
, 20) << "successfully created notification for topic '" << topic_name
<< "'" << dendl
;
378 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
// REST flavor of the delete-notification op: reads the topic and the bucket
// name from the request
379 class RGWPSDeleteNotif_ObjStore
: public RGWPSDeleteNotifOp
{
// topic whose notification is removed
381 std::string topic_name
;
// fill topic_name and bucket_name from the request
383 int get_params() override
{
// "topic" is a required argument (see the log message below);
// `exists` is handed to get() as an out-parameter
385 topic_name
= s
->info
.args
.get("topic", &exists
);
387 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
// the bucket name is parsed out of the object part of the request,
// the parser's return value is the result of get_params()
390 return notif_bucket_path(s
->object
.name
, bucket_name
);
// defined out of line
394 void execute() override
;
395 const char* name() const override
{ return "pubsub_notification_delete"; }
// remove the notification of topic_name from the bucket in bucket_info,
// the outcome is stored in op_ret and logged
398 void RGWPSDeleteNotif_ObjStore::execute() {
// the request parameters are read here, as the first step of execute()
399 op_ret
= get_params();
// construct `ups` in place for the owner of the request
404 ups
.emplace(store
, s
->owner
.get_id());
405 auto b
= ups
->get_bucket(bucket_info
.bucket
);
406 op_ret
= b
->remove_notification(topic_name
);
// failure is logged at level 1 together with op_ret, success at level 20
408 ldout(s
->cct
, 1) << "failed to remove notification from topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
411 ldout(s
->cct
, 20) << "successfully removed notification from topic '" << topic_name
<< "'" << dendl
;
414 // command: GET /notifications/bucket/<bucket>
// REST flavor of the list-notifications op: reads the bucket name from the
// request and replies with the topics of that bucket
415 class RGWPSListNotifs_ObjStore
: public RGWPSListNotifsOp
{
// filled by execute(), encoded by send_response()
417 rgw_pubsub_bucket_topics result
;
// the bucket name is parsed out of the object part of the request,
// the parser's return value is the result of get_params()
419 int get_params() override
{
420 return notif_bucket_path(s
->object
.name
, bucket_name
);
// defined out of line
424 void execute() override
;
// reply: op_ret goes into the request state, the header is closed with an
// "application/json" content type, then `result` is encoded under the key "result"
425 void send_response() override
{
427 set_req_state_err(s
, op_ret
);
430 end_header(s
, this, "application/json");
435 encode_json("result", result
, s
->formatter
);
436 rgw_flush_formatter_and_reset(s
, s
->formatter
);
438 const char* name() const override
{ return "pubsub_notifications_list"; }
// read the topics of the bucket in bucket_info into `result`,
// the outcome is stored in op_ret
441 void RGWPSListNotifs_ObjStore::execute()
// construct `ups` in place for the owner of the request
443 ups
.emplace(store
, s
->owner
.get_id());
444 auto b
= ups
->get_bucket(bucket_info
.bucket
);
445 op_ret
= b
->get_topics(&result
);
// failure is logged at level 1 together with op_ret
447 ldout(s
->cct
, 1) << "failed to get topics, ret=" << op_ret
<< dendl
;
452 // ceph specific notification handler factory
// maps the HTTP verb of a /notifications request to the matching notification op
453 class RGWHandler_REST_PSNotifs
: public RGWHandler_REST_S3
{
// permission and quota hooks of the S3 handler are overridden for this API
455 int init_permissions(RGWOp
* op
) override
{
459 int read_permissions(RGWOp
* op
) override
{
462 bool supports_quota() override
{
// GET: list the notifications, the object part of the request is checked first
465 RGWOp
*op_get() override
{
466 if (s
->object
.empty()) {
469 return new RGWPSListNotifs_ObjStore();
// PUT: creating a notification requires an object part in the request
471 RGWOp
*op_put() override
{
472 if (!s
->object
.empty()) {
473 return new RGWPSCreateNotif_ObjStore();
// DELETE: deleting a notification requires an object part in the request
477 RGWOp
*op_delete() override
{
478 if (!s
->object
.empty()) {
479 return new RGWPSDeleteNotif_ObjStore();
// the auth registry is passed through to the S3 handler base
484 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
485 virtual ~RGWHandler_REST_PSNotifs() = default;
488 // factory for ceph specific PubSub REST handlers
// picks the handler by the bucket part of the URL ("topics", "subscriptions"
// or "notifications"), or by the presence of the "notification" argument.
// `handler` starts out as nullptr and is assigned only in those branches
489 RGWHandler_REST
* RGWRESTMgr_PubSub::get_handler(struct req_state
* const s
,
490 const rgw::auth::StrategyRegistry
& auth_registry
,
491 const std::string
& frontend_prefix
)
// the request state is initialized from the header with JSON as the format,
// a negative return value is treated as failure
493 if (RGWHandler_REST_S3::init_from_header(s
, RGW_FORMAT_JSON
, true) < 0) {
497 RGWHandler_REST
* handler
{nullptr};
499 // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
500 // this API is available only on RGW that belong to a pubsub zone
501 if (s
->init_state
.url_bucket
== "topics") {
502 handler
= new RGWHandler_REST_PSTopic(auth_registry
);
503 } else if (s
->init_state
.url_bucket
== "subscriptions") {
504 handler
= new RGWHandler_REST_PSSub(auth_registry
);
505 } else if (s
->init_state
.url_bucket
== "notifications") {
506 handler
= new RGWHandler_REST_PSNotifs(auth_registry
);
507 } else if (s
->info
.args
.exists("notification")) {
// requests carrying the "notification" argument get an XML formatter
// and the RGWHandler_REST_PSNotifs_S3 handler
508 const int ret
= RGWHandler_REST::allocate_formatter(s
, RGW_FORMAT_XML
, true);
510 handler
= new RGWHandler_REST_PSNotifs_S3(auth_registry
);
// log the dynamic type of the chosen handler, or "<null>" when none was chosen
514 ldout(s
->cct
, 20) << __func__
<< " handler=" << (handler
? typeid(*handler
).name() : "<null>") << dendl
;