1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "rgw_rest_pubsub_common.h"
6 #include "rgw_rest_pubsub.h"
7 #include "rgw_sync_module_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_sync_module_pubsub_rest.h"
10 #include "rgw_pubsub.h"
13 #include "rgw_rest_s3.h"
16 #include "services/svc_zone.h"
17 #include "rgw_sal_rados.h"
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
24 // command: PUT /topics/<topic-name>[?OpaqueData=<opaque-data>][&push-endpoint=<endpoint>[&<arg1>=<value1>]]
// REST op for the ceph-specific "create topic" command. Reads the topic
// parameters from the request and replies with the topic ARN as JSON.
25 class RGWPSCreateTopic_ObjStore
: public RGWPSCreateTopicOp
{
// Populate the op from the request: the topic name comes from the object
// name; opaque data and push endpoint come from the query arguments.
27 int get_params() override
{
29 topic_name
= s
->object
->get_name();
31 opaque_data
= s
->info
.args
.get("OpaqueData");
32 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// NOTE(review): the handling of a failed validation is not visible in this view.
34 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
// the whole query string is kept as the endpoint arguments
37 dest
.push_endpoint_args
= s
->info
.args
.get_str();
38 // dest object only stores endpoint info
39 // bucket to store events/records will be set only when subscription is created
40 dest
.bucket_name
= "";
42 dest
.arn_topic
= topic_name
;
43 // the topic ARN will be sent in the reply
// ARN components: aws partition, sns service, zonegroup name, user tenant, topic name
44 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
45 store
->get_zone()->get_zonegroup().get_name(),
46 s
->user
->get_tenant(), topic_name
);
47 topic_arn
= arn
.to_string();
// Emit the reply: report op_ret, end the header with "application/json",
// then write {"result": {"arn": <topic_arn>}} through the request formatter.
51 void send_response() override
{
53 set_req_state_err(s
, op_ret
);
56 end_header(s
, this, "application/json");
63 Formatter::ObjectSection
section(*s
->formatter
, "result");
64 encode_json("arn", topic_arn
, s
->formatter
);
66 rgw_flush_formatter_and_reset(s
, s
->formatter
);
70 // command: GET /topics
// REST op for listing topics; only the response formatting is customized here.
71 class RGWPSListTopics_ObjStore
: public RGWPSListTopicsOp
{
// Report op_ret, end the header with "application/json", then encode the
// op's `result` under the key "result" and flush the formatter.
73 void send_response() override
{
75 set_req_state_err(s
, op_ret
);
78 end_header(s
, this, "application/json");
84 encode_json("result", result
, s
->formatter
);
85 rgw_flush_formatter_and_reset(s
, s
->formatter
);
89 // command: GET /topics/<topic-name>
// REST op for fetching a single topic.
90 class RGWPSGetTopic_ObjStore
: public RGWPSGetTopicOp
{
// The topic name is taken from the request's object name.
92 int get_params() override
{
93 topic_name
= s
->object
->get_name();
// Report op_ret, end the header with "application/json", then encode the
// op's `result` under the key "result" and flush the formatter.
97 void send_response() override
{
99 set_req_state_err(s
, op_ret
);
102 end_header(s
, this, "application/json");
108 encode_json("result", result
, s
->formatter
);
109 rgw_flush_formatter_and_reset(s
, s
->formatter
);
113 // command: DELETE /topics/<topic-name>
// REST op for deleting a topic.
114 class RGWPSDeleteTopic_ObjStore
: public RGWPSDeleteTopicOp
{
// The topic name is taken from the request's object name.
116 int get_params() override
{
117 topic_name
= s
->object
->get_name();
122 // ceph specific topics handler factory
// Handler that maps HTTP verbs on the topics resource to the topic ops
// defined in this file.
123 class RGWHandler_REST_PSTopic
: public RGWHandler_REST_S3
{
// NOTE(review): the bodies of the three overrides below are not visible in this view.
125 int init_permissions(RGWOp
* op
, optional_yield
) override
{
129 int read_permissions(RGWOp
* op
, optional_yield
) override
{
133 bool supports_quota() override
{
// GET: a missing or empty object name selects the list op, otherwise the
// single-topic get op.
// NOTE(review): the branch taken when url_bucket is empty is not visible here.
137 RGWOp
*op_get() override
{
138 if (s
->init_state
.url_bucket
.empty()) {
141 if (s
->object
== nullptr || s
->object
->empty()) {
142 return new RGWPSListTopics_ObjStore();
144 return new RGWPSGetTopic_ObjStore();
// PUT: creates a topic when an object (topic) name is present.
// NOTE(review): unlike op_get, s->object is dereferenced without a null
// check here - confirm it cannot be null on this path.
146 RGWOp
*op_put() override
{
147 if (!s
->object
->empty()) {
148 return new RGWPSCreateTopic_ObjStore();
// DELETE: deletes a topic when an object (topic) name is present.
// NOTE(review): same unchecked s->object dereference as in op_put.
152 RGWOp
*op_delete() override
{
153 if (!s
->object
->empty()) {
154 return new RGWPSDeleteTopic_ObjStore();
// The auth registry is forwarded unchanged to the RGWHandler_REST_S3 base.
159 explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
160 virtual ~RGWHandler_REST_PSTopic() = default;
163 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
// REST op for creating a subscription to a topic.
164 class RGWPSCreateSub_ObjStore
: public RGWPSCreateSubOp
{
// Populate the op from the request: subscription name from the object name,
// topic and push endpoint from the query arguments, and the bucket / oid
// prefix from the pubsub sync module's effective configuration.
166 int get_params() override
{
167 sub_name
= s
->object
->get_name();
// `exists` receives whether the "topic" argument was present.
// NOTE(review): its declaration and the check on it are not visible in this view.
170 topic_name
= s
->info
.args
.get("topic", &exists
);
172 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl
;
// The store's sync module is cast, unchecked, to the pubsub module type.
176 const auto psmodule
= static_cast<RGWPSSyncModuleInstance
*>(store
->get_sync_module().get());
177 const auto& conf
= psmodule
->get_effective_conf();
179 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// NOTE(review): the handling of a failed validation is not visible in this view.
180 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
// the whole query string is kept as the endpoint arguments
183 dest
.push_endpoint_args
= s
->info
.args
.get_str();
// bucket name: <data_bucket_prefix><owner id>-<topic name>
184 dest
.bucket_name
= string(conf
["data_bucket_prefix"]) + s
->owner
.get_id().to_str() + "-" + topic_name
;
// oid prefix: <data_oid_prefix><subscription name>/
185 dest
.oid_prefix
= string(conf
["data_oid_prefix"]) + sub_name
+ "/";
186 dest
.arn_topic
= topic_name
;
192 // command: GET /subscriptions/<sub-name>
// REST op for fetching a single subscription.
193 class RGWPSGetSub_ObjStore
: public RGWPSGetSubOp
{
// The subscription name is taken from the request's object name.
195 int get_params() override
{
196 sub_name
= s
->object
->get_name();
// Report op_ret, end the header with "application/json", then encode the
// op's `result` under the key "result" and flush the formatter.
199 void send_response() override
{
201 set_req_state_err(s
, op_ret
);
204 end_header(s
, this, "application/json");
210 encode_json("result", result
, s
->formatter
);
211 rgw_flush_formatter_and_reset(s
, s
->formatter
);
215 // command: DELETE /subscriptions/<sub-name>
// REST op for deleting a subscription.
216 class RGWPSDeleteSub_ObjStore
: public RGWPSDeleteSubOp
{
// Subscription name comes from the object name; the topic name is read from
// the "topic" query argument without a presence check in the visible code.
218 int get_params() override
{
219 sub_name
= s
->object
->get_name();
220 topic_name
= s
->info
.args
.get("topic");
225 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
// REST op for acknowledging an event of a subscription.
226 class RGWPSAckSubEvent_ObjStore
: public RGWPSAckSubEventOp
{
228 explicit RGWPSAckSubEvent_ObjStore() {}
// Subscription name comes from the object name; the event id comes from the
// "event-id" query argument, whose presence is reported through `exists`.
// NOTE(review): the declaration of `exists` and the check on it are not visible here.
230 int get_params() override
{
231 sub_name
= s
->object
->get_name();
235 event_id
= s
->info
.args
.get("event-id", &exists
);
237 ldpp_dout(this, 1) << "missing required param 'event-id'" << dendl
;
244 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
// REST op for pulling events stored for a subscription.
245 class RGWPSPullSubEvents_ObjStore
: public RGWPSPullSubEventsOp
{
// Subscription name comes from the object name, the marker from the "marker"
// argument, and max_entries from "max-entries" with
// RGWPubSub::Sub::DEFAULT_MAX_EVENTS passed as the default.
247 int get_params() override
{
248 sub_name
= s
->object
->get_name();
249 marker
= s
->info
.args
.get("marker");
// NOTE(review): the check on `ret` that guards the log line below is not visible here.
250 const int ret
= s
->info
.args
.get_int("max-entries", &max_entries
,
251 RGWPubSub::Sub::DEFAULT_MAX_EVENTS
);
253 ldpp_dout(this, 1) << "failed to parse 'max-entries' param" << dendl
;
// Report op_ret, end the header with "application/json", then encode `*sub`
// under the key "result" and flush the formatter.
259 void send_response() override
{
261 set_req_state_err(s
, op_ret
);
264 end_header(s
, this, "application/json");
270 encode_json("result", *sub
, s
->formatter
);
271 rgw_flush_formatter_and_reset(s
, s
->formatter
);
275 // subscriptions handler factory
// Handler that maps HTTP verbs on the subscriptions resource to the
// subscription ops defined in this file.
276 class RGWHandler_REST_PSSub
: public RGWHandler_REST_S3
{
// NOTE(review): the bodies of the three overrides below are not visible in this view.
278 int init_permissions(RGWOp
* op
, optional_yield
) override
{
282 int read_permissions(RGWOp
* op
, optional_yield
) override
{
285 bool supports_quota() override
{
// GET: the "events" query argument selects the pull-events op, otherwise
// the get-subscription op is used.
// NOTE(review): the branch taken for an empty object name is not visible here.
288 RGWOp
*op_get() override
{
289 if (s
->object
->empty()) {
292 if (s
->info
.args
.exists("events")) {
293 return new RGWPSPullSubEvents_ObjStore();
295 return new RGWPSGetSub_ObjStore();
// PUT: creates a subscription when an object (subscription) name is present.
297 RGWOp
*op_put() override
{
298 if (!s
->object
->empty()) {
299 return new RGWPSCreateSub_ObjStore();
// DELETE: deletes a subscription when an object (subscription) name is present.
303 RGWOp
*op_delete() override
{
304 if (!s
->object
->empty()) {
305 return new RGWPSDeleteSub_ObjStore();
// POST: the "ack" query argument selects the ack-event op.
309 RGWOp
*op_post() override
{
310 if (s
->info
.args
.exists("ack")) {
311 return new RGWPSAckSubEvent_ObjStore();
// The auth registry is forwarded unchanged to the RGWHandler_REST_S3 base.
316 explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
317 virtual ~RGWHandler_REST_PSSub() = default;
321 // extract bucket name from ceph specific notification command, with the format:
322 // /notifications/bucket/<bucket-name>
// Splits `path` at its first '/': the part before it must be "bucket", and
// the part after it is stored in `bucket_name`.
// NOTE(review): the return values of each branch are not visible in this view.
323 int notif_bucket_path(const string
& path
, std::string
& bucket_name
) {
327 size_t pos
= path
.find('/');
// no separator at all
328 if (pos
== string::npos
) {
331 if (pos
>= path
.size()) {
// the component before the separator names the resource type
335 string type
= path
.substr(0, pos
);
336 if (type
!= "bucket") {
// everything after the separator is the bucket name
340 bucket_name
= path
.substr(pos
+ 1);
345 // command (ceph specific): PUT /notifications/bucket/<bucket name>?topic=<topic name>[&events=<event list>]
// REST op for creating a notification on a bucket for a given topic.
346 class RGWPSCreateNotif_ObjStore
: public RGWPSCreateNotifOp
{
// topic to notify, and the parsed list of event types to notify on
348 std::string topic_name
;
349 rgw::notify::EventTypeList events
;
// Reads the "topic" argument and the "events" argument, parses the event
// list, and resolves the bucket name from the object name.
// NOTE(review): the declaration of `exists` and the checks on it are not visible here.
351 int get_params() override
{
353 topic_name
= s
->info
.args
.get("topic", &exists
);
355 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl
;
359 std::string events_str
= s
->info
.args
.get("events", &exists
);
361 // if no events are provided, we notify on all of them
363 "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
365 rgw::notify::from_string_list(events_str
, events
);
// an UnknownEvent entry in the parsed list marks an invalid event type
366 if (std::find(events
.begin(), events
.end(), rgw::notify::UnknownEvent
) != events
.end()) {
367 ldpp_dout(this, 1) << "invalid event type in list: " << events_str
<< dendl
;
370 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
374 const char* name() const override
{ return "pubsub_notification_create"; }
375 void execute(optional_yield y
) override
;
// Creates the notification: builds the pubsub object for the owner's tenant,
// looks up the bucket handle and calls create_notification() with the topic
// and event list; the result is stored in op_ret and logged.
378 void RGWPSCreateNotif_ObjStore::execute(optional_yield y
)
// the store is cast, unchecked, to the RADOS store type
380 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
382 auto b
= ps
->get_bucket(bucket_info
.bucket
);
383 op_ret
= b
->create_notification(this, topic_name
, events
, y
);
// NOTE(review): the op_ret check that selects between the two log lines is not visible here.
385 ldpp_dout(this, 1) << "failed to create notification for topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
388 ldpp_dout(this, 20) << "successfully created notification for topic '" << topic_name
<< "'" << dendl
;
391 // command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
// REST op for removing a bucket's notification for a given topic.
392 class RGWPSDeleteNotif_ObjStore
: public RGWPSDeleteNotifOp
{
// topic whose notification is removed
394 std::string topic_name
;
// Reads the "topic" argument and resolves the bucket name from the object name.
// NOTE(review): the declaration of `exists` and the check on it are not visible here.
396 int get_params() override
{
398 topic_name
= s
->info
.args
.get("topic", &exists
);
400 ldpp_dout(this, 1) << "missing required param 'topic'" << dendl
;
403 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
407 void execute(optional_yield y
) override
;
408 const char* name() const override
{ return "pubsub_notification_delete"; }
// Removes the notification: parses the request parameters, builds the pubsub
// object for the owner's tenant, looks up the bucket handle and calls
// remove_notification(); the result is stored in op_ret and logged.
411 void RGWPSDeleteNotif_ObjStore::execute(optional_yield y
) {
// NOTE(review): the handling of a get_params() failure is not visible here.
412 op_ret
= get_params();
// the store is cast, unchecked, to the RADOS store type
417 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
418 auto b
= ps
->get_bucket(bucket_info
.bucket
);
419 op_ret
= b
->remove_notification(this, topic_name
, y
);
// NOTE(review): this message is logged with `s` while the success message
// below uses `this` - confirm the difference is intentional.
421 ldpp_dout(s
, 1) << "failed to remove notification from topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
424 ldpp_dout(this, 20) << "successfully removed notification from topic '" << topic_name
<< "'" << dendl
;
427 // command: GET /notifications/bucket/<bucket>
// REST op for listing the notifications (topics) configured on a bucket.
428 class RGWPSListNotifs_ObjStore
: public RGWPSListNotifsOp
{
// filled by execute() and encoded by send_response()
430 rgw_pubsub_bucket_topics result
;
// The bucket name is resolved from the request's object name.
432 int get_params() override
{
433 return notif_bucket_path(s
->object
->get_name(), bucket_name
);
437 void execute(optional_yield y
) override
;
// Report op_ret, end the header with "application/json", then encode
// `result` under the key "result" and flush the formatter.
438 void send_response() override
{
440 set_req_state_err(s
, op_ret
);
443 end_header(s
, this, "application/json");
448 encode_json("result", result
, s
->formatter
);
449 rgw_flush_formatter_and_reset(s
, s
->formatter
);
451 const char* name() const override
{ return "pubsub_notifications_list"; }
// Loads the bucket's topics into `result`: builds the pubsub object for the
// owner's tenant, looks up the bucket handle and calls get_topics(); the
// return code is stored in op_ret.
454 void RGWPSListNotifs_ObjStore::execute(optional_yield y
)
// the store is cast, unchecked, to the RADOS store type
456 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
457 auto b
= ps
->get_bucket(bucket_info
.bucket
);
458 op_ret
= b
->get_topics(&result
);
// NOTE(review): the op_ret check guarding this log line is not visible here.
460 ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret
<< dendl
;
465 // ceph specific notification handler factory
// Handler that maps HTTP verbs on the notifications resource to the
// notification ops defined in this file.
466 class RGWHandler_REST_PSNotifs
: public RGWHandler_REST_S3
{
// NOTE(review): the bodies of the three overrides below are not visible in this view.
468 int init_permissions(RGWOp
* op
, optional_yield
) override
{
472 int read_permissions(RGWOp
* op
, optional_yield
) override
{
475 bool supports_quota() override
{
// GET: lists the notifications of the bucket named in the object path.
// NOTE(review): the branch taken for an empty object name is not visible here.
478 RGWOp
*op_get() override
{
479 if (s
->object
->empty()) {
482 return new RGWPSListNotifs_ObjStore();
// PUT: creates a notification when an object path is present.
484 RGWOp
*op_put() override
{
485 if (!s
->object
->empty()) {
486 return new RGWPSCreateNotif_ObjStore();
// DELETE: removes a notification when an object path is present.
490 RGWOp
*op_delete() override
{
491 if (!s
->object
->empty()) {
492 return new RGWPSDeleteNotif_ObjStore();
// The auth registry is forwarded unchanged to the RGWHandler_REST_S3 base.
497 explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry
& auth_registry
) : RGWHandler_REST_S3(auth_registry
) {}
498 virtual ~RGWHandler_REST_PSNotifs() = default;
501 // factory for ceph specific PubSub REST handlers
// Chooses the REST handler for a request. The request is first initialized
// with the JSON format; the handler is then selected from the URL bucket
// name ("topics", "subscriptions", "notifications") or, failing those, from
// the presence of the "notification" query argument.
// `frontend_prefix` is not used in the visible part of the body.
502 RGWHandler_REST
* RGWRESTMgr_PubSub::get_handler(rgw::sal::Store
* store
,
503 struct req_state
* const s
,
504 const rgw::auth::StrategyRegistry
& auth_registry
,
505 const std::string
& frontend_prefix
)
// NOTE(review): the handling of an init_from_header() failure is not visible here.
507 if (RGWHandler_REST_S3::init_from_header(store
, s
, RGW_FORMAT_JSON
, true) < 0) {
511 RGWHandler_REST
* handler
{nullptr};
513 // ceph specific PubSub API: topics/subscriptions/notifications are reserved bucket names
514 // this API is available only on RGW that belong to a pubsub zone
515 if (s
->init_state
.url_bucket
== "topics") {
516 handler
= new RGWHandler_REST_PSTopic(auth_registry
);
517 } else if (s
->init_state
.url_bucket
== "subscriptions") {
518 handler
= new RGWHandler_REST_PSSub(auth_registry
);
519 } else if (s
->init_state
.url_bucket
== "notifications") {
520 handler
= new RGWHandler_REST_PSNotifs(auth_registry
);
// a "notification" query argument switches the formatter to XML and uses the S3 handler
521 } else if (s
->info
.args
.exists("notification")) {
// NOTE(review): the check on `ret` is not visible in this view.
522 const int ret
= RGWHandler_REST::allocate_formatter(s
, RGW_FORMAT_XML
, true);
524 handler
= new RGWHandler_REST_PSNotifs_S3(auth_registry
);
// log the dynamic type of the chosen handler, or "<null>" when none matched
528 ldpp_dout(s
, 20) << __func__
<< " handler=" << (handler
? typeid(*handler
).name() : "<null>") << dendl
;