timer(io_context) {}
void async_wait(spawn::yield_context yield) {
+ if (pending_tokens == 0) {
+ return;
+ }
timer.expires_from_now(infinite_duration);
boost::system::error_code ec;
timer.async_wait(yield[ec]);
<< queue_name << dendl;
}
}
-
}
}
const auto max_jitter = 500; // ms
std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
+ std::vector<std::string> queue_gc;
+ std::mutex queue_gc_lock;
while (true) {
Timer timer(io_context);
const auto duration = (has_error ?
continue;
}
- std::vector<std::string> queue_gc;
- std::mutex queue_gc_lock;
for (const auto& queue_name : queues) {
// try to lock the queue to check if it is owned by this rgw
// or if ownershif needs to be taken
// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
- workers.emplace_back([this]() { io_context.run(); });
- const auto rc = ceph_pthread_setname(workers.back().native_handle(),
- (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+ workers.emplace_back([this]() {
+ try {
+ io_context.run();
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
+ throw(err);
+ }
+ });
+ const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+ (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
ceph_assert(rc == 0);
}
ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
}
// populate event from request
-void populate_event_from_request(const req_state *s,
+void populate_event_from_request(const reservation_t& res,
rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
- rgw_pubsub_s3_event& event) {
+ rgw_pubsub_s3_event& event) {
+ const auto s = res.s;
event.eventTime = mtime;
event.eventName = to_string(event_type);
event.userIdentity = s->user->get_id().id; // user that triggered the change
event.bucket_name = s->bucket_name;
event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
- event.object_key = obj->get_name();
+ event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
event.object_etag = etag;
event.object_versionId = obj->get_instance();
std::back_inserter(event.object_sequencer));
set_event_id(event.id, etag, ts);
event.bucket_id = s->bucket->get_bucket_id();
- // pass meta data
- if (s->info.x_meta_map.empty()) {
- // try to fetch the metadata from the attributes
+ // pass metadata
+ if (res.cached_metadata.empty()) {
+ // no metadata cached:
+ // either no metadata exist or no metadata filter was used
+ event.x_meta_map = s->info.x_meta_map;
metadata_from_attributes(s, obj, event.x_meta_map);
} else {
- event.x_meta_map = s->info.x_meta_map;
+ event.x_meta_map = std::move(res.cached_metadata);
}
// pass tags
if (s->tagset.get_tags().empty()) {
// opaque data will be filled from topic configuration
}
-bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
- EventType event, const RGWObjTags* req_tags) {
- if (!match(filter.events, event)) {
+bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
+ if (!match(filter.events, event)) {
return false;
}
- if (!match(filter.s3_filter.key_filter, obj->get_name())) {
+ const auto obj = res.object;
+ if (!match(filter.s3_filter.key_filter,
+ res.object_name ? *res.object_name : obj->get_name())) {
return false;
}
+ const auto s = res.s;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
- if (!s->info.x_meta_map.empty()) {
- // metadata was cached in req_state
- if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
- return false;
- }
- } else {
- // try to fetch the metadata from the attributes
- KeyValueMap metadata;
- metadata_from_attributes(s, obj, metadata);
- if (!match(filter.s3_filter.metadata_filter, metadata)) {
- return false;
- }
+ res.cached_metadata = s->info.x_meta_map;
+ metadata_from_attributes(s, obj, res.cached_metadata);
+ if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+ return false;
}
}
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
- if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+ if (!notification_match(res, topic_filter, event_type, req_tags)) {
// notification does not apply to req_state
continue;
}
continue;
}
event_entry_t event_entry;
- populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+ populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {