]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/services/svc_notify.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
index ff0080083f12c6b6d9adc75b510965159c6afcfd..43f84ed0a4f7e425383126de64f4f343f823317c 100644 (file)
@@ -253,7 +253,8 @@ void RGWSI_Notify::finalize_watch()
 {
   for (int i = 0; i < num_watchers; i++) {
     RGWWatcher *watcher = watchers[i];
-    watcher->unregister_watch();
+    if (watchers_set.find(i) != watchers_set.end())
+      watcher->unregister_watch();
     delete watcher;
   }
 
@@ -278,6 +279,10 @@ int RGWSI_Notify::do_start(optional_yield y, const DoutPrefixProvider *dpp)
     return r;
   }
 
+  inject_notify_timeout_probability =
+    cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
+  max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
+
   control_pool = zone_svc->get_zone_params().control_pool;
 
   int ret = init_watch(dpp, y);
@@ -396,19 +401,69 @@ int RGWSI_Notify::distribute(const DoutPrefixProvider *dpp, const string& key,
   return 0;
 }
 
+namespace librados {
+
+static std::ostream& operator<<(std::ostream& out, const notify_timeout_t& t)
+{
+  return out << t.notifier_id << ':' << t.cookie;
+}
+
+} // namespace librados
+
+using timeout_vector = std::vector<librados::notify_timeout_t>;
+
+static timeout_vector decode_timeouts(const bufferlist& bl)
+{
+  using ceph::decode;
+  auto p = bl.begin();
+
+  // decode and discard the acks
+  uint32_t num_acks;
+  decode(num_acks, p);
+  for (auto i = 0u; i < num_acks; ++i) {
+    std::pair<uint64_t, uint64_t> id;
+    decode(id, p);
+    // discard the payload
+    uint32_t blen;
+    decode(blen, p);
+    p += blen;
+  }
+
+  // decode and return the timeouts
+  uint32_t num_timeouts;
+  decode(num_timeouts, p);
+
+  timeout_vector timeouts;
+  for (auto i = 0u; i < num_timeouts; ++i) {
+    std::pair<uint64_t, uint64_t> id;
+    decode(id, p);
+    timeouts.push_back({id.first, id.second});
+  }
+  return timeouts;
+}
+
 int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp,
                                 RGWSI_RADOS::Obj& notify_obj,
                                const RGWCacheNotifyInfo& cni,
                                 optional_yield y)
 {
-  bufferlist bl;
+  bufferlist bl, rbl;
   encode(cni, bl);
 
   // First, try to send, without being fancy about it.
-  auto r = notify_obj.notify(dpp, bl, 0, nullptr, y);
+  auto r = notify_obj.notify(dpp, bl, 0, &rbl, y);
 
   if (r < 0) {
+    timeout_vector timeouts;
+    try {
+      timeouts = decode_timeouts(rbl);
+    } catch (const buffer::error& e) {
+      ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: "
+          << e.what() << dendl;
+    }
+
     ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " Watchers " << timeouts << " did not respond."
                      << " Notify failed on object " << cni.obj << ": "
                      << cpp_strerror(-r) << dendl;
   }
@@ -427,10 +482,19 @@ int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp,
       ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                        << " Invalidating obj=" << info.obj << " tries="
                        << tries << dendl;
-      r = notify_obj.notify(dpp, bl, 0, nullptr, y);
+      r = notify_obj.notify(dpp, retrybl, 0, &rbl, y);
       if (r < 0) {
+        timeout_vector timeouts;
+        try {
+          timeouts = decode_timeouts(rbl);
+        } catch (const buffer::error& e) {
+          ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: "
+              << e.what() << dendl;
+        }
+
        ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                         << " invalidation attempt " << tries << " failed: "
+                         << " Watchers " << timeouts << " did not respond."
+                         << " Invalidation attempt " << tries << " failed: "
                          << cpp_strerror(-r) << dendl;
       }
     }