]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_basic_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_basic_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_test_util.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/perf_context.h"
12 #include "rocksdb/utilities/debug.h"
13 #include "table/block_based/block_based_table_reader.h"
14 #include "table/block_based/block_builder.h"
15 #include "test_util/fault_injection_test_env.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
18 #endif
19
20 namespace ROCKSDB_NAMESPACE {
21
// Test fixture for basic DB behavior. DBTestBase supplies the temporary DB
// directory ("/db_basic_test"), the test Env (env_), and helpers such as
// Put/Get/Flush/Reopen used by every test below.
class DBBasicTest : public DBTestBase {
 public:
  DBBasicTest() : DBTestBase("/db_basic_test") {}
};
26
// A second DB::Open() on a path already held open by the fixture must fail
// to acquire the LOCK file with an IOError mentioning "lock".
TEST_F(DBBasicTest, OpenWhenOpen) {
  Options options = CurrentOptions();
  options.env = env_;
  ROCKSDB_NAMESPACE::DB* db2 = nullptr;
  ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);

  ASSERT_EQ(Status::Code::kIOError, s.code());
  ASSERT_EQ(Status::SubCode::kNone, s.subcode());
  // The failure message must point at the file-lock contention.
  ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);

  delete db2;
}
39
40 #ifndef ROCKSDB_LITE
// A read-only open must serve existing data (both before and after the
// memtable has been flushed) and must reject write-path operations.
TEST_F(DBBasicTest, ReadOnlyDB) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  Iterator* iter = db_->NewIterator(ReadOptions());
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    ++count;
  }
  // Two distinct keys: "foo" (latest value v3) and "bar".
  ASSERT_EQ(count, 2);
  delete iter;
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  Flush();
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  // SyncWAL is a write-path operation; read-only mode must refuse it.
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}
72
73 TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
74 ASSERT_OK(Put("foo", "v1"));
75 ASSERT_OK(Put("bar", "v2"));
76 ASSERT_OK(Put("foo", "v3"));
77 Close();
78
79 auto options = CurrentOptions();
80 options.write_dbid_to_manifest = true;
81 assert(options.env == env_);
82 ASSERT_OK(ReadOnlyReopen(options));
83 std::string db_id1;
84 db_->GetDbIdentity(db_id1);
85 ASSERT_EQ("v3", Get("foo"));
86 ASSERT_EQ("v2", Get("bar"));
87 Iterator* iter = db_->NewIterator(ReadOptions());
88 int count = 0;
89 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
90 ASSERT_OK(iter->status());
91 ++count;
92 }
93 ASSERT_EQ(count, 2);
94 delete iter;
95 Close();
96
97 // Reopen and flush memtable.
98 Reopen(options);
99 Flush();
100 Close();
101 // Now check keys in read only mode.
102 ASSERT_OK(ReadOnlyReopen(options));
103 ASSERT_EQ("v3", Get("foo"));
104 ASSERT_EQ("v2", Get("bar"));
105 ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
106 std::string db_id2;
107 db_->GetDbIdentity(db_id2);
108 ASSERT_EQ(db_id1, db_id2);
109 }
110
// Exercises the CompactedDBImpl fast path: a read-only open with
// max_open_files == -1 and a fully-compacted LSM (at most one file per
// level, no overlap) uses the compacted-DB implementation, which rejects
// writes with a distinct "compacted db mode" message. Otherwise the
// read-only open falls back to the plain read-only implementation.
TEST_F(DBBasicTest, CompactedDB) {
  const uint64_t kFileSize = 1 << 20;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = kFileSize;
  options.target_file_size_base = kFileSize;
  options.max_bytes_for_level_base = 1 << 30;
  options.compression = kNoCompression;
  Reopen(options);
  // 1 L0 file, use CompactedDB if max_open_files = -1
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  Flush();
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  Status s = Put("new", "value");
  // Default max_open_files: plain read-only mode.
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  // With max_open_files == -1 and a single file, CompactedDB is used.
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  Reopen(options);
  // Add more L0 files
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  Flush();
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  Flush();
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  Flush();
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  // Fallback to read-only DB
  s = Put("new", "value");
  // Multiple overlapping L0 files: CompactedDB cannot be used.
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  Close();

  // Full compaction
  Reopen(options);
  // Add more keys
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(3, NumTableFilesAtLevel(1));
  Close();

  // CompactedDB
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  // Point lookups across all three L1 files plus misses between them.
  ASSERT_EQ("NOT_FOUND", Get("abc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  ASSERT_EQ("NOT_FOUND", Get("ccc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  ASSERT_EQ("NOT_FOUND", Get("ggg"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  ASSERT_EQ("NOT_FOUND", Get("kkk"));

  // MultiGet
  std::vector<std::string> values;
  std::vector<Status> status_list = dbfull()->MultiGet(
      ReadOptions(),
      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
      &values);
  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  ASSERT_OK(status_list[0]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  ASSERT_TRUE(status_list[1].IsNotFound());
  ASSERT_OK(status_list[2]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  ASSERT_TRUE(status_list[3].IsNotFound());
  ASSERT_OK(status_list[4]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  ASSERT_TRUE(status_list[5].IsNotFound());

  Reopen(options);
  // Add a key
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  // New unflushed data means the DB is no longer fully compacted, so the
  // open falls back to plain read-only mode again.
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
}
211
// Reopening a DB with fewer levels than the LSM already occupies must fail
// with InvalidArgument; reopening with enough levels must succeed.
TEST_F(DBBasicTest, LevelLimitReopen) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  const std::string value(1024 * 1024, ' ');
  int i = 0;
  // Write ~1MB values until compaction has pushed files down to level 2.
  while (NumTableFilesAtLevel(2, 1) == 0) {
    ASSERT_OK(Put(1, Key(i++), value));
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }

  options.num_levels = 1;
  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ(s.IsInvalidArgument(), true);
  ASSERT_EQ(s.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  options.num_levels = 10;
  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
235 #endif // ROCKSDB_LITE
236
// Basic Put/overwrite/Delete/Get round trip on a non-default column family,
// repeated across all option configurations via ChangeOptions().
TEST_F(DBBasicTest, PutDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  } while (ChangeOptions());
}
248
// SingleDelete must hide a key written exactly once while leaving other
// keys intact.
TEST_F(DBBasicTest, PutSingleDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo2", "v2"));
    ASSERT_EQ("v2", Get(1, "foo2"));
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip FIFO and universal compaction because they do not apply to the
    // test case. Skip MergePut because single delete does not get removed
    // when it encounters a merge.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
264
265 TEST_F(DBBasicTest, EmptyFlush) {
266 // It is possible to produce empty flushes when using single deletes. Tests
267 // whether empty flushes cause issues.
268 do {
269 Random rnd(301);
270
271 Options options = CurrentOptions();
272 options.disable_auto_compactions = true;
273 CreateAndReopenWithCF({"pikachu"}, options);
274
275 Put(1, "a", Slice());
276 SingleDelete(1, "a");
277 ASSERT_OK(Flush(1));
278
279 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
280 // Skip FIFO and universal compaction as they do not apply to the test
281 // case. Skip MergePut because merges cannot be combined with single
282 // deletions.
283 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
284 kSkipMergePut));
285 }
286
// After a flush, Get must find the key in the SST files (the Version),
// and a different column family must not see it.
TEST_F(DBBasicTest, GetFromVersions) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v1", Get(1, "foo"));
    // Column family 0 never received the key.
    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  } while (ChangeOptions());
}
296
297 #ifndef ROCKSDB_LITE
// A snapshot taken between two writes of the same key must keep serving the
// older value, both from the memtable and after a flush.
TEST_F(DBBasicTest, GetSnapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    // Try with both a short key and a long key
    for (int i = 0; i < 2; i++) {
      std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* s1 = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      ASSERT_OK(Flush(1));
      // The snapshot view must survive the flush to SST.
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      db_->ReleaseSnapshot(s1);
    }
  } while (ChangeOptions());
}
318 #endif // ROCKSDB_LITE
319
// The file lock must prevent a second DB::Open on an already-open path.
TEST_F(DBBasicTest, CheckLock) {
  do {
    DB* localdb;
    Options options = CurrentOptions();
    ASSERT_OK(TryReopen(options));

    // second open should fail
    ASSERT_TRUE(!(DB::Open(options, dbname_, &localdb)).ok());
  } while (ChangeCompactOptions());
}
330
// With multiple write buffers allowed and merging deferred
// (min_write_buffer_number_to_merge = 3), data written after a flush request
// must remain readable, and a second flush must still succeed.
TEST_F(DBBasicTest, FlushMultipleMemtable) {
  do {
    Options options = CurrentOptions();
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    options.max_write_buffer_number = 4;
    options.min_write_buffer_number_to_merge = 3;
    options.max_write_buffer_size_to_maintain = -1;
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}
349
// Flushing an empty column family must succeed even while both background
// thread pools are occupied by sleeping tasks, and writes/reads must keep
// working in the meantime.
TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  Options options = CurrentOptions();
  // disable compaction
  options.disable_auto_compactions = true;
  WriteOptions writeOpt = WriteOptions();
  writeOpt.disableWAL = true;
  options.max_write_buffer_number = 2;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(options.write_buffer_size);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Compaction can still go through even if no thread can flush the
  // mem table.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Insert can go through
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

  ASSERT_EQ("v1", Get(0, "foo"));
  ASSERT_EQ("v1", Get(1, "bar"));

  // Unblock the HIGH-priority (flush) pool.
  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();

  // Flush can still go through.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Unblock the LOW-priority (compaction) pool before the test ends.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
394
// Flush durability: data written with WAL disabled survives a reopen only
// if it was flushed first; perf context counters confirm reads come from
// the output files after a flush.
TEST_F(DBBasicTest, FLUSH) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    SetPerfLevel(kEnableTime);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    // this will now also flush the last 2 writes
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    get_perf_context()->Reset();
    Get(1, "foo");
    // "foo" was flushed, so the read must hit the SST path.
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
    // Value "v1" is 2 bytes.
    ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    get_perf_context()->Reset();
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);

    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // 'foo' should be there because its put
    // has WAL enabled.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
440
// With a tiny max_manifest_file_size, every LogAndApply (flush, reopen)
// must roll over to a new MANIFEST, and the data must survive each roll.
TEST_F(DBBasicTest, ManifestRollOver) {
  do {
    Options options;
    options.max_manifest_file_size = 10;  // 10 bytes
    options = CurrentOptions(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    {
      ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
      ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
      ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
      uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
      uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_GT(manifest_after_flush, manifest_before_flush);
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
      ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
      // check if a new manifest file got inserted or not.
      ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
      ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
      ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
    }
  } while (ChangeCompactOptions());
}
464
// The DB identity must be stable across plain reopens. If the IDENTITY file
// is deleted, a new id is generated — unless the id was also persisted in
// the manifest (write_dbid_to_manifest), in which case it is recovered.
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    if (options.write_dbid_to_manifest) {
      // Recovered from the manifest despite the missing IDENTITY file.
      ASSERT_EQ(id1.compare(id3), 0);
    } else {
      // id1 should NOT match id3 because identity was regenerated
      ASSERT_NE(id1.compare(id3), 0);
    }
  } while (ChangeCompactOptions());
}
490
// Same as IdentityAcrossRestarts1 but with write_dbid_to_manifest forced on:
// the identity must survive even the deletion of the IDENTITY file.
TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    options.write_dbid_to_manifest = true;
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    // id1 should match id3 because the identity was persisted in (and
    // recovered from) the manifest, not regenerated.
    ASSERT_EQ(id1, id3);
  } while (ChangeCompactOptions());
}
513
514 #ifndef ROCKSDB_LITE
// Multi-snapshot bookkeeping: snapshot count, oldest-snapshot time/sequence,
// and per-snapshot reads must stay consistent as snapshots are taken (plain
// and managed) and released in order.
TEST_F(DBBasicTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    Put(0, "foo", "0v1");
    Put(1, "foo", "1v1");

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v2");
    Put(1, "foo", "1v2");

    // Advance the mock clock so a later snapshot gets a later timestamp.
    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    // Oldest snapshot is still s1.
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v3");
    Put(1, "foo", "1v3");

    {
      // ManagedSnapshot releases s3 automatically at end of scope.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      Put(0, "foo", "0v4");
      Put(1, "foo", "1v4");
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    // s2 is now the oldest; it was taken after the clock advanced.
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}
584
585 #endif // ROCKSDB_LITE
586
// Compaction must keep exactly the key versions that live snapshots can
// still observe, and drop versions as soon as their snapshot is released.
TEST_F(DBBasicTest, CompactBetweenSnapshots) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);
    Random rnd(301);
    FillLevels("a", "z", 1);

    Put(1, "foo", "first");
    const Snapshot* snapshot1 = db_->GetSnapshot();
    Put(1, "foo", "second");
    Put(1, "foo", "third");
    Put(1, "foo", "fourth");
    const Snapshot* snapshot2 = db_->GetSnapshot();
    Put(1, "foo", "fifth");
    Put(1, "foo", "sixth");

    // All entries (including duplicates) exist
    // before any compaction or flush is triggered.
    ASSERT_EQ(AllEntriesFor("foo", 1),
              "[ sixth, fifth, fourth, third, second, first ]");
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ("first", Get(1, "foo", snapshot1));

    // After a flush, "second", "third" and "fifth" should
    // be removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

    // after we release the snapshot1, only two values left
    db_->ReleaseSnapshot(snapshot1);
    FillLevels("a", "z", 1);
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);

    // We have only one valid snapshot snapshot2. Since snapshot1 is
    // not valid anymore, "first" should be removed by a compaction.
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

    // after we release the snapshot2, only one value should be left
    db_->ReleaseSnapshot(snapshot2);
    FillLevels("a", "z", 1);
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
    // FIFO compaction does not honor this level-based behavior; skip it.
  } while (ChangeOptions(kSkipFIFOCompaction));
}
640
// Verifies the four create_if_missing / error_if_exists combinations of
// DB::Open against a missing and then an existing database.
TEST_F(DBBasicTest, DBOpen_Options) {
  Options options = CurrentOptions();
  Close();
  Destroy(options);

  // Does not exist, and create_if_missing == false: error
  DB* db = nullptr;
  options.create_if_missing = false;
  Status s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does not exist, and create_if_missing == true: OK
  options.create_if_missing = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;

  // Does exist, and error_if_exists == true: error
  options.create_if_missing = false;
  options.error_if_exists = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does exist, and error_if_exists == false: OK
  options.create_if_missing = true;
  options.error_if_exists = false;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;
}
679
// Checks which key versions flush and CompactRange eliminate for each
// combination of puts, deletes and snapshots (using the debug view from
// AllEntriesFor).
TEST_F(DBBasicTest, CompactOnFlush) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    Put(1, "foo", "v1");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");

    // Write two new keys
    Put(1, "a", "begin");
    Put(1, "z", "end");
    Flush(1);

    // Case1: Delete followed by a put
    Delete(1, "foo");
    Put(1, "foo", "v2");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");

    // After the current memtable is flushed, the DEL should
    // have been removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");

    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");

    // Case 2: Delete followed by another delete
    Delete(1, "foo");
    Delete(1, "foo");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 3: Put followed by a delete
    Put(1, "foo", "v3");
    Delete(1, "foo");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 4: Put followed by another Put
    Put(1, "foo", "v4");
    Put(1, "foo", "v5");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");

    // clear database
    Delete(1, "foo");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: Put followed by snapshot followed by another Put
    // Both puts should remain.
    Put(1, "foo", "v6");
    const Snapshot* snapshot = db_->GetSnapshot();
    Put(1, "foo", "v7");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
    db_->ReleaseSnapshot(snapshot);

    // clear database
    Delete(1, "foo");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 6: snapshot followed by a put followed by another Put
    // Only the last put should remain.
    const Snapshot* snapshot1 = db_->GetSnapshot();
    Put(1, "foo", "v8");
    Put(1, "foo", "v9");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
    db_->ReleaseSnapshot(snapshot1);
  } while (ChangeCompactOptions());
}
772
// Flushing column families one at a time must create exactly one new table
// file per flush, without flushing the other families.
TEST_F(DBBasicTest, FlushOneColumnFamily) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  ASSERT_OK(Put(0, "Default", "Default"));
  ASSERT_OK(Put(1, "pikachu", "pikachu"));
  ASSERT_OK(Put(2, "ilya", "ilya"));
  ASSERT_OK(Put(3, "muromec", "muromec"));
  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
  ASSERT_OK(Put(5, "nikitich", "nikitich"));
  ASSERT_OK(Put(6, "alyosha", "alyosha"));
  ASSERT_OK(Put(7, "popovich", "popovich"));

  for (int i = 0; i < 8; ++i) {
    Flush(i);
    auto tables = ListTableFiles(env_, dbname_);
    // One more SST after each per-CF flush.
    ASSERT_EQ(tables.size(), i + 1U);
  }
}
794
// MultiGet over a mix of present, deleted and never-written keys must
// report per-key statuses and the correct byte count in the perf context.
TEST_F(DBBasicTest, MultiGetSimple) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});

    // Pre-filled values must be overwritten (or left untouched for misses).
    std::vector<std::string> values(20, "Temporary data to be overwritten");
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);

    get_perf_context()->Reset();
    std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(values[0], "v1");
    ASSERT_EQ(values[1], "v2");
    ASSERT_EQ(values[2], "v3");
    ASSERT_EQ(values[4], "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
831
832 TEST_F(DBBasicTest, MultiGetEmpty) {
833 do {
834 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
835 // Empty Key Set
836 std::vector<Slice> keys;
837 std::vector<std::string> values;
838 std::vector<ColumnFamilyHandle*> cfs;
839 std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
840 ASSERT_EQ(s.size(), 0U);
841
842 // Empty Database, Empty Key Set
843 Options options = CurrentOptions();
844 options.create_if_missing = true;
845 DestroyAndReopen(options);
846 CreateAndReopenWithCF({"pikachu"}, options);
847 s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
848 ASSERT_EQ(s.size(), 0U);
849
850 // Empty Database, Search for Keys
851 keys.resize(2);
852 keys[0] = "a";
853 keys[1] = "b";
854 cfs.push_back(handles_[0]);
855 cfs.push_back(handles_[1]);
856 s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
857 ASSERT_EQ(static_cast<int>(s.size()), 2);
858 ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
859 } while (ChangeCompactOptions());
860 }
861
862 TEST_F(DBBasicTest, ChecksumTest) {
863 BlockBasedTableOptions table_options;
864 Options options = CurrentOptions();
865 // change when new checksum type added
866 int max_checksum = static_cast<int>(kxxHash64);
867 const int kNumPerFile = 2;
868
869 // generate one table with each type of checksum
870 for (int i = 0; i <= max_checksum; ++i) {
871 table_options.checksum = static_cast<ChecksumType>(i);
872 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
873 Reopen(options);
874 for (int j = 0; j < kNumPerFile; ++j) {
875 ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
876 }
877 ASSERT_OK(Flush());
878 }
879
880 // with each valid checksum type setting...
881 for (int i = 0; i <= max_checksum; ++i) {
882 table_options.checksum = static_cast<ChecksumType>(i);
883 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
884 Reopen(options);
885 // verify every type of checksum (should be regardless of that setting)
886 for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
887 ASSERT_EQ(Key(j), Get(Key(j)));
888 }
889 }
890 }
891
892 // On Windows you can have either memory mapped file or a file
893 // with unbuffered access. So this asserts and does not make
894 // sense to run
895 #ifndef OS_WIN
// use_direct_reads and allow_mmap_reads are mutually exclusive: enabling
// both must fail the open; every other combination must succeed.
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options options = CurrentOptions();

  options.use_direct_reads = true;
  options.allow_mmap_reads = true;
  ASSERT_NOK(TryReopen(options));

  // All other combinations are acceptable
  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));

  if (IsDirectIOSupported()) {
    options.use_direct_reads = true;
    options.allow_mmap_reads = false;
    ASSERT_OK(TryReopen(options));
  }

  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));
}
919 #endif
920
921 class TestEnv : public EnvWrapper {
922 public:
923 explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
924
925 class TestLogger : public Logger {
926 public:
927 using Logger::Logv;
928 explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
929 ~TestLogger() override {
930 if (!closed_) {
931 CloseHelper();
932 }
933 }
934 void Logv(const char* /*format*/, va_list /*ap*/) override {}
935
936 protected:
937 Status CloseImpl() override { return CloseHelper(); }
938
939 private:
940 Status CloseHelper() {
941 env->CloseCountInc();
942 ;
943 return Status::IOError();
944 }
945 TestEnv* env;
946 };
947
948 void CloseCountInc() { close_count++; }
949
950 int GetCloseCount() { return close_count; }
951
952 Status NewLogger(const std::string& /*fname*/,
953 std::shared_ptr<Logger>* result) override {
954 result->reset(new TestLogger(this));
955 return Status::OK();
956 }
957
958 private:
959 int close_count;
960 };
961
// Verify the lifecycle of the DB-owned info log: DB::Close() (or deleting
// an un-closed DB) must close a logger created by the env, while a
// user-provided info_log must NOT be closed by the DB.
TEST_F(DBBasicTest, DBClose) {
  Options options = GetDefaultOptions();
  std::string dbname = test::PerThreadDBPath("db_close_test");
  ASSERT_OK(DestroyDB(dbname, options));

  DB* db = nullptr;
  TestEnv* env = new TestEnv(env_);
  std::unique_ptr<TestEnv> local_env_guard(env);
  options.create_if_missing = true;
  options.env = env;
  Status s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // Close() closes the env-created logger exactly once, and the logger's
  // IOError is propagated to the caller.
  s = db->Close();
  ASSERT_EQ(env->GetCloseCount(), 1);
  ASSERT_EQ(s, Status::IOError());

  // Deleting an already-closed DB must not close the logger again.
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 1);

  // Do not call DB::Close() and ensure our logger Close() still gets called
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);

  // Provide our own logger and ensure DB::Close() does not close it
  options.info_log.reset(new TestEnv::TestLogger(env));
  options.create_if_missing = false;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Close();
  ASSERT_EQ(s, Status::OK());
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);
  // Only dropping our own reference closes the user-provided logger.
  options.info_log.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);
}
1004
1005 TEST_F(DBBasicTest, DBCloseFlushError) {
1006 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
1007 new FaultInjectionTestEnv(env_));
1008 Options options = GetDefaultOptions();
1009 options.create_if_missing = true;
1010 options.manual_wal_flush = true;
1011 options.write_buffer_size=100;
1012 options.env = fault_injection_env.get();
1013
1014 Reopen(options);
1015 ASSERT_OK(Put("key1", "value1"));
1016 ASSERT_OK(Put("key2", "value2"));
1017 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
1018 ASSERT_OK(Put("key3", "value3"));
1019 fault_injection_env->SetFilesystemActive(false);
1020 Status s = dbfull()->Close();
1021 fault_injection_env->SetFilesystemActive(true);
1022 ASSERT_NE(s, Status::OK());
1023
1024 Destroy(options);
1025 }
1026
// Bool param is forwarded as the last argument of DBTestBase::MultiGet —
// presumably it selects the batched MultiGet API; confirm in db_test_util.h.
class DBMultiGetTestWithParam : public DBBasicTest,
                                public testing::WithParamInterface<bool> {};
