// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <cinttypes>

#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
  return DB::Put(o, column_family, key, val);
}
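
// Illustrative usage sketch (hypothetical `db` and `cf` handles; not part of
// this translation unit): callers reach this method through the public DB
// interface, which builds a one-entry WriteBatch and funnels it into
// DBImpl::Write/WriteImpl below:
//
//   rocksdb::WriteOptions wo;
//   wo.sync = true;  // also fsync the WAL before Put() returns
//   rocksdb::Status s = db->Put(wo, cf, "key", "value");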

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  if (!cfh->cfd()->ioptions()->merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  } else {
    return DB::Merge(o, column_family, key, val);
  }
}

Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}

Status DBImpl::SingleDelete(const WriteOptions& write_options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key) {
  return DB::SingleDelete(write_options, column_family, key);
}

void DBImpl::SetRecoverableStatePreReleaseCallback(
    PreReleaseCallback* callback) {
  recoverable_state_pre_release_callback_.reset(callback);
}

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}
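
// Illustrative usage sketch (hypothetical `db` and `cf` handles; not part of
// this translation unit): several updates can be grouped into one WriteBatch
// and committed atomically through this entry point:
//
//   rocksdb::WriteBatch batch;
//   batch.Put(cf, "k1", "v1");
//   batch.Delete(cf, "k2");
//   rocksdb::Status s = db->Write(rocksdb::WriteOptions(), &batch);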

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                 WriteBatch* my_batch,
                                 WriteCallback* callback) {
  return WriteImpl(write_options, my_batch, callback, nullptr);
}
#endif  // ROCKSDB_LITE

// The main write queue. This is the only write queue that updates
// LastSequence. When using one write queue, the same sequence also indicates
// the last published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  assert(!seq_per_batch_ || batch_cnt != 0);
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Write(my_batch);
    }
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  if (immutable_db_options_.unordered_write &&
      immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with unordered_write");
  }
  // Otherwise IsLatestPersistentState optimization does not make sense
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  Status status;
  if (write_options.low_pri) {
    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!status.ok()) {
      return status;
    }
  }

  if (two_write_queues_ && disable_memtable) {
    AssignOrder assign_order =
        seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
    // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and
    // they don't consume sequence.
    return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
                            callback, log_used, log_ref, seq_used, batch_cnt,
                            pre_release_callback, assign_order,
                            kDontPublishLastSeq, disable_memtable);
  }

  if (immutable_db_options_.unordered_write) {
    const size_t sub_batch_cnt = batch_cnt != 0
                                     ? batch_cnt
                                     // every key is a sub-batch consuming a seq
                                     : WriteBatchInternal::Count(my_batch);
    uint64_t seq;
    // Use a write thread to i) optimize for WAL write, ii) publish last
    // sequence in increasing order, iii) call pre_release_callback serially
    status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
                              log_used, log_ref, &seq, sub_batch_cnt,
                              pre_release_callback, kDoAssignOrder,
                              kDoPublishLastSeq, disable_memtable);
    TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
    if (!status.ok()) {
      return status;
    }
    if (seq_used) {
      *seq_used = seq;
    }
    if (!disable_memtable) {
      TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
      status = UnorderedWriteMemtable(write_options, my_batch, callback,
                                      log_ref, seq, sub_batch_cnt);
    }
    return status;
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group

    if (w.ShouldWriteToMemtable()) {
      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
          batch_per_txn_, write_options.memtable_insert_hint_per_batch);

      PERF_TIMER_START(write_pre_and_post_process_time);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exit batch group
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // Once it reaches this point, the current writer "w" will try to do its
  // write job. It may also pick up some of the remaining writers in the
  // "writers_" queue when it finds it suitable, and finish them in the same
  // write batch. This is how a write job could be done by another writer.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do preprocess only in the write thread that
    // also does write to memtable to avoid sync issue on shared data structure
    // with the other thread

    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);

    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    if (!two_write_queues_) {
      // Assign it after ::PreprocessWrite since the sequence might advance
      // inside it by WriteRecoverableState
      last_sequence = versions_->LastSequence();
    }

    PERF_TIMER_START(write_pre_and_post_process_time);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable. We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true. Rule 3 is checked for each batch. We could
    // relax rule 2 if we could prevent write batches from referring
    // more than once to a particular key.
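    // Illustrative example: a group whose batches contain only Puts and
    // Deletes can be inserted into the memtable by all writers in parallel,
    // while a group containing even one Merge falls back to insertion by the
    // leader alone (see the HasMerge() check below).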
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    size_t total_byte_size = 0;
    size_t pre_release_callback_cnt = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches += writer->batch_cnt;
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }
        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        if (writer->pre_release_callback) {
          pre_release_callback_cnt++;
        }
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire write
    // group or not. In either case we inc seq for each write batch with no
    // failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
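    // Worked example (illustrative): for a group of three batches holding 2,
    // 1 and 4 keys, seq_inc is 7 when !seq_per_batch_ (one per key) and 3 when
    // seq_per_batch_ (one per batch, regardless of key count).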
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit. That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                      concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                        write_done_by_other, concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    // PreReleaseCallback is called after WAL write and before memtable write
    if (status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      size_t index = 0;
      // Note: the logic for advancing seq here must be consistent with the
      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
      // the merged batch during recovery from the WAL.
      for (auto* writer : write_group) {
        if (writer->CallbackFailed()) {
          continue;
        }
        writer->sequence = next_sequence;
        if (writer->pre_release_callback) {
          Status ws = writer->pre_release_callback->Callback(
              writer->sequence, disable_memtable, writer->log_used, index++,
              pre_release_callback_cnt);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
        if (seq_per_batch_) {
          assert(writer->batch_cnt);
          next_sequence += writer->batch_cnt;
        } else if (writer->ShouldWriteToMemtable()) {
          next_sequence += WriteBatchInternal::Count(writer->batch);
        }
      }
    }

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, &trim_history_scheduler_,
            write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
            batch_per_txn_);
      } else {
        write_group.last_sequence = last_sequence;
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing its own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              &trim_history_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_,
              write_options.memtable_insert_hint_per_batch);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }

  if (need_log_sync) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      // Note: if we are to resume after non-OK statuses we need to revisit how
      // we react to non-OK statuses here.
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    if (w.callback && !w.callback->AllowWriteBatching()) {
      write_thread_.WaitForMemTableWriters();
    }
    mutex_.Lock();
    bool need_log_sync = !write_options.disableWAL && write_options.sync;
    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    PERF_TIMER_START(write_pre_and_post_process_time);
    log::Writer* log_writer = logs_.back().writer;
    mutex_.Unlock();

    // This can set non-OK status if callback fails.
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    if (w.status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      for (auto writer : wal_write_group) {
        if (writer->CheckCallback(this)) {
          if (writer->ShouldWriteToMemtable()) {
            writer->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(writer->batch);
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }
      if (write_options.disableWAL) {
        has_unpersisted_data_.store(true, std::memory_order_relaxed);
      }
      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
    }

    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (w.status.ok() && !write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
      if (wal_write_group.size > 1) {
        stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                          wal_write_group.size - 1);
        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
      }
      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                            need_log_sync, need_log_dir_sync, current_sequence);
    }

    if (!w.CallbackFailed()) {
      WriteStatusCheck(w.status);
    }

    if (need_log_sync) {
      mutex_.Lock();
      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
      mutex_.Unlock();
    }

    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  WriteThread::WriteGroup memtable_write_group;
  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.ShouldWriteToMemtable());
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    if (memtable_write_group.size > 1 &&
        immutable_db_options_.allow_concurrent_memtable_write) {
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          memtable_write_group, w.sequence, column_family_memtables_.get(),
          &flush_scheduler_, &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  }

  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      MemTableInsertStatusCheck(w.status);
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();
}

Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
                                      WriteBatch* my_batch,
                                      WriteCallback* callback, uint64_t log_ref,
                                      SequenceNumber seq,
                                      const size_t sub_batch_cnt) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        false /*disable_memtable*/);

  if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
    w.sequence = seq;
    size_t total_count = WriteBatchInternal::Count(my_batch);
    InternalStats* stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);

    WriteStatusCheck(w.status);
    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }
  }

  size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
  if (pending_cnt == 0) {
    // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex
    // before notify ensures that cv is in waiting state when it is notified,
    // thus not missing the update to pending_memtable_writes_ even though it
    // is not modified under the mutex.
    std::lock_guard<std::mutex> lck(switch_mutex_);
    switch_cv_.notify_all();
  }

  if (!w.FinalStatus().ok()) {
    return w.FinalStatus();
  }

  return Status::OK();
}

// The 2nd write queue. If enabled it will be used only for WAL-only writes.
// This is the only queue that updates LastPublishedSequence, which is only
// applicable in a two-queue setting.
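// For orientation (illustrative summary of the callers above): WriteImpl
// routes WAL-only Prepare batches here through nonmem_write_thread_ when
// two_write_queues_ is enabled, and routes unordered_write batches here
// through write_thread_ with kDoPublishLastSeq so the published sequence
// still advances in order.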
Status DBImpl::WriteImplWALOnly(
    WriteThread* write_thread, const WriteOptions& write_options,
    WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
    const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
    PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
    const PublishLastSeq publish_last_seq, const bool disable_memtable) {
  Status status;
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, sub_batch_cnt, pre_release_callback);
  RecordTick(stats_, WRITE_WITH_WAL);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread->JoinBatchGroup(&w);
  assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  if (publish_last_seq == kDoPublishLastSeq) {
    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
    WriteContext write_context;
    if (error_handler_.IsDBStopped()) {
      status = error_handler_.GetBGError();
    }
    // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
    // without paying the cost of obtaining the mutex.
    if (status.ok()) {
      InstrumentedMutexLock l(&mutex_);
      bool need_log_sync = false;
      status = PreprocessWrite(write_options, &need_log_sync, &write_context);
      WriteStatusCheck(status);
    }
    if (!status.ok()) {
      WriteThread::WriteGroup write_group;
      write_thread->EnterAsBatchGroupLeader(&w, &write_group);
      write_thread->ExitAsBatchGroupLeader(write_group, status);
      return status;
    }
  }

  WriteThread::WriteGroup write_group;
  uint64_t last_sequence;
  write_thread->EnterAsBatchGroupLeader(&w, &write_group);
  // Note: no need to update last_batch_group_size_ here since the batch writes
  // to WAL only

  size_t pre_release_callback_cnt = 0;
  size_t total_byte_size = 0;
  for (auto* writer : write_group) {
    if (writer->CheckCallback(this)) {
      total_byte_size = WriteBatchInternal::AppendedByteSize(
          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      if (writer->pre_release_callback) {
        pre_release_callback_cnt++;
      }
    }
  }

  const bool concurrent_update = true;
  // Update stats while we are an exclusive group leader, so we know
  // that nobody else can be writing to these particular stats.
  // We're optimistic, updating the stats before we successfully
  // commit. That lets us release our leader status early.
  auto stats = default_cf_internal_stats_;
  stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                    concurrent_update);
  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
  stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                    concurrent_update);
  RecordTick(stats_, WRITE_DONE_BY_SELF);
  auto write_done_by_other = write_group.size - 1;
  if (write_done_by_other > 0) {
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                      write_done_by_other, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
  }
  RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

  PERF_TIMER_STOP(write_pre_and_post_process_time);

  PERF_TIMER_GUARD(write_wal_time);
  // LastAllocatedSequence is increased inside WriteToWAL under
  // wal_write_mutex_ to ensure ordered events in WAL
  size_t seq_inc = 0 /* total_count */;
  if (assign_order == kDoAssignOrder) {
    size_t total_batch_cnt = 0;
    for (auto* writer : write_group) {
      assert(writer->batch_cnt || !seq_per_batch_);
      if (!writer->CallbackFailed()) {
        total_batch_cnt += writer->batch_cnt;
      }
    }
    seq_inc = total_batch_cnt;
  }
  if (!write_options.disableWAL) {
    status =
        ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
  } else {
    // Otherwise we inc seq number to do solely the seq allocation
    last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  }

  size_t memtable_write_cnt = 0;
  auto curr_seq = last_sequence + 1;
  for (auto* writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->sequence = curr_seq;
    if (assign_order == kDoAssignOrder) {
      assert(writer->batch_cnt || !seq_per_batch_);
      curr_seq += writer->batch_cnt;
    }
    if (!writer->disable_memtable) {
      memtable_write_cnt++;
    }
    // else seq advances only by memtable writes
  }
.ok() && write_options
.sync
) {
768 assert(!write_options
.disableWAL
);
769 // Requesting sync with two_write_queues_ is expected to be very rare. We
770 // hance provide a simple implementation that is not necessarily efficient.
771 if (manual_wal_flush_
) {
772 status
= FlushWAL(true);
777 PERF_TIMER_START(write_pre_and_post_process_time
);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }
  if (status.ok()) {
    size_t index = 0;
    for (auto* writer : write_group) {
      if (!writer->CallbackFailed() && writer->pre_release_callback) {
        assert(writer->sequence != kMaxSequenceNumber);
        Status ws = writer->pre_release_callback->Callback(
            writer->sequence, disable_memtable, writer->log_used, index++,
            pre_release_callback_cnt);
        if (!ws.ok()) {
          status = ws;
          break;
        }
      }
    }
  }
  if (publish_last_seq == kDoPublishLastSeq) {
    versions_->SetLastSequence(last_sequence + seq_inc);
    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
  }
  if (immutable_db_options_.unordered_write && status.ok()) {
    pending_memtable_writes_ += memtable_write_cnt;
  }
  write_thread->ExitAsBatchGroupLeader(write_group, status);
  if (status.ok()) {
    status = w.FinalStatus();
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }
  return status;
}

void DBImpl::WriteStatusCheck(const Status& status) {
  // Is setting bg_error_ enough here? This will at least stop
  // compaction and fail any further writes.
  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !status.IsBusy() && !status.IsIncomplete()) {
    mutex_.Lock();
    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
    mutex_.Unlock();
  }
}

void DBImpl::MemTableInsertStatusCheck(const Status& status) {
  // A non-OK status here indicates that the state implied by the
  // WAL has diverged from the in-memory state. This could be
  // because of a corrupt write_batch (very bad), or because the
  // client specified an invalid column family and didn't specify
  // ignore_missing_column_families.
  if (!status.ok()) {
    mutex_.Lock();
    assert(!error_handler_.IsBGWorkStopped());
    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
    mutex_.Unlock();
  }
}

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr && need_log_sync != nullptr);
  Status status;

  if (error_handler_.IsDBStopped()) {
    status = error_handler_.GetBGError();
  }

  PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);

  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    WaitForPendingWrites();
    status = SwitchWAL(write_context);
  }

  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If
    // another thread is writing to another DB with the same write buffer,
    // they may also be flushed. We may end up with flushing much more DBs
    // than needed. It's suboptimal but still correct.
    WaitForPendingWrites();
    status = HandleWriteBufferFull(write_context);
  }

  if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
    status = TrimMemtableHistory(write_context);
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    WaitForPendingWrites();
    status = ScheduleFlushes(write_context);
  }

  PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
  PERF_TIMER_GUARD(write_pre_and_post_process_time);

  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.NeedsDelay()))) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch, so we always use the size
    // of the previous one. It might create a fairness issue in that expiration
    // might happen for smaller writes while larger writes can go through.
    // Can optimize it if it is an issue.
    status = DelayWrite(last_batch_group_size_, write_options);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

  if (status.ok() && *need_log_sync) {
    // Wait until the parallel syncs are finished. Any sync process has to sync
    // the front log too so it is enough to check the status of front().
    // We do a while loop since log_sync_cv_ is signalled when any sync is
    // finished
    // Note: there does not seem to be a reason to wait for parallel sync at
    // this early step but it is not important since parallel sync (SyncWAL)
    // and need_log_sync are usually not used together.
    while (logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    for (auto& log : logs_) {
      assert(!log.getting_synced);
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later after we write to
      // the WAL.
      // Note: there does not seem to be a reason to set this early before we
      // actually write to the WAL
      log.getting_synced = true;
    }
  } else {
    *need_log_sync = false;
  }

  return status;
}

WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                               WriteBatch* tmp_batch, size_t* write_with_wal,
                               WriteBatch** to_be_cached_state) {
  assert(write_with_wal != nullptr);
  assert(tmp_batch != nullptr);
  assert(*to_be_cached_state == nullptr);
  WriteBatch* merged_batch = nullptr;
  *write_with_wal = 0;
  auto* leader = write_group.leader;
  assert(!leader->disable_wal);  // Same holds for all in the batch group
  if (write_group.size == 1 && !leader->CallbackFailed() &&
      leader->batch->GetWalTerminationPoint().is_cleared()) {
    // we simply write the first WriteBatch to WAL if the group only
    // contains one batch, that batch should be written to the WAL,
    // and the batch is not wanting to be truncated
    merged_batch = leader->batch;
    if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
      *to_be_cached_state = merged_batch;
    }
    *write_with_wal = 1;
  } else {
    // WAL needs all of the batches flattened into a single batch.
    // We could avoid copying here with an iov-like AddRecord
    // interface
    merged_batch = tmp_batch;
    for (auto writer : write_group) {
      if (!writer->CallbackFailed()) {
        WriteBatchInternal::Append(merged_batch, writer->batch,
                                   /*WAL_only*/ true);
        if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
          // We only need to cache the last of such write batch
          *to_be_cached_state = writer->batch;
        }
        (*write_with_wal)++;
      }
    }
  }
  return merged_batch;
}

// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                          log::Writer* log_writer, uint64_t* log_used,
                          uint64_t* log_size) {
  assert(log_size != nullptr);
  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
  *log_size = log_entry.size();
  // When two_write_queues_ is set, WriteToWAL has to be protected from
  // concurrent calls from the two queues anyway and log_write_mutex_ is
  // already held. Otherwise, if manual_wal_flush_ is enabled, we need to
  // protect log_writer->AddRecord from possible concurrent calls via FlushWAL
  // by the application.
  const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns of missed branch prediction, penalize the new
  // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
  // when we do not need any locking.
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Lock();
  }
  Status status = log_writer->AddRecord(log_entry);
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Unlock();
  }
  if (log_used != nullptr) {
    *log_used = logfile_number_;
  }
  total_log_size_ += log_entry.size();
  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
  // since alive_log_files_ might be modified concurrently
  alive_log_files_.back().AddSize(log_entry.size());
  log_empty_ = false;
  return status;
}

Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                          log::Writer* log_writer, uint64_t* log_used,
                          bool need_log_sync, bool need_log_dir_sync,
                          SequenceNumber sequence) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }

  WriteBatchInternal::SetSequence(merged_batch, sequence);

  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }

  if (status.ok() && need_log_sync) {
    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
    // It's safe to access logs_ with unlocked mutex_ here because:
    //  - we've set getting_synced=true for all logs,
    //    so other threads won't pop from logs_ while we're here,
    //  - only writer thread can push to logs_, and we're in
    //    writer thread, so no one will push to logs_,
    //  - as long as other threads don't modify it, it's safe to read
    //    from std::deque from multiple threads concurrently.
    for (auto& log : logs_) {
      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!status.ok()) {
        break;
      }
    }
    if (status.ok() && need_log_dir_sync) {
      // We only sync WAL directory the first time WAL syncing is
      // requested, so that in case users never turn on WAL sync,
      // we can avoid the disk I/O in the write code path.
      status = directories_.GetWalDir()->Fsync();
    }
  }

  if (merged_batch == &tmp_batch_) {
    tmp_batch_.Clear();
  }
  if (status.ok()) {
    auto stats = default_cf_internal_stats_;
    if (need_log_sync) {
      stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
      RecordTick(stats_, WAL_FILE_SYNCED);
    }
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                                    uint64_t* log_used,
                                    SequenceNumber* last_sequence,
                                    size_t seq_inc) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  WriteBatch tmp_batch;
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch =
      MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);

  // We need to lock log_write_mutex_ since logs_ and alive_log_files might be
  // pushed back concurrently
  log_write_mutex_.Lock();
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  auto sequence = *last_sequence + 1;
  WriteBatchInternal::SetSequence(merged_batch, sequence);

  log::Writer* log_writer = logs_.back().writer;
  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }
  log_write_mutex_.Unlock();

  if (status.ok()) {
    const bool concurrent = true;
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
                      concurrent);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
                      concurrent);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

Status DBImpl::WriteRecoverableState() {
  mutex_.AssertHeld();
  if (!cached_recoverable_state_empty_) {
    bool dont_care_bool;
    SequenceNumber next_seq;
    if (two_write_queues_) {
      log_write_mutex_.Lock();
    }
    SequenceNumber seq;
    if (two_write_queues_) {
      seq = versions_->FetchAddLastAllocatedSequence(0);
    } else {
      seq = versions_->LastSequence();
    }
    WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
    auto status = WriteBatchInternal::InsertInto(
        &cached_recoverable_state_, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_, true,
        0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
        &next_seq, &dont_care_bool, seq_per_batch_);
    auto last_seq = next_seq - 1;
    if (two_write_queues_) {
      versions_->FetchAddLastAllocatedSequence(last_seq - seq);
      versions_->SetLastPublishedSequence(last_seq);
    }
    versions_->SetLastSequence(last_seq);
    if (two_write_queues_) {
      log_write_mutex_.Unlock();
    }
    if (status.ok() && recoverable_state_pre_release_callback_) {
      const bool DISABLE_MEMTABLE = true;
      for (uint64_t sub_batch_seq = seq + 1;
           sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
        uint64_t const no_log_num = 0;
        // Unlock it since the callback might end up locking mutex. e.g.,
        // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
        mutex_.Unlock();
        status = recoverable_state_pre_release_callback_->Callback(
            sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
        mutex_.Lock();
      }
    }
    if (status.ok()) {
      cached_recoverable_state_.Clear();
      cached_recoverable_state_empty_ = true;
    }
    return status;
  }
  return Status::OK();
}

void DBImpl::SelectColumnFamiliesForAtomicFlush(
    autovector<ColumnFamilyData*>* cfds) {
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
        !cached_recoverable_state_empty_.load()) {
      cfds->push_back(cfd);
    }
  }
}

// Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
  assert(immutable_db_options_.atomic_flush);
  auto seq = versions_->LastSequence();
  for (auto cfd : cfds) {
    cfd->imm()->AssignAtomicFlushSeq(seq);
  }
}

Status DBImpl::SwitchWAL(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  if (alive_log_files_.begin()->getting_flushed) {
    return status;
  }

  auto oldest_alive_log = alive_log_files_.begin()->number;
  bool flush_wont_release_oldest_log = false;
  if (allow_2pc()) {
    auto oldest_log_with_uncommitted_prep =
        logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();

    assert(oldest_log_with_uncommitted_prep == 0 ||
           oldest_log_with_uncommitted_prep >= oldest_alive_log);
    if (oldest_log_with_uncommitted_prep > 0 &&
        oldest_log_with_uncommitted_prep == oldest_alive_log) {
      if (unable_to_release_oldest_log_) {
        // we already attempted to flush all column families dependent on
        // the oldest alive log but the log still contained uncommitted
        // transactions so there is still nothing that we can do.
        return status;
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to release oldest log due to uncommitted transaction");
        unable_to_release_oldest_log_ = true;
        flush_wont_release_oldest_log = true;
      }
    }
  }
  if (!flush_wont_release_oldest_log) {
    // we only mark this log as getting flushed if we have successfully
    // flushed all data in this log. If this log contains outstanding prepared
    // transactions then we cannot flush this log until those transactions are
    // committed.
    unable_to_release_oldest_log_ = false;
    alive_log_files_.begin()->getting_flushed = true;
  }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing all column families with data in WAL number %" PRIu64
      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
        cfds.push_back(cfd);
      }
    }
    MaybeFlushStatsCF(&cfds);
  }
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (const auto cfd : cfds) {
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB with the same write buffer, they may also
  // be flushed. We may end up with flushing much more DBs than needed. It's
  // suboptimal but still correct.
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing column family with oldest memtable entry. Write buffer is "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider active mem table, hoping immutable memtable is
        // already in the process of flushing.
        uint64_t seq = cfd->mem()->GetCreationSeq();
        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
          cfd_picked = cfd;
          seq_num_for_cf_picked = seq;
        }
      }
    }
    if (cfd_picked != nullptr) {
      cfds.push_back(cfd_picked);
    }
    MaybeFlushStatsCF(&cfds);
  }

  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }
  for (const auto cfd : cfds) {
    if (cfd->mem()->IsEmpty()) {
      continue;
    }
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

uint64_t DBImpl::GetMaxTotalWalSize() const {
  mutex_.AssertHeld();
  return mutable_db_options_.max_total_wal_size == 0
             ? 4 * max_total_in_memory_state_
             : mutable_db_options_.max_total_wal_size;
}
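
// Worked example (illustrative): with max_total_wal_size left at 0 and roughly
// 512 MB of total memtable budget in max_total_in_memory_state_, the effective
// WAL limit consulted by PreprocessWrite()/SwitchWAL() is 4 * 512 MB = 2 GB.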

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
      mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we don't need a delay anymore
      const uint64_t kDelayInterval = 1000;
      uint64_t stall_end = sw.start_time() + delay;
      while (write_controller_.NeedsDelay()) {
        if (env_->NowMicros() >= stall_end) {
          // We already delayed this write `delay` microseconds
          break;
        }

        delayed = true;
        // Sleep for 0.001 seconds
        env_->SleepForMicroseconds(kDelayInterval);
      }
      mutex_.Lock();
      write_thread_.EndWriteStall();
    }

    // Don't wait if there's a background error, even if its a soft error. We
    // might wait here indefinitely as the background compaction may never
    // finish successfully, resulting in the stall condition lasting
    // indefinitely
    while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      delayed = true;

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
      write_thread_.EndWriteStall();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(
        InternalStats::kIntStatsWriteStallMicros, time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  // If DB is not in read-only mode and write_controller is not stopping
  // writes, we can ignore any background errors and allow the write to
  // proceed
  Status s;
  if (write_controller_.IsStopped()) {
    // If writes are still stopped, it means we bailed due to a background
    // error
    s = Status::Incomplete(error_handler_.GetBGError().ToString());
  }
  if (error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the call,
  // the consistency condition is not guaranteed to hold. It's OK to live with
  // it in this case.
  // If we need to speed up compaction, it means the compaction is left behind
  // and we start to rate-limit low pri writes.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete("Low priority write stall");
    } else {
      assert(my_batch != nullptr);
      // Rate limit those writes. The reason that we don't completely wait
      // is that in case the write is heavy, low pri writes may never have
      // a chance to run. Now we guarantee we are still slowly making
      // progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}

void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
  assert(cfds != nullptr);
  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
    ColumnFamilyData* cfd_stats =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kPersistentStatsColumnFamilyName);
    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
      for (ColumnFamilyData* cfd : *cfds) {
        if (cfd == cfd_stats) {
          // stats CF already included in cfds
          return;
        }
      }
      // force flush stats CF when its log number is less than all other CF's
      // log numbers
      bool force_flush_stats_cf = true;
      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
        if (loop_cfd == cfd_stats) {
          continue;
        }
        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
          force_flush_stats_cf = false;
        }
      }
      if (force_flush_stats_cf) {
        cfds->push_back(cfd_stats);
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Force flushing stats CF with automated flush "
                       "to avoid holding old logs");
      }
    }
  }
}

Status DBImpl::TrimMemtableHistory(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  ColumnFamilyData* tmp_cfd;
  while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
         nullptr) {
    cfds.push_back(tmp_cfd);
  }
  for (auto& cfd : cfds) {
    autovector<MemTable*> to_delete;
    cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
    if (!to_delete.empty()) {
      for (auto m : to_delete) {
        delete m;
      }
      context->superversion_context.NewSuperVersion();
      assert(context->superversion_context.new_superversion.get() != nullptr);
      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
    }

    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
  }
  return Status::OK();
}

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
    MaybeFlushStatsCF(&cfds);
  }
  Status status;
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Recoverable state is persisted in WAL. After memtable switch, WAL might
  // be deleted, so we write the state to memtable to be persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for memtable sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later after lock release. It may be outdated, e.g., if background
  // flush happens before logging, but that should be ok.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  mutex_.Unlock();
  if (creating_new_log) {
    // TODO: Write buffer size passed in should be max of all CF's instead
    // of mutable_cf_options.write_buffer_size.
    s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
                  &new_log);
  }
  if (s.ok()) {
    SequenceNumber seq = versions_->LastSequence();
    new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
    context->superversion_context.NewSuperVersion();
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (recycle_log_number != 0) {
    // Since renaming the file is done outside DB mutex, we need to ensure
    // concurrent full purges don't delete the file while we're recycling it.
    // To achieve that we hold the old log number in the recyclable list until
    // after it has been renamed.
    assert(log_recycle_files_.front() == recycle_log_number);
    log_recycle_files_.pop_front();
  }
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       " WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the WritableFileBuffer in-memory buffer for
    // the current log, so treat it as a fatal error and set bg_error
    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();
    return s;
  }

  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
    // all this is just optimization to delete logs that
    // are no longer needed -- if CF is empty, that means it
    // doesn't need that particular log to stay alive, so we just
    // advance the log number. no need to persist this in the manifest
    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
        loop_cfd->imm()->NumNotFlushed() == 0) {
      if (creating_new_log) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify client that memtable is sealed, now that we have successfully
  // installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  return s;
}

size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
  // Some users might set very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  if (nullptr == opt.timestamp) {
    // Pre-allocate size of write batch conservatively.
    // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
    // and we allocate 11 extra bytes for key length, as well as value length.
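    // Worked example (illustrative): a 5-byte key with a 7-byte value
    // reserves 5 + 7 + 24 = 36 bytes up front, which avoids a reallocation
    // for the common single-Put batch.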
    WriteBatch batch(key.size() + value.size() + 24);
    Status s = batch.Put(column_family, key, value);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(nullptr != ts);
  size_t ts_sz = ts->size();
  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch batch;
  batch.Delete(column_family, key);
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  batch.SingleDelete(column_family, key);
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  batch.DeleteRange(column_family, begin_key, end_key);
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

}  // namespace ROCKSDB_NAMESPACE