1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/blob/blob_index.h"
7 #include "db/db_impl/db_impl.h"
8 #include "db/db_test_util.h"
9 #include "db/dbformat.h"
10 #include "db/version_set.h"
11 #include "db/write_batch_internal.h"
12 #include "file/filename.h"
13 #include "monitoring/statistics.h"
14 #include "rocksdb/cache.h"
15 #include "rocksdb/compaction_filter.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/perf_context.h"
21 #include "rocksdb/slice.h"
22 #include "rocksdb/slice_transform.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/table_properties.h"
25 #include "test_util/sync_point.h"
26 #include "test_util/testharness.h"
27 #include "test_util/testutil.h"
28 #include "util/hash.h"
29 #include "util/mutexlock.h"
30 #include "util/rate_limiter.h"
31 #include "util/string_util.h"
32 #include "utilities/merge_operators.h"
36 namespace ROCKSDB_NAMESPACE
{
38 class EventListenerTest
: public DBTestBase
{
40 EventListenerTest() : DBTestBase("listener_test", /*env_do_fsync=*/true) {}
42 static std::string
BlobStr(uint64_t blob_file_number
, uint64_t offset
,
44 std::string blob_index
;
45 BlobIndex::EncodeBlob(&blob_index
, blob_file_number
, offset
, size
,
50 const size_t k110KB
= 110 << 10;
53 struct TestPropertiesCollector
54 : public ROCKSDB_NAMESPACE::TablePropertiesCollector
{
55 ROCKSDB_NAMESPACE::Status
AddUserKey(
56 const ROCKSDB_NAMESPACE::Slice
& /*key*/,
57 const ROCKSDB_NAMESPACE::Slice
& /*value*/,
58 ROCKSDB_NAMESPACE::EntryType
/*type*/,
59 ROCKSDB_NAMESPACE::SequenceNumber
/*seq*/,
60 uint64_t /*file_size*/) override
{
63 ROCKSDB_NAMESPACE::Status
Finish(
64 ROCKSDB_NAMESPACE::UserCollectedProperties
* properties
) override
{
65 properties
->insert({"0", "1"});
69 const char* Name() const override
{ return "TestTablePropertiesCollector"; }
71 ROCKSDB_NAMESPACE::UserCollectedProperties
GetReadableProperties()
73 ROCKSDB_NAMESPACE::UserCollectedProperties ret
;
79 class TestPropertiesCollectorFactory
: public TablePropertiesCollectorFactory
{
81 TablePropertiesCollector
* CreateTablePropertiesCollector(
82 TablePropertiesCollectorFactory::Context
/*context*/) override
{
83 return new TestPropertiesCollector
;
85 const char* Name() const override
{ return "TestTablePropertiesCollector"; }
88 class TestCompactionListener
: public EventListener
{
90 explicit TestCompactionListener(EventListenerTest
* test
) : test_(test
) {}
92 void OnCompactionCompleted(DB
* db
, const CompactionJobInfo
& ci
) override
{
93 std::lock_guard
<std::mutex
> lock(mutex_
);
94 compacted_dbs_
.push_back(db
);
95 ASSERT_GT(ci
.input_files
.size(), 0U);
96 ASSERT_EQ(ci
.input_files
.size(), ci
.input_file_infos
.size());
98 for (size_t i
= 0; i
< ci
.input_file_infos
.size(); ++i
) {
99 ASSERT_EQ(ci
.input_file_infos
[i
].level
, ci
.base_input_level
);
100 ASSERT_EQ(ci
.input_file_infos
[i
].file_number
,
101 TableFileNameToNumber(ci
.input_files
[i
]));
104 ASSERT_GT(ci
.output_files
.size(), 0U);
105 ASSERT_EQ(ci
.output_files
.size(), ci
.output_file_infos
.size());
108 ASSERT_EQ(test_
->db_
, db
);
110 std::vector
<std::vector
<FileMetaData
>> files_by_level
;
111 test_
->dbfull()->TEST_GetFilesMetaData(test_
->handles_
[ci
.cf_id
],
113 ASSERT_GT(files_by_level
.size(), ci
.output_level
);
115 for (size_t i
= 0; i
< ci
.output_file_infos
.size(); ++i
) {
116 ASSERT_EQ(ci
.output_file_infos
[i
].level
, ci
.output_level
);
117 ASSERT_EQ(ci
.output_file_infos
[i
].file_number
,
118 TableFileNameToNumber(ci
.output_files
[i
]));
120 auto it
= std::find_if(
121 files_by_level
[ci
.output_level
].begin(),
122 files_by_level
[ci
.output_level
].end(), [&](const FileMetaData
& meta
) {
123 return meta
.fd
.GetNumber() == ci
.output_file_infos
[i
].file_number
;
125 ASSERT_NE(it
, files_by_level
[ci
.output_level
].end());
127 ASSERT_EQ(ci
.output_file_infos
[i
].oldest_blob_file_number
,
128 it
->oldest_blob_file_number
);
131 ASSERT_EQ(db
->GetEnv()->GetThreadID(), ci
.thread_id
);
132 ASSERT_GT(ci
.thread_id
, 0U);
134 for (auto fl
: {ci
.input_files
, ci
.output_files
}) {
136 auto it
= ci
.table_properties
.find(fn
);
137 ASSERT_NE(it
, ci
.table_properties
.end());
138 auto tp
= it
->second
;
139 ASSERT_TRUE(tp
!= nullptr);
140 ASSERT_EQ(tp
->user_collected_properties
.find("0")->second
, "1");
145 EventListenerTest
* test_
;
146 std::vector
<DB
*> compacted_dbs_
;
150 TEST_F(EventListenerTest
, OnSingleDBCompactionTest
) {
151 const int kTestKeySize
= 16;
152 const int kTestValueSize
= 984;
153 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
154 const int kEntriesPerBuffer
= 100;
155 const int kNumL0Files
= 4;
158 options
.env
= CurrentOptions().env
;
159 options
.create_if_missing
= true;
160 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
161 options
.compaction_style
= kCompactionStyleLevel
;
162 options
.target_file_size_base
= options
.write_buffer_size
;
163 options
.max_bytes_for_level_base
= options
.target_file_size_base
* 2;
164 options
.max_bytes_for_level_multiplier
= 2;
165 options
.compression
= kNoCompression
;
166 #ifdef ROCKSDB_USING_THREAD_STATUS
167 options
.enable_thread_tracking
= true;
168 #endif // ROCKSDB_USING_THREAD_STATUS
169 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
170 options
.table_properties_collector_factories
.push_back(
171 std::make_shared
<TestPropertiesCollectorFactory
>());
173 TestCompactionListener
* listener
= new TestCompactionListener(this);
174 options
.listeners
.emplace_back(listener
);
175 std::vector
<std::string
> cf_names
= {"pikachu", "ilya", "muromec",
176 "dobrynia", "nikitich", "alyosha",
178 CreateAndReopenWithCF(cf_names
, options
);
179 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
182 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch
, 1, "ditto",
183 BlobStr(123, 0, 1 << 10)));
184 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
186 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
187 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
188 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
189 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
190 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
191 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
192 for (int i
= 1; i
< 8; ++i
) {
194 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
195 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_
[i
],
197 ASSERT_OK(dbfull()->TEST_WaitForCompact());
200 ASSERT_EQ(listener
->compacted_dbs_
.size(), cf_names
.size());
201 for (size_t i
= 0; i
< cf_names
.size(); ++i
) {
202 ASSERT_EQ(listener
->compacted_dbs_
[i
], db_
);
206 // This simple Listener can only handle one flush at a time.
207 class TestFlushListener
: public EventListener
{
209 TestFlushListener(Env
* env
, EventListenerTest
* test
)
210 : slowdown_count(0), stop_count(0), db_closed(), env_(env
), test_(test
) {
214 virtual ~TestFlushListener() {
215 prev_fc_info_
.status
.PermitUncheckedError(); // Ignore the status
217 void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
218 // remember the info for later checking the FlushJobInfo.
219 prev_fc_info_
= info
;
220 ASSERT_GT(info
.db_name
.size(), 0U);
221 ASSERT_GT(info
.cf_name
.size(), 0U);
222 ASSERT_GT(info
.file_path
.size(), 0U);
223 ASSERT_GT(info
.job_id
, 0);
224 ASSERT_GT(info
.table_properties
.data_size
, 0U);
225 ASSERT_GT(info
.table_properties
.raw_key_size
, 0U);
226 ASSERT_GT(info
.table_properties
.raw_value_size
, 0U);
227 ASSERT_GT(info
.table_properties
.num_data_blocks
, 0U);
228 ASSERT_GT(info
.table_properties
.num_entries
, 0U);
229 ASSERT_EQ(info
.file_checksum
, kUnknownFileChecksum
);
230 ASSERT_EQ(info
.file_checksum_func_name
, kUnknownFileChecksumFuncName
);
232 #ifdef ROCKSDB_USING_THREAD_STATUS
233 // Verify the id of the current thread that created this table
234 // file matches the id of any active flush or compaction thread.
235 uint64_t thread_id
= env_
->GetThreadID();
236 std::vector
<ThreadStatus
> thread_list
;
237 ASSERT_OK(env_
->GetThreadList(&thread_list
));
238 bool found_match
= false;
239 for (auto thread_status
: thread_list
) {
240 if (thread_status
.operation_type
== ThreadStatus::OP_FLUSH
||
241 thread_status
.operation_type
== ThreadStatus::OP_COMPACTION
) {
242 if (thread_id
== thread_status
.thread_id
) {
248 ASSERT_TRUE(found_match
);
249 #endif // ROCKSDB_USING_THREAD_STATUS
252 void OnFlushCompleted(DB
* db
, const FlushJobInfo
& info
) override
{
253 flushed_dbs_
.push_back(db
);
254 flushed_column_family_names_
.push_back(info
.cf_name
);
255 if (info
.triggered_writes_slowdown
) {
258 if (info
.triggered_writes_stop
) {
261 // verify whether the previously created file matches the flushed file.
262 ASSERT_EQ(prev_fc_info_
.db_name
, db
->GetName());
263 ASSERT_EQ(prev_fc_info_
.cf_name
, info
.cf_name
);
264 ASSERT_EQ(prev_fc_info_
.job_id
, info
.job_id
);
265 ASSERT_EQ(prev_fc_info_
.file_path
, info
.file_path
);
266 ASSERT_EQ(TableFileNameToNumber(info
.file_path
), info
.file_number
);
268 // Note: the following chunk relies on the notification pertaining to the
269 // database pointed to by DBTestBase::db_, and is thus bypassed when
270 // that assumption does not hold (see the test case MultiDBMultiListeners
273 if (db
== test_
->db_
) {
274 std::vector
<std::vector
<FileMetaData
>> files_by_level
;
275 ASSERT_LT(info
.cf_id
, test_
->handles_
.size());
276 ASSERT_GE(info
.cf_id
, 0u);
277 ASSERT_NE(test_
->handles_
[info
.cf_id
], nullptr);
278 test_
->dbfull()->TEST_GetFilesMetaData(test_
->handles_
[info
.cf_id
],
281 ASSERT_FALSE(files_by_level
.empty());
282 auto it
= std::find_if(files_by_level
[0].begin(), files_by_level
[0].end(),
283 [&](const FileMetaData
& meta
) {
284 return meta
.fd
.GetNumber() == info
.file_number
;
286 ASSERT_NE(it
, files_by_level
[0].end());
287 ASSERT_EQ(info
.oldest_blob_file_number
, it
->oldest_blob_file_number
);
290 ASSERT_EQ(db
->GetEnv()->GetThreadID(), info
.thread_id
);
291 ASSERT_GT(info
.thread_id
, 0U);
292 ASSERT_EQ(info
.table_properties
.user_collected_properties
.find("0")->second
,
296 std::vector
<std::string
> flushed_column_family_names_
;
297 std::vector
<DB
*> flushed_dbs_
;
301 std::atomic_bool db_closed
;
302 TableFileCreationInfo prev_fc_info_
;
306 EventListenerTest
* test_
;
309 TEST_F(EventListenerTest
, OnSingleDBFlushTest
) {
311 options
.env
= CurrentOptions().env
;
312 options
.write_buffer_size
= k110KB
;
313 #ifdef ROCKSDB_USING_THREAD_STATUS
314 options
.enable_thread_tracking
= true;
315 #endif // ROCKSDB_USING_THREAD_STATUS
316 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
317 options
.listeners
.emplace_back(listener
);
318 std::vector
<std::string
> cf_names
= {"pikachu", "ilya", "muromec",
319 "dobrynia", "nikitich", "alyosha",
321 options
.table_properties_collector_factories
.push_back(
322 std::make_shared
<TestPropertiesCollectorFactory
>());
323 CreateAndReopenWithCF(cf_names
, options
);
325 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
328 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch
, 1, "ditto",
329 BlobStr(456, 0, 1 << 10)));
330 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
332 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
333 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
334 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
335 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
336 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
337 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
338 for (int i
= 1; i
< 8; ++i
) {
340 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
341 // Ensure background work is fully finished including listener callbacks
342 // before accessing listener state.
343 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
344 ASSERT_EQ(listener
->flushed_dbs_
.size(), i
);
345 ASSERT_EQ(listener
->flushed_column_family_names_
.size(), i
);
348 // make sure callback functions are called in the right order
349 for (size_t i
= 0; i
< cf_names
.size(); ++i
) {
350 ASSERT_EQ(listener
->flushed_dbs_
[i
], db_
);
351 ASSERT_EQ(listener
->flushed_column_family_names_
[i
], cf_names
[i
]);
355 TEST_F(EventListenerTest
, MultiCF
) {
357 options
.env
= CurrentOptions().env
;
358 options
.write_buffer_size
= k110KB
;
359 #ifdef ROCKSDB_USING_THREAD_STATUS
360 options
.enable_thread_tracking
= true;
361 #endif // ROCKSDB_USING_THREAD_STATUS
362 for (auto atomic_flush
: {false, true}) {
363 options
.atomic_flush
= atomic_flush
;
364 options
.create_if_missing
= true;
365 DestroyAndReopen(options
);
366 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
367 options
.listeners
.emplace_back(listener
);
368 options
.table_properties_collector_factories
.push_back(
369 std::make_shared
<TestPropertiesCollectorFactory
>());
370 std::vector
<std::string
> cf_names
= {"pikachu", "ilya", "muromec",
371 "dobrynia", "nikitich", "alyosha",
373 CreateAndReopenWithCF(cf_names
, options
);
375 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
376 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
377 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
378 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
379 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
380 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
381 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
383 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
385 for (int i
= 1; i
< 8; ++i
) {
386 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
387 {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
388 "EventListenerTest.MultiCF:PreVerifyListener"}});
390 TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener");
391 ASSERT_EQ(listener
->flushed_dbs_
.size(), i
);
392 ASSERT_EQ(listener
->flushed_column_family_names_
.size(), i
);
393 // make sure callback functions are called in the right order
395 for (size_t j
= 0; j
< cf_names
.size(); j
++) {
396 ASSERT_EQ(listener
->flushed_dbs_
[j
], db_
);
397 ASSERT_EQ(listener
->flushed_column_family_names_
[j
], cf_names
[j
]);
402 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
407 TEST_F(EventListenerTest
, MultiDBMultiListeners
) {
409 options
.env
= CurrentOptions().env
;
410 #ifdef ROCKSDB_USING_THREAD_STATUS
411 options
.enable_thread_tracking
= true;
412 #endif // ROCKSDB_USING_THREAD_STATUS
413 options
.table_properties_collector_factories
.push_back(
414 std::make_shared
<TestPropertiesCollectorFactory
>());
415 std::vector
<TestFlushListener
*> listeners
;
416 const int kNumDBs
= 5;
417 const int kNumListeners
= 10;
418 for (int i
= 0; i
< kNumListeners
; ++i
) {
419 listeners
.emplace_back(new TestFlushListener(options
.env
, this));
422 std::vector
<std::string
> cf_names
= {"pikachu", "ilya", "muromec",
423 "dobrynia", "nikitich", "alyosha",
426 options
.create_if_missing
= true;
427 for (int i
= 0; i
< kNumListeners
; ++i
) {
428 options
.listeners
.emplace_back(listeners
[i
]);
430 DBOptions
db_opts(options
);
431 ColumnFamilyOptions
cf_opts(options
);
433 std::vector
<DB
*> dbs
;
434 std::vector
<std::vector
<ColumnFamilyHandle
*>> vec_handles
;
436 for (int d
= 0; d
< kNumDBs
; ++d
) {
437 ASSERT_OK(DestroyDB(dbname_
+ std::to_string(d
), options
));
439 std::vector
<ColumnFamilyHandle
*> handles
;
440 ASSERT_OK(DB::Open(options
, dbname_
+ std::to_string(d
), &db
));
441 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
442 ColumnFamilyHandle
* handle
;
443 ASSERT_OK(db
->CreateColumnFamily(cf_opts
, cf_names
[c
], &handle
));
444 handles
.push_back(handle
);
447 vec_handles
.push_back(std::move(handles
));
451 for (int d
= 0; d
< kNumDBs
; ++d
) {
452 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
453 ASSERT_OK(dbs
[d
]->Put(WriteOptions(), vec_handles
[d
][c
], cf_names
[c
],
458 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
459 for (int d
= 0; d
< kNumDBs
; ++d
) {
460 ASSERT_OK(dbs
[d
]->Flush(FlushOptions(), vec_handles
[d
][c
]));
462 static_cast_with_check
<DBImpl
>(dbs
[d
])->TEST_WaitForFlushMemTable());
466 for (int d
= 0; d
< kNumDBs
; ++d
) {
467 // Ensure background work is fully finished including listener callbacks
468 // before accessing listener state.
470 static_cast_with_check
<DBImpl
>(dbs
[d
])->TEST_WaitForBackgroundWork());
473 for (auto* listener
: listeners
) {
475 for (size_t c
= 0; c
< cf_names
.size(); ++c
) {
476 for (int d
= 0; d
< kNumDBs
; ++d
) {
477 ASSERT_EQ(listener
->flushed_dbs_
[pos
], dbs
[d
]);
478 ASSERT_EQ(listener
->flushed_column_family_names_
[pos
], cf_names
[c
]);
484 for (auto handles
: vec_handles
) {
485 for (auto h
: handles
) {
492 for (auto db
: dbs
) {
497 TEST_F(EventListenerTest
, DisableBGCompaction
) {
499 options
.env
= CurrentOptions().env
;
500 #ifdef ROCKSDB_USING_THREAD_STATUS
501 options
.enable_thread_tracking
= true;
502 #endif // ROCKSDB_USING_THREAD_STATUS
503 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
504 const int kCompactionTrigger
= 1;
505 const int kSlowdownTrigger
= 5;
506 const int kStopTrigger
= 100;
507 options
.level0_file_num_compaction_trigger
= kCompactionTrigger
;
508 options
.level0_slowdown_writes_trigger
= kSlowdownTrigger
;
509 options
.level0_stop_writes_trigger
= kStopTrigger
;
510 options
.max_write_buffer_number
= 10;
511 options
.listeners
.emplace_back(listener
);
512 // BG compaction is disabled. Number of L0 files will simply keeps
513 // increasing in this test.
514 options
.compaction_style
= kCompactionStyleNone
;
515 options
.compression
= kNoCompression
;
516 options
.write_buffer_size
= 100000; // Small write buffer
517 options
.table_properties_collector_factories
.push_back(
518 std::make_shared
<TestPropertiesCollectorFactory
>());
520 CreateAndReopenWithCF({"pikachu"}, options
);
521 ColumnFamilyMetaData cf_meta
;
522 db_
->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
524 // keep writing until writes are forced to stop.
525 for (int i
= 0; static_cast<int>(cf_meta
.file_count
) < kSlowdownTrigger
* 10;
528 Put(1, std::to_string(i
), std::string(10000, 'x'), WriteOptions()));
530 fo
.allow_write_stall
= true;
531 ASSERT_OK(db_
->Flush(fo
, handles_
[1]));
532 db_
->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
534 // Ensure background work is fully finished including listener callbacks
535 // before accessing listener state.
536 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
537 ASSERT_GE(listener
->slowdown_count
, kSlowdownTrigger
* 9);
540 class TestCompactionReasonListener
: public EventListener
{
542 void OnCompactionCompleted(DB
* /*db*/, const CompactionJobInfo
& ci
) override
{
543 std::lock_guard
<std::mutex
> lock(mutex_
);
544 compaction_reasons_
.push_back(ci
.compaction_reason
);
547 std::vector
<CompactionReason
> compaction_reasons_
;
551 TEST_F(EventListenerTest
, CompactionReasonLevel
) {
553 options
.env
= CurrentOptions().env
;
554 options
.create_if_missing
= true;
555 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(
556 DBTestBase::kNumKeysByGenerateNewRandomFile
));
558 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
559 options
.listeners
.emplace_back(listener
);
561 options
.level0_file_num_compaction_trigger
= 4;
562 options
.compaction_style
= kCompactionStyleLevel
;
564 DestroyAndReopen(options
);
567 // Write 4 files in L0
568 for (int i
= 0; i
< 4; i
++) {
569 GenerateNewRandomFile(&rnd
);
571 ASSERT_OK(dbfull()->TEST_WaitForCompact());
573 ASSERT_EQ(listener
->compaction_reasons_
.size(), 1);
574 ASSERT_EQ(listener
->compaction_reasons_
[0],
575 CompactionReason::kLevelL0FilesNum
);
577 DestroyAndReopen(options
);
579 // Write 3 non-overlapping files in L0
580 for (int k
= 1; k
<= 30; k
++) {
581 ASSERT_OK(Put(Key(k
), Key(k
)));
587 // Do a trivial move from L0 -> L1
588 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
590 options
.max_bytes_for_level_base
= 1;
592 listener
->compaction_reasons_
.clear();
595 ASSERT_OK(dbfull()->TEST_WaitForCompact());
596 ASSERT_GT(listener
->compaction_reasons_
.size(), 1);
598 for (auto compaction_reason
: listener
->compaction_reasons_
) {
599 ASSERT_EQ(compaction_reason
, CompactionReason::kLevelMaxLevelSize
);
602 options
.disable_auto_compactions
= true;
604 listener
->compaction_reasons_
.clear();
607 ASSERT_OK(Put("key", "value"));
608 CompactRangeOptions cro
;
609 cro
.bottommost_level_compaction
= BottommostLevelCompaction::kForceOptimized
;
610 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
611 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
612 for (auto compaction_reason
: listener
->compaction_reasons_
) {
613 ASSERT_EQ(compaction_reason
, CompactionReason::kManualCompaction
);
617 TEST_F(EventListenerTest
, CompactionReasonUniversal
) {
619 options
.env
= CurrentOptions().env
;
620 options
.create_if_missing
= true;
621 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(
622 DBTestBase::kNumKeysByGenerateNewRandomFile
));
624 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
625 options
.listeners
.emplace_back(listener
);
627 options
.compaction_style
= kCompactionStyleUniversal
;
631 options
.level0_file_num_compaction_trigger
= 8;
632 options
.compaction_options_universal
.max_size_amplification_percent
= 100000;
633 options
.compaction_options_universal
.size_ratio
= 100000;
634 DestroyAndReopen(options
);
635 listener
->compaction_reasons_
.clear();
637 // Write 8 files in L0
638 for (int i
= 0; i
< 8; i
++) {
639 GenerateNewRandomFile(&rnd
);
641 ASSERT_OK(dbfull()->TEST_WaitForCompact());
643 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
644 for (auto compaction_reason
: listener
->compaction_reasons_
) {
645 ASSERT_EQ(compaction_reason
, CompactionReason::kUniversalSizeRatio
);
648 options
.level0_file_num_compaction_trigger
= 8;
649 options
.compaction_options_universal
.max_size_amplification_percent
= 1;
650 options
.compaction_options_universal
.size_ratio
= 100000;
652 DestroyAndReopen(options
);
653 listener
->compaction_reasons_
.clear();
655 // Write 8 files in L0
656 for (int i
= 0; i
< 8; i
++) {
657 GenerateNewRandomFile(&rnd
);
659 ASSERT_OK(dbfull()->TEST_WaitForCompact());
661 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
662 for (auto compaction_reason
: listener
->compaction_reasons_
) {
663 ASSERT_EQ(compaction_reason
, CompactionReason::kUniversalSizeAmplification
);
666 options
.disable_auto_compactions
= true;
668 listener
->compaction_reasons_
.clear();
671 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
673 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
674 for (auto compaction_reason
: listener
->compaction_reasons_
) {
675 ASSERT_EQ(compaction_reason
, CompactionReason::kManualCompaction
);
679 TEST_F(EventListenerTest
, CompactionReasonFIFO
) {
681 options
.env
= CurrentOptions().env
;
682 options
.create_if_missing
= true;
683 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(
684 DBTestBase::kNumKeysByGenerateNewRandomFile
));
686 TestCompactionReasonListener
* listener
= new TestCompactionReasonListener();
687 options
.listeners
.emplace_back(listener
);
689 options
.level0_file_num_compaction_trigger
= 4;
690 options
.compaction_style
= kCompactionStyleFIFO
;
691 options
.compaction_options_fifo
.max_table_files_size
= 1;
693 DestroyAndReopen(options
);
696 // Write 4 files in L0
697 for (int i
= 0; i
< 4; i
++) {
698 GenerateNewRandomFile(&rnd
);
700 ASSERT_OK(dbfull()->TEST_WaitForCompact());
702 ASSERT_GT(listener
->compaction_reasons_
.size(), 0);
703 for (auto compaction_reason
: listener
->compaction_reasons_
) {
704 ASSERT_EQ(compaction_reason
, CompactionReason::kFIFOMaxSize
);
708 class TableFileCreationListener
: public EventListener
{
710 class TestEnv
: public EnvWrapper
{
712 explicit TestEnv(Env
* t
) : EnvWrapper(t
) {}
713 static const char* kClassName() { return "TestEnv"; }
714 const char* Name() const override
{ return kClassName(); }
716 void SetStatus(Status s
) { status_
= s
; }
718 Status
NewWritableFile(const std::string
& fname
,
719 std::unique_ptr
<WritableFile
>* result
,
720 const EnvOptions
& options
) override
{
721 if (fname
.size() > 4 && fname
.substr(fname
.size() - 4) == ".sst") {
726 return target()->NewWritableFile(fname
, result
, options
);
733 TableFileCreationListener() {
734 for (int i
= 0; i
< 2; i
++) {
735 started_
[i
] = finished_
[i
] = failure_
[i
] = 0;
739 int Index(TableFileCreationReason reason
) {
742 case TableFileCreationReason::kFlush
:
745 case TableFileCreationReason::kCompaction
:
754 void CheckAndResetCounters(int flush_started
, int flush_finished
,
755 int flush_failure
, int compaction_started
,
756 int compaction_finished
, int compaction_failure
) {
757 ASSERT_EQ(started_
[0], flush_started
);
758 ASSERT_EQ(finished_
[0], flush_finished
);
759 ASSERT_EQ(failure_
[0], flush_failure
);
760 ASSERT_EQ(started_
[1], compaction_started
);
761 ASSERT_EQ(finished_
[1], compaction_finished
);
762 ASSERT_EQ(failure_
[1], compaction_failure
);
763 for (int i
= 0; i
< 2; i
++) {
764 started_
[i
] = finished_
[i
] = failure_
[i
] = 0;
768 void OnTableFileCreationStarted(
769 const TableFileCreationBriefInfo
& info
) override
{
770 int idx
= Index(info
.reason
);
774 ASSERT_GT(info
.db_name
.size(), 0U);
775 ASSERT_GT(info
.cf_name
.size(), 0U);
776 ASSERT_GT(info
.file_path
.size(), 0U);
777 ASSERT_GT(info
.job_id
, 0);
780 void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
781 int idx
= Index(info
.reason
);
785 ASSERT_GT(info
.db_name
.size(), 0U);
786 ASSERT_GT(info
.cf_name
.size(), 0U);
787 ASSERT_GT(info
.file_path
.size(), 0U);
788 ASSERT_GT(info
.job_id
, 0);
789 ASSERT_EQ(info
.file_checksum
, kUnknownFileChecksum
);
790 ASSERT_EQ(info
.file_checksum_func_name
, kUnknownFileChecksumFuncName
);
791 if (info
.status
.ok()) {
792 if (info
.table_properties
.num_range_deletions
== 0U) {
793 ASSERT_GT(info
.table_properties
.data_size
, 0U);
794 ASSERT_GT(info
.table_properties
.raw_key_size
, 0U);
795 ASSERT_GT(info
.table_properties
.raw_value_size
, 0U);
796 ASSERT_GT(info
.table_properties
.num_data_blocks
, 0U);
797 ASSERT_GT(info
.table_properties
.num_entries
, 0U);
802 last_failure_
= info
.status
;
810 Status last_failure_
;
813 TEST_F(EventListenerTest
, TableFileCreationListenersTest
) {
814 auto listener
= std::make_shared
<TableFileCreationListener
>();
816 std::unique_ptr
<TableFileCreationListener::TestEnv
> test_env(
817 new TableFileCreationListener::TestEnv(CurrentOptions().env
));
818 options
.create_if_missing
= true;
819 options
.listeners
.push_back(listener
);
820 options
.env
= test_env
.get();
821 DestroyAndReopen(options
);
823 ASSERT_OK(Put("foo", "aaa"));
824 ASSERT_OK(Put("bar", "bbb"));
826 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
827 listener
->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
828 ASSERT_OK(Put("foo", "aaa1"));
829 ASSERT_OK(Put("bar", "bbb1"));
830 test_env
->SetStatus(Status::NotSupported("not supported"));
832 listener
->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
833 ASSERT_TRUE(listener
->last_failure_
.IsNotSupported());
834 test_env
->SetStatus(Status::OK());
837 ASSERT_OK(Put("foo", "aaa2"));
838 ASSERT_OK(Put("bar", "bbb2"));
840 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
841 listener
->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
843 const Slice kRangeStart
= "a";
844 const Slice kRangeEnd
= "z";
846 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart
, &kRangeEnd
));
847 ASSERT_OK(dbfull()->TEST_WaitForCompact());
848 listener
->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
850 ASSERT_OK(Put("foo", "aaa3"));
851 ASSERT_OK(Put("bar", "bbb3"));
853 test_env
->SetStatus(Status::NotSupported("not supported"));
855 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart
, &kRangeEnd
));
856 ASSERT_NOK(dbfull()->TEST_WaitForCompact());
857 listener
->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
858 ASSERT_TRUE(listener
->last_failure_
.IsNotSupported());
861 test_env
->SetStatus(Status::OK());
862 DestroyAndReopen(options
);
864 // Verify that an empty table file that is immediately deleted gives Aborted
865 // status to listener.
866 ASSERT_OK(Put("baz", "z"));
867 ASSERT_OK(SingleDelete("baz"));
869 listener
->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
870 ASSERT_TRUE(listener
->last_failure_
.IsAborted());
872 // Also in compaction
873 ASSERT_OK(Put("baz", "z"));
875 ASSERT_OK(db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(),
876 kRangeStart
, kRangeEnd
));
878 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
879 ASSERT_OK(dbfull()->TEST_WaitForCompact());
880 listener
->CheckAndResetCounters(2, 2, 0, 1, 1, 1);
881 ASSERT_TRUE(listener
->last_failure_
.IsAborted());
883 Close(); // Avoid UAF on listener
886 class MemTableSealedListener
: public EventListener
{
888 SequenceNumber latest_seq_number_
;
891 MemTableSealedListener() {}
892 void OnMemTableSealed(const MemTableInfo
& info
) override
{
893 latest_seq_number_
= info
.first_seqno
;
896 void OnFlushCompleted(DB
* /*db*/,
897 const FlushJobInfo
& flush_job_info
) override
{
898 ASSERT_LE(flush_job_info
.smallest_seqno
, latest_seq_number_
);
902 TEST_F(EventListenerTest
, MemTableSealedListenerTest
) {
903 auto listener
= std::make_shared
<MemTableSealedListener
>();
905 options
.env
= CurrentOptions().env
;
906 options
.create_if_missing
= true;
907 options
.listeners
.push_back(listener
);
908 DestroyAndReopen(options
);
910 for (unsigned int i
= 0; i
< 10; i
++) {
911 std::string tag
= std::to_string(i
);
912 ASSERT_OK(Put("foo" + tag
, "aaa"));
913 ASSERT_OK(Put("bar" + tag
, "bbb"));
919 class ColumnFamilyHandleDeletionStartedListener
: public EventListener
{
921 std::vector
<std::string
> cfs_
;
925 explicit ColumnFamilyHandleDeletionStartedListener(
926 const std::vector
<std::string
>& cfs
)
927 : cfs_(cfs
), counter(0) {
928 cfs_
.insert(cfs_
.begin(), kDefaultColumnFamilyName
);
930 void OnColumnFamilyHandleDeletionStarted(
931 ColumnFamilyHandle
* handle
) override
{
932 ASSERT_EQ(cfs_
[handle
->GetID()], handle
->GetName());
935 int getCounter() { return counter
; }
938 TEST_F(EventListenerTest
, ColumnFamilyHandleDeletionStartedListenerTest
) {
939 std::vector
<std::string
> cfs
{"pikachu", "eevee", "Mewtwo"};
941 std::make_shared
<ColumnFamilyHandleDeletionStartedListener
>(cfs
);
943 options
.env
= CurrentOptions().env
;
944 options
.create_if_missing
= true;
945 options
.listeners
.push_back(listener
);
946 CreateAndReopenWithCF(cfs
, options
);
947 ASSERT_EQ(handles_
.size(), 4);
952 ASSERT_EQ(listener
->getCounter(), 3);
955 class BackgroundErrorListener
: public EventListener
{
961 BackgroundErrorListener(SpecialEnv
* env
) : env_(env
), counter_(0) {}
963 void OnBackgroundError(BackgroundErrorReason
/*reason*/,
964 Status
* bg_error
) override
{
966 // suppress the first error and disable write-dropping such that a retry
968 *bg_error
= Status::OK();
969 env_
->drop_writes_
.store(false, std::memory_order_release
);
970 env_
->SetMockSleep(false);
975 int counter() { return counter_
; }
// Verifies that a failed flush reports exactly one background error to the
// listener, and that after the listener suppresses it and re-enables writes,
// a later flush succeeds and leaves one file in L0.
// NOTE(review): the declaration of `options` (source line 980) and a few
// other lines are not visible in this chunk.
978 TEST_F(EventListenerTest
, BackgroundErrorListenerFailedFlushTest
) {
979 auto listener
= std::make_shared
<BackgroundErrorListener
>(env_
);
981 options
.create_if_missing
= true;
983 options
.listeners
.push_back(listener
);
// NOTE(review): presumably makes the memtable count as full after one entry,
// so each Put() can trigger a flush -- verify NewSpecialSkipListFactory.
984 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(1));
985 options
.paranoid_checks
= true;
986 DestroyAndReopen(options
);
988 // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
989 // forge a custom one for the failed flush case.
990 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
991 {{"DBImpl::BGWorkFlush:done",
992 "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
993 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Make the env drop writes (with mock sleep on) so the next flush fails.
995 env_
->drop_writes_
.store(true, std::memory_order_release
);
996 env_
->SetMockSleep();
998 ASSERT_OK(Put("key0", "val"));
999 ASSERT_OK(Put("key1", "val"));
// Waits for "DBImpl::BGWorkFlush:done" via the dependency loaded above.
1000 TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
// The failed flush produced exactly one background-error notification.
1001 ASSERT_EQ(1, listener
->counter());
// Writes were re-enabled by the listener, so this flush should succeed.
1002 ASSERT_OK(Put("key2", "val"));
1003 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1004 ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Verifies that a failed compaction reports one background error to the
// listener and that, once the listener suppresses it, a later compaction can
// run. Setup: auto compactions start disabled, the L0 compaction trigger is 2,
// and the special memtable factory is created with 2.
// NOTE(review): parts of the setup loop (source lines 1022, 1024, 1026) and
// the declaration of `options` are not visible in this chunk, so exactly when
// the flush wait inside the loop runs cannot be confirmed from here.
1007 TEST_F(EventListenerTest
, BackgroundErrorListenerFailedCompactionTest
) {
1008 auto listener
= std::make_shared
<BackgroundErrorListener
>(env_
);
1010 options
.create_if_missing
= true;
1011 options
.disable_auto_compactions
= true;
1013 options
.level0_file_num_compaction_trigger
= 2;
1014 options
.listeners
.push_back(listener
);
1015 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(2));
1016 options
.paranoid_checks
= true;
1017 DestroyAndReopen(options
);
1019 // third iteration triggers the second memtable's flush
1020 for (int i
= 0; i
< 3; ++i
) {
1021 ASSERT_OK(Put("key0", "val"));
1023 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1025 ASSERT_OK(Put("key1", "val"));
// Two L0 files must exist before compaction is enabled.
1027 ASSERT_EQ(2, NumTableFilesAtLevel(0));
// Make writes fail, then enable auto compactions so the compaction fails.
1029 env_
->drop_writes_
.store(true, std::memory_order_release
);
1030 env_
->SetMockSleep();
1031 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
1032 ASSERT_OK(dbfull()->TEST_WaitForCompact());
// The failed compaction reported exactly one background error.
1033 ASSERT_EQ(1, listener
->counter());
1035 // trigger flush so compaction is triggered again; this time it succeeds
1036 // The previous failed compaction may get retried automatically, so we may
1037 // be left with 0 or 1 files in level 1, depending on when the retry gets
1039 ASSERT_OK(Put("key0", "val"));
1040 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1041 ASSERT_OK(dbfull()->TEST_WaitForCompact());
// NOTE(review): this asserts at least one file in L0, while the comment above
// discusses 0 or 1 files in level 1; confirm which level and bound is meant.
1042 ASSERT_LE(1, NumTableFilesAtLevel(0));
// Listener that counts file I/O completion callbacks (read, write, flush,
// close, sync, truncate). It keeps a separate counter for successful
// operations and one for files whose path ends in ".blob". Reads of paths
// containing "MANIFEST" get their own branch, presumably counted in
// file_seq_reads_. Every visible callback also calls ReportDuration(), which
// asserts that the reported duration is positive.
// NOTE(review): this chunk is a lossy extraction. Several source lines of this
// class are not visible here, e.g. the increments of the total counters
// (file_reads_, file_writes_, ...), the body of the MANIFEST branch, the
// ".blob" branch bodies of the read and sync callbacks, and closing braces.
// Confirm them against the complete file.
1045 class TestFileOperationListener
: public EventListener
{
// Start every counter at zero.
1047 TestFileOperationListener() {
1048 file_reads_
.store(0);
1049 file_reads_success_
.store(0);
1050 file_writes_
.store(0);
1051 file_writes_success_
.store(0);
1052 file_flushes_
.store(0);
1053 file_flushes_success_
.store(0);
1054 file_closes_
.store(0);
1055 file_closes_success_
.store(0);
1056 file_syncs_
.store(0);
1057 file_syncs_success_
.store(0);
1058 file_truncates_
.store(0);
1059 file_truncates_success_
.store(0);
1060 file_seq_reads_
.store(0);
1061 blob_file_reads_
.store(0);
1062 blob_file_writes_
.store(0);
1063 blob_file_flushes_
.store(0);
1064 blob_file_closes_
.store(0);
1065 blob_file_syncs_
.store(0);
1066 blob_file_truncates_
.store(0);
1069 void OnFileReadFinish(const FileOperationInfo
& info
) override
{
1071 if (info
.status
.ok()) {
1072 ++file_reads_success_
;
// Reads from MANIFEST files are tracked separately.
1074 if (info
.path
.find("MANIFEST") != std::string::npos
) {
1077 if (EndsWith(info
.path
, ".blob")) {
1080 ReportDuration(info
);
1083 void OnFileWriteFinish(const FileOperationInfo
& info
) override
{
1085 if (info
.status
.ok()) {
1086 ++file_writes_success_
;
1088 if (EndsWith(info
.path
, ".blob")) {
1089 ++blob_file_writes_
;
1091 ReportDuration(info
);
1094 void OnFileFlushFinish(const FileOperationInfo
& info
) override
{
1096 if (info
.status
.ok()) {
1097 ++file_flushes_success_
;
1099 if (EndsWith(info
.path
, ".blob")) {
1100 ++blob_file_flushes_
;
1102 ReportDuration(info
);
1105 void OnFileCloseFinish(const FileOperationInfo
& info
) override
{
1107 if (info
.status
.ok()) {
1108 ++file_closes_success_
;
1110 if (EndsWith(info
.path
, ".blob")) {
1111 ++blob_file_closes_
;
1113 ReportDuration(info
);
1116 void OnFileSyncFinish(const FileOperationInfo
& info
) override
{
1118 if (info
.status
.ok()) {
1119 ++file_syncs_success_
;
1121 if (EndsWith(info
.path
, ".blob")) {
1124 ReportDuration(info
);
1127 void OnFileTruncateFinish(const FileOperationInfo
& info
) override
{
1129 if (info
.status
.ok()) {
1130 ++file_truncates_success_
;
1132 if (EndsWith(info
.path
, ".blob")) {
1133 ++blob_file_truncates_
;
1135 ReportDuration(info
);
// Returning true opts this listener in to the OnFile*Finish callbacks
// (per the EventListener API -- verify in rocksdb/listener.h).
1138 bool ShouldBeNotifiedOnFileIO() override
{ return true; }
// Counters are std::atomic, presumably because callbacks may fire on
// background threads; the tests read them with load().
1140 std::atomic
<size_t> file_reads_
;
1141 std::atomic
<size_t> file_reads_success_
;
1142 std::atomic
<size_t> file_writes_
;
1143 std::atomic
<size_t> file_writes_success_
;
1144 std::atomic
<size_t> file_flushes_
;
1145 std::atomic
<size_t> file_flushes_success_
;
1146 std::atomic
<size_t> file_closes_
;
1147 std::atomic
<size_t> file_closes_success_
;
1148 std::atomic
<size_t> file_syncs_
;
1149 std::atomic
<size_t> file_syncs_success_
;
1150 std::atomic
<size_t> file_truncates_
;
1151 std::atomic
<size_t> file_truncates_success_
;
1152 std::atomic
<size_t> file_seq_reads_
;
1153 std::atomic
<size_t> blob_file_reads_
;
1154 std::atomic
<size_t> blob_file_writes_
;
1155 std::atomic
<size_t> blob_file_flushes_
;
1156 std::atomic
<size_t> blob_file_closes_
;
1157 std::atomic
<size_t> blob_file_syncs_
;
1158 std::atomic
<size_t> blob_file_truncates_
;
// Fails the test unless the operation reported a strictly positive duration.
// NOTE(review): on a platform with a coarse clock a very fast operation could
// report 0; watch for flakiness.
1161 void ReportDuration(const FileOperationInfo
& info
) const {
1162 ASSERT_GT(info
.duration
.count(), 0);
// Writes and flushes one key with TestFileOperationListener attached. It then
// checks that write, flush, read, close and sync callbacks fired, and that for
// each operation the total count is at least the success count.
// NOTE(review): use_direct_io_for_flush_and_compaction is set to false both
// before TryReopen() and in the InvalidArgument fallback. The probe therefore
// cannot change the option, and the truncate assertions under
// `if (true == options.use_direct_io_for_flush_and_compaction)` are not reached
// on any path visible here. If the intent is to probe for direct-I/O support,
// the first assignment would have to be true.
// NOTE(review): the declaration of `options` and source lines 1178-1180 and
// 1191-1193 are not visible in this chunk.
1166 TEST_F(EventListenerTest
, OnFileOperationTest
) {
1168 options
.env
= CurrentOptions().env
;
1169 options
.create_if_missing
= true;
// options.listeners takes over the raw pointer (elsewhere in this file it is
// given shared_ptrs); `listener` is used below only to read counters.
1171 TestFileOperationListener
* listener
= new TestFileOperationListener();
1172 options
.listeners
.emplace_back(listener
);
1174 options
.use_direct_io_for_flush_and_compaction
= false;
1175 Status s
= TryReopen(options
);
1176 if (s
.IsInvalidArgument()) {
1177 options
.use_direct_io_for_flush_and_compaction
= false;
1181 DestroyAndReopen(options
);
1182 ASSERT_OK(Put("foo", "aaa"));
1183 ASSERT_OK(dbfull()->Flush(FlushOptions()));
1184 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1185 ASSERT_GE(listener
->file_writes_
.load(),
1186 listener
->file_writes_success_
.load());
1187 ASSERT_GT(listener
->file_writes_
.load(), 0);
1188 ASSERT_GE(listener
->file_flushes_
.load(),
1189 listener
->file_flushes_success_
.load());
1190 ASSERT_GT(listener
->file_flushes_
.load(), 0);
1194 ASSERT_GE(listener
->file_reads_
.load(), listener
->file_reads_success_
.load());
1195 ASSERT_GT(listener
->file_reads_
.load(), 0);
1196 ASSERT_GE(listener
->file_closes_
.load(),
1197 listener
->file_closes_success_
.load());
1198 ASSERT_GT(listener
->file_closes_
.load(), 0);
1199 ASSERT_GE(listener
->file_syncs_
.load(), listener
->file_syncs_success_
.load());
1200 ASSERT_GT(listener
->file_syncs_
.load(), 0);
1201 if (true == options
.use_direct_io_for_flush_and_compaction
) {
1202 ASSERT_GE(listener
->file_truncates_
.load(),
1203 listener
->file_truncates_success_
.load());
1204 ASSERT_GT(listener
->file_truncates_
.load(), 0);
// Writes several values with blob files enabled (min_blob_size = 0, presumably
// so every value qualifies for a blob file) and blob garbage collection on. It
// then checks that blob-file write, flush, close and sync callbacks were seen.
// NOTE(review): the lines between the Put() groups (source lines 1226-1227,
// 1230-1231, 1234-1235) and 1238-1240 are not visible in this chunk; they
// presumably flush/compact and may check blob_file_reads_.
// NOTE(review): use_direct_io_for_flush_and_compaction is never assigned in the
// visible lines, so whether the truncate check runs depends on the option's
// default value.
1208 TEST_F(EventListenerTest
, OnBlobFileOperationTest
) {
1210 options
.env
= CurrentOptions().env
;
1211 options
.create_if_missing
= true;
1212 TestFileOperationListener
* listener
= new TestFileOperationListener();
1213 options
.listeners
.emplace_back(listener
);
1214 options
.disable_auto_compactions
= true;
1215 options
.enable_blob_files
= true;
1216 options
.min_blob_size
= 0;
1217 options
.enable_blob_garbage_collection
= true;
1218 options
.blob_garbage_collection_age_cutoff
= 0.5;
1220 DestroyAndReopen(options
);
1222 ASSERT_OK(Put("Key1", "blob_value1"));
1223 ASSERT_OK(Put("Key2", "blob_value2"));
1224 ASSERT_OK(Put("Key3", "blob_value3"));
1225 ASSERT_OK(Put("Key4", "blob_value4"));
// Overwrite two keys so their earlier blob values become garbage.
1228 ASSERT_OK(Put("Key3", "new_blob_value3"));
1229 ASSERT_OK(Put("Key4", "new_blob_value4"));
1232 ASSERT_OK(Put("Key5", "blob_value5"));
1233 ASSERT_OK(Put("Key6", "blob_value6"));
1236 ASSERT_GT(listener
->blob_file_writes_
.load(), 0U);
1237 ASSERT_GT(listener
->blob_file_flushes_
.load(), 0U);
1241 ASSERT_GT(listener
->blob_file_closes_
.load(), 0U);
1242 ASSERT_GT(listener
->blob_file_syncs_
.load(), 0U);
1243 if (true == options
.use_direct_io_for_flush_and_compaction
) {
1244 ASSERT_GT(listener
->blob_file_truncates_
.load(), 0U);
// Records file_seq_reads_ after a write and asserts that it has grown by the
// end of the test. The test name suggests the growth comes from reading the
// MANIFEST/WAL during recovery.
// NOTE(review): the operation between capturing `seq_reads` and the final
// assertion (source line 1268, presumably a reopen) is not visible in this
// chunk, nor are lines 1260-1262.
// NOTE(review): as in OnFileOperationTest, both assignments to
// use_direct_io_for_flush_and_compaction are false, so the TryReopen() probe
// cannot change the option.
1248 TEST_F(EventListenerTest
, ReadManifestAndWALOnRecovery
) {
1250 options
.env
= CurrentOptions().env
;
1251 options
.create_if_missing
= true;
1253 TestFileOperationListener
* listener
= new TestFileOperationListener();
1254 options
.listeners
.emplace_back(listener
);
1256 options
.use_direct_io_for_flush_and_compaction
= false;
1257 Status s
= TryReopen(options
);
1258 if (s
.IsInvalidArgument()) {
1259 options
.use_direct_io_for_flush_and_compaction
= false;
1263 DestroyAndReopen(options
);
1264 ASSERT_OK(Put("foo", "aaa"));
// Baseline before the (not visible) recovery step.
1267 size_t seq_reads
= listener
->file_seq_reads_
.load();
1269 ASSERT_GT(listener
->file_seq_reads_
.load(), seq_reads
);
// Job-level listener for BlobDB tests. On flush and compaction completion it:
//   - expects kNoCompression for blobs;
//   - cross-checks every reported blob file addition against the current
//     Version's blob file metadata.
// It also records flushed file paths under mutex_, so tests can feed them to
// CompactFiles().
// NOTE(review): call_count_ is reset and read directly by the tests, but its
// increment falls on source lines not visible in this chunk. It is a plain
// uint32_t; if the callbacks run on a background thread, the test thread's read
// is unsynchronized -- consider std::atomic<uint32_t>.
// NOTE(review): access specifiers, closing braces and the mutex_ declaration
// are also not visible here.
1272 class BlobDBJobLevelEventListenerTest
: public EventListener
{
1274 explicit BlobDBJobLevelEventListenerTest(EventListenerTest
* test
)
1275 : test_(test
), call_count_(0) {}
// Walks VersionSet -> default ColumnFamilyData -> current Version ->
// VersionStorageInfo of the test's DB.
// NOTE(review): EXPECT_NE is non-fatal, so a null pointer would still be
// dereferenced on the following line; ASSERT_* cannot be used here because
// this function returns a value.
1277 const VersionStorageInfo
* GetVersionStorageInfo() const {
1278 VersionSet
* const versions
= test_
->dbfull()->GetVersionSet();
1281 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
1282 EXPECT_NE(cfd
, nullptr);
1284 Version
* const current
= cfd
->current();
1285 EXPECT_NE(current
, nullptr);
1287 const VersionStorageInfo
* const storage_info
= current
->storage_info();
1288 EXPECT_NE(storage_info
, nullptr);
1290 return storage_info
;
// Expects at least one blob file addition. Each addition's number, byte count
// and blob count must match the VersionStorageInfo metadata, and its path must
// be non-empty.
// NOTE(review): EXPECT_NE(meta, nullptr) does not stop execution, so an
// unknown blob file number would crash on meta->...; ASSERT_NE would fail
// cleanly here because this function returns void.
1293 void CheckBlobFileAdditions(
1294 const std::vector
<BlobFileAdditionInfo
>& blob_file_addition_infos
) const {
1295 const auto* vstorage
= GetVersionStorageInfo();
1297 EXPECT_FALSE(blob_file_addition_infos
.empty());
1299 for (const auto& blob_file_addition_info
: blob_file_addition_infos
) {
1300 const auto meta
= vstorage
->GetBlobFileMetaData(
1301 blob_file_addition_info
.blob_file_number
);
1303 EXPECT_NE(meta
, nullptr);
1304 EXPECT_EQ(meta
->GetBlobFileNumber(),
1305 blob_file_addition_info
.blob_file_number
);
1306 EXPECT_EQ(meta
->GetTotalBlobBytes(),
1307 blob_file_addition_info
.total_blob_bytes
);
1308 EXPECT_EQ(meta
->GetTotalBlobCount(),
1309 blob_file_addition_info
.total_blob_count
);
1310 EXPECT_FALSE(blob_file_addition_info
.blob_file_path
.empty());
// Returns a copy of the recorded flushed file paths, taken under mutex_.
1314 std::vector
<std::string
> GetFlushedFiles() {
1315 std::lock_guard
<std::mutex
> lock(mutex_
);
1316 std::vector
<std::string
> result
;
1317 for (const auto& fname
: flushed_files_
) {
1318 result
.push_back(fname
);
// Records info.file_path under mutex_ and validates the flush's blob info.
1323 void OnFlushCompleted(DB
* /*db*/, const FlushJobInfo
& info
) override
{
1327 std::lock_guard
<std::mutex
> lock(mutex_
);
1328 flushed_files_
.push_back(info
.file_path
);
1331 EXPECT_EQ(info
.blob_compression_type
, kNoCompression
);
1333 CheckBlobFileAdditions(info
.blob_file_addition_infos
);
// Validates the compaction's blob compression type and blob file additions,
// and expects garbage to be reported for at least one blob file. Each garbage
// entry must have a positive file number, count and byte total, and a
// non-empty path.
1336 void OnCompactionCompleted(DB
* /*db*/,
1337 const CompactionJobInfo
& info
) override
{
1340 EXPECT_EQ(info
.blob_compression_type
, kNoCompression
);
1342 CheckBlobFileAdditions(info
.blob_file_addition_infos
);
1344 EXPECT_FALSE(info
.blob_file_garbage_infos
.empty());
1346 for (const auto& blob_file_garbage_info
: info
.blob_file_garbage_infos
) {
1347 EXPECT_GT(blob_file_garbage_info
.blob_file_number
, 0U);
1348 EXPECT_GT(blob_file_garbage_info
.garbage_blob_count
, 0U);
1349 EXPECT_GT(blob_file_garbage_info
.garbage_blob_bytes
, 0U);
1350 EXPECT_FALSE(blob_file_garbage_info
.blob_file_path
.empty());
// Test fixture; used to reach the DB's VersionSet.
1354 EventListenerTest
* test_
;
// Tests use this to confirm that the completion callbacks were invoked.
1355 uint32_t call_count_
;
// Paths reported by OnFlushCompleted; guarded by mutex_.
1358 std::vector
<std::string
> flushed_files_
;
1362 // Test OnFlushCompleted EventListener called for blob files
// Blob files are enabled with min_blob_size = 0 (presumably so every value
// qualifies), so the job-level listener validates the blob file additions of
// each flush. The test then reads the keys back and expects at least one
// recorded callback.
// NOTE(review): the declaration of `options` and the flush calls (presumably on
// source lines 1379-1380 and 1382-1383) are not visible in this chunk.
1363 TEST_F(EventListenerTest
, BlobDBOnFlushCompleted
) {
1365 options
.env
= CurrentOptions().env
;
1366 options
.enable_blob_files
= true;
1367 options
.create_if_missing
= true;
1368 options
.disable_auto_compactions
= true;
1370 options
.min_blob_size
= 0;
// options.listeners takes over the pointer; the test keeps it only to read
// call_count_.
1371 BlobDBJobLevelEventListenerTest
* blob_event_listener
=
1372 new BlobDBJobLevelEventListenerTest(this);
1373 options
.listeners
.emplace_back(blob_event_listener
);
1375 DestroyAndReopen(options
);
1377 ASSERT_OK(Put("Key1", "blob_value1"));
1378 ASSERT_OK(Put("Key2", "blob_value2"));
1381 ASSERT_OK(Put("Key3", "blob_value3"));
1384 ASSERT_EQ(Get("Key1"), "blob_value1");
1385 ASSERT_EQ(Get("Key2"), "blob_value2");
1386 ASSERT_EQ(Get("Key3"), "blob_value3");
1388 ASSERT_GT(blob_event_listener
->call_count_
, 0U);
1391 // Test OnCompactionCompleted EventListener called for blob files
// Writes values (across flushes not visible in this chunk) with blob garbage
// collection enabled. It then resets call_count_, runs a full-range manual
// compaction, and expects the job-level listener to have been invoked.
// NOTE(review): the declaration of `options` and the lines between the Put()
// groups (source lines 1412-1413, 1416-1417, 1420-1421) are not visible here.
1392 TEST_F(EventListenerTest
, BlobDBOnCompactionCompleted
) {
1394 options
.env
= CurrentOptions().env
;
1395 options
.enable_blob_files
= true;
1396 options
.create_if_missing
= true;
1397 options
.disable_auto_compactions
= true;
1398 options
.min_blob_size
= 0;
1399 BlobDBJobLevelEventListenerTest
* blob_event_listener
=
1400 new BlobDBJobLevelEventListenerTest(this);
1401 options
.listeners
.emplace_back(blob_event_listener
);
1403 options
.enable_blob_garbage_collection
= true;
1404 options
.blob_garbage_collection_age_cutoff
= 0.5;
1406 DestroyAndReopen(options
);
1408 ASSERT_OK(Put("Key1", "blob_value1"));
1409 ASSERT_OK(Put("Key2", "blob_value2"));
1410 ASSERT_OK(Put("Key3", "blob_value3"));
1411 ASSERT_OK(Put("Key4", "blob_value4"));
// Overwrite two keys so their earlier blob values become garbage.
1414 ASSERT_OK(Put("Key3", "new_blob_value3"));
1415 ASSERT_OK(Put("Key4", "new_blob_value4"));
1418 ASSERT_OK(Put("Key5", "blob_value5"));
1419 ASSERT_OK(Put("Key6", "blob_value6"));
// Ignore callbacks from the flushes above; only count the compaction.
1422 blob_event_listener
->call_count_
= 0;
// Null bounds: compact the entire key range.
1423 constexpr Slice
* begin
= nullptr;
1424 constexpr Slice
* end
= nullptr;
1426 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1427 // delete the oldest blob file and create new blob file during compaction.
1428 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
1430 // Make sure OnCompactionCompleted is called.
1431 ASSERT_GT(blob_event_listener
->call_count_
, 0U);
1434 // Test CompactFiles calls OnCompactionCompleted EventListener for blob files
1435 // and populates the blob file info.
// Feeds the flushed files recorded by the job-level listener to CompactFiles().
// The literals 1 and -1 are presumably the output level and output path id.
// The test expects at least one ".blob" file among the outputs and validates
// the blob addition/garbage entries in the returned CompactionJobInfo.
// NOTE(review): the two loops at the end validate only the entries that exist;
// this test does not assert that blob_file_addition_infos or
// blob_file_garbage_infos are non-empty. The listener's OnCompactionCompleted
// does assert non-emptiness.
// NOTE(review): the declaration of `options`, the flushes between the Put()
// groups and closing braces are not visible in this chunk.
1436 TEST_F(EventListenerTest
, BlobDBCompactFiles
) {
1438 options
.env
= CurrentOptions().env
;
1439 options
.enable_blob_files
= true;
1440 options
.create_if_missing
= true;
1441 options
.disable_auto_compactions
= true;
1442 options
.min_blob_size
= 0;
1443 options
.enable_blob_garbage_collection
= true;
1444 options
.blob_garbage_collection_age_cutoff
= 0.5;
1446 BlobDBJobLevelEventListenerTest
* blob_event_listener
=
1447 new BlobDBJobLevelEventListenerTest(this);
1448 options
.listeners
.emplace_back(blob_event_listener
);
1450 DestroyAndReopen(options
);
1452 ASSERT_OK(Put("Key1", "blob_value1"));
1453 ASSERT_OK(Put("Key2", "blob_value2"));
1454 ASSERT_OK(Put("Key3", "blob_value3"));
1455 ASSERT_OK(Put("Key4", "blob_value4"));
// Overwrite two keys so their earlier blob values become garbage.
1458 ASSERT_OK(Put("Key3", "new_blob_value3"));
1459 ASSERT_OK(Put("Key4", "new_blob_value4"));
1462 ASSERT_OK(Put("Key5", "blob_value5"));
1463 ASSERT_OK(Put("Key6", "blob_value6"));
1466 std::vector
<std::string
> output_file_names
;
1467 CompactionJobInfo compaction_job_info
;
1469 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1470 // delete the oldest blob file and create new blob file during compaction
1471 // which will be listed in output_file_names.
1472 ASSERT_OK(dbfull()->CompactFiles(
1473 CompactionOptions(), blob_event_listener
->GetFlushedFiles(), 1, -1,
1474 &output_file_names
, &compaction_job_info
));
// At least one compaction output must be a blob file.
1476 bool is_blob_in_output
= false;
1477 for (const auto& file
: output_file_names
) {
1478 if (EndsWith(file
, ".blob")) {
1479 is_blob_in_output
= true;
1482 ASSERT_TRUE(is_blob_in_output
);
1484 for (const auto& blob_file_addition_info
:
1485 compaction_job_info
.blob_file_addition_infos
) {
1486 EXPECT_GT(blob_file_addition_info
.blob_file_number
, 0U);
1487 EXPECT_GT(blob_file_addition_info
.total_blob_bytes
, 0U);
1488 EXPECT_GT(blob_file_addition_info
.total_blob_count
, 0U);
1489 EXPECT_FALSE(blob_file_addition_info
.blob_file_path
.empty());
1492 for (const auto& blob_file_garbage_info
:
1493 compaction_job_info
.blob_file_garbage_infos
) {
1494 EXPECT_GT(blob_file_garbage_info
.blob_file_number
, 0U);
1495 EXPECT_GT(blob_file_garbage_info
.garbage_blob_count
, 0U);
1496 EXPECT_GT(blob_file_garbage_info
.garbage_blob_bytes
, 0U);
1497 EXPECT_FALSE(blob_file_garbage_info
.blob_file_path
.empty());
// File-level BlobDB listener. It validates the fields of blob file
// creation-started, created and deleted notifications. CheckCounters()
// compares the atomic counters files_started_, files_created_ and
// files_deleted_.
// NOTE(review): the counter increments inside the callbacks and the access
// specifiers fall on source lines not visible in this chunk (e.g. 1502,
// 1510-1511, 1523-1524, 1531-1532, 1538-1540).
1501 class BlobDBFileLevelEventListener
: public EventListener
{
// A blob file creation started: DB name, CF name and path must be set, and
// the job id must be positive.
1503 void OnBlobFileCreationStarted(
1504 const BlobFileCreationBriefInfo
& info
) override
{
1506 EXPECT_FALSE(info
.db_name
.empty());
1507 EXPECT_FALSE(info
.cf_name
.empty());
1508 EXPECT_FALSE(info
.file_path
.empty());
1509 EXPECT_GT(info
.job_id
, 0);
// A created blob file must be identified and non-empty, carry the "unknown"
// checksum values, and report an OK status. The checksum expectation
// presumably holds because the tests configure no file checksum generator.
1512 void OnBlobFileCreated(const BlobFileCreationInfo
& info
) override
{
1514 EXPECT_FALSE(info
.db_name
.empty());
1515 EXPECT_FALSE(info
.cf_name
.empty());
1516 EXPECT_FALSE(info
.file_path
.empty());
1517 EXPECT_GT(info
.job_id
, 0);
1518 EXPECT_GT(info
.total_blob_count
, 0U);
1519 EXPECT_GT(info
.total_blob_bytes
, 0U);
1520 EXPECT_EQ(info
.file_checksum
, kUnknownFileChecksum
);
1521 EXPECT_EQ(info
.file_checksum_func_name
, kUnknownFileChecksumFuncName
);
1522 EXPECT_TRUE(info
.status
.ok());
// A deleted blob file must carry DB name, path, a positive job id and OK status.
1525 void OnBlobFileDeleted(const BlobFileDeletionInfo
& info
) override
{
1527 EXPECT_FALSE(info
.db_name
.empty());
1528 EXPECT_FALSE(info
.file_path
.empty());
1529 EXPECT_GT(info
.job_id
, 0);
1530 EXPECT_TRUE(info
.status
.ok());
// Every started blob file must have been created. At least one file must have
// been started and at least one deleted, and fewer files deleted than created.
1533 void CheckCounters() {
1534 EXPECT_EQ(files_started_
, files_created_
);
1535 EXPECT_GT(files_started_
, 0U);
1536 EXPECT_GT(files_deleted_
, 0U);
1537 EXPECT_LT(files_deleted_
, files_created_
);
// Value-initialized (zero) atomic counters.
1541 std::atomic
<uint32_t> files_started_
{};
1542 std::atomic
<uint32_t> files_created_
{};
1543 std::atomic
<uint32_t> files_deleted_
{};
// Drives the file-level blob listener through several writes (plus flushes
// not visible in this chunk) and a full-range manual compaction with blob GC
// enabled. It then checks the listener's creation/deletion counters.
// NOTE(review): the declaration of `options` and the lines between the Put()
// groups (source lines 1566-1567, 1570-1571, 1574-1575) are not visible here.
1546 TEST_F(EventListenerTest
, BlobDBFileTest
) {
1548 options
.env
= CurrentOptions().env
;
1549 options
.enable_blob_files
= true;
1550 options
.create_if_missing
= true;
1551 options
.disable_auto_compactions
= true;
1552 options
.min_blob_size
= 0;
1553 options
.enable_blob_garbage_collection
= true;
1554 options
.blob_garbage_collection_age_cutoff
= 0.5;
// options.listeners takes over the pointer; the test keeps it to call
// CheckCounters() at the end.
1556 BlobDBFileLevelEventListener
* blob_event_listener
=
1557 new BlobDBFileLevelEventListener();
1558 options
.listeners
.emplace_back(blob_event_listener
);
1560 DestroyAndReopen(options
);
1562 ASSERT_OK(Put("Key1", "blob_value1"));
1563 ASSERT_OK(Put("Key2", "blob_value2"));
1564 ASSERT_OK(Put("Key3", "blob_value3"));
1565 ASSERT_OK(Put("Key4", "blob_value4"));
// Overwrite two keys so their earlier blob values become garbage.
1568 ASSERT_OK(Put("Key3", "new_blob_value3"));
1569 ASSERT_OK(Put("Key4", "new_blob_value4"));
1572 ASSERT_OK(Put("Key5", "blob_value5"));
1573 ASSERT_OK(Put("Key6", "blob_value6"));
// Null bounds: compact the entire key range.
1576 constexpr Slice
* begin
= nullptr;
1577 constexpr Slice
* end
= nullptr;
1579 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1580 // delete the oldest blob file and create new blob file during compaction.
1581 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
1582 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1584 blob_event_listener
->CheckCounters();
1587 } // namespace ROCKSDB_NAMESPACE
1589 #endif // ROCKSDB_LITE
1591 int main(int argc
, char** argv
) {
1592 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1593 ::testing::InitGoogleTest(&argc
, argv
);
1594 return RUN_ALL_TESTS();