// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <cstdlib>
#include <functional>

#include "db/db_test_util.h"
#include "db/read_callback.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h"

namespace rocksdb {

class DBTest2 : public DBTestBase {
 public:
  DBTest2() : DBTestBase("/db_test2") {}
};

class PrefixFullBloomWithReverseComparator
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  PrefixFullBloomWithReverseComparator()
      : DBTestBase("/prefix_bloom_reverse") {}
  void SetUp() override { if_cache_filter_ = GetParam(); }
  bool if_cache_filter_;
};

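// With a reverse bytewise comparator and a 3-byte capped prefix extractor,
// the prefix Bloom filter must still honor the comparator's ordering: e.g.
// Seek("bar345") should land on "bar234", the next key in reverse order.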
TEST_P(PrefixFullBloomWithReverseComparator,
       PrefixFullBloomWithReverseComparator) {
  Options options = last_options_;
  options.comparator = ReverseBytewiseComparator();
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions bbto;
  if (if_cache_filter_) {
    bbto.no_block_cache = false;
    bbto.cache_index_and_filter_blocks = true;
    bbto.block_cache = NewLRUCache(1);
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));

  dbfull()->Flush(FlushOptions());

  if (bbto.block_cache) {
    bbto.block_cache->EraseUnRefEntries();
  }

  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  iter->Seek("bar345");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar234", iter->key().ToString());
  ASSERT_EQ("foo2", iter->value().ToString());
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar123", iter->key().ToString());
  ASSERT_EQ("foo", iter->value().ToString());

  iter->Seek("foo234");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo123", iter->key().ToString());
  ASSERT_EQ("foo3", iter->value().ToString());

  iter->Seek("bar");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(!iter->Valid());
}

INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
                        PrefixFullBloomWithReverseComparator, testing::Bool());

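// rocksdb.iterator.super-version-number reports the super version an
// iterator pinned at creation time. A flush installs a new super version,
// a plain Put into the existing memtable does not, and an existing iterator
// keeps its original number even after a Seek.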
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
  Put("", "");
  Iterator* iter1 = db_->NewIterator(ReadOptions());
  std::string prop_value;
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  Put("", "");
  Flush();

  Iterator* iter2 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number2 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_GT(version_number2, version_number1);

  Put("", "");

  Iterator* iter3 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number3 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_EQ(version_number2, version_number3);

  iter1->SeekToFirst();
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1_new =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_EQ(version_number1, version_number1_new);

  delete iter1;
  delete iter2;
  delete iter3;
}

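// Reopening the DB while index and filter blocks live in the block cache
// must not break reads; the Get below forces a filter/index fetch after
// the restart.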
TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  Put(1, "a", "begin");
  Put(1, "z", "end");
  ASSERT_OK(Flush(1));
  TryReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::string value;
  value = Get(1, "a");
}

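// Recovery must tolerate max_successive_merges being lowered between runs:
// the merges below were admitted under the old limit and are replayed from
// the WAL under the new one.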
TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  options.max_successive_merges = 3;
  options.merge_operator = MergeOperators::CreatePutOperator();
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  Put("poi", "Finch");
  db_->Merge(WriteOptions(), "poi", "Reese");
  db_->Merge(WriteOptions(), "poi", "Shaw");
  db_->Merge(WriteOptions(), "poi", "Root");
  options.max_successive_merges = 2;
  Reopen(options);
}

#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  DBTestSharedWriteBufferAcrossCFs()
      : DBTestBase("/db_test_shared_write_buffer") {}
  void SetUp() override {
    use_old_interface_ = std::get<0>(GetParam());
    cost_cache_ = std::get<1>(GetParam());
  }
  bool use_old_interface_;
  bool cost_cache_;
};

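// The three parameter combinations exercise (a) the legacy
// db_write_buffer_size limit, (b) a shared WriteBufferManager, and (c) a
// WriteBufferManager that also charges memtable memory to a block cache.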
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;

  // Avoid nondeterministic values from malloc_usable_size();
  // force arena block size to 1.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Arena::Arena:0", [&](void* arg) {
        size_t* block_size = static_cast<size_t*>(arg);
        *block_size = 1;
      });

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Arena::AllocateNewBlock:0", [&](void* arg) {
        std::pair<size_t*, size_t*>* pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        *std::get<0>(*pair) = *std::get<1>(*pair);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // The total soft write buffer limit is about 105000.
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 1024 * 1024);

  if (use_old_interface_) {
    options.db_write_buffer_size = 120000;  // this is the real limit
  } else if (!cost_cache_) {
    options.write_buffer_manager.reset(new WriteBufferManager(114285));
  } else {
    options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
  }
  options.write_buffer_size = 500000;  // this is never hit
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  WriteOptions wo;
  wo.disableWAL = true;

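  // Wait for any pending flush on all four column families so the SST
  // counts checked below are stable.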
  std::function<void()> wait_flush = [&]() {
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  };

  // Create some data and flush "default" and "nikitich" so that they
  // become the column families with the most recent flushes.
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  Flush(3);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  Flush(0);
  ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
            static_cast<uint64_t>(1));
  ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
            static_cast<uint64_t>(1));

  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 1024 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
  }
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 1024 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
  }
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  // No flush should trigger.
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(1));
  }

  // Trigger a flush. Flushing "nikitich".
  ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Without hitting the threshold, no flush should trigger.
  ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Hit the write buffer limit again. "default"
  // will have been flushed.
  ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
  wait_flush();
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Trigger another flush. This time "dobrynia". "pikachu" should not
  // be flushed, even though it has never been flushed.
  ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
  wait_flush();
  ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();

  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 1024 * 1024);
    Close();
    options.write_buffer_manager.reset();
    last_options_.write_buffer_manager.reset();
    ASSERT_LT(cache->GetUsage(), 1024 * 1024);
  }
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
                        DBTestSharedWriteBufferAcrossCFs,
                        ::testing::Values(std::make_tuple(true, false),
                                          std::make_tuple(false, false),
                                          std::make_tuple(false, true)));

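// The same WriteBufferManager is shared by two DB instances, so writes to
// either database count against one global memtable budget and can trigger
// a flush in the other.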
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
  std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  // Avoid nondeterministic values from malloc_usable_size();
  // force arena block size to 1.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Arena::Arena:0", [&](void* arg) {
        size_t* block_size = static_cast<size_t*>(arg);
        *block_size = 1;
      });

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Arena::AllocateNewBlock:0", [&](void* arg) {
        std::pair<size_t*, size_t*>* pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        *std::get<0>(*pair) = *std::get<1>(*pair);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 500000;  // this is never hit
  // Use a write buffer total size so that the soft limit is about
  // 105000.
  options.write_buffer_manager.reset(new WriteBufferManager(120000));
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  ASSERT_OK(DestroyDB(dbname2, options));
  DB* db2 = nullptr;
  ASSERT_OK(DB::Open(options, dbname2, &db2));

  WriteOptions wo;
  wo.disableWAL = true;

  std::function<void()> wait_flush = [&]() {
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
    static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  };

  // Trigger a flush on cf2
  ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
  wait_flush();

  // Insert to DB2
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
  wait_flush();

  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
                  GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
                  GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(0));
  }

  // Trigger a flush of another CF in DB1.
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(0));
  }

  // Trigger a flush in DB2.
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
  wait_flush();
  ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
  wait_flush();
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(1));
  }

  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  std::shared_ptr<Cache> cache =
      NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
  options.write_buffer_size = 50000;  // this is never hit
  // A zero total limit disables flush triggering; memtable memory is still
  // charged to the cache.
  options.write_buffer_manager.reset(new WriteBufferManager(0, cache));
  Reopen(options);

  ASSERT_OK(Put("foo", "bar"));
  // One dummy entry is 1MB.
  ASSERT_GT(cache->GetUsage(), 500000);
}

namespace {
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
                          const std::vector<Slice>& keys_must_not_exist) {
  // Ensure that expected keys exist
  std::vector<std::string> values;
  if (keys_must_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_exist, &values);
    for (size_t i = 0; i < keys_must_exist.size(); i++) {
      ASSERT_OK(status_list[i]);
    }
  }

  // Ensure that given keys don't exist
  if (keys_must_not_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
    for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
      ASSERT_TRUE(status_list[i].IsNotFound());
    }
  }
}

}  // namespace

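// Runs recovery once per WalProcessingOption, applying the option at WAL
// record index 1, and verifies which keys survive.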
TEST_F(DBTest2, WalFilterTest) {
  class TestWalFilter : public WalFilter {
   private:
    // Processing option that is requested to be applied at the given index
    WalFilter::WalProcessingOption wal_processing_option_;
    // Index at which to apply wal_processing_option_.
    // At other indexes, the default WalProcessingOption::kContinueProcessing
    // is returned.
    size_t apply_option_at_record_index_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
                  size_t apply_option_for_record_index)
        : wal_processing_option_(wal_processing_option),
          apply_option_at_record_index_(apply_option_for_record_index),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
                                  WriteBatch* /*new_batch*/,
                                  bool* /*batch_changed*/) const override {
      WalFilter::WalProcessingOption option_to_return;

      if (current_record_index_ == apply_option_at_record_index_) {
        option_to_return = wal_processing_option_;
      } else {
        option_to_return = WalProcessingOption::kContinueProcessing;
      }

      // The filter is passed as a const object so that RocksDB does not
      // modify it; we mutate our record counter here, so cast the
      // constness away.
      (const_cast<TestWalFilter*>(this)->current_record_index_)++;

      return option_to_return;
    }

    const char* Name() const override { return "TestWalFilter"; }
  };

  // Create 3 batches with two keys each
  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  // Test with all WAL processing options
  for (int option = 0;
       option < static_cast<int>(
                    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
       option++) {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Write given keys in given batches
    for (size_t i = 0; i < batch_keys.size(); i++) {
      WriteBatch batch;
      for (size_t j = 0; j < batch_keys[i].size(); j++) {
        batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
      }
      dbfull()->Write(WriteOptions(), &batch);
    }

    WalFilter::WalProcessingOption wal_processing_option =
        static_cast<WalFilter::WalProcessingOption>(option);

    // Create a test filter that applies wal_processing_option at record
    // index 1
    size_t apply_option_for_record_index = 1;
    TestWalFilter test_wal_filter(wal_processing_option,
                                  apply_option_for_record_index);

    // Reopen database with option to use WAL filter
    options = OptionsForLogIterTest();
    options.wal_filter = &test_wal_filter;
    Status status =
        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
    if (wal_processing_option ==
        WalFilter::WalProcessingOption::kCorruptedRecord) {
      assert(!status.ok());
      // In case of corruption we can turn off paranoid_checks to reopen
      // the database
      options.paranoid_checks = false;
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    } else {
      assert(status.ok());
    }

    // Compute which keys we expect to be found
    // and which we expect not to be found after recovery.
    std::vector<Slice> keys_must_exist;
    std::vector<Slice> keys_must_not_exist;
    switch (wal_processing_option) {
      case WalFilter::WalProcessingOption::kCorruptedRecord:
      case WalFilter::WalProcessingOption::kContinueProcessing: {
        fprintf(stderr, "Testing with complete WAL processing\n");
        // We expect all records to be processed.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            keys_must_exist.push_back(Slice(batch_keys[i][j]));
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
        fprintf(stderr,
                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
                apply_option_for_record_index);
        // We expect the record with apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i == apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kStopReplay: {
        fprintf(stderr,
                "Testing with stopping replay from record %" ROCKSDB_PRIszt
                "\n",
                apply_option_for_record_index);
        // We expect records beyond apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i >= apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      default:
        assert(false);  // unhandled case
    }

    bool checked_after_reopen = false;

    while (true) {
      // Ensure that expected keys exist
      // and unexpected keys don't exist after recovery.
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

      if (checked_after_reopen) {
        break;
      }

      // Reopen the database (this time without the WAL filter) to make sure
      // previous log(s) are not used (even if they were skipped).
      options = OptionsForLogIterTest();
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      checked_after_reopen = true;
    }
  }
}

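// During recovery the filter rewrites every batch from a given record index
// onward to keep only its first key; the reopened DB must reflect the
// modified batches.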
TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
  class ChangeBatchHandler : public WriteBatch::Handler {
   private:
    // Batch to insert keys in
    WriteBatch* new_write_batch_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Number of keys added to new batch
    size_t num_keys_added_;

   public:
    ChangeBatchHandler(WriteBatch* new_write_batch,
                       size_t num_keys_to_add_in_new_batch)
        : new_write_batch_(new_write_batch),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          num_keys_added_(0) {}
    void Put(const Slice& key, const Slice& value) override {
      if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
        new_write_batch_->Put(key, value);
        ++num_keys_added_;
      }
    }
  };

  class TestWalFilterWithChangeBatch : public WalFilter {
   private:
    // Index at which to start changing records
    size_t change_records_from_index_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilterWithChangeBatch(size_t change_records_from_index,
                                 size_t num_keys_to_add_in_new_batch)
        : change_records_from_index_(change_records_from_index),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      if (current_record_index_ >= change_records_from_index_) {
        ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
        batch.Iterate(&handler);
        *batch_changed = true;
      }

      // The filter is passed as a const object so that RocksDB does not
      // modify it; we mutate our record counter here, so cast the
      // constness away.
      (const_cast<TestWalFilterWithChangeBatch*>(this)
           ->current_record_index_)++;

      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "TestWalFilterWithChangeBatch";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that changes batches from record index 1 onward.
  size_t change_records_from_index = 1;
  size_t num_keys_to_add_in_new_batch = 1;
  TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
      change_records_from_index, num_keys_to_add_in_new_batch);

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_with_change_batch;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Ensure that all keys exist before change_records_from_index_,
  // and that only a single key exists after that index, as our filter
  // adds only a single key for each batch.
  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
        keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
      } else {
        keys_must_exist.push_back(Slice(batch_keys[i][j]));
      }
    }
  }

  bool checked_after_reopen = false;

  while (true) {
    // Ensure that expected keys exist
    // and unexpected keys don't exist after recovery.
    ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

    if (checked_after_reopen) {
      break;
    }

    // Reopen the database (this time without the WAL filter) to make sure
    // previous log(s) are not used (even if they were skipped).
    options = OptionsForLogIterTest();
    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    checked_after_reopen = true;
  }
}

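// Adding extra keys to a batch during WAL replay is not supported; the
// reopen must fail with Status::NotSupported and leave the DB unaltered.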
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
  class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
   public:
    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      *new_batch = batch;
      new_batch->Put("key_extra", "value_extra");
      *batch_changed = true;
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithChangeBatchExtraKeys";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that would add extra keys
  TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_extra_keys;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.IsNotSupported());

  // Reopen without filter, now reopen should succeed - previous
  // attempt to open must not have altered the db.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;  // empty vector

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      keys_must_exist.push_back(Slice(batch_keys[i][j]));
    }
  }

  ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}

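// The ColumnFamilyLogNumberMap callback tells the filter which WAL records
// are still live per column family: after flushing only "default", recovery
// should consider just its post-flush records, while all records still
// apply to "pikachu".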
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
  class TestWalFilterWithColumnFamilies : public WalFilter {
   private:
    // column_family_id -> log_number map (provided to WALFilter)
    std::map<uint32_t, uint64_t> cf_log_number_map_;
    // column_family_name -> column_family_id map (provided to WALFilter)
    std::map<std::string, uint32_t> cf_name_id_map_;
    // column_family_name -> keys_found_in_wal map
    // We store keys that are applicable to the column_family
    // during recovery (i.e. aren't already flushed to SST file(s))
    // for verification against the keys we expect.
    std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;

   public:
    void ColumnFamilyLogNumberMap(
        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
        const std::map<std::string, uint32_t>& cf_name_id_map) override {
      cf_log_number_map_ = cf_lognumber_map;
      cf_name_id_map_ = cf_name_id_map;
    }

    WalProcessingOption LogRecordFound(unsigned long long log_number,
                                       const std::string& /*log_file_name*/,
                                       const WriteBatch& batch,
                                       WriteBatch* /*new_batch*/,
                                       bool* /*batch_changed*/) override {
      class LogRecordBatchHandler : public WriteBatch::Handler {
       private:
        const std::map<uint32_t, uint64_t>& cf_log_number_map_;
        std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
        unsigned long long log_number_;

       public:
        LogRecordBatchHandler(
            unsigned long long current_log_number,
            const std::map<uint32_t, uint64_t>& cf_log_number_map,
            std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
            : cf_log_number_map_(cf_log_number_map),
              cf_wal_keys_(cf_wal_keys),
              log_number_(current_log_number) {}

        Status PutCF(uint32_t column_family_id, const Slice& key,
                     const Slice& /*value*/) override {
          auto it = cf_log_number_map_.find(column_family_id);
          assert(it != cf_log_number_map_.end());
          unsigned long long log_number_for_cf = it->second;
          // If the current record is applicable for column_family_id
          // (i.e. isn't flushed to SST file(s) for column_family_id)
          // add it to the cf_wal_keys_ map for verification.
          if (log_number_ >= log_number_for_cf) {
            cf_wal_keys_[column_family_id].push_back(
                std::string(key.data(), key.size()));
          }
          return Status::OK();
        }
      } handler(log_number, cf_log_number_map_, cf_wal_keys_);

      batch.Iterate(&handler);

      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithColumnFamilies";
    }

    const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
      return cf_wal_keys_;
    }

    const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
      return cf_name_id_map_;
    }
  };

  std::vector<std::vector<std::string>> batch_keys_pre_flush(3);

  batch_keys_pre_flush[0].push_back("key1");
  batch_keys_pre_flush[0].push_back("key2");
  batch_keys_pre_flush[1].push_back("key3");
  batch_keys_pre_flush[1].push_back("key4");
  batch_keys_pre_flush[2].push_back("key5");
  batch_keys_pre_flush[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Flush the default column family
  db_->Flush(FlushOptions(), handles_[0]);

  // Do some more writes
  std::vector<std::vector<std::string>> batch_keys_post_flush(3);

  batch_keys_post_flush[0].push_back("key7");
  batch_keys_post_flush[0].push_back("key8");
  batch_keys_post_flush[1].push_back("key9");
  batch_keys_post_flush[1].push_back("key10");
  batch_keys_post_flush[2].push_back("key11");
  batch_keys_post_flush[2].push_back("key12");

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // On recovery we should only find the second batch applicable to the
  // default CF, but both batches applicable to the pikachu CF.

  // Create a test filter that tracks the keys seen for each column family.
  TestWalFilterWithColumnFamilies test_wal_filter_column_families;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_column_families;
  Status status =
      TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.ok());

  // Verify that handles_[0] only has post_flush keys
  // while handles_[1] has pre and post flush keys.
  auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
  auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
  size_t index = 0;
  auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
  // Default column family: only post_flush keys are expected.
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());

  index = 0;
  keys_cf = cf_wal_keys[name_id_map["pikachu"]];
  // Pikachu column family: all keys are expected.
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_pre_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }

  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());
}

// Temporarily disabled because the test is flaky.
TEST_F(DBTest2, DISABLED_PresetCompressionDict) {
  // Verifies that compression ratio improves when dictionary is enabled, and
  // improves even further when the dictionary is trained by ZSTD.
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  // Make sure to use any custom env that the test is configured with.
  options.env = CurrentOptions().env;
  options.allow_concurrent_memtable_write = false;
  options.arena_block_size = kBlockSizeBytes;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;
  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;
  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif  // LZ4_VERSION_NUMBER >= 10400
  if (ZSTD_Supported()) {
    compression_types.push_back(kZSTD);
  }

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    size_t prev_out_bytes;
    for (int i = 0; i < 3; ++i) {
      // First iteration: compress without preset dictionary
      // Second iteration: compress with preset dictionary
      // Third iteration (zstd only): compress with zstd-trained dictionary
      //
      // To make sure the compression dictionary has the intended effect, we
      // verify the compressed size is smaller in successive iterations. Also
      // in the non-first iterations, verify the data we get out is the same
      // data we put in.
      switch (i) {
        case 0:
          options.compression_opts.max_dict_bytes = 0;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case 1:
          options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case 2:
          if (compression_type != kZSTD) {
            continue;
          }
          options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          break;
        default:
          assert(false);
      }

      options.statistics = rocksdb::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      std::string seq_datas[10];
      for (int j = 0; j < 10; ++j) {
        seq_datas[j] =
            RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
      }

      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
          ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
                        seq_datas[(key_num / 10) % 10]));
        }
        dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }
      dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                  true /* disallow_trivial_move */);
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      size_t out_bytes = 0;
      std::vector<std::string> files;
      GetSstFiles(env_, dbname_, &files);
      for (const auto& file : files) {
        uint64_t curr_bytes;
        env_->GetFileSize(dbname_ + "/" + file, &curr_bytes);
        out_bytes += static_cast<size_t>(curr_bytes);
      }

      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
      }
      if (i) {
        ASSERT_GT(prev_out_bytes, out_bytes);
      }
      prev_out_bytes = out_bytes;
      DestroyAndReopen(options);
    }
  }
}

TEST_F(DBTest2, PresetCompressionDictLocality) {
  if (!ZSTD_Supported()) {
    return;
  }
  // Verifies that compression dictionary is generated from local data. The
  // verification simply checks all output SSTs have different compression
  // dictionaries. We do not verify effectiveness as that'd likely be flaky in
  // the future.
  const int kNumEntriesPerFile = 1 << 10;  // 1K entries
  const int kNumBytesPerEntry = 1 << 10;   // 1KB per entry
  const int kNumFiles = 4;
  Options options = CurrentOptions();
  options.compression = kZSTD;
  options.compression_opts.max_dict_bytes = 1 << 14;        // 16KB
  options.compression_opts.zstd_max_train_bytes = 1 << 18;  // 256KB
  options.statistics = rocksdb::CreateDBStatistics();
  options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumEntriesPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
                    RandomString(&rnd, kNumBytesPerEntry)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(1);
    ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
  }

  // Store all the dictionaries generated during a full compaction.
  std::vector<std::string> compression_dicts;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
      [&](void* arg) {
        compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  CompactRangeOptions compact_range_opts;
  compact_range_opts.bottommost_level_compaction =
      BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));

  // Dictionary compression should not be so good as to compress four totally
  // random files into one. If it does then there's probably something wrong
  // with the test.
  ASSERT_GT(NumTableFilesAtLevel(1), 1);

  // Furthermore, there should be one compression dictionary generated per
  // file, and they should all differ from each other.
  ASSERT_EQ(NumTableFilesAtLevel(1),
            static_cast<int>(compression_dicts.size()));
  for (size_t i = 1; i < compression_dicts.size(); ++i) {
    std::string& a = compression_dicts[i - 1];
    std::string& b = compression_dicts[i];
    size_t alen = a.size();
    size_t blen = b.size();
    ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
  }
}

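// Checks that each compaction's output compression matches what the options
// dictate: bottommost_compression on the last level with files,
// compression_per_level when set, otherwise the default compression.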
class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Figure out last level with files
    int bottommost_level = 0;
    for (int level = 0; level < db->NumberLevels(); level++) {
      std::string files_at_level;
      ASSERT_TRUE(
          db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
                          &files_at_level));
      if (files_at_level != "0") {
        bottommost_level = level;
      }
    }

    if (db_options_->bottommost_compression != kDisableCompressionOption &&
        ci.output_level == bottommost_level) {
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (db_options_->compression_per_level.size() != 0) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }
    max_level_checked = std::max(max_level_checked, ci.output_level);
  }

  int max_level_checked = 0;
  const Options* db_options_;
};

TEST_F(DBTest2, CompressionOptions) {
  if (!Zlib_Supported() || !Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;

  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);

  for (int iter = 0; iter <= 2; iter++) {
    listener->max_level_checked = 0;

    if (iter == 0) {
      // Use different compression algorithms for different levels but
      // always use Zlib for the bottommost level
      options.compression_per_level = {kNoCompression,     kNoCompression,
                                       kNoCompression,     kSnappyCompression,
                                       kSnappyCompression, kSnappyCompression,
                                       kZlibCompression};
      options.compression = kNoCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 1) {
      // Use Snappy except for the bottommost level, which uses Zlib
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 2) {
      // Use Snappy everywhere
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kDisableCompressionOption;
    }

    DestroyAndReopen(options);
    // Write 10 random files
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 5; j++) {
        ASSERT_OK(
            Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
      }
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForCompact();
    }

    // Make sure that we wrote enough to check all 7 levels
    ASSERT_EQ(listener->max_level_checked, 6);
  }
}

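// Counts compaction input files in OnCompactionBegin and
// OnCompactionCompleted so that CompactionStall can check both callbacks
// observed the same set of files.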
class CompactionStallTestListener : public EventListener {
 public:
  CompactionStallTestListener()
      : compacting_files_cnt_(0), compacted_files_cnt_(0) {}

  void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacting_files_cnt_ += ci.input_files.size();
  }

  void OnCompactionCompleted(DB* /*db*/,
                             const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacted_files_cnt_ += ci.input_files.size();
  }

  std::atomic<size_t> compacting_files_cnt_;
  std::atomic<size_t> compacted_files_cnt_;
};

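// The sync-point dependencies below stall the compaction's begin/completed
// notifications while more L0 files accumulate, so a compaction job ends up
// picking up more input files than the trigger alone would explain; the
// final asserts verify this.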
TEST_F(DBTest2, CompactionStall) {
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
       {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
       {"DBTest2::CompactionStall:2",
        "DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
       {"DBTest2::CompactionStall:3",
        "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_background_compactions = 40;
  CompactionStallTestListener* listener = new CompactionStallTestListener();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);
  // Make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  Random rnd(301);

  // 4 files in L0
  for (int i = 0; i < 4; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:0");

  // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
  // at DBTest2::CompactionStall:1
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Another 6 L0 files to trigger compaction again
  for (int i = 0; i < 6; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for another compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:1");

  // Hold NotifyOnCompactionBegin in the unlock mutex section
  TEST_SYNC_POINT("DBTest2::CompactionStall:2");

  // Hold NotifyOnCompactionCompleted in the unlock mutex section
  TEST_SYNC_POINT("DBTest2::CompactionStall:3");

  dbfull()->TEST_WaitForCompact();
  ASSERT_LT(NumTableFilesAtLevel(0),
            options.level0_file_num_compaction_trigger);
  ASSERT_GT(listener->compacted_files_cnt_.load(),
            10 - options.level0_file_num_compaction_trigger);
  ASSERT_EQ(listener->compacting_files_cnt_.load(),
            listener->compacted_files_cnt_.load());

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, FirstSnapshotTest) {
  Options options;
  options.write_buffer_size = 100000;  // Small write buffer
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // This snapshot will have sequence number 0, which is the expected
  // behaviour.
  const Snapshot* s1 = db_->GetSnapshot();

  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush

  db_->ReleaseSnapshot(s1);
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, DuplicateSnapshot) {
  Options options;
  options = CurrentOptions(options);
  std::vector<const Snapshot*> snapshots;
  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
  SequenceNumber oldest_ww_snap, first_ww_snap;

  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());
  snapshots.push_back(db_->GetSnapshot());
  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());
  snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
  first_ww_snap = snapshots.back()->GetSequenceNumber();
  Put("k", "v");  // inc seq
  snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
  snapshots.push_back(db_->GetSnapshot());
  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());

  {
    InstrumentedMutexLock l(dbi->mutex());
    auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
    ASSERT_EQ(seqs.size(), 4);  // duplicates are not counted
    ASSERT_EQ(oldest_ww_snap, first_ww_snap);
  }

  for (auto s : snapshots) {
    db_->ReleaseSnapshot(s);
  }
}
#endif  // ROCKSDB_LITE

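// Parameterized on (infinite_max_files, disallow_preload); SetUp below
// unpacks the tuple in that order.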
class PinL0IndexAndFilterBlocksTest
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
  void SetUp() override {
    infinite_max_files_ = std::get<0>(GetParam());
    disallow_preload_ = std::get<1>(GetParam());
  }

  void CreateTwoLevels(Options* options, bool close_afterwards) {
    if (infinite_max_files_) {
      options->max_open_files = -1;
    }
    options->create_if_missing = true;
    options->statistics = rocksdb::CreateDBStatistics();
    BlockBasedTableOptions table_options;
    table_options.cache_index_and_filter_blocks = true;
    table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    table_options.filter_policy.reset(NewBloomFilterPolicy(20));
    options->table_factory.reset(new BlockBasedTableFactory(table_options));
    CreateAndReopenWithCF({"pikachu"}, *options);

    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    // move this table to L1
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);

    // reset block cache
    table_options.block_cache = NewLRUCache(64 * 1024);
    options->table_factory.reset(NewBlockBasedTableFactory(table_options));
    TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
    // create new table at L0
    Put(1, "a2", "begin2");
    Put(1, "z2", "end2");
    ASSERT_OK(Flush(1));

    if (close_afterwards) {
      Close();  // This ensures that there is no ref to block cache entries
    }
    table_options.block_cache->EraseUnRefEntries();
  }

  bool infinite_max_files_;
  bool disallow_preload_;
};

1497TEST_P(PinL0IndexAndFilterBlocksTest,
1498 IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
1499 Options options = CurrentOptions();
1500 if (infinite_max_files_) {
1501 options.max_open_files = -1;
1502 }
1503 options.create_if_missing = true;
1504 options.statistics = rocksdb::CreateDBStatistics();
1505 BlockBasedTableOptions table_options;
1506 table_options.cache_index_and_filter_blocks = true;
1507 table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1508 table_options.filter_policy.reset(NewBloomFilterPolicy(20));
1509 options.table_factory.reset(new BlockBasedTableFactory(table_options));
1510 CreateAndReopenWithCF({"pikachu"}, options);
1511
1512 ASSERT_OK(Put(1, "key", "val"));
1513 // Create a new table.
1514 ASSERT_OK(Flush(1));
1515
1516 // index/filter blocks added to block cache right after table creation.
1517 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1518 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1519 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1520 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1521
1522 // only index/filter were added
1523 ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
1524 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
1525
1526 std::string value;
1527 // Miss and hit counts should remain the same; the blocks are all pinned.
1528 db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
1529 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1530 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1531 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1532 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1533
1534 // Miss and hit counts should remain the same; the blocks are all pinned.
1535 value = Get(1, "key");
1536 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1537 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1538 ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1539 ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1540}
1541
1542TEST_P(PinL0IndexAndFilterBlocksTest,
1543 MultiLevelIndexAndFilterBlocksCachedWithPinning) {
1544 Options options = CurrentOptions();
1545 PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
1546 // get base cache values
1547 uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
1548 uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
1549 uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
1550 uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
1551
1552 std::string value;
1553 // this should be read from L0
1554 // so cache values don't change
1555 value = Get(1, "a2");
1556 ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1557 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1558 ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1559 ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1560
1561 // this should be read from L1:
1562 // when the file is opened, prefetching results in a cache filter miss;
1563 // the block is loaded and added to the cache,
1564 // then the Get() results in a cache hit for L1.
1565 // Even with infinite max_open_files there is still a cache miss because
1566 // we have reset the block cache
1567 value = Get(1, "a");
1568 ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1569 ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1570}
1571
1572TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
1573 Options options = CurrentOptions();
1574 // This ensures that the db does not ref anything in the block cache, so
1575 // EraseUnRefEntries can clear them up.
1576 bool close_afterwards = true;
1577 PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);
1578
1579 // Get base cache values
1580 uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
1581 uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
1582 uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
1583 uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
1584
1585 if (disallow_preload_) {
1586 // Now we have two files. We narrow max_open_files to 13 so the table
1587 // cache holds only 13 - 10 = 3 entries and preloading SST files won't happen.
1588 options.max_open_files = 13;
1589 // RocksDB sanitizes max_open_files to at least 20; the callback below modifies it back.
1590 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1591 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
1592 int* max_open_files = static_cast<int*>(arg);
1593 *max_open_files = 13;
1594 });
1595 }
1596 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1597
1598 // Reopen database. If max_open_files is set to -1, table readers will be
1599 // preloaded. This will trigger a BlockBasedTable::Open() and prefetch the
1600 // L0 index and filter. Level 1's prefetching is disabled in DB::Open().
1601 TryReopenWithColumnFamilies({"default", "pikachu"}, options);
1602
1603 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1604
1605 if (!disallow_preload_) {
1606 // After reopen, cache misses are increased by one because we read (and only
1607 // read) the filter and index on L0
1608 ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1609 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1610 ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1611 ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1612 } else {
1613 // If max_open_files is not -1, we do not preload table readers, so there is
1614 // no change.
1615 ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1616 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1617 ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1618 ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1619 }
1620 std::string value;
1621 // this should be read from L0
1622 value = Get(1, "a2");
1623 // If max_open_files is -1, we have pinned index and filter in Rep, so there
1624 // will not be changes in index and filter misses or hits. If max_open_files
1625 // is not -1, Get() will open a TableReader and prefetch index and filter.
1626 ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1627 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1628 ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1629 ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1630
1631 // this should be read from L1
1632 value = Get(1, "a");
1633 if (!disallow_preload_) {
1634 // In the infinite max_open_files case, there's a cache miss when executing
1635 // Get() because the index and filter were not prefetched before.
1636 ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1637 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1638 ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1639 ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1640 } else {
1641 // In this case, cache misses are increased by one in
1642 // BlockBasedTable::Open() because this is not in the DB::Open() code path,
1643 // so we prefetch L1's index and filter. Cache hits are also increased by
1644 // one because Get() reads the index and filter from the block cache,
1645 // prefetched by the previous Open() call.
1646 ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1647 ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1648 ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1649 ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1650 }
1651
1652 // Force a full compaction to a single file. There will be a block
1653 // cache read for both the index and the filter. If prefetching doesn't
1654 // happen explicitly, it will happen when verifying the file.
1655 Compact(1, "a", "zzzzz");
1656 dbfull()->TEST_WaitForCompact();
1657
1658 if (!disallow_preload_) {
1659 ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1660 ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1661 ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1662 ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1663 } else {
1664 ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1665 ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1666 ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1667 ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1668 }
1669
1670 // Bloom filter and index hits happen when a Get() is issued.
1671 value = Get(1, "a");
1672 if (!disallow_preload_) {
1673 ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1674 ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1675 ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1676 ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1677 } else {
1678 ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1679 ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1680 ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1681 ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1682 }
1683}
1684
1685INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
1686 PinL0IndexAndFilterBlocksTest,
1687 ::testing::Values(std::make_tuple(true, false),
1688 std::make_tuple(false, false),
1689 std::make_tuple(false, true)));
1690
1691#ifndef ROCKSDB_LITE
1692TEST_F(DBTest2, MaxCompactionBytesTest) {
1693 Options options = CurrentOptions();
1694 options.memtable_factory.reset(
1695 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
1696 options.compaction_style = kCompactionStyleLevel;
1697 options.write_buffer_size = 200 << 10;
1698 options.arena_block_size = 4 << 10;
1699 options.level0_file_num_compaction_trigger = 4;
1700 options.num_levels = 4;
1701 options.compression = kNoCompression;
1702 options.max_bytes_for_level_base = 450 << 10;
1703 options.target_file_size_base = 100 << 10;
1704 // Infinite for full compaction.
1705 options.max_compaction_bytes = options.target_file_size_base * 100;
1706
1707 Reopen(options);
1708
1709 Random rnd(301);
1710
1711 for (int num = 0; num < 8; num++) {
1712 GenerateNewRandomFile(&rnd);
1713 }
1714 CompactRangeOptions cro;
1715 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
1716 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1717 ASSERT_EQ("0,0,8", FilesPerLevel(0));
1718
1719 // When compacting from Ln to Ln+1, cut the output file if it overlaps with
1720 // more than three files in Ln+1.
1721 options.max_compaction_bytes = options.target_file_size_base * 3;
1722 Reopen(options);
1723
1724 GenerateNewRandomFile(&rnd);
1725 // Add three more small files that overlap with the previous file
1726 for (int i = 0; i < 3; i++) {
1727 Put("a", "z");
1728 ASSERT_OK(Flush());
1729 }
1730 dbfull()->TEST_WaitForCompact();
1731
1732 // Output files to L1 are cut into three pieces, according to
1733 // options.max_compaction_bytes
1734 ASSERT_EQ("0,3,8", FilesPerLevel(0));
1735}
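// Editor's note (illustrative arithmetic, not upstream text): with
// target_file_size_base = 100KB, the max_compaction_bytes = 3 * 100KB
// setting above caps how much next-level data a single output file may
// overlap, which is why the compaction output above lands as three L1
// files ("0,3,8") instead of one.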
1736
1737static void UniqueIdCallback(void* arg) {
1738 int* result = reinterpret_cast<int*>(arg);
1739 if (*result == -1) {
1740 *result = 0;
1741 }
1742
1743 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1744 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1745 "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
1746}
1747
1748class MockPersistentCache : public PersistentCache {
1749 public:
1750 explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
1751 : is_compressed_(is_compressed), max_size_(max_size) {
1752 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1753 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1754 "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
1755 }
1756
1757 ~MockPersistentCache() override {}
1758
1759 PersistentCache::StatsType Stats() override {
1760 return PersistentCache::StatsType();
1761 }
1762
1763 Status Insert(const Slice& page_key, const char* data,
1764 const size_t size) override {
1765 MutexLock _(&lock_);
1766
1767 if (size_ > max_size_) {
1768 size_ -= data_.begin()->second.size();
1769 data_.erase(data_.begin());
1770 }
1771
1772 data_.insert(std::make_pair(page_key.ToString(), std::string(data, size)));
1773 size_ += size;
1774 return Status::OK();
1775 }
1776
1777 Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
1778 size_t* size) override {
1779 MutexLock _(&lock_);
1780 auto it = data_.find(page_key.ToString());
1781 if (it == data_.end()) {
1782 return Status::NotFound();
1783 }
1784
1785 assert(page_key.ToString() == it->first);
1786 data->reset(new char[it->second.size()]);
1787 memcpy(data->get(), it->second.c_str(), it->second.size());
1788 *size = it->second.size();
1789 return Status::OK();
1790 }
1791
1792 bool IsCompressed() override { return is_compressed_; }
1793
1794 std::string GetPrintableOptions() const override {
1795 return "MockPersistentCache";
1796 }
1797
1798 port::Mutex lock_;
1799 std::map<std::string, std::string> data_;
1800 const bool is_compressed_ = true;
1801 size_t size_ = 0;
1802 const size_t max_size_ = 10 * 1024; // 10KiB
1803};
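// Editor's note (a sketch, not upstream code): MockPersistentCache is wired
// in through BlockBasedTableOptions::persistent_cache, e.g.
//   BlockBasedTableOptions t;
//   t.persistent_cache.reset(
//       new MockPersistentCache(/*is_compressed=*/true, /*max_size=*/10 * 1024));
//   options.table_factory.reset(NewBlockBasedTableFactory(t));
// which is exactly what the PersistentCache test below exercises for both
// compressed and uncompressed configurations.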
1804
1805#ifdef OS_LINUX
1806// Make sure that in CPU time perf context counters, Env::NowCPUNanos()
1807 // is used, rather than Env::NowNanos().
1808TEST_F(DBTest2, TestPerfContextGetCpuTime) {
1809 // Force resizing the table cache so the table handle is not preloaded and
1810 // we can measure find_table_nanos during Get().
1811 dbfull()->TEST_table_cache()->SetCapacity(0);
1812 ASSERT_OK(Put("foo", "bar"));
1813 ASSERT_OK(Flush());
1814 env_->now_cpu_count_.store(0);
1815
1816 // CPU timing is not enabled with kEnableTimeExceptForMutex
1817 SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
1818 ASSERT_EQ("bar", Get("foo"));
1819 ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
1820 ASSERT_EQ(0, env_->now_cpu_count_.load());
1821
1822 uint64_t kDummyAddonTime = uint64_t{1000000000000};
1823
1824 // Add time to NowNanos() reading.
1825 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1826 "TableCache::FindTable:0",
1827 [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
1828 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1829
1830 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1831 ASSERT_EQ("bar", Get("foo"));
1832 ASSERT_GT(env_->now_cpu_count_.load(), 2);
1833 ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime);
1834 ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
1835
1836 SetPerfLevel(PerfLevel::kDisable);
1837 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1838}
1839
1840TEST_F(DBTest2, TestPerfContextIterCpuTime) {
1841 DestroyAndReopen(CurrentOptions());
1842 // Force resizing the table cache so the table handle is not preloaded and
1843 // we can measure find_table_nanos during iteration
1844 dbfull()->TEST_table_cache()->SetCapacity(0);
1845
1846 const size_t kNumEntries = 10;
1847 for (size_t i = 0; i < kNumEntries; ++i) {
1848 ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
1849 }
1850 ASSERT_OK(Flush());
1851 for (size_t i = 0; i < kNumEntries; ++i) {
1852 ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
1853 }
1854 std::string last_key = "k" + ToString(kNumEntries - 1);
1855 std::string last_value = "v" + ToString(kNumEntries - 1);
1856 env_->now_cpu_count_.store(0);
1857
1858 // CPU timing is not enabled with kEnableTimeExceptForMutex
1859 SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
1860 Iterator* iter = db_->NewIterator(ReadOptions());
1861 iter->Seek("k0");
1862 ASSERT_TRUE(iter->Valid());
1863 ASSERT_EQ("v0", iter->value().ToString());
1864 iter->SeekForPrev(last_key);
1865 ASSERT_TRUE(iter->Valid());
1866 iter->SeekToLast();
1867 ASSERT_TRUE(iter->Valid());
1868 ASSERT_EQ(last_value, iter->value().ToString());
1869 iter->SeekToFirst();
1870 ASSERT_TRUE(iter->Valid());
1871 ASSERT_EQ("v0", iter->value().ToString());
1872 ASSERT_EQ(0, get_perf_context()->iter_seek_cpu_nanos);
1873 iter->Next();
1874 ASSERT_TRUE(iter->Valid());
1875 ASSERT_EQ("v1", iter->value().ToString());
1876 ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
1877 iter->Prev();
1878 ASSERT_TRUE(iter->Valid());
1879 ASSERT_EQ("v0", iter->value().ToString());
1880 ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
1881 ASSERT_EQ(0, env_->now_cpu_count_.load());
1882 delete iter;
1883
1884 uint64_t kDummyAddonTime = uint64_t{1000000000000};
1885
1886 // Add time to NowNanos() reading.
1887 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1888 "TableCache::FindTable:0",
1889 [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
1890 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1891
1892 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1893 iter = db_->NewIterator(ReadOptions());
1894 iter->Seek("k0");
1895 ASSERT_TRUE(iter->Valid());
1896 ASSERT_EQ("v0", iter->value().ToString());
1897 iter->SeekForPrev(last_key);
1898 ASSERT_TRUE(iter->Valid());
1899 iter->SeekToLast();
1900 ASSERT_TRUE(iter->Valid());
1901 ASSERT_EQ(last_value, iter->value().ToString());
1902 iter->SeekToFirst();
1903 ASSERT_TRUE(iter->Valid());
1904 ASSERT_EQ("v0", iter->value().ToString());
1905 ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
1906 ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonTime);
1907 iter->Next();
1908 ASSERT_TRUE(iter->Valid());
1909 ASSERT_EQ("v1", iter->value().ToString());
1910 ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
1911 ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonTime);
1912 iter->Prev();
1913 ASSERT_TRUE(iter->Valid());
1914 ASSERT_EQ("v0", iter->value().ToString());
1915 ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
1916 ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonTime);
1917 ASSERT_GE(env_->now_cpu_count_.load(), 12);
1918 ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
1919
1920 SetPerfLevel(PerfLevel::kDisable);
1921 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1922 delete iter;
1923}
1924#endif // OS_LINUX
1925
1926#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
1927TEST_F(DBTest2, PersistentCache) {
1928 int num_iter = 80;
1929
1930 Options options;
1931 options.write_buffer_size = 64 * 1024; // small write buffer
1932 options.statistics = rocksdb::CreateDBStatistics();
1933 options = CurrentOptions(options);
1934
1935 auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
1936 auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
1937 for (auto bsize : bsizes) {
1938 for (auto type : types) {
1939 BlockBasedTableOptions table_options;
1940 table_options.persistent_cache.reset(
1941 new MockPersistentCache(type, 10 * 1024));
1942 table_options.no_block_cache = true;
1943 table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
1944 table_options.block_cache_compressed = nullptr;
1945 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1946
1947 DestroyAndReopen(options);
1948 CreateAndReopenWithCF({"pikachu"}, options);
1949 // default column family doesn't have block cache
1950 Options no_block_cache_opts;
1951 no_block_cache_opts.statistics = options.statistics;
1952 no_block_cache_opts = CurrentOptions(no_block_cache_opts);
1953 BlockBasedTableOptions table_options_no_bc;
1954 table_options_no_bc.no_block_cache = true;
1955 no_block_cache_opts.table_factory.reset(
1956 NewBlockBasedTableFactory(table_options_no_bc));
1957 ReopenWithColumnFamilies(
1958 {"default", "pikachu"},
1959 std::vector<Options>({no_block_cache_opts, options}));
1960
1961 Random rnd(301);
1962
1963 // Write 80 values, roughly 1KB each (a new random value every 4 keys)
1964 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1965 std::vector<std::string> values;
1966 std::string str;
1967 for (int i = 0; i < num_iter; i++) {
1968 if (i % 4 == 0) { // high compression ratio
1969 str = RandomString(&rnd, 1000);
1970 }
1971 values.push_back(str);
1972 ASSERT_OK(Put(1, Key(i), values[i]));
1973 }
1974
1975 // flush all data from memtable so that reads are from block cache
1976 ASSERT_OK(Flush(1));
1977
1978 for (int i = 0; i < num_iter; i++) {
1979 ASSERT_EQ(Get(1, Key(i)), values[i]);
1980 }
1981
1982 auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
1983 auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);
1984
1985 ASSERT_GT(hit, 0);
1986 ASSERT_GT(miss, 0);
1987 }
1988 }
1989}
1990#endif // !OS_SOLARIS
1991
1992namespace {
1993void CountSyncPoint() {
1994 TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
1995}
1996} // namespace
1997
1998TEST_F(DBTest2, SyncPointMarker) {
1999 std::atomic<int> sync_point_called(0);
2000 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2001 "DBTest2::MarkedPoint",
2002 [&](void* /*arg*/) { sync_point_called.fetch_add(1); });
2003
2004 // The first dependency enforces that Marker can be loaded before MarkedPoint.
2005 // The second checks that thread 1's MarkedPoint is disabled here.
2006 // Execution order:
2007 // | Thread 1 | Thread 2 |
2008 // | | Marker |
2009 // | MarkedPoint | |
2010 // | Thread1First | |
2011 // | | MarkedPoint |
2012 rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
2013 {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
2014 {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
2015
2016 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2017
2018 std::function<void()> func1 = [&]() {
2019 CountSyncPoint();
2020 TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
2021 };
2022
2023 std::function<void()> func2 = [&]() {
2024 TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
2025 CountSyncPoint();
2026 };
2027
2028 auto thread1 = port::Thread(func1);
2029 auto thread2 = port::Thread(func2);
2030 thread1.join();
2031 thread2.join();
2032
2033 // Callback is only executed once
2034 ASSERT_EQ(sync_point_called.load(), 1);
2035 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2036}
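
// Editor's sketch (an illustrative addition, not an upstream test): the
// minimal LoadDependency pattern used throughout this file. A pair
// {"A", "B"} means sync point A must fire before B; B blocks until A has.
TEST_F(DBTest2, SyncPointDependencyExample) {
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTest2::SyncPointDependencyExample:A",
        "DBTest2::SyncPointDependencyExample:B"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  std::atomic<bool> a_fired(false);
  auto t = port::Thread([&] {
    a_fired.store(true);
    TEST_SYNC_POINT("DBTest2::SyncPointDependencyExample:A");
  });
  // Blocks until A has executed in the thread above.
  TEST_SYNC_POINT("DBTest2::SyncPointDependencyExample:B");
  ASSERT_TRUE(a_fired.load());
  t.join();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}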
2037#endif
2038
2039size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
2040 std::string buffer;
2041
2042 PutVarint32(&buffer, static_cast<uint32_t>(0));
2043 PutVarint32(&buffer, static_cast<uint32_t>(key_size));
2044 PutVarint32(&buffer, static_cast<uint32_t>(value_size));
2045
2046 return buffer.size() + key_size + value_size;
2047}
2048
2049TEST_F(DBTest2, ReadAmpBitmap) {
2050 Options options = CurrentOptions();
2051 BlockBasedTableOptions bbto;
2052 uint32_t bytes_per_bit[2] = {1, 16};
2053 for (size_t k = 0; k < 2; k++) {
2054 // Disable delta encoding to make it easier to calculate read amplification
2055 bbto.use_delta_encoding = false;
2056 // Huge block cache to make it easier to calculate read amplification
2057 bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
2058 bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
2059 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2060 options.statistics = rocksdb::CreateDBStatistics();
2061 DestroyAndReopen(options);
2062
2063 const size_t kNumEntries = 10000;
2064
2065 Random rnd(301);
2066 for (size_t i = 0; i < kNumEntries; i++) {
2067 ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
2068 }
2069 ASSERT_OK(Flush());
2070
2071 Close();
2072 Reopen(options);
2073
2074 // Read keys/values randomly and verify that reported read amp error
2075 // is less than 2%
2076 uint64_t total_useful_bytes = 0;
2077 std::set<int> read_keys;
2078 std::string value;
2079 for (size_t i = 0; i < kNumEntries * 5; i++) {
2080 int key_idx = rnd.Next() % kNumEntries;
2081 std::string key = Key(key_idx);
2082 ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2083
2084 if (read_keys.find(key_idx) == read_keys.end()) {
2085 auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2086 total_useful_bytes +=
2087 GetEncodedEntrySize(internal_key.size(), value.size());
2088 read_keys.insert(key_idx);
2089 }
2090
2091 double expected_read_amp =
2092 static_cast<double>(total_useful_bytes) /
2093 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2094
2095 double read_amp =
2096 static_cast<double>(options.statistics->getTickerCount(
2097 READ_AMP_ESTIMATE_USEFUL_BYTES)) /
2098 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2099
2100 double error_pct = fabs(expected_read_amp - read_amp) * 100;
2101 // Error between reported read amp and real read amp should be less than
2102 // 2%
2103 EXPECT_LE(error_pct, 2);
2104 }
2105
2106 // Make sure we read everything in the DB (which is smaller than our cache)
2107 Iterator* iter = db_->NewIterator(ReadOptions());
2108 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2109 ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
2110 }
2111 delete iter;
2112
2113 // Read amp is on average 100% since we read everything we loaded into memory
2114 if (k == 0) {
2115 ASSERT_EQ(
2116 options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
2117 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
2118 } else {
2119 ASSERT_NEAR(
2120 options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES) *
2121 1.0f /
2122 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
2123 1, .01);
2124 }
2125 }
2126}
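
// Editor's note (an assumption about the bitmap granularity, for
// illustration only): with read_amp_bytes_per_bit == 16, each bit of a
// block's bitmap covers 16 bytes. Loading a block adds its full size to
// READ_AMP_TOTAL_READ_BYTES, while each user read marks the touched bits
// and adds 16 bytes per newly set bit to READ_AMP_ESTIMATE_USEFUL_BYTES,
// so the estimate converges on the true useful bytes as coverage grows.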
2127
2128#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
2129TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
2130 {
2131 const int kIdBufLen = 100;
2132 char id_buf[kIdBufLen];
2133#ifndef OS_WIN
2134 // You can't open a directory on Windows using a random access file
2135 std::unique_ptr<RandomAccessFile> file;
2136 ASSERT_OK(env_->NewRandomAccessFile(dbname_, &file, EnvOptions()));
2137 if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
2138 // The fs holding the db directory doesn't support getting a unique file
2139 // id; this means this test would fail because lru_cache would load the
2140 // blocks again even though they are already in the cache
2141 return;
2142 }
2143#else
2144 std::unique_ptr<Directory> dir;
2145 ASSERT_OK(env_->NewDirectory(dbname_, &dir));
2146 if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
2147 // The fs holding the db directory doesn't support getting a unique file
2148 // id; this means this test would fail because lru_cache would load the
2149 // blocks again even though they are already in the cache
2150 return;
2151 }
2152#endif
2153 }
2154 uint32_t bytes_per_bit[2] = {1, 16};
2155 for (size_t k = 0; k < 2; k++) {
2156 std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
2157 std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
2158
2159 Options options = CurrentOptions();
2160 BlockBasedTableOptions bbto;
2161 // Disable delta encoding to make it easier to calculate read amplification
2162 bbto.use_delta_encoding = false;
2163 // Huge block cache to make it easier to calculate read amplification
2164 bbto.block_cache = lru_cache;
2165 bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
2166 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2167 options.statistics = stats;
2168 DestroyAndReopen(options);
2169
2170 const int kNumEntries = 10000;
2171
2172 Random rnd(301);
2173 for (int i = 0; i < kNumEntries; i++) {
2174 ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
2175 }
2176 ASSERT_OK(Flush());
2177
2178 Close();
2179 Reopen(options);
2180
2181 uint64_t total_useful_bytes = 0;
2182 std::set<int> read_keys;
2183 std::string value;
2184 // Iter1: Read half the DB, Read even keys
2185 // Key(0), Key(2), Key(4), Key(6), Key(8), ...
2186 for (int i = 0; i < kNumEntries; i += 2) {
2187 std::string key = Key(i);
2188 ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2189
2190 if (read_keys.find(i) == read_keys.end()) {
2191 auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2192 total_useful_bytes +=
2193 GetEncodedEntrySize(internal_key.size(), value.size());
2194 read_keys.insert(i);
2195 }
2196 }
2197
2198 size_t total_useful_bytes_iter1 =
2199 options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
2200 size_t total_loaded_bytes_iter1 =
2201 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2202
2203 Close();
2204 std::shared_ptr<Statistics> new_statistics = rocksdb::CreateDBStatistics();
2205 // Destroy old statistics obj that the blocks in lru_cache are pointing to
2206 options.statistics.reset();
2207 // Use the statistics object that we just created
2208 options.statistics = new_statistics;
2209 Reopen(options);
2210
2211 // Iter2: Read half the DB, Read odd keys
2212 // Key(1), Key(3), Key(5), Key(7), Key(9), ...
2213 for (int i = 1; i < kNumEntries; i += 2) {
2214 std::string key = Key(i);
2215 ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2216
2217 if (read_keys.find(i) == read_keys.end()) {
2218 auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2219 total_useful_bytes +=
2220 GetEncodedEntrySize(internal_key.size(), value.size());
2221 read_keys.insert(i);
2222 }
2223 }
2224
2225 size_t total_useful_bytes_iter2 =
2226 options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
2227 size_t total_loaded_bytes_iter2 =
2228 options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2229
2230
2231 // Read amp is on average 100% since we read everything we loaded into memory
2232 if (k == 0) {
2233 ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
2234 total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
2235 } else {
2236 ASSERT_NEAR((total_useful_bytes_iter1 + total_useful_bytes_iter2) * 1.0f /
2237 (total_loaded_bytes_iter1 + total_loaded_bytes_iter2),
2238 1, .01);
2239 }
2240 }
2241}
2242#endif // !OS_SOLARIS
2243
2244#ifndef ROCKSDB_LITE
2245TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
2246 Options options = CurrentOptions();
2247 options.num_levels = 3;
2248 options.IncreaseParallelism(20);
2249 DestroyAndReopen(options);
2250
2251 ASSERT_OK(Put(Key(0), "a"));
2252 ASSERT_OK(Put(Key(5), "a"));
2253 ASSERT_OK(Flush());
2254
2255 ASSERT_OK(Put(Key(10), "a"));
2256 ASSERT_OK(Put(Key(15), "a"));
2257 ASSERT_OK(Flush());
2258
2259 CompactRangeOptions cro;
2260 cro.change_level = true;
2261 cro.target_level = 2;
2262 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2263
2264 auto get_stat = [](std::string level_str, LevelStatType type,
2265 std::map<std::string, std::string> props) {
2266 auto prop_str =
2267 "compaction." + level_str + "." +
2268 InternalStats::compaction_level_stats.at(type).property_name.c_str();
2269 auto prop_item = props.find(prop_str);
2270 return prop_item == props.end() ? 0 : std::stod(prop_item->second);
2271 };
2272
2273 // Trivial move 2 files to L2
2274 ASSERT_EQ("0,0,2", FilesPerLevel());
2275 // Also test that the stats GetMapProperty API reports the same result
2276 {
2277 std::map<std::string, std::string> prop;
2278 ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
2279 ASSERT_EQ(0, get_stat("L0", LevelStatType::NUM_FILES, prop));
2280 ASSERT_EQ(0, get_stat("L1", LevelStatType::NUM_FILES, prop));
2281 ASSERT_EQ(2, get_stat("L2", LevelStatType::NUM_FILES, prop));
2282 ASSERT_EQ(2, get_stat("Sum", LevelStatType::NUM_FILES, prop));
2283 }
2284
2285 // While the compaction is running, we will create 2 new files that
2286 // can fit in L2; these 2 files will be moved to L2, overlap with
2287 // the running compaction, and break the LSM consistency.
2288 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2289 "CompactionJob::Run():Start", [&](void* /*arg*/) {
2290 ASSERT_OK(
2291 dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
2292 {"max_bytes_for_level_base", "1"}}));
2293 ASSERT_OK(Put(Key(6), "a"));
2294 ASSERT_OK(Put(Key(7), "a"));
2295 ASSERT_OK(Flush());
2296
2297 ASSERT_OK(Put(Key(8), "a"));
2298 ASSERT_OK(Put(Key(9), "a"));
2299 ASSERT_OK(Flush());
2300 });
2301 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2302
2303 // Run a manual compaction that will compact the 2 files in L2
2304 // into 1 file in L2
2305 cro.exclusive_manual_compaction = false;
2306 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
2307 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2308
2309 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2310
2311 // Test that the stats GetMapProperty API reports 1 file in L2
2312 {
2313 std::map<std::string, std::string> prop;
2314 ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
2315 ASSERT_EQ(1, get_stat("L2", LevelStatType::NUM_FILES, prop));
2316 }
2317}
2318
2319TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
2320 Options options = CurrentOptions();
2321 options.num_levels = 2;
2322 options.IncreaseParallelism(20);
2323 options.disable_auto_compactions = true;
2324 DestroyAndReopen(options);
2325
2326 ASSERT_OK(Put(Key(0), "a"));
2327 ASSERT_OK(Put(Key(5), "a"));
2328 ASSERT_OK(Flush());
2329
2330 ASSERT_OK(Put(Key(10), "a"));
2331 ASSERT_OK(Put(Key(15), "a"));
2332 ASSERT_OK(Flush());
2333
2334 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2335
2336 // Trivial move 2 files to L1
2337 ASSERT_EQ("0,2", FilesPerLevel());
2338
2339 std::function<void()> bg_manual_compact = [&]() {
2340 std::string k1 = Key(6);
2341 std::string k2 = Key(9);
2342 Slice k1s(k1);
2343 Slice k2s(k2);
2344 CompactRangeOptions cro;
2345 cro.exclusive_manual_compaction = false;
2346 ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
2347 };
2348 rocksdb::port::Thread bg_thread;
2349
2350 // While the compaction is running, we will create 2 new files that
2351 // can fit in L1; these 2 files will be moved to L1, overlap with
2352 // the running compaction, and break the LSM consistency.
2353 std::atomic<bool> flag(false);
2354 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2355 "CompactionJob::Run():Start", [&](void* /*arg*/) {
2356 if (flag.exchange(true)) {
2357 // We want to make sure to call this callback only once
2358 return;
2359 }
2360 ASSERT_OK(Put(Key(6), "a"));
2361 ASSERT_OK(Put(Key(7), "a"));
2362 ASSERT_OK(Flush());
2363
2364 ASSERT_OK(Put(Key(8), "a"));
2365 ASSERT_OK(Put(Key(9), "a"));
2366 ASSERT_OK(Flush());
2367
2368 // Start a non-exclusive manual compaction in a bg thread
2369 bg_thread = port::Thread(bg_manual_compact);
2370 // This manual compaction conflicts with the other manual compaction,
2371 // so it should wait until the first compaction finishes
2372 env_->SleepForMicroseconds(1000000);
2373 });
2374 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2375
2376 // Run a manual compaction that will compact the 2 files in L1
2377 // into 1 file in L1
2378 CompactRangeOptions cro;
2379 cro.exclusive_manual_compaction = false;
2380 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
2381 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2382 bg_thread.join();
2383
2384 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2385}
2386
2387TEST_F(DBTest2, OptimizeForPointLookup) {
2388 Options options = CurrentOptions();
2389 Close();
2390 options.OptimizeForPointLookup(2);
2391 ASSERT_OK(DB::Open(options, dbname_, &db_));
2392
2393 ASSERT_OK(Put("foo", "v1"));
2394 ASSERT_EQ("v1", Get("foo"));
2395 Flush();
2396 ASSERT_EQ("v1", Get("foo"));
2397}
2398
2399#endif // ROCKSDB_LITE
2400
2401TEST_F(DBTest2, GetRaceFlush1) {
2402 ASSERT_OK(Put("foo", "v1"));
2403
2404 rocksdb::SyncPoint::GetInstance()->LoadDependency(
2405 {{"DBImpl::GetImpl:1", "DBTest2::GetRaceFlush:1"},
2406 {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:2"}});
2407
2408 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2409
2410 rocksdb::port::Thread t1([&] {
2411 TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
2412 ASSERT_OK(Put("foo", "v2"));
2413 Flush();
2414 TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
2415 });
2416
2417 // Get() is issued after the first Put(), so it should see either
2418 // "v1" or "v2".
2419 ASSERT_NE("NOT_FOUND", Get("foo"));
2420 t1.join();
2421 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2422}
2423
2424TEST_F(DBTest2, GetRaceFlush2) {
2425 ASSERT_OK(Put("foo", "v1"));
2426
2427 rocksdb::SyncPoint::GetInstance()->LoadDependency(
2428 {{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
2429 {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});
2430
2431 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2432
2433 port::Thread t1([&] {
2434 TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
2435 ASSERT_OK(Put("foo", "v2"));
2436 Flush();
2437 TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
2438 });
2439
2440 // Get() is issued after the first Put(), so it should see either
2441 // "v1" or "v2".
2442 ASSERT_NE("NOT_FOUND", Get("foo"));
2443 t1.join();
2444 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2445}
2446
2447TEST_F(DBTest2, DirectIO) {
2448 if (!IsDirectIOSupported()) {
2449 return;
2450 }
2451 Options options = CurrentOptions();
2452 options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
2453 true;
2454 options.allow_mmap_reads = options.allow_mmap_writes = false;
2455 DestroyAndReopen(options);
2456
2457 ASSERT_OK(Put(Key(0), "a"));
2458 ASSERT_OK(Put(Key(5), "a"));
2459 ASSERT_OK(Flush());
2460
2461 ASSERT_OK(Put(Key(10), "a"));
2462 ASSERT_OK(Put(Key(15), "a"));
2463 ASSERT_OK(Flush());
2464
2465 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2466 Reopen(options);
2467}
2468
2469TEST_F(DBTest2, MemtableOnlyIterator) {
2470 Options options = CurrentOptions();
2471 CreateAndReopenWithCF({"pikachu"}, options);
2472
2473 ASSERT_OK(Put(1, "foo", "first"));
2474 ASSERT_OK(Put(1, "bar", "second"));
2475
2476 ReadOptions ropt;
2477 ropt.read_tier = kMemtableTier;
2478 std::string value;
2479 Iterator* it = nullptr;
2480
2481 // Before flushing
2482 // point lookups
2483 ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
2484 ASSERT_EQ("first", value);
2485 ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
2486 ASSERT_EQ("second", value);
2487
2488 // Memtable-only iterator (read_tier=kMemtableTier); data not flushed yet.
2489 it = db_->NewIterator(ropt, handles_[1]);
2490 int count = 0;
2491 for (it->SeekToFirst(); it->Valid(); it->Next()) {
2492 ASSERT_TRUE(it->Valid());
2493 count++;
2494 }
2495 ASSERT_TRUE(!it->Valid());
2496 ASSERT_EQ(2, count);
2497 delete it;
2498
2499 Flush(1);
2500
2501 // After flushing
2502 // point lookups
2503 ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
2504 ASSERT_EQ("first", value);
2505 ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
2506 ASSERT_EQ("second", value);
2507 // Nothing should be returned by the memtable-only iterator after flushing.
2508 it = db_->NewIterator(ropt, handles_[1]);
2509 count = 0;
2510 for (it->SeekToFirst(); it->Valid(); it->Next()) {
2511 ASSERT_TRUE(it->Valid());
2512 count++;
2513 }
2514 ASSERT_TRUE(!it->Valid());
2515 ASSERT_EQ(0, count);
2516 delete it;
2517
2518 // Add a key to memtable
2519 ASSERT_OK(Put(1, "foobar", "third"));
2520 it = db_->NewIterator(ropt, handles_[1]);
2521 count = 0;
2522 for (it->SeekToFirst(); it->Valid(); it->Next()) {
2523 ASSERT_TRUE(it->Valid());
2524 ASSERT_EQ("foobar", it->key().ToString());
2525 ASSERT_EQ("third", it->value().ToString());
2526 count++;
2527 }
2528 ASSERT_TRUE(!it->Valid());
2529 ASSERT_EQ(1, count);
2530 delete it;
2531}
2532
2533TEST_F(DBTest2, LowPriWrite) {
2534 Options options = CurrentOptions();
2535 // Compaction pressure should trigger, since we create 6 L0 files with a trigger of 4
2536 options.level0_file_num_compaction_trigger = 4;
2537 options.level0_slowdown_writes_trigger = 12;
2538 options.level0_stop_writes_trigger = 30;
2539 options.delayed_write_rate = 8 * 1024 * 1024;
2540 Reopen(options);
2541
2542 std::atomic<int> rate_limit_count(0);
2543
2544 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2545 "GenericRateLimiter::Request:1", [&](void* arg) {
2546 rate_limit_count.fetch_add(1);
2547 int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
2548 ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
2549 });
2550 // Block compaction
2551 rocksdb::SyncPoint::GetInstance()->LoadDependency({
2552 {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
2553 });
2554 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2555 WriteOptions wo;
2556 for (int i = 0; i < 6; i++) {
2557 wo.low_pri = false;
2558 Put("", "", wo);
2559 wo.low_pri = true;
2560 Put("", "", wo);
2561 Flush();
2562 }
2563 ASSERT_EQ(0, rate_limit_count.load());
2564 wo.low_pri = true;
2565 Put("", "", wo);
2566 ASSERT_EQ(1, rate_limit_count.load());
2567 wo.low_pri = false;
2568 Put("", "", wo);
2569 ASSERT_EQ(1, rate_limit_count.load());
2570
2571 TEST_SYNC_POINT("DBTest.LowPriWrite:0");
2572 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2573
2574 dbfull()->TEST_WaitForCompact();
2575 wo.low_pri = true;
2576 Put("", "", wo);
2577 ASSERT_EQ(1, rate_limit_count.load());
2578 wo.low_pri = false;
2579 Put("", "", wo);
2580 ASSERT_EQ(1, rate_limit_count.load());
2581}
2582
2583#ifndef ROCKSDB_LITE
2584TEST_F(DBTest2, RateLimitedCompactionReads) {
2585 // compaction input has 512KB data
2586 const int kNumKeysPerFile = 128;
2587 const int kBytesPerKey = 1024;
2588 const int kNumL0Files = 4;
2589
2590 for (auto use_direct_io : {false, true}) {
2591 if (use_direct_io && !IsDirectIOSupported()) {
2592 continue;
2593 }
2594 Options options = CurrentOptions();
2595 options.compression = kNoCompression;
2596 options.level0_file_num_compaction_trigger = kNumL0Files;
2597 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
2598 options.new_table_reader_for_compaction_inputs = true;
2599 // takes roughly one second, split into 100 x 10ms intervals. Each interval
2600 // permits 5.12KB, which is smaller than the block size, so this test
2601 // exercises the code for chunking reads.
2602 options.rate_limiter.reset(NewGenericRateLimiter(
2603 static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
2604 kBytesPerKey) /* rate_bytes_per_sec */,
2605 10 * 1000 /* refill_period_us */, 10 /* fairness */,
2606 RateLimiter::Mode::kReadsOnly));
2607 options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
2608 use_direct_io;
2609 BlockBasedTableOptions bbto;
2610 bbto.block_size = 16384;
2611 bbto.no_block_cache = true;
2612 options.table_factory.reset(new BlockBasedTableFactory(bbto));
2613 DestroyAndReopen(options);
2614
2615 for (int i = 0; i < kNumL0Files; ++i) {
2616 for (int j = 0; j <= kNumKeysPerFile; ++j) {
2617 ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
2618 }
2619 dbfull()->TEST_WaitForFlushMemTable();
2620 ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
2621 }
2622 dbfull()->TEST_WaitForCompact();
2623 ASSERT_EQ(0, NumTableFilesAtLevel(0));
2624
2625 ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
2626 // should be slightly above 512KB due to non-data blocks read. We
2627 // arbitrarily chose 1MB as the upper bound on the total bytes read.
2628 size_t rate_limited_bytes =
2629 options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
2630 // Include the explicit prefetch of the footer in direct I/O case.
2631 size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
2632 ASSERT_GE(
2633 rate_limited_bytes,
2634 static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
2635 ASSERT_LT(
2636 rate_limited_bytes,
2637 static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
2638 direct_io_extra));
2639
2640 Iterator* iter = db_->NewIterator(ReadOptions());
2641 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2642 ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
2643 }
2644 delete iter;
2645 // bytes read for user iterator shouldn't count against the rate limit.
2646 ASSERT_EQ(rate_limited_bytes,
2647 static_cast<size_t>(
2648 options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
2649 }
2650}
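
// Editor's note (illustrative arithmetic for the limiter above): the rate is
// 4 files * 128 keys * 1KB = 512KB/s with a 10ms refill period, so each
// refill grants 512KB / 100 = 5.12KB -- deliberately below the 16KB block
// size, forcing a single block read to be served in multiple chunks.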
2651#endif // ROCKSDB_LITE
2652
2653 // Make sure the DB can be reopened with a reduced number of levels, given
2654 // that no file is on a level higher than the new num_levels.
2655TEST_F(DBTest2, ReduceLevel) {
2656 Options options;
2657 options.disable_auto_compactions = true;
2658 options.num_levels = 7;
2659 Reopen(options);
2660 Put("foo", "bar");
2661 Flush();
2662 MoveFilesToLevel(6);
2663#ifndef ROCKSDB_LITE
2664 ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
2665#endif // !ROCKSDB_LITE
2666 CompactRangeOptions compact_options;
2667 compact_options.change_level = true;
2668 compact_options.target_level = 1;
2669 dbfull()->CompactRange(compact_options, nullptr, nullptr);
2670#ifndef ROCKSDB_LITE
2671 ASSERT_EQ("0,1", FilesPerLevel());
2672#endif // !ROCKSDB_LITE
2673 options.num_levels = 3;
2674 Reopen(options);
2675#ifndef ROCKSDB_LITE
2676 ASSERT_EQ("0,1", FilesPerLevel());
2677#endif // !ROCKSDB_LITE
2678}
2679
2680 // Test that ReadCallback is actually used in both memtable and SST tables
2681TEST_F(DBTest2, ReadCallbackTest) {
2682 Options options;
2683 options.disable_auto_compactions = true;
2684 options.num_levels = 7;
2685 Reopen(options);
2686 std::vector<const Snapshot*> snapshots;
2687 // Try to create a db with multiple layers and a memtable
2688 const std::string key = "foo";
2689 const std::string value = "bar";
2690 // This test assumes that the seq starts at 1 and is increased by 1 after
2691 // each write batch of size 1. If that behavior changes, the test needs to
2692 // be updated as well.
2693 // TODO(myabandeh): update this test to use the seq number that is returned by
2694 // the DB instead of assuming what seq the DB used.
2695 int i = 1;
2696 for (; i < 10; i++) {
2697 Put(key, value + std::to_string(i));
2698 // Take a snapshot to avoid the value being removed during compaction
2699 auto snapshot = dbfull()->GetSnapshot();
2700 snapshots.push_back(snapshot);
2701 }
2702 Flush();
2703 for (; i < 20; i++) {
2704 Put(key, value + std::to_string(i));
2705 // Take a snapshot to avoid the value being removed during compaction
2706 auto snapshot = dbfull()->GetSnapshot();
2707 snapshots.push_back(snapshot);
2708 }
2709 Flush();
2710 MoveFilesToLevel(6);
2711#ifndef ROCKSDB_LITE
2712 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
2713#endif // !ROCKSDB_LITE
2714 for (; i < 30; i++) {
2715 Put(key, value + std::to_string(i));
2716 auto snapshot = dbfull()->GetSnapshot();
2717 snapshots.push_back(snapshot);
2718 }
2719 Flush();
2720#ifndef ROCKSDB_LITE
2721 ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
2722#endif // !ROCKSDB_LITE
2723 // And also add some values to the memtable
2724 for (; i < 40; i++) {
2725 Put(key, value + std::to_string(i));
2726 auto snapshot = dbfull()->GetSnapshot();
2727 snapshots.push_back(snapshot);
2728 }
2729
2730 class TestReadCallback : public ReadCallback {
2731 public:
2732 explicit TestReadCallback(SequenceNumber snapshot)
2733 : ReadCallback(snapshot), snapshot_(snapshot) {}
2734 bool IsVisibleFullCheck(SequenceNumber seq) override {
2735 return seq <= snapshot_;
2736 }
2737
2738 private:
2739 SequenceNumber snapshot_;
2740 };
2741
2742 for (int seq = 1; seq < i; seq++) {
2743 PinnableSlice pinnable_val;
2744 ReadOptions roptions;
2745 TestReadCallback callback(seq);
2746 bool dont_care = true;
2747 Status s = dbfull()->GetImpl(roptions, dbfull()->DefaultColumnFamily(), key,
2748 &pinnable_val, &dont_care, &callback);
2749 ASSERT_TRUE(s.ok());
2750 // Assuming that the DB increases seq by one after each Put, the value and
2751 // seq number must be equal since we also increment the value by 1 per Put.
2752 ASSERT_EQ(value + std::to_string(seq), pinnable_val.ToString());
2753 }
2754
2755 for (auto snapshot : snapshots) {
2756 dbfull()->ReleaseSnapshot(snapshot);
2757 }
2758}
2759
2760#ifndef ROCKSDB_LITE
2761
2762TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
2763 // Regression test for race condition where an obsolete file is returned to
2764 // user as a "live file" but then deleted, all while file deletions are
2765 // disabled.
2766 //
2767 // It happened like this:
2768 //
2769 // 1. [flush thread] Log file "x.log" found by FindObsoleteFiles
2770 // 2. [user thread] DisableFileDeletions, GetSortedWalFiles are called and the
2771 // latter returned "x.log"
2772 // 3. [flush thread] PurgeObsoleteFiles deleted "x.log"
2773 // 4. [user thread] Reading "x.log" failed
2774 //
2775 // Unfortunately the only regression test I can come up with involves sleep.
2776 // We cannot set SyncPoints to repro since, once the fix is applied, the
2777 // SyncPoints would cause a deadlock as the repro's sequence of events is now
2778 // prohibited.
2779 //
2780 // Instead, if we sleep for a second between Find and Purge, and ensure the
2781 // read attempt happens after purge, then the sequence of events will almost
2782 // certainly happen on the old code.
2783 rocksdb::SyncPoint::GetInstance()->LoadDependency({
2784 {"DBImpl::BackgroundCallFlush:FilesFound",
2785 "DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
2786 {"DBImpl::PurgeObsoleteFiles:End",
2787 "DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
2788 });
2789 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2790 "DBImpl::PurgeObsoleteFiles:Begin",
2791 [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
2792 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2793
2794 Put("key", "val");
2795 FlushOptions flush_opts;
2796 flush_opts.wait = false;
2797 db_->Flush(flush_opts);
2798 TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
2799
2800 db_->DisableFileDeletions();
2801 VectorLogPtr log_files;
2802 db_->GetSortedWalFiles(log_files);
2803 TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
2804 for (const auto& log_file : log_files) {
2805 ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
2806 }
2807
2808 db_->EnableFileDeletions();
2809 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2810}
2811
2812TEST_F(DBTest2, TestNumPread) {
2813 Options options = CurrentOptions();
2814 // disable block cache
2815 BlockBasedTableOptions table_options;
2816 table_options.no_block_cache = true;
2817 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
2818 Reopen(options);
2819 env_->count_random_reads_ = true;
2820
2821 env_->random_file_open_counter_.store(0);
2822 ASSERT_OK(Put("bar", "foo"));
2823 ASSERT_OK(Put("foo", "bar"));
2824 ASSERT_OK(Flush());
2825 // After flush, we'll open the file and read footer, meta block,
2826 // property block and index block.
2827 ASSERT_EQ(4, env_->random_read_counter_.Read());
2828 ASSERT_EQ(1, env_->random_file_open_counter_.load());
2829
2830 // One pread per normal data block read
2831 env_->random_file_open_counter_.store(0);
2832 env_->random_read_counter_.Reset();
2833 ASSERT_EQ("bar", Get("foo"));
2834 ASSERT_EQ(1, env_->random_read_counter_.Read());
2835 // All files are already opened.
2836 ASSERT_EQ(0, env_->random_file_open_counter_.load());
2837
2838 env_->random_file_open_counter_.store(0);
2839 env_->random_read_counter_.Reset();
2840 ASSERT_OK(Put("bar2", "foo2"));
2841 ASSERT_OK(Put("foo2", "bar2"));
2842 ASSERT_OK(Flush());
2843 // After flush, we'll open the file and read footer, meta block,
2844 // property block and index block.
2845 ASSERT_EQ(4, env_->random_read_counter_.Read());
2846 ASSERT_EQ(1, env_->random_file_open_counter_.load());
2847
2848 // Compaction needs two input blocks, which requires 2 preads, and
2849 // generates a new SST file, which needs 4 preads (footer, meta block,
2850 // property block and index block). In total 6.
2851 env_->random_file_open_counter_.store(0);
2852 env_->random_read_counter_.Reset();
2853 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2854 ASSERT_EQ(6, env_->random_read_counter_.Read());
2855 // All compaction input files should have already been opened.
2856 ASSERT_EQ(1, env_->random_file_open_counter_.load());
2857
2858 // One pread per normal data block read
2859 env_->random_file_open_counter_.store(0);
2860 env_->random_read_counter_.Reset();
2861 ASSERT_EQ("foo2", Get("bar2"));
2862 ASSERT_EQ(1, env_->random_read_counter_.Read());
2863 // SST files are already opened.
2864 ASSERT_EQ(0, env_->random_file_open_counter_.load());
2865}
2866
2867TEST_F(DBTest2, TraceAndReplay) {
2868 Options options = CurrentOptions();
2869 options.merge_operator = MergeOperators::CreatePutOperator();
2870 ReadOptions ro;
2871 WriteOptions wo;
2872 TraceOptions trace_opts;
2873 EnvOptions env_opts;
2874 CreateAndReopenWithCF({"pikachu"}, options);
2875 Random rnd(301);
2876 Iterator* single_iter = nullptr;
2877
2878 ASSERT_TRUE(db_->EndTrace().IsIOError());
2879
2880 std::string trace_filename = dbname_ + "/rocksdb.trace";
2881 std::unique_ptr<TraceWriter> trace_writer;
2882 ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
2883 ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
2884
2885 ASSERT_OK(Put(0, "a", "1"));
2886 ASSERT_OK(Merge(0, "b", "2"));
2887 ASSERT_OK(Delete(0, "c"));
2888 ASSERT_OK(SingleDelete(0, "d"));
2889 ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
2890
2891 WriteBatch batch;
2892 ASSERT_OK(batch.Put("f", "11"));
2893 ASSERT_OK(batch.Merge("g", "12"));
2894 ASSERT_OK(batch.Delete("h"));
2895 ASSERT_OK(batch.SingleDelete("i"));
2896 ASSERT_OK(batch.DeleteRange("j", "k"));
2897 ASSERT_OK(db_->Write(wo, &batch));
2898
2899 single_iter = db_->NewIterator(ro);
2900 single_iter->Seek("f");
2901 single_iter->SeekForPrev("g");
2902 delete single_iter;
2903
2904 ASSERT_EQ("1", Get(0, "a"));
2905 ASSERT_EQ("12", Get(0, "g"));
2906
2907 ASSERT_OK(Put(1, "foo", "bar"));
2908 ASSERT_OK(Put(1, "rocksdb", "rocks"));
2909 ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
2910
2911 ASSERT_OK(db_->EndTrace());
2912 // These should not get into the trace file as it is after EndTrace.
2913 Put("hello", "world");
2914 Merge("foo", "bar");
2915
2916 // Open another db, replay, and verify the data
2917 std::string value;
2918 std::string dbname2 = test::TmpDir(env_) + "/db_replay";
2919 ASSERT_OK(DestroyDB(dbname2, options));
2920
2921 // Using a different name than db2, to pacify infer's use-after-lifetime
2922 // warnings (http://fbinfer.com).
2923 DB* db2_init = nullptr;
2924 options.create_if_missing = true;
2925 ASSERT_OK(DB::Open(options, dbname2, &db2_init));
2926 ColumnFamilyHandle* cf;
2927 ASSERT_OK(
2928 db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
2929 delete cf;
2930 delete db2_init;
2931
2932 DB* db2 = nullptr;
2933 std::vector<ColumnFamilyDescriptor> column_families;
2934 ColumnFamilyOptions cf_options;
2935 cf_options.merge_operator = MergeOperators::CreatePutOperator();
2936 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
2937 column_families.push_back(
2938 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
2939 std::vector<ColumnFamilyHandle*> handles;
2940 ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
2941
2942 env_->SleepForMicroseconds(100);
2943 // Verify that the keys don't already exist
2944 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
2945 ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
2946
2947 std::unique_ptr<TraceReader> trace_reader;
2948 ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
2949 Replayer replayer(db2, handles_, std::move(trace_reader));
2950 ASSERT_OK(replayer.Replay());
2951
2952 ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
2953 ASSERT_EQ("1", value);
2954 ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
2955 ASSERT_EQ("12", value);
2956 ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
2957 ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
2958
2959 ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
2960 ASSERT_EQ("bar", value);
2961 ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
2962 ASSERT_EQ("rocks", value);
2963
2964 for (auto handle : handles) {
2965 delete handle;
2966 }
2967 delete db2;
2968 ASSERT_OK(DestroyDB(dbname2, options));
2969}
2970
2971TEST_F(DBTest2, TraceWithLimit) {
2972 Options options = CurrentOptions();
2973 options.merge_operator = MergeOperators::CreatePutOperator();
2974 ReadOptions ro;
2975 WriteOptions wo;
2976 TraceOptions trace_opts;
2977 EnvOptions env_opts;
2978 CreateAndReopenWithCF({"pikachu"}, options);
2979 Random rnd(301);
2980
2981 // test the max trace file size options
2982 trace_opts.max_trace_file_size = 5;
  std::string trace_filename = dbname_ + "/rocksdb.trace1";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Put(0, "b", "1"));
  ASSERT_OK(Put(0, "c", "1"));
  ASSERT_OK(db_->EndTrace());

  std::string dbname2 = test::TmpDir(env_) + "/db_replay2";
  std::string value;
  ASSERT_OK(DestroyDB(dbname2, options));

  // Using a different name than db2, to pacify infer's use-after-lifetime
  // warnings (http://fbinfer.com).
  DB* db2_init = nullptr;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  delete db2_init;

  DB* db2 = nullptr;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  std::vector<ColumnFamilyHandle*> handles;
  ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // Verify that the keys don't already exist
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  Replayer replayer(db2, handles, std::move(trace_reader));
  ASSERT_OK(replayer.Replay());

  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());

  for (auto handle : handles) {
    delete handle;
  }
  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));
}

TEST_F(DBTest2, TraceWithSampling) {
  Options options = CurrentOptions();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);
  Random rnd(301);

  // Test the sampling_frequency option.
  trace_opts.sampling_frequency = 2;
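  // With a sampling frequency of 2, only one out of every two operations is
  // expected to be traced, so replay should restore "b" and "d" but not "a",
  // "c", or "e" (see the assertions after Replay() below).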
  std::string trace_filename = dbname_ + "/rocksdb.trace_sampling";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Put(0, "b", "2"));
  ASSERT_OK(Put(0, "c", "3"));
  ASSERT_OK(Put(0, "d", "4"));
  ASSERT_OK(Put(0, "e", "5"));
  ASSERT_OK(db_->EndTrace());

  std::string dbname2 = test::TmpDir(env_) + "/db_replay_sampling";
  std::string value;
  ASSERT_OK(DestroyDB(dbname2, options));

  // Using a different name than db2, to pacify infer's use-after-lifetime
  // warnings (http://fbinfer.com).
  DB* db2_init = nullptr;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  delete db2_init;

  DB* db2 = nullptr;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  std::vector<ColumnFamilyHandle*> handles;
  ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  Replayer replayer(db2, handles, std::move(trace_reader));
  ASSERT_OK(replayer.Replay());

  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
  ASSERT_FALSE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());

  for (auto handle : handles) {
    delete handle;
  }
  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));
}

TEST_F(DBTest2, TraceWithFilter) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);
  Random rnd(301);
  Iterator* single_iter = nullptr;

  trace_opts.filter = TraceFilterType::kTraceFilterWrite;
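  // kTraceFilterWrite keeps all write operations out of the trace; only the
  // reads and iterator operations below should be recorded, so the replayed
  // db2 is expected to contain none of these keys.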

  std::string trace_filename = dbname_ + "/rocksdb.trace";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));

  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Merge(0, "b", "2"));
  ASSERT_OK(Delete(0, "c"));
  ASSERT_OK(SingleDelete(0, "d"));
  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));

  WriteBatch batch;
  ASSERT_OK(batch.Put("f", "11"));
  ASSERT_OK(batch.Merge("g", "12"));
  ASSERT_OK(batch.Delete("h"));
  ASSERT_OK(batch.SingleDelete("i"));
  ASSERT_OK(batch.DeleteRange("j", "k"));
  ASSERT_OK(db_->Write(wo, &batch));

  single_iter = db_->NewIterator(ro);
  single_iter->Seek("f");
  single_iter->SeekForPrev("g");
  delete single_iter;

  ASSERT_EQ("1", Get(0, "a"));
  ASSERT_EQ("12", Get(0, "g"));

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "rocksdb", "rocks"));
  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));

  ASSERT_OK(db_->EndTrace());
  // These writes should not get into the trace file, as they happen after
  // EndTrace.
  Put("hello", "world");
  Merge("foo", "bar");

  // Open another db, replay, and verify the data
  std::string value;
  std::string dbname2 = test::TmpDir(env_) + "/db_replay";
  ASSERT_OK(DestroyDB(dbname2, options));

  // Using a different name than db2, to pacify infer's use-after-lifetime
  // warnings (http://fbinfer.com).
  DB* db2_init = nullptr;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  delete db2_init;

  DB* db2 = nullptr;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  std::vector<ColumnFamilyHandle*> handles;
  ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // Verify that the keys don't already exist
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  Replayer replayer(db2, handles, std::move(trace_reader));
  ASSERT_OK(replayer.Replay());

  // None of the key-values should be present, since the WRITE ops were
  // filtered out.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());

  for (auto handle : handles) {
    delete handle;
  }
  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));

  // Set up a new db.
  std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
  ASSERT_OK(DestroyDB(dbname3, options));

  DB* db3_init = nullptr;
  options.create_if_missing = true;
  ColumnFamilyHandle* cf3;
  ASSERT_OK(DB::Open(options, dbname3, &db3_init));
  ASSERT_OK(
      db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
  delete cf3;
  delete db3_init;

  column_families.clear();
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  handles.clear();

  DB* db3 = nullptr;
  ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3));

  env_->SleepForMicroseconds(100);
  // Verify that the keys don't already exist
  ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());

  // The tracer will not record the READ ops.
  trace_opts.filter = TraceFilterType::kTraceFilterGet;
  std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
  std::unique_ptr<TraceWriter> trace_writer3;
  ASSERT_OK(
      NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
  ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));

  ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
  ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
  ASSERT_OK(db3->Delete(wo, handles[0], "c"));
  ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));

  ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
  ASSERT_EQ(value, "1");
  ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());

  ASSERT_OK(db3->EndTrace());

  for (auto handle : handles) {
    delete handle;
  }
  delete db3;
  ASSERT_OK(DestroyDB(dbname3, options));

  std::unique_ptr<TraceReader> trace_reader3;
  ASSERT_OK(
      NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));

  // Count the number of records in the trace file.
  int count = 0;
  std::string data;
  Status s;
  while (true) {
    s = trace_reader3->Read(&data);
    if (!s.ok()) {
      break;
    }
    count += 1;
  }
  // We also need to count the header and footer:
  // 4 WRITE + HEADER + FOOTER = 6
  ASSERT_EQ(count, 6);
}

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, PinnableSliceAndMmapReads) {
  Options options = CurrentOptions();
  options.allow_mmap_reads = true;
  options.max_open_files = 100;
  options.compression = kNoCompression;
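  // With mmap reads and uncompressed blocks, a Get() can in principle return
  // a slice that points straight into the mapped file rather than into a
  // private buffer; the IsPinned() checks below probe exactly when that is
  // and is not allowed to happen.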
  Reopen(options);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  PinnableSlice pinned_value;
  ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
  // It is not safe to pin mmap files, as they might disappear due to
  // compaction.
  ASSERT_FALSE(pinned_value.IsPinned());
  ASSERT_EQ(pinned_value.ToString(), "bar");

  dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
                              nullptr /* end */, nullptr /* column_family */,
                              true /* disallow_trivial_move */);

  // Ensure pinned_value doesn't rely on memory munmap'd by the above
  // compaction. It crashes if it does.
  ASSERT_EQ(pinned_value.ToString(), "bar");

#ifndef ROCKSDB_LITE
  pinned_value.Reset();
  // Unsafe to pin mmap files when they could be kicked out of table cache
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
  ASSERT_FALSE(pinned_value.IsPinned());
  ASSERT_EQ(pinned_value.ToString(), "bar");

  pinned_value.Reset();
  // In read-only mode with infinite capacity on table cache it should pin the
  // value and avoid the memcpy
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
  ASSERT_TRUE(pinned_value.IsPinned());
  ASSERT_EQ(pinned_value.ToString(), "bar");
#endif
}

TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = false;
  bbto.cache_index_and_filter_blocks = false;
  bbto.block_cache = NewLRUCache(100000);
  bbto.block_size = 400;  // small block size
  options.table_factory.reset(new BlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  std::string v = RandomString(&rnd, 400);

  // Since v is the size of a block, each key should take a block
  // of 400+ bytes.
  Put("1", v);
  Put("3", v);
  Put("5", v);
  Put("7", v);
  ASSERT_OK(Flush());

  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // Verify that iterators don't pin more than one data block in block cache
  // at a time.
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->SeekToFirst();

    for (int i = 0; i < 4; i++) {
      ASSERT_TRUE(iter->Valid());
      // Block cache should contain exactly one block.
      ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
      ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());

    iter->Seek("4");
    ASSERT_TRUE(iter->Valid());

    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);

    iter->Seek("3");
    ASSERT_TRUE(iter->Valid());

    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
  }
  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // Test compaction case
  Put("2", v);
  Put("5", v);
  Put("6", v);
  Put("8", v);
  ASSERT_OK(Flush());

  // Clear existing data in block cache
  bbto.block_cache->SetCapacity(0);
  bbto.block_cache->SetCapacity(100000);
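  // Shrinking the capacity to zero evicts everything that is unpinned;
  // restoring it gives the compaction below a cold cache to start from.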

  // Verify compaction input iterators don't hold more than one data block at
  // a time.
  std::atomic<bool> finished(false);
  std::atomic<int> block_newed(0);
  std::atomic<int> block_destroyed(0);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Block::Block:0", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Two iterators. At most 2 outstanding blocks.
        EXPECT_GE(block_newed.load(), block_destroyed.load());
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
        block_newed.fetch_add(1);
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Block::~Block", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Two iterators. At most 2 outstanding blocks.
        EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
        block_destroyed.fetch_add(1);
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run:BeforeVerify",
      [&](void* /*arg*/) { finished = true; });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Two input files. Each of them has 4 data blocks.
  ASSERT_EQ(8, block_newed.load());
  ASSERT_EQ(8, block_destroyed.load());

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, TestBBTTailPrefetch) {
  std::atomic<bool> called(false);
  size_t expected_lower_bound = 512 * 1024;
  size_t expected_higher_bound = 512 * 1024;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
        size_t* prefetch_size = static_cast<size_t*>(arg);
        EXPECT_LE(expected_lower_bound, *prefetch_size);
        EXPECT_GE(expected_higher_bound, *prefetch_size);
        called = true;
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Put("1", "1");
  Put("9", "1");
  Flush();

  expected_lower_bound = 0;
  expected_higher_bound = 8 * 1024;
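  // After the first file has been opened, the tail prefetch statistics have
  // real samples to draw from, so subsequent opens are expected to prefetch
  // far less than the initial 512KB guess.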

  Put("1", "1");
  Put("9", "1");
  Flush();

  Put("1", "1");
  Put("9", "1");
  Flush();

  // Full compaction to make sure there is no L0 file after the open.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  ASSERT_TRUE(called.load());
  called = false;

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

  std::atomic<bool> first_call(true);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
        size_t* prefetch_size = static_cast<size_t*>(arg);
        if (first_call) {
          EXPECT_EQ(4 * 1024, *prefetch_size);
          first_call = false;
        } else {
          EXPECT_GE(4 * 1024, *prefetch_size);
        }
        called = true;
      });
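  // With a single file-opening thread (set below), the first open is expected
  // to reuse the recorded ~4KB tail size, and later opens can only shrink the
  // estimate; that is what the EXPECT_EQ / EXPECT_GE pair above encodes.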
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.max_file_opening_threads = 1;  // one thread
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.max_open_files = -1;
  Reopen(options);

  Put("1", "1");
  Put("9", "1");
  Flush();

  Put("1", "1");
  Put("9", "1");
  Flush();

  ASSERT_TRUE(called.load());
  called = false;

  // Parallel loading SST files
  options.max_file_opening_threads = 16;
  Reopen(options);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  ASSERT_TRUE(called.load());

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
  // Setup sync point dependency to reproduce the race condition of
  // DBImpl::GetColumnFamilyHandleUnlocked
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1",
        "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2"},
       {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2",
        "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1"},
      });
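  // The dependency above forces this interleaving: thread1 resolves its
  // handle, then thread2 performs its lookup, and only then does thread1
  // re-read its handle, verifying that the second lookup did not invalidate
  // the result of the first.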
  SyncPoint::GetInstance()->EnableProcessing();

  CreateColumnFamilies({"test1", "test2"}, Options());
  ASSERT_EQ(handles_.size(), 2);

  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
  port::Thread user_thread1([&]() {
    auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
    ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
    ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
  });

  port::Thread user_thread2([&]() {
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
    auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
    ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
    ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
  });

  user_thread1.join();
  user_thread2.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, TestCompactFiles) {
  // Setup sync point dependency to reproduce the race condition of
  // DBImpl::GetColumnFamilyHandleUnlocked between a manual CompactFiles()
  // call and a concurrent IngestExternalFile().
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"TestCompactFiles::IngestExternalFile1",
       "TestCompactFiles::IngestExternalFile2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  Options options;
  options.num_levels = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  auto* handle = db_->DefaultColumnFamily();
  ASSERT_EQ(db_->NumberLevels(handle), 2);

  rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
  std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
  std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
  std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";

  ASSERT_OK(sst_file_writer.Open(external_file1));
  ASSERT_OK(sst_file_writer.Put("1", "1"));
  ASSERT_OK(sst_file_writer.Put("2", "2"));
  ASSERT_OK(sst_file_writer.Finish());

  ASSERT_OK(sst_file_writer.Open(external_file2));
  ASSERT_OK(sst_file_writer.Put("3", "3"));
  ASSERT_OK(sst_file_writer.Put("4", "4"));
  ASSERT_OK(sst_file_writer.Finish());

  ASSERT_OK(sst_file_writer.Open(external_file3));
  ASSERT_OK(sst_file_writer.Put("5", "5"));
  ASSERT_OK(sst_file_writer.Put("6", "6"));
  ASSERT_OK(sst_file_writer.Finish());

  ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
                                    IngestExternalFileOptions()));
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
  std::vector<std::string> files;
  GetSstFiles(env_, dbname_, &files);
  ASSERT_EQ(files.size(), 2);
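  // Race a manual CompactFiles() over the two ingested files against another
  // IngestExternalFile(); the sync-point dependency above pins down the
  // interleaving under test.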

  port::Thread user_thread1(
      [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });

  port::Thread user_thread2([&]() {
    ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
                                      IngestExternalFileOptions()));
    TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
  });

  user_thread1.join();
  user_thread2.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif  // ROCKSDB_LITE

// TODO: figure out why this test fails in appveyor
#ifndef OS_WIN
TEST_F(DBTest2, MultiDBParallelOpenTest) {
  const int kNumDbs = 2;
  Options options = CurrentOptions();
  std::vector<std::string> dbnames;
  for (int i = 0; i < kNumDbs; ++i) {
    dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
    ASSERT_OK(DestroyDB(dbnames.back(), options));
  }

  // Verify empty DBs can be created in parallel
  std::vector<std::thread> open_threads;
  std::vector<DB*> dbs{static_cast<unsigned int>(kNumDbs), nullptr};
  options.create_if_missing = true;
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads.emplace_back(
        [&](int dbnum) {
          ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
        },
        i);
  }

  // Now add some data and close, so next we can verify non-empty DBs can be
  // recovered in parallel
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads[i].join();
    ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
    delete dbs[i];
  }

  // Verify non-empty DBs can be recovered in parallel
  dbs.assign(kNumDbs, nullptr);
  open_threads.clear();
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads.emplace_back(
        [&](int dbnum) {
          ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
        },
        i);
  }

  // Wait and cleanup
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads[i].join();
    delete dbs[i];
    ASSERT_OK(DestroyDB(dbnames[i], options));
  }
}
#endif  // OS_WIN

namespace {
class DummyOldStats : public Statistics {
 public:
  uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
  void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
    num_rt++;
  }
  void setTickerCount(uint32_t /*ticker_type*/, uint64_t /*count*/) override {}
  uint64_t getAndResetTickerCount(uint32_t /*ticker_type*/) override {
    return 0;
  }
  void measureTime(uint32_t /*histogram_type*/, uint64_t /*count*/) override {
    num_mt++;
  }
  void histogramData(uint32_t /*histogram_type*/,
                     rocksdb::HistogramData* const /*data*/) const override {}
  std::string getHistogramString(uint32_t /*type*/) const override {
    return "";
  }
  bool HistEnabledForType(uint32_t /*type*/) const override { return false; }
  std::string ToString() const override { return ""; }
  int num_rt = 0;
  int num_mt = 0;
};
}  // namespace

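// Verify that a Statistics implementation written against the legacy
// interface (recordTick() / measureTime() only) still receives both ticker
// and histogram updates from a live DB.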
TEST_F(DBTest2, OldStatsInterface) {
  DummyOldStats* dos = new DummyOldStats();
  std::shared_ptr<Statistics> stats(dos);
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = stats;
  Reopen(options);

  Put("foo", "bar");
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_OK(Flush());
  ASSERT_EQ("bar", Get("foo"));

  ASSERT_GT(dos->num_rt, 0);
  ASSERT_GT(dos->num_mt, 0);
}
}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}