]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/compact_files_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / compact_files_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include <mutex>
9#include <string>
10#include <thread>
11#include <vector>
12
f67539c2 13#include "db/db_impl/db_impl.h"
7c673cae
FG
14#include "port/port.h"
15#include "rocksdb/db.h"
16#include "rocksdb/env.h"
f67539c2
TL
17#include "test_util/sync_point.h"
18#include "test_util/testharness.h"
20effc67 19#include "util/cast_util.h"
7c673cae 20#include "util/string_util.h"
7c673cae 21
f67539c2 22namespace ROCKSDB_NAMESPACE {
7c673cae
FG
23
24class CompactFilesTest : public testing::Test {
25 public:
26 CompactFilesTest() {
27 env_ = Env::Default();
11fdf7f2 28 db_name_ = test::PerThreadDBPath("compact_files_test");
7c673cae
FG
29 }
30
31 std::string db_name_;
32 Env* env_;
33};
34
35// A class which remembers the name of each flushed file.
36class FlushedFileCollector : public EventListener {
37 public:
38 FlushedFileCollector() {}
494da23a 39 ~FlushedFileCollector() override {}
7c673cae 40
494da23a 41 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
7c673cae
FG
42 std::lock_guard<std::mutex> lock(mutex_);
43 flushed_files_.push_back(info.file_path);
44 }
45
46 std::vector<std::string> GetFlushedFiles() {
47 std::lock_guard<std::mutex> lock(mutex_);
48 std::vector<std::string> result;
49 for (auto fname : flushed_files_) {
50 result.push_back(fname);
51 }
52 return result;
53 }
54 void ClearFlushedFiles() {
55 std::lock_guard<std::mutex> lock(mutex_);
56 flushed_files_.clear();
57 }
58
59 private:
60 std::vector<std::string> flushed_files_;
61 std::mutex mutex_;
62};
63
64TEST_F(CompactFilesTest, L0ConflictsFiles) {
65 Options options;
66 // to trigger compaction more easily
67 const int kWriteBufferSize = 10000;
68 const int kLevel0Trigger = 2;
69 options.create_if_missing = true;
70 options.compaction_style = kCompactionStyleLevel;
71 // Small slowdown and stop trigger for experimental purpose.
72 options.level0_slowdown_writes_trigger = 20;
73 options.level0_stop_writes_trigger = 20;
74 options.level0_stop_writes_trigger = 20;
75 options.write_buffer_size = kWriteBufferSize;
76 options.level0_file_num_compaction_trigger = kLevel0Trigger;
77 options.compression = kNoCompression;
78
79 DB* db = nullptr;
80 DestroyDB(db_name_, options);
81 Status s = DB::Open(options, db_name_, &db);
82 assert(s.ok());
83 assert(db);
84
f67539c2 85 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
86 {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
87 {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
88 });
f67539c2 89 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
90
91 // create couple files
92 // Background compaction starts and waits in BackgroundCallCompaction:0
93 for (int i = 0; i < kLevel0Trigger * 4; ++i) {
94 db->Put(WriteOptions(), ToString(i), "");
95 db->Put(WriteOptions(), ToString(100 - i), "");
96 db->Flush(FlushOptions());
97 }
98
f67539c2 99 ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
7c673cae
FG
100 db->GetColumnFamilyMetaData(&meta);
101 std::string file1;
102 for (auto& file : meta.levels[0].files) {
103 ASSERT_EQ(0, meta.levels[0].level);
104 if (file1 == "") {
105 file1 = file.db_path + "/" + file.name;
106 } else {
107 std::string file2 = file.db_path + "/" + file.name;
108 // Another thread starts a compact files and creates an L0 compaction
109 // The background compaction then notices that there is an L0 compaction
110 // already in progress and doesn't do an L0 compaction
111 // Once the background compaction finishes, the compact files finishes
f67539c2
TL
112 ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
113 {file1, file2}, 0));
7c673cae
FG
114 break;
115 }
116 }
f67539c2 117 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
118 delete db;
119}
120
121TEST_F(CompactFilesTest, ObsoleteFiles) {
122 Options options;
123 // to trigger compaction more easily
124 const int kWriteBufferSize = 65536;
125 options.create_if_missing = true;
126 // Disable RocksDB background compaction.
127 options.compaction_style = kCompactionStyleNone;
128 options.level0_slowdown_writes_trigger = (1 << 30);
129 options.level0_stop_writes_trigger = (1 << 30);
130 options.write_buffer_size = kWriteBufferSize;
131 options.max_write_buffer_number = 2;
132 options.compression = kNoCompression;
133
134 // Add listener
135 FlushedFileCollector* collector = new FlushedFileCollector();
136 options.listeners.emplace_back(collector);
137
138 DB* db = nullptr;
139 DestroyDB(db_name_, options);
140 Status s = DB::Open(options, db_name_, &db);
141 assert(s.ok());
142 assert(db);
143
144 // create couple files
145 for (int i = 1000; i < 2000; ++i) {
146 db->Put(WriteOptions(), ToString(i),
147 std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
148 }
149
150 auto l0_files = collector->GetFlushedFiles();
151 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
20effc67 152 static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact();
7c673cae
FG
153
154 // verify all compaction input files are deleted
155 for (auto fname : l0_files) {
156 ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
157 }
158 delete db;
159}
160
161TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
162 Options options;
163 options.create_if_missing = true;
164 // Disable RocksDB background compaction.
165 options.compaction_style = kCompactionStyleNone;
166 options.level0_slowdown_writes_trigger = 1000;
167 options.level0_stop_writes_trigger = 1000;
168 options.write_buffer_size = 65536;
169 options.max_write_buffer_number = 2;
170 options.compression = kNoCompression;
171 options.max_compaction_bytes = 5000;
172
173 // Add listener
174 FlushedFileCollector* collector = new FlushedFileCollector();
175 options.listeners.emplace_back(collector);
176
177 DB* db = nullptr;
178 DestroyDB(db_name_, options);
179 Status s = DB::Open(options, db_name_, &db);
180 assert(s.ok());
181 assert(db);
182
183 // create couple files
184 for (int i = 0; i < 500; ++i) {
185 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
186 }
20effc67 187 static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
7c673cae
FG
188 auto l0_files_1 = collector->GetFlushedFiles();
189 collector->ClearFlushedFiles();
190 for (int i = 0; i < 500; ++i) {
191 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
192 }
20effc67 193 static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
7c673cae
FG
194 auto l0_files_2 = collector->GetFlushedFiles();
195 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
196 ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
197 // no assertion failure
198 delete db;
199}
200
201TEST_F(CompactFilesTest, CapturingPendingFiles) {
202 Options options;
203 options.create_if_missing = true;
204 // Disable RocksDB background compaction.
205 options.compaction_style = kCompactionStyleNone;
206 // Always do full scans for obsolete files (needed to reproduce the issue).
207 options.delete_obsolete_files_period_micros = 0;
208
209 // Add listener.
210 FlushedFileCollector* collector = new FlushedFileCollector();
211 options.listeners.emplace_back(collector);
212
213 DB* db = nullptr;
214 DestroyDB(db_name_, options);
215 Status s = DB::Open(options, db_name_, &db);
216 assert(s.ok());
217 assert(db);
218
219 // Create 5 files.
220 for (int i = 0; i < 5; ++i) {
221 db->Put(WriteOptions(), "key" + ToString(i), "value");
222 db->Flush(FlushOptions());
223 }
224
225 auto l0_files = collector->GetFlushedFiles();
226 EXPECT_EQ(5, l0_files.size());
227
f67539c2 228 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
229 {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
230 {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
231 });
f67539c2 232 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
233
234 // Start compacting files.
f67539c2 235 ROCKSDB_NAMESPACE::port::Thread compaction_thread(
7c673cae
FG
236 [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
237
238 // In the meantime flush another file.
239 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
240 db->Put(WriteOptions(), "key5", "value");
241 db->Flush(FlushOptions());
242 TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
243
244 compaction_thread.join();
245
f67539c2 246 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
247
248 delete db;
249
250 // Make sure we can reopen the DB.
251 s = DB::Open(options, db_name_, &db);
252 ASSERT_TRUE(s.ok());
253 assert(db);
254 delete db;
255}
256
257TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
258 class FilterWithGet : public CompactionFilter {
259 public:
494da23a
TL
260 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
261 std::string* /*new_value*/,
262 bool* /*value_changed*/) const override {
7c673cae
FG
263 if (db_ == nullptr) {
264 return true;
265 }
266 std::string res;
267 db_->Get(ReadOptions(), "", &res);
268 return true;
269 }
270
271 void SetDB(DB* db) {
272 db_ = db;
273 }
274
494da23a 275 const char* Name() const override { return "FilterWithGet"; }
7c673cae
FG
276
277 private:
278 DB* db_;
279 };
280
281
282 std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
283
284 Options options;
285 options.create_if_missing = true;
286 options.compaction_filter = cf.get();
287
288 DB* db = nullptr;
289 DestroyDB(db_name_, options);
290 Status s = DB::Open(options, db_name_, &db);
291 ASSERT_OK(s);
292
293 cf->SetDB(db);
294
295 // Write one L0 file
296 db->Put(WriteOptions(), "K1", "V1");
297 db->Flush(FlushOptions());
298
299 // Compact all L0 files using CompactFiles
f67539c2 300 ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
7c673cae
FG
301 db->GetColumnFamilyMetaData(&meta);
302 for (auto& file : meta.levels[0].files) {
303 std::string fname = file.db_path + "/" + file.name;
304 ASSERT_OK(
f67539c2 305 db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
7c673cae
FG
306 }
307
308
309 delete db;
310}
311
11fdf7f2
TL
312TEST_F(CompactFilesTest, SentinelCompressionType) {
313 if (!Zlib_Supported()) {
314 fprintf(stderr, "zlib compression not supported, skip this test\n");
315 return;
316 }
317 if (!Snappy_Supported()) {
318 fprintf(stderr, "snappy compression not supported, skip this test\n");
319 return;
320 }
321 // Check that passing `CompressionType::kDisableCompressionOption` to
322 // `CompactFiles` causes it to use the column family compression options.
323 for (auto compaction_style :
324 {CompactionStyle::kCompactionStyleLevel,
325 CompactionStyle::kCompactionStyleUniversal,
326 CompactionStyle::kCompactionStyleNone}) {
327 DestroyDB(db_name_, Options());
328 Options options;
329 options.compaction_style = compaction_style;
330 // L0: Snappy, L1: ZSTD, L2: Snappy
331 options.compression_per_level = {CompressionType::kSnappyCompression,
332 CompressionType::kZlibCompression,
333 CompressionType::kSnappyCompression};
334 options.create_if_missing = true;
335 FlushedFileCollector* collector = new FlushedFileCollector();
336 options.listeners.emplace_back(collector);
337 DB* db = nullptr;
338 ASSERT_OK(DB::Open(options, db_name_, &db));
339
340 db->Put(WriteOptions(), "key", "val");
341 db->Flush(FlushOptions());
342
343 auto l0_files = collector->GetFlushedFiles();
344 ASSERT_EQ(1, l0_files.size());
345
346 // L0->L1 compaction, so output should be ZSTD-compressed
347 CompactionOptions compaction_opts;
348 compaction_opts.compression = CompressionType::kDisableCompressionOption;
349 ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
350
f67539c2 351 ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;
11fdf7f2
TL
352 ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
353 for (const auto& name_and_table_props : all_tables_props) {
354 ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
355 name_and_table_props.second->compression_name);
356 }
357 delete db;
358 }
359}
360
494da23a
TL
361TEST_F(CompactFilesTest, GetCompactionJobInfo) {
362 Options options;
363 options.create_if_missing = true;
364 // Disable RocksDB background compaction.
365 options.compaction_style = kCompactionStyleNone;
366 options.level0_slowdown_writes_trigger = 1000;
367 options.level0_stop_writes_trigger = 1000;
368 options.write_buffer_size = 65536;
369 options.max_write_buffer_number = 2;
370 options.compression = kNoCompression;
371 options.max_compaction_bytes = 5000;
372
373 // Add listener
374 FlushedFileCollector* collector = new FlushedFileCollector();
375 options.listeners.emplace_back(collector);
376
377 DB* db = nullptr;
378 DestroyDB(db_name_, options);
379 Status s = DB::Open(options, db_name_, &db);
380 assert(s.ok());
381 assert(db);
382
383 // create couple files
384 for (int i = 0; i < 500; ++i) {
385 db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
386 }
20effc67 387 static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
494da23a
TL
388 auto l0_files_1 = collector->GetFlushedFiles();
389 CompactionOptions co;
390 co.compression = CompressionType::kLZ4Compression;
f67539c2 391 CompactionJobInfo compaction_job_info{};
494da23a
TL
392 ASSERT_OK(
393 db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));
394 ASSERT_EQ(compaction_job_info.base_input_level, 0);
395 ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());
396 ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());
397 ASSERT_EQ(compaction_job_info.compaction_reason,
398 CompactionReason::kManualCompaction);
399 ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);
400 ASSERT_EQ(compaction_job_info.output_level, 0);
401 ASSERT_OK(compaction_job_info.status);
402 // no assertion failure
403 delete db;
404}
405
f67539c2 406} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
407
408int main(int argc, char** argv) {
409 ::testing::InitGoogleTest(&argc, argv);
410 return RUN_ALL_TESTS();
411}
412
413#else
414#include <stdio.h>
415
11fdf7f2 416int main(int /*argc*/, char** /*argv*/) {
7c673cae
FG
417 fprintf(stderr,
418 "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
419 return 0;
420}
421
422#endif // !ROCKSDB_LITE