1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Introduction of SyncPoint effectively disabled building and running this test
12 // which is a pity, it is a good test
17 #include <unordered_set>
26 #include "db/db_impl.h"
27 #include "db/db_test_util.h"
28 #include "db/dbformat.h"
29 #include "db/job_context.h"
30 #include "db/version_set.h"
31 #include "db/write_batch_internal.h"
32 #include "env/mock_env.h"
33 #include "memtable/hash_linklist_rep.h"
34 #include "monitoring/thread_status_util.h"
35 #include "port/port.h"
36 #include "port/stack_trace.h"
37 #include "rocksdb/cache.h"
38 #include "rocksdb/compaction_filter.h"
39 #include "rocksdb/convenience.h"
40 #include "rocksdb/db.h"
41 #include "rocksdb/env.h"
42 #include "rocksdb/experimental.h"
43 #include "rocksdb/filter_policy.h"
44 #include "rocksdb/options.h"
45 #include "rocksdb/perf_context.h"
46 #include "rocksdb/slice.h"
47 #include "rocksdb/slice_transform.h"
48 #include "rocksdb/snapshot.h"
49 #include "rocksdb/table.h"
50 #include "rocksdb/table_properties.h"
51 #include "rocksdb/thread_status.h"
52 #include "rocksdb/utilities/checkpoint.h"
53 #include "rocksdb/utilities/optimistic_transaction_db.h"
54 #include "rocksdb/utilities/write_batch_with_index.h"
55 #include "table/block_based_table_factory.h"
56 #include "table/mock_table.h"
57 #include "table/plain_table_factory.h"
58 #include "table/scoped_arena_iterator.h"
59 #include "util/compression.h"
60 #include "util/file_reader_writer.h"
61 #include "util/filename.h"
62 #include "util/hash.h"
63 #include "util/logging.h"
64 #include "util/mutexlock.h"
65 #include "util/rate_limiter.h"
66 #include "util/string_util.h"
67 #include "util/sync_point.h"
68 #include "util/testharness.h"
69 #include "util/testutil.h"
70 #include "utilities/merge_operators.h"
74 class DBTest
: public DBTestBase
{
76 DBTest() : DBTestBase("/db_test") {}
81 public testing::WithParamInterface
<std::tuple
<uint32_t, bool>> {
84 max_subcompactions_
= std::get
<0>(GetParam());
85 exclusive_manual_compaction_
= std::get
<1>(GetParam());
88 // Required if inheriting from testing::WithParamInterface<>
89 static void SetUpTestCase() {}
90 static void TearDownTestCase() {}
92 uint32_t max_subcompactions_
;
93 bool exclusive_manual_compaction_
;
96 TEST_F(DBTest
, MockEnvTest
) {
97 unique_ptr
<MockEnv
> env
{new MockEnv(Env::Default())};
99 options
.create_if_missing
= true;
100 options
.env
= env
.get();
103 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
104 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
106 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
107 for (size_t i
= 0; i
< 3; ++i
) {
108 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
111 for (size_t i
= 0; i
< 3; ++i
) {
113 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
114 ASSERT_TRUE(res
== vals
[i
]);
117 Iterator
* iterator
= db
->NewIterator(ReadOptions());
118 iterator
->SeekToFirst();
119 for (size_t i
= 0; i
< 3; ++i
) {
120 ASSERT_TRUE(iterator
->Valid());
121 ASSERT_TRUE(keys
[i
] == iterator
->key());
122 ASSERT_TRUE(vals
[i
] == iterator
->value());
125 ASSERT_TRUE(!iterator
->Valid());
128 // TEST_FlushMemTable() is not supported in ROCKSDB_LITE
130 DBImpl
* dbi
= reinterpret_cast<DBImpl
*>(db
);
131 ASSERT_OK(dbi
->TEST_FlushMemTable());
133 for (size_t i
= 0; i
< 3; ++i
) {
135 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
136 ASSERT_TRUE(res
== vals
[i
]);
138 #endif // ROCKSDB_LITE
143 // NewMemEnv returns nullptr in ROCKSDB_LITE since class InMemoryEnv isn't
146 TEST_F(DBTest
, MemEnvTest
) {
147 unique_ptr
<Env
> env
{NewMemEnv(Env::Default())};
149 options
.create_if_missing
= true;
150 options
.env
= env
.get();
153 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
154 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
156 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
157 for (size_t i
= 0; i
< 3; ++i
) {
158 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
161 for (size_t i
= 0; i
< 3; ++i
) {
163 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
164 ASSERT_TRUE(res
== vals
[i
]);
167 Iterator
* iterator
= db
->NewIterator(ReadOptions());
168 iterator
->SeekToFirst();
169 for (size_t i
= 0; i
< 3; ++i
) {
170 ASSERT_TRUE(iterator
->Valid());
171 ASSERT_TRUE(keys
[i
] == iterator
->key());
172 ASSERT_TRUE(vals
[i
] == iterator
->value());
175 ASSERT_TRUE(!iterator
->Valid());
178 DBImpl
* dbi
= reinterpret_cast<DBImpl
*>(db
);
179 ASSERT_OK(dbi
->TEST_FlushMemTable());
181 for (size_t i
= 0; i
< 3; ++i
) {
183 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
184 ASSERT_TRUE(res
== vals
[i
]);
189 options
.create_if_missing
= false;
190 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
191 for (size_t i
= 0; i
< 3; ++i
) {
193 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
194 ASSERT_TRUE(res
== vals
[i
]);
198 #endif // ROCKSDB_LITE
200 TEST_F(DBTest
, WriteEmptyBatch
) {
201 Options options
= CurrentOptions();
203 options
.write_buffer_size
= 100000;
204 CreateAndReopenWithCF({"pikachu"}, options
);
206 ASSERT_OK(Put(1, "foo", "bar"));
209 wo
.disableWAL
= false;
210 WriteBatch empty_batch
;
211 ASSERT_OK(dbfull()->Write(wo
, &empty_batch
));
213 // make sure we can re-open it.
214 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
215 ASSERT_EQ("bar", Get(1, "foo"));
218 TEST_F(DBTest
, SkipDelay
) {
219 Options options
= CurrentOptions();
221 options
.write_buffer_size
= 100000;
222 CreateAndReopenWithCF({"pikachu"}, options
);
224 for (bool sync
: {true, false}) {
225 for (bool disableWAL
: {true, false}) {
226 // Use a small number to ensure a large delay that is still effective
228 // TODO(myabandeh): this is time dependent and could potentially make
230 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
231 std::atomic
<int> sleep_count(0);
232 rocksdb::SyncPoint::GetInstance()->SetCallBack(
233 "DBImpl::DelayWrite:Sleep",
234 [&](void* arg
) { sleep_count
.fetch_add(1); });
235 std::atomic
<int> wait_count(0);
236 rocksdb::SyncPoint::GetInstance()->SetCallBack(
237 "DBImpl::DelayWrite:Wait",
238 [&](void* arg
) { wait_count
.fetch_add(1); });
239 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
243 wo
.disableWAL
= disableWAL
;
244 wo
.no_slowdown
= true;
245 dbfull()->Put(wo
, "foo", "bar");
246 // We need the 2nd write to trigger delay. This is because delay is
247 // estimated based on the last write size which is 0 for the first write.
248 ASSERT_NOK(dbfull()->Put(wo
, "foo2", "bar2"));
249 ASSERT_GE(sleep_count
.load(), 0);
250 ASSERT_GE(wait_count
.load(), 0);
253 token
= dbfull()->TEST_write_controler().GetDelayToken(1000000000);
254 wo
.no_slowdown
= false;
255 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar3"));
256 ASSERT_GE(sleep_count
.load(), 1);
264 TEST_F(DBTest
, LevelLimitReopen
) {
265 Options options
= CurrentOptions();
266 CreateAndReopenWithCF({"pikachu"}, options
);
268 const std::string
value(1024 * 1024, ' ');
270 while (NumTableFilesAtLevel(2, 1) == 0) {
271 ASSERT_OK(Put(1, Key(i
++), value
));
274 options
.num_levels
= 1;
275 options
.max_bytes_for_level_multiplier_additional
.resize(1, 1);
276 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
277 ASSERT_EQ(s
.IsInvalidArgument(), true);
278 ASSERT_EQ(s
.ToString(),
279 "Invalid argument: db has more levels than options.num_levels");
281 options
.num_levels
= 10;
282 options
.max_bytes_for_level_multiplier_additional
.resize(10, 1);
283 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
285 #endif // ROCKSDB_LITE
288 TEST_F(DBTest
, PutSingleDeleteGet
) {
290 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
291 ASSERT_OK(Put(1, "foo", "v1"));
292 ASSERT_EQ("v1", Get(1, "foo"));
293 ASSERT_OK(Put(1, "foo2", "v2"));
294 ASSERT_EQ("v2", Get(1, "foo2"));
295 ASSERT_OK(SingleDelete(1, "foo"));
296 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
297 // Skip HashCuckooRep as it does not support single delete. FIFO and
298 // universal compaction do not apply to the test case. Skip MergePut
299 // because single delete does not get removed when it encounters a merge.
300 } while (ChangeOptions(kSkipHashCuckoo
| kSkipFIFOCompaction
|
301 kSkipUniversalCompaction
| kSkipMergePut
));
304 TEST_F(DBTest
, ReadFromPersistedTier
) {
307 Options options
= CurrentOptions();
308 for (int disableWAL
= 0; disableWAL
<= 1; ++disableWAL
) {
309 CreateAndReopenWithCF({"pikachu"}, options
);
311 wopt
.disableWAL
= (disableWAL
== 1);
312 // 1st round: put but not flush
313 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "foo", "first"));
314 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "bar", "one"));
315 ASSERT_EQ("first", Get(1, "foo"));
316 ASSERT_EQ("one", Get(1, "bar"));
318 // Read directly from persited data.
320 ropt
.read_tier
= kPersistedTier
;
322 if (wopt
.disableWAL
) {
323 // as data has not yet being flushed, we expect not found.
324 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
325 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
327 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "foo", &value
));
328 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "bar", &value
));
332 std::vector
<ColumnFamilyHandle
*> multiget_cfs
;
333 multiget_cfs
.push_back(handles_
[1]);
334 multiget_cfs
.push_back(handles_
[1]);
335 std::vector
<Slice
> multiget_keys
;
336 multiget_keys
.push_back("foo");
337 multiget_keys
.push_back("bar");
338 std::vector
<std::string
> multiget_values
;
340 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
341 if (wopt
.disableWAL
) {
342 ASSERT_TRUE(statuses
[0].IsNotFound());
343 ASSERT_TRUE(statuses
[1].IsNotFound());
345 ASSERT_OK(statuses
[0]);
346 ASSERT_OK(statuses
[1]);
349 // 2nd round: flush and put a new value in memtable.
351 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "rocksdb", "hello"));
353 // once the data has been flushed, we are able to get the
354 // data when kPersistedTier is used.
355 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).ok());
356 ASSERT_EQ(value
, "first");
357 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
358 ASSERT_EQ(value
, "one");
359 if (wopt
.disableWAL
) {
361 db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).IsNotFound());
363 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
));
364 ASSERT_EQ(value
, "hello");
367 // Expect same result in multiget
368 multiget_cfs
.push_back(handles_
[1]);
369 multiget_keys
.push_back("rocksdb");
371 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
372 ASSERT_TRUE(statuses
[0].ok());
373 ASSERT_EQ("first", multiget_values
[0]);
374 ASSERT_TRUE(statuses
[1].ok());
375 ASSERT_EQ("one", multiget_values
[1]);
376 if (wopt
.disableWAL
) {
377 ASSERT_TRUE(statuses
[2].IsNotFound());
379 ASSERT_OK(statuses
[2]);
382 // 3rd round: delete and flush
383 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "foo"));
385 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "bar"));
387 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
388 if (wopt
.disableWAL
) {
389 // Still expect finding the value as its delete has not yet being
391 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
392 ASSERT_EQ(value
, "one");
394 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
396 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).ok());
397 ASSERT_EQ(value
, "hello");
400 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
401 ASSERT_TRUE(statuses
[0].IsNotFound());
402 if (wopt
.disableWAL
) {
403 ASSERT_TRUE(statuses
[1].ok());
404 ASSERT_EQ("one", multiget_values
[1]);
406 ASSERT_TRUE(statuses
[1].IsNotFound());
408 ASSERT_TRUE(statuses
[2].ok());
409 ASSERT_EQ("hello", multiget_values
[2]);
410 if (wopt
.disableWAL
== 0) {
411 DestroyAndReopen(options
);
414 } while (ChangeOptions(kSkipHashCuckoo
));
417 TEST_F(DBTest
, SingleDeleteFlush
) {
418 // Test to check whether flushing preserves a single delete hidden
423 Options options
= CurrentOptions();
424 options
.disable_auto_compactions
= true;
425 CreateAndReopenWithCF({"pikachu"}, options
);
427 // Put values on second level (so that they will not be in the same
428 // compaction as the other operations.
429 Put(1, "foo", "first");
430 Put(1, "bar", "one");
432 MoveFilesToLevel(2, 1);
434 // (Single) delete hidden by a put
435 SingleDelete(1, "foo");
436 Put(1, "foo", "second");
438 Put(1, "bar", "two");
441 SingleDelete(1, "foo");
445 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
448 ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
449 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
450 // Skip HashCuckooRep as it does not support single delete. FIFO and
451 // universal compaction do not apply to the test case. Skip MergePut
452 // because merges cannot be combined with single deletions.
453 } while (ChangeOptions(kSkipHashCuckoo
| kSkipFIFOCompaction
|
454 kSkipUniversalCompaction
| kSkipMergePut
));
457 TEST_F(DBTest
, SingleDeletePutFlush
) {
458 // Single deletes that encounter the matching put in a flush should get
463 Options options
= CurrentOptions();
464 options
.disable_auto_compactions
= true;
465 CreateAndReopenWithCF({"pikachu"}, options
);
467 Put(1, "foo", Slice());
468 Put(1, "a", Slice());
469 SingleDelete(1, "a");
472 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
473 // Skip HashCuckooRep as it does not support single delete. FIFO and
474 // universal compaction do not apply to the test case. Skip MergePut
475 // because merges cannot be combined with single deletions.
476 } while (ChangeOptions(kSkipHashCuckoo
| kSkipFIFOCompaction
|
477 kSkipUniversalCompaction
| kSkipMergePut
));
480 // Disable because not all platform can run it.
481 // It requires more than 9GB memory to run it, With single allocation
483 TEST_F(DBTest
, DISABLED_VeryLargeValue
) {
484 const size_t kValueSize
= 3221225472u; // 3GB value
485 const size_t kKeySize
= 8388608u; // 8MB key
486 std::string
raw(kValueSize
, 'v');
487 std::string
key1(kKeySize
, 'c');
488 std::string
key2(kKeySize
, 'd');
490 Options options
= CurrentOptions();
492 options
.write_buffer_size
= 100000; // Small write buffer
493 options
.paranoid_checks
= true;
494 DestroyAndReopen(options
);
496 ASSERT_OK(Put("boo", "v1"));
497 ASSERT_OK(Put("foo", "v1"));
498 ASSERT_OK(Put(key1
, raw
));
500 ASSERT_OK(Put(key2
, raw
));
501 dbfull()->TEST_WaitForFlushMemTable();
503 ASSERT_EQ(1, NumTableFilesAtLevel(0));
506 Status s
= db_
->Get(ReadOptions(), key1
, &value
);
508 ASSERT_EQ(kValueSize
, value
.size());
509 ASSERT_EQ('v', value
[0]);
511 s
= db_
->Get(ReadOptions(), key2
, &value
);
513 ASSERT_EQ(kValueSize
, value
.size());
514 ASSERT_EQ('w', value
[0]);
516 // Compact all files.
518 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
520 // Check DB is not in read-only state.
521 ASSERT_OK(Put("boo", "v1"));
523 s
= db_
->Get(ReadOptions(), key1
, &value
);
525 ASSERT_EQ(kValueSize
, value
.size());
526 ASSERT_EQ('v', value
[0]);
528 s
= db_
->Get(ReadOptions(), key2
, &value
);
530 ASSERT_EQ(kValueSize
, value
.size());
531 ASSERT_EQ('w', value
[0]);
534 TEST_F(DBTest
, GetFromImmutableLayer
) {
536 Options options
= CurrentOptions();
538 CreateAndReopenWithCF({"pikachu"}, options
);
540 ASSERT_OK(Put(1, "foo", "v1"));
541 ASSERT_EQ("v1", Get(1, "foo"));
544 env_
->delay_sstable_sync_
.store(true, std::memory_order_release
);
545 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
546 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
547 ASSERT_EQ("v1", Get(1, "foo"));
548 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
549 // Release sync calls
550 env_
->delay_sstable_sync_
.store(false, std::memory_order_release
);
551 } while (ChangeOptions());
555 TEST_F(DBTest
, GetLevel0Ordering
) {
557 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
558 // Check that we process level-0 files in correct order. The code
559 // below generates two level-0 files where the earlier one comes
560 // before the later one in the level-0 file list since the earlier
561 // one has a smaller "smallest" key.
562 ASSERT_OK(Put(1, "bar", "b"));
563 ASSERT_OK(Put(1, "foo", "v1"));
565 ASSERT_OK(Put(1, "foo", "v2"));
567 ASSERT_EQ("v2", Get(1, "foo"));
568 } while (ChangeOptions());
571 TEST_F(DBTest
, WrongLevel0Config
) {
572 Options options
= CurrentOptions();
574 ASSERT_OK(DestroyDB(dbname_
, options
));
575 options
.level0_stop_writes_trigger
= 1;
576 options
.level0_slowdown_writes_trigger
= 2;
577 options
.level0_file_num_compaction_trigger
= 3;
578 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
582 TEST_F(DBTest
, GetOrderedByLevels
) {
584 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
585 ASSERT_OK(Put(1, "foo", "v1"));
586 Compact(1, "a", "z");
587 ASSERT_EQ("v1", Get(1, "foo"));
588 ASSERT_OK(Put(1, "foo", "v2"));
589 ASSERT_EQ("v2", Get(1, "foo"));
591 ASSERT_EQ("v2", Get(1, "foo"));
592 } while (ChangeOptions());
595 TEST_F(DBTest
, GetPicksCorrectFile
) {
597 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
598 // Arrange to have multiple files in a non-level-0 level.
599 ASSERT_OK(Put(1, "a", "va"));
600 Compact(1, "a", "b");
601 ASSERT_OK(Put(1, "x", "vx"));
602 Compact(1, "x", "y");
603 ASSERT_OK(Put(1, "f", "vf"));
604 Compact(1, "f", "g");
605 ASSERT_EQ("va", Get(1, "a"));
606 ASSERT_EQ("vf", Get(1, "f"));
607 ASSERT_EQ("vx", Get(1, "x"));
608 } while (ChangeOptions());
611 TEST_F(DBTest
, GetEncountersEmptyLevel
) {
613 Options options
= CurrentOptions();
614 CreateAndReopenWithCF({"pikachu"}, options
);
615 // Arrange for the following to happen:
616 // * sstable A in level 0
617 // * nothing in level 1
618 // * sstable B in level 2
619 // Then do enough Get() calls to arrange for an automatic compaction
620 // of sstable A. A bug would cause the compaction to be marked as
621 // occurring at level 1 (instead of the correct level 0).
623 // Step 1: First place sstables in levels 0 and 2
624 Put(1, "a", "begin");
627 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
628 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
629 Put(1, "a", "begin");
632 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
633 ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
635 // Step 2: clear level 1 if necessary.
636 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
637 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
638 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
639 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 1);
641 // Step 3: read a bunch of times
642 for (int i
= 0; i
< 1000; i
++) {
643 ASSERT_EQ("NOT_FOUND", Get(1, "missing"));
646 // Step 4: Wait for compaction to finish
647 dbfull()->TEST_WaitForCompact();
649 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
650 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
652 #endif // ROCKSDB_LITE
654 TEST_F(DBTest
, FlushMultipleMemtable
) {
656 Options options
= CurrentOptions();
657 WriteOptions writeOpt
= WriteOptions();
658 writeOpt
.disableWAL
= true;
659 options
.max_write_buffer_number
= 4;
660 options
.min_write_buffer_number_to_merge
= 3;
661 options
.max_write_buffer_number_to_maintain
= -1;
662 CreateAndReopenWithCF({"pikachu"}, options
);
663 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v1"));
665 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
667 ASSERT_EQ("v1", Get(1, "foo"));
668 ASSERT_EQ("v1", Get(1, "bar"));
670 } while (ChangeCompactOptions());
673 TEST_F(DBTest
, FlushSchedule
) {
674 Options options
= CurrentOptions();
675 options
.disable_auto_compactions
= true;
676 options
.level0_stop_writes_trigger
= 1 << 10;
677 options
.level0_slowdown_writes_trigger
= 1 << 10;
678 options
.min_write_buffer_number_to_merge
= 1;
679 options
.max_write_buffer_number_to_maintain
= 1;
680 options
.max_write_buffer_number
= 2;
681 options
.write_buffer_size
= 120 * 1024;
682 CreateAndReopenWithCF({"pikachu"}, options
);
683 std::vector
<port::Thread
> threads
;
685 std::atomic
<int> thread_num(0);
686 // each column family will have 5 thread, each thread generating 2 memtables.
687 // each column family should end up with 10 table files
688 std::function
<void()> fill_memtable_func
= [&]() {
689 int a
= thread_num
.fetch_add(1);
692 // this should fill up 2 memtables
693 for (int k
= 0; k
< 5000; ++k
) {
694 ASSERT_OK(db_
->Put(wo
, handles_
[a
& 1], RandomString(&rnd
, 13), ""));
698 for (int i
= 0; i
< 10; ++i
) {
699 threads
.emplace_back(fill_memtable_func
);
702 for (auto& t
: threads
) {
706 auto default_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "default");
707 auto pikachu_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "pikachu");
708 ASSERT_LE(default_tables
, static_cast<uint64_t>(10));
709 ASSERT_GT(default_tables
, static_cast<uint64_t>(0));
710 ASSERT_LE(pikachu_tables
, static_cast<uint64_t>(10));
711 ASSERT_GT(pikachu_tables
, static_cast<uint64_t>(0));
713 #endif // ROCKSDB_LITE
716 class KeepFilter
: public CompactionFilter
{
718 virtual bool Filter(int level
, const Slice
& key
, const Slice
& value
,
719 std::string
* new_value
,
720 bool* value_changed
) const override
{
724 virtual const char* Name() const override
{ return "KeepFilter"; }
727 class KeepFilterFactory
: public CompactionFilterFactory
{
729 explicit KeepFilterFactory(bool check_context
= false)
730 : check_context_(check_context
) {}
732 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
733 const CompactionFilter::Context
& context
) override
{
734 if (check_context_
) {
735 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
736 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
738 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
741 virtual const char* Name() const override
{ return "KeepFilterFactory"; }
743 std::atomic_bool expect_full_compaction_
;
744 std::atomic_bool expect_manual_compaction_
;
747 class DelayFilter
: public CompactionFilter
{
749 explicit DelayFilter(DBTestBase
* d
) : db_test(d
) {}
750 virtual bool Filter(int level
, const Slice
& key
, const Slice
& value
,
751 std::string
* new_value
,
752 bool* value_changed
) const override
{
753 db_test
->env_
->addon_time_
.fetch_add(1000);
757 virtual const char* Name() const override
{ return "DelayFilter"; }
763 class DelayFilterFactory
: public CompactionFilterFactory
{
765 explicit DelayFilterFactory(DBTestBase
* d
) : db_test(d
) {}
766 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
767 const CompactionFilter::Context
& context
) override
{
768 return std::unique_ptr
<CompactionFilter
>(new DelayFilter(db_test
));
771 virtual const char* Name() const override
{ return "DelayFilterFactory"; }
780 static std::string
CompressibleString(Random
* rnd
, int len
) {
782 test::CompressibleString(rnd
, 0.8, len
, &r
);
785 #endif // ROCKSDB_LITE
787 TEST_F(DBTest
, FailMoreDbPaths
) {
788 Options options
= CurrentOptions();
789 options
.db_paths
.emplace_back(dbname_
, 10000000);
790 options
.db_paths
.emplace_back(dbname_
+ "_2", 1000000);
791 options
.db_paths
.emplace_back(dbname_
+ "_3", 1000000);
792 options
.db_paths
.emplace_back(dbname_
+ "_4", 1000000);
793 options
.db_paths
.emplace_back(dbname_
+ "_5", 1000000);
794 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
797 void CheckColumnFamilyMeta(const ColumnFamilyMetaData
& cf_meta
) {
798 uint64_t cf_size
= 0;
799 uint64_t cf_csize
= 0;
800 size_t file_count
= 0;
801 for (auto level_meta
: cf_meta
.levels
) {
802 uint64_t level_size
= 0;
803 uint64_t level_csize
= 0;
804 file_count
+= level_meta
.files
.size();
805 for (auto file_meta
: level_meta
.files
) {
806 level_size
+= file_meta
.size
;
808 ASSERT_EQ(level_meta
.size
, level_size
);
809 cf_size
+= level_size
;
810 cf_csize
+= level_csize
;
812 ASSERT_EQ(cf_meta
.file_count
, file_count
);
813 ASSERT_EQ(cf_meta
.size
, cf_size
);
817 TEST_F(DBTest
, ColumnFamilyMetaDataTest
) {
818 Options options
= CurrentOptions();
819 options
.create_if_missing
= true;
820 DestroyAndReopen(options
);
824 ColumnFamilyMetaData cf_meta
;
825 for (int i
= 0; i
< 100; ++i
) {
826 GenerateNewFile(&rnd
, &key_index
);
827 db_
->GetColumnFamilyMetaData(&cf_meta
);
828 CheckColumnFamilyMeta(cf_meta
);
833 void MinLevelHelper(DBTest
* self
, Options
& options
) {
836 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
838 std::vector
<std::string
> values
;
839 // Write 120KB (12 values, each 10K)
840 for (int i
= 0; i
< 12; i
++) {
841 values
.push_back(DBTestBase::RandomString(&rnd
, 10000));
842 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
844 self
->dbfull()->TEST_WaitForFlushMemTable();
845 ASSERT_EQ(self
->NumTableFilesAtLevel(0), num
+ 1);
848 // generate one more file in level-0, and should trigger level-0 compaction
849 std::vector
<std::string
> values
;
850 for (int i
= 0; i
< 12; i
++) {
851 values
.push_back(DBTestBase::RandomString(&rnd
, 10000));
852 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
854 self
->dbfull()->TEST_WaitForCompact();
856 ASSERT_EQ(self
->NumTableFilesAtLevel(0), 0);
857 ASSERT_EQ(self
->NumTableFilesAtLevel(1), 1);
860 // returns false if the calling-Test should be skipped
861 bool MinLevelToCompress(CompressionType
& type
, Options
& options
, int wbits
,
862 int lev
, int strategy
) {
864 "Test with compression options : window_bits = %d, level = %d, "
866 wbits
, lev
, strategy
);
867 options
.write_buffer_size
= 100 << 10; // 100KB
868 options
.arena_block_size
= 4096;
869 options
.num_levels
= 3;
870 options
.level0_file_num_compaction_trigger
= 3;
871 options
.create_if_missing
= true;
873 if (Snappy_Supported()) {
874 type
= kSnappyCompression
;
875 fprintf(stderr
, "using snappy\n");
876 } else if (Zlib_Supported()) {
877 type
= kZlibCompression
;
878 fprintf(stderr
, "using zlib\n");
879 } else if (BZip2_Supported()) {
880 type
= kBZip2Compression
;
881 fprintf(stderr
, "using bzip2\n");
882 } else if (LZ4_Supported()) {
883 type
= kLZ4Compression
;
884 fprintf(stderr
, "using lz4\n");
885 } else if (XPRESS_Supported()) {
886 type
= kXpressCompression
;
887 fprintf(stderr
, "using xpress\n");
888 } else if (ZSTD_Supported()) {
890 fprintf(stderr
, "using ZSTD\n");
892 fprintf(stderr
, "skipping test, compression disabled\n");
895 options
.compression_per_level
.resize(options
.num_levels
);
897 // do not compress L0
898 for (int i
= 0; i
< 1; i
++) {
899 options
.compression_per_level
[i
] = kNoCompression
;
901 for (int i
= 1; i
< options
.num_levels
; i
++) {
902 options
.compression_per_level
[i
] = type
;
908 TEST_F(DBTest
, MinLevelToCompress1
) {
909 Options options
= CurrentOptions();
910 CompressionType type
= kSnappyCompression
;
911 if (!MinLevelToCompress(type
, options
, -14, -1, 0)) {
915 MinLevelHelper(this, options
);
917 // do not compress L0 and L1
918 for (int i
= 0; i
< 2; i
++) {
919 options
.compression_per_level
[i
] = kNoCompression
;
921 for (int i
= 2; i
< options
.num_levels
; i
++) {
922 options
.compression_per_level
[i
] = type
;
924 DestroyAndReopen(options
);
925 MinLevelHelper(this, options
);
928 TEST_F(DBTest
, MinLevelToCompress2
) {
929 Options options
= CurrentOptions();
930 CompressionType type
= kSnappyCompression
;
931 if (!MinLevelToCompress(type
, options
, 15, -1, 0)) {
935 MinLevelHelper(this, options
);
937 // do not compress L0 and L1
938 for (int i
= 0; i
< 2; i
++) {
939 options
.compression_per_level
[i
] = kNoCompression
;
941 for (int i
= 2; i
< options
.num_levels
; i
++) {
942 options
.compression_per_level
[i
] = type
;
944 DestroyAndReopen(options
);
945 MinLevelHelper(this, options
);
948 // This test may fail because of a legit case that multiple L0 files
949 // are trivial moved to L1.
950 TEST_F(DBTest
, DISABLED_RepeatedWritesToSameKey
) {
952 Options options
= CurrentOptions();
954 options
.write_buffer_size
= 100000; // Small write buffer
955 CreateAndReopenWithCF({"pikachu"}, options
);
957 // We must have at most one file per level except for level-0,
958 // which may have up to kL0_StopWritesTrigger files.
959 const int kMaxFiles
=
960 options
.num_levels
+ options
.level0_stop_writes_trigger
;
964 RandomString(&rnd
, static_cast<int>(2 * options
.write_buffer_size
));
965 for (int i
= 0; i
< 5 * kMaxFiles
; i
++) {
966 ASSERT_OK(Put(1, "key", value
));
967 ASSERT_LE(TotalTableFiles(1), kMaxFiles
);
969 } while (ChangeCompactOptions());
971 #endif // ROCKSDB_LITE
973 TEST_F(DBTest
, SparseMerge
) {
975 Options options
= CurrentOptions();
976 options
.compression
= kNoCompression
;
977 CreateAndReopenWithCF({"pikachu"}, options
);
979 FillLevels("A", "Z", 1);
982 // small amount of data with prefix A
983 // large amount of data with prefix B
984 // small amount of data with prefix C
985 // and that recent updates have made small changes to all three prefixes.
986 // Check that we do not do a compaction that merges all of B in one shot.
987 const std::string
value(1000, 'x');
989 // Write approximately 100MB of "B" values
990 for (int i
= 0; i
< 100000; i
++) {
992 snprintf(key
, sizeof(key
), "B%010d", i
);
997 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
999 // Make sparse update
1001 Put(1, "B100", "bvalue2");
1003 ASSERT_OK(Flush(1));
1005 // Compactions should not cause us to create a situation where
1006 // a file overlaps too much data at the next level.
1007 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1009 dbfull()->TEST_CompactRange(0, nullptr, nullptr);
1010 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1012 dbfull()->TEST_CompactRange(1, nullptr, nullptr);
1013 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1015 } while (ChangeCompactOptions());
1018 #ifndef ROCKSDB_LITE
// Returns true iff low <= val <= high; on failure also prints the offending
// value and range to stderr so the enclosing ASSERT gives a useful message.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  bool result = (val >= low) && (val <= high);
  if (!result) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            (unsigned long long)(val), (unsigned long long)(low),
            (unsigned long long)(high));
  }
  return result;
}
1029 TEST_F(DBTest
, ApproximateSizesMemTable
) {
1030 Options options
= CurrentOptions();
1031 options
.write_buffer_size
= 100000000; // Large write buffer
1032 options
.compression
= kNoCompression
;
1033 options
.create_if_missing
= true;
1034 DestroyAndReopen(options
);
1038 for (int i
= 0; i
< N
; i
++) {
1039 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1043 std::string start
= Key(50);
1044 std::string end
= Key(60);
1045 Range
r(start
, end
);
1046 uint8_t include_both
= DB::SizeApproximationFlags::INCLUDE_FILES
|
1047 DB::SizeApproximationFlags::INCLUDE_MEMTABLES
;
1048 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1049 ASSERT_GT(size
, 6000);
1050 ASSERT_LT(size
, 204800);
1051 // Zero if not including mem table
1052 db_
->GetApproximateSizes(&r
, 1, &size
);
1057 r
= Range(start
, end
);
1058 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1061 for (int i
= 0; i
< N
; i
++) {
1062 ASSERT_OK(Put(Key(1000 + i
), RandomString(&rnd
, 1024)));
1067 r
= Range(start
, end
);
1068 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1073 r
= Range(start
, end
);
1074 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1075 ASSERT_GT(size
, 6000);
1077 options
.max_write_buffer_number
= 8;
1078 options
.min_write_buffer_number_to_merge
= 5;
1079 options
.write_buffer_size
= 1024 * N
; // Not very large
1080 DestroyAndReopen(options
);
1083 for (int i
= 0; i
< N
; i
++) {
1084 keys
[i
* 3] = i
* 5;
1085 keys
[i
* 3 + 1] = i
* 5 + 1;
1086 keys
[i
* 3 + 2] = i
* 5 + 2;
1088 std::random_shuffle(std::begin(keys
), std::end(keys
));
1090 for (int i
= 0; i
< N
* 3; i
++) {
1091 ASSERT_OK(Put(Key(keys
[i
] + 1000), RandomString(&rnd
, 1024)));
1096 r
= Range(start
, end
);
1097 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1102 r
= Range(start
, end
);
1103 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1104 ASSERT_GT(size
, 6000);
1108 r
= Range(start
, end
);
1109 db_
->GetApproximateSizes(&r
, 1, &size
, include_both
);
1114 r
= Range(start
, end
);
1115 uint64_t size_with_mt
, size_without_mt
;
1116 db_
->GetApproximateSizes(&r
, 1, &size_with_mt
, include_both
);
1117 ASSERT_GT(size_with_mt
, 6000);
1118 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1119 ASSERT_EQ(size_without_mt
, 0);
1123 for (int i
= 0; i
< N
; i
++) {
1124 ASSERT_OK(Put(Key(i
+ 1000), RandomString(&rnd
, 1024)));
1129 r
= Range(start
, end
);
1130 db_
->GetApproximateSizes(&r
, 1, &size_with_mt
, include_both
);
1131 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1132 ASSERT_GT(size_with_mt
, size_without_mt
);
1133 ASSERT_GT(size_without_mt
, 6000);
1136 TEST_F(DBTest
, GetApproximateMemTableStats
) {
1137 Options options
= CurrentOptions();
1138 options
.write_buffer_size
= 100000000;
1139 options
.compression
= kNoCompression
;
1140 options
.create_if_missing
= true;
1141 DestroyAndReopen(options
);
1145 for (int i
= 0; i
< N
; i
++) {
1146 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1152 std::string start
= Key(50);
1153 std::string end
= Key(60);
1154 Range
r(start
, end
);
1155 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1156 ASSERT_GT(count
, 0);
1157 ASSERT_LE(count
, N
);
1158 ASSERT_GT(size
, 6000);
1159 ASSERT_LT(size
, 204800);
1163 r
= Range(start
, end
);
1164 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1165 ASSERT_EQ(count
, 0);
1172 r
= Range(start
, end
);
1173 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1174 ASSERT_EQ(count
, 0);
1177 for (int i
= 0; i
< N
; i
++) {
1178 ASSERT_OK(Put(Key(1000 + i
), RandomString(&rnd
, 1024)));
1183 r
= Range(start
, end
);
1184 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1185 ASSERT_GT(count
, 20);
1186 ASSERT_GT(size
, 6000);
1189 TEST_F(DBTest
, ApproximateSizes
) {
1191 Options options
= CurrentOptions();
1192 options
.write_buffer_size
= 100000000; // Large write buffer
1193 options
.compression
= kNoCompression
;
1194 options
.create_if_missing
= true;
1195 DestroyAndReopen(options
);
1196 CreateAndReopenWithCF({"pikachu"}, options
);
1198 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1199 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1200 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1202 // Write 8MB (80 values, each 100K)
1203 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1205 static const int S1
= 100000;
1206 static const int S2
= 105000; // Allow some expansion from metadata
1208 for (int i
= 0; i
< N
; i
++) {
1209 ASSERT_OK(Put(1, Key(i
), RandomString(&rnd
, S1
)));
1212 // 0 because GetApproximateSizes() does not account for memtable space
1213 ASSERT_TRUE(Between(Size("", Key(50), 1), 0, 0));
1215 // Check sizes across recovery by reopening a few times
1216 for (int run
= 0; run
< 3; run
++) {
1217 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1219 for (int compact_start
= 0; compact_start
< N
; compact_start
+= 10) {
1220 for (int i
= 0; i
< N
; i
+= 10) {
1221 ASSERT_TRUE(Between(Size("", Key(i
), 1), S1
* i
, S2
* i
));
1222 ASSERT_TRUE(Between(Size("", Key(i
) + ".suffix", 1), S1
* (i
+ 1),
1224 ASSERT_TRUE(Between(Size(Key(i
), Key(i
+ 10), 1), S1
* 10, S2
* 10));
1226 ASSERT_TRUE(Between(Size("", Key(50), 1), S1
* 50, S2
* 50));
1228 Between(Size("", Key(50) + ".suffix", 1), S1
* 50, S2
* 50));
1230 std::string cstart_str
= Key(compact_start
);
1231 std::string cend_str
= Key(compact_start
+ 9);
1232 Slice cstart
= cstart_str
;
1233 Slice cend
= cend_str
;
1234 dbfull()->TEST_CompactRange(0, &cstart
, &cend
, handles_
[1]);
1237 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1238 ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
1240 // ApproximateOffsetOf() is not yet implemented in plain table format.
1241 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1242 kSkipPlainTable
| kSkipHashIndex
));
1245 TEST_F(DBTest
, ApproximateSizes_MixOfSmallAndLarge
) {
1247 Options options
= CurrentOptions();
1248 options
.compression
= kNoCompression
;
1249 CreateAndReopenWithCF({"pikachu"}, options
);
1252 std::string big1
= RandomString(&rnd
, 100000);
1253 ASSERT_OK(Put(1, Key(0), RandomString(&rnd
, 10000)));
1254 ASSERT_OK(Put(1, Key(1), RandomString(&rnd
, 10000)));
1255 ASSERT_OK(Put(1, Key(2), big1
));
1256 ASSERT_OK(Put(1, Key(3), RandomString(&rnd
, 10000)));
1257 ASSERT_OK(Put(1, Key(4), big1
));
1258 ASSERT_OK(Put(1, Key(5), RandomString(&rnd
, 10000)));
1259 ASSERT_OK(Put(1, Key(6), RandomString(&rnd
, 300000)));
1260 ASSERT_OK(Put(1, Key(7), RandomString(&rnd
, 10000)));
1262 // Check sizes across recovery by reopening a few times
1263 for (int run
= 0; run
< 3; run
++) {
1264 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1266 ASSERT_TRUE(Between(Size("", Key(0), 1), 0, 0));
1267 ASSERT_TRUE(Between(Size("", Key(1), 1), 10000, 11000));
1268 ASSERT_TRUE(Between(Size("", Key(2), 1), 20000, 21000));
1269 ASSERT_TRUE(Between(Size("", Key(3), 1), 120000, 121000));
1270 ASSERT_TRUE(Between(Size("", Key(4), 1), 130000, 131000));
1271 ASSERT_TRUE(Between(Size("", Key(5), 1), 230000, 231000));
1272 ASSERT_TRUE(Between(Size("", Key(6), 1), 240000, 241000));
1273 ASSERT_TRUE(Between(Size("", Key(7), 1), 540000, 541000));
1274 ASSERT_TRUE(Between(Size("", Key(8), 1), 550000, 560000));
1276 ASSERT_TRUE(Between(Size(Key(3), Key(5), 1), 110000, 111000));
1278 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
1280 // ApproximateOffsetOf() is not yet implemented in plain table format.
1281 } while (ChangeOptions(kSkipPlainTable
));
1283 #endif // ROCKSDB_LITE
1285 #ifndef ROCKSDB_LITE
1286 TEST_F(DBTest
, Snapshot
) {
1287 anon::OptionsOverride options_override
;
1288 options_override
.skip_policy
= kSkipNoSnapshot
;
1290 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override
));
1291 Put(0, "foo", "0v1");
1292 Put(1, "foo", "1v1");
1294 const Snapshot
* s1
= db_
->GetSnapshot();
1295 ASSERT_EQ(1U, GetNumSnapshots());
1296 uint64_t time_snap1
= GetTimeOldestSnapshots();
1297 ASSERT_GT(time_snap1
, 0U);
1298 Put(0, "foo", "0v2");
1299 Put(1, "foo", "1v2");
1301 env_
->addon_time_
.fetch_add(1);
1303 const Snapshot
* s2
= db_
->GetSnapshot();
1304 ASSERT_EQ(2U, GetNumSnapshots());
1305 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1306 Put(0, "foo", "0v3");
1307 Put(1, "foo", "1v3");
1310 ManagedSnapshot
s3(db_
);
1311 ASSERT_EQ(3U, GetNumSnapshots());
1312 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1314 Put(0, "foo", "0v4");
1315 Put(1, "foo", "1v4");
1316 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1317 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1318 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1319 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1320 ASSERT_EQ("0v3", Get(0, "foo", s3
.snapshot()));
1321 ASSERT_EQ("1v3", Get(1, "foo", s3
.snapshot()));
1322 ASSERT_EQ("0v4", Get(0, "foo"));
1323 ASSERT_EQ("1v4", Get(1, "foo"));
1326 ASSERT_EQ(2U, GetNumSnapshots());
1327 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1328 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1329 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1330 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1331 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1332 ASSERT_EQ("0v4", Get(0, "foo"));
1333 ASSERT_EQ("1v4", Get(1, "foo"));
1335 db_
->ReleaseSnapshot(s1
);
1336 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1337 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1338 ASSERT_EQ("0v4", Get(0, "foo"));
1339 ASSERT_EQ("1v4", Get(1, "foo"));
1340 ASSERT_EQ(1U, GetNumSnapshots());
1341 ASSERT_LT(time_snap1
, GetTimeOldestSnapshots());
1343 db_
->ReleaseSnapshot(s2
);
1344 ASSERT_EQ(0U, GetNumSnapshots());
1345 ASSERT_EQ("0v4", Get(0, "foo"));
1346 ASSERT_EQ("1v4", Get(1, "foo"));
1347 } while (ChangeOptions(kSkipHashCuckoo
));
1350 TEST_F(DBTest
, HiddenValuesAreRemoved
) {
1351 anon::OptionsOverride options_override
;
1352 options_override
.skip_policy
= kSkipNoSnapshot
;
1354 Options options
= CurrentOptions(options_override
);
1355 CreateAndReopenWithCF({"pikachu"}, options
);
1357 FillLevels("a", "z", 1);
1359 std::string big
= RandomString(&rnd
, 50000);
1361 Put(1, "pastfoo", "v");
1362 const Snapshot
* snapshot
= db_
->GetSnapshot();
1363 Put(1, "foo", "tiny");
1364 Put(1, "pastfoo2", "v2"); // Advance sequence number one more
1366 ASSERT_OK(Flush(1));
1367 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
1369 ASSERT_EQ(big
, Get(1, "foo", snapshot
));
1370 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 50000, 60000));
1371 db_
->ReleaseSnapshot(snapshot
);
1372 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny, " + big
+ " ]");
1374 dbfull()->TEST_CompactRange(0, nullptr, &x
, handles_
[1]);
1375 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1376 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1377 ASSERT_GE(NumTableFilesAtLevel(1, 1), 1);
1378 dbfull()->TEST_CompactRange(1, nullptr, &x
, handles_
[1]);
1379 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1381 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
1382 // ApproximateOffsetOf() is not yet implemented in plain table format,
1383 // which is used by Size().
1384 // skip HashCuckooRep as it does not support snapshot
1385 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1386 kSkipPlainTable
| kSkipHashCuckoo
));
1388 #endif // ROCKSDB_LITE
1390 TEST_F(DBTest
, UnremovableSingleDelete
) {
1393 // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
1395 // We do not want to end up with:
1397 // Put(A, v1) Snapshot Put(A, v2)
1399 // Because a subsequent SingleDelete(A) would delete the Put(A, v2)
1400 // but not Put(A, v1), so Get(A) would return v1.
1401 anon::OptionsOverride options_override
;
1402 options_override
.skip_policy
= kSkipNoSnapshot
;
1404 Options options
= CurrentOptions(options_override
);
1405 options
.disable_auto_compactions
= true;
1406 CreateAndReopenWithCF({"pikachu"}, options
);
1408 Put(1, "foo", "first");
1409 const Snapshot
* snapshot
= db_
->GetSnapshot();
1410 SingleDelete(1, "foo");
1411 Put(1, "foo", "second");
1412 ASSERT_OK(Flush(1));
1414 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1415 ASSERT_EQ("second", Get(1, "foo"));
1417 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1419 ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));
1421 SingleDelete(1, "foo");
1423 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1424 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1426 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1429 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1430 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1431 db_
->ReleaseSnapshot(snapshot
);
1432 // Skip HashCuckooRep as it does not support single delete. FIFO and
1433 // universal compaction do not apply to the test case. Skip MergePut
1434 // because single delete does not get removed when it encounters a merge.
1435 } while (ChangeOptions(kSkipHashCuckoo
| kSkipFIFOCompaction
|
1436 kSkipUniversalCompaction
| kSkipMergePut
));
1439 #ifndef ROCKSDB_LITE
1440 TEST_F(DBTest
, DeletionMarkers1
) {
1441 Options options
= CurrentOptions();
1442 options
.max_background_flushes
= 0;
1443 CreateAndReopenWithCF({"pikachu"}, options
);
1444 Put(1, "foo", "v1");
1445 ASSERT_OK(Flush(1));
1447 MoveFilesToLevel(last
, 1);
1448 // foo => v1 is now in last level
1449 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1451 // Place a table at level last-1 to prevent merging with preceding mutation
1452 Put(1, "a", "begin");
1455 MoveFilesToLevel(last
- 1, 1);
1456 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1457 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1460 Put(1, "foo", "v2");
1461 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
1462 ASSERT_OK(Flush(1)); // Moves to level last-2
1463 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1465 dbfull()->TEST_CompactRange(last
- 2, nullptr, &z
, handles_
[1]);
1466 // DEL eliminated, but v1 remains because we aren't compacting that level
1467 // (DEL can be eliminated because v2 hides v1).
1468 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1469 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1470 // Merging last-1 w/ last, so we are the base level for "foo", so
1471 // DEL is removed. (as is v1).
1472 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
1475 TEST_F(DBTest
, DeletionMarkers2
) {
1476 Options options
= CurrentOptions();
1477 CreateAndReopenWithCF({"pikachu"}, options
);
1478 Put(1, "foo", "v1");
1479 ASSERT_OK(Flush(1));
1481 MoveFilesToLevel(last
, 1);
1482 // foo => v1 is now in last level
1483 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1485 // Place a table at level last-1 to prevent merging with preceding mutation
1486 Put(1, "a", "begin");
1489 MoveFilesToLevel(last
- 1, 1);
1490 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1491 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1494 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1495 ASSERT_OK(Flush(1)); // Moves to level last-2
1496 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1497 dbfull()->TEST_CompactRange(last
- 2, nullptr, nullptr, handles_
[1]);
1498 // DEL kept: "last" file overlaps
1499 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1500 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1501 // Merging last-1 w/ last, so we are the base level for "foo", so
1502 // DEL is removed. (as is v1).
1503 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
1506 TEST_F(DBTest
, OverlapInLevel0
) {
1508 Options options
= CurrentOptions();
1509 CreateAndReopenWithCF({"pikachu"}, options
);
1511 // Fill levels 1 and 2 to disable the pushing of new memtables to levels >
1513 ASSERT_OK(Put(1, "100", "v100"));
1514 ASSERT_OK(Put(1, "999", "v999"));
1516 MoveFilesToLevel(2, 1);
1517 ASSERT_OK(Delete(1, "100"));
1518 ASSERT_OK(Delete(1, "999"));
1520 MoveFilesToLevel(1, 1);
1521 ASSERT_EQ("0,1,1", FilesPerLevel(1));
1523 // Make files spanning the following ranges in level-0:
1524 // files[0] 200 .. 900
1525 // files[1] 300 .. 500
1526 // Note that files are sorted by smallest key.
1527 ASSERT_OK(Put(1, "300", "v300"));
1528 ASSERT_OK(Put(1, "500", "v500"));
1530 ASSERT_OK(Put(1, "200", "v200"));
1531 ASSERT_OK(Put(1, "600", "v600"));
1532 ASSERT_OK(Put(1, "900", "v900"));
1534 ASSERT_EQ("2,1,1", FilesPerLevel(1));
1536 // Compact away the placeholder files we created initially
1537 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
1538 dbfull()->TEST_CompactRange(2, nullptr, nullptr, handles_
[1]);
1539 ASSERT_EQ("2", FilesPerLevel(1));
1541 // Do a memtable compaction. Before bug-fix, the compaction would
1542 // not detect the overlap with level-0 files and would incorrectly place
1543 // the deletion in a deeper level.
1544 ASSERT_OK(Delete(1, "600"));
1546 ASSERT_EQ("3", FilesPerLevel(1));
1547 ASSERT_EQ("NOT_FOUND", Get(1, "600"));
1548 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
1550 #endif // ROCKSDB_LITE
1552 TEST_F(DBTest
, ComparatorCheck
) {
1553 class NewComparator
: public Comparator
{
1555 virtual const char* Name() const override
{
1556 return "rocksdb.NewComparator";
1558 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
1559 return BytewiseComparator()->Compare(a
, b
);
1561 virtual void FindShortestSeparator(std::string
* s
,
1562 const Slice
& l
) const override
{
1563 BytewiseComparator()->FindShortestSeparator(s
, l
);
1565 virtual void FindShortSuccessor(std::string
* key
) const override
{
1566 BytewiseComparator()->FindShortSuccessor(key
);
1569 Options new_options
, options
;
1572 options
= CurrentOptions();
1573 CreateAndReopenWithCF({"pikachu"}, options
);
1574 new_options
= CurrentOptions();
1575 new_options
.comparator
= &cmp
;
1576 // only the non-default column family has non-matching comparator
1577 Status s
= TryReopenWithColumnFamilies(
1578 {"default", "pikachu"}, std::vector
<Options
>({options
, new_options
}));
1579 ASSERT_TRUE(!s
.ok());
1580 ASSERT_TRUE(s
.ToString().find("comparator") != std::string::npos
)
1582 } while (ChangeCompactOptions());
1585 TEST_F(DBTest
, CustomComparator
) {
1586 class NumberComparator
: public Comparator
{
1588 virtual const char* Name() const override
{
1589 return "test.NumberComparator";
1591 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
1592 return ToNumber(a
) - ToNumber(b
);
1594 virtual void FindShortestSeparator(std::string
* s
,
1595 const Slice
& l
) const override
{
1596 ToNumber(*s
); // Check format
1597 ToNumber(l
); // Check format
1599 virtual void FindShortSuccessor(std::string
* key
) const override
{
1600 ToNumber(*key
); // Check format
1604 static int ToNumber(const Slice
& x
) {
1605 // Check that there are no extra characters.
1606 EXPECT_TRUE(x
.size() >= 2 && x
[0] == '[' && x
[x
.size() - 1] == ']')
1610 EXPECT_TRUE(sscanf(x
.ToString().c_str(), "[%i]%c", &val
, &ignored
) == 1)
1615 Options new_options
;
1616 NumberComparator cmp
;
1618 new_options
= CurrentOptions();
1619 new_options
.create_if_missing
= true;
1620 new_options
.comparator
= &cmp
;
1621 new_options
.write_buffer_size
= 4096; // Compact more often
1622 new_options
.arena_block_size
= 4096;
1623 new_options
= CurrentOptions(new_options
);
1624 DestroyAndReopen(new_options
);
1625 CreateAndReopenWithCF({"pikachu"}, new_options
);
1626 ASSERT_OK(Put(1, "[10]", "ten"));
1627 ASSERT_OK(Put(1, "[0x14]", "twenty"));
1628 for (int i
= 0; i
< 2; i
++) {
1629 ASSERT_EQ("ten", Get(1, "[10]"));
1630 ASSERT_EQ("ten", Get(1, "[0xa]"));
1631 ASSERT_EQ("twenty", Get(1, "[20]"));
1632 ASSERT_EQ("twenty", Get(1, "[0x14]"));
1633 ASSERT_EQ("NOT_FOUND", Get(1, "[15]"));
1634 ASSERT_EQ("NOT_FOUND", Get(1, "[0xf]"));
1635 Compact(1, "[0]", "[9999]");
1638 for (int run
= 0; run
< 2; run
++) {
1639 for (int i
= 0; i
< 1000; i
++) {
1641 snprintf(buf
, sizeof(buf
), "[%d]", i
* 10);
1642 ASSERT_OK(Put(1, buf
, buf
));
1644 Compact(1, "[0]", "[1000000]");
1646 } while (ChangeCompactOptions());
1649 TEST_F(DBTest
, DBOpen_Options
) {
1650 Options options
= CurrentOptions();
1651 std::string dbname
= test::TmpDir(env_
) + "/db_options_test";
1652 ASSERT_OK(DestroyDB(dbname
, options
));
1654 // Does not exist, and create_if_missing == false: error
1656 options
.create_if_missing
= false;
1657 Status s
= DB::Open(options
, dbname
, &db
);
1658 ASSERT_TRUE(strstr(s
.ToString().c_str(), "does not exist") != nullptr);
1659 ASSERT_TRUE(db
== nullptr);
1661 // Does not exist, and create_if_missing == true: OK
1662 options
.create_if_missing
= true;
1663 s
= DB::Open(options
, dbname
, &db
);
1665 ASSERT_TRUE(db
!= nullptr);
1670 // Does exist, and error_if_exists == true: error
1671 options
.create_if_missing
= false;
1672 options
.error_if_exists
= true;
1673 s
= DB::Open(options
, dbname
, &db
);
1674 ASSERT_TRUE(strstr(s
.ToString().c_str(), "exists") != nullptr);
1675 ASSERT_TRUE(db
== nullptr);
1677 // Does exist, and error_if_exists == false: OK
1678 options
.create_if_missing
= true;
1679 options
.error_if_exists
= false;
1680 s
= DB::Open(options
, dbname
, &db
);
1682 ASSERT_TRUE(db
!= nullptr);
1688 TEST_F(DBTest
, DBOpen_Change_NumLevels
) {
1689 Options options
= CurrentOptions();
1690 options
.create_if_missing
= true;
1691 DestroyAndReopen(options
);
1692 ASSERT_TRUE(db_
!= nullptr);
1693 CreateAndReopenWithCF({"pikachu"}, options
);
1695 ASSERT_OK(Put(1, "a", "123"));
1696 ASSERT_OK(Put(1, "b", "234"));
1698 MoveFilesToLevel(3, 1);
1701 options
.create_if_missing
= false;
1702 options
.num_levels
= 2;
1703 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
1704 ASSERT_TRUE(strstr(s
.ToString().c_str(), "Invalid argument") != nullptr);
1705 ASSERT_TRUE(db_
== nullptr);
1708 TEST_F(DBTest
, DestroyDBMetaDatabase
) {
1709 std::string dbname
= test::TmpDir(env_
) + "/db_meta";
1710 ASSERT_OK(env_
->CreateDirIfMissing(dbname
));
1711 std::string metadbname
= MetaDatabaseName(dbname
, 0);
1712 ASSERT_OK(env_
->CreateDirIfMissing(metadbname
));
1713 std::string metametadbname
= MetaDatabaseName(metadbname
, 0);
1714 ASSERT_OK(env_
->CreateDirIfMissing(metametadbname
));
1716 // Destroy previous versions if they exist. Using the long way.
1717 Options options
= CurrentOptions();
1718 ASSERT_OK(DestroyDB(metametadbname
, options
));
1719 ASSERT_OK(DestroyDB(metadbname
, options
));
1720 ASSERT_OK(DestroyDB(dbname
, options
));
1724 ASSERT_OK(DB::Open(options
, dbname
, &db
));
1727 ASSERT_OK(DB::Open(options
, metadbname
, &db
));
1730 ASSERT_OK(DB::Open(options
, metametadbname
, &db
));
1735 ASSERT_OK(DestroyDB(dbname
, options
));
1737 // Check if deletion worked.
1738 options
.create_if_missing
= false;
1739 ASSERT_TRUE(!(DB::Open(options
, dbname
, &db
)).ok());
1740 ASSERT_TRUE(!(DB::Open(options
, metadbname
, &db
)).ok());
1741 ASSERT_TRUE(!(DB::Open(options
, metametadbname
, &db
)).ok());
1744 #ifndef ROCKSDB_LITE
1745 TEST_F(DBTest
, SnapshotFiles
) {
1747 Options options
= CurrentOptions();
1748 options
.write_buffer_size
= 100000000; // Large write buffer
1749 CreateAndReopenWithCF({"pikachu"}, options
);
1753 // Write 8MB (80 values, each 100K)
1754 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1755 std::vector
<std::string
> values
;
1756 for (int i
= 0; i
< 80; i
++) {
1757 values
.push_back(RandomString(&rnd
, 100000));
1758 ASSERT_OK(Put((i
< 40), Key(i
), values
[i
]));
1761 // assert that nothing makes it to disk yet.
1762 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1764 // get a file snapshot
1765 uint64_t manifest_number
= 0;
1766 uint64_t manifest_size
= 0;
1767 std::vector
<std::string
> files
;
1768 dbfull()->DisableFileDeletions();
1769 dbfull()->GetLiveFiles(files
, &manifest_size
);
1771 // CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
1772 ASSERT_EQ(files
.size(), 5U);
1774 uint64_t number
= 0;
1777 // copy these files to a new snapshot directory
1778 std::string snapdir
= dbname_
+ ".snapdir/";
1779 ASSERT_OK(env_
->CreateDirIfMissing(snapdir
));
1781 for (size_t i
= 0; i
< files
.size(); i
++) {
1782 // our clients require that GetLiveFiles returns
1783 // files with "/" as first character!
1784 ASSERT_EQ(files
[i
][0], '/');
1785 std::string src
= dbname_
+ files
[i
];
1786 std::string dest
= snapdir
+ files
[i
];
1789 ASSERT_OK(env_
->GetFileSize(src
, &size
));
1791 // record the number and the size of the
1792 // latest manifest file
1793 if (ParseFileName(files
[i
].substr(1), &number
, &type
)) {
1794 if (type
== kDescriptorFile
) {
1795 if (number
> manifest_number
) {
1796 manifest_number
= number
;
1797 ASSERT_GE(size
, manifest_size
);
1798 size
= manifest_size
; // copy only valid MANIFEST data
1802 CopyFile(src
, dest
, size
);
1805 // release file snapshot
1806 dbfull()->DisableFileDeletions();
1807 // overwrite one key, this key should not appear in the snapshot
1808 std::vector
<std::string
> extras
;
1809 for (unsigned int i
= 0; i
< 1; i
++) {
1810 extras
.push_back(RandomString(&rnd
, 100000));
1811 ASSERT_OK(Put(0, Key(i
), extras
[i
]));
1814 // verify that data in the snapshot are correct
1815 std::vector
<ColumnFamilyDescriptor
> column_families
;
1816 column_families
.emplace_back("default", ColumnFamilyOptions());
1817 column_families
.emplace_back("pikachu", ColumnFamilyOptions());
1818 std::vector
<ColumnFamilyHandle
*> cf_handles
;
1822 opts
.create_if_missing
= false;
1824 DB::Open(opts
, snapdir
, column_families
, &cf_handles
, &snapdb
);
1827 ReadOptions roptions
;
1829 for (unsigned int i
= 0; i
< 80; i
++) {
1830 stat
= snapdb
->Get(roptions
, cf_handles
[i
< 40], Key(i
), &val
);
1831 ASSERT_EQ(values
[i
].compare(val
), 0);
1833 for (auto cfh
: cf_handles
) {
1838 // look at the new live files after we added an 'extra' key
1839 // and after we took the first snapshot.
1840 uint64_t new_manifest_number
= 0;
1841 uint64_t new_manifest_size
= 0;
1842 std::vector
<std::string
> newfiles
;
1843 dbfull()->DisableFileDeletions();
1844 dbfull()->GetLiveFiles(newfiles
, &new_manifest_size
);
1846 // find the new manifest file. assert that this manifest file is
1847 // the same one as in the previous snapshot. But its size should be
1848 // larger because we added an extra key after taking the
1849 // previous snapshot.
1850 for (size_t i
= 0; i
< newfiles
.size(); i
++) {
1851 std::string src
= dbname_
+ "/" + newfiles
[i
];
1852 // record the lognumber and the size of the
1853 // latest manifest file
1854 if (ParseFileName(newfiles
[i
].substr(1), &number
, &type
)) {
1855 if (type
== kDescriptorFile
) {
1856 if (number
> new_manifest_number
) {
1858 new_manifest_number
= number
;
1859 ASSERT_OK(env_
->GetFileSize(src
, &size
));
1860 ASSERT_GE(size
, new_manifest_size
);
1865 ASSERT_EQ(manifest_number
, new_manifest_number
);
1866 ASSERT_GT(new_manifest_size
, manifest_size
);
1868 // release file snapshot
1869 dbfull()->DisableFileDeletions();
1870 } while (ChangeCompactOptions());
1874 TEST_F(DBTest
, PurgeInfoLogs
) {
1875 Options options
= CurrentOptions();
1876 options
.keep_log_file_num
= 5;
1877 options
.create_if_missing
= true;
1878 for (int mode
= 0; mode
<= 1; mode
++) {
1880 options
.db_log_dir
= dbname_
+ "_logs";
1881 env_
->CreateDirIfMissing(options
.db_log_dir
);
1883 options
.db_log_dir
= "";
1885 for (int i
= 0; i
< 8; i
++) {
1889 std::vector
<std::string
> files
;
1890 env_
->GetChildren(options
.db_log_dir
.empty() ? dbname_
: options
.db_log_dir
,
1892 int info_log_count
= 0;
1893 for (std::string file
: files
) {
1894 if (file
.find("LOG") != std::string::npos
) {
1898 ASSERT_EQ(5, info_log_count
);
1901 // For mode (1), test DestroyDB() to delete all the logs under DB dir.
1902 // For mode (2), no info log file should have been put under DB dir.
1903 std::vector
<std::string
> db_files
;
1904 env_
->GetChildren(dbname_
, &db_files
);
1905 for (std::string file
: db_files
) {
1906 ASSERT_TRUE(file
.find("LOG") == std::string::npos
);
1911 env_
->GetChildren(options
.db_log_dir
, &files
);
1912 for (std::string file
: files
) {
1913 env_
->DeleteFile(options
.db_log_dir
+ "/" + file
);
1915 env_
->DeleteDir(options
.db_log_dir
);
1920 #ifndef ROCKSDB_LITE
1921 // Multi-threaded test:
1924 static const int kColumnFamilies
= 10;
1925 static const int kNumThreads
= 10;
1926 static const int kTestSeconds
= 10;
1927 static const int kNumKeys
= 1000;
1931 std::atomic
<bool> stop
;
1932 std::atomic
<int> counter
[kNumThreads
];
1933 std::atomic
<bool> thread_done
[kNumThreads
];
1941 static void MTThreadBody(void* arg
) {
1942 MTThread
* t
= reinterpret_cast<MTThread
*>(arg
);
1944 DB
* db
= t
->state
->test
->db_
;
1946 fprintf(stderr
, "... starting thread %d\n", id
);
1947 Random
rnd(1000 + id
);
1949 while (t
->state
->stop
.load(std::memory_order_acquire
) == false) {
1950 t
->state
->counter
[id
].store(counter
, std::memory_order_release
);
1952 int key
= rnd
.Uniform(kNumKeys
);
1954 snprintf(keybuf
, sizeof(keybuf
), "%016d", key
);
1957 // Write values of the form <key, my id, counter, cf, unique_id>.
1958 // into each of the CFs
1959 // We add some padding for force compactions.
1960 int unique_id
= rnd
.Uniform(1000000);
1962 // Half of the time directly use WriteBatch. Half of the time use
1963 // WriteBatchWithIndex.
1966 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
1967 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
1968 static_cast<int>(counter
), cf
, unique_id
);
1969 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
1971 ASSERT_OK(db
->Write(WriteOptions(), &batch
));
1973 WriteBatchWithIndex
batch(db
->GetOptions().comparator
);
1974 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
1975 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
1976 static_cast<int>(counter
), cf
, unique_id
);
1977 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
1979 ASSERT_OK(db
->Write(WriteOptions(), batch
.GetWriteBatch()));
1982 // Read a value and verify that it matches the pattern written above
1983 // and that writes to all column families were atomic (unique_id is the
1985 std::vector
<Slice
> keys(kColumnFamilies
, Slice(keybuf
));
1986 std::vector
<std::string
> values
;
1987 std::vector
<Status
> statuses
=
1988 db
->MultiGet(ReadOptions(), t
->state
->test
->handles_
, keys
, &values
);
1989 Status s
= statuses
[0];
1990 // all statuses have to be the same
1991 for (size_t i
= 1; i
< statuses
.size(); ++i
) {
1992 // they are either both ok or both not-found
1993 ASSERT_TRUE((s
.ok() && statuses
[i
].ok()) ||
1994 (s
.IsNotFound() && statuses
[i
].IsNotFound()));
1996 if (s
.IsNotFound()) {
1997 // Key has not yet been written
1999 // Check that the writer thread counter is >= the counter in the value
2002 for (int i
= 0; i
< kColumnFamilies
; ++i
) {
2004 ASSERT_EQ(5, sscanf(values
[i
].c_str(), "%d.%d.%d.%d.%d", &k
, &w
, &c
,
2009 ASSERT_LT(w
, kNumThreads
);
2010 ASSERT_LE(c
, t
->state
->counter
[w
].load(std::memory_order_acquire
));
2015 // this checks that updates across column families happened
2016 // atomically -- all unique ids are the same
2017 ASSERT_EQ(u
, unique_id
);
2024 t
->state
->thread_done
[id
].store(true, std::memory_order_release
);
2025 fprintf(stderr
, "... stopping thread %d after %d ops\n", id
, int(counter
));
2030 class MultiThreadedDBTest
: public DBTest
,
2031 public ::testing::WithParamInterface
<int> {
2033 virtual void SetUp() override
{ option_config_
= GetParam(); }
2035 static std::vector
<int> GenerateOptionConfigs() {
2036 std::vector
<int> optionConfigs
;
2037 for (int optionConfig
= kDefault
; optionConfig
< kEnd
; ++optionConfig
) {
2038 // skip as HashCuckooRep does not support snapshot
2039 if (optionConfig
!= kHashCuckoo
) {
2040 optionConfigs
.push_back(optionConfig
);
2043 return optionConfigs
;
2047 TEST_P(MultiThreadedDBTest
, MultiThreaded
) {
2048 anon::OptionsOverride options_override
;
2049 options_override
.skip_policy
= kSkipNoSnapshot
;
2050 Options options
= CurrentOptions(options_override
);
2051 std::vector
<std::string
> cfs
;
2052 for (int i
= 1; i
< kColumnFamilies
; ++i
) {
2053 cfs
.push_back(ToString(i
));
2056 CreateAndReopenWithCF(cfs
, options
);
2060 mt
.stop
.store(false, std::memory_order_release
);
2061 for (int id
= 0; id
< kNumThreads
; id
++) {
2062 mt
.counter
[id
].store(0, std::memory_order_release
);
2063 mt
.thread_done
[id
].store(false, std::memory_order_release
);
2067 MTThread thread
[kNumThreads
];
2068 for (int id
= 0; id
< kNumThreads
; id
++) {
2069 thread
[id
].state
= &mt
;
2071 env_
->StartThread(MTThreadBody
, &thread
[id
]);
2074 // Let them run for a while
2075 env_
->SleepForMicroseconds(kTestSeconds
* 1000000);
2077 // Stop the threads and wait for them to finish
2078 mt
.stop
.store(true, std::memory_order_release
);
2079 for (int id
= 0; id
< kNumThreads
; id
++) {
2080 while (mt
.thread_done
[id
].load(std::memory_order_acquire
) == false) {
2081 env_
->SleepForMicroseconds(100000);
2086 INSTANTIATE_TEST_CASE_P(
2087 MultiThreaded
, MultiThreadedDBTest
,
2088 ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()));
2089 #endif // ROCKSDB_LITE
2091 // Group commit test:
2094 static const int kGCNumThreads
= 4;
2095 static const int kGCNumKeys
= 1000;
2100 std::atomic
<bool> done
;
2103 static void GCThreadBody(void* arg
) {
2104 GCThread
* t
= reinterpret_cast<GCThread
*>(arg
);
2109 for (int i
= 0; i
< kGCNumKeys
; ++i
) {
2110 std::string
kv(ToString(i
+ id
* kGCNumKeys
));
2111 ASSERT_OK(db
->Put(wo
, kv
, kv
));
// Verifies that concurrent writers get group-committed: with log writes
// artificially slowed, at least one write must complete as a follower
// (WRITE_DONE_BY_OTHER > 0), and all keys from all threads must be present
// and iterate in sorted order.
// NOTE(review): the opening `do {` of the options loop and several closing
// braces are elided in this chunk — the trailing `} while (...)` implies them.
TEST_F(DBTest, GroupCommitTest) {
    Options options = CurrentOptions();
    // Slow down each log write so concurrent writers pile up behind a leader.
    env_->log_write_slowdown_.store(100);
    options.statistics = rocksdb::CreateDBStatistics();
    GCThread thread[kGCNumThreads];
    for (int id = 0; id < kGCNumThreads; id++) {
      thread[id].db = db_;
      thread[id].done = false;
      env_->StartThread(GCThreadBody, &thread[id]);
    // Poll until every worker reports completion.
    for (int id = 0; id < kGCNumThreads; id++) {
      while (thread[id].done == false) {
        env_->SleepForMicroseconds(100000);
    env_->log_write_slowdown_.store(0);
    // At least one write must have been committed by another thread's batch.
    ASSERT_GT(TestGetTickerCount(options, WRITE_DONE_BY_OTHER), 0);
    // Expected contents: stringified 0 .. kGCNumThreads*kGCNumKeys-1,
    // sorted lexicographically to match DB iteration order.
    std::vector<std::string> expected_db;
    for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) {
      expected_db.push_back(ToString(i));
    std::sort(expected_db.begin(), expected_db.end());
    Iterator* itr = db_->NewIterator(ReadOptions());
    for (auto x : expected_db) {
      ASSERT_TRUE(itr->Valid());
      ASSERT_EQ(itr->key().ToString(), x);
      ASSERT_EQ(itr->value().ToString(), x);
    // Iterator must be exhausted exactly when the expected list is.
    ASSERT_TRUE(!itr->Valid());
    // Sanity-check that the DB_WRITE histogram recorded some latency.
    HistogramData hist_data;
    options.statistics->histogramData(DB_WRITE, &hist_data);
    ASSERT_GT(hist_data.average, 0.0);
  } while (ChangeOptions(kSkipNoSeekToLast));
// Ordered key→value string map used as the in-memory "model" store that
// ModelDB mirrors; std::map keeps keys sorted so iteration matches a DB scan.
// (Modernized from `typedef` to a C++11 `using` alias — same type, clearer.)
using KVMap = std::map<std::string, std::string>;
// A reference implementation of the DB interface backed by a plain in-memory
// KVMap. The Randomized test below applies the same operations to a real DB
// and to this model, then compares iterators to check the real DB's results.
// Only Put/Delete/SingleDelete/Merge/Write/iterators/snapshots are modeled;
// everything else returns NotSupported or a trivial value.
// NOTE(review): many original lines are elided in this chunk (access
// specifiers, `WriteBatch batch;` locals, `Handler handler;`, the `KVMap
// map_;` member, assorted closing braces) — confirm against the full file.
class ModelDB : public DB {
  // Snapshot = a full copy of the map at GetSnapshot() time.
  class ModelSnapshot : public Snapshot {
    virtual SequenceNumber GetSequenceNumber() const override {
      // no need to call this
  explicit ModelDB(const Options& options) : options_(options) {}
  // All single-key mutations funnel through Write() via a one-op batch.
  virtual Status Put(const WriteOptions& o, ColumnFamilyHandle* cf,
                     const Slice& k, const Slice& v) override {
    batch.Put(cf, k, v);
    return Write(o, &batch);
  virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
                        const Slice& key) override {
    batch.Delete(cf, key);
    return Write(o, &batch);
  using DB::SingleDelete;
  virtual Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf,
                              const Slice& key) override {
    batch.SingleDelete(cf, key);
    return Write(o, &batch);
  virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf,
                       const Slice& k, const Slice& v) override {
    batch.Merge(cf, k, v);
    return Write(o, &batch);
  // Point reads are not modeled; the test compares via iterators instead.
  virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
                     const Slice& key, PinnableSlice* value) override {
    return Status::NotSupported(key);
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override {
    std::vector<Status> s(keys.size(),
                          Status::NotSupported("Not implemented."));
#ifndef ROCKSDB_LITE
  using DB::IngestExternalFile;
  virtual Status IngestExternalFile(
      ColumnFamilyHandle* column_family,
      const std::vector<std::string>& external_files,
      const IngestExternalFileOptions& options) override {
    return Status::NotSupported("Not implemented.");
  using DB::GetPropertiesOfAllTables;
  virtual Status GetPropertiesOfAllTables(
      ColumnFamilyHandle* column_family,
      TablePropertiesCollection* props) override {
  virtual Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
      TablePropertiesCollection* props) override {
#endif  // ROCKSDB_LITE
  using DB::KeyMayExist;
  virtual bool KeyMayExist(const ReadOptions& options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           bool* value_found = nullptr) override {
    if (value_found != nullptr) {
      *value_found = false;
    return true;  // Not Supported directly
  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override {
    if (options.snapshot == nullptr) {
      // No snapshot: iterate over a private copy; ModelIter owns and frees it.
      KVMap* saved = new KVMap;
      return new ModelIter(saved, true);
    // With snapshot: iterate the snapshot's map; the snapshot retains
    // ownership, so the iterator must not delete it.
    const KVMap* snapshot_state =
        &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
    return new ModelIter(snapshot_state, false);
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      std::vector<Iterator*>* iterators) override {
    return Status::NotSupported("Not supported yet");
  virtual const Snapshot* GetSnapshot() override {
    // Snapshot semantics via full copy of the current map.
    ModelSnapshot* snapshot = new ModelSnapshot;
    snapshot->map_ = map_;
  virtual void ReleaseSnapshot(const Snapshot* snapshot) override {
    delete reinterpret_cast<const ModelSnapshot*>(snapshot);
  // Applies a batch to map_ by replaying it through a WriteBatch::Handler.
  virtual Status Write(const WriteOptions& options,
                       WriteBatch* batch) override {
    class Handler : public WriteBatch::Handler {
      virtual void Put(const Slice& key, const Slice& value) override {
        (*map_)[key.ToString()] = value.ToString();
      virtual void Merge(const Slice& key, const Slice& value) override {
        // ignore merge for now
        // (*map_)[key.ToString()] = value.ToString();
      virtual void Delete(const Slice& key) override {
        map_->erase(key.ToString());
    handler.map_ = &map_;
    return batch->Iterate(&handler);
  // Property/size queries are stubbed out — not needed by the model.
  using DB::GetProperty;
  virtual bool GetProperty(ColumnFamilyHandle* column_family,
                           const Slice& property, std::string* value) override {
  using DB::GetIntProperty;
  virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
                              const Slice& property, uint64_t* value) override {
  using DB::GetMapProperty;
  virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
                              const Slice& property,
                              std::map<std::string, double>* value) override {
  using DB::GetAggregatedIntProperty;
  virtual bool GetAggregatedIntProperty(const Slice& property,
                                        uint64_t* value) override {
  using DB::GetApproximateSizes;
  virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes,
                                   uint8_t include_flags
                                   = INCLUDE_FILES) override {
    for (int i = 0; i < n; i++) {
  using DB::GetApproximateMemTableStats;
  virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                           uint64_t* const count,
                                           uint64_t* const size) override {
  // Maintenance operations are meaningless for an in-memory map.
  using DB::CompactRange;
  virtual Status CompactRange(const CompactRangeOptions& options,
                              ColumnFamilyHandle* column_family,
                              const Slice* start, const Slice* end) override {
    return Status::NotSupported("Not supported operation.");
  virtual Status SetDBOptions(
      const std::unordered_map<std::string, std::string>& new_options)
    return Status::NotSupported("Not supported operation.");
  using DB::CompactFiles;
  virtual Status CompactFiles(const CompactionOptions& compact_options,
                              ColumnFamilyHandle* column_family,
                              const std::vector<std::string>& input_file_names,
                              const int output_level,
                              const int output_path_id = -1) override {
    return Status::NotSupported("Not supported operation.");
  Status PauseBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  Status ContinueBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  Status EnableAutoCompaction(
      const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
    return Status::NotSupported("Not supported operation.");
  using DB::NumberLevels;
  virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
  using DB::MaxMemCompactionLevel;
  virtual int MaxMemCompactionLevel(
      ColumnFamilyHandle* column_family) override {
  using DB::Level0StopWriteTrigger;
  virtual int Level0StopWriteTrigger(
      ColumnFamilyHandle* column_family) override {
  virtual const std::string& GetName() const override { return name_; }
  virtual Env* GetEnv() const override { return nullptr; }
  using DB::GetOptions;
  virtual Options GetOptions(ColumnFamilyHandle* column_family) const override {
  using DB::GetDBOptions;
  virtual DBOptions GetDBOptions() const override { return options_; }
  virtual Status Flush(const rocksdb::FlushOptions& options,
                       ColumnFamilyHandle* column_family) override {
  virtual Status SyncWAL() override { return Status::OK(); }
#ifndef ROCKSDB_LITE
  virtual Status DisableFileDeletions() override { return Status::OK(); }
  virtual Status EnableFileDeletions(bool force) override {
    return Status::OK();
  virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size,
                              bool flush_memtable = true) override {
    return Status::OK();
  virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
    return Status::OK();
  virtual Status DeleteFile(std::string name) override { return Status::OK(); }
  virtual Status GetUpdatesSince(
      rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
      const TransactionLogIterator::ReadOptions& read_options =
          TransactionLogIterator::ReadOptions()) override {
    return Status::NotSupported("Not supported in Model DB");
  virtual void GetColumnFamilyMetaData(
      ColumnFamilyHandle* column_family,
      ColumnFamilyMetaData* metadata) override {}
#endif  // ROCKSDB_LITE
  virtual Status GetDbIdentity(std::string& identity) const override {
    return Status::OK();
  virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; }
  virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
  // Iterator over a KVMap; optionally owns (and later deletes) the map when
  // it was created as a point-in-time copy by NewIterator().
  class ModelIter : public Iterator {
    ModelIter(const KVMap* map, bool owned)
        : map_(map), owned_(owned), iter_(map_->end()) {}
      if (owned_) delete map_;
    virtual bool Valid() const override { return iter_ != map_->end(); }
    virtual void SeekToFirst() override { iter_ = map_->begin(); }
    virtual void SeekToLast() override {
      if (map_->empty()) {
        iter_ = map_->end();
        // Position on the last element (find() converts the reverse
        // iterator's key back into a forward const_iterator).
        iter_ = map_->find(map_->rbegin()->first);
    virtual void Seek(const Slice& k) override {
      // First entry >= k, matching DB Seek semantics.
      iter_ = map_->lower_bound(k.ToString());
    virtual void SeekForPrev(const Slice& k) override {
      // First entry > k, then (in elided code) presumably stepped back —
      // TODO(review): confirm the elided Prev() call against the full file.
      iter_ = map_->upper_bound(k.ToString());
    virtual void Next() override { ++iter_; }
    virtual void Prev() override {
      if (iter_ == map_->begin()) {
        // Stepping before the first entry invalidates the iterator.
        iter_ = map_->end();
    virtual Slice key() const override { return iter_->first; }
    virtual Slice value() const override { return iter_->second; }
    virtual Status status() const override { return Status::OK(); }
    const KVMap* const map_;
    const bool owned_;  // Do we own map_
    KVMap::const_iterator iter_;
  const Options options_;
  std::string name_ = "";
// Produces a random key of length >= minimum. Lengths are mostly short
// (1 in 3 chance of length 1) to deliberately encourage key collisions,
// with occasional longer skewed lengths.
// NOTE(review): the `int len;` declaration and opening `do {` are elided in
// this chunk — the trailing `} while` implies them.
static std::string RandomKey(Random* rnd, int minimum = 0) {
    len = (rnd->OneIn(3)
               ? 1  // Short sometimes to encourage collisions
               : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
  } while (len < minimum);
  return test::RandomKey(rnd, len);
2520 static bool CompareIterators(int step
, DB
* model
, DB
* db
,
2521 const Snapshot
* model_snap
,
2522 const Snapshot
* db_snap
) {
2523 ReadOptions options
;
2524 options
.snapshot
= model_snap
;
2525 Iterator
* miter
= model
->NewIterator(options
);
2526 options
.snapshot
= db_snap
;
2527 Iterator
* dbiter
= db
->NewIterator(options
);
2530 for (miter
->SeekToFirst(), dbiter
->SeekToFirst();
2531 ok
&& miter
->Valid() && dbiter
->Valid(); miter
->Next(), dbiter
->Next()) {
2533 if (miter
->key().compare(dbiter
->key()) != 0) {
2534 fprintf(stderr
, "step %d: Key mismatch: '%s' vs. '%s'\n", step
,
2535 EscapeString(miter
->key()).c_str(),
2536 EscapeString(dbiter
->key()).c_str());
2541 if (miter
->value().compare(dbiter
->value()) != 0) {
2542 fprintf(stderr
, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
2543 step
, EscapeString(miter
->key()).c_str(),
2544 EscapeString(miter
->value()).c_str(),
2545 EscapeString(miter
->value()).c_str());
2551 if (miter
->Valid() != dbiter
->Valid()) {
2552 fprintf(stderr
, "step %d: Mismatch at end of iterators: %d vs. %d\n",
2553 step
, miter
->Valid(), dbiter
->Valid());
// Parameterized fixture for the Randomized test: the parameter is an option
// config index; SetUp() installs it so CurrentOptions() reflects that config.
// NOTE(review): access specifiers, the skip-mask continuation, and closing
// braces are elided in this chunk — confirm against the full file.
class DBTestRandomized : public DBTest,
                         public ::testing::WithParamInterface<int> {
  virtual void SetUp() override { option_config_ = GetParam(); }
  // Enumerates every option config except those incompatible with this test.
  static std::vector<int> GenerateOptionConfigs() {
    std::vector<int> option_configs;
    // skip cuckoo hash as it does not support snapshot.
    for (int option_config = kDefault; option_config < kEnd; ++option_config) {
      if (!ShouldSkipOptions(option_config, kSkipDeletesFilterFirst |
        option_configs.push_back(option_config);
    option_configs.push_back(kBlockBasedTableWithIndexRestartInterval);
    return option_configs;
2582 INSTANTIATE_TEST_CASE_P(
2583 DBTestRandomized
, DBTestRandomized
,
2584 ::testing::ValuesIn(DBTestRandomized::GenerateOptionConfigs()));
// Differential test: applies N random operations (Put / Delete / multi-op
// WriteBatch) identically to ModelDB and the real DB, and every 100 steps
// compares full iterations of both — live and under saved snapshots.
// NOTE(review): this chunk elides several lines (declarations of `k`, `v`,
// `b`, `minimum`, the body of the prefix-config branch, and various closing
// braces) — confirm against the full file before editing.
TEST_P(DBTestRandomized, Randomized) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  DestroyAndReopen(options);
  // Seed differs per option config so runs are decorrelated across params.
  Random rnd(test::RandomSeed() + GetParam());
  ModelDB model(options);
  const int N = 10000;
  const Snapshot* model_snap = nullptr;
  const Snapshot* db_snap = nullptr;
  for (int step = 0; step < N; step++) {
    // TODO(sanjay): Test Get() works
    int p = rnd.Uniform(100);
    // Prefix-based configs need keys long enough for the prefix extractor;
    // the (elided) branch body presumably raises `minimum` for them.
    if (option_config_ == kHashSkipList || option_config_ == kHashLinkList ||
        option_config_ == kHashCuckoo ||
        option_config_ == kPlainTableFirstBytePrefix ||
        option_config_ == kBlockBasedTableWithWholeKeyHashIndex ||
        option_config_ == kBlockBasedTableWithPrefixHashIndex) {
    if (p < 45) {  // Put
      k = RandomKey(&rnd, minimum);
      // Values are usually tiny, occasionally ~100-200 bytes.
      v = RandomString(&rnd,
                       rnd.OneIn(20) ? 100 + rnd.Uniform(100) : rnd.Uniform(8));
      ASSERT_OK(model.Put(WriteOptions(), k, v));
      ASSERT_OK(db_->Put(WriteOptions(), k, v));
    } else if (p < 90) {  // Delete
      k = RandomKey(&rnd, minimum);
      ASSERT_OK(model.Delete(WriteOptions(), k));
      ASSERT_OK(db_->Delete(WriteOptions(), k));
    } else {  // Multi-element batch
      const int num = rnd.Uniform(8);
      for (int i = 0; i < num; i++) {
        if (i == 0 || !rnd.OneIn(10)) {
          k = RandomKey(&rnd, minimum);
          // Periodically re-use the same key from the previous iter, so
          // we have multiple entries in the write batch for the same key
        v = RandomString(&rnd, rnd.Uniform(10));
      ASSERT_OK(model.Write(WriteOptions(), &b));
      ASSERT_OK(db_->Write(WriteOptions(), &b));
    if ((step % 100) == 0) {
      // For DB instances that use the hash index + block-based table, the
      // iterator will be invalid right when seeking a non-existent key,
      // rather than return a key that is close to it.
      if (option_config_ != kBlockBasedTableWithWholeKeyHashIndex &&
          option_config_ != kBlockBasedTableWithPrefixHashIndex) {
        ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));
        ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
      // Save a snapshot from each DB this time that we'll use next
      // time we compare things, to make sure the current state is
      // preserved with the snapshot
      if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
      if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
      ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));
      model_snap = model.GetSnapshot();
      db_snap = db_->GetSnapshot();
  // Release any snapshots still held after the final iteration.
  if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
  if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
// Writes data with a hash-search (prefix) index, then reopens with a binary
// index and no prefix extractor: reads must still succeed because RocksDB
// falls back to binary search for tables built with the old index.
// NOTE(review): the Reopen() calls between the two phases are elided in this
// chunk — confirm against the full file.
TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
  // create a DB with block prefix index
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  // Reopen it without prefix extractor, make sure everything still works.
  // RocksDB should just fall back to the binary index.
  table_options.index_type = BlockBasedTableOptions::kBinarySearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset();
  ASSERT_EQ("v1", Get("k1"));
  ASSERT_EQ("v2", Get("k2"));
// Verifies that tables written with different checksum types (CRC32c and
// xxHash) in the same DB remain readable regardless of the checksum type
// configured at read time.
// NOTE(review): the Reopen() calls between phases and the final closing brace
// are elided in this chunk — confirm against the full file.
TEST_F(DBTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  // Phase 1: write one table with CRC32c checksums.
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Flush());  // table with crc checksum
  // Phase 2: write a second table with xxHash checksums.
  table_options.checksum = kxxHash;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ASSERT_OK(Put("e", "f"));
  ASSERT_OK(Put("g", "h"));
  ASSERT_OK(Flush());  // table with xxhash checksum
  // Reading back with CRC32c configured must verify both tables correctly
  // (each table carries its own checksum type in its footer).
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));
  // Repeat once more after another factory reset.
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));
2726 #ifndef ROCKSDB_LITE
// FIFO compaction: writes ~6 x ~110KB of data with a 500KB table-size cap and
// checks that only the newest 5 L0 files survive and the oldest keys are gone.
// Iteration 0 exercises automatic compaction; iteration 1 disables it and
// compacts manually via CompactRange.
// NOTE(review): this chunk elides the `Options options;` / `Random rnd`
// declarations, the `if (iter == 1)` guards around the manual/auto branches,
// and most closing braces — confirm against the full file.
TEST_P(DBTestWithParam, FIFOCompactionTest) {
  for (int iter = 0; iter < 2; ++iter) {
    // first iteration -- auto compaction
    // second iteration -- manual compaction
    options.compaction_style = kCompactionStyleFIFO;
    options.write_buffer_size = 100 << 10;                              // 100KB
    options.arena_block_size = 4096;
    options.compaction_options_fifo.max_table_files_size = 500 << 10;   // 500KB
    options.compression = kNoCompression;
    options.create_if_missing = true;
    options.max_subcompactions = max_subcompactions_;
      options.disable_auto_compactions = true;
    options = CurrentOptions(options);
    DestroyAndReopen(options);
    // Six batches of 110 ~1KB values → six ~110KB L0 files.
    for (int i = 0; i < 6; ++i) {
      for (int j = 0; j < 110; ++j) {
        ASSERT_OK(Put(ToString(i * 100 + j), RandomString(&rnd, 980)));
      // flush should happen here
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
      CompactRangeOptions cro;
      cro.exclusive_manual_compaction = exclusive_manual_compaction_;
      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
    // only 5 files should survive
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
    for (int i = 0; i < 50; ++i) {
      // these keys should be deleted in previous compaction
      ASSERT_EQ("NOT_FOUND", Get(ToString(i)));
2768 #endif // ROCKSDB_LITE
2770 #ifndef ROCKSDB_LITE
// This test is not reliable enough as it heavily depends on disk behavior.
// Disable as it is flaky.
//
// Measures the raw unthrottled write rate, then re-runs the same workload
// under rate limiters at 0.7x and 0.5x of that rate, asserting the achieved
// ratio and the NUMBER_RATE_LIMITER_DRAINS ticker stay in expected bounds.
// NOTE(review): this chunk elides `WriteOptions wo;`, `Random rnd;`, the
// ASSERT_OK( opening of each Put line, and various closing braces — confirm
// against the full file.
TEST_F(DBTest, DISABLED_RateLimitingTest) {
  Options options = CurrentOptions();
  options.write_buffer_size = 1 << 20;          // 1MB
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = 1 << 20;      // 1MB
  options.max_bytes_for_level_base = 4 << 20;   // 4MB
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  options.IncreaseParallelism(4);
  DestroyAndReopen(options);
  // Skip the WAL so throughput reflects memtable/flush bandwidth only.
  wo.disableWAL = true;
  // # no rate limiting
  uint64_t start = env_->NowMicros();
  for (int64_t i = 0; i < (96 << 10); ++i) {
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  uint64_t elapsed = env_->NowMicros() - start;
  // Baseline bytes/second with no limiter attached.
  double raw_rate = env_->bytes_written_ * 1000000.0 / elapsed;
  uint64_t rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS);
  ASSERT_EQ(0, rate_limiter_drains);
  // # rate limiting with 0.7 x threshold
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);
  start = env_->NowMicros();
  for (int64_t i = 0; i < (96 << 10); ++i) {
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  // Delta of the drain ticker attributable to this phase only.
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  elapsed = env_->NowMicros() - start;
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  ASSERT_GT(rate_limiter_drains, 0);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
  ASSERT_TRUE(ratio < 0.8);
  // # rate limiting with half of the raw_rate
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(raw_rate / 2)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);
  start = env_->NowMicros();
  for (int64_t i = 0; i < (96 << 10); ++i) {
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  elapsed = env_->NowMicros() - start;
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
  ASSERT_LT(ratio, 0.6);
// Checks option sanitization at open time: PlainTable with a prefix extractor
// must not be rejected as NotSupported, while a block-based table with a hash
// index and NO prefix extractor must fail with InvalidArgument until one is
// supplied.
TEST_F(DBTest, TableOptionsSanitizeTest) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  // Default open must not silently enable mmap reads.
  ASSERT_EQ(db_->GetOptions().allow_mmap_reads, false);
  options.table_factory.reset(new PlainTableFactory());
  options.prefix_extractor.reset(NewNoopTransform());
  ASSERT_TRUE(!TryReopen(options).IsNotSupported());
  // Test for check of prefix_extractor when hash index is used for
  // block-based table
  BlockBasedTableOptions to;
  to.index_type = BlockBasedTableOptions::kHashSearch;
  options = CurrentOptions();
  options.create_if_missing = true;
  options.table_factory.reset(NewBlockBasedTableFactory(to));
  // Hash index without a prefix extractor is an invalid combination.
  ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  ASSERT_OK(TryReopen(options));
// Concurrent memtable writes are only supported by the skip-list memtable:
// opening (or creating a column family) with a hash-linklist memtable factory
// while allow_concurrent_memtable_write is on must fail.
TEST_F(DBTest, ConcurrentMemtableNotSupported) {
  Options options = CurrentOptions();
  options.allow_concurrent_memtable_write = true;
  options.soft_pending_compaction_bytes_limit = 0;
  options.hard_pending_compaction_bytes_limit = 100;
  options.create_if_missing = true;
  DestroyDB(dbname_, options);
  // Unsupported factory + concurrent writes → open must fail.
  options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
  ASSERT_NOK(TryReopen(options));
  // Skip-list factory supports concurrent writes → open succeeds.
  options.memtable_factory.reset(new SkipListFactory);
  ASSERT_OK(TryReopen(options));
  // Same rule applies per column family at CreateColumnFamily time.
  ColumnFamilyOptions cf_options(options);
  cf_options.memtable_factory.reset(
      NewHashLinkListRepFactory(4, 0, 3, true, 4));
  ColumnFamilyHandle* handle;
  ASSERT_NOK(db_->CreateColumnFamily(cf_options, "name", &handle));
2904 #endif // ROCKSDB_LITE
// Schedules 8 sleeping tasks (4 LOW, 4 HIGH priority) against pools sized
// 3 and 2 respectively, and checks the expected queue backlogs; then verifies
// the DB still serves writes/reads afterwards.
// NOTE(review): this chunk elides the `if (attempt == 0)` guard around the
// pool-size settings, the task argument in Schedule(), and closing braces —
// confirm against the full file.
TEST_F(DBTest, SanitizeNumThreads) {
  for (int attempt = 0; attempt < 2; attempt++) {
    const size_t kTotalTasks = 8;
    test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
    Options options = CurrentOptions();
      options.max_background_compactions = 3;
      options.max_background_flushes = 2;
    options.create_if_missing = true;
    DestroyAndReopen(options);
    for (size_t i = 0; i < kTotalTasks; i++) {
      // Insert 4 tasks into the low priority queue and 4 tasks into the
      // high priority queue.
      env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                     (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
    // Wait 100 milliseconds until they are scheduled.
    env_->SleepForMicroseconds(100000);
    // pool size 3, total task 4. Queue size should be 1.
    ASSERT_EQ(1U, options.env->GetThreadPoolQueueLen(Env::Priority::LOW));
    // pool size 2, total task 4. Queue size should be 2.
    ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH));
    // Unblock and drain all sleeping tasks before checking DB liveness.
    for (size_t i = 0; i < kTotalTasks; i++) {
      sleeping_tasks[i].WakeUp();
      sleeping_tasks[i].WaitUntilDone();
    ASSERT_OK(Put("abc", "def"));
    ASSERT_EQ("def", Get("abc"));
    ASSERT_EQ("def", Get("abc"));
// Holds the DB mutex and an open write slot while a Put and a Flush are
// issued from other threads; verifies they queue correctly and complete once
// the write is ended and the mutex released.
// NOTE(review): the thread-join body and closing braces are elided in this
// chunk — confirm against the full file.
TEST_F(DBTest, WriteSingleThreadEntry) {
  std::vector<port::Thread> threads;
  dbfull()->TEST_LockMutex();
  auto w = dbfull()->TEST_BeginWrite();
  // These two will block behind the pending write `w`.
  threads.emplace_back([&] { Put("a", "b"); });
  env_->SleepForMicroseconds(10000);
  threads.emplace_back([&] { Flush(); });
  env_->SleepForMicroseconds(10000);
  dbfull()->TEST_UnlockMutex();
  dbfull()->TEST_LockMutex();
  dbfull()->TEST_EndWrite(w);
  dbfull()->TEST_UnlockMutex();
  for (auto& t : threads) {
2964 #ifndef ROCKSDB_LITE
// Exercises dynamically changing memtable options via SetOptions():
// (1) write_buffer_size growth 64KB → 128KB is reflected in L0 file sizes;
// (2) raising max_write_buffer_number (2 → 8 → 4) proportionally raises the
// number of ~1KB puts accepted before writes stall.
// NOTE(review): this chunk elides `Options options;`, `Random rnd;`, the
// gen_l0_kb() invocations, `int count = 0;` resets, and many closing braces —
// confirm against the full file.
TEST_F(DBTest, DynamicMemtableOptions) {
  const uint64_t k64KB = 1 << 16;
  const uint64_t k128KB = 1 << 17;
  const uint64_t k5KB = 5 * 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.max_background_compactions = 1;
  options.write_buffer_size = k64KB;
  options.arena_block_size = 16 * 1024;
  options.max_write_buffer_number = 2;
  // Don't trigger compact/slowdown/stop
  options.level0_file_num_compaction_trigger = 1024;
  options.level0_slowdown_writes_trigger = 1024;
  options.level0_stop_writes_trigger = 1024;
  DestroyAndReopen(options);
  // Writes `size` KB of ~1KB values, pausing periodically so each memtable
  // flushes to its own L0 file.
  auto gen_l0_kb = [this](int size) {
    const int kNumPutsBeforeWaitForFlush = 64;
    for (int i = 0; i < size; i++) {
      ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
      // The following condition prevents a race condition between flush jobs
      // acquiring work and this thread filling up multiple memtables. Without
      // this, the flush might produce less files than expected because
      // multiple memtables are flushed into a single L0 file. This race
      // condition affects assertion (A).
      if (i % kNumPutsBeforeWaitForFlush == kNumPutsBeforeWaitForFlush - 1) {
        dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForFlushMemTable();
  // Test write_buffer_size
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  ASSERT_LT(SizeAtLevel(0), k64KB + k5KB);
  ASSERT_GT(SizeAtLevel(0), k64KB - k5KB * 2);
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  // Increase buffer size
  ASSERT_OK(dbfull()->SetOptions({
      {"write_buffer_size", "131072"},
  // The existing memtable is still 64KB in size, after it becomes immutable,
  // the next memtable will be 128KB in size. Write 256KB total, we should
  // have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);  // (A)
  ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
  ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
  // Test max_write_buffer_number
  // Block compaction thread, which will also block the flushes because
  // max_background_flushes == 0, so flushes are getting executed by the
  // compaction thread
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  // Start from scratch and disable compaction/flush. Flush can only happen
  // during compaction but trigger is pretty high
  options.max_background_flushes = 0;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  // Put until writes are stopped, bounded by 256 puts. We should see stop at
  // Unblock the background task as soon as the write path stalls so the test
  // does not deadlock.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Wait",
      [&](void* arg) { sleeping_task_low.WakeUp(); });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  while (!sleeping_task_low.WokenUp() && count < 256) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
  // With 2 x 64KB buffers, ~128 puts should land before the stall.
  ASSERT_GT(static_cast<double>(count), 128 * 0.8);
  ASSERT_LT(static_cast<double>(count), 128 * 1.2);
  sleeping_task_low.WaitUntilDone();
  ASSERT_OK(dbfull()->SetOptions({
      {"max_write_buffer_number", "8"},
  // Clean up memtable and L0
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  while (!sleeping_task_low.WokenUp() && count < 1024) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
  // Windows fails this test. Will tune in the future and figure out
  // With 8 buffers, ~512 puts should land before the stall.
  ASSERT_GT(static_cast<double>(count), 512 * 0.8);
  ASSERT_LT(static_cast<double>(count), 512 * 1.2);
  sleeping_task_low.WaitUntilDone();
  ASSERT_OK(dbfull()->SetOptions({
      {"max_write_buffer_number", "4"},
  // Clean up memtable and L0
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  while (!sleeping_task_low.WokenUp() && count < 1024) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
  // Windows fails this test. Will tune in the future and figure out
  // With 4 buffers, ~256 puts should land before the stall.
  // NOTE(review): `266` below looks like a typo for `256` — confirm upstream.
  ASSERT_GT(static_cast<double>(count), 256 * 0.8);
  ASSERT_LT(static_cast<double>(count), 266 * 1.2);
  sleeping_task_low.WaitUntilDone();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3106 #endif // ROCKSDB_LITE
3108 #ifdef ROCKSDB_USING_THREAD_STATUS
// Asserts that exactly `expected_count` tracked threads are currently
// executing an operation of type `op_type`, according to Env::GetThreadList.
// NOTE(review): the `int op_count = 0;` declaration, the increment inside the
// if, and closing braces are elided in this chunk — confirm against the
// full file.
void VerifyOperationCount(Env* env, ThreadStatus::OperationType op_type,
                          int expected_count) {
  std::vector<ThreadStatus> thread_list;
  ASSERT_OK(env->GetThreadList(&thread_list));
  for (auto thread : thread_list) {
    if (thread.operation_type == op_type) {
  ASSERT_EQ(op_count, expected_count);
// Verifies Env::GetThreadList reporting: resizes the HIGH/LOW thread pools
// through several combinations, polls (up to ~60s) until the reported
// per-type thread counts match, then checks column-family info tracking as
// CFs are created and dropped.
// NOTE(review): this chunk elides the Options declaration, TryReopen calls,
// the second argument of TEST_VerifyColumnFamilyInfoMap, and many closing
// braces — confirm against the full file.
TEST_F(DBTest, GetThreadStatus) {
  options.enable_thread_tracking = true;
  std::vector<ThreadStatus> thread_list;
  Status s = env_->GetThreadList(&thread_list);
  for (int i = 0; i < 2; ++i) {
    // repeat the test with different numbers of high / low priority threads
    const int kTestCount = 3;
    const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
    const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
    for (int test = 0; test < kTestCount; ++test) {
      // Change the number of threads in high / low priority pool.
      env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
      env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
      // Wait to ensure all threads have been registered
      unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
      // Try up to 60 seconds.
      for (int num_try = 0; num_try < 60000; num_try++) {
        env_->SleepForMicroseconds(1000);
        thread_list.clear();
        s = env_->GetThreadList(&thread_list);
        // Tally threads by type for this poll.
        memset(thread_type_counts, 0, sizeof(thread_type_counts));
        for (auto thread : thread_list) {
          ASSERT_LT(thread.thread_type, ThreadStatus::NUM_THREAD_TYPES);
          thread_type_counts[thread.thread_type]++;
        // Stop polling once both pools report the expected sizes.
        if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
                kHighPriCounts[test] &&
            thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
                kLowPriCounts[test]) {
      // Verify the total number of threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY] +
                    thread_type_counts[ThreadStatus::LOW_PRIORITY],
                kHighPriCounts[test] + kLowPriCounts[test]);
      // Verify the number of high-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
                kHighPriCounts[test]);
      // Verify the number of low-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
                kLowPriCounts[test]);
  // repeat the test with multiple column families
  CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
  // Dropping a CF must remove it from the tracked info map.
  db_->DropColumnFamily(handles_[2]);
  handles_.erase(handles_.begin() + 2);
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
3190 TEST_F(DBTest
, DisableThreadStatus
) {
3193 options
.enable_thread_tracking
= false;
3195 CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options
);
3196 // Verify non of the column family info exists
3197 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
3201 TEST_F(DBTest
, ThreadStatusFlush
) {
3204 options
.write_buffer_size
= 100000; // Small write buffer
3205 options
.enable_thread_tracking
= true;
3206 options
= CurrentOptions(options
);
3208 rocksdb::SyncPoint::GetInstance()->LoadDependency({
3209 {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
3210 {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
3212 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3214 CreateAndReopenWithCF({"pikachu"}, options
);
3215 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
3217 ASSERT_OK(Put(1, "foo", "v1"));
3218 ASSERT_EQ("v1", Get(1, "foo"));
3219 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
3221 uint64_t num_running_flushes
= 0;
3222 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
3223 ASSERT_EQ(num_running_flushes
, 0);
3225 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
3226 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
3228 // The first sync point is to make sure there's one flush job
3229 // running when we perform VerifyOperationCount().
3230 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
3231 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 1);
3232 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
3233 ASSERT_EQ(num_running_flushes
, 1);
3234 // This second sync point is to ensure the flush job will not
3235 // be completed until we already perform VerifyOperationCount().
3236 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
3237 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3240 TEST_P(DBTestWithParam
, ThreadStatusSingleCompaction
) {
3241 const int kTestKeySize
= 16;
3242 const int kTestValueSize
= 984;
3243 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
3244 const int kEntriesPerBuffer
= 100;
3246 options
.create_if_missing
= true;
3247 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
3248 options
.compaction_style
= kCompactionStyleLevel
;
3249 options
.target_file_size_base
= options
.write_buffer_size
;
3250 options
.max_bytes_for_level_base
= options
.target_file_size_base
* 2;
3251 options
.max_bytes_for_level_multiplier
= 2;
3252 options
.compression
= kNoCompression
;
3253 options
= CurrentOptions(options
);
3255 options
.enable_thread_tracking
= true;
3256 const int kNumL0Files
= 4;
3257 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
3258 options
.max_subcompactions
= max_subcompactions_
;
3260 rocksdb::SyncPoint::GetInstance()->LoadDependency({
3261 {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
3262 {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
3263 {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
3265 for (int tests
= 0; tests
< 2; ++tests
) {
3266 DestroyAndReopen(options
);
3267 rocksdb::SyncPoint::GetInstance()->ClearTrace();
3268 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3272 for (int file
= 0; file
< kNumL0Files
; ++file
) {
3273 for (int key
= 0; key
< kEntriesPerBuffer
; ++key
) {
3274 ASSERT_OK(Put(ToString(key
+ file
* kEntriesPerBuffer
),
3275 RandomString(&rnd
, kTestValueSize
)));
3279 // This makes sure a compaction won't be scheduled until
3280 // we have done with the above Put Phase.
3281 uint64_t num_running_compactions
= 0;
3282 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
3283 &num_running_compactions
);
3284 ASSERT_EQ(num_running_compactions
, 0);
3285 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
3286 ASSERT_GE(NumTableFilesAtLevel(0),
3287 options
.level0_file_num_compaction_trigger
);
3289 // This makes sure at least one compaction is running.
3290 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");
3292 if (options
.enable_thread_tracking
) {
3293 // expecting one single L0 to L1 compaction
3294 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 1);
3296 // If thread tracking is not enabled, compaction count should be 0.
3297 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 0);
3299 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
3300 &num_running_compactions
);
3301 ASSERT_EQ(num_running_compactions
, 1);
3302 // TODO(yhchiang): adding assert to verify each compaction stage.
3303 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");
3305 // repeat the test with disabling thread tracking.
3306 options
.enable_thread_tracking
= false;
3307 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3311 TEST_P(DBTestWithParam
, PreShutdownManualCompaction
) {
3312 Options options
= CurrentOptions();
3313 options
.max_background_flushes
= 0;
3314 options
.max_subcompactions
= max_subcompactions_
;
3315 CreateAndReopenWithCF({"pikachu"}, options
);
3317 // iter - 0 with 7 levels
3318 // iter - 1 with 3 levels
3319 for (int iter
= 0; iter
< 2; ++iter
) {
3320 MakeTables(3, "p", "q", 1);
3321 ASSERT_EQ("1,1,1", FilesPerLevel(1));
3323 // Compaction range falls before files
3324 Compact(1, "", "c");
3325 ASSERT_EQ("1,1,1", FilesPerLevel(1));
3327 // Compaction range falls after files
3328 Compact(1, "r", "z");
3329 ASSERT_EQ("1,1,1", FilesPerLevel(1));
3331 // Compaction range overlaps files
3332 Compact(1, "p1", "p9");
3333 ASSERT_EQ("0,0,1", FilesPerLevel(1));
3335 // Populate a different range
3336 MakeTables(3, "c", "e", 1);
3337 ASSERT_EQ("1,1,2", FilesPerLevel(1));
3339 // Compact just the new range
3340 Compact(1, "b", "f");
3341 ASSERT_EQ("0,0,2", FilesPerLevel(1));
3344 MakeTables(1, "a", "z", 1);
3345 ASSERT_EQ("1,0,2", FilesPerLevel(1));
3346 CancelAllBackgroundWork(db_
);
3347 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
3348 ASSERT_EQ("1,0,2", FilesPerLevel(1));
3351 options
= CurrentOptions();
3352 options
.max_background_flushes
= 0;
3353 options
.num_levels
= 3;
3354 options
.create_if_missing
= true;
3355 DestroyAndReopen(options
);
3356 CreateAndReopenWithCF({"pikachu"}, options
);
3361 TEST_F(DBTest
, PreShutdownFlush
) {
3362 Options options
= CurrentOptions();
3363 options
.max_background_flushes
= 0;
3364 CreateAndReopenWithCF({"pikachu"}, options
);
3365 ASSERT_OK(Put(1, "key", "value"));
3366 CancelAllBackgroundWork(db_
);
3368 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
3369 ASSERT_TRUE(s
.IsShutdownInProgress());
3372 TEST_P(DBTestWithParam
, PreShutdownMultipleCompaction
) {
3373 const int kTestKeySize
= 16;
3374 const int kTestValueSize
= 984;
3375 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
3376 const int kEntriesPerBuffer
= 40;
3377 const int kNumL0Files
= 4;
3379 const int kHighPriCount
= 3;
3380 const int kLowPriCount
= 5;
3381 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
3382 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
3385 options
.create_if_missing
= true;
3386 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
3387 options
.compaction_style
= kCompactionStyleLevel
;
3388 options
.target_file_size_base
= options
.write_buffer_size
;
3389 options
.max_bytes_for_level_base
=
3390 options
.target_file_size_base
* kNumL0Files
;
3391 options
.compression
= kNoCompression
;
3392 options
= CurrentOptions(options
);
3394 options
.enable_thread_tracking
= true;
3395 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
3396 options
.max_bytes_for_level_multiplier
= 2;
3397 options
.max_background_compactions
= kLowPriCount
;
3398 options
.level0_stop_writes_trigger
= 1 << 10;
3399 options
.level0_slowdown_writes_trigger
= 1 << 10;
3400 options
.max_subcompactions
= max_subcompactions_
;
3405 std::vector
<ThreadStatus
> thread_list
;
3406 // Delay both flush and compaction
3407 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3408 {{"FlushJob::FlushJob()", "CompactionJob::Run():Start"},
3409 {"CompactionJob::Run():Start",
3410 "DBTest::PreShutdownMultipleCompaction:Preshutdown"},
3411 {"CompactionJob::Run():Start",
3412 "DBTest::PreShutdownMultipleCompaction:VerifyCompaction"},
3413 {"DBTest::PreShutdownMultipleCompaction:Preshutdown",
3414 "CompactionJob::Run():End"},
3415 {"CompactionJob::Run():End",
3416 "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});
3418 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3420 // Make rocksdb busy
3422 // check how many threads are doing compaction using GetThreadList
3423 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
3424 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
3425 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
3426 ASSERT_OK(Put(ToString(key
++), RandomString(&rnd
, kTestValueSize
)));
3429 Status s
= env_
->GetThreadList(&thread_list
);
3430 for (auto thread
: thread_list
) {
3431 operation_count
[thread
.operation_type
]++;
3434 // Speed up the test
3435 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
3436 operation_count
[ThreadStatus::OP_COMPACTION
] >
3437 0.6 * options
.max_background_compactions
) {
3440 if (file
== 15 * kNumL0Files
) {
3441 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
3445 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
3446 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
3447 CancelAllBackgroundWork(db_
);
3448 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
3449 dbfull()->TEST_WaitForCompact();
3450 // Record the number of compactions at a time.
3451 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
3452 operation_count
[i
] = 0;
3454 Status s
= env_
->GetThreadList(&thread_list
);
3455 for (auto thread
: thread_list
) {
3456 operation_count
[thread
.operation_type
]++;
3458 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
3461 TEST_P(DBTestWithParam
, PreShutdownCompactionMiddle
) {
3462 const int kTestKeySize
= 16;
3463 const int kTestValueSize
= 984;
3464 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
3465 const int kEntriesPerBuffer
= 40;
3466 const int kNumL0Files
= 4;
3468 const int kHighPriCount
= 3;
3469 const int kLowPriCount
= 5;
3470 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
3471 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
3474 options
.create_if_missing
= true;
3475 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
3476 options
.compaction_style
= kCompactionStyleLevel
;
3477 options
.target_file_size_base
= options
.write_buffer_size
;
3478 options
.max_bytes_for_level_base
=
3479 options
.target_file_size_base
* kNumL0Files
;
3480 options
.compression
= kNoCompression
;
3481 options
= CurrentOptions(options
);
3483 options
.enable_thread_tracking
= true;
3484 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
3485 options
.max_bytes_for_level_multiplier
= 2;
3486 options
.max_background_compactions
= kLowPriCount
;
3487 options
.level0_stop_writes_trigger
= 1 << 10;
3488 options
.level0_slowdown_writes_trigger
= 1 << 10;
3489 options
.max_subcompactions
= max_subcompactions_
;
3494 std::vector
<ThreadStatus
> thread_list
;
3495 // Delay both flush and compaction
3496 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3497 {{"DBTest::PreShutdownCompactionMiddle:Preshutdown",
3498 "CompactionJob::Run():Inprogress"},
3499 {"CompactionJob::Run():Start",
3500 "DBTest::PreShutdownCompactionMiddle:VerifyCompaction"},
3501 {"CompactionJob::Run():Inprogress", "CompactionJob::Run():End"},
3502 {"CompactionJob::Run():End",
3503 "DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown"}});
3505 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3507 // Make rocksdb busy
3509 // check how many threads are doing compaction using GetThreadList
3510 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
3511 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
3512 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
3513 ASSERT_OK(Put(ToString(key
++), RandomString(&rnd
, kTestValueSize
)));
3516 Status s
= env_
->GetThreadList(&thread_list
);
3517 for (auto thread
: thread_list
) {
3518 operation_count
[thread
.operation_type
]++;
3521 // Speed up the test
3522 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
3523 operation_count
[ThreadStatus::OP_COMPACTION
] >
3524 0.6 * options
.max_background_compactions
) {
3527 if (file
== 15 * kNumL0Files
) {
3528 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyCompaction");
3532 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
3533 CancelAllBackgroundWork(db_
);
3534 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:Preshutdown");
3535 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown");
3536 dbfull()->TEST_WaitForCompact();
3537 // Record the number of compactions at a time.
3538 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
3539 operation_count
[i
] = 0;
3541 Status s
= env_
->GetThreadList(&thread_list
);
3542 for (auto thread
: thread_list
) {
3543 operation_count
[thread
.operation_type
]++;
3545 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
3548 #endif // ROCKSDB_USING_THREAD_STATUS
3550 #ifndef ROCKSDB_LITE
3551 TEST_F(DBTest
, FlushOnDestroy
) {
3553 wo
.disableWAL
= true;
3554 ASSERT_OK(Put("foo", "v1", wo
));
3555 CancelAllBackgroundWork(db_
);
3558 TEST_F(DBTest
, DynamicLevelCompressionPerLevel
) {
3559 if (!Snappy_Supported()) {
3562 const int kNKeys
= 120;
3564 for (int i
= 0; i
< kNKeys
; i
++) {
3567 std::random_shuffle(std::begin(keys
), std::end(keys
));
3571 options
.create_if_missing
= true;
3572 options
.db_write_buffer_size
= 20480;
3573 options
.write_buffer_size
= 20480;
3574 options
.max_write_buffer_number
= 2;
3575 options
.level0_file_num_compaction_trigger
= 2;
3576 options
.level0_slowdown_writes_trigger
= 2;
3577 options
.level0_stop_writes_trigger
= 2;
3578 options
.target_file_size_base
= 20480;
3579 options
.level_compaction_dynamic_level_bytes
= true;
3580 options
.max_bytes_for_level_base
= 102400;
3581 options
.max_bytes_for_level_multiplier
= 4;
3582 options
.max_background_compactions
= 1;
3583 options
.num_levels
= 5;
3585 options
.compression_per_level
.resize(3);
3586 options
.compression_per_level
[0] = kNoCompression
;
3587 options
.compression_per_level
[1] = kNoCompression
;
3588 options
.compression_per_level
[2] = kSnappyCompression
;
3590 OnFileDeletionListener
* listener
= new OnFileDeletionListener();
3591 options
.listeners
.emplace_back(listener
);
3593 DestroyAndReopen(options
);
3595 // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
3596 // be compressed, so total data size should be more than 80K.
3597 for (int i
= 0; i
< 20; i
++) {
3598 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
3601 dbfull()->TEST_WaitForCompact();
3603 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3604 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3605 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
3606 // Assuming each files' metadata is at least 50 bytes/
3607 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U + 50U * 4);
3609 // Insert 400KB. Some data will be compressed
3610 for (int i
= 21; i
< 120; i
++) {
3611 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
3614 dbfull()->TEST_WaitForCompact();
3615 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3616 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3617 ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4),
3618 120U * 4000U + 50U * 24);
3619 // Make sure data in files in L3 is not compacted by removing all files
3620 // in L4 and calculate number of rows
3621 ASSERT_OK(dbfull()->SetOptions({
3622 {"disable_auto_compactions", "true"},
3624 ColumnFamilyMetaData cf_meta
;
3625 db_
->GetColumnFamilyMetaData(&cf_meta
);
3626 for (auto file
: cf_meta
.levels
[4].files
) {
3627 listener
->SetExpectedFileName(dbname_
+ file
.name
);
3628 ASSERT_OK(dbfull()->DeleteFile(file
.name
));
3630 listener
->VerifyMatchedCount(cf_meta
.levels
[4].files
.size());
3633 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
3634 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
3637 ASSERT_OK(iter
->status());
3638 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys
* 4000U + num_keys
* 10U);
3641 TEST_F(DBTest
, DynamicLevelCompressionPerLevel2
) {
3642 if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
3645 const int kNKeys
= 500;
3647 for (int i
= 0; i
< kNKeys
; i
++) {
3650 std::random_shuffle(std::begin(keys
), std::end(keys
));
3654 options
.create_if_missing
= true;
3655 options
.db_write_buffer_size
= 6000;
3656 options
.write_buffer_size
= 6000;
3657 options
.max_write_buffer_number
= 2;
3658 options
.level0_file_num_compaction_trigger
= 2;
3659 options
.level0_slowdown_writes_trigger
= 2;
3660 options
.level0_stop_writes_trigger
= 2;
3661 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
3663 // Use file size to distinguish levels
3664 // L1: 10, L2: 20, L3 40, L4 80
3665 // L0 is less than 30
3666 options
.target_file_size_base
= 10;
3667 options
.target_file_size_multiplier
= 2;
3669 options
.level_compaction_dynamic_level_bytes
= true;
3670 options
.max_bytes_for_level_base
= 200;
3671 options
.max_bytes_for_level_multiplier
= 8;
3672 options
.max_background_compactions
= 1;
3673 options
.num_levels
= 5;
3674 std::shared_ptr
<mock::MockTableFactory
> mtf(new mock::MockTableFactory
);
3675 options
.table_factory
= mtf
;
3677 options
.compression_per_level
.resize(3);
3678 options
.compression_per_level
[0] = kNoCompression
;
3679 options
.compression_per_level
[1] = kLZ4Compression
;
3680 options
.compression_per_level
[2] = kZlibCompression
;
3682 DestroyAndReopen(options
);
3683 // When base level is L4, L4 is LZ4.
3684 std::atomic
<int> num_zlib(0);
3685 std::atomic
<int> num_lz4(0);
3686 std::atomic
<int> num_no(0);
3687 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3688 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
3689 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
3690 if (compaction
->output_level() == 4) {
3691 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
3692 num_lz4
.fetch_add(1);
3695 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3696 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
3697 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
3698 ASSERT_TRUE(*compression
== kNoCompression
);
3699 num_no
.fetch_add(1);
3701 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3703 for (int i
= 0; i
< 100; i
++) {
3704 ASSERT_OK(Put(Key(keys
[i
]), RandomString(&rnd
, 200)));
3707 dbfull()->TEST_WaitForFlushMemTable();
3712 dbfull()->TEST_WaitForFlushMemTable();
3713 dbfull()->TEST_WaitForCompact();
3714 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3715 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
3717 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3718 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3719 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
3720 ASSERT_GT(NumTableFilesAtLevel(4), 0);
3721 ASSERT_GT(num_no
.load(), 2);
3722 ASSERT_GT(num_lz4
.load(), 0);
3723 int prev_num_files_l4
= NumTableFilesAtLevel(4);
3725 // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
3728 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3729 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
3730 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
3731 if (compaction
->output_level() == 4 && compaction
->start_level() == 3) {
3732 ASSERT_TRUE(compaction
->output_compression() == kZlibCompression
);
3733 num_zlib
.fetch_add(1);
3735 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
3736 num_lz4
.fetch_add(1);
3739 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3740 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
3741 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
3742 ASSERT_TRUE(*compression
== kNoCompression
);
3743 num_no
.fetch_add(1);
3745 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3747 for (int i
= 101; i
< 500; i
++) {
3748 ASSERT_OK(Put(Key(keys
[i
]), RandomString(&rnd
, 200)));
3749 if (i
% 100 == 99) {
3751 dbfull()->TEST_WaitForCompact();
3755 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
3756 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3757 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3758 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3759 ASSERT_GT(NumTableFilesAtLevel(3), 0);
3760 ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4
);
3761 ASSERT_GT(num_no
.load(), 2);
3762 ASSERT_GT(num_lz4
.load(), 0);
3763 ASSERT_GT(num_zlib
.load(), 0);
3766 TEST_F(DBTest
, DynamicCompactionOptions
) {
3767 // minimum write buffer size is enforced at 64KB
3768 const uint64_t k32KB
= 1 << 15;
3769 const uint64_t k64KB
= 1 << 16;
3770 const uint64_t k128KB
= 1 << 17;
3771 const uint64_t k1MB
= 1 << 20;
3772 const uint64_t k4KB
= 1 << 12;
3775 options
.create_if_missing
= true;
3776 options
.compression
= kNoCompression
;
3777 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
3778 options
.write_buffer_size
= k64KB
;
3779 options
.arena_block_size
= 4 * k4KB
;
3780 options
.max_write_buffer_number
= 2;
3781 // Compaction related options
3782 options
.level0_file_num_compaction_trigger
= 3;
3783 options
.level0_slowdown_writes_trigger
= 4;
3784 options
.level0_stop_writes_trigger
= 8;
3785 options
.target_file_size_base
= k64KB
;
3786 options
.max_compaction_bytes
= options
.target_file_size_base
* 10;
3787 options
.target_file_size_multiplier
= 1;
3788 options
.max_bytes_for_level_base
= k128KB
;
3789 options
.max_bytes_for_level_multiplier
= 4;
3791 // Block flush thread and disable compaction thread
3792 env_
->SetBackgroundThreads(1, Env::LOW
);
3793 env_
->SetBackgroundThreads(1, Env::HIGH
);
3794 DestroyAndReopen(options
);
3796 auto gen_l0_kb
= [this](int start
, int size
, int stride
) {
3798 for (int i
= 0; i
< size
; i
++) {
3799 ASSERT_OK(Put(Key(start
+ stride
* i
), RandomString(&rnd
, 1024)));
3801 dbfull()->TEST_WaitForFlushMemTable();
3804 // Write 3 files that have the same key range.
3805 // Since level0_file_num_compaction_trigger is 3, compaction should be
3806 // triggered. The compaction should result in one L1 file
3807 gen_l0_kb(0, 64, 1);
3808 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3809 gen_l0_kb(0, 64, 1);
3810 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
3811 gen_l0_kb(0, 64, 1);
3812 dbfull()->TEST_WaitForCompact();
3813 ASSERT_EQ("0,1", FilesPerLevel());
3814 std::vector
<LiveFileMetaData
> metadata
;
3815 db_
->GetLiveFilesMetaData(&metadata
);
3816 ASSERT_EQ(1U, metadata
.size());
3817 ASSERT_LE(metadata
[0].size
, k64KB
+ k4KB
);
3818 ASSERT_GE(metadata
[0].size
, k64KB
- k4KB
);
3820 // Test compaction trigger and target_file_size_base
3821 // Reduce compaction trigger to 2, and reduce L1 file size to 32KB.
3822 // Writing to 64KB L0 files should trigger a compaction. Since these
3823 // 2 L0 files have the same key range, compaction merge them and should
3824 // result in 2 32KB L1 files.
3825 ASSERT_OK(dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
3826 {"target_file_size_base", ToString(k32KB
)}}));
3828 gen_l0_kb(0, 64, 1);
3829 ASSERT_EQ("1,1", FilesPerLevel());
3830 gen_l0_kb(0, 64, 1);
3831 dbfull()->TEST_WaitForCompact();
3832 ASSERT_EQ("0,2", FilesPerLevel());
3834 db_
->GetLiveFilesMetaData(&metadata
);
3835 ASSERT_EQ(2U, metadata
.size());
3836 ASSERT_LE(metadata
[0].size
, k32KB
+ k4KB
);
3837 ASSERT_GE(metadata
[0].size
, k32KB
- k4KB
);
3838 ASSERT_LE(metadata
[1].size
, k32KB
+ k4KB
);
3839 ASSERT_GE(metadata
[1].size
, k32KB
- k4KB
);
3841 // Test max_bytes_for_level_base
3842 // Increase level base size to 256KB and write enough data that will
3843 // fill L1 and L2. L1 size should be around 256KB while L2 size should be
3844 // around 256KB x 4.
3846 dbfull()->SetOptions({{"max_bytes_for_level_base", ToString(k1MB
)}}));
3848 // writing 96 x 64KB => 6 * 1024KB
3849 // (L1 + L2) = (1 + 4) * 1024KB
3850 for (int i
= 0; i
< 96; ++i
) {
3851 gen_l0_kb(i
, 64, 96);
3853 dbfull()->TEST_WaitForCompact();
3854 ASSERT_GT(SizeAtLevel(1), k1MB
/ 2);
3855 ASSERT_LT(SizeAtLevel(1), k1MB
+ k1MB
/ 2);
3857 // Within (0.5, 1.5) of 4MB.
3858 ASSERT_GT(SizeAtLevel(2), 2 * k1MB
);
3859 ASSERT_LT(SizeAtLevel(2), 6 * k1MB
);
3861 // Test max_bytes_for_level_multiplier and
3862 // max_bytes_for_level_base. Now, reduce both mulitplier and level base,
3863 // After filling enough data that can fit in L1 - L3, we should see L1 size
3864 // reduces to 128KB from 256KB which was asserted previously. Same for L2.
3866 dbfull()->SetOptions({{"max_bytes_for_level_multiplier", "2"},
3867 {"max_bytes_for_level_base", ToString(k128KB
)}}));
3869 // writing 20 x 64KB = 10 x 128KB
3870 // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
3871 for (int i
= 0; i
< 20; ++i
) {
3872 gen_l0_kb(i
, 64, 32);
3874 dbfull()->TEST_WaitForCompact();
3875 uint64_t total_size
= SizeAtLevel(1) + SizeAtLevel(2) + SizeAtLevel(3);
3876 ASSERT_TRUE(total_size
< k128KB
* 7 * 1.5);
3878 // Test level0_stop_writes_trigger.
3879 // Clean up memtable and L0. Block compaction threads. If continue to write
3880 // and flush memtables. We should see put stop after 8 memtable flushes
3881 // since level0_stop_writes_trigger = 8
3882 dbfull()->TEST_FlushMemTable(true);
3883 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3885 test::SleepingBackgroundTask sleeping_task_low
;
3886 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
3887 Env::Priority::LOW
);
3888 sleeping_task_low
.WaitUntilSleeping();
3889 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3893 while (count
< 64) {
3894 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), wo
));
3895 dbfull()->TEST_FlushMemTable(true);
3897 if (dbfull()->TEST_write_controler().IsStopped()) {
3898 sleeping_task_low
.WakeUp();
3903 ASSERT_EQ(count
, 8);
3905 sleeping_task_low
.WaitUntilDone();
3907 // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
3908 // Block compaction thread again. Perform the put and memtable flushes
3909 // until we see the stop after 6 memtable flushes.
3910 ASSERT_OK(dbfull()->SetOptions({{"level0_stop_writes_trigger", "6"}}));
3911 dbfull()->TEST_FlushMemTable(true);
3912 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3913 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3915 // Block compaction again
3916 sleeping_task_low
.Reset();
3917 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
3918 Env::Priority::LOW
);
3919 sleeping_task_low
.WaitUntilSleeping();
3921 while (count
< 64) {
3922 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), wo
));
3923 dbfull()->TEST_FlushMemTable(true);
3925 if (dbfull()->TEST_write_controler().IsStopped()) {
3926 sleeping_task_low
.WakeUp();
3930 ASSERT_EQ(count
, 6);
3932 sleeping_task_low
.WaitUntilDone();
3934 // Test disable_auto_compactions
3935 // Compaction thread is unblocked but auto compaction is disabled. Write
3936 // 4 L0 files and compaction should be triggered. If auto compaction is
3937 // disabled, then TEST_WaitForCompact will be waiting for nothing. Number of
3938 // L0 files do not change after the call.
3939 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
3940 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3941 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3943 for (int i
= 0; i
< 4; ++i
) {
3944 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
3945 // Wait for compaction so that put won't stop
3946 dbfull()->TEST_FlushMemTable(true);
3948 dbfull()->TEST_WaitForCompact();
3949 ASSERT_EQ(NumTableFilesAtLevel(0), 4);
3951 // Enable auto compaction and perform the same test, # of L0 files should be
3952 // reduced after compaction.
3953 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
3954 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3955 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3957 for (int i
= 0; i
< 4; ++i
) {
3958 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
3959 // Wait for compaction so that put won't stop
3960 dbfull()->TEST_FlushMemTable(true);
3962 dbfull()->TEST_WaitForCompact();
3963 ASSERT_LT(NumTableFilesAtLevel(0), 4);
3965 #endif // ROCKSDB_LITE
3967 TEST_F(DBTest
, FileCreationRandomFailure
) {
3970 options
.create_if_missing
= true;
3971 options
.write_buffer_size
= 100000; // Small write buffer
3972 options
.target_file_size_base
= 200000;
3973 options
.max_bytes_for_level_base
= 1000000;
3974 options
.max_bytes_for_level_multiplier
= 2;
3976 DestroyAndReopen(options
);
3979 const int kCDTKeysPerBuffer
= 4;
3980 const int kTestSize
= kCDTKeysPerBuffer
* 4096;
3981 const int kTotalIteration
= 100;
3982 // the second half of the test involves in random failure
3983 // of file creation.
3984 const int kRandomFailureTest
= kTotalIteration
/ 2;
3985 std::vector
<std::string
> values
;
3986 for (int i
= 0; i
< kTestSize
; ++i
) {
3987 values
.push_back("NOT_FOUND");
3989 for (int j
= 0; j
< kTotalIteration
; ++j
) {
3990 if (j
== kRandomFailureTest
) {
3991 env_
->non_writeable_rate_
.store(90);
3993 for (int k
= 0; k
< kTestSize
; ++k
) {
3994 // here we expect some of the Put fails.
3995 std::string value
= RandomString(&rnd
, 100);
3996 Status s
= Put(Key(k
), Slice(value
));
3998 // update the latest successful put
4001 // But everything before we simulate the failure-test should succeed.
4002 if (j
< kRandomFailureTest
) {
4008 // If rocksdb does not do the correct job, internal assert will fail here.
4009 dbfull()->TEST_WaitForFlushMemTable();
4010 dbfull()->TEST_WaitForCompact();
4012 // verify we have the latest successful update
4013 for (int k
= 0; k
< kTestSize
; ++k
) {
4014 auto v
= Get(Key(k
));
4015 ASSERT_EQ(v
, values
[k
]);
4018 // reopen and reverify we have the latest successful update
4019 env_
->non_writeable_rate_
.store(0);
4021 for (int k
= 0; k
< kTestSize
; ++k
) {
4022 auto v
= Get(Key(k
));
4023 ASSERT_EQ(v
, values
[k
]);
4027 #ifndef ROCKSDB_LITE
4028 TEST_F(DBTest
, DynamicMiscOptions
) {
4029 // Test max_sequential_skip_in_iterations
4032 options
.create_if_missing
= true;
4033 options
.max_sequential_skip_in_iterations
= 16;
4034 options
.compression
= kNoCompression
;
4035 options
.statistics
= rocksdb::CreateDBStatistics();
4036 DestroyAndReopen(options
);
4038 auto assert_reseek_count
= [this, &options
](int key_start
, int num_reseek
) {
4039 int key0
= key_start
;
4040 int key1
= key_start
+ 1;
4041 int key2
= key_start
+ 2;
4043 ASSERT_OK(Put(Key(key0
), RandomString(&rnd
, 8)));
4044 for (int i
= 0; i
< 10; ++i
) {
4045 ASSERT_OK(Put(Key(key1
), RandomString(&rnd
, 8)));
4047 ASSERT_OK(Put(Key(key2
), RandomString(&rnd
, 8)));
4048 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
4049 iter
->Seek(Key(key1
));
4050 ASSERT_TRUE(iter
->Valid());
4051 ASSERT_EQ(iter
->key().compare(Key(key1
)), 0);
4053 ASSERT_TRUE(iter
->Valid());
4054 ASSERT_EQ(iter
->key().compare(Key(key2
)), 0);
4055 ASSERT_EQ(num_reseek
,
4056 TestGetTickerCount(options
, NUMBER_OF_RESEEKS_IN_ITERATION
));
4059 assert_reseek_count(100, 0);
4061 ASSERT_OK(dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "4"}}));
4062 // Clear memtable and make new option effective
4063 dbfull()->TEST_FlushMemTable(true);
4065 assert_reseek_count(200, 1);
4068 dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "16"}}));
4069 // Clear memtable and make new option effective
4070 dbfull()->TEST_FlushMemTable(true);
4072 assert_reseek_count(300, 1);
4074 MutableCFOptions mutable_cf_options
;
4075 CreateAndReopenWithCF({"pikachu"}, options
);
4076 // Test soft_pending_compaction_bytes_limit,
4077 // hard_pending_compaction_bytes_limit
4078 ASSERT_OK(dbfull()->SetOptions(
4079 handles_
[1], {{"soft_pending_compaction_bytes_limit", "200"},
4080 {"hard_pending_compaction_bytes_limit", "300"}}));
4081 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
4082 &mutable_cf_options
));
4083 ASSERT_EQ(200, mutable_cf_options
.soft_pending_compaction_bytes_limit
);
4084 ASSERT_EQ(300, mutable_cf_options
.hard_pending_compaction_bytes_limit
);
4085 // Test report_bg_io_stats
4087 dbfull()->SetOptions(handles_
[1], {{"report_bg_io_stats", "true"}}));
4089 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
4090 &mutable_cf_options
));
4091 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
4094 ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
4095 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
4096 &mutable_cf_options
));
4097 ASSERT_EQ(CompressionType::kNoCompression
, mutable_cf_options
.compression
);
4098 ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
4099 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
4100 &mutable_cf_options
));
4101 ASSERT_EQ(CompressionType::kSnappyCompression
,
4102 mutable_cf_options
.compression
);
4103 // Test paranoid_file_checks already done in db_block_cache_test
4105 dbfull()->SetOptions(handles_
[1], {{"paranoid_file_checks", "true"}}));
4106 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
4107 &mutable_cf_options
));
4108 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
4110 #endif // ROCKSDB_LITE
4112 TEST_F(DBTest
, L0L1L2AndUpHitCounter
) {
4113 Options options
= CurrentOptions();
4114 options
.write_buffer_size
= 32 * 1024;
4115 options
.target_file_size_base
= 32 * 1024;
4116 options
.level0_file_num_compaction_trigger
= 2;
4117 options
.level0_slowdown_writes_trigger
= 2;
4118 options
.level0_stop_writes_trigger
= 4;
4119 options
.max_bytes_for_level_base
= 64 * 1024;
4120 options
.max_write_buffer_number
= 2;
4121 options
.max_background_compactions
= 8;
4122 options
.max_background_flushes
= 8;
4123 options
.statistics
= rocksdb::CreateDBStatistics();
4124 CreateAndReopenWithCF({"mypikachu"}, options
);
4126 int numkeys
= 20000;
4127 for (int i
= 0; i
< numkeys
; i
++) {
4128 ASSERT_OK(Put(1, Key(i
), "val"));
4130 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L0
));
4131 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L1
));
4132 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
4134 ASSERT_OK(Flush(1));
4135 dbfull()->TEST_WaitForCompact();
4137 for (int i
= 0; i
< numkeys
; i
++) {
4138 ASSERT_EQ(Get(1, Key(i
)), "val");
4141 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L0
), 100);
4142 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L1
), 100);
4143 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L2_AND_UP
), 100);
4145 ASSERT_EQ(numkeys
, TestGetTickerCount(options
, GET_HIT_L0
) +
4146 TestGetTickerCount(options
, GET_HIT_L1
) +
4147 TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
4150 TEST_F(DBTest
, EncodeDecompressedBlockSizeTest
) {
4156 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
4157 kLZ4Compression
, kLZ4HCCompression
,
4158 kXpressCompression
};
4159 for (auto comp
: compressions
) {
4160 if (!CompressionTypeSupported(comp
)) {
4163 // first_table_version 1 -- generate with table_version == 1, read with
4164 // table_version == 2
4165 // first_table_version 2 -- generate with table_version == 2, read with
4166 // table_version == 1
4167 for (int first_table_version
= 1; first_table_version
<= 2;
4168 ++first_table_version
) {
4169 BlockBasedTableOptions table_options
;
4170 table_options
.format_version
= first_table_version
;
4171 table_options
.filter_policy
.reset(NewBloomFilterPolicy(10));
4172 Options options
= CurrentOptions();
4173 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
4174 options
.create_if_missing
= true;
4175 options
.compression
= comp
;
4176 DestroyAndReopen(options
);
4178 int kNumKeysWritten
= 100000;
4181 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
4182 // compressible string
4183 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 128) + std::string(128, 'a')));
4186 table_options
.format_version
= first_table_version
== 1 ? 2 : 1;
4187 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
4189 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
4190 auto r
= Get(Key(i
));
4191 ASSERT_EQ(r
.substr(128), std::string(128, 'a'));
4197 TEST_F(DBTest
, CloseSpeedup
) {
4198 Options options
= CurrentOptions();
4199 options
.compaction_style
= kCompactionStyleLevel
;
4200 options
.write_buffer_size
= 110 << 10; // 110KB
4201 options
.arena_block_size
= 4 << 10;
4202 options
.level0_file_num_compaction_trigger
= 2;
4203 options
.num_levels
= 4;
4204 options
.max_bytes_for_level_base
= 400 * 1024;
4205 options
.max_write_buffer_number
= 16;
4207 // Block background threads
4208 env_
->SetBackgroundThreads(1, Env::LOW
);
4209 env_
->SetBackgroundThreads(1, Env::HIGH
);
4210 test::SleepingBackgroundTask sleeping_task_low
;
4211 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4212 Env::Priority::LOW
);
4213 test::SleepingBackgroundTask sleeping_task_high
;
4214 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
4215 &sleeping_task_high
, Env::Priority::HIGH
);
4217 std::vector
<std::string
> filenames
;
4218 env_
->GetChildren(dbname_
, &filenames
);
4219 // Delete archival files.
4220 for (size_t i
= 0; i
< filenames
.size(); ++i
) {
4221 env_
->DeleteFile(dbname_
+ "/" + filenames
[i
]);
4223 env_
->DeleteDir(dbname_
);
4224 DestroyAndReopen(options
);
4226 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4227 env_
->SetBackgroundThreads(1, Env::LOW
);
4228 env_
->SetBackgroundThreads(1, Env::HIGH
);
4232 // First three 110KB files are not going to level 2
4233 // After that, (100K, 200K)
4234 for (int num
= 0; num
< 5; num
++) {
4235 GenerateNewFile(&rnd
, &key_idx
, true);
4238 ASSERT_EQ(0, GetSstFileCount(dbname_
));
4241 ASSERT_EQ(0, GetSstFileCount(dbname_
));
4243 // Unblock background threads
4244 sleeping_task_high
.WakeUp();
4245 sleeping_task_high
.WaitUntilDone();
4246 sleeping_task_low
.WakeUp();
4247 sleeping_task_low
.WaitUntilDone();
4252 class DelayedMergeOperator
: public MergeOperator
{
4257 explicit DelayedMergeOperator(DBTest
* d
) : db_test_(d
) {}
4259 virtual bool FullMergeV2(const MergeOperationInput
& merge_in
,
4260 MergeOperationOutput
* merge_out
) const override
{
4261 db_test_
->env_
->addon_time_
.fetch_add(1000);
4262 merge_out
->new_value
= "";
4266 virtual const char* Name() const override
{ return "DelayedMergeOperator"; }
4269 TEST_F(DBTest
, MergeTestTime
) {
4270 std::string one
, two
, three
;
4271 PutFixed64(&one
, 1);
4272 PutFixed64(&two
, 2);
4273 PutFixed64(&three
, 3);
4275 // Enable time profiling
4276 SetPerfLevel(kEnableTime
);
4277 this->env_
->addon_time_
.store(0);
4278 this->env_
->time_elapse_only_sleep_
= true;
4279 this->env_
->no_slowdown_
= true;
4280 Options options
= CurrentOptions();
4281 options
.statistics
= rocksdb::CreateDBStatistics();
4282 options
.merge_operator
.reset(new DelayedMergeOperator(this));
4283 DestroyAndReopen(options
);
4285 ASSERT_EQ(TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
), 0);
4286 db_
->Put(WriteOptions(), "foo", one
);
4288 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", two
));
4290 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", three
));
4294 opt
.verify_checksums
= true;
4295 opt
.snapshot
= nullptr;
4297 db_
->Get(opt
, "foo", &result
);
4299 ASSERT_EQ(1000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
4301 ReadOptions read_options
;
4302 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
4304 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
4305 ASSERT_OK(iter
->status());
4309 ASSERT_EQ(1, count
);
4310 ASSERT_EQ(2000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
4311 #ifdef ROCKSDB_USING_THREAD_STATUS
4312 ASSERT_GT(TestGetTickerCount(options
, FLUSH_WRITE_BYTES
), 0);
4313 #endif // ROCKSDB_USING_THREAD_STATUS
4314 this->env_
->time_elapse_only_sleep_
= false;
4317 #ifndef ROCKSDB_LITE
4318 TEST_P(DBTestWithParam
, MergeCompactionTimeTest
) {
4319 SetPerfLevel(kEnableTime
);
4320 Options options
= CurrentOptions();
4321 options
.compaction_filter_factory
= std::make_shared
<KeepFilterFactory
>();
4322 options
.statistics
= rocksdb::CreateDBStatistics();
4323 options
.merge_operator
.reset(new DelayedMergeOperator(this));
4324 options
.compaction_style
= kCompactionStyleUniversal
;
4325 options
.max_subcompactions
= max_subcompactions_
;
4326 DestroyAndReopen(options
);
4328 for (int i
= 0; i
< 1000; i
++) {
4329 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", "TEST"));
4332 dbfull()->TEST_WaitForFlushMemTable();
4333 dbfull()->TEST_WaitForCompact();
4335 ASSERT_NE(TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
), 0);
4338 TEST_P(DBTestWithParam
, FilterCompactionTimeTest
) {
4339 Options options
= CurrentOptions();
4340 options
.compaction_filter_factory
=
4341 std::make_shared
<DelayFilterFactory
>(this);
4342 options
.disable_auto_compactions
= true;
4343 options
.create_if_missing
= true;
4344 options
.statistics
= rocksdb::CreateDBStatistics();
4345 options
.max_subcompactions
= max_subcompactions_
;
4346 DestroyAndReopen(options
);
4349 for (int table
= 0; table
< 4; ++table
) {
4350 for (int i
= 0; i
< 10 + table
; ++i
) {
4351 Put(ToString(table
* 100 + i
), "val");
4356 CompactRangeOptions cro
;
4357 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
4358 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
4359 ASSERT_EQ(0U, CountLiveFiles());
4363 Iterator
* itr
= db_
->NewIterator(ReadOptions());
4365 ASSERT_NE(TestGetTickerCount(options
, FILTER_OPERATION_TOTAL_TIME
), 0);
4368 #endif // ROCKSDB_LITE
4370 TEST_F(DBTest
, TestLogCleanup
) {
4371 Options options
= CurrentOptions();
4372 options
.write_buffer_size
= 64 * 1024; // very small
4373 // only two memtables allowed ==> only two log files
4374 options
.max_write_buffer_number
= 2;
4377 for (int i
= 0; i
< 100000; ++i
) {
4379 // only 2 memtables will be alive, so logs_to_free needs to always be below
4381 ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
4385 #ifndef ROCKSDB_LITE
4386 TEST_F(DBTest
, EmptyCompactedDB
) {
4387 Options options
= CurrentOptions();
4388 options
.max_open_files
= -1;
4390 ASSERT_OK(ReadOnlyReopen(options
));
4391 Status s
= Put("new", "value");
4392 ASSERT_TRUE(s
.IsNotSupported());
4395 #endif // ROCKSDB_LITE
4397 #ifndef ROCKSDB_LITE
4398 TEST_F(DBTest
, SuggestCompactRangeTest
) {
4399 class CompactionFilterFactoryGetContext
: public CompactionFilterFactory
{
4401 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
4402 const CompactionFilter::Context
& context
) override
{
4403 saved_context
= context
;
4404 std::unique_ptr
<CompactionFilter
> empty_filter
;
4405 return empty_filter
;
4407 const char* Name() const override
{
4408 return "CompactionFilterFactoryGetContext";
4410 static bool IsManual(CompactionFilterFactory
* compaction_filter_factory
) {
4411 return reinterpret_cast<CompactionFilterFactoryGetContext
*>(
4412 compaction_filter_factory
)
4413 ->saved_context
.is_manual_compaction
;
4415 CompactionFilter::Context saved_context
;
4418 Options options
= CurrentOptions();
4419 options
.memtable_factory
.reset(
4420 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
4421 options
.compaction_style
= kCompactionStyleLevel
;
4422 options
.compaction_filter_factory
.reset(
4423 new CompactionFilterFactoryGetContext());
4424 options
.write_buffer_size
= 200 << 10;
4425 options
.arena_block_size
= 4 << 10;
4426 options
.level0_file_num_compaction_trigger
= 4;
4427 options
.num_levels
= 4;
4428 options
.compression
= kNoCompression
;
4429 options
.max_bytes_for_level_base
= 450 << 10;
4430 options
.target_file_size_base
= 98 << 10;
4431 options
.max_compaction_bytes
= static_cast<uint64_t>(1) << 60; // inf
4437 for (int num
= 0; num
< 3; num
++) {
4438 GenerateNewRandomFile(&rnd
);
4441 GenerateNewRandomFile(&rnd
);
4442 ASSERT_EQ("0,4", FilesPerLevel(0));
4443 ASSERT_TRUE(!CompactionFilterFactoryGetContext::IsManual(
4444 options
.compaction_filter_factory
.get()));
4446 GenerateNewRandomFile(&rnd
);
4447 ASSERT_EQ("1,4", FilesPerLevel(0));
4449 GenerateNewRandomFile(&rnd
);
4450 ASSERT_EQ("2,4", FilesPerLevel(0));
4452 GenerateNewRandomFile(&rnd
);
4453 ASSERT_EQ("3,4", FilesPerLevel(0));
4455 GenerateNewRandomFile(&rnd
);
4456 ASSERT_EQ("0,4,4", FilesPerLevel(0));
4458 GenerateNewRandomFile(&rnd
);
4459 ASSERT_EQ("1,4,4", FilesPerLevel(0));
4461 GenerateNewRandomFile(&rnd
);
4462 ASSERT_EQ("2,4,4", FilesPerLevel(0));
4464 GenerateNewRandomFile(&rnd
);
4465 ASSERT_EQ("3,4,4", FilesPerLevel(0));
4467 GenerateNewRandomFile(&rnd
);
4468 ASSERT_EQ("0,4,8", FilesPerLevel(0));
4470 GenerateNewRandomFile(&rnd
);
4471 ASSERT_EQ("1,4,8", FilesPerLevel(0));
4473 // compact it three times
4474 for (int i
= 0; i
< 3; ++i
) {
4475 ASSERT_OK(experimental::SuggestCompactRange(db_
, nullptr, nullptr));
4476 dbfull()->TEST_WaitForCompact();
4479 // All files are compacted
4480 ASSERT_EQ(0, NumTableFilesAtLevel(0));
4481 ASSERT_EQ(0, NumTableFilesAtLevel(1));
4483 GenerateNewRandomFile(&rnd
);
4484 ASSERT_EQ(1, NumTableFilesAtLevel(0));
4486 // nonoverlapping with the file on level 0
4487 Slice
start("a"), end("b");
4488 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
4489 dbfull()->TEST_WaitForCompact();
4491 // should not compact the level 0 file
4492 ASSERT_EQ(1, NumTableFilesAtLevel(0));
4496 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
4497 dbfull()->TEST_WaitForCompact();
4498 ASSERT_TRUE(CompactionFilterFactoryGetContext::IsManual(
4499 options
.compaction_filter_factory
.get()));
4501 // now it should compact the level 0 file
4502 ASSERT_EQ(0, NumTableFilesAtLevel(0));
4503 ASSERT_EQ(1, NumTableFilesAtLevel(1));
4506 TEST_F(DBTest
, PromoteL0
) {
4507 Options options
= CurrentOptions();
4508 options
.disable_auto_compactions
= true;
4509 options
.write_buffer_size
= 10 * 1024 * 1024;
4510 DestroyAndReopen(options
);
4512 // non overlapping ranges
4513 std::vector
<std::pair
<int32_t, int32_t>> ranges
= {
4514 {81, 160}, {0, 80}, {161, 240}, {241, 320}};
4516 int32_t value_size
= 10 * 1024; // 10 KB
4519 std::map
<int32_t, std::string
> values
;
4520 for (const auto& range
: ranges
) {
4521 for (int32_t j
= range
.first
; j
< range
.second
; j
++) {
4522 values
[j
] = RandomString(&rnd
, value_size
);
4523 ASSERT_OK(Put(Key(j
), values
[j
]));
4528 int32_t level0_files
= NumTableFilesAtLevel(0, 0);
4529 ASSERT_EQ(level0_files
, ranges
.size());
4530 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
4532 // Promote L0 level to L2.
4533 ASSERT_OK(experimental::PromoteL0(db_
, db_
->DefaultColumnFamily(), 2));
4534 // We expect that all the files were trivially moved from L0 to L2
4535 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
4536 ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files
);
4538 for (const auto& kv
: values
) {
4539 ASSERT_EQ(Get(Key(kv
.first
)), kv
.second
);
4543 TEST_F(DBTest
, PromoteL0Failure
) {
4544 Options options
= CurrentOptions();
4545 options
.disable_auto_compactions
= true;
4546 options
.write_buffer_size
= 10 * 1024 * 1024;
4547 DestroyAndReopen(options
);
4549 // Produce two L0 files with overlapping ranges.
4550 ASSERT_OK(Put(Key(0), ""));
4551 ASSERT_OK(Put(Key(3), ""));
4553 ASSERT_OK(Put(Key(1), ""));
4557 // Fails because L0 has overlapping files.
4558 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
4559 ASSERT_TRUE(status
.IsInvalidArgument());
4561 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
4562 // Now there is a file in L1.
4563 ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
4565 ASSERT_OK(Put(Key(5), ""));
4567 // Fails because L1 is non-empty.
4568 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
4569 ASSERT_TRUE(status
.IsInvalidArgument());
4571 #endif // ROCKSDB_LITE
4573 // Github issue #596
4574 TEST_F(DBTest
, HugeNumberOfLevels
) {
4575 Options options
= CurrentOptions();
4576 options
.write_buffer_size
= 2 * 1024 * 1024; // 2MB
4577 options
.max_bytes_for_level_base
= 2 * 1024 * 1024; // 2MB
4578 options
.num_levels
= 12;
4579 options
.max_background_compactions
= 10;
4580 options
.max_bytes_for_level_multiplier
= 2;
4581 options
.level_compaction_dynamic_level_bytes
= true;
4582 DestroyAndReopen(options
);
4585 for (int i
= 0; i
< 300000; ++i
) {
4586 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
4589 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
4592 TEST_F(DBTest
, AutomaticConflictsWithManualCompaction
) {
4593 Options options
= CurrentOptions();
4594 options
.write_buffer_size
= 2 * 1024 * 1024; // 2MB
4595 options
.max_bytes_for_level_base
= 2 * 1024 * 1024; // 2MB
4596 options
.num_levels
= 12;
4597 options
.max_background_compactions
= 10;
4598 options
.max_bytes_for_level_multiplier
= 2;
4599 options
.level_compaction_dynamic_level_bytes
= true;
4600 DestroyAndReopen(options
);
4603 for (int i
= 0; i
< 300000; ++i
) {
4604 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
4607 std::atomic
<int> callback_count(0);
4608 rocksdb::SyncPoint::GetInstance()->SetCallBack(
4609 "DBImpl::BackgroundCompaction()::Conflict",
4610 [&](void* arg
) { callback_count
.fetch_add(1); });
4611 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4612 CompactRangeOptions croptions
;
4613 croptions
.exclusive_manual_compaction
= false;
4614 ASSERT_OK(db_
->CompactRange(croptions
, nullptr, nullptr));
4615 ASSERT_GE(callback_count
.load(), 1);
4616 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4617 for (int i
= 0; i
< 300000; ++i
) {
4618 ASSERT_NE("NOT_FOUND", Get(Key(i
)));
4622 // Github issue #595
4623 // Large write batch with column families
4624 TEST_F(DBTest
, LargeBatchWithColumnFamilies
) {
4625 Options options
= CurrentOptions();
4627 options
.write_buffer_size
= 100000; // Small write buffer
4628 CreateAndReopenWithCF({"pikachu"}, options
);
4630 for (int i
= 0; i
< 5; i
++) {
4631 for (int pass
= 1; pass
<= 3; pass
++) {
4633 size_t write_size
= 1024 * 1024 * (5 + i
);
4634 fprintf(stderr
, "prepare: %" ROCKSDB_PRIszt
" MB, pass:%d\n",
4635 (write_size
/ 1024 / 1024), pass
);
4637 std::string
data(3000, j
++ % 127 + 20);
4638 data
+= ToString(j
);
4639 batch
.Put(handles_
[0], Slice(data
), Slice(data
));
4640 if (batch
.GetDataSize() > write_size
) {
4644 fprintf(stderr
, "write: %" ROCKSDB_PRIszt
" MB\n",
4645 (batch
.GetDataSize() / 1024 / 1024));
4646 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
4647 fprintf(stderr
, "done\n");
4650 // make sure we can re-open it.
4651 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
4654 // Make sure that Flushes can proceed in parallel with CompactRange()
4655 TEST_F(DBTest
, FlushesInParallelWithCompactRange
) {
4656 // iter == 0 -- leveled
4657 // iter == 1 -- leveled, but throw in a flush between two levels compacting
4658 // iter == 2 -- universal
4659 for (int iter
= 0; iter
< 3; ++iter
) {
4660 Options options
= CurrentOptions();
4662 options
.compaction_style
= kCompactionStyleLevel
;
4664 options
.compaction_style
= kCompactionStyleUniversal
;
4666 options
.write_buffer_size
= 110 << 10;
4667 options
.level0_file_num_compaction_trigger
= 4;
4668 options
.num_levels
= 4;
4669 options
.compression
= kNoCompression
;
4670 options
.max_bytes_for_level_base
= 450 << 10;
4671 options
.target_file_size_base
= 98 << 10;
4672 options
.max_write_buffer_number
= 2;
4674 DestroyAndReopen(options
);
4677 for (int num
= 0; num
< 14; num
++) {
4678 GenerateNewRandomFile(&rnd
);
4682 rocksdb::SyncPoint::GetInstance()->LoadDependency(
4683 {{"DBImpl::RunManualCompaction()::1",
4684 "DBTest::FlushesInParallelWithCompactRange:1"},
4685 {"DBTest::FlushesInParallelWithCompactRange:2",
4686 "DBImpl::RunManualCompaction()::2"}});
4688 rocksdb::SyncPoint::GetInstance()->LoadDependency(
4689 {{"CompactionJob::Run():Start",
4690 "DBTest::FlushesInParallelWithCompactRange:1"},
4691 {"DBTest::FlushesInParallelWithCompactRange:2",
4692 "CompactionJob::Run():End"}});
4694 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4696 std::vector
<port::Thread
> threads
;
4697 threads
.emplace_back([&]() { Compact("a", "z"); });
4699 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
4701 // this has to start a flush. if flushes are blocked, this will try to
4703 // 3 memtables, and that will fail because max_write_buffer_number is 2
4704 for (int num
= 0; num
< 3; num
++) {
4705 GenerateNewRandomFile(&rnd
, /* nowait */ true);
4708 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
4710 for (auto& t
: threads
) {
4713 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4717 TEST_F(DBTest
, DelayedWriteRate
) {
4718 const int kEntriesPerMemTable
= 100;
4719 const int kTotalFlushes
= 12;
4721 Options options
= CurrentOptions();
4722 env_
->SetBackgroundThreads(1, Env::LOW
);
4724 env_
->no_slowdown_
= true;
4725 options
.write_buffer_size
= 100000000;
4726 options
.max_write_buffer_number
= 256;
4727 options
.max_background_compactions
= 1;
4728 options
.level0_file_num_compaction_trigger
= 3;
4729 options
.level0_slowdown_writes_trigger
= 3;
4730 options
.level0_stop_writes_trigger
= 999999;
4731 options
.delayed_write_rate
= 20000000; // Start with 200MB/s
4732 options
.memtable_factory
.reset(
4733 new SpecialSkipListFactory(kEntriesPerMemTable
));
4735 CreateAndReopenWithCF({"pikachu"}, options
);
4737 // Block compactions
4738 test::SleepingBackgroundTask sleeping_task_low
;
4739 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4740 Env::Priority::LOW
);
4742 for (int i
= 0; i
< 3; i
++) {
4743 Put(Key(i
), std::string(10000, 'x'));
4747 // These writes will be slowed down to 1KB/s
4748 uint64_t estimated_sleep_time
= 0;
4751 uint64_t cur_rate
= options
.delayed_write_rate
;
4752 for (int i
= 0; i
< kTotalFlushes
; i
++) {
4753 uint64_t size_memtable
= 0;
4754 for (int j
= 0; j
< kEntriesPerMemTable
; j
++) {
4755 auto rand_num
= rnd
.Uniform(20);
4756 // Spread the size range to more.
4757 size_t entry_size
= rand_num
* rand_num
* rand_num
;
4759 Put(Key(i
), std::string(entry_size
, 'x'), wo
);
4760 size_memtable
+= entry_size
+ 18;
4761 // Occasionally sleep a while
4762 if (rnd
.Uniform(20) == 6) {
4763 env_
->SleepForMicroseconds(2666);
4766 dbfull()->TEST_WaitForFlushMemTable();
4767 estimated_sleep_time
+= size_memtable
* 1000000u / cur_rate
;
4768 // Slow down twice. One for memtable switch and one for flush finishes.
4769 cur_rate
= static_cast<uint64_t>(static_cast<double>(cur_rate
) *
4770 kIncSlowdownRatio
* kIncSlowdownRatio
);
4772 // Estimate the total sleep time fall into the rough range.
4773 ASSERT_GT(env_
->addon_time_
.load(),
4774 static_cast<int64_t>(estimated_sleep_time
/ 2));
4775 ASSERT_LT(env_
->addon_time_
.load(),
4776 static_cast<int64_t>(estimated_sleep_time
* 2));
4778 env_
->no_slowdown_
= false;
4779 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4780 sleeping_task_low
.WakeUp();
4781 sleeping_task_low
.WaitUntilDone();
4784 TEST_F(DBTest
, HardLimit
) {
4785 Options options
= CurrentOptions();
4787 env_
->SetBackgroundThreads(1, Env::LOW
);
4788 options
.max_write_buffer_number
= 256;
4789 options
.write_buffer_size
= 110 << 10; // 110KB
4790 options
.arena_block_size
= 4 * 1024;
4791 options
.level0_file_num_compaction_trigger
= 4;
4792 options
.level0_slowdown_writes_trigger
= 999999;
4793 options
.level0_stop_writes_trigger
= 999999;
4794 options
.hard_pending_compaction_bytes_limit
= 800 << 10;
4795 options
.max_bytes_for_level_base
= 10000000000u;
4796 options
.max_background_compactions
= 1;
4797 options
.memtable_factory
.reset(
4798 new SpecialSkipListFactory(KNumKeysByGenerateNewFile
- 1));
4800 env_
->SetBackgroundThreads(1, Env::LOW
);
4801 test::SleepingBackgroundTask sleeping_task_low
;
4802 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4803 Env::Priority::LOW
);
4805 CreateAndReopenWithCF({"pikachu"}, options
);
4807 std::atomic
<int> callback_count(0);
4808 rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait",
4810 callback_count
.fetch_add(1);
4811 sleeping_task_low
.WakeUp();
4813 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4817 for (int num
= 0; num
< 5; num
++) {
4818 GenerateNewFile(&rnd
, &key_idx
, true);
4819 dbfull()->TEST_WaitForFlushMemTable();
4822 ASSERT_EQ(0, callback_count
.load());
4824 for (int num
= 0; num
< 5; num
++) {
4825 GenerateNewFile(&rnd
, &key_idx
, true);
4826 dbfull()->TEST_WaitForFlushMemTable();
4828 ASSERT_GE(callback_count
.load(), 1);
4830 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4831 sleeping_task_low
.WaitUntilDone();
4834 #ifndef ROCKSDB_LITE
4835 TEST_F(DBTest
, SoftLimit
) {
4836 Options options
= CurrentOptions();
4838 options
.write_buffer_size
= 100000; // Small write buffer
4839 options
.max_write_buffer_number
= 256;
4840 options
.level0_file_num_compaction_trigger
= 1;
4841 options
.level0_slowdown_writes_trigger
= 3;
4842 options
.level0_stop_writes_trigger
= 999999;
4843 options
.delayed_write_rate
= 20000; // About 200KB/s limited rate
4844 options
.soft_pending_compaction_bytes_limit
= 160000;
4845 options
.target_file_size_base
= 99999999; // All into one file
4846 options
.max_bytes_for_level_base
= 50000;
4847 options
.max_bytes_for_level_multiplier
= 10;
4848 options
.max_background_compactions
= 1;
4849 options
.compression
= kNoCompression
;
4853 // Generating 360KB in Level 3
4854 for (int i
= 0; i
< 72; i
++) {
4855 Put(Key(i
), std::string(5000, 'x'));
4860 dbfull()->TEST_WaitForCompact();
4861 MoveFilesToLevel(3);
4863 // Generating 360KB in Level 2
4864 for (int i
= 0; i
< 72; i
++) {
4865 Put(Key(i
), std::string(5000, 'x'));
4870 dbfull()->TEST_WaitForCompact();
4871 MoveFilesToLevel(2);
4875 test::SleepingBackgroundTask sleeping_task_low
;
4876 // Block compactions
4877 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4878 Env::Priority::LOW
);
4879 sleeping_task_low
.WaitUntilSleeping();
4881 // Create 3 L0 files, making score of L0 to be 3.
4882 for (int i
= 0; i
< 3; i
++) {
4883 Put(Key(i
), std::string(5000, 'x'));
4884 Put(Key(100 - i
), std::string(5000, 'x'));
4885 // Flush the file. File size is around 30KB.
4888 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
4890 sleeping_task_low
.WakeUp();
4891 sleeping_task_low
.WaitUntilDone();
4892 sleeping_task_low
.Reset();
4893 dbfull()->TEST_WaitForCompact();
4895 // Now there is one L1 file but doesn't trigger soft_rate_limit
4896 // The L1 file size is around 30KB.
4897 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
4898 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
4900 // Only allow one compactin going through.
4901 rocksdb::SyncPoint::GetInstance()->SetCallBack(
4902 "BackgroundCallCompaction:0", [&](void* arg
) {
4903 // Schedule a sleeping task.
4904 sleeping_task_low
.Reset();
4905 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
4906 &sleeping_task_low
, Env::Priority::LOW
);
4909 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4911 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4912 Env::Priority::LOW
);
4913 sleeping_task_low
.WaitUntilSleeping();
4914 // Create 3 L0 files, making score of L0 to be 3
4915 for (int i
= 0; i
< 3; i
++) {
4916 Put(Key(10 + i
), std::string(5000, 'x'));
4917 Put(Key(90 - i
), std::string(5000, 'x'));
4918 // Flush the file. File size is around 30KB.
4922 // Wake up sleep task to enable compaction to run and waits
4923 // for it to go to sleep state again to make sure one compaction
4925 sleeping_task_low
.WakeUp();
4926 sleeping_task_low
.WaitUntilSleeping();
4928 // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
4929 // Given level multiplier 10, estimated pending compaction is around 100KB
4930 // doesn't trigger soft_pending_compaction_bytes_limit
4931 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
4932 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
4934 // Create 3 L0 files, making score of L0 to be 3, higher than L0.
4935 for (int i
= 0; i
< 3; i
++) {
4936 Put(Key(20 + i
), std::string(5000, 'x'));
4937 Put(Key(80 - i
), std::string(5000, 'x'));
4938 // Flush the file. File size is around 30KB.
4941 // Wake up sleep task to enable compaction to run and waits
4942 // for it to go to sleep state again to make sure one compaction
4944 sleeping_task_low
.WakeUp();
4945 sleeping_task_low
.WaitUntilSleeping();
4947 // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
4948 // L2 size is 360KB, so the estimated level fanout 4, estimated pending
4949 // compaction is around 200KB
4950 // triggerring soft_pending_compaction_bytes_limit
4951 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
4952 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
4954 sleeping_task_low
.WakeUp();
4955 sleeping_task_low
.WaitUntilSleeping();
4957 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
4959 // shrink level base so L2 will hit soft limit easier.
4960 ASSERT_OK(dbfull()->SetOptions({
4961 {"max_bytes_for_level_base", "5000"},
4966 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
4968 sleeping_task_low
.WaitUntilSleeping();
4969 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4970 sleeping_task_low
.WakeUp();
4971 sleeping_task_low
.WaitUntilDone();
4974 TEST_F(DBTest
, LastWriteBufferDelay
) {
4975 Options options
= CurrentOptions();
4977 options
.write_buffer_size
= 100000;
4978 options
.max_write_buffer_number
= 4;
4979 options
.delayed_write_rate
= 20000;
4980 options
.compression
= kNoCompression
;
4981 options
.disable_auto_compactions
= true;
4982 int kNumKeysPerMemtable
= 3;
4983 options
.memtable_factory
.reset(
4984 new SpecialSkipListFactory(kNumKeysPerMemtable
));
4987 test::SleepingBackgroundTask sleeping_task
;
4989 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task
,
4990 Env::Priority::HIGH
);
4991 sleeping_task
.WaitUntilSleeping();
4993 // Create 3 L0 files, making score of L0 to be 3.
4994 for (int i
= 0; i
< 3; i
++) {
4995 // Fill one mem table
4996 for (int j
= 0; j
< kNumKeysPerMemtable
; j
++) {
4999 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
5001 // Inserting a new entry would create a new mem table, triggering slow down.
5003 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
5005 sleeping_task
.WakeUp();
5006 sleeping_task
.WaitUntilDone();
5008 #endif // ROCKSDB_LITE
5010 TEST_F(DBTest
, FailWhenCompressionNotSupportedTest
) {
5011 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
5012 kLZ4Compression
, kLZ4HCCompression
,
5013 kXpressCompression
};
5014 for (auto comp
: compressions
) {
5015 if (!CompressionTypeSupported(comp
)) {
5016 // not supported, we should fail the Open()
5017 Options options
= CurrentOptions();
5018 options
.compression
= comp
;
5019 ASSERT_TRUE(!TryReopen(options
).ok());
5020 // Try if CreateColumnFamily also fails
5021 options
.compression
= kNoCompression
;
5022 ASSERT_OK(TryReopen(options
));
5023 ColumnFamilyOptions
cf_options(options
);
5024 cf_options
.compression
= comp
;
5025 ColumnFamilyHandle
* handle
;
5026 ASSERT_TRUE(!db_
->CreateColumnFamily(cf_options
, "name", &handle
).ok());
5031 #ifndef ROCKSDB_LITE
5032 TEST_F(DBTest
, RowCache
) {
5033 Options options
= CurrentOptions();
5034 options
.statistics
= rocksdb::CreateDBStatistics();
5035 options
.row_cache
= NewLRUCache(8192);
5036 DestroyAndReopen(options
);
5038 ASSERT_OK(Put("foo", "bar"));
5041 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
5042 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 0);
5043 ASSERT_EQ(Get("foo"), "bar");
5044 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
5045 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
5046 ASSERT_EQ(Get("foo"), "bar");
5047 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 1);
5048 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
5050 #endif // ROCKSDB_LITE
5052 TEST_F(DBTest
, DeletingOldWalAfterDrop
) {
5053 rocksdb::SyncPoint::GetInstance()->LoadDependency(
5054 {{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
5055 {"DBImpl::BGWorkFlush:done", "Test:WaitForFlush"}});
5056 rocksdb::SyncPoint::GetInstance()->ClearTrace();
5058 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
5059 Options options
= CurrentOptions();
5060 options
.max_total_wal_size
= 8192;
5061 options
.compression
= kNoCompression
;
5062 options
.write_buffer_size
= 1 << 20;
5063 options
.level0_file_num_compaction_trigger
= (1 << 30);
5064 options
.level0_slowdown_writes_trigger
= (1 << 30);
5065 options
.level0_stop_writes_trigger
= (1 << 30);
5066 options
.disable_auto_compactions
= true;
5067 DestroyAndReopen(options
);
5068 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
5070 CreateColumnFamilies({"cf1", "cf2"}, options
);
5071 ASSERT_OK(Put(0, "key1", DummyString(8192)));
5072 ASSERT_OK(Put(0, "key2", DummyString(8192)));
5073 // the oldest wal should now be getting_flushed
5074 ASSERT_OK(db_
->DropColumnFamily(handles_
[0]));
5075 // all flushes should now do nothing because their CF is dropped
5076 TEST_SYNC_POINT("Test:AllowFlushes");
5077 TEST_SYNC_POINT("Test:WaitForFlush");
5078 uint64_t lognum1
= dbfull()->TEST_LogfileNumber();
5079 ASSERT_OK(Put(1, "key3", DummyString(8192)));
5080 ASSERT_OK(Put(1, "key4", DummyString(8192)));
5081 // new wal should have been created
5082 uint64_t lognum2
= dbfull()->TEST_LogfileNumber();
5083 EXPECT_GT(lognum2
, lognum1
);
5086 TEST_F(DBTest
, UnsupportedManualSync
) {
5087 DestroyAndReopen(CurrentOptions());
5088 env_
->is_wal_sync_thread_safe_
.store(false);
5089 Status s
= db_
->SyncWAL();
5090 ASSERT_TRUE(s
.IsNotSupported());
5093 INSTANTIATE_TEST_CASE_P(DBTestWithParam
, DBTestWithParam
,
5094 ::testing::Combine(::testing::Values(1, 4),
5095 ::testing::Bool()));
5097 TEST_F(DBTest
, PauseBackgroundWorkTest
) {
5098 Options options
= CurrentOptions();
5099 options
.write_buffer_size
= 100000; // Small write buffer
5102 std::vector
<port::Thread
> threads
;
5103 std::atomic
<bool> done(false);
5104 db_
->PauseBackgroundWork();
5105 threads
.emplace_back([&]() {
5107 for (int i
= 0; i
< 10000; ++i
) {
5108 Put(RandomString(&rnd
, 10), RandomString(&rnd
, 10));
5112 env_
->SleepForMicroseconds(200000);
5113 // make sure the thread is not done
5114 ASSERT_FALSE(done
.load());
5115 db_
->ContinueBackgroundWork();
5116 for (auto& t
: threads
) {
5120 ASSERT_TRUE(done
.load());
5123 } // namespace rocksdb
5125 int main(int argc
, char** argv
) {
5126 rocksdb::port::InstallStackTraceHandler();
5127 ::testing::InitGoogleTest(&argc
, argv
);
5128 return RUN_ALL_TESTS();