1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_test_util.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/perf_context.h"
12 #include "rocksdb/utilities/debug.h"
13 #include "table/block_based/block_based_table_reader.h"
14 #include "table/block_based/block_builder.h"
15 #include "test_util/fault_injection_test_env.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
20 namespace ROCKSDB_NAMESPACE
{
// Test fixture for basic DB operation tests. Inherits helpers such as
// Put/Get/Flush/Reopen plus members dbname_, env_ and handles_ from
// DBTestBase. NOTE(review): the visible sample is missing interior lines
// (access specifier, closing brace) -- code left byte-identical.
22 class DBBasicTest
: public DBTestBase
{
// Construct with a per-test database path under the test directory.
24 DBBasicTest() : DBTestBase("/db_basic_test") {}
// A second DB::Open() on an already-open DB must fail because the first
// instance holds the LOCK file.
27 TEST_F(DBBasicTest
, OpenWhenOpen
) {
28 Options options
= CurrentOptions();
30 ROCKSDB_NAMESPACE::DB
* db2
= nullptr;
31 ROCKSDB_NAMESPACE::Status s
= DB::Open(options
, dbname_
, &db2
);
// The failure is surfaced as a plain kIOError (no subcode) whose state
// text names the held lock file.
33 ASSERT_EQ(Status::Code::kIOError
, s
.code());
34 ASSERT_EQ(Status::SubCode::kNone
, s
.subcode());
35 ASSERT_TRUE(strstr(s
.getState(), "lock ") != nullptr);
41 TEST_F(DBBasicTest
, ReadOnlyDB
) {
42 ASSERT_OK(Put("foo", "v1"));
43 ASSERT_OK(Put("bar", "v2"));
44 ASSERT_OK(Put("foo", "v3"));
47 auto options
= CurrentOptions();
48 assert(options
.env
== env_
);
49 ASSERT_OK(ReadOnlyReopen(options
));
50 ASSERT_EQ("v3", Get("foo"));
51 ASSERT_EQ("v2", Get("bar"));
52 Iterator
* iter
= db_
->NewIterator(ReadOptions());
54 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
55 ASSERT_OK(iter
->status());
62 // Reopen and flush memtable.
66 // Now check keys in read only mode.
67 ASSERT_OK(ReadOnlyReopen(options
));
68 ASSERT_EQ("v3", Get("foo"));
69 ASSERT_EQ("v2", Get("bar"));
70 ASSERT_TRUE(db_
->SyncWAL().IsNotSupported());
73 TEST_F(DBBasicTest
, ReadOnlyDBWithWriteDBIdToManifestSet
) {
74 ASSERT_OK(Put("foo", "v1"));
75 ASSERT_OK(Put("bar", "v2"));
76 ASSERT_OK(Put("foo", "v3"));
79 auto options
= CurrentOptions();
80 options
.write_dbid_to_manifest
= true;
81 assert(options
.env
== env_
);
82 ASSERT_OK(ReadOnlyReopen(options
));
84 db_
->GetDbIdentity(db_id1
);
85 ASSERT_EQ("v3", Get("foo"));
86 ASSERT_EQ("v2", Get("bar"));
87 Iterator
* iter
= db_
->NewIterator(ReadOptions());
89 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
90 ASSERT_OK(iter
->status());
97 // Reopen and flush memtable.
101 // Now check keys in read only mode.
102 ASSERT_OK(ReadOnlyReopen(options
));
103 ASSERT_EQ("v3", Get("foo"));
104 ASSERT_EQ("v2", Get("bar"));
105 ASSERT_TRUE(db_
->SyncWAL().IsNotSupported());
107 db_
->GetDbIdentity(db_id2
);
108 ASSERT_EQ(db_id1
, db_id2
);
111 TEST_F(DBBasicTest
, CompactedDB
) {
112 const uint64_t kFileSize
= 1 << 20;
113 Options options
= CurrentOptions();
114 options
.disable_auto_compactions
= true;
115 options
.write_buffer_size
= kFileSize
;
116 options
.target_file_size_base
= kFileSize
;
117 options
.max_bytes_for_level_base
= 1 << 30;
118 options
.compression
= kNoCompression
;
120 // 1 L0 file, use CompactedDB if max_open_files = -1
121 ASSERT_OK(Put("aaa", DummyString(kFileSize
/ 2, '1')));
124 ASSERT_OK(ReadOnlyReopen(options
));
125 Status s
= Put("new", "value");
126 ASSERT_EQ(s
.ToString(),
127 "Not implemented: Not supported operation in read only mode.");
128 ASSERT_EQ(DummyString(kFileSize
/ 2, '1'), Get("aaa"));
130 options
.max_open_files
= -1;
131 ASSERT_OK(ReadOnlyReopen(options
));
132 s
= Put("new", "value");
133 ASSERT_EQ(s
.ToString(),
134 "Not implemented: Not supported in compacted db mode.");
135 ASSERT_EQ(DummyString(kFileSize
/ 2, '1'), Get("aaa"));
139 ASSERT_OK(Put("bbb", DummyString(kFileSize
/ 2, '2')));
141 ASSERT_OK(Put("aaa", DummyString(kFileSize
/ 2, 'a')));
143 ASSERT_OK(Put("bbb", DummyString(kFileSize
/ 2, 'b')));
144 ASSERT_OK(Put("eee", DummyString(kFileSize
/ 2, 'e')));
148 ASSERT_OK(ReadOnlyReopen(options
));
149 // Fallback to read-only DB
150 s
= Put("new", "value");
151 ASSERT_EQ(s
.ToString(),
152 "Not implemented: Not supported operation in read only mode.");
158 ASSERT_OK(Put("fff", DummyString(kFileSize
/ 2, 'f')));
159 ASSERT_OK(Put("hhh", DummyString(kFileSize
/ 2, 'h')));
160 ASSERT_OK(Put("iii", DummyString(kFileSize
/ 2, 'i')));
161 ASSERT_OK(Put("jjj", DummyString(kFileSize
/ 2, 'j')));
162 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
163 ASSERT_EQ(3, NumTableFilesAtLevel(1));
167 ASSERT_OK(ReadOnlyReopen(options
));
168 s
= Put("new", "value");
169 ASSERT_EQ(s
.ToString(),
170 "Not implemented: Not supported in compacted db mode.");
171 ASSERT_EQ("NOT_FOUND", Get("abc"));
172 ASSERT_EQ(DummyString(kFileSize
/ 2, 'a'), Get("aaa"));
173 ASSERT_EQ(DummyString(kFileSize
/ 2, 'b'), Get("bbb"));
174 ASSERT_EQ("NOT_FOUND", Get("ccc"));
175 ASSERT_EQ(DummyString(kFileSize
/ 2, 'e'), Get("eee"));
176 ASSERT_EQ(DummyString(kFileSize
/ 2, 'f'), Get("fff"));
177 ASSERT_EQ("NOT_FOUND", Get("ggg"));
178 ASSERT_EQ(DummyString(kFileSize
/ 2, 'h'), Get("hhh"));
179 ASSERT_EQ(DummyString(kFileSize
/ 2, 'i'), Get("iii"));
180 ASSERT_EQ(DummyString(kFileSize
/ 2, 'j'), Get("jjj"));
181 ASSERT_EQ("NOT_FOUND", Get("kkk"));
184 std::vector
<std::string
> values
;
185 std::vector
<Status
> status_list
= dbfull()->MultiGet(
187 std::vector
<Slice
>({Slice("aaa"), Slice("ccc"), Slice("eee"),
188 Slice("ggg"), Slice("iii"), Slice("kkk")}),
190 ASSERT_EQ(status_list
.size(), static_cast<uint64_t>(6));
191 ASSERT_EQ(values
.size(), static_cast<uint64_t>(6));
192 ASSERT_OK(status_list
[0]);
193 ASSERT_EQ(DummyString(kFileSize
/ 2, 'a'), values
[0]);
194 ASSERT_TRUE(status_list
[1].IsNotFound());
195 ASSERT_OK(status_list
[2]);
196 ASSERT_EQ(DummyString(kFileSize
/ 2, 'e'), values
[2]);
197 ASSERT_TRUE(status_list
[3].IsNotFound());
198 ASSERT_OK(status_list
[4]);
199 ASSERT_EQ(DummyString(kFileSize
/ 2, 'i'), values
[4]);
200 ASSERT_TRUE(status_list
[5].IsNotFound());
204 ASSERT_OK(Put("fff", DummyString(kFileSize
/ 2, 'f')));
206 ASSERT_OK(ReadOnlyReopen(options
));
207 s
= Put("new", "value");
208 ASSERT_EQ(s
.ToString(),
209 "Not implemented: Not supported operation in read only mode.");
212 TEST_F(DBBasicTest
, LevelLimitReopen
) {
213 Options options
= CurrentOptions();
214 CreateAndReopenWithCF({"pikachu"}, options
);
216 const std::string
value(1024 * 1024, ' ');
218 while (NumTableFilesAtLevel(2, 1) == 0) {
219 ASSERT_OK(Put(1, Key(i
++), value
));
220 dbfull()->TEST_WaitForFlushMemTable();
221 dbfull()->TEST_WaitForCompact();
224 options
.num_levels
= 1;
225 options
.max_bytes_for_level_multiplier_additional
.resize(1, 1);
226 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
227 ASSERT_EQ(s
.IsInvalidArgument(), true);
228 ASSERT_EQ(s
.ToString(),
229 "Invalid argument: db has more levels than options.num_levels");
231 options
.num_levels
= 10;
232 options
.max_bytes_for_level_multiplier_additional
.resize(10, 1);
233 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
235 #endif // ROCKSDB_LITE
// Basic Put / overwrite / Delete / Get round-trip on a non-default column
// family, repeated across option configurations (loop closed by
// ChangeOptions(); the matching "do {" is outside the visible sample).
237 TEST_F(DBBasicTest
, PutDeleteGet
) {
239 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
240 ASSERT_OK(Put(1, "foo", "v1"));
241 ASSERT_EQ("v1", Get(1, "foo"));
// Overwrite must shadow the earlier value.
242 ASSERT_OK(Put(1, "foo", "v2"));
243 ASSERT_EQ("v2", Get(1, "foo"));
// Delete must make the key unreadable again.
244 ASSERT_OK(Delete(1, "foo"));
245 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
246 } while (ChangeOptions());
// SingleDelete must remove exactly the one matching Put while leaving
// other keys readable; repeated across option configurations.
249 TEST_F(DBBasicTest
, PutSingleDeleteGet
) {
251 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
252 ASSERT_OK(Put(1, "foo", "v1"));
253 ASSERT_EQ("v1", Get(1, "foo"));
254 ASSERT_OK(Put(1, "foo2", "v2"));
255 ASSERT_EQ("v2", Get(1, "foo2"));
256 ASSERT_OK(SingleDelete(1, "foo"));
257 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
258 // Skip FIFO and universal compaction because they do not apply to the test
259 // case. Skip MergePut because single delete does not get removed when it
260 // encounters a merge.
261 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
265 TEST_F(DBBasicTest
, EmptyFlush
) {
266 // It is possible to produce empty flushes when using single deletes. Tests
267 // whether empty flushes cause issues.
271 Options options
= CurrentOptions();
// Disable auto compaction so the (elided) manual flush is the only
// background work exercised here.
272 options
.disable_auto_compactions
= true;
273 CreateAndReopenWithCF({"pikachu"}, options
);
// A Put immediately cancelled by a SingleDelete leaves nothing behind, so
// the subsequent flush is empty and no entries remain for "a".
275 Put(1, "a", Slice());
276 SingleDelete(1, "a");
279 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
280 // Skip FIFO and universal compaction as they do not apply to the test
281 // case. Skip MergePut because merges cannot be combined with single
283 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
// A written value must stay readable from its own column family and must
// not leak into another CF (CF 0 does not see CF 1's key). NOTE(review):
// the Flush between Put and Get is elided from this sample -- confirm the
// read is really served from SST files, as the test name suggests.
287 TEST_F(DBBasicTest
, GetFromVersions
) {
289 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
290 ASSERT_OK(Put(1, "foo", "v1"));
292 ASSERT_EQ("v1", Get(1, "foo"));
293 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
294 } while (ChangeOptions());
298 TEST_F(DBBasicTest
, GetSnapshot
) {
299 anon::OptionsOverride options_override
;
300 options_override
.skip_policy
= kSkipNoSnapshot
;
302 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override
));
303 // Try with both a short key and a long key
304 for (int i
= 0; i
< 2; i
++) {
305 std::string key
= (i
== 0) ? std::string("foo") : std::string(200, 'x');
306 ASSERT_OK(Put(1, key
, "v1"));
307 const Snapshot
* s1
= db_
->GetSnapshot();
308 ASSERT_OK(Put(1, key
, "v2"));
309 ASSERT_EQ("v2", Get(1, key
));
310 ASSERT_EQ("v1", Get(1, key
, s1
));
312 ASSERT_EQ("v2", Get(1, key
));
313 ASSERT_EQ("v1", Get(1, key
, s1
));
314 db_
->ReleaseSnapshot(s1
);
316 } while (ChangeOptions());
318 #endif // ROCKSDB_LITE
// While the fixture's DB is held open via TryReopen, a second DB::Open on
// the same path must fail (file-lock protection).
320 TEST_F(DBBasicTest
, CheckLock
) {
323 Options options
= CurrentOptions();
324 ASSERT_OK(TryReopen(options
));
326 // second open should fail
327 ASSERT_TRUE(!(DB::Open(options
, dbname_
, &localdb
)).ok());
328 } while (ChangeCompactOptions());
// With up to 4 write buffers and a merge threshold of 3, writes (WAL
// disabled) that land in different memtables must still be readable after
// the (elided) flush.
331 TEST_F(DBBasicTest
, FlushMultipleMemtable
) {
333 Options options
= CurrentOptions();
334 WriteOptions writeOpt
= WriteOptions();
335 writeOpt
.disableWAL
= true;
336 options
.max_write_buffer_number
= 4;
337 options
.min_write_buffer_number_to_merge
= 3;
// NOTE(review): -1 presumably lets RocksDB derive the maintained history
// size from the other buffer settings -- confirm against Options docs.
338 options
.max_write_buffer_size_to_maintain
= -1;
339 CreateAndReopenWithCF({"pikachu"}, options
);
340 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v1"));
342 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
344 ASSERT_EQ("v1", Get(1, "foo"));
345 ASSERT_EQ("v1", Get(1, "bar"));
347 } while (ChangeCompactOptions());
350 TEST_F(DBBasicTest
, FlushEmptyColumnFamily
) {
351 // Block flush thread and disable compaction thread
352 env_
->SetBackgroundThreads(1, Env::HIGH
);
353 env_
->SetBackgroundThreads(1, Env::LOW
);
354 test::SleepingBackgroundTask sleeping_task_low
;
355 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
357 test::SleepingBackgroundTask sleeping_task_high
;
358 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
359 &sleeping_task_high
, Env::Priority::HIGH
);
361 Options options
= CurrentOptions();
362 // disable compaction
363 options
.disable_auto_compactions
= true;
364 WriteOptions writeOpt
= WriteOptions();
365 writeOpt
.disableWAL
= true;
366 options
.max_write_buffer_number
= 2;
367 options
.min_write_buffer_number_to_merge
= 1;
368 options
.max_write_buffer_size_to_maintain
=
369 static_cast<int64_t>(options
.write_buffer_size
);
370 CreateAndReopenWithCF({"pikachu"}, options
);
372 // Compaction can still go through even if no thread can flush the
377 // Insert can go through
378 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[0], "foo", "v1"));
379 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
381 ASSERT_EQ("v1", Get(0, "foo"));
382 ASSERT_EQ("v1", Get(1, "bar"));
384 sleeping_task_high
.WakeUp();
385 sleeping_task_high
.WaitUntilDone();
387 // Flush can still go through.
391 sleeping_task_low
.WakeUp();
392 sleeping_task_low
.WaitUntilDone();
395 TEST_F(DBBasicTest
, FLUSH
) {
397 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
398 WriteOptions writeOpt
= WriteOptions();
399 writeOpt
.disableWAL
= true;
400 SetPerfLevel(kEnableTime
);
401 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v1"));
402 // this will now also flush the last 2 writes
404 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
406 get_perf_context()->Reset();
408 ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time
> 0);
409 ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes
);
411 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
412 ASSERT_EQ("v1", Get(1, "foo"));
413 ASSERT_EQ("v1", Get(1, "bar"));
415 writeOpt
.disableWAL
= true;
416 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v2"));
417 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v2"));
420 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
421 ASSERT_EQ("v2", Get(1, "bar"));
422 get_perf_context()->Reset();
423 ASSERT_EQ("v2", Get(1, "foo"));
424 ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time
> 0);
426 writeOpt
.disableWAL
= false;
427 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v3"));
428 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v3"));
431 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
432 // 'foo' should be there because its put
434 ASSERT_EQ("v3", Get(1, "foo"));
435 ASSERT_EQ("v3", Get(1, "bar"));
437 SetPerfLevel(kDisable
);
438 } while (ChangeCompactOptions());
// A tiny max_manifest_file_size (10 bytes) forces a manifest roll-over on
// every LogAndApply; the manifest file number must strictly grow across a
// flush and again across a reopen, while all data stays readable.
441 TEST_F(DBBasicTest
, ManifestRollOver
) {
444 options
.max_manifest_file_size
= 10; // 10 bytes
445 options
= CurrentOptions(options
);
446 CreateAndReopenWithCF({"pikachu"}, options
);
448 ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
449 ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
450 ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
451 uint64_t manifest_before_flush
= dbfull()->TEST_Current_Manifest_FileNo();
452 ASSERT_OK(Flush(1)); // This should trigger LogAndApply.
453 uint64_t manifest_after_flush
= dbfull()->TEST_Current_Manifest_FileNo();
454 ASSERT_GT(manifest_after_flush
, manifest_before_flush
);
// Reopen writes a fresh manifest as well, so the file number grows again.
455 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
456 ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush
);
457 // check if a new manifest file got inserted or not.
458 ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
459 ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
460 ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
462 } while (ChangeCompactOptions());
465 TEST_F(DBBasicTest
, IdentityAcrossRestarts1
) {
468 ASSERT_OK(db_
->GetDbIdentity(id1
));
470 Options options
= CurrentOptions();
473 ASSERT_OK(db_
->GetDbIdentity(id2
));
474 // id1 should match id2 because identity was not regenerated
475 ASSERT_EQ(id1
.compare(id2
), 0);
477 std::string idfilename
= IdentityFileName(dbname_
);
478 ASSERT_OK(env_
->DeleteFile(idfilename
));
481 ASSERT_OK(db_
->GetDbIdentity(id3
));
482 if (options
.write_dbid_to_manifest
) {
483 ASSERT_EQ(id1
.compare(id3
), 0);
485 // id1 should NOT match id3 because identity was regenerated
486 ASSERT_NE(id1
.compare(id3
), 0);
488 } while (ChangeCompactOptions());
// DB identity behavior across restarts when write_dbid_to_manifest is set.
// Reopen calls between the GetDbIdentity checks are elided from this
// sample.
491 TEST_F(DBBasicTest
, IdentityAcrossRestarts2
) {
494 ASSERT_OK(db_
->GetDbIdentity(id1
));
496 Options options
= CurrentOptions();
497 options
.write_dbid_to_manifest
= true;
500 ASSERT_OK(db_
->GetDbIdentity(id2
));
501 // id1 should match id2 because identity was not regenerated
502 ASSERT_EQ(id1
.compare(id2
), 0);
// Remove the on-disk IDENTITY file before the next (elided) reopen.
504 std::string idfilename
= IdentityFileName(dbname_
);
505 ASSERT_OK(env_
->DeleteFile(idfilename
));
508 ASSERT_OK(db_
->GetDbIdentity(id3
));
// NOTE(review): with write_dbid_to_manifest=true the id is persisted in
// the MANIFEST, so the original comment below looks inverted -- confirm
// against the (elided) assertion on id3.
509 // id1 should NOT match id3 because identity was regenerated
511 } while (ChangeCompactOptions());
515 TEST_F(DBBasicTest
, Snapshot
) {
516 anon::OptionsOverride options_override
;
517 options_override
.skip_policy
= kSkipNoSnapshot
;
519 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override
));
520 Put(0, "foo", "0v1");
521 Put(1, "foo", "1v1");
523 const Snapshot
* s1
= db_
->GetSnapshot();
524 ASSERT_EQ(1U, GetNumSnapshots());
525 uint64_t time_snap1
= GetTimeOldestSnapshots();
526 ASSERT_GT(time_snap1
, 0U);
527 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
528 Put(0, "foo", "0v2");
529 Put(1, "foo", "1v2");
531 env_
->addon_time_
.fetch_add(1);
533 const Snapshot
* s2
= db_
->GetSnapshot();
534 ASSERT_EQ(2U, GetNumSnapshots());
535 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
536 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
537 Put(0, "foo", "0v3");
538 Put(1, "foo", "1v3");
541 ManagedSnapshot
s3(db_
);
542 ASSERT_EQ(3U, GetNumSnapshots());
543 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
544 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
546 Put(0, "foo", "0v4");
547 Put(1, "foo", "1v4");
548 ASSERT_EQ("0v1", Get(0, "foo", s1
));
549 ASSERT_EQ("1v1", Get(1, "foo", s1
));
550 ASSERT_EQ("0v2", Get(0, "foo", s2
));
551 ASSERT_EQ("1v2", Get(1, "foo", s2
));
552 ASSERT_EQ("0v3", Get(0, "foo", s3
.snapshot()));
553 ASSERT_EQ("1v3", Get(1, "foo", s3
.snapshot()));
554 ASSERT_EQ("0v4", Get(0, "foo"));
555 ASSERT_EQ("1v4", Get(1, "foo"));
558 ASSERT_EQ(2U, GetNumSnapshots());
559 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
560 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
561 ASSERT_EQ("0v1", Get(0, "foo", s1
));
562 ASSERT_EQ("1v1", Get(1, "foo", s1
));
563 ASSERT_EQ("0v2", Get(0, "foo", s2
));
564 ASSERT_EQ("1v2", Get(1, "foo", s2
));
565 ASSERT_EQ("0v4", Get(0, "foo"));
566 ASSERT_EQ("1v4", Get(1, "foo"));
568 db_
->ReleaseSnapshot(s1
);
569 ASSERT_EQ("0v2", Get(0, "foo", s2
));
570 ASSERT_EQ("1v2", Get(1, "foo", s2
));
571 ASSERT_EQ("0v4", Get(0, "foo"));
572 ASSERT_EQ("1v4", Get(1, "foo"));
573 ASSERT_EQ(1U, GetNumSnapshots());
574 ASSERT_LT(time_snap1
, GetTimeOldestSnapshots());
575 ASSERT_EQ(GetSequenceOldestSnapshots(), s2
->GetSequenceNumber());
577 db_
->ReleaseSnapshot(s2
);
578 ASSERT_EQ(0U, GetNumSnapshots());
579 ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
580 ASSERT_EQ("0v4", Get(0, "foo"));
581 ASSERT_EQ("1v4", Get(1, "foo"));
582 } while (ChangeOptions());
585 #endif // ROCKSDB_LITE
587 TEST_F(DBBasicTest
, CompactBetweenSnapshots
) {
588 anon::OptionsOverride options_override
;
589 options_override
.skip_policy
= kSkipNoSnapshot
;
591 Options options
= CurrentOptions(options_override
);
592 options
.disable_auto_compactions
= true;
593 CreateAndReopenWithCF({"pikachu"}, options
);
595 FillLevels("a", "z", 1);
597 Put(1, "foo", "first");
598 const Snapshot
* snapshot1
= db_
->GetSnapshot();
599 Put(1, "foo", "second");
600 Put(1, "foo", "third");
601 Put(1, "foo", "fourth");
602 const Snapshot
* snapshot2
= db_
->GetSnapshot();
603 Put(1, "foo", "fifth");
604 Put(1, "foo", "sixth");
606 // All entries (including duplicates) exist
607 // before any compaction or flush is triggered.
608 ASSERT_EQ(AllEntriesFor("foo", 1),
609 "[ sixth, fifth, fourth, third, second, first ]");
610 ASSERT_EQ("sixth", Get(1, "foo"));
611 ASSERT_EQ("fourth", Get(1, "foo", snapshot2
));
612 ASSERT_EQ("first", Get(1, "foo", snapshot1
));
614 // After a flush, "second", "third" and "fifth" should
617 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
619 // after we release the snapshot1, only two values left
620 db_
->ReleaseSnapshot(snapshot1
);
621 FillLevels("a", "z", 1);
622 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
625 // We have only one valid snapshot snapshot2. Since snapshot1 is
626 // not valid anymore, "first" should be removed by a compaction.
627 ASSERT_EQ("sixth", Get(1, "foo"));
628 ASSERT_EQ("fourth", Get(1, "foo", snapshot2
));
629 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
631 // after we release the snapshot2, only one value should be left
632 db_
->ReleaseSnapshot(snapshot2
);
633 FillLevels("a", "z", 1);
634 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
636 ASSERT_EQ("sixth", Get(1, "foo"));
637 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
638 } while (ChangeOptions(kSkipFIFOCompaction
));
// Exercises the four create_if_missing / error_if_exists combinations of
// DB::Open. NOTE(review): the initial DestroyDB and the delete/reset of
// `db` between cases are elided from this sample.
641 TEST_F(DBBasicTest
, DBOpen_Options
) {
642 Options options
= CurrentOptions();
646 // Does not exist, and create_if_missing == false: error
648 options
.create_if_missing
= false;
649 Status s
= DB::Open(options
, dbname_
, &db
);
650 ASSERT_TRUE(strstr(s
.ToString().c_str(), "does not exist") != nullptr);
651 ASSERT_TRUE(db
== nullptr);
653 // Does not exist, and create_if_missing == true: OK
654 options
.create_if_missing
= true;
655 s
= DB::Open(options
, dbname_
, &db
);
657 ASSERT_TRUE(db
!= nullptr);
662 // Does exist, and error_if_exists == true: error
663 options
.create_if_missing
= false;
664 options
.error_if_exists
= true;
665 s
= DB::Open(options
, dbname_
, &db
);
666 ASSERT_TRUE(strstr(s
.ToString().c_str(), "exists") != nullptr);
667 ASSERT_TRUE(db
== nullptr);
669 // Does exist, and error_if_exists == false: OK
670 options
.create_if_missing
= true;
671 options
.error_if_exists
= false;
672 s
= DB::Open(options
, dbname_
, &db
);
674 ASSERT_TRUE(db
!= nullptr);
680 TEST_F(DBBasicTest
, CompactOnFlush
) {
681 anon::OptionsOverride options_override
;
682 options_override
.skip_policy
= kSkipNoSnapshot
;
684 Options options
= CurrentOptions(options_override
);
685 options
.disable_auto_compactions
= true;
686 CreateAndReopenWithCF({"pikachu"}, options
);
690 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
692 // Write two new keys
693 Put(1, "a", "begin");
697 // Case1: Delete followed by a put
700 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
702 // After the current memtable is flushed, the DEL should
705 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
707 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
709 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
711 // Case 2: Delete followed by another delete
714 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
716 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
717 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
719 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
721 // Case 3: Put followed by a delete
724 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
726 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
727 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
729 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
731 // Case 4: Put followed by another Put
734 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
736 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
737 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
739 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
743 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
745 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
747 // Case 5: Put followed by snapshot followed by another Put
748 // Both puts should remain.
750 const Snapshot
* snapshot
= db_
->GetSnapshot();
753 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
754 db_
->ReleaseSnapshot(snapshot
);
758 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
760 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
762 // Case 5: snapshot followed by a put followed by another Put
763 // Only the last put should remain.
764 const Snapshot
* snapshot1
= db_
->GetSnapshot();
768 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
769 db_
->ReleaseSnapshot(snapshot1
);
770 } while (ChangeCompactOptions());
// Writes one key into each of the 8 column families, then iterates over
// them checking that the number of SST files grows by exactly one per
// iteration. NOTE(review): the per-CF Flush call inside the loop is elided
// from this sample -- confirm against the full file.
773 TEST_F(DBBasicTest
, FlushOneColumnFamily
) {
774 Options options
= CurrentOptions();
775 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
776 "alyosha", "popovich"},
779 ASSERT_OK(Put(0, "Default", "Default"));
780 ASSERT_OK(Put(1, "pikachu", "pikachu"));
781 ASSERT_OK(Put(2, "ilya", "ilya"));
782 ASSERT_OK(Put(3, "muromec", "muromec"));
783 ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
784 ASSERT_OK(Put(5, "nikitich", "nikitich"));
785 ASSERT_OK(Put(6, "alyosha", "alyosha"));
786 ASSERT_OK(Put(7, "popovich", "popovich"));
788 for (int i
= 0; i
< 8; ++i
) {
790 auto tables
= ListTableFiles(env_
, dbname_
);
791 ASSERT_EQ(tables
.size(), i
+ 1U);
795 TEST_F(DBBasicTest
, MultiGetSimple
) {
797 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
798 SetPerfLevel(kEnableCount
);
799 ASSERT_OK(Put(1, "k1", "v1"));
800 ASSERT_OK(Put(1, "k2", "v2"));
801 ASSERT_OK(Put(1, "k3", "v3"));
802 ASSERT_OK(Put(1, "k4", "v4"));
803 ASSERT_OK(Delete(1, "k4"));
804 ASSERT_OK(Put(1, "k5", "v5"));
805 ASSERT_OK(Delete(1, "no_key"));
807 std::vector
<Slice
> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
809 std::vector
<std::string
> values(20, "Temporary data to be overwritten");
810 std::vector
<ColumnFamilyHandle
*> cfs(keys
.size(), handles_
[1]);
812 get_perf_context()->Reset();
813 std::vector
<Status
> s
= db_
->MultiGet(ReadOptions(), cfs
, keys
, &values
);
814 ASSERT_EQ(values
.size(), keys
.size());
815 ASSERT_EQ(values
[0], "v1");
816 ASSERT_EQ(values
[1], "v2");
817 ASSERT_EQ(values
[2], "v3");
818 ASSERT_EQ(values
[4], "v5");
819 // four kv pairs * two bytes per value
820 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes
);
825 ASSERT_TRUE(s
[3].IsNotFound());
827 ASSERT_TRUE(s
[5].IsNotFound());
828 SetPerfLevel(kDisable
);
829 } while (ChangeCompactOptions());
832 TEST_F(DBBasicTest
, MultiGetEmpty
) {
834 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
836 std::vector
<Slice
> keys
;
837 std::vector
<std::string
> values
;
838 std::vector
<ColumnFamilyHandle
*> cfs
;
839 std::vector
<Status
> s
= db_
->MultiGet(ReadOptions(), cfs
, keys
, &values
);
840 ASSERT_EQ(s
.size(), 0U);
842 // Empty Database, Empty Key Set
843 Options options
= CurrentOptions();
844 options
.create_if_missing
= true;
845 DestroyAndReopen(options
);
846 CreateAndReopenWithCF({"pikachu"}, options
);
847 s
= db_
->MultiGet(ReadOptions(), cfs
, keys
, &values
);
848 ASSERT_EQ(s
.size(), 0U);
850 // Empty Database, Search for Keys
854 cfs
.push_back(handles_
[0]);
855 cfs
.push_back(handles_
[1]);
856 s
= db_
->MultiGet(ReadOptions(), cfs
, keys
, &values
);
857 ASSERT_EQ(static_cast<int>(s
.size()), 2);
858 ASSERT_TRUE(s
[0].IsNotFound() && s
[1].IsNotFound());
859 } while (ChangeCompactOptions());
862 TEST_F(DBBasicTest
, ChecksumTest
) {
863 BlockBasedTableOptions table_options
;
864 Options options
= CurrentOptions();
865 // change when new checksum type added
866 int max_checksum
= static_cast<int>(kxxHash64
);
867 const int kNumPerFile
= 2;
869 // generate one table with each type of checksum
870 for (int i
= 0; i
<= max_checksum
; ++i
) {
871 table_options
.checksum
= static_cast<ChecksumType
>(i
);
872 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
874 for (int j
= 0; j
< kNumPerFile
; ++j
) {
875 ASSERT_OK(Put(Key(i
* kNumPerFile
+ j
), Key(i
* kNumPerFile
+ j
)));
880 // with each valid checksum type setting...
881 for (int i
= 0; i
<= max_checksum
; ++i
) {
882 table_options
.checksum
= static_cast<ChecksumType
>(i
);
883 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
885 // verify every type of checksum (should be regardless of that setting)
886 for (int j
= 0; j
< (max_checksum
+ 1) * kNumPerFile
; ++j
) {
887 ASSERT_EQ(Key(j
), Get(Key(j
)));
892 // On Windows you can have either memory mapped file or a file
893 // with unbuffered access. So this asserts and does not make
// use_direct_reads and allow_mmap_reads are mutually exclusive: enabling
// both must fail to open, while every other combination opens fine. The
// early-return body for platforms without mmap support is elided here.
896 TEST_F(DBBasicTest
, MmapAndBufferOptions
) {
897 if (!IsMemoryMappedAccessSupported()) {
900 Options options
= CurrentOptions();
// Both flags on: open must be rejected.
902 options
.use_direct_reads
= true;
903 options
.allow_mmap_reads
= true;
904 ASSERT_NOK(TryReopen(options
));
906 // All other combinations are acceptable
907 options
.use_direct_reads
= false;
908 ASSERT_OK(TryReopen(options
));
// Direct reads alone are fine where the platform supports direct I/O.
910 if (IsDirectIOSupported()) {
911 options
.use_direct_reads
= true;
912 options
.allow_mmap_reads
= false;
913 ASSERT_OK(TryReopen(options
));
916 options
.use_direct_reads
= false;
917 ASSERT_OK(TryReopen(options
));
// Env wrapper used by the DBClose tests: its NewLogger hands out TestLogger
// instances whose close path bumps close_count on the owning TestEnv and
// deliberately reports IOError, letting tests verify both that Close() is
// invoked and that its failure status propagates. Several interior lines
// (destructor body, access specifiers, closing braces) are missing from
// this sample; code left byte-identical.
921 class TestEnv
: public EnvWrapper
{
923 explicit TestEnv(Env
* base_env
) : EnvWrapper(base_env
), close_count(0) {}
// Logger whose close path reports back to the owning TestEnv.
925 class TestLogger
: public Logger
{
928 explicit TestLogger(TestEnv
* env_ptr
) : Logger() { env
= env_ptr
; }
929 ~TestLogger() override
{
// No-op: these tests only exercise the close path, not log output.
934 void Logv(const char* /*format*/, va_list /*ap*/) override
{}
937 Status
CloseImpl() override
{ return CloseHelper(); }
// Counts the close on the TestEnv and intentionally fails with IOError.
940 Status
CloseHelper() {
941 env
->CloseCountInc();
943 return Status::IOError();
948 void CloseCountInc() { close_count
++; }
950 int GetCloseCount() { return close_count
; }
952 Status
NewLogger(const std::string
& /*fname*/,
953 std::shared_ptr
<Logger
>* result
) override
{
954 result
->reset(new TestLogger(this));
962 TEST_F(DBBasicTest
, DBClose
) {
963 Options options
= GetDefaultOptions();
964 std::string dbname
= test::PerThreadDBPath("db_close_test");
965 ASSERT_OK(DestroyDB(dbname
, options
));
968 TestEnv
* env
= new TestEnv(env_
);
969 std::unique_ptr
<TestEnv
> local_env_guard(env
);
970 options
.create_if_missing
= true;
972 Status s
= DB::Open(options
, dbname
, &db
);
974 ASSERT_TRUE(db
!= nullptr);
977 ASSERT_EQ(env
->GetCloseCount(), 1);
978 ASSERT_EQ(s
, Status::IOError());
981 ASSERT_EQ(env
->GetCloseCount(), 1);
983 // Do not call DB::Close() and ensure our logger Close() still gets called
984 s
= DB::Open(options
, dbname
, &db
);
986 ASSERT_TRUE(db
!= nullptr);
988 ASSERT_EQ(env
->GetCloseCount(), 2);
990 // Provide our own logger and ensure DB::Close() does not close it
991 options
.info_log
.reset(new TestEnv::TestLogger(env
));
992 options
.create_if_missing
= false;
993 s
= DB::Open(options
, dbname
, &db
);
995 ASSERT_TRUE(db
!= nullptr);
998 ASSERT_EQ(s
, Status::OK());
1000 ASSERT_EQ(env
->GetCloseCount(), 2);
1001 options
.info_log
.reset();
1002 ASSERT_EQ(env
->GetCloseCount(), 3);
// Close() must surface a flush failure: the fault-injection env is
// switched inactive right before Close so the final flush fails, and the
// returned status must be non-OK.
1005 TEST_F(DBBasicTest
, DBCloseFlushError
) {
1006 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
1007 new FaultInjectionTestEnv(env_
));
1008 Options options
= GetDefaultOptions();
1009 options
.create_if_missing
= true;
// Manual WAL flush + tiny write buffer ensure there is unflushed state
// left at Close time.
1010 options
.manual_wal_flush
= true;
1011 options
.write_buffer_size
=100;
1012 options
.env
= fault_injection_env
.get();
1015 ASSERT_OK(Put("key1", "value1"));
1016 ASSERT_OK(Put("key2", "value2"));
1017 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
1018 ASSERT_OK(Put("key3", "value3"));
// Simulate filesystem failure only for the duration of Close().
1019 fault_injection_env
->SetFilesystemActive(false);
1020 Status s
= dbfull()->Close();
1021 fault_injection_env
->SetFilesystemActive(true);
1022 ASSERT_NE(s
, Status::OK());
// Parameterized fixture for the MultiGet tests below. NOTE(review): from
// usage, GetParam() is forwarded as the last argument of the MultiGet
// helper -- presumably toggling the batched code path; confirm against the
// helper's declaration.
1027 class DBMultiGetTestWithParam
: public DBBasicTest
,
1028 public testing::WithParamInterface
<bool> {};
1030 TEST_P(DBMultiGetTestWithParam
, MultiGetMultiCF
) {
1031 Options options
= CurrentOptions();
1032 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1033 "alyosha", "popovich"},
1035 // <CF, key, value> tuples
1036 std::vector
<std::tuple
<int, std::string
, std::string
>> cf_kv_vec
;
1037 static const int num_keys
= 24;
1038 cf_kv_vec
.reserve(num_keys
);
1040 for (int i
= 0; i
< num_keys
; ++i
) {
1043 cf_kv_vec
.emplace_back(std::make_tuple(
1044 cf
, "cf" + std::to_string(cf
) + "_key_" + std::to_string(cf_key
),
1045 "cf" + std::to_string(cf
) + "_val_" + std::to_string(cf_key
)));
1046 ASSERT_OK(Put(std::get
<0>(cf_kv_vec
[i
]), std::get
<1>(cf_kv_vec
[i
]),
1047 std::get
<2>(cf_kv_vec
[i
])));
1050 int get_sv_count
= 0;
1051 ROCKSDB_NAMESPACE::DBImpl
* db
= reinterpret_cast<DBImpl
*>(db_
);
1052 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1053 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1054 if (++get_sv_count
== 2) {
1055 // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
1056 // is forced to repeat the process
1057 for (int i
= 0; i
< num_keys
; ++i
) {
1061 ASSERT_OK(Flush(cf
));
1063 ASSERT_OK(Put(std::get
<0>(cf_kv_vec
[i
]), std::get
<1>(cf_kv_vec
[i
]),
1064 std::get
<2>(cf_kv_vec
[i
]) + "_2"));
1067 if (get_sv_count
== 11) {
1068 for (int i
= 0; i
< 8; ++i
) {
1069 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(
1070 db
->GetColumnFamilyHandle(i
))
1072 ASSERT_EQ(cfd
->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse
);
1076 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1078 std::vector
<int> cfs
;
1079 std::vector
<std::string
> keys
;
1080 std::vector
<std::string
> values
;
1082 for (int i
= 0; i
< num_keys
; ++i
) {
1083 cfs
.push_back(std::get
<0>(cf_kv_vec
[i
]));
1084 keys
.push_back(std::get
<1>(cf_kv_vec
[i
]));
1087 values
= MultiGet(cfs
, keys
, nullptr, GetParam());
1088 ASSERT_EQ(values
.size(), num_keys
);
1089 for (unsigned int j
= 0; j
< values
.size(); ++j
) {
1090 ASSERT_EQ(values
[j
], std::get
<2>(cf_kv_vec
[j
]) + "_2");
1095 cfs
.push_back(std::get
<0>(cf_kv_vec
[0]));
1096 keys
.push_back(std::get
<1>(cf_kv_vec
[0]));
1097 cfs
.push_back(std::get
<0>(cf_kv_vec
[3]));
1098 keys
.push_back(std::get
<1>(cf_kv_vec
[3]));
1099 cfs
.push_back(std::get
<0>(cf_kv_vec
[4]));
1100 keys
.push_back(std::get
<1>(cf_kv_vec
[4]));
1101 values
= MultiGet(cfs
, keys
, nullptr, GetParam());
1102 ASSERT_EQ(values
[0], std::get
<2>(cf_kv_vec
[0]) + "_2");
1103 ASSERT_EQ(values
[1], std::get
<2>(cf_kv_vec
[3]) + "_2");
1104 ASSERT_EQ(values
[2], std::get
<2>(cf_kv_vec
[4]) + "_2");
1108 cfs
.push_back(std::get
<0>(cf_kv_vec
[7]));
1109 keys
.push_back(std::get
<1>(cf_kv_vec
[7]));
1110 cfs
.push_back(std::get
<0>(cf_kv_vec
[6]));
1111 keys
.push_back(std::get
<1>(cf_kv_vec
[6]));
1112 cfs
.push_back(std::get
<0>(cf_kv_vec
[1]));
1113 keys
.push_back(std::get
<1>(cf_kv_vec
[1]));
1114 values
= MultiGet(cfs
, keys
, nullptr, GetParam());
1115 ASSERT_EQ(values
[0], std::get
<2>(cf_kv_vec
[7]) + "_2");
1116 ASSERT_EQ(values
[1], std::get
<2>(cf_kv_vec
[6]) + "_2");
1117 ASSERT_EQ(values
[2], std::get
<2>(cf_kv_vec
[1]) + "_2");
1119 for (int cf
= 0; cf
< 8; ++cf
) {
1120 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(
1121 reinterpret_cast<DBImpl
*>(db_
)->GetColumnFamilyHandle(cf
))
1123 ASSERT_NE(cfd
->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse
);
1124 ASSERT_NE(cfd
->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete
);
1128 TEST_P(DBMultiGetTestWithParam
, MultiGetMultiCFMutex
) {
1129 Options options
= CurrentOptions();
1130 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1131 "alyosha", "popovich"},
1134 for (int i
= 0; i
< 8; ++i
) {
1135 ASSERT_OK(Put(i
, "cf" + std::to_string(i
) + "_key",
1136 "cf" + std::to_string(i
) + "_val"));
1139 int get_sv_count
= 0;
1141 bool last_try
= false;
1142 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1143 "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
1145 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1147 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1148 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1152 if (++get_sv_count
== 2) {
1155 for (int i
= 0; i
< 8; ++i
) {
1156 ASSERT_OK(Flush(i
));
1158 i
, "cf" + std::to_string(i
) + "_key",
1159 "cf" + std::to_string(i
) + "_val" + std::to_string(retries
)));
1163 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1165 std::vector
<int> cfs
;
1166 std::vector
<std::string
> keys
;
1167 std::vector
<std::string
> values
;
1169 for (int i
= 0; i
< 8; ++i
) {
1171 keys
.push_back("cf" + std::to_string(i
) + "_key");
1174 values
= MultiGet(cfs
, keys
, nullptr, GetParam());
1175 ASSERT_TRUE(last_try
);
1176 ASSERT_EQ(values
.size(), 8);
1177 for (unsigned int j
= 0; j
< values
.size(); ++j
) {
1178 ASSERT_EQ(values
[j
],
1179 "cf" + std::to_string(j
) + "_val" + std::to_string(retries
));
1181 for (int i
= 0; i
< 8; ++i
) {
1182 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(
1183 reinterpret_cast<DBImpl
*>(db_
)->GetColumnFamilyHandle(i
))
1185 ASSERT_NE(cfd
->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse
);
1189 TEST_P(DBMultiGetTestWithParam
, MultiGetMultiCFSnapshot
) {
1190 Options options
= CurrentOptions();
1191 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1192 "alyosha", "popovich"},
1195 for (int i
= 0; i
< 8; ++i
) {
1196 ASSERT_OK(Put(i
, "cf" + std::to_string(i
) + "_key",
1197 "cf" + std::to_string(i
) + "_val"));
1200 int get_sv_count
= 0;
1201 ROCKSDB_NAMESPACE::DBImpl
* db
= reinterpret_cast<DBImpl
*>(db_
);
1202 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1203 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1204 if (++get_sv_count
== 2) {
1205 for (int i
= 0; i
< 8; ++i
) {
1206 ASSERT_OK(Flush(i
));
1207 ASSERT_OK(Put(i
, "cf" + std::to_string(i
) + "_key",
1208 "cf" + std::to_string(i
) + "_val2"));
1211 if (get_sv_count
== 8) {
1212 for (int i
= 0; i
< 8; ++i
) {
1213 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(
1214 db
->GetColumnFamilyHandle(i
))
1217 (cfd
->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse
) ||
1218 (cfd
->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete
));
1222 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1224 std::vector
<int> cfs
;
1225 std::vector
<std::string
> keys
;
1226 std::vector
<std::string
> values
;
1228 for (int i
= 0; i
< 8; ++i
) {
1230 keys
.push_back("cf" + std::to_string(i
) + "_key");
1233 const Snapshot
* snapshot
= db_
->GetSnapshot();
1234 values
= MultiGet(cfs
, keys
, snapshot
, GetParam());
1235 db_
->ReleaseSnapshot(snapshot
);
1236 ASSERT_EQ(values
.size(), 8);
1237 for (unsigned int j
= 0; j
< values
.size(); ++j
) {
1238 ASSERT_EQ(values
[j
], "cf" + std::to_string(j
) + "_val");
1240 for (int i
= 0; i
< 8; ++i
) {
1241 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(
1242 reinterpret_cast<DBImpl
*>(db_
)->GetColumnFamilyHandle(i
))
1244 ASSERT_NE(cfd
->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse
);
1248 INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam
, DBMultiGetTestWithParam
,
1251 TEST_F(DBBasicTest
, MultiGetBatchedSimpleUnsorted
) {
1253 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1254 SetPerfLevel(kEnableCount
);
1255 ASSERT_OK(Put(1, "k1", "v1"));
1256 ASSERT_OK(Put(1, "k2", "v2"));
1257 ASSERT_OK(Put(1, "k3", "v3"));
1258 ASSERT_OK(Put(1, "k4", "v4"));
1259 ASSERT_OK(Delete(1, "k4"));
1260 ASSERT_OK(Put(1, "k5", "v5"));
1261 ASSERT_OK(Delete(1, "no_key"));
1263 get_perf_context()->Reset();
1265 std::vector
<Slice
> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
1266 std::vector
<PinnableSlice
> values(keys
.size());
1267 std::vector
<ColumnFamilyHandle
*> cfs(keys
.size(), handles_
[1]);
1268 std::vector
<Status
> s(keys
.size());
1270 db_
->MultiGet(ReadOptions(), handles_
[1], keys
.size(), keys
.data(),
1271 values
.data(), s
.data(), false);
1273 ASSERT_EQ(values
.size(), keys
.size());
1274 ASSERT_EQ(std::string(values
[5].data(), values
[5].size()), "v1");
1275 ASSERT_EQ(std::string(values
[4].data(), values
[4].size()), "v2");
1276 ASSERT_EQ(std::string(values
[3].data(), values
[3].size()), "v3");
1277 ASSERT_EQ(std::string(values
[1].data(), values
[1].size()), "v5");
1278 // four kv pairs * two bytes per value
1279 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes
);
1281 ASSERT_TRUE(s
[0].IsNotFound());
1283 ASSERT_TRUE(s
[2].IsNotFound());
1288 SetPerfLevel(kDisable
);
1289 } while (ChangeCompactOptions());
1292 TEST_F(DBBasicTest
, MultiGetBatchedSimpleSorted
) {
1294 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1295 SetPerfLevel(kEnableCount
);
1296 ASSERT_OK(Put(1, "k1", "v1"));
1297 ASSERT_OK(Put(1, "k2", "v2"));
1298 ASSERT_OK(Put(1, "k3", "v3"));
1299 ASSERT_OK(Put(1, "k4", "v4"));
1300 ASSERT_OK(Delete(1, "k4"));
1301 ASSERT_OK(Put(1, "k5", "v5"));
1302 ASSERT_OK(Delete(1, "no_key"));
1304 get_perf_context()->Reset();
1306 std::vector
<Slice
> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
1307 std::vector
<PinnableSlice
> values(keys
.size());
1308 std::vector
<ColumnFamilyHandle
*> cfs(keys
.size(), handles_
[1]);
1309 std::vector
<Status
> s(keys
.size());
1311 db_
->MultiGet(ReadOptions(), handles_
[1], keys
.size(), keys
.data(),
1312 values
.data(), s
.data(), true);
1314 ASSERT_EQ(values
.size(), keys
.size());
1315 ASSERT_EQ(std::string(values
[0].data(), values
[0].size()), "v1");
1316 ASSERT_EQ(std::string(values
[1].data(), values
[1].size()), "v2");
1317 ASSERT_EQ(std::string(values
[2].data(), values
[2].size()), "v3");
1318 ASSERT_EQ(std::string(values
[4].data(), values
[4].size()), "v5");
1319 // four kv pairs * two bytes per value
1320 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes
);
1325 ASSERT_TRUE(s
[3].IsNotFound());
1327 ASSERT_TRUE(s
[5].IsNotFound());
1329 SetPerfLevel(kDisable
);
1330 } while (ChangeCompactOptions());
1333 TEST_F(DBBasicTest
, MultiGetBatchedMultiLevel
) {
1334 Options options
= CurrentOptions();
1335 options
.disable_auto_compactions
= true;
1339 for (int i
= 0; i
< 128; ++i
) {
1340 ASSERT_OK(Put("key_" + std::to_string(i
), "val_l2_" + std::to_string(i
)));
1342 if (num_keys
== 8) {
1351 MoveFilesToLevel(2);
1353 for (int i
= 0; i
< 128; i
+= 3) {
1354 ASSERT_OK(Put("key_" + std::to_string(i
), "val_l1_" + std::to_string(i
)));
1356 if (num_keys
== 8) {
1365 MoveFilesToLevel(1);
1367 for (int i
= 0; i
< 128; i
+= 5) {
1368 ASSERT_OK(Put("key_" + std::to_string(i
), "val_l0_" + std::to_string(i
)));
1370 if (num_keys
== 8) {
1379 ASSERT_EQ(0, num_keys
);
1381 for (int i
= 0; i
< 128; i
+= 9) {
1382 ASSERT_OK(Put("key_" + std::to_string(i
), "val_mem_" + std::to_string(i
)));
1385 std::vector
<std::string
> keys
;
1386 std::vector
<std::string
> values
;
1388 for (int i
= 64; i
< 80; ++i
) {
1389 keys
.push_back("key_" + std::to_string(i
));
1392 values
= MultiGet(keys
, nullptr);
1393 ASSERT_EQ(values
.size(), 16);
1394 for (unsigned int j
= 0; j
< values
.size(); ++j
) {
1397 ASSERT_EQ(values
[j
], "val_mem_" + std::to_string(key
));
1398 } else if (key
% 5 == 0) {
1399 ASSERT_EQ(values
[j
], "val_l0_" + std::to_string(key
));
1400 } else if (key
% 3 == 0) {
1401 ASSERT_EQ(values
[j
], "val_l1_" + std::to_string(key
));
1403 ASSERT_EQ(values
[j
], "val_l2_" + std::to_string(key
));
1408 TEST_F(DBBasicTest
, MultiGetBatchedMultiLevelMerge
) {
1409 Options options
= CurrentOptions();
1410 options
.disable_auto_compactions
= true;
1411 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
1412 BlockBasedTableOptions bbto
;
1413 bbto
.filter_policy
.reset(NewBloomFilterPolicy(10, false));
1414 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
1418 for (int i
= 0; i
< 128; ++i
) {
1419 ASSERT_OK(Put("key_" + std::to_string(i
), "val_l2_" + std::to_string(i
)));
1421 if (num_keys
== 8) {
1430 MoveFilesToLevel(2);
1432 for (int i
= 0; i
< 128; i
+= 3) {
1433 ASSERT_OK(Merge("key_" + std::to_string(i
), "val_l1_" + std::to_string(i
)));
1435 if (num_keys
== 8) {
1444 MoveFilesToLevel(1);
1446 for (int i
= 0; i
< 128; i
+= 5) {
1447 ASSERT_OK(Merge("key_" + std::to_string(i
), "val_l0_" + std::to_string(i
)));
1449 if (num_keys
== 8) {
1458 ASSERT_EQ(0, num_keys
);
1460 for (int i
= 0; i
< 128; i
+= 9) {
1461 ASSERT_OK(Merge("key_" + std::to_string(i
), "val_mem_" + std::to_string(i
)));
1464 std::vector
<std::string
> keys
;
1465 std::vector
<std::string
> values
;
1467 for (int i
= 32; i
< 80; ++i
) {
1468 keys
.push_back("key_" + std::to_string(i
));
1471 values
= MultiGet(keys
, nullptr);
1472 ASSERT_EQ(values
.size(), keys
.size());
1473 for (unsigned int j
= 0; j
< 48; ++j
) {
1476 value
.append("val_l2_" + std::to_string(key
));
1479 value
.append("val_l1_" + std::to_string(key
));
1483 value
.append("val_l0_" + std::to_string(key
));
1487 value
.append("val_mem_" + std::to_string(key
));
1489 ASSERT_EQ(values
[j
], value
);
1493 // Test class for batched MultiGet with prefix extractor
1494 // Param bool - If true, use partitioned filters
1495 // If false, use full filter block
1496 class MultiGetPrefixExtractorTest
: public DBBasicTest
,
1497 public ::testing::WithParamInterface
<bool> {
1500 TEST_P(MultiGetPrefixExtractorTest
, Batched
) {
1501 Options options
= CurrentOptions();
1502 options
.prefix_extractor
.reset(NewFixedPrefixTransform(2));
1503 options
.memtable_prefix_bloom_size_ratio
= 10;
1504 BlockBasedTableOptions bbto
;
1506 bbto
.index_type
= BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch
;
1507 bbto
.partition_filters
= true;
1509 bbto
.filter_policy
.reset(NewBloomFilterPolicy(10, false));
1510 bbto
.whole_key_filtering
= false;
1511 bbto
.cache_index_and_filter_blocks
= false;
1512 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
1515 SetPerfLevel(kEnableCount
);
1516 get_perf_context()->Reset();
1518 // First key is not in the prefix_extractor domain
1519 ASSERT_OK(Put("k", "v0"));
1520 ASSERT_OK(Put("kk1", "v1"));
1521 ASSERT_OK(Put("kk2", "v2"));
1522 ASSERT_OK(Put("kk3", "v3"));
1523 ASSERT_OK(Put("kk4", "v4"));
1524 std::vector
<std::string
> mem_keys(
1525 {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
1526 std::vector
<std::string
> inmem_values
;
1527 inmem_values
= MultiGet(mem_keys
, nullptr);
1528 ASSERT_EQ(inmem_values
[0], "v0");
1529 ASSERT_EQ(inmem_values
[1], "v1");
1530 ASSERT_EQ(inmem_values
[2], "v2");
1531 ASSERT_EQ(inmem_values
[3], "v3");
1532 ASSERT_EQ(inmem_values
[4], "v4");
1533 ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count
, 2);
1534 ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count
, 5);
1537 std::vector
<std::string
> keys({"k", "kk1", "kk2", "kk3", "kk4"});
1538 std::vector
<std::string
> values
;
1539 get_perf_context()->Reset();
1540 values
= MultiGet(keys
, nullptr);
1541 ASSERT_EQ(values
[0], "v0");
1542 ASSERT_EQ(values
[1], "v1");
1543 ASSERT_EQ(values
[2], "v2");
1544 ASSERT_EQ(values
[3], "v3");
1545 ASSERT_EQ(values
[4], "v4");
1546 // Filter hits for 4 in-domain keys
1547 ASSERT_EQ(get_perf_context()->bloom_sst_hit_count
, 4);
1550 INSTANTIATE_TEST_CASE_P(MultiGetPrefix
, MultiGetPrefixExtractorTest
,
1553 #ifndef ROCKSDB_LITE
1554 class DBMultiGetRowCacheTest
: public DBBasicTest
,
1555 public ::testing::WithParamInterface
<bool> {};
1557 TEST_P(DBMultiGetRowCacheTest
, MultiGetBatched
) {
1559 option_config_
= kRowCache
;
1560 Options options
= CurrentOptions();
1561 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
1562 CreateAndReopenWithCF({"pikachu"}, options
);
1563 SetPerfLevel(kEnableCount
);
1564 ASSERT_OK(Put(1, "k1", "v1"));
1565 ASSERT_OK(Put(1, "k2", "v2"));
1566 ASSERT_OK(Put(1, "k3", "v3"));
1567 ASSERT_OK(Put(1, "k4", "v4"));
1569 ASSERT_OK(Put(1, "k5", "v5"));
1570 const Snapshot
* snap1
= dbfull()->GetSnapshot();
1571 ASSERT_OK(Delete(1, "k4"));
1573 const Snapshot
* snap2
= dbfull()->GetSnapshot();
1575 get_perf_context()->Reset();
1577 std::vector
<Slice
> keys({"no_key", "k5", "k4", "k3", "k1"});
1578 std::vector
<PinnableSlice
> values(keys
.size());
1579 std::vector
<ColumnFamilyHandle
*> cfs(keys
.size(), handles_
[1]);
1580 std::vector
<Status
> s(keys
.size());
1583 bool use_snapshots
= GetParam();
1584 if (use_snapshots
) {
1585 ro
.snapshot
= snap2
;
1587 db_
->MultiGet(ro
, handles_
[1], keys
.size(), keys
.data(), values
.data(),
1590 ASSERT_EQ(values
.size(), keys
.size());
1591 ASSERT_EQ(std::string(values
[4].data(), values
[4].size()), "v1");
1592 ASSERT_EQ(std::string(values
[3].data(), values
[3].size()), "v3");
1593 ASSERT_EQ(std::string(values
[1].data(), values
[1].size()), "v5");
1594 // four kv pairs * two bytes per value
1595 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes
);
1597 ASSERT_TRUE(s
[0].IsNotFound());
1599 ASSERT_TRUE(s
[2].IsNotFound());
1603 // Call MultiGet() again with some intersection with the previous set of
1604 // keys. Those should already be in the row cache.
1605 keys
.assign({"no_key", "k5", "k3", "k2"});
1606 for (size_t i
= 0; i
< keys
.size(); ++i
) {
1608 s
[i
] = Status::OK();
1610 get_perf_context()->Reset();
1612 if (use_snapshots
) {
1613 ro
.snapshot
= snap1
;
1615 db_
->MultiGet(ReadOptions(), handles_
[1], keys
.size(), keys
.data(),
1616 values
.data(), s
.data(), false);
1618 ASSERT_EQ(std::string(values
[3].data(), values
[3].size()), "v2");
1619 ASSERT_EQ(std::string(values
[2].data(), values
[2].size()), "v3");
1620 ASSERT_EQ(std::string(values
[1].data(), values
[1].size()), "v5");
1621 // four kv pairs * two bytes per value
1622 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes
);
1624 ASSERT_TRUE(s
[0].IsNotFound());
1628 if (use_snapshots
) {
1629 // Only reads from the first SST file would have been cached, since
1630 // snapshot seq no is > fd.largest_seqno
1631 ASSERT_EQ(1, TestGetTickerCount(options
, ROW_CACHE_HIT
));
1633 ASSERT_EQ(2, TestGetTickerCount(options
, ROW_CACHE_HIT
));
1636 SetPerfLevel(kDisable
);
1637 dbfull()->ReleaseSnapshot(snap1
);
1638 dbfull()->ReleaseSnapshot(snap2
);
1639 } while (ChangeCompactOptions());
1642 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest
, DBMultiGetRowCacheTest
,
1643 testing::Values(true, false));
1645 TEST_F(DBBasicTest
, GetAllKeyVersions
) {
1646 Options options
= CurrentOptions();
1648 options
.create_if_missing
= true;
1649 options
.disable_auto_compactions
= true;
1650 CreateAndReopenWithCF({"pikachu"}, options
);
1651 ASSERT_EQ(2, handles_
.size());
1652 const size_t kNumInserts
= 4;
1653 const size_t kNumDeletes
= 4;
1654 const size_t kNumUpdates
= 4;
1656 // Check default column family
1657 for (size_t i
= 0; i
!= kNumInserts
; ++i
) {
1658 ASSERT_OK(Put(std::to_string(i
), "value"));
1660 for (size_t i
= 0; i
!= kNumUpdates
; ++i
) {
1661 ASSERT_OK(Put(std::to_string(i
), "value1"));
1663 for (size_t i
= 0; i
!= kNumDeletes
; ++i
) {
1664 ASSERT_OK(Delete(std::to_string(i
)));
1666 std::vector
<KeyVersion
> key_versions
;
1667 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1668 db_
, Slice(), Slice(), std::numeric_limits
<size_t>::max(),
1670 ASSERT_EQ(kNumInserts
+ kNumDeletes
+ kNumUpdates
, key_versions
.size());
1671 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1672 db_
, handles_
[0], Slice(), Slice(), std::numeric_limits
<size_t>::max(),
1674 ASSERT_EQ(kNumInserts
+ kNumDeletes
+ kNumUpdates
, key_versions
.size());
1676 // Check non-default column family
1677 for (size_t i
= 0; i
!= kNumInserts
- 1; ++i
) {
1678 ASSERT_OK(Put(1, std::to_string(i
), "value"));
1680 for (size_t i
= 0; i
!= kNumUpdates
- 1; ++i
) {
1681 ASSERT_OK(Put(1, std::to_string(i
), "value1"));
1683 for (size_t i
= 0; i
!= kNumDeletes
- 1; ++i
) {
1684 ASSERT_OK(Delete(1, std::to_string(i
)));
1686 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1687 db_
, handles_
[1], Slice(), Slice(), std::numeric_limits
<size_t>::max(),
1689 ASSERT_EQ(kNumInserts
+ kNumDeletes
+ kNumUpdates
- 3, key_versions
.size());
1691 #endif // !ROCKSDB_LITE
1693 TEST_F(DBBasicTest
, MultiGetIOBufferOverrun
) {
1694 Options options
= CurrentOptions();
1696 BlockBasedTableOptions table_options
;
1697 table_options
.pin_l0_filter_and_index_blocks_in_cache
= true;
1698 table_options
.block_size
= 16 * 1024;
1699 assert(table_options
.block_size
>
1700 BlockBasedTable::kMultiGetReadStackBufSize
);
1701 options
.table_factory
.reset(new BlockBasedTableFactory(table_options
));
1704 std::string
zero_str(128, '\0');
1705 for (int i
= 0; i
< 100; ++i
) {
1706 // Make the value compressible. A purely random string doesn't compress
1707 // and the resultant data block will not be compressed
1708 std::string
value(RandomString(&rnd
, 128) + zero_str
);
1709 assert(Put(Key(i
), value
) == Status::OK());
1713 std::vector
<std::string
> key_data(10);
1714 std::vector
<Slice
> keys
;
1715 // We cannot resize a PinnableSlice vector, so just set initial size to
1716 // largest we think we will need
1717 std::vector
<PinnableSlice
> values(10);
1718 std::vector
<Status
> statuses
;
1721 // Warm up the cache first
1722 key_data
.emplace_back(Key(0));
1723 keys
.emplace_back(Slice(key_data
.back()));
1724 key_data
.emplace_back(Key(50));
1725 keys
.emplace_back(Slice(key_data
.back()));
1726 statuses
.resize(keys
.size());
1728 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
1729 keys
.data(), values
.data(), statuses
.data(), true);
1732 class DBBasicTestWithParallelIO
1733 : public DBTestBase
,
1734 public testing::WithParamInterface
<std::tuple
<bool, bool, bool, bool>> {
1736 DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
1737 bool compressed_cache
= std::get
<0>(GetParam());
1738 bool uncompressed_cache
= std::get
<1>(GetParam());
1739 compression_enabled_
= std::get
<2>(GetParam());
1740 fill_cache_
= std::get
<3>(GetParam());
1742 if (compressed_cache
) {
1743 std::shared_ptr
<Cache
> cache
= NewLRUCache(1048576);
1744 compressed_cache_
= std::make_shared
<MyBlockCache
>(cache
);
1746 if (uncompressed_cache
) {
1747 std::shared_ptr
<Cache
> cache
= NewLRUCache(1048576);
1748 uncompressed_cache_
= std::make_shared
<MyBlockCache
>(cache
);
1751 env_
->count_random_reads_
= true;
1753 Options options
= CurrentOptions();
1755 BlockBasedTableOptions table_options
;
1757 #ifndef ROCKSDB_LITE
1758 if (compression_enabled_
) {
1759 std::vector
<CompressionType
> compression_types
;
1760 compression_types
= GetSupportedCompressions();
1761 // Not every platform may have compression libraries available, so
1762 // dynamically pick based on what's available
1763 if (compression_types
.size() == 0) {
1764 compression_enabled_
= false;
1766 options
.compression
= compression_types
[0];
1770 // GetSupportedCompressions() is not available in LITE build
1771 if (!Snappy_Supported()) {
1772 compression_enabled_
= false;
1774 #endif //ROCKSDB_LITE
1776 table_options
.block_cache
= uncompressed_cache_
;
1777 if (table_options
.block_cache
== nullptr) {
1778 table_options
.no_block_cache
= true;
1780 table_options
.pin_l0_filter_and_index_blocks_in_cache
= true;
1782 table_options
.block_cache_compressed
= compressed_cache_
;
1783 table_options
.flush_block_policy_factory
.reset(
1784 new MyFlushBlockPolicyFactory());
1785 options
.table_factory
.reset(new BlockBasedTableFactory(table_options
));
1786 if (!compression_enabled_
) {
1787 options
.compression
= kNoCompression
;
1791 std::string
zero_str(128, '\0');
1792 for (int i
= 0; i
< 100; ++i
) {
1793 // Make the value compressible. A purely random string doesn't compress
1794 // and the resultant data block will not be compressed
1795 values_
.emplace_back(RandomString(&rnd
, 128) + zero_str
);
1796 assert(Put(Key(i
), values_
[i
]) == Status::OK());
1800 for (int i
= 0; i
< 100; ++i
) {
1801 // block cannot gain space by compression
1802 uncompressable_values_
.emplace_back(RandomString(&rnd
, 256) + '\0');
1803 std::string tmp_key
= "a" + Key(i
);
1804 assert(Put(tmp_key
, uncompressable_values_
[i
]) == Status::OK());
1809 bool CheckValue(int i
, const std::string
& value
) {
1810 if (values_
[i
].compare(value
) == 0) {
1816 bool CheckUncompressableValue(int i
, const std::string
& value
) {
1817 if (uncompressable_values_
[i
].compare(value
) == 0) {
1823 int num_lookups() { return uncompressed_cache_
->num_lookups(); }
1824 int num_found() { return uncompressed_cache_
->num_found(); }
1825 int num_inserts() { return uncompressed_cache_
->num_inserts(); }
1827 int num_lookups_compressed() { return compressed_cache_
->num_lookups(); }
1828 int num_found_compressed() { return compressed_cache_
->num_found(); }
1829 int num_inserts_compressed() { return compressed_cache_
->num_inserts(); }
1831 bool fill_cache() { return fill_cache_
; }
1832 bool compression_enabled() { return compression_enabled_
; }
1833 bool has_compressed_cache() { return compressed_cache_
!= nullptr; }
1834 bool has_uncompressed_cache() { return uncompressed_cache_
!= nullptr; }
1836 static void SetUpTestCase() {}
1837 static void TearDownTestCase() {}
1840 class MyFlushBlockPolicyFactory
: public FlushBlockPolicyFactory
{
1842 MyFlushBlockPolicyFactory() {}
1844 virtual const char* Name() const override
{
1845 return "MyFlushBlockPolicyFactory";
1848 virtual FlushBlockPolicy
* NewFlushBlockPolicy(
1849 const BlockBasedTableOptions
& /*table_options*/,
1850 const BlockBuilder
& data_block_builder
) const override
{
1851 return new MyFlushBlockPolicy(data_block_builder
);
1855 class MyFlushBlockPolicy
: public FlushBlockPolicy
{
1857 explicit MyFlushBlockPolicy(const BlockBuilder
& data_block_builder
)
1858 : num_keys_(0), data_block_builder_(data_block_builder
) {}
1860 bool Update(const Slice
& /*key*/, const Slice
& /*value*/) override
{
1861 if (data_block_builder_
.empty()) {
1862 // First key in this block
1866 // Flush every 10 keys
1867 if (num_keys_
== 10) {
1877 const BlockBuilder
& data_block_builder_
;
1880 class MyBlockCache
: public Cache
{
1882 explicit MyBlockCache(std::shared_ptr
<Cache
>& target
)
1883 : target_(target
), num_lookups_(0), num_found_(0), num_inserts_(0) {}
1885 virtual const char* Name() const override
{ return "MyBlockCache"; }
1887 virtual Status
Insert(const Slice
& key
, void* value
, size_t charge
,
1888 void (*deleter
)(const Slice
& key
, void* value
),
1889 Handle
** handle
= nullptr,
1890 Priority priority
= Priority::LOW
) override
{
1892 return target_
->Insert(key
, value
, charge
, deleter
, handle
, priority
);
1895 virtual Handle
* Lookup(const Slice
& key
,
1896 Statistics
* stats
= nullptr) override
{
1898 Handle
* handle
= target_
->Lookup(key
, stats
);
1899 if (handle
!= nullptr) {
1905 virtual bool Ref(Handle
* handle
) override
{ return target_
->Ref(handle
); }
1907 virtual bool Release(Handle
* handle
, bool force_erase
= false) override
{
1908 return target_
->Release(handle
, force_erase
);
1911 virtual void* Value(Handle
* handle
) override
{
1912 return target_
->Value(handle
);
1915 virtual void Erase(const Slice
& key
) override
{ target_
->Erase(key
); }
1916 virtual uint64_t NewId() override
{ return target_
->NewId(); }
1918 virtual void SetCapacity(size_t capacity
) override
{
1919 target_
->SetCapacity(capacity
);
1922 virtual void SetStrictCapacityLimit(bool strict_capacity_limit
) override
{
1923 target_
->SetStrictCapacityLimit(strict_capacity_limit
);
1926 virtual bool HasStrictCapacityLimit() const override
{
1927 return target_
->HasStrictCapacityLimit();
1930 virtual size_t GetCapacity() const override
{
1931 return target_
->GetCapacity();
1934 virtual size_t GetUsage() const override
{ return target_
->GetUsage(); }
1936 virtual size_t GetUsage(Handle
* handle
) const override
{
1937 return target_
->GetUsage(handle
);
1940 virtual size_t GetPinnedUsage() const override
{
1941 return target_
->GetPinnedUsage();
1944 virtual size_t GetCharge(Handle
* /*handle*/) const override
{ return 0; }
1946 virtual void ApplyToAllCacheEntries(void (*callback
)(void*, size_t),
1947 bool thread_safe
) override
{
1948 return target_
->ApplyToAllCacheEntries(callback
, thread_safe
);
1951 virtual void EraseUnRefEntries() override
{
1952 return target_
->EraseUnRefEntries();
1955 int num_lookups() { return num_lookups_
; }
1957 int num_found() { return num_found_
; }
1959 int num_inserts() { return num_inserts_
; }
1962 std::shared_ptr
<Cache
> target_
;
1968 std::shared_ptr
<MyBlockCache
> compressed_cache_
;
1969 std::shared_ptr
<MyBlockCache
> uncompressed_cache_
;
1970 bool compression_enabled_
;
1971 std::vector
<std::string
> values_
;
1972 std::vector
<std::string
> uncompressable_values_
;
1976 TEST_P(DBBasicTestWithParallelIO
, MultiGet
) {
1977 std::vector
<std::string
> key_data(10);
1978 std::vector
<Slice
> keys
;
1979 // We cannot resize a PinnableSlice vector, so just set initial size to
1980 // largest we think we will need
1981 std::vector
<PinnableSlice
> values(10);
1982 std::vector
<Status
> statuses
;
1984 ro
.fill_cache
= fill_cache();
1986 // Warm up the cache first
1987 key_data
.emplace_back(Key(0));
1988 keys
.emplace_back(Slice(key_data
.back()));
1989 key_data
.emplace_back(Key(50));
1990 keys
.emplace_back(Slice(key_data
.back()));
1991 statuses
.resize(keys
.size());
1993 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
1994 keys
.data(), values
.data(), statuses
.data(), true);
1995 ASSERT_TRUE(CheckValue(0, values
[0].ToString()));
1996 ASSERT_TRUE(CheckValue(50, values
[1].ToString()));
1998 int random_reads
= env_
->random_read_counter_
.Read();
1999 key_data
[0] = Key(1);
2000 key_data
[1] = Key(51);
2001 keys
[0] = Slice(key_data
[0]);
2002 keys
[1] = Slice(key_data
[1]);
2005 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2006 keys
.data(), values
.data(), statuses
.data(), true);
2007 ASSERT_TRUE(CheckValue(1, values
[0].ToString()));
2008 ASSERT_TRUE(CheckValue(51, values
[1].ToString()));
2010 bool read_from_cache
= false;
2012 if (has_uncompressed_cache()) {
2013 read_from_cache
= true;
2014 } else if (has_compressed_cache() && compression_enabled()) {
2015 read_from_cache
= true;
2019 int expected_reads
= random_reads
+ (read_from_cache
? 0 : 2);
2020 ASSERT_EQ(env_
->random_read_counter_
.Read(), expected_reads
);
2023 statuses
.resize(10);
2024 std::vector
<int> key_ints
{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2025 for (size_t i
= 0; i
< key_ints
.size(); ++i
) {
2026 key_data
[i
] = Key(key_ints
[i
]);
2027 keys
[i
] = Slice(key_data
[i
]);
2028 statuses
[i
] = Status::OK();
2031 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2032 keys
.data(), values
.data(), statuses
.data(), true);
2033 for (size_t i
= 0; i
< key_ints
.size(); ++i
) {
2034 ASSERT_OK(statuses
[i
]);
2035 ASSERT_TRUE(CheckValue(key_ints
[i
], values
[i
].ToString()));
2037 if (compression_enabled() && !has_compressed_cache()) {
2038 expected_reads
+= (read_from_cache
? 2 : 3);
2040 expected_reads
+= (read_from_cache
? 2 : 4);
2042 ASSERT_EQ(env_
->random_read_counter_
.Read(), expected_reads
);
2045 statuses
.resize(10);
2046 std::vector
<int> key_uncmp
{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2047 for (size_t i
= 0; i
< key_uncmp
.size(); ++i
) {
2048 key_data
[i
] = "a" + Key(key_uncmp
[i
]);
2049 keys
[i
] = Slice(key_data
[i
]);
2050 statuses
[i
] = Status::OK();
2053 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2054 keys
.data(), values
.data(), statuses
.data(), true);
2055 for (size_t i
= 0; i
< key_uncmp
.size(); ++i
) {
2056 ASSERT_OK(statuses
[i
]);
2057 ASSERT_TRUE(CheckUncompressableValue(key_uncmp
[i
], values
[i
].ToString()));
2059 if (compression_enabled() && !has_compressed_cache()) {
2060 expected_reads
+= (read_from_cache
? 3 : 3);
2062 expected_reads
+= (read_from_cache
? 4 : 4);
2064 ASSERT_EQ(env_
->random_read_counter_
.Read(), expected_reads
);
2068 std::vector
<int> key_tr
{1, 2, 15, 16, 55};
2069 for (size_t i
= 0; i
< key_tr
.size(); ++i
) {
2070 key_data
[i
] = "a" + Key(key_tr
[i
]);
2071 keys
[i
] = Slice(key_data
[i
]);
2072 statuses
[i
] = Status::OK();
2075 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2076 keys
.data(), values
.data(), statuses
.data(), true);
2077 for (size_t i
= 0; i
< key_tr
.size(); ++i
) {
2078 ASSERT_OK(statuses
[i
]);
2079 ASSERT_TRUE(CheckUncompressableValue(key_tr
[i
], values
[i
].ToString()));
2081 if (compression_enabled() && !has_compressed_cache()) {
2082 expected_reads
+= (read_from_cache
? 0 : 2);
2083 ASSERT_EQ(env_
->random_read_counter_
.Read(), expected_reads
);
2085 if (has_uncompressed_cache()) {
2086 expected_reads
+= (read_from_cache
? 0 : 3);
2087 ASSERT_EQ(env_
->random_read_counter_
.Read(), expected_reads
);
2089 // A rare case, even we enable the block compression but some of data
2090 // blocks are not compressed due to content. If user only enable the
2091 // compressed cache, the uncompressed blocks will not tbe cached, and
2092 // block reads will be triggered. The number of reads is related to
2093 // the compression algorithm.
2094 ASSERT_TRUE(env_
->random_read_counter_
.Read() >= expected_reads
);
2099 TEST_P(DBBasicTestWithParallelIO
, MultiGetWithChecksumMismatch
) {
2100 std::vector
<std::string
> key_data(10);
2101 std::vector
<Slice
> keys
;
2102 // We cannot resize a PinnableSlice vector, so just set initial size to
2103 // largest we think we will need
2104 std::vector
<PinnableSlice
> values(10);
2105 std::vector
<Status
> statuses
;
2108 ro
.fill_cache
= fill_cache();
2110 SyncPoint::GetInstance()->SetCallBack(
2111 "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status
) {
2112 Status
* s
= static_cast<Status
*>(status
);
2114 if (read_count
== 2) {
2115 *s
= Status::Corruption();
2118 SyncPoint::GetInstance()->EnableProcessing();
2120 // Warm up the cache first
2121 key_data
.emplace_back(Key(0));
2122 keys
.emplace_back(Slice(key_data
.back()));
2123 key_data
.emplace_back(Key(50));
2124 keys
.emplace_back(Slice(key_data
.back()));
2125 statuses
.resize(keys
.size());
2127 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2128 keys
.data(), values
.data(), statuses
.data(), true);
2129 ASSERT_TRUE(CheckValue(0, values
[0].ToString()));
2130 //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2131 ASSERT_EQ(statuses
[0], Status::OK());
2132 ASSERT_EQ(statuses
[1], Status::Corruption());
2134 SyncPoint::GetInstance()->DisableProcessing();
2137 TEST_P(DBBasicTestWithParallelIO
, MultiGetWithMissingFile
) {
2138 std::vector
<std::string
> key_data(10);
2139 std::vector
<Slice
> keys
;
2140 // We cannot resize a PinnableSlice vector, so just set initial size to
2141 // largest we think we will need
2142 std::vector
<PinnableSlice
> values(10);
2143 std::vector
<Status
> statuses
;
2145 ro
.fill_cache
= fill_cache();
2147 SyncPoint::GetInstance()->SetCallBack(
2148 "TableCache::MultiGet:FindTable", [&](void *status
) {
2149 Status
* s
= static_cast<Status
*>(status
);
2150 *s
= Status::IOError();
2152 // DB open will create table readers unless we reduce the table cache
2154 // SanitizeOptions will set max_open_files to minimum of 20. Table cache
2155 // is allocated with max_open_files - 10 as capacity. So override
2156 // max_open_files to 11 so table cache capacity will become 1. This will
2157 // prevent file open during DB open and force the file to be opened
2159 SyncPoint::GetInstance()->SetCallBack(
2160 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg
) {
2161 int* max_open_files
= (int*)arg
;
2162 *max_open_files
= 11;
2164 SyncPoint::GetInstance()->EnableProcessing();
2166 Reopen(CurrentOptions());
2168 // Warm up the cache first
2169 key_data
.emplace_back(Key(0));
2170 keys
.emplace_back(Slice(key_data
.back()));
2171 key_data
.emplace_back(Key(50));
2172 keys
.emplace_back(Slice(key_data
.back()));
2173 statuses
.resize(keys
.size());
2175 dbfull()->MultiGet(ro
, dbfull()->DefaultColumnFamily(), keys
.size(),
2176 keys
.data(), values
.data(), statuses
.data(), true);
2177 ASSERT_EQ(statuses
[0], Status::IOError());
2178 ASSERT_EQ(statuses
[1], Status::IOError());
2180 SyncPoint::GetInstance()->DisableProcessing();
2183 INSTANTIATE_TEST_CASE_P(
2184 ParallelIO
, DBBasicTestWithParallelIO
,
2185 // Params are as follows -
2186 // Param 0 - Compressed cache enabled
2187 // Param 1 - Uncompressed cache enabled
2188 // Param 2 - Data compression enabled
2189 // Param 3 - ReadOptions::fill_cache
2190 ::testing::Combine(::testing::Bool(), ::testing::Bool(),
2191 ::testing::Bool(), ::testing::Bool()));
2193 class DBBasicTestWithTimestampBase
: public DBTestBase
{
2195 explicit DBBasicTestWithTimestampBase(const std::string
& dbname
)
2196 : DBTestBase(dbname
) {}
2199 class TestComparatorBase
: public Comparator
{
2201 explicit TestComparatorBase(size_t ts_sz
) : Comparator(ts_sz
) {}
2203 const char* Name() const override
{ return "TestComparator"; }
2205 void FindShortSuccessor(std::string
*) const override
{}
2207 void FindShortestSeparator(std::string
*, const Slice
&) const override
{}
2209 int Compare(const Slice
& a
, const Slice
& b
) const override
{
2210 int r
= CompareWithoutTimestamp(a
, b
);
2211 if (r
!= 0 || 0 == timestamp_size()) {
2214 return CompareTimestamp(
2215 Slice(a
.data() + a
.size() - timestamp_size(), timestamp_size()),
2216 Slice(b
.data() + b
.size() - timestamp_size(), timestamp_size()));
2219 virtual int CompareImpl(const Slice
& a
, const Slice
& b
) const = 0;
2221 int CompareWithoutTimestamp(const Slice
& a
, const Slice
& b
) const override
{
2222 assert(a
.size() >= timestamp_size());
2223 assert(b
.size() >= timestamp_size());
2224 Slice k1
= StripTimestampFromUserKey(a
, timestamp_size());
2225 Slice k2
= StripTimestampFromUserKey(b
, timestamp_size());
2227 return CompareImpl(k1
, k2
);
2230 int CompareTimestamp(const Slice
& ts1
, const Slice
& ts2
) const override
{
2231 if (!ts1
.data() && !ts2
.data()) {
2233 } else if (ts1
.data() && !ts2
.data()) {
2235 } else if (!ts1
.data() && ts2
.data()) {
2238 assert(ts1
.size() == ts2
.size());
2243 auto* ptr1
= const_cast<Slice
*>(&ts1
);
2244 auto* ptr2
= const_cast<Slice
*>(&ts2
);
2245 if (!GetFixed64(ptr1
, &low1
) || !GetFixed64(ptr1
, &high1
) ||
2246 !GetFixed64(ptr2
, &low2
) || !GetFixed64(ptr2
, &high2
)) {
2249 if (high1
< high2
) {
2251 } else if (high1
> high2
) {
2256 } else if (low1
> low2
) {
2263 Slice
EncodeTimestamp(uint64_t low
, uint64_t high
, std::string
* ts
) {
2264 assert(nullptr != ts
);
2266 PutFixed64(ts
, low
);
2267 PutFixed64(ts
, high
);
2268 assert(ts
->size() == sizeof(low
) + sizeof(high
));
2273 class DBBasicTestWithTimestamp
: public DBBasicTestWithTimestampBase
{
2275 DBBasicTestWithTimestamp()
2276 : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
2279 class TestComparator
: public TestComparatorBase
{
2281 const int kKeyPrefixLength
=
2282 3; // 3: length of "key" in generated keys ("key" + std::to_string(j))
2283 explicit TestComparator(size_t ts_sz
) : TestComparatorBase(ts_sz
) {}
2285 int CompareImpl(const Slice
& a
, const Slice
& b
) const override
{
2287 std::string(a
.data() + kKeyPrefixLength
, a
.size() - kKeyPrefixLength
)
2290 std::string(b
.data() + kKeyPrefixLength
, b
.size() - kKeyPrefixLength
)
2292 return (n1
< n2
) ? -1 : (n1
> n2
) ? 1 : 0;
2297 #ifndef ROCKSDB_LITE
2298 // A class which remembers the name of each flushed file.
2299 class FlushedFileCollector
: public EventListener
{
2301 FlushedFileCollector() {}
2302 ~FlushedFileCollector() override
{}
2304 void OnFlushCompleted(DB
* /*db*/, const FlushJobInfo
& info
) override
{
2305 InstrumentedMutexLock
lock(&mutex_
);
2306 flushed_files_
.push_back(info
.file_path
);
2309 std::vector
<std::string
> GetFlushedFiles() {
2310 std::vector
<std::string
> result
;
2312 InstrumentedMutexLock
lock(&mutex_
);
2313 result
= flushed_files_
;
2318 void ClearFlushedFiles() {
2319 InstrumentedMutexLock
lock(&mutex_
);
2320 flushed_files_
.clear();
2324 std::vector
<std::string
> flushed_files_
;
2325 InstrumentedMutex mutex_
;
2328 TEST_F(DBBasicTestWithTimestamp
, PutAndGetWithCompaction
) {
2329 const int kNumKeysPerFile
= 8192;
2330 const size_t kNumTimestamps
= 2;
2331 const size_t kNumKeysPerTimestamp
= (kNumKeysPerFile
- 1) / kNumTimestamps
;
2332 const size_t kSplitPosBase
= kNumKeysPerTimestamp
/ 2;
2333 Options options
= CurrentOptions();
2334 options
.create_if_missing
= true;
2336 options
.memtable_factory
.reset(new SpecialSkipListFactory(kNumKeysPerFile
));
2338 FlushedFileCollector
* collector
= new FlushedFileCollector();
2339 options
.listeners
.emplace_back(collector
);
2342 size_t ts_sz
= EncodeTimestamp(0, 0, &tmp
).size();
2343 TestComparator
test_cmp(ts_sz
);
2344 options
.comparator
= &test_cmp
;
2345 BlockBasedTableOptions bbto
;
2346 bbto
.filter_policy
.reset(NewBloomFilterPolicy(
2347 10 /*bits_per_key*/, false /*use_block_based_builder*/));
2348 bbto
.whole_key_filtering
= true;
2349 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
2350 DestroyAndReopen(options
);
2351 CreateAndReopenWithCF({"pikachu"}, options
);
2352 size_t num_cfs
= handles_
.size();
2353 ASSERT_EQ(2, num_cfs
);
2354 std::vector
<std::string
> write_ts_strs(kNumTimestamps
);
2355 std::vector
<std::string
> read_ts_strs(kNumTimestamps
);
2356 std::vector
<Slice
> write_ts_list
;
2357 std::vector
<Slice
> read_ts_list
;
2359 for (size_t i
= 0; i
!= kNumTimestamps
; ++i
) {
2360 write_ts_list
.emplace_back(EncodeTimestamp(i
* 2, 0, &write_ts_strs
[i
]));
2361 read_ts_list
.emplace_back(EncodeTimestamp(1 + i
* 2, 0, &read_ts_strs
[i
]));
2362 const Slice
& write_ts
= write_ts_list
.back();
2364 wopts
.timestamp
= &write_ts
;
2365 for (int cf
= 0; cf
!= static_cast<int>(num_cfs
); ++cf
) {
2366 for (size_t j
= 0; j
!= kNumKeysPerTimestamp
; ++j
) {
2367 ASSERT_OK(Put(cf
, "key" + std::to_string(j
),
2368 "value_" + std::to_string(j
) + "_" + std::to_string(i
),
2370 if (j
== kSplitPosBase
+ i
|| j
== kNumKeysPerTimestamp
- 1) {
2371 // flush all keys with the same timestamp to two sst files, split at
2372 // incremental positions such that lowerlevel[1].smallest.userkey ==
2373 // higherlevel[0].largest.userkey
2374 ASSERT_OK(Flush(cf
));
2376 // compact files (2 at each level) to a lower level such that all keys
2377 // with the same timestamp is at one level, with newer versions at
2379 CompactionOptions compact_opt
;
2380 compact_opt
.compression
= kNoCompression
;
2381 db_
->CompactFiles(compact_opt
, handles_
[cf
],
2382 collector
->GetFlushedFiles(),
2383 static_cast<int>(kNumTimestamps
- i
));
2384 collector
->ClearFlushedFiles();
2389 const auto& verify_db_func
= [&]() {
2390 for (size_t i
= 0; i
!= kNumTimestamps
; ++i
) {
2392 ropts
.timestamp
= &read_ts_list
[i
];
2393 for (int cf
= 0; cf
!= static_cast<int>(num_cfs
); ++cf
) {
2394 ColumnFamilyHandle
* cfh
= handles_
[cf
];
2395 for (size_t j
= 0; j
!= kNumKeysPerTimestamp
; ++j
) {
2397 ASSERT_OK(db_
->Get(ropts
, cfh
, "key" + std::to_string(j
), &value
));
2398 ASSERT_EQ("value_" + std::to_string(j
) + "_" + std::to_string(i
),
2406 #endif // !ROCKSDB_LITE
2408 class DBBasicTestWithTimestampWithParam
2409 : public DBBasicTestWithTimestampBase
,
2410 public testing::WithParamInterface
<bool> {
2412 DBBasicTestWithTimestampWithParam()
2413 : DBBasicTestWithTimestampBase(
2414 "/db_basic_test_with_timestamp_with_param") {}
2417 class TestComparator
: public TestComparatorBase
{
2419 const Comparator
* cmp_without_ts_
;
2422 explicit TestComparator(size_t ts_sz
)
2423 : TestComparatorBase(ts_sz
), cmp_without_ts_(nullptr) {
2424 cmp_without_ts_
= BytewiseComparator();
2427 int CompareImpl(const Slice
& a
, const Slice
& b
) const override
{
2428 return cmp_without_ts_
->Compare(a
, b
);
2433 TEST_P(DBBasicTestWithTimestampWithParam
, PutAndGet
) {
2434 const int kNumKeysPerFile
= 8192;
2435 const size_t kNumTimestamps
= 6;
2436 bool memtable_only
= GetParam();
2437 Options options
= CurrentOptions();
2438 options
.create_if_missing
= true;
2440 options
.memtable_factory
.reset(new SpecialSkipListFactory(kNumKeysPerFile
));
2442 size_t ts_sz
= EncodeTimestamp(0, 0, &tmp
).size();
2443 TestComparator
test_cmp(ts_sz
);
2444 options
.comparator
= &test_cmp
;
2445 BlockBasedTableOptions bbto
;
2446 bbto
.filter_policy
.reset(NewBloomFilterPolicy(
2447 10 /*bits_per_key*/, false /*use_block_based_builder*/));
2448 bbto
.whole_key_filtering
= true;
2449 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
2451 std::vector
<CompressionType
> compression_types
;
2452 compression_types
.push_back(kNoCompression
);
2453 if (Zlib_Supported()) {
2454 compression_types
.push_back(kZlibCompression
);
2456 #if LZ4_VERSION_NUMBER >= 10400 // r124+
2457 compression_types
.push_back(kLZ4Compression
);
2458 compression_types
.push_back(kLZ4HCCompression
);
2459 #endif // LZ4_VERSION_NUMBER >= 10400
2460 if (ZSTD_Supported()) {
2461 compression_types
.push_back(kZSTD
);
2464 // Switch compression dictionary on/off to check key extraction
2465 // correctness in kBuffered state
2466 std::vector
<uint32_t> max_dict_bytes_list
= {0, 1 << 14}; // 0 or 16KB
2468 for (auto compression_type
: compression_types
) {
2469 for (uint32_t max_dict_bytes
: max_dict_bytes_list
) {
2470 options
.compression
= compression_type
;
2471 options
.compression_opts
.max_dict_bytes
= max_dict_bytes
;
2472 if (compression_type
== kZSTD
) {
2473 options
.compression_opts
.zstd_max_train_bytes
= max_dict_bytes
;
2475 options
.target_file_size_base
= 1 << 26; // 64MB
2477 DestroyAndReopen(options
);
2478 CreateAndReopenWithCF({"pikachu"}, options
);
2479 size_t num_cfs
= handles_
.size();
2480 ASSERT_EQ(2, num_cfs
);
2481 std::vector
<std::string
> write_ts_strs(kNumTimestamps
);
2482 std::vector
<std::string
> read_ts_strs(kNumTimestamps
);
2483 std::vector
<Slice
> write_ts_list
;
2484 std::vector
<Slice
> read_ts_list
;
2486 for (size_t i
= 0; i
!= kNumTimestamps
; ++i
) {
2487 write_ts_list
.emplace_back(
2488 EncodeTimestamp(i
* 2, 0, &write_ts_strs
[i
]));
2489 read_ts_list
.emplace_back(
2490 EncodeTimestamp(1 + i
* 2, 0, &read_ts_strs
[i
]));
2491 const Slice
& write_ts
= write_ts_list
.back();
2493 wopts
.timestamp
= &write_ts
;
2494 for (int cf
= 0; cf
!= static_cast<int>(num_cfs
); ++cf
) {
2495 for (size_t j
= 0; j
!= (kNumKeysPerFile
- 1) / kNumTimestamps
; ++j
) {
2497 cf
, "key" + std::to_string(j
),
2498 "value_" + std::to_string(j
) + "_" + std::to_string(i
), wopts
));
2500 if (!memtable_only
) {
2501 ASSERT_OK(Flush(cf
));
2505 const auto& verify_db_func
= [&]() {
2506 for (size_t i
= 0; i
!= kNumTimestamps
; ++i
) {
2508 ropts
.timestamp
= &read_ts_list
[i
];
2509 for (int cf
= 0; cf
!= static_cast<int>(num_cfs
); ++cf
) {
2510 ColumnFamilyHandle
* cfh
= handles_
[cf
];
2511 for (size_t j
= 0; j
!= (kNumKeysPerFile
- 1) / kNumTimestamps
;
2515 db_
->Get(ropts
, cfh
, "key" + std::to_string(j
), &value
));
2516 ASSERT_EQ("value_" + std::to_string(j
) + "_" + std::to_string(i
),
2527 INSTANTIATE_TEST_CASE_P(Timestamp
, DBBasicTestWithTimestampWithParam
,
2530 } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Declared here; the definition is provided by a statically linked library.
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
// No custom objects to register in the default build.
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2540 int main(int argc
, char** argv
) {
2541 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2542 ::testing::InitGoogleTest(&argc
, argv
);
2543 RegisterCustomObjects(argc
, argv
);
2544 return RUN_ALL_TESTS();