#include "rgw_arn.h"
#include "rgw_zone.h"
#include "services/svc_zone.h"
+#include "rgw_sal_rados.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
public:
int get_params() override {
- topic_name = s->object.name;
+ topic_name = s->object->get_name();
opaque_data = s->info.args.get("OpaqueData");
dest.push_endpoint = s->info.args.get("push-endpoint");
class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
public:
int get_params() override {
- topic_name = s->object.name;
+ topic_name = s->object->get_name();
return 0;
}
class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
public:
int get_params() override {
- topic_name = s->object.name;
+ topic_name = s->object->get_name();
return 0;
}
};
// ceph specifc topics handler factory
class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
protected:
- int init_permissions(RGWOp* op) override {
+ int init_permissions(RGWOp* op, optional_yield) override {
return 0;
}
- int read_permissions(RGWOp* op) override {
+ int read_permissions(RGWOp* op, optional_yield) override {
return 0;
}
if (s->init_state.url_bucket.empty()) {
return nullptr;
}
- if (s->object.empty()) {
+ if (s->object->empty()) {
return new RGWPSListTopics_ObjStore();
}
return new RGWPSGetTopic_ObjStore();
}
RGWOp *op_put() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSCreateTopic_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSDeleteTopic_ObjStore();
}
return nullptr;
class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
public:
int get_params() override {
- sub_name = s->object.name;
+ sub_name = s->object->get_name();
bool exists;
topic_name = s->info.args.get("topic", &exists);
class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
public:
int get_params() override {
- sub_name = s->object.name;
+ sub_name = s->object->get_name();
return 0;
}
void send_response() override {
class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
public:
int get_params() override {
- sub_name = s->object.name;
+ sub_name = s->object->get_name();
topic_name = s->info.args.get("topic");
return 0;
}
explicit RGWPSAckSubEvent_ObjStore() {}
int get_params() override {
- sub_name = s->object.name;
+ sub_name = s->object->get_name();
bool exists;
class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
public:
int get_params() override {
- sub_name = s->object.name;
+ sub_name = s->object->get_name();
marker = s->info.args.get("marker");
const int ret = s->info.args.get_int("max-entries", &max_entries,
- RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
+ RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
if (ret < 0) {
ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
// subscriptions handler factory
class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
protected:
- int init_permissions(RGWOp* op) override {
+ int init_permissions(RGWOp* op, optional_yield) override {
return 0;
}
- int read_permissions(RGWOp* op) override {
+ int read_permissions(RGWOp* op, optional_yield) override {
return 0;
}
bool supports_quota() override {
return false;
}
RGWOp *op_get() override {
- if (s->object.empty()) {
+ if (s->object->empty()) {
return nullptr;
}
if (s->info.args.exists("events")) {
return new RGWPSGetSub_ObjStore();
}
RGWOp *op_put() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSCreateSub_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSDeleteSub_ObjStore();
}
return nullptr;
ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
return -EINVAL;
}
- return notif_bucket_path(s->object.name, bucket_name);
+ return notif_bucket_path(s->object->get_name(), bucket_name);
}
public:
const char* name() const override { return "pubsub_notification_create"; }
- void execute() override;
+ void execute(optional_yield y) override;
};
-void RGWPSCreateNotif_ObjStore::execute()
+void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
{
- ups.emplace(store, s->owner.get_id());
+ ps.emplace(store, s->owner.get_id().tenant);
- auto b = ups->get_bucket(bucket_info.bucket);
- op_ret = b->create_notification(topic_name, events);
+ auto b = ps->get_bucket(bucket_info.bucket);
+ op_ret = b->create_notification(topic_name, events, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
ldout(s->cct, 1) << "missing required param 'topic'" << dendl;
return -EINVAL;
}
- return notif_bucket_path(s->object.name, bucket_name);
+ return notif_bucket_path(s->object->get_name(), bucket_name);
}
public:
- void execute() override;
+ void execute(optional_yield y) override;
const char* name() const override { return "pubsub_notification_delete"; }
};
-void RGWPSDeleteNotif_ObjStore::execute() {
+void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
op_ret = get_params();
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
- op_ret = b->remove_notification(topic_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
+ op_ret = b->remove_notification(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
rgw_pubsub_bucket_topics result;
int get_params() override {
- return notif_bucket_path(s->object.name, bucket_name);
+ return notif_bucket_path(s->object->get_name(), bucket_name);
}
public:
- void execute() override;
+ void execute(optional_yield y) override;
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
const char* name() const override { return "pubsub_notifications_list"; }
};
-void RGWPSListNotifs_ObjStore::execute()
+void RGWPSListNotifs_ObjStore::execute(optional_yield y)
{
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->get_topics(&result);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
// ceph specific notification handler factory
class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
protected:
- int init_permissions(RGWOp* op) override {
+ int init_permissions(RGWOp* op, optional_yield) override {
return 0;
}
- int read_permissions(RGWOp* op) override {
+ int read_permissions(RGWOp* op, optional_yield) override {
return 0;
}
bool supports_quota() override {
return false;
}
RGWOp *op_get() override {
- if (s->object.empty()) {
+ if (s->object->empty()) {
return nullptr;
}
return new RGWPSListNotifs_ObjStore();
}
RGWOp *op_put() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSCreateNotif_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
- if (!s->object.empty()) {
+ if (!s->object->empty()) {
return new RGWPSDeleteNotif_ObjStore();
}
return nullptr;
};
// factory for ceph specific PubSub REST handlers
-RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
- const rgw::auth::StrategyRegistry& auth_registry,
- const std::string& frontend_prefix)
+RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(rgw::sal::RGWRadosStore *store,
+ struct req_state* const s,
+ const rgw::auth::StrategyRegistry& auth_registry,
+ const std::string& frontend_prefix)
{
- if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
+ if (RGWHandler_REST_S3::init_from_header(store, s, RGW_FORMAT_JSON, true) < 0) {
return nullptr;
}