if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exiting the batch group
- for (auto* writer : *(w.write_group)) {
- if (!writer->CallbackFailed() && writer->pre_release_callback) {
- assert(writer->sequence != kMaxSequenceNumber);
- Status ws = writer->pre_release_callback->Callback(writer->sequence,
- disable_memtable);
- if (!ws.ok()) {
- status = ws;
- break;
- }
- }
- }
// TODO(myabandeh): propagate status to write_group
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
- MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+ RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
+ // PreReleaseCallback is called after WAL write and before memtable write
+ if (status.ok()) {
+ SequenceNumber next_sequence = current_sequence;
+ // Note: the logic for advancing seq here must be consistent with the
+ // logic in WriteBatchInternal::InsertInto(write_group...) as well as
+ // with WriteBatchInternal::InsertInto(write_batch...) that is called on
+ // the merged batch during recovery from the WAL.
+ for (auto* writer : write_group) {
+ if (writer->CallbackFailed()) {
+ continue;
+ }
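+ // Assign this writer the first sequence number of its batch before
+ // invoking its pre-release callback.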
+ writer->sequence = next_sequence;
+ if (writer->pre_release_callback) {
+ Status ws = writer->pre_release_callback->Callback(
+ writer->sequence, disable_memtable, writer->log_used);
+ if (!ws.ok()) {
+ status = ws;
+ break;
+ }
+ }
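+ // Advance the running sequence: one per sub-batch when seq_per_batch_
+ // is set, otherwise one per entry in the writer's batch.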
+ if (seq_per_batch_) {
+ assert(writer->batch_cnt);
+ next_sequence += writer->batch_cnt;
+ } else if (writer->ShouldWriteToMemtable()) {
+ next_sequence += WriteBatchInternal::Count(writer->batch);
+ }
+ }
+ }
+
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
batch_per_txn_);
} else {
- SequenceNumber next_sequence = current_sequence;
- // Note: the logic for advancing seq here must be consistent with the
- // logic in WriteBatchInternal::InsertInto(write_group...) as well as
- // with WriteBatchInternal::InsertInto(write_batch...) that is called on
- // the merged batch during recovery from the WAL.
- for (auto* writer : write_group) {
- if (writer->CallbackFailed()) {
- continue;
- }
- writer->sequence = next_sequence;
- if (seq_per_batch_) {
- assert(writer->batch_cnt);
- next_sequence += writer->batch_cnt;
- } else if (writer->ShouldWriteToMemtable()) {
- next_sequence += WriteBatchInternal::Count(writer->batch);
- }
- }
write_group.last_sequence = last_sequence;
- write_group.running.store(static_cast<uint32_t>(write_group.size),
- std::memory_order_relaxed);
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
}
if (should_exit_batch_group) {
if (status.ok()) {
- for (auto* writer : write_group) {
- if (!writer->CallbackFailed() && writer->pre_release_callback) {
- assert(writer->sequence != kMaxSequenceNumber);
- Status ws = writer->pre_release_callback->Callback(writer->sequence,
- disable_memtable);
- if (!ws.ok()) {
- status = ws;
- break;
- }
- }
- }
+ // Note: if we are to resume after non-OK statuses, we need to revisit
+ // how we react to them here.
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
- MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+ RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
- if (w.ShouldWriteToWAL()) {
+ if (w.status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
- assert(w.status.ok());
+ assert(w.ShouldWriteToMemtable());
write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
if (memtable_write_group.size > 1 &&
immutable_db_options_.allow_concurrent_memtable_write) {
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
- MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+ RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
const bool DISABLE_MEMTABLE = true;
- Status ws = writer->pre_release_callback->Callback(writer->sequence,
- DISABLE_MEMTABLE);
+ Status ws = writer->pre_release_callback->Callback(
+ writer->sequence, DISABLE_MEMTABLE, writer->log_used);
if (!ws.ok()) {
status = ws;
break;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
- cached_recoverable_state_empty_ = false;
+ cached_recoverable_state_empty_ = false;
}
if (status.ok() && need_log_sync) {
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
- cached_recoverable_state_empty_ = false;
+ cached_recoverable_state_empty_ = false;
}
log_write_mutex_.Unlock();
const bool DISABLE_MEMTABLE = true;
for (uint64_t sub_batch_seq = seq + 1;
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
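+ // The recoverable state is not written to any WAL file, so pass zero
+ // as the log number to the callback.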
+ uint64_t const no_log_num = 0;
status = recoverable_state_pre_release_callback_->Callback(
- sub_batch_seq, !DISABLE_MEMTABLE);
+ sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
}
}
if (status.ok()) {
return Status::OK();
}
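+// Pick the column families to include in an atomic flush: every live CF
+// with unflushed immutable memtables or a non-empty active memtable (or
+// all live CFs while cached recoverable state is pending).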
+void DBImpl::SelectColumnFamiliesForAtomicFlush(
+ autovector<ColumnFamilyData*>* cfds) {
+ for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
+ !cached_recoverable_state_empty_.load()) {
+ cfds->push_back(cfd);
+ }
+ }
+}
+
+// Assign sequence number for atomic flush.
+void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
+ assert(immutable_db_options_.atomic_flush);
+ auto seq = versions_->LastSequence();
+ for (auto cfd : cfds) {
+ cfd->imm()->AssignAtomicFlushSeq(seq);
+ }
+}
+
Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
auto oldest_alive_log = alive_log_files_.begin()->number;
bool flush_wont_release_oldest_log = false;
if (allow_2pc()) {
- auto oldest_log_with_uncommited_prep =
+ auto oldest_log_with_uncommitted_prep =
logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
- assert(oldest_log_with_uncommited_prep == 0 ||
- oldest_log_with_uncommited_prep >= oldest_alive_log);
- if (oldest_log_with_uncommited_prep > 0 &&
- oldest_log_with_uncommited_prep == oldest_alive_log) {
+ assert(oldest_log_with_uncommitted_prep == 0 ||
+ oldest_log_with_uncommitted_prep >= oldest_alive_log);
+ if (oldest_log_with_uncommitted_prep > 0 &&
+ oldest_log_with_uncommitted_prep == oldest_alive_log) {
if (unable_to_release_oldest_log_) {
// we already attempted to flush all column families dependent on
- // the oldest alive log but the log still contained uncommited
+ // the oldest alive log but the log still contained uncommitted
// transactions so there is still nothing that we can do.
return status;
} else {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
- "Unable to release oldest log due to uncommited transaction");
+ "Unable to release oldest log due to uncommitted transaction");
unable_to_release_oldest_log_ = true;
flush_wont_release_oldest_log = true;
}
if (!flush_wont_release_oldest_log) {
// we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepared
- // transactions then we cannot flush this log until those transactions are commited.
+ // transactions then we cannot flush this log until those transactions are
+ // committed.
unable_to_release_oldest_log_ = false;
alive_log_files_.begin()->getting_flushed = true;
}
- ROCKS_LOG_INFO(immutable_db_options_.info_log,
- "Flushing all column families with data in WAL number %" PRIu64
- ". Total log size is %" PRIu64
- " while max_total_wal_size is %" PRIu64,
- oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
+ ROCKS_LOG_INFO(
+ immutable_db_options_.info_log,
+ "Flushing all column families with data in WAL number %" PRIu64
+ ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
+ oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
- FlushRequest flush_req;
- for (auto cfd : *versions_->GetColumnFamilySet()) {
- if (cfd->IsDropped()) {
- continue;
- }
- if (cfd->OldestLogToKeep() <= oldest_alive_log) {
- status = SwitchMemtable(cfd, write_context);
- if (!status.ok()) {
- break;
+ autovector<ColumnFamilyData*> cfds;
+ if (immutable_db_options_.atomic_flush) {
+ SelectColumnFamiliesForAtomicFlush(&cfds);
+ } else {
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ if (cfd->OldestLogToKeep() <= oldest_alive_log) {
+ cfds.push_back(cfd);
}
- flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
- cfd->imm()->FlushRequested();
+ }
+ }
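+ // Switch the active memtable of every selected column family, pinning
+ // each CFD with Ref/Unref across the switch.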
+ for (const auto cfd : cfds) {
+ cfd->Ref();
+ status = SwitchMemtable(cfd, write_context);
+ cfd->Unref();
+ if (!status.ok()) {
+ break;
}
}
if (status.ok()) {
+ if (immutable_db_options_.atomic_flush) {
+ AssignAtomicFlushSeq(cfds);
+ }
+ for (auto cfd : cfds) {
+ cfd->imm()->FlushRequested();
+ }
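+ // Build a single flush request covering all selected column families.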
+ FlushRequest flush_req;
+ GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
MaybeScheduleFlushOrCompaction();
}
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing column family with largest mem table size. Write buffer is "
- "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
+ "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
- ColumnFamilyData* cfd_picked = nullptr;
- SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
+ autovector<ColumnFamilyData*> cfds;
+ if (immutable_db_options_.atomic_flush) {
+ SelectColumnFamiliesForAtomicFlush(&cfds);
+ } else {
+ ColumnFamilyData* cfd_picked = nullptr;
+ SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
- for (auto cfd : *versions_->GetColumnFamilySet()) {
- if (cfd->IsDropped()) {
- continue;
- }
- if (!cfd->mem()->IsEmpty()) {
- // We only consider active mem table, hoping immutable memtable is
- // already in the process of flushing.
- uint64_t seq = cfd->mem()->GetCreationSeq();
- if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
- cfd_picked = cfd;
- seq_num_for_cf_picked = seq;
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ if (!cfd->mem()->IsEmpty()) {
+ // We only consider active mem table, hoping immutable memtable is
+ // already in the process of flushing.
+ uint64_t seq = cfd->mem()->GetCreationSeq();
+ if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
+ cfd_picked = cfd;
+ seq_num_for_cf_picked = seq;
+ }
}
}
+ if (cfd_picked != nullptr) {
+ cfds.push_back(cfd_picked);
+ }
}
- autovector<ColumnFamilyData*> cfds;
- if (cfd_picked != nullptr) {
- cfds.push_back(cfd_picked);
- }
- FlushRequest flush_req;
for (const auto cfd : cfds) {
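+ // Skip column families whose active memtable is already empty; there
+ // is nothing to switch for them.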
+ if (cfd->mem()->IsEmpty()) {
+ continue;
+ }
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
if (!status.ok()) {
break;
}
- uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
- cfd->imm()->FlushRequested();
- flush_req.emplace_back(cfd, flush_memtable_id);
}
if (status.ok()) {
+ if (immutable_db_options_.atomic_flush) {
+ AssignAtomicFlushSeq(cfds);
+ }
+ for (const auto cfd : cfds) {
+ cfd->imm()->FlushRequested();
+ }
+ FlushRequest flush_req;
+ GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
- ColumnFamilyData* cfd;
- FlushRequest flush_req;
+ autovector<ColumnFamilyData*> cfds;
+ if (immutable_db_options_.atomic_flush) {
+ SelectColumnFamiliesForAtomicFlush(&cfds);
+ for (auto cfd : cfds) {
+ cfd->Ref();
+ }
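+ // The atomic-flush selection above covers every CF with data to flush,
+ // making the entries queued in flush_scheduler_ redundant; drop them.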
+ flush_scheduler_.Clear();
+ } else {
+ ColumnFamilyData* tmp_cfd;
+ while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
+ cfds.push_back(tmp_cfd);
+ }
+ }
Status status;
- while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
- status = SwitchMemtable(cfd, context);
- bool should_schedule = true;
+ for (auto& cfd : cfds) {
+ if (!cfd->mem()->IsEmpty()) {
+ status = SwitchMemtable(cfd, context);
+ }
if (cfd->Unref()) {
delete cfd;
- should_schedule = false;
+ cfd = nullptr;
}
if (!status.ok()) {
break;
}
- if (should_schedule) {
- uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
- flush_req.emplace_back(cfd, flush_memtable_id);
- }
}
if (status.ok()) {
+ if (immutable_db_options_.atomic_flush) {
+ AssignAtomicFlushSeq(cfds);
+ }
+ FlushRequest flush_req;
+ GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
- unique_ptr<WritableFile> lfile;
+ std::unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_);
const auto preallocate_block_size =
- GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
+ GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
auto write_hint = CalculateWALWriteHint();
mutex_.Unlock();
{
// of calling GetWalPreallocateBlockSize()
lfile->SetPreallocationBlockSize(preallocate_block_size);
lfile->SetWriteLifeTimeHint(write_hint);
- unique_ptr<WritableFileWriter> file_writer(
- new WritableFileWriter(std::move(lfile), log_fname, opt_env_opt));
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */,
+ immutable_db_options_.listeners));
new_log = new log::Writer(
std::move(file_writer), new_log_number,
immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion();
}
-
-#ifndef ROCKSDB_LITE
- // PLEASE NOTE: We assume that there are no failable operations
- // after lock is acquired below since we are already notifying
- // client about mem table becoming immutable.
- NotifyOnMemTableSealed(cfd, memtable_info);
-#endif //ROCKSDB_LITE
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64
mutex_.Lock();
if (s.ok() && creating_new_log) {
log_write_mutex_.Lock();
- logfile_number_ = new_log_number;
assert(new_log != nullptr);
- log_empty_ = true;
- log_dir_synced_ = false;
if (!logs_.empty()) {
// Always flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer;
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
- " WAL file -- %s\n",
+ " WAL file\n",
cfd->GetName().c_str(), cur_log_writer->get_log_number(),
new_log_number);
}
}
- logs_.emplace_back(logfile_number_, new_log);
- alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
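+ // Only publish the new WAL and reset the log state if the switch
+ // succeeded; on failure the new log is cleaned up below.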
+ if (s.ok()) {
+ logfile_number_ = new_log_number;
+ log_empty_ = true;
+ log_dir_synced_ = false;
+ logs_.emplace_back(logfile_number_, new_log);
+ alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+ }
log_write_mutex_.Unlock();
}
if (!s.ok()) {
// how do we fail if we're not creating new log?
assert(creating_new_log);
- assert(!new_mem);
- assert(!new_log);
+ if (new_mem) {
+ delete new_mem;
+ }
+ if (new_log) {
+ delete new_log;
+ }
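+ // Also free the pre-allocated SuperVersion that will no longer be
+ // installed.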
+ SuperVersion* new_superversion =
+ context->superversion_context.new_superversion.release();
+ if (new_superversion != nullptr) {
+ delete new_superversion;
+ }
+ // We may have lost data from the WritableFileBuffer in-memory buffer for
+ // the current log, so treat it as a fatal error and set bg_error
+ error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
+ // Read back bg_error in order to get the right severity
+ s = error_handler_.GetBGError();
+
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
+#ifndef ROCKSDB_LITE
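+ // Release the DB mutex so the user listener callback does not run
+ // under the lock.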
+ mutex_.Unlock();
+ // Notify client that memtable is sealed, now that we have successfully
+ // installed a new memtable
+ NotifyOnMemTableSealed(cfd, memtable_info);
+ mutex_.Lock();
+#endif // ROCKSDB_LITE
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
mutex_.AssertHeld();
- size_t bsize = static_cast<size_t>(
- write_buffer_size / 10 + write_buffer_size);
+ size_t bsize =
+ static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
// Some users might set very high write_buffer_size and rely on
// max_total_wal_size or other parameters to control the WAL size.
if (mutable_db_options_.max_total_wal_size > 0) {
- bsize = std::min<size_t>(bsize, static_cast<size_t>(
- mutable_db_options_.max_total_wal_size));
+ bsize = std::min<size_t>(
+ bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
}
if (immutable_db_options_.db_write_buffer_size > 0) {
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);