#include <array>
#include <limits>
#include <memory>
+
#include "db/dbformat.h"
+#include "db/kv_checksum.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
+#include "db/wide/wide_column_serialization.h"
+#include "logging/logging.h"
#include "memory/arena.h"
#include "memory/memory_usage.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
+#include "rocksdb/types.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
ImmutableMemTableOptions::ImmutableMemTableOptions(
- const ImmutableCFOptions& ioptions,
+ const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options)
: arena_block_size(mutable_cf_options.arena_block_size),
memtable_prefix_bloom_bits(
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
inplace_callback(ioptions.inplace_callback),
max_successive_merges(mutable_cf_options.max_successive_merges),
- statistics(ioptions.statistics),
- merge_operator(ioptions.merge_operator),
- info_log(ioptions.info_log),
- allow_data_in_errors(ioptions.allow_data_in_errors) {}
+ statistics(ioptions.stats),
+ merge_operator(ioptions.merge_operator.get()),
+ info_log(ioptions.logger),
+ allow_data_in_errors(ioptions.allow_data_in_errors),
+ protection_bytes_per_key(
+ mutable_cf_options.memtable_protection_bytes_per_key) {}
MemTable::MemTable(const InternalKeyComparator& cmp,
- const ImmutableCFOptions& ioptions,
+ const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber latest_seq, uint32_t column_family_id)
mutable_cf_options.memtable_huge_page_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
- ioptions.info_log, column_family_id)),
+ ioptions.logger, column_family_id)),
range_del_table_(SkipListFactory().CreateMemTableRep(
- comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
+ comparator_, &arena_, nullptr /* transform */, ioptions.logger,
column_family_id)),
is_range_del_table_empty_(true),
data_size_(0),
: 0),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
flush_state_(FLUSH_NOT_REQUESTED),
- env_(ioptions.env),
+ clock_(ioptions.clock),
insert_with_hint_prefix_extractor_(
- ioptions.memtable_insert_with_hint_prefix_extractor),
+ ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
bloom_filter_.reset(
new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
6 /* hard coded 6 probes */,
- moptions_.memtable_huge_page_size, ioptions.info_log));
+ moptions_.memtable_huge_page_size, ioptions.logger));
+ }
+ // Initialize cached_range_tombstone_ here since it could
+ // be read before it is constructed in MemTable::Add(), which could also lead
+ // to a data race on the global mutex table backing atomic shared_ptr.
+ auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
+ size_t size = cached_range_tombstone_.Size();
+ for (size_t i = 0; i < size; ++i) {
+ std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
+ cached_range_tombstone_.AccessAtCore(i);
+ auto new_local_cache_ref = std::make_shared<
+ const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
+ std::atomic_store_explicit(
+ local_cache_ref_ptr,
+ std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
+ new_cache.get()),
+ std::memory_order_relaxed);
}
}
for (size_t usage : usages) {
// If usage + total_usage >= kMaxSizet, return kMaxSizet.
// the following variation is to avoid numeric overflow.
- if (usage >= port::kMaxSizet - total_usage) {
- return port::kMaxSizet;
+ if (usage >= std::numeric_limits<size_t>::max() - total_usage) {
+ return std::numeric_limits<size_t>::max();
}
total_usage += usage;
}
uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
int64_t current_time = 0;
- auto s = env_->GetCurrentTime(¤t_time);
+ auto s = clock_->GetCurrentTime(¤t_time);
if (s.ok()) {
assert(current_time >= 0);
// If fail, the timestamp is already set.
}
}
+Status MemTable::VerifyEntryChecksum(const char* entry,
+ size_t protection_bytes_per_key,
+ bool allow_data_in_errors) {
+ if (protection_bytes_per_key == 0) {
+ return Status::OK();
+ }
+ uint32_t key_length;
+ const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+ if (key_ptr == nullptr) {
+ return Status::Corruption("Unable to parse internal key length");
+ }
+ if (key_length < 8) {
+ return Status::Corruption("Memtable entry internal key length too short.");
+ }
+ Slice user_key = Slice(key_ptr, key_length - 8);
+
+ const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+ ValueType type;
+ SequenceNumber seq;
+ UnPackSequenceAndType(tag, &seq, &type);
+
+ uint32_t value_length = 0;
+ const char* value_ptr = GetVarint32Ptr(
+ key_ptr + key_length, key_ptr + key_length + 5, &value_length);
+ if (value_ptr == nullptr) {
+ return Status::Corruption("Unable to parse internal key value");
+ }
+ Slice value = Slice(value_ptr, value_length);
+
+ const char* checksum_ptr = value_ptr + value_length;
+ uint64_t expected = ProtectionInfo64()
+ .ProtectKVO(user_key, value, type)
+ .ProtectS(seq)
+ .GetVal();
+ bool match = true;
+ switch (protection_bytes_per_key) {
+ case 1:
+ match = static_cast<uint8_t>(checksum_ptr[0]) ==
+ static_cast<uint8_t>(expected);
+ break;
+ case 2:
+ match = DecodeFixed16(checksum_ptr) == static_cast<uint16_t>(expected);
+ break;
+ case 4:
+ match = DecodeFixed32(checksum_ptr) == static_cast<uint32_t>(expected);
+ break;
+ case 8:
+ match = DecodeFixed64(checksum_ptr) == expected;
+ break;
+ default:
+ assert(false);
+ }
+ if (!match) {
+ std::string msg(
+ "Corrupted memtable entry, per key-value checksum verification "
+ "failed.");
+ if (allow_data_in_errors) {
+ msg.append("Unrecognized value type: " +
+ std::to_string(static_cast<int>(type)) + ". ");
+ msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". ");
+ msg.append("seq: " + std::to_string(seq) + ".");
+ }
+ return Status::Corruption(msg.c_str());
+ }
+ return Status::OK();
+}
+
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const {
// Internal keys are encoded as length-prefixed strings.
return comparator.CompareKeySeq(k1, k2);
}
-int MemTable::KeyComparator::operator()(const char* prefix_len_key,
- const KeyComparator::DecodedType& key)
- const {
+int MemTable::KeyComparator::operator()(
+ const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(prefix_len_key);
return comparator.CompareKeySeq(a, key);
valid_(false),
arena_mode_(arena != nullptr),
value_pinned_(
- !mem.GetImmutableMemTableOptions()->inplace_update_support) {
+ !mem.GetImmutableMemTableOptions()->inplace_update_support),
+ protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
+ status_(Status::OK()),
+ logger_(mem.moptions_.info_log) {
if (use_range_del_table) {
iter_ = mem.range_del_table_->GetIterator(arena);
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
} else {
iter_ = mem.table_->GetIterator(arena);
}
+ status_.PermitUncheckedError();
}
// No copying allowed
MemTableIterator(const MemTableIterator&) = delete;
PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif
- bool Valid() const override { return valid_; }
+ bool Valid() const override { return valid_ && status_.ok(); }
void Seek(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_) {
// iterator should only use prefix bloom filter
- Slice user_k(ExtractUserKey(k));
- if (prefix_extractor_->InDomain(user_k) &&
- !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
- PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
- valid_ = false;
- return;
- } else {
- PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+ auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
+ Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
+ if (prefix_extractor_->InDomain(user_k_without_ts)) {
+ if (!bloom_->MayContain(
+ prefix_extractor_->Transform(user_k_without_ts))) {
+ PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+ valid_ = false;
+ return;
+ } else {
+ PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+ }
}
}
iter_->Seek(k, nullptr);
valid_ = iter_->Valid();
+ VerifyEntryChecksum();
}
void SeekForPrev(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_) {
- Slice user_k(ExtractUserKey(k));
- if (prefix_extractor_->InDomain(user_k) &&
- !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
- PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
- valid_ = false;
- return;
- } else {
- PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+ auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
+ Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
+ if (prefix_extractor_->InDomain(user_k_without_ts)) {
+ if (!bloom_->MayContain(
+ prefix_extractor_->Transform(user_k_without_ts))) {
+ PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+ valid_ = false;
+ return;
+ } else {
+ PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+ }
}
}
iter_->Seek(k, nullptr);
valid_ = iter_->Valid();
- if (!Valid()) {
+ VerifyEntryChecksum();
+ if (!Valid() && status().ok()) {
SeekToLast();
}
while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
void SeekToFirst() override {
iter_->SeekToFirst();
valid_ = iter_->Valid();
+ VerifyEntryChecksum();
}
void SeekToLast() override {
iter_->SeekToLast();
valid_ = iter_->Valid();
+ VerifyEntryChecksum();
}
void Next() override {
PERF_COUNTER_ADD(next_on_memtable_count, 1);
iter_->Next();
TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
valid_ = iter_->Valid();
+ VerifyEntryChecksum();
}
bool NextAndGetResult(IterateResult* result) override {
Next();
- bool is_valid = valid_;
+ bool is_valid = Valid();
if (is_valid) {
result->key = key();
result->bound_check_result = IterBoundCheck::kUnknown;
assert(Valid());
iter_->Prev();
valid_ = iter_->Valid();
+ VerifyEntryChecksum();
}
Slice key() const override {
assert(Valid());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
- Status status() const override { return Status::OK(); }
+ Status status() const override { return status_; }
bool IsKeyPinned() const override {
// memtable data is always pinned
bool valid_;
bool arena_mode_;
bool value_pinned_;
+ size_t protection_bytes_per_key_;
+ Status status_;
+ Logger* logger_;
+
+ void VerifyEntryChecksum() {
+ if (protection_bytes_per_key_ > 0 && Valid()) {
+ status_ = MemTable::VerifyEntryChecksum(iter_->key(),
+ protection_bytes_per_key_);
+ if (!status_.ok()) {
+ ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState());
+ }
+ }
+ }
};
InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
}
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
- const ReadOptions& read_options, SequenceNumber read_seq) {
+ const ReadOptions& read_options, SequenceNumber read_seq,
+ bool immutable_memtable) {
if (read_options.ignore_range_deletions ||
is_range_del_table_empty_.load(std::memory_order_relaxed)) {
return nullptr;
}
- auto* unfragmented_iter = new MemTableIterator(
- *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
- if (unfragmented_iter == nullptr) {
- return nullptr;
+ return NewRangeTombstoneIteratorInternal(read_options, read_seq,
+ immutable_memtable);
+}
+
+FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
+ const ReadOptions& read_options, SequenceNumber read_seq,
+ bool immutable_memtable) {
+ if (immutable_memtable) {
+ // Note that caller should already have verified that
+ // !is_range_del_table_empty_
+ assert(IsFragmentedRangeTombstonesConstructed());
+ return new FragmentedRangeTombstoneIterator(
+ fragmented_range_tombstone_list_.get(), comparator_.comparator,
+ read_seq, read_options.timestamp);
}
- auto fragmented_tombstone_list =
- std::make_shared<FragmentedRangeTombstoneList>(
+
+ // takes current cache
+ std::shared_ptr<FragmentedRangeTombstoneListCache> cache =
+ std::atomic_load_explicit(cached_range_tombstone_.Access(),
+ std::memory_order_relaxed);
+ // construct fragmented tombstone list if necessary
+ if (!cache->initialized.load(std::memory_order_acquire)) {
+ cache->reader_mutex.lock();
+ if (!cache->tombstones) {
+ auto* unfragmented_iter =
+ new MemTableIterator(*this, read_options, nullptr /* arena */,
+ true /* use_range_del_table */);
+ cache->tombstones.reset(new FragmentedRangeTombstoneList(
std::unique_ptr<InternalIterator>(unfragmented_iter),
- comparator_.comparator);
+ comparator_.comparator));
+ cache->initialized.store(true, std::memory_order_release);
+ }
+ cache->reader_mutex.unlock();
+ }
auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
- fragmented_tombstone_list, comparator_.comparator, read_seq);
+ cache, comparator_.comparator, read_seq, read_options.timestamp);
return fragmented_iter;
}
+void MemTable::ConstructFragmentedRangeTombstones() {
+ assert(!IsFragmentedRangeTombstonesConstructed(false));
+ // There should be no concurrent Construction
+ if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
+ auto* unfragmented_iter =
+ new MemTableIterator(*this, ReadOptions(), nullptr /* arena */,
+ true /* use_range_del_table */);
+
+ fragmented_range_tombstone_list_ =
+ std::make_unique<FragmentedRangeTombstoneList>(
+ std::unique_ptr<InternalIterator>(unfragmented_iter),
+ comparator_.comparator);
+ }
+}
+
port::RWMutex* MemTable::GetLock(const Slice& key) {
return &locks_[GetSliceRangedNPHash(key, locks_.size())];
}
return {entry_count * (data_size / n), entry_count};
}
-bool MemTable::Add(SequenceNumber s, ValueType type,
- const Slice& key, /* user key */
- const Slice& value, bool allow_concurrent,
- MemTablePostProcessInfo* post_process_info, void** hint) {
+Status MemTable::VerifyEncodedEntry(Slice encoded,
+ const ProtectionInfoKVOS64& kv_prot_info) {
+ uint32_t ikey_len = 0;
+ if (!GetVarint32(&encoded, &ikey_len)) {
+ return Status::Corruption("Unable to parse internal key length");
+ }
+ size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+ if (ikey_len < 8 + ts_sz) {
+ return Status::Corruption("Internal key length too short");
+ }
+ if (ikey_len > encoded.size()) {
+ return Status::Corruption("Internal key length too long");
+ }
+ uint32_t value_len = 0;
+ const size_t user_key_len = ikey_len - 8;
+ Slice key(encoded.data(), user_key_len);
+ encoded.remove_prefix(user_key_len);
+
+ uint64_t packed = DecodeFixed64(encoded.data());
+ ValueType value_type = kMaxValue;
+ SequenceNumber sequence_number = kMaxSequenceNumber;
+ UnPackSequenceAndType(packed, &sequence_number, &value_type);
+ encoded.remove_prefix(8);
+
+ if (!GetVarint32(&encoded, &value_len)) {
+ return Status::Corruption("Unable to parse value length");
+ }
+ if (value_len < encoded.size()) {
+ return Status::Corruption("Value length too short");
+ }
+ if (value_len > encoded.size()) {
+ return Status::Corruption("Value length too long");
+ }
+ Slice value(encoded.data(), value_len);
+
+ return kv_prot_info.StripS(sequence_number)
+ .StripKVO(key, value, value_type)
+ .GetStatus();
+}
+
+void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
+ const Slice& key, const Slice& value,
+ ValueType type, SequenceNumber s,
+ char* checksum_ptr) {
+ if (moptions_.protection_bytes_per_key == 0) {
+ return;
+ }
+
+ uint64_t checksum = 0;
+ if (kv_prot_info == nullptr) {
+ checksum =
+ ProtectionInfo64().ProtectKVO(key, value, type).ProtectS(s).GetVal();
+ } else {
+ checksum = kv_prot_info->GetVal();
+ }
+ switch (moptions_.protection_bytes_per_key) {
+ case 1:
+ checksum_ptr[0] = static_cast<uint8_t>(checksum);
+ break;
+ case 2:
+ EncodeFixed16(checksum_ptr, static_cast<uint16_t>(checksum));
+ break;
+ case 4:
+ EncodeFixed32(checksum_ptr, static_cast<uint32_t>(checksum));
+ break;
+ case 8:
+ EncodeFixed64(checksum_ptr, checksum);
+ break;
+ default:
+ assert(false);
+ }
+}
+
+Status MemTable::Add(SequenceNumber s, ValueType type,
+ const Slice& key, /* user key */
+ const Slice& value,
+ const ProtectionInfoKVOS64* kv_prot_info,
+ bool allow_concurrent,
+ MemTablePostProcessInfo* post_process_info, void** hint) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
+ // checksum : char[moptions_.protection_bytes_per_key]
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t val_size = static_cast<uint32_t>(value.size());
uint32_t internal_key_size = key_size + 8;
const uint32_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
- val_size;
+ val_size + moptions_.protection_bytes_per_key;
char* buf = nullptr;
std::unique_ptr<MemTableRep>& table =
type == kTypeRangeDeletion ? range_del_table_ : table_;
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
- assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
+ assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) ==
+ (unsigned)encoded_len);
+
+ UpdateEntryChecksum(kv_prot_info, key, value, type, s,
+ buf + encoded_len - moptions_.protection_bytes_per_key);
+ Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key);
+ if (kv_prot_info != nullptr) {
+ TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded);
+ Status status = VerifyEncodedEntry(encoded, *kv_prot_info);
+ if (!status.ok()) {
+ return status;
+ }
+ }
+
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+ Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz);
if (!allow_concurrent) {
// Extract prefix for insert with hint.
Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
if (UNLIKELY(!res)) {
- return res;
+ return Status::TryAgain("key+seq exists");
}
} else {
bool res = table->InsertKey(handle);
if (UNLIKELY(!res)) {
- return res;
+ return Status::TryAgain("key+seq exists");
}
}
std::memory_order_relaxed);
data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
std::memory_order_relaxed);
- if (type == kTypeDeletion) {
+ if (type == kTypeDeletion || type == kTypeSingleDeletion ||
+ type == kTypeDeletionWithTimestamp) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
}
if (bloom_filter_ && prefix_extractor_ &&
- prefix_extractor_->InDomain(key)) {
- bloom_filter_->Add(
- prefix_extractor_->Transform(StripTimestampFromUserKey(key, ts_sz)));
+ prefix_extractor_->InDomain(key_without_ts)) {
+ bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts));
}
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
- bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
+ bloom_filter_->Add(key_without_ts);
}
// The first sequence number inserted into the memtable
? table->InsertKeyConcurrently(handle)
: table->InsertKeyWithHintConcurrently(handle, hint);
if (UNLIKELY(!res)) {
- return res;
+ return Status::TryAgain("key+seq exists");
}
assert(post_process_info != nullptr);
}
if (bloom_filter_ && prefix_extractor_ &&
- prefix_extractor_->InDomain(key)) {
- bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
+ prefix_extractor_->InDomain(key_without_ts)) {
+ bloom_filter_->AddConcurrently(
+ prefix_extractor_->Transform(key_without_ts));
}
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
- bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
+ bloom_filter_->AddConcurrently(key_without_ts);
}
// atomically update first_seqno_ and earliest_seqno_.
}
}
if (type == kTypeRangeDeletion) {
+ auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
+ size_t size = cached_range_tombstone_.Size();
+ if (allow_concurrent) {
+ range_del_mutex_.lock();
+ }
+ for (size_t i = 0; i < size; ++i) {
+ std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
+ cached_range_tombstone_.AccessAtCore(i);
+ auto new_local_cache_ref = std::make_shared<
+ const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
+ // It is okay for some reader to load old cache during invalidation as
+ // the new sequence number is not published yet.
+ // Each core will have a shared_ptr to a shared_ptr to the cached
+ // fragmented range tombstones, so that ref count is maintianed locally
+ // per-core using the per-core shared_ptr.
+ std::atomic_store_explicit(
+ local_cache_ref_ptr,
+ std::shared_ptr<FragmentedRangeTombstoneListCache>(
+ new_local_cache_ref, new_cache.get()),
+ std::memory_order_relaxed);
+ }
+ if (allow_concurrent) {
+ range_del_mutex_.unlock();
+ }
is_range_del_table_empty_.store(false, std::memory_order_relaxed);
}
UpdateOldestKeyTime();
- return true;
+
+ TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded);
+ return Status::OK();
}
// Callback from MemTable::Get()
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
+ PinnableWideColumns* columns;
SequenceNumber seq;
std::string* timestamp;
const MergeOperator* merge_operator;
Statistics* statistics;
bool inplace_update_support;
bool do_merge;
- Env* env_;
+ SystemClock* clock;
+
ReadCallback* callback_;
bool* is_blob_index;
bool allow_data_in_errors;
+ size_t protection_bytes_per_key;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsVisible(_seq);
return true;
}
};
-} // namespace
+} // anonymous namespace
static bool SaveValue(void* arg, const char* entry) {
+ TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Begin:entry", &entry);
Saver* s = reinterpret_cast<Saver*>(arg);
assert(s != nullptr);
+ assert(!s->value || !s->columns);
+
+ if (s->protection_bytes_per_key > 0) {
+ *(s->status) = MemTable::VerifyEntryChecksum(
+ entry, s->protection_bytes_per_key, s->allow_data_in_errors);
+ if (!s->status->ok()) {
+ ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
+ // Memtable entry corrupted
+ return false;
+ }
+ }
+
MergeContext* merge_context = s->merge_context;
SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
const MergeOperator* merge_operator = s->merge_operator;
assert(merge_context != nullptr);
- // entry format is:
- // klength varint32
- // userkey char[klength-8]
- // tag uint64
- // vlength varint32f
- // value char[vlength]
- // Check that it belongs to same user key. We do not check the
- // sequence number since the Seek() call above should have skipped
- // all entries with overly large sequence numbers.
+ // Refer to comments under MemTable::Add() for entry format.
+ // Check that it belongs to same user key.
uint32_t key_length = 0;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
assert(key_length >= 8);
const Comparator* user_comparator =
s->mem->GetInternalKeyComparator().user_comparator();
size_t ts_sz = user_comparator->timestamp_size();
- if (user_comparator->CompareWithoutTimestamp(user_key_slice,
- s->key->user_key()) == 0) {
+ if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
+ // timestamp should already be set to range tombstone timestamp
+ assert(s->timestamp->size() == ts_sz);
+ }
+ if (user_comparator->EqualWithoutTimestamp(user_key_slice,
+ s->key->user_key())) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
ValueType type;
return true; // to continue to the next seq
}
- s->seq = seq;
+ if (s->seq == kMaxSequenceNumber) {
+ s->seq = seq;
+ if (s->seq > max_covering_tombstone_seq) {
+ if (ts_sz && s->timestamp != nullptr) {
+ // `timestamp` was set to range tombstone's timestamp before
+ // `SaveValue` is ever called. This key has a higher sequence number
+ // than range tombstone, and is the key with the highest seqno across
+ // all keys with this user_key, so we update timestamp here.
+ Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
+ s->timestamp->assign(ts.data(), ts_sz);
+ }
+ } else {
+ s->seq = max_covering_tombstone_seq;
+ }
+ }
+
+ if (ts_sz > 0 && s->timestamp != nullptr) {
+ if (!s->timestamp->empty()) {
+ assert(ts_sz == s->timestamp->size());
+ }
+ // TODO optimize for smaller size ts
+ const std::string kMaxTs(ts_sz, '\xff');
+ if (s->timestamp->empty() ||
+ user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
+ Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
+ s->timestamp->assign(ts.data(), ts_sz);
+ }
+ }
- if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
+ if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
+ type == kTypeWideColumnEntity || type == kTypeDeletion ||
+ type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) &&
max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
switch (type) {
- case kTypeBlobIndex:
- if (s->is_blob_index == nullptr) {
- ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
+ case kTypeBlobIndex: {
+ if (!s->do_merge) {
*(s->status) = Status::NotSupported(
- "Encounter unsupported blob value. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- } else if (*(s->merge_in_progress)) {
- *(s->status) =
- Status::NotSupported("Blob DB does not support merge operator.");
+ "GetMergeOperands not supported by stacked BlobDB");
+ *(s->found_final_value) = true;
+ return false;
+ }
+
+ if (*(s->merge_in_progress)) {
+ *(s->status) = Status::NotSupported(
+ "Merge operator not supported by stacked BlobDB");
+ *(s->found_final_value) = true;
+ return false;
}
- if (!s->status->ok()) {
+
+ if (s->is_blob_index == nullptr) {
+ ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
+ *(s->status) = Status::NotSupported(
+ "Encountered unexpected blob index. Please open DB with "
+ "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
*(s->found_final_value) = true;
return false;
}
- FALLTHROUGH_INTENDED;
+
+ if (s->inplace_update_support) {
+ s->mem->GetLock(s->key->user_key())->ReadLock();
+ }
+
+ Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
+ *(s->status) = Status::OK();
+
+ if (s->value) {
+ s->value->assign(v.data(), v.size());
+ } else if (s->columns) {
+ s->columns->SetPlainValue(v);
+ }
+
+ if (s->inplace_update_support) {
+ s->mem->GetLock(s->key->user_key())->ReadUnlock();
+ }
+
+ *(s->found_final_value) = true;
+ *(s->is_blob_index) = true;
+
+ return false;
+ }
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
+
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
*(s->status) = Status::OK();
- if (*(s->merge_in_progress)) {
- if (s->do_merge) {
- if (s->value != nullptr) {
- *(s->status) = MergeHelper::TimedFullMerge(
- merge_operator, s->key->user_key(), &v,
- merge_context->GetOperands(), s->value, s->logger,
- s->statistics, s->env_, nullptr /* result_operand */, true);
- }
- } else {
- // Preserve the value with the goal of returning it as part of
- // raw merge operands to the user
- merge_context->PushOperand(
- v, s->inplace_update_support == false /* operand_pinned */);
- }
- } else if (!s->do_merge) {
+
+ if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
+ // TODO(yanqin) update MergeContext so that timestamps information
+ // can also be retained.
+
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
- } else if (s->value != nullptr) {
+ } else if (*(s->merge_in_progress)) {
+ assert(s->do_merge);
+
+ if (s->value || s->columns) {
+ std::string result;
+ *(s->status) = MergeHelper::TimedFullMerge(
+ merge_operator, s->key->user_key(), &v,
+ merge_context->GetOperands(), &result, s->logger, s->statistics,
+ s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+
+ if (s->status->ok()) {
+ if (s->value) {
+ *(s->value) = std::move(result);
+ } else {
+ assert(s->columns);
+ s->columns->SetPlainValue(result);
+ }
+ }
+ }
+ } else if (s->value) {
s->value->assign(v.data(), v.size());
+ } else if (s->columns) {
+ s->columns->SetPlainValue(v);
}
+
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}
+
*(s->found_final_value) = true;
+
if (s->is_blob_index != nullptr) {
- *(s->is_blob_index) = (type == kTypeBlobIndex);
+ *(s->is_blob_index) = false;
}
- if (ts_sz > 0 && s->timestamp != nullptr) {
- Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
- s->timestamp->assign(ts.data(), ts.size());
+ return false;
+ }
+ case kTypeWideColumnEntity: {
+ if (s->inplace_update_support) {
+ s->mem->GetLock(s->key->user_key())->ReadLock();
+ }
+
+ Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
+ *(s->status) = Status::OK();
+
+ if (!s->do_merge) {
+ // Preserve the value with the goal of returning it as part of
+ // raw merge operands to the user
+
+ Slice value_of_default;
+ *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+ v, value_of_default);
+
+ if (s->status->ok()) {
+ merge_context->PushOperand(
+ value_of_default,
+ s->inplace_update_support == false /* operand_pinned */);
+ }
+ } else if (*(s->merge_in_progress)) {
+ assert(s->do_merge);
+
+ if (s->value) {
+ Slice value_of_default;
+ *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+ v, value_of_default);
+ if (s->status->ok()) {
+ *(s->status) = MergeHelper::TimedFullMerge(
+ merge_operator, s->key->user_key(), &value_of_default,
+ merge_context->GetOperands(), s->value, s->logger,
+ s->statistics, s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+ }
+ } else if (s->columns) {
+ std::string result;
+ *(s->status) = MergeHelper::TimedFullMergeWithEntity(
+ merge_operator, s->key->user_key(), v,
+ merge_context->GetOperands(), &result, s->logger, s->statistics,
+ s->clock, /* update_num_ops_stats */ true);
+
+ if (s->status->ok()) {
+ *(s->status) = s->columns->SetWideColumnValue(result);
+ }
+ }
+ } else if (s->value) {
+ Slice value_of_default;
+ *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+ v, value_of_default);
+ if (s->status->ok()) {
+ s->value->assign(value_of_default.data(), value_of_default.size());
+ }
+ } else if (s->columns) {
+ *(s->status) = s->columns->SetWideColumnValue(v);
+ }
+
+ if (s->inplace_update_support) {
+ s->mem->GetLock(s->key->user_key())->ReadUnlock();
+ }
+
+ *(s->found_final_value) = true;
+
+ if (s->is_blob_index != nullptr) {
+ *(s->is_blob_index) = false;
}
+
return false;
}
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {
- if (s->value != nullptr) {
+ if (s->value || s->columns) {
+ std::string result;
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
- merge_context->GetOperands(), s->value, s->logger,
- s->statistics, s->env_, nullptr /* result_operand */, true);
+ merge_context->GetOperands(), &result, s->logger, s->statistics,
+ s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+
+ if (s->status->ok()) {
+ if (s->value) {
+ *(s->value) = std::move(result);
+ } else {
+ assert(s->columns);
+ s->columns->SetPlainValue(result);
+ }
+ }
}
} else {
*(s->status) = Status::NotFound();
v, s->inplace_update_support == false /* operand_pinned */);
if (s->do_merge && merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) {
- *(s->status) = MergeHelper::TimedFullMerge(
- merge_operator, s->key->user_key(), nullptr,
- merge_context->GetOperands(), s->value, s->logger, s->statistics,
- s->env_, nullptr /* result_operand */, true);
+ if (s->value || s->columns) {
+ std::string result;
+ *(s->status) = MergeHelper::TimedFullMerge(
+ merge_operator, s->key->user_key(), nullptr,
+ merge_context->GetOperands(), &result, s->logger, s->statistics,
+ s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+
+ if (s->status->ok()) {
+ if (s->value) {
+ *(s->value) = std::move(result);
+ } else {
+ assert(s->columns);
+ s->columns->SetPlainValue(result);
+ }
+ }
+ }
+
*(s->found_final_value) = true;
return false;
}
}
bool MemTable::Get(const LookupKey& key, std::string* value,
- std::string* timestamp, Status* s,
- MergeContext* merge_context,
+ PinnableWideColumns* columns, std::string* timestamp,
+ Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts,
- ReadCallback* callback, bool* is_blob_index, bool do_merge) {
+ bool immutable_memtable, ReadCallback* callback,
+ bool* is_blob_index, bool do_merge) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts,
- GetInternalKeySeqno(key.internal_key())));
+ GetInternalKeySeqno(key.internal_key()),
+ immutable_memtable));
if (range_del_iter != nullptr) {
- *max_covering_tombstone_seq =
- std::max(*max_covering_tombstone_seq,
- range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
+ SequenceNumber covering_seq =
+ range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
+ if (covering_seq > *max_covering_tombstone_seq) {
+ *max_covering_tombstone_seq = covering_seq;
+ if (timestamp) {
+ // Will be overwritten in SaveValue() if there is a point key with
+ // a higher seqno.
+ timestamp->assign(range_del_iter->timestamp().data(),
+ range_del_iter->timestamp().size());
+ }
+ }
}
- Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
bool may_contain = true;
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+ Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz);
+ bool bloom_checked = false;
if (bloom_filter_) {
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
// only do whole key filtering for Get() to save CPU
if (moptions_.memtable_whole_key_filtering) {
- may_contain =
- bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
+ may_contain = bloom_filter_->MayContain(user_key_without_ts);
+ bloom_checked = true;
} else {
assert(prefix_extractor_);
- may_contain =
- !prefix_extractor_->InDomain(user_key) ||
- bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
+ if (prefix_extractor_->InDomain(user_key_without_ts)) {
+ may_contain = bloom_filter_->MayContain(
+ prefix_extractor_->Transform(user_key_without_ts));
+ bloom_checked = true;
+ }
}
}
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
- if (bloom_filter_) {
+ if (bloom_checked) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
- is_blob_index, value, timestamp, s, merge_context, seq,
- &found_final_value, &merge_in_progress);
+ is_blob_index, value, columns, timestamp, s, merge_context,
+ seq, &found_final_value, &merge_in_progress);
}
// No change to value, since we have not yet found a Put/Delete
- if (!found_final_value && merge_in_progress) {
+ // Propagate corruption error
+ if (!found_final_value && merge_in_progress && !s->IsCorruption()) {
*s = Status::MergeInProgress();
}
PERF_COUNTER_ADD(get_from_memtable_count, 1);
SequenceNumber max_covering_tombstone_seq,
bool do_merge, ReadCallback* callback,
bool* is_blob_index, std::string* value,
+ PinnableWideColumns* columns,
std::string* timestamp, Status* s,
MergeContext* merge_context, SequenceNumber* seq,
bool* found_final_value, bool* merge_in_progress) {
saver.merge_in_progress = merge_in_progress;
saver.key = &key;
saver.value = value;
+ saver.columns = columns;
saver.timestamp = timestamp;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
- saver.env_ = env_;
+ saver.clock = clock_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
saver.allow_data_in_errors = moptions_.allow_data_in_errors;
+ saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
- ReadCallback* callback, bool* is_blob) {
+ ReadCallback* callback, bool immutable_memtable) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
}
PERF_TIMER_GUARD(get_from_memtable_time);
+ // For now, memtable Bloom filter is effectively disabled if there are any
+ // range tombstones. This is the simplest way to ensure range tombstones are
+ // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0
+ bool no_range_del = read_options.ignore_range_deletions ||
+ is_range_del_table_empty_.load(std::memory_order_relaxed);
MultiGetRange temp_range(*range, range->begin(), range->end());
- if (bloom_filter_) {
- std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
- std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
- autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
+ if (bloom_filter_ && no_range_del) {
+ bool whole_key =
+ !prefix_extractor_ || moptions_.memtable_whole_key_filtering;
+ std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
+ std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
+ std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
int num_keys = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
- if (!prefix_extractor_) {
- keys[num_keys++] = &iter->ukey_without_ts;
+ if (whole_key) {
+ bloom_keys[num_keys] = iter->ukey_without_ts;
+ range_indexes[num_keys++] = iter.index();
} else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
- prefixes.emplace_back(
- prefix_extractor_->Transform(iter->ukey_without_ts));
- keys[num_keys++] = &prefixes.back();
+ bloom_keys[num_keys] =
+ prefix_extractor_->Transform(iter->ukey_without_ts);
+ range_indexes[num_keys++] = iter.index();
}
}
- bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
- int idx = 0;
- for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
- if (prefix_extractor_ &&
- !prefix_extractor_->InDomain(iter->ukey_without_ts)) {
- PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
- continue;
- }
- if (!may_match[idx]) {
- temp_range.SkipKey(iter);
+ bloom_filter_->MayContain(num_keys, &bloom_keys[0], &may_match[0]);
+ for (int i = 0; i < num_keys; ++i) {
+ if (!may_match[i]) {
+ temp_range.SkipIndex(range_indexes[i]);
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
} else {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
- idx++;
}
}
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
- SequenceNumber seq = kMaxSequenceNumber;
bool found_final_value{false};
bool merge_in_progress = iter->s->IsMergeInProgress();
- std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
- NewRangeTombstoneIterator(
- read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
- if (range_del_iter != nullptr) {
- iter->max_covering_tombstone_seq = std::max(
- iter->max_covering_tombstone_seq,
- range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
+ if (!no_range_del) {
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ NewRangeTombstoneIteratorInternal(
+ read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
+ immutable_memtable));
+ SequenceNumber covering_seq =
+ range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
+ if (covering_seq > iter->max_covering_tombstone_seq) {
+ iter->max_covering_tombstone_seq = covering_seq;
+ if (iter->timestamp) {
+ // Will be overwritten in SaveValue() if there is a point key with
+ // a higher seqno.
+ iter->timestamp->assign(range_del_iter->timestamp().data(),
+ range_del_iter->timestamp().size());
+ }
+ }
}
+ SequenceNumber dummy_seq;
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
- callback, is_blob, iter->value->GetSelf(), iter->timestamp,
- iter->s, &(iter->merge_context), &seq, &found_final_value,
+ callback, &iter->is_blob_index, iter->value->GetSelf(),
+ /*columns=*/nullptr, iter->timestamp, iter->s,
+ &(iter->merge_context), &dummy_seq, &found_final_value,
&merge_in_progress);
if (!found_final_value && merge_in_progress) {
PERF_COUNTER_ADD(get_from_memtable_count, 1);
}
-void MemTable::Update(SequenceNumber seq,
- const Slice& key,
- const Slice& value) {
+Status MemTable::Update(SequenceNumber seq, ValueType value_type,
+ const Slice& key, const Slice& value,
+ const ProtectionInfoKVOS64* kv_prot_info) {
LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key();
iter->Seek(lkey.internal_key(), mem_key.data());
if (iter->Valid()) {
- // entry format is:
- // key_length varint32
- // userkey char[klength-8]
- // tag uint64
- // vlength varint32
- // value char[vlength]
+ // Refer to comments under MemTable::Add() for entry format.
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
SequenceNumber existing_seq;
UnPackSequenceAndType(tag, &existing_seq, &type);
assert(existing_seq != seq);
- if (type == kTypeValue) {
+ if (type == value_type) {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t new_size = static_cast<uint32_t>(value.size());
(unsigned)(VarintLength(key_length) + key_length +
VarintLength(value.size()) + value.size()));
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
- return;
+ if (kv_prot_info != nullptr) {
+ ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+ // `seq` is swallowed and `existing_seq` prevails.
+ updated_kv_prot_info.UpdateS(seq, existing_seq);
+ UpdateEntryChecksum(&updated_kv_prot_info, key, value, type,
+ existing_seq, p + value.size());
+ Slice encoded(entry, p + value.size() - entry);
+ return VerifyEncodedEntry(encoded, updated_kv_prot_info);
+ } else {
+ UpdateEntryChecksum(nullptr, key, value, type, existing_seq,
+ p + value.size());
+ }
+ return Status::OK();
}
}
}
}
- // key doesn't exist
- bool add_res __attribute__((__unused__));
- add_res = Add(seq, kTypeValue, key, value);
- // We already checked unused != seq above. In that case, Add should not fail.
- assert(add_res);
+ // The latest value is not value_type or key doesn't exist
+ return Add(seq, value_type, key, value, kv_prot_info);
}
-bool MemTable::UpdateCallback(SequenceNumber seq,
- const Slice& key,
- const Slice& delta) {
+Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
+ const Slice& delta,
+ const ProtectionInfoKVOS64* kv_prot_info) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
iter->Seek(lkey.internal_key(), memkey.data());
if (iter->Valid()) {
- // entry format is:
- // key_length varint32
- // userkey char[klength-8]
- // tag uint64
- // vlength varint32
- // value char[vlength]
+ // Refer to comments under MemTable::Add() for entry format.
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
ValueType type;
- uint64_t unused;
- UnPackSequenceAndType(tag, &unused, &type);
- switch (type) {
- case kTypeValue: {
- Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
- uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
-
- char* prev_buffer = const_cast<char*>(prev_value.data());
- uint32_t new_prev_size = prev_size;
+ uint64_t existing_seq;
+ UnPackSequenceAndType(tag, &existing_seq, &type);
+ if (type == kTypeValue) {
+ Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
+ uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
- std::string str_value;
- WriteLock wl(GetLock(lkey.user_key()));
- auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
- delta, &str_value);
- if (status == UpdateStatus::UPDATED_INPLACE) {
- // Value already updated by callback.
- assert(new_prev_size <= prev_size);
- if (new_prev_size < prev_size) {
- // overwrite the new prev_size
- char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
- new_prev_size);
- if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
- // shift the value buffer as well.
- memcpy(p, prev_buffer, new_prev_size);
- }
+ char* prev_buffer = const_cast<char*>(prev_value.data());
+ uint32_t new_prev_size = prev_size;
+
+ std::string str_value;
+ WriteLock wl(GetLock(lkey.user_key()));
+ auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
+ delta, &str_value);
+ if (status == UpdateStatus::UPDATED_INPLACE) {
+ // Value already updated by callback.
+ assert(new_prev_size <= prev_size);
+ if (new_prev_size < prev_size) {
+ // overwrite the new prev_size
+ char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
+ new_prev_size);
+ if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
+ // shift the value buffer as well.
+ memcpy(p, prev_buffer, new_prev_size);
+ prev_buffer = p;
}
- RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
- UpdateFlushState();
- return true;
- } else if (status == UpdateStatus::UPDATED) {
- Add(seq, kTypeValue, key, Slice(str_value));
- RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
- UpdateFlushState();
- return true;
- } else if (status == UpdateStatus::UPDATE_FAILED) {
- // No action required. Return.
- UpdateFlushState();
- return true;
}
+ RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
+ UpdateFlushState();
+ Slice new_value(prev_buffer, new_prev_size);
+ if (kv_prot_info != nullptr) {
+ ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+ // `seq` is swallowed and `existing_seq` prevails.
+ updated_kv_prot_info.UpdateS(seq, existing_seq);
+ updated_kv_prot_info.UpdateV(delta, new_value);
+ Slice encoded(entry, prev_buffer + new_prev_size - entry);
+ UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type,
+ existing_seq, prev_buffer + new_prev_size);
+ return VerifyEncodedEntry(encoded, updated_kv_prot_info);
+ } else {
+ UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq,
+ prev_buffer + new_prev_size);
+ }
+ return Status::OK();
+ } else if (status == UpdateStatus::UPDATED) {
+ Status s;
+ if (kv_prot_info != nullptr) {
+ ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+ updated_kv_prot_info.UpdateV(delta, str_value);
+ s = Add(seq, kTypeValue, key, Slice(str_value),
+ &updated_kv_prot_info);
+ } else {
+ s = Add(seq, kTypeValue, key, Slice(str_value),
+ nullptr /* kv_prot_info */);
+ }
+ RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
+ UpdateFlushState();
+ return s;
+ } else if (status == UpdateStatus::UPDATE_FAILED) {
+ // `UPDATE_FAILED` is named incorrectly. It indicates no update
+ // happened. It does not indicate a failure happened.
+ UpdateFlushState();
+ return Status::OK();
}
- default:
- break;
}
}
}
- // If the latest value is not kTypeValue
- // or key doesn't exist
- return false;
+ // The latest value is not `kTypeValue` or key doesn't exist
+ return Status::NotFound();
}
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {