]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_basic_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_basic_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
494da23a 9// #include <iostream>
7c673cae
FG
10#include "db/db_test_util.h"
11#include "port/stack_trace.h"
12#include "rocksdb/perf_context.h"
11fdf7f2 13#include "util/fault_injection_test_env.h"
7c673cae
FG
14#if !defined(ROCKSDB_LITE)
15#include "util/sync_point.h"
16#endif
17
18namespace rocksdb {
19
20class DBBasicTest : public DBTestBase {
21 public:
22 DBBasicTest() : DBTestBase("/db_basic_test") {}
23};
24
25TEST_F(DBBasicTest, OpenWhenOpen) {
26 Options options = CurrentOptions();
27 options.env = env_;
28 rocksdb::DB* db2 = nullptr;
29 rocksdb::Status s = DB::Open(options, dbname_, &db2);
30
31 ASSERT_EQ(Status::Code::kIOError, s.code());
32 ASSERT_EQ(Status::SubCode::kNone, s.subcode());
33 ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
34
35 delete db2;
36}
37
38#ifndef ROCKSDB_LITE
39TEST_F(DBBasicTest, ReadOnlyDB) {
40 ASSERT_OK(Put("foo", "v1"));
41 ASSERT_OK(Put("bar", "v2"));
42 ASSERT_OK(Put("foo", "v3"));
43 Close();
44
45 auto options = CurrentOptions();
11fdf7f2 46 assert(options.env == env_);
7c673cae
FG
47 ASSERT_OK(ReadOnlyReopen(options));
48 ASSERT_EQ("v3", Get("foo"));
49 ASSERT_EQ("v2", Get("bar"));
50 Iterator* iter = db_->NewIterator(ReadOptions());
51 int count = 0;
52 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
53 ASSERT_OK(iter->status());
54 ++count;
55 }
56 ASSERT_EQ(count, 2);
57 delete iter;
58 Close();
59
60 // Reopen and flush memtable.
61 Reopen(options);
62 Flush();
63 Close();
64 // Now check keys in read only mode.
65 ASSERT_OK(ReadOnlyReopen(options));
66 ASSERT_EQ("v3", Get("foo"));
67 ASSERT_EQ("v2", Get("bar"));
68 ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
69}
70
71TEST_F(DBBasicTest, CompactedDB) {
72 const uint64_t kFileSize = 1 << 20;
73 Options options = CurrentOptions();
74 options.disable_auto_compactions = true;
75 options.write_buffer_size = kFileSize;
76 options.target_file_size_base = kFileSize;
77 options.max_bytes_for_level_base = 1 << 30;
78 options.compression = kNoCompression;
79 Reopen(options);
80 // 1 L0 file, use CompactedDB if max_open_files = -1
81 ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
82 Flush();
83 Close();
84 ASSERT_OK(ReadOnlyReopen(options));
85 Status s = Put("new", "value");
86 ASSERT_EQ(s.ToString(),
87 "Not implemented: Not supported operation in read only mode.");
88 ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
89 Close();
90 options.max_open_files = -1;
91 ASSERT_OK(ReadOnlyReopen(options));
92 s = Put("new", "value");
93 ASSERT_EQ(s.ToString(),
94 "Not implemented: Not supported in compacted db mode.");
95 ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
96 Close();
97 Reopen(options);
98 // Add more L0 files
99 ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
100 Flush();
101 ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
102 Flush();
103 ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
104 ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
105 Flush();
106 Close();
107
108 ASSERT_OK(ReadOnlyReopen(options));
109 // Fallback to read-only DB
110 s = Put("new", "value");
111 ASSERT_EQ(s.ToString(),
112 "Not implemented: Not supported operation in read only mode.");
113 Close();
114
115 // Full compaction
116 Reopen(options);
117 // Add more keys
118 ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
119 ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
120 ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
121 ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
122 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
123 ASSERT_EQ(3, NumTableFilesAtLevel(1));
124 Close();
125
126 // CompactedDB
127 ASSERT_OK(ReadOnlyReopen(options));
128 s = Put("new", "value");
129 ASSERT_EQ(s.ToString(),
130 "Not implemented: Not supported in compacted db mode.");
131 ASSERT_EQ("NOT_FOUND", Get("abc"));
132 ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
133 ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
134 ASSERT_EQ("NOT_FOUND", Get("ccc"));
135 ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
136 ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
137 ASSERT_EQ("NOT_FOUND", Get("ggg"));
138 ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
139 ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
140 ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
141 ASSERT_EQ("NOT_FOUND", Get("kkk"));
142
143 // MultiGet
144 std::vector<std::string> values;
145 std::vector<Status> status_list = dbfull()->MultiGet(
146 ReadOptions(),
147 std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
148 Slice("ggg"), Slice("iii"), Slice("kkk")}),
149 &values);
150 ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
151 ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
152 ASSERT_OK(status_list[0]);
153 ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
154 ASSERT_TRUE(status_list[1].IsNotFound());
155 ASSERT_OK(status_list[2]);
156 ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
157 ASSERT_TRUE(status_list[3].IsNotFound());
158 ASSERT_OK(status_list[4]);
159 ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
160 ASSERT_TRUE(status_list[5].IsNotFound());
161
162 Reopen(options);
163 // Add a key
164 ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
165 Close();
166 ASSERT_OK(ReadOnlyReopen(options));
167 s = Put("new", "value");
168 ASSERT_EQ(s.ToString(),
169 "Not implemented: Not supported operation in read only mode.");
170}
171
172TEST_F(DBBasicTest, LevelLimitReopen) {
173 Options options = CurrentOptions();
174 CreateAndReopenWithCF({"pikachu"}, options);
175
176 const std::string value(1024 * 1024, ' ');
177 int i = 0;
178 while (NumTableFilesAtLevel(2, 1) == 0) {
179 ASSERT_OK(Put(1, Key(i++), value));
11fdf7f2
TL
180 dbfull()->TEST_WaitForFlushMemTable();
181 dbfull()->TEST_WaitForCompact();
7c673cae
FG
182 }
183
184 options.num_levels = 1;
185 options.max_bytes_for_level_multiplier_additional.resize(1, 1);
186 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
187 ASSERT_EQ(s.IsInvalidArgument(), true);
188 ASSERT_EQ(s.ToString(),
189 "Invalid argument: db has more levels than options.num_levels");
190
191 options.num_levels = 10;
192 options.max_bytes_for_level_multiplier_additional.resize(10, 1);
193 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
194}
195#endif // ROCKSDB_LITE
196
197TEST_F(DBBasicTest, PutDeleteGet) {
198 do {
199 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
200 ASSERT_OK(Put(1, "foo", "v1"));
201 ASSERT_EQ("v1", Get(1, "foo"));
202 ASSERT_OK(Put(1, "foo", "v2"));
203 ASSERT_EQ("v2", Get(1, "foo"));
204 ASSERT_OK(Delete(1, "foo"));
205 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
206 } while (ChangeOptions());
207}
208
209TEST_F(DBBasicTest, PutSingleDeleteGet) {
210 do {
211 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
212 ASSERT_OK(Put(1, "foo", "v1"));
213 ASSERT_EQ("v1", Get(1, "foo"));
214 ASSERT_OK(Put(1, "foo2", "v2"));
215 ASSERT_EQ("v2", Get(1, "foo2"));
216 ASSERT_OK(SingleDelete(1, "foo"));
217 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
494da23a
TL
218 // Ski FIFO and universal compaction because they do not apply to the test
219 // case. Skip MergePut because single delete does not get removed when it
220 // encounters a merge.
221 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
222 kSkipMergePut));
7c673cae
FG
223}
224
225TEST_F(DBBasicTest, EmptyFlush) {
226 // It is possible to produce empty flushes when using single deletes. Tests
227 // whether empty flushes cause issues.
228 do {
229 Random rnd(301);
230
231 Options options = CurrentOptions();
232 options.disable_auto_compactions = true;
233 CreateAndReopenWithCF({"pikachu"}, options);
234
235 Put(1, "a", Slice());
236 SingleDelete(1, "a");
237 ASSERT_OK(Flush(1));
238
239 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
494da23a
TL
240 // Skip FIFO and universal compaction as they do not apply to the test
241 // case. Skip MergePut because merges cannot be combined with single
242 // deletions.
243 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
244 kSkipMergePut));
7c673cae
FG
245}
246
247TEST_F(DBBasicTest, GetFromVersions) {
248 do {
249 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
250 ASSERT_OK(Put(1, "foo", "v1"));
251 ASSERT_OK(Flush(1));
252 ASSERT_EQ("v1", Get(1, "foo"));
253 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
254 } while (ChangeOptions());
255}
256
257#ifndef ROCKSDB_LITE
258TEST_F(DBBasicTest, GetSnapshot) {
259 anon::OptionsOverride options_override;
260 options_override.skip_policy = kSkipNoSnapshot;
261 do {
262 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
263 // Try with both a short key and a long key
264 for (int i = 0; i < 2; i++) {
265 std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
266 ASSERT_OK(Put(1, key, "v1"));
267 const Snapshot* s1 = db_->GetSnapshot();
7c673cae
FG
268 ASSERT_OK(Put(1, key, "v2"));
269 ASSERT_EQ("v2", Get(1, key));
270 ASSERT_EQ("v1", Get(1, key, s1));
271 ASSERT_OK(Flush(1));
272 ASSERT_EQ("v2", Get(1, key));
273 ASSERT_EQ("v1", Get(1, key, s1));
274 db_->ReleaseSnapshot(s1);
275 }
276 } while (ChangeOptions());
277}
278#endif // ROCKSDB_LITE
279
280TEST_F(DBBasicTest, CheckLock) {
281 do {
282 DB* localdb;
283 Options options = CurrentOptions();
284 ASSERT_OK(TryReopen(options));
285
286 // second open should fail
287 ASSERT_TRUE(!(DB::Open(options, dbname_, &localdb)).ok());
288 } while (ChangeCompactOptions());
289}
290
291TEST_F(DBBasicTest, FlushMultipleMemtable) {
292 do {
293 Options options = CurrentOptions();
294 WriteOptions writeOpt = WriteOptions();
295 writeOpt.disableWAL = true;
296 options.max_write_buffer_number = 4;
297 options.min_write_buffer_number_to_merge = 3;
298 options.max_write_buffer_number_to_maintain = -1;
299 CreateAndReopenWithCF({"pikachu"}, options);
300 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
301 ASSERT_OK(Flush(1));
302 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
303
304 ASSERT_EQ("v1", Get(1, "foo"));
305 ASSERT_EQ("v1", Get(1, "bar"));
306 ASSERT_OK(Flush(1));
307 } while (ChangeCompactOptions());
308}
309
310TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
311 // Block flush thread and disable compaction thread
312 env_->SetBackgroundThreads(1, Env::HIGH);
313 env_->SetBackgroundThreads(1, Env::LOW);
314 test::SleepingBackgroundTask sleeping_task_low;
315 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
316 Env::Priority::LOW);
317 test::SleepingBackgroundTask sleeping_task_high;
318 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
319 &sleeping_task_high, Env::Priority::HIGH);
320
321 Options options = CurrentOptions();
322 // disable compaction
323 options.disable_auto_compactions = true;
324 WriteOptions writeOpt = WriteOptions();
325 writeOpt.disableWAL = true;
326 options.max_write_buffer_number = 2;
327 options.min_write_buffer_number_to_merge = 1;
328 options.max_write_buffer_number_to_maintain = 1;
329 CreateAndReopenWithCF({"pikachu"}, options);
330
331 // Compaction can still go through even if no thread can flush the
332 // mem table.
333 ASSERT_OK(Flush(0));
334 ASSERT_OK(Flush(1));
335
336 // Insert can go through
337 ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
338 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
339
340 ASSERT_EQ("v1", Get(0, "foo"));
341 ASSERT_EQ("v1", Get(1, "bar"));
342
343 sleeping_task_high.WakeUp();
344 sleeping_task_high.WaitUntilDone();
345
346 // Flush can still go through.
347 ASSERT_OK(Flush(0));
348 ASSERT_OK(Flush(1));
349
350 sleeping_task_low.WakeUp();
351 sleeping_task_low.WaitUntilDone();
352}
353
354TEST_F(DBBasicTest, FLUSH) {
355 do {
356 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
357 WriteOptions writeOpt = WriteOptions();
358 writeOpt.disableWAL = true;
359 SetPerfLevel(kEnableTime);
7c673cae
FG
360 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
361 // this will now also flush the last 2 writes
362 ASSERT_OK(Flush(1));
363 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
364
11fdf7f2 365 get_perf_context()->Reset();
7c673cae 366 Get(1, "foo");
11fdf7f2
TL
367 ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
368 ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
7c673cae
FG
369
370 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
371 ASSERT_EQ("v1", Get(1, "foo"));
372 ASSERT_EQ("v1", Get(1, "bar"));
373
374 writeOpt.disableWAL = true;
375 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
376 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
377 ASSERT_OK(Flush(1));
378
379 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
380 ASSERT_EQ("v2", Get(1, "bar"));
11fdf7f2 381 get_perf_context()->Reset();
7c673cae 382 ASSERT_EQ("v2", Get(1, "foo"));
11fdf7f2 383 ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
7c673cae
FG
384
385 writeOpt.disableWAL = false;
386 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
387 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
388 ASSERT_OK(Flush(1));
389
390 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
391 // 'foo' should be there because its put
392 // has WAL enabled.
393 ASSERT_EQ("v3", Get(1, "foo"));
394 ASSERT_EQ("v3", Get(1, "bar"));
395
396 SetPerfLevel(kDisable);
397 } while (ChangeCompactOptions());
398}
399
400TEST_F(DBBasicTest, ManifestRollOver) {
401 do {
402 Options options;
403 options.max_manifest_file_size = 10; // 10 bytes
404 options = CurrentOptions(options);
405 CreateAndReopenWithCF({"pikachu"}, options);
406 {
407 ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
408 ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
409 ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
410 uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
411 ASSERT_OK(Flush(1)); // This should trigger LogAndApply.
412 uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
413 ASSERT_GT(manifest_after_flush, manifest_before_flush);
414 ReopenWithColumnFamilies({"default", "pikachu"}, options);
415 ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
416 // check if a new manifest file got inserted or not.
417 ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
418 ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
419 ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
420 }
421 } while (ChangeCompactOptions());
422}
423
424TEST_F(DBBasicTest, IdentityAcrossRestarts) {
425 do {
426 std::string id1;
427 ASSERT_OK(db_->GetDbIdentity(id1));
428
429 Options options = CurrentOptions();
430 Reopen(options);
431 std::string id2;
432 ASSERT_OK(db_->GetDbIdentity(id2));
433 // id1 should match id2 because identity was not regenerated
434 ASSERT_EQ(id1.compare(id2), 0);
435
436 std::string idfilename = IdentityFileName(dbname_);
437 ASSERT_OK(env_->DeleteFile(idfilename));
438 Reopen(options);
439 std::string id3;
440 ASSERT_OK(db_->GetDbIdentity(id3));
441 // id1 should NOT match id3 because identity was regenerated
442 ASSERT_NE(id1.compare(id3), 0);
443 } while (ChangeCompactOptions());
444}
445
446#ifndef ROCKSDB_LITE
447TEST_F(DBBasicTest, Snapshot) {
448 anon::OptionsOverride options_override;
449 options_override.skip_policy = kSkipNoSnapshot;
450 do {
451 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
452 Put(0, "foo", "0v1");
453 Put(1, "foo", "1v1");
454
455 const Snapshot* s1 = db_->GetSnapshot();
456 ASSERT_EQ(1U, GetNumSnapshots());
457 uint64_t time_snap1 = GetTimeOldestSnapshots();
458 ASSERT_GT(time_snap1, 0U);
459 Put(0, "foo", "0v2");
460 Put(1, "foo", "1v2");
461
462 env_->addon_time_.fetch_add(1);
463
464 const Snapshot* s2 = db_->GetSnapshot();
465 ASSERT_EQ(2U, GetNumSnapshots());
466 ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
467 Put(0, "foo", "0v3");
468 Put(1, "foo", "1v3");
469
470 {
471 ManagedSnapshot s3(db_);
472 ASSERT_EQ(3U, GetNumSnapshots());
473 ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
474
475 Put(0, "foo", "0v4");
476 Put(1, "foo", "1v4");
477 ASSERT_EQ("0v1", Get(0, "foo", s1));
478 ASSERT_EQ("1v1", Get(1, "foo", s1));
479 ASSERT_EQ("0v2", Get(0, "foo", s2));
480 ASSERT_EQ("1v2", Get(1, "foo", s2));
481 ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
482 ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
483 ASSERT_EQ("0v4", Get(0, "foo"));
484 ASSERT_EQ("1v4", Get(1, "foo"));
485 }
486
487 ASSERT_EQ(2U, GetNumSnapshots());
488 ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
489 ASSERT_EQ("0v1", Get(0, "foo", s1));
490 ASSERT_EQ("1v1", Get(1, "foo", s1));
491 ASSERT_EQ("0v2", Get(0, "foo", s2));
492 ASSERT_EQ("1v2", Get(1, "foo", s2));
493 ASSERT_EQ("0v4", Get(0, "foo"));
494 ASSERT_EQ("1v4", Get(1, "foo"));
495
496 db_->ReleaseSnapshot(s1);
497 ASSERT_EQ("0v2", Get(0, "foo", s2));
498 ASSERT_EQ("1v2", Get(1, "foo", s2));
499 ASSERT_EQ("0v4", Get(0, "foo"));
500 ASSERT_EQ("1v4", Get(1, "foo"));
501 ASSERT_EQ(1U, GetNumSnapshots());
502 ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
503
504 db_->ReleaseSnapshot(s2);
505 ASSERT_EQ(0U, GetNumSnapshots());
506 ASSERT_EQ("0v4", Get(0, "foo"));
507 ASSERT_EQ("1v4", Get(1, "foo"));
494da23a 508 } while (ChangeOptions());
7c673cae
FG
509}
510
511#endif // ROCKSDB_LITE
512
513TEST_F(DBBasicTest, CompactBetweenSnapshots) {
514 anon::OptionsOverride options_override;
515 options_override.skip_policy = kSkipNoSnapshot;
516 do {
517 Options options = CurrentOptions(options_override);
518 options.disable_auto_compactions = true;
519 CreateAndReopenWithCF({"pikachu"}, options);
520 Random rnd(301);
521 FillLevels("a", "z", 1);
522
523 Put(1, "foo", "first");
524 const Snapshot* snapshot1 = db_->GetSnapshot();
525 Put(1, "foo", "second");
526 Put(1, "foo", "third");
527 Put(1, "foo", "fourth");
528 const Snapshot* snapshot2 = db_->GetSnapshot();
529 Put(1, "foo", "fifth");
530 Put(1, "foo", "sixth");
531
532 // All entries (including duplicates) exist
533 // before any compaction or flush is triggered.
534 ASSERT_EQ(AllEntriesFor("foo", 1),
535 "[ sixth, fifth, fourth, third, second, first ]");
536 ASSERT_EQ("sixth", Get(1, "foo"));
537 ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
538 ASSERT_EQ("first", Get(1, "foo", snapshot1));
539
540 // After a flush, "second", "third" and "fifth" should
541 // be removed
542 ASSERT_OK(Flush(1));
543 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
544
545 // after we release the snapshot1, only two values left
546 db_->ReleaseSnapshot(snapshot1);
547 FillLevels("a", "z", 1);
548 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
549 nullptr);
550
551 // We have only one valid snapshot snapshot2. Since snapshot1 is
552 // not valid anymore, "first" should be removed by a compaction.
553 ASSERT_EQ("sixth", Get(1, "foo"));
554 ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
555 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
556
557 // after we release the snapshot2, only one value should be left
558 db_->ReleaseSnapshot(snapshot2);
559 FillLevels("a", "z", 1);
560 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
561 nullptr);
562 ASSERT_EQ("sixth", Get(1, "foo"));
563 ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
494da23a 564 } while (ChangeOptions(kSkipFIFOCompaction));
7c673cae
FG
565}
566
567TEST_F(DBBasicTest, DBOpen_Options) {
568 Options options = CurrentOptions();
11fdf7f2
TL
569 Close();
570 Destroy(options);
7c673cae
FG
571
572 // Does not exist, and create_if_missing == false: error
573 DB* db = nullptr;
574 options.create_if_missing = false;
11fdf7f2 575 Status s = DB::Open(options, dbname_, &db);
7c673cae
FG
576 ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
577 ASSERT_TRUE(db == nullptr);
578
579 // Does not exist, and create_if_missing == true: OK
580 options.create_if_missing = true;
11fdf7f2 581 s = DB::Open(options, dbname_, &db);
7c673cae
FG
582 ASSERT_OK(s);
583 ASSERT_TRUE(db != nullptr);
584
585 delete db;
586 db = nullptr;
587
588 // Does exist, and error_if_exists == true: error
589 options.create_if_missing = false;
590 options.error_if_exists = true;
11fdf7f2 591 s = DB::Open(options, dbname_, &db);
7c673cae
FG
592 ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
593 ASSERT_TRUE(db == nullptr);
594
595 // Does exist, and error_if_exists == false: OK
596 options.create_if_missing = true;
597 options.error_if_exists = false;
11fdf7f2 598 s = DB::Open(options, dbname_, &db);
7c673cae
FG
599 ASSERT_OK(s);
600 ASSERT_TRUE(db != nullptr);
601
602 delete db;
603 db = nullptr;
604}
605
606TEST_F(DBBasicTest, CompactOnFlush) {
607 anon::OptionsOverride options_override;
608 options_override.skip_policy = kSkipNoSnapshot;
609 do {
610 Options options = CurrentOptions(options_override);
611 options.disable_auto_compactions = true;
612 CreateAndReopenWithCF({"pikachu"}, options);
613
614 Put(1, "foo", "v1");
615 ASSERT_OK(Flush(1));
616 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
617
618 // Write two new keys
619 Put(1, "a", "begin");
620 Put(1, "z", "end");
621 Flush(1);
622
623 // Case1: Delete followed by a put
624 Delete(1, "foo");
625 Put(1, "foo", "v2");
626 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
627
628 // After the current memtable is flushed, the DEL should
629 // have been removed
630 ASSERT_OK(Flush(1));
631 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
632
633 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
634 nullptr);
635 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
636
637 // Case 2: Delete followed by another delete
638 Delete(1, "foo");
639 Delete(1, "foo");
640 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
641 ASSERT_OK(Flush(1));
642 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
643 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
644 nullptr);
645 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
646
647 // Case 3: Put followed by a delete
648 Put(1, "foo", "v3");
649 Delete(1, "foo");
650 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
651 ASSERT_OK(Flush(1));
652 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
653 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
654 nullptr);
655 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
656
657 // Case 4: Put followed by another Put
658 Put(1, "foo", "v4");
659 Put(1, "foo", "v5");
660 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
661 ASSERT_OK(Flush(1));
662 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
663 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
664 nullptr);
665 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
666
667 // clear database
668 Delete(1, "foo");
669 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
670 nullptr);
671 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
672
673 // Case 5: Put followed by snapshot followed by another Put
674 // Both puts should remain.
675 Put(1, "foo", "v6");
676 const Snapshot* snapshot = db_->GetSnapshot();
677 Put(1, "foo", "v7");
678 ASSERT_OK(Flush(1));
679 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
680 db_->ReleaseSnapshot(snapshot);
681
682 // clear database
683 Delete(1, "foo");
684 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
685 nullptr);
686 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
687
688 // Case 5: snapshot followed by a put followed by another Put
689 // Only the last put should remain.
690 const Snapshot* snapshot1 = db_->GetSnapshot();
691 Put(1, "foo", "v8");
692 Put(1, "foo", "v9");
693 ASSERT_OK(Flush(1));
694 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
695 db_->ReleaseSnapshot(snapshot1);
696 } while (ChangeCompactOptions());
697}
698
699TEST_F(DBBasicTest, FlushOneColumnFamily) {
700 Options options = CurrentOptions();
701 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
702 "alyosha", "popovich"},
703 options);
704
705 ASSERT_OK(Put(0, "Default", "Default"));
706 ASSERT_OK(Put(1, "pikachu", "pikachu"));
707 ASSERT_OK(Put(2, "ilya", "ilya"));
708 ASSERT_OK(Put(3, "muromec", "muromec"));
709 ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
710 ASSERT_OK(Put(5, "nikitich", "nikitich"));
711 ASSERT_OK(Put(6, "alyosha", "alyosha"));
712 ASSERT_OK(Put(7, "popovich", "popovich"));
713
714 for (int i = 0; i < 8; ++i) {
715 Flush(i);
716 auto tables = ListTableFiles(env_, dbname_);
717 ASSERT_EQ(tables.size(), i + 1U);
718 }
719}
720
721TEST_F(DBBasicTest, MultiGetSimple) {
722 do {
723 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
11fdf7f2 724 SetPerfLevel(kEnableCount);
7c673cae
FG
725 ASSERT_OK(Put(1, "k1", "v1"));
726 ASSERT_OK(Put(1, "k2", "v2"));
727 ASSERT_OK(Put(1, "k3", "v3"));
728 ASSERT_OK(Put(1, "k4", "v4"));
729 ASSERT_OK(Delete(1, "k4"));
730 ASSERT_OK(Put(1, "k5", "v5"));
731 ASSERT_OK(Delete(1, "no_key"));
732
733 std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
734
735 std::vector<std::string> values(20, "Temporary data to be overwritten");
736 std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
737
11fdf7f2 738 get_perf_context()->Reset();
7c673cae
FG
739 std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
740 ASSERT_EQ(values.size(), keys.size());
741 ASSERT_EQ(values[0], "v1");
742 ASSERT_EQ(values[1], "v2");
743 ASSERT_EQ(values[2], "v3");
744 ASSERT_EQ(values[4], "v5");
11fdf7f2
TL
745 // four kv pairs * two bytes per value
746 ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
7c673cae
FG
747
748 ASSERT_OK(s[0]);
749 ASSERT_OK(s[1]);
750 ASSERT_OK(s[2]);
751 ASSERT_TRUE(s[3].IsNotFound());
752 ASSERT_OK(s[4]);
753 ASSERT_TRUE(s[5].IsNotFound());
11fdf7f2 754 SetPerfLevel(kDisable);
7c673cae
FG
755 } while (ChangeCompactOptions());
756}
757
758TEST_F(DBBasicTest, MultiGetEmpty) {
759 do {
760 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
761 // Empty Key Set
762 std::vector<Slice> keys;
763 std::vector<std::string> values;
764 std::vector<ColumnFamilyHandle*> cfs;
765 std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
766 ASSERT_EQ(s.size(), 0U);
767
768 // Empty Database, Empty Key Set
769 Options options = CurrentOptions();
770 options.create_if_missing = true;
771 DestroyAndReopen(options);
772 CreateAndReopenWithCF({"pikachu"}, options);
773 s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
774 ASSERT_EQ(s.size(), 0U);
775
776 // Empty Database, Search for Keys
777 keys.resize(2);
778 keys[0] = "a";
779 keys[1] = "b";
780 cfs.push_back(handles_[0]);
781 cfs.push_back(handles_[1]);
782 s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
783 ASSERT_EQ(static_cast<int>(s.size()), 2);
784 ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
785 } while (ChangeCompactOptions());
786}
787
788TEST_F(DBBasicTest, ChecksumTest) {
789 BlockBasedTableOptions table_options;
790 Options options = CurrentOptions();
11fdf7f2 791 // change when new checksum type added
494da23a 792 int max_checksum = static_cast<int>(kxxHash64);
11fdf7f2
TL
793 const int kNumPerFile = 2;
794
795 // generate one table with each type of checksum
796 for (int i = 0; i <= max_checksum; ++i) {
797 table_options.checksum = static_cast<ChecksumType>(i);
798 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
799 Reopen(options);
800 for (int j = 0; j < kNumPerFile; ++j) {
801 ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
802 }
803 ASSERT_OK(Flush());
804 }
7c673cae 805
11fdf7f2 806 // verify data with each type of checksum
494da23a 807 for (int i = 0; i <= kxxHash64; ++i) {
11fdf7f2
TL
808 table_options.checksum = static_cast<ChecksumType>(i);
809 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
810 Reopen(options);
811 for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
812 ASSERT_EQ(Key(j), Get(Key(j)));
813 }
814 }
7c673cae
FG
815}
816
// On Windows a file is opened either memory-mapped or with unbuffered
// access, so this test asserts there and does not make sense to run.
820#ifndef OS_WIN
821TEST_F(DBBasicTest, MmapAndBufferOptions) {
11fdf7f2
TL
822 if (!IsMemoryMappedAccessSupported()) {
823 return;
824 }
7c673cae
FG
825 Options options = CurrentOptions();
826
827 options.use_direct_reads = true;
828 options.allow_mmap_reads = true;
829 ASSERT_NOK(TryReopen(options));
830
831 // All other combinations are acceptable
832 options.use_direct_reads = false;
833 ASSERT_OK(TryReopen(options));
834
835 if (IsDirectIOSupported()) {
836 options.use_direct_reads = true;
837 options.allow_mmap_reads = false;
838 ASSERT_OK(TryReopen(options));
839 }
840
841 options.use_direct_reads = false;
842 ASSERT_OK(TryReopen(options));
843}
844#endif
845
11fdf7f2
TL
846class TestEnv : public EnvWrapper {
847 public:
848 explicit TestEnv() : EnvWrapper(Env::Default()),
849 close_count(0) { }
850
851 class TestLogger : public Logger {
852 public:
853 using Logger::Logv;
854 TestLogger(TestEnv *env_ptr) : Logger() { env = env_ptr; }
494da23a 855 ~TestLogger() override {
11fdf7f2
TL
856 if (!closed_) {
857 CloseHelper();
858 }
859 }
494da23a 860 void Logv(const char* /*format*/, va_list /*ap*/) override{};
11fdf7f2
TL
861
862 protected:
494da23a
TL
863 Status CloseImpl() override { return CloseHelper(); }
864
865 private:
11fdf7f2
TL
866 Status CloseHelper() {
867 env->CloseCountInc();;
868 return Status::IOError();
869 }
870 TestEnv *env;
871 };
872
873 void CloseCountInc() { close_count++; }
874
875 int GetCloseCount() { return close_count; }
876
494da23a
TL
877 Status NewLogger(const std::string& /*fname*/,
878 std::shared_ptr<Logger>* result) override {
11fdf7f2
TL
879 result->reset(new TestLogger(this));
880 return Status::OK();
881 }
882
883 private:
884 int close_count;
885};
886
887TEST_F(DBBasicTest, DBClose) {
888 Options options = GetDefaultOptions();
889 std::string dbname = test::PerThreadDBPath("db_close_test");
890 ASSERT_OK(DestroyDB(dbname, options));
891
892 DB* db = nullptr;
893 TestEnv* env = new TestEnv();
894 options.create_if_missing = true;
895 options.env = env;
896 Status s = DB::Open(options, dbname, &db);
897 ASSERT_OK(s);
898 ASSERT_TRUE(db != nullptr);
899
900 s = db->Close();
901 ASSERT_EQ(env->GetCloseCount(), 1);
902 ASSERT_EQ(s, Status::IOError());
903
904 delete db;
905 ASSERT_EQ(env->GetCloseCount(), 1);
906
907 // Do not call DB::Close() and ensure our logger Close() still gets called
908 s = DB::Open(options, dbname, &db);
909 ASSERT_OK(s);
910 ASSERT_TRUE(db != nullptr);
911 delete db;
912 ASSERT_EQ(env->GetCloseCount(), 2);
913
914 // Provide our own logger and ensure DB::Close() does not close it
915 options.info_log.reset(new TestEnv::TestLogger(env));
916 options.create_if_missing = false;
917 s = DB::Open(options, dbname, &db);
918 ASSERT_OK(s);
919 ASSERT_TRUE(db != nullptr);
920
921 s = db->Close();
922 ASSERT_EQ(s, Status::OK());
923 delete db;
924 ASSERT_EQ(env->GetCloseCount(), 2);
925 options.info_log.reset();
926 ASSERT_EQ(env->GetCloseCount(), 3);
927
928 delete options.env;
929}
930
931TEST_F(DBBasicTest, DBCloseFlushError) {
932 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
933 new FaultInjectionTestEnv(Env::Default()));
934 Options options = GetDefaultOptions();
935 options.create_if_missing = true;
936 options.manual_wal_flush = true;
937 options.write_buffer_size=100;
938 options.env = fault_injection_env.get();
939
940 Reopen(options);
941 ASSERT_OK(Put("key1", "value1"));
942 ASSERT_OK(Put("key2", "value2"));
943 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
944 ASSERT_OK(Put("key3", "value3"));
945 fault_injection_env->SetFilesystemActive(false);
946 Status s = dbfull()->Close();
947 fault_injection_env->SetFilesystemActive(true);
948 ASSERT_NE(s, Status::OK());
949
950 Destroy(options);
951}
952
494da23a
TL
953TEST_F(DBBasicTest, MultiGetMultiCF) {
954 Options options = CurrentOptions();
955 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
956 "alyosha", "popovich"},
957 options);
958
959 for (int i = 0; i < 8; ++i) {
960 ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
961 "cf" + std::to_string(i) + "_val"));
962 }
963
964 int get_sv_count = 0;
965 rocksdb::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
966 rocksdb::SyncPoint::GetInstance()->SetCallBack(
967 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
968 if (++get_sv_count == 2) {
969 // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
970 // is forced to repeat the process
971 for (int i = 0; i < 8; ++i) {
972 ASSERT_OK(Flush(i));
973 ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
974 "cf" + std::to_string(i) + "_val2"));
975 }
976 }
977 if (get_sv_count == 11) {
978 for (int i = 0; i < 8; ++i) {
979 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
980 db->GetColumnFamilyHandle(i))
981 ->cfd();
982 ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
983 }
984 }
985 });
986 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
987
988 std::vector<int> cfs;
989 std::vector<std::string> keys;
990 std::vector<std::string> values;
991
992 for (int i = 0; i < 8; ++i) {
993 cfs.push_back(i);
994 keys.push_back("cf" + std::to_string(i) + "_key");
995 }
996
997 values = MultiGet(cfs, keys);
998 ASSERT_EQ(values.size(), 8);
999 for (unsigned int j = 0; j < values.size(); ++j) {
1000 ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
1001 }
1002 for (int i = 0; i < 8; ++i) {
1003 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1004 reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
1005 ->cfd();
1006 ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1007 ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
1008 }
1009}
1010
1011TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
1012 Options options = CurrentOptions();
1013 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1014 "alyosha", "popovich"},
1015 options);
1016
1017 for (int i = 0; i < 8; ++i) {
1018 ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1019 "cf" + std::to_string(i) + "_val"));
1020 }
1021
1022 int get_sv_count = 0;
1023 int retries = 0;
1024 bool last_try = false;
1025 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1026 "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
1027 last_try = true;
1028 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1029 });
1030 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1031 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1032 if (last_try) {
1033 return;
1034 }
1035 if (++get_sv_count == 2) {
1036 ++retries;
1037 get_sv_count = 0;
1038 for (int i = 0; i < 8; ++i) {
1039 ASSERT_OK(Flush(i));
1040 ASSERT_OK(Put(
1041 i, "cf" + std::to_string(i) + "_key",
1042 "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
1043 }
1044 }
1045 });
1046 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1047
1048 std::vector<int> cfs;
1049 std::vector<std::string> keys;
1050 std::vector<std::string> values;
1051
1052 for (int i = 0; i < 8; ++i) {
1053 cfs.push_back(i);
1054 keys.push_back("cf" + std::to_string(i) + "_key");
1055 }
1056
1057 values = MultiGet(cfs, keys);
1058 ASSERT_TRUE(last_try);
1059 ASSERT_EQ(values.size(), 8);
1060 for (unsigned int j = 0; j < values.size(); ++j) {
1061 ASSERT_EQ(values[j],
1062 "cf" + std::to_string(j) + "_val" + std::to_string(retries));
1063 }
1064 for (int i = 0; i < 8; ++i) {
1065 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1066 reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
1067 ->cfd();
1068 ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1069 }
1070}
1071
1072TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
1073 Options options = CurrentOptions();
1074 CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1075 "alyosha", "popovich"},
1076 options);
1077
1078 for (int i = 0; i < 8; ++i) {
1079 ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1080 "cf" + std::to_string(i) + "_val"));
1081 }
1082
1083 int get_sv_count = 0;
1084 rocksdb::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
1085 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1086 "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1087 if (++get_sv_count == 2) {
1088 for (int i = 0; i < 8; ++i) {
1089 ASSERT_OK(Flush(i));
1090 ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1091 "cf" + std::to_string(i) + "_val2"));
1092 }
1093 }
1094 if (get_sv_count == 8) {
1095 for (int i = 0; i < 8; ++i) {
1096 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1097 db->GetColumnFamilyHandle(i))
1098 ->cfd();
1099 ASSERT_TRUE(
1100 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
1101 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
1102 }
1103 }
1104 });
1105 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1106
1107 std::vector<int> cfs;
1108 std::vector<std::string> keys;
1109 std::vector<std::string> values;
1110
1111 for (int i = 0; i < 8; ++i) {
1112 cfs.push_back(i);
1113 keys.push_back("cf" + std::to_string(i) + "_key");
1114 }
1115
1116 const Snapshot* snapshot = db_->GetSnapshot();
1117 values = MultiGet(cfs, keys, snapshot);
1118 db_->ReleaseSnapshot(snapshot);
1119 ASSERT_EQ(values.size(), 8);
1120 for (unsigned int j = 0; j < values.size(); ++j) {
1121 ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
1122 }
1123 for (int i = 0; i < 8; ++i) {
1124 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1125 reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
1126 ->cfd();
1127 ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1128 }
1129}
1130
7c673cae
FG
1131} // namespace rocksdb
1132
1133int main(int argc, char** argv) {
1134 rocksdb::port::InstallStackTraceHandler();
1135 ::testing::InitGoogleTest(&argc, argv);
1136 return RUN_ALL_TESTS();
1137}