// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <assert.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace rocksdb {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer. This is a Writer that is
    // waiting in JoinBatchGroup. This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel and needs this Writer to apply its batch (->
    // STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group. Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case. This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write
    // the memtable for the whole group, or launch a parallel group write
    // to the memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader that launched
    // the parallel writer group, or one of the followers. The writer should
    // then apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work. This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCV().
    STATE_LOCKED_WAITING = 32,
  };
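
  // A rough sketch of the common transitions implied by the comments above
  // (while blocked, a writer may also temporarily sit in
  // STATE_LOCKED_WAITING):
  //
  //   STATE_INIT -> STATE_GROUP_LEADER               became leader
  //   STATE_INIT -> STATE_MEMTABLE_WRITER_LEADER     pipelined write only
  //   STATE_INIT -> STATE_PARALLEL_MEMTABLE_WRITER   parallel group launched
  //   STATE_INIT -> STATE_COMPLETED                  leader wrote the batch
  //   STATE_PARALLEL_MEMTABLE_WRITER -> STATE_COMPLETED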

  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // Before `running` reaches zero, accesses to `status` must hold
    // leader->StateMutex().
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };
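
  // WriteGroup supports range-based iteration from leader to last_writer,
  // e.g.:
  //
  //   for (Writer* writer : write_group) {
  //     // visit each member of the group in link_newer order
  //   }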

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    PreReleaseCallback* pre_release_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()

    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          disable_memtable(false),
          batch_cnt(0),
          pre_release_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // returns the aggregate status of this Writer
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(); it is
    // always last in the lock order.
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer. If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER. If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w: Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);
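
  // A minimal caller sketch (hypothetical; the real caller is
  // DBImpl::WriteImpl, which handles many more cases):
  //
  //   WriteThread::Writer w(write_options, my_batch, nullptr /*callback*/,
  //                         0 /*log_ref*/, false /*disable_memtable*/);
  //   write_thread.JoinBatchGroup(&w);
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup group;
  //     write_thread.EnterAsBatchGroupLeader(&w, &group);
  //     // ... write the WAL and the memtable for every writer in group ...
  //     write_thread.ExitAsBatchGroupLeader(group, status);
  //   }
  //   // Otherwise w.state is one of the other states documented above;
  //   // once it reaches STATE_COMPLETED, w.FinalStatus() holds the result.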

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader: Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns: Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writers in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup* write_group: the write group
  // Status status: Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);

  // Exits the batch group on behalf of the batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch
  // a parallel memtable write through LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);

  // The memtable writer group leader, or the last finished writer in a
  // parallel write group, exits from the newest_memtable_writers_ list,
  // and wakes up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all
  // of the non-leader members of this write batch group. Sets
  // Writer::sequence before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete. Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then exit the write group on behalf of the
  // leader; returns false if someone else has already taken responsibility
  // for that.
  bool CompleteParallelMemTableWriter(Writer* w);
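
  // A sketch of the follower side of a parallel group (hypothetical; the
  // real caller is DBImpl::WriteImpl):
  //
  //   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  //     // ... apply w.batch to the memtable, recording the result in
  //     // w.status ...
  //     if (write_thread.CompleteParallelMemTableWriter(&w)) {
  //       // Last writer to finish: exit on behalf of the whole group.
  //       write_thread.ExitAsBatchGroupFollower(&w);
  //     }
  //   }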

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w: A Writer not eligible for batching
  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);
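
  // For example (a hypothetical sketch; real callers are operations that
  // must exclude all concurrent writers, such as switching memtables):
  //
  //   WriteThread::Writer w;
  //   write_thread.EnterUnbatched(&w, &db_mutex);  // waits for older writers
  //   // ... perform the work that must not race with other writers ...
  //   write_thread.ExitUnbatched(&w);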

  // Wait for all parallel memtable writers to finish. Used when pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  // Moves last_sequence_ forward to sequence if it is larger, and returns
  // the (possibly updated) last sequence.
  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers
  void EndWriteStall();

 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers to write to the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that has been consumed by a writer. The sequence
  // is not necessarily visible to reads because the writer can be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be inserted
  // at the tail of the writer queue by the leader, so newer writers can just
  // check for this and bail
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on during a write stall. During a
  // write stall, writers with no_slowdown set to false will wait on this
  // rather than on the writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate. Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads. ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
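
  // The intended call pattern is a function-local static context per call
  // site (a sketch; the exact goal masks live in write_thread.cc):
  //
  //   static AdaptationContext jbg_ctx("JoinBatchGroup");
  //   ...
  //   uint8_t state = AwaitState(w,
  //                              STATE_GROUP_LEADER | STATE_COMPLETED |
  //                                  STATE_PARALLEL_MEMTABLE_WRITER,
  //                              &jbg_ctx);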

  // Set writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Returns true if w was linked
  // directly into the leader position. Safe to call from multiple threads
  // without external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Links a write group into the newest_writer list as a whole, while
  // keeping the order of the writers unchanged. Returns true if the group
  // was linked directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links. Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Starting from a pending writer, follows link_older to search for the
  // next leader, until we hit the boundary.
  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);

  // Sets the leader in write_group to the completed state and removes it
  // from the write group.
  void CompleteLeader(WriteGroup& write_group);

  // Sets a follower in write_group to the completed state and removes it
  // from the write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace rocksdb