]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_merge_operand_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_merge_operand_test.cc
1 // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_test_util.h"
7 #include "port/stack_trace.h"
8 #include "rocksdb/perf_context.h"
9 #include "rocksdb/utilities/debug.h"
10 #include "table/block_based/block_builder.h"
11 #if !defined(ROCKSDB_LITE)
12 #include "test_util/sync_point.h"
13 #endif
14 #include "rocksdb/merge_operator.h"
15 #include "utilities/fault_injection_env.h"
16 #include "utilities/merge_operators.h"
17 #include "utilities/merge_operators/sortlist.h"
18 #include "utilities/merge_operators/string_append/stringappend2.h"
19
20 namespace ROCKSDB_NAMESPACE {
21
22 namespace {
23 class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
24 public:
25 LimitedStringAppendMergeOp(int limit, char delim)
26 : StringAppendTESTOperator(delim), limit_(limit) {}
27
28 const char* Name() const override {
29 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
30 }
31
32 bool ShouldMerge(const std::vector<Slice>& operands) const override {
33 if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
34 return true;
35 }
36 return false;
37 }
38
39 private:
40 size_t limit_ = 0;
41 };
42 } // anonymous namespace
43
44 class DBMergeOperandTest : public DBTestBase {
45 public:
46 DBMergeOperandTest()
47 : DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
48 };
49
50 TEST_F(DBMergeOperandTest, CacheEvictedMergeOperandReadAfterFreeBug) {
51 // There was a bug of reading merge operands after they are mistakely freed
52 // in DB::GetMergeOperands, which is surfaced by cache full.
53 // See PR#9507 for more.
54 Options options;
55 options.create_if_missing = true;
56 options.merge_operator = MergeOperators::CreateStringAppendOperator();
57 options.env = env_;
58 BlockBasedTableOptions table_options;
59
60 // Small cache to simulate cache full
61 table_options.block_cache = NewLRUCache(1);
62 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
63
64 Reopen(options);
65 int num_records = 4;
66 int number_of_operands = 0;
67 std::vector<PinnableSlice> values(num_records);
68 GetMergeOperandsOptions merge_operands_info;
69 merge_operands_info.expected_max_number_of_operands = num_records;
70
71 ASSERT_OK(Merge("k1", "v1"));
72 ASSERT_OK(Flush());
73 ASSERT_OK(Merge("k1", "v2"));
74 ASSERT_OK(Flush());
75 ASSERT_OK(Merge("k1", "v3"));
76 ASSERT_OK(Flush());
77 ASSERT_OK(Merge("k1", "v4"));
78
79 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
80 "k1", values.data(), &merge_operands_info,
81 &number_of_operands));
82 ASSERT_EQ(number_of_operands, 4);
83 ASSERT_EQ(values[0].ToString(), "v1");
84 ASSERT_EQ(values[1].ToString(), "v2");
85 ASSERT_EQ(values[2].ToString(), "v3");
86 ASSERT_EQ(values[3].ToString(), "v4");
87 }
88
89 TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
90 // Repro for a bug where a memtable containing a merge operand could be
91 // deleted before the merge operand was saved to the result.
92 auto options = CurrentOptions();
93 options.merge_operator = MergeOperators::CreateStringAppendOperator();
94 Reopen(options);
95
96 ASSERT_OK(Merge("key", "value"));
97
98 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
99 {{"DBImpl::GetImpl:PostMemTableGet:0",
100 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
101 {"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
102 "DBImpl::GetImpl:PostMemTableGet:1"}});
103 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
104
105 auto flush_thread = port::Thread([&]() {
106 TEST_SYNC_POINT(
107 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
108 ASSERT_OK(Flush());
109 TEST_SYNC_POINT(
110 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
111 });
112
113 PinnableSlice value;
114 GetMergeOperandsOptions merge_operands_info;
115 merge_operands_info.expected_max_number_of_operands = 1;
116 int number_of_operands;
117 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
118 "key", &value, &merge_operands_info,
119 &number_of_operands));
120 ASSERT_EQ(1, number_of_operands);
121
122 flush_thread.join();
123 }
124
125 TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
126 Options options;
127 options.create_if_missing = true;
128 // Use only the latest two merge operands.
129 options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
130 options.env = env_;
131 Reopen(options);
132 int num_records = 4;
133 int number_of_operands = 0;
134 std::vector<PinnableSlice> values(num_records);
135 GetMergeOperandsOptions merge_operands_info;
136 merge_operands_info.expected_max_number_of_operands = num_records;
137
138 // k0 value in memtable
139 ASSERT_OK(Put("k0", "PutARock"));
140 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
141 "k0", values.data(), &merge_operands_info,
142 &number_of_operands));
143 ASSERT_EQ(values[0], "PutARock");
144
145 // k0.1 value in SST
146 ASSERT_OK(Put("k0.1", "RockInSST"));
147 ASSERT_OK(Flush());
148 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
149 "k0.1", values.data(), &merge_operands_info,
150 &number_of_operands));
151 ASSERT_EQ(values[0], "RockInSST");
152
153 // All k1 values are in memtable.
154 ASSERT_OK(Merge("k1", "a"));
155 ASSERT_OK(Put("k1", "x"));
156 ASSERT_OK(Merge("k1", "b"));
157 ASSERT_OK(Merge("k1", "c"));
158 ASSERT_OK(Merge("k1", "d"));
159 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
160 "k1", values.data(), &merge_operands_info,
161 &number_of_operands));
162 ASSERT_EQ(values[0], "x");
163 ASSERT_EQ(values[1], "b");
164 ASSERT_EQ(values[2], "c");
165 ASSERT_EQ(values[3], "d");
166
167 // expected_max_number_of_operands is less than number of merge operands so
168 // status should be Incomplete.
169 merge_operands_info.expected_max_number_of_operands = num_records - 1;
170 Status status = db_->GetMergeOperands(
171 ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
172 &merge_operands_info, &number_of_operands);
173 ASSERT_EQ(status.IsIncomplete(), true);
174 merge_operands_info.expected_max_number_of_operands = num_records;
175
176 // All k1.1 values are in memtable.
177 ASSERT_OK(Merge("k1.1", "r"));
178 ASSERT_OK(Delete("k1.1"));
179 ASSERT_OK(Merge("k1.1", "c"));
180 ASSERT_OK(Merge("k1.1", "k"));
181 ASSERT_OK(Merge("k1.1", "s"));
182 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
183 "k1.1", values.data(), &merge_operands_info,
184 &number_of_operands));
185 ASSERT_EQ(values[0], "c");
186 ASSERT_EQ(values[1], "k");
187 ASSERT_EQ(values[2], "s");
188
189 // All k2 values are flushed to L0 into a single file.
190 ASSERT_OK(Merge("k2", "q"));
191 ASSERT_OK(Merge("k2", "w"));
192 ASSERT_OK(Merge("k2", "e"));
193 ASSERT_OK(Merge("k2", "r"));
194 ASSERT_OK(Flush());
195 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
196 "k2", values.data(), &merge_operands_info,
197 &number_of_operands));
198 ASSERT_EQ(values[0], "q");
199 ASSERT_EQ(values[1], "w");
200 ASSERT_EQ(values[2], "e");
201 ASSERT_EQ(values[3], "r");
202
203 // All k2.1 values are flushed to L0 into a single file.
204 ASSERT_OK(Merge("k2.1", "m"));
205 ASSERT_OK(Put("k2.1", "l"));
206 ASSERT_OK(Merge("k2.1", "n"));
207 ASSERT_OK(Merge("k2.1", "o"));
208 ASSERT_OK(Flush());
209 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
210 "k2.1", values.data(), &merge_operands_info,
211 &number_of_operands));
212 ASSERT_EQ(values[0], "l,n,o");
213
214 // All k2.2 values are flushed to L0 into a single file.
215 ASSERT_OK(Merge("k2.2", "g"));
216 ASSERT_OK(Delete("k2.2"));
217 ASSERT_OK(Merge("k2.2", "o"));
218 ASSERT_OK(Merge("k2.2", "t"));
219 ASSERT_OK(Flush());
220 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
221 "k2.2", values.data(), &merge_operands_info,
222 &number_of_operands));
223 ASSERT_EQ(values[0], "o,t");
224
225 // Do some compaction that will make the following tests more predictable
226 // Slice start("PutARock");
227 // Slice end("t");
228 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
229
230 // All k3 values are flushed and are in different files.
231 ASSERT_OK(Merge("k3", "ab"));
232 ASSERT_OK(Flush());
233 ASSERT_OK(Merge("k3", "bc"));
234 ASSERT_OK(Flush());
235 ASSERT_OK(Merge("k3", "cd"));
236 ASSERT_OK(Flush());
237 ASSERT_OK(Merge("k3", "de"));
238 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
239 "k3", values.data(), &merge_operands_info,
240 &number_of_operands));
241 ASSERT_EQ(values[0], "ab");
242 ASSERT_EQ(values[1], "bc");
243 ASSERT_EQ(values[2], "cd");
244 ASSERT_EQ(values[3], "de");
245
246 // All k3.1 values are flushed and are in different files.
247 ASSERT_OK(Merge("k3.1", "ab"));
248 ASSERT_OK(Flush());
249 ASSERT_OK(Put("k3.1", "bc"));
250 ASSERT_OK(Flush());
251 ASSERT_OK(Merge("k3.1", "cd"));
252 ASSERT_OK(Flush());
253 ASSERT_OK(Merge("k3.1", "de"));
254 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
255 "k3.1", values.data(), &merge_operands_info,
256 &number_of_operands));
257 ASSERT_EQ(values[0], "bc");
258 ASSERT_EQ(values[1], "cd");
259 ASSERT_EQ(values[2], "de");
260
261 // All k3.2 values are flushed and are in different files.
262 ASSERT_OK(Merge("k3.2", "ab"));
263 ASSERT_OK(Flush());
264 ASSERT_OK(Delete("k3.2"));
265 ASSERT_OK(Flush());
266 ASSERT_OK(Merge("k3.2", "cd"));
267 ASSERT_OK(Flush());
268 ASSERT_OK(Merge("k3.2", "de"));
269 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
270 "k3.2", values.data(), &merge_operands_info,
271 &number_of_operands));
272 ASSERT_EQ(values[0], "cd");
273 ASSERT_EQ(values[1], "de");
274
275 // All K4 values are in different levels
276 ASSERT_OK(Merge("k4", "ba"));
277 ASSERT_OK(Flush());
278 MoveFilesToLevel(4);
279 ASSERT_OK(Merge("k4", "cb"));
280 ASSERT_OK(Flush());
281 MoveFilesToLevel(3);
282 ASSERT_OK(Merge("k4", "dc"));
283 ASSERT_OK(Flush());
284 MoveFilesToLevel(1);
285 ASSERT_OK(Merge("k4", "ed"));
286 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
287 "k4", values.data(), &merge_operands_info,
288 &number_of_operands));
289 ASSERT_EQ(values[0], "ba");
290 ASSERT_EQ(values[1], "cb");
291 ASSERT_EQ(values[2], "dc");
292 ASSERT_EQ(values[3], "ed");
293
294 // First 3 k5 values are in SST and next 4 k5 values are in Immutable
295 // Memtable
296 ASSERT_OK(Merge("k5", "who"));
297 ASSERT_OK(Merge("k5", "am"));
298 ASSERT_OK(Merge("k5", "i"));
299 ASSERT_OK(Flush());
300 ASSERT_OK(Put("k5", "remember"));
301 ASSERT_OK(Merge("k5", "i"));
302 ASSERT_OK(Merge("k5", "am"));
303 ASSERT_OK(Merge("k5", "rocks"));
304 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
305 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
306 "k5", values.data(), &merge_operands_info,
307 &number_of_operands));
308 ASSERT_EQ(values[0], "remember");
309 ASSERT_EQ(values[1], "i");
310 ASSERT_EQ(values[2], "am");
311 }
312
313 TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
314 Options options;
315 options.create_if_missing = true;
316 options.enable_blob_files = true;
317 options.min_blob_size = 0;
318 // Use only the latest two merge operands.
319 options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
320 options.env = env_;
321 Reopen(options);
322 int num_records = 4;
323 int number_of_operands = 0;
324 std::vector<PinnableSlice> values(num_records);
325 GetMergeOperandsOptions merge_operands_info;
326 merge_operands_info.expected_max_number_of_operands = num_records;
327
328 // All k1 values are in memtable.
329 ASSERT_OK(Put("k1", "x"));
330 ASSERT_OK(Merge("k1", "b"));
331 ASSERT_OK(Merge("k1", "c"));
332 ASSERT_OK(Merge("k1", "d"));
333 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
334 "k1", values.data(), &merge_operands_info,
335 &number_of_operands));
336 ASSERT_EQ(values[0], "x");
337 ASSERT_EQ(values[1], "b");
338 ASSERT_EQ(values[2], "c");
339 ASSERT_EQ(values[3], "d");
340
341 // expected_max_number_of_operands is less than number of merge operands so
342 // status should be Incomplete.
343 merge_operands_info.expected_max_number_of_operands = num_records - 1;
344 Status status = db_->GetMergeOperands(
345 ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
346 &merge_operands_info, &number_of_operands);
347 ASSERT_EQ(status.IsIncomplete(), true);
348 merge_operands_info.expected_max_number_of_operands = num_records;
349
350 // All k2 values are flushed to L0 into a single file.
351 ASSERT_OK(Put("k2", "q"));
352 ASSERT_OK(Merge("k2", "w"));
353 ASSERT_OK(Merge("k2", "e"));
354 ASSERT_OK(Merge("k2", "r"));
355 ASSERT_OK(Flush());
356 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
357 "k2", values.data(), &merge_operands_info,
358 &number_of_operands));
359 ASSERT_EQ(values[0], "q,w,e,r");
360
361 // Do some compaction that will make the following tests more predictable
362 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
363
364 // All k3 values are flushed and are in different files.
365 ASSERT_OK(Put("k3", "ab"));
366 ASSERT_OK(Flush());
367 ASSERT_OK(Merge("k3", "bc"));
368 ASSERT_OK(Flush());
369 ASSERT_OK(Merge("k3", "cd"));
370 ASSERT_OK(Flush());
371 ASSERT_OK(Merge("k3", "de"));
372 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
373 "k3", values.data(), &merge_operands_info,
374 &number_of_operands));
375 ASSERT_EQ(values[0], "ab");
376 ASSERT_EQ(values[1], "bc");
377 ASSERT_EQ(values[2], "cd");
378 ASSERT_EQ(values[3], "de");
379
380 // All K4 values are in different levels
381 ASSERT_OK(Put("k4", "ba"));
382 ASSERT_OK(Flush());
383 MoveFilesToLevel(4);
384 ASSERT_OK(Merge("k4", "cb"));
385 ASSERT_OK(Flush());
386 MoveFilesToLevel(3);
387 ASSERT_OK(Merge("k4", "dc"));
388 ASSERT_OK(Flush());
389 MoveFilesToLevel(1);
390 ASSERT_OK(Merge("k4", "ed"));
391 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
392 "k4", values.data(), &merge_operands_info,
393 &number_of_operands));
394 ASSERT_EQ(values[0], "ba");
395 ASSERT_EQ(values[1], "cb");
396 ASSERT_EQ(values[2], "dc");
397 ASSERT_EQ(values[3], "ed");
398 }
399
400 TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) {
401 // These constants are chosen to trigger the large result optimization
402 // (pinning a bundle of `DBImpl` resources).
403 const int kNumOperands = 1024;
404 const int kOperandLen = 1024;
405
406 Options options;
407 options.create_if_missing = true;
408 options.merge_operator = MergeOperators::CreateStringAppendOperator();
409 DestroyAndReopen(options);
410
411 Random rnd(301);
412 std::vector<std::string> expected_merge_operands;
413 expected_merge_operands.reserve(kNumOperands);
414 for (int i = 0; i < kNumOperands; ++i) {
415 expected_merge_operands.emplace_back(rnd.RandomString(kOperandLen));
416 ASSERT_OK(Merge("key", expected_merge_operands.back()));
417 }
418
419 std::vector<PinnableSlice> merge_operands(kNumOperands);
420 GetMergeOperandsOptions merge_operands_info;
421 merge_operands_info.expected_max_number_of_operands = kNumOperands;
422 int num_merge_operands = 0;
423 ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
424 "key", merge_operands.data(),
425 &merge_operands_info, &num_merge_operands));
426 ASSERT_EQ(num_merge_operands, kNumOperands);
427
428 // Ensures the large result optimization was used.
429 for (int i = 0; i < kNumOperands; ++i) {
430 ASSERT_TRUE(merge_operands[i].IsPinned());
431 }
432
433 // Add a Flush() to change the `SuperVersion` to challenge the resource
434 // pinning.
435 ASSERT_OK(Flush());
436
437 for (int i = 0; i < kNumOperands; ++i) {
438 ASSERT_EQ(expected_merge_operands[i], merge_operands[i]);
439 }
440 }
441
442 } // namespace ROCKSDB_NAMESPACE
443
444 int main(int argc, char** argv) {
445 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
446 ::testing::InitGoogleTest(&argc, argv);
447 return RUN_ALL_TESTS();
448 }