]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_sync_module_pubsub_rest.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub_rest.cc
index ef87613989ce5b280b0b3f956d080d5e5f0c9408..66c99e94bbf509838cfc73dd7f743c2a14c49bee 100644 (file)
@@ -1,98 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <algorithm>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
 #include "rgw_sync_module_pubsub.h"
+#include "rgw_pubsub_push.h"
 #include "rgw_sync_module_pubsub_rest.h"
 #include "rgw_pubsub.h"
 #include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
+#include "rgw_arn.h"
+#include "rgw_zone.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  string topic_name;
-  string bucket_name;
-
+// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
+class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
 public:
-  RGWPSCreateTopicOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_create"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSCreateTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
-public:
-  explicit RGWPSCreateTopic_ObjStore_S3() {}
-
   int get_params() override {
+    
     topic_name = s->object.name;
+
+    dest.push_endpoint = s->info.args.get("push-endpoint");
+    dest.push_endpoint_args = s->info.args.get_str();
+    // dest object only stores endpoint info
+    // bucket to store events/records will be set only when subscription is created
+    dest.bucket_name = "";
+    dest.oid_prefix = "";
+    dest.arn_topic = topic_name;
+    // the topic ARN will be sent in the reply
+    const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
+        store->svc.zone->get_zonegroup().get_name(),
+        s->user->user_id.tenant, topic_name);
+    topic_arn = arn.to_string();
     return 0;
   }
-};
-
-class RGWPSListTopicsOp : public RGWOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_user_topics result;
 
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/json");
 
-public:
-  RGWPSListTopicsOp() {}
+    if (op_ret < 0) {
+      return;
+    }
 
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
+    {
+      Formatter::ObjectSection section(*s->formatter, "result");
+      encode_json("arn", topic_arn, s->formatter);
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
   }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topics_list"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
 };
 
-void RGWPSListTopicsOp::execute()
-{
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->get_user_topics(&result);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
-    return;
-  }
-
-}
-
-class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
+// command: GET /topics
+class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
 public:
-  explicit RGWPSListTopics_ObjStore_S3() {}
-
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -109,47 +77,9 @@ public:
   }
 };
 
-class RGWPSGetTopicOp : public RGWOp {
-protected:
-  string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_topic_subs result;
-
+// command: GET /topics/<topic-name>
+class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
 public:
-  RGWPSGetTopicOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_get"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSGetTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->get_topic(topic_name, &result);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
-public:
-  explicit RGWPSGetTopic_ObjStore_S3() {}
-
   int get_params() override {
     topic_name = s->object.name;
     return 0;
@@ -171,204 +101,89 @@ public:
   }
 };
 
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
-  string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-
+// command: DELETE /topics/<topic-name>
+class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
 public:
-  RGWPSDeleteTopicOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_topic_delete"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSDeleteTopicOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  op_ret = ups->remove_topic(topic_name);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
-public:
-  explicit RGWPSDeleteTopic_ObjStore_S3() {}
-
   int get_params() override {
     topic_name = s->object.name;
     return 0;
   }
 };
 
-class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
+// ceph specifc topics handler factory
+class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
   }
+
   int read_permissions(RGWOp* op) override {
     return 0;
   }
+
   bool supports_quota() override {
     return false;
   }
+
   RGWOp *op_get() override {
     if (s->init_state.url_bucket.empty()) {
       return nullptr;
     }
     if (s->object.empty()) {
-      return new RGWPSListTopics_ObjStore_S3();
+      return new RGWPSListTopics_ObjStore();
     }
-    return new RGWPSGetTopic_ObjStore_S3();
+    return new RGWPSGetTopic_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateTopic_ObjStore_S3();
+      return new RGWPSCreateTopic_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteTopic_ObjStore_S3();
+      return new RGWPSDeleteTopic_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSTopic_S3() {}
+  explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSTopic() = default;
 };
 
-
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
-  string sub_name;
-  string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_sub_dest dest;
-
+// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
+class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
 public:
