1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <boost/tokenizer.hpp>
7 #include "rgw_rest_pubsub_common.h"
8 #include "rgw_rest_pubsub.h"
9 #include "rgw_pubsub_push.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_sync_module_pubsub.h"
14 #include "rgw_rest_s3.h"
16 #include "rgw_auth_s3.h"
17 #include "services/svc_zone.h"
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rgw
23 // command (AWS compliant):
25 // Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
26 class RGWPSCreateTopic_ObjStore_AWS
: public RGWPSCreateTopicOp
{
28 int get_params() override
{
29 topic_name
= s
->info
.args
.get("Name");
30 if (topic_name
.empty()) {
31 ldout(s
->cct
, 1) << "CreateTopic Action 'Name' argument is missing" << dendl
;
35 opaque_data
= s
->info
.args
.get("OpaqueData");
37 dest
.push_endpoint
= s
->info
.args
.get("push-endpoint");
39 if (!validate_and_update_endpoint_secret(dest
, s
->cct
, *(s
->info
.env
))) {
42 for (const auto param
: s
->info
.args
.get_params()) {
43 if (param
.first
== "Action" || param
.first
== "Name" || param
.first
== "PayloadHash") {
46 dest
.push_endpoint_args
.append(param
.first
+"="+param
.second
+"&");
49 if (!dest
.push_endpoint_args
.empty()) {
50 // remove last separator
51 dest
.push_endpoint_args
.pop_back();
54 // dest object only stores endpoint info
55 // bucket to store events/records will be set only when subscription is created
56 dest
.bucket_name
= "";
58 dest
.arn_topic
= topic_name
;
59 // the topic ARN will be sent in the reply
60 const rgw::ARN
arn(rgw::Partition::aws
, rgw::Service::sns
,
61 store
->svc()->zone
->get_zonegroup().get_name(),
62 s
->user
->get_tenant(), topic_name
);
63 topic_arn
= arn
.to_string();
67 void send_response() override
{
69 set_req_state_err(s
, op_ret
);
72 end_header(s
, this, "application/xml");
78 const auto f
= s
->formatter
;
79 f
->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
80 f
->open_object_section("CreateTopicResult");
81 encode_xml("TopicArn", topic_arn
, f
);
83 f
->open_object_section("ResponseMetadata");
84 encode_xml("RequestId", s
->req_id
, f
);
87 rgw_flush_formatter_and_reset(s
, f
);
91 // command (AWS compliant):
94 class RGWPSListTopics_ObjStore_AWS
: public RGWPSListTopicsOp
{
96 void send_response() override
{
98 set_req_state_err(s
, op_ret
);
101 end_header(s
, this, "application/xml");
107 const auto f
= s
->formatter
;
108 f
->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
109 f
->open_object_section("ListTopicsResult");
110 encode_xml("Topics", result
, f
);
112 f
->open_object_section("ResponseMetadata");
113 encode_xml("RequestId", s
->req_id
, f
);
116 rgw_flush_formatter_and_reset(s
, f
);
120 // command (extension to AWS):
122 // Action=GetTopic&TopicArn=<topic-arn>
123 class RGWPSGetTopic_ObjStore_AWS
: public RGWPSGetTopicOp
{
125 int get_params() override
{
126 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
128 if (!topic_arn
|| topic_arn
->resource
.empty()) {
129 ldout(s
->cct
, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
133 topic_name
= topic_arn
->resource
;
137 void send_response() override
{
139 set_req_state_err(s
, op_ret
);
142 end_header(s
, this, "application/xml");
148 const auto f
= s
->formatter
;
149 f
->open_object_section("GetTopicResponse");
150 f
->open_object_section("GetTopicResult");
151 encode_xml("Topic", result
.topic
, f
);
153 f
->open_object_section("ResponseMetadata");
154 encode_xml("RequestId", s
->req_id
, f
);
157 rgw_flush_formatter_and_reset(s
, f
);
161 // command (AWS compliant):
163 // Action=DeleteTopic&TopicArn=<topic-arn>
164 class RGWPSDeleteTopic_ObjStore_AWS
: public RGWPSDeleteTopicOp
{
166 int get_params() override
{
167 const auto topic_arn
= rgw::ARN::parse((s
->info
.args
.get("TopicArn")));
169 if (!topic_arn
|| topic_arn
->resource
.empty()) {
170 ldout(s
->cct
, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl
;
174 topic_name
= topic_arn
->resource
;
178 void send_response() override
{
180 set_req_state_err(s
, op_ret
);
183 end_header(s
, this, "application/xml");
189 const auto f
= s
->formatter
;
190 f
->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
191 f
->open_object_section("ResponseMetadata");
192 encode_xml("RequestId", s
->req_id
, f
);
195 rgw_flush_formatter_and_reset(s
, f
);
200 // utility classes and functions for handling parameters with the following format:
201 // Attributes.entry.{N}.{key|value}={VALUE}
202 // N - any unsigned number
203 // VALUE - url encoded string
205 // and Attribute is holding key and value
206 // ctor and set are done according to the "type" argument
207 // if type is not "key" or "value" its a no-op
212 Attribute(const std::string
& type
, const std::string
& key_or_value
) {
213 set(type
, key_or_value
);
215 void set(const std::string
& type
, const std::string
& key_or_value
) {
218 } else if (type
== "value") {
219 value
= key_or_value
;
222 const std::string
& get_key() const { return key
; }
223 const std::string
& get_value() const { return value
; }
226 using AttributeMap
= std::map
<unsigned, Attribute
>;
228 // aggregate the attributes into a map
229 // the key and value are associated by the index (N)
230 // no assumptions are made on the order in which these parameters are added
231 void update_attribute_map(const std::string
& input
, AttributeMap
& map
) {
232 const boost::char_separator
<char> sep(".");
233 const boost::tokenizer
tokens(input
, sep
);
234 auto token
= tokens
.begin();
235 if (*token
!= "Attributes") {
240 if (*token
!= "entry") {
247 idx
= std::stoul(*token
);
248 } catch (const std::invalid_argument
&) {
253 std::string key_or_value
= "";
254 // get the rest of the string regardless of dots
255 // this is to allow dots in the value
256 while (token
!= tokens
.end()) {
257 key_or_value
.append(*token
+".");
260 // remove last separator
261 key_or_value
.pop_back();
263 auto pos
= key_or_value
.find("=");
264 if (pos
!= string::npos
) {
265 const auto key_or_value_lhs
= key_or_value
.substr(0, pos
);
266 const auto key_or_value_rhs
= url_decode(key_or_value
.substr(pos
+ 1, key_or_value
.size() - 1));
267 const auto map_it
= map
.find(idx
);
268 if (map_it
== map
.end()) {
270 map
.emplace(std::make_pair(idx
, Attribute(key_or_value_lhs
, key_or_value_rhs
)));
273 map_it
->second
.set(key_or_value_lhs
, key_or_value_rhs
);
279 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
280 if (post_body
.size() > 0) {
281 ldout(s
->cct
, 10) << "Content of POST: " << post_body
<< dendl
;
283 if (post_body
.find("Action") != string::npos
) {
284 const boost::char_separator
<char> sep("&");
285 const boost::tokenizer
<boost::char_separator
<char>> tokens(post_body
, sep
);
287 for (const auto& t
: tokens
) {
288 auto pos
= t
.find("=");
289 if (pos
!= string::npos
) {
290 const auto key
= t
.substr(0, pos
);
291 if (key
== "Action") {
292 s
->info
.args
.append(key
, t
.substr(pos
+ 1, t
.size() - 1));
293 } else if (key
== "Name" || key
== "TopicArn") {
294 const auto value
= url_decode(t
.substr(pos
+ 1, t
.size() - 1));
295 s
->info
.args
.append(key
, value
);
297 update_attribute_map(t
, map
);
301 // update the regular args with the content of the attribute map
302 for (const auto attr
: map
) {
303 s
->info
.args
.append(attr
.second
.get_key(), attr
.second
.get_value());
306 const auto payload_hash
= rgw::auth::s3::calc_v4_payload_hash(post_body
);
307 s
->info
.args
.append("PayloadHash", payload_hash
);
311 RGWOp
* RGWHandler_REST_PSTopic_AWS::op_post() {
312 rgw_topic_parse_input();
314 if (s
->info
.args
.exists("Action")) {
315 const auto action
= s
->info
.args
.get("Action");
316 if (action
.compare("CreateTopic") == 0)
317 return new RGWPSCreateTopic_ObjStore_AWS();
318 if (action
.compare("DeleteTopic") == 0)
319 return new RGWPSDeleteTopic_ObjStore_AWS
;
320 if (action
.compare("ListTopics") == 0)
321 return new RGWPSListTopics_ObjStore_AWS();
322 if (action
.compare("GetTopic") == 0)
323 return new RGWPSGetTopic_ObjStore_AWS();
329 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider
* dpp
) {
330 /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) {
331 // TODO: some topic specific authorization
334 return RGW_Auth_S3::authorize(dpp
, store
, auth_registry
, s
);
339 // return a unique topic by prefexing with the notification name: <notification>_<topic>
340 std::string
topic_to_unique(const std::string
& topic
, const std::string
& notification
) {
341 return notification
+ "_" + topic
;
344 // extract the topic from a unique topic of the form: <notification>_<topic>
345 [[maybe_unused
]] std::string
unique_to_topic(const std::string
& unique_topic
, const std::string
& notification
) {
346 if (unique_topic
.find(notification
+ "_") == string::npos
) {
349 return unique_topic
.substr(notification
.length() + 1);
352 // from list of bucket topics, find the one that was auto-generated by a notification
353 auto find_unique_topic(const rgw_pubsub_bucket_topics
& bucket_topics
, const std::string
& notif_name
) {
354 auto it
= std::find_if(bucket_topics
.topics
.begin(), bucket_topics
.topics
.end(), [&](const auto& val
) { return notif_name
== val
.second
.s3_id
; });
355 return it
!= bucket_topics
.topics
.end() ?
356 std::optional
<std::reference_wrapper
<const rgw_pubsub_topic_filter
>>(it
->second
):
361 // command (S3 compliant): PUT /<bucket name>?notification
362 // a "notification" and a subscription will be auto-generated
363 // actual configuration is XML encoded in the body of the message
364 class RGWPSCreateNotif_ObjStore_S3
: public RGWPSCreateNotifOp
{
365 rgw_pubsub_s3_notifications configurations
;
367 int get_params_from_body() {
368 const auto max_size
= s
->cct
->_conf
->rgw_max_put_param_size
;
371 std::tie(r
, data
) = rgw_rest_read_all_input(s
, max_size
, false);
374 ldout(s
->cct
, 1) << "failed to read XML payload" << dendl
;
377 if (data
.length() == 0) {
378 ldout(s
->cct
, 1) << "XML payload missing" << dendl
;
382 RGWXMLDecoder::XMLParser parser
;
385 ldout(s
->cct
, 1) << "failed to initialize XML parser" << dendl
;
388 if (!parser
.parse(data
.c_str(), data
.length(), 1)) {
389 ldout(s
->cct
, 1) << "failed to parse XML payload" << dendl
;
390 return -ERR_MALFORMED_XML
;
393 // NotificationConfigurations is mandatory
394 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations
, &parser
, true);
395 } catch (RGWXMLDecoder::err
& err
) {
396 ldout(s
->cct
, 1) << "failed to parse XML payload. error: " << err
<< dendl
;
397 return -ERR_MALFORMED_XML
;
402 int get_params() override
{
404 const auto no_value
= s
->info
.args
.get("notification", &exists
);
406 ldout(s
->cct
, 1) << "missing required param 'notification'" << dendl
;
409 if (no_value
.length() > 0) {
410 ldout(s
->cct
, 1) << "param 'notification' should not have any value" << dendl
;
413 if (s
->bucket_name
.empty()) {
414 ldout(s
->cct
, 1) << "request must be on a bucket" << dendl
;
417 bucket_name
= s
->bucket_name
;
422 const char* name() const override
{ return "pubsub_notification_create_s3"; }
423 void execute() override
;
426 void RGWPSCreateNotif_ObjStore_S3::execute() {
427 op_ret
= get_params_from_body();
432 ups
.emplace(store
, s
->owner
.get_id());
433 auto b
= ups
->get_bucket(bucket_info
.bucket
);
435 std::string data_bucket_prefix
= "";
436 std::string data_oid_prefix
= "";
437 bool push_only
= true;
438 if (store
->getRados()->get_sync_module()) {
439 const auto psmodule
= dynamic_cast<RGWPSSyncModuleInstance
*>(store
->getRados()->get_sync_module().get());
441 const auto& conf
= psmodule
->get_effective_conf();
442 data_bucket_prefix
= conf
["data_bucket_prefix"];
443 data_oid_prefix
= conf
["data_oid_prefix"];
444 // TODO: allow "push-only" on PS zone as well
449 for (const auto& c
: configurations
.list
) {
450 const auto& notif_name
= c
.id
;
451 if (notif_name
.empty()) {
452 ldout(s
->cct
, 1) << "missing notification id" << dendl
;
456 if (c
.topic_arn
.empty()) {
457 ldout(s
->cct
, 1) << "missing topic ARN in notification: '" << notif_name
<< "'" << dendl
;
462 const auto arn
= rgw::ARN::parse(c
.topic_arn
);
463 if (!arn
|| arn
->resource
.empty()) {
464 ldout(s
->cct
, 1) << "topic ARN has invalid format: '" << c
.topic_arn
<< "' in notification: '" << notif_name
<< "'" << dendl
;
469 if (std::find(c
.events
.begin(), c
.events
.end(), rgw::notify::UnknownEvent
) != c
.events
.end()) {
470 ldout(s
->cct
, 1) << "unknown event type in notification: '" << notif_name
<< "'" << dendl
;
475 const auto topic_name
= arn
->resource
;
477 // get topic information. destination information is stored in the topic
478 rgw_pubsub_topic topic_info
;
479 op_ret
= ups
->get_topic(topic_name
, &topic_info
);
481 ldout(s
->cct
, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
484 // make sure that full topic configuration match
485 // TODO: use ARN match function
487 // create unique topic name. this has 2 reasons:
488 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
489 // (2) make topic clneaup easier, when notification is removed
490 const auto unique_topic_name
= topic_to_unique(topic_name
, notif_name
);
491 // generate the internal topic. destination is stored here for the "push-only" case
492 // when no subscription exists
493 // ARN is cached to make the "GET" method faster
494 op_ret
= ups
->create_topic(unique_topic_name
, topic_info
.dest
, topic_info
.arn
, topic_info
.opaque_data
);
496 ldout(s
->cct
, 1) << "failed to auto-generate unique topic '" << unique_topic_name
<<
497 "', ret=" << op_ret
<< dendl
;
500 ldout(s
->cct
, 20) << "successfully auto-generated unique topic '" << unique_topic_name
<< "'" << dendl
;
501 // generate the notification
502 rgw::notify::EventTypeList events
;
503 op_ret
= b
->create_notification(unique_topic_name
, c
.events
, std::make_optional(c
.filter
), notif_name
);
505 ldout(s
->cct
, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name
<<
506 "', ret=" << op_ret
<< dendl
;
507 // rollback generated topic (ignore return value)
508 ups
->remove_topic(unique_topic_name
);
511 ldout(s
->cct
, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name
<< "'" << dendl
;
514 // generate the subscription with destination information from the original topic
515 rgw_pubsub_sub_dest dest
= topic_info
.dest
;
516 dest
.bucket_name
= data_bucket_prefix
+ s
->owner
.get_id().to_str() + "-" + unique_topic_name
;
517 dest
.oid_prefix
= data_oid_prefix
+ notif_name
+ "/";
518 auto sub
= ups
->get_sub(notif_name
);
519 op_ret
= sub
->subscribe(unique_topic_name
, dest
, notif_name
);
521 ldout(s
->cct
, 1) << "failed to auto-generate subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
522 // rollback generated notification (ignore return value)
523 b
->remove_notification(unique_topic_name
);
524 // rollback generated topic (ignore return value)
525 ups
->remove_topic(unique_topic_name
);
528 ldout(s
->cct
, 20) << "successfully auto-generated subscription '" << notif_name
<< "'" << dendl
;
533 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
534 class RGWPSDeleteNotif_ObjStore_S3
: public RGWPSDeleteNotifOp
{
536 std::string notif_name
;
538 int get_params() override
{
540 notif_name
= s
->info
.args
.get("notification", &exists
);
542 ldout(s
->cct
, 1) << "missing required param 'notification'" << dendl
;
545 if (s
->bucket_name
.empty()) {
546 ldout(s
->cct
, 1) << "request must be on a bucket" << dendl
;
549 bucket_name
= s
->bucket_name
;
553 void remove_notification_by_topic(const std::string
& topic_name
, const RGWUserPubSub::BucketRef
& b
) {
554 op_ret
= b
->remove_notification(topic_name
);
556 ldout(s
->cct
, 1) << "failed to remove notification of topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
558 op_ret
= ups
->remove_topic(topic_name
);
560 ldout(s
->cct
, 1) << "failed to remove auto-generated topic '" << topic_name
<< "', ret=" << op_ret
<< dendl
;
565 void execute() override
;
566 const char* name() const override
{ return "pubsub_notification_delete_s3"; }
569 void RGWPSDeleteNotif_ObjStore_S3::execute() {
570 op_ret
= get_params();
575 ups
.emplace(store
, s
->owner
.get_id());
576 auto b
= ups
->get_bucket(bucket_info
.bucket
);
579 // get all topics on a bucket
580 rgw_pubsub_bucket_topics bucket_topics
;
581 op_ret
= b
->get_topics(&bucket_topics
);
583 ldout(s
->cct
, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
587 if (!notif_name
.empty()) {
588 // delete a specific notification
589 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
591 // remove the auto generated subscription according to notification name (if exist)
592 const auto unique_topic_name
= unique_topic
->get().topic
.name
;
593 auto sub
= ups
->get_sub(notif_name
);
594 op_ret
= sub
->unsubscribe(unique_topic_name
);
595 if (op_ret
< 0 && op_ret
!= -ENOENT
) {
596 ldout(s
->cct
, 1) << "failed to remove auto-generated subscription '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
599 remove_notification_by_topic(unique_topic_name
, b
);
602 // notification to be removed is not found - considered success
603 ldout(s
->cct
, 20) << "notification '" << notif_name
<< "' already removed" << dendl
;
607 // delete all notification of on a bucket
608 for (const auto& topic
: bucket_topics
.topics
) {
609 // remove the auto generated subscription of the topic (if exist)
610 rgw_pubsub_topic_subs topic_subs
;
611 op_ret
= ups
->get_topic(topic
.first
, &topic_subs
);
612 for (const auto& topic_sub_name
: topic_subs
.subs
) {
613 auto sub
= ups
->get_sub(topic_sub_name
);
614 rgw_pubsub_sub_config sub_conf
;
615 op_ret
= sub
->get_conf(&sub_conf
);
617 ldout(s
->cct
, 1) << "failed to get subscription '" << topic_sub_name
<< "' info, ret=" << op_ret
<< dendl
;
620 if (!sub_conf
.s3_id
.empty()) {
621 // S3 notification, has autogenerated subscription
622 const auto& sub_topic_name
= sub_conf
.topic
;
623 op_ret
= sub
->unsubscribe(sub_topic_name
);
625 ldout(s
->cct
, 1) << "failed to remove auto-generated subscription '" << topic_sub_name
<< "', ret=" << op_ret
<< dendl
;
630 remove_notification_by_topic(topic
.first
, b
);
634 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
635 class RGWPSListNotifs_ObjStore_S3
: public RGWPSListNotifsOp
{
637 std::string notif_name
;
638 rgw_pubsub_s3_notifications notifications
;
640 int get_params() override
{
642 notif_name
= s
->info
.args
.get("notification", &exists
);
644 ldout(s
->cct
, 1) << "missing required param 'notification'" << dendl
;
647 if (s
->bucket_name
.empty()) {
648 ldout(s
->cct
, 1) << "request must be on a bucket" << dendl
;
651 bucket_name
= s
->bucket_name
;
656 void execute() override
;
657 void send_response() override
{
659 set_req_state_err(s
, op_ret
);
662 end_header(s
, this, "application/xml");
667 notifications
.dump_xml(s
->formatter
);
668 rgw_flush_formatter_and_reset(s
, s
->formatter
);
670 const char* name() const override
{ return "pubsub_notifications_get_s3"; }
673 void RGWPSListNotifs_ObjStore_S3::execute() {
674 ups
.emplace(store
, s
->owner
.get_id());
675 auto b
= ups
->get_bucket(bucket_info
.bucket
);
678 // get all topics on a bucket
679 rgw_pubsub_bucket_topics bucket_topics
;
680 op_ret
= b
->get_topics(&bucket_topics
);
682 ldout(s
->cct
, 1) << "failed to get list of topics from bucket '" << bucket_info
.bucket
.name
<< "', ret=" << op_ret
<< dendl
;
685 if (!notif_name
.empty()) {
686 // get info of a specific notification
687 const auto unique_topic
= find_unique_topic(bucket_topics
, notif_name
);
689 notifications
.list
.emplace_back(unique_topic
->get());
693 ldout(s
->cct
, 1) << "failed to get notification info for '" << notif_name
<< "', ret=" << op_ret
<< dendl
;
696 // loop through all topics of the bucket
697 for (const auto& topic
: bucket_topics
.topics
) {
698 if (topic
.second
.s3_id
.empty()) {
699 // not an s3 notification
702 notifications
.list
.emplace_back(topic
.second
);
706 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_get() {
707 return new RGWPSListNotifs_ObjStore_S3();
710 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_put() {
711 return new RGWPSCreateNotif_ObjStore_S3();
714 RGWOp
* RGWHandler_REST_PSNotifs_S3::op_delete() {
715 return new RGWPSDeleteNotif_ObjStore_S3();
718 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_get_op() {
719 return new RGWPSListNotifs_ObjStore_S3();
722 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_put_op() {
723 return new RGWPSCreateNotif_ObjStore_S3();
726 RGWOp
* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
727 return new RGWPSDeleteNotif_ObjStore_S3();