// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE

#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "db/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"

namespace rocksdb {

class CompactFilesTest : public testing::Test {
 public:
  CompactFilesTest() {
    env_ = Env::Default();
    db_name_ = test::TmpDir(env_) + "/compact_files_test";
  }

  std::string db_name_;
  Env* env_;
};

// A class which remembers the name of each flushed file.
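// It is registered through Options::listeners; RocksDB calls
// OnFlushCompleted() after each memtable flush, so GetFlushedFiles() yields
// the paths of the L0 files that the tests then pass to CompactFiles().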
class FlushedFileCollector : public EventListener {
 public:
  FlushedFileCollector() {}
  ~FlushedFileCollector() {}

  virtual void OnFlushCompleted(
      DB* db, const FlushJobInfo& info) override {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.push_back(info.file_path);
  }

  std::vector<std::string> GetFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<std::string> result;
    for (auto fname : flushed_files_) {
      result.push_back(fname);
    }
    return result;
  }
  void ClearFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.clear();
  }

 private:
  std::vector<std::string> flushed_files_;
  std::mutex mutex_;
};

TEST_F(CompactFilesTest, L0ConflictsFiles) {
  Options options;
  // To trigger compaction more easily.
  const int kWriteBufferSize = 10000;
  const int kLevel0Trigger = 2;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  // Small slowdown and stop triggers for experimental purposes.
  options.level0_slowdown_writes_trigger = 20;
  options.level0_stop_writes_trigger = 20;
  options.write_buffer_size = kWriteBufferSize;
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.compression = kNoCompression;

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
      {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
  });
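  // Each {predecessor, successor} pair above makes the thread that reaches
  // the successor sync point wait until some thread has passed the
  // predecessor: the background compaction blocks at
  // "BackgroundCallCompaction:0" until CompactFiles() has reached
  // "CompactFilesImpl:0", and CompactFiles() blocks at "CompactFilesImpl:1"
  // until the background compaction has passed "BackgroundCallCompaction:1".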
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Create a couple of files.
  // The background compaction starts and waits in BackgroundCallCompaction:0.
  for (int i = 0; i < kLevel0Trigger * 4; ++i) {
    db->Put(WriteOptions(), ToString(i), "");
    db->Put(WriteOptions(), ToString(100 - i), "");
    db->Flush(FlushOptions());
  }

  rocksdb::ColumnFamilyMetaData meta;
  db->GetColumnFamilyMetaData(&meta);
  std::string file1;
  for (auto& file : meta.levels[0].files) {
    ASSERT_EQ(0, meta.levels[0].level);
    if (file1 == "") {
      file1 = file.db_path + "/" + file.name;
    } else {
      std::string file2 = file.db_path + "/" + file.name;
      // This thread now starts a CompactFiles() call, which creates an L0
      // compaction. The background compaction then notices that an L0
      // compaction is already in progress and skips its own L0 compaction.
      // Once the background compaction finishes, the CompactFiles() call
      // completes.
      ASSERT_OK(
          db->CompactFiles(rocksdb::CompactionOptions(), {file1, file2}, 0));
      break;
    }
  }
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  delete db;
}

TEST_F(CompactFilesTest, ObsoleteFiles) {
  Options options;
  // To trigger flushes more easily.
  const int kWriteBufferSize = 65536;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.write_buffer_size = kWriteBufferSize;
  options.max_write_buffer_number = 2;
  options.compression = kNoCompression;

  // Add listener.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create a couple of files.
  for (int i = 1000; i < 2000; ++i) {
    db->Put(WriteOptions(), ToString(i),
            std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
  }
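  // Each value above is kWriteBufferSize / 10 bytes, so these 1000 writes
  // overflow the 64KB memtable repeatedly and leave behind a series of L0
  // files, all of which the collector records.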

  auto l0_files = collector->GetFlushedFiles();
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();
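  // The compaction output went to level 1, so its L0 input files are now
  // obsolete and should be purged by the time the compaction completes.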

  // Verify that all compaction input files are deleted.
  for (auto fname : l0_files) {
    ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
  }
  delete db;
}

TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 1000;
  options.level0_stop_writes_trigger = 1000;
  options.write_buffer_size = 65536;
  options.max_write_buffer_number = 2;
  options.compression = kNoCompression;
  options.max_compaction_bytes = 5000;
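  // max_compaction_bytes is far smaller than the data written below. This
  // test exercises compactions whose output level is 0: such compactions
  // should not cut their output files on this limit, and the CompactFiles()
  // calls at the end must complete without tripping an assertion.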

  // Add listener.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create a couple of files.
  for (int i = 0; i < 500; ++i) {
    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  }
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  auto l0_files_1 = collector->GetFlushedFiles();
  collector->ClearFlushedFiles();
  for (int i = 0; i < 500; ++i) {
    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  }
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  auto l0_files_2 = collector->GetFlushedFiles();
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
  // No assertion failure is expected from either call.
  delete db;
}

TEST_F(CompactFilesTest, CapturingPendingFiles) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  // Always do full scans for obsolete files (needed to reproduce the issue).
  options.delete_obsolete_files_period_micros = 0;

  // Add listener.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create 5 files.
  for (int i = 0; i < 5; ++i) {
    db->Put(WriteOptions(), "key" + ToString(i), "value");
    db->Flush(FlushOptions());
  }

  auto l0_files = collector->GetFlushedFiles();
  EXPECT_EQ(5, l0_files.size());

  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
      {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
  });
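  // These dependencies pause the CompactFiles() job between sync points
  // CompactFilesImpl:2 and CompactFilesImpl:3: the main thread is released at
  // CapturingPendingFiles:0 only once the job has reached CompactFilesImpl:2,
  // and the job may pass CompactFilesImpl:3 only after the main thread has
  // flushed the extra file and hit CapturingPendingFiles:1.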
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Start compacting files.
  rocksdb::port::Thread compaction_thread(
      [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });

  // In the meantime flush another file.
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
  db->Put(WriteOptions(), "key5", "value");
  db->Flush(FlushOptions());
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");

  compaction_thread.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  delete db;

  // Make sure we can reopen the DB.
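  // If the full obsolete-file scan had deleted the file written by the
  // concurrent flush while CompactFiles() was still running, this reopen
  // would be expected to fail.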
  s = DB::Open(options, db_name_, &db);
  ASSERT_TRUE(s.ok());
  assert(db);
  delete db;
}

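// This test calls DB::Get() from inside a compaction filter while the same DB
// is running CompactFiles(). The Get() path has to acquire a SuperVersion
// (presumably the "GetSv" in the test name), and the test verifies that doing
// so during CompactFiles() neither deadlocks nor crashes.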
TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
  class FilterWithGet : public CompactionFilter {
   public:
    virtual bool Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value,
                        bool* value_changed) const override {
      if (db_ == nullptr) {
        return true;
      }
      std::string res;
      db_->Get(ReadOptions(), "", &res);
      return true;
    }

    void SetDB(DB* db) {
      db_ = db;
    }

    virtual const char* Name() const override { return "FilterWithGet"; }

   private:
    // Initialized to nullptr so that the null check in Filter() is well
    // defined even if the filter runs before SetDB() is called.
    DB* db_ = nullptr;
  };

  std::shared_ptr<FilterWithGet> cf(new FilterWithGet());

  Options options;
  options.create_if_missing = true;
  options.compaction_filter = cf.get();

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  ASSERT_OK(s);

  cf->SetDB(db);

  // Write one L0 file.
  db->Put(WriteOptions(), "K1", "V1");
  db->Flush(FlushOptions());

  // Compact all L0 files using CompactFiles().
  rocksdb::ColumnFamilyMetaData meta;
  db->GetColumnFamilyMetaData(&meta);
  for (auto& file : meta.levels[0].files) {
    std::string fname = file.db_path + "/" + file.name;
    ASSERT_OK(
        db->CompactFiles(rocksdb::CompactionOptions(), {fname}, 0));
  }

  delete db;
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int argc, char** argv) {
  fprintf(stderr,
          "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE