]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #pragma once | |
7 | ||
8 | #include <assert.h> | |
9 | #include <stdint.h> | |
10 | #include <atomic> | |
11 | #include <chrono> | |
12 | #include <condition_variable> | |
13 | #include <mutex> | |
14 | #include <type_traits> | |
15 | #include <vector> | |
16 | ||
11fdf7f2 TL |
17 | #include "db/dbformat.h" |
18 | #include "db/pre_release_callback.h" | |
7c673cae FG |
19 | #include "db/write_callback.h" |
20 | #include "monitoring/instrumented_mutex.h" | |
21 | #include "rocksdb/options.h" | |
22 | #include "rocksdb/status.h" | |
23 | #include "rocksdb/types.h" | |
24 | #include "rocksdb/write_batch.h" | |
25 | #include "util/autovector.h" | |
26 | ||
f67539c2 | 27 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
28 | |
29 | class WriteThread { | |
30 | public: | |
31 | enum State : uint8_t { | |
32 | // The initial state of a writer. This is a Writer that is | |
33 | // waiting in JoinBatchGroup. This state can be left when another | |
34 | // thread informs the waiter that it has become a group leader | |
35 | // (-> STATE_GROUP_LEADER), when a leader that has chosen to be | |
36 | // non-parallel informs a follower that its writes have been committed | |
37 | // (-> STATE_COMPLETED), or when a leader that has chosen to perform | |
38 | // updates in parallel and needs this Writer to apply its batch (-> | |
39 | // STATE_PARALLEL_FOLLOWER). | |
40 | STATE_INIT = 1, | |
41 | ||
42 | // The state used to inform a waiting Writer that it has become the | |
43 | // leader, and it should now build a write batch group. Tricky: | |
44 | // this state is not used if newest_writer_ is empty when a writer | |
45 | // enqueues itself, because there is no need to wait (or even to | |
46 | // create the mutex and condvar used to wait) in that case. This is | |
47 | // a terminal state unless the leader chooses to make this a parallel | |
48 | // batch, in which case the last parallel worker to finish will move | |
49 | // the leader to STATE_COMPLETED. | |
50 | STATE_GROUP_LEADER = 2, | |
51 | ||
11fdf7f2 TL |
52 | // The state used to inform a waiting writer that it has become the |
53 | // leader of memtable writer group. The leader will either write | |
54 | // memtable for the whole group, or launch a parallel group write | |
55 | // to memtable by calling LaunchParallelMemTableWrite. | |
56 | STATE_MEMTABLE_WRITER_LEADER = 4, | |
57 | ||
58 | // The state used to inform a waiting writer that it has become a | |
59 | // parallel memtable writer. It can be the group leader who launch the | |
60 | // parallel writer group, or one of the followers. The writer should then | |
61 | // apply its batch to the memtable concurrently and call | |
62 | // CompleteParallelMemTableWriter. | |
63 | STATE_PARALLEL_MEMTABLE_WRITER = 8, | |
7c673cae FG |
64 | |
65 | // A follower whose writes have been applied, or a parallel leader | |
66 | // whose followers have all finished their work. This is a terminal | |
67 | // state. | |
11fdf7f2 | 68 | STATE_COMPLETED = 16, |
7c673cae FG |
69 | |
70 | // A state indicating that the thread may be waiting using StateMutex() | |
71 | // and StateCondVar() | |
11fdf7f2 | 72 | STATE_LOCKED_WAITING = 32, |
7c673cae FG |
73 | }; |
74 | ||
75 | struct Writer; | |
76 | ||
11fdf7f2 TL |
77 | struct WriteGroup { |
78 | Writer* leader = nullptr; | |
79 | Writer* last_writer = nullptr; | |
7c673cae FG |
80 | SequenceNumber last_sequence; |
81 | // before running goes to zero, status needs leader->StateMutex() | |
82 | Status status; | |
11fdf7f2 TL |
83 | std::atomic<size_t> running; |
84 | size_t size = 0; | |
85 | ||
86 | struct Iterator { | |
87 | Writer* writer; | |
88 | Writer* last_writer; | |
89 | ||
90 | explicit Iterator(Writer* w, Writer* last) | |
91 | : writer(w), last_writer(last) {} | |
92 | ||
93 | Writer* operator*() const { return writer; } | |
94 | ||
95 | Iterator& operator++() { | |
96 | assert(writer != nullptr); | |
97 | if (writer == last_writer) { | |
98 | writer = nullptr; | |
99 | } else { | |
100 | writer = writer->link_newer; | |
101 | } | |
102 | return *this; | |
103 | } | |
104 | ||
105 | bool operator!=(const Iterator& other) const { | |
106 | return writer != other.writer; | |
107 | } | |
108 | }; | |
109 | ||
110 | Iterator begin() const { return Iterator(leader, last_writer); } | |
111 | Iterator end() const { return Iterator(nullptr, nullptr); } | |
7c673cae FG |
112 | }; |
113 | ||
114 | // Information kept for every waiting writer. | |
115 | struct Writer { | |
116 | WriteBatch* batch; | |
117 | bool sync; | |
118 | bool no_slowdown; | |
119 | bool disable_wal; | |
120 | bool disable_memtable; | |
11fdf7f2 TL |
121 | size_t batch_cnt; // if non-zero, number of sub-batches in the write batch |
122 | PreReleaseCallback* pre_release_callback; | |
7c673cae FG |
123 | uint64_t log_used; // log number that this batch was inserted into |
124 | uint64_t log_ref; // log number that memtable insert should reference | |
7c673cae FG |
125 | WriteCallback* callback; |
126 | bool made_waitable; // records lazy construction of mutex and cv | |
127 | std::atomic<uint8_t> state; // write under StateMutex() or pre-link | |
11fdf7f2 | 128 | WriteGroup* write_group; |
7c673cae | 129 | SequenceNumber sequence; // the sequence number to use for the first key |
494da23a | 130 | Status status; |
7c673cae | 131 | Status callback_status; // status returned by callback->Callback() |
11fdf7f2 | 132 | |
7c673cae FG |
133 | std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes; |
134 | std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes; | |
135 | Writer* link_older; // read/write only before linking, or as leader | |
136 | Writer* link_newer; // lazy, read/write only before linking, or as leader | |
137 | ||
138 | Writer() | |
139 | : batch(nullptr), | |
140 | sync(false), | |
141 | no_slowdown(false), | |
142 | disable_wal(false), | |
143 | disable_memtable(false), | |
11fdf7f2 TL |
144 | batch_cnt(0), |
145 | pre_release_callback(nullptr), | |
7c673cae FG |
146 | log_used(0), |
147 | log_ref(0), | |
7c673cae FG |
148 | callback(nullptr), |
149 | made_waitable(false), | |
150 | state(STATE_INIT), | |
11fdf7f2 TL |
151 | write_group(nullptr), |
152 | sequence(kMaxSequenceNumber), | |
7c673cae FG |
153 | link_older(nullptr), |
154 | link_newer(nullptr) {} | |
155 | ||
156 | Writer(const WriteOptions& write_options, WriteBatch* _batch, | |
11fdf7f2 TL |
157 | WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable, |
158 | size_t _batch_cnt = 0, | |
159 | PreReleaseCallback* _pre_release_callback = nullptr) | |
7c673cae FG |
160 | : batch(_batch), |
161 | sync(write_options.sync), | |
162 | no_slowdown(write_options.no_slowdown), | |
163 | disable_wal(write_options.disableWAL), | |
164 | disable_memtable(_disable_memtable), | |
11fdf7f2 TL |
165 | batch_cnt(_batch_cnt), |
166 | pre_release_callback(_pre_release_callback), | |
7c673cae FG |
167 | log_used(0), |
168 | log_ref(_log_ref), | |
7c673cae FG |
169 | callback(_callback), |
170 | made_waitable(false), | |
171 | state(STATE_INIT), | |
11fdf7f2 TL |
172 | write_group(nullptr), |
173 | sequence(kMaxSequenceNumber), | |
7c673cae FG |
174 | link_older(nullptr), |
175 | link_newer(nullptr) {} | |
176 | ||
177 | ~Writer() { | |
178 | if (made_waitable) { | |
179 | StateMutex().~mutex(); | |
180 | StateCV().~condition_variable(); | |
181 | } | |
182 | } | |
183 | ||
184 | bool CheckCallback(DB* db) { | |
185 | if (callback != nullptr) { | |
186 | callback_status = callback->Callback(db); | |
187 | } | |
188 | return callback_status.ok(); | |
189 | } | |
190 | ||
191 | void CreateMutex() { | |
192 | if (!made_waitable) { | |
193 | // Note that made_waitable is tracked separately from state | |
194 | // transitions, because we can't atomically create the mutex and | |
195 | // link into the list. | |
196 | made_waitable = true; | |
197 | new (&state_mutex_bytes) std::mutex; | |
198 | new (&state_cv_bytes) std::condition_variable; | |
199 | } | |
200 | } | |
201 | ||
202 | // returns the aggregate status of this Writer | |
203 | Status FinalStatus() { | |
204 | if (!status.ok()) { | |
205 | // a non-ok memtable write status takes presidence | |
206 | assert(callback == nullptr || callback_status.ok()); | |
207 | return status; | |
208 | } else if (!callback_status.ok()) { | |
209 | // if the callback failed then that is the status we want | |
210 | // because a memtable insert should not have been attempted | |
211 | assert(callback != nullptr); | |
212 | assert(status.ok()); | |
213 | return callback_status; | |
214 | } else { | |
215 | // if there is no callback then we only care about | |
216 | // the memtable insert status | |
217 | assert(callback == nullptr || callback_status.ok()); | |
218 | return status; | |
219 | } | |
220 | } | |
221 | ||
222 | bool CallbackFailed() { | |
223 | return (callback != nullptr) && !callback_status.ok(); | |
224 | } | |
225 | ||
226 | bool ShouldWriteToMemtable() { | |
11fdf7f2 | 227 | return status.ok() && !CallbackFailed() && !disable_memtable; |
7c673cae FG |
228 | } |
229 | ||
11fdf7f2 TL |
230 | bool ShouldWriteToWAL() { |
231 | return status.ok() && !CallbackFailed() && !disable_wal; | |
232 | } | |
7c673cae FG |
233 | |
234 | // No other mutexes may be acquired while holding StateMutex(), it is | |
235 | // always last in the order | |
236 | std::mutex& StateMutex() { | |
237 | assert(made_waitable); | |
238 | return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes)); | |
239 | } | |
240 | ||
241 | std::condition_variable& StateCV() { | |
242 | assert(made_waitable); | |
243 | return *static_cast<std::condition_variable*>( | |
244 | static_cast<void*>(&state_cv_bytes)); | |
245 | } | |
246 | }; | |
247 | ||
11fdf7f2 TL |
248 | struct AdaptationContext { |
249 | const char* name; | |
250 | std::atomic<int32_t> value; | |
251 | ||
252 | explicit AdaptationContext(const char* name0) : name(name0), value(0) {} | |
253 | }; | |
254 | ||
255 | explicit WriteThread(const ImmutableDBOptions& db_options); | |
256 | ||
257 | virtual ~WriteThread() = default; | |
7c673cae FG |
258 | |
259 | // IMPORTANT: None of the methods in this class rely on the db mutex | |
260 | // for correctness. All of the methods except JoinBatchGroup and | |
261 | // EnterUnbatched may be called either with or without the db mutex held. | |
262 | // Correctness is maintained by ensuring that only a single thread is | |
263 | // a leader at a time. | |
264 | ||
265 | // Registers w as ready to become part of a batch group, waits until the | |
266 | // caller should perform some work, and returns the current state of the | |
267 | // writer. If w has become the leader of a write batch group, returns | |
268 | // STATE_GROUP_LEADER. If w has been made part of a sequential batch | |
269 | // group and the leader has performed the write, returns STATE_DONE. | |
270 | // If w has been made part of a parallel batch group and is responsible | |
271 | // for updating the memtable, returns STATE_PARALLEL_FOLLOWER. | |
272 | // | |
273 | // The db mutex SHOULD NOT be held when calling this function, because | |
274 | // it will block. | |
275 | // | |
276 | // Writer* w: Writer to be executed as part of a batch group | |
277 | void JoinBatchGroup(Writer* w); | |
278 | ||
279 | // Constructs a write batch group led by leader, which should be a | |
280 | // Writer passed to JoinBatchGroup on the current thread. | |
281 | // | |
11fdf7f2 TL |
282 | // Writer* leader: Writer that is STATE_GROUP_LEADER |
283 | // WriteGroup* write_group: Out-param of group members | |
284 | // returns: Total batch group byte size | |
285 | size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group); | |
286 | ||
287 | // Unlinks the Writer-s in a batch group, wakes up the non-leaders, | |
288 | // and wakes up the next leader (if any). | |
289 | // | |
290 | // WriteGroup* write_group: the write group | |
291 | // Status status: Status of write operation | |
292 | void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status); | |
293 | ||
294 | // Exit batch group on behalf of batch group leader. | |
295 | void ExitAsBatchGroupFollower(Writer* w); | |
296 | ||
297 | // Constructs a write batch group led by leader from newest_memtable_writers_ | |
298 | // list. The leader should either write memtable for the whole group and | |
299 | // call ExitAsMemTableWriter, or launch parallel memtable write through | |
300 | // LaunchParallelMemTableWriters. | |
301 | void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup); | |
302 | ||
303 | // Memtable writer group leader, or the last finished writer in a parallel | |
304 | // write group, exit from the newest_memtable_writers_ list, and wake up | |
305 | // the next leader if needed. | |
306 | void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group); | |
7c673cae FG |
307 | |
308 | // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the | |
309 | // non-leader members of this write batch group. Sets Writer::sequence | |
310 | // before waking them up. | |
311 | // | |
11fdf7f2 TL |
312 | // WriteGroup* write_group: Extra state used to coordinate the parallel add |
313 | void LaunchParallelMemTableWriters(WriteGroup* write_group); | |
7c673cae FG |
314 | |
315 | // Reports the completion of w's batch to the parallel group leader, and | |
316 | // waits for the rest of the parallel batch to complete. Returns true | |
317 | // if this thread is the last to complete, and hence should advance | |
318 | // the sequence number and then call EarlyExitParallelGroup, false if | |
319 | // someone else has already taken responsibility for that. | |
11fdf7f2 | 320 | bool CompleteParallelMemTableWriter(Writer* w); |
7c673cae FG |
321 | |
322 | // Waits for all preceding writers (unlocking mu while waiting), then | |
323 | // registers w as the currently proceeding writer. | |
324 | // | |
325 | // Writer* w: A Writer not eligible for batching | |
326 | // InstrumentedMutex* mu: The db mutex, to unlock while waiting | |
327 | // REQUIRES: db mutex held | |
328 | void EnterUnbatched(Writer* w, InstrumentedMutex* mu); | |
329 | ||
330 | // Completes a Writer begun with EnterUnbatched, unblocking subsequent | |
331 | // writers. | |
332 | void ExitUnbatched(Writer* w); | |
333 | ||
11fdf7f2 TL |
334 | // Wait for all parallel memtable writers to finish, in case pipelined |
335 | // write is enabled. | |
336 | void WaitForMemTableWriters(); | |
7c673cae | 337 | |
11fdf7f2 TL |
338 | SequenceNumber UpdateLastSequence(SequenceNumber sequence) { |
339 | if (sequence > last_sequence_) { | |
340 | last_sequence_ = sequence; | |
341 | } | |
342 | return last_sequence_; | |
343 | } | |
344 | ||
345 | // Insert a dummy writer at the tail of the write queue to indicate a write | |
346 | // stall, and fail any writers in the queue with no_slowdown set to true | |
347 | void BeginWriteStall(); | |
348 | ||
349 | // Remove the dummy writer and wake up waiting writers | |
350 | void EndWriteStall(); | |
7c673cae FG |
351 | |
352 | private: | |
11fdf7f2 TL |
353 | // See AwaitState. |
354 | const uint64_t max_yield_usec_; | |
355 | const uint64_t slow_yield_usec_; | |
356 | ||
357 | // Allow multiple writers write to memtable concurrently. | |
358 | const bool allow_concurrent_memtable_write_; | |
7c673cae | 359 | |
11fdf7f2 TL |
360 | // Enable pipelined write to WAL and memtable. |
361 | const bool enable_pipelined_write_; | |
362 | ||
f67539c2 TL |
363 | // The maximum limit of number of bytes that are written in a single batch |
364 | // of WAL or memtable write. It is followed when the leader write size | |
365 | // is larger than 1/8 of this limit. | |
366 | const uint64_t max_write_batch_group_size_bytes; | |
367 | ||
11fdf7f2 TL |
368 | // Points to the newest pending writer. Only leader can remove |
369 | // elements, adding can be done lock-free by anybody. | |
7c673cae FG |
370 | std::atomic<Writer*> newest_writer_; |
371 | ||
11fdf7f2 TL |
372 | // Points to the newest pending memtable writer. Used only when pipelined |
373 | // write is enabled. | |
374 | std::atomic<Writer*> newest_memtable_writer_; | |
375 | ||
376 | // The last sequence that have been consumed by a writer. The sequence | |
377 | // is not necessary visible to reads because the writer can be ongoing. | |
378 | SequenceNumber last_sequence_; | |
379 | ||
380 | // A dummy writer to indicate a write stall condition. This will be inserted | |
381 | // at the tail of the writer queue by the leader, so newer writers can just | |
382 | // check for this and bail | |
383 | Writer write_stall_dummy_; | |
384 | ||
385 | // Mutex and condvar for writers to block on a write stall. During a write | |
386 | // stall, writers with no_slowdown set to false will wait on this rather | |
387 | // on the writer queue | |
388 | port::Mutex stall_mu_; | |
389 | port::CondVar stall_cv_; | |
390 | ||
7c673cae FG |
391 | // Waits for w->state & goal_mask using w->StateMutex(). Returns |
392 | // the state that satisfies goal_mask. | |
393 | uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); | |
394 | ||
395 | // Blocks until w->state & goal_mask, returning the state value | |
396 | // that satisfied the predicate. Uses ctx to adaptively use | |
397 | // std::this_thread::yield() to avoid mutex overheads. ctx should be | |
398 | // a context-dependent static. | |
399 | uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); | |
400 | ||
11fdf7f2 | 401 | // Set writer state and wake the writer up if it is waiting. |
7c673cae FG |
402 | void SetState(Writer* w, uint8_t new_state); |
403 | ||
11fdf7f2 TL |
404 | // Links w into the newest_writer list. Return true if w was linked directly |
405 | // into the leader position. Safe to call from multiple threads without | |
406 | // external locking. | |
407 | bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer); | |
408 | ||
409 | // Link write group into the newest_writer list as a whole, while keeping the | |
410 | // order of the writers unchanged. Return true if the group was linked | |
411 | // directly into the leader position. | |
412 | bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer); | |
7c673cae FG |
413 | |
414 | // Computes any missing link_newer links. Should not be called | |
415 | // concurrently with itself. | |
416 | void CreateMissingNewerLinks(Writer* head); | |
11fdf7f2 TL |
417 | |
418 | // Starting from a pending writer, follow link_older to search for next | |
419 | // leader, until we hit boundary. | |
420 | Writer* FindNextLeader(Writer* pending_writer, Writer* boundary); | |
421 | ||
422 | // Set the leader in write_group to completed state and remove it from the | |
423 | // write group. | |
424 | void CompleteLeader(WriteGroup& write_group); | |
425 | ||
426 | // Set a follower in write_group to completed state and remove it from the | |
427 | // write group. | |
428 | void CompleteFollower(Writer* w, WriteGroup& write_group); | |
7c673cae FG |
429 | }; |
430 | ||
f67539c2 | 431 | } // namespace ROCKSDB_NAMESPACE |