// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency from FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by yield being successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update
            // yield_credit and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., ~0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t.
    // The same logic applies to negative credits.
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

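// Publish `new_state` to writer `w` and wake it if it is blocked. The fast
// path is a single CAS on w->state; if that fails, the writer must already be
// parked in STATE_LOCKED_WAITING (installed by BlockingAwaitState), so the new
// state is stored under the writer's StateMutex and its condition variable is
// signalled so the wake-up cannot be lost.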
void WriteThread::SetState(Writer* w, uint8_t new_state) {
  assert(w);
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

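// Atomically prepend `w` to the lock-free writer list whose head is
// *newest_writer (entries are chained oldest-ward through link_older).
// Returns true iff the list was empty, i.e. `w` is now the leader of a new
// write group. If a write stall is in effect on newest_writer_, the caller
// either blocks here until the stall clears or, when w->no_slowdown is set,
// fails immediately with Status::Incomplete.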
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not true, block
    // here until the stall is cleared. If it is true, return immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure that when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

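// Writers enqueue themselves with only the link_older pointer set (LinkOne /
// LinkGroup). Starting from `head` (the newest writer), walk backwards and
// fill in the missing link_newer pointers until reaching a writer that
// already has one, or the oldest writer. Afterwards the list can be traversed
// oldest-to-newest via link_newer.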
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

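// Install write_stall_dummy_ at the head of the writer queue so that writers
// arriving after this point block in LinkOne() until EndWriteStall(). Writers
// already queued (but not yet part of a write group) that set no_slowdown are
// failed immediately with Status::Incomplete and unlinked from the queue.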
void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk writer list until w->write_group != nullptr. The current write group
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
  // point
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      // Only update `link_newer` if it's already set.
      // `CreateMissingNewerLinks()` will update the nullptr `link_newer`
      // later, which assumes that the first non-nullptr `link_newer` is the
      // last nullptr link in the writer list.
      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
      // updating the whole list when it sees the first non-nullptr link.
      if (prev->link_older && prev->link_older->link_newer) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) ==
         &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf
     *    2.2) Or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and WAL write for us, enqueues us as a pending
     *    memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w,
               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
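  // For example, with the default max_write_batch_group_size_bytes of 1 MB,
  // min_batch_size_bytes is 128 KB, so a leader with a 4 KB batch caps its
  // group at roughly 132 KB rather than the full 1 MB.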

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose WAL is
      // disabled.
      break;
    }

    if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
      // Do not mix writes with different levels of integrity protection.
      break;
    }

    if (w->rate_limiter_priority != leader->rate_limiter_priority) {
      // Do not mix writes with different rate limiter priorities.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include writes with a nullptr batch. Those are not writes;
      // they are something else and want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

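// Pipelined writes only: the leader of the pending memtable writer queue
// forms a memtable-write group, mirroring EnterAsBatchGroupLeader but over
// newest_memtable_writer_, and records the group's last sequence number.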
void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

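// Pipelined writes: called once the group's memtable inserts are done. Hand
// memtable-writer leadership to the next writer queued on
// newest_memtable_writer_ (if any), propagate the group status, and mark all
// members STATE_COMPLETED, with the leader last since it owns the WriteGroup.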
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
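// once their memtable insert is done. It returns true to exactly one caller,
// the last writer in the group to finish, which must then perform the group's
// exit duties (ExitAsBatchGroupLeader / ExitAsBatchGroupFollower); every other
// caller blocks here until the group is completed and returns false.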
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
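// Called once the group's WAL write (and, for non-pipelined writes, its
// memtable work) is finished: propagate `status` to the group, hand
// leadership of newest_writer_ to the next queued writer, and either move the
// remaining writers to the memtable writer queue (pipelined) or complete all
// followers directly (non-pipelined).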
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
                           &write_group);

  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // If status is non-ok already, then write_group.status won't have the chance
  // of being propagated to caller.
  if (!status.ok()) {
    write_group.status.PermitUncheckedError();
  }

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // We insert a dummy Writer right before our current write_group. This
    // allows us to unlink our write_group without the risk that a subsequent
    // writer becomes a new leader and might overtake us and add itself to the
    // memtable-writer-list before we can do so. This ensures that writers are
    // added to the memtable-writer-list in the exact same order in which they
    // were in the newest_writer list.
    // This must happen before completing the writers from our group to prevent
    // a race where the owning thread of one of these writers can start a new
    // write operation.
    Writer dummy;
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the latter
      // case compare_exchange_strong has the effect of re-reading its first
      // param (head). No need to retry a failing CAS, because only a departing
      // leader (which we are at the moment) can remove nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done) we
      // will be able to traverse w->link_newer below.
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      last_writer->link_newer->link_older = &dummy;
      dummy.link_newer = last_writer->link_newer;
    }

    // Complete writers that don't write to memtable
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    TEST_SYNC_POINT_CALLBACK(
        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
        &write_group);

    // Link the rest of the group to the memtable writer list.
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // Unlink the dummy writer from the list and identify the new leader
    head = newest_writer_.load(std::memory_order_acquire);
    if (head != &dummy ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      CreateMissingNewerLinks(head);
      Writer* new_leader = dummy.link_newer;
      assert(new_leader != nullptr);
      new_leader->link_older = nullptr;
      SetState(new_leader, STATE_GROUP_LEADER);
    }

    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
                   STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head). No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list). That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      assert(last_writer);
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
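// Pipelined writes: used by EnterUnbatched to drain the memtable writer
// queue. If any memtable writers are queued, enqueue a dummy writer on
// newest_memtable_writer_, wait to be handed memtable-writer leadership
// (i.e. everything queued ahead of it has finished), then clear the queue.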
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE