1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
13 #include "db/db_impl.h"
14 #include "port/port.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/env.h"
17 #include "util/string_util.h"
18 #include "util/sync_point.h"
19 #include "util/testharness.h"
// Test fixture used by every TEST_F in this file.
// The two visible statements point env_ at Env::Default() and set db_name_ to
// test::TmpDir(env_) with "/compact_files_test" appended.
// NOTE(review): the header of the method that contains these assignments, the
// declarations of env_ / db_name_ and the closing braces are not visible in
// this listing.
23 class CompactFilesTest
: public testing::Test
{
26 env_
= Env::Default();
// env_ has to be assigned first: the next statement passes it to
// test::TmpDir().
27 db_name_
= test::TmpDir(env_
) + "/compact_files_test";
34 // A class which remembers the name of each flushed file.
35 class FlushedFileCollector
: public EventListener
{
37 FlushedFileCollector() {}
38 ~FlushedFileCollector() {}
40 virtual void OnFlushCompleted(
41 DB
* db
, const FlushJobInfo
& info
) override
{
42 std::lock_guard
<std::mutex
> lock(mutex_
);
43 flushed_files_
.push_back(info
.file_path
);
46 std::vector
<std::string
> GetFlushedFiles() {
47 std::lock_guard
<std::mutex
> lock(mutex_
);
48 std::vector
<std::string
> result
;
49 for (auto fname
: flushed_files_
) {
50 result
.push_back(fname
);
54 void ClearFlushedFiles() {
55 std::lock_guard
<std::mutex
> lock(mutex_
);
56 flushed_files_
.clear();
60 std::vector
<std::string
> flushed_files_
;
// Opens a level-style DB with a small write buffer and an L0 compaction
// trigger of kLevel0Trigger, writes and flushes kLevel0Trigger * 4 times, and
// then calls DB::CompactFiles() on two L0 files (output level 0) while
// SyncPoint dependencies between "CompactFilesImpl:*" and
// "BackgroundCallCompaction:*" are enabled.
// NOTE(review): presumably each LoadDependency pair {A, B} makes sync point B
// wait until A has been reached -- confirm against util/sync_point.h.
// NOTE(review): the declarations of `options`, `db` and `file1`, the logic
// that chooses between assigning file1 and file2, and the closing braces are
// not visible in this listing.
64 TEST_F(CompactFilesTest
, L0ConflictsFiles
) {
66 // to trigger compaction more easily
67 const int kWriteBufferSize
= 10000;
68 const int kLevel0Trigger
= 2;
69 options
.create_if_missing
= true;
70 options
.compaction_style
= kCompactionStyleLevel
;
71 // Small slowdown and stop trigger for experimental purpose.
72 options
.level0_slowdown_writes_trigger
= 20;
73 options
.level0_stop_writes_trigger
= 20;
// NOTE(review): the next statement repeats the assignment directly above
// (level0_stop_writes_trigger = 20); one of the two is redundant.
74 options
.level0_stop_writes_trigger
= 20;
75 options
.write_buffer_size
= kWriteBufferSize
;
76 options
.level0_file_num_compaction_trigger
= kLevel0Trigger
;
77 options
.compression
= kNoCompression
;
// Start from an empty database directory.
80 DestroyDB(db_name_
, options
);
81 Status s
= DB::Open(options
, db_name_
, &db
);
85 rocksdb::SyncPoint::GetInstance()->LoadDependency({
86 {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
87 {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
89 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
91 // Create a couple of files.
92 // Background compaction starts and waits in BackgroundCallCompaction:0
93 for (int i
= 0; i
< kLevel0Trigger
* 4; ++i
) {
// NOTE(review): the Status returned by the two Put() calls and by Flush() is
// discarded in the visible code.
94 db
->Put(WriteOptions(), ToString(i
), "");
95 db
->Put(WriteOptions(), ToString(100 - i
), "");
96 db
->Flush(FlushOptions());
99 rocksdb::ColumnFamilyMetaData meta
;
100 db
->GetColumnFamilyMetaData(&meta
);
// Walk the files of meta.levels[0] and build full paths as
// db_path + "/" + name.
102 for (auto& file
: meta
.levels
[0].files
) {
103 ASSERT_EQ(0, meta
.levels
[0].level
);
105 file1
= file
.db_path
+ "/" + file
.name
;
107 std::string file2
= file
.db_path
+ "/" + file
.name
;
108 // Another thread starts a compact files and creates an L0 compaction
109 // The background compaction then notices that there is an L0 compaction
110 // already in progress and doesn't do an L0 compaction
111 // Once the background compaction finishes, the compact files finishes
// NOTE(review): the extra ')' at the end of this call suggests that it is
// wrapped in a macro or call opened on a line that is not visible here.
113 db
->CompactFiles(rocksdb::CompactionOptions(), {file1
, file2
}, 0));
117 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Checks that the input files of a manual compaction no longer exist once
// the compaction has finished.
// Steps visible here: open a DB with compaction_style = kCompactionStyleNone
// and a FlushedFileCollector listener, write keys "1000".."1999", pass the
// collected flushed files to CompactFiles() with output level 1, wait with
// TEST_WaitForCompact(), then assert that env_->FileExists() yields
// Status::NotFound() for every input file.
// NOTE(review): the declarations of `options` and `db` and the closing braces
// are not visible in this listing.
121 TEST_F(CompactFilesTest
, ObsoleteFiles
) {
123 // to trigger compaction more easily
124 const int kWriteBufferSize
= 65536;
125 options
.create_if_missing
= true;
126 // Disable RocksDB background compaction.
127 options
.compaction_style
= kCompactionStyleNone
;
128 options
.level0_slowdown_writes_trigger
= (1 << 30);
129 options
.level0_stop_writes_trigger
= (1 << 30);
130 options
.write_buffer_size
= kWriteBufferSize
;
131 options
.max_write_buffer_number
= 2;
132 options
.compression
= kNoCompression
;
// Register the listener that records the path of every flushed file.
// NOTE(review): presumably options.listeners stores owning smart pointers, so
// emplace_back() would take ownership of the raw pointer -- confirm.
135 FlushedFileCollector
* collector
= new FlushedFileCollector();
136 options
.listeners
.emplace_back(collector
);
// Start from an empty database directory.
139 DestroyDB(db_name_
, options
);
140 Status s
= DB::Open(options
, db_name_
, &db
);
144 // Create a couple of files.
145 for (int i
= 1000; i
< 2000; ++i
) {
// Each value is kWriteBufferSize / 10 characters long and consists of a
// single repeated letter chosen by i % 26.
// NOTE(review): the Status returned by Put() is discarded.
146 db
->Put(WriteOptions(), ToString(i
),
147 std::string(kWriteBufferSize
/ 10, 'a' + (i
% 26)));
150 auto l0_files
= collector
->GetFlushedFiles();
151 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files
, 1));
// NOTE(review): reinterpret_cast is used for the DB* -> DBImpl* conversion;
// if DBImpl derives from DB, a static_cast would express the same downcast.
152 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForCompact();
154 // verify all compaction input files are deleted
// NOTE(review): `auto fname` copies each path; `const auto&` would avoid it.
155 for (auto fname
: l0_files
) {
156 ASSERT_EQ(Status::NotFound(), env_
->FileExists(fname
));
// Runs two manual compactions whose output level is 0 with
// max_compaction_bytes set to 5000, and expects both CompactFiles() calls to
// return OK.
// Two batches of 500 keys ("0".."499", 1000-character values) are written;
// after each batch the test waits for the memtable flush and takes the list
// of flushed files from the collector (clearing it in between), so
// l0_files_1 and l0_files_2 hold the files of the first and second batch.
// NOTE(review): the declarations of `options` and `db` and the closing braces
// are not visible in this listing.
161 TEST_F(CompactFilesTest
, NotCutOutputOnLevel0
) {
163 options
.create_if_missing
= true;
164 // Disable RocksDB background compaction.
165 options
.compaction_style
= kCompactionStyleNone
;
166 options
.level0_slowdown_writes_trigger
= 1000;
167 options
.level0_stop_writes_trigger
= 1000;
168 options
.write_buffer_size
= 65536;
169 options
.max_write_buffer_number
= 2;
170 options
.compression
= kNoCompression
;
171 options
.max_compaction_bytes
= 5000;
// Register the listener that records the path of every flushed file.
174 FlushedFileCollector
* collector
= new FlushedFileCollector();
175 options
.listeners
.emplace_back(collector
);
// Start from an empty database directory.
178 DestroyDB(db_name_
, options
);
179 Status s
= DB::Open(options
, db_name_
, &db
);
183 // Create a couple of files.
184 for (int i
= 0; i
< 500; ++i
) {
// NOTE(review): the Status returned by Put() is discarded.
185 db
->Put(WriteOptions(), ToString(i
), std::string(1000, 'a' + (i
% 26)));
187 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForFlushMemTable();
188 auto l0_files_1
= collector
->GetFlushedFiles();
// Reset the collector so that the second snapshot only contains files
// flushed from here on.
189 collector
->ClearFlushedFiles();
// Second batch: same keys and values as the first one.
190 for (int i
= 0; i
< 500; ++i
) {
191 db
->Put(WriteOptions(), ToString(i
), std::string(1000, 'a' + (i
% 26)));
193 reinterpret_cast<DBImpl
*>(db
)->TEST_WaitForFlushMemTable();
194 auto l0_files_2
= collector
->GetFlushedFiles();
// Compact each batch separately, keeping the output on level 0.
195 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files_1
, 0));
196 ASSERT_OK(db
->CompactFiles(CompactionOptions(), l0_files_2
, 0));
197 // no assertion failure
// Flushes a new file while a CompactFiles() call is in progress, then checks
// that the DB can be opened again.
// Sequence visible here: five Put+Flush rounds create five files; a separate
// thread runs CompactFiles() on them (output level 1); the test thread passes
// sync point "CompactFilesTest.CapturingPendingFiles:0", writes and flushes
// "key5", passes "...:1", joins the compaction thread and re-opens the DB.
// NOTE(review): presumably each LoadDependency pair {A, B} makes sync point B
// wait until A has been reached -- confirm against util/sync_point.h.
// NOTE(review): the declarations of `options` and `db`, whatever happens to
// `db` between DisableProcessing() and the second DB::Open(), and the closing
// braces are not visible in this listing.
201 TEST_F(CompactFilesTest
, CapturingPendingFiles
) {
203 options
.create_if_missing
= true;
204 // Disable RocksDB background compaction.
205 options
.compaction_style
= kCompactionStyleNone
;
206 // Always do full scans for obsolete files (needed to reproduce the issue).
207 options
.delete_obsolete_files_period_micros
= 0;
// Register the listener that records the path of every flushed file.
210 FlushedFileCollector
* collector
= new FlushedFileCollector();
211 options
.listeners
.emplace_back(collector
);
// Start from an empty database directory.
214 DestroyDB(db_name_
, options
);
215 Status s
= DB::Open(options
, db_name_
, &db
);
// One Put followed by one Flush per iteration ("key0".."key4").
// NOTE(review): the Status returned by Put() and Flush() is discarded.
220 for (int i
= 0; i
< 5; ++i
) {
221 db
->Put(WriteOptions(), "key" + ToString(i
), "value");
222 db
->Flush(FlushOptions());
225 auto l0_files
= collector
->GetFlushedFiles();
// NOTE(review): compares the int literal 5 with the result of size(); an
// unsigned literal would avoid a signed/unsigned comparison.
226 EXPECT_EQ(5, l0_files
.size());
228 rocksdb::SyncPoint::GetInstance()->LoadDependency({
229 {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
230 {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
232 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
234 // Start compacting files.
// The lambda captures by reference; the thread is joined below before the
// captured locals go out of scope.
235 rocksdb::port::Thread
compaction_thread(
236 [&] { EXPECT_OK(db
->CompactFiles(CompactionOptions(), l0_files
, 1)); });
238 // In the meantime flush another file.
239 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
240 db
->Put(WriteOptions(), "key5", "value");
241 db
->Flush(FlushOptions());
242 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
244 compaction_thread
.join();
246 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
250 // Make sure we can reopen the DB.
251 s
= DB::Open(options
, db_name_
, &db
);
// Runs CompactFiles() with a compaction filter that itself reads from the
// database.
// The local class FilterWithGet overrides CompactionFilter::Filter(); the
// visible part of its body tests db_ against nullptr and calls
// db_->Get(ReadOptions(), "", &res). The test installs one instance as
// options.compaction_filter, writes and flushes key "K1", and then calls
// CompactFiles() (output level 0) once for each file listed in
// meta.levels[0].
// NOTE(review): the rest of FilterWithGet (return statements, the declaration
// of `res`, how db_ is declared and assigned), the declarations of `options`
// and `db`, and the closing braces are not visible in this listing.
257 TEST_F(CompactFilesTest
, CompactionFilterWithGetSv
) {
258 class FilterWithGet
: public CompactionFilter
{
260 virtual bool Filter(int level
, const Slice
& key
, const Slice
& value
,
261 std::string
* new_value
,
262 bool* value_changed
) const override
{
// Guard: the Get() below is only meaningful once a DB has been attached.
263 if (db_
== nullptr) {
// Read an empty key through the DB from inside the filter callback.
// NOTE(review): the Status returned by Get() is discarded.
267 db_
->Get(ReadOptions(), "", &res
);
275 virtual const char* Name() const override
{ return "FilterWithGet"; }
282 std::shared_ptr
<FilterWithGet
> cf(new FilterWithGet());
285 options
.create_if_missing
= true;
// compaction_filter receives the raw pointer from cf.get(); `cf` keeps
// ownership of the filter object.
286 options
.compaction_filter
= cf
.get();
// Start from an empty database directory.
289 DestroyDB(db_name_
, options
);
290 Status s
= DB::Open(options
, db_name_
, &db
);
// NOTE(review): the Status returned by Put() and Flush() is discarded.
296 db
->Put(WriteOptions(), "K1", "V1");
297 db
->Flush(FlushOptions());
299 // Compact all L0 files using CompactFiles
300 rocksdb::ColumnFamilyMetaData meta
;
301 db
->GetColumnFamilyMetaData(&meta
);
302 for (auto& file
: meta
.levels
[0].files
) {
// Full path of the file: db_path + "/" + name.
303 std::string fname
= file
.db_path
+ "/" + file
.name
;
// NOTE(review): the extra ')' at the end of this call suggests that it is
// wrapped in a macro or call opened on a line that is not visible here.
305 db
->CompactFiles(rocksdb::CompactionOptions(), {fname
}, 0));
312 } // namespace rocksdb
// Test-binary entry point: hands the command-line arguments to
// ::testing::InitGoogleTest() and returns the result of RUN_ALL_TESTS().
// NOTE(review): the closing brace of this function is not visible in this
// listing.
314 int main(int argc
, char** argv
) {
315 ::testing::InitGoogleTest(&argc
, argv
);
316 return RUN_ALL_TESTS();
// Second definition of main(). The trailing "#endif  // !ROCKSDB_LITE"
// indicates that the two definitions are selected by the ROCKSDB_LITE
// preprocessor condition.
// Only the tail of one statement is visible: a string literal saying the test
// is skipped because DBImpl::CompactFiles is not supported in ROCKSDB_LITE.
// NOTE(review): the call that receives this string, the return statement and
// the closing brace are not visible in this listing.
322 int main(int argc
, char** argv
) {
324 "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
328 #endif // !ROCKSDB_LITE