1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 #include "table/unique_id_impl.h"
12 namespace ROCKSDB_NAMESPACE
{
// Test implementation of CompactionService used by the tests in this file.
// StartV2() stores the serialized compaction input per job id and
// WaitForCompleteV2() feeds it to DB::OpenAndCompact(), using
// "<db_path_>/<job_id>" as the output directory. The Override*() setters let a
// test force the start status, the wait status, or the returned result string.
// NOTE(review): this extract is missing lines (access specifiers, closing
// braces, some statements); the comments below describe only what is visible.
14 class MyTestCompactionService
: public CompactionService
{
// Stores the DB path, the compactor-side statistics, the listeners and the
// table-properties collector factories. start_info_ / wait_info_ begin as "na"
// placeholders with priority Env::TOTAL.
16 MyTestCompactionService(
17 std::string db_path
, Options
& options
,
18 std::shared_ptr
<Statistics
>& statistics
,
19 std::vector
<std::shared_ptr
<EventListener
>>& listeners
,
20 std::vector
<std::shared_ptr
<TablePropertiesCollectorFactory
>>
21 table_properties_collector_factories
)
22 : db_path_(std::move(db_path
)),
24 statistics_(statistics
),
25 start_info_("na", "na", "na", 0, Env::TOTAL
),
26 wait_info_("na", "na", "na", 0, Env::TOTAL
),
27 listeners_(listeners
),
28 table_properties_collector_factories_(
29 std::move(table_properties_collector_factories
)) {}
// Class name string; also what Name() returns.
31 static const char* kClassName() { return "MyTestCompactionService"; }
33 const char* Name() const override
{ return kClassName(); }
// Under mutex_, records compaction_service_input in jobs_ keyed by
// info.job_id. When a start-status override is set, that status is returned.
35 CompactionServiceJobStatus
StartV2(
36 const CompactionServiceJobInfo
& info
,
37 const std::string
& compaction_service_input
) override
{
38 InstrumentedMutexLock
l(&mutex_
);
40 assert(info
.db_name
== db_path_
);
41 jobs_
.emplace(info
.job_id
, compaction_service_input
);
42 CompactionServiceJobStatus s
= CompactionServiceJobStatus::kSuccess
;
43 if (is_override_start_status_
) {
44 return override_start_status_
;
// Looks up the input stored for info.job_id (kFailure when there is none),
// returns the wait-status override if one is set, and otherwise builds a
// CompactionServiceOptionsOverride from options_ plus the stored statistics,
// listeners and collector factories and calls DB::OpenAndCompact().
49 CompactionServiceJobStatus
WaitForCompleteV2(
50 const CompactionServiceJobInfo
& info
,
51 std::string
* compaction_service_result
) override
{
52 std::string compaction_input
;
53 assert(info
.db_name
== db_path_
);
// jobs_ is only accessed with mutex_ held.
55 InstrumentedMutexLock
l(&mutex_
);
57 auto i
= jobs_
.find(info
.job_id
);
58 if (i
== jobs_
.end()) {
59 return CompactionServiceJobStatus::kFailure
;
61 compaction_input
= std::move(i
->second
);
// The wait-status override is checked before any compaction work is started.
65 if (is_override_wait_status_
) {
66 return override_wait_status_
;
// Copy the relevant option objects from options_ into the override.
69 CompactionServiceOptionsOverride options_override
;
70 options_override
.env
= options_
.env
;
71 options_override
.file_checksum_gen_factory
=
72 options_
.file_checksum_gen_factory
;
73 options_override
.comparator
= options_
.comparator
;
74 options_override
.merge_operator
= options_
.merge_operator
;
75 options_override
.compaction_filter
= options_
.compaction_filter
;
76 options_override
.compaction_filter_factory
=
77 options_
.compaction_filter_factory
;
78 options_override
.prefix_extractor
= options_
.prefix_extractor
;
79 options_override
.table_factory
= options_
.table_factory
;
80 options_override
.sst_partitioner_factory
= options_
.sst_partitioner_factory
;
// Statistics come from the compactor-side object given to the constructor,
// not from options_.
81 options_override
.statistics
= statistics_
;
// Listeners and collector factories are only set when non-empty.
82 if (!listeners_
.empty()) {
83 options_override
.listeners
= listeners_
;
86 if (!table_properties_collector_factories_
.empty()) {
87 options_override
.table_properties_collector_factories
=
88 table_properties_collector_factories_
;
// Pass the address of canceled_ so SetCanceled() can be observed by the call.
91 OpenAndCompactOptions options
;
92 options
.canceled
= &canceled_
;
94 Status s
= DB::OpenAndCompact(
95 options
, db_path_
, db_path_
+ "/" + std::to_string(info
.job_id
),
96 compaction_input
, compaction_service_result
, options_override
);
// A result override replaces whatever OpenAndCompact() produced.
97 if (is_override_wait_result_
) {
98 *compaction_service_result
= override_wait_result_
;
100 compaction_num_
.fetch_add(1);
// NOTE(review): the condition choosing between the two returns below is not
// visible in this extract.
102 return CompactionServiceJobStatus::kSuccess
;
104 return CompactionServiceJobStatus::kFailure
;
// Current value of the compaction_num_ counter.
108 int GetCompactionNum() { return compaction_num_
.load(); }
// Copies of the job-info members start_info_ / wait_info_.
110 CompactionServiceJobInfo
GetCompactionInfoForStart() { return start_info_
; }
111 CompactionServiceJobInfo
GetCompactionInfoForWait() { return wait_info_
; }
// Makes StartV2() return `s`.
113 void OverrideStartStatus(CompactionServiceJobStatus s
) {
114 is_override_start_status_
= true;
115 override_start_status_
= s
;
// Makes WaitForCompleteV2() return `s` before running a compaction.
118 void OverrideWaitStatus(CompactionServiceJobStatus s
) {
119 is_override_wait_status_
= true;
120 override_wait_status_
= s
;
// Makes WaitForCompleteV2() write `str` into its result out-parameter.
123 void OverrideWaitResult(std::string str
) {
124 is_override_wait_result_
= true;
125 override_wait_result_
= std::move(str
);
// Clears the three override flags.
128 void ResetOverride() {
129 is_override_wait_result_
= false;
130 is_override_start_status_
= false;
131 is_override_wait_status_
= false;
// Sets the flag whose address is handed to OpenAndCompactOptions::canceled.
134 void SetCanceled(bool canceled
) { canceled_
= canceled
; }
// Held while jobs_ is read or modified.
137 InstrumentedMutex mutex_
;
138 std::atomic_int compaction_num_
{0};
// job_id -> serialized compaction input recorded by StartV2().
139 std::map
<uint64_t, std::string
> jobs_
;
140 const std::string db_path_
;
142 std::shared_ptr
<Statistics
> statistics_
;
143 CompactionServiceJobInfo start_info_
;
144 CompactionServiceJobInfo wait_info_
;
// Override switches and values; the statuses default to kFailure.
145 bool is_override_start_status_
= false;
146 CompactionServiceJobStatus override_start_status_
=
147 CompactionServiceJobStatus::kFailure
;
148 bool is_override_wait_status_
= false;
149 CompactionServiceJobStatus override_wait_status_
=
150 CompactionServiceJobStatus::kFailure
;
151 bool is_override_wait_result_
= false;
152 std::string override_wait_result_
;
153 std::vector
<std::shared_ptr
<EventListener
>> listeners_
;
154 std::vector
<std::shared_ptr
<TablePropertiesCollectorFactory
>>
155 table_properties_collector_factories_
;
156 std::atomic_bool canceled_
{false};
// Test fixture built on DBTestBase. It reopens the DB with a
// MyTestCompactionService installed as options.compaction_service and keeps
// separate Statistics objects for the primary DB and for the compactor.
// NOTE(review): several lines of this class are missing from the extract.
159 class CompactionServiceTest
: public DBTestBase
{
161 explicit CompactionServiceTest()
162 : DBTestBase("compaction_service_test", true) {}
// Creates fresh primary/compactor statistics, builds a new
// MyTestCompactionService from the fixture's remote_* vectors, installs it on
// `options`, and calls DestroyAndReopen().
165 void ReopenWithCompactionService(Options
* options
) {
167 primary_statistics_
= CreateDBStatistics();
168 options
->statistics
= primary_statistics_
;
169 compactor_statistics_
= CreateDBStatistics();
171 compaction_service_
= std::make_shared
<MyTestCompactionService
>(
172 dbname_
, *options
, compactor_statistics_
, remote_listeners
,
173 remote_table_properties_collector_factories
);
174 options
->compaction_service
= compaction_service_
;
175 DestroyAndReopen(*options
);
// Raw (non-owning) accessors for the two statistics objects.
178 Statistics
* GetCompactorStatistics() { return compactor_statistics_
.get(); }
180 Statistics
* GetPrimaryStatistics() { return primary_statistics_
.get(); }
// Returns the installed service downcast to the concrete test type.
182 MyTestCompactionService
* GetCompactionService() {
183 CompactionService
* cs
= compaction_service_
.get();
184 return static_cast_with_check
<MyTestCompactionService
>(cs
);
// Writes keys 0..199 as "value<id>", then rewrites the even keys 0..198 as
// "value_new<id>", and finally asserts FilesPerLevel() == "0,10,20".
// NOTE(review): the statements that flush and place files on levels are not
// visible in this extract.
187 void GenerateTestData() {
188 // Generate 20 files @ L2
189 for (int i
= 0; i
< 20; i
++) {
190 for (int j
= 0; j
< 10; j
++) {
191 int key_id
= i
* 10 + j
;
192 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
198 // Generate 10 files @ L1 overlap with all 20 files @ L2
199 for (int i
= 0; i
< 10; i
++) {
200 for (int j
= 0; j
< 10; j
++) {
201 int key_id
= i
* 20 + j
* 2;
202 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
207 ASSERT_EQ(FilesPerLevel(), "0,10,20");
// Reads keys 0..199 and compares each with "value<i>" or "value_new<i>".
// NOTE(review): the branch selecting between the two expectations is not
// visible in this extract.
210 void VerifyTestData() {
211 for (int i
= 0; i
< 200; i
++) {
212 auto result
= Get(Key(i
));
214 ASSERT_EQ(result
, "value" + std::to_string(i
));
216 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
// Passed to the MyTestCompactionService constructor on the next
// ReopenWithCompactionService() call.
221 std::vector
<std::shared_ptr
<EventListener
>> remote_listeners
;
222 std::vector
<std::shared_ptr
<TablePropertiesCollectorFactory
>>
223 remote_table_properties_collector_factories
;
226 std::shared_ptr
<Statistics
> compactor_statistics_
;
227 std::shared_ptr
<Statistics
> primary_statistics_
;
228 std::shared_ptr
<CompactionService
> compaction_service_
;
// Writes the standard data set, waits for compactions, verifies the values,
// and compares compaction ticker counts between the primary and compactor
// statistics. It then forces an Aborted status through a SyncPoint callback
// and expects TEST_WaitForCompact() to report it, and finally hooks the
// "BlockBasedTable::Open::PassedVerifyUniqueId" sync point.
// NOTE(review): many lines of this test are missing from the extract.
231 TEST_F(CompactionServiceTest
, BasicCompactions
) {
232 Options options
= CurrentOptions();
233 ReopenWithCompactionService(&options
);
235 Statistics
* primary_statistics
= GetPrimaryStatistics();
236 Statistics
* compactor_statistics
= GetCompactorStatistics();
// Keys 0..199 get "value<id>".
238 for (int i
= 0; i
< 20; i
++) {
239 for (int j
= 0; j
< 10; j
++) {
240 int key_id
= i
* 10 + j
;
241 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
// Even keys 0..198 are overwritten with "value_new<id>".
246 for (int i
= 0; i
< 10; i
++) {
247 for (int j
= 0; j
< 10; j
++) {
248 int key_id
= i
* 20 + j
* 2;
249 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
253 ASSERT_OK(dbfull()->TEST_WaitForCompact());
256 for (int i
= 0; i
< 200; i
++) {
257 auto result
= Get(Key(i
));
259 ASSERT_EQ(result
, "value" + std::to_string(i
));
261 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
264 auto my_cs
= GetCompactionService();
265 ASSERT_GE(my_cs
->GetCompactionNum(), 1);
267 // make sure the compaction statistics is only recorded on the remote side
268 ASSERT_GE(compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
), 1);
269 ASSERT_GE(compactor_statistics
->getTickerCount(COMPACT_READ_BYTES
), 1);
270 ASSERT_EQ(primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
), 0);
271 // even with remote compaction, primary host still needs to read SST files to
273 ASSERT_GE(primary_statistics
->getTickerCount(COMPACT_READ_BYTES
), 1);
274 // all the compaction write happens on the remote side
275 ASSERT_EQ(primary_statistics
->getTickerCount(REMOTE_COMPACT_WRITE_BYTES
),
276 compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
));
277 ASSERT_GE(primary_statistics
->getTickerCount(REMOTE_COMPACT_READ_BYTES
), 1);
278 ASSERT_GT(primary_statistics
->getTickerCount(COMPACT_READ_BYTES
),
279 primary_statistics
->getTickerCount(REMOTE_COMPACT_READ_BYTES
));
280 // compactor is already the remote side, which doesn't have remote
281 ASSERT_EQ(compactor_statistics
->getTickerCount(REMOTE_COMPACT_READ_BYTES
), 0);
// NOTE(review): the second argument of the assertion below is not visible.
282 ASSERT_EQ(compactor_statistics
->getTickerCount(REMOTE_COMPACT_WRITE_BYTES
),
285 // Test failed compaction
286 SyncPoint::GetInstance()->SetCallBack(
287 "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status
) {
288 // override job status
289 auto s
= static_cast<Status
*>(status
);
290 *s
= Status::Aborted("MyTestCompactionService failed to compact!");
292 SyncPoint::GetInstance()->EnableProcessing();
// NOTE(review): the declaration of `s` used below is not visible here.
295 for (int i
= 0; i
< 10; i
++) {
296 for (int j
= 0; j
< 10; j
++) {
297 int key_id
= i
* 20 + j
* 2;
298 s
= Put(Key(key_id
), "value_new" + std::to_string(key_id
));
310 s
= dbfull()->TEST_WaitForCompact();
315 ASSERT_TRUE(s
.IsAborted());
317 // Test re-open and successful unique id verification
318 std::atomic_int verify_passed
{0};
319 SyncPoint::GetInstance()->SetCallBack(
320 "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg
) {
321 // override job status
322 auto id
= static_cast<UniqueId64x2
*>(arg
);
323 assert(*id
!= kNullUniqueId64x2
);
327 ASSERT_GT(verify_passed
, 0);
// With auto compaction disabled, calls CompactRange() four times (bounded
// range, open upper bound, open lower bound, full key space) and checks each
// call raises the service's compaction counter by at least one.
// NOTE(review): the declaration of `end` and the later reassignments of the
// range bounds are not visible in this extract.
331 TEST_F(CompactionServiceTest
, ManualCompaction
) {
332 Options options
= CurrentOptions();
333 options
.disable_auto_compactions
= true;
334 ReopenWithCompactionService(&options
);
337 auto my_cs
= GetCompactionService();
339 std::string start_str
= Key(15);
340 std::string end_str
= Key(45);
341 Slice
start(start_str
);
// Bounded range [start, end].
343 uint64_t comp_num
= my_cs
->GetCompactionNum();
344 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), &start
, &end
));
345 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
+ 1);
// From `start` with no upper bound.
348 start_str
= Key(120);
350 comp_num
= my_cs
->GetCompactionNum();
351 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), &start
, nullptr));
352 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
+ 1);
// Up to `end` with no lower bound.
357 comp_num
= my_cs
->GetCompactionNum();
358 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, &end
));
359 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
+ 1);
// Whole key space.
362 comp_num
= my_cs
->GetCompactionNum();
363 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
364 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
+ 1);
// Checks that setting the service's canceled flag makes CompactRange() return
// Incomplete, both when the flag is set before the call and when it is set
// from the "CompactionJob::Run():Inprogress" sync point callback.
// NOTE(review): the declaration of `end` and the callback's lambda header are
// not visible in this extract.
368 TEST_F(CompactionServiceTest
, CancelCompactionOnRemoteSide
) {
369 Options options
= CurrentOptions();
370 options
.disable_auto_compactions
= true;
371 ReopenWithCompactionService(&options
);
374 auto my_cs
= GetCompactionService();
376 std::string start_str
= Key(15);
377 std::string end_str
= Key(45);
378 Slice
start(start_str
);
380 uint64_t comp_num
= my_cs
->GetCompactionNum();
382 // Test cancel compaction at the beginning
383 my_cs
->SetCanceled(true);
384 auto s
= db_
->CompactRange(CompactRangeOptions(), &start
, &end
);
385 ASSERT_TRUE(s
.IsIncomplete());
386 // compaction number is not increased
387 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
);
390 // Test cancel compaction in progress
391 ReopenWithCompactionService(&options
);
// Reopening installs a new service object, so fetch it again.
393 my_cs
= GetCompactionService();
394 my_cs
->SetCanceled(false);
396 std::atomic_bool cancel_issued
{false};
397 SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
399 cancel_issued
= true;
400 my_cs
->SetCanceled(true);
403 SyncPoint::GetInstance()->EnableProcessing();
405 s
= db_
->CompactRange(CompactRangeOptions(), &start
, &end
);
406 ASSERT_TRUE(s
.IsIncomplete());
407 ASSERT_TRUE(cancel_issued
);
408 // compaction number is not increased
409 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
);
// With the start status overridden to kFailure, a manual CompactRange() is
// expected to return an Incomplete status.
// NOTE(review): the declaration of `end` is not visible in this extract.
413 TEST_F(CompactionServiceTest
, FailedToStart
) {
414 Options options
= CurrentOptions();
415 options
.disable_auto_compactions
= true;
416 ReopenWithCompactionService(&options
);
420 auto my_cs
= GetCompactionService();
421 my_cs
->OverrideStartStatus(CompactionServiceJobStatus::kFailure
);
423 std::string start_str
= Key(15);
424 std::string end_str
= Key(45);
425 Slice
start(start_str
);
427 Status s
= db_
->CompactRange(CompactRangeOptions(), &start
, &end
);
428 ASSERT_TRUE(s
.IsIncomplete());
// With the wait result overridden to the string "Invalid Str", a manual
// CompactRange() is expected to return a non-OK status.
// NOTE(review): the declaration of `end` is not visible in this extract.
431 TEST_F(CompactionServiceTest
, InvalidResult
) {
432 Options options
= CurrentOptions();
433 options
.disable_auto_compactions
= true;
434 ReopenWithCompactionService(&options
);
438 auto my_cs
= GetCompactionService();
439 my_cs
->OverrideWaitResult("Invalid Str");
441 std::string start_str
= Key(15);
442 std::string end_str
= Key(45);
443 Slice
start(start_str
);
445 Status s
= db_
->CompactRange(CompactRangeOptions(), &start
, &end
);
446 ASSERT_FALSE(s
.ok());
// Sets max_subcompactions to 10 (in both Options and CompactRangeOptions) with
// a 1KB target file size, runs a full-range CompactRange(), and checks that
// the service's compaction counter grew by at least 2.
449 TEST_F(CompactionServiceTest
, SubCompaction
) {
450 Options options
= CurrentOptions();
451 options
.max_subcompactions
= 10;
452 options
.target_file_size_base
= 1 << 10; // 1KB
453 options
.disable_auto_compactions
= true;
454 ReopenWithCompactionService(&options
);
459 auto my_cs
= GetCompactionService();
460 int compaction_num_before
= my_cs
->GetCompactionNum();
462 auto cro
= CompactRangeOptions();
463 cro
.max_subcompactions
= 10;
464 Status s
= db_
->CompactRange(cro
, nullptr, nullptr);
// Number of compactions the service saw during the CompactRange() call above.
467 int compaction_num
= my_cs
->GetCompactionNum() - compaction_num_before
;
468 // make sure there's sub-compaction by checking the compaction number
469 ASSERT_GE(compaction_num
, 2);
// Compaction filter that parses the integer found after the first three
// characters of the key and returns kRemove when it is in (5, 105], kKeep
// otherwise.
472 class PartialDeleteCompactionFilter
: public CompactionFilter
{
474 CompactionFilter::Decision
FilterV2(
475 int /*level*/, const Slice
& key
, ValueType
/*value_type*/,
476 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
477 std::string
* /*skip_until*/) const override
{
// presumably the first three characters are the "key" prefix produced by
// Key() — TODO confirm against the test utility.
478 int i
= std::stoi(key
.ToString().substr(3));
479 if (i
> 5 && i
<= 105) {
480 return CompactionFilter::Decision::kRemove
;
482 return CompactionFilter::Decision::kKeep
;
485 const char* Name() const override
{ return "PartialDeleteCompactionFilter"; }
// Installs PartialDeleteCompactionFilter through options.compaction_filter,
// writes the standard data set, compacts, and checks that keys 6..105 read
// back as "NOT_FOUND" while the remaining keys keep their values.
// NOTE(review): the branch choosing between the "value"/"value_new"
// expectations is not visible in this extract.
488 TEST_F(CompactionServiceTest
, CompactionFilter
) {
489 Options options
= CurrentOptions();
// The unique_ptr owns the filter; options only receives the raw pointer.
490 std::unique_ptr
<CompactionFilter
> delete_comp_filter(
491 new PartialDeleteCompactionFilter());
492 options
.compaction_filter
= delete_comp_filter
.get();
493 ReopenWithCompactionService(&options
);
495 for (int i
= 0; i
< 20; i
++) {
496 for (int j
= 0; j
< 10; j
++) {
497 int key_id
= i
* 10 + j
;
498 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
503 for (int i
= 0; i
< 10; i
++) {
504 for (int j
= 0; j
< 10; j
++) {
505 int key_id
= i
* 20 + j
* 2;
506 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
510 ASSERT_OK(dbfull()->TEST_WaitForCompact());
512 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Same (5, 105] window as the filter's FilterV2().
515 for (int i
= 0; i
< 200; i
++) {
516 auto result
= Get(Key(i
));
517 if (i
> 5 && i
<= 105) {
518 ASSERT_EQ(result
, "NOT_FOUND");
520 ASSERT_EQ(result
, "value" + std::to_string(i
));
522 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
525 auto my_cs
= GetCompactionService();
526 ASSERT_GE(my_cs
->GetCompactionNum(), 1);
// Takes a snapshot after the first writes, overwrites Key(1), compacts the
// whole key space through the service, and checks that the snapshot still
// reads "value1" for Key(1) while a plain Get() reads "value2".
529 TEST_F(CompactionServiceTest
, Snapshot
) {
530 Options options
= CurrentOptions();
531 ReopenWithCompactionService(&options
);
533 ASSERT_OK(Put(Key(1), "value1"));
534 ASSERT_OK(Put(Key(2), "value1"));
535 const Snapshot
* s1
= db_
->GetSnapshot();
538 ASSERT_OK(Put(Key(1), "value2"));
539 ASSERT_OK(Put(Key(3), "value2"));
542 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
543 auto my_cs
= GetCompactionService();
544 ASSERT_GE(my_cs
->GetCompactionNum(), 1);
545 ASSERT_EQ("value1", Get(Key(1), s1
));
546 ASSERT_EQ("value2", Get(Key(1)));
// The snapshot is released at the end of the test.
547 db_
->ReleaseSnapshot(s1
);
// Starts one thread per file listed in meta.levels[1], each calling
// CompactFiles() on that single file with 2 as the last argument, then expects
// exactly 10 compactions on the service and FilesPerLevel() == "0,0,10".
// NOTE(review): the data-generation call and the body of the join loop are not
// visible in this extract.
550 TEST_F(CompactionServiceTest
, ConcurrentCompaction
) {
551 Options options
= CurrentOptions();
552 options
.level0_file_num_compaction_trigger
= 100;
553 options
.max_background_jobs
= 20;
554 ReopenWithCompactionService(&options
);
557 ColumnFamilyMetaData meta
;
558 db_
->GetColumnFamilyMetaData(&meta
);
560 std::vector
<std::thread
> threads
;
561 for (const auto& file
: meta
.levels
[1].files
) {
// The lambda captures by reference, including the loop variable `file`.
562 threads
.emplace_back(std::thread([&]() {
563 std::string fname
= file
.db_path
+ "/" + file
.name
;
564 ASSERT_OK(db_
->CompactFiles(CompactionOptions(), {fname
}, 2));
568 for (auto& thread
: threads
) {
571 ASSERT_OK(dbfull()->TEST_WaitForCompact());
574 for (int i
= 0; i
< 200; i
++) {
575 auto result
= Get(Key(i
));
577 ASSERT_EQ(result
, "value" + std::to_string(i
));
579 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
582 auto my_cs
= GetCompactionService();
583 ASSERT_EQ(my_cs
->GetCompactionNum(), 10);
584 ASSERT_EQ(FilesPerLevel(), "0,0,10");
// Checks the CompactionServiceJobInfo values returned by the service's
// GetCompactionInfoForStart()/GetCompactionInfoForWait(): db_name, db_id and
// db_session_id must match the DB, and priority is expected to be Env::LOW
// after automatic compaction, Env::USER after CompactFiles(), and Env::BOTTOM
// after reopening with num_levels = 2 and one Env::BOTTOM background thread.
// NOTE(review): the `my_cs` declaration is only partly visible (just the cast
// expression survives), and other lines are missing.
587 TEST_F(CompactionServiceTest
, CompactionInfo
) {
588 Options options
= CurrentOptions();
589 ReopenWithCompactionService(&options
);
591 for (int i
= 0; i
< 20; i
++) {
592 for (int j
= 0; j
< 10; j
++) {
593 int key_id
= i
* 10 + j
;
594 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
599 for (int i
= 0; i
< 10; i
++) {
600 for (int j
= 0; j
< 10; j
++) {
601 int key_id
= i
* 20 + j
* 2;
602 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
606 ASSERT_OK(dbfull()->TEST_WaitForCompact());
608 static_cast_with_check
<MyTestCompactionService
>(GetCompactionService());
609 uint64_t comp_num
= my_cs
->GetCompactionNum();
610 ASSERT_GE(comp_num
, 1);
// Info seen by the start call must identify this DB and carry LOW priority.
612 CompactionServiceJobInfo info
= my_cs
->GetCompactionInfoForStart();
613 ASSERT_EQ(dbname_
, info
.db_name
);
614 std::string db_id
, db_session_id
;
615 ASSERT_OK(db_
->GetDbIdentity(db_id
));
616 ASSERT_EQ(db_id
, info
.db_id
);
617 ASSERT_OK(db_
->GetDbSessionId(db_session_id
));
618 ASSERT_EQ(db_session_id
, info
.db_session_id
);
619 ASSERT_EQ(Env::LOW
, info
.priority
);
// Same checks for the info seen by the wait call.
620 info
= my_cs
->GetCompactionInfoForWait();
621 ASSERT_EQ(dbname_
, info
.db_name
);
622 ASSERT_EQ(db_id
, info
.db_id
);
623 ASSERT_EQ(db_session_id
, info
.db_session_id
);
624 ASSERT_EQ(Env::LOW
, info
.priority
);
626 // Test priority USER
627 ColumnFamilyMetaData meta
;
628 db_
->GetColumnFamilyMetaData(&meta
);
629 SstFileMetaData file
= meta
.levels
[1].files
[0];
630 ASSERT_OK(db_
->CompactFiles(CompactionOptions(),
631 {file
.db_path
+ "/" + file
.name
}, 2));
632 info
= my_cs
->GetCompactionInfoForStart();
633 ASSERT_EQ(Env::USER
, info
.priority
);
634 info
= my_cs
->GetCompactionInfoForWait();
635 ASSERT_EQ(Env::USER
, info
.priority
);
637 // Test priority BOTTOM
638 env_
->SetBackgroundThreads(1, Env::BOTTOM
);
639 options
.num_levels
= 2;
640 ReopenWithCompactionService(&options
);
642 static_cast_with_check
<MyTestCompactionService
>(GetCompactionService());
644 for (int i
= 0; i
< 20; i
++) {
645 for (int j
= 0; j
< 10; j
++) {
646 int key_id
= i
* 10 + j
;
647 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
// This second pass uses only 4 outer iterations, unlike the 10 used earlier.
652 for (int i
= 0; i
< 4; i
++) {
653 for (int j
= 0; j
< 10; j
++) {
654 int key_id
= i
* 20 + j
* 2;
655 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
659 ASSERT_OK(dbfull()->TEST_WaitForCompact());
660 info
= my_cs
->GetCompactionInfoForStart();
661 ASSERT_EQ(Env::BOTTOM
, info
.priority
);
662 info
= my_cs
->GetCompactionInfoForWait();
663 ASSERT_EQ(Env::BOTTOM
, info
.priority
);
// Overrides the start status to kUseLocal and then lets automatic compaction
// run. Expectations: the service's compaction count stays 0, the compactor's
// COMPACT_WRITE_BYTES is unchanged, the primary's COMPACT_WRITE_BYTES grows,
// and the primary's REMOTE_COMPACT_* tickers stay 0.
666 TEST_F(CompactionServiceTest
, FallbackLocalAuto
) {
667 Options options
= CurrentOptions();
668 ReopenWithCompactionService(&options
);
670 auto my_cs
= GetCompactionService();
671 Statistics
* compactor_statistics
= GetCompactorStatistics();
672 Statistics
* primary_statistics
= GetPrimaryStatistics();
// Baseline ticker values taken before any data is written.
673 uint64_t compactor_write_bytes
=
674 compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
675 uint64_t primary_write_bytes
=
676 primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
678 my_cs
->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal
);
680 for (int i
= 0; i
< 20; i
++) {
681 for (int j
= 0; j
< 10; j
++) {
682 int key_id
= i
* 10 + j
;
683 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
688 for (int i
= 0; i
< 10; i
++) {
689 for (int j
= 0; j
< 10; j
++) {
690 int key_id
= i
* 20 + j
* 2;
691 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
695 ASSERT_OK(dbfull()->TEST_WaitForCompact());
698 for (int i
= 0; i
< 200; i
++) {
699 auto result
= Get(Key(i
));
701 ASSERT_EQ(result
, "value" + std::to_string(i
));
703 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
707 ASSERT_EQ(my_cs
->GetCompactionNum(), 0);
709 // make sure the compaction statistics is only recorded on the local side
710 ASSERT_EQ(compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
711 compactor_write_bytes
);
712 ASSERT_GT(primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
713 primary_write_bytes
);
714 ASSERT_EQ(primary_statistics
->getTickerCount(REMOTE_COMPACT_READ_BYTES
), 0);
715 ASSERT_EQ(primary_statistics
->getTickerCount(REMOTE_COMPACT_WRITE_BYTES
), 0);
// Manual-compaction variant of the local-fallback test. After ResetOverride()
// a CompactRange() is expected to run on the service (compactor write bytes
// grow, primary COMPACT_WRITE_BYTES unchanged). After overriding the wait
// status to kUseLocal, a second CompactRange() is expected to run locally
// (service compaction count unchanged, primary COMPACT_WRITE_BYTES grows).
// NOTE(review): the declaration of `end` and other lines are missing from this
// extract.
718 TEST_F(CompactionServiceTest
, FallbackLocalManual
) {
719 Options options
= CurrentOptions();
720 options
.disable_auto_compactions
= true;
721 ReopenWithCompactionService(&options
);
726 auto my_cs
= GetCompactionService();
727 Statistics
* compactor_statistics
= GetCompactorStatistics();
728 Statistics
* primary_statistics
= GetPrimaryStatistics();
729 uint64_t compactor_write_bytes
=
730 compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
731 uint64_t primary_write_bytes
=
732 primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
734 // re-enable remote compaction
735 my_cs
->ResetOverride();
736 std::string start_str
= Key(15);
737 std::string end_str
= Key(45);
738 Slice
start(start_str
);
740 uint64_t comp_num
= my_cs
->GetCompactionNum();
742 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), &start
, &end
));
743 ASSERT_GE(my_cs
->GetCompactionNum(), comp_num
+ 1);
744 // make sure the compaction statistics is only recorded on the remote side
745 ASSERT_GT(compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
746 compactor_write_bytes
);
747 ASSERT_EQ(primary_statistics
->getTickerCount(REMOTE_COMPACT_WRITE_BYTES
),
748 compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
));
749 ASSERT_EQ(primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
750 primary_write_bytes
);
752 // return run local again with API WaitForComplete
753 my_cs
->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal
);
754 start_str
= Key(120);
// Refresh the baselines before the second compaction.
756 comp_num
= my_cs
->GetCompactionNum();
757 compactor_write_bytes
=
758 compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
759 primary_write_bytes
= primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
);
761 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), &start
, nullptr));
762 ASSERT_EQ(my_cs
->GetCompactionNum(),
763 comp_num
); // no remote compaction is run
764 // make sure the compaction statistics is only recorded on the local side
765 ASSERT_EQ(compactor_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
766 compactor_write_bytes
);
767 ASSERT_GT(primary_statistics
->getTickerCount(COMPACT_WRITE_BYTES
),
768 primary_write_bytes
);
769 ASSERT_EQ(primary_statistics
->getTickerCount(REMOTE_COMPACT_WRITE_BYTES
),
770 compactor_write_bytes
);
772 // verify result after 2 manual compactions
// Adds an EventListener to remote_listeners so it is handed to the compaction
// service, runs the standard write/compact sequence, and checks that the
// listener's counters are non-zero and that no job id is left in
// on_going_compactions.
// NOTE(review): most of the counter increments inside the listener are not
// visible in this extract.
776 TEST_F(CompactionServiceTest
, RemoteEventListener
) {
777 class RemoteEventListenerTest
: public EventListener
{
779 const char* Name() const override
{ return "RemoteEventListenerTest"; }
// Records the job id and asserts it was not already being tracked.
781 void OnSubcompactionBegin(const SubcompactionJobInfo
& info
) override
{
782 auto result
= on_going_compactions
.emplace(info
.job_id
);
783 ASSERT_TRUE(result
.second
); // make sure there's no duplication
785 EventListener::OnSubcompactionBegin(info
);
// Removes the job id and asserts exactly one entry was erased.
787 void OnSubcompactionCompleted(const SubcompactionJobInfo
& info
) override
{
788 auto num
= on_going_compactions
.erase(info
.job_id
);
789 ASSERT_TRUE(num
== 1); // make sure the compaction id exists
790 EventListener::OnSubcompactionCompleted(info
);
// File-creation events must belong to a job that is currently tracked.
792 void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
793 ASSERT_EQ(on_going_compactions
.count(info
.job_id
), 1);
795 EventListener::OnTableFileCreated(info
);
797 void OnTableFileCreationStarted(
798 const TableFileCreationBriefInfo
& info
) override
{
799 ASSERT_EQ(on_going_compactions
.count(info
.job_id
), 1);
800 file_creation_started
++;
801 EventListener::OnTableFileCreationStarted(info
);
804 bool ShouldBeNotifiedOnFileIO() override
{
806 return EventListener::ShouldBeNotifiedOnFileIO();
// Event counters inspected by the test body below.
809 std::atomic_uint64_t file_io_notified
{0};
810 std::atomic_uint64_t file_creation_started
{0};
811 std::atomic_uint64_t file_created
{0};
813 std::set
<int> on_going_compactions
; // store the job_id
814 std::atomic_uint64_t compaction_num
{0};
// The raw pointer is kept for the assertions; emplace_back() hands it to a
// shared_ptr in remote_listeners.
817 auto listener
= new RemoteEventListenerTest();
818 remote_listeners
.emplace_back(listener
);
820 Options options
= CurrentOptions();
821 ReopenWithCompactionService(&options
);
823 for (int i
= 0; i
< 20; i
++) {
824 for (int j
= 0; j
< 10; j
++) {
825 int key_id
= i
* 10 + j
;
826 ASSERT_OK(Put(Key(key_id
), "value" + std::to_string(key_id
)));
831 for (int i
= 0; i
< 10; i
++) {
832 for (int j
= 0; j
< 10; j
++) {
833 int key_id
= i
* 20 + j
* 2;
834 ASSERT_OK(Put(Key(key_id
), "value_new" + std::to_string(key_id
)));
838 ASSERT_OK(dbfull()->TEST_WaitForCompact());
840 // check the events are triggered
841 ASSERT_TRUE(listener
->file_io_notified
> 0);
842 ASSERT_TRUE(listener
->file_creation_started
> 0);
843 ASSERT_TRUE(listener
->file_created
> 0);
844 ASSERT_TRUE(listener
->compaction_num
> 0);
845 ASSERT_TRUE(listener
->on_going_compactions
.empty());
848 for (int i
= 0; i
< 200; i
++) {
849 auto result
= Get(Key(i
));
851 ASSERT_EQ(result
, "value" + std::to_string(i
));
853 ASSERT_EQ(result
, "value_new" + std::to_string(i
));
// Adds a TablePropertiesCollectorFactory to
// remote_table_properties_collector_factories so it is handed to the
// compaction service. The test checks that SSTs written before compaction
// lack the "TestCount" user property and that, after compaction, at least one
// SST has it with a value greater than 0.
// NOTE(review): the AddUserKey() body and the definition of count_ are not
// visible in this extract.
858 TEST_F(CompactionServiceTest
, TablePropertiesCollector
) {
859 const static std::string kUserPropertyName
= "TestCount";
861 class TablePropertiesCollectorTest
: public TablePropertiesCollector
{
// Publishes count_ under kUserPropertyName.
863 Status
Finish(UserCollectedProperties
* properties
) override
{
864 *properties
= UserCollectedProperties
{
865 {kUserPropertyName
, std::to_string(count_
)},
870 UserCollectedProperties
GetReadableProperties() const override
{
871 return UserCollectedProperties();
874 const char* Name() const override
{ return "TablePropertiesCollectorTest"; }
876 Status
AddUserKey(const Slice
& /*user_key*/, const Slice
& /*value*/,
877 EntryType
/*type*/, SequenceNumber
/*seq*/,
878 uint64_t /*file_size*/) override
{
// Factory that returns a new TablePropertiesCollectorTest on every call.
887 class TablePropertiesCollectorFactoryTest
888 : public TablePropertiesCollectorFactory
{
890 TablePropertiesCollector
* CreateTablePropertiesCollector(
891 TablePropertiesCollectorFactory::Context
/*context*/) override
{
892 return new TablePropertiesCollectorTest();
895 const char* Name() const override
{
896 return "TablePropertiesCollectorFactoryTest";
900 auto factory
= new TablePropertiesCollectorFactoryTest();
901 remote_table_properties_collector_factories
.emplace_back(factory
);
// kNumSst is below the L0 trigger, so the first write loop stays under it.
903 const int kNumSst
= 3;
904 const int kLevel0Trigger
= 4;
905 Options options
= CurrentOptions();
906 options
.level0_file_num_compaction_trigger
= kLevel0Trigger
;
907 ReopenWithCompactionService(&options
);
909 // generate a few SSTs locally which should not have user property
910 for (int i
= 0; i
< kNumSst
; i
++) {
911 for (int j
= 0; j
< 100; j
++) {
912 ASSERT_OK(Put(Key(i
* 10 + j
), "value"));
917 TablePropertiesCollection fname_to_props
;
918 ASSERT_OK(db_
->GetPropertiesOfAllTables(&fname_to_props
));
919 for (const auto& file_props
: fname_to_props
) {
920 auto properties
= file_props
.second
->user_collected_properties
;
921 auto it
= properties
.find(kUserPropertyName
);
922 ASSERT_EQ(it
, properties
.end());
925 // trigger compaction
926 for (int i
= kNumSst
; i
< kLevel0Trigger
; i
++) {
927 for (int j
= 0; j
< 100; j
++) {
928 ASSERT_OK(Put(Key(i
* 10 + j
), "value"));
932 ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
934 ASSERT_OK(db_
->GetPropertiesOfAllTables(&fname_to_props
));
// At least one table must now carry the user property with a positive count.
936 bool has_user_property
= false;
937 for (const auto& file_props
: fname_to_props
) {
938 auto properties
= file_props
.second
->user_collected_properties
;
939 auto it
= properties
.find(kUserPropertyName
);
940 if (it
!= properties
.end()) {
941 has_user_property
= true;
942 ASSERT_GT(std::stoi(it
->second
), 0);
945 ASSERT_TRUE(has_user_property
);
948 } // namespace ROCKSDB_NAMESPACE
// Test entry point: installs the stack trace handler, initializes GoogleTest
// with the command-line arguments, calls RegisterCustomObjects(), and returns
// the result of RUN_ALL_TESTS().
950 int main(int argc
, char** argv
) {
951 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
952 ::testing::InitGoogleTest(&argc
, argv
);
953 RegisterCustomObjects(argc
, argv
);
954 return RUN_ALL_TESTS();
// Alternative main that ignores its arguments; the surviving string literal
// says the test is skipped because CompactionService is not supported in
// ROCKSDB_LITE.
// NOTE(review): the call that uses the string and the rest of the body are
// missing from this extract.
960 int main(int /*argc*/, char** /*argv*/) {
962 "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
966 #endif // ROCKSDB_LITE