1029
// MultiGet across 8 column families while a sync-point callback flushes and
// overwrites the data mid-operation, forcing MultiGet to retry its
// SuperVersion acquisition and eventually fall back to the mutex path.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);
  // <CF, key, value> tuples
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    // NOTE(review): "1 % 3" is the constant 1, so all three keys per CF
    // collide on "_key_1"; "i % 3" looks intended — confirm before
    // changing, since the sync-point callback and assertions below are
    // driven by the contents of cf_kv_vec either way.
    int cf_key = 1 % 3;
    cf_kv_vec.emplace_back(std::make_tuple(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
          // is forced to repeat the process
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            // Overwrite with "_2" values; the final MultiGet must see these.
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          // By now MultiGet should have pinned a SuperVersion per CF.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values.size(), num_keys);
  // Every value must be the post-flush "_2" overwrite.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  // A couple of smaller MultiGets over CF subsets, in arbitrary CF order.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  // MultiGet must have released every thread-local SuperVersion reference.
  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}
1127
// Keep invalidating MultiGet's acquired SuperVersions via a sync-point
// callback until MultiGet gives up on the lock-free path and takes the DB
// mutex (the "LastTry" sync point); verify it still returns the newest data.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
        // MultiGet fell back to the mutex path; stop interfering.
        last_try = true;
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (last_try) {
          return;
        }
        // Every second SuperVersion acquisition, flush and overwrite all
        // CFs so the acquired SuperVersions go stale and MultiGet retries.
        if (++get_sv_count == 2) {
          ++retries;
          get_sv_count = 0;
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(
                i, "cf" + std::to_string(i) + "_key",
                "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  // The mutex fallback must actually have been exercised.
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  // Values reflect the last round of overwrites before the fallback.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j],
              "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  }
  // No thread-local SuperVersion may still be marked in-use.
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1188
// MultiGet with an explicit snapshot: even though a sync-point callback
// flushes and overwrites every CF mid-operation, the snapshot must pin the
// original "_val" values (unlike MultiGetMultiCF above, which sees "_2").
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        // Invalidate the acquired SuperVersions by flushing + overwriting.
        if (++get_sv_count == 2) {
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            // Each CF's local SuperVersion is either pinned by MultiGet or
            // already invalidated by the flush above.
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  // Snapshot isolation: the pre-overwrite values must be returned.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1247
// Run every DBMultiGetTestWithParam test with the param as false and true.
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                        testing::Bool());
1250
1251 TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
1252 do {
1253 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1254 SetPerfLevel(kEnableCount);
1255 ASSERT_OK(Put(1, "k1", "v1"));
1256 ASSERT_OK(Put(1, "k2", "v2"));
1257 ASSERT_OK(Put(1, "k3", "v3"));
1258 ASSERT_OK(Put(1, "k4", "v4"));
1259 ASSERT_OK(Delete(1, "k4"));
1260 ASSERT_OK(Put(1, "k5", "v5"));
1261 ASSERT_OK(Delete(1, "no_key"));
1262
1263 get_perf_context()->Reset();
1264
1265 std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
1266 std::vector<PinnableSlice> values(keys.size());
1267 std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1268 std::vector<Status> s(keys.size());
1269
1270 db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1271 values.data(), s.data(), false);
1272
1273 ASSERT_EQ(values.size(), keys.size());
1274 ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
1275 ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
1276 ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1277 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1278 // four kv pairs * two bytes per value
1279 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1280
1281 ASSERT_TRUE(s[0].IsNotFound());
1282 ASSERT_OK(s[1]);
1283 ASSERT_TRUE(s[2].IsNotFound());
1284 ASSERT_OK(s[3]);
1285 ASSERT_OK(s[4]);
1286 ASSERT_OK(s[5]);
1287
1288 SetPerfLevel(kDisable);
1289 } while (ChangeCompactOptions());
1290 }
1291
1292 TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
1293 do {
1294 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1295 SetPerfLevel(kEnableCount);
1296 ASSERT_OK(Put(1, "k1", "v1"));
1297 ASSERT_OK(Put(1, "k2", "v2"));
1298 ASSERT_OK(Put(1, "k3", "v3"));
1299 ASSERT_OK(Put(1, "k4", "v4"));
1300 ASSERT_OK(Delete(1, "k4"));
1301 ASSERT_OK(Put(1, "k5", "v5"));
1302 ASSERT_OK(Delete(1, "no_key"));
1303
1304 get_perf_context()->Reset();
1305
1306 std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
1307 std::vector<PinnableSlice> values(keys.size());
1308 std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1309 std::vector<Status> s(keys.size());
1310
1311 db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1312 values.data(), s.data(), true);
1313
1314 ASSERT_EQ(values.size(), keys.size());
1315 ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
1316 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
1317 ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1318 ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
1319 // four kv pairs * two bytes per value
1320 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1321
1322 ASSERT_OK(s[0]);
1323 ASSERT_OK(s[1]);
1324 ASSERT_OK(s[2]);
1325 ASSERT_TRUE(s[3].IsNotFound());
1326 ASSERT_OK(s[4]);
1327 ASSERT_TRUE(s[5].IsNotFound());
1328
1329 SetPerfLevel(kDisable);
1330 } while (ChangeCompactOptions());
1331 }
1332
1333 TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
1334 Options options = CurrentOptions();
1335 options.disable_auto_compactions = true;
1336 Reopen(options);
1337 int num_keys = 0;
1338
1339 for (int i = 0; i < 128; ++i) {
1340 ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1341 num_keys++;
1342 if (num_keys == 8) {
1343 Flush();
1344 num_keys = 0;
1345 }
1346 }
1347 if (num_keys > 0) {
1348 Flush();
1349 num_keys = 0;
1350 }
1351 MoveFilesToLevel(2);
1352
1353 for (int i = 0; i < 128; i += 3) {
1354 ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1355 num_keys++;
1356 if (num_keys == 8) {
1357 Flush();
1358 num_keys = 0;
1359 }
1360 }
1361 if (num_keys > 0) {
1362 Flush();
1363 num_keys = 0;
1364 }
1365 MoveFilesToLevel(1);
1366
1367 for (int i = 0; i < 128; i += 5) {
1368 ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1369 num_keys++;
1370 if (num_keys == 8) {
1371 Flush();
1372 num_keys = 0;
1373 }
1374 }
1375 if (num_keys > 0) {
1376 Flush();
1377 num_keys = 0;
1378 }
1379 ASSERT_EQ(0, num_keys);
1380
1381 for (int i = 0; i < 128; i += 9) {
1382 ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1383 }
1384
1385 std::vector<std::string> keys;
1386 std::vector<std::string> values;
1387
1388 for (int i = 64; i < 80; ++i) {
1389 keys.push_back("key_" + std::to_string(i));
1390 }
1391
1392 values = MultiGet(keys, nullptr);
1393 ASSERT_EQ(values.size(), 16);
1394 for (unsigned int j = 0; j < values.size(); ++j) {
1395 int key = j + 64;
1396 if (key % 9 == 0) {
1397 ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
1398 } else if (key % 5 == 0) {
1399 ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
1400 } else if (key % 3 == 0) {
1401 ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
1402 } else {
1403 ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
1404 }
1405 }
1406 }
1407
1408 TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
1409 Options options = CurrentOptions();
1410 options.disable_auto_compactions = true;
1411 options.merge_operator = MergeOperators::CreateStringAppendOperator();
1412 BlockBasedTableOptions bbto;
1413 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1414 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1415 Reopen(options);
1416 int num_keys = 0;
1417
1418 for (int i = 0; i < 128; ++i) {
1419 ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1420 num_keys++;
1421 if (num_keys == 8) {
1422 Flush();
1423 num_keys = 0;
1424 }
1425 }
1426 if (num_keys > 0) {
1427 Flush();
1428 num_keys = 0;
1429 }
1430 MoveFilesToLevel(2);
1431
1432 for (int i = 0; i < 128; i += 3) {
1433 ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1434 num_keys++;
1435 if (num_keys == 8) {
1436 Flush();
1437 num_keys = 0;
1438 }
1439 }
1440 if (num_keys > 0) {
1441 Flush();
1442 num_keys = 0;
1443 }
1444 MoveFilesToLevel(1);
1445
1446 for (int i = 0; i < 128; i += 5) {
1447 ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1448 num_keys++;
1449 if (num_keys == 8) {
1450 Flush();
1451 num_keys = 0;
1452 }
1453 }
1454 if (num_keys > 0) {
1455 Flush();
1456 num_keys = 0;
1457 }
1458 ASSERT_EQ(0, num_keys);
1459
1460 for (int i = 0; i < 128; i += 9) {
1461 ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1462 }
1463
1464 std::vector<std::string> keys;
1465 std::vector<std::string> values;
1466
1467 for (int i = 32; i < 80; ++i) {
1468 keys.push_back("key_" + std::to_string(i));
1469 }
1470
1471 values = MultiGet(keys, nullptr);
1472 ASSERT_EQ(values.size(), keys.size());
1473 for (unsigned int j = 0; j < 48; ++j) {
1474 int key = j + 32;
1475 std::string value;
1476 value.append("val_l2_" + std::to_string(key));
1477 if (key % 3 == 0) {
1478 value.append(",");
1479 value.append("val_l1_" + std::to_string(key));
1480 }
1481 if (key % 5 == 0) {
1482 value.append(",");
1483 value.append("val_l0_" + std::to_string(key));
1484 }
1485 if (key % 9 == 0) {
1486 value.append(",");
1487 value.append("val_mem_" + std::to_string(key));
1488 }
1489 ASSERT_EQ(values[j], value);
1490 }
1491 }
1492
// Test class for batched MultiGet with prefix extractor
// Param bool - If true, use partitioned filters
//              If false, use full filter block
class MultiGetPrefixExtractorTest : public DBBasicTest,
                                    public ::testing::WithParamInterface<bool> {
};
1499
// Batched MultiGet with a 2-byte fixed prefix extractor: keys outside the
// extractor's domain ("k") must still be found, and the memtable/SST bloom
// perf counters must reflect in-domain hits vs. misses.
TEST_P(MultiGetPrefixExtractorTest, Batched) {
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  options.memtable_prefix_bloom_size_ratio = 10;
  BlockBasedTableOptions bbto;
  if (GetParam()) {
    // Partitioned filters require the two-level index as well.
    bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    bbto.partition_filters = true;
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  bbto.cache_index_and_filter_blocks = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();

  // First key is not in the prefix_extractor domain
  ASSERT_OK(Put("k", "v0"));
  ASSERT_OK(Put("kk1", "v1"));
  ASSERT_OK(Put("kk2", "v2"));
  ASSERT_OK(Put("kk3", "v3"));
  ASSERT_OK(Put("kk4", "v4"));
  // "rofl"/"lmho" are absent and share no stored prefix -> bloom misses.
  std::vector<std::string> mem_keys(
      {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  std::vector<std::string> inmem_values;
  inmem_values = MultiGet(mem_keys, nullptr);
  ASSERT_EQ(inmem_values[0], "v0");
  ASSERT_EQ(inmem_values[1], "v1");
  ASSERT_EQ(inmem_values[2], "v2");
  ASSERT_EQ(inmem_values[3], "v3");
  ASSERT_EQ(inmem_values[4], "v4");
  ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
  ASSERT_OK(Flush());

  std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
  std::vector<std::string> values;
  get_perf_context()->Reset();
  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values[0], "v0");
  ASSERT_EQ(values[1], "v1");
  ASSERT_EQ(values[2], "v2");
  ASSERT_EQ(values[3], "v3");
  ASSERT_EQ(values[4], "v4");
  // Filter hits for 4 in-domain keys
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
}
1549
// Run with both full (false) and partitioned (true) filter blocks.
INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
                        ::testing::Bool());
1552
1553 #ifndef ROCKSDB_LITE
// Param bool - if true, the MultiGet calls below use explicit snapshots.
class DBMultiGetRowCacheTest : public DBBasicTest,
                               public ::testing::WithParamInterface<bool> {};
1556
1557 TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
1558 do {
1559 option_config_ = kRowCache;
1560 Options options = CurrentOptions();
1561 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1562 CreateAndReopenWithCF({"pikachu"}, options);
1563 SetPerfLevel(kEnableCount);
1564 ASSERT_OK(Put(1, "k1", "v1"));
1565 ASSERT_OK(Put(1, "k2", "v2"));
1566 ASSERT_OK(Put(1, "k3", "v3"));
1567 ASSERT_OK(Put(1, "k4", "v4"));
1568 Flush(1);
1569 ASSERT_OK(Put(1, "k5", "v5"));
1570 const Snapshot* snap1 = dbfull()->GetSnapshot();
1571 ASSERT_OK(Delete(1, "k4"));
1572 Flush(1);
1573 const Snapshot* snap2 = dbfull()->GetSnapshot();
1574
1575 get_perf_context()->Reset();
1576
1577 std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
1578 std::vector<PinnableSlice> values(keys.size());
1579 std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1580 std::vector<Status> s(keys.size());
1581
1582 ReadOptions ro;
1583 bool use_snapshots = GetParam();
1584 if (use_snapshots) {
1585 ro.snapshot = snap2;
1586 }
1587 db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
1588 s.data(), false);
1589
1590 ASSERT_EQ(values.size(), keys.size());
1591 ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
1592 ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1593 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1594 // four kv pairs * two bytes per value
1595 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1596
1597 ASSERT_TRUE(s[0].IsNotFound());
1598 ASSERT_OK(s[1]);
1599 ASSERT_TRUE(s[2].IsNotFound());
1600 ASSERT_OK(s[3]);
1601 ASSERT_OK(s[4]);
1602
1603 // Call MultiGet() again with some intersection with the previous set of
1604 // keys. Those should already be in the row cache.
1605 keys.assign({"no_key", "k5", "k3", "k2"});
1606 for (size_t i = 0; i < keys.size(); ++i) {
1607 values[i].Reset();
1608 s[i] = Status::OK();
1609 }
1610 get_perf_context()->Reset();
1611
1612 if (use_snapshots) {
1613 ro.snapshot = snap1;
1614 }
1615 db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1616 values.data(), s.data(), false);
1617
1618 ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
1619 ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1620 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1621 // four kv pairs * two bytes per value
1622 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1623
1624 ASSERT_TRUE(s[0].IsNotFound());
1625 ASSERT_OK(s[1]);
1626 ASSERT_OK(s[2]);
1627 ASSERT_OK(s[3]);
1628 if (use_snapshots) {
1629 // Only reads from the first SST file would have been cached, since
1630 // snapshot seq no is > fd.largest_seqno
1631 ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
1632 } else {
1633 ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
1634 }
1635
1636 SetPerfLevel(kDisable);
1637 dbfull()->ReleaseSnapshot(snap1);
1638 dbfull()->ReleaseSnapshot(snap2);
1639 } while (ChangeCompactOptions());
1640 }
1641
// Run the row-cache MultiGet test with and without explicit snapshots.
INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
                        testing::Values(true, false));
