// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by yield being successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update yield_credit
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., 0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t.
    // The same logic applies to negative credits.
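    // Editor's note (illustrative derivation, not part of the original
    // comment): the 2^27 bound falls out of the update's fixed point. If
    // every sampled yield succeeds, v converges to the value where
    // v == v - v / 1024 + 131072, i.e. v / 1024 == 131072, so |v| tends
    // toward 131072 * 1024 = 2^27.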
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  assert(w);
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not true,
    // block here until the stall is cleared. If it is true, return
    // immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

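// Illustrative note (editor's addition, not an original comment): the list
// hangs off newest_writer_ and is linked from new to old via link_older.
// Given newest C -> B -> A with no link_newer set yet,
// CreateMissingNewerLinks(C) walks toward older nodes, setting
// B->link_newer = C and A->link_newer = B, and stops early at the first node
// whose link_newer is already set.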
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

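// Editor's note (summary of the mechanism below, not an original comment):
// write_stall_dummy_ acts as a sentinel installed at the head of
// newest_writer_. While it is there, LinkOne() either blocks incoming writers
// on stall_cv_ or fails them immediately with Status::Incomplete when
// no_slowdown is set; EndWriteStall() removes the sentinel and signals the
// waiters.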
void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk writer list until w->write_group != nullptr. The current write group
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
  // point
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      // Only update `link_newer` if it's already set.
      // `CreateMissingNewerLinks()` will fill in the nullptr `link_newer`
      // links later, and it assumes that the nodes with nullptr `link_newer`
      // form a contiguous run at the head of the writer list.
      // If `link_newer` were set here, `CreateMissingNewerLinks()` may stop
      // updating the whole list when it sees the first non-nullptr link.
      if (prev->link_older && prev->link_older->link_newer) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes, or
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf, or
     *    2.2) tells us to finish the memtable writes in parallel, or
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and the WAL write for us, enqueues us as a
     *    pending memtable writer, and
     *    3.1) we become the memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
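    // Editor's note: the writer states are used as a bit mask here;
    // AwaitState() returns as soon as w->state matches any of the states
    // OR'd into goal_mask (see the `state & goal_mask` checks above).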
    AwaitState(w,
               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
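  // Illustrative example (editor's addition; assumes a 1 MB setting for
  // max_write_batch_group_size_bytes, which is commonly cited as the
  // default): min_batch_size_bytes is then 128 KB, so a 4 KB leader batch
  // caps the group at 4 KB + 128 KB rather than the full 1 MB.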

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL is disabled.
      break;
    }

    if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
      // Do not mix writes with different levels of integrity protection.
      break;
    }

    if (w->rate_limiter_priority != leader->rate_limiter_priority) {
      // Do not mix writes with different rate limiter priorities.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes;
      // those are something else. They want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

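  // Editor's note: `running--` below is an atomic post-decrement, so the
  // comparison uses the pre-decrement value; exactly one thread (the one that
  // observes 1 and drops the counter to 0) falls through to the exit duties.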
  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
                           &write_group);

  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // If status is non-ok already, then write_group.status won't have the chance
  // of being propagated to caller.
  if (!status.ok()) {
    write_group.status.PermitUncheckedError();
  }

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // We insert a dummy Writer right before our current write_group. This
    // allows us to unlink our write_group without the risk that a subsequent
    // writer becomes a new leader and might overtake us and add itself to the
    // memtable-writer-list before we can do so. This ensures that writers are
    // added to the memtable-writer-list in the exact same order in which they
    // were in the newest_writer list.
    // This must happen before completing the writers from our group to prevent
    // a race where the owning thread of one of these writers can start a new
    // write operation.
    Writer dummy;
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the latter
      // case compare_exchange_strong has the effect of re-reading its first
      // param (head). No need to retry a failing CAS, because only a departing
      // leader (which we are at the moment) can remove nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done) we
      // will be able to traverse w->link_newer below.
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      last_writer->link_newer->link_older = &dummy;
      dummy.link_newer = last_writer->link_newer;
    }

    // Complete writers that don't write to memtable
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    TEST_SYNC_POINT_CALLBACK(
        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
        &write_group);

    // Link the remainder of the group to the memtable writer list.
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // Unlink the dummy writer from the list and identify the new leader
    head = newest_writer_.load(std::memory_order_acquire);
    if (head != &dummy ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      CreateMissingNewerLinks(head);
      Writer* new_leader = dummy.link_newer;
      assert(new_leader != nullptr);
      new_leader->link_older = nullptr;
      SetState(new_leader, STATE_GROUP_LEADER);
    }

    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
                   STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head). No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list). That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      assert(last_writer);
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE