// (found in the LICENSE.Apache file in the root directory).
#include "db/write_thread.h"
+
#include <chrono>
#include <thread>
+
#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
}
void WriteThread::SetState(Writer* w, uint8_t new_state) {
+ assert(w);
auto state = w->state.load(std::memory_order_acquire);
if (state == STATE_LOCKED_WAITING ||
!w->state.compare_exchange_strong(state, new_state)) {
MutexLock lock(&stall_mu_);
writers = newest_writer->load(std::memory_order_relaxed);
if (writers == &write_stall_dummy_) {
+ TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
stall_cv_.Wait();
// Load newest_writer again since it may have changed
writers = newest_writer->load(std::memory_order_relaxed);
}
}
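// Illustrative sketch of the wait-and-recheck pattern above, using standard
// C++ primitives in place of RocksDB's stall_mu_/stall_cv_ (StallGate and
// its members are hypothetical names, not RocksDB API):
#include <condition_variable>
#include <mutex>

struct StallGate {
  std::mutex mu;
  std::condition_variable cv;
  bool stalled = false;

  // Block until the stall clears. The predicate is re-evaluated after every
  // wakeup, so spurious wakeups and stale notifications are harmless; this
  // mirrors re-loading newest_writer after stall_cv_.Wait() returns.
  void WaitUntilClear() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [this] { return !stalled; });
  }

  // Called when the stall condition ends (e.g. compaction catches up).
  void Clear() {
    {
      std::lock_guard<std::mutex> lock(mu);
      stalled = false;
    }
    cv.notify_all();
  }
};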
-WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
- Writer* boundary) {
- assert(from != nullptr && from != boundary);
- Writer* current = from;
- while (current->link_older != boundary) {
- current = current->link_older;
- assert(current != nullptr);
- }
- return current;
-}
-
void WriteThread::CompleteLeader(WriteGroup& write_group) {
assert(write_group.size > 0);
Writer* leader = write_group.leader;
}
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
+ TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);
if (!linked_as_leader) {
/**
* writes in parallel.
*/
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
- AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
- STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
+ AwaitState(w,
+ STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
+ STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
}
break;
}
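// Minimal sketch of the goal-mask wait performed by AwaitState above: the
// writer blocks until its atomic state becomes one of the requested states.
// (Condensed and hypothetical; the real AwaitState spins adaptively, guided
// by the AdaptationContext, before falling back to a blocking wait.)
#include <atomic>
#include <cstdint>
#include <thread>

static uint8_t BlockUntilStateIn(std::atomic<uint8_t>& state,
                                 uint8_t goal_mask) {
  uint8_t s = state.load(std::memory_order_acquire);
  while ((s & goal_mask) == 0) {
    std::this_thread::yield();  // stand-in for the adaptive spin/park logic
    s = state.load(std::memory_order_acquire);
  }
  return s;
}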
+ if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
+ // Do not mix writes with different levels of integrity protection.
+ break;
+ }
+
+ if (w->rate_limiter_priority != leader->rate_limiter_priority) {
+ // Do not mix writes with different rate limiter priorities.
+ break;
+ }
+
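// Usage note (illustrative): with the two checks above, writes issued with
// different per-write settings can no longer share a write group, e.g.:
//
//   rocksdb::WriteOptions a, b;
//   a.protection_bytes_per_key = 8;  // enable per-key-value checksums
//   b.protection_bytes_per_key = 0;
//   // Batches written with `a` and `b` land in separate write groups, so
//   // one integrity-protection setting never silently applies to both.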
if (w->batch == nullptr) {
// Do not include writes with a nullptr batch. Those are not writes, they
// are something else; they must be processed on their own.
}
}
-static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");
+static WriteThread::AdaptationContext cpmtw_ctx(
+ "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
-
auto* write_group = w->write_group;
if (!w->status.ok()) {
std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status& status) {
+ TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
+ &write_group);
+
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
}
if (enable_pipelined_write_) {
- // Notify writers don't write to memtable to exit.
+ // We insert a dummy Writer right before our current write_group. This
+ // allows us to unlink our write_group without the risk that a subsequent
+ // writer becomes a new leader and might overtake us and add itself to the
+ // memtable-writer-list before we can do so. This ensures that writers are
+ // added to the memtable-writer-list in the exact same order in which they
+ // were in the newest_writer list.
+ // This must happen before completing the writers from our group to prevent
+ // a race where the owning thread of one of these writers can start a new
+ // write operation.
+ Writer dummy;
+ Writer* head = newest_writer_.load(std::memory_order_acquire);
+ if (head != last_writer ||
+ !newest_writer_.compare_exchange_strong(head, &dummy)) {
+ // Either last_writer wasn't the head during the load(), or it was the
+ // head during the load() but somebody else pushed onto the list before
+ // we did the compare_exchange_strong (causing it to fail). In the latter
+ // case compare_exchange_strong has the effect of re-reading its first
+ // param (head). No need to retry a failing CAS, because only a departing
+ // leader (which we are at the moment) can remove nodes from the list.
+ assert(head != last_writer);
+
+ // After walking link_older starting from head (if not already done) we
+ // will be able to traverse w->link_newer below.
+ CreateMissingNewerLinks(head);
+ assert(last_writer->link_newer != nullptr);
+ last_writer->link_newer->link_older = &dummy;
+ dummy.link_newer = last_writer->link_newer;
+ }
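// Self-contained sketch of the swap-or-splice step above, on a hypothetical
// lock-free newest-first list (Node and SpliceMarker are illustrative names,
// not RocksDB API):
#include <atomic>

struct Node {
  Node* link_older = nullptr;
};

// Publish `marker` directly newer than `last`. If `last` is still the head,
// one CAS suffices and the whole group below it is detached; otherwise newer
// nodes were pushed, and since only the single departing leader ever unlinks
// nodes, we can walk down from the re-read head and splice `marker` in
// without retrying the CAS.
void SpliceMarker(std::atomic<Node*>& newest, Node* last, Node* marker) {
  Node* head = newest.load(std::memory_order_acquire);
  if (head != last || !newest.compare_exchange_strong(head, marker)) {
    Node* n = head;
    while (n->link_older != last) {
      n = n->link_older;  // find the node directly newer than `last`
    }
    n->link_older = marker;
  }
}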
+
+ // Complete writers that don't write to memtable
for (Writer* w = last_writer; w != leader;) {
Writer* next = w->link_older;
w->status = status;
CompleteLeader(write_group);
}
- Writer* next_leader = nullptr;
-
- // Look for next leader before we call LinkGroup. If there isn't
- // pending writers, place a dummy writer at the tail of the queue
- // so we know the boundary of the current write group.
- Writer dummy;
- Writer* expected = last_writer;
- bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
- if (!has_dummy) {
- // We find at least one pending writer when we insert dummy. We search
- // for next leader from there.
- next_leader = FindNextLeader(expected, last_writer);
- assert(next_leader != nullptr && next_leader != last_writer);
- }
+ TEST_SYNC_POINT_CALLBACK(
+ "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+ &write_group);
- // Link the ramaining of the group to memtable writer list.
- //
+ // Link the remainder of the group to the memtable writer list.
// We have to link our group to the memtable writer queue before waking up
// the next leader or setting newest_writer_ to null; otherwise the next
// leader can run ahead of us and link to the memtable writer queue first.
}
}
- // If we have inserted dummy in the queue, remove it now and check if there
- // are pending writer join the queue since we insert the dummy. If so,
- // look for next leader again.
- if (has_dummy) {
- assert(next_leader == nullptr);
- expected = &dummy;
- bool has_pending_writer =
- !newest_writer_.compare_exchange_strong(expected, nullptr);
- if (has_pending_writer) {
- next_leader = FindNextLeader(expected, &dummy);
- assert(next_leader != nullptr && next_leader != &dummy);
- }
+ // Unlink the dummy writer from the list and identify the new leader
+ head = newest_writer_.load(std::memory_order_acquire);
+ if (head != &dummy ||
+ !newest_writer_.compare_exchange_strong(head, nullptr)) {
+ CreateMissingNewerLinks(head);
+ Writer* new_leader = dummy.link_newer;
+ assert(new_leader != nullptr);
+ new_leader->link_older = nullptr;
+ SetState(new_leader, STATE_GROUP_LEADER);
}
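// Companion sketch for the unlink step above: retire `marker` and hand off
// leadership (continues the hypothetical Node/newest list from the earlier
// sketch). Returns the new leader, or nullptr if the list drained.
Node* RemoveMarker(std::atomic<Node*>& newest, Node* marker) {
  Node* head = newest.load(std::memory_order_acquire);
  if (head == marker && newest.compare_exchange_strong(head, nullptr)) {
    return nullptr;  // nobody queued behind the marker
  }
  // Writers arrived behind the marker; the oldest of them (the node whose
  // link_older still points at the marker) becomes the new leader.
  Node* n = head;
  while (n->link_older != marker) {
    n = n->link_older;
  }
  n->link_older = nullptr;
  return n;
}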
- if (next_leader != nullptr) {
- next_leader->link_older = nullptr;
- SetState(next_leader, STATE_GROUP_LEADER);
- }
- AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
- STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
+ AwaitState(leader,
+ STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
+ STATE_COMPLETED,
&eabgl_ctx);
} else {
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
!newest_writer_.compare_exchange_strong(head, nullptr)) {
- // Either w wasn't the head during the load(), or it was the head
- // during the load() but somebody else pushed onto the list before
+ // Either last_writer wasn't the head during the load(), or it was the
+ // head during the load() but somebody else pushed onto the list before
// we did the compare_exchange_strong (causing it to fail). In the
// latter case compare_exchange_strong has the effect of re-reading
// its first param (head). No need to retry a failing CAS, because only
// a departing leader (which we are at the moment) can remove nodes from
// the list, so we can definitely conclude that no other leader
// work is going on here (with or without db mutex).
CreateMissingNewerLinks(head);
+ assert(last_writer->link_newer != nullptr);
assert(last_writer->link_newer->link_older == last_writer);
last_writer->link_newer->link_older = nullptr;
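// Sketch of what CreateMissingNewerLinks provides for the traversals above:
// writers are pushed with only link_older set, so a departing leader
// back-fills the link_newer pointers before walking oldest-to-newest.
// (Hypothetical doubly linked node, mirroring the Writer link fields.)
struct LinkedNode {
  LinkedNode* link_older = nullptr;
  LinkedNode* link_newer = nullptr;
};

void BackfillNewerLinks(LinkedNode* head) {
  while (LinkedNode* next = head->link_older) {
    if (next->link_newer != nullptr) {
      break;  // the older part of the list is already doubly linked
    }
    next->link_newer = head;
    head = next;
  }
}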