#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
+#include "test_util/sync_point.h"
#include "util/random.h"
-#include "util/sync_point.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
WriteThread::WriteThread(const ImmutableDBOptions& db_options)
: max_yield_usec_(db_options.enable_write_thread_adaptive_yield
allow_concurrent_memtable_write_(
db_options.allow_concurrent_memtable_write),
enable_pipelined_write_(db_options.enable_pipelined_write),
+ max_write_batch_group_size_bytes(
+ db_options.max_write_batch_group_size_bytes),
newest_writer_(nullptr),
newest_memtable_writer_(nullptr),
last_sequence_(0),
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
AdaptationContext* ctx) {
- uint8_t state;
+ uint8_t state = 0;
// 1. Busy loop using "pause" for 1 micro sec
// 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
prev->link_older = w->link_older;
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
+ if (prev->link_older) {
+ prev->link_older->link_newer = prev;
+ }
w = prev->link_older;
} else {
prev = w;
void WriteThread::EndWriteStall() {
MutexLock lock(&stall_mu_);
+ // Unlink write_stall_dummy_ from the write queue. This unblocks pending
+ // write threads so that they can enqueue themselves.
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+ assert(write_stall_dummy_.link_older != nullptr);
+ write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
newest_writer_.exchange(write_stall_dummy_.link_older);
// Wake up writers
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
- size_t max_size = 1 << 20;
- if (size <= (128 << 10)) {
- max_size = size + (128 << 10);
+ size_t max_size = max_write_batch_group_size_bytes;
+ const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
+ if (size <= min_batch_size_bytes) {
+ max_size = size + min_batch_size_bytes;
}
leader->write_group = write_group;
break;
}
- if (!w->disable_wal && leader->disable_wal) {
- // Do not include a write that needs WAL into a batch that has
+ if (w->disable_wal != leader->disable_wal) {
+ // Do not mix writes that enable WAL with the ones that have
// WAL disabled.
break;
}
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
- size_t max_size = 1 << 20;
- if (size <= (128 << 10)) {
- max_size = size + (128 << 10);
+ size_t max_size = max_write_batch_group_size_bytes;
+ const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
+ if (size <= min_batch_size_bytes) {
+ max_size = size + min_batch_size_bytes;
}
leader->write_group = write_group;
newest_memtable_writer_.store(nullptr);
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE