git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_with_timestamp_basic_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_with_timestamp_basic_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_with_timestamp_test_util.h"
11 #include "port/stack_trace.h"
12 #include "rocksdb/perf_context.h"
13 #include "rocksdb/utilities/debug.h"
14 #include "table/block_based/block_based_table_reader.h"
15 #include "table/block_based/block_builder.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
18 #endif
19 #include "test_util/testutil.h"
20 #include "utilities/fault_injection_env.h"
21 #include "utilities/merge_operators/string_append/stringappend2.h"
22
23 namespace ROCKSDB_NAMESPACE {
24 class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
25 public:
26 DBBasicTestWithTimestamp()
27 : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
28 };
29
30 TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
31 Options options = CurrentOptions();
32 options.env = env_;
33 options.create_if_missing = true;
34 options.avoid_flush_during_shutdown = true;
35 options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
36 DestroyAndReopen(options);
37
38 Options options1 = CurrentOptions();
39 options1.env = env_;
40 options1.comparator = test::BytewiseComparatorWithU64TsWrapper();
41 options1.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
42 assert(options1.comparator &&
43 options1.comparator->timestamp_size() == sizeof(uint64_t));
44 ColumnFamilyHandle* handle = nullptr;
45 Status s = db_->CreateColumnFamily(options1, "data", &handle);
46 ASSERT_OK(s);
47
48 std::string dummy_ts(sizeof(uint64_t), '\0');
49 // Perform timestamp operations on default cf.
50 ASSERT_TRUE(
51 db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument());
52 ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key",
53 dummy_ts, "value")
54 .IsInvalidArgument());
55 ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
56 ASSERT_TRUE(
57 db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
58 ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
59 "begin_key", "end_key", dummy_ts)
60 .IsInvalidArgument());
61
62 // Perform non-timestamp operations on "data" cf.
63 ASSERT_TRUE(
64 db_->Put(WriteOptions(), handle, "key", "value").IsInvalidArgument());
65 ASSERT_TRUE(db_->Delete(WriteOptions(), handle, "key").IsInvalidArgument());
66 ASSERT_TRUE(
67 db_->SingleDelete(WriteOptions(), handle, "key").IsInvalidArgument());
68
69 ASSERT_TRUE(
70 db_->Merge(WriteOptions(), handle, "key", "value").IsInvalidArgument());
71 ASSERT_TRUE(db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key")
72 .IsInvalidArgument());
73
74 {
75 WriteBatch wb;
76 ASSERT_OK(wb.Put(handle, "key", "value"));
77 ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
78 }
79 {
80 WriteBatch wb;
81 ASSERT_OK(wb.Delete(handle, "key"));
82 ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
83 }
84 {
85 WriteBatch wb;
86 ASSERT_OK(wb.SingleDelete(handle, "key"));
87 ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
88 }
89 {
90 WriteBatch wb;
91 ASSERT_OK(wb.DeleteRange(handle, "begin_key", "end_key"));
92 ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
93 }
94
95 // Perform timestamp operations with timestamps of incorrect size.
96 const std::string wrong_ts(sizeof(uint32_t), '\0');
97 ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value")
98 .IsInvalidArgument());
99 ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value")
100 .IsInvalidArgument());
101 ASSERT_TRUE(
102 db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument());
103 ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts)
104 .IsInvalidArgument());
105 ASSERT_TRUE(
106 db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts)
107 .IsInvalidArgument());
108
109 delete handle;
110 }
111
112 TEST_F(DBBasicTestWithTimestamp, MixedCfs) {
113 Options options = CurrentOptions();
114 options.env = env_;
115 options.create_if_missing = true;
116 options.avoid_flush_during_shutdown = true;
117 DestroyAndReopen(options);
118
119 Options options1 = CurrentOptions();
120 options1.env = env_;
121 const size_t kTimestampSize = Timestamp(0, 0).size();
122 TestComparator test_cmp(kTimestampSize);
123 options1.comparator = &test_cmp;
124 ColumnFamilyHandle* handle = nullptr;
125 Status s = db_->CreateColumnFamily(options1, "data", &handle);
126 ASSERT_OK(s);
127
128 WriteBatch wb;
129 ASSERT_OK(wb.Put("a", "value"));
130 ASSERT_OK(wb.Put(handle, "a", "value"));
131 {
132 std::string ts = Timestamp(1, 0);
133 const auto ts_sz_func = [kTimestampSize, handle](uint32_t cf_id) {
134 assert(handle);
135 if (cf_id == 0) {
136 return static_cast<size_t>(0);
137 } else if (cf_id == handle->GetID()) {
138 return kTimestampSize;
139 } else {
140 assert(false);
141 return std::numeric_limits<size_t>::max();
142 }
143 };
144 ASSERT_OK(wb.UpdateTimestamps(ts, ts_sz_func));
145 ASSERT_OK(db_->Write(WriteOptions(), &wb));
146 }
147
148 const auto verify_db = [this](ColumnFamilyHandle* h, const std::string& key,
149 const std::string& ts,
150 const std::string& expected_value) {
151 ASSERT_EQ(expected_value, Get(key));
152 Slice read_ts_slice(ts);
153 ReadOptions read_opts;
154 read_opts.timestamp = &read_ts_slice;
155 std::string value;
156 ASSERT_OK(db_->Get(read_opts, h, key, &value));
157 ASSERT_EQ(expected_value, value);
158 };
159
160 verify_db(handle, "a", Timestamp(1, 0), "value");
161
162 delete handle;
163 Close();
164
165 std::vector<ColumnFamilyDescriptor> cf_descs;
166 cf_descs.emplace_back(kDefaultColumnFamilyName, options);
167 cf_descs.emplace_back("data", options1);
168 options.create_if_missing = false;
169 s = DB::Open(options, dbname_, cf_descs, &handles_, &db_);
170 ASSERT_OK(s);
171
172 verify_db(handles_[1], "a", Timestamp(1, 0), "value");
173
174 Close();
175 }
176
177 TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
178 Options options = CurrentOptions();
179 options.env = env_;
180 options.create_if_missing = true;
181 const size_t kTimestampSize = Timestamp(0, 0).size();
182 TestComparator test_cmp(kTimestampSize);
183 options.comparator = &test_cmp;
184 DestroyAndReopen(options);
185
186 WriteOptions write_opts;
187 std::string ts = Timestamp(1, 0);
188
189 ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
190 ASSERT_OK(Flush());
191
192 ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
193 ASSERT_OK(Flush());
194
195 std::string start_str = "foo";
196 std::string end_str = "foo2";
197 Slice start(start_str), end(end_str);
198 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
199
200 Close();
201 }
202
203 TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
204 Options options = CurrentOptions();
205 options.env = env_;
206 options.create_if_missing = true;
207 const size_t kTimestampSize = Timestamp(0, 0).size();
208 TestComparator test_cmp(kTimestampSize);
209 options.comparator = &test_cmp;
210 DestroyAndReopen(options);
211
212 std::string ts_str = Timestamp(1, 0);
213 WriteOptions wopts;
214 ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1"));
215 ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2"));
216 ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3"));
217
218 ts_str = Timestamp(2, 0);
219 ASSERT_OK(db_->Delete(wopts, "k3", ts_str));
220
221 ts_str = Timestamp(4, 0);
222 ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5"));
223
224 ts_str = Timestamp(5, 0);
225 ASSERT_OK(
226 db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k0", "k9", ts_str));
227
228 ts_str = Timestamp(3, 0);
229 Slice ts = ts_str;
230 CompactRangeOptions cro;
231 cro.full_history_ts_low = &ts;
232 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
233
234 ASSERT_OK(Flush());
235
236 ReadOptions ropts;
237 ropts.timestamp = &ts;
238 std::string value;
239 Status s = db_->Get(ropts, "k1", &value);
240 ASSERT_OK(s);
241 ASSERT_EQ("v1", value);
242
243 std::string key_ts;
244 ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
245 ASSERT_EQ(Timestamp(2, 0), key_ts);
246
247 ts_str = Timestamp(5, 0);
248 ts = ts_str;
249 ropts.timestamp = &ts;
250 ASSERT_TRUE(db_->Get(ropts, "k2", &value, &key_ts).IsNotFound());
251 ASSERT_EQ(Timestamp(5, 0), key_ts);
252 ASSERT_TRUE(db_->Get(ropts, "k2", &value).IsNotFound());
253
254 Close();
255 }
256
257 TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
258 Options options = CurrentOptions();
259 options.env = env_;
260 options.create_if_missing = true;
261 const size_t kTimestampSize = Timestamp(0, 0).size();
262 TestComparator test_cmp(kTimestampSize);
263 options.comparator = &test_cmp;
264 DestroyAndReopen(options);
265
266 const std::string kKey = "test kKey";
267
268 // Test set ts_low first and flush()
269 int current_ts_low = 5;
270 std::string ts_low_str = Timestamp(current_ts_low, 0);
271 Slice ts_low = ts_low_str;
272 CompactRangeOptions comp_opts;
273 comp_opts.full_history_ts_low = &ts_low;
274 comp_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
275
276 ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
277
278 auto* cfd =
279 static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
280 ->cfd();
281 auto result_ts_low = cfd->GetFullHistoryTsLow();
282
283 ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
284
285 for (int i = 0; i < 10; i++) {
286 WriteOptions write_opts;
287 std::string ts = Timestamp(i, 0);
288 ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i)));
289 }
290 ASSERT_OK(Flush());
291
292 for (int i = 0; i < 10; i++) {
293 ReadOptions read_opts;
294 std::string ts_str = Timestamp(i, 0);
295 Slice ts = ts_str;
296 read_opts.timestamp = &ts;
297 std::string value;
298 Status status = db_->Get(read_opts, kKey, &value);
299 if (i < current_ts_low) {
300 ASSERT_TRUE(status.IsInvalidArgument());
301 } else {
302 ASSERT_OK(status);
303 ASSERT_TRUE(value.compare(Key(i)) == 0);
304 }
305 }
306
307 // Test set ts_low and then trigger compaction
308 for (int i = 10; i < 20; i++) {
309 WriteOptions write_opts;
310 std::string ts = Timestamp(i, 0);
311 ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i)));
312 }
313
314 ASSERT_OK(Flush());
315
316 current_ts_low = 15;
317 ts_low_str = Timestamp(current_ts_low, 0);
318 ts_low = ts_low_str;
319 comp_opts.full_history_ts_low = &ts_low;
320 ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
321 result_ts_low = cfd->GetFullHistoryTsLow();
322 ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
323
324 for (int i = current_ts_low; i < 20; i++) {
325 ReadOptions read_opts;
326 std::string ts_str = Timestamp(i, 0);
327 Slice ts = ts_str;
328 read_opts.timestamp = &ts;
329 std::string value;
330 Status status = db_->Get(read_opts, kKey, &value);
331 ASSERT_OK(status);
332 ASSERT_TRUE(value.compare(Key(i)) == 0);
333 }
334
335 // Test invalid compaction with range
336 Slice start(kKey), end(kKey);
337 Status s = db_->CompactRange(comp_opts, &start, &end);
338 ASSERT_TRUE(s.IsInvalidArgument());
339 s = db_->CompactRange(comp_opts, &start, nullptr);
340 ASSERT_TRUE(s.IsInvalidArgument());
341 s = db_->CompactRange(comp_opts, nullptr, &end);
342 ASSERT_TRUE(s.IsInvalidArgument());
343
344 // Test invalid compaction with the decreasing ts_low
345 ts_low_str = Timestamp(current_ts_low - 1, 0);
346 ts_low = ts_low_str;
347 comp_opts.full_history_ts_low = &ts_low;
348 s = db_->CompactRange(comp_opts, nullptr, nullptr);
349 ASSERT_TRUE(s.IsInvalidArgument());
350
351 Close();
352 }
353
354 TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLowWithPublicAPI) {
355 Options options = CurrentOptions();
356 options.env = env_;
357 options.create_if_missing = true;
358 const size_t kTimestampSize = Timestamp(0, 0).size();
359 TestComparator test_cmp(kTimestampSize);
360 options.comparator = &test_cmp;
361 DestroyAndReopen(options);
362 std::string ts_low_str = Timestamp(9, 0);
363 ASSERT_OK(
364 db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str));
365 std::string result_ts_low;
366 ASSERT_OK(db_->GetFullHistoryTsLow(nullptr, &result_ts_low));
367 ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low_str, result_ts_low) == 0);
368 // test increase full_history_low backward
369 std::string ts_low_str_back = Timestamp(8, 0);
370 auto s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
371 ts_low_str_back);
372 ASSERT_EQ(s, Status::InvalidArgument());
373 // test IncreaseFullHistoryTsLow with a timestamp whose length is longger
374 // than the cf's timestamp size
375 std::string ts_low_str_long(Timestamp(0, 0).size() + 1, 'a');
376 s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
377 ts_low_str_long);
378 ASSERT_EQ(s, Status::InvalidArgument());
379 // test IncreaseFullHistoryTsLow with a timestamp which is null
380 std::string ts_low_str_null = "";
381 s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
382 ts_low_str_null);
383 ASSERT_EQ(s, Status::InvalidArgument());
384 // test IncreaseFullHistoryTsLow for a column family that does not enable
385 // timestamp
386 options.comparator = BytewiseComparator();
387 DestroyAndReopen(options);
388 ts_low_str = Timestamp(10, 0);
389 s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str);
390 ASSERT_EQ(s, Status::InvalidArgument());
391 // test GetFullHistoryTsLow for a column family that does not enable
392 // timestamp
393 std::string current_ts_low;
394 s = db_->GetFullHistoryTsLow(db_->DefaultColumnFamily(), &current_ts_low);
395 ASSERT_EQ(s, Status::InvalidArgument());
396 Close();
397 }
398
399 TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) {
400 Options options = CurrentOptions();
401 options.write_buffer_size = 100000000; // Large write buffer
402 options.compression = kNoCompression;
403 options.create_if_missing = true;
404 const size_t kTimestampSize = Timestamp(0, 0).size();
405 TestComparator test_cmp(kTimestampSize);
406 options.comparator = &test_cmp;
407 DestroyAndReopen(options);
408 auto default_cf = db_->DefaultColumnFamily();
409
410 WriteOptions write_opts;
411 std::string ts = Timestamp(1, 0);
412
413 const int N = 128;
414 Random rnd(301);
415 for (int i = 0; i < N; i++) {
416 ASSERT_OK(db_->Put(write_opts, Key(i), ts, rnd.RandomString(1024)));
417 }
418
419 uint64_t size;
420 std::string start = Key(50);
421 std::string end = Key(60);
422 Range r(start, end);
423 SizeApproximationOptions size_approx_options;
424 size_approx_options.include_memtables = true;
425 size_approx_options.include_files = true;
426 ASSERT_OK(
427 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
428 ASSERT_GT(size, 6000);
429 ASSERT_LT(size, 204800);
430
431 // test multiple ranges
432 std::vector<Range> ranges;
433 std::string start_tmp = Key(10);
434 std::string end_tmp = Key(20);
435 ranges.emplace_back(Range(start_tmp, end_tmp));
436 ranges.emplace_back(Range(start, end));
437 uint64_t range_sizes[2];
438 ASSERT_OK(db_->GetApproximateSizes(size_approx_options, default_cf,
439 ranges.data(), 2, range_sizes));
440
441 ASSERT_EQ(range_sizes[1], size);
442
443 // Zero if not including mem table
444 ASSERT_OK(db_->GetApproximateSizes(&r, 1, &size));
445 ASSERT_EQ(size, 0);
446
447 start = Key(500);
448 end = Key(600);
449 r = Range(start, end);
450 ASSERT_OK(
451 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
452 ASSERT_EQ(size, 0);
453
454 // Test range boundaries
455 ASSERT_OK(db_->Put(write_opts, Key(1000), ts, rnd.RandomString(1024)));
456 // Should include start key
457 start = Key(1000);
458 end = Key(1100);
459 r = Range(start, end);
460 ASSERT_OK(
461 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
462 ASSERT_GT(size, 0);
463
464 // Should exclude end key
465 start = Key(900);
466 end = Key(1000);
467 r = Range(start, end);
468 ASSERT_OK(
469 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
470 ASSERT_EQ(size, 0);
471
472 Close();
473 }
474
475 TEST_F(DBBasicTestWithTimestamp, SimpleIterate) {
476 const int kNumKeysPerFile = 128;
477 const uint64_t kMaxKey = 1024;
478 Options options = CurrentOptions();
479 options.env = env_;
480 options.create_if_missing = true;
481 const size_t kTimestampSize = Timestamp(0, 0).size();
482 TestComparator test_cmp(kTimestampSize);
483 options.comparator = &test_cmp;
484 options.memtable_factory.reset(
485 test::NewSpecialSkipListFactory(kNumKeysPerFile));
486 DestroyAndReopen(options);
487 const std::vector<uint64_t> start_keys = {1, 0};
488 const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
489 Timestamp(3, 0)};
490 const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
491 Timestamp(4, 0)};
492 for (size_t i = 0; i < write_timestamps.size(); ++i) {
493 WriteOptions write_opts;
494 for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
495 Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
496 "value" + std::to_string(i));
497 ASSERT_OK(s);
498 }
499 }
500 for (size_t i = 0; i < read_timestamps.size(); ++i) {
501 ReadOptions read_opts;
502 Slice read_ts = read_timestamps[i];
503 read_opts.timestamp = &read_ts;
504 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
505 int count = 0;
506 uint64_t key = 0;
507 // Forward iterate.
508 for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
509 it->Next(), ++count, ++key) {
510 CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
511 "value" + std::to_string(i), write_timestamps[i]);
512 }
513 size_t expected_count = kMaxKey - start_keys[i] + 1;
514 ASSERT_EQ(expected_count, count);
515
516 // Backward iterate.
517 count = 0;
518 for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
519 it->Prev(), ++count, --key) {
520 CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
521 "value" + std::to_string(i), write_timestamps[i]);
522 }
523 ASSERT_EQ(static_cast<size_t>(kMaxKey) - start_keys[i] + 1, count);
524
525 // SeekToFirst()/SeekToLast() with lower/upper bounds.
526 // Then iter with lower and upper bounds.
527 uint64_t l = 0;
528 uint64_t r = kMaxKey + 1;
529 while (l < r) {
530 std::string lb_str = Key1(l);
531 Slice lb = lb_str;
532 std::string ub_str = Key1(r);
533 Slice ub = ub_str;
534 read_opts.iterate_lower_bound = &lb;
535 read_opts.iterate_upper_bound = &ub;
536 it.reset(db_->NewIterator(read_opts));
537 for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
538 it->Valid(); it->Next(), ++key, ++count) {
539 CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
540 "value" + std::to_string(i), write_timestamps[i]);
541 }
542 ASSERT_EQ(r - std::max(l, start_keys[i]), count);
543
544 for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0;
545 it->Valid(); it->Prev(), --key, ++count) {
546 CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue,
547 "value" + std::to_string(i), write_timestamps[i]);
548 }
549 l += (kMaxKey / 100);
550 r -= (kMaxKey / 100);
551 }
552 }
553 Close();
554 }
555
556 TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
557 Options options = CurrentOptions();
558 options.env = env_;
559 options.create_if_missing = true;
560 const size_t kTimestampSize = Timestamp(0, 0).size();
561 TestComparator test_cmp(kTimestampSize);
562 options.comparator = &test_cmp;
563 DestroyAndReopen(options);
564 auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
565 Status status, std::string checkValue,
566 std::string expected_ts) {
567 ReadOptions ropts;
568 Slice ts = readTs;
569 ropts.timestamp = &ts;
570 std::string value;
571 std::string key_ts;
572 Status s = db->Get(ropts, key, &value, &key_ts);
573 ASSERT_TRUE(s == status);
574 if (s.ok()) {
575 ASSERT_EQ(checkValue, value);
576 }
577 if (s.ok() || s.IsNotFound()) {
578 ASSERT_EQ(expected_ts, key_ts);
579 }
580 };
581 // Construct data of different versions with different ts
582 ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
583 ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
584 ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
585 ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
586 check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3",
587 Timestamp(6, 0));
588 ASSERT_OK(Flush());
589 Close();
590
591 ColumnFamilyOptions cf_options(options);
592 std::vector<ColumnFamilyDescriptor> column_families;
593 column_families.push_back(
594 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
595 DBOptions db_options(options);
596
597 // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
598 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
599 &handles_, &db_, Timestamp(5, 0)));
600 check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "",
601 Timestamp(5, 0));
602 Close();
603
604 // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
605 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
606 &handles_, &db_, Timestamp(4, 0)));
607 check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
608 Timestamp(4, 0));
609 Close();
610
611 Reopen(options);
612 ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "k1",
613 "k3", Timestamp(7, 0)));
614 check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::NotFound(), "",
615 Timestamp(7, 0));
616 Close();
617 // Trim data whose timestamp > Timestamp(6, 0), read(k1, ts(8)) <- v2
618 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
619 &handles_, &db_, Timestamp(6, 0)));
620 check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::OK(), "v2",
621 Timestamp(4, 0));
622 Close();
623 }
624
625 TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) {
626 Destroy(last_options_);
627
628 Options options = CurrentOptions();
629 options.env = env_;
630 options.create_if_missing = true;
631 const size_t kTimestampSize = Timestamp(0, 0).size();
632 TestComparator test_cmp(kTimestampSize);
633 options.comparator = &test_cmp;
634
635 ColumnFamilyOptions cf_options(options);
636 std::vector<ColumnFamilyDescriptor> column_families;
637 column_families.push_back(
638 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
639 DBOptions db_options(options);
640
641 // OpenAndTrimHistory should not work with avoid_flush_during_recovery
642 db_options.avoid_flush_during_recovery = true;
643 ASSERT_TRUE(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
644 &handles_, &db_, Timestamp(0, 0))
645 .IsInvalidArgument());
646 }
647
648 #ifndef ROCKSDB_LITE
649 TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) {
650 Options options = CurrentOptions();
651 const size_t kTimestampSize = Timestamp(0, 0).size();
652 TestComparator test_cmp(kTimestampSize);
653 options.comparator = &test_cmp;
654 DestroyAndReopen(options);
655 // Create 2 tables
656 for (int table = 0; table < 2; ++table) {
657 for (int i = 0; i < 10; i++) {
658 std::string ts = Timestamp(i, 0);
659 ASSERT_OK(db_->Put(WriteOptions(), "key", ts, Key(i)));
660 }
661 ASSERT_OK(Flush());
662 }
663
664 TablePropertiesCollection props;
665 ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
666 ASSERT_EQ(2U, props.size());
667 for (const auto& item : props) {
668 auto& user_collected = item.second->user_collected_properties;
669 ASSERT_TRUE(user_collected.find("rocksdb.timestamp_min") !=
670 user_collected.end());
671 ASSERT_TRUE(user_collected.find("rocksdb.timestamp_max") !=
672 user_collected.end());
673 ASSERT_EQ(user_collected.at("rocksdb.timestamp_min"), Timestamp(0, 0));
674 ASSERT_EQ(user_collected.at("rocksdb.timestamp_max"), Timestamp(9, 0));
675 }
676 Close();
677 }
678 #endif // !ROCKSDB_LITE
679
680 class DBBasicTestWithTimestampTableOptions
681 : public DBBasicTestWithTimestampBase,
682 public testing::WithParamInterface<BlockBasedTableOptions::IndexType> {
683 public:
684 explicit DBBasicTestWithTimestampTableOptions()
685 : DBBasicTestWithTimestampBase(
686 "db_basic_test_with_timestamp_table_options") {}
687 };
688
689 INSTANTIATE_TEST_CASE_P(
690 Timestamp, DBBasicTestWithTimestampTableOptions,
691 testing::Values(
692 BlockBasedTableOptions::IndexType::kBinarySearch,
693 BlockBasedTableOptions::IndexType::kHashSearch,
694 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
695 BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey));
696
697 TEST_P(DBBasicTestWithTimestampTableOptions, GetAndMultiGet) {
698 Options options = GetDefaultOptions();
699 options.create_if_missing = true;
700 options.prefix_extractor.reset(NewFixedPrefixTransform(3));
701 options.compression = kNoCompression;
702 BlockBasedTableOptions bbto;
703 bbto.index_type = GetParam();
704 bbto.block_size = 100;
705 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
706 const size_t kTimestampSize = Timestamp(0, 0).size();
707 TestComparator cmp(kTimestampSize);
708 options.comparator = &cmp;
709 DestroyAndReopen(options);
710 constexpr uint64_t kNumKeys = 1024;
711 for (uint64_t k = 0; k < kNumKeys; ++k) {
712 WriteOptions write_opts;
713 ASSERT_OK(db_->Put(write_opts, Key1(k), Timestamp(1, 0),
714 "value" + std::to_string(k)));
715 }
716 ASSERT_OK(Flush());
717 {
718 ReadOptions read_opts;
719 read_opts.total_order_seek = true;
720 std::string ts_str = Timestamp(2, 0);
721 Slice ts = ts_str;
722 read_opts.timestamp = &ts;
723 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
724 // verify Get()
725 for (it->SeekToFirst(); it->Valid(); it->Next()) {
726 std::string value_from_get;
727 std::string key_str(it->key().data(), it->key().size());
728 std::string timestamp;
729 ASSERT_OK(db_->Get(read_opts, key_str, &value_from_get, &timestamp));
730 ASSERT_EQ(it->value(), value_from_get);
731 ASSERT_EQ(Timestamp(1, 0), timestamp);
732 }
733
734 // verify MultiGet()
735 constexpr uint64_t step = 2;
736 static_assert(0 == (kNumKeys % step),
737 "kNumKeys must be a multiple of step");
738 for (uint64_t k = 0; k < kNumKeys; k += 2) {
739 std::vector<std::string> key_strs;
740 std::vector<Slice> keys;
741 for (size_t i = 0; i < step; ++i) {
742 key_strs.push_back(Key1(k + i));
743 }
744 for (size_t i = 0; i < step; ++i) {
745 keys.emplace_back(key_strs[i]);
746 }
747 std::vector<std::string> values;
748 std::vector<std::string> timestamps;
749 std::vector<Status> statuses =
750 db_->MultiGet(read_opts, keys, &values, &timestamps);
751 ASSERT_EQ(step, statuses.size());
752 ASSERT_EQ(step, values.size());
753 ASSERT_EQ(step, timestamps.size());
754 for (uint64_t i = 0; i < step; ++i) {
755 ASSERT_OK(statuses[i]);
756 ASSERT_EQ("value" + std::to_string(k + i), values[i]);
757 ASSERT_EQ(Timestamp(1, 0), timestamps[i]);
758 }
759 }
760 }
761 Close();
762 }
763
764 TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) {
765 Options options = CurrentOptions();
766 options.env = env_;
767 options.create_if_missing = true;
768 options.prefix_extractor.reset(NewFixedPrefixTransform(3));
769 options.memtable_whole_key_filtering = true;
770 options.memtable_prefix_bloom_size_ratio = 0.1;
771 BlockBasedTableOptions bbto;
772 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
773 bbto.cache_index_and_filter_blocks = true;
774 bbto.whole_key_filtering = true;
775 bbto.index_type = GetParam();
776 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
777 const size_t kTimestampSize = Timestamp(0, 0).size();
778 TestComparator test_cmp(kTimestampSize);
779 options.comparator = &test_cmp;
780 DestroyAndReopen(options);
781
782 WriteOptions write_opts;
783 std::string ts = Timestamp(1, 0);
784
785 ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
786 ASSERT_OK(Flush());
787
788 ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
789 ASSERT_OK(Flush());
790
791 // Move sst file to next level
792 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
793
794 ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar"));
795 ASSERT_OK(Flush());
796
797 ReadOptions read_opts;
798 Slice read_ts = ts;
799 read_opts.timestamp = &read_ts;
800 {
801 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
802 iter->Seek("foo");
803 ASSERT_TRUE(iter->Valid());
804 ASSERT_OK(iter->status());
805 iter->Next();
806 ASSERT_TRUE(iter->Valid());
807 ASSERT_OK(iter->status());
808
809 iter->Seek("bbb");
810 ASSERT_FALSE(iter->Valid());
811 ASSERT_OK(iter->status());
812 }
813
814 Close();
815 }
816
817 TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithCappedPrefix) {
818 Options options = CurrentOptions();
819 options.env = env_;
820 options.create_if_missing = true;
821 // All of the keys or this test must be longer than 3 characters
822 constexpr int kMinKeyLen = 3;
823 options.prefix_extractor.reset(NewCappedPrefixTransform(kMinKeyLen));
824 options.memtable_whole_key_filtering = true;
825 options.memtable_prefix_bloom_size_ratio = 0.1;
826 BlockBasedTableOptions bbto;
827 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
828 bbto.cache_index_and_filter_blocks = true;
829 bbto.whole_key_filtering = true;
830 bbto.index_type = GetParam();
831 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
832 const size_t kTimestampSize = Timestamp(0, 0).size();
833 TestComparator test_cmp(kTimestampSize);
834 options.comparator = &test_cmp;
835 DestroyAndReopen(options);
836
837 WriteOptions write_opts;
838 std::string ts = Timestamp(1, 0);
839
840 ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
841 ASSERT_OK(Flush());
842
843 ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
844 ASSERT_OK(Flush());
845
846 // Move sst file to next level
847 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
848
849 ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar"));
850 ASSERT_OK(Flush());
851
852 ReadOptions read_opts;
853 ts = Timestamp(2, 0);
854 Slice read_ts = ts;
855 read_opts.timestamp = &read_ts;
856 {
857 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
858 // Make sure the prefix extractor doesn't include timestamp, otherwise it
859 // may return invalid result.
860 iter->Seek("foo");
861 ASSERT_TRUE(iter->Valid());
862 ASSERT_OK(iter->status());
863 iter->Next();
864 ASSERT_TRUE(iter->Valid());
865 ASSERT_OK(iter->status());
866 }
867
868 Close();
869 }
870
871 TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithBound) {
872 Options options = CurrentOptions();
873 options.env = env_;
874 options.create_if_missing = true;
875 options.prefix_extractor.reset(NewFixedPrefixTransform(2));
876 BlockBasedTableOptions bbto;
877 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
878 bbto.cache_index_and_filter_blocks = true;
879 bbto.whole_key_filtering = true;
880 bbto.index_type = GetParam();
881 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
882 const size_t kTimestampSize = Timestamp(0, 0).size();
883 TestComparator test_cmp(kTimestampSize);
884 options.comparator = &test_cmp;
885 DestroyAndReopen(options);
886
887 WriteOptions write_opts;
888 std::string ts = Timestamp(1, 0);
889
890 ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar1"));
891 ASSERT_OK(Flush());
892
893 ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar2"));
894 ASSERT_OK(Flush());
895
896 // Move sst file to next level
897 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
898
899 for (int i = 3; i < 9; ++i) {
900 ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), ts,
901 "bar" + std::to_string(i)));
902 }
903 ASSERT_OK(Flush());
904
905 ReadOptions read_opts;
906 ts = Timestamp(2, 0);
907 Slice read_ts = ts;
908 read_opts.timestamp = &read_ts;
909 std::string up_bound = "foo5"; // exclusive
910 Slice up_bound_slice = up_bound;
911 std::string lo_bound = "foo2"; // inclusive
912 Slice lo_bound_slice = lo_bound;
913 read_opts.iterate_upper_bound = &up_bound_slice;
914 read_opts.iterate_lower_bound = &lo_bound_slice;
915 read_opts.auto_prefix_mode = true;
916 {
917 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
918 // Make sure the prefix extractor doesn't include timestamp, otherwise it
919 // may return invalid result.
920 iter->Seek("foo");
921 CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2",
922 Timestamp(1, 0));
923 iter->SeekToFirst();
924 CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2",
925 Timestamp(1, 0));
926 iter->SeekForPrev("g");
927 CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0));
928 iter->SeekToLast();
929 CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0));
930 }
931
932 Close();
933 }
934
935 TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) {
936 Options options = GetDefaultOptions();
937 options.create_if_missing = true;
938 options.env = env_;
939 const size_t kTimestampSize = Timestamp(0, 0).size();
940 TestComparator test_cmp(kTimestampSize);
941 options.comparator = &test_cmp;
942 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
943 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
944 DestroyAndReopen(options);
945 const std::vector<std::string> timestamps = {Timestamp(1, 1), Timestamp(0, 2),
946 Timestamp(4, 3)};
947 const std::vector<std::tuple<std::string, std::string>> kvs = {
948 std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")};
949 for (const auto& ts : timestamps) {
950 WriteBatch wb(0, 0, 0, kTimestampSize);
951 for (const auto& kv : kvs) {
952 const std::string& key = std::get<0>(kv);
953 const std::string& value = std::get<1>(kv);
954 ASSERT_OK(wb.Put(key, value));
955 }
956
957 ASSERT_OK(wb.UpdateTimestamps(
958 ts, [kTimestampSize](uint32_t) { return kTimestampSize; }));
959 ASSERT_OK(db_->Write(WriteOptions(), &wb));
960 }
961 std::string read_ts_str = Timestamp(5, 3);
962 Slice read_ts = read_ts_str;
963 ReadOptions read_opts;
964 read_opts.timestamp = &read_ts;
965 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
966
967 it->SeekToFirst();
968 ASSERT_TRUE(it->Valid());
969 it->Prev();
970 ASSERT_FALSE(it->Valid());
971
972 it->SeekToLast();
973 ASSERT_TRUE(it->Valid());
974 uint64_t prev_reseek_count =
975 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
976 ASSERT_EQ(0, prev_reseek_count);
977 it->Next();
978 ASSERT_FALSE(it->Valid());
979 ASSERT_EQ(1 + prev_reseek_count,
980 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
981
982 it->Seek(std::get<0>(kvs[0]));
983 CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
984 std::get<1>(kvs[0]), Timestamp(4, 3));
985 it->Next();
986 CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
987 std::get<1>(kvs[1]), Timestamp(4, 3));
988 it->Prev();
989 CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
990 std::get<1>(kvs[0]), Timestamp(4, 3));
991
992 prev_reseek_count =
993 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
994 ASSERT_EQ(1, prev_reseek_count);
995 it->Next();
996 CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
997 std::get<1>(kvs[1]), Timestamp(4, 3));
998 ASSERT_EQ(1 + prev_reseek_count,
999 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1000
1001 it->SeekForPrev(std::get<0>(kvs[1]));
1002 CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
1003 std::get<1>(kvs[1]), Timestamp(4, 3));
1004 it->Prev();
1005 CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
1006 std::get<1>(kvs[0]), Timestamp(4, 3));
1007
1008 prev_reseek_count =
1009 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
1010 it->Next();
1011 CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
1012 std::get<1>(kvs[1]), Timestamp(4, 3));
1013 ASSERT_EQ(1 + prev_reseek_count,
1014 options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1015
1016 it.reset();
1017 Close();
1018 }
1019
1020 TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
1021 constexpr int kNumKeysPerFile = 128;
1022 constexpr uint64_t kMaxKey = 1024;
1023 Options options = CurrentOptions();
1024 options.env = env_;
1025 options.create_if_missing = true;
1026 const size_t kTimestampSize = Timestamp(0, 0).size();
1027 TestComparator test_cmp(kTimestampSize);
1028 options.comparator = &test_cmp;
1029 options.memtable_factory.reset(
1030 test::NewSpecialSkipListFactory(kNumKeysPerFile));
1031 DestroyAndReopen(options);
1032 const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
1033 Timestamp(3, 0)};
1034 const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
1035 Timestamp(4, 0)};
1036 const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
1037 Timestamp(1, 0)};
1038 for (size_t i = 0; i < write_timestamps.size(); ++i) {
1039 WriteOptions write_opts;
1040 for (uint64_t key = 0; key <= kMaxKey; ++key) {
1041 Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
1042 "value" + std::to_string(i));
1043 ASSERT_OK(s);
1044 }
1045 }
1046 for (size_t i = 0; i < read_timestamps.size(); ++i) {
1047 ReadOptions read_opts;
1048 Slice read_ts = read_timestamps[i];
1049 Slice read_ts_lb = read_timestamps_lb[i];
1050 read_opts.timestamp = &read_ts;
1051 read_opts.iter_start_ts = &read_ts_lb;
1052 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1053 int count = 0;
1054 uint64_t key = 0;
1055 for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
1056 CheckIterEntry(it.get(), Key1(key), kTypeValue,
1057 "value" + std::to_string(i), write_timestamps[i]);
1058 if (i > 0) {
1059 it->Next();
1060 CheckIterEntry(it.get(), Key1(key), kTypeValue,
1061 "value" + std::to_string(i - 1),
1062 write_timestamps[i - 1]);
1063 }
1064 }
1065 size_t expected_count = kMaxKey + 1;
1066 ASSERT_EQ(expected_count, count);
1067 }
1068 // Delete all keys@ts=5 and check iteration result with start ts set
1069 {
1070 std::string write_timestamp = Timestamp(5, 0);
1071 WriteOptions write_opts;
1072 for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
1073 Status s = db_->Delete(write_opts, Key1(key), write_timestamp);
1074 ASSERT_OK(s);
1075 }
1076
1077 std::string read_timestamp = Timestamp(6, 0);
1078 ReadOptions read_opts;
1079 Slice read_ts = read_timestamp;
1080 read_opts.timestamp = &read_ts;
1081 std::string read_timestamp_lb = Timestamp(2, 0);
1082 Slice read_ts_lb = read_timestamp_lb;
1083 read_opts.iter_start_ts = &read_ts_lb;
1084 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1085 int count = 0;
1086 uint64_t key = 0;
1087 for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
1088 CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
1089 write_timestamp);
1090 // Skip key@ts=3 and land on tombstone key@ts=5
1091 it->Next();
1092 }
1093 ASSERT_EQ(kMaxKey + 1, count);
1094 }
1095 Close();
1096 }
1097
1098 TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound) {
1099 constexpr int kNumKeysPerFile = 128;
1100 constexpr uint64_t kMaxKey = 1024;
1101 Options options = CurrentOptions();
1102 options.env = env_;
1103 options.create_if_missing = true;
1104 const size_t kTimestampSize = Timestamp(0, 0).size();
1105 TestComparator test_cmp(kTimestampSize);
1106 options.comparator = &test_cmp;
1107 options.memtable_factory.reset(
1108 test::NewSpecialSkipListFactory(kNumKeysPerFile));
1109 DestroyAndReopen(options);
1110 const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
1111 Timestamp(3, 0)};
1112 const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
1113 Timestamp(4, 0)};
1114 const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
1115 Timestamp(1, 0)};
1116 for (size_t i = 0; i < write_timestamps.size(); ++i) {
1117 WriteOptions write_opts;
1118 for (uint64_t key = 0; key <= kMaxKey; ++key) {
1119 Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
1120 "value" + std::to_string(i));
1121 ASSERT_OK(s);
1122 }
1123 }
1124 for (size_t i = 0; i < read_timestamps.size(); ++i) {
1125 ReadOptions read_opts;
1126 Slice read_ts = read_timestamps[i];
1127 Slice read_ts_lb = read_timestamps_lb[i];
1128 read_opts.timestamp = &read_ts;
1129 read_opts.iter_start_ts = &read_ts_lb;
1130 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1131 int count = 0;
1132 uint64_t key = 0;
1133 for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
1134 it->Prev(), ++count, --key) {
1135 CheckIterEntry(it.get(), Key1(key), kTypeValue, "value0",
1136 write_timestamps[0]);
1137 if (i > 0) {
1138 it->Prev();
1139 CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
1140 write_timestamps[1]);
1141 }
1142 }
1143 size_t expected_count = kMaxKey + 1;
1144 ASSERT_EQ(expected_count, count);
1145 }
1146 // Delete all keys@ts=5 and check iteration result with start ts set
1147 {
1148 std::string write_timestamp = Timestamp(5, 0);
1149 WriteOptions write_opts;
1150 for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
1151 Status s = db_->Delete(write_opts, Key1(key), write_timestamp);
1152 ASSERT_OK(s);
1153 }
1154
1155 std::string read_timestamp = Timestamp(6, 0);
1156 ReadOptions read_opts;
1157 Slice read_ts = read_timestamp;
1158 read_opts.timestamp = &read_ts;
1159 std::string read_timestamp_lb = Timestamp(2, 0);
1160 Slice read_ts_lb = read_timestamp_lb;
1161 read_opts.iter_start_ts = &read_ts_lb;
1162 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1163 int count = 0;
1164 uint64_t key = kMaxKey;
1165 for (it->SeekForPrev(Key1(key)), key = kMaxKey; it->Valid();
1166 it->Prev(), ++count, --key) {
1167 CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
1168 Timestamp(3, 0));
1169 it->Prev();
1170 CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
1171 write_timestamp);
1172 }
1173 ASSERT_EQ(kMaxKey + 1, count);
1174 }
1175 Close();
1176 }
1177
1178 TEST_F(DBBasicTestWithTimestamp, SimpleBackwardIterateLowerTsBound) {
1179 Options options = CurrentOptions();
1180 options.env = env_;
1181 options.create_if_missing = true;
1182 const size_t kTimestampSize = Timestamp(0, 0).size();
1183 TestComparator test_cmp(kTimestampSize);
1184 options.comparator = &test_cmp;
1185 DestroyAndReopen(options);
1186
1187 std::string ts_ub_buf = Timestamp(5, 0);
1188 Slice ts_ub = ts_ub_buf;
1189 std::string ts_lb_buf = Timestamp(1, 0);
1190 Slice ts_lb = ts_lb_buf;
1191
1192 {
1193 ReadOptions read_opts;
1194 read_opts.timestamp = &ts_ub;
1195 read_opts.iter_start_ts = &ts_lb;
1196 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1197 it->SeekToLast();
1198 ASSERT_FALSE(it->Valid());
1199 ASSERT_OK(it->status());
1200
1201 it->SeekForPrev("foo");
1202 ASSERT_FALSE(it->Valid());
1203 ASSERT_OK(it->status());
1204 }
1205
1206 // Test iterate_upper_bound
1207 ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(0, 0), "v0"));
1208 ASSERT_OK(db_->SingleDelete(WriteOptions(), "a", Timestamp(1, 0)));
1209
1210 for (int i = 0; i < 5; ++i) {
1211 ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
1212 "v" + std::to_string(i)));
1213 }
1214
1215 {
1216 ReadOptions read_opts;
1217 read_opts.timestamp = &ts_ub;
1218 read_opts.iter_start_ts = &ts_lb;
1219 std::string key_ub_str = "b"; // exclusive
1220 Slice key_ub = key_ub_str;
1221 read_opts.iterate_upper_bound = &key_ub;
1222 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1223 it->SeekToLast();
1224 CheckIterEntry(it.get(), "a", kTypeSingleDeletion, Slice(),
1225 Timestamp(1, 0));
1226
1227 key_ub_str = "a"; // exclusive
1228 key_ub = key_ub_str;
1229 read_opts.iterate_upper_bound = &key_ub;
1230 it.reset(db_->NewIterator(read_opts));
1231 it->SeekToLast();
1232 ASSERT_FALSE(it->Valid());
1233 ASSERT_OK(it->status());
1234 }
1235
1236 Close();
1237 }
1238
1239 TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound_Reseek) {
1240 Options options = CurrentOptions();
1241 options.env = env_;
1242 options.create_if_missing = true;
1243 options.max_sequential_skip_in_iterations = 2;
1244 const size_t kTimestampSize = Timestamp(0, 0).size();
1245 TestComparator test_cmp(kTimestampSize);
1246 options.comparator = &test_cmp;
1247 DestroyAndReopen(options);
1248
1249 for (int i = 0; i < 10; ++i) {
1250 ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(i, 0),
1251 "v" + std::to_string(i)));
1252 }
1253
1254 for (int i = 0; i < 10; ++i) {
1255 ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
1256 "v" + std::to_string(i)));
1257 }
1258
1259 {
1260 std::string ts_ub_buf = Timestamp(6, 0);
1261 Slice ts_ub = ts_ub_buf;
1262 std::string ts_lb_buf = Timestamp(4, 0);
1263 Slice ts_lb = ts_lb_buf;
1264
1265 ReadOptions read_opts;
1266 read_opts.timestamp = &ts_ub;
1267 read_opts.iter_start_ts = &ts_lb;
1268 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
1269 it->SeekToLast();
1270 for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
1271 CheckIterEntry(it.get(), "b", kTypeValue, "v" + std::to_string(4 + i),
1272 Timestamp(4 + i, 0));
1273 }
1274 for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
1275 CheckIterEntry(it.get(), "a", kTypeValue, "v" + std::to_string(4 + i),
1276 Timestamp(4 + i, 0));
1277 }
1278 }
1279
1280 Close();
1281 }
1282
1283 TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
1284 Options options = CurrentOptions();
1285 options.env = env_;
1286 options.create_if_missing = true;
1287 constexpr size_t kNumKeys = 16;
1288 options.max_sequential_skip_in_iterations = kNumKeys / 2;
1289 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1290 const size_t kTimestampSize = Timestamp(0, 0).size();
1291 TestComparator test_cmp(kTimestampSize);
1292 options.comparator = &test_cmp;
1293 DestroyAndReopen(options);
1294 // Insert kNumKeys
1295 WriteOptions write_opts;
1296 Status s;
1297 for (size_t i = 0; i != kNumKeys; ++i) {
1298 std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
1299 s = db_->Put(write_opts, "foo", ts, "value" + std::to_string(i));
1300 ASSERT_OK(s);
1301 }
1302 {
1303 ReadOptions read_opts;
1304 std::string ts_str = Timestamp(1, 0);
1305 Slice ts = ts_str;
1306 read_opts.timestamp = &ts;
1307 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
1308 iter->SeekToFirst();
1309 CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value0", ts_str);
1310 ASSERT_EQ(
1311 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1312
1313 ts_str = Timestamp(kNumKeys, 0);
1314 ts = ts_str;
1315 read_opts.timestamp = &ts;
1316 iter.reset(db_->NewIterator(read_opts));
1317 iter->SeekToLast();
1318 CheckIterUserEntry(iter.get(), "foo", kTypeValue,
1319 "value" + std::to_string(kNumKeys - 1), ts_str);
1320 ASSERT_EQ(
1321 2, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1322 }
1323 Close();
1324 }
1325
1326 TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
1327 Options options = CurrentOptions();
1328 options.env = env_;
1329 options.create_if_missing = true;
1330 constexpr size_t kNumKeys = 16;
1331 options.max_sequential_skip_in_iterations = kNumKeys / 2;
1332 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1333 const size_t kTimestampSize = Timestamp(0, 0).size();
1334 TestComparator test_cmp(kTimestampSize);
1335 options.comparator = &test_cmp;
1336 DestroyAndReopen(options);
1337 // Write kNumKeys + 1 keys
1338 WriteOptions write_opts;
1339 Status s;
1340 for (size_t i = 0; i != kNumKeys; ++i) {
1341 std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
1342 s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i));
1343 ASSERT_OK(s);
1344 }
1345 {
1346 std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
1347 WriteBatch batch(0, 0, 0, kTimestampSize);
1348 { ASSERT_OK(batch.Put("a", "new_value")); }
1349 { ASSERT_OK(batch.Put("b", "new_value")); }
1350 s = batch.UpdateTimestamps(
1351 ts_str, [kTimestampSize](uint32_t) { return kTimestampSize; });
1352 ASSERT_OK(s);
1353 s = db_->Write(write_opts, &batch);
1354 ASSERT_OK(s);
1355 }
1356 {
1357 ReadOptions read_opts;
1358 std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
1359 Slice ts = ts_str;
1360 read_opts.timestamp = &ts;
1361 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
1362 iter->Seek("a");
1363 iter->Next();
1364 CheckIterUserEntry(iter.get(), "b", kTypeValue, "new_value", ts_str);
1365 ASSERT_EQ(
1366 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1367 }
1368 Close();
1369 }
1370
1371 TEST_F(DBBasicTestWithTimestamp, ReseekToUserKeyBeforeSavedKey) {
1372 Options options = GetDefaultOptions();
1373 options.env = env_;
1374 options.create_if_missing = true;
1375 constexpr size_t kNumKeys = 16;
1376 options.max_sequential_skip_in_iterations = kNumKeys / 2;
1377 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1378 const size_t kTimestampSize = Timestamp(0, 0).size();
1379 TestComparator test_cmp(kTimestampSize);
1380 options.comparator = &test_cmp;
1381 DestroyAndReopen(options);
1382 for (size_t i = 0; i < kNumKeys; ++i) {
1383 std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
1384 WriteOptions write_opts;
1385 Status s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i));
1386 ASSERT_OK(s);
1387 }
1388 {
1389 std::string ts = Timestamp(1, 0);
1390 WriteOptions write_opts;
1391 ASSERT_OK(db_->Put(write_opts, "a", ts, "value"));
1392 }
1393 {
1394 ReadOptions read_opts;
1395 std::string ts_str = Timestamp(1, 0);
1396 Slice ts = ts_str;
1397 read_opts.timestamp = &ts;
1398 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
1399 iter->SeekToLast();
1400 iter->Prev();
1401 CheckIterUserEntry(iter.get(), "a", kTypeValue, "value", ts_str);
1402 ASSERT_EQ(
1403 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
1404 }
1405 Close();
1406 }
1407
1408 TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
1409 Options options = CurrentOptions();
1410 options.env = env_;
1411 options.create_if_missing = true;
1412 BlockBasedTableOptions bbto;
1413 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1414 bbto.cache_index_and_filter_blocks = true;
1415 bbto.whole_key_filtering = true;
1416 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1417 const size_t kTimestampSize = Timestamp(0, 0).size();
1418 TestComparator test_cmp(kTimestampSize);
1419 options.comparator = &test_cmp;
1420 DestroyAndReopen(options);
1421
1422 // Write any value
1423 WriteOptions write_opts;
1424 std::string ts = Timestamp(1, 0);
1425
1426 ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
1427
1428 ASSERT_OK(Flush());
1429
1430 // Read with MultiGet
1431 ReadOptions read_opts;
1432 Slice read_ts = ts;
1433 read_opts.timestamp = &read_ts;
1434 size_t batch_size = 1;
1435 std::vector<Slice> keys(batch_size);
1436 std::vector<PinnableSlice> values(batch_size);
1437 std::vector<Status> statuses(batch_size);
1438 std::vector<std::string> timestamps(batch_size);
1439 keys[0] = "foo";
1440 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1441 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1442 timestamps.data(), statuses.data(), true);
1443
1444 ASSERT_OK(statuses[0]);
1445 ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
1446 for (auto& elem : values) {
1447 elem.Reset();
1448 }
1449
1450 ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
1451 ts = Timestamp(3, 0);
1452 read_ts = ts;
1453 read_opts.timestamp = &read_ts;
1454 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1455 timestamps.data(), statuses.data(), true);
1456 ASSERT_TRUE(statuses[0].IsNotFound());
1457 ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
1458
1459 Close();
1460 }
1461
1462 TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) {
1463 Options options = CurrentOptions();
1464 options.env = env_;
1465 options.create_if_missing = true;
1466 options.prefix_extractor.reset(NewCappedPrefixTransform(5));
1467 BlockBasedTableOptions bbto;
1468 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1469 bbto.cache_index_and_filter_blocks = true;
1470 bbto.whole_key_filtering = false;
1471 bbto.index_type = GetParam();
1472 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1473 const size_t kTimestampSize = Timestamp(0, 0).size();
1474 TestComparator test_cmp(kTimestampSize);
1475 options.comparator = &test_cmp;
1476 DestroyAndReopen(options);
1477
1478 // Write any value
1479 WriteOptions write_opts;
1480 std::string ts = Timestamp(1, 0);
1481
1482 ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
1483
1484 ASSERT_OK(Flush());
1485
1486 // Read with MultiGet
1487 ReadOptions read_opts;
1488 Slice read_ts = ts;
1489 read_opts.timestamp = &read_ts;
1490 size_t batch_size = 1;
1491 std::vector<Slice> keys(batch_size);
1492 std::vector<PinnableSlice> values(batch_size);
1493 std::vector<Status> statuses(batch_size);
1494 std::vector<std::string> timestamps(batch_size);
1495 keys[0] = "foo";
1496 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1497 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1498 timestamps.data(), statuses.data(), true);
1499
1500 ASSERT_OK(statuses[0]);
1501 ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
1502 for (auto& elem : values) {
1503 elem.Reset();
1504 }
1505
1506 ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
1507 // TODO re-enable after fixing a bug of kHashSearch
1508 if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) {
1509 ASSERT_OK(Flush());
1510 }
1511
1512 ts = Timestamp(3, 0);
1513 read_ts = ts;
1514 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1515 timestamps.data(), statuses.data(), true);
1516 ASSERT_TRUE(statuses[0].IsNotFound());
1517 ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
1518
1519 Close();
1520 }
1521
1522 TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithMemBloomFilter) {
1523 Options options = CurrentOptions();
1524 options.env = env_;
1525 options.create_if_missing = true;
1526 options.prefix_extractor.reset(NewCappedPrefixTransform(5));
1527 BlockBasedTableOptions bbto;
1528 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1529 bbto.cache_index_and_filter_blocks = true;
1530 bbto.whole_key_filtering = false;
1531 bbto.index_type = GetParam();
1532 options.memtable_prefix_bloom_size_ratio = 0.1;
1533 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1534 const size_t kTimestampSize = Timestamp(0, 0).size();
1535 TestComparator test_cmp(kTimestampSize);
1536 options.comparator = &test_cmp;
1537 DestroyAndReopen(options);
1538
1539 // Write any value
1540 WriteOptions write_opts;
1541 std::string ts = Timestamp(1, 0);
1542
1543 ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
1544
1545 // Read with MultiGet
1546 ts = Timestamp(2, 0);
1547 Slice read_ts = ts;
1548 ReadOptions read_opts;
1549 read_opts.timestamp = &read_ts;
1550 size_t batch_size = 1;
1551 std::vector<Slice> keys(batch_size);
1552 std::vector<PinnableSlice> values(batch_size);
1553 std::vector<Status> statuses(batch_size);
1554 keys[0] = "foo";
1555 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1556 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1557 statuses.data());
1558
1559 ASSERT_OK(statuses[0]);
1560 Close();
1561 }
1562
1563 TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) {
1564 Options options = CurrentOptions();
1565 options.env = env_;
1566 options.create_if_missing = true;
1567 BlockBasedTableOptions bbto;
1568 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1569 bbto.cache_index_and_filter_blocks = true;
1570 bbto.whole_key_filtering = false;
1571 options.memtable_prefix_bloom_size_ratio = 0.1;
1572 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1573 const size_t kTimestampSize = Timestamp(0, 0).size();
1574 TestComparator test_cmp(kTimestampSize);
1575 options.comparator = &test_cmp;
1576 DestroyAndReopen(options);
1577
1578 // Write any value
1579 WriteOptions write_opts;
1580 std::string ts = Timestamp(1, 0);
1581
1582 // random data
1583 for (int i = 0; i < 3; i++) {
1584 auto key = std::to_string(i * 10);
1585 auto value = std::to_string(i * 10);
1586 Slice key_slice = key;
1587 Slice value_slice = value;
1588 ASSERT_OK(db_->Put(write_opts, key_slice, ts, value_slice));
1589 ASSERT_OK(Flush());
1590 }
1591
1592 // Make num_levels to 2 to do key range filtering of sst files
1593 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1594
1595 ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
1596
1597 ASSERT_OK(Flush());
1598
1599 // Read with MultiGet
1600 ts = Timestamp(2, 0);
1601 Slice read_ts = ts;
1602 ReadOptions read_opts;
1603 read_opts.timestamp = &read_ts;
1604 size_t batch_size = 1;
1605 std::vector<Slice> keys(batch_size);
1606 std::vector<PinnableSlice> values(batch_size);
1607 std::vector<Status> statuses(batch_size);
1608 keys[0] = "foo";
1609 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1610 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1611 statuses.data());
1612
1613 ASSERT_OK(statuses[0]);
1614 Close();
1615 }
1616
1617 TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) {
1618 Options options = CurrentOptions();
1619 options.env = env_;
1620 options.create_if_missing = true;
1621 options.prefix_extractor.reset(NewCappedPrefixTransform(3));
1622 BlockBasedTableOptions bbto;
1623 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1624 bbto.cache_index_and_filter_blocks = true;
1625 bbto.whole_key_filtering = false;
1626 bbto.index_type = GetParam();
1627 options.memtable_prefix_bloom_size_ratio = 0.1;
1628 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1629 const size_t kTimestampSize = Timestamp(0, 0).size();
1630 TestComparator test_cmp(kTimestampSize);
1631 options.comparator = &test_cmp;
1632 DestroyAndReopen(options);
1633
1634 WriteOptions write_opts;
1635 std::string ts = Timestamp(1, 0);
1636
1637 ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
1638
1639 ASSERT_OK(Flush());
1640 // Read with MultiGet
1641 ts = Timestamp(2, 0);
1642 Slice read_ts = ts;
1643 ReadOptions read_opts;
1644 read_opts.timestamp = &read_ts;
1645 size_t batch_size = 1;
1646 std::vector<Slice> keys(batch_size);
1647 std::vector<std::string> values(batch_size);
1648 std::vector<std::string> timestamps(batch_size);
1649 keys[0] = "foo";
1650 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1651 std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
1652 std::vector<Status> statuses =
1653 db_->MultiGet(read_opts, cfhs, keys, &values, &timestamps);
1654
1655 ASSERT_OK(statuses[0]);
1656 Close();
1657 }
1658
1659 TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringNext) {
1660 Options options = CurrentOptions();
1661 options.env = env_;
1662 options.create_if_missing = true;
1663 const size_t kTimestampSize = Timestamp(0, 0).size();
1664 TestComparator test_cmp(kTimestampSize);
1665 options.comparator = &test_cmp;
1666 DestroyAndReopen(options);
1667 constexpr size_t max_skippable_internal_keys = 2;
1668 const size_t kNumKeys = max_skippable_internal_keys + 2;
1669 WriteOptions write_opts;
1670 Status s;
1671 {
1672 std::string ts = Timestamp(1, 0);
1673 ASSERT_OK(db_->Put(write_opts, "a", ts, "value"));
1674 }
1675 for (size_t i = 0; i < kNumKeys; ++i) {
1676 std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
1677 s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i));
1678 ASSERT_OK(s);
1679 }
1680 {
1681 ReadOptions read_opts;
1682 read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
1683 std::string ts_str = Timestamp(1, 0);
1684 Slice ts = ts_str;
1685 read_opts.timestamp = &ts;
1686 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
1687 iter->SeekToFirst();
1688 iter->Next();
1689 ASSERT_TRUE(iter->status().IsIncomplete());
1690 }
1691 Close();
1692 }
1693
1694 TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringPrev) {
1695 Options options = GetDefaultOptions();
1696 options.env = env_;
1697 options.create_if_missing = true;
1698 const size_t kTimestampSize = Timestamp(0, 0).size();
1699 TestComparator test_cmp(kTimestampSize);
1700 options.comparator = &test_cmp;
1701 DestroyAndReopen(options);
1702 constexpr size_t max_skippable_internal_keys = 2;
1703 const size_t kNumKeys = max_skippable_internal_keys + 2;
1704 WriteOptions write_opts;
1705 Status s;
1706 {
1707 std::string ts = Timestamp(1, 0);
1708 ASSERT_OK(db_->Put(write_opts, "b", ts, "value"));
1709 }
1710 for (size_t i = 0; i < kNumKeys; ++i) {
1711 std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
1712 s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i));
1713 ASSERT_OK(s);
1714 }
1715 {
1716 ReadOptions read_opts;
1717 read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
1718 std::string ts_str = Timestamp(1, 0);
1719 Slice ts = ts_str;
1720 read_opts.timestamp = &ts;
1721 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
1722 iter->SeekToLast();
1723 iter->Prev();
1724 ASSERT_TRUE(iter->status().IsIncomplete());
1725 }
1726 Close();
1727 }
1728
1729 // Create two L0, and compact them to a new L1. In this test, L1 is L_bottom.
1730 // Two L0s:
1731 // f1 f2
1732 // <a, 1, kTypeValue> <a, 3, kTypeDeletionWithTimestamp>...<b, 2, kTypeValue>
1733 // Since f2.smallest < f1.largest < f2.largest
1734 // f1 and f2 will be the inputs of a real compaction instead of trivial move.
1735 TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
1736 Options options = CurrentOptions();
1737 options.env = env_;
1738 options.create_if_missing = true;
1739 const size_t kTimestampSize = Timestamp(0, 0).size();
1740 TestComparator test_cmp(kTimestampSize);
1741 options.comparator = &test_cmp;
1742 options.num_levels = 2;
1743 options.level0_file_num_compaction_trigger = 2;
1744 DestroyAndReopen(options);
1745 WriteOptions write_opts;
1746 std::string ts = Timestamp(1, 0);
1747 ASSERT_OK(db_->Put(write_opts, "a", ts, "value0"));
1748 ASSERT_OK(Flush());
1749
1750 ts = Timestamp(2, 0);
1751 ASSERT_OK(db_->Put(write_opts, "b", ts, "value0"));
1752 ts = Timestamp(3, 0);
1753 ASSERT_OK(db_->Delete(write_opts, "a", ts));
1754 ASSERT_OK(Flush());
1755 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1756
1757 ReadOptions read_opts;
1758 ts = Timestamp(1, 0);
1759 Slice read_ts = ts;
1760 read_opts.timestamp = &read_ts;
1761 std::string value;
1762 Status s = db_->Get(read_opts, "a", &value);
1763 ASSERT_OK(s);
1764 ASSERT_EQ("value0", value);
1765
1766 ts = Timestamp(3, 0);
1767 read_ts = ts;
1768 read_opts.timestamp = &read_ts;
1769 std::string key_ts;
1770 s = db_->Get(read_opts, "a", &value, &key_ts);
1771 ASSERT_TRUE(s.IsNotFound());
1772 ASSERT_EQ(Timestamp(3, 0), key_ts);
1773
1774 // Time-travel to the past before deletion
1775 ts = Timestamp(2, 0);
1776 read_ts = ts;
1777 read_opts.timestamp = &read_ts;
1778 s = db_->Get(read_opts, "a", &value);
1779 ASSERT_OK(s);
1780 ASSERT_EQ("value0", value);
1781 Close();
1782 }
1783
1784 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1785 class DBBasicTestWithTimestampFilterPrefixSettings
1786 : public DBBasicTestWithTimestampBase,
1787 public testing::WithParamInterface<
1788 std::tuple<std::shared_ptr<const FilterPolicy>, bool, bool,
1789 std::shared_ptr<const SliceTransform>, bool, double,
1790 BlockBasedTableOptions::IndexType>> {
1791 public:
1792 DBBasicTestWithTimestampFilterPrefixSettings()
1793 : DBBasicTestWithTimestampBase(
1794 "db_basic_test_with_timestamp_filter_prefix") {}
1795 };
1796
1797 TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) {
1798 Options options = CurrentOptions();
1799 options.env = env_;
1800 options.create_if_missing = true;
1801 BlockBasedTableOptions bbto;
1802 bbto.filter_policy = std::get<0>(GetParam());
1803 bbto.whole_key_filtering = std::get<1>(GetParam());
1804 bbto.cache_index_and_filter_blocks = std::get<2>(GetParam());
1805 bbto.index_type = std::get<6>(GetParam());
1806 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1807 options.prefix_extractor = std::get<3>(GetParam());
1808 options.memtable_whole_key_filtering = std::get<4>(GetParam());
1809 options.memtable_prefix_bloom_size_ratio = std::get<5>(GetParam());
1810
1811 const size_t kTimestampSize = Timestamp(0, 0).size();
1812 TestComparator test_cmp(kTimestampSize);
1813 options.comparator = &test_cmp;
1814 DestroyAndReopen(options);
1815 const int kMaxKey = 1000;
1816
1817 // Write any value
1818 WriteOptions write_opts;
1819 std::string ts = Timestamp(1, 0);
1820
1821 int idx = 0;
1822 for (; idx < kMaxKey / 4; idx++) {
1823 ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
1824 ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
1825 }
1826
1827 ASSERT_OK(Flush());
1828 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1829
1830 for (; idx < kMaxKey / 2; idx++) {
1831 ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
1832 ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
1833 }
1834
1835 ASSERT_OK(Flush());
1836
1837 for (; idx < kMaxKey; idx++) {
1838 ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
1839 ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
1840 }
1841
1842 // Read with MultiGet
1843 ReadOptions read_opts;
1844 Slice read_ts = ts;
1845 read_opts.timestamp = &read_ts;
1846
1847 for (idx = 0; idx < kMaxKey; idx++) {
1848 size_t batch_size = 4;
1849 std::vector<std::string> keys_str(batch_size);
1850 std::vector<PinnableSlice> values(batch_size);
1851 std::vector<Status> statuses(batch_size);
1852 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1853
1854 keys_str[0] = Key1(idx);
1855 keys_str[1] = KeyWithPrefix("foo", idx);
1856 keys_str[2] = Key1(kMaxKey + idx);
1857 keys_str[3] = KeyWithPrefix("foo", kMaxKey + idx);
1858
1859 auto keys = ConvertStrToSlice(keys_str);
1860
1861 db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
1862 statuses.data());
1863
1864 for (int i = 0; i < 2; i++) {
1865 ASSERT_OK(statuses[i]);
1866 }
1867 for (int i = 2; i < 4; i++) {
1868 ASSERT_TRUE(statuses[i].IsNotFound());
1869 }
1870
1871 for (int i = 0; i < 2; i++) {
1872 std::string value;
1873 ASSERT_OK(db_->Get(read_opts, keys[i], &value));
1874 std::unique_ptr<Iterator> it1(db_->NewIterator(read_opts));
1875 ASSERT_NE(nullptr, it1);
1876 ASSERT_OK(it1->status());
1877 it1->Seek(keys[i]);
1878 ASSERT_TRUE(it1->Valid());
1879 }
1880
1881 for (int i = 2; i < 4; i++) {
1882 std::string value;
1883 Status s = db_->Get(read_opts, keys[i], &value);
1884 ASSERT_TRUE(s.IsNotFound());
1885 }
1886 }
1887 Close();
1888 }
1889
1890 INSTANTIATE_TEST_CASE_P(
1891 Timestamp, DBBasicTestWithTimestampFilterPrefixSettings,
1892 ::testing::Combine(
1893 ::testing::Values(
1894 std::shared_ptr<const FilterPolicy>(nullptr),
1895 std::shared_ptr<const FilterPolicy>(NewBloomFilterPolicy(10, true)),
1896 std::shared_ptr<const FilterPolicy>(NewBloomFilterPolicy(10,
1897 false))),
1898 ::testing::Bool(), ::testing::Bool(),
1899 ::testing::Values(
1900 std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(1)),
1901 std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
1902 std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
1903 std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
1904 ::testing::Bool(), ::testing::Values(0, 0.1),
1905 ::testing::Values(
1906 BlockBasedTableOptions::IndexType::kBinarySearch,
1907 BlockBasedTableOptions::IndexType::kHashSearch,
1908 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
1909 BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
1910 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1911
1912 class DataVisibilityTest : public DBBasicTestWithTimestampBase {
1913 public:
1914 DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {
1915 // Initialize test data
1916 for (int i = 0; i < kTestDataSize; i++) {
1917 test_data_[i].key = "key" + std::to_string(i);
1918 test_data_[i].value = "value" + std::to_string(i);
1919 test_data_[i].timestamp = Timestamp(i, 0);
1920 test_data_[i].ts = i;
1921 test_data_[i].seq_num = kMaxSequenceNumber;
1922 }
1923 }
1924
1925 protected:
1926 struct TestData {
1927 std::string key;
1928 std::string value;
1929 int ts;
1930 std::string timestamp;
1931 SequenceNumber seq_num;
1932 };
1933
1934 constexpr static int kTestDataSize = 3;
1935 TestData test_data_[kTestDataSize];
1936
1937 void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) {
1938 ASSERT_LE(index, kTestDataSize);
1939 WriteOptions write_opts;
1940
1941 if (cfh == nullptr) {
1942 ASSERT_OK(db_->Put(write_opts, test_data_[index].key,
1943 test_data_[index].timestamp, test_data_[index].value));
1944 const Snapshot* snap = db_->GetSnapshot();
1945 test_data_[index].seq_num = snap->GetSequenceNumber();
1946 if (index > 0) {
1947 ASSERT_GT(test_data_[index].seq_num, test_data_[index - 1].seq_num);
1948 }
1949 db_->ReleaseSnapshot(snap);
1950 } else {
1951 ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key,
1952 test_data_[index].timestamp, test_data_[index].value));
1953 }
1954 }
1955
1956 void AssertVisibility(int ts, SequenceNumber seq,
1957 std::vector<Status> statuses) {
1958 ASSERT_EQ(kTestDataSize, statuses.size());
1959 for (int i = 0; i < kTestDataSize; i++) {
1960 if (test_data_[i].seq_num <= seq && test_data_[i].ts <= ts) {
1961 ASSERT_OK(statuses[i]);
1962 } else {
1963 ASSERT_TRUE(statuses[i].IsNotFound());
1964 }
1965 }
1966 }
1967
1968 std::vector<Slice> GetKeys() {
1969 std::vector<Slice> ret(kTestDataSize);
1970 for (int i = 0; i < kTestDataSize; i++) {
1971 ret[i] = test_data_[i].key;
1972 }
1973 return ret;
1974 }
1975
1976 void VerifyDefaultCF(int ts, const Snapshot* snap = nullptr) {
1977 ReadOptions read_opts;
1978 std::string read_ts = Timestamp(ts, 0);
1979 Slice read_ts_slice = read_ts;
1980 read_opts.timestamp = &read_ts_slice;
1981 read_opts.snapshot = snap;
1982
1983 ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
1984 std::vector<ColumnFamilyHandle*> cfs(kTestDataSize, cfh);
1985 SequenceNumber seq =
1986 snap ? snap->GetSequenceNumber() : kMaxSequenceNumber - 1;
1987
1988 // There're several MultiGet interfaces with not exactly the same
1989 // implementations, query data with all of them.
1990 auto keys = GetKeys();
1991 std::vector<std::string> values;
1992 auto s1 = db_->MultiGet(read_opts, cfs, keys, &values);
1993 AssertVisibility(ts, seq, s1);
1994
1995 auto s2 = db_->MultiGet(read_opts, keys, &values);
1996 AssertVisibility(ts, seq, s2);
1997
1998 std::vector<std::string> timestamps;
1999 auto s3 = db_->MultiGet(read_opts, cfs, keys, &values, &timestamps);
2000 AssertVisibility(ts, seq, s3);
2001
2002 auto s4 = db_->MultiGet(read_opts, keys, &values, &timestamps);
2003 AssertVisibility(ts, seq, s4);
2004
2005 std::vector<PinnableSlice> values_ps5(kTestDataSize);
2006 std::vector<Status> s5(kTestDataSize);
2007 db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps5.data(),
2008 s5.data());
2009 AssertVisibility(ts, seq, s5);
2010
2011 std::vector<PinnableSlice> values_ps6(kTestDataSize);
2012 std::vector<Status> s6(kTestDataSize);
2013 std::vector<std::string> timestamps_array(kTestDataSize);
2014 db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps6.data(),
2015 timestamps_array.data(), s6.data());
2016 AssertVisibility(ts, seq, s6);
2017
2018 std::vector<PinnableSlice> values_ps7(kTestDataSize);
2019 std::vector<Status> s7(kTestDataSize);
2020 db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(),
2021 values_ps7.data(), s7.data());
2022 AssertVisibility(ts, seq, s7);
2023
2024 std::vector<PinnableSlice> values_ps8(kTestDataSize);
2025 std::vector<Status> s8(kTestDataSize);
2026 db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(),
2027 values_ps8.data(), timestamps_array.data(), s8.data());
2028 AssertVisibility(ts, seq, s8);
2029 }
2030
2031 void VerifyDefaultCF(const Snapshot* snap = nullptr) {
2032 for (int i = 0; i <= kTestDataSize; i++) {
2033 VerifyDefaultCF(i, snap);
2034 }
2035 }
2036 };
2037 constexpr int DataVisibilityTest::kTestDataSize;
2038
2039 // Application specifies timestamp but not snapshot.
2040 // reader writer
2041 // ts'=90
2042 // ts=100
2043 // seq=10
2044 // seq'=11
2045 // write finishes
2046 // GetImpl(ts,seq)
2047 // It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=t1 but seq<s1,
2048 // the key should not be returned.
2049 TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) {
2050 Options options = CurrentOptions();
2051 const size_t kTimestampSize = Timestamp(0, 0).size();
2052 TestComparator test_cmp(kTimestampSize);
2053 options.comparator = &test_cmp;
2054 DestroyAndReopen(options);
2055 SyncPoint::GetInstance()->DisableProcessing();
2056 SyncPoint::GetInstance()->LoadDependency({
2057 {"DBImpl::GetImpl:3",
2058 "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"},
2059 {"DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut",
2060 "DBImpl::GetImpl:4"},
2061 });
2062 SyncPoint::GetInstance()->EnableProcessing();
2063 port::Thread writer_thread([this]() {
2064 std::string write_ts = Timestamp(1, 0);
2065 WriteOptions write_opts;
2066 TEST_SYNC_POINT(
2067 "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut");
2068 Status s = db_->Put(write_opts, "foo", write_ts, "value");
2069 ASSERT_OK(s);
2070 TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut");
2071 });
2072 ReadOptions read_opts;
2073 std::string read_ts_str = Timestamp(3, 0);
2074 Slice read_ts = read_ts_str;
2075 read_opts.timestamp = &read_ts;
2076 std::string value;
2077 Status s = db_->Get(read_opts, "foo", &value);
2078
2079 writer_thread.join();
2080 ASSERT_TRUE(s.IsNotFound());
2081 Close();
2082 }
2083
2084 // Application specifies timestamp but not snapshot.
2085 // reader writer
2086 // ts'=90
2087 // ts=100
2088 // seq=10
2089 // seq'=11
2090 // write finishes
2091 // Flush
2092 // GetImpl(ts,seq)
2093 // It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=t1 but seq<s1,
2094 // the key should not be returned.
2095 TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot2) {
2096 Options options = CurrentOptions();
2097 const size_t kTimestampSize = Timestamp(0, 0).size();
2098 TestComparator test_cmp(kTimestampSize);
2099 options.comparator = &test_cmp;
2100 DestroyAndReopen(options);
2101 SyncPoint::GetInstance()->DisableProcessing();
2102 SyncPoint::GetInstance()->LoadDependency({
2103 {"DBImpl::GetImpl:3",
2104 "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"},
2105 {"DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut",
2106 "DBImpl::GetImpl:4"},
2107 });
2108 SyncPoint::GetInstance()->EnableProcessing();
2109 port::Thread writer_thread([this]() {
2110 std::string write_ts = Timestamp(1, 0);
2111 WriteOptions write_opts;
2112 TEST_SYNC_POINT(
2113 "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut");
2114 Status s = db_->Put(write_opts, "foo", write_ts, "value");
2115 ASSERT_OK(s);
2116 ASSERT_OK(Flush());
2117
2118 write_ts = Timestamp(2, 0);
2119 s = db_->Put(write_opts, "bar", write_ts, "value");
2120 ASSERT_OK(s);
2121 TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut");
2122 });
2123 ReadOptions read_opts;
2124 std::string read_ts_str = Timestamp(3, 0);
2125 Slice read_ts = read_ts_str;
2126 read_opts.timestamp = &read_ts;
2127 std::string value;
2128 Status s = db_->Get(read_opts, "foo", &value);
2129 writer_thread.join();
2130 ASSERT_TRUE(s.IsNotFound());
2131 Close();
2132 }
2133
2134 // Application specifies both timestamp and snapshot.
2135 // reader writer
2136 // seq=10
2137 // ts'=90
2138 // ts=100
2139 // seq'=11
2140 // write finishes
2141 // GetImpl(ts,seq)
// Since the application specifies both timestamp and snapshot, it expects to
// see only data that is visible in BOTH timestamp and sequence number.
// Therefore, <k, t1, s1> can be returned only if t1<=ts AND s1<=seq.
2145 TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) {
2146 Options options = CurrentOptions();
2147 const size_t kTimestampSize = Timestamp(0, 0).size();
2148 TestComparator test_cmp(kTimestampSize);
2149 options.comparator = &test_cmp;
2150 DestroyAndReopen(options);
2151 SyncPoint::GetInstance()->DisableProcessing();
2152 SyncPoint::GetInstance()->LoadDependency({
2153 {"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap",
2154 "DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"},
2155 {"DataVisibilityTest::PointLookupWithSnapshot1:AfterPut",
2156 "DBImpl::GetImpl:1"},
2157 });
2158 SyncPoint::GetInstance()->EnableProcessing();
2159 port::Thread writer_thread([this]() {
2160 std::string write_ts = Timestamp(1, 0);
2161 WriteOptions write_opts;
2162 TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut");
2163 Status s = db_->Put(write_opts, "foo", write_ts, "value");
2164 TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut");
2165 ASSERT_OK(s);
2166 });
2167 ReadOptions read_opts;
2168 const Snapshot* snap = db_->GetSnapshot();
2169 TEST_SYNC_POINT(
2170 "DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap");
2171 read_opts.snapshot = snap;
2172 std::string read_ts_str = Timestamp(3, 0);
2173 Slice read_ts = read_ts_str;
2174 read_opts.timestamp = &read_ts;
2175 std::string value;
2176 Status s = db_->Get(read_opts, "foo", &value);
2177 writer_thread.join();
2178
2179 ASSERT_TRUE(s.IsNotFound());
2180
2181 db_->ReleaseSnapshot(snap);
2182 Close();
2183 }
2184
2185 // Application specifies both timestamp and snapshot.
2186 // reader writer
2187 // seq=10
2188 // ts'=90
2189 // ts=100
2190 // seq'=11
2191 // write finishes
2192 // Flush
2193 // GetImpl(ts,seq)
// Since the application specifies both timestamp and snapshot, it expects to
// see only data that is visible in BOTH timestamp and sequence number.
// Therefore, <k, t1, s1> can be returned only if t1<=ts AND s1<=seq.
2197 TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) {
2198 Options options = CurrentOptions();
2199 const size_t kTimestampSize = Timestamp(0, 0).size();
2200 TestComparator test_cmp(kTimestampSize);
2201 options.comparator = &test_cmp;
2202 DestroyAndReopen(options);
2203 SyncPoint::GetInstance()->DisableProcessing();
2204 SyncPoint::GetInstance()->LoadDependency({
2205 {"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap",
2206 "DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"},
2207 });
2208 SyncPoint::GetInstance()->EnableProcessing();
2209 port::Thread writer_thread([this]() {
2210 std::string write_ts = Timestamp(1, 0);
2211 WriteOptions write_opts;
2212 TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut");
2213 Status s = db_->Put(write_opts, "foo", write_ts, "value1");
2214 ASSERT_OK(s);
2215 ASSERT_OK(Flush());
2216
2217 write_ts = Timestamp(2, 0);
2218 s = db_->Put(write_opts, "bar", write_ts, "value2");
2219 ASSERT_OK(s);
2220 });
2221 const Snapshot* snap = db_->GetSnapshot();
2222 TEST_SYNC_POINT(
2223 "DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap");
2224 writer_thread.join();
2225 std::string read_ts_str = Timestamp(3, 0);
2226 Slice read_ts = read_ts_str;
2227 ReadOptions read_opts;
2228 read_opts.snapshot = snap;
2229 read_opts.timestamp = &read_ts;
2230 std::string value;
2231 Status s = db_->Get(read_opts, "foo", &value);
2232 ASSERT_TRUE(s.IsNotFound());
2233 db_->ReleaseSnapshot(snap);
2234 Close();
2235 }
2236
2237 // Application specifies timestamp but not snapshot.
2238 // reader writer
2239 // ts'=90
2240 // ts=100
2241 // seq=10
2242 // seq'=11
2243 // write finishes
2244 // scan(ts,seq)
2245 // <k, t1, s1> can be seen in scan as long as ts>=t1 AND seq>=s1. If ts>=t1 but
2246 // seq<s1, then the key should not be returned.
2247 TEST_F(DataVisibilityTest, RangeScanWithoutSnapshot) {
2248 Options options = CurrentOptions();
2249 const size_t kTimestampSize = Timestamp(0, 0).size();
2250 TestComparator test_cmp(kTimestampSize);
2251 options.comparator = &test_cmp;
2252 DestroyAndReopen(options);
2253 SyncPoint::GetInstance()->DisableProcessing();
2254 SyncPoint::GetInstance()->LoadDependency({
2255 {"DBImpl::NewIterator:3",
2256 "DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"},
2257 });
2258 SyncPoint::GetInstance()->EnableProcessing();
2259 port::Thread writer_thread([this]() {
2260 WriteOptions write_opts;
2261 TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut");
2262 for (int i = 0; i < 3; ++i) {
2263 std::string write_ts = Timestamp(i + 1, 0);
2264 Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts,
2265 "value" + std::to_string(i));
2266 ASSERT_OK(s);
2267 }
2268 });
2269 std::string read_ts_str = Timestamp(10, 0);
2270 Slice read_ts = read_ts_str;
2271 ReadOptions read_opts;
2272 read_opts.total_order_seek = true;
2273 read_opts.timestamp = &read_ts;
2274 Iterator* it = db_->NewIterator(read_opts);
2275 ASSERT_NE(nullptr, it);
2276 writer_thread.join();
2277 it->SeekToFirst();
2278 ASSERT_FALSE(it->Valid());
2279 delete it;
2280 Close();
2281 }
2282
2283 // Application specifies both timestamp and snapshot.
2284 // reader writer
2285 // seq=10
2286 // ts'=90
2287 // ts=100 seq'=11
2288 // write finishes
2289 // scan(ts,seq)
2290 // <k, t1, s1> can be seen by the scan only if t1<=ts AND s1<=seq. If t1<=ts
2291 // but s1>seq, then the key should not be returned.
2292 TEST_F(DataVisibilityTest, RangeScanWithSnapshot) {
2293 Options options = CurrentOptions();
2294 const size_t kTimestampSize = Timestamp(0, 0).size();
2295 TestComparator test_cmp(kTimestampSize);
2296 options.comparator = &test_cmp;
2297 DestroyAndReopen(options);
2298 SyncPoint::GetInstance()->DisableProcessing();
2299 SyncPoint::GetInstance()->LoadDependency({
2300 {"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot",
2301 "DataVisibilityTest::RangeScanWithSnapshot:BeforePut"},
2302 });
2303 SyncPoint::GetInstance()->EnableProcessing();
2304 port::Thread writer_thread([this]() {
2305 WriteOptions write_opts;
2306 TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut");
2307 for (int i = 0; i < 3; ++i) {
2308 std::string write_ts = Timestamp(i + 1, 0);
2309 Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts,
2310 "value" + std::to_string(i));
2311 ASSERT_OK(s);
2312 }
2313 });
2314 const Snapshot* snap = db_->GetSnapshot();
2315 TEST_SYNC_POINT(
2316 "DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot");
2317
2318 writer_thread.join();
2319
2320 std::string read_ts_str = Timestamp(10, 0);
2321 Slice read_ts = read_ts_str;
2322 ReadOptions read_opts;
2323 read_opts.snapshot = snap;
2324 read_opts.total_order_seek = true;
2325 read_opts.timestamp = &read_ts;
2326 Iterator* it = db_->NewIterator(read_opts);
2327 ASSERT_NE(nullptr, it);
2328 it->Seek("key0");
2329 ASSERT_FALSE(it->Valid());
2330
2331 delete it;
2332 db_->ReleaseSnapshot(snap);
2333 Close();
2334 }
2335
2336 // Application specifies both timestamp and snapshot.
2337 // Query each combination and make sure for MultiGet key <k, t1, s1>, only
2338 // return keys that ts>=t1 AND seq>=s1.
2339 TEST_F(DataVisibilityTest, MultiGetWithTimestamp) {
2340 Options options = CurrentOptions();
2341 const size_t kTimestampSize = Timestamp(0, 0).size();
2342 TestComparator test_cmp(kTimestampSize);
2343 options.comparator = &test_cmp;
2344 DestroyAndReopen(options);
2345
2346 const Snapshot* snap0 = db_->GetSnapshot();
2347 PutTestData(0);
2348 VerifyDefaultCF();
2349 VerifyDefaultCF(snap0);
2350
2351 const Snapshot* snap1 = db_->GetSnapshot();
2352 PutTestData(1);
2353 VerifyDefaultCF();
2354 VerifyDefaultCF(snap0);
2355 VerifyDefaultCF(snap1);
2356
2357 ASSERT_OK(Flush());
2358
2359 const Snapshot* snap2 = db_->GetSnapshot();
2360 PutTestData(2);
2361 VerifyDefaultCF();
2362 VerifyDefaultCF(snap0);
2363 VerifyDefaultCF(snap1);
2364 VerifyDefaultCF(snap2);
2365
2366 db_->ReleaseSnapshot(snap0);
2367 db_->ReleaseSnapshot(snap1);
2368 db_->ReleaseSnapshot(snap2);
2369
2370 Close();
2371 }
2372
2373 // Application specifies timestamp but not snapshot.
2374 // reader writer
2375 // ts'=0, 1
2376 // ts=3
2377 // seq=10
2378 // seq'=11, 12
2379 // write finishes
2380 // MultiGet(ts,seq)
2381 // For MultiGet <k, t1, s1>, only return keys that ts>=t1 AND seq>=s1.
2382 TEST_F(DataVisibilityTest, MultiGetWithoutSnapshot) {
2383 Options options = CurrentOptions();
2384 const size_t kTimestampSize = Timestamp(0, 0).size();
2385 TestComparator test_cmp(kTimestampSize);
2386 options.comparator = &test_cmp;
2387 DestroyAndReopen(options);
2388
2389 SyncPoint::GetInstance()->DisableProcessing();
2390 SyncPoint::GetInstance()->LoadDependency({
2391 {"DBImpl::MultiGet:AfterGetSeqNum1",
2392 "DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"},
2393 {"DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut",
2394 "DBImpl::MultiGet:AfterGetSeqNum2"},
2395 });
2396 SyncPoint::GetInstance()->EnableProcessing();
2397 port::Thread writer_thread([this]() {
2398 TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut");
2399 PutTestData(0);
2400 PutTestData(1);
2401 TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut");
2402 });
2403
2404 ReadOptions read_opts;
2405 std::string read_ts = Timestamp(kTestDataSize, 0);
2406 Slice read_ts_slice = read_ts;
2407 read_opts.timestamp = &read_ts_slice;
2408 auto keys = GetKeys();
2409 std::vector<std::string> values;
2410 auto ss = db_->MultiGet(read_opts, keys, &values);
2411
2412 writer_thread.join();
2413 for (auto s : ss) {
2414 ASSERT_TRUE(s.IsNotFound());
2415 }
2416 VerifyDefaultCF();
2417 Close();
2418 }
2419
2420 TEST_F(DataVisibilityTest, MultiGetCrossCF) {
2421 Options options = CurrentOptions();
2422 const size_t kTimestampSize = Timestamp(0, 0).size();
2423 TestComparator test_cmp(kTimestampSize);
2424 options.comparator = &test_cmp;
2425 DestroyAndReopen(options);
2426
2427 CreateAndReopenWithCF({"second"}, options);
2428 ColumnFamilyHandle* second_cf = handles_[1];
2429
2430 const Snapshot* snap0 = db_->GetSnapshot();
2431 PutTestData(0);
2432 PutTestData(0, second_cf);
2433 VerifyDefaultCF();
2434 VerifyDefaultCF(snap0);
2435
2436 const Snapshot* snap1 = db_->GetSnapshot();
2437 PutTestData(1);
2438 PutTestData(1, second_cf);
2439 VerifyDefaultCF();
2440 VerifyDefaultCF(snap0);
2441 VerifyDefaultCF(snap1);
2442
2443 ASSERT_OK(Flush());
2444
2445 const Snapshot* snap2 = db_->GetSnapshot();
2446 PutTestData(2);
2447 PutTestData(2, second_cf);
2448 VerifyDefaultCF();
2449 VerifyDefaultCF(snap0);
2450 VerifyDefaultCF(snap1);
2451 VerifyDefaultCF(snap2);
2452
2453 ReadOptions read_opts;
2454 std::string read_ts = Timestamp(kTestDataSize, 0);
2455 Slice read_ts_slice = read_ts;
2456 read_opts.timestamp = &read_ts_slice;
2457 read_opts.snapshot = snap1;
2458 auto keys = GetKeys();
2459 auto keys2 = GetKeys();
2460 keys.insert(keys.end(), keys2.begin(), keys2.end());
2461 std::vector<ColumnFamilyHandle*> cfs(kTestDataSize,
2462 db_->DefaultColumnFamily());
2463 std::vector<ColumnFamilyHandle*> cfs2(kTestDataSize, second_cf);
2464 cfs.insert(cfs.end(), cfs2.begin(), cfs2.end());
2465
2466 std::vector<std::string> values;
2467 auto ss = db_->MultiGet(read_opts, cfs, keys, &values);
2468 for (int i = 0; i < 2 * kTestDataSize; i++) {
2469 if (i % 3 == 0) {
2470 // only the first key for each column family should be returned
2471 ASSERT_OK(ss[i]);
2472 } else {
2473 ASSERT_TRUE(ss[i].IsNotFound());
2474 }
2475 }
2476
2477 db_->ReleaseSnapshot(snap0);
2478 db_->ReleaseSnapshot(snap1);
2479 db_->ReleaseSnapshot(snap2);
2480 Close();
2481 }
2482
2483 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
2484 class DBBasicTestWithTimestampCompressionSettings
2485 : public DBBasicTestWithTimestampBase,
2486 public testing::WithParamInterface<
2487 std::tuple<std::shared_ptr<const FilterPolicy>, CompressionType,
2488 uint32_t, uint32_t>> {
2489 public:
2490 DBBasicTestWithTimestampCompressionSettings()
2491 : DBBasicTestWithTimestampBase(
2492 "db_basic_test_with_timestamp_compression") {}
2493 };
2494
2495 TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
2496 const int kNumKeysPerFile = 1024;
2497 const size_t kNumTimestamps = 4;
2498 Options options = CurrentOptions();
2499 options.create_if_missing = true;
2500 options.env = env_;
2501 options.memtable_factory.reset(
2502 test::NewSpecialSkipListFactory(kNumKeysPerFile));
2503 size_t ts_sz = Timestamp(0, 0).size();
2504 TestComparator test_cmp(ts_sz);
2505 options.comparator = &test_cmp;
2506 BlockBasedTableOptions bbto;
2507 bbto.filter_policy = std::get<0>(GetParam());
2508 bbto.whole_key_filtering = true;
2509 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2510
2511 const CompressionType comp_type = std::get<1>(GetParam());
2512 #if LZ4_VERSION_NUMBER < 10400 // r124+
2513 if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
2514 return;
2515 }
2516 #endif // LZ4_VERSION_NUMBER >= 10400
2517 if (!ZSTD_Supported() && comp_type == kZSTD) {
2518 return;
2519 }
2520 if (!Zlib_Supported() && comp_type == kZlibCompression) {
2521 return;
2522 }
2523
2524 options.compression = comp_type;
2525 options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
2526 if (comp_type == kZSTD) {
2527 options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
2528 }
2529 options.compression_opts.parallel_threads = std::get<3>(GetParam());
2530 options.target_file_size_base = 1 << 26; // 64MB
2531 DestroyAndReopen(options);
2532 CreateAndReopenWithCF({"pikachu"}, options);
2533 size_t num_cfs = handles_.size();
2534 ASSERT_EQ(2, num_cfs);
2535 std::vector<std::string> write_ts_list;
2536 std::vector<std::string> read_ts_list;
2537
2538 for (size_t i = 0; i != kNumTimestamps; ++i) {
2539 write_ts_list.push_back(Timestamp(i * 2, 0));
2540 read_ts_list.push_back(Timestamp(1 + i * 2, 0));
2541 const Slice write_ts = write_ts_list.back();
2542 WriteOptions wopts;
2543 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2544 for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
2545 ASSERT_OK(
2546 db_->Put(wopts, handles_[cf], Key1(j), write_ts,
2547 "value_" + std::to_string(j) + "_" + std::to_string(i)));
2548 }
2549 }
2550 }
2551 const auto& verify_db_func = [&]() {
2552 for (size_t i = 0; i != kNumTimestamps; ++i) {
2553 ReadOptions ropts;
2554 const Slice read_ts = read_ts_list[i];
2555 ropts.timestamp = &read_ts;
2556 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2557 ColumnFamilyHandle* cfh = handles_[cf];
2558 for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
2559 std::string value;
2560 ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value));
2561 ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
2562 value);
2563 }
2564 }
2565 }
2566 };
2567 verify_db_func();
2568 Close();
2569 }
2570
2571 TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
2572 Options options = CurrentOptions();
2573 options.env = env_;
2574 options.create_if_missing = true;
2575 const size_t kTimestampSize = Timestamp(0, 0).size();
2576 TestComparator test_cmp(kTimestampSize);
2577 options.comparator = &test_cmp;
2578 const int kNumKeysPerFile = 1024;
2579 options.memtable_factory.reset(
2580 test::NewSpecialSkipListFactory(kNumKeysPerFile));
2581 BlockBasedTableOptions bbto;
2582 bbto.filter_policy = std::get<0>(GetParam());
2583 bbto.whole_key_filtering = true;
2584 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2585
2586 const CompressionType comp_type = std::get<1>(GetParam());
2587 #if LZ4_VERSION_NUMBER < 10400 // r124+
2588 if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
2589 return;
2590 }
2591 #endif // LZ4_VERSION_NUMBER >= 10400
2592 if (!ZSTD_Supported() && comp_type == kZSTD) {
2593 return;
2594 }
2595 if (!Zlib_Supported() && comp_type == kZlibCompression) {
2596 return;
2597 }
2598
2599 options.compression = comp_type;
2600 options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
2601 if (comp_type == kZSTD) {
2602 options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
2603 }
2604 options.compression_opts.parallel_threads = std::get<3>(GetParam());
2605 options.target_file_size_base = 1 << 26; // 64MB
2606
2607 DestroyAndReopen(options);
2608
2609 const size_t kNumL0Files =
2610 static_cast<size_t>(Options().level0_file_num_compaction_trigger);
2611 {
2612 // Half of the keys will go through Deletion and remaining half with
2613 // SingleDeletion. Generate enough L0 files with ts=1 to trigger compaction
2614 // to L1
2615 std::string ts = Timestamp(1, 0);
2616 WriteOptions wopts;
2617 for (size_t i = 0; i < kNumL0Files; ++i) {
2618 for (int j = 0; j < kNumKeysPerFile; ++j) {
2619 ASSERT_OK(db_->Put(wopts, Key1(j), ts, "value" + std::to_string(i)));
2620 }
2621 ASSERT_OK(db_->Flush(FlushOptions()));
2622 }
2623 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2624 // Generate another L0 at ts=3
2625 ts = Timestamp(3, 0);
2626 for (int i = 0; i < kNumKeysPerFile; ++i) {
2627 std::string key_str = Key1(i);
2628 Slice key(key_str);
2629 if ((i % 3) == 0) {
2630 if (i < kNumKeysPerFile / 2) {
2631 ASSERT_OK(db_->Delete(wopts, key, ts));
2632 } else {
2633 ASSERT_OK(db_->SingleDelete(wopts, key, ts));
2634 }
2635 } else {
2636 ASSERT_OK(db_->Put(wopts, key, ts, "new_value"));
2637 }
2638 }
2639 ASSERT_OK(db_->Flush(FlushOptions()));
2640 // Populate memtable at ts=5
2641 ts = Timestamp(5, 0);
2642 for (int i = 0; i != kNumKeysPerFile; ++i) {
2643 std::string key_str = Key1(i);
2644 Slice key(key_str);
2645 if ((i % 3) == 1) {
2646 if (i < kNumKeysPerFile / 2) {
2647 ASSERT_OK(db_->Delete(wopts, key, ts));
2648 } else {
2649 ASSERT_OK(db_->SingleDelete(wopts, key, ts));
2650 }
2651 } else if ((i % 3) == 2) {
2652 ASSERT_OK(db_->Put(wopts, key, ts, "new_value_2"));
2653 }
2654 }
2655 }
2656 {
2657 std::string ts_str = Timestamp(6, 0);
2658 Slice ts = ts_str;
2659 ReadOptions ropts;
2660 ropts.timestamp = &ts;
2661 for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
2662 std::string value;
2663 std::string key_ts;
2664 Status s = db_->Get(ropts, Key1(i), &value, &key_ts);
2665 if ((i % 3) == 2) {
2666 ASSERT_OK(s);
2667 ASSERT_EQ("new_value_2", value);
2668 ASSERT_EQ(Timestamp(5, 0), key_ts);
2669 } else if ((i % 3) == 1) {
2670 ASSERT_TRUE(s.IsNotFound());
2671 ASSERT_EQ(Timestamp(5, 0), key_ts);
2672 } else {
2673 ASSERT_TRUE(s.IsNotFound());
2674 ASSERT_EQ(Timestamp(3, 0), key_ts);
2675 }
2676 }
2677 }
2678 }
2679
2680 #ifndef ROCKSDB_LITE
2681 // A class which remembers the name of each flushed file.
2682 class FlushedFileCollector : public EventListener {
2683 public:
2684 FlushedFileCollector() {}
2685 ~FlushedFileCollector() override {}
2686
2687 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
2688 InstrumentedMutexLock lock(&mutex_);
2689 flushed_files_.push_back(info.file_path);
2690 }
2691
2692 std::vector<std::string> GetFlushedFiles() {
2693 std::vector<std::string> result;
2694 {
2695 InstrumentedMutexLock lock(&mutex_);
2696 result = flushed_files_;
2697 }
2698 return result;
2699 }
2700
2701 void ClearFlushedFiles() {
2702 InstrumentedMutexLock lock(&mutex_);
2703 flushed_files_.clear();
2704 }
2705
2706 private:
2707 std::vector<std::string> flushed_files_;
2708 InstrumentedMutex mutex_;
2709 };
2710
2711 TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
2712 const int kNumKeysPerFile = 1024;
2713 const size_t kNumTimestamps = 2;
2714 const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
2715 const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
2716 Options options = CurrentOptions();
2717 options.create_if_missing = true;
2718 options.env = env_;
2719 options.memtable_factory.reset(
2720 test::NewSpecialSkipListFactory(kNumKeysPerFile));
2721
2722 FlushedFileCollector* collector = new FlushedFileCollector();
2723 options.listeners.emplace_back(collector);
2724
2725 size_t ts_sz = Timestamp(0, 0).size();
2726 TestComparator test_cmp(ts_sz);
2727 options.comparator = &test_cmp;
2728 BlockBasedTableOptions bbto;
2729 bbto.filter_policy = std::get<0>(GetParam());
2730 bbto.whole_key_filtering = true;
2731 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2732
2733 const CompressionType comp_type = std::get<1>(GetParam());
2734 #if LZ4_VERSION_NUMBER < 10400 // r124+
2735 if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
2736 return;
2737 }
2738 #endif // LZ4_VERSION_NUMBER >= 10400
2739 if (!ZSTD_Supported() && comp_type == kZSTD) {
2740 return;
2741 }
2742 if (!Zlib_Supported() && comp_type == kZlibCompression) {
2743 return;
2744 }
2745
2746 options.compression = comp_type;
2747 options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
2748 if (comp_type == kZSTD) {
2749 options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
2750 }
2751 options.compression_opts.parallel_threads = std::get<3>(GetParam());
2752 DestroyAndReopen(options);
2753 CreateAndReopenWithCF({"pikachu"}, options);
2754
2755 size_t num_cfs = handles_.size();
2756 ASSERT_EQ(2, num_cfs);
2757 std::vector<std::string> write_ts_list;
2758 std::vector<std::string> read_ts_list;
2759
2760 const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
2761 ColumnFamilyHandle* cfh) {
2762 std::string value;
2763 std::string timestamp;
2764
2765 ReadOptions ropts;
2766 const Slice read_ts = read_ts_list[i];
2767 ropts.timestamp = &read_ts;
2768 std::string expected_timestamp =
2769 std::string(write_ts_list[i].data(), write_ts_list[i].size());
2770
2771 for (size_t j = begin; j <= end; ++j) {
2772 ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, &timestamp));
2773 ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value);
2774 ASSERT_EQ(expected_timestamp, timestamp);
2775 }
2776 };
2777
2778 for (size_t i = 0; i != kNumTimestamps; ++i) {
2779 write_ts_list.push_back(Timestamp(i * 2, 0));
2780 read_ts_list.push_back(Timestamp(1 + i * 2, 0));
2781 const Slice write_ts = write_ts_list.back();
2782 WriteOptions wopts;
2783 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2784 size_t memtable_get_start = 0;
2785 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2786 ASSERT_OK(
2787 db_->Put(wopts, handles_[cf], Key1(j), write_ts,
2788 "value_" + std::to_string(j) + "_" + std::to_string(i)));
2789 if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
2790 verify_records_func(i, memtable_get_start, j, handles_[cf]);
2791 memtable_get_start = j + 1;
2792
2793 // flush all keys with the same timestamp to two sst files, split at
2794 // incremental positions such that lowerlevel[1].smallest.userkey ==
2795 // higherlevel[0].largest.userkey
2796 ASSERT_OK(Flush(cf));
2797 ASSERT_OK(dbfull()->TEST_WaitForCompact()); // wait for flush (which
2798 // is also a compaction)
2799
2800 // compact files (2 at each level) to a lower level such that all
2801 // keys with the same timestamp is at one level, with newer versions
2802 // at higher levels.
2803 CompactionOptions compact_opt;
2804 compact_opt.compression = kNoCompression;
2805 ASSERT_OK(db_->CompactFiles(compact_opt, handles_[cf],
2806 collector->GetFlushedFiles(),
2807 static_cast<int>(kNumTimestamps - i)));
2808 collector->ClearFlushedFiles();
2809 }
2810 }
2811 }
2812 }
2813 const auto& verify_db_func = [&]() {
2814 for (size_t i = 0; i != kNumTimestamps; ++i) {
2815 ReadOptions ropts;
2816 const Slice read_ts = read_ts_list[i];
2817 ropts.timestamp = &read_ts;
2818 std::string expected_timestamp(write_ts_list[i].data(),
2819 write_ts_list[i].size());
2820 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2821 ColumnFamilyHandle* cfh = handles_[cf];
2822 verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh);
2823 }
2824 }
2825 };
2826 verify_db_func();
2827 Close();
2828 }
2829
2830 TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
2831 const int kNumKeysPerFile = 8192;
2832 const size_t kNumTimestamps = 2;
2833 const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
2834 Options options = CurrentOptions();
2835 options.create_if_missing = true;
2836 options.env = env_;
2837 options.memtable_factory.reset(
2838 test::NewSpecialSkipListFactory(kNumKeysPerFile));
2839 options.memtable_prefix_bloom_size_ratio = 0.1;
2840 options.memtable_whole_key_filtering = true;
2841
2842 size_t ts_sz = Timestamp(0, 0).size();
2843 TestComparator test_cmp(ts_sz);
2844 options.comparator = &test_cmp;
2845 BlockBasedTableOptions bbto;
2846 bbto.filter_policy.reset(NewBloomFilterPolicy(
2847 10 /*bits_per_key*/, false /*use_block_based_builder*/));
2848 bbto.whole_key_filtering = true;
2849 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2850 DestroyAndReopen(options);
2851 CreateAndReopenWithCF({"pikachu"}, options);
2852 size_t num_cfs = handles_.size();
2853 ASSERT_EQ(2, num_cfs);
2854 std::vector<std::string> write_ts_list;
2855 std::vector<std::string> read_ts_list;
2856
2857 const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) {
2858 std::vector<Slice> keys;
2859 std::vector<std::string> key_vals;
2860 std::vector<std::string> values;
2861 std::vector<std::string> timestamps;
2862
2863 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2864 key_vals.push_back(Key1(j));
2865 }
2866 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2867 keys.push_back(key_vals[j]);
2868 }
2869
2870 ReadOptions ropts;
2871 const Slice read_ts = read_ts_list[i];
2872 ropts.timestamp = &read_ts;
2873 std::string expected_timestamp(write_ts_list[i].data(),
2874 write_ts_list[i].size());
2875
2876 std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
2877 std::vector<Status> statuses =
2878 db_->MultiGet(ropts, cfhs, keys, &values, &timestamps);
2879 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2880 ASSERT_OK(statuses[j]);
2881 ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
2882 values[j]);
2883 ASSERT_EQ(expected_timestamp, timestamps[j]);
2884 }
2885 };
2886
2887 const std::string dummy_ts(ts_sz, '\0');
2888 for (size_t i = 0; i != kNumTimestamps; ++i) {
2889 write_ts_list.push_back(Timestamp(i * 2, 0));
2890 read_ts_list.push_back(Timestamp(1 + i * 2, 0));
2891 const Slice& write_ts = write_ts_list.back();
2892 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2893 WriteOptions wopts;
2894 WriteBatch batch(0, 0, 0, ts_sz);
2895 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2896 const std::string key = Key1(j);
2897 const std::string value =
2898 "value_" + std::to_string(j) + "_" + std::to_string(i);
2899 ASSERT_OK(batch.Put(handles_[cf], key, value));
2900 }
2901 ASSERT_OK(batch.UpdateTimestamps(write_ts,
2902 [ts_sz](uint32_t) { return ts_sz; }));
2903 ASSERT_OK(db_->Write(wopts, &batch));
2904
2905 verify_records_func(i, handles_[cf]);
2906
2907 ASSERT_OK(Flush(cf));
2908 }
2909 }
2910
2911 const auto& verify_db_func = [&]() {
2912 for (size_t i = 0; i != kNumTimestamps; ++i) {
2913 ReadOptions ropts;
2914 const Slice read_ts = read_ts_list[i];
2915 ropts.timestamp = &read_ts;
2916 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2917 ColumnFamilyHandle* cfh = handles_[cf];
2918 verify_records_func(i, cfh);
2919 }
2920 }
2921 };
2922 verify_db_func();
2923 Close();
2924 }
2925
2926 TEST_F(DBBasicTestWithTimestamp, MultiGetNoReturnTs) {
2927 Options options = CurrentOptions();
2928 options.env = env_;
2929 const size_t kTimestampSize = Timestamp(0, 0).size();
2930 TestComparator test_cmp(kTimestampSize);
2931 options.comparator = &test_cmp;
2932 DestroyAndReopen(options);
2933 WriteOptions write_opts;
2934 std::string ts = Timestamp(1, 0);
2935 ASSERT_OK(db_->Put(write_opts, "foo", ts, "value"));
2936 ASSERT_OK(db_->Put(write_opts, "bar", ts, "value"));
2937 ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", ts, "value"));
2938 ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", ts, "value"));
2939 ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
2940 ts = Timestamp(2, 0);
2941 Slice read_ts = ts;
2942 ReadOptions read_opts;
2943 read_opts.timestamp = &read_ts;
2944 {
2945 ColumnFamilyHandle* column_families[] = {cfh, cfh};
2946 Slice keys[] = {"foo", "bar"};
2947 PinnableSlice values[] = {PinnableSlice(), PinnableSlice()};
2948 Status statuses[] = {Status::OK(), Status::OK()};
2949 dbfull()->MultiGet(read_opts, /*num_keys=*/2, &column_families[0], &keys[0],
2950 &values[0], &statuses[0], /*sorted_input=*/false);
2951 for (const auto& s : statuses) {
2952 ASSERT_OK(s);
2953 }
2954 }
2955 {
2956 ColumnFamilyHandle* column_families[] = {cfh, cfh, cfh, cfh};
2957 // Make user keys longer than configured timestamp size (16 bytes) to
2958 // verify RocksDB does not use the trailing bytes 'x' as timestamp.
2959 Slice keys[] = {"fooxxxxxxxxxxxxxxxx", "barxxxxxxxxxxxxxxxx", "foo", "bar"};
2960 PinnableSlice values[] = {PinnableSlice(), PinnableSlice(), PinnableSlice(),
2961 PinnableSlice()};
2962 Status statuses[] = {Status::OK(), Status::OK(), Status::OK(),
2963 Status::OK()};
2964 dbfull()->MultiGet(read_opts, /*num_keys=*/4, &column_families[0], &keys[0],
2965 &values[0], &statuses[0], /*sorted_input=*/false);
2966 for (const auto& s : statuses) {
2967 ASSERT_OK(s);
2968 }
2969 }
2970 Close();
2971 }
2972
2973 #endif // !ROCKSDB_LITE
2974
// Instantiates DBBasicTestWithTimestampCompressionSettings over the cartesian
// product of (tuple order):
//   0: block-based filter policy: none, or a 10-bits-per-key Bloom filter;
//   1: compression type (unsupported types are skipped inside the test);
//   2: value for compression_opts.max_dict_bytes (also used as
//      zstd_max_train_bytes when the type is kZSTD);
//   3: value for compression_opts.parallel_threads.
// Keep the order stable: gtest numbers the generated cases by position.
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampCompressionSettings,
    ::testing::Combine(
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10, false))),
        ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
                          kLZ4HCCompression, kZSTD),
        ::testing::Values(0, 1 << 14), ::testing::Values(1, 4)));
