1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
13 #include "rgw_rest_s3.h"
16 #include "services/svc_zone.h"
17 #include "rgw_sal_rados.h"
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
22 // command: PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&<arg1>=<value1>]]
23 class RGWPSCreateTopic_ObjStore
: public RGWPSCreateTopicOp
{
25 int get_params() override
{
27 topic_name
= s
->object
->get_name();
29 opaque_data
= s
->info
.args
.get("OpaqueData");
30 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
32 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
35 dest
.push_endpoint_args
= s
->info
.args
.get_str();
36 // dest object only stores endpoint info
37 // bucket to store events/records will be set only when subscription is created
38 dest
.bucket_name
= "";
40 dest
.arn_topic
= topic_name
;
41 // the topic ARN will be sent in the reply
42 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
43 store
->svc()->zone
->get_zonegroup().get_name(),
44 s
->user
->get_tenant(), topic_name
);
45 topic_arn
= arn
.to_string();
49 void send_response() override
{
51 set_req_state_err(s
, op_ret
);
54 end_header(s
, this, "application/json");
61 Formatter::ObjectSection
section(*s
->formatter
, "result");
62 encode_json("arn", topic_arn
, s
->formatter
);
64 rgw_flush_formatter_and_reset(s
, s
->formatter
);
68 // command: GET /topics
69 class RGWPSListTopics_ObjStore
: public RGWPSListTopicsOp
{
71 void send_response() override
{
73 set_req_state_err(s
, op_ret
);
76 end_header(s
, this, "application/json");
82 encode_json("result", result
, s
->formatter
);
83 rgw_flush_formatter_and_reset(s
, s
->formatter
);
87 // command: GET /topics/<topic-name>
88 class RGWPSGetTopic_ObjStore
: public RGWPSGetTopicOp
{
90 int get_params() override
{
91 topic_name
= s
->object
->get_name();
95 void send_response() override
{
97 set_req_state_err(s
, op_ret
);
100 end_header(s
, this, "application/json");
106 encode_json("result", result
, s
->formatter
);
107 rgw_flush_formatter_and_reset(s
, s
->formatter
);
111 // command: DELETE /topics/<topic-name>
112 class RGWPSDeleteTopic_ObjStore
: public RGWPSDeleteTopicOp
{
114 int get_params() override
{
115 topic_name
= s
->object
->get_name();
120 // ceph specific topics handler factory
121 class RGWHandler_REST_PSTopic
: public RGWHandler_REST_S3
{
123 int init_permissions(RGWOp
* op
, optional_yield
) override
{
127 int read_permissions(RGWOp
* op
, optional_yield
) override
{
131 bool supports_quota() override
{
135 RGWOp
*op_get() override
{
136 if (s
->init_state
.url_bucket
.empty()) {
139 if (s
->object
->empty()) {
140 return new RGWPSListTopics_ObjStore();
142 return new RGWPSGetTopic_ObjStore();
144 RGWOp
*op_put() override
{
145 if (!s
->object
->empty()) {
146 return new RGWPSCreateTopic_ObjStore();
150 RGWOp
*op_delete() override
{
151 if (!s
->object
->empty()) {
152 return new RGWPSDeleteTopic_ObjStore();
157 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
158 virtual ~RGWHandler_REST_PSTopic() = default;
161 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
162 class RGWPSCreateSub_ObjStore
: public RGWPSCreateSubOp
{
164 int get_params() override
{
165 sub_name
= s
->object
->get_name();
168 topic_name
= s
->info
.args
.get("topic", &exists
);
170 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
174 const auto psmodule
= static_cast<RGWPSSyncModuleInstance
*>(store
->getRados()->get_sync_module().get());
175 const auto& conf
= psmodule
->get_effective_conf();
177 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
178 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
181 dest
.push_endpoint_args
= s
->info
.args
.get_str();
182 dest
.bucket_name
= string(conf
["data_bucket_prefix"]) + s
->owner
.get_id().to_str() + "-" + topic_name
;
183 dest
.oid_prefix
= string(conf
["data_oid_prefix"]) + sub_name
+ "/";
184 dest
.arn_topic
= topic_name
;
190 // command: GET /subscriptions/<sub-name>
191 class RGWPSGetSub_ObjStore
: public RGWPSGetSubOp
{
193 int get_params() override
{
194 sub_name
= s
->object
->get_name();
197 void send_response() override
{
199 set_req_state_err(s
, op_ret
);
202 end_header(s
, this, "application/json");
208 encode_json("result", result
, s
->formatter
);
209 rgw_flush_formatter_and_reset(s
, s
->formatter
);
213 // command: DELETE /subscriptions/<sub-name>
214 class RGWPSDeleteSub_ObjStore
: public RGWPSDeleteSubOp
{
216 int get_params() override
{
217 sub_name
= s
->object
->get_name();
218 topic_name
= s
->info
.args
.get("topic");
223 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
224 class RGWPSAckSubEvent_ObjStore
: public RGWPSAckSubEventOp
{
226 explicit RGWPSAckSubEvent_ObjStore() {}
228 int get_params() override
{
229 sub_name
= s
->object
->get_name();
233 event_id
= s
->info
.args
.get("event-id", &exists
);
235 ldout(s
->cct
, 1) << "missing required param 'event-id'" << dendl
;
242 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
243 class RGWPSPullSubEvents_ObjStore
: public RGWPSPullSubEventsOp
{
245 int get_params() override
{
246 sub_name
= s
->object
->get_name();
247 marker
= s
->info
.args
.get("marker");
248 const int ret
= s
->info
.args
.get_int("max-entries", &max_entries
,
249 RGWPubSub::Sub::DEFAULT_MAX_EVENTS
);
251 ldout(s
->cct
, 1) << "failed to parse 'max-entries' param" << dendl
;
257 void send_response() override
{
259 set_req_state_err(s
, op_ret
);
262 end_header(s
, this, "application/json");
268 encode_json("result", *sub
, s
->formatter
);
269 rgw_flush_formatter_and_reset(s
, s
->formatter
);
273 // subscriptions handler factory
274 class RGWHandler_REST_PSSub
: public RGWHandler_REST_S3
{
276 int init_permissions(RGWOp
* op
, optional_yield
) override
{
280 int read_permissions(RGWOp
* op
, optional_yield
) override
{
283 bool supports_quota() override
{
286 RGWOp
*op_get() override
{
287 if (s
->object
->empty()) {
290 if (s
->info
.args
.exists("events")) {
291 return new RGWPSPullSubEvents_ObjStore();
293 return new RGWPSGetSub_ObjStore();
295 RGWOp
*op_put() override
{
296 if (!s
->object
->empty()) {
297 return new RGWPSCreateSub_ObjStore();
301 RGWOp
*op_delete() override
{
302 if (!s
->object
->empty()) {
303 return new RGWPSDeleteSub_ObjStore();
307 RGWOp
*op_post() override
{
308 if (s
->info
.args
.exists("ack")) {
309 return new RGWPSAckSubEvent_ObjStore();
314 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
315 virtual ~RGWHandler_REST_PSSub() = default;
319 // extract bucket name from ceph specific notification command, with the format:
320 // /notifications/bucket/<bucket-name>
321 int notif_bucket_path(const string
& path
, std::string
& bucket_name
) {
325 size_t pos
= path
.find('/');
326 if (pos
== string::npos
) {
329 if (pos
>= path
.size()) {
333 string type
= path
.substr(0, pos
);
334 if (type
!= "bucket") {
338 bucket_name
= path
.substr(pos
+ 1);
343 // command (ceph specific): PUT /notifications/bucket/<bucket name>?topic=<topic name>[&events=<event>[,<event>]]
344 class RGWPSCreateNotif_ObjStore
: public RGWPSCreateNotifOp
{
346 std::string topic_name
;
347 rgw::notify::EventTypeList events
;
349 int get_params() override
{
351 topic_name
= s
->info
.args
.get("topic", &exists
);
353 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
357 std::string events_str
= s
->info
.args
.get("events", &exists
);
359 // if no events are provided, we notify on all of them
360 events_str
= "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
362 rgw::notify::from_string_list(events_str
, events
);
363 if (std::find(events
.begin(), events
.end(), rgw::notify::UnknownEvent
) != events
.end()) {
364 ldout(s
->cct
, 1) << "invalid event type in list: " << events_str
<< dendl
;
367 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
371 const char* name() const override
{ return "pubsub_notification_create"; }
372 void execute(optional_yield y
) override
;
375 void RGWPSCreateNotif_ObjStore::execute(optional_yield y
)
377 ps
.emplace(store
, s
->owner
.get_id().tenant
);
379 auto b
= ps
->get_bucket(bucket_info
.bucket
);
380 op_ret
= b
->create_notification(topic_name
, events
, y
);
382 ldout(s
->cct
, 1) << "failed to create notification for topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
385 ldout(s
->cct
, 20) << "successfully created notification for topic '" << topic_name
<< "'" << dendl
;
388 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
389 class RGWPSDeleteNotif_ObjStore
: public RGWPSDeleteNotifOp
{
391 std::string topic_name
;
393 int get_params() override
{
395 topic_name
= s
->info
.args
.get("topic", &exists
);
397 ldout(s
->cct
, 1) << "missing required param 'topic'" << dendl
;
400 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
404 void execute(optional_yield y
) override
;
405 const char* name() const override
{ return "pubsub_notification_delete"; }
408 void RGWPSDeleteNotif_ObjStore::execute(optional_yield y
) {
409 op_ret
= get_params();
414 ps
.emplace(store
, s
->owner
.get_id().tenant
);
415 auto b
= ps
->get_bucket(bucket_info
.bucket
);
416 op_ret
= b
->remove_notification(topic_name
, y
);
418 ldout(s
->cct
, 1) << "failed to remove notification from topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
421 ldout(s
->cct
, 20) << "successfully removed notification from topic '" << topic_name
<< "'" << dendl
;
424 // command: GET /notifications/bucket/<bucket>
425 class RGWPSListNotifs_ObjStore
: public RGWPSListNotifsOp
{
427 rgw_pubsub_bucket_topics result
;
429 int get_params() override
{
430 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
434 void execute(optional_yield y
) override
;
435 void send_response() override
{
437 set_req_state_err(s
, op_ret
);
440 end_header(s
, this, "application/json");
445 encode_json("result", result
, s
->formatter
);
446 rgw_flush_formatter_and_reset(s
, s
->formatter
);
448 const char* name() const override
{ return "pubsub_notifications_list"; }
451 void RGWPSListNotifs_ObjStore::execute(optional_yield y
)
453 ps
.emplace(store
, s
->owner
.get_id().tenant
);
454 auto b
= ps
->get_bucket(bucket_info
.bucket
);
455 op_ret
= b
->get_topics(&result
);
457 ldout(s
->cct
, 1) << "failed to get topics, ret=" << op_ret
<< dendl
;
462 // ceph specific notification handler factory
463 class RGWHandler_REST_PSNotifs
: public RGWHandler_REST_S3
{
465 int init_permissions(RGWOp
* op
, optional_yield
) override
{
469 int read_permissions(RGWOp
* op
, optional_yield
) override
{
472 bool supports_quota() override
{
475 RGWOp
*op_get() override
{
476 if (s
->object
->empty()) {
479 return new RGWPSListNotifs_ObjStore();
481 RGWOp
*op_put() override
{
482 if (!s
->object
->empty()) {
483 return new RGWPSCreateNotif_ObjStore();
487 RGWOp
*op_delete() override
{
488 if (!s
->object
->empty()) {
489 return new RGWPSDeleteNotif_ObjStore();
494 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
495 virtual ~RGWHandler_REST_PSNotifs() = default;
498 // factory for ceph specific PubSub REST handlers
499 RGWHandler_REST
* RGWRESTMgr_PubSub::get_handler(rgw::sal::RGWRadosStore
*store
,
500 struct req_state
* const s
,
501 const rgw::auth::StrategyRegistry
& auth_registry
,
502 const std::string
& frontend_prefix
)
504 if (RGWHandler_REST_S3::init_from_header(store
, s
, RGW_FORMAT_JSON
, true) < 0) {
508 RGWHandler_REST
* handler
{nullptr};
510 // ceph specific PubSub API: topics/subscriptions/notifications are reserved bucket names
511 // this API is available only on RGWs that belong to a pubsub zone
512 if (s
->init_state
.url_bucket
== "topics") {
513 handler
= new RGWHandler_REST_PSTopic(auth_registry
);
514 } else if (s
->init_state
.url_bucket
== "subscriptions") {
515 handler
= new RGWHandler_REST_PSSub(auth_registry
);
516 } else if (s
->init_state
.url_bucket
== "notifications") {
517 handler
= new RGWHandler_REST_PSNotifs(auth_registry
);
518 } else if (s
->info
.args
.exists("notification")) {
519 const int ret
= RGWHandler_REST::allocate_formatter(s
, RGW_FORMAT_XML
, true);
521 handler
= new RGWHandler_REST_PSNotifs_S3(auth_registry
);
525 ldout(s
->cct
, 20) << __func__
<< " handler=" << (handler
? typeid(*handler
).name() : "<null>") << dendl
;