]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include <mutex> | |
9 | #include <string> | |
10 | #include <thread> | |
11 | #include <vector> | |
12 | ||
f67539c2 | 13 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
14 | #include "port/port.h" |
15 | #include "rocksdb/db.h" | |
16 | #include "rocksdb/env.h" | |
f67539c2 TL |
17 | #include "test_util/sync_point.h" |
18 | #include "test_util/testharness.h" | |
20effc67 | 19 | #include "util/cast_util.h" |
7c673cae | 20 | #include "util/string_util.h" |
7c673cae | 21 | |
f67539c2 | 22 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
23 | |
24 | class CompactFilesTest : public testing::Test { | |
25 | public: | |
26 | CompactFilesTest() { | |
27 | env_ = Env::Default(); | |
11fdf7f2 | 28 | db_name_ = test::PerThreadDBPath("compact_files_test"); |
7c673cae FG |
29 | } |
30 | ||
31 | std::string db_name_; | |
32 | Env* env_; | |
33 | }; | |
34 | ||
35 | // A class which remembers the name of each flushed file. | |
36 | class FlushedFileCollector : public EventListener { | |
37 | public: | |
38 | FlushedFileCollector() {} | |
494da23a | 39 | ~FlushedFileCollector() override {} |
7c673cae | 40 | |
494da23a | 41 | void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { |
7c673cae FG |
42 | std::lock_guard<std::mutex> lock(mutex_); |
43 | flushed_files_.push_back(info.file_path); | |
44 | } | |
45 | ||
46 | std::vector<std::string> GetFlushedFiles() { | |
47 | std::lock_guard<std::mutex> lock(mutex_); | |
48 | std::vector<std::string> result; | |
49 | for (auto fname : flushed_files_) { | |
50 | result.push_back(fname); | |
51 | } | |
52 | return result; | |
53 | } | |
54 | void ClearFlushedFiles() { | |
55 | std::lock_guard<std::mutex> lock(mutex_); | |
56 | flushed_files_.clear(); | |
57 | } | |
58 | ||
59 | private: | |
60 | std::vector<std::string> flushed_files_; | |
61 | std::mutex mutex_; | |
62 | }; | |
63 | ||
64 | TEST_F(CompactFilesTest, L0ConflictsFiles) { | |
65 | Options options; | |
66 | // to trigger compaction more easily | |
67 | const int kWriteBufferSize = 10000; | |
68 | const int kLevel0Trigger = 2; | |
69 | options.create_if_missing = true; | |
70 | options.compaction_style = kCompactionStyleLevel; | |
71 | // Small slowdown and stop trigger for experimental purpose. | |
72 | options.level0_slowdown_writes_trigger = 20; | |
73 | options.level0_stop_writes_trigger = 20; | |
74 | options.level0_stop_writes_trigger = 20; | |
75 | options.write_buffer_size = kWriteBufferSize; | |
76 | options.level0_file_num_compaction_trigger = kLevel0Trigger; | |
77 | options.compression = kNoCompression; | |
78 | ||
79 | DB* db = nullptr; | |
80 | DestroyDB(db_name_, options); | |
81 | Status s = DB::Open(options, db_name_, &db); | |
82 | assert(s.ok()); | |
83 | assert(db); | |
84 | ||
f67539c2 | 85 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
7c673cae FG |
86 | {"CompactFilesImpl:0", "BackgroundCallCompaction:0"}, |
87 | {"BackgroundCallCompaction:1", "CompactFilesImpl:1"}, | |
88 | }); | |
f67539c2 | 89 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
90 | |
91 | // create couple files | |
92 | // Background compaction starts and waits in BackgroundCallCompaction:0 | |
93 | for (int i = 0; i < kLevel0Trigger * 4; ++i) { | |
94 | db->Put(WriteOptions(), ToString(i), ""); | |
95 | db->Put(WriteOptions(), ToString(100 - i), ""); | |
96 | db->Flush(FlushOptions()); | |
97 | } | |
98 | ||
f67539c2 | 99 | ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta; |
7c673cae FG |
100 | db->GetColumnFamilyMetaData(&meta); |
101 | std::string file1; | |
102 | for (auto& file : meta.levels[0].files) { | |
103 | ASSERT_EQ(0, meta.levels[0].level); | |
104 | if (file1 == "") { | |
105 | file1 = file.db_path + "/" + file.name; | |
106 | } else { | |
107 | std::string file2 = file.db_path + "/" + file.name; | |
108 | // Another thread starts a compact files and creates an L0 compaction | |
109 | // The background compaction then notices that there is an L0 compaction | |
110 | // already in progress and doesn't do an L0 compaction | |
111 | // Once the background compaction finishes, the compact files finishes | |
f67539c2 TL |
112 | ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), |
113 | {file1, file2}, 0)); | |
7c673cae FG |
114 | break; |
115 | } | |
116 | } | |
f67539c2 | 117 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
118 | delete db; |
119 | } | |
120 | ||
121 | TEST_F(CompactFilesTest, ObsoleteFiles) { | |
122 | Options options; | |
123 | // to trigger compaction more easily | |
124 | const int kWriteBufferSize = 65536; | |
125 | options.create_if_missing = true; | |
126 | // Disable RocksDB background compaction. | |
127 | options.compaction_style = kCompactionStyleNone; | |
128 | options.level0_slowdown_writes_trigger = (1 << 30); | |
129 | options.level0_stop_writes_trigger = (1 << 30); | |
130 | options.write_buffer_size = kWriteBufferSize; | |
131 | options.max_write_buffer_number = 2; | |
132 | options.compression = kNoCompression; | |
133 | ||
134 | // Add listener | |
135 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
136 | options.listeners.emplace_back(collector); | |
137 | ||
138 | DB* db = nullptr; | |
139 | DestroyDB(db_name_, options); | |
140 | Status s = DB::Open(options, db_name_, &db); | |
141 | assert(s.ok()); | |
142 | assert(db); | |
143 | ||
144 | // create couple files | |
145 | for (int i = 1000; i < 2000; ++i) { | |
146 | db->Put(WriteOptions(), ToString(i), | |
147 | std::string(kWriteBufferSize / 10, 'a' + (i % 26))); | |
148 | } | |
149 | ||
150 | auto l0_files = collector->GetFlushedFiles(); | |
151 | ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); | |
20effc67 | 152 | static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact(); |
7c673cae FG |
153 | |
154 | // verify all compaction input files are deleted | |
155 | for (auto fname : l0_files) { | |
156 | ASSERT_EQ(Status::NotFound(), env_->FileExists(fname)); | |
157 | } | |
158 | delete db; | |
159 | } | |
160 | ||
161 | TEST_F(CompactFilesTest, NotCutOutputOnLevel0) { | |
162 | Options options; | |
163 | options.create_if_missing = true; | |
164 | // Disable RocksDB background compaction. | |
165 | options.compaction_style = kCompactionStyleNone; | |
166 | options.level0_slowdown_writes_trigger = 1000; | |
167 | options.level0_stop_writes_trigger = 1000; | |
168 | options.write_buffer_size = 65536; | |
169 | options.max_write_buffer_number = 2; | |
170 | options.compression = kNoCompression; | |
171 | options.max_compaction_bytes = 5000; | |
172 | ||
173 | // Add listener | |
174 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
175 | options.listeners.emplace_back(collector); | |
176 | ||
177 | DB* db = nullptr; | |
178 | DestroyDB(db_name_, options); | |
179 | Status s = DB::Open(options, db_name_, &db); | |
180 | assert(s.ok()); | |
181 | assert(db); | |
182 | ||
183 | // create couple files | |
184 | for (int i = 0; i < 500; ++i) { | |
185 | db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26))); | |
186 | } | |
20effc67 | 187 | static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable(); |
7c673cae FG |
188 | auto l0_files_1 = collector->GetFlushedFiles(); |
189 | collector->ClearFlushedFiles(); | |
190 | for (int i = 0; i < 500; ++i) { | |
191 | db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26))); | |
192 | } | |
20effc67 | 193 | static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable(); |
7c673cae FG |
194 | auto l0_files_2 = collector->GetFlushedFiles(); |
195 | ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0)); | |
196 | ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0)); | |
197 | // no assertion failure | |
198 | delete db; | |
199 | } | |
200 | ||
201 | TEST_F(CompactFilesTest, CapturingPendingFiles) { | |
202 | Options options; | |
203 | options.create_if_missing = true; | |
204 | // Disable RocksDB background compaction. | |
205 | options.compaction_style = kCompactionStyleNone; | |
206 | // Always do full scans for obsolete files (needed to reproduce the issue). | |
207 | options.delete_obsolete_files_period_micros = 0; | |
208 | ||
209 | // Add listener. | |
210 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
211 | options.listeners.emplace_back(collector); | |
212 | ||
213 | DB* db = nullptr; | |
214 | DestroyDB(db_name_, options); | |
215 | Status s = DB::Open(options, db_name_, &db); | |
216 | assert(s.ok()); | |
217 | assert(db); | |
218 | ||
219 | // Create 5 files. | |
220 | for (int i = 0; i < 5; ++i) { | |
221 | db->Put(WriteOptions(), "key" + ToString(i), "value"); | |
222 | db->Flush(FlushOptions()); | |
223 | } | |
224 | ||
225 | auto l0_files = collector->GetFlushedFiles(); | |
226 | EXPECT_EQ(5, l0_files.size()); | |
227 | ||
f67539c2 | 228 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
7c673cae FG |
229 | {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"}, |
230 | {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"}, | |
231 | }); | |
f67539c2 | 232 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
233 | |
234 | // Start compacting files. | |
f67539c2 | 235 | ROCKSDB_NAMESPACE::port::Thread compaction_thread( |
7c673cae FG |
236 | [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); |
237 | ||
238 | // In the meantime flush another file. | |
239 | TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0"); | |
240 | db->Put(WriteOptions(), "key5", "value"); | |
241 | db->Flush(FlushOptions()); | |
242 | TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1"); | |
243 | ||
244 | compaction_thread.join(); | |
245 | ||
f67539c2 | 246 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
247 | |
248 | delete db; | |
249 | ||
250 | // Make sure we can reopen the DB. | |
251 | s = DB::Open(options, db_name_, &db); | |
252 | ASSERT_TRUE(s.ok()); | |
253 | assert(db); | |
254 | delete db; | |
255 | } | |
256 | ||
257 | TEST_F(CompactFilesTest, CompactionFilterWithGetSv) { | |
258 | class FilterWithGet : public CompactionFilter { | |
259 | public: | |
494da23a TL |
260 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, |
261 | std::string* /*new_value*/, | |
262 | bool* /*value_changed*/) const override { | |
7c673cae FG |
263 | if (db_ == nullptr) { |
264 | return true; | |
265 | } | |
266 | std::string res; | |
267 | db_->Get(ReadOptions(), "", &res); | |
268 | return true; | |
269 | } | |
270 | ||
271 | void SetDB(DB* db) { | |
272 | db_ = db; | |
273 | } | |
274 | ||
494da23a | 275 | const char* Name() const override { return "FilterWithGet"; } |
7c673cae FG |
276 | |
277 | private: | |
278 | DB* db_; | |
279 | }; | |
280 | ||
281 | ||
282 | std::shared_ptr<FilterWithGet> cf(new FilterWithGet()); | |
283 | ||
284 | Options options; | |
285 | options.create_if_missing = true; | |
286 | options.compaction_filter = cf.get(); | |
287 | ||
288 | DB* db = nullptr; | |
289 | DestroyDB(db_name_, options); | |
290 | Status s = DB::Open(options, db_name_, &db); | |
291 | ASSERT_OK(s); | |
292 | ||
293 | cf->SetDB(db); | |
294 | ||
295 | // Write one L0 file | |
296 | db->Put(WriteOptions(), "K1", "V1"); | |
297 | db->Flush(FlushOptions()); | |
298 | ||
299 | // Compact all L0 files using CompactFiles | |
f67539c2 | 300 | ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta; |
7c673cae FG |
301 | db->GetColumnFamilyMetaData(&meta); |
302 | for (auto& file : meta.levels[0].files) { | |
303 | std::string fname = file.db_path + "/" + file.name; | |
304 | ASSERT_OK( | |
f67539c2 | 305 | db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0)); |
7c673cae FG |
306 | } |
307 | ||
308 | ||
309 | delete db; | |
310 | } | |
311 | ||
11fdf7f2 TL |
312 | TEST_F(CompactFilesTest, SentinelCompressionType) { |
313 | if (!Zlib_Supported()) { | |
314 | fprintf(stderr, "zlib compression not supported, skip this test\n"); | |
315 | return; | |
316 | } | |
317 | if (!Snappy_Supported()) { | |
318 | fprintf(stderr, "snappy compression not supported, skip this test\n"); | |
319 | return; | |
320 | } | |
321 | // Check that passing `CompressionType::kDisableCompressionOption` to | |
322 | // `CompactFiles` causes it to use the column family compression options. | |
323 | for (auto compaction_style : | |
324 | {CompactionStyle::kCompactionStyleLevel, | |
325 | CompactionStyle::kCompactionStyleUniversal, | |
326 | CompactionStyle::kCompactionStyleNone}) { | |
327 | DestroyDB(db_name_, Options()); | |
328 | Options options; | |
329 | options.compaction_style = compaction_style; | |
330 | // L0: Snappy, L1: ZSTD, L2: Snappy | |
331 | options.compression_per_level = {CompressionType::kSnappyCompression, | |
332 | CompressionType::kZlibCompression, | |
333 | CompressionType::kSnappyCompression}; | |
334 | options.create_if_missing = true; | |
335 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
336 | options.listeners.emplace_back(collector); | |
337 | DB* db = nullptr; | |
338 | ASSERT_OK(DB::Open(options, db_name_, &db)); | |
339 | ||
340 | db->Put(WriteOptions(), "key", "val"); | |
341 | db->Flush(FlushOptions()); | |
342 | ||
343 | auto l0_files = collector->GetFlushedFiles(); | |
344 | ASSERT_EQ(1, l0_files.size()); | |
345 | ||
346 | // L0->L1 compaction, so output should be ZSTD-compressed | |
347 | CompactionOptions compaction_opts; | |
348 | compaction_opts.compression = CompressionType::kDisableCompressionOption; | |
349 | ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1)); | |
350 | ||
f67539c2 | 351 | ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props; |
11fdf7f2 TL |
352 | ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props)); |
353 | for (const auto& name_and_table_props : all_tables_props) { | |
354 | ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression), | |
355 | name_and_table_props.second->compression_name); | |
356 | } | |
357 | delete db; | |
358 | } | |
359 | } | |
360 | ||
494da23a TL |
361 | TEST_F(CompactFilesTest, GetCompactionJobInfo) { |
362 | Options options; | |
363 | options.create_if_missing = true; | |
364 | // Disable RocksDB background compaction. | |
365 | options.compaction_style = kCompactionStyleNone; | |
366 | options.level0_slowdown_writes_trigger = 1000; | |
367 | options.level0_stop_writes_trigger = 1000; | |
368 | options.write_buffer_size = 65536; | |
369 | options.max_write_buffer_number = 2; | |
370 | options.compression = kNoCompression; | |
371 | options.max_compaction_bytes = 5000; | |
372 | ||
373 | // Add listener | |
374 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
375 | options.listeners.emplace_back(collector); | |
376 | ||
377 | DB* db = nullptr; | |
378 | DestroyDB(db_name_, options); | |
379 | Status s = DB::Open(options, db_name_, &db); | |
380 | assert(s.ok()); | |
381 | assert(db); | |
382 | ||
383 | // create couple files | |
384 | for (int i = 0; i < 500; ++i) { | |
385 | db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26))); | |
386 | } | |
20effc67 | 387 | static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable(); |
494da23a TL |
388 | auto l0_files_1 = collector->GetFlushedFiles(); |
389 | CompactionOptions co; | |
390 | co.compression = CompressionType::kLZ4Compression; | |
f67539c2 | 391 | CompactionJobInfo compaction_job_info{}; |
494da23a TL |
392 | ASSERT_OK( |
393 | db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info)); | |
394 | ASSERT_EQ(compaction_job_info.base_input_level, 0); | |
395 | ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID()); | |
396 | ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName()); | |
397 | ASSERT_EQ(compaction_job_info.compaction_reason, | |
398 | CompactionReason::kManualCompaction); | |
399 | ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression); | |
400 | ASSERT_EQ(compaction_job_info.output_level, 0); | |
401 | ASSERT_OK(compaction_job_info.status); | |
402 | // no assertion failure | |
403 | delete db; | |
404 | } | |
405 | ||
f67539c2 | 406 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
407 | |
408 | int main(int argc, char** argv) { | |
409 | ::testing::InitGoogleTest(&argc, argv); | |
410 | return RUN_ALL_TESTS(); | |
411 | } | |
412 | ||
413 | #else | |
414 | #include <stdio.h> | |
415 | ||
11fdf7f2 | 416 | int main(int /*argc*/, char** /*argv*/) { |
7c673cae FG |
417 | fprintf(stderr, |
418 | "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n"); | |
419 | return 0; | |
420 | } | |
421 | ||
422 | #endif // !ROCKSDB_LITE |