]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_thread.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / write_thread.cc
index d26a694aac7c69a03656e7200de0ee5c8cfd6536..cc8645f373119bf60c893377f4e71cb95260e07a 100644 (file)
@@ -4,8 +4,10 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "db/write_thread.h"
+
 #include <chrono>
 #include <thread>
+
 #include "db/column_family.h"
 #include "monitoring/perf_context_imp.h"
 #include "port/port.h"
@@ -208,6 +210,7 @@ uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
 }
 
 void WriteThread::SetState(Writer* w, uint8_t new_state) {
+  assert(w);
   auto state = w->state.load(std::memory_order_acquire);
   if (state == STATE_LOCKED_WAITING ||
       !w->state.compare_exchange_strong(state, new_state)) {
@@ -240,6 +243,7 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
         MutexLock lock(&stall_mu_);
         writers = newest_writer->load(std::memory_order_relaxed);
         if (writers == &write_stall_dummy_) {
+          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
           stall_cv_.Wait();
           // Load newest_writers_ again since it may have changed
           writers = newest_writer->load(std::memory_order_relaxed);
@@ -291,17 +295,6 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
   }
 }
 
-WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
-                                                 Writer* boundary) {
-  assert(from != nullptr && from != boundary);
-  Writer* current = from;
-  while (current->link_older != boundary) {
-    current = current->link_older;
-    assert(current != nullptr);
-  }
-  return current;
-}
-
 void WriteThread::CompleteLeader(WriteGroup& write_group) {
   assert(write_group.size > 0);
   Writer* leader = write_group.leader;
@@ -387,6 +380,7 @@ void WriteThread::JoinBatchGroup(Writer* w) {
   }
 
   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
+  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);
 
   if (!linked_as_leader) {
     /**
@@ -403,8 +397,9 @@ void WriteThread::JoinBatchGroup(Writer* w) {
      *      writes in parallel.
      */
     TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
-    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
-                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
+    AwaitState(w,
+               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
+                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                &jbg_ctx);
     TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
   }
@@ -464,6 +459,16 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
       break;
     }
 
+    if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
+      // Do not mix writes with different levels of integrity protection.
+      break;
+    }
+
+    if (w->rate_limiter_priority != leader->rate_limiter_priority) {
+      // Do not mix writes with different rate limiter priorities.
+      break;
+    }
+
     if (w->batch == nullptr) {
       // Do not include those writes with nullptr batch. Those are not writes,
       // those are something else. They want to be alone
@@ -591,10 +596,10 @@ void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
   }
 }
 
-static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");
+static WriteThread::AdaptationContext cpmtw_ctx(
+    "CompleteParallelMemTableWriter");
 // This method is called by both the leader and parallel followers
 bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
-
   auto* write_group = w->write_group;
   if (!w->status.ok()) {
     std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
@@ -627,6 +632,9 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
 static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
 void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                          Status& status) {
+  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
+                           &write_group);
+
   Writer* leader = write_group.leader;
   Writer* last_writer = write_group.last_writer;
   assert(leader->link_older == nullptr);
@@ -643,7 +651,36 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
   }
 
   if (enable_pipelined_write_) {
-    // Notify writers don't write to memtable to exit.
+    // We insert a dummy Writer right before our current write_group. This
+    // allows us to unlink our write_group without the risk that a subsequent
+    // writer becomes a new leader and might overtake us and add itself to the
+    // memtable-writer-list before we can do so. This ensures that writers are
+    // added to the memtable-writer-list in the exact same order in which they
+    // were in the newest_writer list.
+    // This must happen before completing the writers from our group to prevent
+    // a race where the owning thread of one of these writers can start a new
+    // write operation.
+    Writer dummy;
+    Writer* head = newest_writer_.load(std::memory_order_acquire);
+    if (head != last_writer ||
+        !newest_writer_.compare_exchange_strong(head, &dummy)) {
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
+      // we did the compare_exchange_strong (causing it to fail). In the latter
+      // case compare_exchange_strong has the effect of re-reading its first
+      // param (head). No need to retry a failing CAS, because only a departing
+      // leader (which we are at the moment) can remove nodes from the list.
+      assert(head != last_writer);
+
+      // After walking link_older starting from head (if not already done) we
+      // will be able to traverse w->link_newer below.
+      CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
+      last_writer->link_newer->link_older = &dummy;
+      dummy.link_newer = last_writer->link_newer;
+    }
+
+    // Complete writers that don't write to memtable
     for (Writer* w = last_writer; w != leader;) {
       Writer* next = w->link_older;
       w->status = status;
@@ -656,23 +693,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       CompleteLeader(write_group);
     }
 
-    Writer* next_leader = nullptr;
-
-    // Look for next leader before we call LinkGroup. If there isn't
-    // pending writers, place a dummy writer at the tail of the queue
-    // so we know the boundary of the current write group.
-    Writer dummy;
-    Writer* expected = last_writer;
-    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
-    if (!has_dummy) {
-      // We find at least one pending writer when we insert dummy. We search
-      // for next leader from there.
-      next_leader = FindNextLeader(expected, last_writer);
-      assert(next_leader != nullptr && next_leader != last_writer);
-    }
+    TEST_SYNC_POINT_CALLBACK(
+        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+        &write_group);
 
-    // Link the ramaining of the group to memtable writer list.
-    //
+    // Link the remaining of the group to memtable writer list.
     // We have to link our group to memtable writer queue before wake up the
     // next leader or set newest_writer_ to null, otherwise the next leader
     // can run ahead of us and link to memtable writer queue before we do.
@@ -683,33 +708,27 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       }
     }
 
-    // If we have inserted dummy in the queue, remove it now and check if there
-    // are pending writer join the queue since we insert the dummy. If so,
-    // look for next leader again.
-    if (has_dummy) {
-      assert(next_leader == nullptr);
-      expected = &dummy;
-      bool has_pending_writer =
-          !newest_writer_.compare_exchange_strong(expected, nullptr);
-      if (has_pending_writer) {
-        next_leader = FindNextLeader(expected, &dummy);
-        assert(next_leader != nullptr && next_leader != &dummy);
-      }
+    // Unlink the dummy writer from the list and identify the new leader
+    head = newest_writer_.load(std::memory_order_acquire);
+    if (head != &dummy ||
+        !newest_writer_.compare_exchange_strong(head, nullptr)) {
+      CreateMissingNewerLinks(head);
+      Writer* new_leader = dummy.link_newer;
+      assert(new_leader != nullptr);
+      new_leader->link_older = nullptr;
+      SetState(new_leader, STATE_GROUP_LEADER);
     }
 
-    if (next_leader != nullptr) {
-      next_leader->link_older = nullptr;
-      SetState(next_leader, STATE_GROUP_LEADER);
-    }
-    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
-                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
+    AwaitState(leader,
+               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
+                   STATE_COMPLETED,
                &eabgl_ctx);
   } else {
     Writer* head = newest_writer_.load(std::memory_order_acquire);
     if (head != last_writer ||
         !newest_writer_.compare_exchange_strong(head, nullptr)) {
-      // Either w wasn't the head during the load(), or it was the head
-      // during the load() but somebody else pushed onto the list before
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
       // we did the compare_exchange_strong (causing it to fail).  In the
       // latter case compare_exchange_strong has the effect of re-reading
       // its first param (head).  No need to retry a failing CAS, because
@@ -725,6 +744,7 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       // to MarkJoined, so we can definitely conclude that no other leader
       // work is going on here (with or without db mutex).
       CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
       assert(last_writer->link_newer->link_older == last_writer);
       last_writer->link_newer->link_older = nullptr;