// ceph/src/rocksdb/db/write_thread.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "db/write_thread.h"
#include <chrono>
#include <thread>
#include "db/column_family.h"
#include "port/port.h"
#include "util/random.h"
#include "util/sync_point.h"

namespace rocksdb {

WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
    : max_yield_usec_(max_yield_usec),
      slow_yield_usec_(slow_yield_usec),
      newest_writer_(nullptr) {}

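// Rough sketch of the writer life cycle as it is used in this file (the
// authoritative state definitions live in write_thread.h): a Writer starts in
// STATE_INIT, links itself onto newest_writer_ via LinkOne(), and either
// becomes STATE_GROUP_LEADER immediately (empty queue) or waits until a
// departing leader promotes it (STATE_GROUP_LEADER), enlists it for parallel
// memtable writes (STATE_PARALLEL_FOLLOWER), or finishes the write on its
// behalf (STATE_COMPLETED). STATE_LOCKED_WAITING is a transient marker used
// only by BlockingAwaitState()/SetState() below to coordinate mutex-based
// blocking. This is an informal summary, not an exhaustive list of states.
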
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
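
// Informal note: the waker side of this handshake is SetState() below. It
// first tries to CAS the state directly to the new value; only if it observes
// STATE_LOCKED_WAITING (or the CAS fails because of it) does it take
// StateMutex(), store the new state, and signal StateCV().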

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state;

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency from FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

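  // Illustrative walk-through with hypothetical settings (example numbers,
  // not defaults): with max_yield_usec_ = 100 and slow_yield_usec_ = 3, a
  // writer first spins for ~200 pause iterations (roughly a microsecond);
  // then, if the accumulated credit in ctx is non-negative (or this call was
  // sampled for an update), it yields in a loop for at most 100 usec. If
  // three individual yields each take 3 usec or more (a hint that other
  // runnable threads are competing for the core), it stops early, blocks on
  // the condition variable, and immediately records the failed-spin outcome
  // in ctx.
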
  const size_t kMaxSlowYieldsWhileSpinning = 3;

  bool update_ctx = false;
  bool would_spin_again = false;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(256);

    if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update ctx
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    auto v = ctx->value.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
    ctx->value.store(v, std::memory_order_relaxed);
  }
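  // Illustrative arithmetic (informal, assuming the decay form above): each
  // sampled update first shrinks |v| by roughly 0.1% and then adds +/-16384,
  // so a long run of successful spins converges toward about 16384 * 1024
  // (roughly 1.7e7), comfortably inside int32_t. The sign of this accumulated
  // credit is what gates the yield loop earlier in this function
  // (ctx->value >= 0 means spinning is still considered worthwhile).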

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
  assert(w->state == STATE_INIT);

  while (true) {
    Writer* writers = newest_writer_.load(std::memory_order_relaxed);
    w->link_older = writers;
    if (newest_writer_.compare_exchange_strong(writers, w)) {
      if (writers == nullptr) {
        // this isn't part of the WriteThread machinery, but helps with
        // debugging and is checked by an assert in WriteImpl
        w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
      }
      // If the queue was empty we are its head and hence definitely the leader
      *linked_as_leader = (writers == nullptr);
      // Otherwise we will wait for the previous leader to define our status
      return;
    }
  }
}
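
// Informal sketch of the queue layout: newest_writer_ always points at the
// most recently linked Writer, and each Writer's link_older points at the
// writer that was newest when it linked, so pushes behave like a lock-free
// (Treiber-style) stack:
//
//   newest_writer_ -> W3 -> W2 -> W1 -> nullptr     (link_older pointers)
//
// The matching link_newer pointers are only filled in lazily, by
// CreateMissingNewerLinks() below, when a leader needs to walk the group
// from oldest to newest.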

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
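
// Example (informal): if the link_older chain is C -> B -> A and neither B
// nor A has link_newer set yet, CreateMissingNewerLinks(C) sets
// B->link_newer = C, then A->link_newer = B, and stops when it reaches the
// oldest writer or a node whose link_newer is already populated.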

void WriteThread::JoinBatchGroup(Writer* w) {
  static AdaptationContext ctx("JoinBatchGroup");

  assert(w->batch != nullptr);
  bool linked_as_leader;
  LinkOne(w, &linked_as_leader);

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *   2.1) finishes the memtable writes on our behalf
     *   2.2) Or tells us to finish the memtable writes in parallel
     */
    AwaitState(w,
               STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
               &ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(
    Writer* leader, WriteThread::Writer** last_writer,
    autovector<WriteThread::Writer*>* write_batch_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);
  write_batch_group->push_back(leader);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }
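  // Illustrative sizes (hypothetical numbers): a 4 KB leader batch caps the
  // group at 4 KB + 128 KB = 132 KB, while a leader batch larger than 128 KB
  // uses the full 1 MB (1 << 20) cap.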

  *last_writer = leader;

  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourselves as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourselves,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with ones that request
      // failure on delays.
      break;
    }

    if (!w->disable_wal && leader->disable_wal) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include writes with a nullptr batch. Those are not writes;
      // they are something else and want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    size += batch_size;
    write_batch_group->push_back(w);
    w->in_batch_group = true;
    *last_writer = w;
  }
  return size;
}

void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
                                          SequenceNumber sequence) {
  // EnterAsBatchGroupLeader already created the links from leader to
  // newer writers in the group

  pg->leader->parallel_group = pg;

  Writer* w = pg->leader;
  w->sequence = sequence;

  // Initialize and wake up the others
  while (w != pg->last_writer) {
    // Writers that won't write don't get sequence allotment
    if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
      // There is a sequence number for each written key
      sequence += WriteBatchInternal::Count(w->batch);
    }
    w = w->link_newer;

    w->sequence = sequence;  // sequence number for the first key in the batch
    w->parallel_group = pg;
    SetState(w, STATE_PARALLEL_FOLLOWER);
  }
}
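
// Illustrative allotment (hypothetical counts): if sequence starts at 100,
// the leader's batch has 3 keys, and two followers have 2 and 5 keys, the
// leader is assigned sequence 100, the first follower 103, and the second
// follower 105; each writer's sequence is the number its own first key will
// use, and the gaps cover the preceding writer's keys.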

// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelWorker(Writer* w) {
  static AdaptationContext ctx("CompleteParallelWorker");

  auto* pg = w->parallel_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(pg->leader->StateMutex());
    pg->status = w->status;
  }

  if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = pg->status;
  return true;
}
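
// Informal usage note: the return value tells the caller whether it was the
// last member of the parallel group to finish. The last worker (leader or
// follower) is responsible for the group's exit path; the follower-side exit
// is ExitAsBatchGroupFollower() below, which runs ExitAsBatchGroupLeader()
// on the group's behalf.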

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* pg = w->parallel_group;

  assert(w->state == STATE_PARALLEL_FOLLOWER);
  assert(pg->status.ok());
  ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(pg->leader, STATE_COMPLETED);
}

void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
                                         Status status) {
  assert(leader->link_older == nullptr);

  Writer* head = newest_writer_.load(std::memory_order_acquire);
  if (head != last_writer ||
      !newest_writer_.compare_exchange_strong(head, nullptr)) {
    // Either last_writer wasn't the head during the load(), or it was the
    // head during the load() but somebody else pushed onto the list before
    // we did the compare_exchange_strong (causing it to fail). In the
    // latter case compare_exchange_strong has the effect of re-reading
    // its first param (head). No need to retry a failing CAS, because
    // only a departing leader (which we are at the moment) can remove
    // nodes from the list.
    assert(head != last_writer);

    // After walking link_older starting from head (if not already done)
    // we will be able to traverse last_writer->link_newer below. This
    // function can only be called from an active leader, only a leader can
    // clear newest_writer_, we didn't, and only a clear newest_writer_
    // could cause the next leader to start their work without a call
    // to MarkJoined, so we can definitely conclude that no other leader
    // work is going on here (with or without db mutex).
    CreateMissingNewerLinks(head);
    assert(last_writer->link_newer->link_older == last_writer);
    last_writer->link_newer->link_older = nullptr;

    // Next leader didn't self-identify, because newest_writer_ wasn't
    // nullptr when they enqueued (we were definitely enqueued before them
    // and are still in the list). That means leader handoff occurs when
    // we call MarkJoined
    SetState(last_writer->link_newer, STATE_GROUP_LEADER);
  }
  // else nobody else was waiting, although there might already be a new
  // leader now

  while (last_writer != leader) {
    last_writer->status = status;
    // we need to read link_older before calling SetState, because as soon
    // as it is marked committed the other thread's Await may return and
    // deallocate the Writer.
    auto next = last_writer->link_older;
    SetState(last_writer, STATE_COMPLETED);

    last_writer = next;
  }
}
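
// Informal sketch of the handoff: suppose the finished group is
// [leader .. last_writer] and writers X and Y queued behind it, so the
// link_older chain is Y -> X -> last_writer -> ... -> leader. Then head is Y,
// the branch above is taken, X is detached from the group by clearing its
// link_older, X is promoted to STATE_GROUP_LEADER, and everyone in the old
// group except the leader itself is marked STATE_COMPLETED, newest first.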

void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  static AdaptationContext ctx("EnterUnbatched");

  assert(w->batch == nullptr);
  bool linked_as_leader;
  LinkOne(w, &linked_as_leader);
  if (!linked_as_leader) {
    mu->Unlock();
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // The previous leader will not pick us as a follower since our batch is
    // nullptr
    AwaitState(w, STATE_GROUP_LEADER, &ctx);
    mu->Lock();
  }
}

void WriteThread::ExitUnbatched(Writer* w) {
  Status dummy_status;
  ExitAsBatchGroupLeader(w, w, dummy_status);
}

}  // namespace rocksdb