// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
#include <atomic>
#include <cstdlib>
#include <functional>
+#include <memory>
#include "db/db_test_util.h"
#include "db/read_callback.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
+#include "rocksdb/experimental.h"
+#include "rocksdb/iostats_context.h"
#include "rocksdb/persistent_cache.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
+#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
+#include "test_util/testutil.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"
class DBTest2 : public DBTestBase {
public:
- DBTest2() : DBTestBase("/db_test2", /*env_do_fsync=*/true) {}
+ DBTest2() : DBTestBase("db_test2", /*env_do_fsync=*/true) {}
};
#ifndef ROCKSDB_LITE
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname, &files));
for (auto& f : files) {
- if (f != "." && f != "..") {
- ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
- }
+ ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
}
// <dbname> should be empty now and we should be able to delete it
ASSERT_OK(env_->DeleteDir(dbname));
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname, &files));
for (auto& f : files) {
- if (f != "." && f != "..") {
- ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
- }
+ ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
}
// <dbname> should be empty now and we should be able to delete it
ASSERT_OK(env_->DeleteDir(dbname));
public testing::WithParamInterface<std::tuple<int, bool>> {
public:
TestReadOnlyWithCompressedCache()
- : DBTestBase("/test_readonly_with_compressed_cache",
+ : DBTestBase("test_readonly_with_compressed_cache",
/*env_do_fsync=*/true) {
max_open_files_ = std::get<0>(GetParam());
use_mmap_ = std::get<1>(GetParam());
};
TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
+ const int kValueSize = 10500;
+ const int kNumEntriesPerFile = 1000;
+ const int kNumFiles = 3;
+ const int kNumDistinctKeys = 30;
+
BlockBasedTableOptions table_options;
Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
PartitionedIndexTestListener* listener = new PartitionedIndexTestListener();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
Random rnd(301);
- for (int i = 0; i < 3000; i++) {
- int j = i % 30;
- std::string value = rnd.RandomString(10500);
- ASSERT_OK(Put("keykey_" + std::to_string(j), value));
- snapshots.push_back(db_->GetSnapshot());
+ for (int i = 0; i < kNumFiles; i++) {
+ for (int j = 0; j < kNumEntriesPerFile; j++) {
+ int key_id = (i * kNumEntriesPerFile + j) % kNumDistinctKeys;
+ std::string value = rnd.RandomString(kValueSize);
+ ASSERT_OK(Put("keykey_" + std::to_string(key_id), value));
+ snapshots.push_back(db_->GetSnapshot());
+ }
+ ASSERT_OK(Flush());
}
- Flush();
+
for (auto s : snapshots) {
db_->ReleaseSnapshot(s);
}
public ::testing::WithParamInterface<bool> {
public:
PrefixFullBloomWithReverseComparator()
- : DBTestBase("/prefix_bloom_reverse", /*env_do_fsync=*/true) {}
+ : DBTestBase("prefix_bloom_reverse", /*env_do_fsync=*/true) {}
void SetUp() override { if_cache_filter_ = GetParam(); }
bool if_cache_filter_;
};
ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));
- dbfull()->Flush(FlushOptions());
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
if (bbto.block_cache) {
bbto.block_cache->EraseUnRefEntries();
PrefixFullBloomWithReverseComparator, testing::Bool());
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
- Put("", "");
+ ASSERT_OK(Put("", ""));
Iterator* iter1 = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter1->status());
std::string prop_value;
ASSERT_OK(
iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
uint64_t version_number1 =
static_cast<uint64_t>(std::atoi(prop_value.c_str()));
- Put("", "");
- Flush();
+ ASSERT_OK(Put("", ""));
+ ASSERT_OK(Flush());
Iterator* iter2 = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter2->status());
ASSERT_OK(
iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
uint64_t version_number2 =
ASSERT_GT(version_number2, version_number1);
- Put("", "");
+ ASSERT_OK(Put("", ""));
Iterator* iter3 = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter3->status());
ASSERT_OK(
iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
uint64_t version_number3 =
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
- Put(1, "a", "begin");
- Put(1, "z", "end");
+ ASSERT_OK(Put(1, "a", "begin"));
+ ASSERT_OK(Put(1, "z", "end"));
ASSERT_OK(Flush(1));
TryReopenWithColumnFamilies({"default", "pikachu"}, options);
options.merge_operator = MergeOperators::CreatePutOperator();
options.disable_auto_compactions = true;
DestroyAndReopen(options);
- Put("poi", "Finch");
- db_->Merge(WriteOptions(), "poi", "Reese");
- db_->Merge(WriteOptions(), "poi", "Shaw");
- db_->Merge(WriteOptions(), "poi", "Root");
+ ASSERT_OK(Put("poi", "Finch"));
+ ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Reese"));
+ ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Shaw"));
+ ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Root"));
options.max_successive_merges = 2;
Reopen(options);
}
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
DBTestSharedWriteBufferAcrossCFs()
- : DBTestBase("/db_test_shared_write_buffer", /*env_do_fsync=*/true) {}
+ : DBTestBase("db_test_shared_write_buffer", /*env_do_fsync=*/true) {}
void SetUp() override {
use_old_interface_ = std::get<0>(GetParam());
cost_cache_ = std::get<1>(GetParam());
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
Options options = CurrentOptions();
options.arena_block_size = 4096;
+ auto flush_listener = std::make_shared<FlushCounterListener>();
+ options.listeners.push_back(flush_listener);
+ // Don't trip the listener at shutdown.
+ options.avoid_flush_during_shutdown = true;
// Avoid nondeterministic value from malloc_usable_size();
// Force arena block size to 1
wo.disableWAL = true;
std::function<void()> wait_flush = [&]() {
- dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
- dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
- dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
};
// Create some data and flush "default" and "nikitich" so that they
// are newer CFs created.
+ flush_listener->expected_flush_reason = FlushReason::kManualFlush;
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
Flush(3);
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
+ flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
if (cost_cache_) {
ASSERT_GE(cache->GetUsage(), 256 * 1024);
std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
Options options = CurrentOptions();
options.arena_block_size = 4096;
+ auto flush_listener = std::make_shared<FlushCounterListener>();
+ options.listeners.push_back(flush_listener);
+ // Don't trip the listener at shutdown.
+ options.avoid_flush_during_shutdown = true;
// Avoid nondeterministic value from malloc_usable_size();
// Force arena block size to 1
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
wo.disableWAL = true;
std::function<void()> wait_flush = [&]() {
- dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
- dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
- static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
+ ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
+ ASSERT_OK(
+ static_cast_with_check<DBImpl>(db2)->TEST_WaitForBackgroundWork());
};
// Trigger a flush on cf2
+ flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
wait_flush();
- static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
wait_flush();
ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
wait_flush();
- static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
Options options = CurrentOptions();
options.arena_block_size = 4096;
- std::shared_ptr<Cache> cache =
- NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
+ std::shared_ptr<Cache> cache = NewLRUCache(LRUCacheOptions(
+ 10000000 /* capacity */, 1 /* num_shard_bits */,
+ false /* strict_capacity_limit */, 0.0 /* high_pri_pool_ratio */,
+ nullptr /* memory_allocator */, kDefaultToAdaptiveMutex,
+ kDontChargeCacheMetadata));
+
options.write_buffer_size = 50000; // this is never hit
// Use a write buffer total size so that the soft limit is about
// 105000.
}
namespace {
- void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
- const std::vector<Slice>& keys_must_not_exist) {
- // Ensure that expected keys exist
- std::vector<std::string> values;
- if (keys_must_exist.size() > 0) {
- std::vector<Status> status_list =
+void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
+ const std::vector<Slice>& keys_must_not_exist) {
+ // Ensure that expected keys exist
+ std::vector<std::string> values;
+ if (keys_must_exist.size() > 0) {
+ std::vector<Status> status_list =
db->MultiGet(ReadOptions(), keys_must_exist, &values);
- for (size_t i = 0; i < keys_must_exist.size(); i++) {
- ASSERT_OK(status_list[i]);
- }
+ for (size_t i = 0; i < keys_must_exist.size(); i++) {
+ ASSERT_OK(status_list[i]);
}
+ }
- // Ensure that given keys don't exist
- if (keys_must_not_exist.size() > 0) {
- std::vector<Status> status_list =
+ // Ensure that given keys don't exist
+ if (keys_must_not_exist.size() > 0) {
+ std::vector<Status> status_list =
db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
- for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
- ASSERT_TRUE(status_list[i].IsNotFound());
- }
+ for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
+ ASSERT_TRUE(status_list[i].IsNotFound());
}
}
+}
-} // namespace
+} // anonymous namespace
TEST_F(DBTest2, WalFilterTest) {
class TestWalFilter : public WalFilter {
- private:
+ private:
// Processing option that is requested to be applied at the given index
WalFilter::WalProcessingOption wal_processing_option_;
// Index at which to apply wal_processing_option_
// Current record index, incremented with each record encountered.
size_t current_record_index_;
- public:
+ public:
TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
- size_t apply_option_for_record_index)
- : wal_processing_option_(wal_processing_option),
- apply_option_at_record_index_(apply_option_for_record_index),
- current_record_index_(0) {}
+ size_t apply_option_for_record_index)
+ : wal_processing_option_(wal_processing_option),
+ apply_option_at_record_index_(apply_option_for_record_index),
+ current_record_index_(0) {}
WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
WriteBatch* /*new_batch*/,
if (current_record_index_ == apply_option_at_record_index_) {
option_to_return = wal_processing_option_;
- }
- else {
+ } else {
option_to_return = WalProcessingOption::kContinueProcessing;
}
// Test with all WAL processing options
for (int option = 0;
- option < static_cast<int>(
- WalFilter::WalProcessingOption::kWalProcessingOptionMax);
- option++) {
+ option < static_cast<int>(
+ WalFilter::WalProcessingOption::kWalProcessingOptionMax);
+ option++) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
- CreateAndReopenWithCF({ "pikachu" }, options);
+ CreateAndReopenWithCF({"pikachu"}, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
- batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
}
- dbfull()->Write(WriteOptions(), &batch);
+ ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
}
WalFilter::WalProcessingOption wal_processing_option =
- static_cast<WalFilter::WalProcessingOption>(option);
+ static_cast<WalFilter::WalProcessingOption>(option);
// Create a test filter that would apply wal_processing_option at the first
// record
size_t apply_option_for_record_index = 1;
TestWalFilter test_wal_filter(wal_processing_option,
- apply_option_for_record_index);
+ apply_option_for_record_index);
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter;
Status status =
- TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ TryReopenWithColumnFamilies({"default", "pikachu"}, options);
if (wal_processing_option ==
- WalFilter::WalProcessingOption::kCorruptedRecord) {
- assert(!status.ok());
+ WalFilter::WalProcessingOption::kCorruptedRecord) {
+ ASSERT_NOK(status);
// In case of corruption we can turn off paranoid_checks to reopen
// database
options.paranoid_checks = false;
- ReopenWithColumnFamilies({ "default", "pikachu" }, options);
- }
- else {
- assert(status.ok());
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ } else {
+ ASSERT_OK(status);
}
// Compute which keys we expect to be found
std::vector<Slice> keys_must_exist;
std::vector<Slice> keys_must_not_exist;
switch (wal_processing_option) {
- case WalFilter::WalProcessingOption::kCorruptedRecord:
- case WalFilter::WalProcessingOption::kContinueProcessing: {
- fprintf(stderr, "Testing with complete WAL processing\n");
- // we expect all records to be processed
- for (size_t i = 0; i < batch_keys.size(); i++) {
- for (size_t j = 0; j < batch_keys[i].size(); j++) {
- keys_must_exist.push_back(Slice(batch_keys[i][j]));
- }
- }
- break;
- }
- case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
- fprintf(stderr,
- "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
- apply_option_for_record_index);
- // We expect the record with apply_option_for_record_index to be not
- // found.
- for (size_t i = 0; i < batch_keys.size(); i++) {
- for (size_t j = 0; j < batch_keys[i].size(); j++) {
- if (i == apply_option_for_record_index) {
- keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
- }
- else {
+ case WalFilter::WalProcessingOption::kCorruptedRecord:
+ case WalFilter::WalProcessingOption::kContinueProcessing: {
+ fprintf(stderr, "Testing with complete WAL processing\n");
+ // we expect all records to be processed
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
+ break;
}
- break;
- }
- case WalFilter::WalProcessingOption::kStopReplay: {
- fprintf(stderr,
- "Testing with stopping replay from record %" ROCKSDB_PRIszt
- "\n",
- apply_option_for_record_index);
- // We expect records beyond apply_option_for_record_index to be not
- // found.
- for (size_t i = 0; i < batch_keys.size(); i++) {
- for (size_t j = 0; j < batch_keys[i].size(); j++) {
- if (i >= apply_option_for_record_index) {
- keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
+ fprintf(stderr,
+ "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
+ apply_option_for_record_index);
+ // We expect the record with apply_option_for_record_index to be not
+ // found.
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ if (i == apply_option_for_record_index) {
+ keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ } else {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
}
- else {
- keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ break;
+ }
+ case WalFilter::WalProcessingOption::kStopReplay: {
+ fprintf(stderr,
+ "Testing with stopping replay from record %" ROCKSDB_PRIszt
+ "\n",
+ apply_option_for_record_index);
+ // We expect records beyond apply_option_for_record_index to be not
+ // found.
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ if (i >= apply_option_for_record_index) {
+ keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ } else {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
}
}
+ break;
}
- break;
- }
- default:
- assert(false); // unhandled case
+ default:
+ FAIL(); // unhandled case
}
bool checked_after_reopen = false;
// (even if they were skipped)
// reopen database with option to use WAL filter
options = OptionsForLogIterTest();
- ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
checked_after_reopen = true;
}
TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
class ChangeBatchHandler : public WriteBatch::Handler {
- private:
+ private:
// Batch to insert keys in
WriteBatch* new_write_batch_;
// Number of keys to add in the new batch
// Number of keys added to new batch
size_t num_keys_added_;
- public:
+ public:
ChangeBatchHandler(WriteBatch* new_write_batch,
- size_t num_keys_to_add_in_new_batch)
- : new_write_batch_(new_write_batch),
- num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
- num_keys_added_(0) {}
+ size_t num_keys_to_add_in_new_batch)
+ : new_write_batch_(new_write_batch),
+ num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+ num_keys_added_(0) {}
void Put(const Slice& key, const Slice& value) override {
if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
- new_write_batch_->Put(key, value);
+ ASSERT_OK(new_write_batch_->Put(key, value));
++num_keys_added_;
}
}
};
class TestWalFilterWithChangeBatch : public WalFilter {
- private:
+ private:
// Index at which to start changing records
size_t change_records_from_index_;
// Number of keys to add in the new batch
// Current record index, incremented with each record encountered.
size_t current_record_index_;
- public:
+ public:
TestWalFilterWithChangeBatch(size_t change_records_from_index,
- size_t num_keys_to_add_in_new_batch)
- : change_records_from_index_(change_records_from_index),
- num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
- current_record_index_(0) {}
+ size_t num_keys_to_add_in_new_batch)
+ : change_records_from_index_(change_records_from_index),
+ num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+ current_record_index_(0) {}
WalProcessingOption LogRecord(const WriteBatch& batch,
WriteBatch* new_batch,
bool* batch_changed) const override {
if (current_record_index_ >= change_records_from_index_) {
ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
- batch.Iterate(&handler);
- *batch_changed = true;
+ Status s = batch.Iterate(&handler);
+ if (s.ok()) {
+ *batch_changed = true;
+ } else {
+ assert(false);
+ }
}
// Filter is passed as a const object for RocksDB to not modify the
// object, however we modify it for our own purpose here and hence
// cast the constness away.
(const_cast<TestWalFilterWithChangeBatch*>(this)
- ->current_record_index_)++;
+ ->current_record_index_)++;
return WalProcessingOption::kContinueProcessing;
}
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
- CreateAndReopenWithCF({ "pikachu" }, options);
+ CreateAndReopenWithCF({"pikachu"}, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
- batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
}
- dbfull()->Write(WriteOptions(), &batch);
+ ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
}
// Create a test filter that would apply wal_processing_option at the first
size_t change_records_from_index = 1;
size_t num_keys_to_add_in_new_batch = 1;
TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
- change_records_from_index, num_keys_to_add_in_new_batch);
+ change_records_from_index, num_keys_to_add_in_new_batch);
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_with_change_batch;
- ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Ensure that all keys exist before change_records_from_index_
// And after that index only single key exists
for (size_t j = 0; j < batch_keys[i].size(); j++) {
if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
- }
- else {
+ } else {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
// (even if they were skipped)
// reopen database with option to use WAL filter
options = OptionsForLogIterTest();
- ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
checked_after_reopen = true;
}
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
- public:
- WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch,
- bool* batch_changed) const override {
- *new_batch = batch;
- new_batch->Put("key_extra", "value_extra");
- *batch_changed = true;
- return WalProcessingOption::kContinueProcessing;
- }
-
- const char* Name() const override {
- return "WalFilterTestWithChangeBatchExtraKeys";
- }
+ public:
+ WalProcessingOption LogRecord(const WriteBatch& batch,
+ WriteBatch* new_batch,
+ bool* batch_changed) const override {
+ *new_batch = batch;
+ Status s = new_batch->Put("key_extra", "value_extra");
+ if (s.ok()) {
+ *batch_changed = true;
+ } else {
+ assert(false);
+ }
+ return WalProcessingOption::kContinueProcessing;
+ }
+
+ const char* Name() const override {
+ return "WalFilterTestWithChangeBatchExtraKeys";
+ }
};
std::vector<std::vector<std::string>> batch_keys(3);
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
- CreateAndReopenWithCF({ "pikachu" }, options);
+ CreateAndReopenWithCF({"pikachu"}, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
- batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
}
- dbfull()->Write(WriteOptions(), &batch);
+ ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
}
// Create a test filter that would add extra keys
// Reopen without filter, now reopen should succeed - previous
// attempt to open must not have altered the db.
options = OptionsForLogIterTest();
- ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
std::vector<Slice> keys_must_exist;
std::vector<Slice> keys_must_not_exist; // empty vector
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
class TestWalFilterWithColumnFamilies : public WalFilter {
- private:
+ private:
// column_family_id -> log_number map (provided to WALFilter)
std::map<uint32_t, uint64_t> cf_log_number_map_;
// column_family_name -> column_family_id map (provided to WALFilter)
// during recovery (i.e. aren't already flushed to SST file(s))
// for verification against the keys we expect.
std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
- public:
- void ColumnFamilyLogNumberMap(
- const std::map<uint32_t, uint64_t>& cf_lognumber_map,
- const std::map<std::string, uint32_t>& cf_name_id_map) override {
- cf_log_number_map_ = cf_lognumber_map;
- cf_name_id_map_ = cf_name_id_map;
- }
-
- WalProcessingOption LogRecordFound(unsigned long long log_number,
- const std::string& /*log_file_name*/,
- const WriteBatch& batch,
- WriteBatch* /*new_batch*/,
- bool* /*batch_changed*/) override {
- class LogRecordBatchHandler : public WriteBatch::Handler {
- private:
- const std::map<uint32_t, uint64_t> & cf_log_number_map_;
- std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
+
+ public:
+ void ColumnFamilyLogNumberMap(
+ const std::map<uint32_t, uint64_t>& cf_lognumber_map,
+ const std::map<std::string, uint32_t>& cf_name_id_map) override {
+ cf_log_number_map_ = cf_lognumber_map;
+ cf_name_id_map_ = cf_name_id_map;
+ }
+
+ WalProcessingOption LogRecordFound(unsigned long long log_number,
+ const std::string& /*log_file_name*/,
+ const WriteBatch& batch,
+ WriteBatch* /*new_batch*/,
+ bool* /*batch_changed*/) override {
+ class LogRecordBatchHandler : public WriteBatch::Handler {
+ private:
+ const std::map<uint32_t, uint64_t>& cf_log_number_map_;
+ std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
unsigned long long log_number_;
- public:
- LogRecordBatchHandler(unsigned long long current_log_number,
- const std::map<uint32_t, uint64_t> & cf_log_number_map,
- std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
- cf_log_number_map_(cf_log_number_map),
- cf_wal_keys_(cf_wal_keys),
- log_number_(current_log_number){}
+
+ public:
+ LogRecordBatchHandler(
+ unsigned long long current_log_number,
+ const std::map<uint32_t, uint64_t>& cf_log_number_map,
+ std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
+ : cf_log_number_map_(cf_log_number_map),
+ cf_wal_keys_(cf_wal_keys),
+ log_number_(current_log_number) {}
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& /*value*/) override {
// (i.e. isn't flushed to SST file(s) for column_family_id)
// add it to the cf_wal_keys_ map for verification.
if (log_number_ >= log_number_for_cf) {
- cf_wal_keys_[column_family_id].push_back(std::string(key.data(),
- key.size()));
+ cf_wal_keys_[column_family_id].push_back(
+ std::string(key.data(), key.size()));
}
return Status::OK();
}
} handler(log_number, cf_log_number_map_, cf_wal_keys_);
- batch.Iterate(&handler);
+ Status s = batch.Iterate(&handler);
+ if (!s.ok()) {
+ // TODO(AR) is this ok?
+ return WalProcessingOption::kCorruptedRecord;
+ }
return WalProcessingOption::kContinueProcessing;
- }
+ }
- const char* Name() const override {
- return "WalFilterTestWithColumnFamilies";
- }
+ const char* Name() const override {
+ return "WalFilterTestWithColumnFamilies";
+ }
const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
return cf_wal_keys_;
}
- const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
+ const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
return cf_name_id_map_;
}
};
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
- CreateAndReopenWithCF({ "pikachu" }, options);
+ CreateAndReopenWithCF({"pikachu"}, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
- batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
- batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
+ ASSERT_OK(batch.Put(handles_[0], batch_keys_pre_flush[i][j],
+ DummyString(1024)));
+ ASSERT_OK(batch.Put(handles_[1], batch_keys_pre_flush[i][j],
+ DummyString(1024)));
}
- dbfull()->Write(WriteOptions(), &batch);
+ ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
}
- //Flush default column-family
- db_->Flush(FlushOptions(), handles_[0]);
+ // Flush default column-family
+ ASSERT_OK(db_->Flush(FlushOptions(), handles_[0]));
// Do some more writes
std::vector<std::vector<std::string>> batch_keys_post_flush(3);
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
- batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
- batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
+ ASSERT_OK(batch.Put(handles_[0], batch_keys_post_flush[i][j],
+ DummyString(1024)));
+ ASSERT_OK(batch.Put(handles_[1], batch_keys_post_flush[i][j],
+ DummyString(1024)));
}
- dbfull()->Write(WriteOptions(), &batch);
+ ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
}
// On Recovery we should only find the second batch applicable to default CF
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_column_families;
- Status status =
- TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
ASSERT_TRUE(status.ok());
// verify that handles_[0] only has post_flush keys
auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
size_t index = 0;
auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
- //default column-family, only post_flush keys are expected
+ // default column-family, only post_flush keys are expected
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
- ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
}
}
- ASSERT_TRUE(index == keys_cf.size());
+ ASSERT_EQ(index, keys_cf.size());
index = 0;
keys_cf = cf_wal_keys[name_id_map["pikachu"]];
- //pikachu column-family, all keys are expected
+ // pikachu column-family, all keys are expected
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_pre_flush[i][j]);
- ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
}
}
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
- ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
}
}
- ASSERT_TRUE(index == keys_cf.size());
+ ASSERT_EQ(index, keys_cf.size());
}
TEST_F(DBTest2, PresetCompressionDict) {
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.memtable_factory.reset(
- new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
+ test::NewSpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
options.num_levels = 2;
options.target_file_size_base = kL0FileBytes;
options.target_file_size_multiplier = 2;
#if LZ4_VERSION_NUMBER >= 10400 // r124+
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
-#endif // LZ4_VERSION_NUMBER >= 10400
+#endif // LZ4_VERSION_NUMBER >= 10400
if (ZSTD_Supported()) {
compression_types.push_back(kZSTD);
}
enum DictionaryTypes : int {
kWithoutDict,
kWithDict,
+ kWithZSTDfinalizeDict,
kWithZSTDTrainedDict,
kDictEnd,
};
options.compression = compression_type;
size_t bytes_without_dict = 0;
size_t bytes_with_dict = 0;
+ size_t bytes_with_zstd_finalize_dict = 0;
size_t bytes_with_zstd_trained_dict = 0;
for (int i = kWithoutDict; i < kDictEnd; i++) {
// First iteration: compress without preset dictionary
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = 0;
break;
+ case kWithZSTDfinalizeDict:
+ if (compression_type != kZSTD ||
+ !ZSTD_FinalizeDictionarySupported()) {
+ continue;
+ }
+ options.compression_opts.max_dict_bytes = kBlockSizeBytes;
+ options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
+ options.compression_opts.use_zstd_dict_trainer = false;
+ break;
case kWithZSTDTrainedDict:
- if (compression_type != kZSTD) {
+ if (compression_type != kZSTD || !ZSTD_TrainDictionarySupported()) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
+ options.compression_opts.use_zstd_dict_trainer = true;
break;
default:
assert(false);
ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
seq_datas[(key_num / 10) % 10]));
}
- dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
}
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
- true /* disallow_trivial_move */);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+ true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
bytes_without_dict = total_sst_bytes;
} else if (i == kWithDict) {
bytes_with_dict = total_sst_bytes;
+ } else if (i == kWithZSTDfinalizeDict) {
+ bytes_with_zstd_finalize_dict = total_sst_bytes;
} else if (i == kWithZSTDTrainedDict) {
bytes_with_zstd_trained_dict = total_sst_bytes;
}
}
if (i == kWithDict) {
ASSERT_GT(bytes_without_dict, bytes_with_dict);
+ } else if (i == kWithZSTDfinalizeDict) {
+ // In zstd compression, it is sometimes possible that using a finalized
+ // dictionary does not get as good a compression ratio as raw content
+ // dictionary. But using a dictionary should always get a better
+ // compression ratio than not using one.
+ ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_finalize_dict ||
+ bytes_without_dict > bytes_with_zstd_finalize_dict);
} else if (i == kWithZSTDTrainedDict) {
// In zstd compression, it is sometimes possible that using a trained
// dictionary does not get as good a compression ratio as without
public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
public:
PresetCompressionDictTest()
- : DBTestBase("/db_test2", false /* env_do_fsync */),
+ : DBTestBase("db_test2", false /* env_do_fsync */),
compression_type_(std::get<0>(GetParam())),
bottommost_(std::get<1>(GetParam())) {}
TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when
- // `ColumnFamilyOptions::compression` enables dictionary.
+ // `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
+ // size of the dictionary is within expectations according to the limit on
+ // buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
- const size_t kDictLen = 4 << 10;
+ const size_t kDictLen = 16 << 10;
+ const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+ options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
+ options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
- options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
+ bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
- uint64_t prev_compression_dict_misses =
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
- // If there's a compression dictionary, it should have been loaded when the
- // flush finished, incurring a cache miss.
- uint64_t expected_compression_dict_misses;
+ // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+ // compression dictionary exists since dictionaries would be preloaded when
+ // the flush finishes.
if (bottommost_) {
- expected_compression_dict_misses = prev_compression_dict_misses;
+ // Flush is never considered bottommost. This should change in the future
+ // since flushed files may have nothing underneath them, like the one in
+ // this test case.
+ ASSERT_EQ(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ 0);
} else {
- expected_compression_dict_misses = prev_compression_dict_misses + 1;
+ ASSERT_GT(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ 0);
+ // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+ // number of bytes needs to be adjusted in case the cached block is in
+ // ZSTD's digested dictionary format.
+ if (compression_type_ != kZSTD &&
+ compression_type_ != kZSTDNotFinalCompression) {
+ // Although we limited buffering to `kBlockLen`, there may be up to two
+ // blocks of data included in the dictionary since we only check limit
+ // after each block is built.
+ ASSERT_LE(TestGetTickerCount(options,
+ BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ 2 * kBlockLen);
+ }
}
- ASSERT_EQ(expected_compression_dict_misses,
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
- // dictionary.
+ // dictionary. Also verifies the size of the dictionary is within expectations
+ // according to the limit on buffering set by
+ // `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
- const size_t kDictLen = 4 << 10;
+ const size_t kDictLen = 16 << 10;
+ const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+ options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
+ options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
+ bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
ASSERT_EQ("2,0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
- uint64_t prev_compression_dict_misses =
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+ uint64_t prev_compression_dict_bytes_inserted =
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key-
// range.
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
- // If there's a compression dictionary, it should have been loaded when the
- // compaction finished, incurring a cache miss.
- uint64_t expected_compression_dict_misses;
+ // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+ // compression dictionary exists since dictionaries would be preloaded when
+ // the compaction finishes.
if (bottommost_) {
- expected_compression_dict_misses = prev_compression_dict_misses;
+ ASSERT_EQ(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ prev_compression_dict_bytes_inserted);
} else {
- expected_compression_dict_misses = prev_compression_dict_misses + 1;
+ ASSERT_GT(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ prev_compression_dict_bytes_inserted);
+ // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+ // number of bytes needs to be adjusted in case the cached block is in
+ // ZSTD's digested dictionary format.
+ if (compression_type_ != kZSTD &&
+ compression_type_ != kZSTDNotFinalCompression) {
+ // Although we limited buffering to `kBlockLen`, there may be up to two
+ // blocks of data included in the dictionary since we only check limit
+ // after each block is built.
+ ASSERT_LE(TestGetTickerCount(options,
+ BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ prev_compression_dict_bytes_inserted + 2 * kBlockLen);
+ }
}
- ASSERT_EQ(expected_compression_dict_misses,
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
- // `ColumnFamilyOptions::bottommost_compression` enables dictionary.
+ // `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
+ // verifies the size of the dictionary is within expectations according to the
+ // limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
- const size_t kDictLen = 4 << 10;
+ const size_t kDictLen = 16 << 10;
+ const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+ options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
+ options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
+ bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
ASSERT_EQ("2", FilesPerLevel(0));
#endif // ROCKSDB_LITE
- uint64_t prev_compression_dict_misses =
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+ uint64_t prev_compression_dict_bytes_inserted =
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
- // If there's a compression dictionary, it should have been loaded when the
- // compaction finished, incurring a cache miss.
- ASSERT_EQ(prev_compression_dict_misses + 1,
- TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
+ ASSERT_GT(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ prev_compression_dict_bytes_inserted);
+ // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+ // number of bytes needs to be adjusted in case the cached block is in ZSTD's
+ // digested dictionary format.
+ if (compression_type_ != kZSTD &&
+ compression_type_ != kZSTDNotFinalCompression) {
+ // Although we limited buffering to `kBlockLen`, there may be up to two
+ // blocks of data included in the dictionary since we only check limit after
+ // each block is built.
+ ASSERT_LE(
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+ prev_compression_dict_bytes_inserted + 2 * kBlockLen);
+ }
}
class CompactionCompressionListener : public EventListener {
for (int level = 0; level < db->NumberLevels(); level++) {
std::string files_at_level;
ASSERT_TRUE(
- db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
+ db->GetProperty("rocksdb.num-files-at-level" + std::to_string(level),
&files_at_level));
if (files_at_level != "0") {
bottommost_level = level;
});
} else if (compression_failure_type_ == kTestDecompressionFail) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
- [](void* arg) {
+ "UncompressBlockData:TamperWithReturnValue", [](void* arg) {
Status* ret = static_cast<Status*>(arg);
ASSERT_OK(*ret);
*ret = Status::Corruption("kTestDecompressionFail");
});
} else if (compression_failure_type_ == kTestDecompressionCorruption) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "UncompressBlockContentsForCompressionType:"
+ "UncompressBlockData:"
"TamperWithDecompressionOutput",
[](void* arg) {
BlockContents* contents = static_cast<BlockContents*>(arg);
"Could not decompress: kTestDecompressionFail");
} else if (compression_failure_type_ == kTestDecompressionCorruption) {
ASSERT_EQ(std::string(s.getState()),
- "Decompressed block did not match raw block");
+ "Decompressed block did not match pre-compression block");
}
}
ASSERT_OK(Put(key, value));
}
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
// Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(key_value_written[key], value);
key_value_written.erase(key);
}
+ ASSERT_OK(db_iter->status());
ASSERT_EQ(0, key_value_written.size());
}
}
class CompactionStallTestListener : public EventListener {
public:
- CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
+ CompactionStallTestListener()
+ : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
// Hold NotifyOnCompactionCompleted in the unlock mutex section
TEST_SYNC_POINT("DBTest2::CompactionStall:3");
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_LT(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
ASSERT_GT(listener->compacted_files_cnt_.load(),
10 - options.level0_file_num_compaction_trigger);
- ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load());
+ ASSERT_EQ(listener->compacting_files_cnt_.load(),
+ listener->compacted_files_cnt_.load());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// This snapshot will have sequence number 0 what is expected behaviour.
const Snapshot* s1 = db_->GetSnapshot();
- Put(1, "k1", std::string(100000, 'x')); // Fill memtable
- Put(1, "k2", std::string(100000, 'y')); // Trigger flush
+ ASSERT_OK(Put(1, "k1", std::string(100000, 'x'))); // Fill memtable
+ ASSERT_OK(Put(1, "k2", std::string(100000, 'y'))); // Trigger flush
db_->ReleaseSnapshot(s1);
}
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
SequenceNumber oldest_ww_snap, first_ww_snap;
- Put("k", "v"); // inc seq
+ ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot());
- Put("k", "v"); // inc seq
+ ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
first_ww_snap = snapshots.back()->GetSequenceNumber();
- Put("k", "v"); // inc seq
+ ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
snapshots.push_back(db_->GetSnapshot());
- Put("k", "v"); // inc seq
+ ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
{
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PinL0IndexAndFilterBlocksTest()
- : DBTestBase("/db_pin_l0_index_bloom_test", /*env_do_fsync=*/true) {}
+ : DBTestBase("db_pin_l0_index_bloom_test", /*env_do_fsync=*/true) {}
void SetUp() override {
infinite_max_files_ = std::get<0>(GetParam());
disallow_preload_ = std::get<1>(GetParam());
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, *options);
- Put(1, "a", "begin");
- Put(1, "z", "end");
+ ASSERT_OK(Put(1, "a", "begin"));
+ ASSERT_OK(Put(1, "z", "end"));
ASSERT_OK(Flush(1));
// move this table to L1
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
// reset block cache
table_options.block_cache = NewLRUCache(64 * 1024);
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
// create new table at L0
- Put(1, "a2", "begin2");
- Put(1, "z2", "end2");
+ ASSERT_OK(Put(1, "a2", "begin2"));
+ ASSERT_OK(Put(1, "z2", "end2"));
ASSERT_OK(Flush(1));
if (close_afterwards) {
std::string value;
// Miss and hit count should remain the same, they're all pinned.
- db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
+ ASSERT_TRUE(db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
// cache read for both of index and filter. If prefetch doesn't explicitly
// happen, it will happen when verifying the file.
Compact(1, "a", "zzzzz");
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
if (!disallow_preload_) {
ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
#ifndef ROCKSDB_LITE
TEST_F(DBTest2, MaxCompactionBytesTest) {
Options options = CurrentOptions();
- options.memtable_factory.reset(
- new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+ DBTestBase::kNumKeysByGenerateNewRandomFile));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 200 << 10;
options.arena_block_size = 4 << 10;
GenerateNewRandomFile(&rnd);
}
CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("0,0,8", FilesPerLevel(0));
GenerateNewRandomFile(&rnd);
// Add three more small files that overlap with the previous file
for (int i = 0; i < 3; i++) {
- Put("a", "z");
+ ASSERT_OK(Put("a", "z"));
ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
- // Output files to L1 are cut to three pieces, according to
- // options.max_compaction_bytes
- ASSERT_EQ("0,3,8", FilesPerLevel(0));
+ // Output files to L1 are cut to 4 pieces, according to
+ // options.max_compaction_bytes (300K)
+ // There are 8 files on L2 (grandparents level), each one is 100K. The first
+ // file overlaps with a, b which max_compaction_bytes is less than 300K, the
+ // second one overlaps with d, e, which is also less than 300K. Including any
+ // extra grandparent file will make the future compaction larger than 300K.
+ // L1: [ 1 ] [ 2 ] [ 3 ] [ 4 ]
+ // L2: [a] [b] [c] [d] [e] [f] [g] [h]
+ ASSERT_EQ("0,4,8", FilesPerLevel(0));
}
static void UniqueIdCallback(void* arg) {
const size_t kNumEntries = 10;
for (size_t i = 0; i < kNumEntries; ++i) {
- ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
+ ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
}
ASSERT_OK(Flush());
for (size_t i = 0; i < kNumEntries; ++i) {
- ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
+ ASSERT_EQ("v" + std::to_string(i), Get("k" + std::to_string(i)));
}
- std::string last_key = "k" + ToString(kNumEntries - 1);
- std::string last_value = "v" + ToString(kNumEntries - 1);
+ std::string last_key = "k" + std::to_string(kNumEntries - 1);
+ std::string last_value = "v" + std::to_string(kNumEntries - 1);
env_->now_cpu_count_.store(0);
env_->SetMockSleep();
ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
iter->Prev();
ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
ASSERT_EQ("v0", iter->value().ToString());
ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
ASSERT_EQ(0, env_->now_cpu_count_.load());
ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonNanos);
iter->Prev();
ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
ASSERT_EQ("v0", iter->value().ToString());
ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonNanos);
void CountSyncPoint() {
TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
}
-} // namespace
+} // anonymous namespace
TEST_F(DBTest2, SyncPointMarker) {
std::atomic<int> sync_point_called(0);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
}
+ ASSERT_OK(iter->status());
delete iter;
// Read amp is on average 100% since we read all what we loaded in memory
}
}
-#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
+#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
{
const int kIdBufLen = 100;
Close();
Reopen(options);
- uint64_t total_useful_bytes = 0;
std::set<int> read_keys;
std::string value;
// Iter1: Read half the DB, Read even keys
if (read_keys.find(i) == read_keys.end()) {
auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
- total_useful_bytes +=
- GetEncodedEntrySize(internal_key.size(), value.size());
read_keys.insert(i);
}
}
if (read_keys.find(i) == read_keys.end()) {
auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
- total_useful_bytes +=
- GetEncodedEntrySize(internal_key.size(), value.size());
read_keys.insert(i);
}
}
size_t total_loaded_bytes_iter2 =
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
-
// Read amp is on average 100% since we read all what we loaded in memory
if (k == 0) {
ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
}
}
}
-#endif // !OS_SOLARIS
+#endif // !OS_SOLARIS
#ifndef ROCKSDB_LITE
TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
int manual_compactions_paused = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
+ auto canceled = static_cast<std::atomic<bool>*>(arg);
+ // CompactRange triggers manual compaction and cancels the compaction
+ // by setting *canceled to true
+ if (canceled != nullptr) {
+ canceled->store(true, std::memory_order_release);
+ }
+ manual_compactions_paused += 1;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
+ // CompactFiles() relies on manual_compactions_paused to
+ // determine if this compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
- manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
// OK, now trigger a manual compaction
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(dbfull()
+ ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+ .IsManualCompactionPaused());
// Wait for compactions to get scheduled and stopped
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
// Get file names after compaction is stopped
files_meta.clear();
manual_compactions_paused = 0;
// Now make sure CompactFiles also not run
- dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
- files_before_compact, 0);
+ ASSERT_TRUE(dbfull()
+ ->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
+ files_before_compact, 0)
+ .IsManualCompactionPaused());
// Wait for manual compaction to get scheduled and finish
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
files_meta.clear();
files_after_compact.clear();
Random rnd(301);
for (int i = 0; i < 2; i++) {
- // Generate a file containing 10 keys.
+ // Generate a file containing 100 keys.
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(j), rnd.RandomString(50)));
}
for (int k = 0; k < 1000; k++) {
ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
}
- Flush();
+ ASSERT_OK(Flush());
}
for (int l = 1; l < options.num_levels - i; l++) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
dbfull()->DisableManualCompaction();
- dbfull()->CompactRange(compact_options, nullptr, nullptr);
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_TRUE(dbfull()
+ ->CompactRange(compact_options, nullptr, nullptr)
+ .IsManualCompactionPaused());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
// As manual compaction disabled, not even reach sync point
ASSERT_EQ(run_manual_compactions, 0);
#ifndef ROCKSDB_LITE
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:1");
dbfull()->EnableManualCompaction();
- dbfull()->CompactRange(compact_options, nullptr, nullptr);
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
for (int k = 0; k < 1000; k++) {
ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
}
- Flush();
+ ASSERT_OK(Flush());
}
for (int l = 1; l < options.num_levels - i; l++) {
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
+ auto canceled = static_cast<std::atomic<bool>*>(arg);
+ // CompactRange triggers manual compaction and cancels the compaction
+ // by setting *canceled to true
+ if (canceled != nullptr) {
+ canceled->store(true, std::memory_order_release);
+ }
+ run_manual_compactions++;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
+ // CompactFiles() relies on manual_compactions_paused to
+ // determine if this compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
- run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- dbfull()->CompactRange(compact_options, nullptr, nullptr);
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_TRUE(dbfull()
+ ->CompactRange(compact_options, nullptr, nullptr)
+ .IsManualCompactionPaused());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ(run_manual_compactions, 1);
#ifndef ROCKSDB_LITE
ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:2");
- dbfull()->EnableManualCompaction();
- dbfull()->CompactRange(compact_options, nullptr, nullptr);
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
-TEST_F(DBTest2, OptimizeForPointLookup) {
- Options options = CurrentOptions();
- Close();
- options.OptimizeForPointLookup(2);
- ASSERT_OK(DB::Open(options, dbname_, &db_));
-
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_EQ("v1", Get("foo"));
- Flush();
- ASSERT_EQ("v1", Get("foo"));
-}
+// Verify that CompactRange() honors CompactRangeOptions::canceled: a manual
+// compaction started with *canceled == true returns ManualCompactionPaused
+// without doing any work, cancellation mid-run stops after the observed
+// number of sub-compactions, and clearing the flag lets compaction proceed.
+TEST_F(DBTest2, CancelManualCompaction1) {
+  CompactRangeOptions compact_options;
+  // The test owns the flag; compact_options only borrows the raw pointer.
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
-TEST_F(DBTest2, OptimizeForSmallDB) {
  Options options = CurrentOptions();
-  Close();
-  options.OptimizeForSmallDb();
+  options.disable_auto_compactions = true;
+  options.num_levels = 7;
-  // Find the cache object
-  ASSERT_TRUE(options.table_factory->IsInstanceOf(
-      TableFactory::kBlockBasedTableName()));
-  auto table_options =
-      options.table_factory->GetOptions<BlockBasedTableOptions>();
+  Random rnd(301);
+  // Populate L0 plus levels 1..6 so the manual compaction has work to do.
+  auto generate_files = [&]() {
+    for (int i = 0; i < options.num_levels; i++) {
+      for (int j = 0; j < options.num_levels - i + 1; j++) {
+        for (int k = 0; k < 1000; k++) {
+          ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
+        }
+        ASSERT_OK(Flush());
+      }
-  ASSERT_TRUE(table_options != nullptr);
-  std::shared_ptr<Cache> cache = table_options->block_cache;
+      for (int l = 1; l < options.num_levels - i; l++) {
+        MoveFilesToLevel(l);
+      }
+    }
+  };
-  ASSERT_EQ(0, cache->GetUsage());
-  ASSERT_OK(DB::Open(options, dbname_, &db_));
-  ASSERT_OK(Put("foo", "v1"));
+  DestroyAndReopen(options);
+  generate_files();
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
-  // memtable size is costed to the block cache
-  ASSERT_NE(0, cache->GetUsage());
+  int run_manual_compactions = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1",
+      [&](void* /*arg*/) { run_manual_compactions++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  ASSERT_EQ("v1", Get("foo"));
-  Flush();
+  // Setup a callback to disable compactions after a couple of levels are
+  // compacted
+  int compactions_run = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1",
+      [&](void* /*arg*/) { ++compactions_run; });
-  size_t prev_size = cache->GetUsage();
-  // Remember block cache size, so that we can find that
-  // it is filled after Get().
-  // Use pinnable slice so that it can ping the block so that
-  // when we check the size it is not evicted.
-  PinnableSlice value;
-  ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
-  ASSERT_GT(cache->GetUsage(), prev_size);
-  value.Reset();
-}
+  // *canceled is still true from initialization, so CompactRange() must bail
+  // out immediately with ManualCompactionPaused.
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
-#endif  // ROCKSDB_LITE
+  // Since compactions are disabled, we shouldn't start compacting.
+  // I.e. neither the manual-compaction entry point nor the compaction job
+  // should ever run (both counters stay at zero).
+  ASSERT_EQ(compactions_run, 0);
+  ASSERT_EQ(run_manual_compactions, 0);
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
-TEST_F(DBTest2, IterRaceFlush1) {
-  ASSERT_OK(Put("foo", "v1"));
+  compactions_run = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) {
+        ++compactions_run;
+        // After 3 compactions disable
+        if (compactions_run == 3) {
+          compact_options.canceled->store(true, std::memory_order_release);
+        }
+      });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
-       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_EQ(compactions_run, 3);
-  ROCKSDB_NAMESPACE::port::Thread t1([&] {
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
-    ASSERT_OK(Put("foo", "v2"));
-    Flush();
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
-  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1");
-  // iterator is created after the first Put(), so it should see either
-  // "v1" or "v2".
-  {
-    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
-    it->Seek("foo");
-    ASSERT_TRUE(it->Valid());
-    ASSERT_EQ("foo", it->key().ToString());
-  }
+  // Compactions should work again if we re-enable them..
+  compact_options.canceled->store(false, std::memory_order_relaxed);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
-  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
-TEST_F(DBTest2, IterRaceFlush2) {
- ASSERT_OK(Put("foo", "v1"));
+// Verify cancellation of a manual compaction from within the compaction's
+// own key-value processing loop (via the CompactionIterator:ProcessKV sync
+// point), and that compaction resumes normally once the flag is cleared.
+TEST_F(DBTest2, CancelManualCompaction2) {
+  CompactRangeOptions compact_options;
+  // The test owns the flag; compact_options only borrows the raw pointer.
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
+  // A single subcompaction makes the cancellation point deterministic: the
+  // flag is set by the one-and-only compacting thread.
+  compact_options.max_subcompactions = 1;
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
-       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.num_levels = 7;
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  Random rnd(301);
+  // Populate L0 plus levels 1..6 so the manual compaction has work to do.
+  auto generate_files = [&]() {
+    for (int i = 0; i < options.num_levels; i++) {
+      for (int j = 0; j < options.num_levels - i + 1; j++) {
+        for (int k = 0; k < 1000; k++) {
+          ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
+        }
+        ASSERT_OK(Flush());
+      }
-  ROCKSDB_NAMESPACE::port::Thread t1([&] {
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
-    ASSERT_OK(Put("foo", "v2"));
-    Flush();
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
-  });
+      for (int l = 1; l < options.num_levels - i; l++) {
+        MoveFilesToLevel(l);
+      }
+    }
+  };
-  // iterator is created after the first Put(), so it should see either
-  // "v1" or "v2".
-  {
-    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
-    it->Seek("foo");
-    ASSERT_TRUE(it->Valid());
-    ASSERT_EQ("foo", it->key().ToString());
-  }
+  DestroyAndReopen(options);
+  generate_files();
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
-  t1.join();
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
-}
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-TEST_F(DBTest2, IterRefreshRaceFlush) {
+  int compactions_run = 0;
+  std::atomic<int> kv_compactions{0};
+  int compactions_stopped_at = 0;
+  int kv_compactions_stopped_at = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1",
+      [&](void* /*arg*/) { ++compactions_run; });
+
+  // Cancel from inside the compaction's KV loop after 6 keys have been
+  // processed (fetch_add returns the pre-increment value, so this fires on
+  // the 6th key).
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator:ProcessKV", [&](void* /*arg*/) {
+        int kv_compactions_run =
+            kv_compactions.fetch_add(1, std::memory_order_release);
+        if (kv_compactions_run == 5) {
+          compact_options.canceled->store(true, std::memory_order_release);
+          kv_compactions_stopped_at = kv_compactions_run;
+          compactions_stopped_at = compactions_run;
+        }
+      });
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  // NOTE: as we set compact_options.max_subcompactions = 1, and store true to
+  // the canceled variable from the single compacting thread (via callback),
+  // this value is deterministically kv_compactions_stopped_at + 1.
+  ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1);
+  ASSERT_EQ(compactions_run, compactions_stopped_at);
+
+  // Clear under the exact name the callback was registered with
+  // ("CompactionIterator:ProcessKV", single colon). The previous "::"
+  // spelling matched nothing and left the canceling callback installed.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionIterator:ProcessKV");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1");
+
+  // Compactions should work again if we re-enable them..
+  compact_options.canceled->store(false, std::memory_order_relaxed);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Test listener that checks every compaction on the default column family
+// starts at L0 and completes with the expected status. Tests assign
+// code_/subcode_ before triggering a compaction; members are now
+// default-initialized to OK/kNone so a compaction that completes before the
+// test assigns them does not compare against indeterminate values.
+class CancelCompactionListener : public EventListener {
+ public:
+  CancelCompactionListener()
+      : num_compaction_started_(0), num_compaction_ended_(0) {}
+
+  void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
+    ASSERT_EQ(ci.cf_name, "default");
+    ASSERT_EQ(ci.base_input_level, 0);
+    num_compaction_started_++;
+  }
+
+  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+    ASSERT_EQ(ci.cf_name, "default");
+    ASSERT_EQ(ci.base_input_level, 0);
+    ASSERT_EQ(ci.status.code(), code_);
+    ASSERT_EQ(ci.status.subcode(), subcode_);
+    num_compaction_ended_++;
+  }
+
+  std::atomic<size_t> num_compaction_started_;
+  std::atomic<size_t> num_compaction_ended_;
+  // Expected final status of the next compaction(s); set by the test before
+  // the compaction is triggered. Default-initialized (was uninitialized).
+  Status::Code code_ = Status::kOk;
+  Status::SubCode subcode_ = Status::SubCode::kNone;
+};
+
+// Verify EventListener begin/end notifications around canceled manual
+// compactions: a compaction canceled mid-run still pairs begin/end
+// notifications; a compaction canceled before it starts produces none; and
+// a compaction canceled only after its work is done completes with OK.
+TEST_F(DBTest2, CancelManualCompactionWithListener) {
+  CompactRangeOptions compact_options;
+  // The test owns the flag; compact_options only borrows the raw pointer.
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
+  compact_options.max_subcompactions = 1;
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  // Ownership transfers to options.listeners (wrapped in a shared_ptr).
+  CancelCompactionListener* listener = new CancelCompactionListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      ASSERT_OK(Put(Key(i + j * 10), rnd.RandomString(50)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Cancel as soon as the compaction starts processing keys.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator:ProcessKV", [&](void* /*arg*/) {
+        compact_options.canceled->store(true, std::memory_order_release);
+      });
+
+  int running_compaction = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::FinishCompactionOutputFile1",
+      [&](void* /*arg*/) { running_compaction++; });
+
+  // Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable
+  // manual compaction in the callback function, 3 Compaction not run,
+  // 4 Notify compaction end.
+  listener->code_ = Status::kIncomplete;
+  listener->subcode_ = Status::SubCode::kManualCompactionPaused;
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+  ASSERT_EQ(running_compaction, 0);
+
+  listener->num_compaction_started_ = 0;
+  listener->num_compaction_ended_ = 0;
+
+  // Case II: 1 Set *canceled as true in the callback function to disable manual
+  // compaction, 2 Notify begin compaction (return without notifying), 3 Notify
+  // compaction end (return without notifying).
+  // Note: *canceled is still true from Case I, so no notification fires.
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_EQ(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+  ASSERT_EQ(running_compaction, 0);
+
+  // Case III: 1 Notify begin compaction, 2 Compaction in between
+  // 3. Set *canceled as true in the callback function to disable manual
+  // compaction, 4 Notify compaction end.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionIterator:ProcessKV");
+
+  // Cancel only after all KVs were processed, right before output
+  // verification: the job has effectively finished, so it reports OK.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run:BeforeVerify", [&](void* /*arg*/) {
+        compact_options.canceled->store(true, std::memory_order_release);
+      });
+
+  listener->code_ = Status::kOk;
+  listener->subcode_ = Status::SubCode::kNone;
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+
+  // Compaction job will succeed.
+  ASSERT_GT(running_compaction, 0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Verify that a universal full compaction forwarded to the BOTTOM-priority
+// thread pool still fires paired listener begin/end notifications and runs
+// exactly one compaction job.
+TEST_F(DBTest2, CompactionOnBottomPriorityWithListener) {
+  int num_levels = 3;
+  const int kNumFilesTrigger = 4;
+
+  Options options = CurrentOptions();
+  // Route all compaction work to the single BOTTOM-priority thread.
+  env_->SetBackgroundThreads(0, Env::Priority::HIGH);
+  env_->SetBackgroundThreads(0, Env::Priority::LOW);
+  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
+  options.env = env_;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.num_levels = num_levels;
+  options.write_buffer_size = 100 << 10;     // 100KB
+  options.target_file_size_base = 32 << 10;  // 32KB
+  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
+  // Trigger compaction if size amplification exceeds 110%
+  options.compaction_options_universal.max_size_amplification_percent = 110;
+
+  // Ownership transfers to options.listeners (wrapped in a shared_ptr).
+  CancelCompactionListener* listener = new CancelCompactionListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  int num_bottom_thread_compaction_scheduled = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
+      [&](void* /*arg*/) { num_bottom_thread_compaction_scheduled++; });
+
+  int num_compaction_jobs = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():End",
+      [&](void* /*arg*/) { num_compaction_jobs++; });
+
+  listener->code_ = Status::kOk;
+  listener->subcode_ = Status::SubCode::kNone;
+
+  Random rnd(301);
+  // Flush kNumFilesTrigger L0 files to trigger one universal compaction.
+  // (The former single-iteration outer for-loop was dead code and has been
+  // removed.)
+  for (int num = 0; num < kNumFilesTrigger; num++) {
+    int key_idx = 0;
+    GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
+    // use no_wait above because that one waits for flush and compaction. We
+    // don't want to wait for compaction because the full compaction is
+    // intentionally blocked while more files are flushed.
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_GT(num_bottom_thread_compaction_scheduled, 0);
+  ASSERT_EQ(num_compaction_jobs, 1);
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Sanity check for Options::OptimizeForPointLookup(): point lookups succeed
+// both from the memtable and after the value has been flushed to an SST.
+TEST_F(DBTest2, OptimizeForPointLookup) {
+  Options options = CurrentOptions();
+  Close();
+  options.OptimizeForPointLookup(2);
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("v1", Get("foo"));
+}
+
+// Options::OptimizeForSmallDb() charges memtable memory to the block cache;
+// verify the shared cache reflects memtable usage and grows when a read
+// pulls a block into it.
+TEST_F(DBTest2, OptimizeForSmallDB) {
+  Options options = CurrentOptions();
+  Close();
+  options.OptimizeForSmallDb();
+
+  // Find the cache object
+  ASSERT_TRUE(options.table_factory->IsInstanceOf(
+      TableFactory::kBlockBasedTableName()));
+  auto table_options =
+      options.table_factory->GetOptions<BlockBasedTableOptions>();
+
+  ASSERT_TRUE(table_options != nullptr);
+  std::shared_ptr<Cache> cache = table_options->block_cache;
+
+  ASSERT_EQ(0, cache->GetUsage());
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+  ASSERT_OK(Put("foo", "v1"));
+
+  // memtable size is costed to the block cache
+  ASSERT_NE(0, cache->GetUsage());
+
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(Flush());
+
+  size_t prev_size = cache->GetUsage();
+  // Remember block cache size, so that we can find that
+  // it is filled after Get().
+  // Use pinnable slice so that it can ping the block so that
+  // when we check the size it is not evicted.
+  PinnableSlice value;
+  ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
+  ASSERT_GT(cache->GetUsage(), prev_size);
+  value.Reset();
+}
+
+#endif // ROCKSDB_LITE
+
+// Race NewIterator() against a concurrent flush. The LoadDependency edges
+// force the second Put() + Flush() to occur between NewIterator:1 and
+// NewIterator:2, i.e. before the iterator's snapshot sequence is assigned,
+// so the iterator must observe "v2".
+TEST_F(DBTest2, IterRaceFlush1) {
+  ASSERT_OK(Put("foo", "v1"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
+       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread t1([&] {
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
+    ASSERT_OK(Put("foo", "v2"));
+    ASSERT_OK(Flush());
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
+  });
+
+  // iterator is created after the first Put(), and its snapshot sequence is
+  // assigned after second Put(), so it must see v2.
+  {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    it->Seek("foo");
+    ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
+    ASSERT_EQ("foo", it->key().ToString());
+    ASSERT_EQ("v2", it->value().ToString());
+  }
+
+  t1.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Counterpart of IterRaceFlush1: here the dependency edges (NewIterator:3/4)
+// place the concurrent Put() + Flush() after the iterator's snapshot
+// sequence is assigned, so the iterator must still observe "v1".
+TEST_F(DBTest2, IterRaceFlush2) {
+  ASSERT_OK(Put("foo", "v1"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
+       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread t1([&] {
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
+    ASSERT_OK(Put("foo", "v2"));
+    ASSERT_OK(Flush());
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
+  });
+
+  // iterator is created after the first Put(), and its snapshot sequence is
+  // assigned before second Put(), thus it must see v1.
+  {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    it->Seek("foo");
+    ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
+    ASSERT_EQ("foo", it->key().ToString());
+    ASSERT_EQ("v1", it->value().ToString());
+  }
+
+  t1.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, IterRefreshRaceFlush) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
- Flush();
+ ASSERT_OK(Flush());
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
});
- // iterator is created after the first Put(), so it should see either
- // "v1" or "v2".
+ // iterator is refreshed after the first Put(), and its sequence number is
+ // assigned after second Put(), thus it must see v2.
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
- it->Refresh();
+ ASSERT_OK(it->status());
+ ASSERT_OK(it->Refresh());
it->Seek("foo");
ASSERT_TRUE(it->Valid());
+ ASSERT_OK(it->status());
ASSERT_EQ("foo", it->key().ToString());
+ ASSERT_EQ("v2", it->value().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
- Flush();
+ ASSERT_OK(Flush());
TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
});
port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
- Flush();
+ ASSERT_OK(Flush());
TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
});
ASSERT_EQ("second", value);
// nothing should be returned using memtable-only iterator after flushing.
it = db_->NewIterator(ropt, handles_[1]);
+ ASSERT_OK(it->status());
count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
ASSERT_TRUE(it->Valid());
}
ASSERT_TRUE(!it->Valid());
ASSERT_EQ(0, count);
+ ASSERT_OK(it->status());
delete it;
// Add a key to memtable
ASSERT_OK(Put(1, "foobar", "third"));
it = db_->NewIterator(ropt, handles_[1]);
+ ASSERT_OK(it->status());
count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
ASSERT_TRUE(it->Valid());
}
ASSERT_TRUE(!it->Valid());
ASSERT_EQ(1, count);
+ ASSERT_OK(it->status());
delete it;
}
WriteOptions wo;
for (int i = 0; i < 6; i++) {
wo.low_pri = false;
- Put("", "", wo);
+ ASSERT_OK(Put("", "", wo));
wo.low_pri = true;
- Put("", "", wo);
- Flush();
+ ASSERT_OK(Put("", "", wo));
+ ASSERT_OK(Flush());
}
ASSERT_EQ(0, rate_limit_count.load());
wo.low_pri = true;
- Put("", "", wo);
+ ASSERT_OK(Put("", "", wo));
ASSERT_EQ(1, rate_limit_count.load());
wo.low_pri = false;
- Put("", "", wo);
+ ASSERT_OK(Put("", "", wo));
ASSERT_EQ(1, rate_limit_count.load());
TEST_SYNC_POINT("DBTest.LowPriWrite:0");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
wo.low_pri = true;
- Put("", "", wo);
+ ASSERT_OK(Put("", "", wo));
ASSERT_EQ(1, rate_limit_count.load());
wo.low_pri = false;
- Put("", "", wo);
+ ASSERT_OK(Put("", "", wo));
ASSERT_EQ(1, rate_limit_count.load());
}
const int kBytesPerKey = 1024;
const int kNumL0Files = 4;
- for (auto use_direct_io : {false, true}) {
- if (use_direct_io && !IsDirectIOSupported()) {
- continue;
- }
- Options options = CurrentOptions();
- options.compression = kNoCompression;
- options.level0_file_num_compaction_trigger = kNumL0Files;
- options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
- options.new_table_reader_for_compaction_inputs = true;
- // takes roughly one second, split into 100 x 10ms intervals. Each interval
- // permits 5.12KB, which is smaller than the block size, so this test
- // exercises the code for chunking reads.
- options.rate_limiter.reset(NewGenericRateLimiter(
- static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
- kBytesPerKey) /* rate_bytes_per_sec */,
- 10 * 1000 /* refill_period_us */, 10 /* fairness */,
- RateLimiter::Mode::kReadsOnly));
- options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
- use_direct_io;
- BlockBasedTableOptions bbto;
- bbto.block_size = 16384;
- bbto.no_block_cache = true;
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- DestroyAndReopen(options);
-
- for (int i = 0; i < kNumL0Files; ++i) {
- for (int j = 0; j <= kNumKeysPerFile; ++j) {
- ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+ for (int compaction_readahead_size : {0, 32 << 10}) {
+ for (auto use_direct_io : {false, true}) {
+ if (use_direct_io && !IsDirectIOSupported()) {
+ continue;
}
- dbfull()->TEST_WaitForFlushMemTable();
- ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
- }
- dbfull()->TEST_WaitForCompact();
- ASSERT_EQ(0, NumTableFilesAtLevel(0));
-
- ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
- // should be slightly above 512KB due to non-data blocks read. Arbitrarily
- // chose 1MB as the upper bound on the total bytes read.
- size_t rate_limited_bytes =
- options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
- // Include the explicit prefetch of the footer in direct I/O case.
- size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
- ASSERT_GE(
- rate_limited_bytes,
- static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
- ASSERT_LT(
- rate_limited_bytes,
- static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
- direct_io_extra));
+ Options options = CurrentOptions();
+ options.compaction_readahead_size = compaction_readahead_size;
+ options.compression = kNoCompression;
+ options.level0_file_num_compaction_trigger = kNumL0Files;
+ options.memtable_factory.reset(
+ test::NewSpecialSkipListFactory(kNumKeysPerFile));
+ // takes roughly one second, split into 100 x 10ms intervals. Each
+ // interval permits 5.12KB, which is smaller than the block size, so this
+ // test exercises the code for chunking reads.
+ options.rate_limiter.reset(NewGenericRateLimiter(
+ static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
+ kBytesPerKey) /* rate_bytes_per_sec */,
+ 10 * 1000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kReadsOnly));
+ options.use_direct_reads =
+ options.use_direct_io_for_flush_and_compaction = use_direct_io;
+ BlockBasedTableOptions bbto;
+ bbto.block_size = 16384;
+ bbto.no_block_cache = true;
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ DestroyAndReopen(options);
- Iterator* iter = db_->NewIterator(ReadOptions());
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+ for (int i = 0; i < kNumL0Files; ++i) {
+ for (int j = 0; j <= kNumKeysPerFile; ++j) {
+ ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ if (i + 1 < kNumL0Files) {
+ ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+ }
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+ // should be slightly above 512KB due to non-data blocks read. Arbitrarily
+ // chose 1MB as the upper bound on the total bytes read.
+ size_t rate_limited_bytes = static_cast<size_t>(
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL));
+ // The charges can exist for `IO_LOW` and `IO_USER` priorities.
+ size_t rate_limited_bytes_by_pri =
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
+ ASSERT_EQ(rate_limited_bytes,
+ static_cast<size_t>(rate_limited_bytes_by_pri));
+ // Include the explicit prefetch of the footer in direct I/O case.
+ size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
+ ASSERT_GE(
+ rate_limited_bytes,
+ static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+ ASSERT_LT(
+ rate_limited_bytes,
+ static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
+ direct_io_extra));
+
+ Iterator* iter = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter->status());
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+ }
+ delete iter;
+ // bytes read for user iterator shouldn't count against the rate limit.
+ rate_limited_bytes_by_pri =
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
+ ASSERT_EQ(rate_limited_bytes,
+ static_cast<size_t>(rate_limited_bytes_by_pri));
}
- delete iter;
- // bytes read for user iterator shouldn't count against the rate limit.
- ASSERT_EQ(rate_limited_bytes,
- static_cast<size_t>(
- options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
}
}
#endif // ROCKSDB_LITE
options.disable_auto_compactions = true;
options.num_levels = 7;
Reopen(options);
- Put("foo", "bar");
- Flush();
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
MoveFilesToLevel(6);
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 1;
- dbfull()->CompactRange(compact_options, nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel());
#endif // !ROCKSDB_LITE
// the DB instead of assuming what seq the DB used.
int i = 1;
for (; i < 10; i++) {
- Put(key, value + std::to_string(i));
+ ASSERT_OK(Put(key, value + std::to_string(i)));
// Take a snapshot to avoid the value being removed during compaction
auto snapshot = dbfull()->GetSnapshot();
snapshots.push_back(snapshot);
}
- Flush();
+ ASSERT_OK(Flush());
for (; i < 20; i++) {
- Put(key, value + std::to_string(i));
+ ASSERT_OK(Put(key, value + std::to_string(i)));
// Take a snapshot to avoid the value being removed during compaction
auto snapshot = dbfull()->GetSnapshot();
snapshots.push_back(snapshot);
}
- Flush();
+ ASSERT_OK(Flush());
MoveFilesToLevel(6);
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
for (; i < 30; i++) {
- Put(key, value + std::to_string(i));
+ ASSERT_OK(Put(key, value + std::to_string(i)));
auto snapshot = dbfull()->GetSnapshot();
snapshots.push_back(snapshot);
}
- Flush();
+ ASSERT_OK(Flush());
#ifndef ROCKSDB_LITE
ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
// And also add some values to the memtable
for (; i < 40; i++) {
- Put(key, value + std::to_string(i));
+ ASSERT_OK(Put(key, value + std::to_string(i)));
auto snapshot = dbfull()->GetSnapshot();
snapshots.push_back(snapshot);
}
[&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("key", "val");
+ ASSERT_OK(Put("key", "val"));
FlushOptions flush_opts;
flush_opts.wait = false;
db_->Flush(flush_opts);
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
- db_->DisableFileDeletions();
+ ASSERT_OK(db_->DisableFileDeletions());
VectorLogPtr log_files;
- db_->GetSortedWalFiles(log_files);
+ ASSERT_OK(db_->GetSortedWalFiles(log_files));
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
for (const auto& log_file : log_files) {
ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
}
- db_->EnableFileDeletions();
+ ASSERT_OK(db_->EnableFileDeletions());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, TestNumPread) {
Options options = CurrentOptions();
+ bool prefetch_supported =
+ test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
// disable block cache
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
env_->count_random_reads_ = true;
-
env_->random_file_open_counter_.store(0);
ASSERT_OK(Put("bar", "foo"));
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
- // After flush, we'll open the file and read footer, meta block,
- // property block and index block.
- ASSERT_EQ(4, env_->random_read_counter_.Read());
+ if (prefetch_supported) {
+ // After flush, we'll open the file and read footer, meta block,
+ // property block and index block.
+ ASSERT_EQ(4, env_->random_read_counter_.Read());
+ } else {
+ // With prefetch not supported, we will do a single read into a buffer
+ ASSERT_EQ(1, env_->random_read_counter_.Read());
+ }
ASSERT_EQ(1, env_->random_file_open_counter_.load());
// One pread per a normal data block read
ASSERT_OK(Put("bar2", "foo2"));
ASSERT_OK(Put("foo2", "bar2"));
ASSERT_OK(Flush());
- // After flush, we'll open the file and read footer, meta block,
- // property block and index block.
- ASSERT_EQ(4, env_->random_read_counter_.Read());
+ if (prefetch_supported) {
+ // After flush, we'll open the file and read footer, meta block,
+ // property block and index block.
+ ASSERT_EQ(4, env_->random_read_counter_.Read());
+ } else {
+ // With prefetch not supported, we will do a single read into a buffer
+ ASSERT_EQ(1, env_->random_read_counter_.Read());
+ }
ASSERT_EQ(1, env_->random_file_open_counter_.load());
- // Compaction needs two input blocks, which requires 2 preads, and
- // generate a new SST file which needs 4 preads (footer, meta block,
- // property block and index block). In total 6.
env_->random_file_open_counter_.store(0);
env_->random_read_counter_.Reset();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- ASSERT_EQ(6, env_->random_read_counter_.Read());
- // All compactin input files should have already been opened.
+ if (prefetch_supported) {
+ // Compaction needs two input blocks, which requires 2 preads, and
+ // generate a new SST file which needs 4 preads (footer, meta block,
+ // property block and index block). In total 6.
+ ASSERT_EQ(6, env_->random_read_counter_.Read());
+ } else {
+ // With prefetch off, compaction needs two input blocks,
+ // followed by a single buffered read. In total 3.
+ ASSERT_EQ(3, env_->random_read_counter_.Read());
+ }
+ // All compaction input files should have already been opened.
ASSERT_EQ(1, env_->random_file_open_counter_.load());
// One pread per a normal data block read
ASSERT_EQ(0, env_->random_file_open_counter_.load());
}
-TEST_F(DBTest2, TraceAndReplay) {
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreatePutOperator();
- ReadOptions ro;
- WriteOptions wo;
- TraceOptions trace_opts;
- EnvOptions env_opts;
- CreateAndReopenWithCF({"pikachu"}, options);
- Random rnd(301);
- Iterator* single_iter = nullptr;
-
- ASSERT_TRUE(db_->EndTrace().IsIOError());
+// Collects statistics from the TraceRecordResults produced while replaying a
+// trace. Each Handle() overload validates the result's timestamps and trace
+// type, then bumps the matching per-operation counter. All counters are
+// atomic because Replayer may invoke the result callback from multiple
+// threads (see the multi-threaded ReplayOptions used in TraceAndReplay).
+class TraceExecutionResultHandler : public TraceRecordResult::Handler {
+ public:
+  TraceExecutionResultHandler() {}
+  ~TraceExecutionResultHandler() override {}
-  std::string trace_filename = dbname_ + "/rocksdb.trace";
-  std::unique_ptr<TraceWriter> trace_writer;
-  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+
+  // Result of a replayed write batch; only kTraceWrite is accepted.
+  virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    // The execution status itself is not asserted here; just mark it checked.
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceWrite: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        writes_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  // Result of a replayed Get; only kTraceGet is accepted.
+  virtual Status Handle(
+      const SingleValueTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        gets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  // Result of a replayed MultiGet; only kTraceMultiGet is accepted.
+  virtual Status Handle(
+      const MultiValuesTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    // MultiGet produces one Status per key; mark each one checked.
+    for (const Status& s : result.GetMultiStatus()) {
+      s.PermitUncheckedError();
+    }
+    switch (result.GetTraceType()) {
+      case kTraceMultiGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        multigets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  // Result of a replayed iterator Seek/SeekForPrev; both types share the
+  // seeks_ counter.
+  virtual Status Handle(const IteratorTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceIteratorSeek:
+      case kTraceIteratorSeekForPrev: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        seeks_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  // Resets all counters so the handler can be reused between replays.
+  void Reset() {
+    total_latency_ = 0;
+    cnt_ = 0;
+    writes_ = 0;
+    gets_ = 0;
+    seeks_ = 0;
+    multigets_ = 0;
+  }
+
+  // Average latency over all handled results; 0.0 if nothing was handled.
+  double GetAvgLatency() const {
+    return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_;
+  }
+
+  int GetNumWrites() const { return writes_; }
+
+  int GetNumGets() const { return gets_; }
+
+  int GetNumIterSeeks() const { return seeks_; }
+
+  int GetNumMultiGets() const { return multigets_; }
+
+ private:
+  // Atomic: updated concurrently when Replayer runs with multiple threads.
+  std::atomic<uint64_t> total_latency_{0};
+  std::atomic<uint32_t> cnt_{0};
+  std::atomic<int> writes_{0};
+  std::atomic<int> gets_{0};
+  std::atomic<int> seeks_{0};
+  std::atomic<int> multigets_{0};
+};
+
+// Traces a fixed mix of operations (8 writes, 3 gets, 2 iterator seeks)
+// against db_, then replays the trace into a second DB with
+// Replayer::Replay() at several thread-count/speed combinations, verifying
+// both the replayed data and the per-operation counts reported through
+// TraceExecutionResultHandler.
+TEST_F(DBTest2, TraceAndReplay) {
+  Options options = CurrentOptions();
+  options.merge_operator = MergeOperators::CreatePutOperator();
+  ReadOptions ro;
+  WriteOptions wo;
+  TraceOptions trace_opts;
+  EnvOptions env_opts;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+  Iterator* single_iter = nullptr;
+
+  // EndTrace() before StartTrace() must fail.
+  ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+  std::string trace_filename = dbname_ + "/rocksdb.trace";
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+  // 5 Writes
+  ASSERT_OK(Put(0, "a", "1"));
+  ASSERT_OK(Merge(0, "b", "2"));
+  ASSERT_OK(Delete(0, "c"));
+  ASSERT_OK(SingleDelete(0, "d"));
+  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+  // 6th Write
+  WriteBatch batch;
+  ASSERT_OK(batch.Put("f", "11"));
+  ASSERT_OK(batch.Merge("g", "12"));
+  ASSERT_OK(batch.Delete("h"));
+  ASSERT_OK(batch.SingleDelete("i"));
+  ASSERT_OK(batch.DeleteRange("j", "k"));
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  // 2 Seek(ForPrev)s
+  single_iter = db_->NewIterator(ro);
+  single_iter->Seek("f");  // Seek 1
+  single_iter->SeekForPrev("g");
+  ASSERT_OK(single_iter->status());
+  delete single_iter;
+
+  // 2 Gets
+  ASSERT_EQ("1", Get(0, "a"));
+  ASSERT_EQ("12", Get(0, "g"));
+
+  // 7th and 8th Write, 3rd Get
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "rocksdb", "rocks"));
+  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+  // Total Write x 8, Get x 3, Seek x 2.
+  ASSERT_OK(db_->EndTrace());
+  // These should not get into the trace file as it is after EndTrace.
+  ASSERT_OK(Put("hello", "world"));
+  ASSERT_OK(Merge("foo", "bar"));
+
+  // Open another db, replay, and verify the data
+  std::string value;
+  // No leading '/': PerThreadDBPath already joins its argument onto the
+  // per-thread tmp dir (consistent with "db_replay" usage elsewhere).
+  std::string dbname2 = test::PerThreadDBPath(env_, "db_replay");
+  ASSERT_OK(DestroyDB(dbname2, options));
+
+  // Using a different name than db2, to pacify infer's use-after-lifetime
+  // warnings (http://fbinfer.com).
+  DB* db2_init = nullptr;
+  options.create_if_missing = true;
+  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+  ColumnFamilyHandle* cf;
+  ASSERT_OK(
+      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+  delete cf;
+  delete db2_init;
+
+  DB* db2 = nullptr;
+  std::vector<ColumnFamilyDescriptor> column_families;
+  ColumnFamilyOptions cf_options;
+  cf_options.merge_operator = MergeOperators::CreatePutOperator();
+  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+  column_families.push_back(
+      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+  std::vector<ColumnFamilyHandle*> handles;
+  DBOptions db_opts;
+  db_opts.env = env_;
+  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));
+
+  env_->SleepForMicroseconds(100);
+  // Verify that the keys don't already exist
+  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+  std::unique_ptr<TraceReader> trace_reader;
+  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+  // Callback forwards every per-record result into res_handler for counting.
+  TraceExecutionResultHandler res_handler;
+  std::function<void(Status, std::unique_ptr<TraceRecordResult> &&)> res_cb =
+      [&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
+        ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
+        if (res != nullptr) {
+          ASSERT_OK(res->Accept(&res_handler));
+          res.reset();
+        }
+      };
+
+  // Unprepared replay should fail with Status::Incomplete()
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
+  ASSERT_OK(replayer->Prepare());
+  // Ok to repeatedly Prepare().
+  ASSERT_OK(replayer->Prepare());
+  // Replay using 1 thread, 1x speed.
+  ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
+  ASSERT_EQ("1", value);
+  ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
+  ASSERT_EQ("12", value);
+  // Writes issued after EndTrace() must not have been replayed.
+  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+
+  ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
+  ASSERT_EQ("bar", value);
+  ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
+  ASSERT_EQ("rocks", value);
+
+  // Re-replay should fail with Status::Incomplete() if Prepare() was not
+  // called. Currently we don't distinguish between unprepared and trace end.
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
+
+  // Re-replay using 2 threads, 2x speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  // Re-replay using 2 threads, 1/2 speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  replayer.reset();
+
+  for (auto handle : handles) {
+    delete handle;
+  }
+  delete db2;
+  ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceAndManualReplay) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+ Iterator* single_iter = nullptr;
+
+ ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+ std::string trace_filename = dbname_ + "/rocksdb.trace";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
ASSERT_OK(Put(0, "a", "1"));
single_iter = db_->NewIterator(ro);
single_iter->Seek("f");
single_iter->SeekForPrev("g");
+ ASSERT_OK(single_iter->status());
+ delete single_iter;
+
+ // Write some sequenced keys for testing lower/upper bounds of iterator.
+ batch.Clear();
+ ASSERT_OK(batch.Put("iter-0", "iter-0"));
+ ASSERT_OK(batch.Put("iter-1", "iter-1"));
+ ASSERT_OK(batch.Put("iter-2", "iter-2"));
+ ASSERT_OK(batch.Put("iter-3", "iter-3"));
+ ASSERT_OK(batch.Put("iter-4", "iter-4"));
+ ASSERT_OK(db_->Write(wo, &batch));
+
+ ReadOptions bounded_ro = ro;
+ Slice lower_bound("iter-1");
+ Slice upper_bound("iter-3");
+ bounded_ro.iterate_lower_bound = &lower_bound;
+ bounded_ro.iterate_upper_bound = &upper_bound;
+ single_iter = db_->NewIterator(bounded_ro);
+ single_iter->Seek("iter-0");
+ ASSERT_EQ(single_iter->key().ToString(), "iter-1");
+ single_iter->Seek("iter-2");
+ ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+ single_iter->Seek("iter-4");
+ ASSERT_FALSE(single_iter->Valid());
+ single_iter->SeekForPrev("iter-0");
+ ASSERT_FALSE(single_iter->Valid());
+ single_iter->SeekForPrev("iter-2");
+ ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+ single_iter->SeekForPrev("iter-4");
+ ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+ ASSERT_OK(single_iter->status());
delete single_iter;
ASSERT_EQ("1", Get(0, "a"));
ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+ // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
+ // Plus 1 WriteBatch for iterator with lower/upper bounds, and 6
+ // Seek(ForPrev)s.
+ // Total Write x 9, Get x 3, Seek x 8
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
- Put("hello", "world");
- Merge("foo", "bar");
+ ASSERT_OK(Put("hello", "world"));
+ ASSERT_OK(Merge("foo", "bar"));
// Open another db, replay, and verify the data
std::string value;
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+ TraceExecutionResultHandler res_handler;
+
+ // Manual replay for 2 times. The 2nd checks if the replay can restart.
+ std::unique_ptr<TraceRecord> record;
+ std::unique_ptr<TraceRecordResult> result;
+ for (int i = 0; i < 2; i++) {
+ // Next should fail if unprepared.
+ ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+ ASSERT_OK(replayer->Prepare());
+ Status s = Status::OK();
+ // Looping until trace end.
+ while (s.ok()) {
+ s = replayer->Next(&record);
+ // Skip unsupported operations.
+ if (s.IsNotSupported()) {
+ continue;
+ }
+ if (s.ok()) {
+ ASSERT_OK(replayer->Execute(record, &result));
+ if (result != nullptr) {
+ ASSERT_OK(result->Accept(&res_handler));
+ if (record->GetTraceType() == kTraceIteratorSeek ||
+ record->GetTraceType() == kTraceIteratorSeekForPrev) {
+ IteratorSeekQueryTraceRecord* iter_rec =
+ dynamic_cast<IteratorSeekQueryTraceRecord*>(record.get());
+ IteratorTraceExecutionResult* iter_res =
+ dynamic_cast<IteratorTraceExecutionResult*>(result.get());
+ // Check if lower/upper bounds are correctly saved and decoded.
+ std::string lower_str = iter_rec->GetLowerBound().ToString();
+ std::string upper_str = iter_rec->GetUpperBound().ToString();
+ std::string iter_key = iter_res->GetKey().ToString();
+ std::string iter_value = iter_res->GetValue().ToString();
+ if (!lower_str.empty() && !upper_str.empty()) {
+ ASSERT_EQ(lower_str, "iter-1");
+ ASSERT_EQ(upper_str, "iter-3");
+ if (iter_res->GetValid()) {
+ // If iterator is valid, then lower_bound <= key < upper_bound.
+ ASSERT_GE(iter_key, lower_str);
+ ASSERT_LT(iter_key, upper_str);
+ } else {
+ // If iterator is invalid, then
+ // key < lower_bound or key >= upper_bound.
+ ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str);
+ }
+ }
+ // If iterator is invalid, the key and value should be empty.
+ if (!iter_res->GetValid()) {
+ ASSERT_TRUE(iter_key.empty());
+ ASSERT_TRUE(iter_value.empty());
+ }
+ }
+ result.reset();
+ }
+ }
+ }
+ // Status::Incomplete() will be returned when manually reading the trace
+ // end, or Prepare() was not called.
+ ASSERT_TRUE(s.IsIncomplete());
+ ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 9);
+ ASSERT_EQ(res_handler.GetNumGets(), 3);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 8);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
+ }
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value);
ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
ASSERT_EQ("rocks", value);
+ // Test execution of artificially created TraceRecords.
+ uint64_t fake_ts = 1U;
+ // Write
+ batch.Clear();
+ ASSERT_OK(batch.Put("trace-record-write1", "write1"));
+ ASSERT_OK(batch.Put("trace-record-write2", "write2"));
+ record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Write x 1
+ ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
+ ASSERT_EQ("write1", value);
+ ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
+ ASSERT_EQ("write2", value);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 1);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
+
+ // Get related
+ // Get an existing key.
+ record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
+ "trace-record-write1", fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Get x 1
+  // Get a non-existing key, should still return Status::OK().
+ record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
+ fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Get x 2
+ // Get from an invalid (non-existing) cf_id.
+ uint32_t invalid_cf_id = handles[1]->GetID() + 1;
+ record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 2);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
+
+ // Iteration related
+ for (IteratorSeekQueryTraceRecord::SeekType seekType :
+ {IteratorSeekQueryTraceRecord::kSeek,
+ IteratorSeekQueryTraceRecord::kSeekForPrev}) {
+ // Seek to an existing key.
+ record.reset(new IteratorSeekQueryTraceRecord(
+ seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Seek x 1 in one iteration
+    // Seek to a non-existing key, should still return Status::OK().
+ record.reset(new IteratorSeekQueryTraceRecord(
+ seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Seek x 2 in one iteration
+ // Seek from an invalid cf_id.
+ record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
+ "whatever", fake_ts++));
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
+ }
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 4); // Seek x 2 in two iterations
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
+
+ // MultiGet related
+ // Get existing keys.
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a", "foo"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 1
+ // Get all non-existing keys, should still return Status::OK().
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"no1", "no2"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 2
+ // Get mixed of existing and non-existing keys, should still return
+ // Status::OK().
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a", "no2"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ MultiValuesTraceExecutionResult* mvr =
+ dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
+ ASSERT_TRUE(mvr != nullptr);
+ ASSERT_OK(mvr->GetMultiStatus()[0]);
+ ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
+ ASSERT_EQ(mvr->GetValues()[0], "1");
+ ASSERT_EQ(mvr->GetValues()[1], "");
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 3
+ // Get from an invalid (non-existing) cf_id.
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>(
+ {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
+ std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
+ // Empty MultiGet
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+ ASSERT_TRUE(result == nullptr);
+ // MultiGet size mismatch
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a"}), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+ ASSERT_TRUE(result == nullptr);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
+ res_handler.Reset();
+
+ replayer.reset();
+
for (auto handle : handles) {
delete handle;
}
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+ replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+ replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
- Put("hello", "world");
- Merge("foo", "bar");
+ ASSERT_OK(Put("hello", "world"));
+ ASSERT_OK(Merge("foo", "bar"));
// Open another db, replay, and verify the data
std::string value;
- std::string dbname2 = test::TmpDir(env_) + "/db_replay";
+ std::string dbname2 = test::PerThreadDBPath(env_, "db_replay");
ASSERT_OK(DestroyDB(dbname2, options));
// Using a different name than db2, to pacify infer's use-after-lifetime
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+ replayer.reset();
// All the key-values should not present since we filter out the WRITE ops.
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_OK(DestroyDB(dbname2, options));
// Set up a new db.
- std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
+ std::string dbname3 = test::PerThreadDBPath(env_, "db_not_trace_read");
ASSERT_OK(DestroyDB(dbname3, options));
DB* db3_init = nullptr;
ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
handles.clear();
- DB* db3 = nullptr;
+ DB* db3 = nullptr;
ASSERT_OK(DB::Open(db_opts, dbname3, column_families, &handles, &db3));
env_->SleepForMicroseconds(100);
ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());
- //The tracer will not record the READ ops.
+ // The tracer will not record the READ ops.
trace_opts.filter = TraceFilterType::kTraceFilterGet;
std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
std::unique_ptr<TraceWriter> trace_writer3;
ASSERT_OK(
- NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
+ NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
std::unique_ptr<TraceReader> trace_reader3;
ASSERT_OK(
- NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
+ NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
// Count the number of records in the trace file;
int count = 0;
ASSERT_FALSE(pinned_value.IsPinned());
ASSERT_EQ(pinned_value.ToString(), "bar");
- dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
- nullptr /* end */, nullptr /* column_family */,
- true /* disallow_trivial_move */);
+ ASSERT_OK(dbfull()->TEST_CompactRange(
+ 0 /* level */, nullptr /* begin */, nullptr /* end */,
+ nullptr /* column_family */, true /* disallow_trivial_move */));
// Ensure pinned_value doesn't rely on memory munmap'd by the above
// compaction. It crashes if it does.
// Since v is the size of a block, each key should take a block
// of 400+ bytes.
- Put("1", v);
- Put("3", v);
- Put("5", v);
- Put("7", v);
+ ASSERT_OK(Put("1", v));
+ ASSERT_OK(Put("3", v));
+ ASSERT_OK(Put("5", v));
+ ASSERT_OK(Put("7", v));
ASSERT_OK(Flush());
ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
iter->Seek("3");
ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+
ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
}
ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
// Test compaction case
- Put("2", v);
- Put("5", v);
- Put("6", v);
- Put("8", v);
+ ASSERT_OK(Put("2", v));
+ ASSERT_OK(Put("5", v));
+ ASSERT_OK(Put("6", v));
+ ASSERT_OK(Put("8", v));
ASSERT_OK(Flush());
// Clear existing data in block cache
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("1", "1");
- Put("9", "1");
- Flush();
+ ASSERT_OK(Put("1", "1"));
+ ASSERT_OK(Put("9", "1"));
+ ASSERT_OK(Flush());
expected_lower_bound = 0;
expected_higher_bound = 8 * 1024;
- Put("1", "1");
- Put("9", "1");
- Flush();
+ ASSERT_OK(Put("1", "1"));
+ ASSERT_OK(Put("9", "1"));
+ ASSERT_OK(Flush());
- Put("1", "1");
- Put("9", "1");
- Flush();
+ ASSERT_OK(Put("1", "1"));
+ ASSERT_OK(Put("9", "1"));
+ ASSERT_OK(Flush());
// Full compaction to make sure there is no L0 file after the open.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
options.max_open_files = -1;
Reopen(options);
- Put("1", "1");
- Put("9", "1");
- Flush();
+ ASSERT_OK(Put("1", "1"));
+ ASSERT_OK(Put("9", "1"));
+ ASSERT_OK(Flush());
- Put("1", "1");
- Put("9", "1");
- Flush();
+ ASSERT_OK(Put("1", "1"));
+ ASSERT_OK(Put("9", "1"));
+ ASSERT_OK(Flush());
ASSERT_TRUE(called.load());
called = false;
port::Thread user_thread1([&]() {
auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
- TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
- TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
+ TEST_SYNC_POINT(
+ "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
+ TEST_SYNC_POINT(
+ "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
});
port::Thread user_thread2([&]() {
- TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
+ TEST_SYNC_POINT(
+ "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
- TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
+ TEST_SYNC_POINT(
+ "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
});
GetSstFiles(env_, dbname_, &files);
ASSERT_EQ(files.size(), 2);
- port::Thread user_thread1(
- [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
+ Status user_thread1_status;
+ port::Thread user_thread1([&]() {
+ user_thread1_status =
+ db_->CompactFiles(CompactionOptions(), handle, files, 1);
+ });
+ Status user_thread2_status;
port::Thread user_thread2([&]() {
- ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
- IngestExternalFileOptions()));
+ user_thread2_status = db_->IngestExternalFile(handle, {external_file2},
+ IngestExternalFileOptions());
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
});
user_thread1.join();
user_thread2.join();
+ ASSERT_OK(user_thread1_status);
+ ASSERT_OK(user_thread2_status);
+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_LITE
-// TODO: figure out why this test fails in appveyor
-#ifndef OS_WIN
TEST_F(DBTest2, MultiDBParallelOpenTest) {
const int kNumDbs = 2;
Options options = CurrentOptions();
std::vector<std::string> dbnames;
for (int i = 0; i < kNumDbs; ++i) {
- dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
+ dbnames.emplace_back(test::PerThreadDBPath(env_, "db" + std::to_string(i)));
ASSERT_OK(DestroyDB(dbnames.back(), options));
}
}
// Verify non-empty DBs can be recovered in parallel
- dbs.clear();
open_threads.clear();
for (int i = 0; i < kNumDbs; ++i) {
open_threads.emplace_back(
ASSERT_OK(DestroyDB(dbnames[i], options));
}
}
-#endif // OS_WIN
namespace {
class DummyOldStats : public Statistics {
public:
+ const char* Name() const override { return "DummyOldStats"; }
uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
num_rt++;
std::atomic<int> num_rt{0};
std::atomic<int> num_mt{0};
};
-} // namespace
+} // anonymous namespace
TEST_F(DBTest2, OldStatsInterface) {
DummyOldStats* dos = new DummyOldStats();
options.statistics = stats;
Reopen(options);
- Put("foo", "bar");
+ ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo"));
ASSERT_OK(Flush());
ASSERT_EQ("bar", Get("foo"));
ASSERT_OK(Put("bbb1", ""));
Iterator* iter = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter->status());
// Seeking into f1, the iterator will check bloom filter which returns the
// file iterator to be invalidated, and the cursor will be put into f2, with
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
Iterator* iter = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter->status());
// Bloom filter is filtered out by f1.
// This is just one of several valid position following the contract.
// the behavior of the current implementation. If underlying implementation
// changes, the test might fail here.
iter->Seek("bbb1");
+ ASSERT_OK(iter->status());
ASSERT_FALSE(iter->Valid());
delete iter;
ReadOptions ro;
ro.total_order_seek = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
+ ASSERT_OK(iter->status());
iter->Seek("e");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("x", iter->key().ToString());
ASSERT_OK(Put("a", "a"));
Iterator* iter = db_->NewIterator(ReadOptions());
+ ASSERT_OK(iter->status());
ASSERT_OK(Flush());
size_t value = options.write_buffer_manager->memory_usage();
ASSERT_GT(value, base_value);
ASSERT_OK(Put("key", "2"));
ASSERT_OK(db_->Merge(WriteOptions(), "key", "3"));
ASSERT_OK(db_->Merge(WriteOptions(), "key", "4"));
- Flush();
+ ASSERT_OK(Flush());
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
nullptr));
ASSERT_OK(db_->Merge(WriteOptions(), "key", "5"));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(db_->Merge(WriteOptions(), "key", "6"));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(db_->Merge(WriteOptions(), "key", "7"));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(db_->Merge(WriteOptions(), "key", "8"));
- Flush();
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,4,1", FilesPerLevel());
#endif // ROCKSDB_LITE
}
TEST_F(DBTest2, FileConsistencyCheckInOpen) {
- Put("foo", "bar");
- Flush();
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_OK(Put("a1", large_value));
ASSERT_OK(Put("x1", large_value));
ASSERT_OK(Put("y1", large_value));
- Flush();
+ ASSERT_OK(Flush());
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
+ ASSERT_OK(iterator->status());
iterator->SeekForPrev("x3");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("x1", iterator->key().ToString());
ASSERT_OK(Put("xx1", ""));
ASSERT_OK(Put("xz1", ""));
ASSERT_OK(Put("zz", ""));
- Flush();
+ ASSERT_OK(Flush());
// After reopening DB with prefix size 2 => 1, prefix extractor
// won't take effective unless it won't change results based
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
+ ASSERT_OK(iterator->status());
iterator->Seek("xa");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("xb", iterator->key().ToString());
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ ASSERT_OK(iterator->status());
// SeekForPrev() never uses prefix bloom if it is changed.
iterator->SeekForPrev("xg0");
ub = Slice(ub_str);
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ ASSERT_OK(iterator->status());
iterator->Seek("x");
ASSERT_TRUE(iterator->Valid());
if (expect_filter_check) {
ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
}
+
+ ASSERT_OK(iterator->status());
}
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
if (expect_filter_check) {
ASSERT_EQ(6, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
}
+
+ ASSERT_OK(iterator->status());
}
ub_str = "xg9";
if (expect_filter_check) {
ASSERT_EQ(7, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
}
+ ASSERT_OK(iterator->status());
}
}
}
Reopen(options);
ASSERT_OK(Put("b1", "ok"));
- Flush();
+ ASSERT_OK(Flush());
// Flushing several files so that the chance that hash bucket
// is empty fo "b" in at least one of the files is high.
ASSERT_OK(Put("a1", ""));
ASSERT_OK(Put("c1", ""));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(Put("a2", ""));
ASSERT_OK(Put("c2", ""));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(Put("a3", ""));
ASSERT_OK(Put("c3", ""));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(Put("a4", ""));
ASSERT_OK(Put("c4", ""));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(Put("a5", ""));
ASSERT_OK(Put("c5", ""));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_EQ("ok", Get("b1"));
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest2, AutoPrefixMode1) {
- // create a DB with block prefix index
- BlockBasedTableOptions table_options;
- Options options = CurrentOptions();
- table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- options.prefix_extractor.reset(NewFixedPrefixTransform(1));
- options.statistics = CreateDBStatistics();
-
- Reopen(options);
-
- Random rnd(301);
- std::string large_value = rnd.RandomString(500);
-
- ASSERT_OK(Put("a1", large_value));
- ASSERT_OK(Put("x1", large_value));
- ASSERT_OK(Put("y1", large_value));
- Flush();
+ do {
+ // Whole scenario is repeated across several option configurations; see
+ // ChangeOptions(kSkipPlainTable) at the bottom of the loop.
+ // create a DB with block prefix index
+ Options options = CurrentOptions();
+ BlockBasedTableOptions table_options =
+ *options.table_factory->GetOptions<BlockBasedTableOptions>();
+ table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+ options.statistics = CreateDBStatistics();
- ReadOptions ro;
- ro.total_order_seek = false;
- ro.auto_prefix_mode = true;
- {
- std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- iterator->Seek("b1");
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("x1", iterator->key().ToString());
- ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
- }
+ Reopen(options);
- std::string ub_str = "b9";
- Slice ub(ub_str);
- ro.iterate_upper_bound = &ub;
+ Random rnd(301);
+ std::string large_value = rnd.RandomString(500);
- {
- std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- iterator->Seek("b1");
- ASSERT_FALSE(iterator->Valid());
- ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
- }
+ ASSERT_OK(Put("a1", large_value));
+ ASSERT_OK(Put("x1", large_value));
+ ASSERT_OK(Put("y1", large_value));
+ ASSERT_OK(Flush());
- ub_str = "z";
- ub = Slice(ub_str);
- {
- std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- iterator->Seek("b1");
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("x1", iterator->key().ToString());
- ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
- }
+ ReadOptions ro;
+ ro.total_order_seek = false;
+ ro.auto_prefix_mode = true;
- ub_str = "c";
- ub = Slice(ub_str);
- {
- std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- iterator->Seek("b1");
- ASSERT_FALSE(iterator->Valid());
- ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
- }
+ // Shorthand for the ticker inspected throughout this test.
+ const auto stat = BLOOM_FILTER_PREFIX_CHECKED;
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek("b1");
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("x1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
- // The same queries without recreating iterator
- {
- ub_str = "b9";
- ub = Slice(ub_str);
+ Slice ub;
ro.iterate_upper_bound = &ub;
- std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- iterator->Seek("b1");
- ASSERT_FALSE(iterator->Valid());
- ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+ ub = "b9";
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek("b1");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
- ub_str = "z";
- ub = Slice(ub_str);
+ ub = "z";
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek("b1");
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("x1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
- iterator->Seek("b1");
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("x1", iterator->key().ToString());
- ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+ ub = "c";
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek("b1");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
- ub_str = "c";
- ub = Slice(ub_str);
+ ub = "c1";
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek("b1");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
- iterator->Seek("b1");
- ASSERT_FALSE(iterator->Valid());
- ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+ // The same queries without recreating iterator
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
- ub_str = "b9";
- ub = Slice(ub_str);
- ro.iterate_upper_bound = &ub;
- iterator->SeekForPrev("b1");
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("a1", iterator->key().ToString());
- ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+ ub = "b9";
+ iterator->Seek("b1");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
- ub_str = "zz";
- ub = Slice(ub_str);
- ro.iterate_upper_bound = &ub;
- iterator->SeekToLast();
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("y1", iterator->key().ToString());
+ ub = "z";
+ iterator->Seek("b1");
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("x1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
- iterator->SeekToFirst();
- ASSERT_TRUE(iterator->Valid());
- ASSERT_EQ("a1", iterator->key().ToString());
+ ub = "c";
+ iterator->Seek("b1");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+
+ ub = "b9";
+ iterator->SeekForPrev("b1");
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("a1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "zz";
+ iterator->SeekToLast();
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("y1", iterator->key().ToString());
+
+ iterator->SeekToFirst();
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("a1", iterator->key().ToString());
+ }
+
+ // Similar, now with reverse comparator
+ // Technically, we are violating axiom 2 of prefix_extractors, but
+ // it should be revised because of major use-cases using
+ // ReverseBytewiseComparator with capped/fixed prefix Seek. (FIXME)
+ options.comparator = ReverseBytewiseComparator();
+ options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("a1", large_value));
+ ASSERT_OK(Put("x1", large_value));
+ ASSERT_OK(Put("y1", large_value));
+ ASSERT_OK(Flush());
+
+ {
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+
+ ub = "b1";
+ iterator->Seek("b9");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+
+ ub = "b1";
+ iterator->Seek("z");
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("y1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "b1";
+ iterator->Seek("c");
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "b";
+ iterator->Seek("c9");
+ ASSERT_FALSE(iterator->Valid());
+ // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+ // is "correctly" implemented.
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "a";
+ iterator->Seek("b9");
+ // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+ // is "correctly" implemented.
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("a1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "b";
+ iterator->Seek("a");
+ ASSERT_FALSE(iterator->Valid());
+ // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+ // matches BytewiseComparator::IsSameLengthImmediateSuccessor. Upper
+ // comparing before seek key prevents a real bug from surfacing.
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "b1";
+ iterator->SeekForPrev("b9");
+ ASSERT_TRUE(iterator->Valid());
+ // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+ // is "correctly" implemented.
+ ASSERT_EQ("x1", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+ ub = "a";
+ iterator->SeekToLast();
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("a1", iterator->key().ToString());
+
+ iterator->SeekToFirst();
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("y1", iterator->key().ToString());
+ }
+
+ // Now something a bit different, related to "short" keys that
+ // auto_prefix_mode can omit. See "BUG" section of auto_prefix_mode.
+ options.comparator = BytewiseComparator();
+ for (const auto config : {"fixed:2", "capped:2"}) {
+ ASSERT_OK(SliceTransform::CreateFromString(ConfigOptions(), config,
+ &options.prefix_extractor));
+
+ // FIXME: kHashSearch, etc. requires all keys be InDomain
+ if (StartsWith(config, "fixed") &&
+ (table_options.index_type == BlockBasedTableOptions::kHashSearch ||
+ StartsWith(options.memtable_factory->Name(), "Hash"))) {
+ continue;
+ }
+ DestroyAndReopen(options);
+
+ const char* a_end_stuff = "a\xffXYZ";
+ const char* b_begin_stuff = "b\x00XYZ";
+ ASSERT_OK(Put("a", large_value));
+ ASSERT_OK(Put("b", large_value));
+ ASSERT_OK(Put(Slice(b_begin_stuff, 3), large_value));
+ ASSERT_OK(Put("c", large_value));
+ ASSERT_OK(Flush());
+
+ // control showing valid optimization with auto_prefix mode
+ ub = Slice(a_end_stuff, 4);
+ ro.iterate_upper_bound = &ub;
+
+ std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+ iterator->Seek(Slice(a_end_stuff, 2));
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+
+ // test, cannot be validly optimized with auto_prefix_mode
+ ub = Slice(b_begin_stuff, 2);
+ ro.iterate_upper_bound = &ub;
+
+ iterator->Seek(Slice(a_end_stuff, 2));
+ // !!! BUG !!! See "BUG" section of auto_prefix_mode.
+ ASSERT_FALSE(iterator->Valid());
+ EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+
+ // To prove that is the wrong result, now use total order seek
+ ReadOptions tos_ro = ro;
+ tos_ro.total_order_seek = true;
+ tos_ro.auto_prefix_mode = false;
+ iterator.reset(db_->NewIterator(tos_ro));
+ iterator->Seek(Slice(a_end_stuff, 2));
+ ASSERT_TRUE(iterator->Valid());
+ ASSERT_EQ("b", iterator->key().ToString());
+ EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+ ASSERT_OK(iterator->status());
+ }
+ } while (ChangeOptions(kSkipPlainTable));
+}
+
+// Fixture that injects an IOError at a parameterized sync point around the
+// CURRENT-file rename, to exercise failure handling in SetCurrentFile().
+// File overwrites are disabled for the test's lifetime via env_.
+class RenameCurrentTest : public DBTestBase,
+ public testing::WithParamInterface<std::string> {
+ public:
+ RenameCurrentTest()
+ : DBTestBase("rename_current_test", /*env_do_fsync=*/true),
+ sync_point_(GetParam()) {}
+
+ ~RenameCurrentTest() override {}
+
+ void SetUp() override {
+ env_->no_file_overwrite_.store(true, std::memory_order_release);
+ }
+
+ void TearDown() override {
+ env_->no_file_overwrite_.store(false, std::memory_order_release);
+ }
+
+ // Register a callback that overwrites the Status at the parameterized
+ // sync point with an injected IOError. Caller must EnableProcessing().
+ void SetupSyncPoints() {
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
+ Status* s = reinterpret_cast<Status*>(arg);
+ assert(s);
+ *s = Status::IOError("Injected IO error.");
+ });
+ }
+
+ // Sync point at which the IOError is injected (the test parameter).
+ const std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
+ ::testing::Values("SetCurrentFile:BeforeRename",
+ "SetCurrentFile:AfterRename"));
+
+// DB open should fail cleanly when the CURRENT-file rename fails, and a
+// subsequent open without the injected error should succeed.
+TEST_P(RenameCurrentTest, Open) {
+ Destroy(last_options_);
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+ SetupSyncPoints();
+ SyncPoint::GetInstance()->EnableProcessing();
+ Status s = TryReopen(options);
+ ASSERT_NOK(s);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ Reopen(options);
+}
+
+// A failed CURRENT-file rename during the MANIFEST rollover triggered by
+// flush should fail the flush, put the DB in read-only mode (subsequent
+// writes fail), and leave previously committed data intact after reopen.
+TEST_P(RenameCurrentTest, Flush) {
+ Destroy(last_options_);
+ Options options = GetDefaultOptions();
+ // Tiny manifest size forces a new MANIFEST (and CURRENT rename) on flush.
+ options.max_manifest_file_size = 1;
+ options.create_if_missing = true;
+ Reopen(options);
+ ASSERT_OK(Put("key", "value"));
+ SetupSyncPoints();
+ SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_NOK(Flush());
+
+ ASSERT_NOK(Put("foo", "value"));
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ Reopen(options);
+ ASSERT_EQ("value", Get("key"));
+ ASSERT_EQ("NOT_FOUND", Get("foo"));
+}
+
+// Same as Flush above, but the MANIFEST rollover (and failing CURRENT
+// rename) is triggered by a manual compaction: the compaction fails, the
+// DB rejects further writes, and committed data survives reopen.
+TEST_P(RenameCurrentTest, Compaction) {
+ Destroy(last_options_);
+ Options options = GetDefaultOptions();
+ // Tiny manifest size forces a new MANIFEST (and CURRENT rename).
+ options.max_manifest_file_size = 1;
+ options.create_if_missing = true;
+ Reopen(options);
+ ASSERT_OK(Put("a", "a_value"));
+ ASSERT_OK(Put("c", "c_value"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("b", "b_value"));
+ ASSERT_OK(Put("d", "d_value"));
+ ASSERT_OK(Flush());
+
+ SetupSyncPoints();
+ SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+ /*end=*/nullptr));
+
+ ASSERT_NOK(Put("foo", "value"));
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ Reopen(options);
+ ASSERT_EQ("NOT_FOUND", Get("foo"));
+ ASSERT_EQ("d_value", Get("d"));
+}
+
+// Files compacted into the bottommost level should be tagged with the
+// configured bottommost temperature (kWarm), reported consistently via
+// file-operation listener events, column family metadata, size-by-
+// temperature properties, per-temperature IO stats, and tickers; the
+// tagging must survive reopen. No hot or cold files exist in this test,
+// so hot/cold read counters must stay zero throughout.
+TEST_F(DBTest2, LastLevelTemperature) {
+ class TestListener : public EventListener {
+ public:
+ void OnFileReadFinish(const FileOperationInfo& info) override {
+ UpdateFileTemperature(info);
+ }
+
+ void OnFileWriteFinish(const FileOperationInfo& info) override {
+ UpdateFileTemperature(info);
+ }
+
+ void OnFileFlushFinish(const FileOperationInfo& info) override {
+ UpdateFileTemperature(info);
+ }
+
+ void OnFileSyncFinish(const FileOperationInfo& info) override {
+ UpdateFileTemperature(info);
+ }
+
+ void OnFileCloseFinish(const FileOperationInfo& info) override {
+ UpdateFileTemperature(info);
+ }
+
+ bool ShouldBeNotifiedOnFileIO() override { return true; }
+
+ // file number -> temperature observed in file-operation events
+ std::unordered_map<uint64_t, Temperature> file_temperatures;
+
+ private:
+ void UpdateFileTemperature(const FileOperationInfo& info) {
+ auto filename = GetFileName(info.path);
+ uint64_t number;
+ FileType type;
+ ASSERT_TRUE(ParseFileName(filename, &number, &type));
+ if (type == kTableFile) {
+ MutexLock l(&mutex_);
+ auto ret = file_temperatures.insert({number, info.temperature});
+ if (!ret.second) {
+ // the same file temperature should always be the same for all events
+ ASSERT_TRUE(ret.first->second == info.temperature);
+ }
+ }
+ }
+
+ std::string GetFileName(const std::string& fname) {
+ auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
+ // workaround only for Windows that the file path could contain both
+ // Windows FilePathSeparator and '/'
+ filename = filename.substr(filename.find_last_of('/') + 1);
+ return filename;
+ }
+
+ port::Mutex mutex_;
+ };
+
+ const int kNumLevels = 7;
+ const int kLastLevel = kNumLevels - 1;
+
+ auto* listener = new TestListener();
+
+ Options options = CurrentOptions();
+ options.bottommost_temperature = Temperature::kWarm;
+ options.level0_file_num_compaction_trigger = 2;
+ options.level_compaction_dynamic_level_bytes = true;
+ options.num_levels = kNumLevels;
+ options.statistics = CreateDBStatistics();
+ options.listeners.emplace_back(listener);
+ Reopen(options);
+
+ auto size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kHot);
+ ASSERT_EQ(size, 0);
+
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ get_iostats_context()->Reset();
+ IOStatsContext* iostats = get_iostats_context();
+
+ ColumnFamilyMetaData metadata;
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(1, metadata.file_count);
+ SstFileMetaData meta = metadata.levels[kLastLevel].files[0];
+ ASSERT_EQ(Temperature::kWarm, meta.temperature);
+ uint64_t number;
+ FileType type;
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+ // no reads yet: all per-temperature read counts are zero
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+
+ ASSERT_EQ("bar", Get("foo"));
+
+ // the Get() read the single warm file; hot/cold stay untouched
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+ ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+ // non-bottommost file still has unknown temperature
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("bar", Get("bar"));
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+ ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(2, metadata.file_count);
+ meta = metadata.levels[0].files[0];
+ ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+ meta = metadata.levels[kLastLevel].files[0];
+ ASSERT_EQ(Temperature::kWarm, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+
+ // reopen and check the information is persisted
+ Reopen(options);
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(2, metadata.file_count);
+ meta = metadata.levels[0].files[0];
+ ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+ meta = metadata.levels[kLastLevel].files[0];
+ ASSERT_EQ(Temperature::kWarm, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+
+ // check other non-exist temperatures
+ size = GetSstSizeHelper(Temperature::kHot);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kCold);
+ ASSERT_EQ(size, 0);
+ std::string prop;
+ ASSERT_TRUE(dbfull()->GetProperty(
+ DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
+ &prop));
+ ASSERT_EQ(std::atoi(prop.c_str()), 0);
+
+ Reopen(options);
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(2, metadata.file_count);
+ meta = metadata.levels[0].files[0];
+ ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+ meta = metadata.levels[kLastLevel].files[0];
+ ASSERT_EQ(Temperature::kWarm, meta.temperature);
+ ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+ ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+}
+
+// Universal-compaction variant: bottommost files keep kUnknown temperature
+// until bottommost_temperature is set (via Reopen or dynamically with
+// SetOptions); existing files are unaffected by the option change while
+// newly compacted bottommost files pick up the new temperature. An invalid
+// temperature (kLastTemperature) must fail DB open.
+TEST_F(DBTest2, LastLevelTemperatureUniversal) {
+ const int kTriggerNum = 3;
+ const int kNumLevels = 5;
+ const int kBottommostLevel = kNumLevels - 1;
+ Options options = CurrentOptions();
+ options.compaction_style = kCompactionStyleUniversal;
+ options.level0_file_num_compaction_trigger = kTriggerNum;
+ options.num_levels = kNumLevels;
+ options.statistics = CreateDBStatistics();
+ DestroyAndReopen(options);
+
+ auto size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kHot);
+ ASSERT_EQ(size, 0);
+ get_iostats_context()->Reset();
+ IOStatsContext* iostats = get_iostats_context();
+
+ for (int i = 0; i < kTriggerNum; i++) {
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
}
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ColumnFamilyMetaData metadata;
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(1, metadata.file_count);
+ ASSERT_EQ(Temperature::kUnknown,
+ metadata.levels[kBottommostLevel].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+ ASSERT_EQ("bar", Get("foo"));
+
+ // all files are still kUnknown, so no per-temperature counter moves
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+ ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(2, metadata.file_count);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+
+ // Update bottommost temperature
+ options.bottommost_temperature = Temperature::kWarm;
+ Reopen(options);
+ db_->GetColumnFamilyMetaData(&metadata);
+ // Should not impact existing ones
+ ASSERT_EQ(Temperature::kUnknown,
+ metadata.levels[kBottommostLevel].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+
+ // new generated file should have the new settings
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(1, metadata.file_count);
+ ASSERT_EQ(Temperature::kWarm,
+ metadata.levels[kBottommostLevel].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+ ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+ // non-bottommost file still has unknown temperature
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(2, metadata.file_count);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+
+ // check other non-exist temperatures
+ size = GetSstSizeHelper(Temperature::kHot);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kCold);
+ ASSERT_EQ(size, 0);
+ std::string prop;
+ ASSERT_TRUE(dbfull()->GetProperty(
+ DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
+ &prop));
+ ASSERT_EQ(std::atoi(prop.c_str()), 0);
+
+ // Update bottommost temperature dynamically with SetOptions
+ auto s = db_->SetOptions({{"last_level_temperature", "kCold"}});
+ ASSERT_OK(s);
+ ASSERT_EQ(db_->GetOptions().bottommost_temperature, Temperature::kCold);
+ db_->GetColumnFamilyMetaData(&metadata);
+ // Should not impact the existing files
+ ASSERT_EQ(Temperature::kWarm,
+ metadata.levels[kBottommostLevel].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_GT(size, 0);
+ size = GetSstSizeHelper(Temperature::kCold);
+ ASSERT_EQ(size, 0);
+
+ // new generated files should have the new settings
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(1, metadata.file_count);
+ ASSERT_EQ(Temperature::kCold,
+ metadata.levels[kBottommostLevel].files[0].temperature);
+ size = GetSstSizeHelper(Temperature::kUnknown);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kWarm);
+ ASSERT_EQ(size, 0);
+ size = GetSstSizeHelper(Temperature::kCold);
+ ASSERT_GT(size, 0);
+
+ // kLastTemperature is an invalid temperature
+ options.bottommost_temperature = Temperature::kLastTemperature;
+ s = TryReopen(options);
+ ASSERT_TRUE(s.IsIOError());
+}
+
+// Reads served from the bottommost (warm) level must be counted by the
+// LAST_LEVEL_READ_* tickers and match the WARM_FILE_READ_* tickers, while
+// reads from non-bottommost levels go to NON_LAST_LEVEL_READ_*.
+TEST_F(DBTest2, LastLevelStatistics) {
+ Options options = CurrentOptions();
+ options.bottommost_temperature = Temperature::kWarm;
+ options.level0_file_num_compaction_trigger = 2;
+ options.level_compaction_dynamic_level_bytes = true;
+ options.statistics = CreateDBStatistics();
+ Reopen(options);
+
+ // generate 1 sst on level 0
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("bar", Get("bar"));
+
+ // level-0 read: only the non-last-level counters move
+ ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES), 0);
+ ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES), 0);
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT), 0);
+
+ // 2nd flush to trigger compaction
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ("bar", Get("bar"));
+
+ // the bottommost file is warm, so last-level == warm-file counters
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES),
+ options.statistics->getTickerCount(WARM_FILE_READ_BYTES));
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT),
+ options.statistics->getTickerCount(WARM_FILE_READ_COUNT));
+
+ auto pre_bytes =
+ options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES);
+ auto pre_count =
+ options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
+
+ // 3rd flush to generate 1 sst on level 0
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("bar", "bar"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("bar", Get("bar"));
+
+ // new level-0 read bumps non-last-level counters only
+ ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES),
+ pre_bytes);
+ ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT),
+ pre_count);
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES),
+ options.statistics->getTickerCount(WARM_FILE_READ_BYTES));
+ ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT),
+ options.statistics->getTickerCount(WARM_FILE_READ_COUNT));
+}
+
+// Verify that checkpoint creation passes each source SST's manifest-recorded
+// temperature as a read hint to the file system when copying files.
+TEST_F(DBTest2, CheckpointFileTemperature) {
+  // FS wrapper that rejects hard links, forcing the checkpoint to copy SST
+  // files; the copy path is what issues the temperature-hinted reads.
+  class NoLinkTestFS : public FileTemperatureTestFS {
+    using FileTemperatureTestFS::FileTemperatureTestFS;
+
+    IOStatus LinkFile(const std::string&, const std::string&, const IOOptions&,
+                      IODebugContext*) override {
+      // return not supported to force checkpoint copy the file instead of just
+      // link
+      return IOStatus::NotSupported();
+    }
+  };
+  auto test_fs = std::make_shared<NoLinkTestFS>(env_->GetFileSystem());
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  // set dynamic_level to true so the compaction would compact the data to the
+  // last level directly which will have the last_level_temperature
+  options.level_compaction_dynamic_level_bytes = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.env = env.get();
+  Reopen(options);
+
+  // generate a bottommost file and a non-bottommost file
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  auto size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+
+  // Record each live file's manifest temperature for comparison below.
+  std::map<uint64_t, Temperature> temperatures;
+  std::vector<LiveFileStorageInfo> infos;
+  ASSERT_OK(
+      dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
+  for (auto info : infos) {
+    temperatures.emplace(info.file_number, info.temperature);
+  }
+
+  // Discard temperature requests made so far so that only the checkpoint's
+  // reads are tracked.
+  test_fs->PopRequestedSstFileTemperatures();
+  Checkpoint* checkpoint;
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+  ASSERT_OK(
+      checkpoint->CreateCheckpoint(dbname_ + kFilePathSeparator + "tempcp"));
+
+  // checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
+  // another is kUnknown
+  std::vector<std::pair<uint64_t, Temperature>> requested_temps;
+  test_fs->PopRequestedSstFileTemperatures(&requested_temps);
+  // Two requests
+  ASSERT_EQ(requested_temps.size(), 2);
+  std::set<uint64_t> distinct_requests;
+  for (const auto& requested_temp : requested_temps) {
+    // Matching manifest temperatures
+    ASSERT_EQ(temperatures.at(requested_temp.first), requested_temp.second);
+    distinct_requests.insert(requested_temp.first);
+  }
+  // Each request to distinct file
+  ASSERT_EQ(distinct_requests.size(), requested_temps.size());
+
+  delete checkpoint;
+  Close();
+}
+
+// Verify that experimental::UpdateManifestForFilesState() rewrites the
+// MANIFEST-recorded temperature of existing SST files to match what the file
+// system currently reports, across multiple column families.
+//
+// Fix: the two CopyCurrentSstFileTemperatures() calls contained the mojibake
+// "¤t_temps" — an HTML-entity corruption of "&current_temps" ("&curren"
+// renders as "¤"). Restored the intended address-of argument.
+TEST_F(DBTest2, FileTemperatureManifestFixup) {
+  auto test_fs = std::make_shared<FileTemperatureTestFS>(env_->GetFileSystem());
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  // set dynamic_level to true so the compaction would compact the data to the
+  // last level directly which will have the last_level_temperature
+  options.level_compaction_dynamic_level_bytes = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.env = env.get();
+  std::vector<std::string> cfs = {/*"default",*/ "test1", "test2"};
+  CreateAndReopenWithCF(cfs, options);
+  // Needed for later re-opens (weird)
+  cfs.insert(cfs.begin(), kDefaultColumnFamilyName);
+
+  // Generate a bottommost file in all CFs
+  for (int cf = 0; cf < 3; ++cf) {
+    ASSERT_OK(Put(cf, "a", "val"));
+    ASSERT_OK(Put(cf, "c", "val"));
+    ASSERT_OK(Flush(cf));
+    ASSERT_OK(Put(cf, "b", "val"));
+    ASSERT_OK(Put(cf, "d", "val"));
+    ASSERT_OK(Flush(cf));
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify: all data is on the (kWarm) bottommost level
+  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Generate a non-bottommost file in all CFs
+  for (int cf = 0; cf < 3; ++cf) {
+    ASSERT_OK(Put(cf, "e", "val"));
+    ASSERT_OK(Flush(cf));
+  }
+
+  // re-verify
+  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
+  // Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Now change FS temperature on bottommost file(s) to kCold
+  std::map<uint64_t, Temperature> current_temps;
+  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
+  for (auto e : current_temps) {
+    if (e.second == Temperature::kWarm) {
+      test_fs->OverrideSstFileTemperature(e.first, Temperature::kCold);
+    }
+  }
+  // Metadata not yet updated
+  ASSERT_EQ(Get("a"), "val");
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+
+  // Update with Close and UpdateManifestForFilesState, but first save cf
+  // descriptors
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (size_t i = 0; i < handles_.size(); ++i) {
+    ColumnFamilyDescriptor cfdescriptor;
+    // GetDescriptor is not implemented for ROCKSDB_LITE
+    handles_[i]->GetDescriptor(&cfdescriptor).PermitUncheckedError();
+    column_families.push_back(cfdescriptor);
+  }
+  Close();
+  experimental::UpdateManifestForFilesStateOptions update_opts;
+  update_opts.update_temperatures = true;
+
+  ASSERT_OK(experimental::UpdateManifestForFilesState(
+      options, dbname_, column_families, update_opts));
+
+  // Re-open and re-verify after update
+  ReopenWithColumnFamilies(cfs, options);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+  // Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Change kUnknown to kHot
+  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
+  for (auto e : current_temps) {
+    if (e.second == Temperature::kUnknown) {
+      test_fs->OverrideSstFileTemperature(e.first, Temperature::kHot);
+    }
+  }
+
+  // Update with Close and UpdateManifestForFilesState
+  Close();
+  ASSERT_OK(experimental::UpdateManifestForFilesState(
+      options, dbname_, column_families, update_opts));
+
+  // Re-open and re-verify after update
+  ReopenWithColumnFamilies(cfs, options);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
+
+  Close();
}
#endif // ROCKSDB_LITE
Status s = TryReopen(options);
ASSERT_TRUE(s.IsIOError());
}
-} // namespace ROCKSDB_NAMESPACE
-#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
-extern "C" {
-void RegisterCustomObjects(int argc, char** argv);
+// Simulate a WAL sync failure while a column family is created concurrently
+// with a background flush, then verify that kPointInTimeRecovery can still
+// reopen the DB with all three column families intact.
+TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
+  // Park the background flush between sync points :1 and :2 so the CF
+  // creation and the corruption flag flip happen mid-flush.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BackgroundCallFlush:Start:1",
+        "PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
+       {"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
+        "DBImpl::BackgroundCallFlush:Start:2"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateColumnFamilies({"test1"}, Options());
+  ASSERT_OK(Put("foo", "bar"));
+
+  // Creating a CF when a flush is going on, log is synced but the
+  // closed log file is not synced and corrupted.
+  port::Thread flush_thread([&]() { ASSERT_NOK(Flush()); });
+  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
+  CreateColumnFamilies({"test2"}, Options());
+  // corrupt_in_sync_ makes the fault-injection env corrupt data on sync,
+  // which is why the Flush() above is expected to fail.
+  env_->corrupt_in_sync_ = true;
+  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
+  flush_thread.join();
+  env_->corrupt_in_sync_ = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  // Reopening the DB should not corrupt anything
+  Options options = CurrentOptions();
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
}
-#else
-void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
-#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+
+// A DB directory can be renamed on disk and the DB reopened at the new path
+// with its data intact.
+TEST_F(DBTest2, RenameDirectory) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "value0"));
+  Close();
+  auto old_dbname = dbname_;
+  auto new_dbname = dbname_ + "_2";
+  EXPECT_OK(env_->RenameFile(dbname_, new_dbname));
+  // create_if_missing=false: the reopen must find the renamed DB, not
+  // silently create a fresh one.
+  options.create_if_missing = false;
+  dbname_ = new_dbname;
+  ASSERT_OK(TryReopen(options));
+  ASSERT_EQ("value0", Get("foo"));
+  Destroy(options);
+  // Restore dbname_ so subsequent tests in this fixture are unaffected.
+  dbname_ = old_dbname;
+}
+
+// Unique-ID verification must tolerate SST files whose manifest entry has no
+// unique ID (as written by older RocksDB versions): such files should be
+// skipped rather than failing verification.
+TEST_F(DBTest2, SstUniqueIdVerifyBackwardCompatible) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  options.statistics = CreateDBStatistics();
+  // Skip for now
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  // Count how many table opens skip vs. pass unique-ID verification.
+  std::atomic_int skipped = 0;
+  std::atomic_int passed = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTable::Open::SkippedVerifyUniqueId",
+      [&](void* /*arg*/) { skipped++; });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTable::Open::PassedVerifyUniqueId",
+      [&](void* /*arg*/) { passed++; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // generate a few SSTs
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Verification has been skipped on files so far
+  EXPECT_EQ(skipped, kNumSst);
+  EXPECT_EQ(passed, 0);
+
+  // Reopen with verification
+  options.verify_sst_unique_id_in_manifest = true;
+  skipped = 0;
+  passed = 0;
+  Reopen(options);
+  EXPECT_EQ(skipped, 0);
+  EXPECT_EQ(passed, kNumSst);
+
+  // Now simulate no unique id in manifest for next file
+  // NOTE: this only works for loading manifest from disk,
+  // not in-memory manifest, so we need to re-open below.
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionEdit::EncodeTo:UniqueId", [&](void* arg) {
+        auto unique_id = static_cast<UniqueId64x2*>(arg);
+        // remove id before writing it to manifest
+        (*unique_id)[0] = 0;
+        (*unique_id)[1] = 0;
+      });
+
+  // test compaction generated Sst
+  for (int i = kNumSst; i < kLevel0Trigger; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+#endif  // ROCKSDB_LITE
+
+  // Reopen (with verification)
+  ASSERT_TRUE(options.verify_sst_unique_id_in_manifest);
+  skipped = 0;
+  passed = 0;
+  Reopen(options);
+  // The single compacted file has its unique ID zeroed out in the manifest,
+  // so verification is skipped for it (not failed).
+  EXPECT_EQ(skipped, 1);
+  EXPECT_EQ(passed, 0);
+}
+
+// Tampering with db_session_id (an input to the SST unique ID) must make
+// reopen with verify_sst_unique_id_in_manifest report corruption, for both
+// flush-generated and compaction-generated files.
+TEST_F(DBTest2, SstUniqueIdVerify) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  // Allow mismatch for now
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
+        auto props = static_cast<TableProperties*>(props_vs);
+        // update table property session_id to a different one, which
+        // changes unique ID
+        props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // generate a few SSTs
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Reopen with verification should report corruption
+  options.verify_sst_unique_id_in_manifest = true;
+  auto s = TryReopen(options);
+  ASSERT_TRUE(s.IsCorruption());
+
+  // Reopen without verification should be fine
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  // test compaction generated Sst
+  for (int i = kNumSst; i < kLevel0Trigger; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+#endif  // ROCKSDB_LITE
+
+  // Reopen with verification should fail
+  options.verify_sst_unique_id_in_manifest = true;
+  s = TryReopen(options);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+// A unique-ID mismatch in just one column family ("one") is enough to fail
+// opening the whole DB when verification is enabled; the other CFs hold
+// untampered files.
+TEST_F(DBTest2, SstUniqueIdVerifyMultiCFs) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  // Allow mismatch for now
+  options.verify_sst_unique_id_in_manifest = false;
+
+  CreateAndReopenWithCF({"one", "two"}, options);
+
+  // generate good SSTs in CFs "default" (0) and "two" (2)
+  for (int cf_num : {0, 2}) {
+    for (int i = 0; i < kNumSst; i++) {
+      for (int j = 0; j < 100; j++) {
+        ASSERT_OK(Put(cf_num, Key(i * 10 + j), "value"));
+      }
+      ASSERT_OK(Flush(cf_num));
+    }
+  }
+
+  // generate SSTs with bad unique id in CF "one" (1)
+  SyncPoint::GetInstance()->SetCallBack(
+      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
+        auto props = static_cast<TableProperties*>(props_vs);
+        // update table property session_id to a different one
+        props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(1, Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush(1));
+  }
+
+  // Reopen with verification should report corruption
+  options.verify_sst_unique_id_in_manifest = true;
+  auto s = TryReopenWithColumnFamilies({"default", "one", "two"}, options);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+// Plant a unique-ID mismatch starting at the k-th of several L0 files (for
+// each k): reopen with verification reports corruption, while best-efforts
+// recovery opens the DB exposing only the files written before the tampered
+// one.
+TEST_F(DBTest2, BestEffortsRecoveryWithSstUniqueIdVerification) {
+  // Overwrite the table-property session id, which changes the derived SST
+  // unique ID so it no longer matches the manifest.
+  const auto tamper_with_uniq_id = [&](void* arg) {
+    auto props = static_cast<TableProperties*>(arg);
+    assert(props);
+    // update table property session_id to a different one
+    props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+  };
+
+  // Assert the DB contains exactly `expected_count` keys "0".."count-1",
+  // each mapped to `expected_v`.
+  const auto assert_db = [&](size_t expected_count,
+                             const std::string& expected_v) {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    size_t cnt = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next(), ++cnt) {
+      ASSERT_EQ(std::to_string(cnt), it->key());
+      ASSERT_EQ(expected_v, it->value());
+    }
+    ASSERT_EQ(expected_count, cnt);
+  };
+
+  const int num_l0_compaction_trigger = 8;
+  // Stay one file under the trigger so no compaction merges the L0 files.
+  const int num_l0 = num_l0_compaction_trigger - 1;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = num_l0_compaction_trigger;
+
+  for (int k = 0; k < num_l0; ++k) {
+    // Allow mismatch for now
+    options.verify_sst_unique_id_in_manifest = false;
+
+    DestroyAndReopen(options);
+
+    constexpr size_t num_keys_per_file = 10;
+    for (int i = 0; i < num_l0; ++i) {
+      for (size_t j = 0; j < num_keys_per_file; ++j) {
+        ASSERT_OK(Put(std::to_string(j), "v" + std::to_string(i)));
+      }
+      if (i == k) {
+        // Begin tampering from the k-th flush onward.
+        SyncPoint::GetInstance()->DisableProcessing();
+        SyncPoint::GetInstance()->SetCallBack(
+            "PropertyBlockBuilder::AddTableProperty:Start",
+            tamper_with_uniq_id);
+        SyncPoint::GetInstance()->EnableProcessing();
+      }
+      ASSERT_OK(Flush());
+    }
+
+    options.verify_sst_unique_id_in_manifest = true;
+    Status s = TryReopen(options);
+    ASSERT_TRUE(s.IsCorruption());
+
+    // Best-efforts recovery succeeds, exposing values from file k-1 only
+    // (an empty DB when k == 0, i.e. the very first file was tampered).
+    options.best_efforts_recovery = true;
+    Reopen(options);
+    assert_db(k == 0 ? 0 : num_keys_per_file, "v" + std::to_string(k - 1));
+
+    // Reopen with regular recovery
+    options.best_efforts_recovery = false;
+    Reopen(options);
+    assert_db(k == 0 ? 0 : num_keys_per_file, "v" + std::to_string(k - 1));
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    // With tampering disabled, fresh writes survive a verified reopen.
+    for (size_t i = 0; i < num_keys_per_file; ++i) {
+      ASSERT_OK(Put(std::to_string(i), "v"));
+    }
+    ASSERT_OK(Flush());
+    Reopen(options);
+    {
+      for (size_t i = 0; i < num_keys_per_file; ++i) {
+        ASSERT_EQ("v", Get(std::to_string(i)));
+      }
+    }
+  }
+}
+
+#ifndef ROCKSDB_LITE
+// GetLatestSequenceForKey() on a DB with a u64 user-defined-timestamp
+// comparator should return each key's timestamp, and (per the final ticker
+// check) do so without hitting SST files.
+TEST_F(DBTest2, GetLatestSeqAndTsForKey) {
+  Destroy(last_options_);
+
+  Options options = CurrentOptions();
+  // NOTE(review): presumably retains flushed memtables in history so the
+  // cache_only lookups below can still find the entries — confirm.
+  options.max_write_buffer_size_to_maintain = 64 << 10;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options.statistics = CreateDBStatistics();
+
+  Reopen(options);
+
+  constexpr uint64_t kTsU64Value = 12;
+
+  // Write 100 keys, all at the same u64 timestamp.
+  for (uint64_t key = 0; key < 100; ++key) {
+    std::string ts;
+    PutFixed64(&ts, kTsU64Value);
+
+    // PutFixed64 is little-endian; reversing the bytes makes the encoded
+    // keys sort in numeric order under the bytewise comparator.
+    std::string key_str;
+    PutFixed64(&key_str, key);
+    std::reverse(key_str.begin(), key_str.end());
+    ASSERT_OK(db_->Put(WriteOptions(), key_str, ts, "value"));
+  }
+
+  ASSERT_OK(Flush());
+
+  // cache_only=true: look up only in-memory state, not SST files.
+  constexpr bool cache_only = true;
+  constexpr SequenceNumber lower_bound_seq = 0;
+  auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(
+      dbfull()->DefaultColumnFamily());
+  assert(cfhi);
+  assert(cfhi->cfd());
+  SuperVersion* sv = cfhi->cfd()->GetSuperVersion();
+  for (uint64_t key = 0; key < 100; ++key) {
+    std::string key_str;
+    PutFixed64(&key_str, key);
+    std::reverse(key_str.begin(), key_str.end());
+    std::string ts;
+    SequenceNumber seq = kMaxSequenceNumber;
+    bool found_record_for_key = false;
+    bool is_blob_index = false;
+
+    const Status s = dbfull()->GetLatestSequenceForKey(
+        sv, key_str, cache_only, lower_bound_seq, &seq, &ts,
+        &found_record_for_key, &is_blob_index);
+    ASSERT_OK(s);
+    // Each key must be found with the timestamp it was written at.
+    std::string expected_ts;
+    PutFixed64(&expected_ts, kTsU64Value);
+    ASSERT_EQ(expected_ts, ts);
+    ASSERT_TRUE(found_record_for_key);
+    ASSERT_FALSE(is_blob_index);
+  }
+
+  // Verify that no read to SST files.
+  ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0));
+}
+#endif // ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();