]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compact_files_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / compact_files_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <mutex>
9 #include <string>
10 #include <thread>
11 #include <vector>
12
13 #include "db/db_impl/db_impl.h"
14 #include "port/port.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/env.h"
17 #include "test_util/sync_point.h"
18 #include "test_util/testharness.h"
19 #include "util/string_util.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 class CompactFilesTest : public testing::Test {
24 public:
25 CompactFilesTest() {
26 env_ = Env::Default();
27 db_name_ = test::PerThreadDBPath("compact_files_test");
28 }
29
30 std::string db_name_;
31 Env* env_;
32 };
33
34 // A class which remembers the name of each flushed file.
35 class FlushedFileCollector : public EventListener {
36 public:
37 FlushedFileCollector() {}
38 ~FlushedFileCollector() override {}
39
40 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
41 std::lock_guard<std::mutex> lock(mutex_);
42 flushed_files_.push_back(info.file_path);
43 }
44
45 std::vector<std::string> GetFlushedFiles() {
46 std::lock_guard<std::mutex> lock(mutex_);
47 std::vector<std::string> result;
48 for (auto fname : flushed_files_) {
49 result.push_back(fname);
50 }
51 return result;
52 }
53 void ClearFlushedFiles() {
54 std::lock_guard<std::mutex> lock(mutex_);
55 flushed_files_.clear();
56 }
57
58 private:
59 std::vector<std::string> flushed_files_;
60 std::mutex mutex_;
61 };
62
63 TEST_F(CompactFilesTest, L0ConflictsFiles) {
64 Options options;
65 // to trigger compaction more easily
66 const int kWriteBufferSize = 10000;
67 const int kLevel0Trigger = 2;
68 options.create_if_missing = true;
69 options.compaction_style = kCompactionStyleLevel;
70 // Small slowdown and stop trigger for experimental purpose.
71 options.level0_slowdown_writes_trigger = 20;
72 options.level0_stop_writes_trigger = 20;
73 options.level0_stop_writes_trigger = 20;
74 options.write_buffer_size = kWriteBufferSize;
75 options.level0_file_num_compaction_trigger = kLevel0Trigger;
76 options.compression = kNoCompression;
77
78 DB* db = nullptr;
79 DestroyDB(db_name_, options);
80 Status s = DB::Open(options, db_name_, &db);
81 assert(s.ok());
82 assert(db);
83
84 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
85 {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
86 {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
87 });
88 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
89
90 // create couple files
91 // Background compaction starts and waits in BackgroundCallCompaction:0
92 for (int i = 0; i < kLevel0Trigger * 4; ++i) {
93 db->Put(WriteOptions(), ToString(i), "");
94 db->Put(WriteOptions(), ToString(100 - i), "");
95 db->Flush(FlushOptions());
96 }
97
98 ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
99 db->GetColumnFamilyMetaData(&meta);
100 std::string file1;
101 for (auto& file : meta.levels[0].files) {
102 ASSERT_EQ(0, meta.levels[0].level);
103 if (file1 == "") {
104 file1 = file.db_path + "/" + file.name;
105 } else {
106 std::string file2 = file.db_path + "/" + file.name;
107 // Another thread starts a compact files and creates an L0 compaction
108 // The background compaction then notices that there is an L0 compaction
109 // already in progress and doesn't do an L0 compaction
110 // Once the background compaction finishes, the compact files finishes
111 ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
112 {file1, file2}, 0));
113 break;
114 }
115 }
116 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
117 delete db;
118 }
119
120 TEST_F(CompactFilesTest, ObsoleteFiles) {
121 Options options;
122 // to trigger compaction more easily
123 const int kWriteBufferSize = 65536;
124 options.create_if_missing = true;
125 // Disable RocksDB background compaction.
126 options.compaction_style = kCompactionStyleNone;
127 options.level0_slowdown_writes_trigger = (1 << 30);
128 options.level0_stop_writes_trigger = (1 << 30);
129 options.write_buffer_size = kWriteBufferSize;
130 options.max_write_buffer_number = 2;
131 options.compression = kNoCompression;
132
133 // Add listener
134 FlushedFileCollector* collector = new FlushedFileCollector();
135 options.listeners.emplace_back(collector);
136
137 DB* db = nullptr;
138 DestroyDB(db_name_, options);
139 Status s = DB::Open(options, db_name_, &db);
140 assert(s.ok());
141 assert(db);
142
143 // create couple files
144 for (int i = 1000; i < 2000; ++i) {
145 db->Put(WriteOptions(), ToString(i),
146 std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
147 }
148
149 auto l0_files = collector->GetFlushedFiles();
150 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
151 reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();
152
153 // verify all compaction input files are deleted
154 for (auto fname : l0_files) {
155 ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
156 }
157 delete db;
158 }
159
160 TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
161 Options options;
162 options.create_if_missing = true;
163 // Disable RocksDB background compaction.
164 options.compaction_style = kCompactionStyleNone;
165 options.level0_slowdown_writes_trigger = 1000;
166 options.level0_stop_writes_trigger = 1000;
167 options.write_buffer_size = 65536;
168 options.max_write_buffer_number = 2;
169 options.compression = kNoCompression;
170 options.max_compaction_bytes = 5000;
171
172 // Add listener
173 FlushedFileCollector* collector = new FlushedFileCollector();
174 options.listeners.emplace_back(collector);
175
176 DB* db = nullptr;
177 DestroyDB(db_name_, options);
178 Status s = DB::Open(options, db_name_, &db);
179 assert(s.ok());
180 assert(db);
181
182 // create couple files
183 for (int i = 0; i < 500; ++i) {
184 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
185 }
186 reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
187 auto l0_files_1 = collector->GetFlushedFiles();
188 collector->ClearFlushedFiles();
189 for (int i = 0; i < 500; ++i) {
190 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
191 }
192 reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
193 auto l0_files_2 = collector->GetFlushedFiles();
194 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
195 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
196 // no assertion failure
197 delete db;
198 }
199
200 TEST_F(CompactFilesTest, CapturingPendingFiles) {
201 Options options;
202 options.create_if_missing = true;
203 // Disable RocksDB background compaction.
204 options.compaction_style = kCompactionStyleNone;
205 // Always do full scans for obsolete files (needed to reproduce the issue).
206 options.delete_obsolete_files_period_micros = 0;
207
208 // Add listener.
209 FlushedFileCollector* collector = new FlushedFileCollector();
210 options.listeners.emplace_back(collector);
211
212 DB* db = nullptr;
213 DestroyDB(db_name_, options);
214 Status s = DB::Open(options, db_name_, &db);
215 assert(s.ok());
216 assert(db);
217
218 // Create 5 files.
219 for (int i = 0; i < 5; ++i) {
220 db->Put(WriteOptions(), "key" + ToString(i), "value");
221 db->Flush(FlushOptions());
222 }
223
224 auto l0_files = collector->GetFlushedFiles();
225 EXPECT_EQ(5, l0_files.size());
226
227 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
228 {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
229 {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
230 });
231 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
232
233 // Start compacting files.
234 ROCKSDB_NAMESPACE::port::Thread compaction_thread(
235 [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
236
237 // In the meantime flush another file.
238 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
239 db->Put(WriteOptions(), "key5", "value");
240 db->Flush(FlushOptions());
241 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
242
243 compaction_thread.join();
244
245 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
246
247 delete db;
248
249 // Make sure we can reopen the DB.
250 s = DB::Open(options, db_name_, &db);
251 ASSERT_TRUE(s.ok());
252 assert(db);
253 delete db;
254 }
255
256 TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
257 class FilterWithGet : public CompactionFilter {
258 public:
259 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
260 std::string* /*new_value*/,
261 bool* /*value_changed*/) const override {
262 if (db_ == nullptr) {
263 return true;
264 }
265 std::string res;
266 db_->Get(ReadOptions(), "", &res);
267 return true;
268 }
269
270 void SetDB(DB* db) {
271 db_ = db;
272 }
273
274 const char* Name() const override { return "FilterWithGet"; }
275
276 private:
277 DB* db_;
278 };
279
280
281 std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
282
283 Options options;
284 options.create_if_missing = true;
285 options.compaction_filter = cf.get();
286
287 DB* db = nullptr;
288 DestroyDB(db_name_, options);
289 Status s = DB::Open(options, db_name_, &db);
290 ASSERT_OK(s);
291
292 cf->SetDB(db);
293
294 // Write one L0 file
295 db->Put(WriteOptions(), "K1", "V1");
296 db->Flush(FlushOptions());
297
298 // Compact all L0 files using CompactFiles
299 ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
300 db->GetColumnFamilyMetaData(&meta);
301 for (auto& file : meta.levels[0].files) {
302 std::string fname = file.db_path + "/" + file.name;
303 ASSERT_OK(
304 db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
305 }
306
307
308 delete db;
309 }
310
311 TEST_F(CompactFilesTest, SentinelCompressionType) {
312 if (!Zlib_Supported()) {
313 fprintf(stderr, "zlib compression not supported, skip this test\n");
314 return;
315 }
316 if (!Snappy_Supported()) {
317 fprintf(stderr, "snappy compression not supported, skip this test\n");
318 return;
319 }
320 // Check that passing `CompressionType::kDisableCompressionOption` to
321 // `CompactFiles` causes it to use the column family compression options.
322 for (auto compaction_style :
323 {CompactionStyle::kCompactionStyleLevel,
324 CompactionStyle::kCompactionStyleUniversal,
325 CompactionStyle::kCompactionStyleNone}) {
326 DestroyDB(db_name_, Options());
327 Options options;
328 options.compaction_style = compaction_style;
329 // L0: Snappy, L1: ZSTD, L2: Snappy
330 options.compression_per_level = {CompressionType::kSnappyCompression,
331 CompressionType::kZlibCompression,
332 CompressionType::kSnappyCompression};
333 options.create_if_missing = true;
334 FlushedFileCollector* collector = new FlushedFileCollector();
335 options.listeners.emplace_back(collector);
336 DB* db = nullptr;
337 ASSERT_OK(DB::Open(options, db_name_, &db));
338
339 db->Put(WriteOptions(), "key", "val");
340 db->Flush(FlushOptions());
341
342 auto l0_files = collector->GetFlushedFiles();
343 ASSERT_EQ(1, l0_files.size());
344
345 // L0->L1 compaction, so output should be ZSTD-compressed
346 CompactionOptions compaction_opts;
347 compaction_opts.compression = CompressionType::kDisableCompressionOption;
348 ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
349
350 ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;
351 ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
352 for (const auto& name_and_table_props : all_tables_props) {
353 ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
354 name_and_table_props.second->compression_name);
355 }
356 delete db;
357 }
358 }
359
360 TEST_F(CompactFilesTest, GetCompactionJobInfo) {
361 Options options;
362 options.create_if_missing = true;
363 // Disable RocksDB background compaction.
364 options.compaction_style = kCompactionStyleNone;
365 options.level0_slowdown_writes_trigger = 1000;
366 options.level0_stop_writes_trigger = 1000;
367 options.write_buffer_size = 65536;
368 options.max_write_buffer_number = 2;
369 options.compression = kNoCompression;
370 options.max_compaction_bytes = 5000;
371
372 // Add listener
373 FlushedFileCollector* collector = new FlushedFileCollector();
374 options.listeners.emplace_back(collector);
375
376 DB* db = nullptr;
377 DestroyDB(db_name_, options);
378 Status s = DB::Open(options, db_name_, &db);
379 assert(s.ok());
380 assert(db);
381
382 // create couple files
383 for (int i = 0; i < 500; ++i) {
384 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
385 }
386 reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
387 auto l0_files_1 = collector->GetFlushedFiles();
388 CompactionOptions co;
389 co.compression = CompressionType::kLZ4Compression;
390 CompactionJobInfo compaction_job_info{};
391 ASSERT_OK(
392 db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));
393 ASSERT_EQ(compaction_job_info.base_input_level, 0);
394 ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());
395 ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());
396 ASSERT_EQ(compaction_job_info.compaction_reason,
397 CompactionReason::kManualCompaction);
398 ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);
399 ASSERT_EQ(compaction_job_info.output_level, 0);
400 ASSERT_OK(compaction_job_info.status);
401 // no assertion failure
402 delete db;
403 }
404
405 } // namespace ROCKSDB_NAMESPACE
406
407 int main(int argc, char** argv) {
408 ::testing::InitGoogleTest(&argc, argv);
409 return RUN_ALL_TESTS();
410 }
411
412 #else
413 #include <stdio.h>
414
415 int main(int /*argc*/, char** /*argv*/) {
416 fprintf(stderr,
417 "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
418 return 0;
419 }
420
421 #endif // !ROCKSDB_LITE