1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include <condition_variable>
14 #include <type_traits>
17 #include "db/dbformat.h"
18 #include "db/pre_release_callback.h"
19 #include "db/write_callback.h"
20 #include "monitoring/instrumented_mutex.h"
21 #include "rocksdb/options.h"
22 #include "rocksdb/status.h"
23 #include "rocksdb/types.h"
24 #include "rocksdb/write_batch.h"
25 #include "util/autovector.h"
// Writer states. The enumerators visible here (2, 4, 8, 32) are distinct
// powers of two, so states can be combined into a bit mask such as the
// goal_mask accepted by AwaitState / BlockingAwaitState.
31 enum State
: uint8_t {
32 // The initial state of a writer. This is a Writer that is
33 // waiting in JoinBatchGroup. This state can be left when another
34 // thread informs the waiter that it has become a group leader
35 // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
36 // non-parallel informs a follower that its writes have been committed
37 // (-> STATE_COMPLETED), or when a leader that has chosen to perform
38 // updates in parallel and needs this Writer to apply its batch (->
39 // STATE_PARALLEL_MEMTABLE_WRITER).
// NOTE(review): with pipelined writes a waiter can presumably also be moved
// to STATE_MEMTABLE_WRITER_LEADER (described below) — confirm.
42 // The state used to inform a waiting Writer that it has become the
43 // leader, and it should now build a write batch group. Tricky:
44 // this state is not used if newest_writer_ is empty when a writer
45 // enqueues itself, because there is no need to wait (or even to
46 // create the mutex and condvar used to wait) in that case. This is
47 // a terminal state unless the leader chooses to make this a parallel
48 // batch, in which case the last parallel worker to finish will move
49 // the leader to STATE_COMPLETED.
50 STATE_GROUP_LEADER
= 2,
52 // The state used to inform a waiting writer that it has become the
53 // leader of memtable writer group. The leader will either write
54 // memtable for the whole group, or launch a parallel group write
55 // to memtable by calling LaunchParallelMemTableWriters.
56 STATE_MEMTABLE_WRITER_LEADER
= 4,
58 // The state used to inform a waiting writer that it has become a
59 // parallel memtable writer. It can be the group leader who launches the
60 // parallel writer group, or one of the followers. The writer should then
61 // apply its batch to the memtable concurrently and call
62 // CompleteParallelMemTableWriter.
63 STATE_PARALLEL_MEMTABLE_WRITER
= 8,
65 // A follower whose writes have been applied, or a parallel leader
66 // whose followers have all finished their work. This is a terminal
70 // A state indicating that the thread may be waiting using StateMutex()
72 STATE_LOCKED_WAITING
= 32,
// First writer of the group; begin() starts iterating here.
78 Writer
* leader
= nullptr;
// Last (newest) writer of the group, inclusive; iteration stops after it.
79 Writer
* last_writer
= nullptr;
// NOTE(review): presumably the last sequence number used by this group —
// confirm against the implementation.
80 SequenceNumber last_sequence
;
81 // before running goes to zero, status needs leader->StateMutex()
// NOTE(review): presumably the number of group members still running; the
// note above says it eventually goes to zero.
83 std::atomic
<size_t> running
;
// Forward iterator over a write group: starts at w and follows link_newer,
// finishing after `last`.
90 explicit Iterator(Writer
* w
, Writer
* last
)
91 : writer(w
), last_writer(last
) {}
// Returns the writer at the current position.
93 Writer
* operator*() const { return writer
; }
// Advances toward newer writers via link_newer. Reaching last_writer ends
// the walk; NOTE(review): that branch's body is outside this excerpt and
// presumably moves to the end() state (null writer).
95 Iterator
& operator++() {
96 assert(writer
!= nullptr);
97 if (writer
== last_writer
) {
100 writer
= writer
->link_newer
;
// Compares positions only; last_writer does not take part.
105 bool operator!=(const Iterator
& other
) const {
106 return writer
!= other
.writer
;
// Range covering leader .. last_writer (inclusive), oldest to newest.
110 Iterator
begin() const { return Iterator(leader
, last_writer
); }
// End sentinel: an iterator whose writer is nullptr.
111 Iterator
end() const { return Iterator(nullptr, nullptr); }
114 // Information kept for every waiting writer.
// When true, ShouldWriteToMemtable() reports false for this writer.
120 bool disable_memtable
;
121 size_t batch_cnt
; // if non-zero, number of sub-batches in the write batch
// nullptr unless passed as _pre_release_callback to the WriteOptions-based
// constructor.
122 PreReleaseCallback
* pre_release_callback
;
123 uint64_t log_used
; // log number that this batch was inserted into
124 uint64_t log_ref
; // log number that memtable insert should reference
// When non-null, CheckCallback() invokes it and stores the result in
// callback_status.
125 WriteCallback
* callback
;
126 bool made_waitable
; // records lazy construction of mutex and cv
127 std::atomic
<uint8_t> state
; // write under StateMutex() or pre-link
// Group this writer belongs to; both constructors initialize it to nullptr.
128 WriteGroup
* write_group
;
129 SequenceNumber sequence
; // the sequence number to use for the first key
131 Status callback_status
; // status returned by callback->Callback()
// Raw storage sized for the lazily constructed state mutex and condition
// variable. They are placement-new'd when made_waitable is set and must only
// be accessed through StateMutex() / StateCV().
133 std::aligned_storage
<sizeof(std::mutex
)>::type state_mutex_bytes
;
134 std::aligned_storage
<sizeof(std::condition_variable
)>::type state_cv_bytes
;
135 Writer
* link_older
; // read/write only before linking, or as leader
136 Writer
* link_newer
; // lazy, read/write only before linking, or as leader
// Tail of the first constructor's member-initializer list (presumably the
// default constructor; its head is outside this excerpt). sequence starts at
// kMaxSequenceNumber, presumably meaning "not yet assigned".
143 disable_memtable(false),
145 pre_release_callback(nullptr),
149 made_waitable(false),
151 write_group(nullptr),
152 sequence(kMaxSequenceNumber
),
154 link_newer(nullptr) {}
// Builds a writer for one write call. sync, no_slowdown and disable_wal are
// copied from write_options (disable_wal from disableWAL); _batch_cnt and
// _pre_release_callback default to 0 / nullptr when omitted.
156 Writer(const WriteOptions
& write_options
, WriteBatch
* _batch
,
157 WriteCallback
* _callback
, uint64_t _log_ref
, bool _disable_memtable
,
158 size_t _batch_cnt
= 0,
159 PreReleaseCallback
* _pre_release_callback
= nullptr)
161 sync(write_options
.sync
),
162 no_slowdown(write_options
.no_slowdown
),
163 disable_wal(write_options
.disableWAL
),
164 disable_memtable(_disable_memtable
),
165 batch_cnt(_batch_cnt
),
166 pre_release_callback(_pre_release_callback
),
170 made_waitable(false),
172 write_group(nullptr),
173 sequence(kMaxSequenceNumber
),
175 link_newer(nullptr) {}
// Tears down the lazily constructed state mutex and condition variable with
// explicit destructor calls, matching their placement-new construction.
// StateMutex()/StateCV() assert made_waitable, so this must only run after
// that construction. NOTE(review): the guarding condition is outside this
// excerpt.
179 StateMutex().~mutex();
180 StateCV().~condition_variable();
// Runs the write callback (if any) against db and records its result in
// callback_status. Returns callback_status.ok(). When there is no callback,
// callback_status is not modified.
184 bool CheckCallback(DB
* db
) {
185 if (callback
!= nullptr) {
186 callback_status
= callback
->Callback(db
);
188 return callback_status
.ok();
// Lazily constructs the state mutex and condition variable, at most once,
// in the raw storage provided by state_mutex_bytes / state_cv_bytes.
192 if (!made_waitable
) {
193 // Note that made_waitable is tracked separately from state
194 // transitions, because we can't atomically create the mutex and
195 // link into the list.
196 made_waitable
= true;
197 new (&state_mutex_bytes
) std::mutex
;
198 new (&state_cv_bytes
) std::condition_variable
;
202 // returns the aggregate status of this Writer
// Precedence, per the branch comments below: a non-OK write status first,
// then a failed callback_status, otherwise the write status. The branch
// conditions on `status` are outside this excerpt.
203 Status
FinalStatus() {
205 // a non-ok memtable write status takes precedence
206 assert(callback
== nullptr || callback_status
.ok());
208 } else if (!callback_status
.ok()) {
209 // if the callback failed then that is the status we want
210 // because a memtable insert should not have been attempted
211 assert(callback
!= nullptr);
213 return callback_status
;
215 // if there is no callback then we only care about
216 // the memtable insert status
217 assert(callback
== nullptr || callback_status
.ok());
// True only when a callback exists and it reported a non-OK status.
222 bool CallbackFailed() {
223 return (callback
!= nullptr) && !callback_status
.ok();
// True when the write status is OK, no callback failed, and the memtable
// write has not been disabled for this writer.
226 bool ShouldWriteToMemtable() {
227 return status
.ok() && !CallbackFailed() && !disable_memtable
;
// Same checks as ShouldWriteToMemtable(), but gated on disable_wal instead.
230 bool ShouldWriteToWAL() {
231 return status
.ok() && !CallbackFailed() && !disable_wal
;
234 // No other mutexes may be acquired while holding StateMutex(); it is
235 // always last in the lock order.
// Only valid once the mutex has been lazily constructed (asserts
// made_waitable).
236 std::mutex
& StateMutex() {
237 assert(made_waitable
);
238 return *static_cast<std::mutex
*>(static_cast<void*>(&state_mutex_bytes
));
// Companion condition variable to StateMutex(); same made_waitable
// precondition.
241 std::condition_variable
& StateCV() {
242 assert(made_waitable
);
243 return *static_cast<std::condition_variable
*>(
244 static_cast<void*>(&state_cv_bytes
));
// Per-call-site state for AwaitState(), which uses it to adapt how much it
// relies on std::this_thread::yield(). value starts at 0.
// NOTE(review): the `name` member initialized below is declared outside this
// excerpt.
248 struct AdaptationContext
{
250 std::atomic
<int32_t> value
;
252 explicit AdaptationContext(const char* name0
) : name(name0
), value(0) {}
// Builds the write thread from the DB-wide immutable options.
// NOTE(review): presumably this is where the const members (yield timing,
// allow_concurrent_memtable_write_, enable_pipelined_write_) are set —
// confirm in the implementation.
255 explicit WriteThread(const ImmutableDBOptions
& db_options
);
257 virtual ~WriteThread() = default;
259 // IMPORTANT: None of the methods in this class rely on the db mutex
260 // for correctness. All of the methods except JoinBatchGroup and
261 // EnterUnbatched may be called either with or without the db mutex held.
262 // Correctness is maintained by ensuring that only a single thread is
263 // a leader at a time.
265 // Registers w as ready to become part of a batch group, waits until the
266 // caller should perform some work, and leaves the resulting state in
267 // w->state (the function itself returns nothing). If w has become the
// leader of a write batch group, the state is
268 // STATE_GROUP_LEADER. If w has been made part of a sequential batch
269 // group and the leader has performed the write, it is STATE_COMPLETED.
270 // If w has been made part of a parallel batch group and is responsible
271 // for updating the memtable, it is STATE_PARALLEL_MEMTABLE_WRITER.
// NOTE(review): with pipelined writes w may presumably also end up in
// STATE_MEMTABLE_WRITER_LEADER — confirm against the implementation.
273 // The db mutex SHOULD NOT be held when calling this function, because
276 // Writer* w: Writer to be executed as part of a batch group
277 void JoinBatchGroup(Writer
* w
);
279 // Constructs a write batch group led by leader, which should be a
280 // Writer passed to JoinBatchGroup on the current thread.
// NOTE(review): presumably paired with ExitAsBatchGroupLeader(*write_group,
// status) once the group's write has been performed.
282 // Writer* leader: Writer that is STATE_GROUP_LEADER
283 // WriteGroup* write_group: Out-param of group members
284 // returns: Total batch group byte size
285 size_t EnterAsBatchGroupLeader(Writer
* leader
, WriteGroup
* write_group
);
287 // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
288 // and wakes up the next leader (if any).
290 // WriteGroup& write_group: the write group
291 // Status status: Status of write operation
292 void ExitAsBatchGroupLeader(WriteGroup
& write_group
, Status status
);
294 // Exits the batch group on behalf of the batch group leader.
// Writer* w: the writer performing the exit (presumably a follower, per the
// name). NOTE(review): confirm when this is used instead of
// ExitAsBatchGroupLeader.
295 void ExitAsBatchGroupFollower(Writer
* w
);
297 // Constructs a write batch group led by leader from newest_memtable_writers_
298 // list. The leader should either write memtable for the whole group and
299 // call ExitAsMemTableWriter, or launch parallel memtable write through
300 // LaunchParallelMemTableWriters.
301 void EnterAsMemTableWriter(Writer
* leader
, WriteGroup
* write_grup
);
303 // Called by the memtable writer group leader, or by the last writer to
304 // finish in a parallel write group, to exit from the
305 // newest_memtable_writer_ list and wake up the next leader if needed.
// Writer* self: the calling writer
// WriteGroup& write_group: the memtable writer group being exited
306 void ExitAsMemTableWriter(Writer
* self
, WriteGroup
& write_group
);
308 // Causes JoinBatchGroup to return with STATE_PARALLEL_MEMTABLE_WRITER for
309 // the non-leader members of this write batch group. Sets Writer::sequence
310 // before waking them up.
// NOTE(review): STATE_PARALLEL_MEMTABLE_WRITER is also documented as covering
// the leader that launches the group; confirm whether the leader's state is
// set here too, and whether sequence is assigned here or by the caller.
312 // WriteGroup* write_group: Extra state used to coordinate the parallel add
313 void LaunchParallelMemTableWriters(WriteGroup
* write_group
);
315 // Reports the completion of w's batch to the parallel group leader, and
316 // waits for the rest of the parallel batch to complete. Returns true
317 // if this thread is the last to complete, and hence should advance
318 // the sequence number and then call ExitAsMemTableWriter, false if
319 // someone else has already taken responsibility for that.
320 bool CompleteParallelMemTableWriter(Writer
* w
);
322 // Waits for all preceding writers (unlocking mu while waiting), then
323 // registers w as the currently proceeding writer. Pair with ExitUnbatched(w).
325 // Writer* w: A Writer not eligible for batching
326 // InstrumentedMutex* mu: The db mutex, to unlock while waiting
327 // REQUIRES: db mutex held
328 void EnterUnbatched(Writer
* w
, InstrumentedMutex
* mu
);
330 // Completes a Writer begun with EnterUnbatched, unblocking subsequent
// Writer* w: the Writer previously passed to EnterUnbatched
332 void ExitUnbatched(Writer
* w
);
334 // Wait for all parallel memtable writers to finish, in case pipelined
// NOTE(review): presumably only relevant when enable_pipelined_write_ is set.
336 void WaitForMemTableWriters();
// Raises last_sequence_ to `sequence` when `sequence` is larger (it never
// lowers it) and returns the resulting last_sequence_. There is no locking
// here and last_sequence_ is a plain (non-atomic) member. NOTE(review):
// callers presumably serialize access, e.g. as the current leader.
338 SequenceNumber
UpdateLastSequence(SequenceNumber sequence
) {
339 if (sequence
> last_sequence_
) {
340 last_sequence_
= sequence
;
342 return last_sequence_
;
345 // Insert a dummy writer at the tail of the write queue to indicate a write
346 // stall, and fail any writers in the queue with no_slowdown set to true.
// The dummy writer is the write_stall_dummy_ member.
347 void BeginWriteStall();
349 // Remove the dummy writer and wake up waiting writers
// (writers with no_slowdown == false block on stall_mu_/stall_cv_ during a
// stall).
350 void EndWriteStall();
// Yield/spin tuning used while waiting (microseconds, per the _usec suffix).
// NOTE(review): presumably derived from db_options in the constructor.
354 const uint64_t max_yield_usec_
;
355 const uint64_t slow_yield_usec_
;
357 // Allow multiple writers to write to memtable concurrently.
358 const bool allow_concurrent_memtable_write_
;
360 // Enable pipelined write to WAL and memtable.
361 const bool enable_pipelined_write_
;
363 // Points to the newest pending writer. Only the leader can remove
364 // elements; adding can be done lock-free by anybody.
365 std::atomic
<Writer
*> newest_writer_
;
367 // Points to the newest pending memtable writer. Used only when pipelined
369 std::atomic
<Writer
*> newest_memtable_writer_
;
371 // The last sequence that has been consumed by a writer. The sequence
372 // is not necessarily visible to reads because the writer can be ongoing.
373 SequenceNumber last_sequence_
;
375 // A dummy writer to indicate a write stall condition. This will be inserted
376 // at the tail of the writer queue by the leader, so newer writers can just
377 // check for this and bail out.
378 Writer write_stall_dummy_
;
380 // Mutex and condvar for writers to block on a write stall. During a write
381 // stall, writers with no_slowdown set to false will wait on this rather
382 // than on the writer queue.
383 port::Mutex stall_mu_
;
384 port::CondVar stall_cv_
;
386 // Waits for w->state & goal_mask using w->StateMutex(). Returns
387 // the state that satisfies goal_mask.
// goal_mask is a bitwise OR of State values.
388 uint8_t BlockingAwaitState(Writer
* w
, uint8_t goal_mask
);
390 // Blocks until w->state & goal_mask, returning the state value
391 // that satisfied the predicate. Uses ctx to adaptively use
392 // std::this_thread::yield() to avoid mutex overheads. ctx should be
393 // a context-dependent static.
// NOTE(review): presumably falls back to BlockingAwaitState when yielding
// does not pay off; confirm in the implementation.
394 uint8_t AwaitState(Writer
* w
, uint8_t goal_mask
, AdaptationContext
* ctx
);
396 // Set writer state and wake the writer up if it is waiting.
// new_state is expected to be one of the State values.
397 void SetState(Writer
* w
, uint8_t new_state
);
399 // Links w into the newest_writer list. Return true if w was linked directly
400 // into the leader position. Safe to call from multiple threads without
// newest_writer: head of the list to link into; presumably newest_writer_ or
// newest_memtable_writer_ (the two std::atomic<Writer*> members).
402 bool LinkOne(Writer
* w
, std::atomic
<Writer
*>* newest_writer
);
404 // Link write group into the newest_writer list as a whole, while keeping the
405 // order of the writers unchanged. Return true if the group was linked
406 // directly into the leader position.
407 bool LinkGroup(WriteGroup
& write_group
, std::atomic
<Writer
*>* newest_writer
);
409 // Computes any missing link_newer links. Should not be called
410 // concurrently with itself.
// Writer::link_newer is filled in lazily; head is presumably the newest
// writer from which the walk over link_older starts.
411 void CreateMissingNewerLinks(Writer
* head
);
413 // Starting from a pending writer, follow link_older to search for next
414 // leader, until we hit boundary.
// NOTE(review): the return value when boundary is reached without finding a
// leader is not visible here; confirm in the implementation.
415 Writer
* FindNextLeader(Writer
* pending_writer
, Writer
* boundary
);
417 // Set the leader in write_group to completed state and remove it from the
419 void CompleteLeader(WriteGroup
& write_group
);
421 // Set a follower in write_group to completed state and remove it from the
// Writer* w: the follower to complete
423 void CompleteFollower(Writer
* w
, WriteGroup
& write_group
);
426 } // namespace rocksdb