]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_pubsub.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
index 65f783a293ef4b5b9f3a8373484e6124b8654fa2..c20cbe7c2333e14f8712f492a474d1a4a5048f2e 100644 (file)
 #include "rgw_rest_s3.h"
 #include "rgw_arn.h"
 #include "rgw_auth_s3.h"
+#include "rgw_notify.h"
+#include "rgw_sal_rados.h"
 #include "services/svc_zone.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+using namespace std;
+
+static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
 
 // command (AWS compliant): 
 // POST
-// Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
+// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
 class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
 public:
   int get_params() override {
     topic_name = s->info.args.get("Name");
     if (topic_name.empty()) {
-      ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+      ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
       return -EINVAL;
     }
 
     opaque_data = s->info.args.get("OpaqueData");
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+    s->info.args.get_bool("persistent", &dest.persistent, false);
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
       return -EINVAL;
     }
-    for (const auto param : s->info.args.get_params()) {
+    for (const auto& param : s->info.args.get_params()) {
       if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
         continue;
       }
@@ -50,6 +56,13 @@ public:
       // remove last separator
       dest.push_endpoint_args.pop_back();
     }
+    if (!dest.push_endpoint.empty() && dest.persistent) {
+      const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+      if (ret < 0) {
+        ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
+        return ret;
+      }
+    }
     
     // dest object only stores endpoint info
     // bucket to store events/records will be set only when subscription is created
@@ -58,7 +71,7 @@ public:
     dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
     const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
-        store->svc()->zone->get_zonegroup().get_name(),
+        store->get_zone()->get_zonegroup().get_name(),
         s->user->get_tenant(), topic_name);
     topic_arn = arn.to_string();
     return 0;
@@ -76,14 +89,14 @@ public:
     }
 
     const auto f = s->formatter;
-    f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
     f->open_object_section("CreateTopicResult");
     encode_xml("TopicArn", topic_arn, f); 
-    f->close_section();
+    f->close_section(); // CreateTopicResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
-    f->close_section();
-    f->close_section();
+    f->close_section(); // ResponseMetadata
+    f->close_section(); // CreateTopicResponse
     rgw_flush_formatter_and_reset(s, f);
   }
 };
@@ -105,14 +118,14 @@ public:
     }
 
     const auto f = s->formatter;
-    f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
     f->open_object_section("ListTopicsResult");
     encode_xml("Topics", result, f); 
-    f->close_section();
+    f->close_section(); // ListTopicsResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
-    f->close_section();
-    f->close_section();
+    f->close_section(); // ResponseMetadat
+    f->close_section(); // ListTopicsResponse
     rgw_flush_formatter_and_reset(s, f);
   }
 };
@@ -126,7 +139,7 @@ public:
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
-        ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+        ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
         return -EINVAL;
     }
 
@@ -158,6 +171,47 @@ public:
   }
 };
 
+// command (AWS compliant): 
+// POST
+// Action=GetTopicAttributes&TopicArn=<topic-arn>
+class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
+public:
+  int get_params() override {
+    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+    if (!topic_arn || topic_arn->resource.empty()) {
+        ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
+        return -EINVAL;
+    }
+
+    topic_name = topic_arn->resource;
+    return 0;
+  }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    const auto f = s->formatter;
+    f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
+    f->open_object_section("GetTopicAttributesResult");
+    result.topic.dump_xml_as_attributes(f);
+    f->close_section(); // GetTopicAttributesResult
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f); 
+    f->close_section(); // ResponseMetadata
+    f->close_section(); // GetTopicAttributesResponse
+    rgw_flush_formatter_and_reset(s, f);
+  }
+};
+
 // command (AWS compliant): 
 // POST
 // Action=DeleteTopic&TopicArn=<topic-arn>
@@ -167,11 +221,24 @@ public:
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
-      ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+      ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
       return -EINVAL;
     }
 
     topic_name = topic_arn->resource;
+
+    // upon deletion it is not known if topic is persistent or not
+    // will try to delete the persistent topic anyway
+    const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+    if (ret == -ENOENT) {
+      // topic was not persistent, or already deleted
+      return 0;
+    }
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
+      return ret;
+    }
+
     return 0;
   }
   
@@ -187,11 +254,11 @@ public:
     }
 
     const auto f = s->formatter;
-    f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
-    f->close_section();
-    f->close_section();
+    f->close_section(); // ResponseMetadata
+    f->close_section(); // DeleteTopicResponse
     rgw_flush_formatter_and_reset(s, f);
   }
 };
@@ -278,7 +345,7 @@ void update_attribute_map(const std::string& input, AttributeMap& map) {
 
 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
   if (post_body.size() > 0) {
-    ldout(s->cct, 10) << "Content of POST: " << post_body << dendl;
+    ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
 
     if (post_body.find("Action") != string::npos) {
       const boost::char_separator<char> sep("&");
@@ -299,7 +366,7 @@ void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
         }
       }
       // update the regular args with the content of the attribute map
-      for (const auto attr : map) {
+      for (const auto& attr : map) {
           s->info.args.append(attr.second.get_key(), attr.second.get_value());
       }
     }
@@ -321,17 +388,15 @@ RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
       return new RGWPSListTopics_ObjStore_AWS();
     if (action.compare("GetTopic") == 0)
       return new RGWPSGetTopic_ObjStore_AWS();
+    if (action.compare("GetTopicAttributes") == 0)
+      return new RGWPSGetTopicAttributes_ObjStore_AWS();
   }
 
   return nullptr;
 }
 
