Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | #include "db/db_impl.h" | |
10 | ||
11 | #ifndef __STDC_FORMAT_MACROS | |
12 | #define __STDC_FORMAT_MACROS | |
13 | #endif | |
14 | #include <inttypes.h> | |
11fdf7f2 TL |
15 | #include "db/error_handler.h" |
16 | #include "db/event_helpers.h" | |
7c673cae FG |
17 | #include "monitoring/perf_context_imp.h" |
18 | #include "options/options_helper.h" | |
19 | #include "util/sync_point.h" | |
20 | ||
21 | namespace rocksdb { | |
22 | // Convenience methods | |
23 | Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, | |
24 | const Slice& key, const Slice& val) { | |
25 | return DB::Put(o, column_family, key, val); | |
26 | } | |
27 | ||
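Put above simply forwards to the static DB::Put helper, which wraps the key/value pair in a one-record WriteBatch and routes it through Write/WriteImpl below. A minimal caller-side sketch, not part of this file; the path is illustrative and only the standard public RocksDB API is assumed:

```cpp
#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Illustrative path; any writable directory works.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/put_example", &db);
  assert(s.ok());
  // Goes through DBImpl::Put -> DB::Put -> DBImpl::Write on the default
  // column family.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  delete db;
  return 0;
}
```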
28 | Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, | |
29 | const Slice& key, const Slice& val) { | |
30 | auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); | |
31 | if (!cfh->cfd()->ioptions()->merge_operator) { | |
32 | return Status::NotSupported("Provide a merge_operator when opening DB"); | |
33 | } else { | |
34 | return DB::Merge(o, column_family, key, val); | |
35 | } | |
36 | } | |
37 | ||
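Merge above returns NotSupported unless a merge_operator was supplied when the DB was opened. Below is a minimal sketch of satisfying that requirement with the public AssociativeMergeOperator interface; AppendOperator, the path, and the keys are illustrative and not part of RocksDB:

```cpp
#include <memory>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"

// Illustrative associative operator: concatenates values with ','.
class AppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* existing,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    new_value->clear();
    if (existing != nullptr) {
      new_value->assign(existing->data(), existing->size());
      new_value->append(",");
    }
    new_value->append(value.data(), value.size());
    return true;  // merge succeeded
  }
  const char* Name() const override { return "AppendOperator"; }
};

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Without this line, DBImpl::Merge above returns Status::NotSupported.
  options.merge_operator = std::make_shared<AppendOperator>();
  rocksdb::DB::Open(options, "/tmp/merge_example", &db);
  db->Merge(rocksdb::WriteOptions(), "k", "v1");
  db->Merge(rocksdb::WriteOptions(), "k", "v2");  // reading "k" now yields "v1,v2"
  delete db;
  return 0;
}
```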
38 | Status DBImpl::Delete(const WriteOptions& write_options, | |
39 | ColumnFamilyHandle* column_family, const Slice& key) { | |
40 | return DB::Delete(write_options, column_family, key); | |
41 | } | |
42 | ||
43 | Status DBImpl::SingleDelete(const WriteOptions& write_options, | |
44 | ColumnFamilyHandle* column_family, | |
45 | const Slice& key) { | |
46 | return DB::SingleDelete(write_options, column_family, key); | |
47 | } | |
48 | ||
11fdf7f2 TL |
49 | void DBImpl::SetRecoverableStatePreReleaseCallback( |
50 | PreReleaseCallback* callback) { | |
51 | recoverable_state_pre_release_callback_.reset(callback); | |
52 | } | |
53 | ||
7c673cae FG |
54 | Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { |
55 | return WriteImpl(write_options, my_batch, nullptr, nullptr); | |
56 | } | |
57 | ||
58 | #ifndef ROCKSDB_LITE | |
59 | Status DBImpl::WriteWithCallback(const WriteOptions& write_options, | |
60 | WriteBatch* my_batch, | |
61 | WriteCallback* callback) { | |
62 | return WriteImpl(write_options, my_batch, callback, nullptr); | |
63 | } | |
64 | #endif // ROCKSDB_LITE | |
65 | ||
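Write is the general entry point: Put, Merge, Delete, and SingleDelete each build a one-record WriteBatch and call it. A caller can also batch several updates so they are applied atomically; a minimal sketch, with an illustrative function name and keys:

```cpp
#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

rocksdb::Status AtomicRename(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Delete("old_key");
  batch.Put("new_key", "value");
  // The whole batch goes through DBImpl::Write -> WriteImpl below as one
  // unit: one WAL record, consecutive sequence numbers.
  return db->Write(rocksdb::WriteOptions(), &batch);
}
```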
11fdf7f2 TL |
66 | // The main write queue. This is the only write queue that updates LastSequence. |
67 | // When using one write queue, the same sequence also indicates the last | |
68 | // published sequence. | |
7c673cae FG |
69 | Status DBImpl::WriteImpl(const WriteOptions& write_options, |
70 | WriteBatch* my_batch, WriteCallback* callback, | |
71 | uint64_t* log_used, uint64_t log_ref, | |
11fdf7f2 TL |
72 | bool disable_memtable, uint64_t* seq_used, |
73 | size_t batch_cnt, | |
74 | PreReleaseCallback* pre_release_callback) { | |
75 | assert(!seq_per_batch_ || batch_cnt != 0); | |
7c673cae FG |
76 | if (my_batch == nullptr) { |
77 | return Status::Corruption("Batch is nullptr!"); | |
78 | } | |
11fdf7f2 TL |
79 | if (tracer_) { |
80 | InstrumentedMutexLock lock(&trace_mutex_); | |
81 | if (tracer_) { | |
82 | tracer_->Write(my_batch); | |
83 | } | |
84 | } | |
85 | if (write_options.sync && write_options.disableWAL) { | |
86 | return Status::InvalidArgument("Sync writes have to enable WAL."); | |
87 | } | |
88 | if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) { | |
89 | return Status::NotSupported( | |
90 | "pipelined_writes is not compatible with concurrent prepares"); | |
91 | } | |
92 | if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { | |
93 | // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt | |
94 | return Status::NotSupported( | |
95 | "pipelined_writes is not compatible with seq_per_batch"); | |
96 | } | |
97 | // Otherwise IsLatestPersistentState optimization does not make sense | |
98 | assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || | |
99 | disable_memtable); | |
7c673cae FG |
100 | |
101 | Status status; | |
11fdf7f2 TL |
102 | if (write_options.low_pri) { |
103 | status = ThrottleLowPriWritesIfNeeded(write_options, my_batch); | |
104 | if (!status.ok()) { | |
105 | return status; | |
106 | } | |
107 | } | |
108 | ||
109 | if (two_write_queues_ && disable_memtable) { | |
110 | return WriteImplWALOnly(write_options, my_batch, callback, log_used, | |
111 | log_ref, seq_used, batch_cnt, pre_release_callback); | |
112 | } | |
113 | ||
114 | if (immutable_db_options_.enable_pipelined_write) { | |
115 | return PipelinedWriteImpl(write_options, my_batch, callback, log_used, | |
116 | log_ref, disable_memtable, seq_used); | |
117 | } | |
7c673cae FG |
118 | |
119 | PERF_TIMER_GUARD(write_pre_and_post_process_time); | |
120 | WriteThread::Writer w(write_options, my_batch, callback, log_ref, | |
11fdf7f2 | 121 | disable_memtable, batch_cnt, pre_release_callback); |
7c673cae FG |
122 | |
123 | if (!write_options.disableWAL) { | |
124 | RecordTick(stats_, WRITE_WITH_WAL); | |
125 | } | |
126 | ||
127 | StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); | |
128 | ||
129 | write_thread_.JoinBatchGroup(&w); | |
11fdf7f2 | 130 | if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { |
7c673cae | 131 | // we are a non-leader in a parallel group |
7c673cae FG |
132 | |
133 | if (w.ShouldWriteToMemtable()) { | |
11fdf7f2 TL |
134 | PERF_TIMER_STOP(write_pre_and_post_process_time); |
135 | PERF_TIMER_GUARD(write_memtable_time); | |
136 | ||
7c673cae FG |
137 | ColumnFamilyMemTablesImpl column_family_memtables( |
138 | versions_->GetColumnFamilySet()); | |
7c673cae | 139 | w.status = WriteBatchInternal::InsertInto( |
11fdf7f2 | 140 | &w, w.sequence, &column_family_memtables, &flush_scheduler_, |
7c673cae | 141 | write_options.ignore_missing_column_families, 0 /*log_number*/, this, |
11fdf7f2 TL |
142 | true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt); |
143 | ||
144 | PERF_TIMER_START(write_pre_and_post_process_time); | |
7c673cae FG |
145 | } |
146 | ||
11fdf7f2 | 147 | if (write_thread_.CompleteParallelMemTableWriter(&w)) { |
7c673cae | 148 | // we're responsible for exiting the batch group |
11fdf7f2 TL |
149 | // TODO(myabandeh): propagate status to write_group |
150 | auto last_sequence = w.write_group->last_sequence; | |
7c673cae | 151 | versions_->SetLastSequence(last_sequence); |
11fdf7f2 | 152 | MemTableInsertStatusCheck(w.status); |
7c673cae FG |
153 | write_thread_.ExitAsBatchGroupFollower(&w); |
154 | } | |
155 | assert(w.state == WriteThread::STATE_COMPLETED); | |
156 | // STATE_COMPLETED conditional below handles exit | |
157 | ||
158 | status = w.FinalStatus(); | |
159 | } | |
160 | if (w.state == WriteThread::STATE_COMPLETED) { | |
161 | if (log_used != nullptr) { | |
162 | *log_used = w.log_used; | |
163 | } | |
11fdf7f2 TL |
164 | if (seq_used != nullptr) { |
165 | *seq_used = w.sequence; | |
166 | } | |
7c673cae FG |
167 | // write is complete and leader has updated sequence |
168 | return w.FinalStatus(); | |
169 | } | |
170 | // else we are the leader of the write batch group | |
171 | assert(w.state == WriteThread::STATE_GROUP_LEADER); | |
172 | ||
173 | // Once it reaches this point, the current writer "w" will try to do its write
174 | // job. It may also pick up some of the remaining writers in the "writers_" | |
175 | // when it finds it suitable, and finish them in the same write batch.
176 | // This is how a write job could be done by another writer.
177 | WriteContext write_context; | |
11fdf7f2 | 178 | WriteThread::WriteGroup write_group; |
7c673cae | 179 | bool in_parallel_group = false; |
11fdf7f2 TL |
180 | uint64_t last_sequence = kMaxSequenceNumber; |
181 | if (!two_write_queues_) { | |
182 | last_sequence = versions_->LastSequence(); | |
183 | } | |
7c673cae FG |
184 | |
185 | mutex_.Lock(); | |
186 | ||
11fdf7f2 | 187 | bool need_log_sync = write_options.sync; |
7c673cae | 188 | bool need_log_dir_sync = need_log_sync && !log_dir_synced_; |
11fdf7f2 TL |
189 | if (!two_write_queues_ || !disable_memtable) { |
190 | // With concurrent writes we do the preprocessing only in the write thread | |
191 | // that also writes to the memtable, to avoid synchronization issues on | |
192 | // shared data structures with the other thread | |
193 | ||
194 | // PreprocessWrite does its own perf timing. | |
195 | PERF_TIMER_STOP(write_pre_and_post_process_time); | |
196 | ||
197 | status = PreprocessWrite(write_options, &need_log_sync, &write_context); | |
198 | ||
199 | PERF_TIMER_START(write_pre_and_post_process_time); | |
200 | } | |
201 | log::Writer* log_writer = logs_.back().writer; | |
7c673cae FG |
202 | |
203 | mutex_.Unlock(); | |
204 | ||
205 | // Add to log and apply to memtable. We can release the lock | |
206 | // during this phase since &w is currently responsible for logging | |
207 | // and protects against concurrent loggers and concurrent writes | |
208 | // into memtables | |
209 | ||
11fdf7f2 | 210 | TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters"); |
7c673cae | 211 | last_batch_group_size_ = |
11fdf7f2 | 212 | write_thread_.EnterAsBatchGroupLeader(&w, &write_group); |
7c673cae FG |
213 | |
214 | if (status.ok()) { | |
215 | // Rules for when we can update the memtable concurrently | |
216 | // 1. supported by memtable | |
217 | // 2. Puts are not okay if inplace_update_support | |
218 | // 3. Merges are not okay | |
219 | // | |
220 | // Rules 1..2 are enforced by checking the options | |
221 | // during startup (CheckConcurrentWritesSupported), so if | |
222 | // options.allow_concurrent_memtable_write is true then they can be | |
223 | // assumed to be true. Rule 3 is checked for each batch. We could | |
224 | // relax rule 2 if we could prevent write batches from referring | |
225 | // more than once to a particular key. | |
226 | bool parallel = immutable_db_options_.allow_concurrent_memtable_write && | |
11fdf7f2 TL |
227 | write_group.size > 1; |
228 | size_t total_count = 0; | |
229 | size_t valid_batches = 0; | |
230 | size_t total_byte_size = 0; | |
231 | for (auto* writer : write_group) { | |
7c673cae | 232 | if (writer->CheckCallback(this)) { |
11fdf7f2 | 233 | valid_batches += writer->batch_cnt; |
7c673cae FG |
234 | if (writer->ShouldWriteToMemtable()) { |
235 | total_count += WriteBatchInternal::Count(writer->batch); | |
236 | parallel = parallel && !writer->batch->HasMerge(); | |
237 | } | |
238 | ||
239 | total_byte_size = WriteBatchInternal::AppendedByteSize( | |
240 | total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); | |
241 | } | |
242 | } | |
11fdf7f2 TL |
243 | // Note about seq_per_batch_: either disableWAL is set for the entire write |
244 | // group or not. In either case we inc seq for each write batch with no | |
245 | // failed callback. This means that there could be a batch with | |
246 | // disable_memtable in between; although we do not write this batch to | |
247 | // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc | |
248 | // the seq per valid written key to mem. | |
249 | size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; | |
250 | ||
251 | const bool concurrent_update = two_write_queues_; | |
7c673cae FG |
252 | // Update stats while we are an exclusive group leader, so we know |
253 | // that nobody else can be writing to these particular stats. | |
254 | // We're optimistic, updating the stats before we successfully | |
255 | // commit. That lets us release our leader status early. | |
256 | auto stats = default_cf_internal_stats_; | |
11fdf7f2 TL |
257 | stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count, |
258 | concurrent_update); | |
7c673cae | 259 | RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); |
11fdf7f2 TL |
260 | stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, |
261 | concurrent_update); | |
7c673cae | 262 | RecordTick(stats_, BYTES_WRITTEN, total_byte_size); |
11fdf7f2 | 263 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update); |
7c673cae | 264 | RecordTick(stats_, WRITE_DONE_BY_SELF); |
11fdf7f2 | 265 | auto write_done_by_other = write_group.size - 1; |
7c673cae | 266 | if (write_done_by_other > 0) { |
11fdf7f2 TL |
267 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other, |
268 | concurrent_update); | |
7c673cae FG |
269 | RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); |
270 | } | |
494da23a | 271 | RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); |
7c673cae FG |
272 | |
273 | if (write_options.disableWAL) { | |
274 | has_unpersisted_data_.store(true, std::memory_order_relaxed); | |
275 | } | |
276 | ||
277 | PERF_TIMER_STOP(write_pre_and_post_process_time); | |
278 | ||
11fdf7f2 TL |
279 | if (!two_write_queues_) { |
280 | if (status.ok() && !write_options.disableWAL) { | |
281 | PERF_TIMER_GUARD(write_wal_time); | |
282 | status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, | |
283 | need_log_dir_sync, last_sequence + 1); | |
284 | } | |
285 | } else { | |
286 | if (status.ok() && !write_options.disableWAL) { | |
287 | PERF_TIMER_GUARD(write_wal_time); | |
288 | // LastAllocatedSequence is increased inside WriteToWAL under | |
289 | // wal_write_mutex_ to ensure ordered events in WAL | |
290 | status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, | |
291 | seq_inc); | |
292 | } else { | |
293 | // Otherwise we inc seq number for memtable writes | |
294 | last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); | |
7c673cae FG |
295 | } |
296 | } | |
11fdf7f2 TL |
297 | assert(last_sequence != kMaxSequenceNumber); |
298 | const SequenceNumber current_sequence = last_sequence + 1; | |
299 | last_sequence += seq_inc; | |
7c673cae | 300 | |
494da23a TL |
301 | // PreReleaseCallback is called after WAL write and before memtable write |
302 | if (status.ok()) { | |
303 | SequenceNumber next_sequence = current_sequence; | |
304 | // Note: the logic for advancing seq here must be consistent with the | |
305 | // logic in WriteBatchInternal::InsertInto(write_group...) as well as | |
306 | // with WriteBatchInternal::InsertInto(write_batch...) that is called on | |
307 | // the merged batch during recovery from the WAL. | |
308 | for (auto* writer : write_group) { | |
309 | if (writer->CallbackFailed()) { | |
310 | continue; | |
311 | } | |
312 | writer->sequence = next_sequence; | |
313 | if (writer->pre_release_callback) { | |
314 | Status ws = writer->pre_release_callback->Callback( | |
315 | writer->sequence, disable_memtable, writer->log_used); | |
316 | if (!ws.ok()) { | |
317 | status = ws; | |
318 | break; | |
319 | } | |
320 | } | |
321 | if (seq_per_batch_) { | |
322 | assert(writer->batch_cnt); | |
323 | next_sequence += writer->batch_cnt; | |
324 | } else if (writer->ShouldWriteToMemtable()) { | |
325 | next_sequence += WriteBatchInternal::Count(writer->batch); | |
326 | } | |
327 | } | |
328 | } | |
329 | ||
7c673cae FG |
330 | if (status.ok()) { |
331 | PERF_TIMER_GUARD(write_memtable_time); | |
332 | ||
333 | if (!parallel) { | |
11fdf7f2 | 334 | // w.sequence will be set inside InsertInto |
7c673cae FG |
335 | w.status = WriteBatchInternal::InsertInto( |
336 | write_group, current_sequence, column_family_memtables_.get(), | |
337 | &flush_scheduler_, write_options.ignore_missing_column_families, | |
11fdf7f2 TL |
338 | 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, |
339 | batch_per_txn_); | |
7c673cae | 340 | } else { |
11fdf7f2 | 341 | write_group.last_sequence = last_sequence; |
11fdf7f2 | 342 | write_thread_.LaunchParallelMemTableWriters(&write_group); |
7c673cae FG |
343 | in_parallel_group = true; |
344 | ||
345 | // Each parallel follower is doing its own writes. The leader should |
346 | // also do its own. | |
347 | if (w.ShouldWriteToMemtable()) { | |
348 | ColumnFamilyMemTablesImpl column_family_memtables( | |
349 | versions_->GetColumnFamilySet()); | |
350 | assert(w.sequence == current_sequence); | |
7c673cae | 351 | w.status = WriteBatchInternal::InsertInto( |
11fdf7f2 | 352 | &w, w.sequence, &column_family_memtables, &flush_scheduler_, |
7c673cae | 353 | write_options.ignore_missing_column_families, 0 /*log_number*/, |
11fdf7f2 TL |
354 | this, true /*concurrent_memtable_writes*/, seq_per_batch_, |
355 | w.batch_cnt, batch_per_txn_); | |
7c673cae FG |
356 | } |
357 | } | |
11fdf7f2 TL |
358 | if (seq_used != nullptr) { |
359 | *seq_used = w.sequence; | |
360 | } | |
7c673cae FG |
361 | } |
362 | } | |
363 | PERF_TIMER_START(write_pre_and_post_process_time); | |
364 | ||
11fdf7f2 TL |
365 | if (!w.CallbackFailed()) { |
366 | WriteStatusCheck(status); | |
7c673cae FG |
367 | } |
368 | ||
11fdf7f2 | 369 | if (need_log_sync) { |
7c673cae FG |
370 | mutex_.Lock(); |
371 | MarkLogsSynced(logfile_number_, need_log_dir_sync, status); | |
372 | mutex_.Unlock(); | |
11fdf7f2 TL |
373 | // Requesting sync with two_write_queues_ is expected to be very rare. We |
374 | // hence provide a simple implementation that is not necessarily efficient. | |
375 | if (two_write_queues_) { | |
376 | if (manual_wal_flush_) { | |
377 | status = FlushWAL(true); | |
378 | } else { | |
379 | status = SyncWAL(); | |
380 | } | |
381 | } | |
7c673cae FG |
382 | } |
383 | ||
384 | bool should_exit_batch_group = true; | |
385 | if (in_parallel_group) { | |
386 | // CompleteParallelWorker returns true if this thread should | |
387 | // handle exit, false means somebody else did | |
11fdf7f2 | 388 | should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); |
7c673cae FG |
389 | } |
390 | if (should_exit_batch_group) { | |
11fdf7f2 | 391 | if (status.ok()) { |
494da23a TL |
392 | // Note: if we are to resume after non-OK statuses we need to revisit how |
393 | // we react to non-OK statuses here. | |
11fdf7f2 TL |
394 | versions_->SetLastSequence(last_sequence); |
395 | } | |
396 | MemTableInsertStatusCheck(w.status); | |
397 | write_thread_.ExitAsBatchGroupLeader(write_group, status); | |
7c673cae FG |
398 | } |
399 | ||
400 | if (status.ok()) { | |
401 | status = w.FinalStatus(); | |
402 | } | |
11fdf7f2 TL |
403 | return status; |
404 | } | |
405 | ||
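WriteImpl above branches on several WriteOptions fields (sync, disableWAL, low_pri, no_slowdown). One trade-off it implements is disableWAL, which sets has_unpersisted_data_; a caller-side sketch, with an illustrative function name and key:

```cpp
#include "rocksdb/db.h"

void UnloggedPut(rocksdb::DB* db) {
  rocksdb::WriteOptions wo;
  // Skip the WAL: faster writes, but the data is lost on a crash until the
  // memtable is flushed. WriteImpl rejects sync == true combined with this.
  wo.disableWAL = true;
  db->Put(wo, "counter", "42");
}
```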
406 | Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, | |
407 | WriteBatch* my_batch, WriteCallback* callback, | |
408 | uint64_t* log_used, uint64_t log_ref, | |
409 | bool disable_memtable, uint64_t* seq_used) { | |
410 | PERF_TIMER_GUARD(write_pre_and_post_process_time); | |
411 | StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); | |
412 | ||
413 | WriteContext write_context; | |
414 | ||
415 | WriteThread::Writer w(write_options, my_batch, callback, log_ref, | |
416 | disable_memtable); | |
417 | write_thread_.JoinBatchGroup(&w); | |
418 | if (w.state == WriteThread::STATE_GROUP_LEADER) { | |
419 | WriteThread::WriteGroup wal_write_group; | |
420 | if (w.callback && !w.callback->AllowWriteBatching()) { | |
421 | write_thread_.WaitForMemTableWriters(); | |
422 | } | |
423 | mutex_.Lock(); | |
424 | bool need_log_sync = !write_options.disableWAL && write_options.sync; | |
425 | bool need_log_dir_sync = need_log_sync && !log_dir_synced_; | |
426 | // PreprocessWrite does its own perf timing. | |
427 | PERF_TIMER_STOP(write_pre_and_post_process_time); | |
428 | w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); | |
429 | PERF_TIMER_START(write_pre_and_post_process_time); | |
430 | log::Writer* log_writer = logs_.back().writer; | |
431 | mutex_.Unlock(); | |
432 | ||
433 | // This can set non-OK status if callback fails. | |
434 | last_batch_group_size_ = | |
435 | write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); | |
436 | const SequenceNumber current_sequence = | |
437 | write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; | |
438 | size_t total_count = 0; | |
439 | size_t total_byte_size = 0; | |
440 | ||
441 | if (w.status.ok()) { | |
442 | SequenceNumber next_sequence = current_sequence; | |
443 | for (auto writer : wal_write_group) { | |
444 | if (writer->CheckCallback(this)) { | |
445 | if (writer->ShouldWriteToMemtable()) { | |
446 | writer->sequence = next_sequence; | |
447 | size_t count = WriteBatchInternal::Count(writer->batch); | |
448 | next_sequence += count; | |
449 | total_count += count; | |
450 | } | |
451 | total_byte_size = WriteBatchInternal::AppendedByteSize( | |
452 | total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); | |
453 | } | |
454 | } | |
455 | if (w.disable_wal) { | |
456 | has_unpersisted_data_.store(true, std::memory_order_relaxed); | |
457 | } | |
458 | write_thread_.UpdateLastSequence(current_sequence + total_count - 1); | |
459 | } | |
7c673cae | 460 | |
11fdf7f2 TL |
461 | auto stats = default_cf_internal_stats_; |
462 | stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); | |
463 | RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); | |
464 | stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); | |
465 | RecordTick(stats_, BYTES_WRITTEN, total_byte_size); | |
494da23a | 466 | RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); |
11fdf7f2 TL |
467 | |
468 | PERF_TIMER_STOP(write_pre_and_post_process_time); | |
469 | ||
494da23a | 470 | if (w.status.ok() && !write_options.disableWAL) { |
11fdf7f2 TL |
471 | PERF_TIMER_GUARD(write_wal_time); |
472 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); | |
473 | RecordTick(stats_, WRITE_DONE_BY_SELF, 1); | |
474 | if (wal_write_group.size > 1) { | |
475 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, | |
476 | wal_write_group.size - 1); | |
477 | RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); | |
478 | } | |
479 | w.status = WriteToWAL(wal_write_group, log_writer, log_used, | |
480 | need_log_sync, need_log_dir_sync, current_sequence); | |
481 | } | |
482 | ||
483 | if (!w.CallbackFailed()) { | |
484 | WriteStatusCheck(w.status); | |
485 | } | |
486 | ||
487 | if (need_log_sync) { | |
488 | mutex_.Lock(); | |
489 | MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); | |
490 | mutex_.Unlock(); | |
491 | } | |
492 | ||
493 | write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); | |
494 | } | |
495 | ||
496 | WriteThread::WriteGroup memtable_write_group; | |
497 | if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { | |
498 | PERF_TIMER_GUARD(write_memtable_time); | |
494da23a | 499 | assert(w.ShouldWriteToMemtable()); |
11fdf7f2 TL |
500 | write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); |
501 | if (memtable_write_group.size > 1 && | |
502 | immutable_db_options_.allow_concurrent_memtable_write) { | |
503 | write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); | |
504 | } else { | |
505 | memtable_write_group.status = WriteBatchInternal::InsertInto( | |
506 | memtable_write_group, w.sequence, column_family_memtables_.get(), | |
507 | &flush_scheduler_, write_options.ignore_missing_column_families, | |
508 | 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, | |
509 | seq_per_batch_, batch_per_txn_); | |
510 | versions_->SetLastSequence(memtable_write_group.last_sequence); | |
511 | write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); | |
512 | } | |
513 | } | |
514 | ||
515 | if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { | |
516 | assert(w.ShouldWriteToMemtable()); | |
517 | ColumnFamilyMemTablesImpl column_family_memtables( | |
518 | versions_->GetColumnFamilySet()); | |
519 | w.status = WriteBatchInternal::InsertInto( | |
520 | &w, w.sequence, &column_family_memtables, &flush_scheduler_, | |
521 | write_options.ignore_missing_column_families, 0 /*log_number*/, this, | |
522 | true /*concurrent_memtable_writes*/); | |
523 | if (write_thread_.CompleteParallelMemTableWriter(&w)) { | |
524 | MemTableInsertStatusCheck(w.status); | |
525 | versions_->SetLastSequence(w.write_group->last_sequence); | |
526 | write_thread_.ExitAsMemTableWriter(&w, *w.write_group); | |
527 | } | |
528 | } | |
529 | if (seq_used != nullptr) { | |
530 | *seq_used = w.sequence; | |
531 | } | |
532 | ||
533 | assert(w.state == WriteThread::STATE_COMPLETED); | |
534 | return w.FinalStatus(); | |
535 | } | |
536 | ||
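PipelinedWriteImpl is only reached when DBOptions::enable_pipelined_write is set, splitting writers into a WAL queue and a memtable queue. A configuration sketch using the public options; the function name is illustrative:

```cpp
#include "rocksdb/options.h"

rocksdb::Options PipelinedWriteOptions() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Route writes through DBImpl::PipelinedWriteImpl: WAL append and memtable
  // insert are handled by separate writer groups.
  options.enable_pipelined_write = true;
  // Allow followers to insert into the memtable in parallel (still checked
  // per batch, e.g. batches containing Merge fall back to serial insert).
  options.allow_concurrent_memtable_write = true;
  return options;
}
```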
537 | // The 2nd write queue. If enabled it will be used only for WAL-only writes. | |
538 | // This is the only queue that updates LastPublishedSequence which is only | |
539 | // applicable in a two-queue setting. | |
540 | Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, | |
541 | WriteBatch* my_batch, WriteCallback* callback, | |
542 | uint64_t* log_used, uint64_t log_ref, | |
543 | uint64_t* seq_used, size_t batch_cnt, | |
544 | PreReleaseCallback* pre_release_callback) { | |
545 | Status status; | |
546 | PERF_TIMER_GUARD(write_pre_and_post_process_time); | |
547 | WriteThread::Writer w(write_options, my_batch, callback, log_ref, | |
548 | true /* disable_memtable */, batch_cnt, | |
549 | pre_release_callback); | |
550 | RecordTick(stats_, WRITE_WITH_WAL); | |
551 | StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); | |
552 | ||
553 | nonmem_write_thread_.JoinBatchGroup(&w); | |
554 | assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); | |
555 | if (w.state == WriteThread::STATE_COMPLETED) { | |
556 | if (log_used != nullptr) { | |
557 | *log_used = w.log_used; | |
558 | } | |
559 | if (seq_used != nullptr) { | |
560 | *seq_used = w.sequence; | |
561 | } | |
562 | return w.FinalStatus(); | |
563 | } | |
564 | // else we are the leader of the write batch group | |
565 | assert(w.state == WriteThread::STATE_GROUP_LEADER); | |
566 | WriteThread::WriteGroup write_group; | |
567 | uint64_t last_sequence; | |
568 | nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group); | |
569 | // Note: no need to update last_batch_group_size_ here since the batch writes | |
570 | // to WAL only | |
571 | ||
572 | size_t total_byte_size = 0; | |
573 | for (auto* writer : write_group) { | |
574 | if (writer->CheckCallback(this)) { | |
575 | total_byte_size = WriteBatchInternal::AppendedByteSize( | |
576 | total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); | |
577 | } | |
578 | } | |
579 | ||
580 | const bool concurrent_update = true; | |
581 | // Update stats while we are an exclusive group leader, so we know | |
582 | // that nobody else can be writing to these particular stats. | |
583 | // We're optimistic, updating the stats before we successfully | |
584 | // commit. That lets us release our leader status early. | |
585 | auto stats = default_cf_internal_stats_; | |
586 | stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, | |
587 | concurrent_update); | |
588 | RecordTick(stats_, BYTES_WRITTEN, total_byte_size); | |
589 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update); | |
590 | RecordTick(stats_, WRITE_DONE_BY_SELF); | |
591 | auto write_done_by_other = write_group.size - 1; | |
592 | if (write_done_by_other > 0) { | |
593 | stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other, | |
594 | concurrent_update); | |
595 | RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); | |
596 | } | |
494da23a | 597 | RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); |
11fdf7f2 TL |
598 | |
599 | PERF_TIMER_STOP(write_pre_and_post_process_time); | |
600 | ||
601 | PERF_TIMER_GUARD(write_wal_time); | |
602 | // LastAllocatedSequence is increased inside WriteToWAL under | |
603 | // wal_write_mutex_ to ensure ordered events in WAL | |
604 | size_t seq_inc = 0 /* total_count */; | |
605 | if (seq_per_batch_) { | |
606 | size_t total_batch_cnt = 0; | |
607 | for (auto* writer : write_group) { | |
608 | assert(writer->batch_cnt); | |
609 | total_batch_cnt += writer->batch_cnt; | |
610 | } | |
611 | seq_inc = total_batch_cnt; | |
612 | } | |
613 | if (!write_options.disableWAL) { | |
614 | status = | |
615 | ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); | |
616 | } else { | |
617 | // Otherwise we inc the seq number solely to do the seq allocation | |
618 | last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); | |
619 | } | |
620 | auto curr_seq = last_sequence + 1; | |
621 | for (auto* writer : write_group) { | |
622 | if (writer->CallbackFailed()) { | |
623 | continue; | |
624 | } | |
625 | writer->sequence = curr_seq; | |
626 | if (seq_per_batch_) { | |
627 | assert(writer->batch_cnt); | |
628 | curr_seq += writer->batch_cnt; | |
629 | } | |
630 | // else seq advances only by memtable writes | |
631 | } | |
632 | if (status.ok() && write_options.sync) { | |
633 | assert(!write_options.disableWAL); | |
634 | // Requesting sync with two_write_queues_ is expected to be very rare. We | |
635 | // hence provide a simple implementation that is not necessarily efficient. | |
636 | if (manual_wal_flush_) { | |
637 | status = FlushWAL(true); | |
638 | } else { | |
639 | status = SyncWAL(); | |
640 | } | |
641 | } | |
642 | PERF_TIMER_START(write_pre_and_post_process_time); | |
643 | ||
644 | if (!w.CallbackFailed()) { | |
645 | WriteStatusCheck(status); | |
646 | } | |
647 | if (status.ok()) { | |
648 | for (auto* writer : write_group) { | |
649 | if (!writer->CallbackFailed() && writer->pre_release_callback) { | |
650 | assert(writer->sequence != kMaxSequenceNumber); | |
651 | const bool DISABLE_MEMTABLE = true; | |
494da23a TL |
652 | Status ws = writer->pre_release_callback->Callback( |
653 | writer->sequence, DISABLE_MEMTABLE, writer->log_used); | |
11fdf7f2 TL |
654 | if (!ws.ok()) { |
655 | status = ws; | |
656 | break; | |
657 | } | |
658 | } | |
659 | } | |
660 | } | |
661 | nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); | |
662 | if (status.ok()) { | |
663 | status = w.FinalStatus(); | |
664 | } | |
665 | if (seq_used != nullptr) { | |
666 | *seq_used = w.sequence; | |
667 | } | |
7c673cae FG |
668 | return status; |
669 | } | |
670 | ||
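WriteImplWALOnly serves the second queue that exists only when DBOptions::two_write_queues is set (used, for example, for WAL-only prepare records). The manual WAL flush path referenced in the sync branches can be sketched with the public API; the function names are illustrative:

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Options TwoQueueOptions() {
  rocksdb::Options options;
  // Enables the second, WAL-only write queue handled by WriteImplWALOnly.
  options.two_write_queues = true;
  // Buffer WAL writes until DB::FlushWAL is called; the sync branches above
  // then use FlushWAL(true) instead of SyncWAL().
  options.manual_wal_flush = true;
  return options;
}

rocksdb::Status PersistWal(rocksdb::DB* db) {
  return db->FlushWAL(true /* sync */);
}
```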
11fdf7f2 TL |
671 | void DBImpl::WriteStatusCheck(const Status& status) { |
672 | // Is setting bg_error_ enough here? This will at least stop | |
673 | // compaction and fail any further writes. | |
674 | if (immutable_db_options_.paranoid_checks && !status.ok() && | |
675 | !status.IsBusy() && !status.IsIncomplete()) { | |
676 | mutex_.Lock(); | |
677 | error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); | |
678 | mutex_.Unlock(); | |
679 | } | |
680 | } | |
681 | ||
682 | void DBImpl::MemTableInsertStatusCheck(const Status& status) { | |
7c673cae FG |
683 | // A non-OK status here indicates that the state implied by the |
684 | // WAL has diverged from the in-memory state. This could be | |
685 | // because of a corrupt write_batch (very bad), or because the | |
686 | // client specified an invalid column family and didn't specify | |
687 | // ignore_missing_column_families. | |
11fdf7f2 | 688 | if (!status.ok()) { |
7c673cae | 689 | mutex_.Lock(); |
11fdf7f2 TL |
690 | assert(!error_handler_.IsBGWorkStopped()); |
691 | error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable); | |
7c673cae FG |
692 | mutex_.Unlock(); |
693 | } | |
694 | } | |
695 | ||
696 | Status DBImpl::PreprocessWrite(const WriteOptions& write_options, | |
11fdf7f2 | 697 | bool* need_log_sync, |
7c673cae FG |
698 | WriteContext* write_context) { |
699 | mutex_.AssertHeld(); | |
11fdf7f2 | 700 | assert(write_context != nullptr && need_log_sync != nullptr); |
7c673cae FG |
701 | Status status; |
702 | ||
11fdf7f2 TL |
703 | if (error_handler_.IsDBStopped()) { |
704 | status = error_handler_.GetBGError(); | |
705 | } | |
706 | ||
707 | PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); | |
708 | ||
7c673cae FG |
709 | assert(!single_column_family_mode_ || |
710 | versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); | |
711 | if (UNLIKELY(status.ok() && !single_column_family_mode_ && | |
712 | total_log_size_ > GetMaxTotalWalSize())) { | |
11fdf7f2 | 713 | status = SwitchWAL(write_context); |
7c673cae FG |
714 | } |
715 | ||
716 | if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { | |
717 | // Before a new memtable is added in SwitchMemtable(), | |
718 | // write_buffer_manager_->ShouldFlush() will keep returning true. If another | |
719 | // thread is writing to another DB with the same write buffer, they may also | |
720 | // be flushed. We may end up flushing many more DBs than needed. It's | |
721 | // suboptimal but still correct. | |
722 | status = HandleWriteBufferFull(write_context); | |
723 | } | |
724 | ||
7c673cae FG |
725 | if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { |
726 | status = ScheduleFlushes(write_context); | |
727 | } | |
728 | ||
11fdf7f2 TL |
729 | PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); |
730 | PERF_TIMER_GUARD(write_pre_and_post_process_time); | |
731 | ||
7c673cae FG |
732 | if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || |
733 | write_controller_.NeedsDelay()))) { | |
11fdf7f2 | 734 | PERF_TIMER_STOP(write_pre_and_post_process_time); |
7c673cae FG |
735 | PERF_TIMER_GUARD(write_delay_time); |
736 | // We don't know the size of the current batch, so we always use the size | |
737 | // of the previous one. It might create a fairness issue in that expiration | |
738 | // might happen for smaller writes but larger writes can go through. | |
739 | // Can optimize it if it is an issue. | |
740 | status = DelayWrite(last_batch_group_size_, write_options); | |
11fdf7f2 | 741 | PERF_TIMER_START(write_pre_and_post_process_time); |
7c673cae FG |
742 | } |
743 | ||
11fdf7f2 | 744 | if (status.ok() && *need_log_sync) { |
7c673cae FG |
745 | // Wait until the parallel syncs are finished. Any sync process has to sync |
746 | // the front log too so it is enough to check the status of front() | |
747 | // We do a while loop since log_sync_cv_ is signalled when any sync is | |
748 | // finished | |
749 | // Note: there does not seem to be a reason to wait for parallel sync at | |
750 | // this early step but it is not important since parallel sync (SyncWAL) and | |
751 | // need_log_sync are usually not used together. | |
752 | while (logs_.front().getting_synced) { | |
753 | log_sync_cv_.Wait(); | |
754 | } | |
755 | for (auto& log : logs_) { | |
756 | assert(!log.getting_synced); | |
757 | // This is just to prevent the logs to be synced by a parallel SyncWAL | |
758 | // call. We will do the actual syncing later after we will write to the | |
759 | // WAL. | |
760 | // Note: there does not seem to be a reason to set this early before we | |
761 | // actually write to the WAL | |
762 | log.getting_synced = true; | |
763 | } | |
11fdf7f2 TL |
764 | } else { |
765 | *need_log_sync = false; | |
7c673cae FG |
766 | } |
767 | ||
768 | return status; | |
769 | } | |
770 | ||
11fdf7f2 TL |
771 | WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, |
772 | WriteBatch* tmp_batch, size_t* write_with_wal, | |
773 | WriteBatch** to_be_cached_state) { | |
774 | assert(write_with_wal != nullptr); | |
775 | assert(tmp_batch != nullptr); | |
776 | assert(*to_be_cached_state == nullptr); | |
7c673cae | 777 | WriteBatch* merged_batch = nullptr; |
11fdf7f2 TL |
778 | *write_with_wal = 0; |
779 | auto* leader = write_group.leader; | |
780 | assert(!leader->disable_wal); // Same holds for all in the batch group | |
781 | if (write_group.size == 1 && !leader->CallbackFailed() && | |
782 | leader->batch->GetWalTerminationPoint().is_cleared()) { | |
7c673cae FG |
783 | // we simply write the first WriteBatch to WAL if the group only |
784 | // contains one batch, that batch should be written to the WAL, | |
785 | // and the batch does not need to be truncated
11fdf7f2 TL |
786 | merged_batch = leader->batch; |
787 | if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) { | |
788 | *to_be_cached_state = merged_batch; | |
789 | } | |
790 | *write_with_wal = 1; | |
7c673cae FG |
791 | } else { |
792 | // WAL needs all of the batches flattened into a single batch. | |
793 | // We could avoid copying here with an iov-like AddRecord | |
794 | // interface | |
11fdf7f2 | 795 | merged_batch = tmp_batch; |
7c673cae | 796 | for (auto writer : write_group) { |
11fdf7f2 | 797 | if (!writer->CallbackFailed()) { |
7c673cae FG |
798 | WriteBatchInternal::Append(merged_batch, writer->batch, |
799 | /*WAL_only*/ true); | |
11fdf7f2 TL |
800 | if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { |
801 | // We only need to cache the last of such write batch | |
802 | *to_be_cached_state = writer->batch; | |
803 | } | |
804 | (*write_with_wal)++; | |
7c673cae | 805 | } |
7c673cae FG |
806 | } |
807 | } | |
11fdf7f2 TL |
808 | return merged_batch; |
809 | } | |
7c673cae | 810 | |
11fdf7f2 TL |
811 | // When two_write_queues_ is disabled, this function is called from the only |
812 | // write thread. Otherwise this must be called holding log_write_mutex_. | |
813 | Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, | |
814 | log::Writer* log_writer, uint64_t* log_used, | |
815 | uint64_t* log_size) { | |
816 | assert(log_size != nullptr); | |
817 | Slice log_entry = WriteBatchInternal::Contents(&merged_batch); | |
818 | *log_size = log_entry.size(); | |
819 | // When two_write_queues_ is enabled, WriteToWAL has to be protected from | |
820 | // concurrent calls from the two queues anyway and log_write_mutex_ is already held. Otherwise | |
821 | // if manual_wal_flush_ is enabled we need to protect log_writer->AddRecord | |
822 | // from possible concurrent calls via the FlushWAL by the application. | |
823 | const bool needs_locking = manual_wal_flush_ && !two_write_queues_; | |
824 | // Due to performance concerns about missed branch prediction, penalize the new | |
825 | // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case | |
826 | // when we do not need any locking. | |
827 | if (UNLIKELY(needs_locking)) { | |
828 | log_write_mutex_.Lock(); | |
829 | } | |
830 | Status status = log_writer->AddRecord(log_entry); | |
831 | if (UNLIKELY(needs_locking)) { | |
832 | log_write_mutex_.Unlock(); | |
833 | } | |
834 | if (log_used != nullptr) { | |
835 | *log_used = logfile_number_; | |
836 | } | |
7c673cae | 837 | total_log_size_ += log_entry.size(); |
11fdf7f2 TL |
838 | // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here |
839 | // since alive_log_files_ might be modified concurrently | |
7c673cae FG |
840 | alive_log_files_.back().AddSize(log_entry.size()); |
841 | log_empty_ = false; | |
11fdf7f2 TL |
842 | return status; |
843 | } | |
844 | ||
845 | Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, | |
846 | log::Writer* log_writer, uint64_t* log_used, | |
847 | bool need_log_sync, bool need_log_dir_sync, | |
848 | SequenceNumber sequence) { | |
849 | Status status; | |
850 | ||
851 | assert(!write_group.leader->disable_wal); | |
852 | // Same holds for all in the batch group | |
853 | size_t write_with_wal = 0; | |
854 | WriteBatch* to_be_cached_state = nullptr; | |
855 | WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, | |
856 | &write_with_wal, &to_be_cached_state); | |
857 | if (merged_batch == write_group.leader->batch) { | |
858 | write_group.leader->log_used = logfile_number_; | |
859 | } else if (write_with_wal > 1) { | |
860 | for (auto writer : write_group) { | |
861 | writer->log_used = logfile_number_; | |
862 | } | |
863 | } | |
864 | ||
865 | WriteBatchInternal::SetSequence(merged_batch, sequence); | |
866 | ||
867 | uint64_t log_size; | |
868 | status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); | |
869 | if (to_be_cached_state) { | |
870 | cached_recoverable_state_ = *to_be_cached_state; | |
494da23a | 871 | cached_recoverable_state_empty_ = false; |
11fdf7f2 | 872 | } |
7c673cae FG |
873 | |
874 | if (status.ok() && need_log_sync) { | |
875 | StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); | |
876 | // It's safe to access logs_ with unlocked mutex_ here because: | |
877 | // - we've set getting_synced=true for all logs, | |
878 | // so other threads won't pop from logs_ while we're here, | |
879 | // - only writer thread can push to logs_, and we're in | |
880 | // writer thread, so no one will push to logs_, | |
881 | // - as long as other threads don't modify it, it's safe to read | |
882 | // from std::deque from multiple threads concurrently. | |
883 | for (auto& log : logs_) { | |
884 | status = log.writer->file()->Sync(immutable_db_options_.use_fsync); | |
885 | if (!status.ok()) { | |
886 | break; | |
887 | } | |
888 | } | |
889 | if (status.ok() && need_log_dir_sync) { | |
890 | // We only sync WAL directory the first time WAL syncing is | |
891 | // requested, so that in case users never turn on WAL sync, | |
892 | // we can avoid the disk I/O in the write code path. | |
893 | status = directories_.GetWalDir()->Fsync(); | |
894 | } | |
895 | } | |
896 | ||
897 | if (merged_batch == &tmp_batch_) { | |
898 | tmp_batch_.Clear(); | |
899 | } | |
900 | if (status.ok()) { | |
901 | auto stats = default_cf_internal_stats_; | |
902 | if (need_log_sync) { | |
903 | stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1); | |
904 | RecordTick(stats_, WAL_FILE_SYNCED); | |
905 | } | |
906 | stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); | |
907 | RecordTick(stats_, WAL_FILE_BYTES, log_size); | |
908 | stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal); | |
909 | RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); | |
910 | } | |
911 | return status; | |
912 | } | |
913 | ||
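WriteToWAL only fsyncs the log files when need_log_sync was set by WriteOptions::sync. An alternative pattern is to write unsynced and sync the WAL explicitly later; a sketch using the public API, with an illustrative function name and key:

```cpp
#include "rocksdb/db.h"

rocksdb::Status WriteThenSync(rocksdb::DB* db) {
  rocksdb::WriteOptions wo;
  wo.sync = false;  // the WAL record is appended but not fsynced here
  rocksdb::Status s = db->Put(wo, "k", "v");
  if (!s.ok()) {
    return s;
  }
  // Force all previously unsynced WAL data to stable storage.
  return db->SyncWAL();
}
```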
11fdf7f2 TL |
914 | Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, |
915 | uint64_t* log_used, | |
916 | SequenceNumber* last_sequence, | |
917 | size_t seq_inc) { | |
918 | Status status; | |
919 | ||
920 | assert(!write_group.leader->disable_wal); | |
921 | // Same holds for all in the batch group | |
922 | WriteBatch tmp_batch; | |
923 | size_t write_with_wal = 0; | |
924 | WriteBatch* to_be_cached_state = nullptr; | |
925 | WriteBatch* merged_batch = | |
926 | MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state); | |
927 | ||
928 | // We need to lock log_write_mutex_ since logs_ and alive_log_files might be | |
929 | // pushed back concurrently | |
930 | log_write_mutex_.Lock(); | |
931 | if (merged_batch == write_group.leader->batch) { | |
932 | write_group.leader->log_used = logfile_number_; | |
933 | } else if (write_with_wal > 1) { | |
934 | for (auto writer : write_group) { | |
935 | writer->log_used = logfile_number_; | |
936 | } | |
937 | } | |
938 | *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); | |
939 | auto sequence = *last_sequence + 1; | |
940 | WriteBatchInternal::SetSequence(merged_batch, sequence); | |
941 | ||
942 | log::Writer* log_writer = logs_.back().writer; | |
943 | uint64_t log_size; | |
944 | status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); | |
945 | if (to_be_cached_state) { | |
946 | cached_recoverable_state_ = *to_be_cached_state; | |
494da23a | 947 | cached_recoverable_state_empty_ = false; |
11fdf7f2 TL |
948 | } |
949 | log_write_mutex_.Unlock(); | |
950 | ||
951 | if (status.ok()) { | |
952 | const bool concurrent = true; | |
953 | auto stats = default_cf_internal_stats_; | |
954 | stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent); | |
955 | RecordTick(stats_, WAL_FILE_BYTES, log_size); | |
956 | stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal, | |
957 | concurrent); | |
958 | RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); | |
959 | } | |
960 | return status; | |
961 | } | |
962 | ||
963 | Status DBImpl::WriteRecoverableState() { | |
964 | mutex_.AssertHeld(); | |
965 | if (!cached_recoverable_state_empty_) { | |
966 | bool dont_care_bool; | |
967 | SequenceNumber next_seq; | |
968 | if (two_write_queues_) { | |
969 | log_write_mutex_.Lock(); | |
970 | } | |
971 | SequenceNumber seq; | |
972 | if (two_write_queues_) { | |
973 | seq = versions_->FetchAddLastAllocatedSequence(0); | |
974 | } else { | |
975 | seq = versions_->LastSequence(); | |
976 | } | |
977 | WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1); | |
978 | auto status = WriteBatchInternal::InsertInto( | |
979 | &cached_recoverable_state_, column_family_memtables_.get(), | |
980 | &flush_scheduler_, true, 0 /*recovery_log_number*/, this, | |
981 | false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, | |
982 | seq_per_batch_); | |
983 | auto last_seq = next_seq - 1; | |
984 | if (two_write_queues_) { | |
985 | versions_->FetchAddLastAllocatedSequence(last_seq - seq); | |
986 | versions_->SetLastPublishedSequence(last_seq); | |
987 | } | |
988 | versions_->SetLastSequence(last_seq); | |
989 | if (two_write_queues_) { | |
990 | log_write_mutex_.Unlock(); | |
991 | } | |
992 | if (status.ok() && recoverable_state_pre_release_callback_) { | |
993 | const bool DISABLE_MEMTABLE = true; | |
994 | for (uint64_t sub_batch_seq = seq + 1; | |
995 | sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { | |
494da23a | 996 | uint64_t const no_log_num = 0; |
11fdf7f2 | 997 | status = recoverable_state_pre_release_callback_->Callback( |
494da23a | 998 | sub_batch_seq, !DISABLE_MEMTABLE, no_log_num); |
11fdf7f2 TL |
999 | } |
1000 | } | |
1001 | if (status.ok()) { | |
1002 | cached_recoverable_state_.Clear(); | |
1003 | cached_recoverable_state_empty_ = true; | |
1004 | } | |
1005 | return status; | |
1006 | } | |
1007 | return Status::OK(); | |
1008 | } | |
1009 | ||
494da23a TL |
1010 | void DBImpl::SelectColumnFamiliesForAtomicFlush( |
1011 | autovector<ColumnFamilyData*>* cfds) { | |
1012 | for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) { | |
1013 | if (cfd->IsDropped()) { | |
1014 | continue; | |
1015 | } | |
1016 | if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || | |
1017 | !cached_recoverable_state_empty_.load()) { | |
1018 | cfds->push_back(cfd); | |
1019 | } | |
1020 | } | |
1021 | } | |
1022 | ||
1023 | // Assign sequence number for atomic flush. | |
1024 | void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) { | |
1025 | assert(immutable_db_options_.atomic_flush); | |
1026 | auto seq = versions_->LastSequence(); | |
1027 | for (auto cfd : cfds) { | |
1028 | cfd->imm()->AssignAtomicFlushSeq(seq); | |
1029 | } | |
1030 | } | |
1031 | ||
11fdf7f2 | 1032 | Status DBImpl::SwitchWAL(WriteContext* write_context) { |
7c673cae FG |
1033 | mutex_.AssertHeld(); |
1034 | assert(write_context != nullptr); | |
1035 | Status status; | |
1036 | ||
1037 | if (alive_log_files_.begin()->getting_flushed) { | |
1038 | return status; | |
1039 | } | |
1040 | ||
1041 | auto oldest_alive_log = alive_log_files_.begin()->number; | |
11fdf7f2 TL |
1042 | bool flush_wont_release_oldest_log = false; |
1043 | if (allow_2pc()) { | |
494da23a | 1044 | auto oldest_log_with_uncommitted_prep = |
11fdf7f2 TL |
1045 | logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); |
1046 | ||
494da23a TL |
1047 | assert(oldest_log_with_uncommitted_prep == 0 || |
1048 | oldest_log_with_uncommitted_prep >= oldest_alive_log); | |
1049 | if (oldest_log_with_uncommitted_prep > 0 && | |
1050 | oldest_log_with_uncommitted_prep == oldest_alive_log) { | |
11fdf7f2 | 1051 | if (unable_to_release_oldest_log_) { |
7c673cae | 1052 | // we already attempted to flush all column families dependent on |
494da23a | 1053 | // the oldest alive log but the log still contained uncommitted |
11fdf7f2 | 1054 | // transactions so there is still nothing that we can do. |
7c673cae | 1055 | return status; |
11fdf7f2 TL |
1056 | } else { |
1057 | ROCKS_LOG_WARN( | |
1058 | immutable_db_options_.info_log, | |
494da23a | 1059 | "Unable to release oldest log due to uncommitted transaction"); |
11fdf7f2 TL |
1060 | unable_to_release_oldest_log_ = true; |
1061 | flush_wont_release_oldest_log = true; | |
1062 | } | |
7c673cae | 1063 | } |
11fdf7f2 TL |
1064 | } |
1065 | if (!flush_wont_release_oldest_log) { | |
7c673cae FG |
1066 | // we only mark this log as getting flushed if we have successfully |
1067 | // flushed all data in this log. If this log contains outstanding prepared | |
494da23a TL |
1068 | // transactions then we cannot flush this log until those transactions are |
1069 | // committed. |
11fdf7f2 | 1070 | unable_to_release_oldest_log_ = false; |
7c673cae FG |
1071 | alive_log_files_.begin()->getting_flushed = true; |
1072 | } | |
1073 | ||
494da23a TL |
1074 | ROCKS_LOG_INFO( |
1075 | immutable_db_options_.info_log, | |
1076 | "Flushing all column families with data in WAL number %" PRIu64 | |
1077 | ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, | |
1078 | oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); | |
7c673cae FG |
1079 | // no need to refcount because drop is happening in write thread, so can't |
1080 | // happen while we're in the write thread | |
494da23a TL |
1081 | autovector<ColumnFamilyData*> cfds; |
1082 | if (immutable_db_options_.atomic_flush) { | |
1083 | SelectColumnFamiliesForAtomicFlush(&cfds); | |
1084 | } else { | |
1085 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
1086 | if (cfd->IsDropped()) { | |
1087 | continue; | |
1088 | } | |
1089 | if (cfd->OldestLogToKeep() <= oldest_alive_log) { | |
1090 | cfds.push_back(cfd); | |
7c673cae | 1091 | } |
494da23a TL |
1092 | } |
1093 | } | |
1094 | for (const auto cfd : cfds) { | |
1095 | cfd->Ref(); | |
1096 | status = SwitchMemtable(cfd, write_context); | |
1097 | cfd->Unref(); | |
1098 | if (!status.ok()) { | |
1099 | break; | |
7c673cae FG |
1100 | } |
1101 | } | |
11fdf7f2 | 1102 | if (status.ok()) { |
494da23a TL |
1103 | if (immutable_db_options_.atomic_flush) { |
1104 | AssignAtomicFlushSeq(cfds); | |
1105 | } | |
1106 | for (auto cfd : cfds) { | |
1107 | cfd->imm()->FlushRequested(); | |
1108 | } | |
1109 | FlushRequest flush_req; | |
1110 | GenerateFlushRequest(cfds, &flush_req); | |
11fdf7f2 TL |
1111 | SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); |
1112 | MaybeScheduleFlushOrCompaction(); | |
1113 | } | |
7c673cae FG |
1114 | return status; |
1115 | } | |
1116 | ||
1117 | Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { | |
1118 | mutex_.AssertHeld(); | |
1119 | assert(write_context != nullptr); | |
1120 | Status status; | |
1121 | ||
1122 | // Before a new memtable is added in SwitchMemtable(), | |
1123 | // write_buffer_manager_->ShouldFlush() will keep returning true. If another | |
1124 | // thread is writing to another DB with the same write buffer, they may also | |
1125 | // be flushed. We may end up flushing many more DBs than needed. It's | |
1126 | // suboptimal but still correct. | |
1127 | ROCKS_LOG_INFO( | |
1128 | immutable_db_options_.info_log, | |
1129 | "Flushing column family with largest mem table size. Write buffer is " | |
494da23a | 1130 | "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".", |
7c673cae FG |
1131 | write_buffer_manager_->memory_usage(), |
1132 | write_buffer_manager_->buffer_size()); | |
1133 | // no need to refcount because drop is happening in write thread, so can't | |
1134 | // happen while we're in the write thread | |
494da23a TL |
1135 | autovector<ColumnFamilyData*> cfds; |
1136 | if (immutable_db_options_.atomic_flush) { | |
1137 | SelectColumnFamiliesForAtomicFlush(&cfds); | |
1138 | } else { | |
1139 | ColumnFamilyData* cfd_picked = nullptr; | |
1140 | SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; | |
7c673cae | 1141 | |
494da23a TL |
1142 | for (auto cfd : *versions_->GetColumnFamilySet()) { |
1143 | if (cfd->IsDropped()) { | |
1144 | continue; | |
1145 | } | |
1146 | if (!cfd->mem()->IsEmpty()) { | |
1147 | // We only consider active mem table, hoping immutable memtable is | |
1148 | // already in the process of flushing. | |
1149 | uint64_t seq = cfd->mem()->GetCreationSeq(); | |
1150 | if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { | |
1151 | cfd_picked = cfd; | |
1152 | seq_num_for_cf_picked = seq; | |
1153 | } | |
7c673cae FG |
1154 | } |
1155 | } | |
494da23a TL |
1156 | if (cfd_picked != nullptr) { |
1157 | cfds.push_back(cfd_picked); | |
1158 | } | |
7c673cae | 1159 | } |
11fdf7f2 | 1160 | |
11fdf7f2 | 1161 | for (const auto cfd : cfds) { |
494da23a TL |
1162 | if (cfd->mem()->IsEmpty()) { |
1163 | continue; | |
1164 | } | |
11fdf7f2 TL |
1165 | cfd->Ref(); |
1166 | status = SwitchMemtable(cfd, write_context); | |
1167 | cfd->Unref(); | |
1168 | if (!status.ok()) { | |
1169 | break; | |
7c673cae | 1170 | } |
11fdf7f2 TL |
1171 | } |
1172 | if (status.ok()) { | |
494da23a TL |
1173 | if (immutable_db_options_.atomic_flush) { |
1174 | AssignAtomicFlushSeq(cfds); | |
1175 | } | |
1176 | for (const auto cfd : cfds) { | |
1177 | cfd->imm()->FlushRequested(); | |
1178 | } | |
1179 | FlushRequest flush_req; | |
1180 | GenerateFlushRequest(cfds, &flush_req); | |
11fdf7f2 TL |
1181 | SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); |
1182 | MaybeScheduleFlushOrCompaction(); | |
7c673cae FG |
1183 | } |
1184 | return status; | |
1185 | } | |
1186 | ||
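HandleWriteBufferFull triggers when the shared write buffer manager reports ShouldFlush(), which, as the comment notes, can be caused by other DBs sharing the same manager. A sketch of sharing one WriteBufferManager across two DBs; the sizes, paths, and function name are illustrative:

```cpp
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/write_buffer_manager.h"

void OpenWithSharedWriteBuffer(rocksdb::DB** db1, rocksdb::DB** db2) {
  // One memtable memory budget for both DBs; exceeding it makes
  // ShouldFlush() return true and drives HandleWriteBufferFull above.
  auto wbm = std::make_shared<rocksdb::WriteBufferManager>(512 * 1024 * 1024);
  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_manager = wbm;
  rocksdb::DB::Open(options, "/tmp/shared_db1", db1);
  rocksdb::DB::Open(options, "/tmp/shared_db2", db2);
}
```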
1187 | uint64_t DBImpl::GetMaxTotalWalSize() const { | |
1188 | mutex_.AssertHeld(); | |
1189 | return mutable_db_options_.max_total_wal_size == 0 | |
1190 | ? 4 * max_total_in_memory_state_ | |
1191 | : mutable_db_options_.max_total_wal_size; | |
1192 | } | |
1193 | ||
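GetMaxTotalWalSize shows the default policy: with max_total_wal_size left at 0, the limit is four times the total memtable budget. A sketch of setting it explicitly; the value and function name are illustrative:

```cpp
#include "rocksdb/options.h"

rocksdb::Options WalSizeBudget() {
  rocksdb::Options options;
  // Once live WAL files exceed this many bytes, SwitchWAL above flushes the
  // column families that still reference the oldest WAL so it can be freed.
  options.max_total_wal_size = 1ull << 30;  // 1 GiB
  return options;
}
```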
1194 | // REQUIRES: mutex_ is held | |
1195 | // REQUIRES: this thread is currently at the front of the writer queue | |
1196 | Status DBImpl::DelayWrite(uint64_t num_bytes, | |
1197 | const WriteOptions& write_options) { | |
1198 | uint64_t time_delayed = 0; | |
1199 | bool delayed = false; | |
1200 | { | |
1201 | StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed); | |
1202 | uint64_t delay = write_controller_.GetDelay(env_, num_bytes); | |
1203 | if (delay > 0) { | |
1204 | if (write_options.no_slowdown) { | |
11fdf7f2 | 1205 | return Status::Incomplete("Write stall"); |
7c673cae FG |
1206 | } |
1207 | TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); | |
1208 | ||
11fdf7f2 TL |
1209 | // Notify write_thread_ about the stall so it can setup a barrier and |
1210 | // fail any pending writers with no_slowdown | |
1211 | write_thread_.BeginWriteStall(); | |
1212 | TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); | |
7c673cae FG |
1213 | mutex_.Unlock(); |
1214 | // We will delay the write until we have slept for `delay` microseconds or |
1215 | // we don't need a delay anymore | |
1216 | const uint64_t kDelayInterval = 1000; | |
1217 | uint64_t stall_end = sw.start_time() + delay; | |
1218 | while (write_controller_.NeedsDelay()) { | |
1219 | if (env_->NowMicros() >= stall_end) { | |
1220 | // We already delayed this write `delay` microseconds | |
1221 | break; | |
1222 | } | |
1223 | ||
1224 | delayed = true; | |
1225 | // Sleep for 0.001 seconds | |
1226 | env_->SleepForMicroseconds(kDelayInterval); | |
1227 | } | |
1228 | mutex_.Lock(); | |
11fdf7f2 | 1229 | write_thread_.EndWriteStall(); |
7c673cae FG |
1230 | } |
1231 | ||
11fdf7f2 TL |
1232 | // Don't wait if there's a background error, even if it's a soft error. We |
1233 | // might wait here indefinitely as the background compaction may never | |
1234 | // finish successfully, resulting in the stall condition lasting | |
1235 | // indefinitely | |
1236 | while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) { | |
7c673cae | 1237 | if (write_options.no_slowdown) { |
11fdf7f2 | 1238 | return Status::Incomplete("Write stall"); |
7c673cae FG |
1239 | } |
1240 | delayed = true; | |
11fdf7f2 TL |
1241 | |
1242 | // Notify write_thread_ about the stall so it can setup a barrier and | |
1243 | // fail any pending writers with no_slowdown | |
1244 | write_thread_.BeginWriteStall(); | |
7c673cae FG |
1245 | TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); |
1246 | bg_cv_.Wait(); | |
11fdf7f2 | 1247 | write_thread_.EndWriteStall(); |
7c673cae FG |
1248 | } |
1249 | } | |
1250 | assert(!delayed || !write_options.no_slowdown); | |
1251 | if (delayed) { | |
1252 | default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS, | |
1253 | time_delayed); | |
1254 | RecordTick(stats_, STALL_MICROS, time_delayed); | |
1255 | } | |
1256 | ||
11fdf7f2 TL |
1257 | // If DB is not in read-only mode and write_controller is not stopping |
1258 | // writes, we can ignore any background errors and allow the write to | |
1259 | // proceed | |
1260 | Status s; | |
1261 | if (write_controller_.IsStopped()) { | |
1262 | // If writes are still stopped, it means we bailed due to a background | |
1263 | // error | |
1264 | s = Status::Incomplete(error_handler_.GetBGError().ToString()); | |
1265 | } | |
1266 | if (error_handler_.IsDBStopped()) { | |
1267 | s = error_handler_.GetBGError(); | |
1268 | } | |
1269 | return s; | |
1270 | } | |
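// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// DelayWrite() honors WriteOptions::no_slowdown, so a latency-sensitive
// caller can fail fast instead of sleeping or blocking on the stall.
// Assumes an already-open `DB* db`:
//
//   WriteOptions wo;
//   wo.no_slowdown = true;  // return Status::Incomplete("Write stall")
//                           // instead of waiting out the stall
//   Status st = db->Put(wo, db->DefaultColumnFamily(), "key", "value");
//   if (st.IsIncomplete()) {
//     // A write stall is active; back off and retry, or fall back to a
//     // blocking write with no_slowdown == false.
//   }
// ---------------------------------------------------------------------------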
1271 | ||
1272 | Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, | |
1273 | WriteBatch* my_batch) { | |
1274 | assert(write_options.low_pri); | |
1275 | // This is called outside the DB mutex. Although it is safe to make the call, | |
1276 | // the consistency condition is not guaranteed to hold. That is acceptable | |
1277 | // in this case. | |
1278 | // If we need to speed up compaction, it means compaction is falling behind, | |
1279 | // so we start rate-limiting low-pri writes. | |
1280 | if (write_controller_.NeedSpeedupCompaction()) { | |
1281 | if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) { | |
1282 | // For 2PC, we only rate limit prepare, not commit. | |
1283 | return Status::OK(); | |
1284 | } | |
1285 | if (write_options.no_slowdown) { | |
1286 | return Status::Incomplete(); | |
1287 | } else { | |
1288 | assert(my_batch != nullptr); | |
1289 | // Rate limit these writes. The reason we don't block completely is | |
1290 | // that under a heavy write load, low-pri writes might never get a | |
1291 | // chance to run. Rate limiting guarantees that they still make slow | |
1292 | // progress. | |
1293 | PERF_TIMER_GUARD(write_delay_time); | |
1294 | write_controller_.low_pri_rate_limiter()->Request( | |
1295 | my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */, | |
1296 | RateLimiter::OpType::kWrite); | |
1297 | } | |
1298 | } | |
1299 | return Status::OK(); | |
7c673cae FG |
1300 | } |
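// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// opting a background/bulk write into the low-priority rate limiting handled
// above. Assumes an already-open `DB* db`:
//
//   WriteOptions wo;
//   wo.low_pri = true;  // may be throttled when compaction falls behind
//   Status st = db->Put(wo, db->DefaultColumnFamily(), "bulk_key", "bulk_val");
//
// Combining low_pri with no_slowdown makes such writes fail with
// Status::Incomplete() instead of being rate limited, per the code above.
// ---------------------------------------------------------------------------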
1301 | ||
1302 | Status DBImpl::ScheduleFlushes(WriteContext* context) { | |
494da23a TL |
1303 | autovector<ColumnFamilyData*> cfds; |
1304 | if (immutable_db_options_.atomic_flush) { | |
1305 | SelectColumnFamiliesForAtomicFlush(&cfds); | |
1306 | for (auto cfd : cfds) { | |
1307 | cfd->Ref(); | |
1308 | } | |
1309 | flush_scheduler_.Clear(); | |
1310 | } else { | |
1311 | ColumnFamilyData* tmp_cfd; | |
1312 | while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { | |
1313 | cfds.push_back(tmp_cfd); | |
1314 | } | |
1315 | } | |
11fdf7f2 | 1316 | Status status; |
494da23a TL |
1317 | for (auto& cfd : cfds) { |
1318 | if (!cfd->mem()->IsEmpty()) { | |
1319 | status = SwitchMemtable(cfd, context); | |
1320 | } | |
7c673cae FG |
1321 | if (cfd->Unref()) { |
1322 | delete cfd; | |
494da23a | 1323 | cfd = nullptr; |
7c673cae FG |
1324 | } |
1325 | if (!status.ok()) { | |
11fdf7f2 TL |
1326 | break; |
1327 | } | |
7c673cae | 1328 | } |
11fdf7f2 | 1329 | if (status.ok()) { |
494da23a TL |
1330 | if (immutable_db_options_.atomic_flush) { |
1331 | AssignAtomicFlushSeq(cfds); | |
1332 | } | |
1333 | FlushRequest flush_req; | |
1334 | GenerateFlushRequest(cfds, &flush_req); | |
11fdf7f2 TL |
1335 | SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); |
1336 | MaybeScheduleFlushOrCompaction(); | |
1337 | } | |
1338 | return status; | |
7c673cae FG |
1339 | } |
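// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// the atomic-flush path above is taken when the DB is opened with
// DBOptions::atomic_flush, in which case column families are selected and
// flushed together. Example configuration, with a hypothetical path:
//
//   Options options;
//   options.atomic_flush = true;  // flush memtables of all CFs atomically
//   options.create_if_missing = true;
//   DB* db = nullptr;
//   Status st = DB::Open(options, "/tmp/atomic_flush_db", &db);
// ---------------------------------------------------------------------------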
1340 | ||
1341 | #ifndef ROCKSDB_LITE | |
11fdf7f2 | 1342 | void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, |
7c673cae FG |
1343 | const MemTableInfo& mem_table_info) { |
1344 | if (immutable_db_options_.listeners.size() == 0U) { | |
1345 | return; | |
1346 | } | |
1347 | if (shutting_down_.load(std::memory_order_acquire)) { | |
1348 | return; | |
1349 | } | |
1350 | ||
1351 | for (auto listener : immutable_db_options_.listeners) { | |
1352 | listener->OnMemTableSealed(mem_table_info); | |
1353 | } | |
1354 | } | |
1355 | #endif // ROCKSDB_LITE | |
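// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// receiving the callback issued above by registering an EventListener.
// The listener class name is hypothetical:
//
//   class SealedLogger : public EventListener {
//    public:
//     void OnMemTableSealed(const MemTableInfo& info) override {
//       // e.g. record info.cf_name, info.num_entries and info.num_deletes
//     }
//   };
//
//   Options options;
//   options.listeners.emplace_back(std::make_shared<SealedLogger>());
// ---------------------------------------------------------------------------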
1356 | ||
1357 | // REQUIRES: mutex_ is held | |
1358 | // REQUIRES: this thread is currently at the front of the writer queue | |
1359 | Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { | |
1360 | mutex_.AssertHeld(); | |
11fdf7f2 TL |
1361 | WriteThread::Writer nonmem_w; |
1362 | if (two_write_queues_) { | |
1363 | // SwitchMemtable is a rare event. To simplify the reasoning, we make sure | |
1364 | // that there is no concurrent thread writing to the WAL. | |
1365 | nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); | |
1366 | } | |
1367 | ||
494da23a | 1368 | std::unique_ptr<WritableFile> lfile; |
7c673cae FG |
1369 | log::Writer* new_log = nullptr; |
1370 | MemTable* new_mem = nullptr; | |
1371 | ||
11fdf7f2 TL |
1372 | // Recoverable state is persisted in the WAL. After a memtable switch, the WAL | |
1373 | // might be deleted, so we also write the state to the memtable to persist it. | |
1374 | Status s = WriteRecoverableState(); | |
1375 | if (!s.ok()) { | |
1376 | return s; | |
1377 | } | |
1378 | ||
1379 | // If pipelined write is enabled, wait for all pending memtable | |
1380 | // writers. | |
1381 | if (immutable_db_options_.enable_pipelined_write) { | |
1382 | // Memtable writers may call DB::Get when max_successive_merges > 0, | |
1383 | // which may lock the mutex. Unlock the mutex here to avoid deadlock. | |
1384 | mutex_.Unlock(); | |
1385 | write_thread_.WaitForMemTableWriters(); | |
1386 | mutex_.Lock(); | |
1387 | } | |
1388 | ||
7c673cae FG |
1389 | // Attempt to switch to a new memtable and trigger flush of the old one. | |
1390 | // Do this without holding the DB mutex lock. | |
1391 | assert(versions_->prev_log_number() == 0); | |
11fdf7f2 TL |
1392 | if (two_write_queues_) { |
1393 | log_write_mutex_.Lock(); | |
1394 | } | |
7c673cae | 1395 | bool creating_new_log = !log_empty_; |
11fdf7f2 TL |
1396 | if (two_write_queues_) { |
1397 | log_write_mutex_.Unlock(); | |
1398 | } | |
7c673cae FG |
1399 | uint64_t recycle_log_number = 0; |
1400 | if (creating_new_log && immutable_db_options_.recycle_log_file_num && | |
11fdf7f2 TL |
1401 | !log_recycle_files_.empty()) { |
1402 | recycle_log_number = log_recycle_files_.front(); | |
1403 | log_recycle_files_.pop_front(); | |
7c673cae FG |
1404 | } |
1405 | uint64_t new_log_number = | |
1406 | creating_new_log ? versions_->NewFileNumber() : logfile_number_; | |
7c673cae FG |
1407 | const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); |
1408 | ||
11fdf7f2 | 1409 | // Set memtable_info for memtable sealed callback |
7c673cae FG |
1410 | #ifndef ROCKSDB_LITE |
1411 | MemTableInfo memtable_info; | |
1412 | memtable_info.cf_name = cfd->GetName(); | |
1413 | memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber(); | |
1414 | memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber(); | |
1415 | memtable_info.num_entries = cfd->mem()->num_entries(); | |
1416 | memtable_info.num_deletes = cfd->mem()->num_deletes(); | |
1417 | #endif // ROCKSDB_LITE | |
1418 | // Log this later after lock release. It may be outdated, e.g., if background | |
1419 | // flush happens before logging, but that should be ok. | |
1420 | int num_imm_unflushed = cfd->imm()->NumNotFlushed(); | |
1421 | DBOptions db_options = | |
1422 | BuildDBOptions(immutable_db_options_, mutable_db_options_); | |
1423 | const auto preallocate_block_size = | |
494da23a | 1424 | GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); |
11fdf7f2 | 1425 | auto write_hint = CalculateWALWriteHint(); |
7c673cae | 1426 | mutex_.Unlock(); |
7c673cae | 1427 | { |
11fdf7f2 TL |
1428 | std::string log_fname = |
1429 | LogFileName(immutable_db_options_.wal_dir, new_log_number); | |
7c673cae FG |
1430 | if (creating_new_log) { |
1431 | EnvOptions opt_env_opt = | |
1432 | env_->OptimizeForLogWrite(env_options_, db_options); | |
1433 | if (recycle_log_number) { | |
1434 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1435 | "reusing log %" PRIu64 " from recycle list\n", | |
1436 | recycle_log_number); | |
11fdf7f2 TL |
1437 | std::string old_log_fname = |
1438 | LogFileName(immutable_db_options_.wal_dir, recycle_log_number); | |
1439 | s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile, | |
1440 | opt_env_opt); | |
7c673cae | 1441 | } else { |
11fdf7f2 | 1442 | s = NewWritableFile(env_, log_fname, &lfile, opt_env_opt); |
7c673cae FG |
1443 | } |
1444 | if (s.ok()) { | |
1445 | // Our final size should be less than write_buffer_size | |
1446 | // (compression, etc.), but err on the side of caution. | |
1447 | ||
1448 | // Use preallocate_block_size, computed above while holding the mutex, | |
1449 | // instead of calling GetWalPreallocateBlockSize() here. | |
1450 | lfile->SetPreallocationBlockSize(preallocate_block_size); | |
11fdf7f2 | 1451 | lfile->SetWriteLifeTimeHint(write_hint); |
494da23a TL |
1452 | std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( |
1453 | std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */, | |
1454 | immutable_db_options_.listeners)); | |
11fdf7f2 TL |
1455 | new_log = new log::Writer( |
1456 | std::move(file_writer), new_log_number, | |
1457 | immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_); | |
7c673cae FG |
1458 | } |
1459 | } | |
1460 | ||
1461 | if (s.ok()) { | |
1462 | SequenceNumber seq = versions_->LastSequence(); | |
1463 | new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); | |
11fdf7f2 | 1464 | context->superversion_context.NewSuperVersion(); |
7c673cae | 1465 | } |
7c673cae FG |
1466 | } |
1467 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1468 | "[%s] New memtable created with log file: #%" PRIu64 | |
1469 | ". Immutable memtables: %d.\n", | |
1470 | cfd->GetName().c_str(), new_log_number, num_imm_unflushed); | |
1471 | mutex_.Lock(); | |
11fdf7f2 TL |
1472 | if (s.ok() && creating_new_log) { |
1473 | log_write_mutex_.Lock(); | |
7c673cae | 1474 | assert(new_log != nullptr); |
11fdf7f2 TL |
1475 | if (!logs_.empty()) { |
1476 | // Always flush the buffer of the last log before switching to a new one | |
1477 | log::Writer* cur_log_writer = logs_.back().writer; | |
1478 | s = cur_log_writer->WriteBuffer(); | |
1479 | if (!s.ok()) { | |
1480 | ROCKS_LOG_WARN(immutable_db_options_.info_log, | |
1481 | "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 | |
494da23a | 1482 | " WAL file\n", |
11fdf7f2 TL |
1483 | cfd->GetName().c_str(), cur_log_writer->get_log_number(), |
1484 | new_log_number); | |
1485 | } | |
1486 | } | |
494da23a TL |
1487 | if (s.ok()) { |
1488 | logfile_number_ = new_log_number; | |
1489 | log_empty_ = true; | |
1490 | log_dir_synced_ = false; | |
1491 | logs_.emplace_back(logfile_number_, new_log); | |
1492 | alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); | |
1493 | } | |
11fdf7f2 TL |
1494 | log_write_mutex_.Unlock(); |
1495 | } | |
1496 | ||
1497 | if (!s.ok()) { | |
1498 | // We can only fail above when creating a new log; assert that here. | |
1499 | assert(creating_new_log); | |
494da23a TL |
1500 | if (new_mem) { |
1501 | delete new_mem; | |
1502 | } | |
1503 | if (new_log) { | |
1504 | delete new_log; | |
1505 | } | |
1506 | SuperVersion* new_superversion = | |
1507 | context->superversion_context.new_superversion.release(); | |
1508 | if (new_superversion != nullptr) { | |
1509 | delete new_superversion; | |
1510 | } | |
1511 | // We may have lost data from the WritableFileWriter's in-memory buffer for | |
1512 | // the current log, so treat it as a fatal error and set bg_error | |
1513 | error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); | |
1514 | // Read back bg_error in order to get the right severity | |
1515 | s = error_handler_.GetBGError(); | |
1516 | ||
11fdf7f2 TL |
1517 | if (two_write_queues_) { |
1518 | nonmem_write_thread_.ExitUnbatched(&nonmem_w); | |
1519 | } | |
1520 | return s; | |
7c673cae | 1521 | } |
11fdf7f2 | 1522 | |
7c673cae FG |
1523 | for (auto loop_cfd : *versions_->GetColumnFamilySet()) { |
1524 | // All of this is just an optimization to delete logs that | |
1525 | // are no longer needed -- if a CF is empty, it | |
1526 | // doesn't need that particular log to stay alive, so we just | |
1527 | // advance the log number. No need to persist this in the manifest. | |
1528 | if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 && | |
1529 | loop_cfd->imm()->NumNotFlushed() == 0) { | |
1530 | if (creating_new_log) { | |
1531 | loop_cfd->SetLogNumber(logfile_number_); | |
1532 | } | |
1533 | loop_cfd->mem()->SetCreationSeq(versions_->LastSequence()); | |
1534 | } | |
1535 | } | |
1536 | ||
1537 | cfd->mem()->SetNextLogNumber(logfile_number_); | |
1538 | cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); | |
1539 | new_mem->Ref(); | |
1540 | cfd->SetMemtable(new_mem); | |
11fdf7f2 TL |
1541 | InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, |
1542 | mutable_cf_options); | |
494da23a TL |
1543 | #ifndef ROCKSDB_LITE |
1544 | mutex_.Unlock(); | |
1545 | // Notify client that memtable is sealed, now that we have successfully | |
1546 | // installed a new memtable | |
1547 | NotifyOnMemTableSealed(cfd, memtable_info); | |
1548 | mutex_.Lock(); | |
1549 | #endif // ROCKSDB_LITE | |
11fdf7f2 TL |
1550 | if (two_write_queues_) { |
1551 | nonmem_write_thread_.ExitUnbatched(&nonmem_w); | |
1552 | } | |
7c673cae FG |
1553 | return s; |
1554 | } | |
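// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// two options that influence the log handling in SwitchMemtable() above.
// recycle_log_file_num lets a new memtable reuse an old WAL file instead of
// allocating a fresh one, and manual_wal_flush_ corresponds to
// DBOptions::manual_wal_flush, which defers WAL flushing to the caller:
//
//   Options options;
//   options.recycle_log_file_num = 4;  // keep up to 4 old WAL files to reuse
//   options.manual_wal_flush = true;   // buffer WAL writes in memory
//   ...
//   db->FlushWAL(/*sync=*/true);       // flush (and fsync) the buffered WAL
// ---------------------------------------------------------------------------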
1555 | ||
1556 | size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const { | |
1557 | mutex_.AssertHeld(); | |
494da23a TL |
1558 | size_t bsize = |
1559 | static_cast<size_t>(write_buffer_size / 10 + write_buffer_size); | |
7c673cae FG |
1560 | // Some users might set very high write_buffer_size and rely on |
1561 | // max_total_wal_size or other parameters to control the WAL size. | |
1562 | if (mutable_db_options_.max_total_wal_size > 0) { | |
494da23a TL |
1563 | bsize = std::min<size_t>( |
1564 | bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size)); | |
7c673cae FG |
1565 | } |
1566 | if (immutable_db_options_.db_write_buffer_size > 0) { | |
1567 | bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size); | |
1568 | } | |
1569 | if (immutable_db_options_.write_buffer_manager && | |
1570 | immutable_db_options_.write_buffer_manager->enabled()) { | |
1571 | bsize = std::min<size_t>( | |
1572 | bsize, immutable_db_options_.write_buffer_manager->buffer_size()); | |
1573 | } | |
1574 | ||
1575 | return bsize; | |
1576 | } | |
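// ---------------------------------------------------------------------------
// Illustrative arithmetic (editorial addition, not part of the original
// source): with write_buffer_size = 64 MB, the starting preallocation block
// size is 64 MB / 10 + 64 MB, roughly 70.4 MB. It is then capped by
// max_total_wal_size, db_write_buffer_size and the write buffer manager's
// buffer size, for whichever of those are configured; e.g. with
// max_total_wal_size = 32 MB the returned block size would be 32 MB.
// ---------------------------------------------------------------------------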
1577 | ||
1578 | // Default implementations of convenience methods that subclasses of DB | |
1579 | // can call if they wish | |
1580 | Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, | |
1581 | const Slice& key, const Slice& value) { | |
1582 | // Pre-allocate the size of the write batch conservatively. | |
1583 | // 8 bytes are taken by the header, 4 bytes by the count, 1 byte by the type, | |
1584 | // and 11 extra bytes cover the varint-encoded key and value lengths. | |
1585 | WriteBatch batch(key.size() + value.size() + 24); | |
11fdf7f2 TL |
1586 | Status s = batch.Put(column_family, key, value); |
1587 | if (!s.ok()) { | |
1588 | return s; | |
1589 | } | |
7c673cae FG |
1590 | return Write(opt, &batch); |
1591 | } | |
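// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// the same WriteBatch mechanism can group several operations into one
// atomic call to DB::Write(). Assumes an open `DB* db` and a column family
// handle `cf`:
//
//   WriteBatch batch;
//   batch.Put(cf, "k1", "v1");
//   batch.Put(cf, "k2", "v2");
//   batch.Delete(cf, "k3");
//   Status st = db->Write(WriteOptions(), &batch);
// ---------------------------------------------------------------------------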
1592 | ||
1593 | Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, | |
1594 | const Slice& key) { | |
1595 | WriteBatch batch; | |
1596 | batch.Delete(column_family, key); | |
1597 | return Write(opt, &batch); | |
1598 | } | |
1599 | ||
1600 | Status DB::SingleDelete(const WriteOptions& opt, | |
1601 | ColumnFamilyHandle* column_family, const Slice& key) { | |
1602 | WriteBatch batch; | |
1603 | batch.SingleDelete(column_family, key); | |
1604 | return Write(opt, &batch); | |
1605 | } | |
1606 | ||
1607 | Status DB::DeleteRange(const WriteOptions& opt, | |
1608 | ColumnFamilyHandle* column_family, | |
1609 | const Slice& begin_key, const Slice& end_key) { | |
1610 | WriteBatch batch; | |
1611 | batch.DeleteRange(column_family, begin_key, end_key); | |
1612 | return Write(opt, &batch); | |
1613 | } | |
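// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the original source):
// DeleteRange removes all keys in [begin_key, end_key), end-exclusive.
// Assumes an open `DB* db`:
//
//   // delete all keys in ["2020-01-01", "2020-02-01")
//   Status st = db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(),
//                               "2020-01-01", "2020-02-01");
// ---------------------------------------------------------------------------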
1614 | ||
1615 | Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, | |
1616 | const Slice& key, const Slice& value) { | |
1617 | WriteBatch batch; | |
11fdf7f2 TL |
1618 | Status s = batch.Merge(column_family, key, value); |
1619 | if (!s.ok()) { | |
1620 | return s; | |
1621 | } | |
7c673cae FG |
1622 | return Write(opt, &batch); |
1623 | } | |
1624 | } // namespace rocksdb |