// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
-#include "db/blob_index.h"
+#include "db/blob/blob_index.h"
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "file/filename.h"
-#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
-#include "table/block_based/block_based_table_factory.h"
-#include "table/plain/plain_table_factory.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
class EventListenerTest : public DBTestBase {
public:
- EventListenerTest() : DBTestBase("/listener_test") {}
+ EventListenerTest() : DBTestBase("/listener_test", /*env_do_fsync=*/true) {}
static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
uint64_t size) {
ASSERT_GT(info.table_properties.raw_value_size, 0U);
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
ASSERT_GT(info.table_properties.num_entries, 0U);
+ ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+ ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
#ifdef ROCKSDB_USING_THREAD_STATUS
// Verify the id of the current thread that created this table
for (size_t c = 0; c < cf_names.size(); ++c) {
for (int d = 0; d < kNumDBs; ++d) {
ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
- reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable();
+ static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable();
}
}
public:
class TestEnv : public EnvWrapper {
public:
- TestEnv() : EnvWrapper(Env::Default()) {}
+ explicit TestEnv(Env* t) : EnvWrapper(t) {}
void SetStatus(Status s) { status_ = s; }
return status_;
}
}
- return Env::Default()->NewWritableFile(fname, result, options);
+ return target()->NewWritableFile(fname, result, options);
}
private:
ASSERT_GT(info.cf_name.size(), 0U);
ASSERT_GT(info.file_path.size(), 0U);
ASSERT_GT(info.job_id, 0);
+ ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+ ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
if (info.status.ok()) {
ASSERT_GT(info.table_properties.data_size, 0U);
ASSERT_GT(info.table_properties.raw_key_size, 0U);
}
}
- TestEnv test_env;
int started_[2];
int finished_[2];
int failure_[2];
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
auto listener = std::make_shared<TableFileCreationListener>();
Options options;
+ std::unique_ptr<TableFileCreationListener::TestEnv> test_env(
+ new TableFileCreationListener::TestEnv(CurrentOptions().env));
options.create_if_missing = true;
options.listeners.push_back(listener);
- options.env = &listener->test_env;
+ options.env = test_env.get();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "aaa"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
-
ASSERT_OK(Put("foo", "aaa1"));
ASSERT_OK(Put("bar", "bbb1"));
- listener->test_env.SetStatus(Status::NotSupported("not supported"));
+ test_env->SetStatus(Status::NotSupported("not supported"));
ASSERT_NOK(Flush());
listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
- listener->test_env.SetStatus(Status::OK());
+ test_env->SetStatus(Status::OK());
Reopen(options);
ASSERT_OK(Put("foo", "aaa2"));
ASSERT_OK(Put("foo", "aaa3"));
ASSERT_OK(Put("bar", "bbb3"));
ASSERT_OK(Flush());
- listener->test_env.SetStatus(Status::NotSupported("not supported"));
+ test_env->SetStatus(Status::NotSupported("not supported"));
dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
dbfull()->TEST_WaitForCompact();
listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
+ Close();
}
class MemTableSealedListener : public EventListener {
TEST_F(EventListenerTest, MemTableSealedListenerTest) {
auto listener = std::make_shared<MemTableSealedListener>();
Options options;
+ options.env = CurrentOptions().env;
options.create_if_missing = true;
options.listeners.push_back(listener);
DestroyAndReopen(options);
// can succeed.
*bg_error = Status::OK();
env_->drop_writes_.store(false, std::memory_order_release);
- env_->no_slowdown_ = false;
+ env_->SetMockSleep(false);
}
++counter_;
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
env_->drop_writes_.store(true, std::memory_order_release);
- env_->no_slowdown_ = true;
+ env_->SetMockSleep();
ASSERT_OK(Put("key0", "val"));
ASSERT_OK(Put("key1", "val"));
ASSERT_EQ(2, NumTableFilesAtLevel(0));
env_->drop_writes_.store(true, std::memory_order_release);
- env_->no_slowdown_ = true;
+ env_->SetMockSleep();
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, listener->counter());
file_reads_success_.store(0);
file_writes_.store(0);
file_writes_success_.store(0);
+ file_flushes_.store(0);
+ file_flushes_success_.store(0);
+ file_closes_.store(0);
+ file_closes_success_.store(0);
+ file_syncs_.store(0);
+ file_syncs_success_.store(0);
+ file_truncates_.store(0);
+ file_truncates_success_.store(0);
}
void OnFileReadFinish(const FileOperationInfo& info) override {
ReportDuration(info);
}
+ void OnFileFlushFinish(const FileOperationInfo& info) override {
+ ++file_flushes_;
+ if (info.status.ok()) {
+ ++file_flushes_success_;
+ }
+ ReportDuration(info);
+ }
+
+ void OnFileCloseFinish(const FileOperationInfo& info) override {
+ ++file_closes_;
+ if (info.status.ok()) {
+ ++file_closes_success_;
+ }
+ ReportDuration(info);
+ }
+
+ void OnFileSyncFinish(const FileOperationInfo& info) override {
+ ++file_syncs_;
+ if (info.status.ok()) {
+ ++file_syncs_success_;
+ }
+ ReportDuration(info);
+ }
+
+ void OnFileTruncateFinish(const FileOperationInfo& info) override {
+ ++file_truncates_;
+ if (info.status.ok()) {
+ ++file_truncates_success_;
+ }
+ ReportDuration(info);
+ }
+
bool ShouldBeNotifiedOnFileIO() override { return true; }
std::atomic<size_t> file_reads_;
std::atomic<size_t> file_reads_success_;
std::atomic<size_t> file_writes_;
std::atomic<size_t> file_writes_success_;
+ std::atomic<size_t> file_flushes_;
+ std::atomic<size_t> file_flushes_success_;
+ std::atomic<size_t> file_closes_;
+ std::atomic<size_t> file_closes_success_;
+ std::atomic<size_t> file_syncs_;
+ std::atomic<size_t> file_syncs_success_;
+ std::atomic<size_t> file_truncates_;
+ std::atomic<size_t> file_truncates_success_;
private:
void ReportDuration(const FileOperationInfo& info) const {
- auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
- info.finish_timestamp - info.start_timestamp);
- ASSERT_GT(duration.count(), 0);
+ ASSERT_GT(info.duration.count(), 0);
}
};
TestFileOperationListener* listener = new TestFileOperationListener();
options.listeners.emplace_back(listener);
+ options.use_direct_io_for_flush_and_compaction = false;
+ Status s = TryReopen(options);
+ if (s.IsInvalidArgument()) {
+ options.use_direct_io_for_flush_and_compaction = false;
+ } else {
+ ASSERT_OK(s);
+ }
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "aaa"));
dbfull()->Flush(FlushOptions());
ASSERT_GE(listener->file_writes_.load(),
listener->file_writes_success_.load());
ASSERT_GT(listener->file_writes_.load(), 0);
+ ASSERT_GE(listener->file_flushes_.load(),
+ listener->file_flushes_success_.load());
+ ASSERT_GT(listener->file_flushes_.load(), 0);
Close();
Reopen(options);
ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
ASSERT_GT(listener->file_reads_.load(), 0);
+ ASSERT_GE(listener->file_closes_.load(),
+ listener->file_closes_success_.load());
+ ASSERT_GT(listener->file_closes_.load(), 0);
+ ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
+ ASSERT_GT(listener->file_syncs_.load(), 0);
+ if (true == options.use_direct_io_for_flush_and_compaction) {
+ ASSERT_GE(listener->file_truncates_.load(),
+ listener->file_truncates_success_.load());
+ ASSERT_GT(listener->file_truncates_.load(), 0);
+ }
}
} // namespace ROCKSDB_NAMESPACE