-int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) {
-  /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) {
-      // TODO: some topic specific authorization
-      return 0;
-  }*/
-  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
+int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
+  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
 }
 
 
@@ -358,6 +423,50 @@ auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std:
 }
 }
 
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+  int op_ret = b->remove_notification(dpp, topic_name, y);
+  if (op_ret < 0) {
+    ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
+  }
+  op_ret = ps.remove_topic(dpp, topic_name, y);
+  if (op_ret < 0) {
+    ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
+  }
+  return op_ret;
+}
+
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+  // delete all notifications of on a bucket
+  for (const auto& topic : bucket_topics.topics) {
+    // remove the auto generated subscription of the topic (if exist)
+    rgw_pubsub_topic_subs topic_subs;
+    int op_ret = ps.get_topic(topic.first, &topic_subs);
+    for (const auto& topic_sub_name : topic_subs.subs) {
+      auto sub = ps.get_sub(topic_sub_name);
+      rgw_pubsub_sub_config sub_conf;
+      op_ret = sub->get_conf(&sub_conf);
+      if (op_ret < 0) {
+        ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
+        return op_ret;
+      }
+      if (!sub_conf.s3_id.empty()) {
+        // S3 notification, has autogenerated subscription
+        const auto& sub_topic_name = sub_conf.topic;
+        op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
+        if (op_ret < 0) {
+          ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
+          return op_ret;
+        }
+      }
+    }
+    op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+    if (op_ret < 0) {
+      return op_ret;
+    }
+  }
+  return 0;
+}
+
 // command (S3 compliant): PUT /<bucket name>?notification
 // a "notification" and a subscription will be auto-generated
 // actual configuration is XML encoded in the body of the message
@@ -368,32 +477,33 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
     const auto max_size = s->cct->_conf->rgw_max_put_param_size;
     int r;
     bufferlist data;
-    std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+    std::tie(r, data) = read_all_input(s, max_size, false);
 
     if (r < 0) {
-      ldout(s->cct, 1) << "failed to read XML payload" << dendl;
+      ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
       return r;
     }
     if (data.length() == 0) {
-      ldout(s->cct, 1) << "XML payload missing" << dendl;
+      ldpp_dout(this, 1) << "XML payload missing" << dendl;
       return -EINVAL;
     }
 
     RGWXMLDecoder::XMLParser parser;
 
     if (!parser.init()){
-      ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
+      ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl;
       return -EINVAL;
     }
     if (!parser.parse(data.c_str(), data.length(), 1)) {
-      ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
+      ldpp_dout(this, 1) << "failed to parse XML payload" << dendl;
       return -ERR_MALFORMED_XML;
     }
     try {
       // NotificationConfigurations is mandatory
+      // It can be empty which means we delete all the notifications
       RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
     } catch (RGWXMLDecoder::err& err) {
-      ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
+      ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
       return -ERR_MALFORMED_XML;
     }
     return 0;
@@ -403,15 +513,15 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
     bool exists;
     const auto no_value = s->info.args.get("notification", &exists);
     if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
       return -EINVAL;
     } 
     if (no_value.length() > 0) {
-      ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
+      ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
       return -EINVAL;
     }
     if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
       return -EINVAL;
     }
     bucket_name = s->bucket_name;
@@ -420,23 +530,24 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
 
 public:
   const char* name() const override { return "pubsub_notification_create_s3"; }
-  void execute() override;
+  void execute(optional_yield) override;
 };
 
-void RGWPSCreateNotif_ObjStore_S3::execute() {
+void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
   op_ret = get_params_from_body();
   if (op_ret < 0) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
+
   std::string data_bucket_prefix = "";
   std::string data_oid_prefix = "";
   bool push_only = true;
-  if (store->getRados()->get_sync_module()) {
-    const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
+  if (store->get_sync_module()) {
+    const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
     if (psmodule) {
       const auto& conf = psmodule->get_effective_conf();
       data_bucket_prefix = conf["data_bucket_prefix"];
@@ -446,28 +557,41 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     }
   }
 
+  if(configurations.list.empty()) {
+    // get all topics on a bucket
+    rgw_pubsub_bucket_topics bucket_topics;
+    op_ret = b->get_topics(&bucket_topics);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+      return;
+    }
+
+    op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+    return;
+  }
+
   for (const auto& c : configurations.list) {
     const auto& notif_name = c.id;
     if (notif_name.empty()) {
-      ldout(s->cct, 1) << "missing notification id" << dendl;
+      ldpp_dout(this, 1) << "missing notification id" << dendl;
       op_ret = -EINVAL;
       return;
     }
     if (c.topic_arn.empty()) {
-      ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
+      ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
 
     const auto arn = rgw::ARN::parse(c.topic_arn);
     if (!arn || arn->resource.empty()) {
-      ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
+      ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
 
     if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
-      ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
+      ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
@@ -476,9 +600,9 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
 
     // get topic information. destination information is stored in the topic
     rgw_pubsub_topic topic_info;  
-    op_ret = ups->get_topic(topic_name, &topic_info);
+    op_ret = ps->get_topic(topic_name, &topic_info);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+      ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
     }
     // make sure that full topic configuration match
@@ -491,41 +615,41 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
+    op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
+      ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
       return;
     }
-    ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
+    ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
     // generate the notification
     rgw::notify::EventTypeList events;
-    op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name);
+    op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
+      ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
       // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name);
+      ps->remove_topic(this, unique_topic_name, y);
       return;
     }
