]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction/compaction_service_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_service_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 #include "table/unique_id_impl.h"
11
12 namespace ROCKSDB_NAMESPACE {
13
14 class MyTestCompactionService : public CompactionService {
15 public:
16 MyTestCompactionService(
17 std::string db_path, Options& options,
18 std::shared_ptr<Statistics>& statistics,
19 std::vector<std::shared_ptr<EventListener>>& listeners,
20 std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
21 table_properties_collector_factories)
22 : db_path_(std::move(db_path)),
23 options_(options),
24 statistics_(statistics),
25 start_info_("na", "na", "na", 0, Env::TOTAL),
26 wait_info_("na", "na", "na", 0, Env::TOTAL),
27 listeners_(listeners),
28 table_properties_collector_factories_(
29 std::move(table_properties_collector_factories)) {}
30
31 static const char* kClassName() { return "MyTestCompactionService"; }
32
33 const char* Name() const override { return kClassName(); }
34
35 CompactionServiceJobStatus StartV2(
36 const CompactionServiceJobInfo& info,
37 const std::string& compaction_service_input) override {
38 InstrumentedMutexLock l(&mutex_);
39 start_info_ = info;
40 assert(info.db_name == db_path_);
41 jobs_.emplace(info.job_id, compaction_service_input);
42 CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
43 if (is_override_start_status_) {
44 return override_start_status_;
45 }
46 return s;
47 }
48
49 CompactionServiceJobStatus WaitForCompleteV2(
50 const CompactionServiceJobInfo& info,
51 std::string* compaction_service_result) override {
52 std::string compaction_input;
53 assert(info.db_name == db_path_);
54 {
55 InstrumentedMutexLock l(&mutex_);
56 wait_info_ = info;
57 auto i = jobs_.find(info.job_id);
58 if (i == jobs_.end()) {
59 return CompactionServiceJobStatus::kFailure;
60 }
61 compaction_input = std::move(i->second);
62 jobs_.erase(i);
63 }
64
65 if (is_override_wait_status_) {
66 return override_wait_status_;
67 }
68
69 CompactionServiceOptionsOverride options_override;
70 options_override.env = options_.env;
71 options_override.file_checksum_gen_factory =
72 options_.file_checksum_gen_factory;
73 options_override.comparator = options_.comparator;
74 options_override.merge_operator = options_.merge_operator;
75 options_override.compaction_filter = options_.compaction_filter;
76 options_override.compaction_filter_factory =
77 options_.compaction_filter_factory;
78 options_override.prefix_extractor = options_.prefix_extractor;
79 options_override.table_factory = options_.table_factory;
80 options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
81 options_override.statistics = statistics_;
82 if (!listeners_.empty()) {
83 options_override.listeners = listeners_;
84 }
85
86 if (!table_properties_collector_factories_.empty()) {
87 options_override.table_properties_collector_factories =
88 table_properties_collector_factories_;
89 }
90
91 OpenAndCompactOptions options;
92 options.canceled = &canceled_;
93
94 Status s = DB::OpenAndCompact(
95 options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
96 compaction_input, compaction_service_result, options_override);
97 if (is_override_wait_result_) {
98 *compaction_service_result = override_wait_result_;
99 }
100 compaction_num_.fetch_add(1);
101 if (s.ok()) {
102 return CompactionServiceJobStatus::kSuccess;
103 } else {
104 return CompactionServiceJobStatus::kFailure;
105 }
106 }
107
108 int GetCompactionNum() { return compaction_num_.load(); }
109
110 CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
111 CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
112
113 void OverrideStartStatus(CompactionServiceJobStatus s) {
114 is_override_start_status_ = true;
115 override_start_status_ = s;
116 }
117
118 void OverrideWaitStatus(CompactionServiceJobStatus s) {
119 is_override_wait_status_ = true;
120 override_wait_status_ = s;
121 }
122
123 void OverrideWaitResult(std::string str) {
124 is_override_wait_result_ = true;
125 override_wait_result_ = std::move(str);
126 }
127
128 void ResetOverride() {
129 is_override_wait_result_ = false;
130 is_override_start_status_ = false;
131 is_override_wait_status_ = false;
132 }
133
134 void SetCanceled(bool canceled) { canceled_ = canceled; }
135
136 private:
137 InstrumentedMutex mutex_;
138 std::atomic_int compaction_num_{0};
139 std::map<uint64_t, std::string> jobs_;
140 const std::string db_path_;
141 Options options_;
142 std::shared_ptr<Statistics> statistics_;
143 CompactionServiceJobInfo start_info_;
144 CompactionServiceJobInfo wait_info_;
145 bool is_override_start_status_ = false;
146 CompactionServiceJobStatus override_start_status_ =
147 CompactionServiceJobStatus::kFailure;
148 bool is_override_wait_status_ = false;
149 CompactionServiceJobStatus override_wait_status_ =
150 CompactionServiceJobStatus::kFailure;
151 bool is_override_wait_result_ = false;
152 std::string override_wait_result_;
153 std::vector<std::shared_ptr<EventListener>> listeners_;
154 std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
155 table_properties_collector_factories_;
156 std::atomic_bool canceled_{false};
157 };
158
159 class CompactionServiceTest : public DBTestBase {
160 public:
161 explicit CompactionServiceTest()
162 : DBTestBase("compaction_service_test", true) {}
163
164 protected:
165 void ReopenWithCompactionService(Options* options) {
166 options->env = env_;
167 primary_statistics_ = CreateDBStatistics();
168 options->statistics = primary_statistics_;
169 compactor_statistics_ = CreateDBStatistics();
170
171 compaction_service_ = std::make_shared<MyTestCompactionService>(
172 dbname_, *options, compactor_statistics_, remote_listeners,
173 remote_table_properties_collector_factories);
174 options->compaction_service = compaction_service_;
175 DestroyAndReopen(*options);
176 }
177
178 Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
179
180 Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
181
182 MyTestCompactionService* GetCompactionService() {
183 CompactionService* cs = compaction_service_.get();
184 return static_cast_with_check<MyTestCompactionService>(cs);
185 }
186
187 void GenerateTestData() {
188 // Generate 20 files @ L2
189 for (int i = 0; i < 20; i++) {
190 for (int j = 0; j < 10; j++) {
191 int key_id = i * 10 + j;
192 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
193 }
194 ASSERT_OK(Flush());
195 }
196 MoveFilesToLevel(2);
197
198 // Generate 10 files @ L1 overlap with all 20 files @ L2
199 for (int i = 0; i < 10; i++) {
200 for (int j = 0; j < 10; j++) {
201 int key_id = i * 20 + j * 2;
202 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
203 }
204 ASSERT_OK(Flush());
205 }
206 MoveFilesToLevel(1);
207 ASSERT_EQ(FilesPerLevel(), "0,10,20");
208 }
209
210 void VerifyTestData() {
211 for (int i = 0; i < 200; i++) {
212 auto result = Get(Key(i));
213 if (i % 2) {
214 ASSERT_EQ(result, "value" + std::to_string(i));
215 } else {
216 ASSERT_EQ(result, "value_new" + std::to_string(i));
217 }
218 }
219 }
220
221 std::vector<std::shared_ptr<EventListener>> remote_listeners;
222 std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
223 remote_table_properties_collector_factories;
224
225 private:
226 std::shared_ptr<Statistics> compactor_statistics_;
227 std::shared_ptr<Statistics> primary_statistics_;
228 std::shared_ptr<CompactionService> compaction_service_;
229 };
230
231 TEST_F(CompactionServiceTest, BasicCompactions) {
232 Options options = CurrentOptions();
233 ReopenWithCompactionService(&options);
234
235 Statistics* primary_statistics = GetPrimaryStatistics();
236 Statistics* compactor_statistics = GetCompactorStatistics();
237
238 for (int i = 0; i < 20; i++) {
239 for (int j = 0; j < 10; j++) {
240 int key_id = i * 10 + j;
241 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
242 }
243 ASSERT_OK(Flush());
244 }
245
246 for (int i = 0; i < 10; i++) {
247 for (int j = 0; j < 10; j++) {
248 int key_id = i * 20 + j * 2;
249 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
250 }
251 ASSERT_OK(Flush());
252 }
253 ASSERT_OK(dbfull()->TEST_WaitForCompact());
254
255 // verify result
256 for (int i = 0; i < 200; i++) {
257 auto result = Get(Key(i));
258 if (i % 2) {
259 ASSERT_EQ(result, "value" + std::to_string(i));
260 } else {
261 ASSERT_EQ(result, "value_new" + std::to_string(i));
262 }
263 }
264 auto my_cs = GetCompactionService();
265 ASSERT_GE(my_cs->GetCompactionNum(), 1);
266
267 // make sure the compaction statistics is only recorded on the remote side
268 ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
269 ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
270 ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
271 // even with remote compaction, primary host still needs to read SST files to
272 // `verify_table()`.
273 ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
274 // all the compaction write happens on the remote side
275 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
276 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
277 ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
278 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
279 primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
280 // compactor is already the remote side, which doesn't have remote
281 ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
282 ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
283 0);
284
285 // Test failed compaction
286 SyncPoint::GetInstance()->SetCallBack(
287 "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
288 // override job status
289 auto s = static_cast<Status*>(status);
290 *s = Status::Aborted("MyTestCompactionService failed to compact!");
291 });
292 SyncPoint::GetInstance()->EnableProcessing();
293
294 Status s;
295 for (int i = 0; i < 10; i++) {
296 for (int j = 0; j < 10; j++) {
297 int key_id = i * 20 + j * 2;
298 s = Put(Key(key_id), "value_new" + std::to_string(key_id));
299 if (s.IsAborted()) {
300 break;
301 }
302 }
303 if (s.IsAborted()) {
304 break;
305 }
306 s = Flush();
307 if (s.IsAborted()) {
308 break;
309 }
310 s = dbfull()->TEST_WaitForCompact();
311 if (s.IsAborted()) {
312 break;
313 }
314 }
315 ASSERT_TRUE(s.IsAborted());
316
317 // Test re-open and successful unique id verification
318 std::atomic_int verify_passed{0};
319 SyncPoint::GetInstance()->SetCallBack(
320 "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
321 // override job status
322 auto id = static_cast<UniqueId64x2*>(arg);
323 assert(*id != kNullUniqueId64x2);
324 verify_passed++;
325 });
326 Reopen(options);
327 ASSERT_GT(verify_passed, 0);
328 Close();
329 }
330
331 TEST_F(CompactionServiceTest, ManualCompaction) {
332 Options options = CurrentOptions();
333 options.disable_auto_compactions = true;
334 ReopenWithCompactionService(&options);
335 GenerateTestData();
336
337 auto my_cs = GetCompactionService();
338
339 std::string start_str = Key(15);
340 std::string end_str = Key(45);
341 Slice start(start_str);
342 Slice end(end_str);
343 uint64_t comp_num = my_cs->GetCompactionNum();
344 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
345 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
346 VerifyTestData();
347
348 start_str = Key(120);
349 start = start_str;
350 comp_num = my_cs->GetCompactionNum();
351 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
352 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
353 VerifyTestData();
354
355 end_str = Key(92);
356 end = end_str;
357 comp_num = my_cs->GetCompactionNum();
358 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
359 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
360 VerifyTestData();
361
362 comp_num = my_cs->GetCompactionNum();
363 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
364 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
365 VerifyTestData();
366 }
367
368 TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
369 Options options = CurrentOptions();
370 options.disable_auto_compactions = true;
371 ReopenWithCompactionService(&options);
372 GenerateTestData();
373
374 auto my_cs = GetCompactionService();
375
376 std::string start_str = Key(15);
377 std::string end_str = Key(45);
378 Slice start(start_str);
379 Slice end(end_str);
380 uint64_t comp_num = my_cs->GetCompactionNum();
381
382 // Test cancel compaction at the beginning
383 my_cs->SetCanceled(true);
384 auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
385 ASSERT_TRUE(s.IsIncomplete());
386 // compaction number is not increased
387 ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
388 VerifyTestData();
389
390 // Test cancel compaction in progress
391 ReopenWithCompactionService(&options);
392 GenerateTestData();
393 my_cs = GetCompactionService();
394 my_cs->SetCanceled(false);
395
396 std::atomic_bool cancel_issued{false};
397 SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
398 [&](void* /*arg*/) {
399 cancel_issued = true;
400 my_cs->SetCanceled(true);
401 });
402
403 SyncPoint::GetInstance()->EnableProcessing();
404
405 s = db_->CompactRange(CompactRangeOptions(), &start, &end);
406 ASSERT_TRUE(s.IsIncomplete());
407 ASSERT_TRUE(cancel_issued);
408 // compaction number is not increased
409 ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
410 VerifyTestData();
411 }
412
413 TEST_F(CompactionServiceTest, FailedToStart) {
414 Options options = CurrentOptions();
415 options.disable_auto_compactions = true;
416 ReopenWithCompactionService(&options);
417
418 GenerateTestData();
419
420 auto my_cs = GetCompactionService();
421 my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
422
423 std::string start_str = Key(15);
424 std::string end_str = Key(45);
425 Slice start(start_str);
426 Slice end(end_str);
427 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
428 ASSERT_TRUE(s.IsIncomplete());
429 }
430
431 TEST_F(CompactionServiceTest, InvalidResult) {
432 Options options = CurrentOptions();
433 options.disable_auto_compactions = true;
434 ReopenWithCompactionService(&options);
435
436 GenerateTestData();
437
438 auto my_cs = GetCompactionService();
439 my_cs->OverrideWaitResult("Invalid Str");
440
441 std::string start_str = Key(15);
442 std::string end_str = Key(45);
443 Slice start(start_str);
444 Slice end(end_str);
445 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
446 ASSERT_FALSE(s.ok());
447 }
448
449 TEST_F(CompactionServiceTest, SubCompaction) {
450 Options options = CurrentOptions();
451 options.max_subcompactions = 10;
452 options.target_file_size_base = 1 << 10; // 1KB
453 options.disable_auto_compactions = true;
454 ReopenWithCompactionService(&options);
455
456 GenerateTestData();
457 VerifyTestData();
458
459 auto my_cs = GetCompactionService();
460 int compaction_num_before = my_cs->GetCompactionNum();
461
462 auto cro = CompactRangeOptions();
463 cro.max_subcompactions = 10;
464 Status s = db_->CompactRange(cro, nullptr, nullptr);
465 ASSERT_OK(s);
466 VerifyTestData();
467 int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
468 // make sure there's sub-compaction by checking the compaction number
469 ASSERT_GE(compaction_num, 2);
470 }
471
472 class PartialDeleteCompactionFilter : public CompactionFilter {
473 public:
474 CompactionFilter::Decision FilterV2(
475 int /*level*/, const Slice& key, ValueType /*value_type*/,
476 const Slice& /*existing_value*/, std::string* /*new_value*/,
477 std::string* /*skip_until*/) const override {
478 int i = std::stoi(key.ToString().substr(3));
479 if (i > 5 && i <= 105) {
480 return CompactionFilter::Decision::kRemove;
481 }
482 return CompactionFilter::Decision::kKeep;
483 }
484
485 const char* Name() const override { return "PartialDeleteCompactionFilter"; }
486 };
487
488 TEST_F(CompactionServiceTest, CompactionFilter) {
489 Options options = CurrentOptions();
490 std::unique_ptr<CompactionFilter> delete_comp_filter(
491 new PartialDeleteCompactionFilter());
492 options.compaction_filter = delete_comp_filter.get();
493 ReopenWithCompactionService(&options);
494
495 for (int i = 0; i < 20; i++) {
496 for (int j = 0; j < 10; j++) {
497 int key_id = i * 10 + j;
498 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
499 }
500 ASSERT_OK(Flush());
501 }
502
503 for (int i = 0; i < 10; i++) {
504 for (int j = 0; j < 10; j++) {
505 int key_id = i * 20 + j * 2;
506 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
507 }
508 ASSERT_OK(Flush());
509 }
510 ASSERT_OK(dbfull()->TEST_WaitForCompact());
511
512 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
513
514 // verify result
515 for (int i = 0; i < 200; i++) {
516 auto result = Get(Key(i));
517 if (i > 5 && i <= 105) {
518 ASSERT_EQ(result, "NOT_FOUND");
519 } else if (i % 2) {
520 ASSERT_EQ(result, "value" + std::to_string(i));
521 } else {
522 ASSERT_EQ(result, "value_new" + std::to_string(i));
523 }
524 }
525 auto my_cs = GetCompactionService();
526 ASSERT_GE(my_cs->GetCompactionNum(), 1);
527 }
528
529 TEST_F(CompactionServiceTest, Snapshot) {
530 Options options = CurrentOptions();
531 ReopenWithCompactionService(&options);
532
533 ASSERT_OK(Put(Key(1), "value1"));
534 ASSERT_OK(Put(Key(2), "value1"));
535 const Snapshot* s1 = db_->GetSnapshot();
536 ASSERT_OK(Flush());
537
538 ASSERT_OK(Put(Key(1), "value2"));
539 ASSERT_OK(Put(Key(3), "value2"));
540 ASSERT_OK(Flush());
541
542 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
543 auto my_cs = GetCompactionService();
544 ASSERT_GE(my_cs->GetCompactionNum(), 1);
545 ASSERT_EQ("value1", Get(Key(1), s1));
546 ASSERT_EQ("value2", Get(Key(1)));
547 db_->ReleaseSnapshot(s1);
548 }
549
550 TEST_F(CompactionServiceTest, ConcurrentCompaction) {
551 Options options = CurrentOptions();
552 options.level0_file_num_compaction_trigger = 100;
553 options.max_background_jobs = 20;
554 ReopenWithCompactionService(&options);
555 GenerateTestData();
556
557 ColumnFamilyMetaData meta;
558 db_->GetColumnFamilyMetaData(&meta);
559
560 std::vector<std::thread> threads;
561 for (const auto& file : meta.levels[1].files) {
562 threads.emplace_back(std::thread([&]() {
563 std::string fname = file.db_path + "/" + file.name;
564 ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
565 }));
566 }
567
568 for (auto& thread : threads) {
569 thread.join();
570 }
571 ASSERT_OK(dbfull()->TEST_WaitForCompact());
572
573 // verify result
574 for (int i = 0; i < 200; i++) {
575 auto result = Get(Key(i));
576 if (i % 2) {
577 ASSERT_EQ(result, "value" + std::to_string(i));
578 } else {
579 ASSERT_EQ(result, "value_new" + std::to_string(i));
580 }
581 }
582 auto my_cs = GetCompactionService();
583 ASSERT_EQ(my_cs->GetCompactionNum(), 10);
584 ASSERT_EQ(FilesPerLevel(), "0,0,10");
585 }
586
587 TEST_F(CompactionServiceTest, CompactionInfo) {
588 Options options = CurrentOptions();
589 ReopenWithCompactionService(&options);
590
591 for (int i = 0; i < 20; i++) {
592 for (int j = 0; j < 10; j++) {
593 int key_id = i * 10 + j;
594 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
595 }
596 ASSERT_OK(Flush());
597 }
598
599 for (int i = 0; i < 10; i++) {
600 for (int j = 0; j < 10; j++) {
601 int key_id = i * 20 + j * 2;
602 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
603 }
604 ASSERT_OK(Flush());
605 }
606 ASSERT_OK(dbfull()->TEST_WaitForCompact());
607 auto my_cs =
608 static_cast_with_check<MyTestCompactionService>(GetCompactionService());
609 uint64_t comp_num = my_cs->GetCompactionNum();
610 ASSERT_GE(comp_num, 1);
611
612 CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
613 ASSERT_EQ(dbname_, info.db_name);
614 std::string db_id, db_session_id;
615 ASSERT_OK(db_->GetDbIdentity(db_id));
616 ASSERT_EQ(db_id, info.db_id);
617 ASSERT_OK(db_->GetDbSessionId(db_session_id));
618 ASSERT_EQ(db_session_id, info.db_session_id);
619 ASSERT_EQ(Env::LOW, info.priority);
620 info = my_cs->GetCompactionInfoForWait();
621 ASSERT_EQ(dbname_, info.db_name);
622 ASSERT_EQ(db_id, info.db_id);
623 ASSERT_EQ(db_session_id, info.db_session_id);
624 ASSERT_EQ(Env::LOW, info.priority);
625
626 // Test priority USER
627 ColumnFamilyMetaData meta;
628 db_->GetColumnFamilyMetaData(&meta);
629 SstFileMetaData file = meta.levels[1].files[0];
630 ASSERT_OK(db_->CompactFiles(CompactionOptions(),
631 {file.db_path + "/" + file.name}, 2));
632 info = my_cs->GetCompactionInfoForStart();
633 ASSERT_EQ(Env::USER, info.priority);
634 info = my_cs->GetCompactionInfoForWait();
635 ASSERT_EQ(Env::USER, info.priority);
636
637 // Test priority BOTTOM
638 env_->SetBackgroundThreads(1, Env::BOTTOM);
639 options.num_levels = 2;
640 ReopenWithCompactionService(&options);
641 my_cs =
642 static_cast_with_check<MyTestCompactionService>(GetCompactionService());
643
644 for (int i = 0; i < 20; i++) {
645 for (int j = 0; j < 10; j++) {
646 int key_id = i * 10 + j;
647 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
648 }
649 ASSERT_OK(Flush());
650 }
651
652 for (int i = 0; i < 4; i++) {
653 for (int j = 0; j < 10; j++) {
654 int key_id = i * 20 + j * 2;
655 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
656 }
657 ASSERT_OK(Flush());
658 }
659 ASSERT_OK(dbfull()->TEST_WaitForCompact());
660 info = my_cs->GetCompactionInfoForStart();
661 ASSERT_EQ(Env::BOTTOM, info.priority);
662 info = my_cs->GetCompactionInfoForWait();
663 ASSERT_EQ(Env::BOTTOM, info.priority);
664 }
665
666 TEST_F(CompactionServiceTest, FallbackLocalAuto) {
667 Options options = CurrentOptions();
668 ReopenWithCompactionService(&options);
669
670 auto my_cs = GetCompactionService();
671 Statistics* compactor_statistics = GetCompactorStatistics();
672 Statistics* primary_statistics = GetPrimaryStatistics();
673 uint64_t compactor_write_bytes =
674 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
675 uint64_t primary_write_bytes =
676 primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
677
678 my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
679
680 for (int i = 0; i < 20; i++) {
681 for (int j = 0; j < 10; j++) {
682 int key_id = i * 10 + j;
683 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
684 }
685 ASSERT_OK(Flush());
686 }
687
688 for (int i = 0; i < 10; i++) {
689 for (int j = 0; j < 10; j++) {
690 int key_id = i * 20 + j * 2;
691 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
692 }
693 ASSERT_OK(Flush());
694 }
695 ASSERT_OK(dbfull()->TEST_WaitForCompact());
696
697 // verify result
698 for (int i = 0; i < 200; i++) {
699 auto result = Get(Key(i));
700 if (i % 2) {
701 ASSERT_EQ(result, "value" + std::to_string(i));
702 } else {
703 ASSERT_EQ(result, "value_new" + std::to_string(i));
704 }
705 }
706
707 ASSERT_EQ(my_cs->GetCompactionNum(), 0);
708
709 // make sure the compaction statistics is only recorded on the local side
710 ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
711 compactor_write_bytes);
712 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
713 primary_write_bytes);
714 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
715 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
716 }
717
718 TEST_F(CompactionServiceTest, FallbackLocalManual) {
719 Options options = CurrentOptions();
720 options.disable_auto_compactions = true;
721 ReopenWithCompactionService(&options);
722
723 GenerateTestData();
724 VerifyTestData();
725
726 auto my_cs = GetCompactionService();
727 Statistics* compactor_statistics = GetCompactorStatistics();
728 Statistics* primary_statistics = GetPrimaryStatistics();
729 uint64_t compactor_write_bytes =
730 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
731 uint64_t primary_write_bytes =
732 primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
733
734 // re-enable remote compaction
735 my_cs->ResetOverride();
736 std::string start_str = Key(15);
737 std::string end_str = Key(45);
738 Slice start(start_str);
739 Slice end(end_str);
740 uint64_t comp_num = my_cs->GetCompactionNum();
741
742 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
743 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
744 // make sure the compaction statistics is only recorded on the remote side
745 ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
746 compactor_write_bytes);
747 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
748 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
749 ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
750 primary_write_bytes);
751
752 // return run local again with API WaitForComplete
753 my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
754 start_str = Key(120);
755 start = start_str;
756 comp_num = my_cs->GetCompactionNum();
757 compactor_write_bytes =
758 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
759 primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
760
761 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
762 ASSERT_EQ(my_cs->GetCompactionNum(),
763 comp_num); // no remote compaction is run
764 // make sure the compaction statistics is only recorded on the local side
765 ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
766 compactor_write_bytes);
767 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
768 primary_write_bytes);
769 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
770 compactor_write_bytes);
771
772 // verify result after 2 manual compactions
773 VerifyTestData();
774 }
775
776 TEST_F(CompactionServiceTest, RemoteEventListener) {
777 class RemoteEventListenerTest : public EventListener {
778 public:
779 const char* Name() const override { return "RemoteEventListenerTest"; }
780
781 void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
782 auto result = on_going_compactions.emplace(info.job_id);
783 ASSERT_TRUE(result.second); // make sure there's no duplication
784 compaction_num++;
785 EventListener::OnSubcompactionBegin(info);
786 }
787 void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
788 auto num = on_going_compactions.erase(info.job_id);
789 ASSERT_TRUE(num == 1); // make sure the compaction id exists
790 EventListener::OnSubcompactionCompleted(info);
791 }
792 void OnTableFileCreated(const TableFileCreationInfo& info) override {
793 ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
794 file_created++;
795 EventListener::OnTableFileCreated(info);
796 }
797 void OnTableFileCreationStarted(
798 const TableFileCreationBriefInfo& info) override {
799 ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
800 file_creation_started++;
801 EventListener::OnTableFileCreationStarted(info);
802 }
803
804 bool ShouldBeNotifiedOnFileIO() override {
805 file_io_notified++;
806 return EventListener::ShouldBeNotifiedOnFileIO();
807 }
808
809 std::atomic_uint64_t file_io_notified{0};
810 std::atomic_uint64_t file_creation_started{0};
811 std::atomic_uint64_t file_created{0};
812
813 std::set<int> on_going_compactions; // store the job_id
814 std::atomic_uint64_t compaction_num{0};
815 };
816
817 auto listener = new RemoteEventListenerTest();
818 remote_listeners.emplace_back(listener);
819
820 Options options = CurrentOptions();
821 ReopenWithCompactionService(&options);
822
823 for (int i = 0; i < 20; i++) {
824 for (int j = 0; j < 10; j++) {
825 int key_id = i * 10 + j;
826 ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
827 }
828 ASSERT_OK(Flush());
829 }
830
831 for (int i = 0; i < 10; i++) {
832 for (int j = 0; j < 10; j++) {
833 int key_id = i * 20 + j * 2;
834 ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
835 }
836 ASSERT_OK(Flush());
837 }
838 ASSERT_OK(dbfull()->TEST_WaitForCompact());
839
840 // check the events are triggered
841 ASSERT_TRUE(listener->file_io_notified > 0);
842 ASSERT_TRUE(listener->file_creation_started > 0);
843 ASSERT_TRUE(listener->file_created > 0);
844 ASSERT_TRUE(listener->compaction_num > 0);
845 ASSERT_TRUE(listener->on_going_compactions.empty());
846
847 // verify result
848 for (int i = 0; i < 200; i++) {
849 auto result = Get(Key(i));
850 if (i % 2) {
851 ASSERT_EQ(result, "value" + std::to_string(i));
852 } else {
853 ASSERT_EQ(result, "value_new" + std::to_string(i));
854 }
855 }
856 }
857
858 TEST_F(CompactionServiceTest, TablePropertiesCollector) {
859 const static std::string kUserPropertyName = "TestCount";
860
861 class TablePropertiesCollectorTest : public TablePropertiesCollector {
862 public:
863 Status Finish(UserCollectedProperties* properties) override {
864 *properties = UserCollectedProperties{
865 {kUserPropertyName, std::to_string(count_)},
866 };
867 return Status::OK();
868 }
869
870 UserCollectedProperties GetReadableProperties() const override {
871 return UserCollectedProperties();
872 }
873
874 const char* Name() const override { return "TablePropertiesCollectorTest"; }
875
876 Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
877 EntryType /*type*/, SequenceNumber /*seq*/,
878 uint64_t /*file_size*/) override {
879 count_++;
880 return Status::OK();
881 }
882
883 private:
884 uint32_t count_ = 0;
885 };
886
887 class TablePropertiesCollectorFactoryTest
888 : public TablePropertiesCollectorFactory {
889 public:
890 TablePropertiesCollector* CreateTablePropertiesCollector(
891 TablePropertiesCollectorFactory::Context /*context*/) override {
892 return new TablePropertiesCollectorTest();
893 }
894
895 const char* Name() const override {
896 return "TablePropertiesCollectorFactoryTest";
897 }
898 };
899
900 auto factory = new TablePropertiesCollectorFactoryTest();
901 remote_table_properties_collector_factories.emplace_back(factory);
902
903 const int kNumSst = 3;
904 const int kLevel0Trigger = 4;
905 Options options = CurrentOptions();
906 options.level0_file_num_compaction_trigger = kLevel0Trigger;
907 ReopenWithCompactionService(&options);
908
909 // generate a few SSTs locally which should not have user property
910 for (int i = 0; i < kNumSst; i++) {
911 for (int j = 0; j < 100; j++) {
912 ASSERT_OK(Put(Key(i * 10 + j), "value"));
913 }
914 ASSERT_OK(Flush());
915 }
916
917 TablePropertiesCollection fname_to_props;
918 ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
919 for (const auto& file_props : fname_to_props) {
920 auto properties = file_props.second->user_collected_properties;
921 auto it = properties.find(kUserPropertyName);
922 ASSERT_EQ(it, properties.end());
923 }
924
925 // trigger compaction
926 for (int i = kNumSst; i < kLevel0Trigger; i++) {
927 for (int j = 0; j < 100; j++) {
928 ASSERT_OK(Put(Key(i * 10 + j), "value"));
929 }
930 ASSERT_OK(Flush());
931 }
932 ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
933
934 ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
935
936 bool has_user_property = false;
937 for (const auto& file_props : fname_to_props) {
938 auto properties = file_props.second->user_collected_properties;
939 auto it = properties.find(kUserPropertyName);
940 if (it != properties.end()) {
941 has_user_property = true;
942 ASSERT_GT(std::stoi(it->second), 0);
943 }
944 }
945 ASSERT_TRUE(has_user_property);
946 }
947
948 } // namespace ROCKSDB_NAMESPACE
949
950 int main(int argc, char** argv) {
951 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
952 ::testing::InitGoogleTest(&argc, argv);
953 RegisterCustomObjects(argc, argv);
954 return RUN_ALL_TESTS();
955 }
956
957 #else
958 #include <stdio.h>
959
960 int main(int /*argc*/, char** /*argv*/) {
961 fprintf(stderr,
962 "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
963 return 0;
964 }
965
966 #endif // ROCKSDB_LITE