+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <algorithm>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
#include "rgw_sync_module_pubsub.h"
+#include "rgw_pubsub_push.h"
#include "rgw_sync_module_pubsub_rest.h"
#include "rgw_pubsub.h"
#include "rgw_op.h"
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
+#include "rgw_arn.h"
+#include "rgw_zone.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- string topic_name;
- string bucket_name;
-
+// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
+class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
public:
- RGWPSCreateTopicOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_create"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
- virtual int get_params() = 0;
-};
-
-void RGWPSCreateTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
-public:
- explicit RGWPSCreateTopic_ObjStore_S3() {}
-
int get_params() override {
+
topic_name = s->object.name;
+
+ dest.push_endpoint = s->info.args.get("push-endpoint");
+ dest.push_endpoint_args = s->info.args.get_str();
+ // dest object only stores endpoint info
+ // bucket to store events/records will be set only when subscription is created
+ dest.bucket_name = "";
+ dest.oid_prefix = "";
+ dest.arn_topic = topic_name;
+ // the topic ARN will be sent in the reply
+ const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
+ store->svc.zone->get_zonegroup().get_name(),
+ s->user->user_id.tenant, topic_name);
+ topic_arn = arn.to_string();
return 0;
}
-};
-
-class RGWPSListTopicsOp : public RGWOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_user_topics result;
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/json");
-public:
- RGWPSListTopicsOp() {}
+ if (op_ret < 0) {
+ return;
+ }
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
+ {
+ Formatter::ObjectSection section(*s->formatter, "result");
+ encode_json("arn", topic_arn, s->formatter);
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
}
- void execute() override;
-
- const char* name() const override { return "pubsub_topics_list"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
};
-void RGWPSListTopicsOp::execute()
-{
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->get_user_topics(&result);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
- return;
- }
-
-}
-
-class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
+// command: GET /topics
+class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
public:
- explicit RGWPSListTopics_ObjStore_S3() {}
-
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
-class RGWPSGetTopicOp : public RGWOp {
-protected:
- string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_topic_subs result;
-
+// command: GET /topics/<topic-name>
+class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
public:
- RGWPSGetTopicOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_get"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
- virtual int get_params() = 0;
-};
-
-void RGWPSGetTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->get_topic(topic_name, &result);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
-public:
- explicit RGWPSGetTopic_ObjStore_S3() {}
-
int get_params() override {
topic_name = s->object.name;
return 0;
}
};
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
- string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
-
+// command: DELETE /topics/<topic-name>
+class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
public:
- RGWPSDeleteTopicOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_delete"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
- virtual int get_params() = 0;
-};
-
-void RGWPSDeleteTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->remove_topic(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
-public:
- explicit RGWPSDeleteTopic_ObjStore_S3() {}
-
int get_params() override {
topic_name = s->object.name;
return 0;
}
};
-class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
+// ceph specifc topics handler factory
+class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
}
+
int read_permissions(RGWOp* op) override {
return 0;
}
+
bool supports_quota() override {
return false;
}
+
RGWOp *op_get() override {
if (s->init_state.url_bucket.empty()) {
return nullptr;
}
if (s->object.empty()) {
- return new RGWPSListTopics_ObjStore_S3();
+ return new RGWPSListTopics_ObjStore();
}
- return new RGWPSGetTopic_ObjStore_S3();
+ return new RGWPSGetTopic_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateTopic_ObjStore_S3();
+ return new RGWPSCreateTopic_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteTopic_ObjStore_S3();
+ return new RGWPSDeleteTopic_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSTopic_S3() {}
+ explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSTopic() = default;
};
-
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
- string sub_name;
- string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_sub_dest dest;
-
+// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
+class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
public:
- RGWPSCreateSubOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_create"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
- virtual int get_params() = 0;
-};
-
-void RGWPSCreateSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->subscribe(topic_name, dest);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
-public:
- explicit RGWPSCreateSub_ObjStore_S3() {}
-
int get_params() override {
sub_name = s->object.name;
bool exists;
-
topic_name = s->info.args.get("topic", &exists);
if (!exists) {
- ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
+ ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
return -EINVAL;
}
- auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->get_sync_module().get());
- auto conf = psmodule->get_effective_conf();
+ const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
+ const auto& conf = psmodule->get_effective_conf();
dest.push_endpoint = s->info.args.get("push-endpoint");
dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
dest.push_endpoint_args = s->info.args.get_str();
+ dest.arn_topic = topic_name;
return 0;
}
};
-class RGWPSGetSubOp : public RGWOp {
-protected:
- string sub_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_sub_config result;
-
-public:
- RGWPSGetSubOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_get"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
- virtual int get_params() = 0;
-};
-
-void RGWPSGetSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->get_conf(&result);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
+// command: GET /subscriptions/<sub-name>
+class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
public:
- explicit RGWPSGetSub_ObjStore_S3() {}
-
int get_params() override {
sub_name = s->object.name;
return 0;
}
-
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
return;
}
- {
- Formatter::ObjectSection section(*s->formatter, "result");
- encode_json("topic", result.topic, s->formatter);
- encode_json("push_endpoint", result.dest.push_endpoint, s->formatter);
- encode_json("args", result.dest.push_endpoint_args, s->formatter);
- }
+ encode_json("result", result, s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
};
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
- string sub_name;
- string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
-
-public:
- RGWPSDeleteSubOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_delete"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
- virtual int get_params() = 0;
-};
-
-void RGWPSDeleteSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->unsubscribe(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
+// command: DELETE /subscriptions/<sub-name>
+class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
public:
- explicit RGWPSDeleteSub_ObjStore_S3() {}
-
int get_params() override {
sub_name = s->object.name;
topic_name = s->info.args.get("topic");
}
};
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
- string sub_name;
- string event_id;
- std::unique_ptr<RGWUserPubSub> ups;
-
+// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
+class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
public:
- RGWPSAckSubEventOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_ack"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
- virtual int get_params() = 0;
-};
-
-void RGWPSAckSubEventOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->remove_event(event_id);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
-public:
- explicit RGWPSAckSubEvent_ObjStore_S3() {}
+ explicit RGWPSAckSubEvent_ObjStore() {}
int get_params() override {
sub_name = s->object.name;
event_id = s->info.args.get("event-id", &exists);
if (!exists) {
- ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
+ ldout(s->cct, 1) << "missing required param 'event-id'" << dendl;
return -EINVAL;
}
return 0;
}
};
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
- int max_entries{0};
- string sub_name;
- string marker;
- std::unique_ptr<RGWUserPubSub> ups;
- RGWUserPubSub::Sub::list_events_result result;
-
-public:
- RGWPSPullSubEventsOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_pull"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
- virtual int get_params() = 0;
-};
-
-void RGWPSPullSubEventsOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->list_events(marker, max_entries, &result);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
- return;
- }
-}
-
-class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
+// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
+class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
public:
- explicit RGWPSPullSubEvents_ObjStore_S3() {}
-
int get_params() override {
sub_name = s->object.name;
marker = s->info.args.get("marker");
-#define DEFAULT_MAX_ENTRIES 100
- int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
+ const int ret = s->info.args.get_int("max-entries", &max_entries,
+ RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
if (ret < 0) {
- ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
+ ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
}
return 0;
return;
}
- encode_json("result", result, s->formatter);
+ encode_json("result", *sub, s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
};
-class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
+// subscriptions handler factory
+class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
return nullptr;
}
if (s->info.args.exists("events")) {
- return new RGWPSPullSubEvents_ObjStore_S3();
+ return new RGWPSPullSubEvents_ObjStore();
}
- return new RGWPSGetSub_ObjStore_S3();
+ return new RGWPSGetSub_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateSub_ObjStore_S3();
+ return new RGWPSCreateSub_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteSub_ObjStore_S3();
+ return new RGWPSDeleteSub_ObjStore();
}
return nullptr;
}
RGWOp *op_post() override {
if (s->info.args.exists("ack")) {
- return new RGWPSAckSubEvent_ObjStore_S3();
+ return new RGWPSAckSubEvent_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSSub_S3() {}
+ explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSSub() = default;
};
-
-static int notif_bucket_path(const string& path, string *bucket_name)
-{
+namespace {
+// extract bucket name from ceph specific notification command, with the format:
+// /notifications/<bucket-name>
+int notif_bucket_path(const string& path, std::string& bucket_name) {
if (path.empty()) {
return -EINVAL;
}
return -EINVAL;
}
- *bucket_name = path.substr(pos + 1);
+ bucket_name = path.substr(pos + 1);
return 0;
}
+}
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- string topic_name;
- set<string, ltstr_nocase> events;
-
- string bucket_name;
- RGWBucketInfo bucket_info;
-
-public:
- RGWPSCreateNotifOp() {}
+// command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
+class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
+private:
+ std::string topic_name;
+ rgw::notify::EventTypeList events;
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
+ int get_params() override {
+ bool exists;
+ topic_name = s->info.args.get("topic", &exists);
+ if (!exists) {
+ ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
+ return -EINVAL;
}
- ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, nullptr);
- if (ret < 0) {
- return ret;
+ std::string events_str = s->info.args.get("events", &exists);
+ if (!exists) {
+ // if no events are provided, we notify on all of them
+ events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
}
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
- return -EPERM;
+ rgw::notify::from_string_list(events_str, events);
+ if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+ ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+ return -EINVAL;
}
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
+ return notif_bucket_path(s->object.name, bucket_name);
}
- void execute() override;
+public:
const char* name() const override { return "pubsub_notification_create"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
- virtual int get_params() = 0;
+ void execute() override;
};
-void RGWPSCreateNotifOp::execute()
+void RGWPSCreateNotif_ObjStore::execute()
{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
+ ups.emplace(store, s->owner.get_id());
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
+ ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
}
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-public:
- explicit RGWPSCreateNotif_ObjStore_S3() {}
+// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
+class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
+private:
+ std::string topic_name;
int get_params() override {
bool exists;
topic_name = s->info.args.get("topic", &exists);
if (!exists) {
- ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+ ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
return -EINVAL;
}
-
- string events_str = s->info.args.get("events", &exists);
- if (exists) {
- get_str_set(events_str, ",", events);
- }
- return notif_bucket_path(s->object.name, &bucket_name);
+ return notif_bucket_path(s->object.name, bucket_name);
}
-};
-
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- string topic_name;
- string bucket_name;
- RGWBucketInfo bucket_info;
public:
- RGWPSDeleteNotifOp() {}
-
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
- return -EPERM;
- }
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
void execute() override;
-
const char* name() const override { return "pubsub_notification_delete"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
- virtual int get_params() = 0;
};
-void RGWPSDeleteNotifOp::execute()
-{
+void RGWPSDeleteNotif_ObjStore::execute() {
op_ret = get_params();
if (op_ret < 0) {
return;
}
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
+ ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
}
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-public:
- explicit RGWPSDeleteNotif_ObjStore_S3() {}
+// command: GET /notifications/bucket/<bucket>
+class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
+private:
+ rgw_pubsub_bucket_topics result;
int get_params() override {
- bool exists;
- topic_name = s->info.args.get("topic", &exists);
- if (!exists) {
- ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
- return -EINVAL;
- }
- return notif_bucket_path(s->object.name, &bucket_name);
+ return notif_bucket_path(s->object.name, bucket_name);
}
-};
-
-class RGWPSListNotifsOp : public RGWOp {
-protected:
- string bucket_name;
- RGWBucketInfo bucket_info;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_bucket_topics result;
-
public:
- RGWPSListNotifsOp() {}
-
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
- return -EPERM;
- }
-
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
void execute() override;
-
- const char* name() const override { return "pubsub_notifications_list"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
- virtual int get_params() = 0;
-};
-
-void RGWPSListNotifsOp::execute()
-{
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
- op_ret = b->get_topics(&result);
- if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
- return;
- }
-
-}
-
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-public:
- explicit RGWPSListNotifs_ObjStore_S3() {}
-
- int get_params() override {
- return notif_bucket_path(s->object.name, &bucket_name);
- }
-
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
if (op_ret < 0) {
return;
}
-
encode_json("result", result, s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
+ const char* name() const override { return "pubsub_notifications_list"; }
};
+void RGWPSListNotifs_ObjStore::execute()
+{
+ ups.emplace(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ op_ret = b->get_topics(&result);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+}
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+// ceph specific notification handler factory
+class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
if (s->object.empty()) {
return nullptr;
}
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifs_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotif_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotif_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSNotifs_S3() {}
+ explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSNotifs() = default;
};
-
-RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
+// factory for ceph specific PubSub REST handlers
+RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix)
{
- int ret =
- RGWHandler_REST_S3::init_from_header(s,
- RGW_FORMAT_JSON, true);
- if (ret < 0) {
+ if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
return nullptr;
}
+
+ RGWHandler_REST* handler{nullptr};
- RGWHandler_REST *handler = nullptr;;
-
+ // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
+ // this API is available only on RGW that belong to a pubsub zone
if (s->init_state.url_bucket == "topics") {
- handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
- }
-
- if (s->init_state.url_bucket == "subscriptions") {
- handler = new RGWHandler_REST_PSSub_S3(auth_registry);
- }
-
- if (s->init_state.url_bucket == "notifications") {
- handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
- }
-
+ handler = new RGWHandler_REST_PSTopic(auth_registry);
+ } else if (s->init_state.url_bucket == "subscriptions") {
+ handler = new RGWHandler_REST_PSSub(auth_registry);
+ } else if (s->init_state.url_bucket == "notifications") {
+ handler = new RGWHandler_REST_PSNotifs(auth_registry);
+ } else if (s->info.args.exists("notification")) {
+ const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
+ if (ret == 0) {
+ handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
+ }
+ }
+
ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
+
return handler;
}