}
};
+class RGWOrderCallCR : public RGWCoroutine
+{
+public:
+ RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}
+
+ virtual void call_cr(RGWCoroutine *_cr) = 0;
+};
+
+class RGWLastCallerWinsCR : public RGWOrderCallCR
+{
+ RGWCoroutine *cr{nullptr};
+
+public:
+ RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
+ ~RGWLastCallerWinsCR() {
+ if (cr) {
+ cr->put();
+ }
+ }
+
+ int operate() override;
+
+ void call_cr(RGWCoroutine *_cr) {
+ if (cr) {
+ cr->put();
+ }
+ cr = _cr;
+ }
+};
+
template <class T, class K>
class RGWSyncShardMarkerTrack {
struct marker_entry {
int window_size;
int updates_since_flush;
+ RGWOrderCallCR *order_cr{nullptr};
protected:
typename std::set<K> need_retry_set;
virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
+ virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
virtual void handle_finish(const T& marker) { }
public:
RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
- virtual ~RGWSyncShardMarkerTrack() {}
+ virtual ~RGWSyncShardMarkerTrack() {
+ if (order_cr) {
+ order_cr->put();
+ }
+ }
bool start(const T& pos, int index_pos, const real_time& timestamp) {
if (pending.find(pos) != pending.end()) {
--i;
const T& high_marker = i->first;
marker_entry& high_entry = i->second;
- RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
+ RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
finish_markers.erase(finish_markers.begin(), last);
return cr;
}
void reset_need_retry(const K& key) {
need_retry_set.erase(key);
}
+
+ RGWCoroutine *order(RGWCoroutine *cr) {
+ /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
+ * nothing and the existing one will call the cr
+ */
+ if (order_cr && order_cr->is_done()) {
+ order_cr->put();
+ order_cr = nullptr;
+ }
+ if (!order_cr) {
+ order_cr = allocate_order_control_cr();
+ order_cr->get();
+ order_cr->call_cr(cr);
+ return order_cr;
+ }
+ order_cr->call_cr(cr);
+ return nullptr; /* don't call it a second time */
+ }
};
class RGWMetaSyncShardMarkerTrack;