]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_sync_module_pubsub.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
index d78bb29c76c5c6167ad7fd6065eeb2d48798e745..11388eadf1a7631fe837837e731a8c394d2d528f 100644 (file)
 #include "rgw_pubsub_push.h"
 #include "rgw_notify_event_type.h"
 #include "rgw_perf_counters.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
 
 #include <boost/algorithm/hex.hpp>
 #include <boost/asio/yield.hpp>
@@ -43,28 +37,6 @@ config:
    "data_oid_prefix": <prefix>     #
    "events_retention_days": <int>  # default: 7
    "start_with_full_sync" <bool>   # default: false
-
-    # non-dynamic config
-    "notifications": [
-        {
-            "path": <notification-path>,    # this can be either an explicit path: <bucket>, or <bucket>/<object>,
-                                            # or a prefix if it ends with a wildcard
-            "topic": <topic-name>
-         },
-        ...
-    ],
-    "subscriptions": [
-        {
-            "name": <subscription-name>,
-            "topic": <topic>,
-            "push_endpoint": <endpoint>,
-            "push_endpoint_args:" <arg list>.            # any push endpoint specific args (include all args)
-            "data_bucket": <bucket>,       # override name of bucket where subscription data will be store
-            "data_oid_prefix": <prefix>    # set prefix for subscription data object ids
-            "s3_id": <id>                  # in case of S3 compatible notifications, the notification ID will be set here
-        },
-        ...
-    ]
 }
 
 */
@@ -119,28 +91,6 @@ struct PSSubConfig {
     encode_json("s3_id", s3_id, f);
   }
 
-  void init(CephContext *cct, const JSONFormattable& config,
-            const string& data_bucket_prefix,
-            const string& default_oid_prefix) {
-    name = config["name"];
-    topic = config["topic"];
-    push_endpoint_name = config["push_endpoint"];
-    string default_bucket_name = data_bucket_prefix + name;
-    data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
-    data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
-    s3_id = config["s3_id"];
-    arn_topic = config["arn_topic"];
-    if (!push_endpoint_name.empty()) {
-      push_endpoint_args = config["push_endpoint_args"];
-      try {
-        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
-        ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
-      } catch (const RGWPubSubEndpoint::configuration_error& e) {
-        ldout(cct, 1) << "ERROR: failed to create push endpoint: " 
-          << push_endpoint_name << " due to: " << e.what() << dendl;
-      }
-    }
-  }
 };
 
 using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
@@ -196,22 +146,14 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
 using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
 
+// global pubsub configuration
 struct PSConfig {
   const std::string id{"pubsub"};
   rgw_user user;
   std::string data_bucket_prefix;
   std::string data_oid_prefix;
-
   int events_retention_days{0};
-
   uint64_t sync_instance{0};
-  uint64_t max_id{0};
-
-  /* FIXME: no hard coded buckets, we'll have configurable topics */
-  std::map<std::string, PSSubConfigRef> subs;
-  std::map<std::string, PSTopicConfigRef> topics;
-  std::multimap<std::string, PSNotificationConfig> notifications;
-  
   bool start_with_full_sync{false};
 
   void dump(Formatter *f) const {
@@ -221,37 +163,6 @@ struct PSConfig {
     encode_json("data_oid_prefix", data_oid_prefix, f);
     encode_json("events_retention_days", events_retention_days, f);
     encode_json("sync_instance", sync_instance, f);
-    encode_json("max_id", max_id, f);
-    {
-      Formatter::ArraySection section(*f, "subs");
-      for (auto& sub : subs) {
-        encode_json("sub", *sub.second, f);
-      }
-    }
-    {
-      Formatter::ArraySection section(*f, "topics");
-      for (auto& topic : topics) {
-        encode_json("topic", *topic.second, f);
-      }
-    }
-    {
-      Formatter::ObjectSection section(*f, "notifications");
-      std::string last;
-      for (auto& notif : notifications) {
-        const string& n = notif.first;
-        if (n != last) {
-          if (!last.empty()) {
-            f->close_section();
-          }
-          f->open_array_section(n.c_str());
-        }
-        last = n;
-        encode_json("notifications", notif.second, f);
-      }
-      if (!last.empty()) {
-        f->close_section();
-      }
-    }
     encode_json("start_with_full_sync", start_with_full_sync, f);
   }
 
@@ -261,77 +172,14 @@ struct PSConfig {
     data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
     data_oid_prefix = config["data_oid_prefix"];
     events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
-
-    for (auto& c : config["notifications"].array()) {
-      PSNotificationConfig nc;
-      nc.id = ++max_id;
-      nc.init(cct, c);
-      notifications.insert(std::make_pair(nc.path, nc));
-
-      PSTopicConfig topic_config = { .name = nc.topic };
-      topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
-    }
-    for (auto& c : config["subscriptions"].array()) {
-      auto sc = std::make_shared<PSSubConfig>();
-      sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
-      subs[sc->name] = sc;
-      auto iter = topics.find(sc->topic);
-      if (iter != topics.end()) {
-        iter->second->subs.insert(sc->name);
-      }
-    }
     start_with_full_sync = config["start_with_full_sync"](false);
 
-    ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+    ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
   }
 
   void init_instance(const RGWRealm& realm, uint64_t instance_id) {
     sync_instance = instance_id;
   }
-
-  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
-    const std::string path = bucket.name + "/" + key.name;
-
-    auto iter = notifications.upper_bound(path);
-    if (iter == notifications.begin()) {
-      return;
-    }
-
-    do {
-      --iter;
-      if (iter->first.size() > path.size()) {
-        break;
-      }
-      if (path.compare(0, iter->first.size(), iter->first) != 0) {
-        break;
-      }
-
-      PSNotificationConfig& target = iter->second;
-
-      if (!target.is_prefix &&
-          path.size() != iter->first.size()) {
-        continue;
-      }
-
-      auto topic = topics.find(target.topic);
-      if (topic == topics.end()) {
-        continue;
-      }
-
-      ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << 
-          " target_path=" << target.path << ", topic=" << target.topic << dendl;
-      (*result)->push_back(topic->second);
-    } while (iter != notifications.begin());
-  }
-
-  bool find_sub(const string& name, PSSubConfigRef *ref) {
-    auto iter = subs.find(name);
-    if (iter != subs.end()) {
-      *ref = iter->second;
-      return true;
-    }
-    return false;
-  }
 };
 
 using PSConfigRef = std::shared_ptr<PSConfig>;
@@ -410,38 +258,38 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
   encode_json("info", oevent, &e->info);
 }
 
-static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_user& owner,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
-                       const std::vector<std::pair<std::string, std::string> > *attrs,
+                       const std::vector<std::pair<std::string, std::string>>* attrs,
                        rgw::notify::EventType event_type,
-                       EventRef<rgw_pubsub_s3_record> *record) {
-  *record = std::make_shared<rgw_pubsub_s3_record>();
+                       EventRef<rgw_pubsub_s3_event>* event) {
+  *event = std::make_shared<rgw_pubsub_s3_event>();
 
-  EventRef<rgw_pubsub_s3_record>& r = *record;
-  r->eventTime = mtime;
-  r->eventName = rgw::notify::to_string(event_type);
+  EventRef<rgw_pubsub_s3_event>& e = *event;
+  e->eventTime = mtime;
+  e->eventName = rgw::notify::to_string(event_type);
   // userIdentity: not supported in sync module
   // x_amz_request_id: not supported in sync module
   // x_amz_id_2: not supported in sync module
   // configurationId is filled from subscription configuration
-  r->bucket_name = bucket.name;
-  r->bucket_ownerIdentity = owner.to_str();
-  r->bucket_arn = to_string(rgw::ARN(bucket));
-  r->bucket_id = bucket.bucket_id; // rgw extension
-  r->object_key = key.name;
+  e->bucket_name = bucket.name;
+  e->bucket_ownerIdentity = owner.to_str();
+  e->bucket_arn = to_string(rgw::ARN(bucket));
+  e->bucket_id = bucket.bucket_id; // rgw extension
+  e->object_key = key.name;
   // object_size not supported in sync module
   objstore_event oevent(bucket, key, mtime, attrs);
-  r->object_etag = oevent.get_hash();
-  r->object_versionId = key.instance;
+  e->object_etag = oevent.get_hash();
+  e->object_versionId = key.instance;
  
   // use timestamp as per key sequence id (hex encoded)
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
-          std::back_inserter(r->object_sequencer));
+          std::back_inserter(e->object_sequencer));
  
-  set_event_id(r->id, r->object_etag, ts);
+  set_event_id(e->id, e->object_etag, ts);
 }
 
 class PSManager;
