//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"
#include <chrono>
#include <thread>
#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "util/random.h"
#include "util/sync_point.h"

namespace rocksdb {
WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      stall_cv_(&stall_mu_) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block.  Lazily create the mutex.  We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one.  At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond.  This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);
  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex().  For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
  // drag on RocksDB's single-writer design.  Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time.  How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long.  If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad.  We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect.  sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block.  If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision.  Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2.  We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.
  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by a yield being successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;
  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several.  Immediately update
            // yield_credit and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    state = BlockingAwaitState(w, goal_mask);
  }
  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., 0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t.
    // The same logic applies to negative credits.
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}
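
// A worked instance of the credit update above (arithmetic illustration, not
// a comment from the original source): starting from v = 0, every successful
// sampled yield applies v' = v - v/1024 + 2^17, so v climbs toward the fixed
// point where v/1024 == 2^17, i.e. v == 2^27 = 134217728. A failed sample at
// that ceiling gives v' = 2^27 - 2^17 - 2^17, shedding roughly 0.2% of the
// credit; the sign of yield_credit is what steers the spin-vs-block decision
// in AwaitState.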
void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}
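
// How the two sides pair up (summary comment, not in the original source):
// BlockingAwaitState is the waiter half and SetState the waker half of a
// hand-rolled futex-style handshake. The waiter CASes its state to
// STATE_LOCKED_WAITING before sleeping on StateCV(); the waker either CASes
// a non-waiting state directly to new_state (fast path, no mutex touched)
// or, on seeing STATE_LOCKED_WAITING, takes StateMutex() so that the store
// and notify_one() cannot race with the waiter's predicate check.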
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If write stall in effect, and w->no_slowdown is not true,
    // block here until stall is cleared. If it is true, then return
    // immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing.
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}
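
// The tail of LinkOne is the classic lock-free list push; stripped of the
// stall handling it reduces to this pattern (sketch only, names hypothetical):
//
//   Writer* old = newest->load(std::memory_order_relaxed);
//   do {
//     w->link_older = old;
//   } while (!newest->compare_exchange_weak(old, w));
//
// compare_exchange_weak reloads `old` on failure, so each retry re-links
// w->link_older before trying to swing the head again; the return value
// (old == nullptr) tells the caller it became the group leader.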
bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
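
// Orientation for the list manipulation here (illustrative comment, not in
// the original source): writers form a singly linked list threaded through
// link_older, with newest_writer_ pointing at the newest end:
//
//   oldest/leader <- w1 <- w2 <- w3    (newest_writer_ == w3,
//                                       arrows are link_older)
//
// link_newer pointers are only filled in lazily by the walk above, starting
// from the newest writer and stopping at the first node whose link_newer is
// already set, so the backward chain is never traversed twice.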
WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}
void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}
void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk writer list until w->write_group != nullptr. The current write group
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
  // point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}
void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  assert(newest_writer_.load(std::memory_order_relaxed) ==
         &write_stall_dummy_);
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     * 2.1) finishes the memtable writes on our behalf
     * 2.2) or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and WAL write for us, enqueues us as a pending
     *    memtable writer, and
     * 3.1) we become memtable writer group leader, or
     * 3.2) an existing memtable writer group leader tells us to finish
     *      memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}
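
// Caller-side shape, for orientation (a simplified sketch, not code from
// this file; the authoritative caller is DBImpl::WriteImpl, and error
// handling is omitted):
//
//   WriteThread::Writer w(/* options, batch, ... */);
//   write_thread_.JoinBatchGroup(&w);
//   if (w.state == WriteThread::STATE_GROUP_LEADER) {
//     WriteThread::WriteGroup group;
//     write_thread_.EnterAsBatchGroupLeader(&w, &group);
//     // ... write WAL / memtable for the whole group ...
//     write_thread_.ExitAsBatchGroupLeader(group, status);
//   }
//   // Followers return from JoinBatchGroup already COMPLETED, or as
//   // parallel memtable writers / the next leader, per the states above.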
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (!w->disable_wal && leader->disable_wal) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}
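
// Worked example of the size cap above (illustrative, not from the original
// source): a 4 KB leader batch yields max_size = 4 KB + 128 KB, so a burst
// of small writers coalesces without the leader waiting behind megabytes of
// followers; a 512 KB leader exceeds the 128 KB threshold, so its group is
// capped at the flat 1 MB (1 << 20) limit instead.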
void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }

  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}
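
// The running counter set above is the rendezvous for the parallel phase
// (summary comment, not in the original source): every writer, leader
// included, decrements it in CompleteParallelMemTableWriter below, and the
// writer whose decrement observes a count of 1 is the last one out and
// performs the group's exit duties.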
static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  return true;
}
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that don't write to memtable to exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for next leader before we call LinkGroup. If there are no
    // pending writers, place a dummy writer at the tail of the queue
    // so we know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We find at least one pending writer when we insert dummy. We search
      // for next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // Link the remaining writers of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking
    // up the next leader or setting newest_writer_ to null, otherwise the
    // next leader can run ahead of us and link to the memtable writer queue
    // before we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we have inserted dummy in the queue, remove it now and check if any
    // pending writers joined the queue since we inserted the dummy. If so,
    // look for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);

      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail).  In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head).  No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list).  That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}
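
// Sequence of the pipelined hand-off above (summary comment, not in the
// original source):
//   1. Detach finished followers that skip the memtable phase.
//   2. CAS a stack-local dummy onto newest_writer_ to pin the boundary of
//      this group; if the CAS fails, newer writers exist and the next
//      leader is found among them.
//   3. LinkGroup moves the remaining writers onto newest_memtable_writer_.
//   4. Remove the dummy (or find the next leader that raced in behind it),
//      promote it, then wait until this leader is itself promoted to
//      memtable writer leader, made a parallel writer, or completed.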
static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}
void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}
static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}
}  // namespace rocksdb