// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "util/sync_point.h"

namespace rocksdb {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
  return DB::Put(o, column_family, key, val);
}

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  if (!cfh->cfd()->ioptions()->merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  } else {
    return DB::Merge(o, column_family, key, val);
  }
}

Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}

Status DBImpl::SingleDelete(const WriteOptions& write_options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key) {
  return DB::SingleDelete(write_options, column_family, key);
}

void DBImpl::SetRecoverableStatePreReleaseCallback(
    PreReleaseCallback* callback) {
  recoverable_state_pre_release_callback_.reset(callback);
}

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                 WriteBatch* my_batch,
                                 WriteCallback* callback) {
  return WriteImpl(write_options, my_batch, callback, nullptr);
}
#endif  // ROCKSDB_LITE

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
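//
// Rough outline of WriteImpl below (informal summary of the code that
// follows, not a specification): a writer first joins a batch group via
// write_thread_.JoinBatchGroup(). Followers either have their memtable
// insert done for them or, in parallel-memtable mode, insert their own batch
// and return. The group leader preprocesses the write (flush and stall
// handling in PreprocessWrite), writes the merged group to the WAL, inserts
// into the memtables (serially or by launching parallel writers), publishes
// the new LastSequence, and exits the batch group.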
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  assert(!seq_per_batch_ || batch_cnt != 0);
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Write(my_batch);
    }
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  // Otherwise the IsLatestPersistentState optimization does not make sense.
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  Status status;
  if (write_options.low_pri) {
    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!status.ok()) {
      return status;
    }
  }

  if (two_write_queues_ && disable_memtable) {
    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
                            log_ref, seq_used, batch_cnt, pre_release_callback);
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group

    if (w.ShouldWriteToMemtable()) {
      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);

      PERF_TIMER_START(write_pre_and_post_process_time);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exiting the batch group
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // Once it reaches this point, the current writer "w" will try to do its
  // write job. It may also pick up some of the remaining writers in
  // "writers_" when it finds that suitable, and finish them in the same write
  // batch. This is how a write job can be completed by a writer other than
  // the one that issued it.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;
  if (!two_write_queues_) {
    last_sequence = versions_->LastSequence();
  }

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do the preprocessing only in the write thread
    // that also writes to the memtable, to avoid synchronization issues on
    // data structures shared with the other thread.

    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);

    status = PreprocessWrite(write_options, &need_log_sync, &write_context);

    PERF_TIMER_START(write_pre_and_post_process_time);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable. We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true. Rule 3 is checked for each batch. We could
    // relax rule 2 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    size_t total_byte_size = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches += writer->batch_cnt;
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }

        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire
    // write group or not. In either case we inc seq for each write batch with
    // no failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to the
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
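    // Illustrative example (hypothetical numbers, not taken from the code):
    // with seq_per_batch_ set and a write group of three batches whose
    // batch_cnt values are 1, 2 and 1, seq_inc below is 1 + 2 + 1 = 4 even if
    // one of those batches has disable_memtable set; without seq_per_batch_,
    // seq_inc is instead total_count, the number of keys that actually go to
    // the memtables.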
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit. That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
                        concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    // PreReleaseCallback is called after WAL write and before memtable write
    if (status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      // Note: the logic for advancing seq here must be consistent with the
      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
      // the merged batch during recovery from the WAL.
      for (auto* writer : write_group) {
        if (writer->CallbackFailed()) {
          continue;
        }
        writer->sequence = next_sequence;
        if (writer->pre_release_callback) {
          Status ws = writer->pre_release_callback->Callback(
              writer->sequence, disable_memtable, writer->log_used);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
        if (seq_per_batch_) {
          assert(writer->batch_cnt);
          next_sequence += writer->batch_cnt;
        } else if (writer->ShouldWriteToMemtable()) {
          next_sequence += WriteBatchInternal::Count(writer->batch);
        }
      }
    }

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
            batch_per_txn_);
      } else {
        write_group.last_sequence = last_sequence;
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower does its own memtable writes. The leader
        // should also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }

  if (need_log_sync) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      // Note: if we are to resume after non-OK statuses we need to revisit
      // how we react to non-OK statuses here.
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

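// Informal overview of the pipelined path below (a summary of the code, not a
// contract): with enable_pipelined_write the write is split into two stages
// that can overlap across groups. A WAL leader (STATE_GROUP_LEADER) forms a
// group, preprocesses it and writes it to the WAL; the group is then handed
// to the memtable stage, where either a memtable leader
// (STATE_MEMTABLE_WRITER_LEADER) or parallel memtable writers
// (STATE_PARALLEL_MEMTABLE_WRITER) apply it and publish the sequence.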
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    if (w.callback && !w.callback->AllowWriteBatching()) {
      write_thread_.WaitForMemTableWriters();
    }
    mutex_.Lock();
    bool need_log_sync = !write_options.disableWAL && write_options.sync;
    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    PERF_TIMER_START(write_pre_and_post_process_time);
    log::Writer* log_writer = logs_.back().writer;
    mutex_.Unlock();

    // This can set a non-OK status if a callback fails.
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    if (w.status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      for (auto writer : wal_write_group) {
        if (writer->CheckCallback(this)) {
          if (writer->ShouldWriteToMemtable()) {
            writer->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(writer->batch);
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }
      if (w.disable_wal) {
        has_unpersisted_data_.store(true, std::memory_order_relaxed);
      }
      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
    }

    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (w.status.ok() && !write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
      if (wal_write_group.size > 1) {
        stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                          wal_write_group.size - 1);
        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
      }
      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                            need_log_sync, need_log_dir_sync, current_sequence);
    }

    if (!w.CallbackFailed()) {
      WriteStatusCheck(w.status);
    }

    if (need_log_sync) {
      mutex_.Lock();
      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
      mutex_.Unlock();
    }

    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  WriteThread::WriteGroup memtable_write_group;
  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.ShouldWriteToMemtable());
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    if (memtable_write_group.size > 1 &&
        immutable_db_options_.allow_concurrent_memtable_write) {
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          memtable_write_group, w.sequence, column_family_memtables_.get(),
          &flush_scheduler_, write_options.ignore_missing_column_families,
          0 /*log_number*/, this, false /*concurrent_memtable_writes*/,
          seq_per_batch_, batch_per_txn_);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  }

  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
        true /*concurrent_memtable_writes*/);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      MemTableInsertStatusCheck(w.status);
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();
}

// The 2nd write queue. If enabled it will be used only for WAL-only writes.
// This is the only queue that updates LastPublishedSequence which is only
// applicable in a two-queue setting.
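//
// Informal note (derived from the body below): this path never touches the
// memtables. The leader of nonmem_write_thread_ allocates sequence numbers
// with FetchAddLastAllocatedSequence, appends the group to the WAL via
// ConcurrentWriteToWAL when the WAL is enabled, and runs any
// pre_release_callback for each writer in the group.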
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
                                WriteBatch* my_batch, WriteCallback* callback,
                                uint64_t* log_used, uint64_t log_ref,
                                uint64_t* seq_used, size_t batch_cnt,
                                PreReleaseCallback* pre_release_callback) {
  Status status;
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        true /* disable_memtable */, batch_cnt,
                        pre_release_callback);
  RecordTick(stats_, WRITE_WITH_WAL);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  nonmem_write_thread_.JoinBatchGroup(&w);
  assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);
  WriteThread::WriteGroup write_group;
  uint64_t last_sequence;
  nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
  // Note: no need to update last_batch_group_size_ here since the batch writes
  // to WAL only

  size_t total_byte_size = 0;
  for (auto* writer : write_group) {
    if (writer->CheckCallback(this)) {
      total_byte_size = WriteBatchInternal::AppendedByteSize(
          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
    }
  }

  const bool concurrent_update = true;
  // Update stats while we are an exclusive group leader, so we know
  // that nobody else can be writing to these particular stats.
  // We're optimistic, updating the stats before we successfully
  // commit. That lets us release our leader status early.
  auto stats = default_cf_internal_stats_;
  stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
                    concurrent_update);
  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
  stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
  RecordTick(stats_, WRITE_DONE_BY_SELF);
  auto write_done_by_other = write_group.size - 1;
  if (write_done_by_other > 0) {
    stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
                      concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
  }
  RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

  PERF_TIMER_STOP(write_pre_and_post_process_time);

  PERF_TIMER_GUARD(write_wal_time);
  // LastAllocatedSequence is increased inside WriteToWAL under
  // wal_write_mutex_ to ensure ordered events in WAL
  size_t seq_inc = 0 /* total_count */;
  if (seq_per_batch_) {
    size_t total_batch_cnt = 0;
    for (auto* writer : write_group) {
      assert(writer->batch_cnt);
      total_batch_cnt += writer->batch_cnt;
    }
    seq_inc = total_batch_cnt;
  }
  if (!write_options.disableWAL) {
    status =
        ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
  } else {
    // Otherwise we inc seq number to do solely the seq allocation
    last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  }
  auto curr_seq = last_sequence + 1;
  for (auto* writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->sequence = curr_seq;
    if (seq_per_batch_) {
      assert(writer->batch_cnt);
      curr_seq += writer->batch_cnt;
    }
    // else seq advances only by memtable writes
  }
  if (status.ok() && write_options.sync) {
    assert(!write_options.disableWAL);
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (manual_wal_flush_) {
      status = FlushWAL(true);
    } else {
      status = SyncWAL();
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }
  if (status.ok()) {
    for (auto* writer : write_group) {
      if (!writer->CallbackFailed() && writer->pre_release_callback) {
        assert(writer->sequence != kMaxSequenceNumber);
        const bool DISABLE_MEMTABLE = true;
        Status ws = writer->pre_release_callback->Callback(
            writer->sequence, DISABLE_MEMTABLE, writer->log_used);
        if (!ws.ok()) {
          status = ws;
          break;
        }
      }
    }
  }
  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
  if (status.ok()) {
    status = w.FinalStatus();
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }
  return status;
}

void DBImpl::WriteStatusCheck(const Status& status) {
  // Is setting bg_error_ enough here? This will at least stop
  // compaction and fail any further writes.
  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !status.IsBusy() && !status.IsIncomplete()) {
    mutex_.Lock();
    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
    mutex_.Unlock();
  }
}

void DBImpl::MemTableInsertStatusCheck(const Status& status) {
  // A non-OK status here indicates that the state implied by the
  // WAL has diverged from the in-memory state. This could be
  // because of a corrupt write_batch (very bad), or because the
  // client specified an invalid column family and didn't specify
  // ignore_missing_column_families.
  if (!status.ok()) {
    mutex_.Lock();
    assert(!error_handler_.IsBGWorkStopped());
    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
    mutex_.Unlock();
  }
}

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr && need_log_sync != nullptr);
  Status status;

  if (error_handler_.IsDBStopped()) {
    status = error_handler_.GetBGError();
  }

  PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);

  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    status = SwitchWAL(write_context);
  }

  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If
    // another thread is writing to another DB with the same write buffer, it
    // may also be flushed. We may end up flushing many more DBs than needed.
    // It's suboptimal but still correct.
    status = HandleWriteBufferFull(write_context);
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(write_context);
  }

  PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
  PERF_TIMER_GUARD(write_pre_and_post_process_time);

  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.NeedsDelay()))) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch, so we always use the size
    // of the previous one. This might create a fairness issue in that
    // expiration might happen for smaller writes while larger writes can go
    // through. Can optimize it if it becomes an issue.
    status = DelayWrite(last_batch_group_size_, write_options);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

  if (status.ok() && *need_log_sync) {
    // Wait until the parallel syncs are finished. Any sync process has to sync
    // the front log too so it is enough to check the status of front().
    // We do a while loop since log_sync_cv_ is signalled when any sync is
    // finished.
    // Note: there does not seem to be a reason to wait for parallel sync at
    // this early step but it is not important since parallel sync (SyncWAL)
    // and need_log_sync are usually not used together.
    while (logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    for (auto& log : logs_) {
      assert(!log.getting_synced);
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later, after we write to
      // the WAL.
      // Note: there does not seem to be a reason to set this early before we
      // actually write to the WAL
      log.getting_synced = true;
    }
  } else {
    *need_log_sync = false;
  }

  return status;
}

WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                               WriteBatch* tmp_batch, size_t* write_with_wal,
                               WriteBatch** to_be_cached_state) {
  assert(write_with_wal != nullptr);
  assert(tmp_batch != nullptr);
  assert(*to_be_cached_state == nullptr);
  WriteBatch* merged_batch = nullptr;
  *write_with_wal = 0;
  auto* leader = write_group.leader;
  assert(!leader->disable_wal);  // Same holds for all in the batch group
  if (write_group.size == 1 && !leader->CallbackFailed() &&
      leader->batch->GetWalTerminationPoint().is_cleared()) {
    // we simply write the leader's WriteBatch to the WAL if the group only
    // contains one batch, that batch should be written to the WAL,
    // and the batch is not to be truncated
    merged_batch = leader->batch;
    if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
      *to_be_cached_state = merged_batch;
    }
    *write_with_wal = 1;
  } else {
    // WAL needs all of the batches flattened into a single batch.
    // We could avoid copying here with an iov-like AddRecord
    // interface
    merged_batch = tmp_batch;
    for (auto writer : write_group) {
      if (!writer->CallbackFailed()) {
        WriteBatchInternal::Append(merged_batch, writer->batch,
                                   /*WAL_only*/ true);
        if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
          // We only need to cache the last of such write batches
          *to_be_cached_state = writer->batch;
        }
        (*write_with_wal)++;
      }
    }
  }
  return merged_batch;
}

// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                          log::Writer* log_writer, uint64_t* log_used,
                          uint64_t* log_size) {
  assert(log_size != nullptr);
  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
  *log_size = log_entry.size();
  // When two_write_queues_ is set, WriteToWAL has to be protected from
  // concurrent calls from the two queues anyway and log_write_mutex_ is
  // already held. Otherwise, if manual_wal_flush_ is enabled, we need to
  // protect log_writer->AddRecord from possible concurrent calls via FlushWAL
  // by the application.
  const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns of missed branch prediction, penalize the new
  // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
  // when we do not need any locking.
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Lock();
  }
  Status status = log_writer->AddRecord(log_entry);
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Unlock();
  }
  if (log_used != nullptr) {
    *log_used = logfile_number_;
  }
  total_log_size_ += log_entry.size();
  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
  // since alive_log_files_ might be modified concurrently
  alive_log_files_.back().AddSize(log_entry.size());
  log_empty_ = false;
  return status;
}

Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                          log::Writer* log_writer, uint64_t* log_used,
                          bool need_log_sync, bool need_log_dir_sync,
                          SequenceNumber sequence) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }

  WriteBatchInternal::SetSequence(merged_batch, sequence);

  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }

  if (status.ok() && need_log_sync) {
    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
    // It's safe to access logs_ with unlocked mutex_ here because:
    //  - we've set getting_synced=true for all logs,
    //    so other threads won't pop from logs_ while we're here,
    //  - only writer thread can push to logs_, and we're in
    //    writer thread, so no one will push to logs_,
    //  - as long as other threads don't modify it, it's safe to read
    //    from std::deque from multiple threads concurrently.
    for (auto& log : logs_) {
      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!status.ok()) {
        break;
      }
    }
    if (status.ok() && need_log_dir_sync) {
      // We only sync WAL directory the first time WAL syncing is
      // requested, so that in case users never turn on WAL sync,
      // we can avoid the disk I/O in the write code path.
      status = directories_.GetWalDir()->Fsync();
    }
  }

  if (merged_batch == &tmp_batch_) {
    tmp_batch_.Clear();
  }
  if (status.ok()) {
    auto stats = default_cf_internal_stats_;
    if (need_log_sync) {
      stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
      RecordTick(stats_, WAL_FILE_SYNCED);
    }
    stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                                    uint64_t* log_used,
                                    SequenceNumber* last_sequence,
                                    size_t seq_inc) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  WriteBatch tmp_batch;
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch =
      MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);

  // We need to lock log_write_mutex_ since logs_ and alive_log_files_ might be
  // pushed back concurrently
  log_write_mutex_.Lock();
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  auto sequence = *last_sequence + 1;
  WriteBatchInternal::SetSequence(merged_batch, sequence);

  log::Writer* log_writer = logs_.back().writer;
  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }
  log_write_mutex_.Unlock();

  if (status.ok()) {
    const bool concurrent = true;
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal,
                      concurrent);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

Status DBImpl::WriteRecoverableState() {
  mutex_.AssertHeld();
  if (!cached_recoverable_state_empty_) {
    bool dont_care_bool;
    SequenceNumber next_seq;
    if (two_write_queues_) {
      log_write_mutex_.Lock();
    }
    SequenceNumber seq;
    if (two_write_queues_) {
      seq = versions_->FetchAddLastAllocatedSequence(0);
    } else {
      seq = versions_->LastSequence();
    }
    WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
    auto status = WriteBatchInternal::InsertInto(
        &cached_recoverable_state_, column_family_memtables_.get(),
        &flush_scheduler_, true, 0 /*recovery_log_number*/, this,
        false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
        seq_per_batch_);
    auto last_seq = next_seq - 1;
    if (two_write_queues_) {
      versions_->FetchAddLastAllocatedSequence(last_seq - seq);
      versions_->SetLastPublishedSequence(last_seq);
    }
    versions_->SetLastSequence(last_seq);
    if (two_write_queues_) {
      log_write_mutex_.Unlock();
    }
    if (status.ok() && recoverable_state_pre_release_callback_) {
      const bool DISABLE_MEMTABLE = true;
      for (uint64_t sub_batch_seq = seq + 1;
           sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
        uint64_t const no_log_num = 0;
        status = recoverable_state_pre_release_callback_->Callback(
            sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
      }
    }
    if (status.ok()) {
      cached_recoverable_state_.Clear();
      cached_recoverable_state_empty_ = true;
    }
    return status;
  }
  return Status::OK();
}

void DBImpl::SelectColumnFamiliesForAtomicFlush(
    autovector<ColumnFamilyData*>* cfds) {
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
        !cached_recoverable_state_empty_.load()) {
      cfds->push_back(cfd);
    }
  }
}

// Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
  assert(immutable_db_options_.atomic_flush);
  auto seq = versions_->LastSequence();
  for (auto cfd : cfds) {
    cfd->imm()->AssignAtomicFlushSeq(seq);
  }
}

Status DBImpl::SwitchWAL(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  if (alive_log_files_.begin()->getting_flushed) {
    return status;
  }

  auto oldest_alive_log = alive_log_files_.begin()->number;
  bool flush_wont_release_oldest_log = false;
  if (allow_2pc()) {
    auto oldest_log_with_uncommitted_prep =
        logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();

    assert(oldest_log_with_uncommitted_prep == 0 ||
           oldest_log_with_uncommitted_prep >= oldest_alive_log);
    if (oldest_log_with_uncommitted_prep > 0 &&
        oldest_log_with_uncommitted_prep == oldest_alive_log) {
      if (unable_to_release_oldest_log_) {
        // we already attempted to flush all column families dependent on
        // the oldest alive log but the log still contained uncommitted
        // transactions so there is still nothing that we can do.
        return status;
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to release oldest log due to uncommitted transaction");
        unable_to_release_oldest_log_ = true;
        flush_wont_release_oldest_log = true;
      }
    }
  }
  if (!flush_wont_release_oldest_log) {
    // we only mark this log as getting flushed if we have successfully
    // flushed all data in this log. If this log contains outstanding prepared
    // transactions then we cannot flush this log until those transactions are
    // committed.
    unable_to_release_oldest_log_ = false;
    alive_log_files_.begin()->getting_flushed = true;
  }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing all column families with data in WAL number %" PRIu64
      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
        cfds.push_back(cfd);
      }
    }
  }
  for (const auto cfd : cfds) {
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->Unref();
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB with the same write buffer, it may also
  // be flushed. We may end up flushing many more DBs than needed. It's
  // suboptimal but still correct.
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing column family with largest mem table size. Write buffer is "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider the active mem table, hoping the immutable
        // memtable is already in the process of flushing.
        uint64_t seq = cfd->mem()->GetCreationSeq();
        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
          cfd_picked = cfd;
          seq_num_for_cf_picked = seq;
        }
      }
    }
    if (cfd_picked != nullptr) {
      cfds.push_back(cfd_picked);
    }
  }

  for (const auto cfd : cfds) {
    if (cfd->mem()->IsEmpty()) {
      continue;
    }
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->Unref();
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

uint64_t DBImpl::GetMaxTotalWalSize() const {
  mutex_.AssertHeld();
  return mutable_db_options_.max_total_wal_size == 0
             ? 4 * max_total_in_memory_state_
             : mutable_db_options_.max_total_wal_size;
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
      mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we don't need a delay anymore
      const uint64_t kDelayInterval = 1000;
      uint64_t stall_end = sw.start_time() + delay;
      while (write_controller_.NeedsDelay()) {
        if (env_->NowMicros() >= stall_end) {
          // We already delayed this write `delay` microseconds
          break;
        }

        delayed = true;
        // Sleep for 0.001 seconds
        env_->SleepForMicroseconds(kDelayInterval);
      }
      mutex_.Lock();
      write_thread_.EndWriteStall();
    }

    // Don't wait if there's a background error, even if it's a soft error. We
    // might wait here indefinitely as the background compaction may never
    // finish successfully, resulting in the stall condition lasting
    // indefinitely
    while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      delayed = true;

      // Notify write_thread_ about the stall so it can setup a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
      write_thread_.EndWriteStall();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
                                           time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  // If DB is not in read-only mode and write_controller is not stopping
  // writes, we can ignore any background errors and allow the write to
  // proceed
  Status s;
  if (write_controller_.IsStopped()) {
    // If writes are still stopped, it means we bailed due to a background
    // error
    s = Status::Incomplete(error_handler_.GetBGError().ToString());
  }
  if (error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the
  // call, the consistency condition is not guaranteed to hold. It's OK to
  // live with it in this case.
  // If we need to speed up compaction, it means compaction is falling behind
  // and we start to rate-limit low-pri writes.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete();
    } else {
      assert(my_batch != nullptr);
      // Rate limit those writes. The reason that we don't completely wait
      // is that in case the write is heavy, low pri writes may never have
      // a chance to run. Now we guarantee we are still slowly making
      // progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
  }
  Status status;
  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->Unref()) {
      delete cfd;
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    // SwitchMemtable is a rare event. To simplify the reasoning, we make sure
    // that there is no concurrent thread writing to WAL.
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Recoverable state is persisted in WAL. After memtable switch, WAL might
  // be deleted, so we write the state to memtable to be persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // In case pipelined write is enabled, wait for all pending memtable
  // writers.
  if (immutable_db_options_.enable_pipelined_write) {
    // Memtable writers may call DB::Get in case max_successive_merges > 0,
    // which may lock mutex. Unlocking mutex here to avoid deadlock.
    mutex_.Unlock();
    write_thread_.WaitForMemTableWriters();
    mutex_.Lock();
  }

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
    log_recycle_files_.pop_front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for memtable sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later after lock release. It may be outdated, e.g., if background
  // flush happens before logging, but that should be ok.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  auto write_hint = CalculateWALWriteHint();
  mutex_.Unlock();
  {
    std::string log_fname =
        LogFileName(immutable_db_options_.wal_dir, new_log_number);
    if (creating_new_log) {
      EnvOptions opt_env_opt =
          env_->OptimizeForLogWrite(env_options_, db_options);
      if (recycle_log_number) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "reusing log %" PRIu64 " from recycle list\n",
                       recycle_log_number);
        std::string old_log_fname =
            LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
        s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile,
                                    opt_env_opt);
      } else {
        s = NewWritableFile(env_, log_fname, &lfile, opt_env_opt);
      }
      if (s.ok()) {
        // Our final size should be less than write_buffer_size
        // (compression, etc) but err on the side of caution.

        // use preallocate_block_size instead
        // of calling GetWalPreallocateBlockSize()
        lfile->SetPreallocationBlockSize(preallocate_block_size);
        lfile->SetWriteLifeTimeHint(write_hint);
        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
            std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */,
            immutable_db_options_.listeners));
        new_log = new log::Writer(
            std::move(file_writer), new_log_number,
            immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
      }
    }

    if (s.ok()) {
      SequenceNumber seq = versions_->LastSequence();
      new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
      context->superversion_context.NewSuperVersion();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       " WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the WritableFileBuffer in-memory buffer for
    // the current log, so treat it as a fatal error and set bg_error
    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();

    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    return s;
  }

  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
    // all this is just optimization to delete logs that
    // are no longer needed -- if CF is empty, that means it
    // doesn't need that particular log to stay alive, so we just
    // advance the log number. no need to persist this in the manifest
    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
        loop_cfd->imm()->NumNotFlushed() == 0) {
      if (creating_new_log) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify client that memtable is sealed, now that we have successfully
  // installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }
  return s;
}

size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
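  // i.e. bsize starts at roughly 1.1x the memtable write_buffer_size (for
  // example, a hypothetical 64 MB memtable yields about 70.4 MB) and is then
  // capped by the limits below.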
  // Some users might set very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Pre-allocate the size of the write batch conservatively.
  // 8 bytes are taken by the header, 4 bytes for the count, 1 byte for the
  // type, and we allocate 11 extra bytes for the key length and value length.
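  // (That is, 8 + 4 + 1 + 11 = 24, the constant added to the key and value
  // sizes below.)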
  WriteBatch batch(key.size() + value.size() + 24);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch batch;
  batch.Delete(column_family, key);
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  batch.SingleDelete(column_family, key);
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  batch.DeleteRange(column_family, begin_key, end_key);
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
}  // namespace rocksdb