// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
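
// AwaitState() below escalates from busy-spin to yield to the blocking path
// above; SetState() is the wake-up side for all three: spinning and yielding
// waiters observe the state store, and a blocked waiter is signaled through
// StateCV().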
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
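  // (Illustration: 200 iterations x ~7 ns per iteration is roughly 1.4 usec
  // of spinning before we fall through to the slower paths below.)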
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?

  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.

  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.

  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.

  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.
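
  // In short: when max_yield_usec_ > 0, the code below spins with yield only
  // while this call site's yield_credit is non-negative (or when this call
  // was sampled), counts yields that take at least slow_yield_usec_ as
  // evidence of contention, falls back to the blocking wait after
  // kMaxSlowYieldsWhileSpinning of them, and afterwards adjusts yield_credit
  // so future waits at the same call site adapt.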

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by yield being successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update yield_credit
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., 0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31 the upperbound of int32_t. Same
    // logic applies to negative credits.
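    //
    // Worked example (illustrative only): starting from v = 0, repeated
    // successful updates approach the fixed point where v / 1024 == 131072,
    // i.e. v == 2^27; repeated failures approach -2^27 the same way, so |v|
    // stays well below the int32_t limit of 2^31 - 1.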
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}
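
// LinkOne() pushes w onto the lock-free list headed by *newest_writer with a
// CAS and returns true iff the list was empty (in which case the caller
// becomes the group leader). If a write stall is in effect (the head is
// write_stall_dummy_), the writer either fails fast when no_slowdown is set
// or blocks on stall_cv_ until the stall clears.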
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If write stall in effect, and w->no_slowdown is not true,
    // block here until stall is cleared. If it's true, then return
    // immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing.
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}
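
// CreateMissingNewerLinks() walks backward from head along link_older and
// fills in the link_newer back-pointers until it reaches a node whose
// back-pointer is already set (or the end of the list), turning the singly
// linked push list into a doubly linked one that can be traversed oldest to
// newest.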
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk writer list until w->write_group != nullptr. The current write group
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
  // point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      if (prev->link_older) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *   2.1) finishes the memtable writes on our behalf
     *   2.2) Or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *   finishes book-keeping and WAL write for us, enqueues us as pending
     *   memtable writer, and
     *   3.1) we become memtable writer group leader, or
     *   3.2) an existing memtable writer group leader tells us to finish
     *        memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
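
  // Illustration (assuming max_write_batch_group_size_bytes is 1 MB):
  // min_batch_size_bytes is then 128 KB, so a 4 KB leader batch caps the
  // group at roughly 132 KB, while a 512 KB leader batch may grow the group
  // up to the full 1 MB.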

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
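    // Grouping is only attempted when followers will be applied sequentially
    // (concurrent memtable write disabled) or when the leader's batch has no
    // merge; merge-containing batches are never grouped here (see the
    // HasMerge() check below).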
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      write_group->size++;
      last_writer = w;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }

  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
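// Returns true only for the last member of the group to finish (which should
// then perform the group's exit duties); all other members wait for
// STATE_COMPLETED and return false.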
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  return true;
}

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that don't write to memtable to exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for next leader before we call LinkGroup. If there are no
    // pending writers, place a dummy writer at the tail of the queue
    // so we know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We find at least one pending writer when we insert dummy. We search
      // for next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // Link the remaining writers of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we have inserted the dummy in the queue, remove it now and check if
    // any pending writers joined the queue since we inserted it. If so, look
    // for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
                   STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head). No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list). That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE