#include <algorithm>
#include <boost/tokenizer.hpp>
#include <optional>
-#include "rgw_rest_pubsub_common.h"
#include "rgw_rest_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_pubsub.h"
-#include "rgw_sync_module_pubsub.h"
#include "rgw_op.h"
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
#include "rgw_arn.h"
#include "rgw_auth_s3.h"
#include "rgw_notify.h"
-#include "rgw_sal_rados.h"
#include "services/svc_zone.h"
+#include "common/dout.h"
+#include "rgw_url.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-using namespace std;
-
static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
+bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
+ const auto is_secure = rgw_transport_is_secure(cct, env);
+ if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+ ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
+ return true;
+ }
+ return is_secure;
+}
+
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
+ if (dest.push_endpoint.empty()) {
+ return true;
+ }
+ std::string user;
+ std::string password;
+ if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+ ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+ return false;
+ }
+ // this should be verified inside parse_url()
+ ceph_assert(user.empty() == password.empty());
+ if (!user.empty()) {
+ dest.stored_secret = true;
+ if (!verify_transport_security(cct, env)) {
+ ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
+ return false;
+ }
+ }
+ return true;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
+ return topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
+ for (const auto& topic : topics.topics) {
+ if (topic_has_endpoint_secret(topic.second)) return true;
+ }
+ return false;
+}
+
// command (AWS compliant):
// POST
// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
-public:
- int get_params() override {
+class RGWPSCreateTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_dest dest;
+ std::string topic_arn;
+ std::string opaque_data;
+
+ int get_params() {
topic_name = s->info.args.get("Name");
if (topic_name.empty()) {
ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
}
// dest object only stores endpoint info
- // bucket to store events/records will be set only when subscription is created
- dest.bucket_name = "";
- dest.oid_prefix = "";
dest.arn_topic = topic_name;
// the topic ARN will be sent in the reply
const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
- store->get_zone()->get_zonegroup().get_name(),
+ driver->get_zone()->get_zonegroup().get_name(),
s->user->get_tenant(), topic_name);
topic_arn = arn.to_string();
return 0;
}
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topic_create"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSCreateTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=ListTopics
-class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+class RGWPSListTopicsOp : public RGWOp {
+private:
+ rgw_pubsub_topics result;
+
public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topics_list"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSListTopicsOp::execute(optional_yield y) {
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topics(this, result, y);
+ // if there are no topics it is not considered an error
+ op_ret = op_ret == -ENOENT ? 0 : op_ret;
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+ if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully got topics" << dendl;
+}
+
// command (extension to AWS):
// POST
// Action=GetTopic&TopicArn=<topic-arn>
-class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
- int get_params() override {
+class RGWPSGetTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_topic result;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
const auto f = s->formatter;
f->open_object_section("GetTopicResponse");
f->open_object_section("GetTopicResult");
- encode_xml("Topic", result.topic, f);
+ encode_xml("Topic", result, f);
f->close_section();
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
}
};
+void RGWPSGetTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topic(this, topic_name, result, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=GetTopicAttributes&TopicArn=<topic-arn>
-class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
- int get_params() override {
+class RGWPSGetTopicAttributesOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_topic result;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
const auto f = s->formatter;
f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
f->open_object_section("GetTopicAttributesResult");
- result.topic.dump_xml_as_attributes(f);
+ result.dump_xml_as_attributes(f);
f->close_section(); // GetTopicAttributesResult
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
}
};
+void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topic(this, topic_name, result, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=DeleteTopic&TopicArn=<topic-arn>
-class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
-public:
- int get_params() override {
+class RGWPSDeleteTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
-
+
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_delete"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
-namespace {
-// utility classes and functions for handling parameters with the following format:
-// Attributes.entry.{N}.{key|value}={VALUE}
-// N - any unsigned number
-// VALUE - url encoded string
-
-// and Attribute is holding key and value
-// ctor and set are done according to the "type" argument
-// if type is not "key" or "value" its a no-op
-class Attribute {
- std::string key;
- std::string value;
-public:
- Attribute(const std::string& type, const std::string& key_or_value) {
- set(type, key_or_value);
- }
- void set(const std::string& type, const std::string& key_or_value) {
- if (type == "key") {
- key = key_or_value;
- } else if (type == "value") {
- value = key_or_value;
- }
- }
- const std::string& get_key() const { return key; }
- const std::string& get_value() const { return value; }
-};
-
-using AttributeMap = std::map<unsigned, Attribute>;
-
-// aggregate the attributes into a map
-// the key and value are associated by the index (N)
-// no assumptions are made on the order in which these parameters are added
-void update_attribute_map(const std::string& input, AttributeMap& map) {
- const boost::char_separator<char> sep(".");
- const boost::tokenizer tokens(input, sep);
- auto token = tokens.begin();
- if (*token != "Attributes") {
- return;
- }
- ++token;
-
- if (*token != "entry") {
- return;
- }
- ++token;
-
- unsigned idx;
- try {
- idx = std::stoul(*token);
- } catch (const std::invalid_argument&) {
+void RGWPSDeleteTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
return;
}
- ++token;
-
- std::string key_or_value = "";
- // get the rest of the string regardless of dots
- // this is to allow dots in the value
- while (token != tokens.end()) {
- key_or_value.append(*token+".");
- ++token;
- }
- // remove last separator
- key_or_value.pop_back();
-
- auto pos = key_or_value.find("=");
- if (pos != string::npos) {
- const auto key_or_value_lhs = key_or_value.substr(0, pos);
- const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
- const auto map_it = map.find(idx);
- if (map_it == map.end()) {
- // new entry
- map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
- } else {
- // existing entry
- map_it->second.set(key_or_value_lhs, key_or_value_rhs);
- }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.remove_topic(this, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
}
-}
+ ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
}
-void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
- if (post_body.size() > 0) {
- ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
-
- if (post_body.find("Action") != string::npos) {
- const boost::char_separator<char> sep("&");
- const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
- AttributeMap map;
- for (const auto& t : tokens) {
- auto pos = t.find("=");
- if (pos != string::npos) {
- const auto key = t.substr(0, pos);
- if (key == "Action") {
- s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
- } else if (key == "Name" || key == "TopicArn") {
- const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
- s->info.args.append(key, value);
- } else {
- update_attribute_map(t, map);
- }
- }
- }
- // update the regular args with the content of the attribute map
- for (const auto& attr : map) {
- s->info.args.append(attr.second.get_key(), attr.second.get_value());
- }
- }
- const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
- s->info.args.append("PayloadHash", payload_hash);
+using op_generator = RGWOp*(*)();
+static const std::unordered_map<std::string, op_generator> op_generators = {
+ {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
+ {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
+ {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
+ {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
+ {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
+};
+
+bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
+{
+ if (s->info.args.exists("Action")) {
+ const std::string action_name = s->info.args.get("Action");
+ return op_generators.contains(action_name);
}
+ return false;
}
-RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
- rgw_topic_parse_input();
+RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
+{
+ s->dialect = "sns";
+ s->prot_flags = RGW_REST_STS;
if (s->info.args.exists("Action")) {
- const auto action = s->info.args.get("Action");
- if (action.compare("CreateTopic") == 0)
- return new RGWPSCreateTopic_ObjStore_AWS();
- if (action.compare("DeleteTopic") == 0)
- return new RGWPSDeleteTopic_ObjStore_AWS;
- if (action.compare("ListTopics") == 0)
- return new RGWPSListTopics_ObjStore_AWS();
- if (action.compare("GetTopic") == 0)
- return new RGWPSGetTopic_ObjStore_AWS();
- if (action.compare("GetTopicAttributes") == 0)
- return new RGWPSGetTopicAttributes_ObjStore_AWS();
+ const std::string action_name = s->info.args.get("Action");
+ const auto action_it = op_generators.find(action_name);
+ if (action_it != op_generators.end()) {
+ return action_it->second();
+ }
+ ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl;
+ } else {
+ ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl;
}
-
return nullptr;
}
int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
- return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
+ const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
+ if (rc < 0) {
+ return rc;
+ }
+ if (s->auth.identity->is_anonymous()) {
+ ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl;
+ return -ERR_INVALID_REQUEST;
+ }
+ return 0;
}
-
namespace {
// return a unique topic by prefexing with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
// extract the topic from a unique topic of the form: <notification>_<topic>
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
- if (unique_topic.find(notification + "_") == string::npos) {
+ if (unique_topic.find(notification + "_") == std::string::npos) {
return "";
}
return unique_topic.substr(notification.length() + 1);
}
}
-int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
- int op_ret = b->remove_notification(dpp, topic_name, y);
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
+ int op_ret = b.remove_notification(dpp, topic_name, y);
if (op_ret < 0) {
ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
}
return op_ret;
}
-int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
// delete all notifications of on a bucket
for (const auto& topic : bucket_topics.topics) {
- // remove the auto generated subscription of the topic (if exist)
- rgw_pubsub_topic_subs topic_subs;
- int op_ret = ps.get_topic(topic.first, &topic_subs);
- for (const auto& topic_sub_name : topic_subs.subs) {
- auto sub = ps.get_sub(topic_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
- return op_ret;
- }
- if (!sub_conf.s3_id.empty()) {
- // S3 notification, has autogenerated subscription
- const auto& sub_topic_name = sub_conf.topic;
- op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
- return op_ret;
- }
- }
- }
- op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+ const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
if (op_ret < 0) {
return op_ret;
}
// command (S3 compliant): PUT /<bucket name>?notification
// a "notification" and a subscription will be auto-generated
// actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
- rgw_pubsub_s3_notifications configurations;
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+ int verify_params() override {
+ bool exists;
+ const auto no_value = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (no_value.length() > 0) {
+ ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
- int get_params_from_body() {
+ int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
int r;
bufferlist data;
}
return 0;
}
+public:
+ int verify_permission(optional_yield y) override;
- int get_params() override {
- bool exists;
- const auto no_value = s->info.args.get("notification", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
- return -EINVAL;
- }
- if (no_value.length() > 0) {
- ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
- return -EINVAL;
- }
- if (s->bucket_name.empty()) {
- ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
- return -EINVAL;
- }
- bucket_name = s->bucket_name;
- return 0;
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
}
-public:
const char* name() const override { return "pubsub_notification_create_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+
void execute(optional_yield) override;
};
-void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
- op_ret = get_params_from_body();
+void RGWPSCreateNotifOp::execute(optional_yield y) {
+ op_ret = verify_params();
if (op_ret < 0) {
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+ rgw_pubsub_s3_notifications configurations;
+ op_ret = get_params_from_body(configurations);
+ if (op_ret < 0) {
+ return;
+ }
- std::string data_bucket_prefix = "";
- std::string data_oid_prefix = "";
- bool push_only = true;
- if (store->get_sync_module()) {
- const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
- if (psmodule) {
- const auto& conf = psmodule->get_effective_conf();
- data_bucket_prefix = conf["data_bucket_prefix"];
- data_oid_prefix = conf["data_oid_prefix"];
- // TODO: allow "push-only" on PS zone as well
- push_only = false;
- }
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+ return;
}
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+
if(configurations.list.empty()) {
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
- op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
return;
}
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ps->get_topic(topic_name, &topic_info);
+ op_ret = ps.get_topic(this, topic_name, topic_info, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// generate the internal topic. destination is stored here for the "push-only" case
// when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+ op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
// generate the notification
rgw::notify::EventTypeList events;
- op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
+ op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
// rollback generated topic (ignore return value)
- ps->remove_topic(this, unique_topic_name, y);
+ ps.remove_topic(this, unique_topic_name, y);
return;
}
ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
-
- if (!push_only) {
- // generate the subscription with destination information from the original topic
- rgw_pubsub_sub_dest dest = topic_info.dest;
- dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
- dest.oid_prefix = data_oid_prefix + notif_name + "/";
- auto sub = ps->get_sub(notif_name);
- op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
- // rollback generated notification (ignore return value)
- b->remove_notification(this, unique_topic_name, y);
- // rollback generated topic (ignore return value)
- ps->remove_topic(this, unique_topic_name, y);
- return;
- }
- ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
- }
}
}
-// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
- std::string notif_name;
+int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
+ }
- int get_params() override {
+ return 0;
+}
+
+// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+ int get_params(std::string& notif_name) const {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
return -EINVAL;
}
- bucket_name = s->bucket_name;
return 0;
}
public:
- void execute(optional_yield y) override;
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
const char* name() const override { return "pubsub_notification_delete_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+ void execute(optional_yield y) override;
};
-void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
- op_ret = get_params();
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
// delete a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
- // remove the auto generated subscription according to notification name (if exist)
const auto unique_topic_name = unique_topic->get().topic.name;
- auto sub = ps->get_sub(notif_name);
- op_ret = sub->unsubscribe(this, unique_topic_name, y);
- if (op_ret < 0 && op_ret != -ENOENT) {
- ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
- return;
- }
- op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
+ op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
return;
}
// notification to be removed is not found - considered success
return;
}
- op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
+}
+
+int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
+ }
+
+ return 0;
}
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-private:
- std::string notif_name;
+class RGWPSListNotifsOp : public RGWOp {
rgw_pubsub_s3_notifications notifications;
- int get_params() override {
+ int get_params(std::string& notif_name) const {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
return -EINVAL;
}
- bucket_name = s->bucket_name;
return 0;
}
public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notifications_get_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void execute(optional_yield y) override;
void send_response() override {
if (op_ret) {
notifications.dump_xml(s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
- const char* name() const override { return "pubsub_notifications_get_s3"; }
};
-void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+void RGWPSListNotifsOp::execute(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
if (!notif_name.empty()) {
}
}
+int RGWPSListNotifsOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifsOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifsOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotifOp();
}