1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <boost/tokenizer.hpp>
7 #include "rgw_rest_pubsub_common.h"
8 #include "rgw_rest_pubsub.h"
9 #include "rgw_pubsub_push.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_sync_module_pubsub.h"
14 #include "rgw_rest_s3.h"
16 #include "rgw_auth_s3.h"
17 #include "rgw_notify.h"
18 #include "rgw_sal_rados.h"
19 #include "services/svc_zone.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
24 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
26 // command (AWS compliant):
28 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
29 class RGWPSCreateTopic_ObjStore_AWS
: public RGWPSCreateTopicOp
{
31 int get_params() override
{
32 topic_name
= s
->info
.args
.get("Name");
33 if (topic_name
.empty()) {
34 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl
;
38 opaque_data
= s
->info
.args
.get("OpaqueData");
40 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
41 s
->info
.args
.get_bool("persistent", &dest
.persistent
, false);
43 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
46 for (const auto& param
: s
->info
.args
.get_params()) {
47 if (param
.first
== "Action" || param
.first
== "Name" || param
.first
== "PayloadHash") {
50 dest
.push_endpoint_args
.append(param
.first
+"="+param
.second
+"&");
53 if (!dest
.push_endpoint_args
.empty()) {
54 // remove last separator
55 dest
.push_endpoint_args
.pop_back();
57 if (!dest
.push_endpoint
.empty() && dest
.persistent
) {
58 const auto ret
= rgw::notify::add_persistent_topic(topic_name
, s
->yield
);
60 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret
<< dendl
;
65 // dest object only stores endpoint info
66 // bucket to store events/records will be set only when subscription is created
67 dest
.bucket_name
= "";
69 dest
.arn_topic
= topic_name
;
70 // the topic ARN will be sent in the reply
71 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
72 store
->svc()->zone
->get_zonegroup().get_name(),
73 s
->user
->get_tenant(), topic_name
);
74 topic_arn
= arn
.to_string();
78 void send_response() override
{
80 set_req_state_err(s
, op_ret
);
83 end_header(s
, this, "application/xml");
89 const auto f
= s
->formatter
;
90 f
->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS
);
91 f
->open_object_section("CreateTopicResult");
92 encode_xml("TopicArn", topic_arn
, f
);
93 f
->close_section(); // CreateTopicResult
94 f
->open_object_section("ResponseMetadata");
95 encode_xml("RequestId", s
->req_id
, f
);
96 f
->close_section(); // ResponseMetadata
97 f
->close_section(); // CreateTopicResponse
98 rgw_flush_formatter_and_reset(s
, f
);
102 // command (AWS compliant):
105 class RGWPSListTopics_ObjStore_AWS
: public RGWPSListTopicsOp
{
107 void send_response() override
{
109 set_req_state_err(s
, op_ret
);
112 end_header(s
, this, "application/xml");
118 const auto f
= s
->formatter
;
119 f
->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS
);
120 f
->open_object_section("ListTopicsResult");
121 encode_xml("Topics", result
, f
);
122 f
->close_section(); // ListTopicsResult
123 f
->open_object_section("ResponseMetadata");
124 encode_xml("RequestId", s
->req_id
, f
);
125 f
->close_section(); // ResponseMetadat
126 f
->close_section(); // ListTopicsResponse
127 rgw_flush_formatter_and_reset(s
, f
);
131 // command (extension to AWS):
133 // Action=GetTopic&TopicArn=<topic-arn>
134 class RGWPSGetTopic_ObjStore_AWS
: public RGWPSGetTopicOp
{
136 int get_params() override
{
137 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
139 if (!topic_arn
|| topic_arn
->resource
.empty()) {
140 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
144 topic_name
= topic_arn
->resource
;
148 void send_response() override
{
150 set_req_state_err(s
, op_ret
);
153 end_header(s
, this, "application/xml");
159 const auto f
= s
->formatter
;
160 f
->open_object_section("GetTopicResponse");
161 f
->open_object_section("GetTopicResult");
162 encode_xml("Topic", result
.topic
, f
);
164 f
->open_object_section("ResponseMetadata");
165 encode_xml("RequestId", s
->req_id
, f
);
168 rgw_flush_formatter_and_reset(s
, f
);
172 // command (AWS compliant):
174 // Action=GetTopicAttributes&TopicArn=<topic-arn>
175 class RGWPSGetTopicAttributes_ObjStore_AWS
: public RGWPSGetTopicOp
{
177 int get_params() override
{
178 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
180 if (!topic_arn
|| topic_arn
->resource
.empty()) {
181 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl
;
185 topic_name
= topic_arn
->resource
;
189 void send_response() override
{
191 set_req_state_err(s
, op_ret
);
194 end_header(s
, this, "application/xml");
200 const auto f
= s
->formatter
;
201 f
->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS
);
202 f
->open_object_section("GetTopicAttributesResult");
203 result
.topic
.dump_xml_as_attributes(f
);
204 f
->close_section(); // GetTopicAttributesResult
205 f
->open_object_section("ResponseMetadata");
206 encode_xml("RequestId", s
->req_id
, f
);
207 f
->close_section(); // ResponseMetadata
208 f
->close_section(); // GetTopicAttributesResponse
209 rgw_flush_formatter_and_reset(s
, f
);
213 // command (AWS compliant):
215 // Action=DeleteTopic&TopicArn=<topic-arn>
216 class RGWPSDeleteTopic_ObjStore_AWS
: public RGWPSDeleteTopicOp
{
218 int get_params() override
{
219 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
221 if (!topic_arn
|| topic_arn
->resource
.empty()) {
222 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
226 topic_name
= topic_arn
->resource
;
228 // upon deletion it is not known if topic is persistent or not
229 // will try to delete the persistent topic anyway
230 const auto ret
= rgw::notify::remove_persistent_topic(topic_name
, s
->yield
);
231 if (ret
== -ENOENT
) {
232 // topic was not persistent, or already deleted
236 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret
<< dendl
;
243 void send_response() override
{
245 set_req_state_err(s
, op_ret
);
248 end_header(s
, this, "application/xml");
254 const auto f
= s
->formatter
;
255 f
->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS
);
256 f
->open_object_section("ResponseMetadata");
257 encode_xml("RequestId", s
->req_id
, f
);
258 f
->close_section(); // ResponseMetadata
259 f
->close_section(); // DeleteTopicResponse
260 rgw_flush_formatter_and_reset(s
, f
);
265 // utility classes and functions for handling parameters with the following format:
266 // Attributes.entry.{N}.{key|value}={VALUE}
267 // N - any unsigned number
268 // VALUE - url encoded string
270 // and Attribute is holding key and value
271 // ctor and set are done according to the "type" argument
272 // if type is not "key" or "value" its a no-op
277 Attribute(const std::string
& type
, const std::string
& key_or_value
) {
278 set(type
, key_or_value
);
280 void set(const std::string
& type
, const std::string
& key_or_value
) {
283 } else if (type
== "value") {
284 value
= key_or_value
;
287 const std::string
& get_key() const { return key
; }
288 const std::string
& get_value() const { return value
; }
291 using AttributeMap
= std::map
<unsigned, Attribute
>;
293 // aggregate the attributes into a map
294 // the key and value are associated by the index (N)
295 // no assumptions are made on the order in which these parameters are added
296 void update_attribute_map(const std::string
& input
, AttributeMap
& map
) {
297 const boost::char_separator
<char> sep(".");
298 const boost::tokenizer
tokens(input
, sep
);
299 auto token
= tokens
.begin();
300 if (*token
!= "Attributes") {
305 if (*token
!= "entry") {
312 idx
= std::stoul(*token
);
313 } catch (const std::invalid_argument
&) {
318 std::string key_or_value
= "";
319 // get the rest of the string regardless of dots
320 // this is to allow dots in the value
321 while (token
!= tokens
.end()) {
322 key_or_value
.append(*token
+".");
325 // remove last separator
326 key_or_value
.pop_back();
328 auto pos
= key_or_value
.find("=");
329 if (pos
!= string::npos
) {
330 const auto key_or_value_lhs
= key_or_value
.substr(0, pos
);
331 const auto key_or_value_rhs
= url_decode(key_or_value
.substr(pos
+ 1, key_or_value
.size() - 1));
332 const auto map_it
= map
.find(idx
);
333 if (map_it
== map
.end()) {
335 map
.emplace(std::make_pair(idx
, Attribute(key_or_value_lhs
, key_or_value_rhs
)));
338 map_it
->second
.set(key_or_value_lhs
, key_or_value_rhs
);
344 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
345 if (post_body
.size() > 0) {
346 ldpp_dout(s
, 10) << "Content of POST: " << post_body
<< dendl
;
348 if (post_body
.find("Action") != string::npos
) {
349 const boost::char_separator
<char> sep("&");
350 const boost::tokenizer
<boost::char_separator
<char>> tokens(post_body
, sep
);
352 for (const auto& t
: tokens
) {
353 auto pos
= t
.find("=");
354 if (pos
!= string::npos
) {
355 const auto key
= t
.substr(0, pos
);
356 if (key
== "Action") {
357 s
->info
.args
.append(key
, t
.substr(pos
+ 1, t
.size() - 1));
358 } else if (key
== "Name" || key
== "TopicArn") {
359 const auto value
= url_decode(t
.substr(pos
+ 1, t
.size() - 1));
360 s
->info
.args
.append(key
, value
);
362 update_attribute_map(t
, map
);
366 // update the regular args with the content of the attribute map
367 for (const auto& attr
: map
) {
368 s
->info
.args
.append(attr
.second
.get_key(), attr
.second
.get_value());
371 const auto payload_hash
= rgw::auth::s3::calc_v4_payload_hash(post_body
);
372 s
->info
.args
.append("PayloadHash", payload_hash
);
376 RGWOp
* RGWHandler_REST_PSTopic_AWS::op_post() {
377 rgw_topic_parse_input();
379 if (s
->info
.args
.exists("Action")) {
380 const auto action
= s
->info
.args
.get("Action");
381 if (action
.compare("CreateTopic") == 0)
382 return new RGWPSCreateTopic_ObjStore_AWS();
383 if (action
.compare("DeleteTopic") == 0)
384 return new RGWPSDeleteTopic_ObjStore_AWS
;
385 if (action
.compare("ListTopics") == 0)
386 return new RGWPSListTopics_ObjStore_AWS();
387 if (action
.compare("GetTopic") == 0)
388 return new RGWPSGetTopic_ObjStore_AWS();
389 if (action
.compare("GetTopicAttributes") == 0)
390 return new RGWPSGetTopicAttributes_ObjStore_AWS();
396 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider
* dpp
, optional_yield y
) {
397 return RGW_Auth_S3::authorize(dpp
, store
, auth_registry
, s
, y
);
402 // return a unique topic by prefexing with the notification name: <notification>_<topic>
403 std::string
topic_to_unique(const std::string
& topic
, const std::string
& notification
) {
404 return notification
+ "_" + topic
;
407 // extract the topic from a unique topic of the form: <notification>_<topic>
408 [[maybe_unused
]] std::string
unique_to_topic(const std::string
& unique_topic
, const std::string
& notification
) {
409 if (unique_topic
.find(notification
+ "_") == string::npos
) {
412 return unique_topic
.substr(notification
.length() + 1);
415 // from list of bucket topics, find the one that was auto-generated by a notification
416 auto find_unique_topic(const rgw_pubsub_bucket_topics
& bucket_topics
, const std::string
& notif_name
) {
417 auto it
= std::find_if(bucket_topics
.topics
.begin(), bucket_topics
.topics
.end(), [&](const auto& val
) { return notif_name
== val
.second
.s3_id
; });
418 return it
!= bucket_topics
.topics
.end() ?
419 std::optional
<std::reference_wrapper
<const rgw_pubsub_topic_filter
>>(it
->second
):
424 // command (S3 compliant): PUT /<bucket name>?notification
425 // a "notification" and a subscription will be auto-generated
426 // actual configuration is XML encoded in the body of the message
427 class RGWPSCreateNotif_ObjStore_S3
: public RGWPSCreateNotifOp
{
428 rgw_pubsub_s3_notifications configurations
;
430 int get_params_from_body() {
431 const auto max_size
= s
->cct
->_conf
->rgw_max_put_param_size
;
434 std::tie(r
, data
) = rgw_rest_read_all_input(s
, max_size
, false);
437 ldpp_dout(this, 1) << "failed to read XML payload" << dendl
;
440 if (data
.length() == 0) {
441 ldpp_dout(this, 1) << "XML payload missing" << dendl
;
445 RGWXMLDecoder::XMLParser parser
;
448 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl
;
451 if (!parser
.parse(data
.c_str(), data
.length(), 1)) {
452 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl
;
453 return -ERR_MALFORMED_XML
;
456 // NotificationConfigurations is mandatory
457 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations
, &parser
, true);
458 } catch (RGWXMLDecoder::err
& err
) {
459 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err
<< dendl
;
460 return -ERR_MALFORMED_XML
;
465 int get_params() override
{
467 const auto no_value
= s
->info
.args
.get("notification", &exists
);
469 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
472 if (no_value
.length() > 0) {
473 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl
;
476 if (s
->bucket_name
.empty()) {
477 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
480 bucket_name
= s
->bucket_name
;
485 const char* name() const override
{ return "pubsub_notification_create_s3"; }
486 void execute(optional_yield
) override
;
489 void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y
) {
490 op_ret
= get_params_from_body();
495 ps
.emplace(store
, s
->owner
.get_id().tenant
);
496 auto b
= ps
->get_bucket(bucket_info
.bucket
);
498 std::string data_bucket_prefix
= "";
499 std::string data_oid_prefix
= "";
500 bool push_only
= true;
501 if (store
->getRados()->get_sync_module()) {
502 const auto psmodule
= dynamic_cast<RGWPSSyncModuleInstance
*>(store
->getRados()->get_sync_module().get());
504 const auto& conf
= psmodule
->get_effective_conf();
505 data_bucket_prefix
= conf
["data_bucket_prefix"];
506 data_oid_prefix
= conf
["data_oid_prefix"];
507 // TODO: allow "push-only" on PS zone as well
512 for (const auto& c
: configurations
.list
) {
513 const auto& notif_name
= c
.id
;
514 if (notif_name
.empty()) {
515 ldpp_dout(this, 1) << "missing notification id" << dendl
;
519 if (c
.topic_arn
.empty()) {
520 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name
<< "'" << dendl
;
525 const auto arn
= rgw::ARN::parse(c
.topic_arn
);
526 if (!arn
|| arn
->resource
.empty()) {
527 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c
.topic_arn
<< "' in notification: '" << notif_name
<< "'" << dendl
;
532 if (std::find(c
.events
.begin(), c
.events
.end(), rgw::notify::UnknownEvent
) != c
.events
.end()) {
533 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name
<< "'" << dendl
;
538 const auto topic_name
= arn
->resource
;
540 // get topic information. destination information is stored in the topic
541 rgw_pubsub_topic topic_info
;
542 op_ret
= ps
->get_topic(topic_name
, &topic_info
);
544 ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
547 // make sure that full topic configuration match
548 // TODO: use ARN match function
550 // create unique topic name. this has 2 reasons:
551 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
552 // (2) make topic clneaup easier, when notification is removed
553 const auto unique_topic_name
= topic_to_unique(topic_name
, notif_name
);
554 // generate the internal topic. destination is stored here for the "push-only" case
555 // when no subscription exists
556 // ARN is cached to make the "GET" method faster
557 op_ret
= ps
->create_topic(this, unique_topic_name
, topic_info
.dest
, topic_info
.arn
, topic_info
.opaque_data
, y
);
559 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name
<<
560 "', ret=" << op_ret
<< dendl
;
563 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name
<< "'" << dendl
;
564 // generate the notification
565 rgw::notify::EventTypeList events
;
566 op_ret
= b
->create_notification(this, unique_topic_name
, c
.events
, std::make_optional(c
.filter
), notif_name
, y
);
568 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name
<<
569 "', ret=" << op_ret
<< dendl
;
570 // rollback generated topic (ignore return value)
571 ps
->remove_topic(this, unique_topic_name
, y
);
574 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name
<< "'" << dendl
;
577 // generate the subscription with destination information from the original topic
578 rgw_pubsub_sub_dest dest
= topic_info
.dest
;
579 dest
.bucket_name
= data_bucket_prefix
+ s
->owner
.get_id().to_str() + "-" + unique_topic_name
;
580 dest
.oid_prefix
= data_oid_prefix
+ notif_name
+ "/";
581 auto sub
= ps
->get_sub(notif_name
);
582 op_ret
= sub
->subscribe(this, unique_topic_name
, dest
, y
, notif_name
);
584 ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
585 // rollback generated notification (ignore return value)
586 b
->remove_notification(this, unique_topic_name
, y
);
587 // rollback generated topic (ignore return value)
588 ps
->remove_topic(this, unique_topic_name
, y
);
591 ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name
<< "'" << dendl
;
596 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
597 class RGWPSDeleteNotif_ObjStore_S3
: public RGWPSDeleteNotifOp
{
599 std::string notif_name
;
601 int get_params() override
{
603 notif_name
= s
->info
.args
.get("notification", &exists
);
605 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
608 if (s
->bucket_name
.empty()) {
609 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
612 bucket_name
= s
->bucket_name
;
616 void remove_notification_by_topic(const std::string
& topic_name
, const RGWPubSub::BucketRef
& b
, optional_yield y
) {
617 op_ret
= b
->remove_notification(this, topic_name
, y
);
619 ldpp_dout(this, 1) << "failed to remove notification of topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
621 op_ret
= ps
->remove_topic(this, topic_name
, y
);
623 ldpp_dout(this, 1) << "failed to remove auto-generated topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
628 void execute(optional_yield y
) override
;
629 const char* name() const override
{ return "pubsub_notification_delete_s3"; }
632 void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y
) {
633 op_ret
= get_params();
638 ps
.emplace(store
, s
->owner
.get_id().tenant
);
639 auto b
= ps
->get_bucket(bucket_info
.bucket
);
642 // get all topics on a bucket
643 rgw_pubsub_bucket_topics bucket_topics
;
644 op_ret
= b
->get_topics(&bucket_topics
);
646 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
650 if (!notif_name
.empty()) {
651 // delete a specific notification
652 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
654 // remove the auto generated subscription according to notification name (if exist)
655 const auto unique_topic_name
= unique_topic
->get().topic
.name
;
656 auto sub
= ps
->get_sub(notif_name
);
657 op_ret
= sub
->unsubscribe(this, unique_topic_name
, y
);
658 if (op_ret
< 0 && op_ret
!= -ENOENT
) {
659 ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
662 remove_notification_by_topic(unique_topic_name
, b
, y
);
665 // notification to be removed is not found - considered success
666 ldpp_dout(this, 20) << "notification '" << notif_name
<< "' already removed" << dendl
;
670 // delete all notification of on a bucket
671 for (const auto& topic
: bucket_topics
.topics
) {
672 // remove the auto generated subscription of the topic (if exist)
673 rgw_pubsub_topic_subs topic_subs
;
674 op_ret
= ps
->get_topic(topic
.first
, &topic_subs
);
675 for (const auto& topic_sub_name
: topic_subs
.subs
) {
676 auto sub
= ps
->get_sub(topic_sub_name
);
677 rgw_pubsub_sub_config sub_conf
;
678 op_ret
= sub
->get_conf(&sub_conf
);
680 ldpp_dout(this, 1) << "failed to get subscription '" << topic_sub_name
<< "' info, ret=" << op_ret
<< dendl
;
683 if (!sub_conf
.s3_id
.empty()) {
684 // S3 notification, has autogenerated subscription
685 const auto& sub_topic_name
= sub_conf
.topic
;
686 op_ret
= sub
->unsubscribe(this, sub_topic_name
, y
);
688 ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << topic_sub_name
<< "', ret=" << op_ret
<< dendl
;
693 remove_notification_by_topic(topic
.first
, b
, y
);
697 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
698 class RGWPSListNotifs_ObjStore_S3
: public RGWPSListNotifsOp
{
700 std::string notif_name
;
701 rgw_pubsub_s3_notifications notifications
;
703 int get_params() override
{
705 notif_name
= s
->info
.args
.get("notification", &exists
);
707 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl
;
710 if (s
->bucket_name
.empty()) {
711 ldpp_dout(this, 1) << "request must be on a bucket" << dendl
;
714 bucket_name
= s
->bucket_name
;
719 void execute(optional_yield y
) override
;
720 void send_response() override
{
722 set_req_state_err(s
, op_ret
);
725 end_header(s
, this, "application/xml");
730 notifications
.dump_xml(s
->formatter
);
731 rgw_flush_formatter_and_reset(s
, s
->formatter
);
733 const char* name() const override
{ return "pubsub_notifications_get_s3"; }
736 void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y
) {
737 ps
.emplace(store
, s
->owner
.get_id().tenant
);
738 auto b
= ps
->get_bucket(bucket_info
.bucket
);
741 // get all topics on a bucket
742 rgw_pubsub_bucket_topics bucket_topics
;
743 op_ret
= b
->get_topics(&bucket_topics
);
745 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
748 if (!notif_name
.empty()) {
749 // get info of a specific notification
750 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
752 notifications
.list
.emplace_back(unique_topic
->get());
756 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
759 // loop through all topics of the bucket
760 for (const auto& topic
: bucket_topics
.topics
) {
761 if (topic
.second
.s3_id
.empty()) {
762 // not an s3 notification
765 notifications
.list
.emplace_back(topic
.second
);
769 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_get() {
770 return new RGWPSListNotifs_ObjStore_S3();
773 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_put() {
774 return new RGWPSCreateNotif_ObjStore_S3();
777 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_delete() {
778 return new RGWPSDeleteNotif_ObjStore_S3();
781 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_get_op() {
782 return new RGWPSListNotifs_ObjStore_S3();
785 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_put_op() {
786 return new RGWPSCreateNotif_ObjStore_S3();
789 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
790 return new RGWPSDeleteNotif_ObjStore_S3();