// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
-#include "db/db_impl/db_impl.h"
-
#include <cinttypes>
+
+#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "test_util/sync_point.h"
+#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
// Convenience methods
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
- auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+ auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
if (!cfh->cfd()->ioptions()->merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
- tracer_->Write(my_batch);
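+ // Note: a tracing failure does not affect the write itself.
+ // PermitUncheckedError() marks the returned Status as checked so that
+ // builds enforcing status checking do not complain about it being ignored.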
+ // TODO: maybe handle the tracing status?
+ tracer_->Write(my_batch).PermitUncheckedError();
}
}
if (write_options.sync && write_options.disableWAL) {
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);
- Status status;
if (write_options.low_pri) {
- status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
- if (!status.ok()) {
- return status;
+ Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
+ if (!s.ok()) {
+ return s;
}
}
? batch_cnt
// every key is a sub-batch consuming a seq
: WriteBatchInternal::Count(my_batch);
- uint64_t seq;
+ uint64_t seq = 0;
// Use a write thread to i) optimize for WAL write, ii) publish last
// sequence in increasing order, iii) call pre_release_callback serially
- status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
- log_used, log_ref, &seq, sub_batch_cnt,
- pre_release_callback, kDoAssignOrder,
- kDoPublishLastSeq, disable_memtable);
+ Status status = WriteImplWALOnly(
+ &write_thread_, write_options, my_batch, callback, log_used, log_ref,
+ &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
+ kDoPublishLastSeq, disable_memtable);
TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
if (!status.ok()) {
return status;
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
+ Status status;
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
+ // TODO: Should we handle this status instead of permitting it unchecked?
+ status.PermitUncheckedError();
return w.FinalStatus();
}
// else we are the leader of the write batch group
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
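+ // Track the WAL write result separately as an IOStatus so that I/O failures
+ // can be routed to the error handler with their I/O-specific context,
+ // distinct from other write failures.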
+ IOStatus io_s;
if (status.ok()) {
// Rules for when we can update the memtable concurrently
// 1. supported by memtable
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
- status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
- need_log_dir_sync, last_sequence + 1);
+ io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
+ need_log_dir_sync, last_sequence + 1);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
- status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
- seq_inc);
+ io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
+ seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
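+ // Fold the WAL write result into the overall write status.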
+ status = io_s;
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
- WriteStatusCheck(status);
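+ // Prefer reporting the IOStatus when the WAL write failed so that the
+ // error handler receives the I/O-specific error information.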
+ if (!io_s.ok()) {
+ IOStatusCheck(io_s);
+ } else {
+ WriteStatusCheck(status);
+ }
}
if (need_log_sync) {
mutex_.Lock();
- MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
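+ // Only mark the logs as synced if the write (including the WAL sync)
+ // succeeded; on failure the logs are marked not-synced instead.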
+ if (status.ok()) {
+ status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+ } else {
+ MarkLogsNotSynced(logfile_number_);
+ }
mutex_.Unlock();
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hence provide a simple implementation that is not necessarily efficient.
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
write_thread_.JoinBatchGroup(&w);
+ TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
if (w.state == WriteThread::STATE_GROUP_LEADER) {
WriteThread::WriteGroup wal_write_group;
if (w.callback && !w.callback->AllowWriteBatching()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
+ IOStatus io_s;
if (w.status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
- w.status = WriteToWAL(wal_write_group, log_writer, log_used,
- need_log_sync, need_log_dir_sync, current_sequence);
+ io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
+ need_log_dir_sync, current_sequence);
+ w.status = io_s;
}
if (!w.CallbackFailed()) {
- WriteStatusCheck(w.status);
+ if (!io_s.ok()) {
+ IOStatusCheck(io_s);
+ } else {
+ WriteStatusCheck(w.status);
+ }
}
if (need_log_sync) {
mutex_.Lock();
- MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
+ if (w.status.ok()) {
+ w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+ } else {
+ MarkLogsNotSynced(logfile_number_);
+ }
mutex_.Unlock();
}
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
-
- WriteStatusCheck(w.status);
if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
std::lock_guard<std::mutex> lck(switch_mutex_);
switch_cv_.notify_all();
}
+ WriteStatusCheck(w.status);
if (!w.FinalStatus().ok()) {
return w.FinalStatus();
InstrumentedMutexLock l(&mutex_);
bool need_log_sync = false;
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
- WriteStatusCheck(status);
+ WriteStatusCheckOnLocked(status);
}
if (!status.ok()) {
WriteThread::WriteGroup write_group;
}
seq_inc = total_batch_cnt;
}
+ IOStatus io_s;
if (!write_options.disableWAL) {
- status =
- ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
+ io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
+ status = io_s;
} else {
// Otherwise we inc the seq number solely for the seq allocation
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
- WriteStatusCheck(status);
+ if (!io_s.ok()) {
+ IOStatusCheck(io_s);
+ } else {
+ WriteStatusCheck(status);
+ }
}
if (status.ok()) {
size_t index = 0;
return status;
}
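+
+// Variant of WriteStatusCheck() for callers that already hold mutex_
+// (e.g. the PreprocessWrite() failure path above); unlike WriteStatusCheck(),
+// it does not acquire mutex_ itself.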
+void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
+ // Caller must hold mutex_.
+ // Is setting bg_error_ enough here? This will at least stop
+ // compaction and fail any further writes.
+ assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
+ mutex_.AssertHeld();
+ if (immutable_db_options_.paranoid_checks && !status.ok() &&
+ !status.IsBusy() && !status.IsIncomplete()) {
+ // TODO: Maybe change SetBGError() to return void?
+ error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback)
+ .PermitUncheckedError();
+ }
+}
+
void DBImpl::WriteStatusCheck(const Status& status) {
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
+ assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
- error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
+ // TODO: Maybe change SetBGError() to return void?
+ error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback)
+ .PermitUncheckedError();
+ mutex_.Unlock();
+ }
+}
+
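+// Counterpart of WriteStatusCheck() for IOStatus results from WAL writes.
+// IOFenced errors are reported to the error handler even when paranoid_checks
+// is disabled.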
+void DBImpl::IOStatusCheck(const IOStatus& io_status) {
+ // Is setting bg_error_ enough here? This will at least stop
+ // compaction and fail any further writes.
+ if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
+ !io_status.IsBusy() && !io_status.IsIncomplete()) ||
+ io_status.IsIOFenced()) {
+ mutex_.Lock();
+ // TODO: Maybe change SetBGError() to return void?
+ error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback)
+ .PermitUncheckedError();
mutex_.Unlock();
}
}
if (!status.ok()) {
mutex_.Lock();
assert(!error_handler_.IsBGWorkStopped());
- error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
+ // TODO: Maybe change SetBGError() to return void?
+ error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable)
+ .PermitUncheckedError();
mutex_.Unlock();
}
}
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
- WriteBatchInternal::Append(merged_batch, writer->batch,
- /*WAL_only*/ true);
+ Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
+ /*WAL_only*/ true);
+ // Always returns Status::OK.
+ assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
-Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
- log::Writer* log_writer, uint64_t* log_used,
- uint64_t* log_size) {
+IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
+ log::Writer* log_writer, uint64_t* log_used,
+ uint64_t* log_size) {
assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
*log_size = log_entry.size();
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Lock();
}
- Status status = log_writer->AddRecord(log_entry);
+ IOStatus io_s = log_writer->AddRecord(log_entry);
+
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
}
// since alive_log_files_ might be modified concurrently
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
- return status;
+ return io_s;
}
-Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
- log::Writer* log_writer, uint64_t* log_used,
- bool need_log_sync, bool need_log_dir_sync,
- SequenceNumber sequence) {
- Status status;
-
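+// Writes the merged batch of the write group to the WAL and, when requested,
+// syncs the WAL file(s) and the WAL directory. Returns the IOStatus of the
+// first failing I/O operation, or OK.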
+IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
+ log::Writer* log_writer, uint64_t* log_used,
+ bool need_log_sync, bool need_log_dir_sync,
+ SequenceNumber sequence) {
+ IOStatus io_s;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
size_t write_with_wal = 0;
WriteBatchInternal::SetSequence(merged_batch, sequence);
uint64_t log_size;
- status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+ io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
- if (status.ok() && need_log_sync) {
+ if (io_s.ok() && need_log_sync) {
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
- status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
- if (!status.ok()) {
+ io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
+ if (!io_s.ok()) {
break;
}
}
- if (status.ok() && need_log_dir_sync) {
+
+ if (io_s.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
- status = directories_.GetWalDir()->Fsync();
+ io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
}
}
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
- if (status.ok()) {
+ if (io_s.ok()) {
auto stats = default_cf_internal_stats_;
if (need_log_sync) {
stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}
- return status;
+ return io_s;
}
-Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
- uint64_t* log_used,
- SequenceNumber* last_sequence,
- size_t seq_inc) {
- Status status;
+IOStatus DBImpl::ConcurrentWriteToWAL(
+ const WriteThread::WriteGroup& write_group, uint64_t* log_used,
+ SequenceNumber* last_sequence, size_t seq_inc) {
+ IOStatus io_s;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
log::Writer* log_writer = logs_.back().writer;
uint64_t log_size;
- status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+ io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
log_write_mutex_.Unlock();
- if (status.ok()) {
+ if (io_s.ok()) {
const bool concurrent = true;
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
concurrent);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}
- return status;
+ return io_s;
}
Status DBImpl::WriteRecoverableState() {
}
for (auto& cfd : cfds) {
autovector<MemTable*> to_delete;
- cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
+ bool trimmed = cfd->imm()->TrimHistory(
+ &to_delete, cfd->mem()->ApproximateMemoryUsage());
if (!to_delete.empty()) {
for (auto m : to_delete) {
delete m;
}
+ }
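+ // If any memtables were trimmed from the history, install a new
+ // SuperVersion so that readers observe the updated immutable memtable list.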
+ if (trimmed) {
context->superversion_context.NewSuperVersion();
assert(context->superversion_context.new_superversion.get() != nullptr);
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
std::unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
+ IOStatus io_s;
// Recoverable state is persisted in WAL. After memtable switch, WAL might
// be deleted, so we write the state to memtable to be persisted as well.
if (creating_new_log) {
// TODO: Write buffer size passed in should be max of all CF's instead
// of mutable_cf_options.write_buffer_size.
- s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
- &new_log);
+ io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
+ &new_log);
+ if (s.ok()) {
+ s = io_s;
+ }
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
if (!logs_.empty()) {
// Always flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer;
- s = cur_log_writer->WriteBuffer();
+ io_s = cur_log_writer->WriteBuffer();
+ if (s.ok()) {
+ s = io_s;
+ }
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
}
// We may have lost data from the WritableFileBuffer in-memory buffer for
// the current log, so treat it as a fatal error and set bg_error
- error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
+ // TODO: Should the return status of SetBGError() be handled here?
+ if (!io_s.ok()) {
+ error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable)
+ .PermitUncheckedError();
+ } else {
+ error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable)
+ .PermitUncheckedError();
+ }
// Read back bg_error in order to get the right severity
s = error_handler_.GetBGError();
return s;
NotifyOnMemTableSealed(cfd, memtable_info);
mutex_.Lock();
#endif // ROCKSDB_LITE
+ // It is possible to get here without having checked io_s, but that is okay:
+ // if we did, it most likely means that s was already an error. In any case,
+ // permit any unchecked error in io_s here.
+ io_s.PermitUncheckedError();
return s;
}
const Slice* ts = opt.timestamp;
assert(nullptr != ts);
size_t ts_sz = ts->size();
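+ // The caller-provided timestamp must match the timestamp size configured in
+ // the column family's comparator.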
+ assert(column_family->GetComparator());
+ assert(ts_sz == column_family->GetComparator()->timestamp_size());
WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
ts_sz);
Status s = batch.Put(column_family, key, value);
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
- WriteBatch batch;
- batch.Delete(column_family, key);
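+ // Without a user timestamp, fall back to the plain Delete path; otherwise
+ // size the batch to also hold the timestamp and assign it before writing.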
+ if (nullptr == opt.timestamp) {
+ WriteBatch batch;
+ Status s = batch.Delete(column_family, key);
+ if (!s.ok()) {
+ return s;
+ }
+ return Write(opt, &batch);
+ }
+ const Slice* ts = opt.timestamp;
+ assert(ts != nullptr);
+ const size_t ts_sz = ts->size();
+ constexpr size_t kKeyAndValueLenSize = 11;
+ constexpr size_t kWriteBatchOverhead =
+ WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
+ WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
+ ts_sz);
+ Status s = batch.Delete(column_family, key);
+ if (!s.ok()) {
+ return s;
+ }
+ s = batch.AssignTimestamp(*ts);
+ if (!s.ok()) {
+ return s;
+ }
return Write(opt, &batch);
}
Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key) {
WriteBatch batch;
- batch.SingleDelete(column_family, key);
+ Status s = batch.SingleDelete(column_family, key);
+ if (!s.ok()) {
+ return s;
+ }
return Write(opt, &batch);
}
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
WriteBatch batch;
- batch.DeleteRange(column_family, begin_key, end_key);
+ Status s = batch.DeleteRange(column_family, begin_key, end_key);
+ if (!s.ok()) {
+ return s;
+ }
return Write(opt, &batch);
}