2984
2985 class DBBasicTestWithTimestampPrefixSeek
2986 : public DBBasicTestWithTimestampBase,
2987 public testing::WithParamInterface<
2988 std::tuple<std::shared_ptr<const SliceTransform>,
2989 std::shared_ptr<const FilterPolicy>, bool,
2990 BlockBasedTableOptions::IndexType>> {
2991 public:
2992 DBBasicTestWithTimestampPrefixSeek()
2993 : DBBasicTestWithTimestampBase(
2994 "/db_basic_test_with_timestamp_prefix_seek") {}
2995 };
2996
2997 TEST_P(DBBasicTestWithTimestampPrefixSeek, IterateWithPrefix) {
2998 const size_t kNumKeysPerFile = 128;
2999 Options options = CurrentOptions();
3000 options.env = env_;
3001 options.create_if_missing = true;
3002 const size_t kTimestampSize = Timestamp(0, 0).size();
3003 TestComparator test_cmp(kTimestampSize);
3004 options.comparator = &test_cmp;
3005 options.prefix_extractor = std::get<0>(GetParam());
3006 options.memtable_factory.reset(
3007 test::NewSpecialSkipListFactory(kNumKeysPerFile));
3008 BlockBasedTableOptions bbto;
3009 bbto.filter_policy = std::get<1>(GetParam());
3010 bbto.index_type = std::get<3>(GetParam());
3011 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
3012 DestroyAndReopen(options);
3013
3014 const uint64_t kMaxKey = 0xffffffffffffffff;
3015 const uint64_t kMinKey = 0xfffffffffffff000;
3016 const std::vector<std::string> write_ts_list = {Timestamp(3, 0xffffffff),
3017 Timestamp(6, 0xffffffff)};
3018 WriteOptions write_opts;
3019 {
3020 for (size_t i = 0; i != write_ts_list.size(); ++i) {
3021 for (uint64_t key = kMaxKey; key >= kMinKey; --key) {
3022 Status s = db_->Put(write_opts, Key1(key), write_ts_list[i],
3023 "value" + std::to_string(i));
3024 ASSERT_OK(s);
3025 }
3026 }
3027 }
3028 const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
3029 Timestamp(9, 0xffffffff)};
3030 {
3031 ReadOptions read_opts;
3032 read_opts.total_order_seek = false;
3033 read_opts.prefix_same_as_start = std::get<2>(GetParam());
3034 fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(),
3035 bbto.filter_policy ? bbto.filter_policy->Name() : "null",
3036 static_cast<int>(read_opts.prefix_same_as_start));
3037 for (size_t i = 0; i != read_ts_list.size(); ++i) {
3038 Slice read_ts = read_ts_list[i];
3039 read_opts.timestamp = &read_ts;
3040 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
3041
3042 // Seek to kMaxKey
3043 iter->Seek(Key1(kMaxKey));
3044 CheckIterUserEntry(iter.get(), Key1(kMaxKey), kTypeValue,
3045 "value" + std::to_string(i), write_ts_list[i]);
3046 iter->Next();
3047 ASSERT_FALSE(iter->Valid());
3048
3049 // Seek to kMinKey
3050 iter->Seek(Key1(kMinKey));
3051 CheckIterUserEntry(iter.get(), Key1(kMinKey), kTypeValue,
3052 "value" + std::to_string(i), write_ts_list[i]);
3053 iter->Prev();
3054 ASSERT_FALSE(iter->Valid());
3055 }
3056 const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
3057 kMinKey + 0x100, kMaxKey};
3058 const SliceTransform* const pe = options.prefix_extractor.get();
3059 ASSERT_NE(nullptr, pe);
3060 const size_t kPrefixShift =
3061 8 * (Key1(0).size() - pe->Transform(Key1(0)).size());
3062 const uint64_t kPrefixMask =
3063 ~((static_cast<uint64_t>(1) << kPrefixShift) - 1);
3064 const uint64_t kNumKeysWithinPrefix =
3065 (static_cast<uint64_t>(1) << kPrefixShift);
3066 for (size_t i = 0; i != read_ts_list.size(); ++i) {
3067 Slice read_ts = read_ts_list[i];
3068 read_opts.timestamp = &read_ts;
3069 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
3070 // Forward and backward iterate.
3071 for (size_t j = 0; j != targets.size(); ++j) {
3072 std::string start_key = Key1(targets[j]);
3073 uint64_t expected_ub =
3074 (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix;
3075 uint64_t expected_key = targets[j];
3076 size_t count = 0;
3077 it->Seek(Key1(targets[j]));
3078 while (it->Valid()) {
3079 std::string saved_prev_key;
3080 saved_prev_key.assign(it->key().data(), it->key().size());
3081
3082 // Out of prefix
3083 if (!read_opts.prefix_same_as_start &&
3084 pe->Transform(saved_prev_key) != pe->Transform(start_key)) {
3085 break;
3086 }
3087 CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue,
3088 "value" + std::to_string(i), write_ts_list[i]);
3089 ++count;
3090 ++expected_key;
3091 it->Next();
3092 }
3093 ASSERT_EQ(expected_ub - targets[j] + 1, count);
3094
3095 count = 0;
3096 expected_key = targets[j];
3097 it->SeekForPrev(start_key);
3098 uint64_t expected_lb = (targets[j] & kPrefixMask);
3099 while (it->Valid()) {
3100 // Out of prefix
3101 if (!read_opts.prefix_same_as_start &&
3102 pe->Transform(it->key()) != pe->Transform(start_key)) {
3103 break;
3104 }
3105 CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue,
3106 "value" + std::to_string(i), write_ts_list[i]);
3107 ++count;
3108 --expected_key;
3109 it->Prev();
3110 }
3111 ASSERT_EQ(targets[j] - std::max(expected_lb, kMinKey) + 1, count);
3112 }
3113 }
3114 }
3115 Close();
3116 }
3117
// Instantiates DBBasicTestWithTimestampPrefixSeek over (tuple order):
//   0: NewFixedPrefixTransform with lengths 1, 4, 7 and 8;
//   1: filter policy: none, or a Bloom filter with 10 or 20 bits per key;
//   2: ReadOptions::prefix_same_as_start (false / true);
//   3: block-based table index type.
// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g.
// NoopTransform.
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampPrefixSeek,
    ::testing::Combine(
        ::testing::Values(
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(1)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10 /*bits_per_key*/, false)),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(20 /*bits_per_key*/,
                                                   false))),
        ::testing::Bool(),
        ::testing::Values(
            BlockBasedTableOptions::IndexType::kBinarySearch,
            BlockBasedTableOptions::IndexType::kHashSearch,
            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
3140
3141 class DBBasicTestWithTsIterTombstones
3142 : public DBBasicTestWithTimestampBase,
3143 public testing::WithParamInterface<
3144 std::tuple<std::shared_ptr<const SliceTransform>,
3145 std::shared_ptr<const FilterPolicy>, int,
3146 BlockBasedTableOptions::IndexType>> {
3147 public:
3148 DBBasicTestWithTsIterTombstones()
3149 : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {}
3150 };
3151
3152 TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) {
3153 constexpr size_t kNumKeysPerFile = 128;
3154 Options options = CurrentOptions();
3155 options.env = env_;
3156 const size_t kTimestampSize = Timestamp(0, 0).size();
3157 TestComparator test_cmp(kTimestampSize);
3158 options.comparator = &test_cmp;
3159 options.prefix_extractor = std::get<0>(GetParam());
3160 options.memtable_factory.reset(
3161 test::NewSpecialSkipListFactory(kNumKeysPerFile));
3162 BlockBasedTableOptions bbto;
3163 bbto.filter_policy = std::get<1>(GetParam());
3164 bbto.index_type = std::get<3>(GetParam());
3165 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
3166 options.num_levels = std::get<2>(GetParam());
3167 DestroyAndReopen(options);
3168 std::vector<std::string> write_ts_strs = {Timestamp(2, 0), Timestamp(4, 0)};
3169 constexpr uint64_t kMaxKey = 0xffffffffffffffff;
3170 constexpr uint64_t kMinKey = 0xfffffffffffff000;
3171 // Insert kMinKey...kMaxKey
3172 uint64_t key = kMinKey;
3173 WriteOptions write_opts;
3174 Slice ts = write_ts_strs[0];
3175 do {
3176 Status s = db_->Put(write_opts, Key1(key), write_ts_strs[0],
3177 "value" + std::to_string(key));
3178 ASSERT_OK(s);
3179 if (kMaxKey == key) {
3180 break;
3181 }
3182 ++key;
3183 } while (true);
3184
3185 for (key = kMaxKey; key >= kMinKey; --key) {
3186 Status s;
3187 if (0 != (key % 2)) {
3188 s = db_->Put(write_opts, Key1(key), write_ts_strs[1],
3189 "value1" + std::to_string(key));
3190 } else {
3191 s = db_->Delete(write_opts, Key1(key), write_ts_strs[1]);
3192 }
3193 ASSERT_OK(s);
3194 }
3195 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3196 {
3197 std::string read_ts = Timestamp(4, 0);
3198 ts = read_ts;
3199 ReadOptions read_opts;
3200 read_opts.total_order_seek = true;
3201 read_opts.timestamp = &ts;
3202 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
3203 size_t count = 0;
3204 key = kMinKey + 1;
3205 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++count, key += 2) {
3206 ASSERT_EQ(Key1(key), iter->key());
3207 ASSERT_EQ("value1" + std::to_string(key), iter->value());
3208 }
3209 ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
3210
3211 for (iter->SeekToLast(), count = 0, key = kMaxKey; iter->Valid();
3212 key -= 2, ++count, iter->Prev()) {
3213 ASSERT_EQ(Key1(key), iter->key());
3214 ASSERT_EQ("value1" + std::to_string(key), iter->value());
3215 }
3216 ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
3217 }
3218 Close();
3219 }
3220
// Instantiates DBBasicTestWithTsIterTombstones over (tuple order):
//   0: NewFixedPrefixTransform with lengths 7 and 8;
//   1: filter policy: none, or a Bloom filter with 10 or 20 bits per key;
//   2: Options::num_levels (2 or 6);
//   3: block-based table index type.
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTsIterTombstones,
    ::testing::Combine(
        ::testing::Values(
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10, false)),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(20, false))),
        ::testing::Values(2, 6),
        ::testing::Values(
            BlockBasedTableOptions::IndexType::kBinarySearch,
            BlockBasedTableOptions::IndexType::kHashSearch,
            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
3238 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
3239
3240 class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase {
3241 public:
3242 UpdateFullHistoryTsLowTest()
3243 : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {}
3244 };
3245
3246 TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
3247 Options options = CurrentOptions();
3248 options.env = env_;
3249 options.create_if_missing = true;
3250 std::string lower_ts_low = Timestamp(10, 0);
3251 std::string higher_ts_low = Timestamp(25, 0);
3252 const size_t kTimestampSize = lower_ts_low.size();
3253 TestComparator test_cmp(kTimestampSize);
3254 options.comparator = &test_cmp;
3255
3256 DestroyAndReopen(options);
3257 SyncPoint::GetInstance()->DisableProcessing();
3258 SyncPoint::GetInstance()->ClearAllCallBacks();
3259 // This workaround swaps `lower_ts_low` originally used for update by the
3260 // caller to `higher_ts_low` after its writer is queued to make sure
3261 // the caller will always get a TryAgain error.
3262 // It mimics cases where two threads update full_history_ts_low concurrently
3263 // with one thread writing a higher ts_low and one thread writing a lower
3264 // ts_low.
3265 VersionEdit* version_edit;
3266 SyncPoint::GetInstance()->SetCallBack(
3267 "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
3268 [&](void* arg) { version_edit = reinterpret_cast<VersionEdit*>(arg); });
3269 SyncPoint::GetInstance()->SetCallBack(
3270 "VersionSet::LogAndApply:BeforeWriterWaiting",
3271 [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); });
3272 SyncPoint::GetInstance()->EnableProcessing();
3273 ASSERT_TRUE(
3274 db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low)
3275 .IsTryAgain());
3276 SyncPoint::GetInstance()->DisableProcessing();
3277 SyncPoint::GetInstance()->ClearAllCallBacks();
3278
3279 Close();
3280 }
3281
3282 TEST_F(DBBasicTestWithTimestamp,
3283 GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
3284 Options options = CurrentOptions();
3285 options.env = env_;
3286 options.create_if_missing = true;
3287 const size_t kTimestampSize = Timestamp(0, 0).size();
3288 TestComparator test_cmp(kTimestampSize);
3289 options.comparator = &test_cmp;
3290 DestroyAndReopen(options);
3291
3292 std::string ts_str = Timestamp(1, 0);
3293 WriteOptions wopts;
3294 ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1"));
3295 ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2"));
3296 ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3"));
3297 ts_str = Timestamp(2, 0);
3298 ASSERT_OK(
3299 db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str));
3300
3301 ts_str = Timestamp(3, 0);
3302 Slice ts = ts_str;
3303 ReadOptions ropts;
3304 ropts.timestamp = &ts;
3305 CompactRangeOptions cro;
3306 cro.full_history_ts_low = nullptr;
3307 std::string value, key_ts;
3308 Status s;
3309 auto verify = [&] {
3310 s = db_->Get(ropts, "k1", &value);
3311 ASSERT_TRUE(s.IsNotFound());
3312
3313 s = db_->Get(ropts, "k2", &value, &key_ts);
3314 ASSERT_TRUE(s.IsNotFound());
3315 ASSERT_EQ(key_ts, Timestamp(2, 0));
3316
3317 ASSERT_OK(db_->Get(ropts, "k3", &value, &key_ts));
3318 ASSERT_EQ(value, "v3");
3319 ASSERT_EQ(Timestamp(1, 0), key_ts);
3320
3321 size_t batch_size = 3;
3322 std::vector<std::string> key_strs = {"k1", "k2", "k3"};
3323 std::vector<Slice> keys{key_strs.begin(), key_strs.end()};
3324 std::vector<PinnableSlice> values(batch_size);
3325 std::vector<Status> statuses(batch_size);
3326 db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
3327 values.data(), statuses.data(), true /* sorted_input */);
3328 ASSERT_TRUE(statuses[0].IsNotFound());
3329 ASSERT_TRUE(statuses[1].IsNotFound());
3330 ASSERT_OK(statuses[2]);
3331 ;
3332 ASSERT_EQ(values[2], "v3");
3333 };
3334 verify();
3335 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
3336 verify();
3337 std::string lb = Timestamp(0, 0);
3338 Slice lb_slice = lb;
3339 cro.full_history_ts_low = &lb_slice;
3340 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
3341 verify();
3342 Close();
3343 }
3344
3345 TEST_F(DBBasicTestWithTimestamp,
3346 GCRangeTombstonesAndCoveredKeysRespectingTslow) {
3347 Options options = CurrentOptions();
3348 options.env = env_;
3349 options.create_if_missing = true;
3350 BlockBasedTableOptions bbto;
3351 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
3352 bbto.cache_index_and_filter_blocks = true;
3353 bbto.whole_key_filtering = true;
3354 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
3355 const size_t kTimestampSize = Timestamp(0, 0).size();
3356 TestComparator test_cmp(kTimestampSize);
3357 options.comparator = &test_cmp;
3358 options.num_levels = 2;
3359 DestroyAndReopen(options);
3360
3361 WriteOptions wopts;
3362 ASSERT_OK(db_->Put(wopts, "k1", Timestamp(1, 0), "v1"));
3363 ASSERT_OK(db_->Delete(wopts, "k2", Timestamp(2, 0)));
3364 ASSERT_OK(db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3",
3365 Timestamp(3, 0)));
3366 ASSERT_OK(db_->Put(wopts, "k3", Timestamp(4, 0), "v3"));
3367
3368 ReadOptions ropts;
3369 std::string read_ts = Timestamp(5, 0);
3370 Slice read_ts_slice = read_ts;
3371 ropts.timestamp = &read_ts_slice;
3372 size_t batch_size = 3;
3373 std::vector<std::string> key_strs = {"k1", "k2", "k3"};
3374 std::vector<Slice> keys = {key_strs.begin(), key_strs.end()};
3375 std::vector<PinnableSlice> values(batch_size);
3376 std::vector<Status> statuses(batch_size);
3377 std::vector<std::string> timestamps(batch_size);
3378 db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
3379 values.data(), timestamps.data(), statuses.data(),
3380 true /* sorted_input */);
3381 ASSERT_TRUE(statuses[0].IsNotFound());
3382 ASSERT_EQ(timestamps[0], Timestamp(3, 0));
3383 ASSERT_TRUE(statuses[1].IsNotFound());
3384 // DeleteRange has a higher timestamp than Delete for "k2"
3385 ASSERT_EQ(timestamps[1], Timestamp(3, 0));
3386 ASSERT_OK(statuses[2]);
3387 ASSERT_EQ(values[2], "v3");
3388 ASSERT_EQ(timestamps[2], Timestamp(4, 0));
3389
3390 CompactRangeOptions cro;
3391 // Range tombstone has timestamp >= full_history_ts_low, covered keys
3392 // are not dropped.
3393 std::string compaction_ts_str = Timestamp(2, 0);
3394 Slice compaction_ts = compaction_ts_str;
3395 cro.full_history_ts_low = &compaction_ts;
3396 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
3397 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
3398 ropts.timestamp = &compaction_ts;
3399 std::string value, ts;
3400 ASSERT_OK(db_->Get(ropts, "k1", &value, &ts));
3401 ASSERT_EQ(value, "v1");
3402 // timestamp is below full_history_ts_low, zeroed out as the key goes into
3403 // bottommost level
3404 ASSERT_EQ(ts, Timestamp(0, 0));
3405 ASSERT_TRUE(db_->Get(ropts, "k2", &value, &ts).IsNotFound());
3406 ASSERT_EQ(ts, Timestamp(2, 0));
3407
3408 compaction_ts_str = Timestamp(4, 0);
3409 compaction_ts = compaction_ts_str;
3410 cro.full_history_ts_low = &compaction_ts;
3411 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
3412 ropts.timestamp = &read_ts_slice;
3413 // k1, k2 and the range tombstone should be dropped
3414 // k3 should still exist
3415 db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
3416 values.data(), timestamps.data(), statuses.data(),
3417 true /* sorted_input */);
3418 ASSERT_TRUE(statuses[0].IsNotFound());
3419 ASSERT_TRUE(timestamps[0].empty());
3420 ASSERT_TRUE(statuses[1].IsNotFound());
3421 ASSERT_TRUE(timestamps[1].empty());
3422 ASSERT_OK(statuses[2]);
3423 ASSERT_EQ(values[2], "v3");
3424 ASSERT_EQ(timestamps[2], Timestamp(4, 0));
3425
3426 Close();
3427 }
3428
3429 TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
3430 const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
3431 Options options = CurrentOptions();
3432 options.prefix_extractor.reset(NewFixedPrefixTransform(3));
3433 options.compression = kNoCompression;
3434 BlockBasedTableOptions bbto;
3435 bbto.index_type = GetParam();
3436 bbto.block_size = 100;
3437 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
3438 options.env = env_;
3439 options.create_if_missing = true;
3440 const size_t kTimestampSize = Timestamp(0, 0).size();
3441 TestComparator test_cmp(kTimestampSize);
3442 options.comparator = &test_cmp;
3443 options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
3444 DestroyAndReopen(options);
3445
3446 // Write half of the keys before the tombstone and half after the tombstone.
3447 // Only covered keys (i.e., within the range and older than the tombstone)
3448 // should be deleted.
3449 for (int i = 0; i < kNum; ++i) {
3450 if (i == kNum / 2) {
3451 ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
3452 Key1(kRangeBegin), Key1(kRangeEnd),
3453 Timestamp(i, 0)));
3454 }
3455 ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
3456 "val" + std::to_string(i)));
3457 if (i == kNum - kNumPerFile) {
3458 ASSERT_OK(Flush());
3459 }
3460 }
3461
3462 ReadOptions read_opts;
3463 read_opts.total_order_seek = true;
3464 std::string read_ts = Timestamp(kNum, 0);
3465 Slice read_ts_slice = read_ts;
3466 read_opts.timestamp = &read_ts_slice;
3467 {
3468 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
3469 ASSERT_OK(iter->status());
3470
3471 int expected = 0;
3472 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
3473 ASSERT_EQ(Key1(expected), iter->key());
3474 if (expected == kRangeBegin - 1) {
3475 expected = kNum / 2;
3476 } else {
3477 ++expected;
3478 }
3479 }
3480 ASSERT_EQ(kNum, expected);
3481
3482 expected = kNum / 2;
3483 for (iter->Seek(Key1(kNum / 2)); iter->Valid(); iter->Next()) {
3484 ASSERT_EQ(Key1(expected), iter->key());
3485 ++expected;
3486 }
3487 ASSERT_EQ(kNum, expected);
3488
3489 expected = kRangeBegin - 1;
3490 for (iter->SeekForPrev(Key1(kNum / 2 - 1)); iter->Valid(); iter->Prev()) {
3491 ASSERT_EQ(Key1(expected), iter->key());
3492 --expected;
3493 }
3494 ASSERT_EQ(-1, expected);
3495
3496 read_ts = Timestamp(0, 0);
3497 read_ts_slice = read_ts;
3498 read_opts.timestamp = &read_ts_slice;
3499 iter.reset(db_->NewIterator(read_opts));
3500 iter->SeekToFirst();
3501 ASSERT_TRUE(iter->Valid());
3502 ASSERT_EQ(iter->key(), Key1(0));
3503 iter->Next();
3504 ASSERT_FALSE(iter->Valid());
3505 ASSERT_OK(iter->status());
3506 }
3507
3508 read_ts = Timestamp(kNum, 0);
3509 read_ts_slice = read_ts;
3510 read_opts.timestamp = &read_ts_slice;
3511 std::string value, timestamp;
3512 Status s;
3513 for (int i = 0; i < kNum; ++i) {
3514 s = db_->Get(read_opts, Key1(i), &value, &timestamp);
3515 if (i >= kRangeBegin && i < kNum / 2) {
3516 ASSERT_TRUE(s.IsNotFound());
3517 ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0));
3518 } else {
3519 ASSERT_OK(s);
3520 ASSERT_EQ(value, "val" + std::to_string(i));
3521 ASSERT_EQ(timestamp, Timestamp(i, 0));
3522 }
3523 }
3524
3525 size_t batch_size = kNum;
3526 std::vector<std::string> key_strs(batch_size);
3527 std::vector<Slice> keys(batch_size);
3528 std::vector<PinnableSlice> values(batch_size);
3529 std::vector<Status> statuses(batch_size);
3530 std::vector<std::string> timestamps(batch_size);
3531 for (int i = 0; i < kNum; ++i) {
3532 key_strs[i] = Key1(i);
3533 keys[i] = key_strs[i];
3534 }
3535 db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(),
3536 values.data(), timestamps.data(), statuses.data(),
3537 true /* sorted_input */);
3538 for (int i = 0; i < kNum; ++i) {
3539 if (i >= kRangeBegin && i < kNum / 2) {
3540 ASSERT_TRUE(statuses[i].IsNotFound());
3541 ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0));
3542 } else {
3543 ASSERT_OK(statuses[i]);
3544 ASSERT_EQ(values[i], "val" + std::to_string(i));
3545 ASSERT_EQ(timestamps[i], Timestamp(i, 0));
3546 }
3547 }
3548 Close();
3549 }
3550
3551 TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) {
3552 // 4 keys 0, 1, 2, 3 at timestamps 0, 1, 2, 3 respectively.
3553 // A range tombstone [1, 3) at timestamp 1 and has a sequence number between
3554 // key 1 and 2.
3555 Options options = CurrentOptions();
3556 const size_t kTimestampSize = Timestamp(0, 0).size();
3557 TestComparator test_cmp(kTimestampSize);
3558 options.comparator = &test_cmp;
3559 DestroyAndReopen(options);
3560 WriteOptions write_opts;
3561 std::string put_ts = Timestamp(0, 0);
3562 const int kNum = 4, kNumPerFile = 1, kRangeBegin = 1, kRangeEnd = 3;
3563 options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
3564 const Snapshot* before_tombstone = nullptr;
3565 const Snapshot* after_tombstone = nullptr;
3566 for (int i = 0; i < kNum; ++i) {
3567 ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
3568 "val" + std::to_string(i)));
3569 if (i == kRangeBegin) {
3570 before_tombstone = db_->GetSnapshot();
3571 ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
3572 Key1(kRangeBegin), Key1(kRangeEnd),
3573 Timestamp(kRangeBegin, 0)));
3574 }
3575 if (i == kNum / 2) {
3576 ASSERT_OK(Flush());
3577 }
3578 }
3579 assert(before_tombstone);
3580 after_tombstone = db_->GetSnapshot();
3581 // snapshot and ts before tombstone
3582 std::string read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0)
3583 Slice read_ts = read_ts_str;
3584 ReadOptions read_opts;
3585 read_opts.timestamp = &read_ts;
3586 read_opts.snapshot = before_tombstone;
3587 std::vector<Status> expected_status = {
3588 Status::OK(), Status::NotFound(), Status::NotFound(), Status::NotFound()};
3589 std::vector<std::string> expected_values(kNum);
3590 expected_values[0] = "val" + std::to_string(0);
3591 std::vector<std::string> expected_timestamps(kNum);
3592 expected_timestamps[0] = Timestamp(0, 0);
3593
3594 size_t batch_size = kNum;
3595 std::vector<std::string> key_strs(batch_size);
3596 std::vector<Slice> keys(batch_size);
3597 std::vector<PinnableSlice> values(batch_size);
3598 std::vector<Status> statuses(batch_size);
3599 std::vector<std::string> timestamps(batch_size);
3600 for (int i = 0; i < kNum; ++i) {
3601 key_strs[i] = Key1(i);
3602 keys[i] = key_strs[i];
3603 }
3604
3605 auto verify = [&] {
3606 db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size,
3607 keys.data(), values.data(), timestamps.data(),
3608 statuses.data(), true /* sorted_input */);
3609 std::string value, timestamp;
3610 Status s;
3611 for (int i = 0; i < kNum; ++i) {
3612 s = db_->Get(read_opts, Key1(i), &value, &timestamp);
3613 ASSERT_EQ(s, expected_status[i]);
3614 ASSERT_EQ(statuses[i], expected_status[i]);
3615 if (s.ok()) {
3616 ASSERT_EQ(value, expected_values[i]);
3617 ASSERT_EQ(values[i], expected_values[i]);
3618 }
3619 if (!timestamp.empty()) {
3620 ASSERT_EQ(timestamp, expected_timestamps[i]);
3621 ASSERT_EQ(timestamps[i], expected_timestamps[i]);
3622 } else {
3623 ASSERT_TRUE(timestamps[i].empty());
3624 }
3625 }
3626 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
3627 std::unique_ptr<Iterator> iter_for_seek(db_->NewIterator(read_opts));
3628 iter->SeekToFirst();
3629 for (int i = 0; i < kNum; ++i) {
3630 if (expected_status[i].ok()) {
3631 auto verify_iter = [&](Iterator* iter_ptr) {
3632 ASSERT_TRUE(iter_ptr->Valid());
3633 ASSERT_EQ(iter_ptr->key(), keys[i]);
3634 ASSERT_EQ(iter_ptr->value(), expected_values[i]);
3635 ASSERT_EQ(iter_ptr->timestamp(), expected_timestamps[i]);
3636 };
3637 verify_iter(iter.get());
3638 iter->Next();
3639
3640 iter_for_seek->Seek(keys[i]);
3641 verify_iter(iter_for_seek.get());
3642
3643 iter_for_seek->SeekForPrev(keys[i]);
3644 verify_iter(iter_for_seek.get());
3645 }
3646 }
3647 ASSERT_FALSE(iter->Valid());
3648 ASSERT_OK(iter->status());
3649 };
3650
3651 verify();
3652
3653 // snapshot before tombstone and ts after tombstone
3654 read_ts_str = Timestamp(kNum, 0); // (4, 0)
3655 read_ts = read_ts_str;
3656 read_opts.timestamp = &read_ts;
3657 read_opts.snapshot = before_tombstone;
3658 expected_status[1] = Status::OK();
3659 expected_timestamps[1] = Timestamp(1, 0);
3660 expected_values[1] = "val" + std::to_string(1);
3661 verify();
3662
3663 // snapshot after tombstone and ts before tombstone
3664 read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0)
3665 read_ts = read_ts_str;
3666 read_opts.timestamp = &read_ts;
3667 read_opts.snapshot = after_tombstone;
3668 expected_status[1] = Status::NotFound();
3669 expected_timestamps[1].clear();
3670 expected_values[1].clear();
3671 verify();
3672
3673 // snapshot and ts after tombstone
3674 read_ts_str = Timestamp(kNum, 0); // (4, 0)
3675 read_ts = read_ts_str;
3676 read_opts.timestamp = &read_ts;
3677 read_opts.snapshot = after_tombstone;
3678 for (int i = 0; i < kNum; ++i) {
3679 if (i == kRangeBegin) {
3680 expected_status[i] = Status::NotFound();
3681 expected_values[i].clear();
3682 } else {
3683 expected_status[i] = Status::OK();
3684 expected_values[i] = "val" + std::to_string(i);
3685 }
3686 expected_timestamps[i] = Timestamp(i, 0);
3687 }
3688 verify();
3689
3690 db_->ReleaseSnapshot(before_tombstone);
3691 db_->ReleaseSnapshot(after_tombstone);
3692 Close();
3693 }
3694
3695 TEST_F(DBBasicTestWithTimestamp, MergeBasic) {
3696 Options options = GetDefaultOptions();
3697 options.create_if_missing = true;
3698 const size_t kTimestampSize = Timestamp(0, 0).size();
3699 TestComparator test_cmp(kTimestampSize);
3700 options.comparator = &test_cmp;
3701 options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
3702 DestroyAndReopen(options);
3703
3704 const std::array<std::string, 3> write_ts_strs = {
3705 Timestamp(100, 0), Timestamp(200, 0), Timestamp(300, 0)};
3706 constexpr size_t kNumOfUniqKeys = 100;
3707 ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
3708
3709 for (size_t i = 0; i < write_ts_strs.size(); ++i) {
3710 for (size_t j = 0; j < kNumOfUniqKeys; ++j) {
3711 Status s;
3712 if (i == 0) {
3713 const std::string val = "v" + std::to_string(j) + "_0";
3714 s = db_->Put(WriteOptions(), Key1(j), write_ts_strs[i], val);
3715 } else {
3716 const std::string merge_op = std::to_string(i);
3717 s = db_->Merge(WriteOptions(), default_cf, Key1(j), write_ts_strs[i],
3718 merge_op);
3719 }
3720 ASSERT_OK(s);
3721 }
3722 }
3723
3724 std::array<std::string, 3> read_ts_strs = {
3725 Timestamp(150, 0), Timestamp(250, 0), Timestamp(350, 0)};
3726
3727 const auto verify_db_with_get = [&]() {
3728 for (size_t i = 0; i < kNumOfUniqKeys; ++i) {
3729 const std::string base_val = "v" + std::to_string(i) + "_0";
3730 const std::array<std::string, 3> expected_values = {
3731 base_val, base_val + ".1", base_val + ".1.2"};
3732 const std::array<std::string, 3>& expected_ts = write_ts_strs;
3733 ReadOptions read_opts;
3734 for (size_t j = 0; j < read_ts_strs.size(); ++j) {
3735 Slice read_ts = read_ts_strs[j];
3736 read_opts.timestamp = &read_ts;
3737 std::string value;
3738 std::string ts;
3739 const Status s = db_->Get(read_opts, Key1(i), &value, &ts);
3740 ASSERT_OK(s);
3741 ASSERT_EQ(expected_values[j], value);
3742 ASSERT_EQ(expected_ts[j], ts);
3743
3744 // Do Seek/SeekForPrev
3745 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
3746 it->Seek(Key1(i));
3747 ASSERT_TRUE(it->Valid());
3748 ASSERT_EQ(expected_values[j], it->value());
3749 ASSERT_EQ(expected_ts[j], it->timestamp());
3750
3751 it->SeekForPrev(Key1(i));
3752 ASSERT_TRUE(it->Valid());
3753 ASSERT_EQ(expected_values[j], it->value());
3754 ASSERT_EQ(expected_ts[j], it->timestamp());
3755 }
3756 }
3757 };
3758
3759 const auto verify_db_with_iterator = [&]() {
3760 std::string value_suffix;
3761 for (size_t i = 0; i < read_ts_strs.size(); ++i) {
3762 ReadOptions read_opts;
3763 Slice read_ts = read_ts_strs[i];
3764 read_opts.timestamp = &read_ts;
3765 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
3766 size_t key_int_val = 0;
3767 for (it->SeekToFirst(); it->Valid(); it->Next(), ++key_int_val) {
3768 const std::string key = Key1(key_int_val);
3769 const std::string value =
3770 "v" + std::to_string(key_int_val) + "_0" + value_suffix;
3771 ASSERT_EQ(key, it->key());
3772 ASSERT_EQ(value, it->value());
3773 ASSERT_EQ(write_ts_strs[i], it->timestamp());
3774 }
3775 ASSERT_EQ(kNumOfUniqKeys, key_int_val);
3776
3777 key_int_val = kNumOfUniqKeys - 1;
3778 for (it->SeekToLast(); it->Valid(); it->Prev(), --key_int_val) {
3779 const std::string key = Key1(key_int_val);
3780 const std::string value =
3781 "v" + std::to_string(key_int_val) + "_0" + value_suffix;
3782 ASSERT_EQ(key, it->key());
3783 ASSERT_EQ(value, it->value());
3784 ASSERT_EQ(write_ts_strs[i], it->timestamp());
3785 }
3786 ASSERT_EQ(std::numeric_limits<size_t>::max(), key_int_val);
3787
3788 value_suffix = value_suffix + "." + std::to_string(i + 1);
3789 }
3790 };
3791
3792 verify_db_with_get();
3793 verify_db_with_iterator();
3794
3795 ASSERT_OK(db_->Flush(FlushOptions()));
3796
3797 verify_db_with_get();
3798 verify_db_with_iterator();
3799
3800 Close();
3801 }
3802
3803 TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) {
3804 Options options = GetDefaultOptions();
3805 options.create_if_missing = true;
3806 const size_t kTimestampSize = Timestamp(0, 0).size();
3807 TestComparator test_cmp(kTimestampSize);
3808 options.comparator = &test_cmp;
3809 options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
3810 DestroyAndReopen(options);
3811
3812 ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily();
3813
3814 const size_t num_keys_per_file = 10;
3815 const size_t num_merges_per_key = 2;
3816 for (size_t i = 0; i < num_keys_per_file; ++i) {
3817 std::string ts = Timestamp(i + 10000, 0);
3818 Status s = db_->Delete(WriteOptions(), Key1(i), ts);
3819 ASSERT_OK(s);
3820 for (size_t j = 1; j <= num_merges_per_key; ++j) {
3821 ts = Timestamp(i + 10000 + j, 0);
3822 s = db_->Merge(WriteOptions(), column_family, Key1(i), ts,
3823 std::to_string(j));
3824 ASSERT_OK(s);
3825 }
3826 }
3827
3828 const auto verify_db = [&]() {
3829 ReadOptions read_opts;
3830 std::string read_ts_str = Timestamp(20000, 0);
3831 Slice ts = read_ts_str;
3832 read_opts.timestamp = &ts;
3833 std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
3834 size_t count = 0;
3835 for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) {
3836 std::string key = Key1(count);
3837 ASSERT_EQ(key, it->key());
3838 std::string value;
3839 for (size_t j = 1; j <= num_merges_per_key; ++j) {
3840 value.append(std::to_string(j));
3841 if (j < num_merges_per_key) {
3842 value.push_back('.');
3843 }
3844 }
3845 ASSERT_EQ(value, it->value());
3846 std::string ts1 = Timestamp(count + 10000 + num_merges_per_key, 0);
3847 ASSERT_EQ(ts1, it->timestamp());
3848 }
3849 ASSERT_OK(it->status());
3850 ASSERT_EQ(num_keys_per_file, count);
3851 for (it->SeekToLast(); it->Valid(); it->Prev(), --count) {
3852 std::string key = Key1(count - 1);
3853 ASSERT_EQ(key, it->key());
3854 std::string value;
3855 for (size_t j = 1; j <= num_merges_per_key; ++j) {
3856 value.append(std::to_string(j));
3857 if (j < num_merges_per_key) {
3858 value.push_back('.');
3859 }
3860 }
3861 ASSERT_EQ(value, it->value());
3862 std::string ts1 = Timestamp(count - 1 + 10000 + num_merges_per_key, 0);
3863 ASSERT_EQ(ts1, it->timestamp());
3864 }
3865 ASSERT_OK(it->status());
3866 ASSERT_EQ(0, count);
3867 };
3868
3869 verify_db();
3870
3871 Close();
3872 }
3873 } // namespace ROCKSDB_NAMESPACE
3874
3875 int main(int argc, char** argv) {
3876 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
3877 ::testing::InitGoogleTest(&argc, argv);
3878 RegisterCustomObjects(argc, argv);
3879 return RUN_ALL_TESTS();
3880 }