#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+using namespace std;
+
static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
// command (AWS compliant):
opaque_data = s->info.args.get("OpaqueData");
dest.push_endpoint = s->info.args.get("push-endpoint");
- dest.persistent = s->info.args.exists("persistent");
+ s->info.args.get_bool("persistent", &dest.persistent, false);
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
return -EINVAL;
dest.arn_topic = topic_name;
// the topic ARN will be sent in the reply
const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
- store->svc()->zone->get_zonegroup().get_name(),
+ store->get_zone()->get_zonegroup().get_name(),
s->user->get_tenant(), topic_name);
topic_arn = arn.to_string();
return 0;
}
}
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+ int op_ret = b->remove_notification(dpp, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
+ }
+ op_ret = ps.remove_topic(dpp, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
+ }
+ return op_ret;
+}
+
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+ // delete all notifications of on a bucket
+ for (const auto& topic : bucket_topics.topics) {
+ // remove the auto generated subscription of the topic (if exist)
+ rgw_pubsub_topic_subs topic_subs;
+ int op_ret = ps.get_topic(topic.first, &topic_subs);
+ for (const auto& topic_sub_name : topic_subs.subs) {
+ auto sub = ps.get_sub(topic_sub_name);
+ rgw_pubsub_sub_config sub_conf;
+ op_ret = sub->get_conf(&sub_conf);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
+ return op_ret;
+ }
+ if (!sub_conf.s3_id.empty()) {
+ // S3 notification, has autogenerated subscription
+ const auto& sub_topic_name = sub_conf.topic;
+ op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
+ return op_ret;
+ }
+ }
+ }
+ op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+ }
+ return 0;
+}
+
// command (S3 compliant): PUT /<bucket name>?notification
// a "notification" and a subscription will be auto-generated
// actual configuration is XML encoded in the body of the message
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
int r;
bufferlist data;
- std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+ std::tie(r, data) = read_all_input(s, max_size, false);
if (r < 0) {
ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
}
try {
// NotificationConfigurations is mandatory
+ // It can be empty which means we delete all the notifications
RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
} catch (RGWXMLDecoder::err& err) {
ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
return;
}
- ps.emplace(store, s->owner.get_id().tenant);
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
+
std::string data_bucket_prefix = "";
std::string data_oid_prefix = "";
bool push_only = true;
- if (store->getRados()->get_sync_module()) {
- const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
+ if (store->get_sync_module()) {
+ const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
if (psmodule) {
const auto& conf = psmodule->get_effective_conf();
data_bucket_prefix = conf["data_bucket_prefix"];
}
}
+ if(configurations.list.empty()) {
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b->get_topics(&bucket_topics);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ return;
+ }
+
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+ return;
+ }
+
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
if (notif_name.empty()) {
return 0;
}
- void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) {
- op_ret = b->remove_notification(this, topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
- }
- op_ret = ps->remove_topic(this, topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
- }
- }
-
public:
void execute(optional_yield y) override;
const char* name() const override { return "pubsub_notification_delete_s3"; }
return;
}
- ps.emplace(store, s->owner.get_id().tenant);
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
return;
}
- remove_notification_by_topic(unique_topic_name, b, y);
+ op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
return;
}
// notification to be removed is not found - considered success
return;
}
- // delete all notification of on a bucket
- for (const auto& topic : bucket_topics.topics) {
- // remove the auto generated subscription of the topic (if exist)
- rgw_pubsub_topic_subs topic_subs;
- op_ret = ps->get_topic(topic.first, &topic_subs);
- for (const auto& topic_sub_name : topic_subs.subs) {
- auto sub = ps->get_sub(topic_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
- return;
- }
- if (!sub_conf.s3_id.empty()) {
- // S3 notification, has autogenerated subscription
- const auto& sub_topic_name = sub_conf.topic;
- op_ret = sub->unsubscribe(this, sub_topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- }
- }
- remove_notification_by_topic(topic.first, b, y);
- }
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
}
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
};
void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
- ps.emplace(store, s->owner.get_id().tenant);
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);