1644
// GetAllKeyVersions() must report every version of every key (puts,
// updates and tombstones) since auto-compaction is disabled, for both the
// default and a non-default column family.
TEST_F(DBBasicTest, GetAllKeyVersions) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  const size_t kNumInserts = 4;
  const size_t kNumDeletes = 4;
  const size_t kNumUpdates = 4;

  // Check default column family
  for (size_t i = 0; i != kNumInserts; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value"));
  }
  for (size_t i = 0; i != kNumUpdates; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value1"));
  }
  for (size_t i = 0; i != kNumDeletes; ++i) {
    ASSERT_OK(Delete(std::to_string(i)));
  }
  std::vector<KeyVersion> key_versions;
  // Empty begin/end slices mean the whole key range.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
  // The explicit-handle overload must agree with the default-CF overload.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());

  // Check non-default column family
  for (size_t i = 0; i != kNumInserts - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value"));
  }
  for (size_t i = 0; i != kNumUpdates - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value1"));
  }
  for (size_t i = 0; i != kNumDeletes - 1; ++i) {
    ASSERT_OK(Delete(1, std::to_string(i)));
  }
  // One fewer of each operation than the default CF -> 3 fewer versions.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
}
1691 #endif // !ROCKSDB_LITE
1692
1693 TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
1694 Options options = CurrentOptions();
1695 Random rnd(301);
1696 BlockBasedTableOptions table_options;
1697 table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1698 table_options.block_size = 16 * 1024;
1699 assert(table_options.block_size >
1700 BlockBasedTable::kMultiGetReadStackBufSize);
1701 options.table_factory.reset(new BlockBasedTableFactory(table_options));
1702 Reopen(options);
1703
1704 std::string zero_str(128, '\0');
1705 for (int i = 0; i < 100; ++i) {
1706 // Make the value compressible. A purely random string doesn't compress
1707 // and the resultant data block will not be compressed
1708 std::string value(RandomString(&rnd, 128) + zero_str);
1709 assert(Put(Key(i), value) == Status::OK());
1710 }
1711 Flush();
1712
1713 std::vector<std::string> key_data(10);
1714 std::vector<Slice> keys;
1715 // We cannot resize a PinnableSlice vector, so just set initial size to
1716 // largest we think we will need
1717 std::vector<PinnableSlice> values(10);
1718 std::vector<Status> statuses;
1719 ReadOptions ro;
1720
1721 // Warm up the cache first
1722 key_data.emplace_back(Key(0));
1723 keys.emplace_back(Slice(key_data.back()));
1724 key_data.emplace_back(Key(50));
1725 keys.emplace_back(Slice(key_data.back()));
1726 statuses.resize(keys.size());
1727
1728 dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
1729 keys.data(), values.data(), statuses.data(), true);
1730 }
1731
1732 class DBBasicTestWithParallelIO
1733 : public DBTestBase,
1734 public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
1735 public:
  // Decodes the test params (compressed cache, uncompressed cache,
  // compression, fill_cache), wraps the block caches in counting
  // MyBlockCache instances, configures small data blocks via
  // MyFlushBlockPolicyFactory, and seeds the DB with two flushed files:
  // 100 compressible values under Key(i) and 100 incompressible values
  // under "a" + Key(i).
  DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
    bool compressed_cache = std::get<0>(GetParam());
    bool uncompressed_cache = std::get<1>(GetParam());
    compression_enabled_ = std::get<2>(GetParam());
    fill_cache_ = std::get<3>(GetParam());

    if (compressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      compressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }
    if (uncompressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }

    env_->count_random_reads_ = true;

    Options options = CurrentOptions();
    Random rnd(301);
    BlockBasedTableOptions table_options;