@@ -920,19 +768,15 @@ class PSManager
     int operate() override {
       reenter(this) {
         if (owner.empty()) {
-          if (!conf->find_sub(sub_name, &sub_conf)) {
-            ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
+          ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
             mgr->remove_get_sub(owner, sub_name);
-            return set_cr_error(-ENOENT);
-          }
-
-          *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
+            return set_cr_error(-EINVAL);
         } else {
           using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
           yield {
-            RGWUserPubSub ups(sync_env->store, owner);
+            RGWPubSub ps(sync_env->store, owner.tenant);
             rgw_raw_obj obj;
-            ups.get_sub_meta_obj(sub_name, &obj);
+            ps.get_sub_meta_obj(sub_name, &obj);
             bool empty_on_enoent = false;
             call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
                                 obj,
@@ -948,14 +792,11 @@ class PSManager
 
         yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
-          ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
+          ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
           mgr->remove_get_sub(owner, sub_name);
           return set_cr_error(retcode);
         }
 
-        if (owner.empty()) {
-          mgr->subs[sub_name] = *ref;
-        }
         mgr->remove_get_sub(owner, sub_name);
 
         return set_cr_done();
@@ -1095,12 +936,12 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   rgw_obj_key key;
   rgw::notify::EventType event_type;
 
-  RGWUserPubSub ups;
+  RGWPubSub ps;
 
   rgw_raw_obj bucket_obj;
   rgw_raw_obj user_obj;
   rgw_pubsub_bucket_topics bucket_topics;
-  rgw_pubsub_user_topics user_topics;
+  rgw_pubsub_topics user_topics;
   TopicsRef *topics;
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
@@ -1116,14 +957,14 @@ public:
                                                           bucket(_bucket),
                                                           key(_key),
                                                           event_type(_event_type),
-                                                          ups(sync_env->store, owner),
+                                                          ps(sync_env->store, owner.tenant),
                                                           topics(_topics) {
     *topics = std::make_shared<vector<PSTopicConfigRef> >();
   }
   int operate() override {
     reenter(this) {
-      ups.get_bucket_meta_obj(bucket, &bucket_obj);
-      ups.get_user_meta_obj(&user_obj);
+      ps.get_bucket_meta_obj(bucket, &bucket_obj);
+      ps.get_meta_obj(&user_obj);
 
       using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
       yield {
@@ -1139,7 +980,7 @@ public:
       ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
 
       if (!bucket_topics.topics.empty()) {
-       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+       using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
        yield {
          bool empty_on_enoent = true;
          call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
@@ -1164,7 +1005,6 @@ public:
         (*topics)->push_back(tc);
       }
 
-      env->conf->get_topics(sync_env->cct, bucket, key, topics);
       return set_cr_done();
     }
     return 0;
@@ -1174,34 +1014,29 @@ public:
 class RGWPSHandleObjEventCR : public RGWCoroutine {
   RGWDataSyncCtx* const sc;
   const PSEnvRef env;
-  const rgw_user& owner;
+  const rgw_user owner;
   const EventRef<rgw_pubsub_event> event;
-  const EventRef<rgw_pubsub_s3_record> record;
+  const EventRef<rgw_pubsub_s3_event> s3_event;
   const TopicsRef topics;
-  const std::array<rgw_user, 2> owners;
   bool has_subscriptions;
   bool event_handled;
-  bool sub_conf_found;
   PSSubscriptionRef sub;
-  std::array<rgw_user, 2>::const_iterator oiter;
   std::vector<PSTopicConfigRef>::const_iterator titer;
   std::set<std::string>::const_iterator siter;
-  int last_sub_conf_error;
 
 public:
   RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
                       const PSEnvRef _env,
                       const rgw_user& _owner,
                       const EventRef<rgw_pubsub_event>& _event,
-                      const EventRef<rgw_pubsub_s3_record>& _record,
+                      const EventRef<rgw_pubsub_s3_event>& _s3_event,
                       const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
                                           sc(_sc),
                                           env(_env),
                                           owner(_owner),
                                           event(_event),
-                                          record(_record),
+                                          s3_event(_s3_event),
                                           topics(_topics),
-                                          owners({owner, rgw_user{}}),
                                           has_subscriptions(false),
                                           event_handled(false) {}
 
@@ -1226,79 +1061,67 @@ public:
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
           has_subscriptions = true;
-          sub_conf_found = false;
-          // try to read subscription configuration from global/user cond
-          // configuration is considered missing only if does not exist in either
-          for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
-            yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
+          // try to read subscription configuration
+          yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
+          if (retcode < 0) {
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
+            ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter 
+              << " ret=" << retcode << dendl;
+            if (retcode == -ENOENT) {
+              // missing subscription info should be reflected back as invalid argument
+              // and not as missing object
+              retcode = -EINVAL;
+            }
+            // try the next subscription
+            continue;
+          }
+          if (sub->sub_conf->s3_id.empty()) {
+            // subscription was not made by S3 compatible API
+            ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+            yield call(PSSubscription::store_event_cr(sc, sub, event));
             if (retcode < 0) {
-              if (sub_conf_found) {
-                // not a real issue, sub conf already found
-                retcode = 0;
-              }
-              last_sub_conf_error = retcode;
-              continue;
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+              ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+            } else {
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+              event_handled = true;
             }
-            sub_conf_found = true;
-            if (sub->sub_conf->s3_id.empty()) {
-              // subscription was not made by S3 compatible API
-              ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-              yield call(PSSubscription::store_event_cr(sc, sub, event));
+            if (sub->sub_conf->push_endpoint) {
+              ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+              yield call(PSSubscription::push_event_cr(sc, sub, event));
               if (retcode < 0) {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
-                ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+                ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                 event_handled = true;
               }
-              if (sub->sub_conf->push_endpoint) {
-                ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-                yield call(PSSubscription::push_event_cr(sc, sub, event));
-                if (retcode < 0) {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
-                  ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
-                } else {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-                  event_handled = true;
-                }
-              } 
+            } 
+          } else {
+            // subscription was made by S3 compatible API
+            ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+            s3_event->configurationId = sub->sub_conf->s3_id;
+            s3_event->opaque_data = (*titer)->opaque_data;
+            yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
+            if (retcode < 0) {
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+              ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
             } else {
-              // subscription was made by S3 compatible API
-              ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-              record->configurationId = sub->sub_conf->s3_id;
-              record->opaque_data = (*titer)->opaque_data;
-              yield call(PSSubscription::store_event_cr(sc, sub, record));
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+              event_handled = true;
+            }
+            if (sub->sub_conf->push_endpoint) {
+                ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+              yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
               if (retcode < 0) {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
-                ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+                ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                 event_handled = true;
               }
-              if (sub->sub_conf->push_endpoint) {
-                  ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-                yield call(PSSubscription::push_event_cr(sc, sub, record));
-                if (retcode < 0) {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
-                  ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
-                } else {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-                  event_handled = true;
-                }
-              }
             }
           }
-          if (!sub_conf_found) {
-            // could not find conf for subscription at user or global levels
-            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
-            ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter 
-              << " ret=" << last_sub_conf_error << dendl;
-              if (retcode == -ENOENT) {
-                // missing subscription info should be reflected back as invalid argument
-                // and not as missing object
-                retcode =  -EINVAL;
-              }
-          }
         }
       }
       if (has_subscriptions && !event_handled) {
@@ -1322,7 +1145,7 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   PSEnvRef env;
   std::optional<uint64_t> versioned_epoch;
   EventRef<rgw_pubsub_event> event;
-  EventRef<rgw_pubsub_s3_record> record;
+  EventRef<rgw_pubsub_s3_event> s3_event;
   TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
@@ -1349,20 +1172,20 @@ public:
           }
           attrs.push_back(std::make_pair(k, attr.second));
         } 
-        // at this point we don't know whether we need the ceph event or S3 record
+        // at this point we don't know whether we need the ceph event or S3 event
         // this is why both are created here, once we have information about the 
         // subscription, we will store/push only the relevant ones
         make_event_ref(sc->cct,
                        sync_pipe.info.source_bs.bucket, key,
                        mtime, &attrs,
                        rgw::notify::ObjectCreated, &event);
-        make_s3_record_ref(sc->cct,
+        make_s3_event_ref(sc->cct,
                        sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
                        mtime, &attrs,
-                       rgw::notify::ObjectCreated, &record);
+                       rgw::notify::ObjectCreated, &s3_event);
       }
 
-      yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
+      yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -1448,7 +1271,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   ceph::real_time mtime;
   rgw::notify::EventType event_type;
   EventRef<rgw_pubsub_event> event;
-  EventRef<rgw_pubsub_s3_record> record;
+  EventRef<rgw_pubsub_s3_event> s3_event;
   TopicsRef topics;
 public:
   RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
@@ -1474,18 +1297,18 @@ public:
         ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
         return set_cr_done();
       }
-      // at this point we don't know whether we need the ceph event or S3 record
+      // at this point we don't know whether we need the ceph event or S3 event
       // this is why both are created here, once we have information about the 
       // subscription, we will store/push only the relevant ones
       make_event_ref(sc->cct,
                      bucket, key,
                      mtime, nullptr,
                      event_type, &event);
-      make_s3_record_ref(sc->cct,
+      make_s3_event_ref(sc->cct,
                      bucket, owner, key,
                      mtime, nullptr,
-                     event_type, &record);
-      yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
+                     event_type, &s3_event);
+      yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -1553,25 +1376,6 @@ RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFor
   } else {
     effective_conf.decode_json(&p);
   }
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  if (!rgw::amqp::init(cct)) {
-    ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
-  }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  if (!rgw::kafka::init(cct)) {
-    ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
-  }
-#endif
-}
-
-RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  rgw::kafka::shutdown();
-#endif
 }
 
 RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()