-  RGWPSCreateSubOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_create"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSCreateSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->subscribe(topic_name, dest);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
-public:
-  explicit RGWPSCreateSub_ObjStore_S3() {}
-
   int get_params() override {
     sub_name = s->object.name;
 
     bool exists;
-
     topic_name = s->info.args.get("topic", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
+      ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
       return -EINVAL;
     }
 
-    auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->get_sync_module().get());
-    auto conf = psmodule->get_effective_conf();
+    const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
+    const auto& conf = psmodule->get_effective_conf();
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
     dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
     dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
     dest.push_endpoint_args = s->info.args.get_str();
+    dest.arn_topic = topic_name;
 
     return 0;
   }
 };
 
-class RGWPSGetSubOp : public RGWOp {
-protected:
-  string sub_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_sub_config result;
-
-public:
-  RGWPSGetSubOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_get"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSGetSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->get_conf(&result);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
+// command: GET /subscriptions/<sub-name>
+class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
 public:
-  explicit RGWPSGetSub_ObjStore_S3() {}
-
   int get_params() override {
     sub_name = s->object.name;
     return 0;
   }
-
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -380,58 +195,14 @@ public:
       return;
     }
 
-    {
-      Formatter::ObjectSection section(*s->formatter, "result");
-      encode_json("topic", result.topic, s->formatter);
-      encode_json("push_endpoint", result.dest.push_endpoint, s->formatter);
-      encode_json("args", result.dest.push_endpoint_args, s->formatter);
-    }
+    encode_json("result", result, s->formatter);
     rgw_flush_formatter_and_reset(s, s->formatter);
   }
 };
 
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
-  string sub_name;
-  string topic_name;
-  std::unique_ptr<RGWUserPubSub> ups;
-
-public:
-  RGWPSDeleteSubOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_delete"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSDeleteSubOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->unsubscribe(topic_name);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
+// command: DELETE /subscriptions/<sub-name>
+class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
 public:
-  explicit RGWPSDeleteSub_ObjStore_S3() {}
-
   int get_params() override {
     sub_name = s->object.name;
     topic_name = s->info.args.get("topic");
@@ -439,47 +210,10 @@ public:
   }
 };
 
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
-  string sub_name;
-  string event_id;
-  std::unique_ptr<RGWUserPubSub> ups;
-
+// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
+class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
 public:
-  RGWPSAckSubEventOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_ack"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSAckSubEventOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->remove_event(event_id);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
-public:
-  explicit RGWPSAckSubEvent_ObjStore_S3() {}
+  explicit RGWPSAckSubEvent_ObjStore() {}
 
   int get_params() override {
     sub_name = s->object.name;
@@ -488,64 +222,23 @@ public:
 
     event_id = s->info.args.get("event-id", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
+      ldout(s->cct, 1) << "missing required param 'event-id'" << dendl;
       return -EINVAL;
     }
     return 0;
   }
 };
 
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
-  int max_entries{0};
-  string sub_name;
-  string marker;
-  std::unique_ptr<RGWUserPubSub> ups;
-  RGWUserPubSub::Sub::list_events_result result;
-
-public:
-  RGWPSPullSubEventsOp() {}
-
-  int verify_permission() override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute() override;
-
-  const char* name() const override { return "pubsub_subscription_pull"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSPullSubEventsOp::execute()
-{
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto sub = ups->get_sub(sub_name);
-  op_ret = sub->list_events(marker, max_entries, &result);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
-    return;
-  }
-}
-
-class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
+// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
+class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
 public:
-  explicit RGWPSPullSubEvents_ObjStore_S3() {}
-
   int get_params() override {
     sub_name = s->object.name;
     marker = s->info.args.get("marker");
-#define DEFAULT_MAX_ENTRIES 100
-    int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
+    const int ret = s->info.args.get_int("max-entries", &max_entries, 
+        RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
     if (ret < 0) {
-      ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
+      ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
       return -EINVAL;
     }
     return 0;
@@ -562,12 +255,13 @@ public:
       return;
     }
 
-    encode_json("result", result, s->formatter);
+    encode_json("result", *sub, s->formatter);
     rgw_flush_formatter_and_reset(s, s->formatter);
   }
 };
 
-class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
+// subscriptions handler factory
+class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -584,36 +278,37 @@ protected:
       return nullptr;
     }
     if (s->info.args.exists("events")) {
-      return new RGWPSPullSubEvents_ObjStore_S3();
+      return new RGWPSPullSubEvents_ObjStore();
     }
