// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/post_memtable_callback.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer. This is a Writer that is
    // waiting in JoinBatchGroup. This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel needs this Writer to apply its own batch (->
    // STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group. Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case. This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write
    // the memtable for the whole group, or launch a parallel group write
    // to the memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader that launched
    // the parallel writer group, or one of the followers. The writer
    // should then apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work. This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCV().
    STATE_LOCKED_WAITING = 32,
  };
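
  // Note: the State values are distinct powers of two, so several of them
  // can be OR-ed together into the goal masks passed to AwaitState() below.
  // A hedged sketch of how a waiter uses such a mask, mirroring what
  // JoinBatchGroup does internally:
  //
  //   static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
  //   uint8_t state = AwaitState(
  //       w,
  //       STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
  //           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
  //       &jbg_ctx);
  //   // `state` now holds whichever of the masked states was reached.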

  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // Before `running` drops to zero, modifying `status` requires holding
    // leader->StateMutex().
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };
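
  // Hedged usage sketch: begin()/end() allow range-for iteration over the
  // group from leader to last_writer, once the link_newer chain is complete
  // (see CreateMissingNewerLinks below). For example, per-writer sequence
  // numbers can be assigned in the spirit of DBImpl::WriteImpl
  // (`current_sequence` is assumed to be the group's first sequence):
  //
  //   SequenceNumber next_sequence = current_sequence;
  //   for (Writer* writer : write_group) {
  //     writer->sequence = next_sequence;
  //     if (writer->ShouldWriteToMemtable()) {
  //       next_sequence += WriteBatchInternal::Count(writer->batch);
  //     }
  //   }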

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    Env::IOPriority rate_limiter_priority;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    size_t protection_bytes_per_key;
    PreReleaseCallback* pre_release_callback;
    PostMemTableCallback* post_memtable_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()

    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          rate_limiter_priority(Env::IOPriority::IO_TOTAL),
          disable_memtable(false),
          batch_cnt(0),
          protection_bytes_per_key(0),
          pre_release_callback(nullptr),
          post_memtable_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr,
           PostMemTableCallback* _post_memtable_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          rate_limiter_priority(write_options.rate_limiter_priority),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),
          pre_release_callback(_pre_release_callback),
          post_memtable_callback(_post_memtable_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
      status.PermitUncheckedError();
      callback_status.PermitUncheckedError();
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // Returns the aggregate status of this Writer.
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(); it is
    // always last in the lock order.
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };
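
  // Hedged sketch of constructing a Writer for an ordinary write (the
  // surrounding objects are assumptions for illustration, not part of this
  // header):
  //
  //   WriteOptions write_options;
  //   WriteBatch batch;
  //   batch.Put("key", "value");
  //   WriteThread::Writer w(write_options, &batch, /*_callback=*/nullptr,
  //                         /*_log_ref=*/0, /*_disable_memtable=*/false);
  //   // The Writer lives on the caller's stack for the duration of the
  //   // write; WriteThread links it into the queue via JoinBatchGroup().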

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group and waits until
  // the caller should perform some work. On return, w->state holds the
  // writer's current state: STATE_GROUP_LEADER if w has become the leader
  // of a write batch group, STATE_COMPLETED if w was made part of a
  // sequential batch group and the leader has performed the write, or
  // STATE_PARALLEL_MEMTABLE_WRITER if w was made part of a parallel batch
  // group and is responsible for updating the memtable.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w: Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader: Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns: Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writers in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup& write_group: the write group
  // Status& status: Status of the write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);

  // Exits the batch group on behalf of the batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);
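
  // Hedged sketch of the non-pipelined leader/follower protocol built from
  // the calls above (error handling and WAL details elided; the
  // `write_thread_` member name is an assumption borrowed from DBImpl):
  //
  //   WriteThread::Writer w(write_options, &batch, nullptr, 0, false);
  //   write_thread_.JoinBatchGroup(&w);
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup write_group;
  //     size_t total_bytes =
  //         write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
  //     Status status;  // result of writing WAL + memtable for the group
  //     write_thread_.ExitAsBatchGroupLeader(write_group, status);
  //   }
  //   // Followers simply return from JoinBatchGroup in STATE_COMPLETED and
  //   // read their result from w.FinalStatus().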

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch
  // a parallel memtable write through LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);

  // The memtable writer group leader, or the last finished writer in a
  // parallel write group, exits from the newest_memtable_writers_ list,
  // and wakes up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
  // the non-leader members of this write batch group. Sets Writer::sequence
  // before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete. Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and perform the group exit (via
  // ExitAsBatchGroupFollower); returns false if someone else has already
  // taken responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);
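
  // Hedged sketch of the parallel-writer side, mirroring the shape of
  // DBImpl::WriteImpl (memtable insertion details elided):
  //
  //   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  //     if (w.ShouldWriteToMemtable()) {
  //       // apply w.batch to the memtable starting at w.sequence
  //     }
  //     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
  //       // Last writer to finish: advance the last sequence and perform
  //       // the group exit on behalf of the leader.
  //       write_thread_.ExitAsBatchGroupFollower(&w);
  //     }
  //   }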

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w: A Writer not eligible for batching
  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);
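
  // Hedged sketch of the unbatched path, used when an operation must exclude
  // all concurrent writers (`mutex_` is assumed to be the db mutex, held by
  // the caller):
  //
  //   WriteThread::Writer w;
  //   write_thread_.EnterUnbatched(&w, &mutex_);
  //   // ... work that must not run concurrently with any write ...
  //   write_thread_.ExitUnbatched(&w);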

  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Inserts a dummy writer at the tail of the write queue to indicate a
  // write stall, and fails any writers in the queue with no_slowdown set
  // to true.
  void BeginWriteStall();

  // Removes the dummy writer and wakes up waiting writers.
  void EndWriteStall();
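
  // Hedged sketch of how a stall affects incoming writers (the Incomplete
  // status reflects current write_thread.cc behavior, but treat it as an
  // implementation detail):
  //
  //   // conceptually, inside the linking step of JoinBatchGroup:
  //   if (/* write_stall_dummy_ is at the queue tail */) {
  //     if (w->no_slowdown) {
  //       w->status = Status::Incomplete("Write stall");  // fail fast
  //     } else {
  //       // block on stall_cv_ until EndWriteStall() broadcasts
  //     }
  //   }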

 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Whether to allow multiple writers to update the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Whether pipelined writes to WAL and memtable are enabled.
  const bool enable_pipelined_write_;

  // The maximum number of bytes that may be written in a single batch of
  // WAL or memtable writes. The limit is only enforced once the leader's
  // write size exceeds 1/8 of it.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that has been consumed by a writer. This sequence is
  // not necessarily visible to reads yet, because the write can still be
  // in progress.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be
  // inserted at the tail of the writer queue by the leader, so newer
  // writers can just check for this and bail out.
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on during a write stall. Writers
  // with no_slowdown set to false will wait on this rather than on the
  // writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate. Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads. ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);

  // Sets the writer state and wakes the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Returns true if w was linked
  // directly into the leader position. Safe to call from multiple threads
  // without external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Links a write group into the newest_writer list as a whole, while
  // keeping the order of the writers unchanged. Returns true if the group
  // was linked directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links. Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Sets the leader in write_group to the completed state and removes it
  // from the write group.
  void CompleteLeader(WriteGroup& write_group);

  // Sets a follower in write_group to the completed state and removes it
  // from the write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace ROCKSDB_NAMESPACE