// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/debug.h"
#include "table/block_based/block_builder.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif
#include "rocksdb/merge_operator.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/sortlist.h"
#include "utilities/merge_operators/string_append/stringappend2.h"

20 namespace ROCKSDB_NAMESPACE
{
23 class LimitedStringAppendMergeOp
: public StringAppendTESTOperator
{
25 LimitedStringAppendMergeOp(int limit
, char delim
)
26 : StringAppendTESTOperator(delim
), limit_(limit
) {}
28 const char* Name() const override
{
29 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
32 bool ShouldMerge(const std::vector
<Slice
>& operands
) const override
{
33 if (operands
.size() > 0 && limit_
> 0 && operands
.size() >= limit_
) {
42 } // anonymous namespace
44 class DBMergeOperandTest
: public DBTestBase
{
47 : DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
50 TEST_F(DBMergeOperandTest
, CacheEvictedMergeOperandReadAfterFreeBug
) {
51 // There was a bug of reading merge operands after they are mistakely freed
52 // in DB::GetMergeOperands, which is surfaced by cache full.
53 // See PR#9507 for more.
55 options
.create_if_missing
= true;
56 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
58 BlockBasedTableOptions table_options
;
60 // Small cache to simulate cache full
61 table_options
.block_cache
= NewLRUCache(1);
62 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
66 int number_of_operands
= 0;
67 std::vector
<PinnableSlice
> values(num_records
);
68 GetMergeOperandsOptions merge_operands_info
;
69 merge_operands_info
.expected_max_number_of_operands
= num_records
;
71 ASSERT_OK(Merge("k1", "v1"));
73 ASSERT_OK(Merge("k1", "v2"));
75 ASSERT_OK(Merge("k1", "v3"));
77 ASSERT_OK(Merge("k1", "v4"));
79 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
80 "k1", values
.data(), &merge_operands_info
,
81 &number_of_operands
));
82 ASSERT_EQ(number_of_operands
, 4);
83 ASSERT_EQ(values
[0].ToString(), "v1");
84 ASSERT_EQ(values
[1].ToString(), "v2");
85 ASSERT_EQ(values
[2].ToString(), "v3");
86 ASSERT_EQ(values
[3].ToString(), "v4");
89 TEST_F(DBMergeOperandTest
, FlushedMergeOperandReadAfterFreeBug
) {
90 // Repro for a bug where a memtable containing a merge operand could be
91 // deleted before the merge operand was saved to the result.
92 auto options
= CurrentOptions();
93 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
96 ASSERT_OK(Merge("key", "value"));
98 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
99 {{"DBImpl::GetImpl:PostMemTableGet:0",
100 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
101 {"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
102 "DBImpl::GetImpl:PostMemTableGet:1"}});
103 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
105 auto flush_thread
= port::Thread([&]() {
107 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
110 "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
114 GetMergeOperandsOptions merge_operands_info
;
115 merge_operands_info
.expected_max_number_of_operands
= 1;
116 int number_of_operands
;
117 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
118 "key", &value
, &merge_operands_info
,
119 &number_of_operands
));
120 ASSERT_EQ(1, number_of_operands
);
125 TEST_F(DBMergeOperandTest
, GetMergeOperandsBasic
) {
127 options
.create_if_missing
= true;
128 // Use only the latest two merge operands.
129 options
.merge_operator
= std::make_shared
<LimitedStringAppendMergeOp
>(2, ',');
133 int number_of_operands
= 0;
134 std::vector
<PinnableSlice
> values(num_records
);
135 GetMergeOperandsOptions merge_operands_info
;
136 merge_operands_info
.expected_max_number_of_operands
= num_records
;
138 // k0 value in memtable
139 ASSERT_OK(Put("k0", "PutARock"));
140 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
141 "k0", values
.data(), &merge_operands_info
,
142 &number_of_operands
));
143 ASSERT_EQ(values
[0], "PutARock");
146 ASSERT_OK(Put("k0.1", "RockInSST"));
148 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
149 "k0.1", values
.data(), &merge_operands_info
,
150 &number_of_operands
));
151 ASSERT_EQ(values
[0], "RockInSST");
153 // All k1 values are in memtable.
154 ASSERT_OK(Merge("k1", "a"));
155 ASSERT_OK(Put("k1", "x"));
156 ASSERT_OK(Merge("k1", "b"));
157 ASSERT_OK(Merge("k1", "c"));
158 ASSERT_OK(Merge("k1", "d"));
159 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
160 "k1", values
.data(), &merge_operands_info
,
161 &number_of_operands
));
162 ASSERT_EQ(values
[0], "x");
163 ASSERT_EQ(values
[1], "b");
164 ASSERT_EQ(values
[2], "c");
165 ASSERT_EQ(values
[3], "d");
167 // expected_max_number_of_operands is less than number of merge operands so
168 // status should be Incomplete.
169 merge_operands_info
.expected_max_number_of_operands
= num_records
- 1;
170 Status status
= db_
->GetMergeOperands(
171 ReadOptions(), db_
->DefaultColumnFamily(), "k1", values
.data(),
172 &merge_operands_info
, &number_of_operands
);
173 ASSERT_EQ(status
.IsIncomplete(), true);
174 merge_operands_info
.expected_max_number_of_operands
= num_records
;
176 // All k1.1 values are in memtable.
177 ASSERT_OK(Merge("k1.1", "r"));
178 ASSERT_OK(Delete("k1.1"));
179 ASSERT_OK(Merge("k1.1", "c"));
180 ASSERT_OK(Merge("k1.1", "k"));
181 ASSERT_OK(Merge("k1.1", "s"));
182 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
183 "k1.1", values
.data(), &merge_operands_info
,
184 &number_of_operands
));
185 ASSERT_EQ(values
[0], "c");
186 ASSERT_EQ(values
[1], "k");
187 ASSERT_EQ(values
[2], "s");
189 // All k2 values are flushed to L0 into a single file.
190 ASSERT_OK(Merge("k2", "q"));
191 ASSERT_OK(Merge("k2", "w"));
192 ASSERT_OK(Merge("k2", "e"));
193 ASSERT_OK(Merge("k2", "r"));
195 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
196 "k2", values
.data(), &merge_operands_info
,
197 &number_of_operands
));
198 ASSERT_EQ(values
[0], "q");
199 ASSERT_EQ(values
[1], "w");
200 ASSERT_EQ(values
[2], "e");
201 ASSERT_EQ(values
[3], "r");
203 // All k2.1 values are flushed to L0 into a single file.
204 ASSERT_OK(Merge("k2.1", "m"));
205 ASSERT_OK(Put("k2.1", "l"));
206 ASSERT_OK(Merge("k2.1", "n"));
207 ASSERT_OK(Merge("k2.1", "o"));
209 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
210 "k2.1", values
.data(), &merge_operands_info
,
211 &number_of_operands
));
212 ASSERT_EQ(values
[0], "l,n,o");
214 // All k2.2 values are flushed to L0 into a single file.
215 ASSERT_OK(Merge("k2.2", "g"));
216 ASSERT_OK(Delete("k2.2"));
217 ASSERT_OK(Merge("k2.2", "o"));
218 ASSERT_OK(Merge("k2.2", "t"));
220 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
221 "k2.2", values
.data(), &merge_operands_info
,
222 &number_of_operands
));
223 ASSERT_EQ(values
[0], "o,t");
225 // Do some compaction that will make the following tests more predictable
226 // Slice start("PutARock");
228 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
230 // All k3 values are flushed and are in different files.
231 ASSERT_OK(Merge("k3", "ab"));
233 ASSERT_OK(Merge("k3", "bc"));
235 ASSERT_OK(Merge("k3", "cd"));
237 ASSERT_OK(Merge("k3", "de"));
238 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
239 "k3", values
.data(), &merge_operands_info
,
240 &number_of_operands
));
241 ASSERT_EQ(values
[0], "ab");
242 ASSERT_EQ(values
[1], "bc");
243 ASSERT_EQ(values
[2], "cd");
244 ASSERT_EQ(values
[3], "de");
246 // All k3.1 values are flushed and are in different files.
247 ASSERT_OK(Merge("k3.1", "ab"));
249 ASSERT_OK(Put("k3.1", "bc"));
251 ASSERT_OK(Merge("k3.1", "cd"));
253 ASSERT_OK(Merge("k3.1", "de"));
254 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
255 "k3.1", values
.data(), &merge_operands_info
,
256 &number_of_operands
));
257 ASSERT_EQ(values
[0], "bc");
258 ASSERT_EQ(values
[1], "cd");
259 ASSERT_EQ(values
[2], "de");
261 // All k3.2 values are flushed and are in different files.
262 ASSERT_OK(Merge("k3.2", "ab"));
264 ASSERT_OK(Delete("k3.2"));
266 ASSERT_OK(Merge("k3.2", "cd"));
268 ASSERT_OK(Merge("k3.2", "de"));
269 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
270 "k3.2", values
.data(), &merge_operands_info
,
271 &number_of_operands
));
272 ASSERT_EQ(values
[0], "cd");
273 ASSERT_EQ(values
[1], "de");
275 // All K4 values are in different levels
276 ASSERT_OK(Merge("k4", "ba"));
279 ASSERT_OK(Merge("k4", "cb"));
282 ASSERT_OK(Merge("k4", "dc"));
285 ASSERT_OK(Merge("k4", "ed"));
286 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
287 "k4", values
.data(), &merge_operands_info
,
288 &number_of_operands
));
289 ASSERT_EQ(values
[0], "ba");
290 ASSERT_EQ(values
[1], "cb");
291 ASSERT_EQ(values
[2], "dc");
292 ASSERT_EQ(values
[3], "ed");
294 // First 3 k5 values are in SST and next 4 k5 values are in Immutable
296 ASSERT_OK(Merge("k5", "who"));
297 ASSERT_OK(Merge("k5", "am"));
298 ASSERT_OK(Merge("k5", "i"));
300 ASSERT_OK(Put("k5", "remember"));
301 ASSERT_OK(Merge("k5", "i"));
302 ASSERT_OK(Merge("k5", "am"));
303 ASSERT_OK(Merge("k5", "rocks"));
304 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
305 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
306 "k5", values
.data(), &merge_operands_info
,
307 &number_of_operands
));
308 ASSERT_EQ(values
[0], "remember");
309 ASSERT_EQ(values
[1], "i");
310 ASSERT_EQ(values
[2], "am");
313 TEST_F(DBMergeOperandTest
, BlobDBGetMergeOperandsBasic
) {
315 options
.create_if_missing
= true;
316 options
.enable_blob_files
= true;
317 options
.min_blob_size
= 0;
318 // Use only the latest two merge operands.
319 options
.merge_operator
= std::make_shared
<LimitedStringAppendMergeOp
>(2, ',');
323 int number_of_operands
= 0;
324 std::vector
<PinnableSlice
> values(num_records
);
325 GetMergeOperandsOptions merge_operands_info
;
326 merge_operands_info
.expected_max_number_of_operands
= num_records
;
328 // All k1 values are in memtable.
329 ASSERT_OK(Put("k1", "x"));
330 ASSERT_OK(Merge("k1", "b"));
331 ASSERT_OK(Merge("k1", "c"));
332 ASSERT_OK(Merge("k1", "d"));
333 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
334 "k1", values
.data(), &merge_operands_info
,
335 &number_of_operands
));
336 ASSERT_EQ(values
[0], "x");
337 ASSERT_EQ(values
[1], "b");
338 ASSERT_EQ(values
[2], "c");
339 ASSERT_EQ(values
[3], "d");
341 // expected_max_number_of_operands is less than number of merge operands so
342 // status should be Incomplete.
343 merge_operands_info
.expected_max_number_of_operands
= num_records
- 1;
344 Status status
= db_
->GetMergeOperands(
345 ReadOptions(), db_
->DefaultColumnFamily(), "k1", values
.data(),
346 &merge_operands_info
, &number_of_operands
);
347 ASSERT_EQ(status
.IsIncomplete(), true);
348 merge_operands_info
.expected_max_number_of_operands
= num_records
;
350 // All k2 values are flushed to L0 into a single file.
351 ASSERT_OK(Put("k2", "q"));
352 ASSERT_OK(Merge("k2", "w"));
353 ASSERT_OK(Merge("k2", "e"));
354 ASSERT_OK(Merge("k2", "r"));
356 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
357 "k2", values
.data(), &merge_operands_info
,
358 &number_of_operands
));
359 ASSERT_EQ(values
[0], "q,w,e,r");
361 // Do some compaction that will make the following tests more predictable
362 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
364 // All k3 values are flushed and are in different files.
365 ASSERT_OK(Put("k3", "ab"));
367 ASSERT_OK(Merge("k3", "bc"));
369 ASSERT_OK(Merge("k3", "cd"));
371 ASSERT_OK(Merge("k3", "de"));
372 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
373 "k3", values
.data(), &merge_operands_info
,
374 &number_of_operands
));
375 ASSERT_EQ(values
[0], "ab");
376 ASSERT_EQ(values
[1], "bc");
377 ASSERT_EQ(values
[2], "cd");
378 ASSERT_EQ(values
[3], "de");
380 // All K4 values are in different levels
381 ASSERT_OK(Put("k4", "ba"));
384 ASSERT_OK(Merge("k4", "cb"));
387 ASSERT_OK(Merge("k4", "dc"));
390 ASSERT_OK(Merge("k4", "ed"));
391 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
392 "k4", values
.data(), &merge_operands_info
,
393 &number_of_operands
));
394 ASSERT_EQ(values
[0], "ba");
395 ASSERT_EQ(values
[1], "cb");
396 ASSERT_EQ(values
[2], "dc");
397 ASSERT_EQ(values
[3], "ed");
400 TEST_F(DBMergeOperandTest
, GetMergeOperandsLargeResultOptimization
) {
401 // These constants are chosen to trigger the large result optimization
402 // (pinning a bundle of `DBImpl` resources).
403 const int kNumOperands
= 1024;
404 const int kOperandLen
= 1024;
407 options
.create_if_missing
= true;
408 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
409 DestroyAndReopen(options
);
412 std::vector
<std::string
> expected_merge_operands
;
413 expected_merge_operands
.reserve(kNumOperands
);
414 for (int i
= 0; i
< kNumOperands
; ++i
) {
415 expected_merge_operands
.emplace_back(rnd
.RandomString(kOperandLen
));
416 ASSERT_OK(Merge("key", expected_merge_operands
.back()));
419 std::vector
<PinnableSlice
> merge_operands(kNumOperands
);
420 GetMergeOperandsOptions merge_operands_info
;
421 merge_operands_info
.expected_max_number_of_operands
= kNumOperands
;
422 int num_merge_operands
= 0;
423 ASSERT_OK(db_
->GetMergeOperands(ReadOptions(), db_
->DefaultColumnFamily(),
424 "key", merge_operands
.data(),
425 &merge_operands_info
, &num_merge_operands
));
426 ASSERT_EQ(num_merge_operands
, kNumOperands
);
428 // Ensures the large result optimization was used.
429 for (int i
= 0; i
< kNumOperands
; ++i
) {
430 ASSERT_TRUE(merge_operands
[i
].IsPinned());
433 // Add a Flush() to change the `SuperVersion` to challenge the resource
437 for (int i
= 0; i
< kNumOperands
; ++i
) {
438 ASSERT_EQ(expected_merge_operands
[i
], merge_operands
[i
]);
442 } // namespace ROCKSDB_NAMESPACE
444 int main(int argc
, char** argv
) {
445 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
446 ::testing::InitGoogleTest(&argc
, argv
);
447 return RUN_ALL_TESTS();