ceph.git: ceph/src/rocksdb/db/db_impl/db_impl_write.cc (Ceph Pacific 16.2.2)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12 #include "db/error_handler.h"
13 #include "db/event_helpers.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "options/options_helper.h"
16 #include "test_util/sync_point.h"
17
18 namespace ROCKSDB_NAMESPACE {
19 // Convenience methods
20 Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
21 const Slice& key, const Slice& val) {
22 return DB::Put(o, column_family, key, val);
23 }
24
25 Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
26 const Slice& key, const Slice& val) {
27 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
28 if (!cfh->cfd()->ioptions()->merge_operator) {
29 return Status::NotSupported("Provide a merge_operator when opening DB");
30 } else {
31 return DB::Merge(o, column_family, key, val);
32 }
33 }
34
35 Status DBImpl::Delete(const WriteOptions& write_options,
36 ColumnFamilyHandle* column_family, const Slice& key) {
37 return DB::Delete(write_options, column_family, key);
38 }
39
40 Status DBImpl::SingleDelete(const WriteOptions& write_options,
41 ColumnFamilyHandle* column_family,
42 const Slice& key) {
43 return DB::SingleDelete(write_options, column_family, key);
44 }
45
46 void DBImpl::SetRecoverableStatePreReleaseCallback(
47 PreReleaseCallback* callback) {
48 recoverable_state_pre_release_callback_.reset(callback);
49 }
50
51 Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
52 return WriteImpl(write_options, my_batch, nullptr, nullptr);
53 }
54
55 #ifndef ROCKSDB_LITE
56 Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
57 WriteBatch* my_batch,
58 WriteCallback* callback) {
59 return WriteImpl(write_options, my_batch, callback, nullptr);
60 }
61 #endif // ROCKSDB_LITE
62
63 // The main write queue. This is the only write queue that updates LastSequence.
64 // When using one write queue, the same sequence also indicates the last
65 // published sequence.
66 Status DBImpl::WriteImpl(const WriteOptions& write_options,
67 WriteBatch* my_batch, WriteCallback* callback,
68 uint64_t* log_used, uint64_t log_ref,
69 bool disable_memtable, uint64_t* seq_used,
70 size_t batch_cnt,
71 PreReleaseCallback* pre_release_callback) {
72 assert(!seq_per_batch_ || batch_cnt != 0);
73 if (my_batch == nullptr) {
74 return Status::Corruption("Batch is nullptr!");
75 }
76 if (tracer_) {
77 InstrumentedMutexLock lock(&trace_mutex_);
78 if (tracer_) {
79 tracer_->Write(my_batch);
80 }
81 }
82 if (write_options.sync && write_options.disableWAL) {
83 return Status::InvalidArgument("Sync writes has to enable WAL.");
84 }
85 if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
86 return Status::NotSupported(
87 "pipelined_writes is not compatible with concurrent prepares");
88 }
89 if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
90 // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
91 return Status::NotSupported(
92 "pipelined_writes is not compatible with seq_per_batch");
93 }
94 if (immutable_db_options_.unordered_write &&
95 immutable_db_options_.enable_pipelined_write) {
96 return Status::NotSupported(
97 "pipelined_writes is not compatible with unordered_write");
98 }
99 // Otherwise IsLatestPersistentState optimization does not make sense
100 assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
101 disable_memtable);
102
103 Status status;
104 if (write_options.low_pri) {
105 status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
106 if (!status.ok()) {
107 return status;
108 }
109 }
110
111 if (two_write_queues_ && disable_memtable) {
112 AssignOrder assign_order =
113 seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
114 // Otherwise, these are WAL-only Prepare batches under the WriteCommitted
115 // policy and they don't consume sequence numbers.
116 return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
117 callback, log_used, log_ref, seq_used, batch_cnt,
118 pre_release_callback, assign_order,
119 kDontPublishLastSeq, disable_memtable);
120 }
121
122 if (immutable_db_options_.unordered_write) {
123 const size_t sub_batch_cnt = batch_cnt != 0
124 ? batch_cnt
125 // every key is a sub-batch consuming a seq
126 : WriteBatchInternal::Count(my_batch);
127 uint64_t seq;
128 // Use a write thread to i) optimize for WAL write, ii) publish last
129 // sequence in increasing order, iii) call pre_release_callback serially
130 status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
131 log_used, log_ref, &seq, sub_batch_cnt,
132 pre_release_callback, kDoAssignOrder,
133 kDoPublishLastSeq, disable_memtable);
134 TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
135 if (!status.ok()) {
136 return status;
137 }
138 if (seq_used) {
139 *seq_used = seq;
140 }
141 if (!disable_memtable) {
142 TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
143 status = UnorderedWriteMemtable(write_options, my_batch, callback,
144 log_ref, seq, sub_batch_cnt);
145 }
146 return status;
147 }
148
149 if (immutable_db_options_.enable_pipelined_write) {
150 return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
151 log_ref, disable_memtable, seq_used);
152 }
153
154 PERF_TIMER_GUARD(write_pre_and_post_process_time);
155 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
156 disable_memtable, batch_cnt, pre_release_callback);
157
158 if (!write_options.disableWAL) {
159 RecordTick(stats_, WRITE_WITH_WAL);
160 }
161
162 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
163
164 write_thread_.JoinBatchGroup(&w);
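// After JoinBatchGroup() this writer is in one of three states, handled in
// order below: a parallel memtable writer (a leader already wrote the WAL
// and delegated the memtable inserts to the group), already completed (a
// leader did all the work on its behalf), or the leader of a new group.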
165 if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
166 // we are a non-leader in a parallel group
167
168 if (w.ShouldWriteToMemtable()) {
169 PERF_TIMER_STOP(write_pre_and_post_process_time);
170 PERF_TIMER_GUARD(write_memtable_time);
171
172 ColumnFamilyMemTablesImpl column_family_memtables(
173 versions_->GetColumnFamilySet());
174 w.status = WriteBatchInternal::InsertInto(
175 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
176 &trim_history_scheduler_,
177 write_options.ignore_missing_column_families, 0 /*log_number*/, this,
178 true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
179 batch_per_txn_, write_options.memtable_insert_hint_per_batch);
180
181 PERF_TIMER_START(write_pre_and_post_process_time);
182 }
183
184 if (write_thread_.CompleteParallelMemTableWriter(&w)) {
185 // we're responsible for exit batch group
186 // TODO(myabandeh): propagate status to write_group
187 auto last_sequence = w.write_group->last_sequence;
188 versions_->SetLastSequence(last_sequence);
189 MemTableInsertStatusCheck(w.status);
190 write_thread_.ExitAsBatchGroupFollower(&w);
191 }
192 assert(w.state == WriteThread::STATE_COMPLETED);
193 // STATE_COMPLETED conditional below handles exit
194
195 status = w.FinalStatus();
196 }
197 if (w.state == WriteThread::STATE_COMPLETED) {
198 if (log_used != nullptr) {
199 *log_used = w.log_used;
200 }
201 if (seq_used != nullptr) {
202 *seq_used = w.sequence;
203 }
204 // write is complete and leader has updated sequence
205 return w.FinalStatus();
206 }
207 // else we are the leader of the write batch group
208 assert(w.state == WriteThread::STATE_GROUP_LEADER);
209
210 // Once it reaches this point, the current writer "w" will try to do its
211 // write job. It may also pick up some of the remaining writers in "writers_"
212 // when it finds that suitable, and finish them in the same write batch.
213 // This is how a write job can also be done by another writer.
214 WriteContext write_context;
215 WriteThread::WriteGroup write_group;
216 bool in_parallel_group = false;
217 uint64_t last_sequence = kMaxSequenceNumber;
218
219 mutex_.Lock();
220
221 bool need_log_sync = write_options.sync;
222 bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
223 if (!two_write_queues_ || !disable_memtable) {
224 // With concurrent writes we do the preprocessing only in the write thread
225 // that also writes to the memtable, to avoid synchronization issues on data
226 // structures shared with the other thread.
227
228 // PreprocessWrite does its own perf timing.
229 PERF_TIMER_STOP(write_pre_and_post_process_time);
230
231 status = PreprocessWrite(write_options, &need_log_sync, &write_context);
232 if (!two_write_queues_) {
233 // Assign it after ::PreprocessWrite since the sequence might advance
234 // inside it by WriteRecoverableState
235 last_sequence = versions_->LastSequence();
236 }
237
238 PERF_TIMER_START(write_pre_and_post_process_time);
239 }
240 log::Writer* log_writer = logs_.back().writer;
241
242 mutex_.Unlock();
243
244 // Add to log and apply to memtable. We can release the lock
245 // during this phase since &w is currently responsible for logging
246 // and protects against concurrent loggers and concurrent writes
247 // into memtables
248
249 TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
250 last_batch_group_size_ =
251 write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
252
253 if (status.ok()) {
254 // Rules for when we can update the memtable concurrently
255 // 1. supported by memtable
256 // 2. Puts are not okay if inplace_update_support
257 // 3. Merges are not okay
258 //
259 // Rules 1..2 are enforced by checking the options
260 // during startup (CheckConcurrentWritesSupported), so if
261 // options.allow_concurrent_memtable_write is true then they can be
262 // assumed to be true. Rule 3 is checked for each batch. We could
263 // relax rule 2 if we could prevent write batches from referring
264 // more than once to a particular key.
265 bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
266 write_group.size > 1;
267 size_t total_count = 0;
268 size_t valid_batches = 0;
269 size_t total_byte_size = 0;
270 size_t pre_release_callback_cnt = 0;
271 for (auto* writer : write_group) {
272 if (writer->CheckCallback(this)) {
273 valid_batches += writer->batch_cnt;
274 if (writer->ShouldWriteToMemtable()) {
275 total_count += WriteBatchInternal::Count(writer->batch);
276 parallel = parallel && !writer->batch->HasMerge();
277 }
278 total_byte_size = WriteBatchInternal::AppendedByteSize(
279 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
280 if (writer->pre_release_callback) {
281 pre_release_callback_cnt++;
282 }
283 }
284 }
285 // Note about seq_per_batch_: either disableWAL is set for the entire write
286 // group or not. In either case we inc seq for each write batch with no
287 // failed callback. This means that there could be a batch with
288 // disable_memtable in between; although we do not write this batch to
289 // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
290 // the seq per valid written key to mem.
291 size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
292
293 const bool concurrent_update = two_write_queues_;
294 // Update stats while we are an exclusive group leader, so we know
295 // that nobody else can be writing to these particular stats.
296 // We're optimistic, updating the stats before we successfully
297 // commit. That lets us release our leader status early.
298 auto stats = default_cf_internal_stats_;
299 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
300 concurrent_update);
301 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
302 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
303 concurrent_update);
304 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
305 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
306 concurrent_update);
307 RecordTick(stats_, WRITE_DONE_BY_SELF);
308 auto write_done_by_other = write_group.size - 1;
309 if (write_done_by_other > 0) {
310 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
311 write_done_by_other, concurrent_update);
312 RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
313 }
314 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
315
316 if (write_options.disableWAL) {
317 has_unpersisted_data_.store(true, std::memory_order_relaxed);
318 }
319
320 PERF_TIMER_STOP(write_pre_and_post_process_time);
321
322 if (!two_write_queues_) {
323 if (status.ok() && !write_options.disableWAL) {
324 PERF_TIMER_GUARD(write_wal_time);
325 status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
326 need_log_dir_sync, last_sequence + 1);
327 }
328 } else {
329 if (status.ok() && !write_options.disableWAL) {
330 PERF_TIMER_GUARD(write_wal_time);
331 // LastAllocatedSequence is increased inside WriteToWAL under
332 // wal_write_mutex_ to ensure ordered events in WAL
333 status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
334 seq_inc);
335 } else {
336 // Otherwise we inc seq number for memtable writes
337 last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
338 }
339 }
340 assert(last_sequence != kMaxSequenceNumber);
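// current_sequence is the first sequence number assigned to this write
// group; after the increment below, last_sequence points at the last
// sequence number the group consumes.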
341 const SequenceNumber current_sequence = last_sequence + 1;
342 last_sequence += seq_inc;
343
344 // PreReleaseCallback is called after WAL write and before memtable write
345 if (status.ok()) {
346 SequenceNumber next_sequence = current_sequence;
347 size_t index = 0;
348 // Note: the logic for advancing seq here must be consistent with the
349 // logic in WriteBatchInternal::InsertInto(write_group...) as well as
350 // with WriteBatchInternal::InsertInto(write_batch...) that is called on
351 // the merged batch during recovery from the WAL.
352 for (auto* writer : write_group) {
353 if (writer->CallbackFailed()) {
354 continue;
355 }
356 writer->sequence = next_sequence;
357 if (writer->pre_release_callback) {
358 Status ws = writer->pre_release_callback->Callback(
359 writer->sequence, disable_memtable, writer->log_used, index++,
360 pre_release_callback_cnt);
361 if (!ws.ok()) {
362 status = ws;
363 break;
364 }
365 }
366 if (seq_per_batch_) {
367 assert(writer->batch_cnt);
368 next_sequence += writer->batch_cnt;
369 } else if (writer->ShouldWriteToMemtable()) {
370 next_sequence += WriteBatchInternal::Count(writer->batch);
371 }
372 }
373 }
374
375 if (status.ok()) {
376 PERF_TIMER_GUARD(write_memtable_time);
377
378 if (!parallel) {
379 // w.sequence will be set inside InsertInto
380 w.status = WriteBatchInternal::InsertInto(
381 write_group, current_sequence, column_family_memtables_.get(),
382 &flush_scheduler_, &trim_history_scheduler_,
383 write_options.ignore_missing_column_families,
384 0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
385 batch_per_txn_);
386 } else {
387 write_group.last_sequence = last_sequence;
388 write_thread_.LaunchParallelMemTableWriters(&write_group);
389 in_parallel_group = true;
390
391 // Each parallel follower does its own memtable writes. The leader should
392 // also do its own.
393 if (w.ShouldWriteToMemtable()) {
394 ColumnFamilyMemTablesImpl column_family_memtables(
395 versions_->GetColumnFamilySet());
396 assert(w.sequence == current_sequence);
397 w.status = WriteBatchInternal::InsertInto(
398 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
399 &trim_history_scheduler_,
400 write_options.ignore_missing_column_families, 0 /*log_number*/,
401 this, true /*concurrent_memtable_writes*/, seq_per_batch_,
402 w.batch_cnt, batch_per_txn_,
403 write_options.memtable_insert_hint_per_batch);
404 }
405 }
406 if (seq_used != nullptr) {
407 *seq_used = w.sequence;
408 }
409 }
410 }
411 PERF_TIMER_START(write_pre_and_post_process_time);
412
413 if (!w.CallbackFailed()) {
414 WriteStatusCheck(status);
415 }
416
417 if (need_log_sync) {
418 mutex_.Lock();
419 MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
420 mutex_.Unlock();
421 // Requesting sync with two_write_queues_ is expected to be very rare. We
422 // hence provide a simple implementation that is not necessarily efficient.
423 if (two_write_queues_) {
424 if (manual_wal_flush_) {
425 status = FlushWAL(true);
426 } else {
427 status = SyncWAL();
428 }
429 }
430 }
431
432 bool should_exit_batch_group = true;
433 if (in_parallel_group) {
434 // CompleteParallelWorker returns true if this thread should
435 // handle exit, false means somebody else did
436 should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
437 }
438 if (should_exit_batch_group) {
439 if (status.ok()) {
440 // Note: if we are to resume after non-OK statuses we need to revisit how
441 // we react to non-OK statuses here.
442 versions_->SetLastSequence(last_sequence);
443 }
444 MemTableInsertStatusCheck(w.status);
445 write_thread_.ExitAsBatchGroupLeader(write_group, status);
446 }
447
448 if (status.ok()) {
449 status = w.FinalStatus();
450 }
451 return status;
452 }
453
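// Pipelined write path (enable_pipelined_write): the WAL write stage and
// the memtable write stage are handled by separate writer groups, so a
// later group can start appending to the WAL while an earlier group is
// still inserting into the memtables.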
454 Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
455 WriteBatch* my_batch, WriteCallback* callback,
456 uint64_t* log_used, uint64_t log_ref,
457 bool disable_memtable, uint64_t* seq_used) {
458 PERF_TIMER_GUARD(write_pre_and_post_process_time);
459 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
460
461 WriteContext write_context;
462
463 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
464 disable_memtable);
465 write_thread_.JoinBatchGroup(&w);
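// A writer first takes part in the WAL write group (handled below when it
// becomes the leader) and is then linked into a memtable write group,
// handled further down.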
466 if (w.state == WriteThread::STATE_GROUP_LEADER) {
467 WriteThread::WriteGroup wal_write_group;
468 if (w.callback && !w.callback->AllowWriteBatching()) {
469 write_thread_.WaitForMemTableWriters();
470 }
471 mutex_.Lock();
472 bool need_log_sync = !write_options.disableWAL && write_options.sync;
473 bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
474 // PreprocessWrite does its own perf timing.
475 PERF_TIMER_STOP(write_pre_and_post_process_time);
476 w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
477 PERF_TIMER_START(write_pre_and_post_process_time);
478 log::Writer* log_writer = logs_.back().writer;
479 mutex_.Unlock();
480
481 // This can set a non-OK status if the callback fails.
482 last_batch_group_size_ =
483 write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
484 const SequenceNumber current_sequence =
485 write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
486 size_t total_count = 0;
487 size_t total_byte_size = 0;
488
489 if (w.status.ok()) {
490 SequenceNumber next_sequence = current_sequence;
491 for (auto writer : wal_write_group) {
492 if (writer->CheckCallback(this)) {
493 if (writer->ShouldWriteToMemtable()) {
494 writer->sequence = next_sequence;
495 size_t count = WriteBatchInternal::Count(writer->batch);
496 next_sequence += count;
497 total_count += count;
498 }
499 total_byte_size = WriteBatchInternal::AppendedByteSize(
500 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
501 }
502 }
503 if (w.disable_wal) {
504 has_unpersisted_data_.store(true, std::memory_order_relaxed);
505 }
506 write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
507 }
508
509 auto stats = default_cf_internal_stats_;
510 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
511 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
512 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
513 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
514 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
515
516 PERF_TIMER_STOP(write_pre_and_post_process_time);
517
518 if (w.status.ok() && !write_options.disableWAL) {
519 PERF_TIMER_GUARD(write_wal_time);
520 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
521 RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
522 if (wal_write_group.size > 1) {
523 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
524 wal_write_group.size - 1);
525 RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
526 }
527 w.status = WriteToWAL(wal_write_group, log_writer, log_used,
528 need_log_sync, need_log_dir_sync, current_sequence);
529 }
530
531 if (!w.CallbackFailed()) {
532 WriteStatusCheck(w.status);
533 }
534
535 if (need_log_sync) {
536 mutex_.Lock();
537 MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
538 mutex_.Unlock();
539 }
540
541 write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
542 }
543
544 WriteThread::WriteGroup memtable_write_group;
545 if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
546 PERF_TIMER_GUARD(write_memtable_time);
547 assert(w.ShouldWriteToMemtable());
548 write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
549 if (memtable_write_group.size > 1 &&
550 immutable_db_options_.allow_concurrent_memtable_write) {
551 write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
552 } else {
553 memtable_write_group.status = WriteBatchInternal::InsertInto(
554 memtable_write_group, w.sequence, column_family_memtables_.get(),
555 &flush_scheduler_, &trim_history_scheduler_,
556 write_options.ignore_missing_column_families, 0 /*log_number*/, this,
557 false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
558 versions_->SetLastSequence(memtable_write_group.last_sequence);
559 write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
560 }
561 }
562
563 if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
564 assert(w.ShouldWriteToMemtable());
565 ColumnFamilyMemTablesImpl column_family_memtables(
566 versions_->GetColumnFamilySet());
567 w.status = WriteBatchInternal::InsertInto(
568 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
569 &trim_history_scheduler_, write_options.ignore_missing_column_families,
570 0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
571 false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
572 write_options.memtable_insert_hint_per_batch);
573 if (write_thread_.CompleteParallelMemTableWriter(&w)) {
574 MemTableInsertStatusCheck(w.status);
575 versions_->SetLastSequence(w.write_group->last_sequence);
576 write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
577 }
578 }
579 if (seq_used != nullptr) {
580 *seq_used = w.sequence;
581 }
582
583 assert(w.state == WriteThread::STATE_COMPLETED);
584 return w.FinalStatus();
585 }
586
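// Memtable phase of an unordered_write: the WAL-only write in WriteImpl()
// has already assigned this batch its sequence number(s); here the batch
// is inserted into the memtables concurrently with other writers, and
// pending_memtable_writes_ is decremented so that a memtable switch
// waiting for the counter to reach zero (see switch_cv_ below) can proceed.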
587 Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
588 WriteBatch* my_batch,
589 WriteCallback* callback, uint64_t log_ref,
590 SequenceNumber seq,
591 const size_t sub_batch_cnt) {
592 PERF_TIMER_GUARD(write_pre_and_post_process_time);
593 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
594
595 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
596 false /*disable_memtable*/);
597
598 if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
599 w.sequence = seq;
600 size_t total_count = WriteBatchInternal::Count(my_batch);
601 InternalStats* stats = default_cf_internal_stats_;
602 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
603 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
604
605 ColumnFamilyMemTablesImpl column_family_memtables(
606 versions_->GetColumnFamilySet());
607 w.status = WriteBatchInternal::InsertInto(
608 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
609 &trim_history_scheduler_, write_options.ignore_missing_column_families,
610 0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
611 seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
612 write_options.memtable_insert_hint_per_batch);
613
614 WriteStatusCheck(w.status);
615 if (write_options.disableWAL) {
616 has_unpersisted_data_.store(true, std::memory_order_relaxed);
617 }
618 }
619
620 size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
621 if (pending_cnt == 0) {
622 // switch_cv_ waits until pending_memtable_writes_ == 0. Locking its mutex
623 // before notifying ensures that the cv is in the waiting state when it is
624 // notified, so the update to pending_memtable_writes_ is not missed even
625 // though the counter is not modified under the mutex.
626 std::lock_guard<std::mutex> lck(switch_mutex_);
627 switch_cv_.notify_all();
628 }
629
630 if (!w.FinalStatus().ok()) {
631 return w.FinalStatus();
632 }
633 return Status::OK();
634 }
635
636 // The 2nd write queue. If enabled it will be used only for WAL-only writes.
637 // This is the only queue that updates LastPublishedSequence which is only
638 // applicable in a two-queue setting.
639 Status DBImpl::WriteImplWALOnly(
640 WriteThread* write_thread, const WriteOptions& write_options,
641 WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
642 const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
643 PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
644 const PublishLastSeq publish_last_seq, const bool disable_memtable) {
645 Status status;
646 PERF_TIMER_GUARD(write_pre_and_post_process_time);
647 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
648 disable_memtable, sub_batch_cnt, pre_release_callback);
649 RecordTick(stats_, WRITE_WITH_WAL);
650 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
651
652 write_thread->JoinBatchGroup(&w);
653 assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
654 if (w.state == WriteThread::STATE_COMPLETED) {
655 if (log_used != nullptr) {
656 *log_used = w.log_used;
657 }
658 if (seq_used != nullptr) {
659 *seq_used = w.sequence;
660 }
661 return w.FinalStatus();
662 }
663 // else we are the leader of the write batch group
664 assert(w.state == WriteThread::STATE_GROUP_LEADER);
665
666 if (publish_last_seq == kDoPublishLastSeq) {
667 // Currently we only use kDoPublishLastSeq in unordered_write
668 assert(immutable_db_options_.unordered_write);
669 WriteContext write_context;
670 if (error_handler_.IsDBStopped()) {
671 status = error_handler_.GetBGError();
672 }
673 // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
674 // without paying the cost of obtaining the mutex.
675 if (status.ok()) {
676 InstrumentedMutexLock l(&mutex_);
677 bool need_log_sync = false;
678 status = PreprocessWrite(write_options, &need_log_sync, &write_context);
679 WriteStatusCheck(status);
680 }
681 if (!status.ok()) {
682 WriteThread::WriteGroup write_group;
683 write_thread->EnterAsBatchGroupLeader(&w, &write_group);
684 write_thread->ExitAsBatchGroupLeader(write_group, status);
685 return status;
686 }
687 }
688
689 WriteThread::WriteGroup write_group;
690 uint64_t last_sequence;
691 write_thread->EnterAsBatchGroupLeader(&w, &write_group);
692 // Note: no need to update last_batch_group_size_ here since the batch writes
693 // to WAL only
694
695 size_t pre_release_callback_cnt = 0;
696 size_t total_byte_size = 0;
697 for (auto* writer : write_group) {
698 if (writer->CheckCallback(this)) {
699 total_byte_size = WriteBatchInternal::AppendedByteSize(
700 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
701 if (writer->pre_release_callback) {
702 pre_release_callback_cnt++;
703 }
704 }
705 }
706
707 const bool concurrent_update = true;
708 // Update stats while we are an exclusive group leader, so we know
709 // that nobody else can be writing to these particular stats.
710 // We're optimistic, updating the stats before we successfully
711 // commit. That lets us release our leader status early.
712 auto stats = default_cf_internal_stats_;
713 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
714 concurrent_update);
715 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
716 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
717 concurrent_update);
718 RecordTick(stats_, WRITE_DONE_BY_SELF);
719 auto write_done_by_other = write_group.size - 1;
720 if (write_done_by_other > 0) {
721 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
722 write_done_by_other, concurrent_update);
723 RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
724 }
725 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
726
727 PERF_TIMER_STOP(write_pre_and_post_process_time);
728
729 PERF_TIMER_GUARD(write_wal_time);
730 // LastAllocatedSequence is increased inside WriteToWAL under
731 // wal_write_mutex_ to ensure ordered events in WAL
732 size_t seq_inc = 0 /* total_count */;
733 if (assign_order == kDoAssignOrder) {
734 size_t total_batch_cnt = 0;
735 for (auto* writer : write_group) {
736 assert(writer->batch_cnt || !seq_per_batch_);
737 if (!writer->CallbackFailed()) {
738 total_batch_cnt += writer->batch_cnt;
739 }
740 }
741 seq_inc = total_batch_cnt;
742 }
743 if (!write_options.disableWAL) {
744 status =
745 ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
746 } else {
747 // Otherwise we increment the seq number solely to do the seq allocation
748 last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
749 }
750
751 size_t memtable_write_cnt = 0;
752 auto curr_seq = last_sequence + 1;
753 for (auto* writer : write_group) {
754 if (writer->CallbackFailed()) {
755 continue;
756 }
757 writer->sequence = curr_seq;
758 if (assign_order == kDoAssignOrder) {
759 assert(writer->batch_cnt || !seq_per_batch_);
760 curr_seq += writer->batch_cnt;
761 }
762 if (!writer->disable_memtable) {
763 memtable_write_cnt++;
764 }
765 // else seq advances only by memtable writes
766 }
767 if (status.ok() && write_options.sync) {
768 assert(!write_options.disableWAL);
769 // Requesting sync with two_write_queues_ is expected to be very rare. We
770 // hence provide a simple implementation that is not necessarily efficient.
771 if (manual_wal_flush_) {
772 status = FlushWAL(true);
773 } else {
774 status = SyncWAL();
775 }
776 }
777 PERF_TIMER_START(write_pre_and_post_process_time);
778
779 if (!w.CallbackFailed()) {
780 WriteStatusCheck(status);
781 }
782 if (status.ok()) {
783 size_t index = 0;
784 for (auto* writer : write_group) {
785 if (!writer->CallbackFailed() && writer->pre_release_callback) {
786 assert(writer->sequence != kMaxSequenceNumber);
787 Status ws = writer->pre_release_callback->Callback(
788 writer->sequence, disable_memtable, writer->log_used, index++,
789 pre_release_callback_cnt);
790 if (!ws.ok()) {
791 status = ws;
792 break;
793 }
794 }
795 }
796 }
797 if (publish_last_seq == kDoPublishLastSeq) {
798 versions_->SetLastSequence(last_sequence + seq_inc);
799 // Currently we only use kDoPublishLastSeq in unordered_write
800 assert(immutable_db_options_.unordered_write);
801 }
802 if (immutable_db_options_.unordered_write && status.ok()) {
803 pending_memtable_writes_ += memtable_write_cnt;
804 }
805 write_thread->ExitAsBatchGroupLeader(write_group, status);
806 if (status.ok()) {
807 status = w.FinalStatus();
808 }
809 if (seq_used != nullptr) {
810 *seq_used = w.sequence;
811 }
812 return status;
813 }
814
815 void DBImpl::WriteStatusCheck(const Status& status) {
816 // Is setting bg_error_ enough here? This will at least stop
817 // compaction and fail any further writes.
818 if (immutable_db_options_.paranoid_checks && !status.ok() &&
819 !status.IsBusy() && !status.IsIncomplete()) {
820 mutex_.Lock();
821 error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
822 mutex_.Unlock();
823 }
824 }
825
826 void DBImpl::MemTableInsertStatusCheck(const Status& status) {
827 // A non-OK status here indicates that the state implied by the
828 // WAL has diverged from the in-memory state. This could be
829 // because of a corrupt write_batch (very bad), or because the
830 // client specified an invalid column family and didn't specify
831 // ignore_missing_column_families.
832 if (!status.ok()) {
833 mutex_.Lock();
834 assert(!error_handler_.IsBGWorkStopped());
835 error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
836 mutex_.Unlock();
837 }
838 }
839
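// Runs with mutex_ held before a write group proceeds: returns any pending
// background error, switches the WAL when the total WAL size exceeds
// GetMaxTotalWalSize(), handles a full write buffer, trims memtable
// history, schedules pending flushes, applies write delays/stalls, and
// marks the current logs as getting_synced when a log sync was requested.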
840 Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
841 bool* need_log_sync,
842 WriteContext* write_context) {
843 mutex_.AssertHeld();
844 assert(write_context != nullptr && need_log_sync != nullptr);
845 Status status;
846
847 if (error_handler_.IsDBStopped()) {
848 status = error_handler_.GetBGError();
849 }
850
851 PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);
852
853 assert(!single_column_family_mode_ ||
854 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
855 if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
856 total_log_size_ > GetMaxTotalWalSize())) {
857 WaitForPendingWrites();
858 status = SwitchWAL(write_context);
859 }
860
861 if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
862 // Before a new memtable is added in SwitchMemtable(),
863 // write_buffer_manager_->ShouldFlush() will keep returning true. If another
864 // thread is writing to another DB with the same write buffer, they may also
865 // be flushed. We may end up flushing many more DBs than needed. It's
866 // suboptimal but still correct.
867 WaitForPendingWrites();
868 status = HandleWriteBufferFull(write_context);
869 }
870
871 if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
872 status = TrimMemtableHistory(write_context);
873 }
874
875 if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
876 WaitForPendingWrites();
877 status = ScheduleFlushes(write_context);
878 }
879
880 PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
881 PERF_TIMER_GUARD(write_pre_and_post_process_time);
882
883 if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
884 write_controller_.NeedsDelay()))) {
885 PERF_TIMER_STOP(write_pre_and_post_process_time);
886 PERF_TIMER_GUARD(write_delay_time);
887 // We don't know the size of the current batch, so we always use the size
888 // of the previous one. It might create a fairness issue in that expiration
889 // might happen for smaller writes while larger writes can go through.
890 // Can optimize it if it is an issue.
891 status = DelayWrite(last_batch_group_size_, write_options);
892 PERF_TIMER_START(write_pre_and_post_process_time);
893 }
894
895 if (status.ok() && *need_log_sync) {
896 // Wait until the parallel syncs are finished. Any sync process has to sync
897 // the front log too, so it is enough to check the status of front().
898 // We use a while loop since log_sync_cv_ is signalled when any sync is
899 // finished.
900 // Note: there does not seem to be a reason to wait for parallel sync at
901 // this early step but it is not important since parallel sync (SyncWAL) and
902 // need_log_sync are usually not used together.
903 while (logs_.front().getting_synced) {
904 log_sync_cv_.Wait();
905 }
906 for (auto& log : logs_) {
907 assert(!log.getting_synced);
908 // This is just to prevent the logs from being synced by a parallel SyncWAL
909 // call. We will do the actual syncing later, after we write to the
910 // WAL.
911 // Note: there does not seem to be a reason to set this early before we
912 // actually write to the WAL
913 log.getting_synced = true;
914 }
915 } else {
916 *need_log_sync = false;
917 }
918
919 return status;
920 }
921
922 WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
923 WriteBatch* tmp_batch, size_t* write_with_wal,
924 WriteBatch** to_be_cached_state) {
925 assert(write_with_wal != nullptr);
926 assert(tmp_batch != nullptr);
927 assert(*to_be_cached_state == nullptr);
928 WriteBatch* merged_batch = nullptr;
929 *write_with_wal = 0;
930 auto* leader = write_group.leader;
931 assert(!leader->disable_wal); // Same holds for all in the batch group
932 if (write_group.size == 1 && !leader->CallbackFailed() &&
933 leader->batch->GetWalTerminationPoint().is_cleared()) {
934 // We simply write the leader's WriteBatch to the WAL if the group only
935 // contains one batch, that batch should be written to the WAL,
936 // and the batch does not need to be truncated.
937 merged_batch = leader->batch;
938 if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
939 *to_be_cached_state = merged_batch;
940 }
941 *write_with_wal = 1;
942 } else {
943 // WAL needs all of the batches flattened into a single batch.
944 // We could avoid copying here with an iov-like AddRecord
945 // interface
946 merged_batch = tmp_batch;
947 for (auto writer : write_group) {
948 if (!writer->CallbackFailed()) {
949 WriteBatchInternal::Append(merged_batch, writer->batch,
950 /*WAL_only*/ true);
951 if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
952 // We only need to cache the last of such write batch
953 *to_be_cached_state = writer->batch;
954 }
955 (*write_with_wal)++;
956 }
957 }
958 }
959 return merged_batch;
960 }
961
962 // When two_write_queues_ is disabled, this function is called from the only
963 // write thread. Otherwise this must be called holding log_write_mutex_.
964 Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
965 log::Writer* log_writer, uint64_t* log_used,
966 uint64_t* log_size) {
967 assert(log_size != nullptr);
968 Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
969 *log_size = log_entry.size();
970 // When two_write_queues_ is set, WriteToWAL has to be protected from
971 // concurrent calls from the two queues anyway and log_write_mutex_ is
972 // already held. Otherwise, if manual_wal_flush_ is enabled, we need to
973 // protect log_writer->AddRecord from concurrent calls via FlushWAL by the application.
974 const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
975 // Due to performance concerns about missed branch prediction, penalize the
976 // new manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
977 // when we do not need any locking.
978 if (UNLIKELY(needs_locking)) {
979 log_write_mutex_.Lock();
980 }
981 Status status = log_writer->AddRecord(log_entry);
982 if (UNLIKELY(needs_locking)) {
983 log_write_mutex_.Unlock();
984 }
985 if (log_used != nullptr) {
986 *log_used = logfile_number_;
987 }
988 total_log_size_ += log_entry.size();
989 // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
990 // since alive_log_files_ might be modified concurrently
991 alive_log_files_.back().AddSize(log_entry.size());
992 log_empty_ = false;
993 return status;
994 }
995
996 Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
997 log::Writer* log_writer, uint64_t* log_used,
998 bool need_log_sync, bool need_log_dir_sync,
999 SequenceNumber sequence) {
1000 Status status;
1001
1002 assert(!write_group.leader->disable_wal);
1003 // Same holds for all in the batch group
1004 size_t write_with_wal = 0;
1005 WriteBatch* to_be_cached_state = nullptr;
1006 WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
1007 &write_with_wal, &to_be_cached_state);
1008 if (merged_batch == write_group.leader->batch) {
1009 write_group.leader->log_used = logfile_number_;
1010 } else if (write_with_wal > 1) {
1011 for (auto writer : write_group) {
1012 writer->log_used = logfile_number_;
1013 }
1014 }
1015
1016 WriteBatchInternal::SetSequence(merged_batch, sequence);
1017
1018 uint64_t log_size;
1019 status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1020 if (to_be_cached_state) {
1021 cached_recoverable_state_ = *to_be_cached_state;
1022 cached_recoverable_state_empty_ = false;
1023 }
1024
1025 if (status.ok() && need_log_sync) {
1026 StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
1027 // It's safe to access logs_ with unlocked mutex_ here because:
1028 // - we've set getting_synced=true for all logs,
1029 // so other threads won't pop from logs_ while we're here,
1030 // - only writer thread can push to logs_, and we're in
1031 // writer thread, so no one will push to logs_,
1032 // - as long as other threads don't modify it, it's safe to read
1033 // from std::deque from multiple threads concurrently.
1034 for (auto& log : logs_) {
1035 status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
1036 if (!status.ok()) {
1037 break;
1038 }
1039 }
1040 if (status.ok() && need_log_dir_sync) {
1041 // We only sync WAL directory the first time WAL syncing is
1042 // requested, so that in case users never turn on WAL sync,
1043 // we can avoid the disk I/O in the write code path.
1044 status = directories_.GetWalDir()->Fsync();
1045 }
1046 }
1047
1048 if (merged_batch == &tmp_batch_) {
1049 tmp_batch_.Clear();
1050 }
1051 if (status.ok()) {
1052 auto stats = default_cf_internal_stats_;
1053 if (need_log_sync) {
1054 stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
1055 RecordTick(stats_, WAL_FILE_SYNCED);
1056 }
1057 stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
1058 RecordTick(stats_, WAL_FILE_BYTES, log_size);
1059 stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
1060 RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1061 }
1062 return status;
1063 }
1064
1065 Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
1066 uint64_t* log_used,
1067 SequenceNumber* last_sequence,
1068 size_t seq_inc) {
1069 Status status;
1070
1071 assert(!write_group.leader->disable_wal);
1072 // Same holds for all in the batch group
1073 WriteBatch tmp_batch;
1074 size_t write_with_wal = 0;
1075 WriteBatch* to_be_cached_state = nullptr;
1076 WriteBatch* merged_batch =
1077 MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);
1078
1079 // We need to lock log_write_mutex_ since logs_ and alive_log_files might be
1080 // pushed back concurrently
1081 log_write_mutex_.Lock();
1082 if (merged_batch == write_group.leader->batch) {
1083 write_group.leader->log_used = logfile_number_;
1084 } else if (write_with_wal > 1) {
1085 for (auto writer : write_group) {
1086 writer->log_used = logfile_number_;
1087 }
1088 }
1089 *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
1090 auto sequence = *last_sequence + 1;
1091 WriteBatchInternal::SetSequence(merged_batch, sequence);
1092
1093 log::Writer* log_writer = logs_.back().writer;
1094 uint64_t log_size;
1095 status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1096 if (to_be_cached_state) {
1097 cached_recoverable_state_ = *to_be_cached_state;
1098 cached_recoverable_state_empty_ = false;
1099 }
1100 log_write_mutex_.Unlock();
1101
1102 if (status.ok()) {
1103 const bool concurrent = true;
1104 auto stats = default_cf_internal_stats_;
1105 stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
1106 concurrent);
1107 RecordTick(stats_, WAL_FILE_BYTES, log_size);
1108 stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
1109 concurrent);
1110 RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1111 }
1112 return status;
1113 }
1114
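// Inserts the cached recoverable state (the most recent batch flagged with
// IsLatestPersistentState, see MergeBatch) into the memtables, advances the
// sequence counters accordingly, and invokes the registered pre-release
// callback once per sub-batch. Called with mutex_ held before a memtable
// switch so the state is not lost when the old WAL is deleted.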
1115 Status DBImpl::WriteRecoverableState() {
1116 mutex_.AssertHeld();
1117 if (!cached_recoverable_state_empty_) {
1118 bool dont_care_bool;
1119 SequenceNumber next_seq;
1120 if (two_write_queues_) {
1121 log_write_mutex_.Lock();
1122 }
1123 SequenceNumber seq;
1124 if (two_write_queues_) {
1125 seq = versions_->FetchAddLastAllocatedSequence(0);
1126 } else {
1127 seq = versions_->LastSequence();
1128 }
1129 WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
1130 auto status = WriteBatchInternal::InsertInto(
1131 &cached_recoverable_state_, column_family_memtables_.get(),
1132 &flush_scheduler_, &trim_history_scheduler_, true,
1133 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
1134 &next_seq, &dont_care_bool, seq_per_batch_);
1135 auto last_seq = next_seq - 1;
1136 if (two_write_queues_) {
1137 versions_->FetchAddLastAllocatedSequence(last_seq - seq);
1138 versions_->SetLastPublishedSequence(last_seq);
1139 }
1140 versions_->SetLastSequence(last_seq);
1141 if (two_write_queues_) {
1142 log_write_mutex_.Unlock();
1143 }
1144 if (status.ok() && recoverable_state_pre_release_callback_) {
1145 const bool DISABLE_MEMTABLE = true;
1146 for (uint64_t sub_batch_seq = seq + 1;
1147 sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
1148 uint64_t const no_log_num = 0;
1149 // Unlock it since the callback might end up locking mutex. e.g.,
1150 // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
1151 mutex_.Unlock();
1152 status = recoverable_state_pre_release_callback_->Callback(
1153 sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
1154 mutex_.Lock();
1155 }
1156 }
1157 if (status.ok()) {
1158 cached_recoverable_state_.Clear();
1159 cached_recoverable_state_empty_ = true;
1160 }
1161 return status;
1162 }
1163 return Status::OK();
1164 }
1165
1166 void DBImpl::SelectColumnFamiliesForAtomicFlush(
1167 autovector<ColumnFamilyData*>* cfds) {
1168 for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
1169 if (cfd->IsDropped()) {
1170 continue;
1171 }
1172 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1173 !cached_recoverable_state_empty_.load()) {
1174 cfds->push_back(cfd);
1175 }
1176 }
1177 }
1178
1179 // Assign sequence number for atomic flush.
1180 void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
1181 assert(immutable_db_options_.atomic_flush);
1182 auto seq = versions_->LastSequence();
1183 for (auto cfd : cfds) {
1184 cfd->imm()->AssignAtomicFlushSeq(seq);
1185 }
1186 }
1187
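// Called from PreprocessWrite() when the total WAL size exceeds
// GetMaxTotalWalSize(): marks the oldest live WAL as getting flushed and
// schedules flushes of the column families that still hold data from it so
// the log can eventually be released. Under 2PC the oldest log is left
// alone if it still contains uncommitted prepared transactions.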
1188 Status DBImpl::SwitchWAL(WriteContext* write_context) {
1189 mutex_.AssertHeld();
1190 assert(write_context != nullptr);
1191 Status status;
1192
1193 if (alive_log_files_.begin()->getting_flushed) {
1194 return status;
1195 }
1196
1197 auto oldest_alive_log = alive_log_files_.begin()->number;
1198 bool flush_wont_release_oldest_log = false;
1199 if (allow_2pc()) {
1200 auto oldest_log_with_uncommitted_prep =
1201 logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
1202
1203 assert(oldest_log_with_uncommitted_prep == 0 ||
1204 oldest_log_with_uncommitted_prep >= oldest_alive_log);
1205 if (oldest_log_with_uncommitted_prep > 0 &&
1206 oldest_log_with_uncommitted_prep == oldest_alive_log) {
1207 if (unable_to_release_oldest_log_) {
1208 // we already attempted to flush all column families dependent on
1209 // the oldest alive log but the log still contained uncommitted
1210 // transactions so there is still nothing that we can do.
1211 return status;
1212 } else {
1213 ROCKS_LOG_WARN(
1214 immutable_db_options_.info_log,
1215 "Unable to release oldest log due to uncommitted transaction");
1216 unable_to_release_oldest_log_ = true;
1217 flush_wont_release_oldest_log = true;
1218 }
1219 }
1220 }
1221 if (!flush_wont_release_oldest_log) {
1222 // we only mark this log as getting flushed if we have successfully
1223 // flushed all data in this log. If this log contains outstanding prepared
1224 // transactions then we cannot flush this log until those transactions are
1225 // committed.
1226 unable_to_release_oldest_log_ = false;
1227 alive_log_files_.begin()->getting_flushed = true;
1228 }
1229
1230 ROCKS_LOG_INFO(
1231 immutable_db_options_.info_log,
1232 "Flushing all column families with data in WAL number %" PRIu64
1233 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
1234 oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
1235 // no need to refcount because drop is happening in write thread, so can't
1236 // happen while we're in the write thread
1237 autovector<ColumnFamilyData*> cfds;
1238 if (immutable_db_options_.atomic_flush) {
1239 SelectColumnFamiliesForAtomicFlush(&cfds);
1240 } else {
1241 for (auto cfd : *versions_->GetColumnFamilySet()) {
1242 if (cfd->IsDropped()) {
1243 continue;
1244 }
1245 if (cfd->OldestLogToKeep() <= oldest_alive_log) {
1246 cfds.push_back(cfd);
1247 }
1248 }
1249 MaybeFlushStatsCF(&cfds);
1250 }
1251 WriteThread::Writer nonmem_w;
1252 if (two_write_queues_) {
1253 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1254 }
1255
1256 for (const auto cfd : cfds) {
1257 cfd->Ref();
1258 status = SwitchMemtable(cfd, write_context);
1259 cfd->UnrefAndTryDelete();
1260 if (!status.ok()) {
1261 break;
1262 }
1263 }
1264 if (two_write_queues_) {
1265 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1266 }
1267
1268 if (status.ok()) {
1269 if (immutable_db_options_.atomic_flush) {
1270 AssignAtomicFlushSeq(cfds);
1271 }
1272 for (auto cfd : cfds) {
1273 cfd->imm()->FlushRequested();
1274 }
1275 FlushRequest flush_req;
1276 GenerateFlushRequest(cfds, &flush_req);
1277 SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
1278 MaybeScheduleFlushOrCompaction();
1279 }
1280 return status;
1281 }
1282
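// Called from PreprocessWrite() when the write buffer manager reports that
// the memtable memory budget is full: picks the column family whose active
// memtable was created earliest (or all column families when atomic_flush
// is set) and switches its memtable so a flush can free memory.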
1283 Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
1284 mutex_.AssertHeld();
1285 assert(write_context != nullptr);
1286 Status status;
1287
1288 // Before a new memtable is added in SwitchMemtable(),
1289 // write_buffer_manager_->ShouldFlush() will keep returning true. If another
1290 // thread is writing to another DB with the same write buffer, they may also
1291 // be flushed. We may end up flushing many more DBs than needed. It's
1292 // suboptimal but still correct.
1293 ROCKS_LOG_INFO(
1294 immutable_db_options_.info_log,
1295 "Flushing column family with oldest memtable entry. Write buffer is "
1296 "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
1297 write_buffer_manager_->memory_usage(),
1298 write_buffer_manager_->buffer_size());
1299 // no need to refcount because drop is happening in write thread, so can't
1300 // happen while we're in the write thread
1301 autovector<ColumnFamilyData*> cfds;
1302 if (immutable_db_options_.atomic_flush) {
1303 SelectColumnFamiliesForAtomicFlush(&cfds);
1304 } else {
1305 ColumnFamilyData* cfd_picked = nullptr;
1306 SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
1307
1308 for (auto cfd : *versions_->GetColumnFamilySet()) {
1309 if (cfd->IsDropped()) {
1310 continue;
1311 }
1312 if (!cfd->mem()->IsEmpty()) {
1313 // We only consider active mem table, hoping immutable memtable is
1314 // already in the process of flushing.
1315 uint64_t seq = cfd->mem()->GetCreationSeq();
1316 if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
1317 cfd_picked = cfd;
1318 seq_num_for_cf_picked = seq;
1319 }
1320 }
1321 }
1322 if (cfd_picked != nullptr) {
1323 cfds.push_back(cfd_picked);
1324 }
1325 MaybeFlushStatsCF(&cfds);
1326 }
1327
1328 WriteThread::Writer nonmem_w;
1329 if (two_write_queues_) {
1330 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1331 }
1332 for (const auto cfd : cfds) {
1333 if (cfd->mem()->IsEmpty()) {
1334 continue;
1335 }
1336 cfd->Ref();
1337 status = SwitchMemtable(cfd, write_context);
1338 cfd->UnrefAndTryDelete();
1339 if (!status.ok()) {
1340 break;
1341 }
1342 }
1343 if (two_write_queues_) {
1344 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1345 }
1346
1347 if (status.ok()) {
1348 if (immutable_db_options_.atomic_flush) {
1349 AssignAtomicFlushSeq(cfds);
1350 }
1351 for (const auto cfd : cfds) {
1352 cfd->imm()->FlushRequested();
1353 }
1354 FlushRequest flush_req;
1355 GenerateFlushRequest(cfds, &flush_req);
1356 SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
1357 MaybeScheduleFlushOrCompaction();
1358 }
1359 return status;
1360 }
1361
1362 uint64_t DBImpl::GetMaxTotalWalSize() const {
1363 mutex_.AssertHeld();
1364 return mutable_db_options_.max_total_wal_size == 0
1365 ? 4 * max_total_in_memory_state_
1366 : mutable_db_options_.max_total_wal_size;
1367 }
1368
1369 // REQUIRES: mutex_ is held
1370 // REQUIRES: this thread is currently at the front of the writer queue
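// Applies write stalls in two forms: it first sleeps in kDelayInterval
// steps while the write controller requests a delay (bounded by the
// computed delay), then blocks on bg_cv_ while writes are fully stopped.
// It bails out early on background errors, and returns Status::Incomplete
// when no_slowdown is set.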
1371 Status DBImpl::DelayWrite(uint64_t num_bytes,
1372 const WriteOptions& write_options) {
1373 uint64_t time_delayed = 0;
1374 bool delayed = false;
1375 {
1376 StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
1377 uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
1378 if (delay > 0) {
1379 if (write_options.no_slowdown) {
1380 return Status::Incomplete("Write stall");
1381 }
1382 TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
1383
1384 // Notify write_thread_ about the stall so it can setup a barrier and
1385 // fail any pending writers with no_slowdown
1386 write_thread_.BeginWriteStall();
1387 TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
1388 mutex_.Unlock();
1389 // We will delay the write until we have slept for `delay` microseconds or
1390 // we don't need a delay anymore.
1391 const uint64_t kDelayInterval = 1000;
1392 uint64_t stall_end = sw.start_time() + delay;
1393 while (write_controller_.NeedsDelay()) {
1394 if (env_->NowMicros() >= stall_end) {
1395 // We already delayed this write `delay` microseconds
1396 break;
1397 }
1398
1399 delayed = true;
1400 // Sleep for 0.001 seconds
1401 env_->SleepForMicroseconds(kDelayInterval);
1402 }
1403 mutex_.Lock();
1404 write_thread_.EndWriteStall();
1405 }
1406
1407 // Don't wait if there's a background error, even if it's a soft error. We
1408 // might wait here indefinitely as the background compaction may never
1409 // finish successfully, resulting in the stall condition lasting
1410 // indefinitely
1411 while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
1412 if (write_options.no_slowdown) {
1413 return Status::Incomplete("Write stall");
1414 }
1415 delayed = true;
1416
1417 // Notify write_thread_ about the stall so it can setup a barrier and
1418 // fail any pending writers with no_slowdown
1419 write_thread_.BeginWriteStall();
1420 TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
1421 bg_cv_.Wait();
1422 write_thread_.EndWriteStall();
1423 }
1424 }
1425 assert(!delayed || !write_options.no_slowdown);
1426 if (delayed) {
1427 default_cf_internal_stats_->AddDBStats(
1428 InternalStats::kIntStatsWriteStallMicros, time_delayed);
1429 RecordTick(stats_, STALL_MICROS, time_delayed);
1430 }
1431
1432 // If DB is not in read-only mode and write_controller is not stopping
1433 // writes, we can ignore any background errors and allow the write to
1434 // proceed
1435 Status s;
1436 if (write_controller_.IsStopped()) {
1437 // If writes are still stopped, it means we bailed due to a background
1438 // error
1439 s = Status::Incomplete(error_handler_.GetBGError().ToString());
1440 }
1441 if (error_handler_.IsDBStopped()) {
1442 s = error_handler_.GetBGError();
1443 }
1444 return s;
1445 }
1446
1447 Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
1448 WriteBatch* my_batch) {
1449 assert(write_options.low_pri);
1450 // This is called outside the DB mutex. Although it is safe to make the call,
1451 // the consistency condition is not guaranteed to hold. It's OK to live with
1452 // it in this case.
1453 // If we need to speed up compaction, it means compaction has fallen behind
1454 // and we start to rate-limit low-pri writes.
1455 if (write_controller_.NeedSpeedupCompaction()) {
1456 if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
1457 // For 2PC, we only rate limit prepare, not commit.
1458 return Status::OK();
1459 }
1460 if (write_options.no_slowdown) {
1461 return Status::Incomplete("Low priority write stall");
1462 } else {
1463 assert(my_batch != nullptr);
1464 // Rate limit those writes. The reason that we don't completely wait
1465 // is that, when the write load is heavy, low-pri writes may never get
1466 // a chance to run. This way we guarantee they are still slowly making
1467 // progress.
1468 PERF_TIMER_GUARD(write_delay_time);
1469 write_controller_.low_pri_rate_limiter()->Request(
1470 my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
1471 RateLimiter::OpType::kWrite);
1472 }
1473 }
1474 return Status::OK();
1475 }
1476
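// Invoked from the flush-selection paths above (SwitchWAL,
// HandleWriteBufferFull, ScheduleFlushes) when persist_stats_to_disk is
// enabled: adds the hidden persistent-stats column family to the flush set
// if it has unflushed data and its log number is smaller than that of every
// other column family, so the stats CF alone does not pin old WAL files.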
1477 void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
1478 assert(cfds != nullptr);
1479 if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
1480 ColumnFamilyData* cfd_stats =
1481 versions_->GetColumnFamilySet()->GetColumnFamily(
1482 kPersistentStatsColumnFamilyName);
1483 if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
1484 for (ColumnFamilyData* cfd : *cfds) {
1485 if (cfd == cfd_stats) {
1486 // stats CF already included in cfds
1487 return;
1488 }
1489 }
1490 // force flush stats CF when its log number is less than all other CF's
1491 // log numbers
1492 bool force_flush_stats_cf = true;
1493 for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1494 if (loop_cfd == cfd_stats) {
1495 continue;
1496 }
1497 if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1498 force_flush_stats_cf = false;
1499 }
1500 }
1501 if (force_flush_stats_cf) {
1502 cfds->push_back(cfd_stats);
1503 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1504 "Force flushing stats CF with automated flush "
1505 "to avoid holding old logs");
1506 }
1507 }
1508 }
1509 }
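// Worked example for the force-flush condition above (hypothetical numbers):
// if the persistent-stats CF still references log #5 while every other CF has
// advanced to log #8 or later, WAL #5 can only be released by flushing the
// stats CF, so it is appended to `cfds` here.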
1510
1511 Status DBImpl::TrimMemtableHistory(WriteContext* context) {
1512 autovector<ColumnFamilyData*> cfds;
1513 ColumnFamilyData* tmp_cfd;
1514 while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
1515 nullptr) {
1516 cfds.push_back(tmp_cfd);
1517 }
1518 for (auto& cfd : cfds) {
1519 autovector<MemTable*> to_delete;
1520 cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
1521 if (!to_delete.empty()) {
1522 for (auto m : to_delete) {
1523 delete m;
1524 }
1525 context->superversion_context.NewSuperVersion();
1526 assert(context->superversion_context.new_superversion.get() != nullptr);
1527 cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
1528 }
1529
1530 if (cfd->UnrefAndTryDelete()) {
1531 cfd = nullptr;
1532 }
1533 }
1534 return Status::OK();
1535 }
1536
1537 Status DBImpl::ScheduleFlushes(WriteContext* context) {
1538 autovector<ColumnFamilyData*> cfds;
1539 if (immutable_db_options_.atomic_flush) {
1540 SelectColumnFamiliesForAtomicFlush(&cfds);
1541 for (auto cfd : cfds) {
1542 cfd->Ref();
1543 }
1544 flush_scheduler_.Clear();
1545 } else {
1546 ColumnFamilyData* tmp_cfd;
1547 while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1548 cfds.push_back(tmp_cfd);
1549 }
1550 MaybeFlushStatsCF(&cfds);
1551 }
1552 Status status;
1553 WriteThread::Writer nonmem_w;
1554 if (two_write_queues_) {
1555 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1556 }
1557
1558 for (auto& cfd : cfds) {
1559 if (!cfd->mem()->IsEmpty()) {
1560 status = SwitchMemtable(cfd, context);
1561 }
1562 if (cfd->UnrefAndTryDelete()) {
1563 cfd = nullptr;
1564 }
1565 if (!status.ok()) {
1566 break;
1567 }
1568 }
1569
1570 if (two_write_queues_) {
1571 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1572 }
1573
1574 if (status.ok()) {
1575 if (immutable_db_options_.atomic_flush) {
1576 AssignAtomicFlushSeq(cfds);
1577 }
1578 FlushRequest flush_req;
1579 GenerateFlushRequest(cfds, &flush_req);
1580 SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
1581 MaybeScheduleFlushOrCompaction();
1582 }
1583 return status;
1584 }
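// In short, ScheduleFlushes() collects the column families that need to be
// flushed (all of them under atomic_flush, otherwise those queued by the
// flush scheduler plus, possibly, the stats CF), switches each non-empty
// memtable, and then queues a kWriteBufferFull flush request for background
// execution.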
1585
1586 #ifndef ROCKSDB_LITE
1587 void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
1588 const MemTableInfo& mem_table_info) {
1589 if (immutable_db_options_.listeners.size() == 0U) {
1590 return;
1591 }
1592 if (shutting_down_.load(std::memory_order_acquire)) {
1593 return;
1594 }
1595
1596 for (auto listener : immutable_db_options_.listeners) {
1597 listener->OnMemTableSealed(mem_table_info);
1598 }
1599 }
1600 #endif // ROCKSDB_LITE
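// Illustrative sketch (application-side code, not part of the original file):
// a listener that receives the OnMemTableSealed() callback fired above.
//
//   class SealLogger : public EventListener {
//    public:
//     void OnMemTableSealed(const MemTableInfo& info) override {
//       // e.g. record info.cf_name, info.num_entries and info.num_deletes
//     }
//   };
//
//   // Registered through DBOptions::listeners before the DB is opened.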
1601
1602 // REQUIRES: mutex_ is held
1603 // REQUIRES: this thread is currently at the front of the writer queue
1604 // REQUIRES: this thread is currently at the front of the 2nd writer queue if
1605 // two_write_queues_ is true (This is to simplify the reasoning.)
1606 Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
1607 mutex_.AssertHeld();
1608 WriteThread::Writer nonmem_w;
1609 std::unique_ptr<WritableFile> lfile;
1610 log::Writer* new_log = nullptr;
1611 MemTable* new_mem = nullptr;
1612
1613 // Recoverable state is persisted in the WAL. After a memtable switch the WAL
1614 // might be deleted, so write the state to the memtable so it persists as well.
1615 Status s = WriteRecoverableState();
1616 if (!s.ok()) {
1617 return s;
1618 }
1619
1620 // Attempt to switch to a new memtable and trigger a flush of the old one.
1621 // Do this without holding the DB mutex.
1622 assert(versions_->prev_log_number() == 0);
1623 if (two_write_queues_) {
1624 log_write_mutex_.Lock();
1625 }
1626 bool creating_new_log = !log_empty_;
1627 if (two_write_queues_) {
1628 log_write_mutex_.Unlock();
1629 }
1630 uint64_t recycle_log_number = 0;
1631 if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
1632 !log_recycle_files_.empty()) {
1633 recycle_log_number = log_recycle_files_.front();
1634 }
1635 uint64_t new_log_number =
1636 creating_new_log ? versions_->NewFileNumber() : logfile_number_;
1637 const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1638
1639 // Set memtable_info for memtable sealed callback
1640 #ifndef ROCKSDB_LITE
1641 MemTableInfo memtable_info;
1642 memtable_info.cf_name = cfd->GetName();
1643 memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
1644 memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
1645 memtable_info.num_entries = cfd->mem()->num_entries();
1646 memtable_info.num_deletes = cfd->mem()->num_deletes();
1647 #endif // ROCKSDB_LITE
1648 // Log this later, after the lock is released. The value may be outdated by
1649 // then, e.g. if a background flush happens before logging, but that is ok.
1650 int num_imm_unflushed = cfd->imm()->NumNotFlushed();
1651 const auto preallocate_block_size =
1652 GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
1653 mutex_.Unlock();
1654 if (creating_new_log) {
1655 // TODO: The write buffer size passed in should be the max across all CFs
1656 // instead of mutable_cf_options.write_buffer_size.
1657 s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
1658 &new_log);
1659 }
1660 if (s.ok()) {
1661 SequenceNumber seq = versions_->LastSequence();
1662 new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
1663 context->superversion_context.NewSuperVersion();
1664 }
1665 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1666 "[%s] New memtable created with log file: #%" PRIu64
1667 ". Immutable memtables: %d.\n",
1668 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
1669 mutex_.Lock();
1670 if (recycle_log_number != 0) {
1671 // Since renaming the file is done outside the DB mutex, we need to ensure
1672 // concurrent full purges don't delete the file while we're recycling it.
1673 // To achieve that we hold the old log number in the recyclable list until
1674 // after it has been renamed.
1675 assert(log_recycle_files_.front() == recycle_log_number);
1676 log_recycle_files_.pop_front();
1677 }
1678 if (s.ok() && creating_new_log) {
1679 log_write_mutex_.Lock();
1680 assert(new_log != nullptr);
1681 if (!logs_.empty()) {
1682 // Always flush the buffer of the last log before switching to a new one
1683 log::Writer* cur_log_writer = logs_.back().writer;
1684 s = cur_log_writer->WriteBuffer();
1685 if (!s.ok()) {
1686 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1687 "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
1688 " WAL file\n",
1689 cfd->GetName().c_str(), cur_log_writer->get_log_number(),
1690 new_log_number);
1691 }
1692 }
1693 if (s.ok()) {
1694 logfile_number_ = new_log_number;
1695 log_empty_ = true;
1696 log_dir_synced_ = false;
1697 logs_.emplace_back(logfile_number_, new_log);
1698 alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
1699 }
1700 log_write_mutex_.Unlock();
1701 }
1702
1703 if (!s.ok()) {
1704 // We can only fail here while creating a new log, hence the assert below.
1705 assert(creating_new_log);
1706 if (new_mem) {
1707 delete new_mem;
1708 }
1709 if (new_log) {
1710 delete new_log;
1711 }
1712 SuperVersion* new_superversion =
1713 context->superversion_context.new_superversion.release();
1714 if (new_superversion != nullptr) {
1715 delete new_superversion;
1716 }
1717 // We may have lost data from the log writer's in-memory buffer for the
1718 // current log, so treat it as a fatal error and set bg_error.
1719 error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
1720 // Read back bg_error in order to get the right severity
1721 s = error_handler_.GetBGError();
1722 return s;
1723 }
1724
1725 for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
1726 // All of this is just an optimization to delete logs that
1727 // are no longer needed: if a CF is empty, it does not need
1728 // that particular log to stay alive, so we just advance its
1729 // log number. There is no need to persist this in the manifest.
1730 if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
1731 loop_cfd->imm()->NumNotFlushed() == 0) {
1732 if (creating_new_log) {
1733 loop_cfd->SetLogNumber(logfile_number_);
1734 }
1735 loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
1736 }
1737 }
1738
1739 cfd->mem()->SetNextLogNumber(logfile_number_);
1740 cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
1741 new_mem->Ref();
1742 cfd->SetMemtable(new_mem);
1743 InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
1744 mutable_cf_options);
1745 #ifndef ROCKSDB_LITE
1746 mutex_.Unlock();
1747 // Notify client that memtable is sealed, now that we have successfully
1748 // installed a new memtable
1749 NotifyOnMemTableSealed(cfd, memtable_info);
1750 mutex_.Lock();
1751 #endif // ROCKSDB_LITE
1752 return s;
1753 }
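// To summarize SwitchMemtable(): the recoverable state is written out first,
// then (with the DB mutex released) a new WAL is created if needed and a new
// memtable is constructed; back under the mutex the new log is registered,
// the old memtable is moved to the immutable list, the new memtable and
// super version are installed, and listeners are notified that the old
// memtable was sealed. Any failure is treated as a background error because
// buffered WAL data may have been lost.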
1754
1755 size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
1756 mutex_.AssertHeld();
1757 size_t bsize =
1758 static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
1759 // Some users might set very high write_buffer_size and rely on
1760 // max_total_wal_size or other parameters to control the WAL size.
1761 if (mutable_db_options_.max_total_wal_size > 0) {
1762 bsize = std::min<size_t>(
1763 bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
1764 }
1765 if (immutable_db_options_.db_write_buffer_size > 0) {
1766 bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
1767 }
1768 if (immutable_db_options_.write_buffer_manager &&
1769 immutable_db_options_.write_buffer_manager->enabled()) {
1770 bsize = std::min<size_t>(
1771 bsize, immutable_db_options_.write_buffer_manager->buffer_size());
1772 }
1773
1774 return bsize;
1775 }
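// Worked example (hypothetical values): with write_buffer_size = 64 MB the
// base preallocation is 64 MB / 10 + 64 MB ≈ 70.4 MB. If max_total_wal_size
// were 32 MB, db_write_buffer_size 48 MB, and a write buffer manager capped
// the buffer at 16 MB, each limit is applied in turn and the smallest one,
// 16 MB, would be returned as the preallocation block size.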
1776
1777 // Default implementations of convenience methods that subclasses of DB
1778 // can call if they wish
1779 Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
1780 const Slice& key, const Slice& value) {
1781 if (nullptr == opt.timestamp) {
1782 // Pre-allocate the write batch size conservatively. 8 bytes are taken by the
1783 // header, 4 bytes by the count, 1 byte by the record type, and we allocate 11
1784 // extra bytes for the key and value lengths: 24 extra bytes in total.
1785 WriteBatch batch(key.size() + value.size() + 24);
1786 Status s = batch.Put(column_family, key, value);
1787 if (!s.ok()) {
1788 return s;
1789 }
1790 return Write(opt, &batch);
1791 }
1792 const Slice* ts = opt.timestamp;
1793 assert(nullptr != ts);
1794 size_t ts_sz = ts->size();
1795 WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
1796 ts_sz);
1797 Status s = batch.Put(column_family, key, value);
1798 if (!s.ok()) {
1799 return s;
1800 }
1801 s = batch.AssignTimestamp(*ts);
1802 if (!s.ok()) {
1803 return s;
1804 }
1805 return Write(opt, &batch);
1806 }
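// Illustrative sketch (application-side usage, not part of the original file;
// assumes the column family's comparator expects a fixed-size user timestamp):
// callers reach the timestamp branch above by passing the timestamp through
// WriteOptions.
//
//   std::string ts_buf(8, '\0');  // encoded user timestamp (assumed 8 bytes)
//   Slice ts(ts_buf);
//   WriteOptions wo;
//   wo.timestamp = &ts;
//   Status s = db->Put(wo, cf_handle, "key", "value");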
1807
1808 Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
1809 const Slice& key) {
1810 WriteBatch batch;
1811 batch.Delete(column_family, key);
1812 return Write(opt, &batch);
1813 }
1814
1815 Status DB::SingleDelete(const WriteOptions& opt,
1816 ColumnFamilyHandle* column_family, const Slice& key) {
1817 WriteBatch batch;
1818 batch.SingleDelete(column_family, key);
1819 return Write(opt, &batch);
1820 }
1821
1822 Status DB::DeleteRange(const WriteOptions& opt,
1823 ColumnFamilyHandle* column_family,
1824 const Slice& begin_key, const Slice& end_key) {
1825 WriteBatch batch;
1826 batch.DeleteRange(column_family, begin_key, end_key);
1827 return Write(opt, &batch);
1828 }
1829
1830 Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
1831 const Slice& key, const Slice& value) {
1832 WriteBatch batch;
1833 Status s = batch.Merge(column_family, key, value);
1834 if (!s.ok()) {
1835 return s;
1836 }
1837 return Write(opt, &batch);
1838 }
1839 } // namespace ROCKSDB_NAMESPACE