git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_thread.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / write_thread.cc
index 835992c8fce7e9900f9dabfc2afa6dd612952708..5f50bba63a3fabae2986a2639bf30c412ddef349 100644 (file)
@@ -9,10 +9,10 @@
 #include "db/column_family.h"
 #include "monitoring/perf_context_imp.h"
 #include "port/port.h"
+#include "test_util/sync_point.h"
 #include "util/random.h"
-#include "util/sync_point.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 WriteThread::WriteThread(const ImmutableDBOptions& db_options)
     : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
@@ -22,6 +22,8 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options)
       allow_concurrent_memtable_write_(
           db_options.allow_concurrent_memtable_write),
       enable_pipelined_write_(db_options.enable_pipelined_write),
+      max_write_batch_group_size_bytes(
+          db_options.max_write_batch_group_size_bytes),
       newest_writer_(nullptr),
       newest_memtable_writer_(nullptr),
       last_sequence_(0),
@@ -59,7 +61,7 @@ uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
 
 uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                 AdaptationContext* ctx) {
-  uint8_t state;
+  uint8_t state = 0;
 
   // 1. Busy loop using "pause" for 1 micro sec
   // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
@@ -342,6 +344,9 @@ void WriteThread::BeginWriteStall() {
       prev->link_older = w->link_older;
       w->status = Status::Incomplete("Write stall");
       SetState(w, STATE_COMPLETED);
+      if (prev->link_older) {
+        prev->link_older->link_newer = prev;
+      }
       w = prev->link_older;
     } else {
       prev = w;
@@ -353,7 +358,11 @@ void WriteThread::BeginWriteStall() {
 void WriteThread::EndWriteStall() {
   MutexLock lock(&stall_mu_);
 
+  // Unlink write_stall_dummy_ from the write queue. This will unblock
+  // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  assert(write_stall_dummy_.link_older != nullptr);
+  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
   newest_writer_.exchange(write_stall_dummy_.link_older);
 
   // Wake up writers
@@ -406,9 +415,10 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
   // Allow the group to grow up to a maximum size, but if the
   // original write is small, limit the growth so we do not slow
   // down the small write too much.
-  size_t max_size = 1 << 20;
-  if (size <= (128 << 10)) {
-    max_size = size + (128 << 10);
+  size_t max_size = max_write_batch_group_size_bytes;
+  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
+  if (size <= min_batch_size_bytes) {
+    max_size = size + min_batch_size_bytes;
   }
 
   leader->write_group = write_group;
@@ -441,8 +451,8 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
       break;
     }
 
-    if (!w->disable_wal && leader->disable_wal) {
-      // Do not include a write that needs WAL into a batch that has
+    if (w->disable_wal != leader->disable_wal) {
+      // Do not mix writes that enable WAL with the ones that have
       // WAL disabled.
       break;
     }
@@ -485,9 +495,10 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader,
   // Allow the group to grow up to a maximum size, but if the
   // original write is small, limit the growth so we do not slow
   // down the small write too much.
-  size_t max_size = 1 << 20;
-  if (size <= (128 << 10)) {
-    max_size = size + (128 << 10);
+  size_t max_size = max_write_batch_group_size_bytes;
+  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
+  if (size <= min_batch_size_bytes) {
+    max_size = size + min_batch_size_bytes;
   }
 
   leader->write_group = write_group;
@@ -763,4 +774,4 @@ void WriteThread::WaitForMemTableWriters() {
   newest_memtable_writer_.store(nullptr);
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE