]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_pubsub.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
index c20cbe7c2333e14f8712f492a474d1a4a5048f2e..5ab22e63775effc201628946a3a799f81b0c65c9 100644 (file)
@@ -4,33 +4,80 @@
 #include <algorithm>
 #include <boost/tokenizer.hpp>
 #include <optional>
-#include "rgw_rest_pubsub_common.h"
 #include "rgw_rest_pubsub.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_pubsub.h"
-#include "rgw_sync_module_pubsub.h"
 #include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_arn.h"
 #include "rgw_auth_s3.h"
 #include "rgw_notify.h"
-#include "rgw_sal_rados.h"
 #include "services/svc_zone.h"
+#include "common/dout.h"
+#include "rgw_url.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-using namespace std;
-
 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
 
+bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
+  const auto is_secure = rgw_transport_is_secure(cct, env);
+  if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+    ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
+    return true;
+  }
+  return is_secure;
+}
+
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
+  if (dest.push_endpoint.empty()) {
+      return true;
+  }
+  std::string user;
+  std::string password;
+  if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+    ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+    return false;
+  }
+  // this should be verified inside parse_url()
+  ceph_assert(user.empty() == password.empty());
+  if (!user.empty()) {
+      dest.stored_secret = true;
+      if (!verify_transport_security(cct, env)) {
+        ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
+        return false;
+      }
+  }
+  return true;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
+    return topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
+    for (const auto& topic : topics.topics) {
+        if (topic_has_endpoint_secret(topic.second)) return true;
+    }
+    return false;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
-public:
-  int get_params() override {
+class RGWPSCreateTopicOp : public RGWOp {
+  private:
+  std::string topic_name;
+  rgw_pubsub_dest dest;
+  std::string topic_arn;
+  std::string opaque_data;
+  
+  int get_params() {
     topic_name = s->info.args.get("Name");
     if (topic_name.empty()) {
       ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
@@ -65,18 +112,29 @@ public:
     }
     
     // dest object only stores endpoint info
-    // bucket to store events/records will be set only when subscription is created
-    dest.bucket_name = "";
-    dest.oid_prefix = "";
     dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
     const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
-        store->get_zone()->get_zonegroup().get_name(),
+        driver->get_zone()->get_zonegroup().get_name(),
         s->user->get_tenant(), topic_name);
     topic_arn = arn.to_string();
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield) override;
+
+  const char* name() const override { return "pubsub_topic_create"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -101,11 +159,41 @@ public:
   }
 };
 
+void RGWPSCreateTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST 
 // Action=ListTopics
-class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+class RGWPSListTopicsOp : public RGWOp {
+private:
+  rgw_pubsub_topics result;
+
 public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield) override;
+
+  const char* name() const override { return "pubsub_topics_list"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -130,12 +218,32 @@ public:
   }
 };
 
+void RGWPSListTopicsOp::execute(optional_yield y) {
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.get_topics(this, result, y);
+  // if there are no topics it is not considered an error
+  op_ret = op_ret == -ENOENT ? 0 : op_ret;
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+  if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully got topics" << dendl;
+}
+
 // command (extension to AWS): 
 // POST
 // Action=GetTopic&TopicArn=<topic-arn>
-class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
-  int get_params() override {
+class RGWPSGetTopicOp : public RGWOp {
+  private:
+  std::string topic_name;
+  rgw_pubsub_topic result;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -147,6 +255,19 @@ public:
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield y) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -161,7 +282,7 @@ public:
     const auto f = s->formatter;
     f->open_object_section("GetTopicResponse");
     f->open_object_section("GetTopicResult");
-    encode_xml("Topic", result.topic, f); 
+    encode_xml("Topic", result, f); 
     f->close_section();
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
@@ -171,12 +292,34 @@ public:
   }
 };
 
+void RGWPSGetTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.get_topic(this, topic_name, result, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=GetTopicAttributes&TopicArn=<topic-arn>
-class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
-  int get_params() override {
+class RGWPSGetTopicAttributesOp : public RGWOp {
+  private:
+  std::string topic_name;
+  rgw_pubsub_topic result;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -188,6 +331,19 @@ public:
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield y) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -202,7 +358,7 @@ public:
     const auto f = s->formatter;
     f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
     f->open_object_section("GetTopicAttributesResult");
-    result.topic.dump_xml_as_attributes(f);
+    result.dump_xml_as_attributes(f);
     f->close_section(); // GetTopicAttributesResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
@@ -212,12 +368,33 @@ public:
   }
 };
 
+void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.get_topic(this, topic_name, result, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=DeleteTopic&TopicArn=<topic-arn>
-class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
-public:
-  int get_params() override {
+class RGWPSDeleteTopicOp : public RGWOp {
+  private:
+  std::string topic_name;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -241,7 +418,20 @@ public:
 
     return 0;
   }
-  
+
+  public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_delete"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -263,143 +453,68 @@ public:
   }
 };
 
-namespace {
-// utility classes and functions for handling parameters with the following format:
-// Attributes.entry.{N}.{key|value}={VALUE}
-// N - any unsigned number
-// VALUE - url encoded string
-
-// and Attribute is holding key and value
-// ctor and set are done according to the "type" argument
-// if type is not "key" or "value" its a no-op
-class Attribute {
-  std::string key;
-  std::string value;
-public:
-  Attribute(const std::string& type, const std::string& key_or_value) {
-    set(type, key_or_value);
-  }
-  void set(const std::string& type, const std::string& key_or_value) {
-    if (type == "key") {
-      key = key_or_value;
-    } else if (type == "value") {
-      value = key_or_value;
-    }
-  }
-  const std::string& get_key() const { return key; }
-  const std::string& get_value() const { return value; }
-};
-
-using AttributeMap = std::map<unsigned, Attribute>;
-
-// aggregate the attributes into a map
-// the key and value are associated by the index (N)
-// no assumptions are made on the order in which these parameters are added
-void update_attribute_map(const std::string& input, AttributeMap& map) {
-  const boost::char_separator<char> sep(".");
-  const boost::tokenizer tokens(input, sep);
-  auto token = tokens.begin();
-  if (*token != "Attributes") {
-      return;
-  }
-  ++token;
-
-  if (*token != "entry") {
-      return;
-  }
-  ++token;
-
-  unsigned idx;
-  try {
-    idx = std::stoul(*token);
-  } catch (const std::invalid_argument&) {
+void RGWPSDeleteTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
     return;
   }
-  ++token;
-
-  std::string key_or_value = "";
-  // get the rest of the string regardless of dots
-  // this is to allow dots in the value
-  while (token != tokens.end()) {
-    key_or_value.append(*token+".");
-    ++token;
-  }
-  // remove last separator
-  key_or_value.pop_back();
-
-  auto pos = key_or_value.find("=");
-  if (pos != string::npos) {
-    const auto key_or_value_lhs = key_or_value.substr(0, pos);
-    const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
-    const auto map_it = map.find(idx);
-    if (map_it == map.end()) {
-      // new entry
-      map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
-    } else {
-      // existing entry
-      map_it->second.set(key_or_value_lhs, key_or_value_rhs);
-    }
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.remove_topic(this, topic_name, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+    return;
   }
-}
+  ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
 }
 
-void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
-  if (post_body.size() > 0) {
-    ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
-
-    if (post_body.find("Action") != string::npos) {
-      const boost::char_separator<char> sep("&");
-      const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
-      AttributeMap map;
-      for (const auto& t : tokens) {
-        auto pos = t.find("=");
-        if (pos != string::npos) {
-          const auto key = t.substr(0, pos);
-          if (key == "Action") {
-            s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
-          } else if (key == "Name" || key == "TopicArn") {
-            const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
-            s->info.args.append(key, value);
-          } else {
-            update_attribute_map(t, map);
-          }
-        }
-      }
-      // update the regular args with the content of the attribute map
-      for (const auto& attr : map) {
-          s->info.args.append(attr.second.get_key(), attr.second.get_value());
-      }
-    }
-    const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
-    s->info.args.append("PayloadHash", payload_hash);
+using op_generator = RGWOp*(*)();
+static const std::unordered_map<std::string, op_generator> op_generators = {
+  {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
+  {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
+  {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
+  {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
+  {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
+};
+
+bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s) 
+{
+  if (s->info.args.exists("Action")) {
+    const std::string action_name = s->info.args.get("Action");
+    return op_generators.contains(action_name);
   }
+  return false;
 }
 
-RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
-  rgw_topic_parse_input();
+RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
+{
+  s->dialect = "sns";
+  s->prot_flags = RGW_REST_STS;
 
   if (s->info.args.exists("Action")) {
-    const auto action = s->info.args.get("Action");
-    if (action.compare("CreateTopic") == 0)
-      return new RGWPSCreateTopic_ObjStore_AWS();
-    if (action.compare("DeleteTopic") == 0)
-      return new RGWPSDeleteTopic_ObjStore_AWS;
-    if (action.compare("ListTopics") == 0)
-      return new RGWPSListTopics_ObjStore_AWS();
-    if (action.compare("GetTopic") == 0)
-      return new RGWPSGetTopic_ObjStore_AWS();
-    if (action.compare("GetTopicAttributes") == 0)
-      return new RGWPSGetTopicAttributes_ObjStore_AWS();
+    const std::string action_name = s->info.args.get("Action");
+    const auto action_it = op_generators.find(action_name);
+    if (action_it != op_generators.end()) {
+      return action_it->second();
+    }
+    ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl;
+  } else {
+    ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl;
   }
-
   return nullptr;
 }
 
 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
-  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
+  const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
+  if (rc < 0) {
+    return rc;
+  }
+  if (s->auth.identity->is_anonymous()) {
+    ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl;
+    return -ERR_INVALID_REQUEST;
+  }
+  return 0;
 }
 
-
 namespace {
 // return a unique topic by prefexing with the notification name: <notification>_<topic>
 std::string topic_to_unique(const std::string& topic, const std::string& notification) {
@@ -408,7 +523,7 @@ std::string topic_to_unique(const std::string& topic, const std::string& notific
 
 // extract the topic from a unique topic of the form: <notification>_<topic>
 [[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
-  if (unique_topic.find(notification + "_") == string::npos) {
+  if (unique_topic.find(notification + "_") == std::string::npos) {
     return "";
   }
   return unique_topic.substr(notification.length() + 1);
@@ -423,8 +538,8 @@ auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std:
 }
 }
 
-int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
-  int op_ret = b->remove_notification(dpp, topic_name, y);
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
+  int op_ret = b.remove_notification(dpp, topic_name, y);
   if (op_ret < 0) {
     ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
   }
@@ -435,31 +550,10 @@ int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::strin
   return op_ret;
 }
 
-int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
   // delete all notifications of on a bucket
   for (const auto& topic : bucket_topics.topics) {
-    // remove the auto generated subscription of the topic (if exist)
-    rgw_pubsub_topic_subs topic_subs;
-    int op_ret = ps.get_topic(topic.first, &topic_subs);
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      auto sub = ps.get_sub(topic_sub_name);
-      rgw_pubsub_sub_config sub_conf;
-      op_ret = sub->get_conf(&sub_conf);
-      if (op_ret < 0) {
-        ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
-        return op_ret;
-      }
-      if (!sub_conf.s3_id.empty()) {
-        // S3 notification, has autogenerated subscription
-        const auto& sub_topic_name = sub_conf.topic;
-        op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
-        if (op_ret < 0) {
-          ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
-          return op_ret;
-        }
-      }
-    }
-    op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+    const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
     if (op_ret < 0) {
       return op_ret;
     }
@@ -470,10 +564,26 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc
 // command (S3 compliant): PUT /<bucket name>?notification
 // a "notification" and a subscription will be auto-generated
 // actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-  rgw_pubsub_s3_notifications configurations;
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+  int verify_params() override {
+    bool exists;
+    const auto no_value = s->info.args.get("notification", &exists);
+    if (!exists) {
+      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+      return -EINVAL;
+    } 
+    if (no_value.length() > 0) {
+      ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
+      return -EINVAL;
+    }
+    if (s->bucket_name.empty()) {
+      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+      return -EINVAL;
+    }
+    return 0;
+  }
 
-  int get_params_from_body() {
+  int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
     const auto max_size = s->cct->_conf->rgw_max_put_param_size;
     int r;
     bufferlist data;
@@ -508,65 +618,54 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
     }
     return 0;
   }
+public:
+  int verify_permission(optional_yield y) override;
 
-  int get_params() override {
-    bool exists;
-    const auto no_value = s->info.args.get("notification", &exists);
-    if (!exists) {
-      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
-      return -EINVAL;
-    } 
-    if (no_value.length() > 0) {
-      ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
-      return -EINVAL;
-    }
-    if (s->bucket_name.empty()) {
-      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
-      return -EINVAL;
-    }
-    bucket_name = s->bucket_name;
-    return 0;
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
   }
 
-public:
   const char* name() const override { return "pubsub_notification_create_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+
   void execute(optional_yield) override;
 };
 
-void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
-  op_ret = get_params_from_body();
+void RGWPSCreateNotifOp::execute(optional_yield y) {
+  op_ret = verify_params();
   if (op_ret < 0) {
     return;
   }
 
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto b = ps->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
+  rgw_pubsub_s3_notifications configurations;
+  op_ret = get_params_from_body(configurations);
+  if (op_ret < 0) {
+    return;
+  }
 
-  std::string data_bucket_prefix = "";
-  std::string data_oid_prefix = "";
-  bool push_only = true;
-  if (store->get_sync_module()) {
-    const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
-    if (psmodule) {
-      const auto& conf = psmodule->get_effective_conf();
-      data_bucket_prefix = conf["data_bucket_prefix"];
-      data_oid_prefix = conf["data_oid_prefix"];
-      // TODO: allow "push-only" on PS zone as well
-      push_only = false;
-    }
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+    return;
   }
 
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  const RGWPubSub::Bucket b(ps, bucket.get());
+
   if(configurations.list.empty()) {
     // get all topics on a bucket
     rgw_pubsub_bucket_topics bucket_topics;
-    op_ret = b->get_topics(&bucket_topics);
+    op_ret = b.get_topics(this, bucket_topics, y);
     if (op_ret < 0) {
-      ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+      ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
       return;
     }
 
-    op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+    op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
     return;
   }
 
@@ -600,7 +699,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
 
     // get topic information. destination information is stored in the topic
     rgw_pubsub_topic topic_info;  
-    op_ret = ps->get_topic(topic_name, &topic_info);
+    op_ret = ps.get_topic(this, topic_name, topic_info, y);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
@@ -615,7 +714,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+    op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
@@ -624,42 +723,29 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
     ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
     // generate the notification
     rgw::notify::EventTypeList events;
-    op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
+    op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
       // rollback generated topic (ignore return value)
-      ps->remove_topic(this, unique_topic_name, y);
+      ps.remove_topic(this, unique_topic_name, y);
       return;
     }
     ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
-  
-    if (!push_only) {
-      // generate the subscription with destination information from the original topic
-      rgw_pubsub_sub_dest dest = topic_info.dest;
-      dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
-      dest.oid_prefix = data_oid_prefix + notif_name + "/";
-      auto sub = ps->get_sub(notif_name);
-      op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
-      if (op_ret < 0) {
-        ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
-        // rollback generated notification (ignore return value)
-        b->remove_notification(this, unique_topic_name, y);
-        // rollback generated topic (ignore return value)
-        ps->remove_topic(this, unique_topic_name, y);
-        return;
-      }
-      ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
-    }
   }
 }
 
-// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
-  std::string notif_name;
+int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+    return -EACCES;
+  }
 
-  int get_params() override {
+  return 0;
+}
+
+// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+  int get_params(std::string& notif_name) const {
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
@@ -670,30 +756,46 @@ private:
       ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
       return -EINVAL;
     }
-    bucket_name = s->bucket_name;
     return 0;
   }
 
 public:
-  void execute(optional_yield y) override;
+  int verify_permission(optional_yield y) override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  
   const char* name() const override { return "pubsub_notification_delete_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+  void execute(optional_yield y) override;
 };
 
-void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
-  op_ret = get_params();
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
+  std::string notif_name;
+  op_ret = get_params(notif_name);
+  if (op_ret < 0) {
+    return;
+  }
+
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
   if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
     return;
   }
 
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto b = ps->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  const RGWPubSub::Bucket b(ps, bucket.get());
 
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
-  op_ret = b->get_topics(&bucket_topics);
+  op_ret = b.get_topics(this, bucket_topics, y);
   if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
     return;
   }
 
@@ -701,15 +803,8 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
     // delete a specific notification
     const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
     if (unique_topic) {
-      // remove the auto generated subscription according to notification name (if exist)
       const auto unique_topic_name = unique_topic->get().topic.name;
-      auto sub = ps->get_sub(notif_name);
-      op_ret = sub->unsubscribe(this, unique_topic_name, y);
-      if (op_ret < 0 && op_ret != -ENOENT) {
-        ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
-        return;
-      }
-      op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
+      op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
       return;
     }
     // notification to be removed is not found - considered success
@@ -717,16 +812,22 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
     return;
   }
 
-  op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+  op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
+}
+
+int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+    return -EACCES;
+  }
+
+  return 0;
 }
 
 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-private:
-  std::string notif_name;
+class RGWPSListNotifsOp : public RGWOp {
   rgw_pubsub_s3_notifications notifications;
 
-  int get_params() override {
+  int get_params(std::string& notif_name) const {
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
@@ -737,11 +838,20 @@ private:
       ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
       return -EINVAL;
     }
-    bucket_name = s->bucket_name;
     return 0;
   }
 
 public:
+  int verify_permission(optional_yield y) override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+
+  const char* name() const override { return "pubsub_notifications_get_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void execute(optional_yield y) override;
   void send_response() override {
     if (op_ret) {
@@ -756,19 +866,31 @@ public:
     notifications.dump_xml(s->formatter);
     rgw_flush_formatter_and_reset(s, s->formatter);
   }
-  const char* name() const override { return "pubsub_notifications_get_s3"; }
 };
 
-void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto b = ps->get_bucket(bucket_info.bucket);
-  ceph_assert(b);
+void RGWPSListNotifsOp::execute(optional_yield y) {
+  std::string notif_name;
+  op_ret = get_params(notif_name);
+  if (op_ret < 0) {
+    return;
+  }
+
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+    return;
+  }
+
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  const RGWPubSub::Bucket b(ps, bucket.get());
   
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
-  op_ret = b->get_topics(&bucket_topics);
+  op_ret = b.get_topics(this, bucket_topics, y);
   if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
     return;
   }
   if (!notif_name.empty()) {
@@ -792,27 +914,35 @@ void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
   }
 }
 
+int RGWPSListNotifsOp::verify_permission(optional_yield y) {
+  if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
-  return new RGWPSListNotifs_ObjStore_S3();
+  return new RGWPSListNotifsOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
-  return new RGWPSCreateNotif_ObjStore_S3();
+  return new RGWPSCreateNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
-  return new RGWPSDeleteNotif_ObjStore_S3();
+  return new RGWPSDeleteNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
-    return new RGWPSListNotifs_ObjStore_S3();
+    return new RGWPSListNotifsOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
-  return new RGWPSCreateNotif_ObjStore_S3();
+  return new RGWPSCreateNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
-  return new RGWPSDeleteNotif_ObjStore_S3();
+  return new RGWPSDeleteNotifOp();
 }