X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frgw%2Frgw_rest_pubsub.cc;h=c20cbe7c2333e14f8712f492a474d1a4a5048f2e;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=272d98e62d05620459b5702f51bced6e5d32c396;hpb=b3b6e05ebb17ef6ac03c6f6f46d242185719a08e;p=ceph.git diff --git a/ceph/src/rgw/rgw_rest_pubsub.cc b/ceph/src/rgw/rgw_rest_pubsub.cc index 272d98e62..c20cbe7c2 100644 --- a/ceph/src/rgw/rgw_rest_pubsub.cc +++ b/ceph/src/rgw/rgw_rest_pubsub.cc @@ -21,6 +21,8 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw +using namespace std; + static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/"); // command (AWS compliant): @@ -38,7 +40,7 @@ public: opaque_data = s->info.args.get("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); - dest.persistent = s->info.args.exists("persistent"); + s->info.args.get_bool("persistent", &dest.persistent, false); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { return -EINVAL; @@ -69,7 +71,7 @@ public: dest.arn_topic = topic_name; // the topic ARN will be sent in the reply const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, - store->svc()->zone->get_zonegroup().get_name(), + store->get_zone()->get_zonegroup().get_name(), s->user->get_tenant(), topic_name); topic_arn = arn.to_string(); return 0; @@ -421,6 +423,50 @@ auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std: } } +int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) { + int op_ret = b->remove_notification(dpp, topic_name, y); + if (op_ret < 0) { + ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl; + } + op_ret = ps.remove_topic(dpp, topic_name, y); + if (op_ret < 0) { + ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl; + } + return op_ret; +} + +int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) { + // delete all notifications of on a bucket + for (const auto& topic : bucket_topics.topics) { + // remove the auto generated subscription of the topic (if exist) + rgw_pubsub_topic_subs topic_subs; + int op_ret = ps.get_topic(topic.first, &topic_subs); + for (const auto& topic_sub_name : topic_subs.subs) { + auto sub = ps.get_sub(topic_sub_name); + rgw_pubsub_sub_config sub_conf; + op_ret = sub->get_conf(&sub_conf); + if (op_ret < 0) { + ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl; + return op_ret; + } + if (!sub_conf.s3_id.empty()) { + // S3 notification, has autogenerated subscription + const auto& sub_topic_name = sub_conf.topic; + op_ret = sub->unsubscribe(dpp, sub_topic_name, y); + if (op_ret < 0) { + ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl; + return op_ret; + } + } + } + op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps); + if (op_ret < 0) { + return op_ret; + } + } + return 0; +} + // command (S3 compliant): PUT /?notification // a "notification" and a subscription will be auto-generated // actual configuration is XML encoded in the body of the message @@ -431,7 +477,7 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { const auto max_size = s->cct->_conf->rgw_max_put_param_size; int r; bufferlist data; - std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false); + std::tie(r, data) = read_all_input(s, max_size, false); if (r < 0) { ldpp_dout(this, 1) << "failed to read XML payload" << dendl; @@ -454,6 +500,7 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { } try { // NotificationConfigurations is mandatory + // It can be empty which means we delete all the notifications RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true); } catch (RGWXMLDecoder::err& err) { ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl; @@ -492,14 +539,15 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) { return; } - ps.emplace(store, s->owner.get_id().tenant); + ps.emplace(static_cast(store), s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); + std::string data_bucket_prefix = ""; std::string data_oid_prefix = ""; bool push_only = true; - if (store->getRados()->get_sync_module()) { - const auto psmodule = dynamic_cast(store->getRados()->get_sync_module().get()); + if (store->get_sync_module()) { + const auto psmodule = dynamic_cast(store->get_sync_module().get()); if (psmodule) { const auto& conf = psmodule->get_effective_conf(); data_bucket_prefix = conf["data_bucket_prefix"]; @@ -509,6 +557,19 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) { } } + if(configurations.list.empty()) { + // get all topics on a bucket + rgw_pubsub_bucket_topics bucket_topics; + op_ret = b->get_topics(&bucket_topics); + if (op_ret < 0) { + ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; + return; + } + + op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps); + return; + } + for (const auto& c : configurations.list) { const auto& notif_name = c.id; if (notif_name.empty()) { @@ -613,17 +674,6 @@ private: return 0; } - void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) { - op_ret = b->remove_notification(this, topic_name, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl; - } - op_ret = ps->remove_topic(this, topic_name, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl; - } - } - public: void execute(optional_yield y) override; const char* name() const override { return "pubsub_notification_delete_s3"; } @@ -635,7 +685,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) { return; } - ps.emplace(store, s->owner.get_id().tenant); + ps.emplace(static_cast(store), s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); @@ -659,7 +709,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) { ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl; return; } - remove_notification_by_topic(unique_topic_name, b, y); + op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps); return; } // notification to be removed is not found - considered success @@ -667,31 +717,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) { return; } - // delete all notification of on a bucket - for (const auto& topic : bucket_topics.topics) { - // remove the auto generated subscription of the topic (if exist) - rgw_pubsub_topic_subs topic_subs; - op_ret = ps->get_topic(topic.first, &topic_subs); - for (const auto& topic_sub_name : topic_subs.subs) { - auto sub = ps->get_sub(topic_sub_name); - rgw_pubsub_sub_config sub_conf; - op_ret = sub->get_conf(&sub_conf); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl; - return; - } - if (!sub_conf.s3_id.empty()) { - // S3 notification, has autogenerated subscription - const auto& sub_topic_name = sub_conf.topic; - op_ret = sub->unsubscribe(this, sub_topic_name, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl; - return; - } - } - } - remove_notification_by_topic(topic.first, b, y); - } + op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps); } // command (S3 compliant): GET /bucket?notification[=] @@ -734,7 +760,7 @@ public: }; void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) { - ps.emplace(store, s->owner.get_id().tenant); + ps.emplace(static_cast(store), s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b);