]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_pubsub.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
index f11b40b14b9ff3169437a5c3cac0c92b22b7fff6..974581ce3d9175d124c4eea6f7d03ebcd233c7a1 100644 (file)
@@ -1,17 +1,14 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
-#ifndef CEPH_RGW_PUBSUB_H
-#define CEPH_RGW_PUBSUB_H
+#pragma once
 
-#include "services/svc_sys_obj.h"
+#include "rgw_sal.h"
 #include "rgw_tools.h"
 #include "rgw_zone.h"
 #include "rgw_notify_event_type.h"
 #include <boost/container/flat_map.hpp>
 
-namespace rgw::sal { class RadosStore; }
-
 class XMLObj;
 
 struct rgw_s3_key_filter {
@@ -332,44 +329,10 @@ struct rgw_pubsub_s3_event {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
 
-struct rgw_pubsub_event {
-  constexpr static const char* const json_type_plural = "events";
-  std::string id;
-  std::string event_name;
-  std::string source;
-  ceph::real_time timestamp;
-  JSONFormattable info;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(id, bl);
-    encode(event_name, bl);
-    encode(source, bl);
-    encode(timestamp, bl);
-    encode(info, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(id, bl);
-    decode(event_name, bl);
-    decode(source, bl);
-    decode(timestamp, bl);
-    decode(info, bl);
-    DECODE_FINISH(bl);
-  }
-
-  void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_event)
-
 // setting a unique ID for an event based on object hash and timestamp
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
 
-struct rgw_pubsub_sub_dest {
-  std::string bucket_name;
-  std::string oid_prefix;
+struct rgw_pubsub_dest {
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
@@ -378,8 +341,8 @@ struct rgw_pubsub_sub_dest {
 
   void encode(bufferlist& bl) const {
     ENCODE_START(5, 1, bl);
-    encode(bucket_name, bl);
-    encode(oid_prefix, bl);
+    encode("", bl);
+    encode("", bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
@@ -390,8 +353,9 @@ struct rgw_pubsub_sub_dest {
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(5, bl);
-    decode(bucket_name, bl);
-    decode(oid_prefix, bl);
+    std::string dummy;
+    decode(dummy, bl);
+    decode(dummy, bl);
     decode(push_endpoint, bl);
     if (struct_v >= 2) {
         decode(push_endpoint_args, bl);
@@ -412,45 +376,12 @@ struct rgw_pubsub_sub_dest {
   void dump_xml(Formatter *f) const;
   std::string to_json_str() const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-
-struct rgw_pubsub_sub_config {
-  rgw_user user;
-  std::string name;
-  std::string topic;
-  rgw_pubsub_sub_dest dest;
-  std::string s3_id;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
-    encode(user, bl);
-    encode(name, bl);
-    encode(topic, bl);
-    encode(dest, bl);
-    encode(s3_id, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
-    decode(user, bl);
-    decode(name, bl);
-    decode(topic, bl);
-    decode(dest, bl);
-    if (struct_v >= 2) {
-      decode(s3_id, bl);
-    }
-    DECODE_FINISH(bl);
-  }
-
-  void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_dest)
 
 struct rgw_pubsub_topic {
   rgw_user user;
   std::string name;
-  rgw_pubsub_sub_dest dest;
+  rgw_pubsub_dest dest;
   std::string arn;
   std::string opaque_data;
 
@@ -492,6 +423,7 @@ struct rgw_pubsub_topic {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
 
+// this struct deprecated and remain only for backward compatibility
 struct rgw_pubsub_topic_subs {
   rgw_pubsub_topic topic;
   std::set<std::string> subs;
@@ -525,8 +457,7 @@ struct rgw_pubsub_topic_filter {
     encode(topic, bl);
     // events are stored as a vector of std::strings
     std::vector<std::string> tmp_events;
-    const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
-    std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
+    std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), rgw::notify::to_string);
     encode(tmp_events, bl);
     encode(s3_id, bl);
     encode(s3_filter, bl);
@@ -574,17 +505,26 @@ struct rgw_pubsub_bucket_topics {
 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
 
 struct rgw_pubsub_topics {
-  std::map<std::string, rgw_pubsub_topic_subs> topics;
+  std::map<std::string, rgw_pubsub_topic> topics;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 2, bl);
     encode(topics, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(topics, bl);
+    DECODE_START(2, bl);
+    if (struct_v >= 2) {
+      decode(topics, bl);
+    } else {
+      std::map<std::string, rgw_pubsub_topic_subs> v1topics;
+      decode(v1topics, bl);
+      std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
+          [](const auto& entry) {
+            return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic); 
+          });
+    }
     DECODE_FINISH(bl);
   }
 
@@ -593,233 +533,87 @@ struct rgw_pubsub_topics {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topics)
 
-static std::string pubsub_oid_prefix = "pubsub.";
-
 class RGWPubSub
 {
   friend class Bucket;
 
-  rgw::sal::RadosStore* store;
+  rgw::sal::Driver* const driver;
   const std::string tenant;
-  RGWSysObjectCtx obj_ctx;
-
-  rgw_raw_obj meta_obj;
-
-  std::string meta_oid() const {
-    return pubsub_oid_prefix + tenant;
-  }
-
-  std::string bucket_meta_oid(const rgw_bucket& bucket) const {
-    return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker;
-  }
-
-  std::string sub_meta_oid(const std::string& name) const {
-    return pubsub_oid_prefix + tenant + ".sub." + name;
-  }
-
-  template <class T>
-  int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
-
-  template <class T>
-  int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
-           RGWObjVersionTracker* obj_tracker, optional_yield y);
 
-  int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
-            optional_yield y);
-
-  int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+  int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, 
+      RGWObjVersionTracker* objv_tracker, optional_yield y) const;
   int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
-                       RGWObjVersionTracker* objv_tracker, optional_yield y);
+                       RGWObjVersionTracker* objv_tracker, optional_yield y) const;
 
 public:
-  RGWPubSub(rgw::sal::RadosStore* _store, const std::string& tenant);
+  RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
 
   class Bucket {
     friend class RGWPubSub;
-    RGWPubSub *ps;
-    rgw_bucket bucket;
-    rgw_raw_obj bucket_meta_obj;
+    const RGWPubSub& ps;
+    rgw::sal::Bucket* const bucket;
 
     // read the list of topics associated with a bucket and populate into result
     // use version tacker to enforce atomicity between read/write
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
-    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
+    int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, 
+        RGWObjVersionTracker* objv_tracker, optional_yield y) const;
     // set the list of topics associated with a bucket
     // use version tacker to enforce atomicity between read/write
     // return 0 on success, error code otherwise
     int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
-                    RGWObjVersionTracker* objv_tracker, optional_yield y);
+                    RGWObjVersionTracker* objv_tracker, optional_yield y) const;
   public:
-    Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
-      ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
-    }
+    Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) : 
+      ps(_ps), bucket(_bucket)
+    {}
 
-    // read the list of topics associated with a bucket and populate into result
+    // get the list of topics associated with a bucket and populate into result
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
-    int get_topics(rgw_pubsub_bucket_topics *result);
+    int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
+      return read_topics(dpp, result, nullptr, y);
+    }
     // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
     // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exist on the bucket, the filter event list may be updated
     // for S3 compliant notifications the version with: s3_filter and notif_name should be used
     // return -ENOENT if the topic does not exists
     // return 0 on success, error code otherwise
-    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y);
-    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y);
+    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
+        const rgw::notify::EventTypeList& events, optional_yield y) const;
+    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
+        const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
     // remove a topic and filter from bucket
     // if the topic does not exists on the bucket it is a no-op (considered success)
     // return -ENOENT if the topic does not exists
     // return 0 on success, error code otherwise