#ifndef ROCKSDB_LITE
    if (compression_enabled_) {
      std::vector<CompressionType> compression_types;
      compression_types = GetSupportedCompressions();
      // Not every platform may have compression libraries available, so
      // dynamically pick based on what's available
      if (compression_types.size() == 0) {
        compression_enabled_ = false;
      } else {
        options.compression = compression_types[0];
      }
    }
#else
    // GetSupportedCompressions() is not available in LITE build
    if (!Snappy_Supported()) {
      compression_enabled_ = false;
    }
#endif //ROCKSDB_LITE

    table_options.block_cache = uncompressed_cache_;
    if (table_options.block_cache == nullptr) {
      table_options.no_block_cache = true;
    } else {
      table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    }
    table_options.block_cache_compressed = compressed_cache_;
    table_options.flush_block_policy_factory.reset(
        new MyFlushBlockPolicyFactory());
    options.table_factory.reset(new BlockBasedTableFactory(table_options));
    if (!compression_enabled_) {
      options.compression = kNoCompression;
    }
    Reopen(options);

    std::string zero_str(128, '\0');
    for (int i = 0; i < 100; ++i) {
      // Make the value compressible. A purely random string doesn't compress
      // and the resultant data block will not be compressed
      values_.emplace_back(RandomString(&rnd, 128) + zero_str);
      assert(Put(Key(i), values_[i]) == Status::OK());
    }
    Flush();

    for (int i = 0; i < 100; ++i) {
      // block cannot gain space by compression
      uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
      std::string tmp_key = "a" + Key(i);
      assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
    }
    Flush();
  }
