#include "rgw_pubsub_push.h"
#include "rgw_notify_event_type.h"
#include "rgw_perf_counters.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
#include <boost/algorithm/hex.hpp>
#include <boost/asio/yield.hpp>
"data_oid_prefix": <prefix> #
"events_retention_days": <int> # default: 7
"start_with_full_sync" <bool> # default: false
-
- # non-dynamic config
- "notifications": [
- {
- "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
- # or a prefix if it ends with a wildcard
- "topic": <topic-name>
- },
- ...
- ],
- "subscriptions": [
- {
- "name": <subscription-name>,
- "topic": <topic>,
- "push_endpoint": <endpoint>,
- "push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args)
- "data_bucket": <bucket>, # override name of bucket where subscription data will be store
- "data_oid_prefix": <prefix> # set prefix for subscription data object ids
- "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
- },
- ...
- ]
}
*/
encode_json("s3_id", s3_id, f);
}
- void init(CephContext *cct, const JSONFormattable& config,
- const string& data_bucket_prefix,
- const string& default_oid_prefix) {
- name = config["name"];
- topic = config["topic"];
- push_endpoint_name = config["push_endpoint"];
- string default_bucket_name = data_bucket_prefix + name;
- data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
- data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
- s3_id = config["s3_id"];
- arn_topic = config["arn_topic"];
- if (!push_endpoint_name.empty()) {
- push_endpoint_args = config["push_endpoint_args"];
- try {
- push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
- ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
- } catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldout(cct, 1) << "ERROR: failed to create push endpoint: "
- << push_endpoint_name << " due to: " << e.what() << dendl;
- }
- }
- }
};
using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
+// global pubsub configuration
struct PSConfig {
const std::string id{"pubsub"};
rgw_user user;
std::string data_bucket_prefix;
std::string data_oid_prefix;
-
int events_retention_days{0};
-
uint64_t sync_instance{0};
- uint64_t max_id{0};
-
- /* FIXME: no hard coded buckets, we'll have configurable topics */
- std::map<std::string, PSSubConfigRef> subs;
- std::map<std::string, PSTopicConfigRef> topics;
- std::multimap<std::string, PSNotificationConfig> notifications;
-
bool start_with_full_sync{false};
void dump(Formatter *f) const {
encode_json("data_oid_prefix", data_oid_prefix, f);
encode_json("events_retention_days", events_retention_days, f);
encode_json("sync_instance", sync_instance, f);
- encode_json("max_id", max_id, f);
- {
- Formatter::ArraySection section(*f, "subs");
- for (auto& sub : subs) {
- encode_json("sub", *sub.second, f);
- }
- }
- {
- Formatter::ArraySection section(*f, "topics");
- for (auto& topic : topics) {
- encode_json("topic", *topic.second, f);
- }
- }
- {
- Formatter::ObjectSection section(*f, "notifications");
- std::string last;
- for (auto& notif : notifications) {
- const string& n = notif.first;
- if (n != last) {
- if (!last.empty()) {
- f->close_section();
- }
- f->open_array_section(n.c_str());
- }
- last = n;
- encode_json("notifications", notif.second, f);
- }
- if (!last.empty()) {
- f->close_section();
- }
- }
encode_json("start_with_full_sync", start_with_full_sync, f);
}
data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
data_oid_prefix = config["data_oid_prefix"];
events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
-
- for (auto& c : config["notifications"].array()) {
- PSNotificationConfig nc;
- nc.id = ++max_id;
- nc.init(cct, c);
- notifications.insert(std::make_pair(nc.path, nc));
-
- PSTopicConfig topic_config = { .name = nc.topic };
- topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
- }
- for (auto& c : config["subscriptions"].array()) {
- auto sc = std::make_shared<PSSubConfig>();
- sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
- subs[sc->name] = sc;
- auto iter = topics.find(sc->topic);
- if (iter != topics.end()) {
- iter->second->subs.insert(sc->name);
- }
- }
start_with_full_sync = config["start_with_full_sync"](false);
- ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+ ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
}
void init_instance(const RGWRealm& realm, uint64_t instance_id) {
sync_instance = instance_id;
}
-
- void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
- const std::string path = bucket.name + "/" + key.name;
-
- auto iter = notifications.upper_bound(path);
- if (iter == notifications.begin()) {
- return;
- }
-
- do {
- --iter;
- if (iter->first.size() > path.size()) {
- break;
- }
- if (path.compare(0, iter->first.size(), iter->first) != 0) {
- break;
- }
-
- PSNotificationConfig& target = iter->second;
-
- if (!target.is_prefix &&
- path.size() != iter->first.size()) {
- continue;
- }
-
- auto topic = topics.find(target.topic);
- if (topic == topics.end()) {
- continue;
- }
-
- ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
- " target_path=" << target.path << ", topic=" << target.topic << dendl;
- (*result)->push_back(topic->second);
- } while (iter != notifications.begin());
- }
-
- bool find_sub(const string& name, PSSubConfigRef *ref) {
- auto iter = subs.find(name);
- if (iter != subs.end()) {
- *ref = iter->second;
- return true;
- }
- return false;
- }
};
using PSConfigRef = std::shared_ptr<PSConfig>;
encode_json("info", oevent, &e->info);
}
-static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
const rgw_user& owner,
const rgw_obj_key& key,
const ceph::real_time& mtime,
- const std::vector<std::pair<std::string, std::string> > *attrs,
+ const std::vector<std::pair<std::string, std::string>>* attrs,
rgw::notify::EventType event_type,
- EventRef<rgw_pubsub_s3_record> *record) {
- *record = std::make_shared<rgw_pubsub_s3_record>();
+ EventRef<rgw_pubsub_s3_event>* event) {
+ *event = std::make_shared<rgw_pubsub_s3_event>();
- EventRef<rgw_pubsub_s3_record>& r = *record;
- r->eventTime = mtime;
- r->eventName = rgw::notify::to_string(event_type);
+ EventRef<rgw_pubsub_s3_event>& e = *event;
+ e->eventTime = mtime;
+ e->eventName = rgw::notify::to_string(event_type);
// userIdentity: not supported in sync module
// x_amz_request_id: not supported in sync module
// x_amz_id_2: not supported in sync module
// configurationId is filled from subscription configuration
- r->bucket_name = bucket.name;
- r->bucket_ownerIdentity = owner.to_str();
- r->bucket_arn = to_string(rgw::ARN(bucket));
- r->bucket_id = bucket.bucket_id; // rgw extension
- r->object_key = key.name;
+ e->bucket_name = bucket.name;
+ e->bucket_ownerIdentity = owner.to_str();
+ e->bucket_arn = to_string(rgw::ARN(bucket));
+ e->bucket_id = bucket.bucket_id; // rgw extension
+ e->object_key = key.name;
// object_size not supported in sync module
objstore_event oevent(bucket, key, mtime, attrs);
- r->object_etag = oevent.get_hash();
- r->object_versionId = key.instance;
+ e->object_etag = oevent.get_hash();
+ e->object_versionId = key.instance;
// use timestamp as per key sequence id (hex encoded)
const utime_t ts(real_clock::now());
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
- std::back_inserter(r->object_sequencer));
+ std::back_inserter(e->object_sequencer));
- set_event_id(r->id, r->object_etag, ts);
+ set_event_id(e->id, e->object_etag, ts);
}
class PSManager;
int operate() override {
reenter(this) {
if (owner.empty()) {
- if (!conf->find_sub(sub_name, &sub_conf)) {
- ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
mgr->remove_get_sub(owner, sub_name);
- return set_cr_error(-ENOENT);
- }
-
- *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
+ return set_cr_error(-EINVAL);
} else {
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
yield {
- RGWUserPubSub ups(sync_env->store, owner);
+ RGWPubSub ps(sync_env->store, owner.tenant);
rgw_raw_obj obj;
- ups.get_sub_meta_obj(sub_name, &obj);
+ ps.get_sub_meta_obj(sub_name, &obj);
bool empty_on_enoent = false;
call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
obj,
yield (*ref)->call_init_cr(this);
if (retcode < 0) {
- ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
mgr->remove_get_sub(owner, sub_name);
return set_cr_error(retcode);
}
- if (owner.empty()) {
- mgr->subs[sub_name] = *ref;
- }
mgr->remove_get_sub(owner, sub_name);
return set_cr_done();
rgw_obj_key key;
rgw::notify::EventType event_type;
- RGWUserPubSub ups;
+ RGWPubSub ps;
rgw_raw_obj bucket_obj;
rgw_raw_obj user_obj;
rgw_pubsub_bucket_topics bucket_topics;
- rgw_pubsub_user_topics user_topics;
+ rgw_pubsub_topics user_topics;
TopicsRef *topics;
public:
RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
bucket(_bucket),
key(_key),
event_type(_event_type),
- ups(sync_env->store, owner),
+ ps(sync_env->store, owner.tenant),
topics(_topics) {
*topics = std::make_shared<vector<PSTopicConfigRef> >();
}
int operate() override {
reenter(this) {
- ups.get_bucket_meta_obj(bucket, &bucket_obj);
- ups.get_user_meta_obj(&user_obj);
+ ps.get_bucket_meta_obj(bucket, &bucket_obj);
+ ps.get_meta_obj(&user_obj);
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
yield {
ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
if (!bucket_topics.topics.empty()) {
- using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
yield {
bool empty_on_enoent = true;
call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
(*topics)->push_back(tc);
}
- env->conf->get_topics(sync_env->cct, bucket, key, topics);
return set_cr_done();
}
return 0;
class RGWPSHandleObjEventCR : public RGWCoroutine {
RGWDataSyncCtx* const sc;
const PSEnvRef env;
- const rgw_user& owner;
+ const rgw_user owner;
const EventRef<rgw_pubsub_event> event;
- const EventRef<rgw_pubsub_s3_record> record;
+ const EventRef<rgw_pubsub_s3_event> s3_event;
const TopicsRef topics;
- const std::array<rgw_user, 2> owners;
bool has_subscriptions;
bool event_handled;
- bool sub_conf_found;
PSSubscriptionRef sub;
- std::array<rgw_user, 2>::const_iterator oiter;
std::vector<PSTopicConfigRef>::const_iterator titer;
std::set<std::string>::const_iterator siter;
- int last_sub_conf_error;
public:
RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
const PSEnvRef _env,
const rgw_user& _owner,
const EventRef<rgw_pubsub_event>& _event,
- const EventRef<rgw_pubsub_s3_record>& _record,
+ const EventRef<rgw_pubsub_s3_event>& _s3_event,
const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
sc(_sc),
env(_env),
owner(_owner),
event(_event),
- record(_record),
+ s3_event(_s3_event),
topics(_topics),
- owners({owner, rgw_user{}}),
has_subscriptions(false),
event_handled(false) {}
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
has_subscriptions = true;
- sub_conf_found = false;
- // try to read subscription configuration from global/user cond
- // configuration is considered missing only if does not exist in either
- for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
- yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
+ // try to read subscription configuration
+ yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
+ ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
+ << " ret=" << retcode << dendl;
+ if (retcode == -ENOENT) {
+ // missing subscription info should be reflected back as invalid argument
+ // and not as missing object
+ retcode = -EINVAL;
+ }
+ // try the next subscription
+ continue;
+ }
+ if (sub->sub_conf->s3_id.empty()) {
+ // subscription was not made by S3 compatible API
+ ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ yield call(PSSubscription::store_event_cr(sc, sub, event));
if (retcode < 0) {
- if (sub_conf_found) {
- // not a real issue, sub conf already found
- retcode = 0;
- }
- last_sub_conf_error = retcode;
- continue;
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+ ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ event_handled = true;
}
- sub_conf_found = true;
- if (sub->sub_conf->s3_id.empty()) {
- // subscription was not made by S3 compatible API
- ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- yield call(PSSubscription::store_event_cr(sc, sub, event));
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sc, sub, event));
if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
- ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
event_handled = true;
}
- if (sub->sub_conf->push_endpoint) {
- ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sc, sub, event));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
- ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- event_handled = true;
- }
- }
+ }
+ } else {
+ // subscription was made by S3 compatible API
+ ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ s3_event->configurationId = sub->sub_conf->s3_id;
+ s3_event->opaque_data = (*titer)->opaque_data;
+ yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+ ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
- // subscription was made by S3 compatible API
- ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- record->configurationId = sub->sub_conf->s3_id;
- record->opaque_data = (*titer)->opaque_data;
- yield call(PSSubscription::store_event_cr(sc, sub, record));
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ event_handled = true;
+ }
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
- ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
event_handled = true;
}
- if (sub->sub_conf->push_endpoint) {
- ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sc, sub, record));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
- ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- event_handled = true;
- }
- }
}
}
- if (!sub_conf_found) {
- // could not find conf for subscription at user or global levels
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
- ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
- << " ret=" << last_sub_conf_error << dendl;
- if (retcode == -ENOENT) {
- // missing subscription info should be reflected back as invalid argument
- // and not as missing object
- retcode = -EINVAL;
- }
- }
}
}
if (has_subscriptions && !event_handled) {
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_record> record;
+ EventRef<rgw_pubsub_s3_event> s3_event;
TopicsRef topics;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
}
attrs.push_back(std::make_pair(k, attr.second));
}
- // at this point we don't know whether we need the ceph event or S3 record
+ // at this point we don't know whether we need the ceph event or S3 event
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
sync_pipe.info.source_bs.bucket, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &event);
- make_s3_record_ref(sc->cct,
+ make_s3_event_ref(sc->cct,
sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
mtime, &attrs,
- rgw::notify::ObjectCreated, &record);
+ rgw::notify::ObjectCreated, &s3_event);
}
- yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
+ yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
ceph::real_time mtime;
rgw::notify::EventType event_type;
EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_record> record;
+ EventRef<rgw_pubsub_s3_event> s3_event;
TopicsRef topics;
public:
RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
return set_cr_done();
}
- // at this point we don't know whether we need the ceph event or S3 record
+ // at this point we don't know whether we need the ceph event or S3 event
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
bucket, key,
mtime, nullptr,
event_type, &event);
- make_s3_record_ref(sc->cct,
+ make_s3_event_ref(sc->cct,
bucket, owner, key,
mtime, nullptr,
- event_type, &record);
- yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
+ event_type, &s3_event);
+ yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
} else {
effective_conf.decode_json(&p);
}
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- if (!rgw::amqp::init(cct)) {
- ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
- }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- if (!rgw::kafka::init(cct)) {
- ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
- }
-#endif
-}
-
-RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- rgw::kafka::shutdown();
-#endif
}
RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()