-    int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
+    int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
     // remove all notifications (and autogenerated topics) associated with the bucket
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
-    int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
+    int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
   };
 
-  // base class for subscription
-  class Sub {
-    friend class RGWPubSub;
-  protected:
-    RGWPubSub* const ps;
-    const std::string sub;
-    rgw_raw_obj sub_meta_obj;
-
-    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
-    int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
-                 RGWObjVersionTracker* objv_tracker, optional_yield y);
-    int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
-  public:
-    Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
-      ps->get_sub_meta_obj(sub, &sub_meta_obj);
-    }
-
-    virtual ~Sub() = default;
-
-    int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
-                 const std::string& s3_id="");
-    int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
-    int get_conf(rgw_pubsub_sub_config* result);
-    
-    static const int DEFAULT_MAX_EVENTS = 100;
-    // followint virtual methods should only be called in derived
-    virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);}
-    virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);}
-    virtual void dump(Formatter* f) const {ceph_assert(false);}
-  };
-
-  // subscription with templated list of events to support both S3 compliant and Ceph specific events
-  template<typename EventType>
-  class SubWithEvents : public Sub {
-  private:
-    struct list_events_result {
-      std::string next_marker;
-      bool is_truncated{false};
-      void dump(Formatter *f) const;
-      std::vector<EventType> events;
-    } list;
-
-  public:
-    SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {}
-
-    virtual ~SubWithEvents() = default;
-    
-    int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override;
-    int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override;
-    void dump(Formatter* f) const override;
-  };
-
-  using BucketRef = std::shared_ptr<Bucket>;
-  using SubRef = std::shared_ptr<Sub>;
-
-  BucketRef get_bucket(const rgw_bucket& bucket) {
-    return std::make_shared<Bucket>(this, bucket);
+  // get the list of topics
+  // return 0 on success or if no topic was associated with the bucket, error code otherwise
+  int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
+    return read_topics(dpp, result, nullptr, y);
   }
-
-  SubRef get_sub(const std::string& sub) {
-    return std::make_shared<Sub>(this, sub);
-  }
-  
-  SubRef get_sub_with_events(const std::string& sub) {
-    auto tmpsub = Sub(this, sub);
-    rgw_pubsub_sub_config conf;
-    if (tmpsub.get_conf(&conf) < 0) {
-      return nullptr;
-    }
-    if (conf.s3_id.empty()) {
-      return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
-    }
-    return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
-  }
-
-  void get_meta_obj(rgw_raw_obj *obj) const;
-  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
-
-  void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const;
-
-  // get all topics (per tenant, if used)) and populate them into "result"
-  // return 0 on success or if no topics exist, error code otherwise
-  int get_topics(rgw_pubsub_topics *result);
-  // get a topic with its subscriptions by its name and populate it into "result"
-  // return -ENOENT if the topic does not exists 
-  // return 0 on success, error code otherwise
-  int get_topic(const std::string& name, rgw_pubsub_topic_subs *result);
   // get a topic with by its name and populate it into "result"
   // return -ENOENT if the topic does not exists 
   // return 0 on success, error code otherwise
-  int get_topic(const std::string& name, rgw_pubsub_topic *result);
+  int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
   // create a topic with a name only
   // if the topic already exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
-  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
   // create a topic with push destination information and ARN
   // if the topic already exists the destination and ARN values may be updated (considered succsess)
   // return 0 on success, error code otherwise
-  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
+  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, 
+      const std::string& arn, const std::string& opaque_data, optional_yield y) const;
   // remove a topic according to its name
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
-  int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+  int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
 };
 
-
-template <class T>
-int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
-{
-  bufferlist bl;
-  int ret = rgw_get_system_obj(obj_ctx,
-                               obj.pool, obj.oid,
-                               bl,
-                               objv_tracker,
-                               nullptr, null_yield, nullptr, nullptr);
-  if (ret < 0) {
-    return ret;
-  }
-
-  auto iter = bl.cbegin();
-  try {
-    decode(*result, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  return 0;
-}
-
-template <class T>
-int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
-                        RGWObjVersionTracker* objv_tracker, optional_yield y)
-{
-  bufferlist bl;
-  encode(info, bl);
-
-  int ret = rgw_put_system_obj(dpp, obj_ctx, obj.pool, obj.oid,
-                              bl, false, objv_tracker,
-                              real_time(), y);
-  if (ret < 0) {
-    return ret;
-  }
-
-  obj_ctx.invalidate(obj);
-  return 0;
-}
-
-#endif