1808
1809 bool CheckValue(int i, const std::string& value) {
1810 if (values_[i].compare(value) == 0) {
1811 return true;
1812 }
1813 return false;
1814 }
1815
1816 bool CheckUncompressableValue(int i, const std::string& value) {
1817 if (uncompressable_values_[i].compare(value) == 0) {
1818 return true;
1819 }
1820 return false;
1821 }
1822
  // Lookup/hit/insert counters from the uncompressed (block) cache wrapper.
  int num_lookups() { return uncompressed_cache_->num_lookups(); }
  int num_found() { return uncompressed_cache_->num_found(); }
  int num_inserts() { return uncompressed_cache_->num_inserts(); }

  // Lookup/hit/insert counters from the compressed block cache wrapper.
  int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
  int num_found_compressed() { return compressed_cache_->num_found(); }
  int num_inserts_compressed() { return compressed_cache_->num_inserts(); }

  // Accessors for the test parameters decoded in the constructor.
  bool fill_cache() { return fill_cache_; }
  bool compression_enabled() { return compression_enabled_; }
  bool has_compressed_cache() { return compressed_cache_ != nullptr; }
  bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }

  static void SetUpTestCase() {}
  static void TearDownTestCase() {}
1838
1839 private:
  // Factory for MyFlushBlockPolicy, which cuts a data block after every 10
  // keys so each SST file contains many small blocks.
  class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
   public:
    MyFlushBlockPolicyFactory() {}

    virtual const char* Name() const override {
      return "MyFlushBlockPolicyFactory";
    }

    virtual FlushBlockPolicy* NewFlushBlockPolicy(
        const BlockBasedTableOptions& /*table_options*/,
        const BlockBuilder& data_block_builder) const override {
      return new MyFlushBlockPolicy(data_block_builder);
    }
  };
1854
  // Flush policy that cuts a data block after every 10 keys, regardless of
  // block size, so tests get a predictable number of keys per block.
  class MyFlushBlockPolicy : public FlushBlockPolicy {
   public:
    explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
        : num_keys_(0), data_block_builder_(data_block_builder) {}

    bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
      if (data_block_builder_.empty()) {
        // First key in this block
        num_keys_ = 1;
        return false;
      }
      // Flush every 10 keys
      if (num_keys_ == 10) {
        // The key being added starts the next block, so restart the count
        // at 1 rather than 0.
        num_keys_ = 1;
        return true;
      }
      num_keys_++;
      return false;
    }

   private:
    int num_keys_;  // keys accumulated in the current data block
    const BlockBuilder& data_block_builder_;
  };
1879
1880 class MyBlockCache : public Cache {
1881 public:
    // Wraps `target`, counting inserts, lookups, and lookup hits so tests
    // can assert on cache traffic.
    explicit MyBlockCache(std::shared_ptr<Cache>& target)
        : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}

    virtual const char* Name() const override { return "MyBlockCache"; }

    // Counts the insert, then delegates to the wrapped cache.
    virtual Status Insert(const Slice& key, void* value, size_t charge,
                          void (*deleter)(const Slice& key, void* value),
                          Handle** handle = nullptr,
                          Priority priority = Priority::LOW) override {
      num_inserts_++;
      return target_->Insert(key, value, charge, deleter, handle, priority);
    }

    // Counts the lookup (and whether it hit), then delegates.
    virtual Handle* Lookup(const Slice& key,
                           Statistics* stats = nullptr) override {
      num_lookups_++;
      Handle* handle = target_->Lookup(key, stats);
      if (handle != nullptr) {
        num_found_++;
      }
      return handle;
    }
1904
1905 virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
1906
1907 virtual bool Release(Handle* handle, bool force_erase = false) override {
1908 return target_->Release(handle, force_erase);
1909 }
1910
1911 virtual void* Value(Handle* handle) override {
1912 return target_->Value(handle);
1913 }
1914
1915 virtual void Erase(const Slice& key) override { target_->Erase(key); }
1916 virtual uint64_t NewId() override { return target_->NewId(); }
1917
1918 virtual void SetCapacity(size_t capacity) override {
1919 target_->SetCapacity(capacity);
1920 }
1921
1922 virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
1923 target_->SetStrictCapacityLimit(strict_capacity_limit);
1924 }
1925
1926 virtual bool HasStrictCapacityLimit() const override {
1927 return target_->HasStrictCapacityLimit();
1928 }
1929
1930 virtual size_t GetCapacity() const override {
1931 return target_->GetCapacity();
1932 }
1933
1934 virtual size_t GetUsage() const override { return target_->GetUsage(); }
1935
1936 virtual size_t GetUsage(Handle* handle) const override {
1937 return target_->GetUsage(handle);
1938 }
1939
1940 virtual size_t GetPinnedUsage() const override {
1941 return target_->GetPinnedUsage();
1942 }
1943
1944 virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
1945
1946 virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
1947 bool thread_safe) override {
1948 return target_->ApplyToAllCacheEntries(callback, thread_safe);
1949 }
1950
1951 virtual void EraseUnRefEntries() override {
1952 return target_->EraseUnRefEntries();
1953 }
1954
1955 int num_lookups() { return num_lookups_; }
1956
1957 int num_found() { return num_found_; }
1958
1959 int num_inserts() { return num_inserts_; }
1960
1961 private:
1962 std::shared_ptr<Cache> target_;
1963 int num_lookups_;
1964 int num_found_;
1965 int num_inserts_;
1966 };
1967
  // Wrapped caches; nullptr when that cache type is disabled by the params.
  std::shared_ptr<MyBlockCache> compressed_cache_;
  std::shared_ptr<MyBlockCache> uncompressed_cache_;
  bool compression_enabled_;
  // Expected values — presumably populated during fixture setup (setup is
  // outside this view): values_ for compressible keys,
  // uncompressable_values_ for the "a"-prefixed keys. TODO confirm.
  std::vector<std::string> values_;
  std::vector<std::string> uncompressable_values_;
  bool fill_cache_;  // value used for ReadOptions::fill_cache in the tests
};
1975
// Exercise the batched MultiGet path across the parameter matrix (compressed
// cache / uncompressed cache / compression / fill_cache) and verify both the
// returned values and the exact number of file reads issued.
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  // Read two keys adjacent to the warmed-up ones; whether their blocks come
  // from the cache depends on the fill_cache/compression parameters.
  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  // The warm-up populated a usable cache only if fill_cache is on AND there
  // is a cache that can serve the reads (an uncompressed cache, or a
  // compressed cache when compression is enabled).
  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  // A cache hit avoids both file reads for the two keys above.
  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Batch of 10 keys spanning several data blocks.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_ints.size(); ++i) {
    key_data[i] = Key(key_ints[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_ints.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  }
  // The increments below encode how many distinct blocks must come from the
  // file for each cache/compression combination.
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 2 : 3);
  } else {
    expected_reads += (read_from_cache ? 2 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Same batch shape but with the "a"-prefixed keys whose values are not
  // compressible.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    key_data[i] = "a" + Key(key_uncmp[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 3 : 3);
  } else {
    expected_reads += (read_from_cache ? 4 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Smaller batch of incompressible keys; some of their blocks may have been
  // cached by the previous batch.
  keys.resize(5);
  statuses.resize(5);
  std::vector<int> key_tr{1, 2, 15, 16, 55};
  for (size_t i = 0; i < key_tr.size(); ++i) {
    key_data[i] = "a" + Key(key_tr[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_tr.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 0 : 2);
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  } else {
    if (has_uncompressed_cache()) {
      expected_reads += (read_from_cache ? 0 : 3);
      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
    } else {
      // A rare case: even with block compression enabled, some data blocks
      // are not compressed due to their content. If only the compressed
      // cache is enabled, the uncompressed blocks will not be cached, and
      // block reads will be triggered. The exact number of reads depends on
      // the compression algorithm, so only a lower bound is asserted.
      ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
    }
  }
}
2098
2099 TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
2100 std::vector<std::string> key_data(10);
2101 std::vector<Slice> keys;
2102 // We cannot resize a PinnableSlice vector, so just set initial size to
2103 // largest we think we will need
2104 std::vector<PinnableSlice> values(10);
2105 std::vector<Status> statuses;
2106 int read_count = 0;
2107 ReadOptions ro;
2108 ro.fill_cache = fill_cache();
2109
2110 SyncPoint::GetInstance()->SetCallBack(
2111 "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
2112 Status* s = static_cast<Status*>(status);
2113 read_count++;
2114 if (read_count == 2) {
2115 *s = Status::Corruption();
2116 }
2117 });
2118 SyncPoint::GetInstance()->EnableProcessing();
2119
2120 // Warm up the cache first
2121 key_data.emplace_back(Key(0));
2122 keys.emplace_back(Slice(key_data.back()));
2123 key_data.emplace_back(Key(50));
2124 keys.emplace_back(Slice(key_data.back()));
2125 statuses.resize(keys.size());
2126
2127 dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2128 keys.data(), values.data(), statuses.data(), true);
2129 ASSERT_TRUE(CheckValue(0, values[0].ToString()));
2130 //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2131 ASSERT_EQ(statuses[0], Status::OK());
2132 ASSERT_EQ(statuses[1], Status::Corruption());
2133
2134 SyncPoint::GetInstance()->DisableProcessing();
2135 }
2136
2137 TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
2138 std::vector<std::string> key_data(10);
2139 std::vector<Slice> keys;
2140 // We cannot resize a PinnableSlice vector, so just set initial size to
2141 // largest we think we will need
2142 std::vector<PinnableSlice> values(10);
2143 std::vector<Status> statuses;
2144 ReadOptions ro;
2145 ro.fill_cache = fill_cache();
2146
2147 SyncPoint::GetInstance()->SetCallBack(
2148 "TableCache::MultiGet:FindTable", [&](void *status) {
2149 Status* s = static_cast<Status*>(status);
2150 *s = Status::IOError();
2151 });
2152 // DB open will create table readers unless we reduce the table cache
2153 // capacity.
2154 // SanitizeOptions will set max_open_files to minimum of 20. Table cache
2155 // is allocated with max_open_files - 10 as capacity. So override
2156 // max_open_files to 11 so table cache capacity will become 1. This will
2157 // prevent file open during DB open and force the file to be opened
2158 // during MultiGet
2159 SyncPoint::GetInstance()->SetCallBack(
2160 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
2161 int* max_open_files = (int*)arg;
2162 *max_open_files = 11;
2163 });
2164 SyncPoint::GetInstance()->EnableProcessing();
2165
2166 Reopen(CurrentOptions());
2167
2168 // Warm up the cache first
2169 key_data.emplace_back(Key(0));
2170 keys.emplace_back(Slice(key_data.back()));
2171 key_data.emplace_back(Key(50));
2172 keys.emplace_back(Slice(key_data.back()));
2173 statuses.resize(keys.size());
2174
2175 dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2176 keys.data(), values.data(), statuses.data(), true);
2177 ASSERT_EQ(statuses[0], Status::IOError());
2178 ASSERT_EQ(statuses[1], Status::IOError());
2179
2180 SyncPoint::GetInstance()->DisableProcessing();
2181 }
2182
// Instantiate DBBasicTestWithParallelIO under all 16 combinations of the
// four boolean parameters below.
INSTANTIATE_TEST_CASE_P(
    ParallelIO, DBBasicTestWithParallelIO,
    // Params are as follows -
    // Param 0 - Compressed cache enabled
    // Param 1 - Uncompressed cache enabled
    // Param 2 - Data compression enabled
    // Param 3 - ReadOptions::fill_cache
    ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                       ::testing::Bool(), ::testing::Bool()));
2192
// Base fixture for timestamp tests. Provides a comparator base class that
// understands user keys carrying a fixed-size timestamp suffix, plus a
// helper that encodes a (low, high) pair of 64-bit values as a timestamp.
class DBBasicTestWithTimestampBase : public DBTestBase {
 public:
  explicit DBBasicTestWithTimestampBase(const std::string& dbname)
      : DBTestBase(dbname) {}

 protected:
  // Comparator aware of a trailing timestamp of timestamp_size() bytes on
  // each user key. Subclasses supply the key ordering via CompareImpl().
  class TestComparatorBase : public Comparator {
   public:
    explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}

    const char* Name() const override { return "TestComparator"; }

    // Key shortening is not exercised by these tests; leave keys unchanged.
    void FindShortSuccessor(std::string*) const override {}

    void FindShortestSeparator(std::string*, const Slice&) const override {}

    // Order first by the timestamp-stripped user key; break ties on the
    // trailing timestamp bytes.
    int Compare(const Slice& a, const Slice& b) const override {
      int r = CompareWithoutTimestamp(a, b);
      if (r != 0 || 0 == timestamp_size()) {
        return r;
      }
      return CompareTimestamp(
          Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
          Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
    }

    // Ordering of timestamp-stripped keys, defined by subclasses.
    virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;

    int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
      assert(a.size() >= timestamp_size());
      assert(b.size() >= timestamp_size());
      Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
      Slice k2 = StripTimestampFromUserKey(b, timestamp_size());

      return CompareImpl(k1, k2);
    }