-    return new RGWPSGetSub_ObjStore_S3();
+    return new RGWPSGetSub_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateSub_ObjStore_S3();
+      return new RGWPSCreateSub_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteSub_ObjStore_S3();
+      return new RGWPSDeleteSub_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_post() override {
     if (s->info.args.exists("ack")) {
-      return new RGWPSAckSubEvent_ObjStore_S3();
+      return new RGWPSAckSubEvent_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSSub_S3() {}
+  explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSSub() = default;
 };
 
-
-static int notif_bucket_path(const string& path, string *bucket_name)
-{
+namespace {
+// extract bucket name from ceph specific notification command, with the format:
+// /notifications/<bucket-name>
+int notif_bucket_path(const string& path, std::string& bucket_name) {
   if (path.empty()) {
     return -EINVAL;
   }
@@ -630,218 +325,103 @@ static int notif_bucket_path(const string& path, string *bucket_name)
     return -EINVAL;
   }
 
-  *bucket_name = path.substr(pos + 1);
+  bucket_name = path.substr(pos + 1);
   return 0;
 }
+}
 
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  string topic_name;
-  set<string, ltstr_nocase> events;
-
-  string bucket_name;
-  RGWBucketInfo bucket_info;
-
-public:
-  RGWPSCreateNotifOp() {}
+// command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
+class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
+private:
+  std::string topic_name;
+  rgw::notify::EventTypeList events;
 
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
+  int get_params() override {
+    bool exists;
+    topic_name = s->info.args.get("topic", &exists);
+    if (!exists) {
+      ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
+      return -EINVAL;
     }
 
-    ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, nullptr);
-    if (ret < 0) {
-      return ret;
+    std::string events_str = s->info.args.get("events", &exists);
+    if (!exists) {
+      // if no events are provided, we notify on all of them
+      events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
     }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
-      return -EPERM;
+    rgw::notify::from_string_list(events_str, events);
+    if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+      ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+      return -EINVAL;
     }
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
+    return notif_bucket_path(s->object.name, bucket_name);
   }
-  void execute() override;
 
+public:
   const char* name() const override { return "pubsub_notification_create"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-  virtual int get_params() = 0;
+  void execute() override;
 };
 
-void RGWPSCreateNotifOp::execute()
+void RGWPSCreateNotif_ObjStore::execute()
 {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
+  ups.emplace(store, s->owner.get_id());
 
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
 }
 
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-public:
-  explicit RGWPSCreateNotif_ObjStore_S3() {}
+// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
+class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
+private:
+  std::string topic_name;
 
   int get_params() override {
     bool exists;
     topic_name = s->info.args.get("topic", &exists);
     if (!exists) {
-      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+      ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
       return -EINVAL;
     }
-
-    string events_str = s->info.args.get("events", &exists);
-    if (exists) {
-      get_str_set(events_str, ",", events);
-    }
-    return notif_bucket_path(s->object.name, &bucket_name);
+    return notif_bucket_path(s->object.name, bucket_name);
   }
-};
-
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::unique_ptr<RGWUserPubSub> ups;
-  string topic_name;
-  string bucket_name;
-  RGWBucketInfo bucket_info;
 
 public:
-  RGWPSDeleteNotifOp() {}
-
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
-      return -EPERM;
-    }
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
   void execute() override;
-
   const char* name() const override { return "pubsub_notification_delete"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-  virtual int get_params() = 0;
 };
 
