1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 #include "db/write_thread.h"
9 #include "db/column_family.h"
10 #include "port/port.h"
11 #include "util/random.h"
12 #include "util/sync_point.h"
// Constructs a WriteThread.
//
// max_yield_usec:  stored in max_yield_usec_; AwaitState compares it against
//                  0 and converts it to std::chrono::microseconds to bound
//                  its yield loop.
// slow_yield_usec: stored in slow_yield_usec_; AwaitState converts it to
//                  std::chrono::microseconds as the "slow yield" threshold.
//
// The writer list starts out empty (newest_writer_ == nullptr).
16 WriteThread::WriteThread(uint64_t max_yield_usec
, uint64_t slow_yield_usec
)
17 : max_yield_usec_(max_yield_usec
),
18 slow_yield_usec_(slow_yield_usec
),
19 newest_writer_(nullptr) {}
// Blocks the calling thread on w's mutex/condvar until w->state is no longer
// STATE_LOCKED_WAITING.
//
// w:         writer whose state is being awaited.
// goal_mask: bitmask of acceptable states. If the current state already has
//            one of these bits set, no blocking takes place.
//
// The trailing assert requires that the finally observed state intersects
// goal_mask.
//
// NOTE(review): some source lines of this function are not visible in this
// listing (e.g. the return statement and the closing braces); presumably the
// observed state is returned -- confirm against the full file.
21 uint8_t WriteThread::BlockingAwaitState(Writer
* w
, uint8_t goal_mask
) {
22 // We're going to block. Lazily create the mutex. We guarantee
23 // propagation of this construction to the waker via the
24 // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
25 // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
29 auto state
= w
->state
.load(std::memory_order_acquire
);
30 assert(state
!= STATE_LOCKED_WAITING
);
// Only announce that we are blocking (by installing STATE_LOCKED_WAITING)
// when the goal has not been reached yet. No memory order is passed to the
// CAS, so it uses the default ordering.
31 if ((state
& goal_mask
) == 0 &&
32 w
->state
.compare_exchange_strong(state
, STATE_LOCKED_WAITING
)) {
33 // we have permission (and an obligation) to use StateMutex
34 std::unique_lock
<std::mutex
> guard(w
->StateMutex());
// The predicate form of wait() re-checks the state after every wakeup, so
// spurious wakeups do not end the wait early.
35 w
->StateCV().wait(guard
, [w
] {
36 return w
->state
.load(std::memory_order_relaxed
) != STATE_LOCKED_WAITING
;
// Re-read the state that the waker installed.
38 state
= w
->state
.load(std::memory_order_relaxed
);
40 // else tricky. Goal is met or CAS failed. In the latter case the waker
41 // must have changed the state, and compare_exchange_strong has updated
42 // our local variable with the new one. At the moment WriteThread never
43 // waits for a transition across intermediate states, so we know that
44 // since a state change has occurred the goal must have been met.
45 assert((state
& goal_mask
) != 0);
// Waits until w->state has at least one bit of goal_mask set.
//
// w:         writer whose state is being awaited.
// goal_mask: bitmask of acceptable states.
// ctx:       adaptation context; ctx->value is read to decide whether the
//            yield phase is worth trying and is later adjusted based on
//            whether yielding succeeded (would_spin_again).
//
// The visible code waits in up to three stages:
//   1. a busy loop of at most 200 iterations using port::AsmVolatilePause();
//   2. when max_yield_usec_ > 0 and either this call was picked to update
//      the statistics or ctx->value >= 0, a std::this_thread::yield() loop
//      bounded by max_yield_usec_ microseconds that also counts "slow"
//      yields;
//   3. BlockingAwaitState() if the goal is still not met.
//
// NOTE(review): a number of source lines of this function are not visible in
// this listing (the declaration of `state`, early returns/breaks, the
// slow-yield counter update, closing braces, the final return). The summary
// above only covers what is visible -- confirm against the full file.
49 uint8_t WriteThread::AwaitState(Writer
* w
, uint8_t goal_mask
,
50 AdaptationContext
* ctx
) {
53 // On a modern Xeon each loop takes about 7 nanoseconds (most of which
54 // is the effect of the pause instruction), so 200 iterations is a bit
55 // more than a microsecond. This is long enough that waits longer than
56 // this can amortize the cost of accessing the clock and yielding.
57 for (uint32_t tries
= 0; tries
< 200; ++tries
) {
58 state
= w
->state
.load(std::memory_order_acquire
);
59 if ((state
& goal_mask
) != 0) {
62 port::AsmVolatilePause();
65 // If we're only going to end up waiting a short period of time,
66 // it can be a lot more efficient to call std::this_thread::yield()
67 // in a loop than to block in StateMutex(). For reference, on my 4.0
68 // SELinux test server with support for syscall auditing enabled, the
69 // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
70 // 2.7 usec, and the average is more like 10 usec. That can be a big
71 // drag on RocksDB's single-writer design. Of course, spinning is a
72 // bad idea if other threads are waiting to run or if we're going to
73 // wait for a long time. How do we decide?
75 // We break waiting into 3 categories: short-uncontended,
76 // short-contended, and long. If we had an oracle, then we would always
77 // spin for short-uncontended, always block for long, and our choice for
78 // short-contended might depend on whether we were trying to optimize
79 // RocksDB throughput or avoid being greedy with system resources.
81 // Bucketing into short or long is easy by measuring elapsed time.
82 // Differentiating short-uncontended from short-contended is a bit
83 // trickier, but not too bad. We could look for involuntary context
84 // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
85 // (portability code and CPU) to just look for yield calls that take
86 // longer than we expect. sched_yield() doesn't actually result in any
87 // context switch overhead if there are no other runnable processes
88 // on the current core, in which case it usually takes less than
91 // There are two primary tunables here: the threshold between "short"
92 // and "long" waits, and the threshold at which we suspect that a yield
93 // is slow enough to indicate we should probably block. If these
94 // thresholds are chosen well then CPU-bound workloads that don't
95 // have more threads than cores will experience few context switches
96 // (voluntary or involuntary), and the total number of context switches
97 // (voluntary and involuntary) will not be dramatically larger (maybe
98 // 2x) than the number of voluntary context switches that occur when
99 // --max_yield_wait_micros=0.
101 // There's another constant, which is the number of slow yields we will
102 // tolerate before reversing our previous decision. Solitary slow
103 // yields are pretty common (low-priority small jobs ready to run),
104 // so this should be at least 2. We set this conservatively to 3 so
105 // that we can also immediately schedule a ctx adaptation, rather than
106 // waiting for the next update_ctx.
108 const size_t kMaxSlowYieldsWhileSpinning
= 3;
// update_ctx:       whether this call should adjust ctx->value.
// would_spin_again: set once the yield loop observes the goal state.
110 bool update_ctx
= false;
111 bool would_spin_again
= false;
// A max_yield_usec_ of 0 skips the yield phase altogether.
113 if (max_yield_usec_
> 0) {
// Sample which calls refresh the statistics (presumably about 1 in 256 --
// confirm against Random::OneIn).
114 update_ctx
= Random::GetTLSInstance()->OneIn(256);
116 if (update_ctx
|| ctx
->value
.load(std::memory_order_relaxed
) >= 0) {
117 // we're updating the adaptation statistics, or spinning has >
118 // 50% chance of being shorter than max_yield_usec_ and causing no
119 // involuntary context switches
120 auto spin_begin
= std::chrono::steady_clock::now();
122 // this variable doesn't include the final yield (if any) that
123 // causes the goal to be met
124 size_t slow_yield_count
= 0;
126 auto iter_begin
= spin_begin
;
// Keep yielding while the elapsed time since spin_begin is within the
// max_yield_usec_ budget.
127 while ((iter_begin
- spin_begin
) <=
128 std::chrono::microseconds(max_yield_usec_
)) {
129 std::this_thread::yield();
131 state
= w
->state
.load(std::memory_order_acquire
);
132 if ((state
& goal_mask
) != 0) {
134 would_spin_again
= true;
138 auto now
= std::chrono::steady_clock::now();
139 if (now
== iter_begin
||
140 now
- iter_begin
>= std::chrono::microseconds(slow_yield_usec_
)) {
141 // conservatively count it as a slow yield if our clock isn't
142 // accurate enough to measure the yield duration
144 if (slow_yield_count
>= kMaxSlowYieldsWhileSpinning
) {
145 // Not just one ivcsw, but several. Immediately update ctx
146 // and fall back to blocking
// Neither spinning nor yielding reached the goal: block on the condvar.
156 if ((state
& goal_mask
) == 0) {
157 state
= BlockingAwaitState(w
, goal_mask
);
// The load/modify/store sequence below is not a single atomic
// read-modify-write, so a concurrent update may be overwritten.
161 auto v
= ctx
->value
.load(std::memory_order_relaxed
);
162 // fixed point exponential decay with decay constant 1/1024, with +1
163 // and -1 scaled to avoid overflow for int32_t
// NOTE(review): the comment above describes a *decay*, yet the expression
// below ADDS v / 1024 rather than subtracting it. As written the magnitude
// of v grows on every update instead of converging, and can eventually
// overflow. This looks like it should read `v - (v / 1024) + ...` --
// confirm and fix.
164 v
= v
+ (v
/ 1024) + (would_spin_again
? 1 : -1) * 16384;
165 ctx
->value
.store(v
, std::memory_order_relaxed
);
168 assert((state
& goal_mask
) != 0);
// Moves writer w to new_state and wakes it up if it is blocked.
//
// Fast path: a compare_exchange_strong from the observed state to new_state.
// Slow path: if the observed state is STATE_LOCKED_WAITING (either on the
// initial load or because the CAS failed and refreshed `state`), the new
// state is stored while holding w->StateMutex() and w->StateCV() is
// notified.
//
// NOTE(review): the closing braces of this function are not visible in this
// listing.
172 void WriteThread::SetState(Writer
* w
, uint8_t new_state
) {
173 auto state
= w
->state
.load(std::memory_order_acquire
);
174 if (state
== STATE_LOCKED_WAITING
||
175 !w
->state
.compare_exchange_strong(state
, new_state
)) {
// The only expected reason for reaching this branch is a blocked waiter.
176 assert(state
== STATE_LOCKED_WAITING
);
// BlockingAwaitState evaluates its wait predicate under this same mutex, so
// storing the state while holding it avoids a lost wakeup.
178 std::lock_guard
<std::mutex
> guard(w
->StateMutex());
179 assert(w
->state
.load(std::memory_order_relaxed
) != new_state
);
180 w
->state
.store(new_state
, std::memory_order_relaxed
);
181 w
->StateCV().notify_one();
// Pushes writer w onto the list headed by newest_writer_.
//
// w:                writer to enqueue; must be in STATE_INIT (asserted).
// linked_as_leader: out-parameter, set to true iff the list was empty when
//                   w was linked (the previous head was nullptr).
//
// w->link_older is pointed at the previous head before the CAS publishes w
// as the new head.
//
// NOTE(review): some source lines are not visible in this listing. In
// particular no handling of a failed CAS is visible; presumably the attempt
// is retried in a loop -- confirm against the full file.
185 void WriteThread::LinkOne(Writer
* w
, bool* linked_as_leader
) {
186 assert(w
->state
== STATE_INIT
);
189 Writer
* writers
= newest_writer_
.load(std::memory_order_relaxed
);
190 w
->link_older
= writers
;
191 if (newest_writer_
.compare_exchange_strong(writers
, w
)) {
192 if (writers
== nullptr) {
193 // this isn't part of the WriteThread machinery, but helps with
194 // debugging and is checked by an assert in WriteImpl
195 w
->state
.store(STATE_GROUP_LEADER
, std::memory_order_relaxed
);
197 // Then we are the head of the queue and hence definitely the leader
198 *linked_as_leader
= (writers
== nullptr);
199 // Otherwise we will wait for previous leader to define our status
// Fills in link_newer pointers, starting from `head` and following
// link_older.
//
// For the node `next` reached through head->link_older:
//   - if next is nullptr, or next->link_newer is already set, there is
//     nothing left to link (the assert checks that an existing link points
//     back at head);
//   - otherwise next->link_newer is set to head.
//
// NOTE(review): the loop structure (and the step that advances head) is not
// visible in this listing; presumably this repeats along the link_older
// chain -- confirm against the full file.
205 void WriteThread::CreateMissingNewerLinks(Writer
* head
) {
207 Writer
* next
= head
->link_older
;
208 if (next
== nullptr || next
->link_newer
!= nullptr) {
209 assert(next
== nullptr || next
->link_newer
== head
);
212 next
->link_newer
= head
;
// Enqueues writer w (which must carry a batch) and, unless it became the
// leader immediately, waits for a state change.
//
// The visible state mask shows the states this writer waits for:
// STATE_GROUP_LEADER, STATE_PARALLEL_FOLLOWER or STATE_COMPLETED.
//
// NOTE(review): several source lines are not visible in this listing,
// including the call that consumes the state mask (presumably
// AwaitState(w, <mask>, &ctx)) and the closing braces -- confirm against the
// full file.
217 void WriteThread::JoinBatchGroup(Writer
* w
) {
// Function-local static: a single adaptation context per call site.
218 static AdaptationContext
ctx("JoinBatchGroup");
220 assert(w
->batch
!= nullptr);
221 bool linked_as_leader
;
222 LinkOne(w
, &linked_as_leader
);
224 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w
);
226 if (!linked_as_leader
) {
// NOTE(review): the next four lines look like the interior of a block
// comment whose opening and closing delimiters are missing from this
// listing. They are left untouched here; when the delimiters are restored,
// also fix the typos "exisitng", "follewer" and "pralallel".
229 * 1) An existing leader pick us as the new leader when it finishes
230 * 2) An exisitng leader pick us as its follewer and
231 * 2.1) finishes the memtable writes on our behalf
232 * 2.2) Or tell us to finish the memtable writes it in pralallel
235 STATE_GROUP_LEADER
| STATE_PARALLEL_FOLLOWER
| STATE_COMPLETED
,
237 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w
);
// Builds the batch group that `leader` will write on behalf of other
// writers.
//
// leader:            the group leader; must be the oldest writer
//                    (link_older == nullptr) and carry a batch (asserted).
// last_writer:       out-parameter, initialised to leader.
// write_batch_group: out-parameter receiving the leader followed by every
//                    writer accepted into the group.
//
// The group's byte size is capped at 1 << 20; when the leader's batch is at
// most 128 << 10 bytes the cap is lowered to leader size + (128 << 10).
// Newer writers up to newest_writer are then checked against the visible
// conditions (sync mismatch, no_slowdown mismatch, WAL requirement, null
// batch, callback refusing batching, size cap). Accepted writers are
// appended and flagged with in_batch_group = true.
//
// NOTE(review): many source lines are not visible in this listing: the
// declaration and advancement of `w`, the action taken when a check matches
// (presumably the group stops growing), updates to `size` and
// `*last_writer`, and the return statement -- confirm against the full file.
241 size_t WriteThread::EnterAsBatchGroupLeader(
242 Writer
* leader
, WriteThread::Writer
** last_writer
,
243 autovector
<WriteThread::Writer
*>* write_batch_group
) {
244 assert(leader
->link_older
== nullptr);
245 assert(leader
->batch
!= nullptr);
247 size_t size
= WriteBatchInternal::ByteSize(leader
->batch
);
248 write_batch_group
->push_back(leader
);
250 // Allow the group to grow up to a maximum size, but if the
251 // original write is small, limit the growth so we do not slow
252 // down the small write too much.
253 size_t max_size
= 1 << 20;
254 if (size
<= (128 << 10)) {
255 max_size
= size
+ (128 << 10);
258 *last_writer
= leader
;
// Snapshot the current head of the list; only writers up to this one are
// considered below.
260 Writer
* newest_writer
= newest_writer_
.load(std::memory_order_acquire
);
262 // This is safe regardless of any db mutex status of the caller. Previous
263 // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
264 // (they emptied the list and then we added ourself as leader) or had to
265 // explicitly wake us up (the list was non-empty when we added ourself,
266 // so we have already received our MarkJoined).
267 CreateMissingNewerLinks(newest_writer
);
269 // Tricky. Iteration start (leader) is exclusive and finish
270 // (newest_writer) is inclusive. Iteration goes from old to new.
272 while (w
!= newest_writer
) {
275 if (w
->sync
&& !leader
->sync
) {
276 // Do not include a sync write into a batch handled by a non-sync write.
280 if (w
->no_slowdown
!= leader
->no_slowdown
) {
281 // Do not mix writes that are ok with delays with the ones that
282 // request fail on delays.
286 if (!w
->disable_wal
&& leader
->disable_wal
) {
287 // Do not include a write that needs WAL into a batch that has
292 if (w
->batch
== nullptr) {
293 // Do not include those writes with nullptr batch. Those are not writes,
294 // those are something else. They want to be alone
298 if (w
->callback
!= nullptr && !w
->callback
->AllowWriteBatching()) {
299 // don't batch writes that don't want to be batched
303 auto batch_size
= WriteBatchInternal::ByteSize(w
->batch
);
304 if (size
+ batch_size
> max_size
) {
305 // Do not make batch too big
// Writer passed every check: it joins the group.
310 write_batch_group
->push_back(w
);
311 w
->in_batch_group
= true;
// Assigns sequence numbers to the members of a parallel group and wakes the
// followers.
//
// pg:       the parallel group; pg->leader and pg->last_writer delimit it.
// sequence: sequence number assigned to the leader. For each writer that
//           neither failed its callback nor skips the memtable, it is
//           advanced by the number of entries in that writer's batch before
//           the next writer is assigned.
//
// Each follower gets its sequence and parallel_group set and is then moved
// to STATE_PARALLEL_FOLLOWER through SetState.
//
// NOTE(review): the step that advances `w` to the next writer and the
// closing braces are not visible in this listing -- confirm against the full
// file.
317 void WriteThread::LaunchParallelFollowers(ParallelGroup
* pg
,
318 SequenceNumber sequence
) {
319 // EnterAsBatchGroupLeader already created the links from leader to
320 // newer writers in the group
322 pg
->leader
->parallel_group
= pg
;
324 Writer
* w
= pg
->leader
;
325 w
->sequence
= sequence
;
327 // Initialize and wake up the others
328 while (w
!= pg
->last_writer
) {
329 // Writers that won't write don't get sequence allotment
330 if (!w
->CallbackFailed() && w
->ShouldWriteToMemtable()) {
331 // There is a sequence number of each written key
332 sequence
+= WriteBatchInternal::Count(w
->batch
);
336 w
->sequence
= sequence
; // sequence number for the first key in the batch
337 w
->parallel_group
= pg
;
// Must come last: once the state changes the follower may start running and
// read the fields set above.
338 SetState(w
, STATE_PARALLEL_FOLLOWER
);
// Reports completion of one parallel worker's share of the work.
//
// w: the worker (leader or follower) that has finished.
//
// A non-OK w->status is copied into the group's status while holding the
// leader's StateMutex. If other workers are still running, the caller waits
// for STATE_COMPLETED; otherwise it takes over the group's status.
//
// NOTE(review): the return statements are not visible in this listing;
// presumably false is returned after waiting and true for the last worker
// -- confirm against the full file.
342 // This method is called by both the leader and parallel followers
343 bool WriteThread::CompleteParallelWorker(Writer
* w
) {
344 static AdaptationContext
ctx("CompleteParallelWorker");
346 auto* pg
= w
->parallel_group
;
347 if (!w
->status
.ok()) {
// Group status is written under the leader's mutex.
348 std::lock_guard
<std::mutex
> guard(pg
->leader
->StateMutex());
349 pg
->status
= w
->status
;
// The plain load is a cheap pre-check; the post-decrement is the
// authoritative test (it yields the value before decrementing).
352 if (pg
->running
.load(std::memory_order_acquire
) > 1 && pg
->running
-- > 1) {
353 // we're not the last one
354 AwaitState(w
, STATE_COMPLETED
, &ctx
);
357 // else we're the last parallel worker and should perform exit duties.
358 w
->status
= pg
->status
;
// Performs the group exit duties on behalf of the leader when a parallel
// follower is the one finishing the group.
//
// w: the follower; must be in STATE_PARALLEL_FOLLOWER and the group status
//    must be OK (both asserted).
//
// Delegates to ExitAsBatchGroupLeader with the group's leader, last writer
// and status, checks via asserts that w itself ended up completed with an OK
// status, and finally marks the leader as STATE_COMPLETED.
362 void WriteThread::ExitAsBatchGroupFollower(Writer
* w
) {
363 auto* pg
= w
->parallel_group
;
365 assert(w
->state
== STATE_PARALLEL_FOLLOWER
);
366 assert(pg
->status
.ok());
367 ExitAsBatchGroupLeader(pg
->leader
, pg
->last_writer
, pg
->status
);
368 assert(w
->status
.ok());
369 assert(w
->state
== STATE_COMPLETED
);
// The leader is released only after the rest of the group has been handled.
370 SetState(pg
->leader
, STATE_COMPLETED
);
// Removes a finished batch group from the writer list, hands leadership to
// the next waiting writer (if any) and completes the group's followers.
//
// leader:      the group's leader; must be the oldest writer (asserted).
// last_writer: the newest writer belonging to the group.
//
// If last_writer is still the list head, the list is emptied with a CAS.
// Otherwise the writer right after last_writer is detached from the group
// (its link_older is cleared) and promoted to STATE_GROUP_LEADER. Finally
// every group member from last_writer back to (but excluding) leader
// receives `status` and is moved to STATE_COMPLETED.
//
// NOTE(review): some source lines are not visible in this listing, including
// the end of the parameter list (where `status` is presumably declared), the
// step that advances last_writer to `next`, and the closing braces --
// confirm against the full file.
373 void WriteThread::ExitAsBatchGroupLeader(Writer
* leader
, Writer
* last_writer
,
375 assert(leader
->link_older
== nullptr);
377 Writer
* head
= newest_writer_
.load(std::memory_order_acquire
);
378 if (head
!= last_writer
||
379 !newest_writer_
.compare_exchange_strong(head
, nullptr)) {
380 // Either w wasn't the head during the load(), or it was the head
381 // during the load() but somebody else pushed onto the list before
382 // we did the compare_exchange_strong (causing it to fail). In the
383 // latter case compare_exchange_strong has the effect of re-reading
384 // its first param (head). No need to retry a failing CAS, because
385 // only a departing leader (which we are at the moment) can remove
386 // nodes from the list.
387 assert(head
!= last_writer
);
389 // After walking link_older starting from head (if not already done)
390 // we will be able to traverse w->link_newer below. This function
391 // can only be called from an active leader, only a leader can
392 // clear newest_writer_, we didn't, and only a clear newest_writer_
393 // could cause the next leader to start their work without a call
394 // to MarkJoined, so we can definitely conclude that no other leader
395 // work is going on here (with or without db mutex).
396 CreateMissingNewerLinks(head
);
397 assert(last_writer
->link_newer
->link_older
== last_writer
);
// Cut the finished group off from the remaining list.
398 last_writer
->link_newer
->link_older
= nullptr;
400 // Next leader didn't self-identify, because newest_writer_ wasn't
401 // nullptr when they enqueued (we were definitely enqueued before them
402 // and are still in the list). That means leader handoff occurs when
403 // we call MarkJoined
404 SetState(last_writer
->link_newer
, STATE_GROUP_LEADER
);
406 // else nobody else was waiting, although there might already be a new
409 while (last_writer
!= leader
) {
410 last_writer
->status
= status
;
411 // we need to read link_older before calling SetState, because as soon
412 // as it is marked committed the other thread's Await may return and
413 // deallocate the Writer.
414 auto next
= last_writer
->link_older
;
415 SetState(last_writer
, STATE_COMPLETED
);
// Enqueues a writer that carries no batch and waits until it becomes the
// group leader.
//
// w:  writer to enqueue; w->batch must be nullptr (asserted).
// mu: NOTE(review): not referenced by any line visible in this listing;
//     presumably released while waiting and re-acquired afterwards --
//     confirm against the full file.
//
// If the writer is not linked as leader right away, it waits for
// STATE_GROUP_LEADER through AwaitState.
421 void WriteThread::EnterUnbatched(Writer
* w
, InstrumentedMutex
* mu
) {
422 static AdaptationContext
ctx("EnterUnbatched");
424 assert(w
->batch
== nullptr);
425 bool linked_as_leader
;
426 LinkOne(w
, &linked_as_leader
);
427 if (!linked_as_leader
) {
429 TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
430 // Last leader will not pick us as a follower since our batch is nullptr
431 AwaitState(w
, STATE_GROUP_LEADER
, &ctx
);
// Leaves the writer list after an unbatched operation by treating w as a
// batch group of one: w is passed as both leader and last writer.
//
// NOTE(review): the declaration of dummy_status and the closing brace are
// not visible in this listing -- confirm against the full file.
436 void WriteThread::ExitUnbatched(Writer
* w
) {
438 ExitAsBatchGroupLeader(w
, w
, dummy_status
);
441 } // namespace rocksdb