    // Compares two timestamps encoded as fixed64(low) + fixed64(high).
    // NOTE: the numeric comparisons are deliberately reversed — a LARGER
    // timestamp compares SMALLER (returns -1), so newer versions of a key
    // sort before older ones. A non-null timestamp compares greater than a
    // null one.
    int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
      if (!ts1.data() && !ts2.data()) {
        return 0;
      } else if (ts1.data() && !ts2.data()) {
        return 1;
      } else if (!ts1.data() && ts2.data()) {
        return -1;
      }
      assert(ts1.size() == ts2.size());
      uint64_t low1 = 0;
      uint64_t low2 = 0;
      uint64_t high1 = 0;
      uint64_t high2 = 0;
      // GetFixed64 advances the slice, so decode through mutable copies of
      // the pointers.
      auto* ptr1 = const_cast<Slice*>(&ts1);
      auto* ptr2 = const_cast<Slice*>(&ts2);
      if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
          !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
        assert(false);
      }
      // Reversed: higher (high, low) pair => earlier in the ordering.
      if (high1 < high2) {
        return 1;
      } else if (high1 > high2) {
        return -1;
      }
      if (low1 < low2) {
        return 1;
      } else if (low1 > low2) {
        return -1;
      }
      return 0;
    }
  };

  // Encodes (low, high) as a 16-byte timestamp into *ts and returns a Slice
  // viewing it; *ts must outlive the returned Slice.
  Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
    assert(nullptr != ts);
    ts->clear();
    PutFixed64(ts, low);
    PutFixed64(ts, high);
    assert(ts->size() == sizeof(low) + sizeof(high));
    return Slice(*ts);
  }
};
2272
2273 class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
2274 public:
2275 DBBasicTestWithTimestamp()
2276 : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
2277
2278 protected:
2279 class TestComparator : public TestComparatorBase {
2280 public:
2281 const int kKeyPrefixLength =
2282 3; // 3: length of "key" in generated keys ("key" + std::to_string(j))
2283 explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}
2284
2285 int CompareImpl(const Slice& a, const Slice& b) const override {
2286 int n1 = atoi(
2287 std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength)
2288 .c_str());
2289 int n2 = atoi(
2290 std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength)
2291 .c_str());
2292 return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0;
2293 }
2294 };
2295 };
2296
2297 #ifndef ROCKSDB_LITE
2298 // A class which remembers the name of each flushed file.
2299 class FlushedFileCollector : public EventListener {
2300 public:
2301 FlushedFileCollector() {}
2302 ~FlushedFileCollector() override {}
2303
2304 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
2305 InstrumentedMutexLock lock(&mutex_);
2306 flushed_files_.push_back(info.file_path);
2307 }
2308
2309 std::vector<std::string> GetFlushedFiles() {
2310 std::vector<std::string> result;
2311 {
2312 InstrumentedMutexLock lock(&mutex_);
2313 result = flushed_files_;
2314 }
2315 return result;
2316 }
2317
2318 void ClearFlushedFiles() {
2319 InstrumentedMutexLock lock(&mutex_);
2320 flushed_files_.clear();
2321 }
2322
2323 private:
2324 std::vector<std::string> flushed_files_;
2325 InstrumentedMutex mutex_;
2326 };
2327
2328 TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
2329 const int kNumKeysPerFile = 8192;
2330 const size_t kNumTimestamps = 2;
2331 const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
2332 const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
2333 Options options = CurrentOptions();
2334 options.create_if_missing = true;
2335 options.env = env_;
2336 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
2337
2338 FlushedFileCollector* collector = new FlushedFileCollector();
2339 options.listeners.emplace_back(collector);
2340
2341 std::string tmp;
2342 size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
2343 TestComparator test_cmp(ts_sz);
2344 options.comparator = &test_cmp;
2345 BlockBasedTableOptions bbto;
2346 bbto.filter_policy.reset(NewBloomFilterPolicy(
2347 10 /*bits_per_key*/, false /*use_block_based_builder*/));
2348 bbto.whole_key_filtering = true;
2349 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2350 DestroyAndReopen(options);
2351 CreateAndReopenWithCF({"pikachu"}, options);
2352 size_t num_cfs = handles_.size();
2353 ASSERT_EQ(2, num_cfs);
2354 std::vector<std::string> write_ts_strs(kNumTimestamps);
2355 std::vector<std::string> read_ts_strs(kNumTimestamps);
2356 std::vector<Slice> write_ts_list;
2357 std::vector<Slice> read_ts_list;
2358
2359 for (size_t i = 0; i != kNumTimestamps; ++i) {
2360 write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
2361 read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
2362 const Slice& write_ts = write_ts_list.back();
2363 WriteOptions wopts;
2364 wopts.timestamp = &write_ts;
2365 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2366 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2367 ASSERT_OK(Put(cf, "key" + std::to_string(j),
2368 "value_" + std::to_string(j) + "_" + std::to_string(i),
2369 wopts));
2370 if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
2371 // flush all keys with the same timestamp to two sst files, split at
2372 // incremental positions such that lowerlevel[1].smallest.userkey ==
2373 // higherlevel[0].largest.userkey
2374 ASSERT_OK(Flush(cf));
2375
2376 // compact files (2 at each level) to a lower level such that all keys
2377 // with the same timestamp is at one level, with newer versions at
2378 // higher levels.
2379 CompactionOptions compact_opt;
2380 compact_opt.compression = kNoCompression;
2381 db_->CompactFiles(compact_opt, handles_[cf],
2382 collector->GetFlushedFiles(),
2383 static_cast<int>(kNumTimestamps - i));
2384 collector->ClearFlushedFiles();
2385 }
2386 }
2387 }
2388 }
2389 const auto& verify_db_func = [&]() {
2390 for (size_t i = 0; i != kNumTimestamps; ++i) {
2391 ReadOptions ropts;
2392 ropts.timestamp = &read_ts_list[i];
2393 for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2394 ColumnFamilyHandle* cfh = handles_[cf];
2395 for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2396 std::string value;
2397 ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
2398 ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
2399 value);
2400 }
2401 }
2402 }
2403 };
2404 verify_db_func();
2405 }
2406 #endif // !ROCKSDB_LITE
2407
2408 class DBBasicTestWithTimestampWithParam
2409 : public DBBasicTestWithTimestampBase,
2410 public testing::WithParamInterface<bool> {
2411 public:
2412 DBBasicTestWithTimestampWithParam()
2413 : DBBasicTestWithTimestampBase(
2414 "/db_basic_test_with_timestamp_with_param") {}
2415
2416 protected:
2417 class TestComparator : public TestComparatorBase {
2418 private:
2419 const Comparator* cmp_without_ts_;
2420
2421 public:
2422 explicit TestComparator(size_t ts_sz)
2423 : TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) {
2424 cmp_without_ts_ = BytewiseComparator();
2425 }
2426
2427 int CompareImpl(const Slice& a, const Slice& b) const override {
2428 return cmp_without_ts_->Compare(a, b);
2429 }
2430 };
2431 };
2432
// Write values at several increasing timestamps across every supported
// compression type and dictionary setting, then read each key back at a
// timestamp just above each write timestamp and verify the matching version
// is returned. The bool param selects memtable-only (no flush) mode.
TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
  const int kNumKeysPerFile = 8192;
  const size_t kNumTimestamps = 6;
  // When true, data stays in the memtable; when false, flush after each
  // timestamp so reads go through SST files.
  bool memtable_only = GetParam();
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  std::string tmp;
  size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
  TestComparator test_cmp(ts_sz);
  options.comparator = &test_cmp;
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(
      10 /*bits_per_key*/, false /*use_block_based_builder*/));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));

  // Cover every compression type available in this build.
  std::vector<CompressionType> compression_types;
  compression_types.push_back(kNoCompression);
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif  // LZ4_VERSION_NUMBER >= 10400
  if (ZSTD_Supported()) {
    compression_types.push_back(kZSTD);
  }

  // Switch compression dictionary on/off to check key extraction
  // correctness in kBuffered state
  std::vector<uint32_t> max_dict_bytes_list = {0, 1 << 14};  // 0 or 16KB

  for (auto compression_type : compression_types) {
    for (uint32_t max_dict_bytes : max_dict_bytes_list) {
      options.compression = compression_type;
      options.compression_opts.max_dict_bytes = max_dict_bytes;
      if (compression_type == kZSTD) {
        options.compression_opts.zstd_max_train_bytes = max_dict_bytes;
      }
      options.target_file_size_base = 1 << 26;  // 64MB

      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
      size_t num_cfs = handles_.size();
      ASSERT_EQ(2, num_cfs);
      std::vector<std::string> write_ts_strs(kNumTimestamps);
      std::vector<std::string> read_ts_strs(kNumTimestamps);
      std::vector<Slice> write_ts_list;
      std::vector<Slice> read_ts_list;

      for (size_t i = 0; i != kNumTimestamps; ++i) {
        // Write at timestamp 2*i; read back at 2*i+1, i.e. between this
        // write and the next one.
        write_ts_list.emplace_back(
            EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
        read_ts_list.emplace_back(
            EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
        const Slice& write_ts = write_ts_list.back();
        WriteOptions wopts;
        wopts.timestamp = &write_ts;
        for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
          for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
            ASSERT_OK(Put(
                cf, "key" + std::to_string(j),
                "value_" + std::to_string(j) + "_" + std::to_string(i), wopts));
          }
          if (!memtable_only) {
            ASSERT_OK(Flush(cf));
          }
        }
      }
      // Each read timestamp must observe exactly the generation written at
      // the timestamp just below it.
      const auto& verify_db_func = [&]() {
        for (size_t i = 0; i != kNumTimestamps; ++i) {
          ReadOptions ropts;
          ropts.timestamp = &read_ts_list[i];
          for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
            ColumnFamilyHandle* cfh = handles_[cf];
            for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps;
                 ++j) {
              std::string value;
              ASSERT_OK(
                  db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
              ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
                        value);
            }
          }
        }
      };
      verify_db_func();
    }
  }
}
2526
// Instantiate PutAndGet with memtable_only = false and true.
INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
                        ::testing::Bool());
2529
2530 } // namespace ROCKSDB_NAMESPACE
2531
// When built with custom objects from static libraries, the real
// RegisterCustomObjects() is supplied externally; otherwise a no-op stub is
// used so main() can call it unconditionally.
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2539
// Test entry point: install the stack-trace handler for crash diagnostics,
// let gtest consume its command-line flags (InitGoogleTest must run before
// RegisterCustomObjects sees argv), then run all registered tests.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}