// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"
#include <chrono>
#include <thread>
#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "util/random.h"
#include "util/sync_point.h"

namespace rocksdb {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by yield being successful before timing out, and decreased otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update yield_credit
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., roughly 0.1%). If the sampled yield was successful, the credit
    // is also increased by X. Setting X=2^17 ensures that the credit never
    // exceeds 2^17*2^10=2^27, which is lower than 2^31, the upper bound of
    // int32_t. The same logic applies to negative credits.
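    //
    // For example, a run of successful samples drives v towards the fixed
    // point v* = v* - v*/1024 + 2^17, i.e. v* = 1024 * 2^17 = 2^27, while a
    // run of failed samples drives it towards -2^27.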
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

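// Change a writer's state and, if the writer is blocked in
// BlockingAwaitState(), wake it up. The fast path is a single CAS; if the
// writer has installed STATE_LOCKED_WAITING, the state must instead be
// updated under its StateMutex() and its condition variable signalled.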
void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

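// Link w onto the front (newest end) of the list headed by *newest_writer
// using a CAS loop. Returns true iff the list was empty, i.e. w is now the
// leader. If a write stall is in effect (the head is write_stall_dummy_),
// writers with no_slowdown set fail immediately with Status::Incomplete,
// while the others block on stall_cv_ until the stall is cleared.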
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not true, block
    // here until the stall is cleared. If it is true, return immediately
    // instead.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load the newest writer again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

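// Link a whole write group (leader..last_writer) onto the front of the list
// headed by *newest_writer, preserving the group's internal order. Returns
// true iff the target list was empty, in which case the group's leader also
// becomes the leader of that list.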
bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

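// Writers enqueue themselves with only the link_older pointer set, so the
// list is singly linked towards older entries. Walk from head towards older
// writers and fill in the link_newer back-pointers, stopping at the first
// node that is already linked (or at the end of the list).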
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

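// Starting from the newest writer `from`, walk towards older writers until
// we reach the one whose link_older is `boundary`. That writer is the oldest
// writer that enqueued after `boundary` and therefore becomes the next
// leader.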
WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

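// Detach the leader from its write group and mark it completed. If other
// writers remain in the group, the next one becomes the group's new leader.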
void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

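// Unlink a non-leader writer from its write group's doubly-linked list and
// mark it completed.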
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

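// Install write_stall_dummy_ at the head of the writer queue so that any
// writer arriving after this point either blocks in LinkOne or, if it has
// no_slowdown set, fails immediately. Then fail the no_slowdown writers that
// are already queued but not yet part of a write group.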
void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk the writer list until w->write_group != nullptr. The current write
  // group will not have a mix of slowdown/no_slowdown, so it is ok to stop
  // at that point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

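// Remove write_stall_dummy_ from the head of the writer queue and wake up
// the writers that blocked in LinkOne while the stall was in effect.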
void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  assert(newest_writer_.load(std::memory_order_relaxed) ==
         &write_stall_dummy_);
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes, or
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf, or
     *    2.2) tells us to finish the memtable writes in parallel, or
     * 3) (pipelined write) An existing leader picks us as its follower,
     *    finishes book-keeping and the WAL write for us, enqueues us as a
     *    pending memtable writer, and then
     *    3.1) we become the memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourselves as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourselves,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (!w->disable_wal && leader->disable_wal) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include writes with a nullptr batch. Those are not writes;
      // they are something else and want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

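// Called when a memtable write group has finished writing to the memtable.
// Pops the group off the memtable writer queue, hands memtable writer
// leadership to the next queued writer (if any), propagates the group's
// status, and marks all followers, and finally the leader, as completed.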
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

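// Mark every writer in the group as a parallel memtable writer and set the
// running count, so that each of them (leader included) inserts its own
// batch into the memtable concurrently.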
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {

  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  return true;
}

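// Exit path for a parallel follower that turned out to be the last writer of
// its group (see CompleteParallelMemTableWriter): it performs the leader's
// exit duties on behalf of the group and then marks the real leader as
// completed.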
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that don't write to the memtable to exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for the next leader before we call LinkGroup. If there are no
    // pending writers, place a dummy writer at the tail of the queue so we
    // know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We found at least one pending writer when we inserted the dummy. We
      // search for the next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // Link the remaining writers of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking
    // up the next leader or setting newest_writer_ to null, otherwise the
    // next leader can run ahead of us and link to the memtable writer queue
    // before we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from the current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we have inserted the dummy in the queue, remove it now and check if
    // any pending writers joined the queue since we inserted it. If so, look
    // for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head). No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list). That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

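// Remove the unbatched writer from the queue and, if other writers enqueued
// behind it, hand group leadership over to the oldest of them.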
void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
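// Used by unbatched (pipelined) writes: enqueue a dummy writer on the
// memtable writer queue and wait until it becomes the memtable writer
// leader, i.e. until all earlier memtable writers have finished, then clear
// the queue.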
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace rocksdb