-void RGWPSDeleteNotifOp::execute()
-{
+void RGWPSDeleteNotif_ObjStore::execute() {
   op_ret = get_params();
   if (op_ret < 0) {
     return;
   }
 
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+  ups.emplace(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
 }
 
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-public:
-  explicit RGWPSDeleteNotif_ObjStore_S3() {}
+// command: GET /notifications/bucket/<bucket>
+class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
+private:
+  rgw_pubsub_bucket_topics result;
 
   int get_params() override {
-    bool exists;
-    topic_name = s->info.args.get("topic", &exists);
-    if (!exists) {
-      ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
-      return -EINVAL;
-    }
-    return notif_bucket_path(s->object.name, &bucket_name);
+    return notif_bucket_path(s->object.name, bucket_name);
   }
-};
-
-class RGWPSListNotifsOp : public RGWOp {
-protected:
-  string bucket_name;
-  RGWBucketInfo bucket_info;
-  std::unique_ptr<RGWUserPubSub> ups;
-  rgw_pubsub_bucket_topics result;
-
 
 public:
-  RGWPSListNotifsOp() {}
-
-  int verify_permission() override {
-    int ret = get_params();
-    if (ret < 0) {
-      return ret;
-    }
-
-    ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
-                                 bucket_info, nullptr, nullptr);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (bucket_info.owner != s->owner.get_id()) {
-      ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
-      return -EPERM;
-    }
-
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
   void execute() override;
-
-  const char* name() const override { return "pubsub_notifications_list"; }
-  virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
-  virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-  virtual int get_params() = 0;
-};
-
-void RGWPSListNotifsOp::execute()
-{
-  ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-  auto b = ups->get_bucket(bucket_info.bucket);
-  op_ret = b->get_topics(&result);
-  if (op_ret < 0) {
-    ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
-    return;
-  }
-
-}
-
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-public:
-  explicit RGWPSListNotifs_ObjStore_S3() {}
-
-  int get_params() override {
-    return notif_bucket_path(s->object.name, &bucket_name);
-  }
-
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -852,14 +432,25 @@ public:
     if (op_ret < 0) {
       return;
     }
-
     encode_json("result", result, s->formatter);
     rgw_flush_formatter_and_reset(s, s->formatter);
   }
+  const char* name() const override { return "pubsub_notifications_list"; }
 };
 
+void RGWPSListNotifs_ObjStore::execute()
+{
+  ups.emplace(store, s->owner.get_id());
+  auto b = ups->get_bucket(bucket_info.bucket);
+  op_ret = b->get_topics(&result);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+}
 
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+// ceph specific notification handler factory
+class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
 protected:
   int init_permissions(RGWOp* op) override {
     return 0;
@@ -875,52 +466,53 @@ protected:
     if (s->object.empty()) {
       return nullptr;
     }
-    return new RGWPSListNotifs_ObjStore_S3();
+    return new RGWPSListNotifs_ObjStore();
   }
   RGWOp *op_put() override {
     if (!s->object.empty()) {
-      return new RGWPSCreateNotif_ObjStore_S3();
+      return new RGWPSCreateNotif_ObjStore();
     }
     return nullptr;
   }
   RGWOp *op_delete() override {
     if (!s->object.empty()) {
-      return new RGWPSDeleteNotif_ObjStore_S3();
+      return new RGWPSDeleteNotif_ObjStore();
     }
     return nullptr;
   }
 public:
-  explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
-  virtual ~RGWHandler_REST_PSNotifs_S3() {}
+  explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+  virtual ~RGWHandler_REST_PSNotifs() = default;
 };
 
-
-RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
+// factory for ceph specific PubSub REST handlers 
+RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
                                                      const rgw::auth::StrategyRegistry& auth_registry,
                                                      const std::string& frontend_prefix)
 {
-  int ret =
-    RGWHandler_REST_S3::init_from_header(s,
-                                       RGW_FORMAT_JSON, true);
-  if (ret < 0) {
+  if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
     return nullptr;
   }
+  RGWHandler_REST* handler{nullptr};
 
-  RGWHandler_REST *handler = nullptr;;
-
+  // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
+  // this API is available only on RGW that belong to a pubsub zone
   if (s->init_state.url_bucket == "topics") {
-    handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
-  }
-
-  if (s->init_state.url_bucket == "subscriptions") {
-    handler = new RGWHandler_REST_PSSub_S3(auth_registry);
-  }
-
-  if (s->init_state.url_bucket == "notifications") {
-    handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
-  }
-
+    handler = new RGWHandler_REST_PSTopic(auth_registry);
+  } else if (s->init_state.url_bucket == "subscriptions") {
+    handler = new RGWHandler_REST_PSSub(auth_registry);
+  } else if (s->init_state.url_bucket == "notifications") {
+    handler = new RGWHandler_REST_PSNotifs(auth_registry);
+  } else if (s->info.args.exists("notification")) {
+    const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
+    if (ret == 0) {
+        handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
+    }
+  }
+  
   ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
+
   return handler;
 }