// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
-#ifndef CEPH_RGW_PUBSUB_H
-#define CEPH_RGW_PUBSUB_H
+#pragma once
-#include "services/svc_sys_obj.h"
+#include "rgw_sal.h"
#include "rgw_tools.h"
#include "rgw_zone.h"
#include "rgw_notify_event_type.h"
#include <boost/container/flat_map.hpp>
-namespace rgw::sal { class RadosStore; }
-
class XMLObj;
struct rgw_s3_key_filter {
};
WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
-struct rgw_pubsub_event {
- constexpr static const char* const json_type_plural = "events";
- std::string id;
- std::string event_name;
- std::string source;
- ceph::real_time timestamp;
- JSONFormattable info;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(event_name, bl);
- encode(source, bl);
- encode(timestamp, bl);
- encode(info, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(id, bl);
- decode(event_name, bl);
- decode(source, bl);
- decode(timestamp, bl);
- decode(info, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_event)
-
// setting a unique ID for an event based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
-struct rgw_pubsub_sub_dest {
- std::string bucket_name;
- std::string oid_prefix;
+struct rgw_pubsub_dest {
std::string push_endpoint;
std::string push_endpoint_args;
std::string arn_topic;
void encode(bufferlist& bl) const {
ENCODE_START(5, 1, bl);
- encode(bucket_name, bl);
- encode(oid_prefix, bl);
+ encode("", bl);
+ encode("", bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(5, bl);
- decode(bucket_name, bl);
- decode(oid_prefix, bl);
+ std::string dummy;
+ decode(dummy, bl);
+ decode(dummy, bl);
decode(push_endpoint, bl);
if (struct_v >= 2) {
decode(push_endpoint_args, bl);
void dump_xml(Formatter *f) const;
std::string to_json_str() const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-
-struct rgw_pubsub_sub_config {
- rgw_user user;
- std::string name;
- std::string topic;
- rgw_pubsub_sub_dest dest;
- std::string s3_id;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
- encode(user, bl);
- encode(name, bl);
- encode(topic, bl);
- encode(dest, bl);
- encode(s3_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
- decode(user, bl);
- decode(name, bl);
- decode(topic, bl);
- decode(dest, bl);
- if (struct_v >= 2) {
- decode(s3_id, bl);
- }
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_dest)
struct rgw_pubsub_topic {
rgw_user user;
std::string name;
- rgw_pubsub_sub_dest dest;
+ rgw_pubsub_dest dest;
std::string arn;
std::string opaque_data;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic)
+// this struct deprecated and remain only for backward compatibility
struct rgw_pubsub_topic_subs {
rgw_pubsub_topic topic;
std::set<std::string> subs;
encode(topic, bl);
// events are stored as a vector of std::strings
std::vector<std::string> tmp_events;
- const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
- std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
+ std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), rgw::notify::to_string);
encode(tmp_events, bl);
encode(s3_id, bl);
encode(s3_filter, bl);
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
struct rgw_pubsub_topics {
- std::map<std::string, rgw_pubsub_topic_subs> topics;
+ std::map<std::string, rgw_pubsub_topic> topics;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 2, bl);
encode(topics, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(topics, bl);
+ DECODE_START(2, bl);
+ if (struct_v >= 2) {
+ decode(topics, bl);
+ } else {
+ std::map<std::string, rgw_pubsub_topic_subs> v1topics;
+ decode(v1topics, bl);
+ std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
+ [](const auto& entry) {
+ return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic);
+ });
+ }
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(rgw_pubsub_topics)
-static std::string pubsub_oid_prefix = "pubsub.";
-
class RGWPubSub
{
friend class Bucket;
- rgw::sal::RadosStore* store;
+ rgw::sal::Driver* const driver;
const std::string tenant;
- RGWSysObjectCtx obj_ctx;
-
- rgw_raw_obj meta_obj;
-
- std::string meta_oid() const {
- return pubsub_oid_prefix + tenant;
- }
-
- std::string bucket_meta_oid(const rgw_bucket& bucket) const {
- return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker;
- }
-
- std::string sub_meta_oid(const std::string& name) const {
- return pubsub_oid_prefix + tenant + ".sub." + name;
- }
-
- template <class T>
- int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
-
- template <class T>
- int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker* obj_tracker, optional_yield y);
- int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
- optional_yield y);
-
- int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+ int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
- RGWPubSub(rgw::sal::RadosStore* _store, const std::string& tenant);
+ RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
class Bucket {
friend class RGWPubSub;
- RGWPubSub *ps;
- rgw_bucket bucket;
- rgw_raw_obj bucket_meta_obj;
+ const RGWPubSub& ps;
+ rgw::sal::Bucket* const bucket;
// read the list of topics associated with a bucket and populate into result
// use version tacker to enforce atomicity between read/write
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
+ int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result,
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
// set the list of topics associated with a bucket
// use version tacker to enforce atomicity between read/write
// return 0 on success, error code otherwise
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
- Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
- ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
- }
+ Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) :
+ ps(_ps), bucket(_bucket)
+ {}
- // read the list of topics associated with a bucket and populate into result
+ // get the list of topics associated with a bucket and populate into result
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int get_topics(rgw_pubsub_bucket_topics *result);
+ int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
+ return read_topics(dpp, result, nullptr, y);
+ }
// adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
// for S3 compliant notifications the version with: s3_filter and notif_name should be used
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y);
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y);
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, optional_yield y) const;
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
+ int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
// remove all notifications (and autogenerated topics) associated with the bucket
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
+ int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
};
- // base class for subscription
- class Sub {
- friend class RGWPubSub;
- protected:
- RGWPubSub* const ps;
- const std::string sub;
- rgw_raw_obj sub_meta_obj;
-
- int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
- int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
- int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
- public:
- Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
- ps->get_sub_meta_obj(sub, &sub_meta_obj);
- }
-
- virtual ~Sub() = default;
-
- int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
- const std::string& s3_id="");
- int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
- int get_conf(rgw_pubsub_sub_config* result);
-
- static const int DEFAULT_MAX_EVENTS = 100;
- // followint virtual methods should only be called in derived
- virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);}
- virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);}
- virtual void dump(Formatter* f) const {ceph_assert(false);}
- };
-
- // subscription with templated list of events to support both S3 compliant and Ceph specific events
- template<typename EventType>
- class SubWithEvents : public Sub {
- private:
- struct list_events_result {
- std::string next_marker;
- bool is_truncated{false};
- void dump(Formatter *f) const;
- std::vector<EventType> events;
- } list;
-
- public:
- SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {}
-
- virtual ~SubWithEvents() = default;
-
- int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override;
- int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override;
- void dump(Formatter* f) const override;
- };
-
- using BucketRef = std::shared_ptr<Bucket>;
- using SubRef = std::shared_ptr<Sub>;
-
- BucketRef get_bucket(const rgw_bucket& bucket) {
- return std::make_shared<Bucket>(this, bucket);
+ // get the list of topics
+ // return 0 on success or if no topic was associated with the bucket, error code otherwise
+ int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
+ return read_topics(dpp, result, nullptr, y);
}
-
- SubRef get_sub(const std::string& sub) {
- return std::make_shared<Sub>(this, sub);
- }
-
- SubRef get_sub_with_events(const std::string& sub) {
- auto tmpsub = Sub(this, sub);
- rgw_pubsub_sub_config conf;
- if (tmpsub.get_conf(&conf) < 0) {
- return nullptr;
- }
- if (conf.s3_id.empty()) {
- return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
- }
- return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
- }
-
- void get_meta_obj(rgw_raw_obj *obj) const;
- void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
-
- void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const;
-
- // get all topics (per tenant, if used)) and populate them into "result"
- // return 0 on success or if no topics exist, error code otherwise
- int get_topics(rgw_pubsub_topics *result);
- // get a topic with its subscriptions by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_topic(const std::string& name, rgw_pubsub_topic_subs *result);
// get a topic with by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int get_topic(const std::string& name, rgw_pubsub_topic *result);
+ int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
- int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+ int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
// create a topic with push destination information and ARN
// if the topic already exists the destination and ARN values may be updated (considered succsess)
// return 0 on success, error code otherwise
- int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
+ int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
+ const std::string& arn, const std::string& opaque_data, optional_yield y) const;
// remove a topic according to its name
// if the topic does not exists it is a no-op (considered success)
// return 0 on success, error code otherwise
- int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+ int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
};
-
-template <class T>
-int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
-{
- bufferlist bl;
- int ret = rgw_get_system_obj(obj_ctx,
- obj.pool, obj.oid,
- bl,
- objv_tracker,
- nullptr, null_yield, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- auto iter = bl.cbegin();
- try {
- decode(*result, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- return 0;
-}
-
-template <class T>
-int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker* objv_tracker, optional_yield y)
-{
- bufferlist bl;
- encode(info, bl);
-
- int ret = rgw_put_system_obj(dpp, obj_ctx, obj.pool, obj.oid,
- bl, false, objv_tracker,
- real_time(), y);
- if (ret < 0) {
- return ret;
- }
-
- obj_ctx.invalidate(obj);
- return 0;
-}
-
-#endif