1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <boost/tokenizer.hpp>
7 #include "rgw_rest_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_pubsub.h"
12 #include "rgw_rest_s3.h"
14 #include "rgw_auth_s3.h"
15 #include "rgw_notify.h"
16 #include "services/svc_zone.h"
17 #include "common/dout.h"
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
23 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
25 bool verify_transport_security(CephContext
*cct
, const RGWEnv
& env
) {
26 const auto is_secure
= rgw_transport_is_secure(cct
, env
);
27 if (!is_secure
&& g_conf().get_val
<bool>("rgw_allow_notification_secrets_in_cleartext")) {
28 ldout(cct
, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl
;
34 // make sure that endpoint is a valid URL
35 // make sure that if user/password are passed inside URL, it is over secure connection
36 // update rgw_pubsub_dest to indicate that a password is stored in the URL
37 bool validate_and_update_endpoint_secret(rgw_pubsub_dest
& dest
, CephContext
*cct
, const RGWEnv
& env
) {
38 if (dest
.push_endpoint
.empty()) {
43 if (!rgw::parse_url_userinfo(dest
.push_endpoint
, user
, password
)) {
44 ldout(cct
, 1) << "endpoint validation error: malformed endpoint URL:" << dest
.push_endpoint
<< dendl
;
47 // this should be verified inside parse_url()
48 ceph_assert(user
.empty() == password
.empty());
50 dest
.stored_secret
= true;
51 if (!verify_transport_security(cct
, env
)) {
52 ldout(cct
, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl
;
59 bool topic_has_endpoint_secret(const rgw_pubsub_topic
& topic
) {
60 return topic
.dest
.stored_secret
;
63 bool topics_has_endpoint_secret(const rgw_pubsub_topics
& topics
) {
64 for (const auto& topic
: topics
.topics
) {
65 if (topic_has_endpoint_secret(topic
.second
)) return true;
70 // command (AWS compliant):
72 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
73 class RGWPSCreateTopicOp
: public RGWOp
{
75 std::string topic_name
;
77 std::string topic_arn
;
78 std::string opaque_data
;
81 topic_name
= s
->info
.args
.get("Name");
82 if (topic_name
.empty()) {
83 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl
;
87 opaque_data
= s
->info
.args
.get("OpaqueData");
89 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
90 s
->info
.args
.get_bool("persistent", &dest
.persistent
, false);
92 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
95 for (const auto& param
: s
->info
.args
.get_params()) {
96 if (param
.first
== "Action" || param
.first
== "Name" || param
.first
== "PayloadHash") {
99 dest
.push_endpoint_args
.append(param
.first
+"="+param
.second
+"&");
102 if (!dest
.push_endpoint_args
.empty()) {
103 // remove last separator
104 dest
.push_endpoint_args
.pop_back();
106 if (!dest
.push_endpoint
.empty() && dest
.persistent
) {
107 const auto ret
= rgw::notify::add_persistent_topic(topic_name
, s
->yield
);
109 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret
<< dendl
;
114 // dest object only stores endpoint info
115 dest
.arn_topic
= topic_name
;
116 // the topic ARN will be sent in the reply
117 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
118 driver
->get_zone()->get_zonegroup().get_name(),
119 s
->user
->get_tenant(), topic_name
);
120 topic_arn
= arn
.to_string();
125 int verify_permission(optional_yield
) override
{
129 void pre_exec() override
{
130 rgw_bucket_object_pre_exec(s
);
132 void execute(optional_yield
) override
;
134 const char* name() const override
{ return "pubsub_topic_create"; }
135 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_CREATE
; }
136 uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
138 void send_response() override
{
140 set_req_state_err(s
, op_ret
);
143 end_header(s
, this, "application/xml");
149 const auto f
= s
->formatter
;
150 f
->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS
);
151 f
->open_object_section("CreateTopicResult");
152 encode_xml("TopicArn", topic_arn
, f
);
153 f
->close_section(); // CreateTopicResult
154 f
->open_object_section("ResponseMetadata");
155 encode_xml("RequestId", s
->req_id
, f
);
156 f
->close_section(); // ResponseMetadata
157 f
->close_section(); // CreateTopicResponse
158 rgw_flush_formatter_and_reset(s
, f
);
162 void RGWPSCreateTopicOp::execute(optional_yield y
) {
163 op_ret
= get_params();
168 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
169 op_ret
= ps
.create_topic(this, topic_name
, dest
, topic_arn
, opaque_data
, y
);
171 ldpp_dout(this, 1) << "failed to create topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
174 ldpp_dout(this, 20) << "successfully created topic '" << topic_name
<< "'" << dendl
;
177 // command (AWS compliant):
180 class RGWPSListTopicsOp
: public RGWOp
{
182 rgw_pubsub_topics result
;
185 int verify_permission(optional_yield
) override
{
188 void pre_exec() override
{
189 rgw_bucket_object_pre_exec(s
);
191 void execute(optional_yield
) override
;
193 const char* name() const override
{ return "pubsub_topics_list"; }
194 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPICS_LIST
; }
195 uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
197 void send_response() override
{
199 set_req_state_err(s
, op_ret
);
202 end_header(s
, this, "application/xml");
208 const auto f
= s
->formatter
;
209 f
->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS
);
210 f
->open_object_section("ListTopicsResult");
211 encode_xml("Topics", result
, f
);
212 f
->close_section(); // ListTopicsResult
213 f
->open_object_section("ResponseMetadata");
214 encode_xml("RequestId", s
->req_id
, f
);
215 f
->close_section(); // ResponseMetadat
216 f
->close_section(); // ListTopicsResponse
217 rgw_flush_formatter_and_reset(s
, f
);
221 void RGWPSListTopicsOp::execute(optional_yield y
) {
222 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
223 op_ret
= ps
.get_topics(this, result
, y
);
224 // if there are no topics it is not considered an error
225 op_ret
= op_ret
== -ENOENT
? 0 : op_ret
;
227 ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret
<< dendl
;
230 if (topics_has_endpoint_secret(result
) && !verify_transport_security(s
->cct
, *(s
->info
.env
))) {
231 ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl
;
235 ldpp_dout(this, 20) << "successfully got topics" << dendl
;
238 // command (extension to AWS):
240 // Action=GetTopic&TopicArn=<topic-arn>
241 class RGWPSGetTopicOp
: public RGWOp
{
243 std::string topic_name
;
244 rgw_pubsub_topic result
;
247 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
249 if (!topic_arn
|| topic_arn
->resource
.empty()) {
250 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
254 topic_name
= topic_arn
->resource
;
259 int verify_permission(optional_yield y
) override
{
262 void pre_exec() override
{
263 rgw_bucket_object_pre_exec(s
);
265 void execute(optional_yield y
) override
;
267 const char* name() const override
{ return "pubsub_topic_get"; }
268 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_GET
; }
269 uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
271 void send_response() override
{
273 set_req_state_err(s
, op_ret
);
276 end_header(s
, this, "application/xml");
282 const auto f
= s
->formatter
;
283 f
->open_object_section("GetTopicResponse");
284 f
->open_object_section("GetTopicResult");
285 encode_xml("Topic", result
, f
);
287 f
->open_object_section("ResponseMetadata");
288 encode_xml("RequestId", s
->req_id
, f
);
291 rgw_flush_formatter_and_reset(s
, f
);
295 void RGWPSGetTopicOp::execute(optional_yield y
) {
296 op_ret
= get_params();
300 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
301 op_ret
= ps
.get_topic(this, topic_name
, result
, y
);
303 ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
306 if (topic_has_endpoint_secret(result
) && !verify_transport_security(s
->cct
, *(s
->info
.env
))) {
307 ldpp_dout(this, 1) << "topic '" << topic_name
<< "' contain secret and cannot be sent over insecure transport" << dendl
;
311 ldpp_dout(this, 1) << "successfully got topic '" << topic_name
<< "'" << dendl
;
314 // command (AWS compliant):
316 // Action=GetTopicAttributes&TopicArn=<topic-arn>
317 class RGWPSGetTopicAttributesOp
: public RGWOp
{
319 std::string topic_name
;
320 rgw_pubsub_topic result
;
323 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
325 if (!topic_arn
|| topic_arn
->resource
.empty()) {
326 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl
;
330 topic_name
= topic_arn
->resource
;
335 int verify_permission(optional_yield y
) override
{
338 void pre_exec() override
{
339 rgw_bucket_object_pre_exec(s
);
341 void execute(optional_yield y
) override
;
343 const char* name() const override
{ return "pubsub_topic_get"; }
344 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_GET
; }
345 uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
347 void send_response() override
{
349 set_req_state_err(s
, op_ret
);
352 end_header(s
, this, "application/xml");
358 const auto f
= s
->formatter
;
359 f
->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS
);
360 f
->open_object_section("GetTopicAttributesResult");
361 result
.dump_xml_as_attributes(f
);
362 f
->close_section(); // GetTopicAttributesResult
363 f
->open_object_section("ResponseMetadata");
364 encode_xml("RequestId", s
->req_id
, f
);
365 f
->close_section(); // ResponseMetadata
366 f
->close_section(); // GetTopicAttributesResponse
367 rgw_flush_formatter_and_reset(s
, f
);
371 void RGWPSGetTopicAttributesOp::execute(optional_yield y
) {
372 op_ret
= get_params();
376 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
377 op_ret
= ps
.get_topic(this, topic_name
, result
, y
);
379 ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
382 if (topic_has_endpoint_secret(result
) && !verify_transport_security(s
->cct
, *(s
->info
.env
))) {
383 ldpp_dout(this, 1) << "topic '" << topic_name
<< "' contain secret and cannot be sent over insecure transport" << dendl
;
387 ldpp_dout(this, 1) << "successfully got topic '" << topic_name
<< "'" << dendl
;
390 // command (AWS compliant):
392 // Action=DeleteTopic&TopicArn=<topic-arn>
393 class RGWPSDeleteTopicOp
: public RGWOp
{
395 std::string topic_name
;
398 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
400 if (!topic_arn
|| topic_arn
->resource
.empty()) {
401 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
405 topic_name
= topic_arn
->resource
;
407 // upon deletion it is not known if topic is persistent or not
408 // will try to delete the persistent topic anyway
409 const auto ret
= rgw::notify::remove_persistent_topic(topic_name
, s
->yield
);
410 if (ret
== -ENOENT
) {
411 // topic was not persistent, or already deleted
415 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret
<< dendl
;
423 int verify_permission(optional_yield
) override
{
426 void pre_exec() override
{
427 rgw_bucket_object_pre_exec(s
);
429 void execute(optional_yield y
) override
;
431 const char* name() const override
{ return "pubsub_topic_delete"; }
432 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_TOPIC_DELETE
; }
433 uint32_t op_mask() override
{ return RGW_OP_TYPE_DELETE
; }
435 void send_response() override
{
437 set_req_state_err(s
, op_ret
);
440 end_header(s
, this, "application/xml");
446 const auto f
= s
->formatter
;
447 f
->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS
);
448 f
->open_object_section("ResponseMetadata");
449 encode_xml("RequestId", s
->req_id
, f
);
450 f
->close_section(); // ResponseMetadata
451 f
->close_section(); // DeleteTopicResponse
452 rgw_flush_formatter_and_reset(s
, f
);
456 void RGWPSDeleteTopicOp::execute(optional_yield y
) {
457 op_ret
= get_params();
461 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
462 op_ret
= ps
.remove_topic(this, topic_name
, y
);
464 ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
<< ", ret=" << op_ret
<< dendl
;
467 ldpp_dout(this, 1) << "successfully removed topic '" << topic_name
<< "'" << dendl
;
470 using op_generator
= RGWOp
*(*)();
471 static const std::unordered_map
<std::string
, op_generator
> op_generators
= {
472 {"CreateTopic", []() -> RGWOp
* {return new RGWPSCreateTopicOp
;}},
473 {"DeleteTopic", []() -> RGWOp
* {return new RGWPSDeleteTopicOp
;}},
474 {"ListTopics", []() -> RGWOp
* {return new RGWPSListTopicsOp
;}},
475 {"GetTopic", []() -> RGWOp
* {return new RGWPSGetTopicOp
;}},
476 {"GetTopicAttributes", []() -> RGWOp
* {return new RGWPSGetTopicAttributesOp
;}}
479 bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state
* s
)
481 if (s
->info
.args
.exists("Action")) {
482 const std::string action_name
= s
->info
.args
.get("Action");
483 return op_generators
.contains(action_name
);
488 RGWOp
*RGWHandler_REST_PSTopic_AWS::op_post()
491 s
->prot_flags
= RGW_REST_STS
;
493 if (s
->info
.args
.exists("Action")) {
494 const std::string action_name
= s
->info
.args
.get("Action");
495 const auto action_it
= op_generators
.find(action_name
);
496 if (action_it
!= op_generators
.end()) {
497 return action_it
->second();
499 ldpp_dout(s
, 10) << "unknown action '" << action_name
<< "' for Topic handler" << dendl
;
501 ldpp_dout(s
, 10) << "missing action argument in Topic handler" << dendl
;
506 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider
* dpp
, optional_yield y
) {
507 const auto rc
= RGW_Auth_S3::authorize(dpp
, driver
, auth_registry
, s
, y
);
511 if (s
->auth
.identity
->is_anonymous()) {
512 ldpp_dout(dpp
, 1) << "anonymous user not allowed in topic operations" << dendl
;
513 return -ERR_INVALID_REQUEST
;
// build a unique topic name by prefixing the topic with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
  std::string unique_name;
  unique_name.reserve(notification.size() + 1 + topic.size());
  unique_name.append(notification);
  unique_name.append("_");
  unique_name.append(topic);
  return unique_name;
}
// extract the topic from a unique topic of the form: <notification>_<topic>
// returns an empty string when unique_topic does not start with "<notification>_"
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  // the notification name must be a real prefix: a match further inside the
  // string does not make this a unique topic of that notification, and cutting
  // at prefix.length() would then return an unrelated tail
  if (unique_topic.compare(0, prefix.length(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.length());
}
532 // from list of bucket topics, find the one that was auto-generated by a notification
533 auto find_unique_topic(const rgw_pubsub_bucket_topics
& bucket_topics
, const std::string
& notif_name
) {
534 auto it
= std::find_if(bucket_topics
.topics
.begin(), bucket_topics
.topics
.end(), [&](const auto& val
) { return notif_name
== val
.second
.s3_id
; });
535 return it
!= bucket_topics
.topics
.end() ?
536 std::optional
<std::reference_wrapper
<const rgw_pubsub_topic_filter
>>(it
->second
):
541 int remove_notification_by_topic(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, const RGWPubSub::Bucket
& b
, optional_yield y
, const RGWPubSub
& ps
) {
542 int op_ret
= b
.remove_notification(dpp
, topic_name
, y
);
544 ldpp_dout(dpp
, 1) << "failed to remove notification of topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
546 op_ret
= ps
.remove_topic(dpp
, topic_name
, y
);
548 ldpp_dout(dpp
, 1) << "failed to remove auto-generated topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
553 int delete_all_notifications(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& bucket_topics
, const RGWPubSub::Bucket
& b
, optional_yield y
, const RGWPubSub
& ps
) {
554 // delete all notifications of on a bucket
555 for (const auto& topic
: bucket_topics
.topics
) {
556 const auto op_ret
= remove_notification_by_topic(dpp
, topic
.first
, b
, y
, ps
);
564 // command (S3 compliant): PUT /<bucket name>?notification
565 // a "notification" and a subscription will be auto-generated
566 // actual configuration is XML encoded in the body of the message
567 class RGWPSCreateNotifOp
: public RGWDefaultResponseOp
{
568 int verify_params() override
{
570 const auto no_value
= s
->info
.args
.get("notification", &exists
);
572 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
575 if (no_value
.length() > 0) {
576 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl
;
579 if (s
->bucket_name
.empty()) {
580 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
586 int get_params_from_body(rgw_pubsub_s3_notifications
& configurations
) {
587 const auto max_size
= s
->cct
->_conf
->rgw_max_put_param_size
;
590 std::tie(r
, data
) = read_all_input(s
, max_size
, false);
593 ldpp_dout(this, 1) << "failed to read XML payload" << dendl
;
596 if (data
.length() == 0) {
597 ldpp_dout(this, 1) << "XML payload missing" << dendl
;
601 RGWXMLDecoder::XMLParser parser
;
604 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl
;
607 if (!parser
.parse(data
.c_str(), data
.length(), 1)) {
608 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl
;
609 return -ERR_MALFORMED_XML
;
612 // NotificationConfigurations is mandatory
613 // It can be empty which means we delete all the notifications
614 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations
, &parser
, true);
615 } catch (RGWXMLDecoder::err
& err
) {
616 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err
<< dendl
;
617 return -ERR_MALFORMED_XML
;
622 int verify_permission(optional_yield y
) override
;
624 void pre_exec() override
{
625 rgw_bucket_object_pre_exec(s
);
628 const char* name() const override
{ return "pubsub_notification_create_s3"; }
629 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_CREATE
; }
630 uint32_t op_mask() override
{ return RGW_OP_TYPE_WRITE
; }
633 void execute(optional_yield
) override
;
636 void RGWPSCreateNotifOp::execute(optional_yield y
) {
637 op_ret
= verify_params();
642 rgw_pubsub_s3_notifications configurations
;
643 op_ret
= get_params_from_body(configurations
);
648 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(s
->owner
.get_id());
649 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
650 op_ret
= driver
->get_bucket(this, user
.get(), s
->bucket_tenant
, s
->bucket_name
, &bucket
, y
);
652 ldpp_dout(this, 1) << "failed to get bucket '" <<
653 (s
->bucket_tenant
.empty() ? s
->bucket_name
: s
->bucket_tenant
+ ":" + s
->bucket_name
) <<
654 "' info, ret = " << op_ret
<< dendl
;
658 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
659 const RGWPubSub::Bucket
b(ps
, bucket
.get());
661 if(configurations
.list
.empty()) {
662 // get all topics on a bucket
663 rgw_pubsub_bucket_topics bucket_topics
;
664 op_ret
= b
.get_topics(this, bucket_topics
, y
);
666 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s
->bucket_name
<< "', ret=" << op_ret
<< dendl
;
670 op_ret
= delete_all_notifications(this, bucket_topics
, b
, y
, ps
);
674 for (const auto& c
: configurations
.list
) {
675 const auto& notif_name
= c
.id
;
676 if (notif_name
.empty()) {
677 ldpp_dout(this, 1) << "missing notification id" << dendl
;
681 if (c
.topic_arn
.empty()) {
682 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name
<< "'" << dendl
;
687 const auto arn
= rgw::ARN::parse(c
.topic_arn
);
688 if (!arn
|| arn
->resource
.empty()) {
689 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c
.topic_arn
<< "' in notification: '" << notif_name
<< "'" << dendl
;
694 if (std::find(c
.events
.begin(), c
.events
.end(), rgw::notify::UnknownEvent
) != c
.events
.end()) {
695 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name
<< "'" << dendl
;
700 const auto topic_name
= arn
->resource
;
702 // get topic information. destination information is stored in the topic
703 rgw_pubsub_topic topic_info
;
704 op_ret
= ps
.get_topic(this, topic_name
, topic_info
, y
);
706 ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
709 // make sure that full topic configuration match
710 // TODO: use ARN match function
712 // create unique topic name. this has 2 reasons:
713 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
714 // (2) make topic cleanup easier, when notification is removed
715 const auto unique_topic_name
= topic_to_unique(topic_name
, notif_name
);
716 // generate the internal topic. destination is stored here for the "push-only" case
717 // when no subscription exists
718 // ARN is cached to make the "GET" method faster
719 op_ret
= ps
.create_topic(this, unique_topic_name
, topic_info
.dest
, topic_info
.arn
, topic_info
.opaque_data
, y
);
721 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name
<<
722 "', ret=" << op_ret
<< dendl
;
725 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name
<< "'" << dendl
;
726 // generate the notification
727 rgw::notify::EventTypeList events
;
728 op_ret
= b
.create_notification(this, unique_topic_name
, c
.events
, std::make_optional(c
.filter
), notif_name
, y
);
730 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name
<<
731 "', ret=" << op_ret
<< dendl
;
732 // rollback generated topic (ignore return value)
733 ps
.remove_topic(this, unique_topic_name
, y
);
736 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name
<< "'" << dendl
;
740 int RGWPSCreateNotifOp::verify_permission(optional_yield y
) {
741 if (!verify_bucket_permission(this, s
, rgw::IAM::s3PutBucketNotification
)) {
748 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
749 class RGWPSDeleteNotifOp
: public RGWDefaultResponseOp
{
750 int get_params(std::string
& notif_name
) const {
752 notif_name
= s
->info
.args
.get("notification", &exists
);
754 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
757 if (s
->bucket_name
.empty()) {
758 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
765 int verify_permission(optional_yield y
) override
;
767 void pre_exec() override
{
768 rgw_bucket_object_pre_exec(s
);
771 const char* name() const override
{ return "pubsub_notification_delete_s3"; }
772 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_DELETE
; }
773 uint32_t op_mask() override
{ return RGW_OP_TYPE_DELETE
; }
775 void execute(optional_yield y
) override
;
778 void RGWPSDeleteNotifOp::execute(optional_yield y
) {
779 std::string notif_name
;
780 op_ret
= get_params(notif_name
);
785 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(s
->owner
.get_id());
786 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
787 op_ret
= driver
->get_bucket(this, user
.get(), s
->bucket_tenant
, s
->bucket_name
, &bucket
, y
);
789 ldpp_dout(this, 1) << "failed to get bucket '" <<
790 (s
->bucket_tenant
.empty() ? s
->bucket_name
: s
->bucket_tenant
+ ":" + s
->bucket_name
) <<
791 "' info, ret = " << op_ret
<< dendl
;
795 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
796 const RGWPubSub::Bucket
b(ps
, bucket
.get());
798 // get all topics on a bucket
799 rgw_pubsub_bucket_topics bucket_topics
;
800 op_ret
= b
.get_topics(this, bucket_topics
, y
);
802 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s
->bucket_name
<< "', ret=" << op_ret
<< dendl
;
806 if (!notif_name
.empty()) {
807 // delete a specific notification
808 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
810 const auto unique_topic_name
= unique_topic
->get().topic
.name
;
811 op_ret
= remove_notification_by_topic(this, unique_topic_name
, b
, y
, ps
);
814 // notification to be removed is not found - considered success
815 ldpp_dout(this, 20) << "notification '" << notif_name
<< "' already removed" << dendl
;
819 op_ret
= delete_all_notifications(this, bucket_topics
, b
, y
, ps
);
822 int RGWPSDeleteNotifOp::verify_permission(optional_yield y
) {
823 if (!verify_bucket_permission(this, s
, rgw::IAM::s3PutBucketNotification
)) {
830 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
831 class RGWPSListNotifsOp
: public RGWOp
{
832 rgw_pubsub_s3_notifications notifications
;
834 int get_params(std::string
& notif_name
) const {
836 notif_name
= s
->info
.args
.get("notification", &exists
);
838 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
841 if (s
->bucket_name
.empty()) {
842 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
849 int verify_permission(optional_yield y
) override
;
851 void pre_exec() override
{
852 rgw_bucket_object_pre_exec(s
);
855 const char* name() const override
{ return "pubsub_notifications_get_s3"; }
856 RGWOpType
get_type() override
{ return RGW_OP_PUBSUB_NOTIF_LIST
; }
857 uint32_t op_mask() override
{ return RGW_OP_TYPE_READ
; }
859 void execute(optional_yield y
) override
;
860 void send_response() override
{
862 set_req_state_err(s
, op_ret
);
865 end_header(s
, this, "application/xml");
870 notifications
.dump_xml(s
->formatter
);
871 rgw_flush_formatter_and_reset(s
, s
->formatter
);
875 void RGWPSListNotifsOp::execute(optional_yield y
) {
876 std::string notif_name
;
877 op_ret
= get_params(notif_name
);
882 std::unique_ptr
<rgw::sal::User
> user
= driver
->get_user(s
->owner
.get_id());
883 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
884 op_ret
= driver
->get_bucket(this, user
.get(), s
->bucket_tenant
, s
->bucket_name
, &bucket
, y
);
886 ldpp_dout(this, 1) << "failed to get bucket '" <<
887 (s
->bucket_tenant
.empty() ? s
->bucket_name
: s
->bucket_tenant
+ ":" + s
->bucket_name
) <<
888 "' info, ret = " << op_ret
<< dendl
;
892 const RGWPubSub
ps(driver
, s
->owner
.get_id().tenant
);
893 const RGWPubSub::Bucket
b(ps
, bucket
.get());
895 // get all topics on a bucket
896 rgw_pubsub_bucket_topics bucket_topics
;
897 op_ret
= b
.get_topics(this, bucket_topics
, y
);
899 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s
->bucket_name
<< "', ret=" << op_ret
<< dendl
;
902 if (!notif_name
.empty()) {
903 // get info of a specific notification
904 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
906 notifications
.list
.emplace_back(unique_topic
->get());
910 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
913 // loop through all topics of the bucket
914 for (const auto& topic
: bucket_topics
.topics
) {
915 if (topic
.second
.s3_id
.empty()) {
916 // not an s3 notification
919 notifications
.list
.emplace_back(topic
.second
);
923 int RGWPSListNotifsOp::verify_permission(optional_yield y
) {
924 if (!verify_bucket_permission(this, s
, rgw::IAM::s3GetBucketNotification
)) {
931 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_get() {
932 return new RGWPSListNotifsOp();
935 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_put() {
936 return new RGWPSCreateNotifOp();
939 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_delete() {
940 return new RGWPSDeleteNotifOp();
943 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_get_op() {
944 return new RGWPSListNotifsOp();
947 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_put_op() {
948 return new RGWPSCreateNotifOp();
951 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
952 return new RGWPSDeleteNotifOp();