-    ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
+    ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
   
     if (!push_only) {
       // generate the subscription with destination information from the original topic
       rgw_pubsub_sub_dest dest = topic_info.dest;
       dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
       dest.oid_prefix = data_oid_prefix + notif_name + "/";
-      auto sub = ups->get_sub(notif_name);
-      op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
+      auto sub = ps->get_sub(notif_name);
+      op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
       if (op_ret < 0) {
-        ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
+        ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
         // rollback generated notification (ignore return value)
-        b->remove_notification(unique_topic_name);
+        b->remove_notification(this, unique_topic_name, y);
         // rollback generated topic (ignore return value)
-        ups->remove_topic(unique_topic_name);
+        ps->remove_topic(this, unique_topic_name, y);
         return;
       }
-      ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
+      ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
     }
   }
 }
@@ -539,48 +663,37 @@ private:
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
       return -EINVAL;
     } 
     if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
       return -EINVAL;
     }
     bucket_name = s->bucket_name;
     return 0;
   }
 
-  void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
-    op_ret = b->remove_notification(topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
-    }
-    op_ret = ups->remove_topic(topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
-    }
-  }
-
 public:
-  void execute() override;
+  void execute(optional_yield y) override;
   const char* name() const override { return "pubsub_notification_delete_s3"; }
 };
 
-void RGWPSDeleteNotif_ObjStore_S3::execute() {
+void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
   op_ret = get_params();
   if (op_ret < 0) {
     return;
   }
 
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
 
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
   op_ret = b->get_topics(&bucket_topics);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
     return;
   }
 
@@ -590,45 +703,21 @@ void RGWPSDeleteNotif_ObjStore_S3::execute() {
     if (unique_topic) {
       // remove the auto generated subscription according to notification name (if exist)
       const auto unique_topic_name = unique_topic->get().topic.name;
-      auto sub = ups->get_sub(notif_name);
-      op_ret = sub->unsubscribe(unique_topic_name);
+      auto sub = ps->get_sub(notif_name);
+      op_ret = sub->unsubscribe(this, unique_topic_name, y);
       if (op_ret < 0 && op_ret != -ENOENT) {
-        ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
+        ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
         return;
       }
-      remove_notification_by_topic(unique_topic_name, b);
+      op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
       return;
     }
     // notification to be removed is not found - considered success
-    ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl;
+    ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl;
     return;
   }
 
-  // delete all notification of on a bucket
-  for (const auto& topic : bucket_topics.topics) {
-    // remove the auto generated subscription of the topic (if exist)
-    rgw_pubsub_topic_subs topic_subs;
-    op_ret = ups->get_topic(topic.first, &topic_subs);
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      auto sub = ups->get_sub(topic_sub_name);
-      rgw_pubsub_sub_config sub_conf;
-      op_ret = sub->get_conf(&sub_conf);
-      if (op_ret < 0) {
-        ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
-        return;
-      }
-      if (!sub_conf.s3_id.empty()) {
-        // S3 notification, has autogenerated subscription
-        const auto& sub_topic_name = sub_conf.topic;
-        op_ret = sub->unsubscribe(sub_topic_name);
-        if (op_ret < 0) {
-          ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
-          return;
-        }
-      }
-    }
-    remove_notification_by_topic(topic.first, b);
-  }
+  op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
 }
 
 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
@@ -641,11 +730,11 @@ private:
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
-      ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
       return -EINVAL;
     } 
     if (s->bucket_name.empty()) {
-      ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
       return -EINVAL;
     }
     bucket_name = s->bucket_name;
@@ -653,7 +742,7 @@ private:
   }
 
 public:
-  void execute() override;
+  void execute(optional_yield y) override;
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -670,16 +759,16 @@ public:
   const char* name() const override { return "pubsub_notifications_get_s3"; }
 };
 
-void RGWPSListNotifs_ObjStore_S3::execute() {
-  ups.emplace(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
+void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
   
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
   op_ret = b->get_topics(&bucket_topics);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
     return;
   }
   if (!notif_name.empty()) {
@@ -690,7 +779,7 @@ void RGWPSListNotifs_ObjStore_S3::execute() {
       return;
     }
     op_ret = -ENOENT;
-    ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
     return;
   }
   // loop through all topics of the bucket