1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/filename.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
41 class EventListenerTest
: public DBTestBase
{
43 EventListenerTest() : DBTestBase("/listener_test") {}
45 const size_t k110KB
= 110 << 10;
48 struct TestPropertiesCollector
: public rocksdb::TablePropertiesCollector
{
49 rocksdb::Status
AddUserKey(const rocksdb::Slice
& /*key*/,
50 const rocksdb::Slice
& /*value*/,
51 rocksdb::EntryType
/*type*/,
52 rocksdb::SequenceNumber
/*seq*/,
53 uint64_t /*file_size*/) override
{
56 rocksdb::Status
Finish(
57 rocksdb::UserCollectedProperties
* properties
) override
{
58 properties
->insert({"0", "1"});
62 const char* Name() const override
{ return "TestTablePropertiesCollector"; }
64 rocksdb::UserCollectedProperties
GetReadableProperties() const override
{
65 rocksdb::UserCollectedProperties ret
;
71 class TestPropertiesCollectorFactory
: public TablePropertiesCollectorFactory
{
73 TablePropertiesCollector
* CreateTablePropertiesCollector(
74 TablePropertiesCollectorFactory::Context
/*context*/) override
{
75 return new TestPropertiesCollector
;
77 const char* Name() const override
{ return "TestTablePropertiesCollector"; }
80 class TestCompactionListener
: public EventListener
{
82 void OnCompactionCompleted(DB
*db
, const CompactionJobInfo
& ci
) override
{
83 std::lock_guard
<std::mutex
> lock(mutex_
);
84 compacted_dbs_
.push_back(db
);
85 ASSERT_GT(ci
.input_files
.size(), 0U);
86 ASSERT_GT(ci
.output_files
.size(), 0U);
87 ASSERT_EQ(db
->GetEnv()->GetThreadID(), ci
.thread_id
);
88 ASSERT_GT(ci
.thread_id
, 0U);
90 for (auto fl
: {ci
.input_files
, ci
.output_files
}) {
92 auto it
= ci
.table_properties
.find(fn
);
93 ASSERT_NE(it
, ci
.table_properties
.end());
95 ASSERT_TRUE(tp
!= nullptr);
96 ASSERT_EQ(tp
->user_collected_properties
.find("0")->second
, "1");
101 std::vector
<DB
*> compacted_dbs_
;
105 TEST_F(EventListenerTest
, OnSingleDBCompactionTest
) {
106 const int kTestKeySize
= 16;
107 const int kTestValueSize
= 984;
108 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
109 const int kEntriesPerBuffer
= 100;
110 const int kNumL0Files
= 4;
113 options
.env
= CurrentOptions().env
;
114 options
.create_if_missing
= true;
115 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
116 options
.compaction_style
= kCompactionStyleLevel
;
117 options
.target_file_size_base
= options
.write_buffer_size
;
118 options
.max_bytes_for_level_base
= options
.target_file_size_base
* 2;
119 options
.max_bytes_for_level_multiplier
= 2;
120 options
.compression
= kNoCompression
;
121 #ifdef ROCKSDB_USING_THREAD_STATUS
122 options
.enable_thread_tracking
= true;
123 #endif // ROCKSDB_USING_THREAD_STATUS
124 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
125 options
.table_properties_collector_factories
.push_back(
126 std::make_shared
<TestPropertiesCollectorFactory
>());
128 TestCompactionListener
* listener
= new TestCompactionListener();
129 options
.listeners
.emplace_back(listener
);
130 std::vector
<std::string
> cf_names
= {
131 "pikachu", "ilya", "muromec", "dobrynia",
132 "nikitich", "alyosha", "popovich"};
133 CreateAndReopenWithCF(cf_names
, options
);
134 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
135 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
136 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
137 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
138 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
139 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
140 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
141 for (int i
= 1; i
< 8; ++i
) {
143 const Slice kRangeStart
= "a";
144 const Slice kRangeEnd
= "z";
145 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_
[i
],
146 &kRangeStart
, &kRangeEnd
));
147 dbfull()->TEST_WaitForFlushMemTable();
148 dbfull()->TEST_WaitForCompact();
151 ASSERT_EQ(listener
->compacted_dbs_
.size(), cf_names
.size());
152 for (size_t i
= 0; i
< cf_names
.size(); ++i
) {
153 ASSERT_EQ(listener
->compacted_dbs_
[i
], db_
);
157 // This simple Listener can only handle one flush at a time.
158 class TestFlushListener
: public EventListener
{
160 explicit TestFlushListener(Env
* env
)
161 : slowdown_count(0), stop_count(0), db_closed(), env_(env
) {
164 void OnTableFileCreated(
165 const TableFileCreationInfo
& info
) override
{
166 // remember the info for later checking the FlushJobInfo.
167 prev_fc_info_
= info
;
168 ASSERT_GT(info
.db_name
.size(), 0U);
169 ASSERT_GT(info
.cf_name
.size(), 0U);
170 ASSERT_GT(info
.file_path
.size(), 0U);
171 ASSERT_GT(info
.job_id
, 0);
172 ASSERT_GT(info
.table_properties
.data_size
, 0U);
173 ASSERT_GT(info
.table_properties
.raw_key_size
, 0U);
174 ASSERT_GT(info
.table_properties
.raw_value_size
, 0U);
175 ASSERT_GT(info
.table_properties
.num_data_blocks
, 0U);
176 ASSERT_GT(info
.table_properties
.num_entries
, 0U);
178 #ifdef ROCKSDB_USING_THREAD_STATUS
179 // Verify the id of the current thread that created this table
180 // file matches the id of any active flush or compaction thread.
181 uint64_t thread_id
= env_
->GetThreadID();
182 std::vector
<ThreadStatus
> thread_list
;
183 ASSERT_OK(env_
->GetThreadList(&thread_list
));
184 bool found_match
= false;
185 for (auto thread_status
: thread_list
) {
186 if (thread_status
.operation_type
== ThreadStatus::OP_FLUSH
||
187 thread_status
.operation_type
== ThreadStatus::OP_COMPACTION
) {
188 if (thread_id
== thread_status
.thread_id
) {
194 ASSERT_TRUE(found_match
);
195 #endif // ROCKSDB_USING_THREAD_STATUS
198 void OnFlushCompleted(
199 DB
* db
, const FlushJobInfo
& info
) override
{
200 flushed_dbs_
.push_back(db
);
201 flushed_column_family_names_
.push_back(info
.cf_name
);
202 if (info
.triggered_writes_slowdown
) {
205 if (info
.triggered_writes_stop
) {
208 // verify whether the previously created file matches the flushed file.
209 ASSERT_EQ(prev_fc_info_
.db_name
, db
->GetName());
210 ASSERT_EQ(prev_fc_info_
.cf_name
, info
.cf_name
);
211 ASSERT_EQ(prev_fc_info_
.job_id
, info
.job_id
);
212 ASSERT_EQ(prev_fc_info_
.file_path
, info
.file_path
);
213 ASSERT_EQ(db
->GetEnv()->GetThreadID(), info
.thread_id
);
214 ASSERT_GT(info
.thread_id
, 0U);
215 ASSERT_EQ(info
.table_properties
.user_collected_properties
.find("0")->second
,
219 std::vector
<std::string
> flushed_column_family_names_
;
220 std::vector
<DB
*> flushed_dbs_
;
224 std::atomic_bool db_closed
;
225 TableFileCreationInfo prev_fc_info_
;
231 TEST_F(EventListenerTest
, OnSingleDBFlushTest
) {
233 options
.env
= CurrentOptions().env
;
234 options
.write_buffer_size
= k110KB
;
235 #ifdef ROCKSDB_USING_THREAD_STATUS
236 options
.enable_thread_tracking
= true;
237 #endif // ROCKSDB_USING_THREAD_STATUS
238 TestFlushListener
* listener
= new TestFlushListener(options
.env
);
239 options
.listeners
.emplace_back(listener
);
240 std::vector
<std::string
> cf_names
= {
241 "pikachu", "ilya", "muromec", "dobrynia",
242 "nikitich", "alyosha", "popovich"};
243 options
.table_properties_collector_factories
.push_back(
244 std::make_shared
<TestPropertiesCollectorFactory
>());
245 CreateAndReopenWithCF(cf_names
, options
);
247 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
248 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
249 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
250 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
251 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
252 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
253 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
254 for (int i
= 1; i
< 8; ++i
) {
256 dbfull()->TEST_WaitForFlushMemTable();
257 ASSERT_EQ(listener
->flushed_dbs_
.size(), i
);
258 ASSERT_EQ(listener
->flushed_column_family_names_
.size(), i
);
261 // make sure callback functions are called in the right order
262 for (size_t i
= 0; i
< cf_names
.size(); ++i
) {
263 ASSERT_EQ(listener
->flushed_dbs_
[i
], db_
);
264 ASSERT_EQ(listener
->flushed_column_family_names_
[i
], cf_names
[i
]);
268 TEST_F(EventListenerTest
, MultiCF
) {
270 options
.env
= CurrentOptions().env
;
271 options
.write_buffer_size
= k110KB
;
272 #ifdef ROCKSDB_USING_THREAD_STATUS
273 options
.enable_thread_tracking
= true;
274 #endif // ROCKSDB_USING_THREAD_STATUS
275 TestFlushListener
* listener
= new TestFlushListener(options
.env
);
276 options
.listeners
.emplace_back(listener
);
277 options
.table_properties_collector_factories
.push_back(
278 std::make_shared
<TestPropertiesCollectorFactory
>());
279 std::vector
<std::string
> cf_names
= {
280 "pikachu", "ilya", "muromec", "dobrynia",
281 "nikitich", "alyosha", "popovich"};
282 CreateAndReopenWithCF(cf_names
, options
);
284 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
285 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
286 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
287 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
288 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
289 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
290 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
291 for (int i
= 1; i
< 8; ++i
) {
293 ASSERT_EQ(listener
->flushed_dbs_
.size(), i
);
294 ASSERT_EQ(listener
->flushed_column_family_names_
.size(), i
);
297 // make sure callback functions are called in the right order
298 for (size_t i
= 0; i
< cf_names
.size(); i
++) {
299 ASSERT_EQ(listener
->flushed_dbs_
[i
], db_
);
300 ASSERT_EQ(listener
->flushed_column_family_names_
[i
], cf_names
[i
]);
304 TEST_F(EventListenerTest
, MultiDBMultiListeners
) {
306 options
.env
= CurrentOptions().env
;
307 #ifdef ROCKSDB_USING_THREAD_STATUS
308 options
.enable_thread_tracking
= true;
309 #endif // ROCKSDB_USING_THREAD_STATUS
310 options
.table_properties_collector_factories
.push_back(
311 std::make_shared
<TestPropertiesCollectorFactory
>());
312 std::vector
<TestFlushListener
*> listeners
;
313 const int kNumDBs
= 5;
314 const int kNumListeners
= 10;
315 for (int i
= 0; i
< kNumListeners
; ++i
) {
316 listeners
.emplace_back(new TestFlushListener(options
.env
));
319 std::vector
<std::string
> cf_names
= {
320 "pikachu", "ilya", "muromec", "dobrynia",
321 "nikitich", "alyosha", "popovich"};
323 options
.create_if_missing
= true;
324 for (int i
= 0; i
< kNumListeners
; ++i
) {
325 options
.listeners
.emplace_back(listeners
[i
]);
327 DBOptions
db_opts(options
);
328 ColumnFamilyOptions
cf_opts(options
);
330 std::vector
<DB
*> dbs
;
331 std::vector
<std::vector
<ColumnFamilyHandle
*>> vec_handles
;
333 for (int d
= 0; d
< kNumDBs
; ++d
) {
334 ASSERT_OK(DestroyDB(dbname_
+ ToString(d
), options
));
336 std::vector
<ColumnFamilyHandle
*> handles
;
337 ASSERT_OK(DB::Open(options
, dbname_
+ ToString(d
), &db
));
338 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
339 ColumnFamilyHandle
* handle
;
340 db
->CreateColumnFamily(cf_opts
, cf_names
[c
], &handle
);
341 handles
.push_back(handle
);
344 vec_handles
.push_back(std::move(handles
));
348 for (int d
= 0; d
< kNumDBs
; ++d
) {
349 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
350 ASSERT_OK(dbs
[d
]->Put(WriteOptions(), vec_handles
[d
][c
],
351 cf_names
[c
], cf_names
[c
]));
355 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
356 for (int d
= 0; d
< kNumDBs
; ++d
) {
357 ASSERT_OK(dbs
[d
]->Flush(FlushOptions(), vec_handles
[d
][c
]));
358 reinterpret_cast<DBImpl
*>(dbs
[d
])->TEST_WaitForFlushMemTable();
362 for (auto* listener
: listeners
) {
364 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
365 for (int d
= 0; d
< kNumDBs
; ++d
) {
366 ASSERT_EQ(listener
->flushed_dbs_
[pos
], dbs
[d
]);
367 ASSERT_EQ(listener
->flushed_column_family_names_
[pos
], cf_names
[c
]);
374 for (auto handles
: vec_handles
) {
375 for (auto h
: handles
) {
382 for (auto db
: dbs
) {
387 TEST_F(EventListenerTest
, DisableBGCompaction
) {
389 options
.env
= CurrentOptions().env
;
390 #ifdef ROCKSDB_USING_THREAD_STATUS
391 options
.enable_thread_tracking
= true;
392 #endif // ROCKSDB_USING_THREAD_STATUS
393 TestFlushListener
* listener
= new TestFlushListener(options
.env
);
394 const int kCompactionTrigger
= 1;
395 const int kSlowdownTrigger
= 5;
396 const int kStopTrigger
= 100;
397 options
.level0_file_num_compaction_trigger
= kCompactionTrigger
;
398 options
.level0_slowdown_writes_trigger
= kSlowdownTrigger
;
399 options
.level0_stop_writes_trigger
= kStopTrigger
;
400 options
.max_write_buffer_number
= 10;
401 options
.listeners
.emplace_back(listener
);
402 // BG compaction is disabled. Number of L0 files will simply keeps
403 // increasing in this test.
404 options
.compaction_style
= kCompactionStyleNone
;
405 options
.compression
= kNoCompression
;
406 options
.write_buffer_size
= 100000; // Small write buffer
407 options
.table_properties_collector_factories
.push_back(
408 std::make_shared
<TestPropertiesCollectorFactory
>());
410 CreateAndReopenWithCF({"pikachu"}, options
);
411 ColumnFamilyMetaData cf_meta
;
412 db_
->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
414 // keep writing until writes are forced to stop.
415 for (int i
= 0; static_cast<int>(cf_meta
.file_count
) < kSlowdownTrigger
* 10;
417 Put(1, ToString(i
), std::string(10000, 'x'), WriteOptions());
419 fo
.allow_write_stall
= true;
420 db_
->Flush(fo
, handles_
[1]);
421 db_
->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
423 ASSERT_GE(listener
->slowdown_count
, kSlowdownTrigger
* 9);
426 class TestCompactionReasonListener
: public EventListener
{
428 void OnCompactionCompleted(DB
* /*db*/, const CompactionJobInfo
& ci
) override
{
429 std::lock_guard
<std::mutex
> lock(mutex_
);
430 compaction_reasons_
.push_back(ci
.compaction_reason
);
433 std::vector
<CompactionReason
> compaction_reasons_
;
437 TEST_F(EventListenerTest
, CompactionReasonLevel
) {
439 options
.env
= CurrentOptions().env
;
440 options
.create_if_missing
= true;
441 options
.memtable_factory
.reset(
442 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
444 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
445 options
.listeners
.emplace_back(listener
);
447 options
.level0_file_num_compaction_trigger
= 4;
448 options
.compaction_style
= kCompactionStyleLevel
;
450 DestroyAndReopen(options
);
453 // Write 4 files in L0
454 for (int i
= 0; i
< 4; i
++) {
455 GenerateNewRandomFile(&rnd
);
457 dbfull()->TEST_WaitForCompact();
459 ASSERT_EQ(listener
->compaction_reasons_
.size(), 1);
460 ASSERT_EQ(listener
->compaction_reasons_
[0],
461 CompactionReason::kLevelL0FilesNum
);
463 DestroyAndReopen(options
);
465 // Write 3 non-overlapping files in L0
466 for (int k
= 1; k
<= 30; k
++) {
467 ASSERT_OK(Put(Key(k
), Key(k
)));
473 // Do a trivial move from L0 -> L1
474 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
476 options
.max_bytes_for_level_base
= 1;
478 listener
->compaction_reasons_
.clear();
481 dbfull()->TEST_WaitForCompact();
482 ASSERT_GT(listener
->compaction_reasons_
.size(), 1);
484 for (auto compaction_reason
: listener
->compaction_reasons_
) {
485 ASSERT_EQ(compaction_reason
, CompactionReason::kLevelMaxLevelSize
);
488 options
.disable_auto_compactions
= true;
490 listener
->compaction_reasons_
.clear();
494 CompactRangeOptions cro
;
495 cro
.bottommost_level_compaction
= BottommostLevelCompaction::kForce
;
496 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
497 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
498 for (auto compaction_reason
: listener
->compaction_reasons_
) {
499 ASSERT_EQ(compaction_reason
, CompactionReason::kManualCompaction
);
503 TEST_F(EventListenerTest
, CompactionReasonUniversal
) {
505 options
.env
= CurrentOptions().env
;
506 options
.create_if_missing
= true;
507 options
.memtable_factory
.reset(
508 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
510 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
511 options
.listeners
.emplace_back(listener
);
513 options
.compaction_style
= kCompactionStyleUniversal
;
517 options
.level0_file_num_compaction_trigger
= 8;
518 options
.compaction_options_universal
.max_size_amplification_percent
= 100000;
519 options
.compaction_options_universal
.size_ratio
= 100000;
520 DestroyAndReopen(options
);
521 listener
->compaction_reasons_
.clear();
523 // Write 8 files in L0
524 for (int i
= 0; i
< 8; i
++) {
525 GenerateNewRandomFile(&rnd
);
527 dbfull()->TEST_WaitForCompact();
529 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
530 for (auto compaction_reason
: listener
->compaction_reasons_
) {
531 ASSERT_EQ(compaction_reason
, CompactionReason::kUniversalSizeRatio
);
534 options
.level0_file_num_compaction_trigger
= 8;
535 options
.compaction_options_universal
.max_size_amplification_percent
= 1;
536 options
.compaction_options_universal
.size_ratio
= 100000;
538 DestroyAndReopen(options
);
539 listener
->compaction_reasons_
.clear();
541 // Write 8 files in L0
542 for (int i
= 0; i
< 8; i
++) {
543 GenerateNewRandomFile(&rnd
);
545 dbfull()->TEST_WaitForCompact();
547 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
548 for (auto compaction_reason
: listener
->compaction_reasons_
) {
549 ASSERT_EQ(compaction_reason
, CompactionReason::kUniversalSizeAmplification
);
552 options
.disable_auto_compactions
= true;
554 listener
->compaction_reasons_
.clear();
557 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
559 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
560 for (auto compaction_reason
: listener
->compaction_reasons_
) {
561 ASSERT_EQ(compaction_reason
, CompactionReason::kManualCompaction
);
565 TEST_F(EventListenerTest
, CompactionReasonFIFO
) {
567 options
.env
= CurrentOptions().env
;
568 options
.create_if_missing
= true;
569 options
.memtable_factory
.reset(
570 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
572 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
573 options
.listeners
.emplace_back(listener
);
575 options
.level0_file_num_compaction_trigger
= 4;
576 options
.compaction_style
= kCompactionStyleFIFO
;
577 options
.compaction_options_fifo
.max_table_files_size
= 1;
579 DestroyAndReopen(options
);
582 // Write 4 files in L0
583 for (int i
= 0; i
< 4; i
++) {
584 GenerateNewRandomFile(&rnd
);
586 dbfull()->TEST_WaitForCompact();
588 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
589 for (auto compaction_reason
: listener
->compaction_reasons_
) {
590 ASSERT_EQ(compaction_reason
, CompactionReason::kFIFOMaxSize
);
594 class TableFileCreationListener
: public EventListener
{
596 class TestEnv
: public EnvWrapper
{
598 TestEnv() : EnvWrapper(Env::Default()) {}
600 void SetStatus(Status s
) { status_
= s
; }
602 Status
NewWritableFile(const std::string
& fname
,
603 std::unique_ptr
<WritableFile
>* result
,
604 const EnvOptions
& options
) override
{
605 if (fname
.size() > 4 && fname
.substr(fname
.size() - 4) == ".sst") {
610 return Env::Default()->NewWritableFile(fname
, result
, options
);
617 TableFileCreationListener() {
618 for (int i
= 0; i
< 2; i
++) {
619 started_
[i
] = finished_
[i
] = failure_
[i
] = 0;
623 int Index(TableFileCreationReason reason
) {
626 case TableFileCreationReason::kFlush
:
629 case TableFileCreationReason::kCompaction
:
638 void CheckAndResetCounters(int flush_started
, int flush_finished
,
639 int flush_failure
, int compaction_started
,
640 int compaction_finished
, int compaction_failure
) {
641 ASSERT_EQ(started_
[0], flush_started
);
642 ASSERT_EQ(finished_
[0], flush_finished
);
643 ASSERT_EQ(failure_
[0], flush_failure
);
644 ASSERT_EQ(started_
[1], compaction_started
);
645 ASSERT_EQ(finished_
[1], compaction_finished
);
646 ASSERT_EQ(failure_
[1], compaction_failure
);
647 for (int i
= 0; i
< 2; i
++) {
648 started_
[i
] = finished_
[i
] = failure_
[i
] = 0;
652 void OnTableFileCreationStarted(
653 const TableFileCreationBriefInfo
& info
) override
{
654 int idx
= Index(info
.reason
);
658 ASSERT_GT(info
.db_name
.size(), 0U);
659 ASSERT_GT(info
.cf_name
.size(), 0U);
660 ASSERT_GT(info
.file_path
.size(), 0U);
661 ASSERT_GT(info
.job_id
, 0);
664 void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
665 int idx
= Index(info
.reason
);
669 ASSERT_GT(info
.db_name
.size(), 0U);
670 ASSERT_GT(info
.cf_name
.size(), 0U);
671 ASSERT_GT(info
.file_path
.size(), 0U);
672 ASSERT_GT(info
.job_id
, 0);
673 if (info
.status
.ok()) {
674 ASSERT_GT(info
.table_properties
.data_size
, 0U);
675 ASSERT_GT(info
.table_properties
.raw_key_size
, 0U);
676 ASSERT_GT(info
.table_properties
.raw_value_size
, 0U);
677 ASSERT_GT(info
.table_properties
.num_data_blocks
, 0U);
678 ASSERT_GT(info
.table_properties
.num_entries
, 0U);
692 TEST_F(EventListenerTest
, TableFileCreationListenersTest
) {
693 auto listener
= std::make_shared
<TableFileCreationListener
>();
695 options
.create_if_missing
= true;
696 options
.listeners
.push_back(listener
);
697 options
.env
= &listener
->test_env
;
698 DestroyAndReopen(options
);
700 ASSERT_OK(Put("foo", "aaa"));
701 ASSERT_OK(Put("bar", "bbb"));
703 dbfull()->TEST_WaitForFlushMemTable();
704 listener
->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
706 ASSERT_OK(Put("foo", "aaa1"));
707 ASSERT_OK(Put("bar", "bbb1"));
708 listener
->test_env
.SetStatus(Status::NotSupported("not supported"));
710 listener
->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
711 listener
->test_env
.SetStatus(Status::OK());
714 ASSERT_OK(Put("foo", "aaa2"));
715 ASSERT_OK(Put("bar", "bbb2"));
717 dbfull()->TEST_WaitForFlushMemTable();
718 listener
->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
720 const Slice kRangeStart
= "a";
721 const Slice kRangeEnd
= "z";
722 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart
, &kRangeEnd
);
723 dbfull()->TEST_WaitForCompact();
724 listener
->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
726 ASSERT_OK(Put("foo", "aaa3"));
727 ASSERT_OK(Put("bar", "bbb3"));
729 listener
->test_env
.SetStatus(Status::NotSupported("not supported"));
730 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart
, &kRangeEnd
);
731 dbfull()->TEST_WaitForCompact();
732 listener
->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
735 class MemTableSealedListener
: public EventListener
{
737 SequenceNumber latest_seq_number_
;
739 MemTableSealedListener() {}
740 void OnMemTableSealed(const MemTableInfo
& info
) override
{
741 latest_seq_number_
= info
.first_seqno
;
744 void OnFlushCompleted(DB
* /*db*/,
745 const FlushJobInfo
& flush_job_info
) override
{
746 ASSERT_LE(flush_job_info
.smallest_seqno
, latest_seq_number_
);
750 TEST_F(EventListenerTest
, MemTableSealedListenerTest
) {
751 auto listener
= std::make_shared
<MemTableSealedListener
>();
753 options
.create_if_missing
= true;
754 options
.listeners
.push_back(listener
);
755 DestroyAndReopen(options
);
757 for (unsigned int i
= 0; i
< 10; i
++) {
758 std::string tag
= std::to_string(i
);
759 ASSERT_OK(Put("foo"+tag
, "aaa"));
760 ASSERT_OK(Put("bar"+tag
, "bbb"));
766 class ColumnFamilyHandleDeletionStartedListener
: public EventListener
{
768 std::vector
<std::string
> cfs_
;
772 explicit ColumnFamilyHandleDeletionStartedListener(
773 const std::vector
<std::string
>& cfs
)
774 : cfs_(cfs
), counter(0) {
775 cfs_
.insert(cfs_
.begin(), kDefaultColumnFamilyName
);
777 void OnColumnFamilyHandleDeletionStarted(
778 ColumnFamilyHandle
* handle
) override
{
779 ASSERT_EQ(cfs_
[handle
->GetID()], handle
->GetName());
782 int getCounter() { return counter
; }
785 TEST_F(EventListenerTest
, ColumnFamilyHandleDeletionStartedListenerTest
) {
786 std::vector
<std::string
> cfs
{"pikachu", "eevee", "Mewtwo"};
788 std::make_shared
<ColumnFamilyHandleDeletionStartedListener
>(cfs
);
790 options
.env
= CurrentOptions().env
;
791 options
.create_if_missing
= true;
792 options
.listeners
.push_back(listener
);
793 CreateAndReopenWithCF(cfs
, options
);
794 ASSERT_EQ(handles_
.size(), 4);
799 ASSERT_EQ(listener
->getCounter(), 3);
802 class BackgroundErrorListener
: public EventListener
{
808 BackgroundErrorListener(SpecialEnv
* env
) : env_(env
), counter_(0) {}
810 void OnBackgroundError(BackgroundErrorReason
/*reason*/,
811 Status
* bg_error
) override
{
813 // suppress the first error and disable write-dropping such that a retry
815 *bg_error
= Status::OK();
816 env_
->drop_writes_
.store(false, std::memory_order_release
);
817 env_
->no_slowdown_
= false;
822 int counter() { return counter_
; }
825 TEST_F(EventListenerTest
, BackgroundErrorListenerFailedFlushTest
) {
826 auto listener
= std::make_shared
<BackgroundErrorListener
>(env_
);
828 options
.create_if_missing
= true;
830 options
.listeners
.push_back(listener
);
831 options
.memtable_factory
.reset(new SpecialSkipListFactory(1));
832 options
.paranoid_checks
= true;
833 DestroyAndReopen(options
);
835 // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
836 // forge a custom one for the failed flush case.
837 rocksdb::SyncPoint::GetInstance()->LoadDependency(
838 {{"DBImpl::BGWorkFlush:done",
839 "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
840 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
842 env_
->drop_writes_
.store(true, std::memory_order_release
);
843 env_
->no_slowdown_
= true;
845 ASSERT_OK(Put("key0", "val"));
846 ASSERT_OK(Put("key1", "val"));
847 TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
848 ASSERT_EQ(1, listener
->counter());
849 ASSERT_OK(Put("key2", "val"));
850 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
851 ASSERT_EQ(1, NumTableFilesAtLevel(0));
854 TEST_F(EventListenerTest
, BackgroundErrorListenerFailedCompactionTest
) {
855 auto listener
= std::make_shared
<BackgroundErrorListener
>(env_
);
857 options
.create_if_missing
= true;
858 options
.disable_auto_compactions
= true;
860 options
.level0_file_num_compaction_trigger
= 2;
861 options
.listeners
.push_back(listener
);
862 options
.memtable_factory
.reset(new SpecialSkipListFactory(2));
863 options
.paranoid_checks
= true;
864 DestroyAndReopen(options
);
866 // third iteration triggers the second memtable's flush
867 for (int i
= 0; i
< 3; ++i
) {
868 ASSERT_OK(Put("key0", "val"));
870 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
872 ASSERT_OK(Put("key1", "val"));
874 ASSERT_EQ(2, NumTableFilesAtLevel(0));
876 env_
->drop_writes_
.store(true, std::memory_order_release
);
877 env_
->no_slowdown_
= true;
878 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
879 ASSERT_OK(dbfull()->TEST_WaitForCompact());
880 ASSERT_EQ(1, listener
->counter());
882 // trigger flush so compaction is triggered again; this time it succeeds
883 // The previous failed compaction may get retried automatically, so we may
884 // be left with 0 or 1 files in level 1, depending on when the retry gets
886 ASSERT_OK(Put("key0", "val"));
887 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
888 ASSERT_OK(dbfull()->TEST_WaitForCompact());
889 ASSERT_LE(1, NumTableFilesAtLevel(0));
892 class TestFileOperationListener
: public EventListener
{
894 TestFileOperationListener() {
895 file_reads_
.store(0);
896 file_reads_success_
.store(0);
897 file_writes_
.store(0);
898 file_writes_success_
.store(0);
901 void OnFileReadFinish(const FileOperationInfo
& info
) override
{
903 if (info
.status
.ok()) {
904 ++file_reads_success_
;
906 ReportDuration(info
);
909 void OnFileWriteFinish(const FileOperationInfo
& info
) override
{
911 if (info
.status
.ok()) {
912 ++file_writes_success_
;
914 ReportDuration(info
);
917 bool ShouldBeNotifiedOnFileIO() override
{ return true; }
919 std::atomic
<size_t> file_reads_
;
920 std::atomic
<size_t> file_reads_success_
;
921 std::atomic
<size_t> file_writes_
;
922 std::atomic
<size_t> file_writes_success_
;
925 void ReportDuration(const FileOperationInfo
& info
) const {
926 auto duration
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(
927 info
.finish_timestamp
- info
.start_timestamp
);
928 ASSERT_GT(duration
.count(), 0);
932 TEST_F(EventListenerTest
, OnFileOperationTest
) {
934 options
.env
= CurrentOptions().env
;
935 options
.create_if_missing
= true;
937 TestFileOperationListener
* listener
= new TestFileOperationListener();
938 options
.listeners
.emplace_back(listener
);
940 DestroyAndReopen(options
);
941 ASSERT_OK(Put("foo", "aaa"));
942 dbfull()->Flush(FlushOptions());
943 dbfull()->TEST_WaitForFlushMemTable();
944 ASSERT_GE(listener
->file_writes_
.load(),
945 listener
->file_writes_success_
.load());
946 ASSERT_GT(listener
->file_writes_
.load(), 0);
950 ASSERT_GE(listener
->file_reads_
.load(), listener
->file_reads_success_
.load());
951 ASSERT_GT(listener
->file_reads_
.load(), 0);
954 } // namespace rocksdb
956 #endif // ROCKSDB_LITE
958 int main(int argc
, char** argv
) {
959 ::testing::InitGoogleTest(&argc
, argv
);
960 return RUN_ALL_TESTS();