// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include <alloca.h>
#endif
+#include "cache/lru_cache.h"
#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "util/compression.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
-#include "util/hash.h"
-#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
for (bool sync : {true, false}) {
for (bool disableWAL : {true, false}) {
+ if (sync && disableWAL) {
+ // sync and disableWAL is incompatible.
+ continue;
+ }
// Use a small number to ensure a large delay that is still effective
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
- [&](void* arg) { sleep_count.fetch_add(1); });
+ [&](void* /*arg*/) { sleep_count.fetch_add(1); });
std::atomic<int> wait_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
- [&](void* arg) { wait_count.fetch_add(1); });
+ [&](void* /*arg*/) { wait_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
}
}
+TEST_F(DBTest, MixedSlowdownOptions) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+ std::atomic<int> sleep_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:BeginWriteStallDone",
+ [&](void* /*arg*/) {
+ sleep_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_slowdown_func);
+ }
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ }
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_GE(sleep_count.load(), 1);
+
+ wo.no_slowdown = true;
+ ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+ std::atomic<int> sleep_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Sleep",
+ [&](void* /*arg*/) {
+ sleep_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ // Sleep for 2s to allow the threads to insert themselves into the
+ // write queue
+ env_->SleepForMicroseconds(3000000ULL);
+ }
+ });
+ std::atomic<int> wait_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Wait",
+ [&](void* /*arg*/) { wait_count.fetch_add(1); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_EQ(sleep_count.load(), 1);
+ ASSERT_GE(wait_count.load(), 0);
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsStop) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.write_buffer_size = 100000;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+ };
+ std::function<void()> wakeup_writer = [&]() {
+ dbfull()->mutex_.Lock();
+ dbfull()->bg_cv_.SignalAll();
+ dbfull()->mutex_.Unlock();
+ };
+ // Use a small number to ensure a large delay that is still effective
+ // when we do Put
+ // TODO(myabandeh): this is time dependent and could potentially make
+ // the test flaky
+ auto token = dbfull()->TEST_write_controler().GetStopToken();
+ std::atomic<int> wait_count(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DelayWrite:Wait",
+ [&](void* /*arg*/) {
+ wait_count.fetch_add(1);
+ if (threads.empty()) {
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_slowdown_func);
+ }
+ for (int i = 0; i < 2; ++i) {
+ threads.emplace_back(write_no_slowdown_func);
+ }
+ // Sleep for 2s to allow the threads to insert themselves into the
+ // write queue
+ env_->SleepForMicroseconds(3000000ULL);
+ }
+ token.reset();
+ threads.emplace_back(wakeup_writer);
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.sync = false;
+ wo.disableWAL = false;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, "foo", "bar");
+ // We need the 2nd write to trigger delay. This is because delay is
+ // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+ token.reset();
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_GE(wait_count.load(), 1);
+
+ wo.no_slowdown = true;
+ ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, LevelLimitReopen) {
kSkipUniversalCompaction | kSkipMergePut));
}
+// Disable because not all platform can run it.
+// It requires more than 9GB memory to run it, With single allocation
+// of more than 3GB.
+TEST_F(DBTest, DISABLED_SanitizeVeryVeryLargeValue) {
+ const size_t kValueSize = 4 * size_t{1024 * 1024 * 1024}; // 4GB value
+ std::string raw(kValueSize, 'v');
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ options.write_buffer_size = 100000; // Small write buffer
+ options.paranoid_checks = true;
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("boo", "v1"));
+ ASSERT_TRUE(Put("foo", raw).IsInvalidArgument());
+ ASSERT_TRUE(Merge("foo", raw).IsInvalidArgument());
+
+ WriteBatch wb;
+ ASSERT_TRUE(wb.Put("foo", raw).IsInvalidArgument());
+ ASSERT_TRUE(wb.Merge("foo", raw).IsInvalidArgument());
+
+ Slice value_slice = raw;
+ Slice key_slice = "foo";
+ SliceParts sp_key(&key_slice, 1);
+ SliceParts sp_value(&value_slice, 1);
+
+ ASSERT_TRUE(wb.Put(sp_key, sp_value).IsInvalidArgument());
+ ASSERT_TRUE(wb.Merge(sp_key, sp_value).IsInvalidArgument());
+}
+
// Disable because not all platform can run it.
// It requires more than 9GB memory to run it, With single allocation
// of more than 3GB.
ASSERT_OK(Put(key2, raw));
dbfull()->TEST_WaitForFlushMemTable();
+#ifndef ROCKSDB_LITE
ASSERT_EQ(1, NumTableFilesAtLevel(0));
+#endif // !ROCKSDB_LITE
std::string value;
Status s = db_->Get(ReadOptions(), key1, &value);
namespace {
class KeepFilter : public CompactionFilter {
public:
- virtual bool Filter(int level, const Slice& key, const Slice& value,
- std::string* new_value,
- bool* value_changed) const override {
+ virtual bool Filter(int /*level*/, const Slice& /*key*/,
+ const Slice& /*value*/, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
return false;
}
class DelayFilter : public CompactionFilter {
public:
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
- virtual bool Filter(int level, const Slice& key, const Slice& value,
- std::string* new_value,
- bool* value_changed) const override {
+ virtual bool Filter(int /*level*/, const Slice& /*key*/,
+ const Slice& /*value*/, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
db_test->env_->addon_time_.fetch_add(1000);
return true;
}
public:
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& context) override {
+ const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, DeletionMarkers1) {
Options options = CurrentOptions();
- options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "foo", "v1");
ASSERT_OK(Flush(1));
TEST_F(DBTest, DBOpen_Options) {
Options options = CurrentOptions();
- std::string dbname = test::TmpDir(env_) + "/db_options_test";
+ std::string dbname = test::PerThreadDBPath("db_options_test");
ASSERT_OK(DestroyDB(dbname, options));
// Does not exist, and create_if_missing == false: error
}
TEST_F(DBTest, DestroyDBMetaDatabase) {
- std::string dbname = test::TmpDir(env_) + "/db_meta";
+ std::string dbname = test::PerThreadDBPath("db_meta");
ASSERT_OK(env_->CreateDirIfMissing(dbname));
std::string metadbname = MetaDatabaseName(dbname, 0);
ASSERT_OK(env_->CreateDirIfMissing(metadbname));
} // namespace
+#ifndef TRAVIS
+// Disable this test temporarily on Travis as it fails intermittently.
+// Github issue: #4151
TEST_F(DBTest, GroupCommitTest) {
do {
Options options = CurrentOptions();
options.env = env_;
- env_->log_write_slowdown_.store(100);
options.statistics = rocksdb::CreateDBStatistics();
Reopen(options);
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WriteThread::JoinBatchGroup:BeganWaiting",
+ "DBImpl::WriteImpl:BeforeLeaderEnters"}});
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
// Start threads
GCThread thread[kGCNumThreads];
for (int id = 0; id < kGCNumThreads; id++) {
thread[id].done = false;
env_->StartThread(GCThreadBody, &thread[id]);
}
-
- for (int id = 0; id < kGCNumThreads; id++) {
- while (thread[id].done == false) {
- env_->SleepForMicroseconds(100000);
- }
- }
- env_->log_write_slowdown_.store(0);
+ env_->WaitForJoin();
ASSERT_GT(TestGetTickerCount(options, WRITE_DONE_BY_OTHER), 0);
ASSERT_GT(hist_data.average, 0.0);
} while (ChangeOptions(kSkipNoSeekToLast));
}
+#endif // TRAVIS
namespace {
typedef std::map<std::string, std::string> KVMap;
batch.Put(cf, k, v);
return Write(o, &batch);
}
+ using DB::Close;
+ virtual Status Close() override { return Status::OK(); }
using DB::Delete;
virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& key) override {
return Write(o, &batch);
}
using DB::Get;
- virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
- const Slice& key, PinnableSlice* value) override {
+ virtual Status Get(const ReadOptions& /*options*/, ColumnFamilyHandle* /*cf*/,
+ const Slice& key, PinnableSlice* /*value*/) override {
return Status::NotSupported(key);
}
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
- const ReadOptions& options,
- const std::vector<ColumnFamilyHandle*>& column_family,
+ const ReadOptions& /*options*/,
+ const std::vector<ColumnFamilyHandle*>& /*column_family*/,
const std::vector<Slice>& keys,
- std::vector<std::string>* values) override {
+ std::vector<std::string>* /*values*/) override {
std::vector<Status> s(keys.size(),
Status::NotSupported("Not implemented."));
return s;
#ifndef ROCKSDB_LITE
using DB::IngestExternalFile;
virtual Status IngestExternalFile(
- ColumnFamilyHandle* column_family,
- const std::vector<std::string>& external_files,
- const IngestExternalFileOptions& options) override {
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*external_files*/,
+ const IngestExternalFileOptions& /*options*/) override {
+ return Status::NotSupported("Not implemented.");
+ }
+
+ virtual Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(
- ColumnFamilyHandle* column_family,
- TablePropertiesCollection* props) override {
+ ColumnFamilyHandle* /*column_family*/,
+ TablePropertiesCollection* /*props*/) override {
return Status();
}
virtual Status GetPropertiesOfTablesInRange(
- ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
- TablePropertiesCollection* props) override {
+ ColumnFamilyHandle* /*column_family*/, const Range* /*range*/,
+ std::size_t /*n*/, TablePropertiesCollection* /*props*/) override {
return Status();
}
#endif // ROCKSDB_LITE
using DB::KeyMayExist;
- virtual bool KeyMayExist(const ReadOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- std::string* value,
+ virtual bool KeyMayExist(const ReadOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/, std::string* /*value*/,
bool* value_found = nullptr) override {
if (value_found != nullptr) {
*value_found = false;
return true; // Not Supported directly
}
using DB::NewIterator;
- virtual Iterator* NewIterator(const ReadOptions& options,
- ColumnFamilyHandle* column_family) override {
+ virtual Iterator* NewIterator(
+ const ReadOptions& options,
+ ColumnFamilyHandle* /*column_family*/) override {
if (options.snapshot == nullptr) {
KVMap* saved = new KVMap;
*saved = map_;
}
}
virtual Status NewIterators(
- const ReadOptions& options,
- const std::vector<ColumnFamilyHandle*>& column_family,
- std::vector<Iterator*>* iterators) override {
+ const ReadOptions& /*options*/,
+ const std::vector<ColumnFamilyHandle*>& /*column_family*/,
+ std::vector<Iterator*>* /*iterators*/) override {
return Status::NotSupported("Not supported yet");
}
virtual const Snapshot* GetSnapshot() override {
delete reinterpret_cast<const ModelSnapshot*>(snapshot);
}
- virtual Status Write(const WriteOptions& options,
+ virtual Status Write(const WriteOptions& /*options*/,
WriteBatch* batch) override {
class Handler : public WriteBatch::Handler {
public:
virtual void Put(const Slice& key, const Slice& value) override {
(*map_)[key.ToString()] = value.ToString();
}
- virtual void Merge(const Slice& key, const Slice& value) override {
+ virtual void Merge(const Slice& /*key*/,
+ const Slice& /*value*/) override {
// ignore merge for now
// (*map_)[key.ToString()] = value.ToString();
}
}
using DB::GetProperty;
- virtual bool GetProperty(ColumnFamilyHandle* column_family,
- const Slice& property, std::string* value) override {
+ virtual bool GetProperty(ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*property*/,
+ std::string* /*value*/) override {
return false;
}
using DB::GetIntProperty;
- virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
- const Slice& property, uint64_t* value) override {
+ virtual bool GetIntProperty(ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*property*/,
+ uint64_t* /*value*/) override {
return false;
}
using DB::GetMapProperty;
- virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
- const Slice& property,
- std::map<std::string, double>* value) override {
+ virtual bool GetMapProperty(
+ ColumnFamilyHandle* /*column_family*/, const Slice& /*property*/,
+ std::map<std::string, std::string>* /*value*/) override {
return false;
}
using DB::GetAggregatedIntProperty;
- virtual bool GetAggregatedIntProperty(const Slice& property,
- uint64_t* value) override {
+ virtual bool GetAggregatedIntProperty(const Slice& /*property*/,
+ uint64_t* /*value*/) override {
return false;
}
using DB::GetApproximateSizes;
- virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
- const Range* range, int n, uint64_t* sizes,
- uint8_t include_flags
+ virtual void GetApproximateSizes(ColumnFamilyHandle* /*column_family*/,
+ const Range* /*range*/, int n,
+ uint64_t* sizes,
+ uint8_t /*include_flags*/
= INCLUDE_FILES) override {
for (int i = 0; i < n; i++) {
sizes[i] = 0;
}
}
using DB::GetApproximateMemTableStats;
- virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
- const Range& range,
- uint64_t* const count,
- uint64_t* const size) override {
+ virtual void GetApproximateMemTableStats(
+ ColumnFamilyHandle* /*column_family*/, const Range& /*range*/,
+ uint64_t* const count, uint64_t* const size) override {
*count = 0;
*size = 0;
}
using DB::CompactRange;
- virtual Status CompactRange(const CompactRangeOptions& options,
- ColumnFamilyHandle* column_family,
- const Slice* start, const Slice* end) override {
+ virtual Status CompactRange(const CompactRangeOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice* /*start*/,
+ const Slice* /*end*/) override {
return Status::NotSupported("Not supported operation.");
}
virtual Status SetDBOptions(
- const std::unordered_map<std::string, std::string>& new_options)
+ const std::unordered_map<std::string, std::string>& /*new_options*/)
override {
return Status::NotSupported("Not supported operation.");
}
using DB::CompactFiles;
- virtual Status CompactFiles(const CompactionOptions& compact_options,
- ColumnFamilyHandle* column_family,
- const std::vector<std::string>& input_file_names,
- const int output_level,
- const int output_path_id = -1) override {
+ virtual Status CompactFiles(
+ const CompactionOptions& /*compact_options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*input_file_names*/,
+ const int /*output_level*/, const int /*output_path_id*/ = -1,
+ std::vector<std::string>* const /*output_file_names*/ = nullptr
+ ) override {
return Status::NotSupported("Not supported operation.");
}
}
Status EnableAutoCompaction(
- const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
+ const std::vector<ColumnFamilyHandle*>& /*column_family_handles*/)
+ override {
return Status::NotSupported("Not supported operation.");
}
using DB::NumberLevels;
- virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
+ virtual int NumberLevels(ColumnFamilyHandle* /*column_family*/) override {
return 1;
}
using DB::MaxMemCompactionLevel;
virtual int MaxMemCompactionLevel(
- ColumnFamilyHandle* column_family) override {
+ ColumnFamilyHandle* /*column_family*/) override {
return 1;
}
using DB::Level0StopWriteTrigger;
virtual int Level0StopWriteTrigger(
- ColumnFamilyHandle* column_family) override {
+ ColumnFamilyHandle* /*column_family*/) override {
return -1;
}
virtual Env* GetEnv() const override { return nullptr; }
using DB::GetOptions;
- virtual Options GetOptions(ColumnFamilyHandle* column_family) const override {
+ virtual Options GetOptions(
+ ColumnFamilyHandle* /*column_family*/) const override {
return options_;
}
virtual DBOptions GetDBOptions() const override { return options_; }
using DB::Flush;
- virtual Status Flush(const rocksdb::FlushOptions& options,
- ColumnFamilyHandle* column_family) override {
+ virtual Status Flush(const rocksdb::FlushOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/) override {
Status ret;
return ret;
}
#ifndef ROCKSDB_LITE
virtual Status DisableFileDeletions() override { return Status::OK(); }
- virtual Status EnableFileDeletions(bool force) override {
+ virtual Status EnableFileDeletions(bool /*force*/) override {
return Status::OK();
}
- virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size,
- bool flush_memtable = true) override {
+ virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
+ bool /*flush_memtable*/ = true) override {
return Status::OK();
}
- virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
+ virtual Status GetSortedWalFiles(VectorLogPtr& /*files*/) override {
return Status::OK();
}
- virtual Status DeleteFile(std::string name) override { return Status::OK(); }
+ virtual Status DeleteFile(std::string /*name*/) override {
+ return Status::OK();
+ }
virtual Status GetUpdatesSince(
rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
- const TransactionLogIterator::ReadOptions& read_options =
+ const TransactionLogIterator::ReadOptions& /*read_options*/ =
TransactionLogIterator::ReadOptions()) override {
return Status::NotSupported("Not supported in Model DB");
}
virtual void GetColumnFamilyMetaData(
- ColumnFamilyHandle* column_family,
- ColumnFamilyMetaData* metadata) override {}
+ ColumnFamilyHandle* /*column_family*/,
+ ColumnFamilyMetaData* /*metadata*/) override {}
#endif // ROCKSDB_LITE
- virtual Status GetDbIdentity(std::string& identity) const override {
+ virtual Status GetDbIdentity(std::string& /*identity*/) const override {
return Status::OK();
}
virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; }
+ virtual bool SetPreserveDeletesSequenceNumber(
+ SequenceNumber /*seqnum*/) override {
+ return true;
+ }
+
virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
return nullptr;
}
std::string name_ = "";
};
+#ifndef ROCKSDB_VALGRIND_RUN
static std::string RandomKey(Random* rnd, int minimum = 0) {
int len;
do {
if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
}
+#endif // ROCKSDB_VALGRIND_RUN
TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
// create a DB with block prefix index
}
}
}
+
+TEST_F(DBTest, FIFOCompactionTestWithCompaction) {
+ Options options;
+ options.compaction_style = kCompactionStyleFIFO;
+ options.write_buffer_size = 20 << 10; // 20K
+ options.arena_block_size = 4096;
+ options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1MB
+ options.compaction_options_fifo.allow_compaction = true;
+ options.level0_file_num_compaction_trigger = 6;
+ options.compression = kNoCompression;
+ options.create_if_missing = true;
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 60; i++) {
+ // Generate and flush a file about 20KB.
+ for (int j = 0; j < 20; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ // It should be compacted to 10 files.
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ for (int i = 0; i < 60; i++) {
+ // Generate and flush a file about 20KB.
+ for (int j = 0; j < 20; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+
+ // It should be compacted to no more than 20 files.
+ ASSERT_GT(NumTableFilesAtLevel(0), 10);
+ ASSERT_LT(NumTableFilesAtLevel(0), 18);
+ // Size limit is still guaranteed.
+ ASSERT_LE(SizeAtLevel(0),
+ options.compaction_options_fifo.max_table_files_size);
+}
+
+TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
+ Options options;
+ options.compaction_style = kCompactionStyleFIFO;
+ options.write_buffer_size = 20 << 10; // 20K
+ options.arena_block_size = 4096;
+ options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1MB
+ options.compaction_options_fifo.allow_compaction = true;
+ options.level0_file_num_compaction_trigger = 3;
+ options.compression = kNoCompression;
+ options.create_if_missing = true;
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 3; i++) {
+ // Each file contains a different key which will be dropped later.
+ ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
+ ASSERT_OK(Put("key" + ToString(i), ""));
+ ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+ for (int i = 0; i < 3; i++) {
+ ASSERT_EQ("", Get("key" + ToString(i)));
+ }
+ for (int i = 0; i < 3; i++) {
+ // Each file contains a different key which will be dropped later.
+ ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
+ ASSERT_OK(Delete("key" + ToString(i)));
+ ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 2);
+ for (int i = 0; i < 3; i++) {
+ ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
+ }
+}
+
+// Check that FIFO-with-TTL is not supported with max_open_files != -1.
+TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
+ Options options;
+ options.compaction_style = kCompactionStyleFIFO;
+ options.create_if_missing = true;
+ options.compaction_options_fifo.ttl = 600; // seconds
+
+ // Check that it is not supported with max_open_files != -1.
+ options.max_open_files = 100;
+ options = CurrentOptions(options);
+ ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+ options.max_open_files = -1;
+ ASSERT_OK(TryReopen(options));
+}
+
+// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
+TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
+ Options options;
+ options.compaction_style = kCompactionStyleFIFO;
+ options.create_if_missing = true;
+ options.compaction_options_fifo.ttl = 600; // seconds
+
+ options = CurrentOptions(options);
+ options.table_factory.reset(NewBlockBasedTableFactory());
+ ASSERT_OK(TryReopen(options));
+
+ Destroy(options);
+ options.table_factory.reset(NewPlainTableFactory());
+ ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+ Destroy(options);
+ options.table_factory.reset(NewCuckooTableFactory());
+ ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+ Destroy(options);
+ options.table_factory.reset(NewAdaptiveTableFactory());
+ ASSERT_TRUE(TryReopen(options).IsNotSupported());
+}
+
+TEST_F(DBTest, FIFOCompactionWithTTLTest) {
+ Options options;
+ options.compaction_style = kCompactionStyleFIFO;
+ options.write_buffer_size = 10 << 10; // 10KB
+ options.arena_block_size = 4096;
+ options.compression = kNoCompression;
+ options.create_if_missing = true;
+ env_->time_elapse_only_sleep_ = false;
+ options.env = env_;
+
+ // Test to make sure that all files with expired ttl are deleted on next
+ // manual compaction.
+ {
+ env_->addon_time_.store(0);
+ options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
+ options.compaction_options_fifo.allow_compaction = false;
+ options.compaction_options_fifo.ttl = 1 * 60 * 60 ; // 1 hour
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 10; i++) {
+ // Generate and flush a file about 10KB.
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ // Sleep for 2 hours -- which is much greater than TTL.
+ // Note: Couldn't use SleepForMicroseconds because it takes an int instead
+ // of uint64_t. Hence used addon_time_ directly.
+ // env_->SleepForMicroseconds(2 * 60 * 60 * 1000 * 1000);
+ env_->addon_time_.fetch_add(2 * 60 * 60);
+
+ // Since no flushes and compactions have run, the db should still be in
+ // the same state even after considerable time has passed.
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+ }
+
+ // Test to make sure that all files with expired ttl are deleted on next
+ // automatic compaction.
+ {
+ options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
+ options.compaction_options_fifo.allow_compaction = false;
+ options.compaction_options_fifo.ttl = 1 * 60 * 60; // 1 hour
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 10; i++) {
+ // Generate and flush a file about 10KB.
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ // Sleep for 2 hours -- which is much greater than TTL.
+ env_->addon_time_.fetch_add(2 * 60 * 60);
+ // Just to make sure that we are in the same state even after sleeping.
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ // Create 1 more file to trigger TTL compaction. The old files are dropped.
+ for (int i = 0; i < 1; i++) {
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ }
+
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ // Only the new 10 files remain.
+ ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+ ASSERT_LE(SizeAtLevel(0),
+ options.compaction_options_fifo.max_table_files_size);
+ }
+
+ // Test that shows the fall back to size-based FIFO compaction if TTL-based
+ // deletion doesn't move the total size to be less than max_table_files_size.
+ {
+ options.write_buffer_size = 10 << 10; // 10KB
+ options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
+ options.compaction_options_fifo.allow_compaction = false;
+ options.compaction_options_fifo.ttl = 1 * 60 * 60; // 1 hour
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 3; i++) {
+ // Generate and flush a file about 10KB.
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 3);
+
+ // Sleep for 2 hours -- which is much greater than TTL.
+ env_->addon_time_.fetch_add(2 * 60 * 60);
+ // Just to make sure that we are in the same state even after sleeping.
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(NumTableFilesAtLevel(0), 3);
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 140; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ // Size limit is still guaranteed.
+ ASSERT_LE(SizeAtLevel(0),
+ options.compaction_options_fifo.max_table_files_size);
+ }
+
+ // Test with TTL + Intra-L0 compactions.
+ {
+ options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
+ options.compaction_options_fifo.allow_compaction = true;
+ options.compaction_options_fifo.ttl = 1 * 60 * 60; // 1 hour
+ options.level0_file_num_compaction_trigger = 6;
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 10; i++) {
+ // Generate and flush a file about 10KB.
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1
+ // (due to level0_file_num_compaction_trigger = 6).
+ // So total files = 1 + remaining 4 = 5.
+ ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+
+ // Sleep for 2 hours -- which is much greater than TTL.
+ env_->addon_time_.fetch_add(2 * 60 * 60);
+ // Just to make sure that we are in the same state even after sleeping.
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+
+ // Create 10 more files. The old 5 files are dropped as their ttl expired.
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+ ASSERT_LE(SizeAtLevel(0),
+ options.compaction_options_fifo.max_table_files_size);
+ }
+
+ // Test with large TTL + Intra-L0 compactions.
+ // Files dropped based on size, as ttl doesn't kick in.
+ {
+ options.write_buffer_size = 20 << 10; // 20K
+ options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1.5MB
+ options.compaction_options_fifo.allow_compaction = true;
+ options.compaction_options_fifo.ttl = 1 * 60 * 60; // 1 hour
+ options.level0_file_num_compaction_trigger = 6;
+ options = CurrentOptions(options);
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < 60; i++) {
+ // Generate and flush a file about 20KB.
+ for (int j = 0; j < 20; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ // It should be compacted to 10 files.
+ ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+ for (int i = 0; i < 60; i++) {
+ // Generate and flush a file about 20KB.
+ for (int j = 0; j < 20; j++) {
+ ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
+ }
+ Flush();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+
+ // It should be compacted to no more than 20 files.
+ ASSERT_GT(NumTableFilesAtLevel(0), 10);
+ ASSERT_LT(NumTableFilesAtLevel(0), 18);
+ // Size limit is still guaranteed.
+ ASSERT_LE(SizeAtLevel(0),
+ options.compaction_options_fifo.max_table_files_size);
+ }
+}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
}
}
+TEST_F(DBTest, ConcurrentFlushWAL) {
+ const size_t cnt = 100;
+ Options options;
+ WriteOptions wopt;
+ ReadOptions ropt;
+ for (bool two_write_queues : {false, true}) {
+ for (bool manual_wal_flush : {false, true}) {
+ options.two_write_queues = two_write_queues;
+ options.manual_wal_flush = manual_wal_flush;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+ std::vector<port::Thread> threads;
+ threads.emplace_back([&] {
+ for (size_t i = 0; i < cnt; i++) {
+ auto istr = ToString(i);
+ db_->Put(wopt, db_->DefaultColumnFamily(), "a" + istr, "b" + istr);
+ }
+ });
+ if (two_write_queues) {
+ threads.emplace_back([&] {
+ for (size_t i = cnt; i < 2 * cnt; i++) {
+ auto istr = ToString(i);
+ WriteBatch batch;
+ batch.Put("a" + istr, "b" + istr);
+ dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true);
+ }
+ });
+ }
+ threads.emplace_back([&] {
+ for (size_t i = 0; i < cnt * 100; i++) { // FlushWAL is faster than Put
+ db_->FlushWAL(false);
+ }
+ });
+ for (auto& t : threads) {
+ t.join();
+ }
+ options.create_if_missing = false;
+ // Recover from the wal and make sure that it is not corrupted
+ Reopen(options);
+ for (size_t i = 0; i < cnt; i++) {
+ PinnableSlice pval;
+ auto istr = ToString(i);
+ ASSERT_OK(
+ db_->Get(ropt, db_->DefaultColumnFamily(), "a" + istr, &pval));
+ ASSERT_TRUE(pval == ("b" + istr));
+ }
+ }
+ }
+}
+
#ifndef ROCKSDB_LITE
TEST_F(DBTest, DynamicMemtableOptions) {
const uint64_t k64KB = 1 << 16;
{"write_buffer_size", "131072"},
}));
- // The existing memtable is still 64KB in size, after it becomes immutable,
- // the next memtable will be 128KB in size. Write 256KB total, we should
- // have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
- gen_l0_kb(256);
- ASSERT_EQ(NumTableFilesAtLevel(0), 2); // (A)
+ // The existing memtable inflated 64KB->128KB when we invoked SetOptions().
+ // Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
+ gen_l0_kb(192);
+ ASSERT_EQ(NumTableFilesAtLevel(0), 1); // (A)
+ ASSERT_LT(SizeAtLevel(0), k128KB + 2 * k5KB);
+ ASSERT_GT(SizeAtLevel(0), k128KB - 4 * k5KB);
+
+ // Decrease buffer size below current usage
+ ASSERT_OK(dbfull()->SetOptions({
+ {"write_buffer_size", "65536"},
+ }));
+ // The existing memtable became eligible for flush when we reduced its
+ // capacity to 64KB. Two keys need to be added to trigger flush: first causes
+ // memtable to be marked full, second schedules the flush. Then we should have
+ // a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
+ gen_l0_kb(2);
+ ASSERT_EQ(NumTableFilesAtLevel(0), 2);
ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
Env::Priority::LOW);
// Start from scratch and disable compaction/flush. Flush can only happen
// during compaction but trigger is pretty high
- options.max_background_flushes = 0;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
+ env_->SetBackgroundThreads(0, Env::HIGH);
// Put until writes are stopped, bounded by 256 puts. We should see stop at
// ~128KB
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
- [&](void* arg) { sleeping_task_low.WakeUp(); });
+ [&](void* /*arg*/) { sleeping_task_low.WakeUp(); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
while (!sleeping_task_low.WokenUp() && count < 256) {
const int kTestCount = 3;
const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
+ const unsigned int kBottomPriCounts[kTestCount] = {2, 1, 4};
for (int test = 0; test < kTestCount; ++test) {
// Change the number of threads in high / low priority pool.
env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
+ env_->SetBackgroundThreads(kBottomPriCounts[test], Env::BOTTOM);
// Wait to ensure the all threads has been registered
unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
+ // TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
+ // all threads have been registered.
// Try up to 60 seconds.
for (int num_try = 0; num_try < 60000; num_try++) {
env_->SleepForMicroseconds(1000);
if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
kHighPriCounts[test] &&
thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
- kLowPriCounts[test]) {
+ kLowPriCounts[test] &&
+ thread_type_counts[ThreadStatus::BOTTOM_PRIORITY] ==
+ kBottomPriCounts[test]) {
break;
}
}
- // Verify the total number of threades
- ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY] +
- thread_type_counts[ThreadStatus::LOW_PRIORITY],
- kHighPriCounts[test] + kLowPriCounts[test]);
// Verify the number of high-priority threads
ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
kHighPriCounts[test]);
// Verify the number of low-priority threads
ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
kLowPriCounts[test]);
+ // Verify the number of bottom-priority threads
+ ASSERT_EQ(thread_type_counts[ThreadStatus::BOTTOM_PRIORITY],
+ kBottomPriCounts[test]);
}
if (i == 0) {
// repeat the test with multiple column families
TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
Options options = CurrentOptions();
- options.max_background_flushes = 0;
options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options);
if (iter == 0) {
options = CurrentOptions();
- options.max_background_flushes = 0;
options.num_levels = 3;
options.create_if_missing = true;
DestroyAndReopen(options);
TEST_F(DBTest, PreShutdownFlush) {
Options options = CurrentOptions();
- options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "value"));
CancelAllBackgroundWork(db_);
Random rnd(301);
Options options;
options.create_if_missing = true;
- options.db_write_buffer_size = 6000;
- options.write_buffer_size = 6000;
+ options.db_write_buffer_size = 6000000;
+ options.write_buffer_size = 600000;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.soft_pending_compaction_bytes_limit = 1024 * 1024;
-
- // Use file size to distinguish levels
- // L1: 10, L2: 20, L3 40, L4 80
- // L0 is less than 30
- options.target_file_size_base = 10;
- options.target_file_size_multiplier = 2;
+ options.target_file_size_base = 20;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 200;
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 100; i++) {
- ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
-
- if (i % 25 == 0) {
- dbfull()->TEST_WaitForFlushMemTable();
+ std::string value = RandomString(&rnd, 200);
+ ASSERT_OK(Put(Key(keys[i]), value));
+ if (i % 25 == 24) {
+ Flush();
+ dbfull()->TEST_WaitForCompact();
}
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 101; i < 500; i++) {
- ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
+ std::string value = RandomString(&rnd, 200);
+ ASSERT_OK(Put(Key(keys[i]), value));
if (i % 100 == 99) {
Flush();
dbfull()->TEST_WaitForCompact();
// Clean up memtable and L0. Block compaction threads. If continue to write
// and flush memtables. We should see put stop after 8 memtable flushes
// since level0_stop_writes_trigger = 8
- dbfull()->TEST_FlushMemTable(true);
+ dbfull()->TEST_FlushMemTable(true, true);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction
test::SleepingBackgroundTask sleeping_task_low;
WriteOptions wo;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
- dbfull()->TEST_FlushMemTable(true);
+ dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
count = 0;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
- dbfull()->TEST_FlushMemTable(true);
+ dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
dbfull()->TEST_WaitForCompact();
ASSERT_LT(NumTableFilesAtLevel(0), 4);
}
+
+// Test dynamic FIFO copmaction options.
+// This test covers just option parsing and makes sure that the options are
+// correctly assigned. Also look at DBOptionsTest.SetFIFOCompactionOptions
+// test which makes sure that the FIFO compaction funcionality is working
+// as expected on dynamically changing the options.
+// Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
+TEST_F(DBTest, DynamicFIFOCompactionOptions) {
+ Options options;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ // Initial defaults
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 1024 * 1024 * 1024);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 0);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_fifo", "{max_table_files_size=23;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 23);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 0);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions({{"compaction_options_fifo", "{ttl=97}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 23);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 97);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions({{"compaction_options_fifo", "{ttl=203;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 23);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 203);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_fifo", "{allow_compaction=true;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 23);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 203);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ true);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_fifo", "{max_table_files_size=31;ttl=19;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 31);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 19);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ true);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_fifo",
+ "{max_table_files_size=51;ttl=49;allow_compaction=true;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+ 51);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 49);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+ true);
+}
+
+TEST_F(DBTest, DynamicUniversalCompactionOptions) {
+ Options options;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ // Initial defaults
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 1);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+ 2);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+ UINT_MAX);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.max_size_amplification_percent,
+ 200);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.compression_size_percent,
+ -1);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+ kCompactionStopStyleTotalSize);
+ ASSERT_EQ(
+ dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_universal", "{size_ratio=7;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+ 2);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+ UINT_MAX);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.max_size_amplification_percent,
+ 200);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.compression_size_percent,
+ -1);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+ kCompactionStopStyleTotalSize);
+ ASSERT_EQ(
+ dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+ false);
+
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"compaction_options_universal", "{min_merge_width=11;}"}}));
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+ 11);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+ UINT_MAX);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.max_size_amplification_percent,
+ 200);
+ ASSERT_EQ(dbfull()
+ ->GetOptions()
+ .compaction_options_universal.compression_size_percent,
+ -1);
+ ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+ kCompactionStopStyleTotalSize);
+ ASSERT_EQ(
+ dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+ false);
+}
#endif // ROCKSDB_LITE
TEST_F(DBTest, FileCreationRandomFailure) {
}
#ifndef ROCKSDB_LITE
+
TEST_F(DBTest, DynamicMiscOptions) {
// Test max_sequential_skip_in_iterations
Options options;
options.compression = comp;
DestroyAndReopen(options);
- int kNumKeysWritten = 100000;
+ int kNumKeysWritten = 1000;
Random rnd(301);
for (int i = 0; i < kNumKeysWritten; ++i) {
public:
explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
- virtual bool FullMergeV2(const MergeOperationInput& merge_in,
+ virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
MergeOperationOutput* merge_out) const override {
db_test_->env_->addon_time_.fetch_add(1000);
merge_out->new_value = "";
options.disable_auto_compactions = true;
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
+ options.statistics->stats_level_ = kExceptTimeForMutex;
options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options);
status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
ASSERT_TRUE(status.IsInvalidArgument());
}
-#endif // ROCKSDB_LITE
// Github issue #596
-TEST_F(DBTest, HugeNumberOfLevels) {
+TEST_F(DBTest, CompactRangeWithEmptyBottomLevel) {
+ const int kNumLevels = 2;
+ const int kNumL0Files = 2;
Options options = CurrentOptions();
- options.write_buffer_size = 2 * 1024 * 1024; // 2MB
- options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
- options.num_levels = 12;
- options.max_background_compactions = 10;
- options.max_bytes_for_level_multiplier = 2;
- options.level_compaction_dynamic_level_bytes = true;
+ options.disable_auto_compactions = true;
+ options.num_levels = kNumLevels;
DestroyAndReopen(options);
Random rnd(301);
- for (int i = 0; i < 300000; ++i) {
- ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
+ for (int i = 0; i < kNumL0Files; ++i) {
+ ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
+ Flush();
}
+ ASSERT_EQ(NumTableFilesAtLevel(0), kNumL0Files);
+ ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1), kNumL0Files);
}
+#endif // ROCKSDB_LITE
TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
+ const int kNumL0Files = 50;
Options options = CurrentOptions();
- options.write_buffer_size = 2 * 1024 * 1024; // 2MB
- options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
- options.num_levels = 12;
+ options.level0_file_num_compaction_trigger = 4;
+ // never slowdown / stop
+ options.level0_slowdown_writes_trigger = 999999;
+ options.level0_stop_writes_trigger = 999999;
options.max_background_compactions = 10;
- options.max_bytes_for_level_multiplier = 2;
- options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
- Random rnd(301);
- for (int i = 0; i < 300000; ++i) {
- ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
- }
-
+ // schedule automatic compactions after the manual one starts, but before it
+ // finishes to ensure conflict.
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BackgroundCompaction:Start",
+ "DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
+ {"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
+ "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
std::atomic<int> callback_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::BackgroundCompaction()::Conflict",
- [&](void* arg) { callback_count.fetch_add(1); });
+ "DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
+ [&](void* /*arg*/) { callback_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
- CompactRangeOptions croptions;
- croptions.exclusive_manual_compaction = false;
- ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
+
+ Random rnd(301);
+ for (int i = 0; i < 2; ++i) {
+ // put two keys to ensure no trivial move
+ for (int j = 0; j < 2; ++j) {
+ ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+ port::Thread manual_compaction_thread([this]() {
+ CompactRangeOptions croptions;
+ croptions.exclusive_manual_compaction = true;
+ ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
+ });
+
+ TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
+ for (int i = 0; i < kNumL0Files; ++i) {
+ // put two keys to ensure no trivial move
+ for (int j = 0; j < 2; ++j) {
+ ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+ TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");
+
ASSERT_GE(callback_count.load(), 1);
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
- for (int i = 0; i < 300000; ++i) {
+ for (int i = 0; i < 2; ++i) {
ASSERT_NE("NOT_FOUND", Get(Key(i)));
}
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ manual_compaction_thread.join();
+ dbfull()->TEST_WaitForCompact();
}
// Github issue #595
std::atomic<int> callback_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait",
- [&](void* arg) {
+ [&](void* /*arg*/) {
callback_count.fetch_add(1);
sleeping_task_low.WakeUp();
});
sleeping_task_low.WaitUntilDone();
}
-#ifndef ROCKSDB_LITE
+#if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
+class WriteStallListener : public EventListener {
+ public:
+ WriteStallListener()
+ : cond_(&mutex_),
+ condition_(WriteStallCondition::kNormal),
+ expected_(WriteStallCondition::kNormal),
+ expected_set_(false) {}
+ void OnStallConditionsChanged(const WriteStallInfo& info) override {
+ MutexLock l(&mutex_);
+ condition_ = info.condition.cur;
+ if (expected_set_ && condition_ == expected_) {
+ cond_.Signal();
+ expected_set_ = false;
+ }
+ }
+ bool CheckCondition(WriteStallCondition expected) {
+ MutexLock l(&mutex_);
+ if (expected != condition_) {
+ expected_ = expected;
+ expected_set_ = true;
+ while (expected != condition_) {
+ // We bail out on timeout 500 milliseconds
+ const uint64_t timeout_us = 500000;
+ if (cond_.TimedWait(timeout_us)) {
+ expected_set_ = false;
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ private:
+ port::Mutex mutex_;
+ port::CondVar cond_;
+ WriteStallCondition condition_;
+ WriteStallCondition expected_;
+ bool expected_set_;
+};
+
TEST_F(DBTest, SoftLimit) {
Options options = CurrentOptions();
options.env = env_;
options.max_bytes_for_level_multiplier = 10;
options.max_background_compactions = 1;
options.compression = kNoCompression;
+ WriteStallListener* listener = new WriteStallListener();
+ options.listeners.emplace_back(listener);
Reopen(options);
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
- Flush();
+ dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
- Flush();
+ dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
Put(Key(i), std::string(5000, 'x'));
Put(Key(100 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
- Flush();
+ dbfull()->TEST_FlushMemTable(true, true);
}
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
// The L1 file size is around 30KB.
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// Only allow one compactin going through.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "BackgroundCallCompaction:0", [&](void* arg) {
+ "BackgroundCallCompaction:0", [&](void* /*arg*/) {
// Schedule a sleeping task.
sleeping_task_low.Reset();
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
Put(Key(10 + i), std::string(5000, 'x'));
Put(Key(90 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
- Flush();
+ dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
// doesn't trigger soft_pending_compaction_bytes_limit
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// Create 3 L0 files, making score of L0 to be 3, higher than L0.
for (int i = 0; i < 3; i++) {
Put(Key(20 + i), std::string(5000, 'x'));
Put(Key(80 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
- Flush();
+ dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction
// triggerring soft_pending_compaction_bytes_limit
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilSleeping();
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// shrink level base so L2 will hit soft limit easier.
ASSERT_OK(dbfull()->SetOptions({
Put("", "");
Flush();
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+ ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WaitUntilSleeping();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
-#endif // ROCKSDB_LITE
+#endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
}
+
+TEST_F(DBTest, PinnableSliceAndRowCache) {
+ Options options = CurrentOptions();
+ options.statistics = rocksdb::CreateDBStatistics();
+ options.row_cache = NewLRUCache(8192);
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
+
+ ASSERT_EQ(Get("foo"), "bar");
+ ASSERT_EQ(
+ reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+ 1);
+
+ {
+ PinnableSlice pin_slice;
+ ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
+ ASSERT_EQ(pin_slice.ToString(), "bar");
+ // Entry is already in cache, lookup will remove the element from lru
+ ASSERT_EQ(
+ reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+ 0);
+ }
+ // After PinnableSlice destruction element is added back in LRU
+ ASSERT_EQ(
+ reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+ 1);
+}
+
#endif // ROCKSDB_LITE
TEST_F(DBTest, DeletingOldWalAfterDrop) {
ASSERT_TRUE(done.load());
}
+// Keep spawning short-living threads that create an iterator and quit.
+// Meanwhile in another thread keep flushing memtables.
+// This used to cause a deadlock.
+TEST_F(DBTest, ThreadLocalPtrDeadlock) {
+ std::atomic<int> flushes_done{0};
+ std::atomic<int> threads_destroyed{0};
+ auto done = [&] {
+ return flushes_done.load() > 10;
+ };
+
+ port::Thread flushing_thread([&] {
+ for (int i = 0; !done(); ++i) {
+ ASSERT_OK(db_->Put(WriteOptions(), Slice("hi"),
+ Slice(std::to_string(i).c_str())));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ int cnt = ++flushes_done;
+ fprintf(stderr, "Flushed %d times\n", cnt);
+ }
+ });
+
+ std::vector<port::Thread> thread_spawning_threads(10);
+ for (auto& t: thread_spawning_threads) {
+ t = port::Thread([&] {
+ while (!done()) {
+ {
+ port::Thread tmp_thread([&] {
+ auto it = db_->NewIterator(ReadOptions());
+ delete it;
+ });
+ tmp_thread.join();
+ }
+ ++threads_destroyed;
+ }
+ });
+ }
+
+ for (auto& t: thread_spawning_threads) {
+ t.join();
+ }
+ flushing_thread.join();
+ fprintf(stderr, "Done. Flushed %d times, destroyed %d threads\n",
+ flushes_done.load(), threads_destroyed.load());
+}
} // namespace rocksdb
int main(int argc, char** argv) {