]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_notify.cc
import ceph 16.2.6
[ceph.git] / ceph / src / rgw / rgw_notify.cc
index a4ad062005e70b607fab90f84cf25bf108be6e0b..f40c2adaf588bb07796fbb7daf1c6202c38cfd9a 100644 (file)
@@ -145,6 +145,9 @@ class Manager : public DoutPrefixProvider {
       timer(io_context) {}  
  
     void async_wait(spawn::yield_context yield) { 
+      if (pending_tokens == 0) {
+        return;
+      }
       timer.expires_from_now(infinite_duration);
       boost::system::error_code ec; 
       timer.async_wait(yield[ec]);
@@ -360,7 +363,6 @@ class Manager : public DoutPrefixProvider {
           << queue_name << dendl;
         }
       }
-
     }
   }
 
@@ -381,6 +383,8 @@ class Manager : public DoutPrefixProvider {
     const auto max_jitter = 500; // ms
     std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
 
+    std::vector<std::string> queue_gc;
+    std::mutex queue_gc_lock;
     while (true) {
       Timer timer(io_context);
       const auto duration = (has_error ? 
@@ -399,8 +403,6 @@ class Manager : public DoutPrefixProvider {
         continue;
       }
 
-      std::vector<std::string> queue_gc;
-      std::mutex queue_gc_lock;
       for (const auto& queue_name : queues) {
         // try to lock the queue to check if it is owned by this rgw
         // or if ownershif needs to be taken
@@ -493,9 +495,16 @@ public:
       // start the worker threads to do the actual queue processing
       const std::string WORKER_THREAD_NAME = "notif-worker";
       for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
-        workers.emplace_back([this]() { io_context.run(); });
-        const auto rc = ceph_pthread_setname(workers.back().native_handle(), 
-            (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+        workers.emplace_back([this]() {
+          try {
+            io_context.run(); 
+          } catch (const std::exception& err) {
+            ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
+            throw(err);
+          }
+        });
+        const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
         ceph_assert(rc == 0);
       }
       ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
@@ -658,13 +667,14 @@ void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValue
 }
 
 // populate event from request
-void populate_event_from_request(const req_state *s, 
+void populate_event_from_request(const reservation_t& res,
         rgw::sal::RGWObject* obj,
         uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
-        rgw_pubsub_s3_event& event) { 
+        rgw_pubsub_s3_event& event) {
+  const auto s = res.s;
   event.eventTime = mtime;
   event.eventName = to_string(event_type);
   event.userIdentity = s->user->get_id().id;    // user that triggered the change
@@ -674,7 +684,7 @@ void populate_event_from_request(const req_state *s,
   event.bucket_name = s->bucket_name;
   event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
-  event.object_key = obj->get_name();
+  event.object_key = res.object_name ? *res.object_name : obj->get_name();
   event.object_size = size;
   event.object_etag = etag;
   event.object_versionId = obj->get_instance();
@@ -684,12 +694,14 @@ void populate_event_from_request(const req_state *s,
           std::back_inserter(event.object_sequencer));
   set_event_id(event.id, etag, ts);
   event.bucket_id = s->bucket->get_bucket_id();
-  // pass meta data
-  if (s->info.x_meta_map.empty()) {
-    // try to fetch the metadata from the attributes
+  // pass metadata
+  if (res.cached_metadata.empty()) {
+    // no metadata cached:
+    // either no metadata exist or no metadata filter was used
+    event.x_meta_map = s->info.x_meta_map;
     metadata_from_attributes(s, obj, event.x_meta_map);
   } else {
-    event.x_meta_map = s->info.x_meta_map;
+    event.x_meta_map = std::move(res.cached_metadata);
   }
   // pass tags
   if (s->tagset.get_tags().empty()) {
@@ -701,29 +713,23 @@ void populate_event_from_request(const req_state *s,
   // opaque data will be filled from topic configuration
 }
 
-bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, 
-    EventType event, const RGWObjTags* req_tags) {
-  if (!match(filter.events, event)) { 
+bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
+  if (!match(filter.events, event)) {
     return false;
   }
-  if (!match(filter.s3_filter.key_filter, obj->get_name())) {
+  const auto obj = res.object;
+  if (!match(filter.s3_filter.key_filter, 
+        res.object_name ? *res.object_name : obj->get_name())) {
     return false;
   }
 
+  const auto s = res.s;
   if (!filter.s3_filter.metadata_filter.kv.empty()) {
     // metadata filter exists
-    if (!s->info.x_meta_map.empty()) {
-      // metadata was cached in req_state
-      if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
-        return false;
-      }
-    } else {
-      // try to fetch the metadata from the attributes
-      KeyValueMap metadata;
-      metadata_from_attributes(s, obj, metadata);
-      if (!match(filter.s3_filter.metadata_filter, metadata)) {
-        return false;
-      }
+    res.cached_metadata = s->info.x_meta_map;
+    metadata_from_attributes(s, obj, res.cached_metadata);
+    if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+      return false;
     }
   }
 
@@ -767,7 +773,7 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
   for (const auto& bucket_topic : bucket_topics.topics) {
     const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
     const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
-    if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+    if (!notification_match(res, topic_filter, event_type, req_tags)) {
       // notification does not apply to req_state
       continue;
     }
@@ -820,7 +826,7 @@ int publish_commit(rgw::sal::RGWObject* obj,
       continue;
     }
     event_entry_t event_entry;
-    populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+    populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event);
     event_entry.event.configurationId = topic.configurationId;
     event_entry.event.opaque_data = topic.cfg.opaque_data;
     if (topic.cfg.dest.persistent) {