#include <stack>
#include <stdexcept>
#include <type_traits>
+#include <unordered_map>
#include <vector>
#include "db/column_family.h"
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
+#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
+#include "util/autovector.h"
+#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
#include "util/util.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
// anon namespace for file-local types
namespace {
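+// Handler that overwrites the timestamp padding of every key in a
+// WriteBatch, using either one timestamp for all records or one timestamp
+// per record. Keys must already carry timestamp-sized padding (see
+// WriteBatchInternal::Put).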
+class TimestampAssigner : public WriteBatch::Handler {
+ public:
+ explicit TimestampAssigner(const Slice& ts)
+ : timestamp_(ts), timestamps_(kEmptyTimestampList) {}
+ explicit TimestampAssigner(const std::vector<Slice>& ts_list)
+ : timestamps_(ts_list) {
+ SanityCheck();
+ }
+ ~TimestampAssigner() override {}
+
+ Status PutCF(uint32_t, const Slice& key, const Slice&) override {
+ AssignTimestamp(key);
+ ++idx_;
+ return Status::OK();
+ }
+
+ Status DeleteCF(uint32_t, const Slice& key) override {
+ AssignTimestamp(key);
+ ++idx_;
+ return Status::OK();
+ }
+
+ Status SingleDeleteCF(uint32_t, const Slice& key) override {
+ AssignTimestamp(key);
+ ++idx_;
+ return Status::OK();
+ }
+
+ Status DeleteRangeCF(uint32_t, const Slice& begin_key,
+ const Slice& end_key) override {
+ AssignTimestamp(begin_key);
+ AssignTimestamp(end_key);
+ ++idx_;
+ return Status::OK();
+ }
+
+ Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
+ AssignTimestamp(key);
+ ++idx_;
+ return Status::OK();
+ }
+
+ Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
+ // TODO (yanqin): support blob db in the future.
+ return Status::OK();
+ }
+
+ Status MarkBeginPrepare(bool) override {
+ // TODO (yanqin): support in the future.
+ return Status::OK();
+ }
+
+ Status MarkEndPrepare(const Slice&) override {
+ // TODO (yanqin): support in the future.
+ return Status::OK();
+ }
+
+ Status MarkCommit(const Slice&) override {
+ // TODO (yanqin): support in the future.
+ return Status::OK();
+ }
+
+ Status MarkRollback(const Slice&) override {
+ // TODO (yanqin): support in the future.
+ return Status::OK();
+ }
+
+ private:
+ void SanityCheck() const {
+ assert(!timestamps_.empty());
+#ifndef NDEBUG
+ const size_t ts_sz = timestamps_[0].size();
+ for (size_t i = 1; i != timestamps_.size(); ++i) {
+ assert(ts_sz == timestamps_[i].size());
+ }
+#endif // !NDEBUG
+ }
+
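+  // Overwrite the last ts.size() bytes of `key` in place. The const_cast is
+  // safe only because the key bytes live in the batch's own rep_ and were
+  // encoded with timestamp padding.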
+ void AssignTimestamp(const Slice& key) {
+ assert(timestamps_.empty() || idx_ < timestamps_.size());
+ const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
+ size_t ts_sz = ts.size();
+ char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
+ memcpy(ptr, ts.data(), ts_sz);
+ }
+
+ static const std::vector<Slice> kEmptyTimestampList;
+ const Slice timestamp_;
+ const std::vector<Slice>& timestamps_;
+ size_t idx_ = 0;
+
+ // No copy or move.
+ TimestampAssigner(const TimestampAssigner&) = delete;
+ TimestampAssigner(TimestampAssigner&&) = delete;
+ TimestampAssigner& operator=(const TimestampAssigner&) = delete;
+  TimestampAssigner& operator=(TimestampAssigner&&) = delete;
+};
+const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
+
} // anon namespace
struct SavePoints {
- std::stack<SavePoint> stack;
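+  // autovector keeps its first few elements inline, so shallow save-point
+  // stacks avoid a heap allocation.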
+ std::stack<SavePoint, autovector<SavePoint>> stack;
};
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
- : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
+ : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
+ rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
+ ? reserved_bytes
+ : WriteBatchInternal::kHeader);
+ rep_.resize(WriteBatchInternal::kHeader);
+}
+
+WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
+ : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
+  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
+                   ? reserved_bytes
+                   : WriteBatchInternal::kHeader);
+  rep_.resize(WriteBatchInternal::kHeader);
+}
WriteBatch::WriteBatch(const std::string& rep)
- : save_points_(nullptr),
- content_flags_(ContentFlags::DEFERRED),
+ : content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
- rep_(rep) {}
+ rep_(rep),
+ timestamp_size_(0) {}
WriteBatch::WriteBatch(std::string&& rep)
- : save_points_(nullptr),
- content_flags_(ContentFlags::DEFERRED),
+ : content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
- rep_(std::move(rep)) {}
+ rep_(std::move(rep)),
+ timestamp_size_(0) {}
WriteBatch::WriteBatch(const WriteBatch& src)
- : save_points_(src.save_points_),
- wal_term_point_(src.wal_term_point_),
+ : wal_term_point_(src.wal_term_point_),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
- rep_(src.rep_) {}
+ rep_(src.rep_),
+ timestamp_size_(src.timestamp_size_) {
+ if (src.save_points_ != nullptr) {
+ save_points_.reset(new SavePoints());
+ save_points_->stack = src.save_points_->stack;
+ }
+}
WriteBatch::WriteBatch(WriteBatch&& src) noexcept
: save_points_(std::move(src.save_points_)),
wal_term_point_(std::move(src.wal_term_point_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
- rep_(std::move(src.rep_)) {}
+ rep_(std::move(src.rep_)),
+ timestamp_size_(src.timestamp_size_) {}
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}
-WriteBatch::~WriteBatch() { delete save_points_; }
+WriteBatch::~WriteBatch() { }
WriteBatch::Handler::~Handler() { }
wal_term_point_.clear();
}
-int WriteBatch::Count() const {
- return WriteBatchInternal::Count(this);
-}
+uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
uint32_t WriteBatch::ComputeContentFlags() const {
auto rv = content_flags_.load(std::memory_order_relaxed);
}
Status WriteBatch::Iterate(Handler* handler) const {
- Slice input(rep_);
- if (input.size() < WriteBatchInternal::kHeader) {
+ if (rep_.size() < WriteBatchInternal::kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
- input.remove_prefix(WriteBatchInternal::kHeader);
+ return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
+ rep_.size());
+}
+
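+// Iterate the serialized records of `wb` over the byte range [begin, end) of
+// rep_, invoking the matching Handler callback for each record. Callers pass
+// kHeader..rep_.size() to walk the whole batch.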
+Status WriteBatchInternal::Iterate(const WriteBatch* wb,
+ WriteBatch::Handler* handler, size_t begin,
+ size_t end) {
+ if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
+ return Status::Corruption("Invalid start/end bounds for Iterate");
+ }
+ assert(begin <= end);
+ Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
+ bool whole_batch =
+ (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
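+  // The trailing record-count check is only meaningful when the whole batch
+  // was iterated; a sub-range legitimately yields fewer records than the
+  // count stored in the header.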
+
Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We want to avoid treating such
  // Noops as batch boundary symbols; otherwise we would miscount the number
  // of batches. We do that by checking whether the accumulated batch is
  // empty before seeing the next Noop.
bool empty_batch = true;
- int found = 0;
+ uint32_t found = 0;
Status s;
char tag = 0;
uint32_t column_family = 0; // default
}
} else {
assert(s.IsTryAgain());
- assert(!last_was_try_again); // to detect infinite loop bugs
+ assert(!last_was_try_again); // to detect infinite loop bugs
if (UNLIKELY(last_was_try_again)) {
return Status::Corruption(
"two consecutive TryAgain in WriteBatch handler; this is either a "
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value);
if (LIKELY(s.ok())) {
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
if (LIKELY(s.ok())) {
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key);
if (LIKELY(s.ok())) {
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value);
if (LIKELY(s.ok())) {
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value);
if (LIKELY(s.ok())) {
break;
case kTypeColumnFamilyBlobIndex:
case kTypeBlobIndex:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutBlobIndexCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
break;
case kTypeBeginPrepareXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
empty_batch = false;
}
break;
case kTypeBeginPersistedPrepareXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
empty_batch = false;
}
break;
case kTypeBeginUnprepareXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
handler->MarkBeginPrepare(true /* unprepared */);
empty_batch = false;
}
break;
case kTypeEndPrepareXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid);
empty_batch = true;
break;
case kTypeCommitXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid);
empty_batch = true;
break;
case kTypeRollbackXID:
- assert(content_flags_.load(std::memory_order_relaxed) &
+ assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid);
empty_batch = true;
if (!s.ok()) {
return s;
}
- if (handler_continue && found != WriteBatchInternal::Count(this)) {
+ if (handler_continue && whole_batch &&
+ found != WriteBatchInternal::Count(wb)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
b->is_latest_persistent_state_ = true;
}
-int WriteBatchInternal::Count(const WriteBatch* b) {
+uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
-void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
+void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
EncodeFixed32(&b->rep_[8], n);
}
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
- PutLengthPrefixedSlice(&b->rep_, key);
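+  // For column families with user-defined timestamps, reserve
+  // timestamp_size_ zero bytes after the user key; AssignTimestamp() fills
+  // them in later.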
+ if (0 == b->timestamp_size_) {
+ PutLengthPrefixedSlice(&b->rep_, key);
+ } else {
+ PutVarint32(&b->rep_,
+ static_cast<uint32_t>(key.size() + b->timestamp_size_));
+ b->rep_.append(key.data(), key.size());
+ b->rep_.append(b->timestamp_size_, '\0');
+ }
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
- PutLengthPrefixedSliceParts(&b->rep_, key);
+ if (0 == b->timestamp_size_) {
+ PutLengthPrefixedSliceParts(&b->rep_, key);
+ } else {
+ PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
+ }
PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
void WriteBatch::SetSavePoint() {
if (save_points_ == nullptr) {
- save_points_ = new SavePoints();
+ save_points_.reset(new SavePoints());
}
// Record length and count of current batch of writes.
save_points_->stack.push(SavePoint(
save_points_->stack.pop();
assert(savepoint.size <= rep_.size());
- assert(savepoint.count <= Count());
+ assert(static_cast<uint32_t>(savepoint.count) <= Count());
if (savepoint.size == rep_.size()) {
// No changes to rollback
return Status::OK();
}
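+// A hypothetical usage sketch (cf_handle and ts_buf are illustrative): the
+// batch must be constructed with the column family's timestamp size so that
+// Put() reserves padding for AssignTimestamp() to overwrite:
+//
+//   WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+//                    sizeof(uint64_t) /* ts_sz */);
+//   batch.Put(cf_handle, "key", "value");  // reserves 8 zero bytes
+//   batch.AssignTimestamp(Slice(ts_buf, sizeof(uint64_t)));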
+Status WriteBatch::AssignTimestamp(const Slice& ts) {
+ TimestampAssigner ts_assigner(ts);
+ return Iterate(&ts_assigner);
+}
+
+Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
+ TimestampAssigner ts_assigner(ts_list);
+ return Iterate(&ts_assigner);
+}
+
class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;
ColumnFamilyMemTables* const cf_mems_;
FlushScheduler* const flush_scheduler_;
+ TrimHistoryScheduler* const trim_history_scheduler_;
const bool ignore_missing_column_families_;
const uint64_t recovering_log_number_;
// log number that all Memtables inserted into should reference
DupDetector duplicate_detector_;
bool dup_dectector_on_;
+ bool hint_per_batch_;
+ bool hint_created_;
+ // Hints for this batch
+ using HintMap = std::unordered_map<MemTable*, void*>;
+ using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
+ HintMapType hint_;
+
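+  // Lazily placement-new the HintMap on first use so batches that do not
+  // use per-batch hints never pay for its construction; the destructor
+  // tears it down manually.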
+ HintMap& GetHintMap() {
+ assert(hint_per_batch_);
+ if (!hint_created_) {
+ new (&hint_) HintMap();
+ hint_created_ = true;
+ }
+ return *reinterpret_cast<HintMap*>(&hint_);
+ }
+
MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
if(!post_info_created_) {
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
+ TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
- bool batch_per_txn = true)
+ bool batch_per_txn = true, bool hint_per_batch = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
+ trim_history_scheduler_(trim_history_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
- db_(reinterpret_cast<DBImpl*>(db)),
+ db_(static_cast_with_check<DBImpl, DB>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
post_info_created_(false),
has_valid_writes_(has_valid_writes),
write_before_prepare_(!batch_per_txn),
unprepared_batch_(false),
duplicate_detector_(),
- dup_dectector_on_(false) {
+ dup_dectector_on_(false),
+ hint_per_batch_(hint_per_batch),
+ hint_created_(false) {
assert(cf_mems_);
}
reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap();
}
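+  // GetHintMap() placement-news the map into aligned storage, so the
+  // per-memtable hints it owns and the map itself must be destroyed
+  // explicitly.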
+ if (hint_created_) {
+ for (auto iter : GetHintMap()) {
+ delete[] reinterpret_cast<char*>(iter.second);
+ }
+ reinterpret_cast<HintMap*>(&hint_)->~HintMap();
+ }
delete rebuilding_trx_;
}
if (!moptions->inplace_update_support) {
bool mem_res =
mem->Add(sequence_, value_type, key, value,
- concurrent_memtable_writes_, get_post_process_info(mem));
+ concurrent_memtable_writes_, get_post_process_info(mem),
+ hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
MemTable* mem = cf_mems_->GetMemTable();
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
- concurrent_memtable_writes_, get_post_process_info(mem));
+ concurrent_memtable_writes_, get_post_process_info(mem),
+ hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
- assert(!concurrent_memtable_writes_);
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
bool perform_merge = false;
+ assert(!concurrent_memtable_writes_ ||
+ moptions->max_successive_merges == 0);
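+    // The max_successive_merges optimization reads the current merge chain
+    // via Get(), which is not safe under concurrent memtable writes; hence
+    // the assert above.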
    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued, which will try to acquire
    // the DB mutex and deadlock because that mutex is already held during
    // recovery. So we disable merge in recovery mode.
if (moptions->max_successive_merges > 0 && db_ != nullptr &&
recovering_log_number_ == 0) {
+ assert(!concurrent_memtable_writes_);
LookupKey lkey(key, sequence_);
      // Count the number of successive merges at the head of the key in the
      // memtable.
perform_merge = false;
} else {
// 3) Add value to memtable
+ assert(!concurrent_memtable_writes_);
bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
if (!perform_merge) {
// Add merge operator to memtable
- bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
+ bool mem_res =
+ mem->Add(sequence_, kTypeMerge, key, value,
+ concurrent_memtable_writes_, get_post_process_info(mem));
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
- flush_scheduler_->ScheduleFlush(cfd);
+ flush_scheduler_->ScheduleWork(cfd);
+ }
+ }
+  // Check whether the memtable history (active memtable plus immutable
+  // memtables retained for read consistency) exceeds
+  // max_write_buffer_size_to_maintain; if so, schedule a history trim.
+ if (trim_history_scheduler_ != nullptr) {
+ auto* cfd = cf_mems_->current();
+
+ assert(cfd);
+ assert(cfd->ioptions());
+
+ const size_t size_to_maintain = static_cast<size_t>(
+ cfd->ioptions()->max_write_buffer_size_to_maintain);
+
+ if (size_to_maintain > 0) {
+ MemTableList* const imm = cfd->imm();
+ assert(imm);
+
+ if (imm->HasHistory()) {
+ const MemTable* const mem = cfd->mem();
+ assert(mem);
+
+ if (mem->ApproximateMemoryUsageFast() +
+ imm->ApproximateMemoryUsageExcludingLast() >=
+ size_to_maintain &&
+ imm->MarkTrimHistoryNeeded()) {
+ trim_history_scheduler_->ScheduleWork(cfd);
+ }
+ }
}
}
}
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
rebuilding_trx_seq_ = sequence_;
- // We only call MarkBeginPrepare once per batch, and unprepared_batch_
- // is initialized to false by default.
+ // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
+ // unprepared_batch_ should be false because it is false by default, and
+ // gets reset to false in MarkEndPrepare.
assert(!unprepared_batch_);
unprepared_batch_ = unprepare;
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_, rebuilding_trx_seq_,
batch_cnt, unprepared_batch_);
+ unprepared_batch_ = false;
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
Status WriteBatchInternal::InsertInto(
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+ TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
MemTableInserter inserter(
- sequence, memtables, flush_scheduler, ignore_missing_column_families,
- recovery_log_number, db, concurrent_memtable_writes,
- nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
+ sequence, memtables, flush_scheduler, trim_history_scheduler,
+ ignore_missing_column_families, recovery_log_number, db,
+ concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
+ batch_per_txn);
for (auto w : write_group) {
if (w->CallbackFailed()) {
continue;
Status WriteBatchInternal::InsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+ TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
- bool batch_per_txn) {
+ bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
(void)batch_cnt;
#endif
assert(writer->ShouldWriteToMemtable());
MemTableInserter inserter(
- sequence, memtables, flush_scheduler, ignore_missing_column_families,
- log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
- seq_per_batch, batch_per_txn);
+ sequence, memtables, flush_scheduler, trim_history_scheduler,
+ ignore_missing_column_families, log_number, db,
+ concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
+ batch_per_txn, hint_per_batch);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
- FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
- uint64_t log_number, DB* db, bool concurrent_memtable_writes,
- SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch,
- bool batch_per_txn) {
+ FlushScheduler* flush_scheduler,
+ TrimHistoryScheduler* trim_history_scheduler,
+ bool ignore_missing_column_families, uint64_t log_number, DB* db,
+ bool concurrent_memtable_writes, SequenceNumber* next_seq,
+ bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
+ trim_history_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, has_valid_writes,
seq_per_batch, batch_per_txn);
}
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE