1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "db/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
23 class CompactFilesTest
: public testing::Test
{
26 env_
= Env::Default();
27 db_name_
= test::PerThreadDBPath("compact_files_test");
34 // A class which remembers the name of each flushed file.
35 class FlushedFileCollector
: public EventListener
{
37 FlushedFileCollector() {}
38 ~FlushedFileCollector() override
{}
40 void OnFlushCompleted(DB
* /*db*/, const FlushJobInfo
& info
) override
{
41 std::lock_guard
<std::mutex
> lock(mutex_
);
42 flushed_files_
.push_back(info
.file_path
);
45 std::vector
<std::string
> GetFlushedFiles() {
46 std::lock_guard
<std::mutex
> lock(mutex_
);
47 std::vector
<std::string
> result
;
48 for (auto fname
: flushed_files_
) {
49 result
.push_back(fname
);
53 void ClearFlushedFiles() {
54 std::lock_guard
<std::mutex
> lock(mutex_
);
55 flushed_files_
.clear();
59 std::vector
<std::string
> flushed_files_
;
63 TEST_F(CompactFilesTest
, L0ConflictsFiles
) {
65 // to trigger compaction more easily
66 const int kWriteBufferSize
= 10000;
67 const int kLevel0Trigger
= 2;
68 options
.create_if_missing
= true;
69 options
.compaction_style
= kCompactionStyleLevel
;
70 // Small slowdown and stop trigger for experimental purpose.
71 options
.level0_slowdown_writes_trigger
= 20;
72 options
.level0_stop_writes_trigger
= 20;
73 options
.level0_stop_writes_trigger
= 20;
74 options
.write_buffer_size
= kWriteBufferSize
;
75 options
.level0_file_num_compaction_trigger
= kLevel0Trigger
;
76 options
.compression
= kNoCompression
;
79 DestroyDB(db_name_
, options
);
80 Status s
= DB::Open(options
, db_name_
, &db
);
84 rocksdb::SyncPoint::GetInstance()->LoadDependency({
85 {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
86 {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
88 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
90 // create couple files
91 // Background compaction starts and waits in BackgroundCallCompaction:0
92 for (int i
= 0; i
< kLevel0Trigger
* 4; ++i
) {
93 db
->Put(WriteOptions(), ToString(i
), "");
94 db
->Put(WriteOptions(), ToString(100 - i
), "");
95 db
->Flush(FlushOptions());
98 rocksdb::ColumnFamilyMetaData meta
;
99 db
->GetColumnFamilyMetaData(&meta
);
101 for (auto& file
: meta
.levels
[0].files
) {
102 ASSERT_EQ(0, meta
.levels
[0].level
);
104 file1
= file
.db_path
+ "/" + file
.name
;
106 std::string file2
= file
.db_path
+ "/" + file
.name
;
107 // Another thread starts a compact files and creates an L0 compaction
108 // The background compaction then notices that there is an L0 compaction
109 // already in progress and doesn't do an L0 compaction
110 // Once the background compaction finishes, the compact files finishes
112 db
->CompactFiles(rocksdb::CompactionOptions(), {file1
, file2
}, 0));
116 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
120 TEST_F(CompactFilesTest
, ObsoleteFiles
) {
122 // to trigger compaction more easily
123 const int kWriteBufferSize
= 65536;
124 options
.create_if_missing
= true;
125 // Disable RocksDB background compaction.
126 options
.compaction_style
= kCompactionStyleNone
;
127 options
.level0_slowdown_writes_trigger
= (1 << 30);
128 options
.level0_stop_writes_trigger
= (1 << 30);
129 options
.write_buffer_size
= kWriteBufferSize
;
130 options
.max_write_buffer_number
= 2;
131 options
.compression
= kNoCompression
;
134 FlushedFileCollector
* collector
= new FlushedFileCollector();
135 options
.listeners
.emplace_back(collector
);
138 DestroyDB(db_name_
, options
);
139 Status s
= DB::Open(options
, db_name_
, &db
);
143 // create couple files
144 for (int i
= 1000; i
< 2000; ++i
) {
145 db
->Put(WriteOptions(), ToString(i
),
146 std::string(kWriteBufferSize
/ 10, 'a' + (i
% 26)));
149 auto l0_files
= collector
->GetFlushedFiles();
150 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files
, 1));
151 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForCompact();
153 // verify all compaction input files are deleted
154 for (auto fname
: l0_files
) {
155 ASSERT_EQ(Status::NotFound(), env_
->FileExists(fname
));
160 TEST_F(CompactFilesTest
, NotCutOutputOnLevel0
) {
162 options
.create_if_missing
= true;
163 // Disable RocksDB background compaction.
164 options
.compaction_style
= kCompactionStyleNone
;
165 options
.level0_slowdown_writes_trigger
= 1000;
166 options
.level0_stop_writes_trigger
= 1000;
167 options
.write_buffer_size
= 65536;
168 options
.max_write_buffer_number
= 2;
169 options
.compression
= kNoCompression
;
170 options
.max_compaction_bytes
= 5000;
173 FlushedFileCollector
* collector
= new FlushedFileCollector();
174 options
.listeners
.emplace_back(collector
);
177 DestroyDB(db_name_
, options
);
178 Status s
= DB::Open(options
, db_name_
, &db
);
182 // create couple files
183 for (int i
= 0; i
< 500; ++i
) {
184 db
->Put(WriteOptions(), ToString(i
), std::string(1000, 'a' + (i
% 26)));
186 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForFlushMemTable();
187 auto l0_files_1
= collector
->GetFlushedFiles();
188 collector
->ClearFlushedFiles();
189 for (int i
= 0; i
< 500; ++i
) {
190 db
->Put(WriteOptions(), ToString(i
), std::string(1000, 'a' + (i
% 26)));
192 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForFlushMemTable();
193 auto l0_files_2
= collector
->GetFlushedFiles();
194 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files_1
, 0));
195 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files_2
, 0));
196 // no assertion failure
200 TEST_F(CompactFilesTest
, CapturingPendingFiles
) {
202 options
.create_if_missing
= true;
203 // Disable RocksDB background compaction.
204 options
.compaction_style
= kCompactionStyleNone
;
205 // Always do full scans for obsolete files (needed to reproduce the issue).
206 options
.delete_obsolete_files_period_micros
= 0;
209 FlushedFileCollector
* collector
= new FlushedFileCollector();
210 options
.listeners
.emplace_back(collector
);
213 DestroyDB(db_name_
, options
);
214 Status s
= DB::Open(options
, db_name_
, &db
);
219 for (int i
= 0; i
< 5; ++i
) {
220 db
->Put(WriteOptions(), "key" + ToString(i
), "value");
221 db
->Flush(FlushOptions());
224 auto l0_files
= collector
->GetFlushedFiles();
225 EXPECT_EQ(5, l0_files
.size());
227 rocksdb::SyncPoint::GetInstance()->LoadDependency({
228 {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
229 {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
231 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
233 // Start compacting files.
234 rocksdb::port::Thread
compaction_thread(
235 [&] { EXPECT_OK(db
->CompactFiles(CompactionOptions(), l0_files
, 1)); });
237 // In the meantime flush another file.
238 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
239 db
->Put(WriteOptions(), "key5", "value");
240 db
->Flush(FlushOptions());
241 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
243 compaction_thread
.join();
245 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
249 // Make sure we can reopen the DB.
250 s
= DB::Open(options
, db_name_
, &db
);
256 TEST_F(CompactFilesTest
, CompactionFilterWithGetSv
) {
257 class FilterWithGet
: public CompactionFilter
{
259 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
260 std::string
* /*new_value*/,
261 bool* /*value_changed*/) const override
{
262 if (db_
== nullptr) {
266 db_
->Get(ReadOptions(), "", &res
);
274 const char* Name() const override
{ return "FilterWithGet"; }
281 std::shared_ptr
<FilterWithGet
> cf(new FilterWithGet());
284 options
.create_if_missing
= true;
285 options
.compaction_filter
= cf
.get();
288 DestroyDB(db_name_
, options
);
289 Status s
= DB::Open(options
, db_name_
, &db
);
295 db
->Put(WriteOptions(), "K1", "V1");
296 db
->Flush(FlushOptions());
298 // Compact all L0 files using CompactFiles
299 rocksdb::ColumnFamilyMetaData meta
;
300 db
->GetColumnFamilyMetaData(&meta
);
301 for (auto& file
: meta
.levels
[0].files
) {
302 std::string fname
= file
.db_path
+ "/" + file
.name
;
304 db
->CompactFiles(rocksdb::CompactionOptions(), {fname
}, 0));
311 TEST_F(CompactFilesTest
, SentinelCompressionType
) {
312 if (!Zlib_Supported()) {
313 fprintf(stderr
, "zlib compression not supported, skip this test\n");
316 if (!Snappy_Supported()) {
317 fprintf(stderr
, "snappy compression not supported, skip this test\n");
320 // Check that passing `CompressionType::kDisableCompressionOption` to
321 // `CompactFiles` causes it to use the column family compression options.
322 for (auto compaction_style
:
323 {CompactionStyle::kCompactionStyleLevel
,
324 CompactionStyle::kCompactionStyleUniversal
,
325 CompactionStyle::kCompactionStyleNone
}) {
326 DestroyDB(db_name_
, Options());
328 options
.compaction_style
= compaction_style
;
329 // L0: Snappy, L1: ZSTD, L2: Snappy
330 options
.compression_per_level
= {CompressionType::kSnappyCompression
,
331 CompressionType::kZlibCompression
,
332 CompressionType::kSnappyCompression
};
333 options
.create_if_missing
= true;
334 FlushedFileCollector
* collector
= new FlushedFileCollector();
335 options
.listeners
.emplace_back(collector
);
337 ASSERT_OK(DB::Open(options
, db_name_
, &db
));
339 db
->Put(WriteOptions(), "key", "val");
340 db
->Flush(FlushOptions());
342 auto l0_files
= collector
->GetFlushedFiles();
343 ASSERT_EQ(1, l0_files
.size());
345 // L0->L1 compaction, so output should be ZSTD-compressed
346 CompactionOptions compaction_opts
;
347 compaction_opts
.compression
= CompressionType::kDisableCompressionOption
;
348 ASSERT_OK(db
->CompactFiles(compaction_opts
, l0_files
, 1));
350 rocksdb::TablePropertiesCollection all_tables_props
;
351 ASSERT_OK(db
->GetPropertiesOfAllTables(&all_tables_props
));
352 for (const auto& name_and_table_props
: all_tables_props
) {
353 ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression
),
354 name_and_table_props
.second
->compression_name
);
360 TEST_F(CompactFilesTest
, GetCompactionJobInfo
) {
362 options
.create_if_missing
= true;
363 // Disable RocksDB background compaction.
364 options
.compaction_style
= kCompactionStyleNone
;
365 options
.level0_slowdown_writes_trigger
= 1000;
366 options
.level0_stop_writes_trigger
= 1000;
367 options
.write_buffer_size
= 65536;
368 options
.max_write_buffer_number
= 2;
369 options
.compression
= kNoCompression
;
370 options
.max_compaction_bytes
= 5000;
373 FlushedFileCollector
* collector
= new FlushedFileCollector();
374 options
.listeners
.emplace_back(collector
);
377 DestroyDB(db_name_
, options
);
378 Status s
= DB::Open(options
, db_name_
, &db
);
382 // create couple files
383 for (int i
= 0; i
< 500; ++i
) {
384 db
->Put(WriteOptions(), ToString(i
), std::string(1000, 'a' + (i
% 26)));
386 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForFlushMemTable();
387 auto l0_files_1
= collector
->GetFlushedFiles();
388 CompactionOptions co
;
389 co
.compression
= CompressionType::kLZ4Compression
;
390 CompactionJobInfo compaction_job_info
;
392 db
->CompactFiles(co
, l0_files_1
, 0, -1, nullptr, &compaction_job_info
));
393 ASSERT_EQ(compaction_job_info
.base_input_level
, 0);
394 ASSERT_EQ(compaction_job_info
.cf_id
, db
->DefaultColumnFamily()->GetID());
395 ASSERT_EQ(compaction_job_info
.cf_name
, db
->DefaultColumnFamily()->GetName());
396 ASSERT_EQ(compaction_job_info
.compaction_reason
,
397 CompactionReason::kManualCompaction
);
398 ASSERT_EQ(compaction_job_info
.compression
, CompressionType::kLZ4Compression
);
399 ASSERT_EQ(compaction_job_info
.output_level
, 0);
400 ASSERT_OK(compaction_job_info
.status
);
401 // no assertion failure
405 } // namespace rocksdb
407 int main(int argc
, char** argv
) {
408 ::testing::InitGoogleTest(&argc
, argv
);
409 return RUN_ALL_TESTS();
415 int main(int /*argc*/, char** /*argv*/) {
417 "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
421 #endif // !ROCKSDB_LITE