]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/write_thread.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / write_thread.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/write_thread.h"
7 #include <chrono>
8 #include <thread>
9 #include "db/column_family.h"
10 #include "monitoring/perf_context_imp.h"
11 #include "port/port.h"
12 #include "test_util/sync_point.h"
13 #include "util/random.h"
14
15 namespace ROCKSDB_NAMESPACE {
16
17 WriteThread::WriteThread(const ImmutableDBOptions& db_options)
18 : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
19 ? db_options.write_thread_max_yield_usec
20 : 0),
21 slow_yield_usec_(db_options.write_thread_slow_yield_usec),
22 allow_concurrent_memtable_write_(
23 db_options.allow_concurrent_memtable_write),
24 enable_pipelined_write_(db_options.enable_pipelined_write),
25 max_write_batch_group_size_bytes(
26 db_options.max_write_batch_group_size_bytes),
27 newest_writer_(nullptr),
28 newest_memtable_writer_(nullptr),
29 last_sequence_(0),
30 write_stall_dummy_(),
31 stall_mu_(),
32 stall_cv_(&stall_mu_) {}
33
34 uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
35 // We're going to block. Lazily create the mutex. We guarantee
36 // propagation of this construction to the waker via the
37 // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
38 // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
39 // we install below.
40 w->CreateMutex();
41
42 auto state = w->state.load(std::memory_order_acquire);
43 assert(state != STATE_LOCKED_WAITING);
44 if ((state & goal_mask) == 0 &&
45 w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
46 // we have permission (and an obligation) to use StateMutex
47 std::unique_lock<std::mutex> guard(w->StateMutex());
48 w->StateCV().wait(guard, [w] {
49 return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
50 });
51 state = w->state.load(std::memory_order_relaxed);
52 }
53 // else tricky. Goal is met or CAS failed. In the latter case the waker
54 // must have changed the state, and compare_exchange_strong has updated
55 // our local variable with the new one. At the moment WriteThread never
56 // waits for a transition across intermediate states, so we know that
57 // since a state change has occurred the goal must have been met.
58 assert((state & goal_mask) != 0);
59 return state;
60 }
61
// Waits until w->state has at least one bit of goal_mask set and returns the
// state that satisfied the wait.
//
// The wait escalates through three phases: a fixed 200-iteration pause spin,
// an optional bounded std::this_thread::yield() loop (only when
// max_yield_usec_ > 0 and either this call is a sampled one or ctx->value is
// non-negative), and finally BlockingAwaitState(). ctx->value is the shared
// "yield credit" for the call site; it is updated here on sampled calls and
// after repeated slow yields.
62 uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
63 AdaptationContext* ctx) {
64 uint8_t state = 0;
65
66 // 1. Busy loop using "pause" for 1 micro sec
67 // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
68 // 3. Else blocking wait
69
70 // On a modern Xeon each loop takes about 7 nanoseconds (most of which
71 // is the effect of the pause instruction), so 200 iterations is a bit
72 // more than a microsecond. This is long enough that waits longer than
73 // this can amortize the cost of accessing the clock and yielding.
74 for (uint32_t tries = 0; tries < 200; ++tries) {
75 state = w->state.load(std::memory_order_acquire);
76 if ((state & goal_mask) != 0) {
77 return state;
78 }
79 port::AsmVolatilePause();
80 }
81
82 // This is below the fast path, so that the stat is zero when all writes are
83 // from the same thread.
84 PERF_TIMER_GUARD(write_thread_wait_nanos);
85
86 // If we're only going to end up waiting a short period of time,
87 // it can be a lot more efficient to call std::this_thread::yield()
88 // in a loop than to block in StateMutex(). For reference, on my 4.0
89 // SELinux test server with support for syscall auditing enabled, the
90 // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
91 // 2.7 usec, and the average is more like 10 usec. That can be a big
92 // drag on RocksDB's single-writer design. Of course, spinning is a
93 // bad idea if other threads are waiting to run or if we're going to
94 // wait for a long time. How do we decide?
95 //
96 // We break waiting into 3 categories: short-uncontended,
97 // short-contended, and long. If we had an oracle, then we would always
98 // spin for short-uncontended, always block for long, and our choice for
99 // short-contended might depend on whether we were trying to optimize
100 // RocksDB throughput or avoid being greedy with system resources.
101 //
102 // Bucketing into short or long is easy by measuring elapsed time.
103 // Differentiating short-uncontended from short-contended is a bit
104 // trickier, but not too bad. We could look for involuntary context
105 // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
106 // (portability code and CPU) to just look for yield calls that take
107 // longer than we expect. sched_yield() doesn't actually result in any
108 // context switch overhead if there are no other runnable processes
109 // on the current core, in which case it usually takes less than
110 // a microsecond.
111 //
112 // There are two primary tunables here: the threshold between "short"
113 // and "long" waits, and the threshold at which we suspect that a yield
114 // is slow enough to indicate we should probably block. If these
115 // thresholds are chosen well then CPU-bound workloads that don't
116 // have more threads than cores will experience few context switches
117 // (voluntary or involuntary), and the total number of context switches
118 // (voluntary and involuntary) will not be dramatically larger (maybe
119 // 2x) than the number of voluntary context switches that occur when
120 // --max_yield_wait_micros=0.
121 //
122 // There's another constant, which is the number of slow yields we will
123 // tolerate before reversing our previous decision. Solitary slow
124 // yields are pretty common (low-priority small jobs ready to run),
125 // so this should be at least 2. We set this conservatively to 3 so
126 // that we can also immediately schedule a ctx adaptation, rather than
127 // waiting for the next update_ctx.
128
129 const size_t kMaxSlowYieldsWhileSpinning = 3;
130
131 // Whether the yield approach has any credit in this context. The credit is
132 // added by yield being successful before timing out, and decreased otherwise.
133 auto& yield_credit = ctx->value;
134 // Update the yield_credit based on sample runs or right after a hard failure
135 bool update_ctx = false;
136 // Should we reinforce the yield credit
137 bool would_spin_again = false;
138 // The sampling base for updating the yield credit. The sampling rate would be
139 // 1/sampling_base.
140 const int sampling_base = 256;
141
142 if (max_yield_usec_ > 0) {
143 update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);
144
145 if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
146 // we're updating the adaptation statistics, or spinning has >
147 // 50% chance of being shorter than max_yield_usec_ and causing no
148 // involuntary context switches
149 auto spin_begin = std::chrono::steady_clock::now();
150
151 // this variable doesn't include the final yield (if any) that
152 // causes the goal to be met
153 size_t slow_yield_count = 0;
154
155 auto iter_begin = spin_begin;
156 while ((iter_begin - spin_begin) <=
157 std::chrono::microseconds(max_yield_usec_)) {
158 std::this_thread::yield();
159
160 state = w->state.load(std::memory_order_acquire);
161 if ((state & goal_mask) != 0) {
162 // success
163 would_spin_again = true;
164 break;
165 }
166
167 auto now = std::chrono::steady_clock::now();
168 if (now == iter_begin ||
169 now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
170 // conservatively count it as a slow yield if our clock isn't
171 // accurate enough to measure the yield duration
172 ++slow_yield_count;
173 if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
174 // Not just one ivcsw, but several. Immediately update yield_credit
175 // and fall back to blocking
176 update_ctx = true;
177 break;
178 }
179 }
180 iter_begin = now;
181 }
182 }
183 }
184
// Neither spin phase reached the goal (or yielding was skipped entirely):
// fall back to the mutex/condvar based wait.
185 if ((state & goal_mask) == 0) {
186 TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
187 state = BlockingAwaitState(w, goal_mask);
188 }
189
190 if (update_ctx) {
191 // Since our update is sample based, it is ok if a thread overwrites the
192 // updates by other threads. Thus the update does not have to be atomic.
193 auto v = yield_credit.load(std::memory_order_relaxed);
194 // fixed point exponential decay with decay constant 1/1024, with +1
195 // and -1 scaled to avoid overflow for int32_t
196 //
197 // On each update the positive credit is decayed by a factor of 1/1024 (i.e.,
198 // 0.1%). If the sampled yield was successful, the credit is also increased
199 // by X. Setting X=2^17 ensures that the credit never exceeds
200 // 2^17*2^10=2^27, which is lower than 2^31 the upperbound of int32_t. Same
201 // logic applies to negative credits.
202 v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
203 yield_credit.store(v, std::memory_order_relaxed);
204 }
205
206 assert((state & goal_mask) != 0);
207 return state;
208 }
209
210 void WriteThread::SetState(Writer* w, uint8_t new_state) {
211 auto state = w->state.load(std::memory_order_acquire);
212 if (state == STATE_LOCKED_WAITING ||
213 !w->state.compare_exchange_strong(state, new_state)) {
214 assert(state == STATE_LOCKED_WAITING);
215
216 std::lock_guard<std::mutex> guard(w->StateMutex());
217 assert(w->state.load(std::memory_order_relaxed) != new_state);
218 w->state.store(new_state, std::memory_order_relaxed);
219 w->StateCV().notify_one();
220 }
221 }
222
223 bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
224 assert(newest_writer != nullptr);
225 assert(w->state == STATE_INIT);
226 Writer* writers = newest_writer->load(std::memory_order_relaxed);
227 while (true) {
228 // If write stall in effect, and w->no_slowdown is not true,
229 // block here until stall is cleared. If its true, then return
230 // immediately
231 if (writers == &write_stall_dummy_) {
232 if (w->no_slowdown) {
233 w->status = Status::Incomplete("Write stall");
234 SetState(w, STATE_COMPLETED);
235 return false;
236 }
237 // Since no_slowdown is false, wait here to be notified of the write
238 // stall clearing
239 {
240 MutexLock lock(&stall_mu_);
241 writers = newest_writer->load(std::memory_order_relaxed);
242 if (writers == &write_stall_dummy_) {
243 stall_cv_.Wait();
244 // Load newest_writers_ again since it may have changed
245 writers = newest_writer->load(std::memory_order_relaxed);
246 continue;
247 }
248 }
249 }
250 w->link_older = writers;
251 if (newest_writer->compare_exchange_weak(writers, w)) {
252 return (writers == nullptr);
253 }
254 }
255 }
256
257 bool WriteThread::LinkGroup(WriteGroup& write_group,
258 std::atomic<Writer*>* newest_writer) {
259 assert(newest_writer != nullptr);
260 Writer* leader = write_group.leader;
261 Writer* last_writer = write_group.last_writer;
262 Writer* w = last_writer;
263 while (true) {
264 // Unset link_newer pointers to make sure when we call
265 // CreateMissingNewerLinks later it create all missing links.
266 w->link_newer = nullptr;
267 w->write_group = nullptr;
268 if (w == leader) {
269 break;
270 }
271 w = w->link_older;
272 }
273 Writer* newest = newest_writer->load(std::memory_order_relaxed);
274 while (true) {
275 leader->link_older = newest;
276 if (newest_writer->compare_exchange_weak(newest, last_writer)) {
277 return (newest == nullptr);
278 }
279 }
280 }
281
282 void WriteThread::CreateMissingNewerLinks(Writer* head) {
283 while (true) {
284 Writer* next = head->link_older;
285 if (next == nullptr || next->link_newer != nullptr) {
286 assert(next == nullptr || next->link_newer == head);
287 break;
288 }
289 next->link_newer = head;
290 head = next;
291 }
292 }
293
294 WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
295 Writer* boundary) {
296 assert(from != nullptr && from != boundary);
297 Writer* current = from;
298 while (current->link_older != boundary) {
299 current = current->link_older;
300 assert(current != nullptr);
301 }
302 return current;
303 }
304
305 void WriteThread::CompleteLeader(WriteGroup& write_group) {
306 assert(write_group.size > 0);
307 Writer* leader = write_group.leader;
308 if (write_group.size == 1) {
309 write_group.leader = nullptr;
310 write_group.last_writer = nullptr;
311 } else {
312 assert(leader->link_newer != nullptr);
313 leader->link_newer->link_older = nullptr;
314 write_group.leader = leader->link_newer;
315 }
316 write_group.size -= 1;
317 SetState(leader, STATE_COMPLETED);
318 }
319
320 void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
321 assert(write_group.size > 1);
322 assert(w != write_group.leader);
323 if (w == write_group.last_writer) {
324 w->link_older->link_newer = nullptr;
325 write_group.last_writer = w->link_older;
326 } else {
327 w->link_older->link_newer = w->link_newer;
328 w->link_newer->link_older = w->link_older;
329 }
330 write_group.size -= 1;
331 SetState(w, STATE_COMPLETED);
332 }
333
334 void WriteThread::BeginWriteStall() {
335 LinkOne(&write_stall_dummy_, &newest_writer_);
336
337 // Walk writer list until w->write_group != nullptr. The current write group
338 // will not have a mix of slowdown/no_slowdown, so its ok to stop at that
339 // point
340 Writer* w = write_stall_dummy_.link_older;
341 Writer* prev = &write_stall_dummy_;
342 while (w != nullptr && w->write_group == nullptr) {
343 if (w->no_slowdown) {
344 prev->link_older = w->link_older;
345 w->status = Status::Incomplete("Write stall");
346 SetState(w, STATE_COMPLETED);
347 if (prev->link_older) {
348 prev->link_older->link_newer = prev;
349 }
350 w = prev->link_older;
351 } else {
352 prev = w;
353 w = w->link_older;
354 }
355 }
356 }
357
358 void WriteThread::EndWriteStall() {
359 MutexLock lock(&stall_mu_);
360
361 // Unlink write_stall_dummy_ from the write queue. This will unblock
362 // pending write threads to enqueue themselves
363 assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
364 assert(write_stall_dummy_.link_older != nullptr);
365 write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
366 newest_writer_.exchange(write_stall_dummy_.link_older);
367
368 // Wake up writers
369 stall_cv_.SignalAll();
370 }
371
372 static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
373 void WriteThread::JoinBatchGroup(Writer* w) {
374 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
375 assert(w->batch != nullptr);
376
377 bool linked_as_leader = LinkOne(w, &newest_writer_);
378
379 if (linked_as_leader) {
380 SetState(w, STATE_GROUP_LEADER);
381 }
382
383 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
384
385 if (!linked_as_leader) {
386 /**
387 * Wait util:
388 * 1) An existing leader pick us as the new leader when it finishes
389 * 2) An existing leader pick us as its follewer and
390 * 2.1) finishes the memtable writes on our behalf
391 * 2.2) Or tell us to finish the memtable writes in pralallel
392 * 3) (pipelined write) An existing leader pick us as its follower and
393 * finish book-keeping and WAL write for us, enqueue us as pending
394 * memtable writer, and
395 * 3.1) we become memtable writer group leader, or
396 * 3.2) an existing memtable writer group leader tell us to finish memtable
397 * writes in parallel.
398 */
399 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
400 AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
401 STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
402 &jbg_ctx);
403 TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
404 }
405 }
406
407 size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
408 WriteGroup* write_group) {
409 assert(leader->link_older == nullptr);
410 assert(leader->batch != nullptr);
411 assert(write_group != nullptr);
412
413 size_t size = WriteBatchInternal::ByteSize(leader->batch);
414
415 // Allow the group to grow up to a maximum size, but if the
416 // original write is small, limit the growth so we do not slow
417 // down the small write too much.
418 size_t max_size = max_write_batch_group_size_bytes;
419 const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
420 if (size <= min_batch_size_bytes) {
421 max_size = size + min_batch_size_bytes;
422 }
423
424 leader->write_group = write_group;
425 write_group->leader = leader;
426 write_group->last_writer = leader;
427 write_group->size = 1;
428 Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
429
430 // This is safe regardless of any db mutex status of the caller. Previous
431 // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
432 // (they emptied the list and then we added ourself as leader) or had to
433 // explicitly wake us up (the list was non-empty when we added ourself,
434 // so we have already received our MarkJoined).
435 CreateMissingNewerLinks(newest_writer);
436
437 // Tricky. Iteration start (leader) is exclusive and finish
438 // (newest_writer) is inclusive. Iteration goes from old to new.
439 Writer* w = leader;
440 while (w != newest_writer) {
441 w = w->link_newer;
442
443 if (w->sync && !leader->sync) {
444 // Do not include a sync write into a batch handled by a non-sync write.
445 break;
446 }
447
448 if (w->no_slowdown != leader->no_slowdown) {
449 // Do not mix writes that are ok with delays with the ones that
450 // request fail on delays.
451 break;
452 }
453
454 if (w->disable_wal != leader->disable_wal) {
455 // Do not mix writes that enable WAL with the ones whose
456 // WAL disabled.
457 break;
458 }
459
460 if (w->batch == nullptr) {
461 // Do not include those writes with nullptr batch. Those are not writes,
462 // those are something else. They want to be alone
463 break;
464 }
465
466 if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
467 // dont batch writes that don't want to be batched
468 break;
469 }
470
471 auto batch_size = WriteBatchInternal::ByteSize(w->batch);
472 if (size + batch_size > max_size) {
473 // Do not make batch too big
474 break;
475 }
476
477 w->write_group = write_group;
478 size += batch_size;
479 write_group->last_writer = w;
480 write_group->size++;
481 }
482 TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
483 return size;
484 }
485
486 void WriteThread::EnterAsMemTableWriter(Writer* leader,
487 WriteGroup* write_group) {
488 assert(leader != nullptr);
489 assert(leader->link_older == nullptr);
490 assert(leader->batch != nullptr);
491 assert(write_group != nullptr);
492
493 size_t size = WriteBatchInternal::ByteSize(leader->batch);
494
495 // Allow the group to grow up to a maximum size, but if the
496 // original write is small, limit the growth so we do not slow
497 // down the small write too much.
498 size_t max_size = max_write_batch_group_size_bytes;
499 const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
500 if (size <= min_batch_size_bytes) {
501 max_size = size + min_batch_size_bytes;
502 }
503
504 leader->write_group = write_group;
505 write_group->leader = leader;
506 write_group->size = 1;
507 Writer* last_writer = leader;
508
509 if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
510 Writer* newest_writer = newest_memtable_writer_.load();
511 CreateMissingNewerLinks(newest_writer);
512
513 Writer* w = leader;
514 while (w != newest_writer) {
515 w = w->link_newer;
516
517 if (w->batch == nullptr) {
518 break;
519 }
520
521 if (w->batch->HasMerge()) {
522 break;
523 }
524
525 if (!allow_concurrent_memtable_write_) {
526 auto batch_size = WriteBatchInternal::ByteSize(w->batch);
527 if (size + batch_size > max_size) {
528 // Do not make batch too big
529 break;
530 }
531 size += batch_size;
532 }
533
534 w->write_group = write_group;
535 last_writer = w;
536 write_group->size++;
537 }
538 }
539
540 write_group->last_writer = last_writer;
541 write_group->last_sequence =
542 last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
543 }
544
545 void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
546 WriteGroup& write_group) {
547 Writer* leader = write_group.leader;
548 Writer* last_writer = write_group.last_writer;
549
550 Writer* newest_writer = last_writer;
551 if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
552 nullptr)) {
553 CreateMissingNewerLinks(newest_writer);
554 Writer* next_leader = last_writer->link_newer;
555 assert(next_leader != nullptr);
556 next_leader->link_older = nullptr;
557 SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
558 }
559 Writer* w = leader;
560 while (true) {
561 if (!write_group.status.ok()) {
562 w->status = write_group.status;
563 }
564 Writer* next = w->link_newer;
565 if (w != leader) {
566 SetState(w, STATE_COMPLETED);
567 }
568 if (w == last_writer) {
569 break;
570 }
571 w = next;
572 }
573 // Note that leader has to exit last, since it owns the write group.
574 SetState(leader, STATE_COMPLETED);
575 }
576
577 void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
578 assert(write_group != nullptr);
579 write_group->running.store(write_group->size);
580 for (auto w : *write_group) {
581 SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
582 }
583 }
584
585 static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");
586 // This method is called by both the leader and parallel followers
587 bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
588
589 auto* write_group = w->write_group;
590 if (!w->status.ok()) {
591 std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
592 write_group->status = w->status;
593 }
594
595 if (write_group->running-- > 1) {
596 // we're not the last one
597 AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
598 return false;
599 }
600 // else we're the last parallel worker and should perform exit duties.
601 w->status = write_group->status;
602 return true;
603 }
604
605 void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
606 auto* write_group = w->write_group;
607
608 assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
609 assert(write_group->status.ok());
610 ExitAsBatchGroupLeader(*write_group, write_group->status);
611 assert(w->status.ok());
612 assert(w->state == STATE_COMPLETED);
613 SetState(write_group->leader, STATE_COMPLETED);
614 }
615
616 static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
// Finishes the WAL/book-keeping stage of write_group and hands leadership of
// newest_writer_ to the next pending writer, if there is one.
//
// `status` is assigned to the group's followers; when it is ok() but
// write_group.status is not, write_group.status is used instead.
//
// With enable_pipelined_write_: writers that do not need a memtable write are
// completed here, the rest of the group is linked onto
// newest_memtable_writer_, the next batch group leader is woken, and the call
// then waits until the original leader reaches a memtable-stage state.
// Otherwise: the group is removed from newest_writer_, the next writer (if
// any) becomes STATE_GROUP_LEADER, and every follower is marked
// STATE_COMPLETED. The leader itself is not marked completed on that path.
617 void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
618 Status status) {
619 Writer* leader = write_group.leader;
620 Writer* last_writer = write_group.last_writer;
621 assert(leader->link_older == nullptr);
622
623 // Propagate memtable write error to the whole group.
624 if (status.ok() && !write_group.status.ok()) {
625 status = write_group.status;
626 }
627
628 if (enable_pipelined_write_) {
629 // Complete (and thereby release) the writers that do not write to memtable.
630 for (Writer* w = last_writer; w != leader;) {
631 Writer* next = w->link_older;
632 w->status = status;
633 if (!w->ShouldWriteToMemtable()) {
634 CompleteFollower(w, write_group);
635 }
636 w = next;
637 }
638 if (!leader->ShouldWriteToMemtable()) {
639 CompleteLeader(write_group);
640 }
641
642 Writer* next_leader = nullptr;
643
644 // Look for next leader before we call LinkGroup. If there are no
645 // pending writers, place a dummy writer at the tail of the queue
646 // so we know the boundary of the current write group.
647 Writer dummy;
648 Writer* expected = last_writer;
649 bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
650 if (!has_dummy) {
651 // We find at least one pending writer when we insert dummy. We search
652 // for next leader from there.
653 next_leader = FindNextLeader(expected, last_writer);
654 assert(next_leader != nullptr && next_leader != last_writer);
655 }
656
657 // Link the remainder of the group to the memtable writer list.
658 //
659 // We have to link our group to memtable writer queue before wake up the
660 // next leader or set newest_writer_ to null, otherwise the next leader
661 // can run ahead of us and link to memtable writer queue before we do.
662 if (write_group.size > 0) {
663 if (LinkGroup(write_group, &newest_memtable_writer_)) {
664 // The leader can now be different from current writer.
665 SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
666 }
667 }
668
669 // If we have inserted dummy in the queue, remove it now and check whether
670 // any pending writers joined the queue since we inserted the dummy. If so,
671 // look for next leader again.
672 if (has_dummy) {
673 assert(next_leader == nullptr);
674 expected = &dummy;
675 bool has_pending_writer =
676 !newest_writer_.compare_exchange_strong(expected, nullptr);
677 if (has_pending_writer) {
678 next_leader = FindNextLeader(expected, &dummy);
679 assert(next_leader != nullptr && next_leader != &dummy);
680 }
681 }
682
683 if (next_leader != nullptr) {
684 next_leader->link_older = nullptr;
685 SetState(next_leader, STATE_GROUP_LEADER);
686 }
// Wait for the original leader to be given its memtable-stage role or to be
// completed.
687 AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
688 STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
689 &eabgl_ctx);
690 } else {
691 Writer* head = newest_writer_.load(std::memory_order_acquire);
692 if (head != last_writer ||
693 !newest_writer_.compare_exchange_strong(head, nullptr)) {
694 // Either w wasn't the head during the load(), or it was the head
695 // during the load() but somebody else pushed onto the list before
696 // we did the compare_exchange_strong (causing it to fail). In the
697 // latter case compare_exchange_strong has the effect of re-reading
698 // its first param (head). No need to retry a failing CAS, because
699 // only a departing leader (which we are at the moment) can remove
700 // nodes from the list.
701 assert(head != last_writer);
702
703 // After walking link_older starting from head (if not already done)
704 // we will be able to traverse w->link_newer below. This function
705 // can only be called from an active leader, only a leader can
706 // clear newest_writer_, we didn't, and only a clear newest_writer_
707 // could cause the next leader to start their work without a call
708 // to MarkJoined, so we can definitely conclude that no other leader
709 // work is going on here (with or without db mutex).
710 CreateMissingNewerLinks(head);
711 assert(last_writer->link_newer->link_older == last_writer);
712 last_writer->link_newer->link_older = nullptr;
713
714 // Next leader didn't self-identify, because newest_writer_ wasn't
715 // nullptr when they enqueued (we were definitely enqueued before them
716 // and are still in the list). That means leader handoff occurs when
717 // we call MarkJoined
718 SetState(last_writer->link_newer, STATE_GROUP_LEADER);
719 }
720 // else nobody else was waiting, although there might already be a new
721 // leader now
722
// Complete the followers from newest to oldest; the loop stops before the
// leader.
723 while (last_writer != leader) {
724 last_writer->status = status;
725 // we need to read link_older before calling SetState, because as soon
726 // as it is marked committed the other thread's Await may return and
727 // deallocate the Writer.
728 auto next = last_writer->link_older;
729 SetState(last_writer, STATE_COMPLETED);
730
731 last_writer = next;
732 }
733 }
734 }
735
736 static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
737 void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
738 assert(w != nullptr && w->batch == nullptr);
739 mu->Unlock();
740 bool linked_as_leader = LinkOne(w, &newest_writer_);
741 if (!linked_as_leader) {
742 TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
743 // Last leader will not pick us as a follower since our batch is nullptr
744 AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
745 }
746 if (enable_pipelined_write_) {
747 WaitForMemTableWriters();
748 }
749 mu->Lock();
750 }
751
752 void WriteThread::ExitUnbatched(Writer* w) {
753 assert(w != nullptr);
754 Writer* newest_writer = w;
755 if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
756 CreateMissingNewerLinks(newest_writer);
757 Writer* next_leader = w->link_newer;
758 assert(next_leader != nullptr);
759 next_leader->link_older = nullptr;
760 SetState(next_leader, STATE_GROUP_LEADER);
761 }
762 }
763
764 static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
765 void WriteThread::WaitForMemTableWriters() {
766 assert(enable_pipelined_write_);
767 if (newest_memtable_writer_.load() == nullptr) {
768 return;
769 }
770 Writer w;
771 if (!LinkOne(&w, &newest_memtable_writer_)) {
772 AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
773 }
774 newest_memtable_writer_.store(nullptr);
775 }
776
777 } // namespace ROCKSDB_NAMESPACE