1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <boost/tokenizer.hpp>
7 #include "rgw_rest_pubsub_common.h"
8 #include "rgw_rest_pubsub.h"
9 #include "rgw_pubsub_push.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_sync_module_pubsub.h"
14 #include "rgw_rest_s3.h"
16 #include "rgw_auth_s3.h"
17 #include "rgw_notify.h"
18 #include "rgw_sal_rados.h"
19 #include "services/svc_zone.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
// XML namespace string handed to open_object_section_in_ns() by the topic responses below
26 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
28 // command (AWS compliant):
30 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
// the op fills the topic fields from the request arguments (get_params) and
// replies with an XML document that carries the topic ARN (send_response)
31 class RGWPSCreateTopic_ObjStore_AWS
: public RGWPSCreateTopicOp
{
// reads "Name", "OpaqueData", "push-endpoint" and "persistent" from s->info.args,
// collects the other request args as push endpoint args and builds topic_arn
33 int get_params() override
{
34 topic_name
= s
->info
.args
.get("Name");
// an empty "Name" is reported in the log at level 1
35 if (topic_name
.empty()) {
36 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl
;
40 opaque_data
= s
->info
.args
.get("OpaqueData");
42 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
// NOTE(review): the trailing 'false' is presumably the value used when "persistent" is absent - confirm against get_bool()
43 s
->info
.args
.get_bool("persistent", &dest
.persistent
, false);
// the helper receives dest, the ceph context and the request environment
45 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
// walk all request args; "Action", "Name" and "PayloadHash" are singled out by the condition below
// args that are appended end up in dest.push_endpoint_args as "<key>=<value>&"
48 for (const auto& param
: s
->info
.args
.get_params()) {
49 if (param
.first
== "Action" || param
.first
== "Name" || param
.first
== "PayloadHash") {
52 dest
.push_endpoint_args
.append(param
.first
+"="+param
.second
+"&");
55 if (!dest
.push_endpoint_args
.empty()) {
56 // remove last separator
57 dest
.push_endpoint_args
.pop_back();
// a queue is added only when an endpoint was given and the topic is persistent
59 if (!dest
.push_endpoint
.empty() && dest
.persistent
) {
60 const auto ret
= rgw::notify::add_persistent_topic(topic_name
, s
->yield
);
62 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret
<< dendl
;
67 // dest object only stores endpoint info
68 // bucket to store events/records will be set only when subscription is created
69 dest
.bucket_name
= "";
71 dest
.arn_topic
= topic_name
;
72 // the topic ARN will be sent in the reply
// ARN parts: aws partition, sns service, zonegroup name, user tenant, topic name
73 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
74 store
->get_zone()->get_zonegroup().get_name(),
75 s
->user
->get_tenant(), topic_name
);
76 topic_arn
= arn
.to_string();
// writes the reply: CreateTopicResponse { CreateTopicResult { TopicArn }, ResponseMetadata { RequestId } }
80 void send_response() override
{
82 set_req_state_err(s
, op_ret
);
85 end_header(s
, this, "application/xml");
91 const auto f
= s
->formatter
;
92 f
->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS
);
93 f
->open_object_section("CreateTopicResult");
94 encode_xml("TopicArn", topic_arn
, f
);
95 f
->close_section(); // CreateTopicResult
96 f
->open_object_section("ResponseMetadata");
97 encode_xml("RequestId", s
->req_id
, f
);
98 f
->close_section(); // ResponseMetadata
99 f
->close_section(); // CreateTopicResponse
100 rgw_flush_formatter_and_reset(s
, f
);
104 // command (AWS compliant):
// only the reply formatting is defined here; 'result' is encoded as the "Topics" element
107 class RGWPSListTopics_ObjStore_AWS
: public RGWPSListTopicsOp
{
// writes the reply: ListTopicsResponse { ListTopicsResult { Topics }, ResponseMetadata { RequestId } }
109 void send_response() override
{
111 set_req_state_err(s
, op_ret
);
114 end_header(s
, this, "application/xml");
120 const auto f
= s
->formatter
;
121 f
->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS
);
122 f
->open_object_section("ListTopicsResult");
123 encode_xml("Topics", result
, f
);
124 f
->close_section(); // ListTopicsResult
125 f
->open_object_section("ResponseMetadata");
126 encode_xml("RequestId", s
->req_id
, f
);
127 f
->close_section(); // ResponseMetadata
128 f
->close_section(); // ListTopicsResponse
129 rgw_flush_formatter_and_reset(s
, f
);
133 // command (extension to AWS):
135 // Action=GetTopic&TopicArn=<topic-arn>
// the topic name is taken from the resource part of the "TopicArn" argument
136 class RGWPSGetTopic_ObjStore_AWS
: public RGWPSGetTopicOp
{
// parses "TopicArn" and stores its resource part in topic_name
138 int get_params() override
{
139 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
// both an unparsable ARN and an ARN with an empty resource are logged as invalid
141 if (!topic_arn
|| topic_arn
->resource
.empty()) {
142 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
146 topic_name
= topic_arn
->resource
;
// writes the reply with result.topic encoded as "Topic" and the request id under "ResponseMetadata"
// note: unlike the other topic replies in this file, the outer section is opened without AWS_SNS_NS
150 void send_response() override
{
152 set_req_state_err(s
, op_ret
);
155 end_header(s
, this, "application/xml");
161 const auto f
= s
->formatter
;
162 f
->open_object_section("GetTopicResponse");
163 f
->open_object_section("GetTopicResult");
164 encode_xml("Topic", result
.topic
, f
);
166 f
->open_object_section("ResponseMetadata");
167 encode_xml("RequestId", s
->req_id
, f
);
170 rgw_flush_formatter_and_reset(s
, f
);
174 // command (AWS compliant):
176 // Action=GetTopicAttributes&TopicArn=<topic-arn>
// same base op as GetTopic (RGWPSGetTopicOp); the reply dumps the topic as attributes
177 class RGWPSGetTopicAttributes_ObjStore_AWS
: public RGWPSGetTopicOp
{
// parses "TopicArn" and stores its resource part in topic_name
179 int get_params() override
{
180 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
// both an unparsable ARN and an ARN with an empty resource are logged as invalid
182 if (!topic_arn
|| topic_arn
->resource
.empty()) {
183 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl
;
187 topic_name
= topic_arn
->resource
;
// writes the reply: GetTopicAttributesResponse { GetTopicAttributesResult { <topic attributes> }, ResponseMetadata { RequestId } }
191 void send_response() override
{
193 set_req_state_err(s
, op_ret
);
196 end_header(s
, this, "application/xml");
202 const auto f
= s
->formatter
;
203 f
->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS
);
204 f
->open_object_section("GetTopicAttributesResult");
// the topic writes its own content into the formatter
205 result
.topic
.dump_xml_as_attributes(f
);
206 f
->close_section(); // GetTopicAttributesResult
207 f
->open_object_section("ResponseMetadata");
208 encode_xml("RequestId", s
->req_id
, f
);
209 f
->close_section(); // ResponseMetadata
210 f
->close_section(); // GetTopicAttributesResponse
211 rgw_flush_formatter_and_reset(s
, f
);
215 // command (AWS compliant):
217 // Action=DeleteTopic&TopicArn=<topic-arn>
// get_params() also attempts to remove the persistent queue of the topic
218 class RGWPSDeleteTopic_ObjStore_AWS
: public RGWPSDeleteTopicOp
{
// parses "TopicArn", stores its resource part in topic_name and calls
// rgw::notify::remove_persistent_topic() for that name
220 int get_params() override
{
221 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
// both an unparsable ARN and an ARN with an empty resource are logged as invalid
223 if (!topic_arn
|| topic_arn
->resource
.empty()) {
224 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
228 topic_name
= topic_arn
->resource
;
230 // upon deletion it is not known if topic is persistent or not
231 // will try to delete the persistent topic anyway
232 const auto ret
= rgw::notify::remove_persistent_topic(topic_name
, s
->yield
);
// -ENOENT is handled separately from the other results
233 if (ret
== -ENOENT
) {
234 // topic was not persistent, or already deleted
238 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret
<< dendl
;
// writes the reply: DeleteTopicResponse { ResponseMetadata { RequestId } }
245 void send_response() override
{
247 set_req_state_err(s
, op_ret
);
250 end_header(s
, this, "application/xml");
256 const auto f
= s
->formatter
;
257 f
->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS
);
258 f
->open_object_section("ResponseMetadata");
259 encode_xml("RequestId", s
->req_id
, f
);
260 f
->close_section(); // ResponseMetadata
261 f
->close_section(); // DeleteTopicResponse
262 rgw_flush_formatter_and_reset(s
, f
);
267 // utility classes and functions for handling parameters with the following format:
268 // Attributes.entry.{N}.{key|value}={VALUE}
269 // N - any unsigned number
270 // VALUE - url encoded string
272 // and Attribute is holding key and value
273 // ctor and set are done according to the "type" argument
274 // if type is not "key" or "value" it's a no-op
// constructs the attribute by forwarding both arguments to set()
279 Attribute(const std::string
& type
, const std::string
& key_or_value
) {
280 set(type
, key_or_value
);
// stores key_or_value according to type; for type "value" it is stored in 'value'
282 void set(const std::string
& type
, const std::string
& key_or_value
) {
285 } else if (type
== "value") {
286 value
= key_or_value
;
// read-only accessors, both return a reference to the stored string
289 const std::string
& get_key() const { return key
; }
290 const std::string
& get_value() const { return value
; }
// attributes keyed by an unsigned index
293 using AttributeMap
= std::map
<unsigned, Attribute
>;
295 // aggregate the attributes into a map
296 // the key and value are associated by the index (N)
297 // no assumptions are made on the order in which these parameters are added
298 void update_attribute_map(const std::string
& input
, AttributeMap
& map
) {
299 const boost::char_separator
<char> sep(".");
300 const boost::tokenizer
tokens(input
, sep
);
301 auto token
= tokens
.begin();
302 if (*token
!= "Attributes") {
307 if (*token
!= "entry") {
314 idx
= std::stoul(*token
);
315 } catch (const std::invalid_argument
&) {
320 std::string key_or_value
= "";
321 // get the rest of the string regardless of dots
322 // this is to allow dots in the value
323 while (token
!= tokens
.end()) {
324 key_or_value
.append(*token
+".");
327 // remove last separator
328 key_or_value
.pop_back();
330 auto pos
= key_or_value
.find("=");
331 if (pos
!= string::npos
) {
332 const auto key_or_value_lhs
= key_or_value
.substr(0, pos
);
333 const auto key_or_value_rhs
= url_decode(key_or_value
.substr(pos
+ 1, key_or_value
.size() - 1));
334 const auto map_it
= map
.find(idx
);
335 if (map_it
== map
.end()) {
337 map
.emplace(std::make_pair(idx
, Attribute(key_or_value_lhs
, key_or_value_rhs
)));
340 map_it
->second
.set(key_or_value_lhs
, key_or_value_rhs
);
// copies the parameters found in a non-empty POST body into s->info.args:
// the body is split on '&', "Action" is appended as is, "Name" and "TopicArn" are
// url-decoded first, other parameters are passed to update_attribute_map() and
// the collected key/value pairs are appended afterwards; the v4 payload hash of
// the body is appended as "PayloadHash"
346 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
347 if (post_body
.size() > 0) {
348 ldpp_dout(s
, 10) << "Content of POST: " << post_body
<< dendl
;
// the body is parsed only when the string "Action" appears somewhere in it
350 if (post_body
.find("Action") != string::npos
) {
351 const boost::char_separator
<char> sep("&");
352 const boost::tokenizer
<boost::char_separator
<char>> tokens(post_body
, sep
);
// each token is expected to look like <key>=<value>
354 for (const auto& t
: tokens
) {
355 auto pos
= t
.find("=");
356 if (pos
!= string::npos
) {
357 const auto key
= t
.substr(0, pos
);
// the value of "Action" is not url-decoded
358 if (key
== "Action") {
359 s
->info
.args
.append(key
, t
.substr(pos
+ 1, t
.size() - 1));
360 } else if (key
== "Name" || key
== "TopicArn") {
361 const auto value
= url_decode(t
.substr(pos
+ 1, t
.size() - 1));
362 s
->info
.args
.append(key
, value
);
// the whole token (including the part left of '=') is passed on
364 update_attribute_map(t
, map
);
368 // update the regular args with the content of the attribute map
369 for (const auto& attr
: map
) {
370 s
->info
.args
.append(attr
.second
.get_key(), attr
.second
.get_value());
// hash is calculated over the raw (not decoded) body
373 const auto payload_hash
= rgw::auth::s3::calc_v4_payload_hash(post_body
);
374 s
->info
.args
.append("PayloadHash", payload_hash
);
// parses the POST input and allocates the op that matches the "Action" argument:
// CreateTopic, DeleteTopic, ListTopics, GetTopic or GetTopicAttributes
// the op is allocated with new and returned as a raw pointer
378 RGWOp
* RGWHandler_REST_PSTopic_AWS::op_post() {
// must run first, it is what adds "Action" from the body to the args
379 rgw_topic_parse_input();
381 if (s
->info
.args
.exists("Action")) {
382 const auto action
= s
->info
.args
.get("Action");
// the comparison is exact (case sensitive)
383 if (action
.compare("CreateTopic") == 0)
384 return new RGWPSCreateTopic_ObjStore_AWS();
385 if (action
.compare("DeleteTopic") == 0)
386 return new RGWPSDeleteTopic_ObjStore_AWS
;
387 if (action
.compare("ListTopics") == 0)
388 return new RGWPSListTopics_ObjStore_AWS();
389 if (action
.compare("GetTopic") == 0)
390 return new RGWPSGetTopic_ObjStore_AWS();
391 if (action
.compare("GetTopicAttributes") == 0)
392 return new RGWPSGetTopicAttributes_ObjStore_AWS();
// forwards to RGW_Auth_S3::authorize() with the handler's store, auth registry and
// request state, and returns its result unchanged
398 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider
* dpp
, optional_yield y
) {
399 return RGW_Auth_S3::authorize(dpp
, store
, auth_registry
, s
, y
);
404 // return a unique topic by prefixing with the notification name: <notification>_<topic>
// the result is the plain concatenation notification + "_" + topic, nothing is escaped
405 std::string
topic_to_unique(const std::string
& topic
, const std::string
& notification
) {
406 return notification
+ "_" + topic
;
// extract the topic from a unique topic of the form: <notification>_<topic>
// unique_topic:  name that was generated by prefixing a topic with "<notification>_"
// notification: the notification name that was used as the prefix
// returns the part that follows "<notification>_", or an empty string
// when unique_topic does not start with that prefix
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  // the prefix must be located at the beginning of the string.
  // searching for it anywhere would accept names that only contain it in the middle
  // and would then cut off characters that are not part of the prefix
  if (unique_topic.compare(0, prefix.size(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.size());
}
417 // from list of bucket topics, find the one that was auto-generated by a notification
// the match is done by comparing notif_name with the s3_id of each entry
// when found, the result is an optional holding a const reference to the matching rgw_pubsub_topic_filter
418 auto find_unique_topic(const rgw_pubsub_bucket_topics
& bucket_topics
, const std::string
& notif_name
) {
// linear search, the first match is used
419 auto it
= std::find_if(bucket_topics
.topics
.begin(), bucket_topics
.topics
.end(), [&](const auto& val
) { return notif_name
== val
.second
.s3_id
; });
420 return it
!= bucket_topics
.topics
.end() ?
421 std::optional
<std::reference_wrapper
<const rgw_pubsub_topic_filter
>>(it
->second
):
// removes the notification of topic_name from the bucket (b) and then the topic itself (ps)
// a failure of either step is logged at level 1 together with the return code
426 int remove_notification_by_topic(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, const RGWPubSub::BucketRef
& b
, optional_yield y
, RGWPubSub
& ps
) {
// step 1: the notification on the bucket
427 int op_ret
= b
->remove_notification(dpp
, topic_name
, y
);
429 ldpp_dout(dpp
, 1) << "failed to remove notification of topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
// step 2: the topic
431 op_ret
= ps
.remove_topic(dpp
, topic_name
, y
);
433 ldpp_dout(dpp
, 1) << "failed to remove auto-generated topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
// for every topic in bucket_topics: unsubscribes the subscriptions of the topic that
// carry an s3_id, then removes the notification and the topic via remove_notification_by_topic()
438 int delete_all_notifications(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& bucket_topics
, const RGWPubSub::BucketRef
& b
, optional_yield y
, RGWPubSub
& ps
) {
439 // delete all notifications on a bucket
440 for (const auto& topic
: bucket_topics
.topics
) {
441 // remove the auto generated subscription of the topic (if exist)
442 rgw_pubsub_topic_subs topic_subs
;
// NOTE(review): the value returned by get_topic() is overwritten below without being checked
443 int op_ret
= ps
.get_topic(topic
.first
, &topic_subs
);
444 for (const auto& topic_sub_name
: topic_subs
.subs
) {
445 auto sub
= ps
.get_sub(topic_sub_name
);
446 rgw_pubsub_sub_config sub_conf
;
447 op_ret
= sub
->get_conf(&sub_conf
);
449 ldpp_dout(dpp
, 1) << "failed to get subscription '" << topic_sub_name
<< "' info, ret=" << op_ret
<< dendl
;
// subscriptions without an s3_id are left alone
452 if (!sub_conf
.s3_id
.empty()) {
453 // S3 notification, has autogenerated subscription
454 const auto& sub_topic_name
= sub_conf
.topic
;
455 op_ret
= sub
->unsubscribe(dpp
, sub_topic_name
, y
);
457 ldpp_dout(dpp
, 1) << "failed to remove auto-generated subscription '" << topic_sub_name
<< "', ret=" << op_ret
<< dendl
;
// done once per topic, after its subscriptions were handled
462 op_ret
= remove_notification_by_topic(dpp
, topic
.first
, b
, y
, ps
);
470 // command (S3 compliant): PUT /<bucket name>?notification
471 // a "notification" and a subscription will be auto-generated
472 // actual configuration is XML encoded in the body of the message
473 class RGWPSCreateNotif_ObjStore_S3
: public RGWPSCreateNotifOp
{
// notification configurations decoded from the request body
474 rgw_pubsub_s3_notifications configurations
;
// reads the request body (limited by rgw_max_put_param_size) and decodes the
// "NotificationConfiguration" XML element into 'configurations'
// returns -ERR_MALFORMED_XML when the XML cannot be parsed or decoded
476 int get_params_from_body() {
477 const auto max_size
= s
->cct
->_conf
->rgw_max_put_param_size
;
// read_all_input() yields a return code and the data
480 std::tie(r
, data
) = read_all_input(s
, max_size
, false);
483 ldpp_dout(this, 1) << "failed to read XML payload" << dendl
;
// an empty body is reported separately from a read failure
486 if (data
.length() == 0) {
487 ldpp_dout(this, 1) << "XML payload missing" << dendl
;
491 RGWXMLDecoder::XMLParser parser
;
494 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl
;
// the whole payload is given to the parser in a single call
497 if (!parser
.parse(data
.c_str(), data
.length(), 1)) {
498 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl
;
499 return -ERR_MALFORMED_XML
;
502 // NotificationConfigurations is mandatory
503 // It can be empty which means we delete all the notifications
504 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations
, &parser
, true);
// decoding errors are reported as malformed XML as well
505 } catch (RGWXMLDecoder::err
& err
) {
506 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err
<< dendl
;
507 return -ERR_MALFORMED_XML
;
// validates the request: 'notification' must be present without a value and
// the request must be on a bucket; on success bucket_name is taken from the request
512 int get_params() override
{
// 'exists' is set by get() to tell a missing param from an empty one
514 const auto no_value
= s
->info
.args
.get("notification", &exists
);
516 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
519 if (no_value
.length() > 0) {
520 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl
;
523 if (s
->bucket_name
.empty()) {
524 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
527 bucket_name
= s
->bucket_name
;
// op name
532 const char* name() const override
{ return "pubsub_notification_create_s3"; }
// defined outside of the class
533 void execute(optional_yield
) override
;
// creates the notifications described in the request body.
// an empty configuration list deletes all notifications of the bucket.
// otherwise, for each configuration: the id, topic ARN and event types are validated,
// the referenced topic is fetched, an internal unique topic "<notification>_<topic>" is
// created, a notification is created on the bucket for it, and a subscription named
// after the notification is created; the rollback calls are shown next to each step
536 void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y
) {
537 op_ret
= get_params_from_body();
// pubsub handle is created for the tenant of the request owner
542 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
543 auto b
= ps
->get_bucket(bucket_info
.bucket
);
// prefixes start empty and are taken from the pubsub sync module configuration below
546 std::string data_bucket_prefix
= "";
547 std::string data_oid_prefix
= "";
548 bool push_only
= true;
549 if (store
->get_sync_module()) {
// the cast yields nullptr if the sync module is not a RGWPSSyncModuleInstance
550 const auto psmodule
= dynamic_cast<RGWPSSyncModuleInstance
*>(store
->get_sync_module().get());
552 const auto& conf
= psmodule
->get_effective_conf();
553 data_bucket_prefix
= conf
["data_bucket_prefix"];
554 data_oid_prefix
= conf
["data_oid_prefix"];
555 // TODO: allow "push-only" on PS zone as well
// no configurations in the request: delete everything that is set on the bucket
560 if(configurations
.list
.empty()) {
561 // get all topics on a bucket
562 rgw_pubsub_bucket_topics bucket_topics
;
563 op_ret
= b
->get_topics(&bucket_topics
);
565 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
569 op_ret
= delete_all_notifications(this, bucket_topics
, b
, y
, *ps
);
// one iteration per notification configuration
573 for (const auto& c
: configurations
.list
) {
574 const auto& notif_name
= c
.id
;
// validation: notification id
575 if (notif_name
.empty()) {
576 ldpp_dout(this, 1) << "missing notification id" << dendl
;
// validation: topic ARN must be present
580 if (c
.topic_arn
.empty()) {
581 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name
<< "'" << dendl
;
// validation: topic ARN must parse and have a resource
586 const auto arn
= rgw::ARN::parse(c
.topic_arn
);
587 if (!arn
|| arn
->resource
.empty()) {
588 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c
.topic_arn
<< "' in notification: '" << notif_name
<< "'" << dendl
;
// validation: no event of the list may be rgw::notify::UnknownEvent
593 if (std::find(c
.events
.begin(), c
.events
.end(), rgw::notify::UnknownEvent
) != c
.events
.end()) {
594 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name
<< "'" << dendl
;
// the topic name is the resource part of the ARN
599 const auto topic_name
= arn
->resource
;
601 // get topic information. destination information is stored in the topic
602 rgw_pubsub_topic topic_info
;
603 op_ret
= ps
->get_topic(topic_name
, &topic_info
);
605 ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
608 // make sure that full topic configuration match
609 // TODO: use ARN match function
611 // create unique topic name. this has 2 reasons:
612 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
613 // (2) make topic cleanup easier, when notification is removed
614 const auto unique_topic_name
= topic_to_unique(topic_name
, notif_name
);
615 // generate the internal topic. destination is stored here for the "push-only" case
616 // when no subscription exists
617 // ARN is cached to make the "GET" method faster
618 op_ret
= ps
->create_topic(this, unique_topic_name
, topic_info
.dest
, topic_info
.arn
, topic_info
.opaque_data
, y
);
620 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name
<<
621 "', ret=" << op_ret
<< dendl
;
624 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name
<< "'" << dendl
;
625 // generate the notification
// NOTE(review): 'events' is not used by any statement shown here, c.events is passed instead
626 rgw::notify::EventTypeList events
;
627 op_ret
= b
->create_notification(this, unique_topic_name
, c
.events
, std::make_optional(c
.filter
), notif_name
, y
);
629 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name
<<
630 "', ret=" << op_ret
<< dendl
;
631 // rollback generated topic (ignore return value)
632 ps
->remove_topic(this, unique_topic_name
, y
);
635 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name
<< "'" << dendl
;
638 // generate the subscription with destination information from the original topic
// bucket name: <data_bucket_prefix><owner id>-<unique topic>, oid prefix: <data_oid_prefix><notification>/
639 rgw_pubsub_sub_dest dest
= topic_info
.dest
;
640 dest
.bucket_name
= data_bucket_prefix
+ s
->owner
.get_id().to_str() + "-" + unique_topic_name
;
641 dest
.oid_prefix
= data_oid_prefix
+ notif_name
+ "/";
// the subscription is named after the notification
642 auto sub
= ps
->get_sub(notif_name
);
643 op_ret
= sub
->subscribe(this, unique_topic_name
, dest
, y
, notif_name
);
645 ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
646 // rollback generated notification (ignore return value)
647 b
->remove_notification(this, unique_topic_name
, y
);
648 // rollback generated topic (ignore return value)
649 ps
->remove_topic(this, unique_topic_name
, y
);
652 ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name
<< "'" << dendl
;
657 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
658 class RGWPSDeleteNotif_ObjStore_S3
: public RGWPSDeleteNotifOp
{
// value of the 'notification' param; when empty, execute() deletes all notifications of the bucket
660 std::string notif_name
;
// reads the 'notification' param into notif_name and requires the request to be on a bucket
// on success bucket_name is taken from the request
662 int get_params() override
{
// 'exists' is set by get() to tell a missing param from an empty one
664 notif_name
= s
->info
.args
.get("notification", &exists
);
666 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
669 if (s
->bucket_name
.empty()) {
670 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
673 bucket_name
= s
->bucket_name
;
// defined outside of the class
678 void execute(optional_yield y
) override
;
// op name
679 const char* name() const override
{ return "pubsub_notification_delete_s3"; }
// deletes one notification (when notif_name is set) or all notifications of the bucket.
// for a single notification: the auto-generated topic is looked up by the notification name,
// the subscription named after the notification is unsubscribed, and the notification
// and topic are removed via remove_notification_by_topic()
682 void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y
) {
683 op_ret
= get_params();
// pubsub handle is created for the tenant of the request owner
688 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
689 auto b
= ps
->get_bucket(bucket_info
.bucket
);
692 // get all topics on a bucket
693 rgw_pubsub_bucket_topics bucket_topics
;
694 op_ret
= b
->get_topics(&bucket_topics
);
696 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
700 if (!notif_name
.empty()) {
701 // delete a specific notification
// empty optional when no topic of the bucket carries this notification name
702 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
704 // remove the auto generated subscription according to notification name (if exist)
705 const auto unique_topic_name
= unique_topic
->get().topic
.name
;
706 auto sub
= ps
->get_sub(notif_name
);
707 op_ret
= sub
->unsubscribe(this, unique_topic_name
, y
);
// -ENOENT from unsubscribe is not treated as a failure
708 if (op_ret
< 0 && op_ret
!= -ENOENT
) {
709 ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
712 op_ret
= remove_notification_by_topic(this, unique_topic_name
, b
, y
, *ps
);
715 // notification to be removed is not found - considered success
716 ldpp_dout(this, 20) << "notification '" << notif_name
<< "' already removed" << dendl
;
// no notification name was given: delete all notifications of the bucket
720 op_ret
= delete_all_notifications(this, bucket_topics
, b
, y
, *ps
);
723 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
724 class RGWPSListNotifs_ObjStore_S3
: public RGWPSListNotifsOp
{
// value of the 'notification' param; when empty, execute() lists all s3 notifications of the bucket
726 std::string notif_name
;
// filled by execute(), dumped as XML by send_response()
727 rgw_pubsub_s3_notifications notifications
;
// reads the 'notification' param into notif_name and requires the request to be on a bucket
// on success bucket_name is taken from the request
729 int get_params() override
{
// 'exists' is set by get() to tell a missing param from an empty one
731 notif_name
= s
->info
.args
.get("notification", &exists
);
733 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
736 if (s
->bucket_name
.empty()) {
737 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
740 bucket_name
= s
->bucket_name
;
// defined outside of the class
745 void execute(optional_yield y
) override
;
// writes the collected notifications to the formatter as XML
746 void send_response() override
{
748 set_req_state_err(s
, op_ret
);
751 end_header(s
, this, "application/xml");
756 notifications
.dump_xml(s
->formatter
);
757 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// op name
759 const char* name() const override
{ return "pubsub_notifications_get_s3"; }
// fills 'notifications' from the topics of the bucket: either the single topic whose
// s3_id matches notif_name, or every topic that has a non-empty s3_id
762 void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y
) {
// pubsub handle is created for the tenant of the request owner
763 ps
.emplace(static_cast<rgw::sal::RadosStore
*>(store
), s
->owner
.get_id().tenant
);
764 auto b
= ps
->get_bucket(bucket_info
.bucket
);
767 // get all topics on a bucket
768 rgw_pubsub_bucket_topics bucket_topics
;
769 op_ret
= b
->get_topics(&bucket_topics
);
771 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
774 if (!notif_name
.empty()) {
775 // get info of a specific notification
// empty optional when no topic of the bucket carries this notification name
776 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
778 notifications
.list
.emplace_back(unique_topic
->get());
782 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
785 // loop through all topics of the bucket
786 for (const auto& topic
: bucket_topics
.topics
) {
787 if (topic
.second
.s3_id
.empty()) {
788 // not an s3 notification
791 notifications
.list
.emplace_back(topic
.second
);
// GET: allocates a new list-notifications op and returns it as a raw pointer
795 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_get() {
796 return new RGWPSListNotifs_ObjStore_S3();
// PUT: allocates a new create-notification op and returns it as a raw pointer
799 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_put() {
800 return new RGWPSCreateNotif_ObjStore_S3();
// DELETE: allocates a new delete-notification op and returns it as a raw pointer
803 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_delete() {
804 return new RGWPSDeleteNotif_ObjStore_S3();
// allocates the same op type as op_get() (list notifications)
807 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_get_op() {
808 return new RGWPSListNotifs_ObjStore_S3();
// allocates the same op type as op_put() (create notification)
811 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_put_op() {
812 return new RGWPSCreateNotif_ObjStore_S3();
// allocates the same op type as op_delete() (delete notification)
815 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
816 return new RGWPSDeleteNotif_ObjStore_S3();