]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_sync.h
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_sync.h
index c651f7a9ad1b98191a44207db7beadf543079417..ad27413d9b82e2c203eaa22f3fbfd788ea6e5f40 100644 (file)
@@ -280,6 +280,36 @@ public:
   }
 };
 
+class RGWOrderCallCR : public RGWCoroutine
+{
+public:
+  RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}
+
+  virtual void call_cr(RGWCoroutine *_cr) = 0;
+};
+
+class RGWLastCallerWinsCR : public RGWOrderCallCR
+{
+  RGWCoroutine *cr{nullptr};
+
+public:
+  RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
+  ~RGWLastCallerWinsCR() {
+    if (cr) {
+      cr->put();
+    }
+  }
+
+  int operate() override;
+
+  void call_cr(RGWCoroutine *_cr) {
+    if (cr) {
+      cr->put();
+    }
+    cr = _cr;
+  }
+};
+
 template <class T, class K>
 class RGWSyncShardMarkerTrack {
   struct marker_entry {
@@ -296,16 +326,22 @@ class RGWSyncShardMarkerTrack {
   int window_size;
   int updates_since_flush;
 
+  RGWOrderCallCR *order_cr{nullptr};
 
 protected:
   typename std::set<K> need_retry_set;
 
   virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
+  virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
   virtual void handle_finish(const T& marker) { }
 
 public:
   RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
-  virtual ~RGWSyncShardMarkerTrack() {}
+  virtual ~RGWSyncShardMarkerTrack() {
+    if (order_cr) {
+      order_cr->put();
+    }
+  }
 
   bool start(const T& pos, int index_pos, const real_time& timestamp) {
     if (pending.find(pos) != pending.end()) {
@@ -372,7 +408,7 @@ public:
     --i;
     const T& high_marker = i->first;
     marker_entry& high_entry = i->second;
-    RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
+    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
     finish_markers.erase(finish_markers.begin(), last);
     return cr;
   }
@@ -395,6 +431,24 @@ public:
   void reset_need_retry(const K& key) {
     need_retry_set.erase(key);
   }
+
+  RGWCoroutine *order(RGWCoroutine *cr) {
+    /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
+     * nothing and the existing one will call the cr
+     */
+    if (order_cr && order_cr->is_done()) {
+      order_cr->put();
+      order_cr = nullptr;
+    }
+    if (!order_cr) {
+      order_cr = allocate_order_control_cr();
+      order_cr->get();
+      order_cr->call_cr(cr);
+      return order_cr;
+    }
+    order_cr->call_cr(cr);
+    return nullptr; /* don't call it a second time */
+  }
 };
 
 class RGWMetaSyncShardMarkerTrack;