1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Introduction of SyncPoint effectively disabled building and running this test
12 // which is a pity, it is a good test
17 #include <unordered_set>
26 #include "cache/lru_cache.h"
27 #include "db/blob/blob_index.h"
28 #include "db/db_impl/db_impl.h"
29 #include "db/db_test_util.h"
30 #include "db/dbformat.h"
31 #include "db/job_context.h"
32 #include "db/version_set.h"
33 #include "db/write_batch_internal.h"
34 #include "env/mock_env.h"
35 #include "file/filename.h"
36 #include "memtable/hash_linklist_rep.h"
37 #include "monitoring/thread_status_util.h"
38 #include "port/port.h"
39 #include "port/stack_trace.h"
40 #include "rocksdb/cache.h"
41 #include "rocksdb/compaction_filter.h"
42 #include "rocksdb/convenience.h"
43 #include "rocksdb/db.h"
44 #include "rocksdb/env.h"
45 #include "rocksdb/experimental.h"
46 #include "rocksdb/filter_policy.h"
47 #include "rocksdb/options.h"
48 #include "rocksdb/perf_context.h"
49 #include "rocksdb/slice.h"
50 #include "rocksdb/slice_transform.h"
51 #include "rocksdb/snapshot.h"
52 #include "rocksdb/table.h"
53 #include "rocksdb/table_properties.h"
54 #include "rocksdb/thread_status.h"
55 #include "rocksdb/utilities/checkpoint.h"
56 #include "rocksdb/utilities/optimistic_transaction_db.h"
57 #include "rocksdb/utilities/write_batch_with_index.h"
58 #include "table/mock_table.h"
59 #include "table/scoped_arena_iterator.h"
60 #include "test_util/sync_point.h"
61 #include "test_util/testharness.h"
62 #include "test_util/testutil.h"
63 #include "util/compression.h"
64 #include "util/mutexlock.h"
65 #include "util/random.h"
66 #include "util/rate_limiter.h"
67 #include "util/string_util.h"
68 #include "utilities/merge_operators.h"
70 namespace ROCKSDB_NAMESPACE
{
72 // Note that whole DBTest and its child classes disable fsync on files
73 // and directories for speed.
74 // If fsync needs to be covered in a test, put it in other places.
75 class DBTest
: public DBTestBase
{
77 DBTest() : DBTestBase("/db_test", /*env_do_fsync=*/false) {}
82 public testing::WithParamInterface
<std::tuple
<uint32_t, bool>> {
85 max_subcompactions_
= std::get
<0>(GetParam());
86 exclusive_manual_compaction_
= std::get
<1>(GetParam());
89 // Required if inheriting from testing::WithParamInterface<>
90 static void SetUpTestCase() {}
91 static void TearDownTestCase() {}
93 uint32_t max_subcompactions_
;
94 bool exclusive_manual_compaction_
;
97 TEST_F(DBTest
, MockEnvTest
) {
98 std::unique_ptr
<MockEnv
> env
{new MockEnv(Env::Default())};
100 options
.create_if_missing
= true;
101 options
.env
= env
.get();
104 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
105 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
107 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
108 for (size_t i
= 0; i
< 3; ++i
) {
109 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
112 for (size_t i
= 0; i
< 3; ++i
) {
114 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
115 ASSERT_TRUE(res
== vals
[i
]);
118 Iterator
* iterator
= db
->NewIterator(ReadOptions());
119 iterator
->SeekToFirst();
120 for (size_t i
= 0; i
< 3; ++i
) {
121 ASSERT_TRUE(iterator
->Valid());
122 ASSERT_TRUE(keys
[i
] == iterator
->key());
123 ASSERT_TRUE(vals
[i
] == iterator
->value());
126 ASSERT_TRUE(!iterator
->Valid());
129 // TEST_FlushMemTable() is not supported in ROCKSDB_LITE
131 DBImpl
* dbi
= static_cast_with_check
<DBImpl
>(db
);
132 ASSERT_OK(dbi
->TEST_FlushMemTable());
134 for (size_t i
= 0; i
< 3; ++i
) {
136 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
137 ASSERT_TRUE(res
== vals
[i
]);
139 #endif // ROCKSDB_LITE
144 // NewMemEnv returns nullptr in ROCKSDB_LITE since class InMemoryEnv isn't
147 TEST_F(DBTest
, MemEnvTest
) {
148 std::unique_ptr
<Env
> env
{NewMemEnv(Env::Default())};
150 options
.create_if_missing
= true;
151 options
.env
= env
.get();
154 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
155 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
157 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
158 for (size_t i
= 0; i
< 3; ++i
) {
159 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
162 for (size_t i
= 0; i
< 3; ++i
) {
164 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
165 ASSERT_TRUE(res
== vals
[i
]);
168 Iterator
* iterator
= db
->NewIterator(ReadOptions());
169 iterator
->SeekToFirst();
170 for (size_t i
= 0; i
< 3; ++i
) {
171 ASSERT_TRUE(iterator
->Valid());
172 ASSERT_TRUE(keys
[i
] == iterator
->key());
173 ASSERT_TRUE(vals
[i
] == iterator
->value());
176 ASSERT_TRUE(!iterator
->Valid());
179 DBImpl
* dbi
= static_cast_with_check
<DBImpl
>(db
);
180 ASSERT_OK(dbi
->TEST_FlushMemTable());
182 for (size_t i
= 0; i
< 3; ++i
) {
184 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
185 ASSERT_TRUE(res
== vals
[i
]);
190 options
.create_if_missing
= false;
191 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
192 for (size_t i
= 0; i
< 3; ++i
) {
194 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
195 ASSERT_TRUE(res
== vals
[i
]);
199 #endif // ROCKSDB_LITE
201 TEST_F(DBTest
, WriteEmptyBatch
) {
202 Options options
= CurrentOptions();
204 options
.write_buffer_size
= 100000;
205 CreateAndReopenWithCF({"pikachu"}, options
);
207 ASSERT_OK(Put(1, "foo", "bar"));
210 wo
.disableWAL
= false;
211 WriteBatch empty_batch
;
212 ASSERT_OK(dbfull()->Write(wo
, &empty_batch
));
214 // make sure we can re-open it.
215 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
216 ASSERT_EQ("bar", Get(1, "foo"));
219 TEST_F(DBTest
, SkipDelay
) {
220 Options options
= CurrentOptions();
222 options
.write_buffer_size
= 100000;
223 CreateAndReopenWithCF({"pikachu"}, options
);
225 for (bool sync
: {true, false}) {
226 for (bool disableWAL
: {true, false}) {
227 if (sync
&& disableWAL
) {
228 // sync and disableWAL is incompatible.
231 // Use a small number to ensure a large delay that is still effective
233 // TODO(myabandeh): this is time dependent and could potentially make
235 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
236 std::atomic
<int> sleep_count(0);
237 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
238 "DBImpl::DelayWrite:Sleep",
239 [&](void* /*arg*/) { sleep_count
.fetch_add(1); });
240 std::atomic
<int> wait_count(0);
241 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
242 "DBImpl::DelayWrite:Wait",
243 [&](void* /*arg*/) { wait_count
.fetch_add(1); });
244 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
248 wo
.disableWAL
= disableWAL
;
249 wo
.no_slowdown
= true;
250 dbfull()->Put(wo
, "foo", "bar");
251 // We need the 2nd write to trigger delay. This is because delay is
252 // estimated based on the last write size which is 0 for the first write.
253 ASSERT_NOK(dbfull()->Put(wo
, "foo2", "bar2"));
254 ASSERT_GE(sleep_count
.load(), 0);
255 ASSERT_GE(wait_count
.load(), 0);
258 token
= dbfull()->TEST_write_controler().GetDelayToken(1000000000);
259 wo
.no_slowdown
= false;
260 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar3"));
261 ASSERT_GE(sleep_count
.load(), 1);
267 TEST_F(DBTest
, MixedSlowdownOptions
) {
268 Options options
= CurrentOptions();
270 options
.write_buffer_size
= 100000;
271 CreateAndReopenWithCF({"pikachu"}, options
);
272 std::vector
<port::Thread
> threads
;
273 std::atomic
<int> thread_num(0);
275 std::function
<void()> write_slowdown_func
= [&]() {
276 int a
= thread_num
.fetch_add(1);
277 std::string key
= "foo" + std::to_string(a
);
279 wo
.no_slowdown
= false;
280 ASSERT_OK(dbfull()->Put(wo
, key
, "bar"));
282 std::function
<void()> write_no_slowdown_func
= [&]() {
283 int a
= thread_num
.fetch_add(1);
284 std::string key
= "foo" + std::to_string(a
);
286 wo
.no_slowdown
= true;
287 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
289 // Use a small number to ensure a large delay that is still effective
291 // TODO(myabandeh): this is time dependent and could potentially make
293 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
294 std::atomic
<int> sleep_count(0);
295 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
296 "DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) {
297 sleep_count
.fetch_add(1);
298 if (threads
.empty()) {
299 for (int i
= 0; i
< 2; ++i
) {
300 threads
.emplace_back(write_slowdown_func
);
302 for (int i
= 0; i
< 2; ++i
) {
303 threads
.emplace_back(write_no_slowdown_func
);
307 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
311 wo
.disableWAL
= false;
312 wo
.no_slowdown
= false;
313 dbfull()->Put(wo
, "foo", "bar");
314 // We need the 2nd write to trigger delay. This is because delay is
315 // estimated based on the last write size which is 0 for the first write.
316 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
319 for (auto& t
: threads
) {
322 ASSERT_GE(sleep_count
.load(), 1);
324 wo
.no_slowdown
= true;
325 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar"));
328 TEST_F(DBTest
, MixedSlowdownOptionsInQueue
) {
329 Options options
= CurrentOptions();
331 options
.write_buffer_size
= 100000;
332 CreateAndReopenWithCF({"pikachu"}, options
);
333 std::vector
<port::Thread
> threads
;
334 std::atomic
<int> thread_num(0);
336 std::function
<void()> write_no_slowdown_func
= [&]() {
337 int a
= thread_num
.fetch_add(1);
338 std::string key
= "foo" + std::to_string(a
);
340 wo
.no_slowdown
= true;
341 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
343 // Use a small number to ensure a large delay that is still effective
345 // TODO(myabandeh): this is time dependent and could potentially make
347 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
348 std::atomic
<int> sleep_count(0);
349 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
350 "DBImpl::DelayWrite:Sleep", [&](void* /*arg*/) {
351 sleep_count
.fetch_add(1);
352 if (threads
.empty()) {
353 for (int i
= 0; i
< 2; ++i
) {
354 threads
.emplace_back(write_no_slowdown_func
);
356 // Sleep for 2s to allow the threads to insert themselves into the
358 env_
->SleepForMicroseconds(3000000ULL);
361 std::atomic
<int> wait_count(0);
362 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
363 "DBImpl::DelayWrite:Wait",
364 [&](void* /*arg*/) { wait_count
.fetch_add(1); });
365 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
369 wo
.disableWAL
= false;
370 wo
.no_slowdown
= false;
371 dbfull()->Put(wo
, "foo", "bar");
372 // We need the 2nd write to trigger delay. This is because delay is
373 // estimated based on the last write size which is 0 for the first write.
374 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
377 for (auto& t
: threads
) {
380 ASSERT_EQ(sleep_count
.load(), 1);
381 ASSERT_GE(wait_count
.load(), 0);
384 TEST_F(DBTest
, MixedSlowdownOptionsStop
) {
385 Options options
= CurrentOptions();
387 options
.write_buffer_size
= 100000;
388 CreateAndReopenWithCF({"pikachu"}, options
);
389 std::vector
<port::Thread
> threads
;
390 std::atomic
<int> thread_num(0);
392 std::function
<void()> write_slowdown_func
= [&]() {
393 int a
= thread_num
.fetch_add(1);
394 std::string key
= "foo" + std::to_string(a
);
396 wo
.no_slowdown
= false;
397 ASSERT_OK(dbfull()->Put(wo
, key
, "bar"));
399 std::function
<void()> write_no_slowdown_func
= [&]() {
400 int a
= thread_num
.fetch_add(1);
401 std::string key
= "foo" + std::to_string(a
);
403 wo
.no_slowdown
= true;
404 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
406 std::function
<void()> wakeup_writer
= [&]() {
407 dbfull()->mutex_
.Lock();
408 dbfull()->bg_cv_
.SignalAll();
409 dbfull()->mutex_
.Unlock();
411 // Use a small number to ensure a large delay that is still effective
413 // TODO(myabandeh): this is time dependent and could potentially make
415 auto token
= dbfull()->TEST_write_controler().GetStopToken();
416 std::atomic
<int> wait_count(0);
417 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
418 "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
419 wait_count
.fetch_add(1);
420 if (threads
.empty()) {
421 for (int i
= 0; i
< 2; ++i
) {
422 threads
.emplace_back(write_slowdown_func
);
424 for (int i
= 0; i
< 2; ++i
) {
425 threads
.emplace_back(write_no_slowdown_func
);
427 // Sleep for 2s to allow the threads to insert themselves into the
429 env_
->SleepForMicroseconds(3000000ULL);
432 threads
.emplace_back(wakeup_writer
);
434 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
438 wo
.disableWAL
= false;
439 wo
.no_slowdown
= false;
440 dbfull()->Put(wo
, "foo", "bar");
441 // We need the 2nd write to trigger delay. This is because delay is
442 // estimated based on the last write size which is 0 for the first write.
443 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
446 for (auto& t
: threads
) {
449 ASSERT_GE(wait_count
.load(), 1);
451 wo
.no_slowdown
= true;
452 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar"));
456 TEST_F(DBTest
, LevelLimitReopen
) {
457 Options options
= CurrentOptions();
458 CreateAndReopenWithCF({"pikachu"}, options
);
460 const std::string
value(1024 * 1024, ' ');
462 while (NumTableFilesAtLevel(2, 1) == 0) {
463 ASSERT_OK(Put(1, Key(i
++), value
));
466 options
.num_levels
= 1;
467 options
.max_bytes_for_level_multiplier_additional
.resize(1, 1);
468 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
469 ASSERT_EQ(s
.IsInvalidArgument(), true);
470 ASSERT_EQ(s
.ToString(),
471 "Invalid argument: db has more levels than options.num_levels");
473 options
.num_levels
= 10;
474 options
.max_bytes_for_level_multiplier_additional
.resize(10, 1);
475 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
477 #endif // ROCKSDB_LITE
480 TEST_F(DBTest
, PutSingleDeleteGet
) {
482 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
483 ASSERT_OK(Put(1, "foo", "v1"));
484 ASSERT_EQ("v1", Get(1, "foo"));
485 ASSERT_OK(Put(1, "foo2", "v2"));
486 ASSERT_EQ("v2", Get(1, "foo2"));
487 ASSERT_OK(SingleDelete(1, "foo"));
488 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
489 // Skip FIFO and universal compaction beccause they do not apply to the test
490 // case. Skip MergePut because single delete does not get removed when it
491 // encounters a merge.
492 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
496 TEST_F(DBTest
, ReadFromPersistedTier
) {
499 Options options
= CurrentOptions();
500 for (int disableWAL
= 0; disableWAL
<= 1; ++disableWAL
) {
501 CreateAndReopenWithCF({"pikachu"}, options
);
503 wopt
.disableWAL
= (disableWAL
== 1);
504 // 1st round: put but not flush
505 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "foo", "first"));
506 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "bar", "one"));
507 ASSERT_EQ("first", Get(1, "foo"));
508 ASSERT_EQ("one", Get(1, "bar"));
510 // Read directly from persited data.
512 ropt
.read_tier
= kPersistedTier
;
514 if (wopt
.disableWAL
) {
515 // as data has not yet being flushed, we expect not found.
516 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
517 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
519 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "foo", &value
));
520 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "bar", &value
));
524 std::vector
<ColumnFamilyHandle
*> multiget_cfs
;
525 multiget_cfs
.push_back(handles_
[1]);
526 multiget_cfs
.push_back(handles_
[1]);
527 std::vector
<Slice
> multiget_keys
;
528 multiget_keys
.push_back("foo");
529 multiget_keys
.push_back("bar");
530 std::vector
<std::string
> multiget_values
;
532 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
533 if (wopt
.disableWAL
) {
534 ASSERT_TRUE(statuses
[0].IsNotFound());
535 ASSERT_TRUE(statuses
[1].IsNotFound());
537 ASSERT_OK(statuses
[0]);
538 ASSERT_OK(statuses
[1]);
541 // 2nd round: flush and put a new value in memtable.
543 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "rocksdb", "hello"));
545 // once the data has been flushed, we are able to get the
546 // data when kPersistedTier is used.
547 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).ok());
548 ASSERT_EQ(value
, "first");
549 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
550 ASSERT_EQ(value
, "one");
551 if (wopt
.disableWAL
) {
553 db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).IsNotFound());
555 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
));
556 ASSERT_EQ(value
, "hello");
559 // Expect same result in multiget
560 multiget_cfs
.push_back(handles_
[1]);
561 multiget_keys
.push_back("rocksdb");
563 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
564 ASSERT_TRUE(statuses
[0].ok());
565 ASSERT_EQ("first", multiget_values
[0]);
566 ASSERT_TRUE(statuses
[1].ok());
567 ASSERT_EQ("one", multiget_values
[1]);
568 if (wopt
.disableWAL
) {
569 ASSERT_TRUE(statuses
[2].IsNotFound());
571 ASSERT_OK(statuses
[2]);
574 // 3rd round: delete and flush
575 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "foo"));
577 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "bar"));
579 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
580 if (wopt
.disableWAL
) {
581 // Still expect finding the value as its delete has not yet being
583 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
584 ASSERT_EQ(value
, "one");
586 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
588 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).ok());
589 ASSERT_EQ(value
, "hello");
592 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
593 ASSERT_TRUE(statuses
[0].IsNotFound());
594 if (wopt
.disableWAL
) {
595 ASSERT_TRUE(statuses
[1].ok());
596 ASSERT_EQ("one", multiget_values
[1]);
598 ASSERT_TRUE(statuses
[1].IsNotFound());
600 ASSERT_TRUE(statuses
[2].ok());
601 ASSERT_EQ("hello", multiget_values
[2]);
602 if (wopt
.disableWAL
== 0) {
603 DestroyAndReopen(options
);
606 } while (ChangeOptions());
609 TEST_F(DBTest
, SingleDeleteFlush
) {
610 // Test to check whether flushing preserves a single delete hidden
615 Options options
= CurrentOptions();
616 options
.disable_auto_compactions
= true;
617 CreateAndReopenWithCF({"pikachu"}, options
);
619 // Put values on second level (so that they will not be in the same
620 // compaction as the other operations.
621 Put(1, "foo", "first");
622 Put(1, "bar", "one");
624 MoveFilesToLevel(2, 1);
626 // (Single) delete hidden by a put
627 SingleDelete(1, "foo");
628 Put(1, "foo", "second");
630 Put(1, "bar", "two");
633 SingleDelete(1, "foo");
637 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
640 ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
641 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
642 // Skip FIFO and universal compaction beccause they do not apply to the test
643 // case. Skip MergePut because single delete does not get removed when it
644 // encounters a merge.
645 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
649 TEST_F(DBTest
, SingleDeletePutFlush
) {
650 // Single deletes that encounter the matching put in a flush should get
655 Options options
= CurrentOptions();
656 options
.disable_auto_compactions
= true;
657 CreateAndReopenWithCF({"pikachu"}, options
);
659 Put(1, "foo", Slice());
660 Put(1, "a", Slice());
661 SingleDelete(1, "a");
664 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
665 // Skip FIFO and universal compaction beccause they do not apply to the test
666 // case. Skip MergePut because single delete does not get removed when it
667 // encounters a merge.
668 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
672 // Disable because not all platform can run it.
673 // It requires more than 9GB memory to run it, With single allocation
675 TEST_F(DBTest
, DISABLED_SanitizeVeryVeryLargeValue
) {
676 const size_t kValueSize
= 4 * size_t{1024 * 1024 * 1024}; // 4GB value
677 std::string
raw(kValueSize
, 'v');
678 Options options
= CurrentOptions();
680 options
.merge_operator
= MergeOperators::CreatePutOperator();
681 options
.write_buffer_size
= 100000; // Small write buffer
682 options
.paranoid_checks
= true;
683 DestroyAndReopen(options
);
685 ASSERT_OK(Put("boo", "v1"));
686 ASSERT_TRUE(Put("foo", raw
).IsInvalidArgument());
687 ASSERT_TRUE(Merge("foo", raw
).IsInvalidArgument());
690 ASSERT_TRUE(wb
.Put("foo", raw
).IsInvalidArgument());
691 ASSERT_TRUE(wb
.Merge("foo", raw
).IsInvalidArgument());
693 Slice value_slice
= raw
;
694 Slice key_slice
= "foo";
695 SliceParts
sp_key(&key_slice
, 1);
696 SliceParts
sp_value(&value_slice
, 1);
698 ASSERT_TRUE(wb
.Put(sp_key
, sp_value
).IsInvalidArgument());
699 ASSERT_TRUE(wb
.Merge(sp_key
, sp_value
).IsInvalidArgument());
702 // Disable because not all platform can run it.
703 // It requires more than 9GB memory to run it, With single allocation
705 TEST_F(DBTest
, DISABLED_VeryLargeValue
) {
706 const size_t kValueSize
= 3221225472u; // 3GB value
707 const size_t kKeySize
= 8388608u; // 8MB key
708 std::string
raw(kValueSize
, 'v');
709 std::string
key1(kKeySize
, 'c');
710 std::string
key2(kKeySize
, 'd');
712 Options options
= CurrentOptions();
714 options
.write_buffer_size
= 100000; // Small write buffer
715 options
.paranoid_checks
= true;
716 DestroyAndReopen(options
);
718 ASSERT_OK(Put("boo", "v1"));
719 ASSERT_OK(Put("foo", "v1"));
720 ASSERT_OK(Put(key1
, raw
));
722 ASSERT_OK(Put(key2
, raw
));
723 dbfull()->TEST_WaitForFlushMemTable();
726 ASSERT_EQ(1, NumTableFilesAtLevel(0));
727 #endif // !ROCKSDB_LITE
730 Status s
= db_
->Get(ReadOptions(), key1
, &value
);
732 ASSERT_EQ(kValueSize
, value
.size());
733 ASSERT_EQ('v', value
[0]);
735 s
= db_
->Get(ReadOptions(), key2
, &value
);
737 ASSERT_EQ(kValueSize
, value
.size());
738 ASSERT_EQ('w', value
[0]);
740 // Compact all files.
742 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
744 // Check DB is not in read-only state.
745 ASSERT_OK(Put("boo", "v1"));
747 s
= db_
->Get(ReadOptions(), key1
, &value
);
749 ASSERT_EQ(kValueSize
, value
.size());
750 ASSERT_EQ('v', value
[0]);
752 s
= db_
->Get(ReadOptions(), key2
, &value
);
754 ASSERT_EQ(kValueSize
, value
.size());
755 ASSERT_EQ('w', value
[0]);
758 TEST_F(DBTest
, GetFromImmutableLayer
) {
760 Options options
= CurrentOptions();
762 CreateAndReopenWithCF({"pikachu"}, options
);
764 ASSERT_OK(Put(1, "foo", "v1"));
765 ASSERT_EQ("v1", Get(1, "foo"));
768 env_
->delay_sstable_sync_
.store(true, std::memory_order_release
);
769 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
770 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
771 ASSERT_EQ("v1", Get(1, "foo"));
772 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
773 // Release sync calls
774 env_
->delay_sstable_sync_
.store(false, std::memory_order_release
);
775 } while (ChangeOptions());
779 TEST_F(DBTest
, GetLevel0Ordering
) {
781 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
782 // Check that we process level-0 files in correct order. The code
783 // below generates two level-0 files where the earlier one comes
784 // before the later one in the level-0 file list since the earlier
785 // one has a smaller "smallest" key.
786 ASSERT_OK(Put(1, "bar", "b"));
787 ASSERT_OK(Put(1, "foo", "v1"));
789 ASSERT_OK(Put(1, "foo", "v2"));
791 ASSERT_EQ("v2", Get(1, "foo"));
792 } while (ChangeOptions());
795 TEST_F(DBTest
, WrongLevel0Config
) {
796 Options options
= CurrentOptions();
798 ASSERT_OK(DestroyDB(dbname_
, options
));
799 options
.level0_stop_writes_trigger
= 1;
800 options
.level0_slowdown_writes_trigger
= 2;
801 options
.level0_file_num_compaction_trigger
= 3;
802 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
806 TEST_F(DBTest
, GetOrderedByLevels
) {
808 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
809 ASSERT_OK(Put(1, "foo", "v1"));
810 Compact(1, "a", "z");
811 ASSERT_EQ("v1", Get(1, "foo"));
812 ASSERT_OK(Put(1, "foo", "v2"));
813 ASSERT_EQ("v2", Get(1, "foo"));
815 ASSERT_EQ("v2", Get(1, "foo"));
816 } while (ChangeOptions());
819 TEST_F(DBTest
, GetPicksCorrectFile
) {
821 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
822 // Arrange to have multiple files in a non-level-0 level.
823 ASSERT_OK(Put(1, "a", "va"));
824 Compact(1, "a", "b");
825 ASSERT_OK(Put(1, "x", "vx"));
826 Compact(1, "x", "y");
827 ASSERT_OK(Put(1, "f", "vf"));
828 Compact(1, "f", "g");
829 ASSERT_EQ("va", Get(1, "a"));
830 ASSERT_EQ("vf", Get(1, "f"));
831 ASSERT_EQ("vx", Get(1, "x"));
832 } while (ChangeOptions());
835 TEST_F(DBTest
, GetEncountersEmptyLevel
) {
837 Options options
= CurrentOptions();
838 CreateAndReopenWithCF({"pikachu"}, options
);
839 // Arrange for the following to happen:
840 // * sstable A in level 0
841 // * nothing in level 1
842 // * sstable B in level 2
843 // Then do enough Get() calls to arrange for an automatic compaction
844 // of sstable A. A bug would cause the compaction to be marked as
845 // occurring at level 1 (instead of the correct level 0).
847 // Step 1: First place sstables in levels 0 and 2
848 Put(1, "a", "begin");
851 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
852 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
853 Put(1, "a", "begin");
856 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
857 ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
859 // Step 2: clear level 1 if necessary.
860 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
861 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
862 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
863 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 1);
865 // Step 3: read a bunch of times
866 for (int i
= 0; i
< 1000; i
++) {
867 ASSERT_EQ("NOT_FOUND", Get(1, "missing"));
870 // Step 4: Wait for compaction to finish
871 dbfull()->TEST_WaitForCompact();
873 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
874 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
876 #endif // ROCKSDB_LITE
878 TEST_F(DBTest
, FlushMultipleMemtable
) {
880 Options options
= CurrentOptions();
881 WriteOptions writeOpt
= WriteOptions();
882 writeOpt
.disableWAL
= true;
883 options
.max_write_buffer_number
= 4;
884 options
.min_write_buffer_number_to_merge
= 3;
885 options
.max_write_buffer_size_to_maintain
= -1;
886 CreateAndReopenWithCF({"pikachu"}, options
);
887 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v1"));
889 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
891 ASSERT_EQ("v1", Get(1, "foo"));
892 ASSERT_EQ("v1", Get(1, "bar"));
894 } while (ChangeCompactOptions());
897 TEST_F(DBTest
, FlushSchedule
) {
898 Options options
= CurrentOptions();
899 options
.disable_auto_compactions
= true;
900 options
.level0_stop_writes_trigger
= 1 << 10;
901 options
.level0_slowdown_writes_trigger
= 1 << 10;
902 options
.min_write_buffer_number_to_merge
= 1;
903 options
.max_write_buffer_size_to_maintain
=
904 static_cast<int64_t>(options
.write_buffer_size
);
905 options
.max_write_buffer_number
= 2;
906 options
.write_buffer_size
= 120 * 1024;
907 CreateAndReopenWithCF({"pikachu"}, options
);
908 std::vector
<port::Thread
> threads
;
910 std::atomic
<int> thread_num(0);
911 // each column family will have 5 thread, each thread generating 2 memtables.
912 // each column family should end up with 10 table files
913 std::function
<void()> fill_memtable_func
= [&]() {
914 int a
= thread_num
.fetch_add(1);
917 // this should fill up 2 memtables
918 for (int k
= 0; k
< 5000; ++k
) {
919 ASSERT_OK(db_
->Put(wo
, handles_
[a
& 1], rnd
.RandomString(13), ""));
923 for (int i
= 0; i
< 10; ++i
) {
924 threads
.emplace_back(fill_memtable_func
);
927 for (auto& t
: threads
) {
931 auto default_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "default");
932 auto pikachu_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "pikachu");
933 ASSERT_LE(default_tables
, static_cast<uint64_t>(10));
934 ASSERT_GT(default_tables
, static_cast<uint64_t>(0));
935 ASSERT_LE(pikachu_tables
, static_cast<uint64_t>(10));
936 ASSERT_GT(pikachu_tables
, static_cast<uint64_t>(0));
938 #endif // ROCKSDB_LITE
941 class KeepFilter
: public CompactionFilter
{
943 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
944 std::string
* /*new_value*/,
945 bool* /*value_changed*/) const override
{
949 const char* Name() const override
{ return "KeepFilter"; }
952 class KeepFilterFactory
: public CompactionFilterFactory
{
954 explicit KeepFilterFactory(bool check_context
= false)
955 : check_context_(check_context
) {}
957 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
958 const CompactionFilter::Context
& context
) override
{
959 if (check_context_
) {
960 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
961 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
963 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
966 const char* Name() const override
{ return "KeepFilterFactory"; }
968 std::atomic_bool expect_full_compaction_
;
969 std::atomic_bool expect_manual_compaction_
;
972 class DelayFilter
: public CompactionFilter
{
974 explicit DelayFilter(DBTestBase
* d
) : db_test(d
) {}
975 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
976 std::string
* /*new_value*/,
977 bool* /*value_changed*/) const override
{
978 db_test
->env_
->MockSleepForMicroseconds(1000);
982 const char* Name() const override
{ return "DelayFilter"; }
988 class DelayFilterFactory
: public CompactionFilterFactory
{
990 explicit DelayFilterFactory(DBTestBase
* d
) : db_test(d
) {}
991 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
992 const CompactionFilter::Context
& /*context*/) override
{
993 return std::unique_ptr
<CompactionFilter
>(new DelayFilter(db_test
));
996 const char* Name() const override
{ return "DelayFilterFactory"; }
1003 #ifndef ROCKSDB_LITE
1005 static std::string
CompressibleString(Random
* rnd
, int len
) {
1007 test::CompressibleString(rnd
, 0.8, len
, &r
);
1010 #endif // ROCKSDB_LITE
1012 TEST_F(DBTest
, FailMoreDbPaths
) {
1013 Options options
= CurrentOptions();
1014 options
.db_paths
.emplace_back(dbname_
, 10000000);
1015 options
.db_paths
.emplace_back(dbname_
+ "_2", 1000000);
1016 options
.db_paths
.emplace_back(dbname_
+ "_3", 1000000);
1017 options
.db_paths
.emplace_back(dbname_
+ "_4", 1000000);
1018 options
.db_paths
.emplace_back(dbname_
+ "_5", 1000000);
1019 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
1022 void CheckColumnFamilyMeta(
1023 const ColumnFamilyMetaData
& cf_meta
,
1024 const std::vector
<std::vector
<FileMetaData
>>& files_by_level
,
1025 uint64_t start_time
, uint64_t end_time
) {
1026 ASSERT_EQ(cf_meta
.name
, kDefaultColumnFamilyName
);
1027 ASSERT_EQ(cf_meta
.levels
.size(), files_by_level
.size());
1029 uint64_t cf_size
= 0;
1030 size_t file_count
= 0;
1032 for (size_t i
= 0; i
< cf_meta
.levels
.size(); ++i
) {
1033 const auto& level_meta_from_cf
= cf_meta
.levels
[i
];
1034 const auto& level_meta_from_files
= files_by_level
[i
];
1036 ASSERT_EQ(level_meta_from_cf
.level
, i
);
1037 ASSERT_EQ(level_meta_from_cf
.files
.size(), level_meta_from_files
.size());
1039 file_count
+= level_meta_from_cf
.files
.size();
1041 uint64_t level_size
= 0;
1042 for (size_t j
= 0; j
< level_meta_from_cf
.files
.size(); ++j
) {
1043 const auto& file_meta_from_cf
= level_meta_from_cf
.files
[j
];
1044 const auto& file_meta_from_files
= level_meta_from_files
[j
];
1046 level_size
+= file_meta_from_cf
.size
;
1048 ASSERT_EQ(file_meta_from_cf
.file_number
,
1049 file_meta_from_files
.fd
.GetNumber());
1050 ASSERT_EQ(file_meta_from_cf
.file_number
,
1051 TableFileNameToNumber(file_meta_from_cf
.name
));
1052 ASSERT_EQ(file_meta_from_cf
.size
, file_meta_from_files
.fd
.file_size
);
1053 ASSERT_EQ(file_meta_from_cf
.smallest_seqno
,
1054 file_meta_from_files
.fd
.smallest_seqno
);
1055 ASSERT_EQ(file_meta_from_cf
.largest_seqno
,
1056 file_meta_from_files
.fd
.largest_seqno
);
1057 ASSERT_EQ(file_meta_from_cf
.smallestkey
,
1058 file_meta_from_files
.smallest
.user_key().ToString());
1059 ASSERT_EQ(file_meta_from_cf
.largestkey
,
1060 file_meta_from_files
.largest
.user_key().ToString());
1061 ASSERT_EQ(file_meta_from_cf
.oldest_blob_file_number
,
1062 file_meta_from_files
.oldest_blob_file_number
);
1063 ASSERT_EQ(file_meta_from_cf
.oldest_ancester_time
,
1064 file_meta_from_files
.oldest_ancester_time
);
1065 ASSERT_EQ(file_meta_from_cf
.file_creation_time
,
1066 file_meta_from_files
.file_creation_time
);
1067 ASSERT_GE(file_meta_from_cf
.file_creation_time
, start_time
);
1068 ASSERT_LE(file_meta_from_cf
.file_creation_time
, end_time
);
1069 ASSERT_GE(file_meta_from_cf
.oldest_ancester_time
, start_time
);
1070 ASSERT_LE(file_meta_from_cf
.oldest_ancester_time
, end_time
);
1073 ASSERT_EQ(level_meta_from_cf
.size
, level_size
);
1074 cf_size
+= level_size
;
1077 ASSERT_EQ(cf_meta
.file_count
, file_count
);
1078 ASSERT_EQ(cf_meta
.size
, cf_size
);
1081 void CheckLiveFilesMeta(
1082 const std::vector
<LiveFileMetaData
>& live_file_meta
,
1083 const std::vector
<std::vector
<FileMetaData
>>& files_by_level
) {
1084 size_t total_file_count
= 0;
1085 for (const auto& f
: files_by_level
) {
1086 total_file_count
+= f
.size();
1089 ASSERT_EQ(live_file_meta
.size(), total_file_count
);
1094 for (const auto& meta
: live_file_meta
) {
1095 if (level
!= meta
.level
) {
1100 ASSERT_LT(i
, files_by_level
[level
].size());
1102 const auto& expected_meta
= files_by_level
[level
][i
];
1104 ASSERT_EQ(meta
.column_family_name
, kDefaultColumnFamilyName
);
1105 ASSERT_EQ(meta
.file_number
, expected_meta
.fd
.GetNumber());
1106 ASSERT_EQ(meta
.file_number
, TableFileNameToNumber(meta
.name
));
1107 ASSERT_EQ(meta
.size
, expected_meta
.fd
.file_size
);
1108 ASSERT_EQ(meta
.smallest_seqno
, expected_meta
.fd
.smallest_seqno
);
1109 ASSERT_EQ(meta
.largest_seqno
, expected_meta
.fd
.largest_seqno
);
1110 ASSERT_EQ(meta
.smallestkey
, expected_meta
.smallest
.user_key().ToString());
1111 ASSERT_EQ(meta
.largestkey
, expected_meta
.largest
.user_key().ToString());
1112 ASSERT_EQ(meta
.oldest_blob_file_number
,
1113 expected_meta
.oldest_blob_file_number
);
1119 #ifndef ROCKSDB_LITE
1120 TEST_F(DBTest
, MetaDataTest
) {
1121 Options options
= CurrentOptions();
1122 options
.create_if_missing
= true;
1123 options
.disable_auto_compactions
= true;
1125 int64_t temp_time
= 0;
1126 options
.env
->GetCurrentTime(&temp_time
);
1127 uint64_t start_time
= static_cast<uint64_t>(temp_time
);
1129 DestroyAndReopen(options
);
1133 for (int i
= 0; i
< 100; ++i
) {
1134 // Add a single blob reference to each file
1135 std::string blob_index
;
1136 BlobIndex::EncodeBlob(&blob_index
, /* blob_file_number */ i
+ 1000,
1137 /* offset */ 1234, /* size */ 5678, kNoCompression
);
1140 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch
, 0, Key(key_index
),
1142 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
1146 // Fill up the rest of the file with random values.
1147 GenerateNewFile(&rnd
, &key_index
, /* nowait */ true);
1152 std::vector
<std::vector
<FileMetaData
>> files_by_level
;
1153 dbfull()->TEST_GetFilesMetaData(db_
->DefaultColumnFamily(), &files_by_level
);
1155 options
.env
->GetCurrentTime(&temp_time
);
1156 uint64_t end_time
= static_cast<uint64_t>(temp_time
);
1158 ColumnFamilyMetaData cf_meta
;
1159 db_
->GetColumnFamilyMetaData(&cf_meta
);
1160 CheckColumnFamilyMeta(cf_meta
, files_by_level
, start_time
, end_time
);
1162 std::vector
<LiveFileMetaData
> live_file_meta
;
1163 db_
->GetLiveFilesMetaData(&live_file_meta
);
1164 CheckLiveFilesMeta(live_file_meta
, files_by_level
);
1168 void MinLevelHelper(DBTest
* self
, Options
& options
) {
1171 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
1173 std::vector
<std::string
> values
;
1174 // Write 120KB (12 values, each 10K)
1175 for (int i
= 0; i
< 12; i
++) {
1176 values
.push_back(rnd
.RandomString(10000));
1177 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
1179 self
->dbfull()->TEST_WaitForFlushMemTable();
1180 ASSERT_EQ(self
->NumTableFilesAtLevel(0), num
+ 1);
1183 // generate one more file in level-0, and should trigger level-0 compaction
1184 std::vector
<std::string
> values
;
1185 for (int i
= 0; i
< 12; i
++) {
1186 values
.push_back(rnd
.RandomString(10000));
1187 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
1189 self
->dbfull()->TEST_WaitForCompact();
1191 ASSERT_EQ(self
->NumTableFilesAtLevel(0), 0);
1192 ASSERT_EQ(self
->NumTableFilesAtLevel(1), 1);
1195 // returns false if the calling-Test should be skipped
1196 bool MinLevelToCompress(CompressionType
& type
, Options
& options
, int wbits
,
1197 int lev
, int strategy
) {
1199 "Test with compression options : window_bits = %d, level = %d, "
1201 wbits
, lev
, strategy
);
1202 options
.write_buffer_size
= 100 << 10; // 100KB
1203 options
.arena_block_size
= 4096;
1204 options
.num_levels
= 3;
1205 options
.level0_file_num_compaction_trigger
= 3;
1206 options
.create_if_missing
= true;
1208 if (Snappy_Supported()) {
1209 type
= kSnappyCompression
;
1210 fprintf(stderr
, "using snappy\n");
1211 } else if (Zlib_Supported()) {
1212 type
= kZlibCompression
;
1213 fprintf(stderr
, "using zlib\n");
1214 } else if (BZip2_Supported()) {
1215 type
= kBZip2Compression
;
1216 fprintf(stderr
, "using bzip2\n");
1217 } else if (LZ4_Supported()) {
1218 type
= kLZ4Compression
;
1219 fprintf(stderr
, "using lz4\n");
1220 } else if (XPRESS_Supported()) {
1221 type
= kXpressCompression
;
1222 fprintf(stderr
, "using xpress\n");
1223 } else if (ZSTD_Supported()) {
1225 fprintf(stderr
, "using ZSTD\n");
1227 fprintf(stderr
, "skipping test, compression disabled\n");
1230 options
.compression_per_level
.resize(options
.num_levels
);
1232 // do not compress L0
1233 for (int i
= 0; i
< 1; i
++) {
1234 options
.compression_per_level
[i
] = kNoCompression
;
1236 for (int i
= 1; i
< options
.num_levels
; i
++) {
1237 options
.compression_per_level
[i
] = type
;
1243 TEST_F(DBTest
, MinLevelToCompress1
) {
1244 Options options
= CurrentOptions();
1245 CompressionType type
= kSnappyCompression
;
1246 if (!MinLevelToCompress(type
, options
, -14, -1, 0)) {
1250 MinLevelHelper(this, options
);
1252 // do not compress L0 and L1
1253 for (int i
= 0; i
< 2; i
++) {
1254 options
.compression_per_level
[i
] = kNoCompression
;
1256 for (int i
= 2; i
< options
.num_levels
; i
++) {
1257 options
.compression_per_level
[i
] = type
;
1259 DestroyAndReopen(options
);
1260 MinLevelHelper(this, options
);
1263 TEST_F(DBTest
, MinLevelToCompress2
) {
1264 Options options
= CurrentOptions();
1265 CompressionType type
= kSnappyCompression
;
1266 if (!MinLevelToCompress(type
, options
, 15, -1, 0)) {
1270 MinLevelHelper(this, options
);
1272 // do not compress L0 and L1
1273 for (int i
= 0; i
< 2; i
++) {
1274 options
.compression_per_level
[i
] = kNoCompression
;
1276 for (int i
= 2; i
< options
.num_levels
; i
++) {
1277 options
.compression_per_level
[i
] = type
;
1279 DestroyAndReopen(options
);
1280 MinLevelHelper(this, options
);
1283 // This test may fail because of a legit case that multiple L0 files
1284 // are trivial moved to L1.
1285 TEST_F(DBTest
, DISABLED_RepeatedWritesToSameKey
) {
1287 Options options
= CurrentOptions();
1289 options
.write_buffer_size
= 100000; // Small write buffer
1290 CreateAndReopenWithCF({"pikachu"}, options
);
1292 // We must have at most one file per level except for level-0,
1293 // which may have up to kL0_StopWritesTrigger files.
1294 const int kMaxFiles
=
1295 options
.num_levels
+ options
.level0_stop_writes_trigger
;
1299 rnd
.RandomString(static_cast<int>(2 * options
.write_buffer_size
));
1300 for (int i
= 0; i
< 5 * kMaxFiles
; i
++) {
1301 ASSERT_OK(Put(1, "key", value
));
1302 ASSERT_LE(TotalTableFiles(1), kMaxFiles
);
1304 } while (ChangeCompactOptions());
1306 #endif // ROCKSDB_LITE
1308 TEST_F(DBTest
, SparseMerge
) {
1310 Options options
= CurrentOptions();
1311 options
.compression
= kNoCompression
;
1312 CreateAndReopenWithCF({"pikachu"}, options
);
1314 FillLevels("A", "Z", 1);
1316 // Suppose there is:
1317 // small amount of data with prefix A
1318 // large amount of data with prefix B
1319 // small amount of data with prefix C
1320 // and that recent updates have made small changes to all three prefixes.
1321 // Check that we do not do a compaction that merges all of B in one shot.
1322 const std::string
value(1000, 'x');
1324 // Write approximately 100MB of "B" values
1325 for (int i
= 0; i
< 100000; i
++) {
1327 snprintf(key
, sizeof(key
), "B%010d", i
);
1331 ASSERT_OK(Flush(1));
1332 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
1334 // Make sparse update
1336 Put(1, "B100", "bvalue2");
1338 ASSERT_OK(Flush(1));
1340 // Compactions should not cause us to create a situation where
1341 // a file overlaps too much data at the next level.
1342 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1344 dbfull()->TEST_CompactRange(0, nullptr, nullptr);
1345 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1347 dbfull()->TEST_CompactRange(1, nullptr, nullptr);
1348 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1350 } while (ChangeCompactOptions());
1353 #ifndef ROCKSDB_LITE
// Returns true iff val lies in the inclusive range [low, high]; logs the
// offending triple to stderr on failure so test output shows which size
// bound was violated.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  bool result = (val >= low) && (val <= high);
  if (!result) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            (unsigned long long)(val), (unsigned long long)(low),
            (unsigned long long)(high));
  }
  return result;
}
1364 TEST_F(DBTest
, ApproximateSizesMemTable
) {
1365 Options options
= CurrentOptions();
1366 options
.write_buffer_size
= 100000000; // Large write buffer
1367 options
.compression
= kNoCompression
;
1368 options
.create_if_missing
= true;
1369 DestroyAndReopen(options
);
1370 auto default_cf
= db_
->DefaultColumnFamily();
1374 for (int i
= 0; i
< N
; i
++) {
1375 ASSERT_OK(Put(Key(i
), rnd
.RandomString(1024)));
1379 std::string start
= Key(50);
1380 std::string end
= Key(60);
1381 Range
r(start
, end
);
1382 SizeApproximationOptions size_approx_options
;
1383 size_approx_options
.include_memtabtles
= true;
1384 size_approx_options
.include_files
= true;
1385 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1386 ASSERT_GT(size
, 6000);
1387 ASSERT_LT(size
, 204800);
1388 // Zero if not including mem table
1389 db_
->GetApproximateSizes(&r
, 1, &size
);
1394 r
= Range(start
, end
);
1395 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1398 for (int i
= 0; i
< N
; i
++) {
1399 ASSERT_OK(Put(Key(1000 + i
), rnd
.RandomString(1024)));
1404 r
= Range(start
, end
);
1405 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1410 r
= Range(start
, end
);
1411 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1412 ASSERT_GT(size
, 6000);
1414 options
.max_write_buffer_number
= 8;
1415 options
.min_write_buffer_number_to_merge
= 5;
1416 options
.write_buffer_size
= 1024 * N
; // Not very large
1417 DestroyAndReopen(options
);
1418 default_cf
= db_
->DefaultColumnFamily();
1421 for (int i
= 0; i
< N
; i
++) {
1422 keys
[i
* 3] = i
* 5;
1423 keys
[i
* 3 + 1] = i
* 5 + 1;
1424 keys
[i
* 3 + 2] = i
* 5 + 2;
1426 // MemTable entry counting is estimated and can vary greatly depending on
1427 // layout. Thus, using deterministic seed for test stability.
1428 RandomShuffle(std::begin(keys
), std::end(keys
), rnd
.Next());
1430 for (int i
= 0; i
< N
* 3; i
++) {
1431 ASSERT_OK(Put(Key(keys
[i
] + 1000), rnd
.RandomString(1024)));
1436 r
= Range(start
, end
);
1437 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1442 r
= Range(start
, end
);
1443 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1444 ASSERT_GT(size
, 6000);
1448 r
= Range(start
, end
);
1449 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1454 r
= Range(start
, end
);
1455 uint64_t size_with_mt
, size_without_mt
;
1456 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1,
1458 ASSERT_GT(size_with_mt
, 6000);
1459 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1460 ASSERT_EQ(size_without_mt
, 0);
1464 for (int i
= 0; i
< N
; i
++) {
1465 ASSERT_OK(Put(Key(i
+ 1000), rnd
.RandomString(1024)));
1470 r
= Range(start
, end
);
1471 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1,
1473 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1474 ASSERT_GT(size_with_mt
, size_without_mt
);
1475 ASSERT_GT(size_without_mt
, 6000);
1477 // Check that include_memtabtles flag works as expected
1478 size_approx_options
.include_memtabtles
= false;
1479 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1480 ASSERT_EQ(size
, size_without_mt
);
1482 // Check that files_size_error_margin works as expected, when the heuristic
1483 // conditions are not met
1485 end
= Key(1000 + N
- 2);
1486 r
= Range(start
, end
);
1487 size_approx_options
.files_size_error_margin
= -1.0; // disabled
1488 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1490 size_approx_options
.files_size_error_margin
= 0.5; // enabled, but not used
1491 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size2
);
1492 ASSERT_EQ(size
, size2
);
1495 TEST_F(DBTest
, ApproximateSizesFilesWithErrorMargin
) {
1496 // Roughly 4 keys per data block, 1000 keys per file,
1497 // with filter substantially larger than a data block
1498 BlockBasedTableOptions table_options
;
1499 table_options
.filter_policy
.reset(NewBloomFilterPolicy(16));
1500 table_options
.block_size
= 100;
1501 Options options
= CurrentOptions();
1502 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
1503 options
.write_buffer_size
= 24 * 1024;
1504 options
.compression
= kNoCompression
;
1505 options
.create_if_missing
= true;
1506 options
.target_file_size_base
= 24 * 1024;
1507 DestroyAndReopen(options
);
1508 const auto default_cf
= db_
->DefaultColumnFamily();
1510 const int N
= 64000;
1512 for (int i
= 0; i
< N
; i
++) {
1513 ASSERT_OK(Put(Key(i
), rnd
.RandomString(24)));
1515 // Flush everything to files
1517 // Compact the entire key space into the next level
1518 db_
->CompactRange(CompactRangeOptions(), default_cf
, nullptr, nullptr);
1521 for (int i
= N
; i
< (N
+ N
/ 4); i
++) {
1522 ASSERT_OK(Put(Key(i
), rnd
.RandomString(24)));
1524 // Flush everything to files again
1527 // Wait for compaction to finish
1528 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1531 const std::string start
= Key(0);
1532 const std::string end
= Key(2 * N
);
1533 const Range
r(start
, end
);
1535 SizeApproximationOptions size_approx_options
;
1536 size_approx_options
.include_memtabtles
= false;
1537 size_approx_options
.include_files
= true;
1538 size_approx_options
.files_size_error_margin
= -1.0; // disabled
1540 // Get the precise size without any approximation heuristic
1542 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1545 // Get the size with an approximation heuristic
1547 const double error_margin
= 0.2;
1548 size_approx_options
.files_size_error_margin
= error_margin
;
1549 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size2
);
1550 ASSERT_LT(size2
, size
* (1 + error_margin
));
1551 ASSERT_GT(size2
, size
* (1 - error_margin
));
1555 // Ensure that metadata is not falsely attributed only to the last data in
1556 // the file. (In some applications, filters can be large portion of data
1558 // Perform many queries over small range, enough to ensure crossing file
1559 // boundary, and make sure we never see a spike for large filter.
1560 for (int i
= 0; i
< 3000; i
+= 10) {
1561 const std::string start
= Key(i
);
1562 const std::string end
= Key(i
+ 11); // overlap by 1 key
1563 const Range
r(start
, end
);
1565 db_
->GetApproximateSizes(&r
, 1, &size
);
1566 ASSERT_LE(size
, 11 * 100);
1571 TEST_F(DBTest
, GetApproximateMemTableStats
) {
1572 Options options
= CurrentOptions();
1573 options
.write_buffer_size
= 100000000;
1574 options
.compression
= kNoCompression
;
1575 options
.create_if_missing
= true;
1576 DestroyAndReopen(options
);
1580 for (int i
= 0; i
< N
; i
++) {
1581 ASSERT_OK(Put(Key(i
), rnd
.RandomString(1024)));
1587 std::string start
= Key(50);
1588 std::string end
= Key(60);
1589 Range
r(start
, end
);
1590 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1591 ASSERT_GT(count
, 0);
1592 ASSERT_LE(count
, N
);
1593 ASSERT_GT(size
, 6000);
1594 ASSERT_LT(size
, 204800);
1598 r
= Range(start
, end
);
1599 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1600 ASSERT_EQ(count
, 0);
1607 r
= Range(start
, end
);
1608 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1609 ASSERT_EQ(count
, 0);
1612 for (int i
= 0; i
< N
; i
++) {
1613 ASSERT_OK(Put(Key(1000 + i
), rnd
.RandomString(1024)));
1618 r
= Range(start
, end
);
1619 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1620 ASSERT_GT(count
, 20);
1621 ASSERT_GT(size
, 6000);
1624 TEST_F(DBTest
, ApproximateSizes
) {
1626 Options options
= CurrentOptions();
1627 options
.write_buffer_size
= 100000000; // Large write buffer
1628 options
.compression
= kNoCompression
;
1629 options
.create_if_missing
= true;
1630 DestroyAndReopen(options
);
1631 CreateAndReopenWithCF({"pikachu"}, options
);
1633 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1634 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1635 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1637 // Write 8MB (80 values, each 100K)
1638 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1640 static const int S1
= 100000;
1641 static const int S2
= 105000; // Allow some expansion from metadata
1643 for (int i
= 0; i
< N
; i
++) {
1644 ASSERT_OK(Put(1, Key(i
), rnd
.RandomString(S1
)));
1647 // 0 because GetApproximateSizes() does not account for memtable space
1648 ASSERT_TRUE(Between(Size("", Key(50), 1), 0, 0));
1650 // Check sizes across recovery by reopening a few times
1651 for (int run
= 0; run
< 3; run
++) {
1652 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1654 for (int compact_start
= 0; compact_start
< N
; compact_start
+= 10) {
1655 for (int i
= 0; i
< N
; i
+= 10) {
1656 ASSERT_TRUE(Between(Size("", Key(i
), 1), S1
* i
, S2
* i
));
1657 ASSERT_TRUE(Between(Size("", Key(i
) + ".suffix", 1), S1
* (i
+ 1),
1659 ASSERT_TRUE(Between(Size(Key(i
), Key(i
+ 10), 1), S1
* 10, S2
* 10));
1661 ASSERT_TRUE(Between(Size("", Key(50), 1), S1
* 50, S2
* 50));
1663 Between(Size("", Key(50) + ".suffix", 1), S1
* 50, S2
* 50));
1665 std::string cstart_str
= Key(compact_start
);
1666 std::string cend_str
= Key(compact_start
+ 9);
1667 Slice cstart
= cstart_str
;
1668 Slice cend
= cend_str
;
1669 dbfull()->TEST_CompactRange(0, &cstart
, &cend
, handles_
[1]);
1672 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1673 ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
1675 // ApproximateOffsetOf() is not yet implemented in plain table format.
1676 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1677 kSkipPlainTable
| kSkipHashIndex
));
1680 TEST_F(DBTest
, ApproximateSizes_MixOfSmallAndLarge
) {
1682 Options options
= CurrentOptions();
1683 options
.compression
= kNoCompression
;
1684 CreateAndReopenWithCF({"pikachu"}, options
);
1687 std::string big1
= rnd
.RandomString(100000);
1688 ASSERT_OK(Put(1, Key(0), rnd
.RandomString(10000)));
1689 ASSERT_OK(Put(1, Key(1), rnd
.RandomString(10000)));
1690 ASSERT_OK(Put(1, Key(2), big1
));
1691 ASSERT_OK(Put(1, Key(3), rnd
.RandomString(10000)));
1692 ASSERT_OK(Put(1, Key(4), big1
));
1693 ASSERT_OK(Put(1, Key(5), rnd
.RandomString(10000)));
1694 ASSERT_OK(Put(1, Key(6), rnd
.RandomString(300000)));
1695 ASSERT_OK(Put(1, Key(7), rnd
.RandomString(10000)));
1697 // Check sizes across recovery by reopening a few times
1698 for (int run
= 0; run
< 3; run
++) {
1699 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1701 ASSERT_TRUE(Between(Size("", Key(0), 1), 0, 0));
1702 ASSERT_TRUE(Between(Size("", Key(1), 1), 10000, 11000));
1703 ASSERT_TRUE(Between(Size("", Key(2), 1), 20000, 21000));
1704 ASSERT_TRUE(Between(Size("", Key(3), 1), 120000, 121000));
1705 ASSERT_TRUE(Between(Size("", Key(4), 1), 130000, 131000));
1706 ASSERT_TRUE(Between(Size("", Key(5), 1), 230000, 232000));
1707 ASSERT_TRUE(Between(Size("", Key(6), 1), 240000, 242000));
1708 // Ensure some overhead is accounted for, even without including all
1709 ASSERT_TRUE(Between(Size("", Key(7), 1), 540500, 545000));
1710 ASSERT_TRUE(Between(Size("", Key(8), 1), 550500, 555000));
1712 ASSERT_TRUE(Between(Size(Key(3), Key(5), 1), 110100, 111000));
1714 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
1716 // ApproximateOffsetOf() is not yet implemented in plain table format.
1717 } while (ChangeOptions(kSkipPlainTable
));
1719 #endif // ROCKSDB_LITE
1721 #ifndef ROCKSDB_LITE
1722 TEST_F(DBTest
, Snapshot
) {
1723 env_
->SetMockSleep();
1724 anon::OptionsOverride options_override
;
1725 options_override
.skip_policy
= kSkipNoSnapshot
;
1727 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override
));
1728 Put(0, "foo", "0v1");
1729 Put(1, "foo", "1v1");
1731 const Snapshot
* s1
= db_
->GetSnapshot();
1732 ASSERT_EQ(1U, GetNumSnapshots());
1733 uint64_t time_snap1
= GetTimeOldestSnapshots();
1734 ASSERT_GT(time_snap1
, 0U);
1735 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1736 Put(0, "foo", "0v2");
1737 Put(1, "foo", "1v2");
1739 env_
->MockSleepForSeconds(1);
1741 const Snapshot
* s2
= db_
->GetSnapshot();
1742 ASSERT_EQ(2U, GetNumSnapshots());
1743 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1744 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1745 Put(0, "foo", "0v3");
1746 Put(1, "foo", "1v3");
1749 ManagedSnapshot
s3(db_
);
1750 ASSERT_EQ(3U, GetNumSnapshots());
1751 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1752 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1754 Put(0, "foo", "0v4");
1755 Put(1, "foo", "1v4");
1756 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1757 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1758 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1759 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1760 ASSERT_EQ("0v3", Get(0, "foo", s3
.snapshot()));
1761 ASSERT_EQ("1v3", Get(1, "foo", s3
.snapshot()));
1762 ASSERT_EQ("0v4", Get(0, "foo"));
1763 ASSERT_EQ("1v4", Get(1, "foo"));
1766 ASSERT_EQ(2U, GetNumSnapshots());
1767 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1768 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1769 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1770 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1771 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1772 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1773 ASSERT_EQ("0v4", Get(0, "foo"));
1774 ASSERT_EQ("1v4", Get(1, "foo"));
1776 db_
->ReleaseSnapshot(s1
);
1777 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1778 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1779 ASSERT_EQ("0v4", Get(0, "foo"));
1780 ASSERT_EQ("1v4", Get(1, "foo"));
1781 ASSERT_EQ(1U, GetNumSnapshots());
1782 ASSERT_LT(time_snap1
, GetTimeOldestSnapshots());
1783 ASSERT_EQ(GetSequenceOldestSnapshots(), s2
->GetSequenceNumber());
1785 db_
->ReleaseSnapshot(s2
);
1786 ASSERT_EQ(0U, GetNumSnapshots());
1787 ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
1788 ASSERT_EQ("0v4", Get(0, "foo"));
1789 ASSERT_EQ("1v4", Get(1, "foo"));
1790 } while (ChangeOptions());
1793 TEST_F(DBTest
, HiddenValuesAreRemoved
) {
1794 anon::OptionsOverride options_override
;
1795 options_override
.skip_policy
= kSkipNoSnapshot
;
1797 Options options
= CurrentOptions(options_override
);
1798 CreateAndReopenWithCF({"pikachu"}, options
);
1800 FillLevels("a", "z", 1);
1802 std::string big
= rnd
.RandomString(50000);
1804 Put(1, "pastfoo", "v");
1805 const Snapshot
* snapshot
= db_
->GetSnapshot();
1806 Put(1, "foo", "tiny");
1807 Put(1, "pastfoo2", "v2"); // Advance sequence number one more
1809 ASSERT_OK(Flush(1));
1810 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
1812 ASSERT_EQ(big
, Get(1, "foo", snapshot
));
1813 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 50000, 60000));
1814 db_
->ReleaseSnapshot(snapshot
);
1815 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny, " + big
+ " ]");
1817 dbfull()->TEST_CompactRange(0, nullptr, &x
, handles_
[1]);
1818 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1819 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1820 ASSERT_GE(NumTableFilesAtLevel(1, 1), 1);
1821 dbfull()->TEST_CompactRange(1, nullptr, &x
, handles_
[1]);
1822 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1824 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
1825 // ApproximateOffsetOf() is not yet implemented in plain table format,
1826 // which is used by Size().
1827 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1830 #endif // ROCKSDB_LITE
1832 TEST_F(DBTest
, UnremovableSingleDelete
) {
1835 // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
1837 // We do not want to end up with:
1839 // Put(A, v1) Snapshot Put(A, v2)
1841 // Because a subsequent SingleDelete(A) would delete the Put(A, v2)
1842 // but not Put(A, v1), so Get(A) would return v1.
1843 anon::OptionsOverride options_override
;
1844 options_override
.skip_policy
= kSkipNoSnapshot
;
1846 Options options
= CurrentOptions(options_override
);
1847 options
.disable_auto_compactions
= true;
1848 CreateAndReopenWithCF({"pikachu"}, options
);
1850 Put(1, "foo", "first");
1851 const Snapshot
* snapshot
= db_
->GetSnapshot();
1852 SingleDelete(1, "foo");
1853 Put(1, "foo", "second");
1854 ASSERT_OK(Flush(1));
1856 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1857 ASSERT_EQ("second", Get(1, "foo"));
1859 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1861 ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));
1863 SingleDelete(1, "foo");
1865 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1866 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1868 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1871 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1872 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1873 db_
->ReleaseSnapshot(snapshot
);
1874 // Skip FIFO and universal compaction because they do not apply to the test
1875 // case. Skip MergePut because single delete does not get removed when it
1876 // encounters a merge.
1877 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
1881 #ifndef ROCKSDB_LITE
1882 TEST_F(DBTest
, DeletionMarkers1
) {
1883 Options options
= CurrentOptions();
1884 CreateAndReopenWithCF({"pikachu"}, options
);
1885 Put(1, "foo", "v1");
1886 ASSERT_OK(Flush(1));
1888 MoveFilesToLevel(last
, 1);
1889 // foo => v1 is now in last level
1890 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1892 // Place a table at level last-1 to prevent merging with preceding mutation
1893 Put(1, "a", "begin");
1896 MoveFilesToLevel(last
- 1, 1);
1897 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1898 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1901 Put(1, "foo", "v2");
1902 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
1903 ASSERT_OK(Flush(1)); // Moves to level last-2
1904 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1906 dbfull()->TEST_CompactRange(last
- 2, nullptr, &z
, handles_
[1]);
1907 // DEL eliminated, but v1 remains because we aren't compacting that level
1908 // (DEL can be eliminated because v2 hides v1).
1909 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1910 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1911 // Merging last-1 w/ last, so we are the base level for "foo", so
1912 // DEL is removed. (as is v1).
1913 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
1916 TEST_F(DBTest
, DeletionMarkers2
) {
1917 Options options
= CurrentOptions();
1918 CreateAndReopenWithCF({"pikachu"}, options
);
1919 Put(1, "foo", "v1");
1920 ASSERT_OK(Flush(1));
1922 MoveFilesToLevel(last
, 1);
1923 // foo => v1 is now in last level
1924 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1926 // Place a table at level last-1 to prevent merging with preceding mutation
1927 Put(1, "a", "begin");
1930 MoveFilesToLevel(last
- 1, 1);
1931 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1932 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1935 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1936 ASSERT_OK(Flush(1)); // Moves to level last-2
1937 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1938 dbfull()->TEST_CompactRange(last
- 2, nullptr, nullptr, handles_
[1]);
1939 // DEL kept: "last" file overlaps
1940 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1941 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1942 // Merging last-1 w/ last, so we are the base level for "foo", so
1943 // DEL is removed. (as is v1).
1944 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
1947 TEST_F(DBTest
, OverlapInLevel0
) {
1949 Options options
= CurrentOptions();
1950 CreateAndReopenWithCF({"pikachu"}, options
);
1952 // Fill levels 1 and 2 to disable the pushing of new memtables to levels >
1954 ASSERT_OK(Put(1, "100", "v100"));
1955 ASSERT_OK(Put(1, "999", "v999"));
1957 MoveFilesToLevel(2, 1);
1958 ASSERT_OK(Delete(1, "100"));
1959 ASSERT_OK(Delete(1, "999"));
1961 MoveFilesToLevel(1, 1);
1962 ASSERT_EQ("0,1,1", FilesPerLevel(1));
1964 // Make files spanning the following ranges in level-0:
1965 // files[0] 200 .. 900
1966 // files[1] 300 .. 500
1967 // Note that files are sorted by smallest key.
1968 ASSERT_OK(Put(1, "300", "v300"));
1969 ASSERT_OK(Put(1, "500", "v500"));
1971 ASSERT_OK(Put(1, "200", "v200"));
1972 ASSERT_OK(Put(1, "600", "v600"));
1973 ASSERT_OK(Put(1, "900", "v900"));
1975 ASSERT_EQ("2,1,1", FilesPerLevel(1));
1977 // Compact away the placeholder files we created initially
1978 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
1979 dbfull()->TEST_CompactRange(2, nullptr, nullptr, handles_
[1]);
1980 ASSERT_EQ("2", FilesPerLevel(1));
1982 // Do a memtable compaction. Before bug-fix, the compaction would
1983 // not detect the overlap with level-0 files and would incorrectly place
1984 // the deletion in a deeper level.
1985 ASSERT_OK(Delete(1, "600"));
1987 ASSERT_EQ("3", FilesPerLevel(1));
1988 ASSERT_EQ("NOT_FOUND", Get(1, "600"));
1989 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
1991 #endif // ROCKSDB_LITE
1993 TEST_F(DBTest
, ComparatorCheck
) {
1994 class NewComparator
: public Comparator
{
1996 const char* Name() const override
{ return "rocksdb.NewComparator"; }
1997 int Compare(const Slice
& a
, const Slice
& b
) const override
{
1998 return BytewiseComparator()->Compare(a
, b
);
2000 void FindShortestSeparator(std::string
* s
, const Slice
& l
) const override
{
2001 BytewiseComparator()->FindShortestSeparator(s
, l
);
2003 void FindShortSuccessor(std::string
* key
) const override
{
2004 BytewiseComparator()->FindShortSuccessor(key
);
2007 Options new_options
, options
;
2010 options
= CurrentOptions();
2011 CreateAndReopenWithCF({"pikachu"}, options
);
2012 new_options
= CurrentOptions();
2013 new_options
.comparator
= &cmp
;
2014 // only the non-default column family has non-matching comparator
2015 Status s
= TryReopenWithColumnFamilies(
2016 {"default", "pikachu"}, std::vector
<Options
>({options
, new_options
}));
2017 ASSERT_TRUE(!s
.ok());
2018 ASSERT_TRUE(s
.ToString().find("comparator") != std::string::npos
)
2020 } while (ChangeCompactOptions());
2023 TEST_F(DBTest
, CustomComparator
) {
2024 class NumberComparator
: public Comparator
{
2026 const char* Name() const override
{ return "test.NumberComparator"; }
2027 int Compare(const Slice
& a
, const Slice
& b
) const override
{
2028 return ToNumber(a
) - ToNumber(b
);
2030 void FindShortestSeparator(std::string
* s
, const Slice
& l
) const override
{
2031 ToNumber(*s
); // Check format
2032 ToNumber(l
); // Check format
2034 void FindShortSuccessor(std::string
* key
) const override
{
2035 ToNumber(*key
); // Check format
2039 static int ToNumber(const Slice
& x
) {
2040 // Check that there are no extra characters.
2041 EXPECT_TRUE(x
.size() >= 2 && x
[0] == '[' && x
[x
.size() - 1] == ']')
2045 EXPECT_TRUE(sscanf(x
.ToString().c_str(), "[%i]%c", &val
, &ignored
) == 1)
2050 Options new_options
;
2051 NumberComparator cmp
;
2053 new_options
= CurrentOptions();
2054 new_options
.create_if_missing
= true;
2055 new_options
.comparator
= &cmp
;
2056 new_options
.write_buffer_size
= 4096; // Compact more often
2057 new_options
.arena_block_size
= 4096;
2058 new_options
= CurrentOptions(new_options
);
2059 DestroyAndReopen(new_options
);
2060 CreateAndReopenWithCF({"pikachu"}, new_options
);
2061 ASSERT_OK(Put(1, "[10]", "ten"));
2062 ASSERT_OK(Put(1, "[0x14]", "twenty"));
2063 for (int i
= 0; i
< 2; i
++) {
2064 ASSERT_EQ("ten", Get(1, "[10]"));
2065 ASSERT_EQ("ten", Get(1, "[0xa]"));
2066 ASSERT_EQ("twenty", Get(1, "[20]"));
2067 ASSERT_EQ("twenty", Get(1, "[0x14]"));
2068 ASSERT_EQ("NOT_FOUND", Get(1, "[15]"));
2069 ASSERT_EQ("NOT_FOUND", Get(1, "[0xf]"));
2070 Compact(1, "[0]", "[9999]");
2073 for (int run
= 0; run
< 2; run
++) {
2074 for (int i
= 0; i
< 1000; i
++) {
2076 snprintf(buf
, sizeof(buf
), "[%d]", i
* 10);
2077 ASSERT_OK(Put(1, buf
, buf
));
2079 Compact(1, "[0]", "[1000000]");
2081 } while (ChangeCompactOptions());
2084 TEST_F(DBTest
, DBOpen_Options
) {
2085 Options options
= CurrentOptions();
2086 std::string dbname
= test::PerThreadDBPath("db_options_test");
2087 ASSERT_OK(DestroyDB(dbname
, options
));
2089 // Does not exist, and create_if_missing == false: error
2091 options
.create_if_missing
= false;
2092 Status s
= DB::Open(options
, dbname
, &db
);
2093 ASSERT_TRUE(strstr(s
.ToString().c_str(), "does not exist") != nullptr);
2094 ASSERT_TRUE(db
== nullptr);
2096 // Does not exist, and create_if_missing == true: OK
2097 options
.create_if_missing
= true;
2098 s
= DB::Open(options
, dbname
, &db
);
2100 ASSERT_TRUE(db
!= nullptr);
2105 // Does exist, and error_if_exists == true: error
2106 options
.create_if_missing
= false;
2107 options
.error_if_exists
= true;
2108 s
= DB::Open(options
, dbname
, &db
);
2109 ASSERT_TRUE(strstr(s
.ToString().c_str(), "exists") != nullptr);
2110 ASSERT_TRUE(db
== nullptr);
2112 // Does exist, and error_if_exists == false: OK
2113 options
.create_if_missing
= true;
2114 options
.error_if_exists
= false;
2115 s
= DB::Open(options
, dbname
, &db
);
2117 ASSERT_TRUE(db
!= nullptr);
2123 TEST_F(DBTest
, DBOpen_Change_NumLevels
) {
2124 Options options
= CurrentOptions();
2125 options
.create_if_missing
= true;
2126 DestroyAndReopen(options
);
2127 ASSERT_TRUE(db_
!= nullptr);
2128 CreateAndReopenWithCF({"pikachu"}, options
);
2130 ASSERT_OK(Put(1, "a", "123"));
2131 ASSERT_OK(Put(1, "b", "234"));
2133 MoveFilesToLevel(3, 1);
2136 options
.create_if_missing
= false;
2137 options
.num_levels
= 2;
2138 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
2139 ASSERT_TRUE(strstr(s
.ToString().c_str(), "Invalid argument") != nullptr);
2140 ASSERT_TRUE(db_
== nullptr);
2143 TEST_F(DBTest
, DestroyDBMetaDatabase
) {
2144 std::string dbname
= test::PerThreadDBPath("db_meta");
2145 ASSERT_OK(env_
->CreateDirIfMissing(dbname
));
2146 std::string metadbname
= MetaDatabaseName(dbname
, 0);
2147 ASSERT_OK(env_
->CreateDirIfMissing(metadbname
));
2148 std::string metametadbname
= MetaDatabaseName(metadbname
, 0);
2149 ASSERT_OK(env_
->CreateDirIfMissing(metametadbname
));
2151 // Destroy previous versions if they exist. Using the long way.
2152 Options options
= CurrentOptions();
2153 ASSERT_OK(DestroyDB(metametadbname
, options
));
2154 ASSERT_OK(DestroyDB(metadbname
, options
));
2155 ASSERT_OK(DestroyDB(dbname
, options
));
2159 ASSERT_OK(DB::Open(options
, dbname
, &db
));
2162 ASSERT_OK(DB::Open(options
, metadbname
, &db
));
2165 ASSERT_OK(DB::Open(options
, metametadbname
, &db
));
2170 ASSERT_OK(DestroyDB(dbname
, options
));
2172 // Check if deletion worked.
2173 options
.create_if_missing
= false;
2174 ASSERT_TRUE(!(DB::Open(options
, dbname
, &db
)).ok());
2175 ASSERT_TRUE(!(DB::Open(options
, metadbname
, &db
)).ok());
2176 ASSERT_TRUE(!(DB::Open(options
, metametadbname
, &db
)).ok());
2179 #ifndef ROCKSDB_LITE
2180 TEST_F(DBTest
, SnapshotFiles
) {
2182 Options options
= CurrentOptions();
2183 options
.write_buffer_size
= 100000000; // Large write buffer
2184 CreateAndReopenWithCF({"pikachu"}, options
);
2188 // Write 8MB (80 values, each 100K)
2189 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2190 std::vector
<std::string
> values
;
2191 for (int i
= 0; i
< 80; i
++) {
2192 values
.push_back(rnd
.RandomString(100000));
2193 ASSERT_OK(Put((i
< 40), Key(i
), values
[i
]));
2196 // assert that nothing makes it to disk yet.
2197 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2199 // get a file snapshot
2200 uint64_t manifest_number
= 0;
2201 uint64_t manifest_size
= 0;
2202 std::vector
<std::string
> files
;
2203 dbfull()->DisableFileDeletions();
2204 dbfull()->GetLiveFiles(files
, &manifest_size
);
2206 // CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
2207 ASSERT_EQ(files
.size(), 5U);
2209 uint64_t number
= 0;
2212 // copy these files to a new snapshot directory
2213 std::string snapdir
= dbname_
+ ".snapdir/";
2214 if (env_
->FileExists(snapdir
).ok()) {
2215 ASSERT_OK(DestroyDir(env_
, snapdir
));
2217 ASSERT_OK(env_
->CreateDir(snapdir
));
2219 for (size_t i
= 0; i
< files
.size(); i
++) {
2220 // our clients require that GetLiveFiles returns
2221 // files with "/" as first character!
2222 ASSERT_EQ(files
[i
][0], '/');
2223 std::string src
= dbname_
+ files
[i
];
2224 std::string dest
= snapdir
+ files
[i
];
2227 ASSERT_OK(env_
->GetFileSize(src
, &size
));
2229 // record the number and the size of the
2230 // latest manifest file
2231 if (ParseFileName(files
[i
].substr(1), &number
, &type
)) {
2232 if (type
== kDescriptorFile
) {
2233 if (number
> manifest_number
) {
2234 manifest_number
= number
;
2235 ASSERT_GE(size
, manifest_size
);
2236 size
= manifest_size
; // copy only valid MANIFEST data
2240 CopyFile(src
, dest
, size
);
2243 // release file snapshot
2244 dbfull()->DisableFileDeletions();
2245 // overwrite one key, this key should not appear in the snapshot
2246 std::vector
<std::string
> extras
;
2247 for (unsigned int i
= 0; i
< 1; i
++) {
2248 extras
.push_back(rnd
.RandomString(100000));
2249 ASSERT_OK(Put(0, Key(i
), extras
[i
]));
2252 // verify that data in the snapshot are correct
2253 std::vector
<ColumnFamilyDescriptor
> column_families
;
2254 column_families
.emplace_back("default", ColumnFamilyOptions());
2255 column_families
.emplace_back("pikachu", ColumnFamilyOptions());
2256 std::vector
<ColumnFamilyHandle
*> cf_handles
;
2260 opts
.create_if_missing
= false;
2262 DB::Open(opts
, snapdir
, column_families
, &cf_handles
, &snapdb
);
2265 ReadOptions roptions
;
2267 for (unsigned int i
= 0; i
< 80; i
++) {
2268 stat
= snapdb
->Get(roptions
, cf_handles
[i
< 40], Key(i
), &val
);
2269 ASSERT_EQ(values
[i
].compare(val
), 0);
2271 for (auto cfh
: cf_handles
) {
2276 // look at the new live files after we added an 'extra' key
2277 // and after we took the first snapshot.
2278 uint64_t new_manifest_number
= 0;
2279 uint64_t new_manifest_size
= 0;
2280 std::vector
<std::string
> newfiles
;
2281 dbfull()->DisableFileDeletions();
2282 dbfull()->GetLiveFiles(newfiles
, &new_manifest_size
);
2284 // find the new manifest file. assert that this manifest file is
2285 // the same one as in the previous snapshot. But its size should be
2286 // larger because we added an extra key after taking the
2287 // previous shapshot.
2288 for (size_t i
= 0; i
< newfiles
.size(); i
++) {
2289 std::string src
= dbname_
+ "/" + newfiles
[i
];
2290 // record the lognumber and the size of the
2291 // latest manifest file
2292 if (ParseFileName(newfiles
[i
].substr(1), &number
, &type
)) {
2293 if (type
== kDescriptorFile
) {
2294 if (number
> new_manifest_number
) {
2296 new_manifest_number
= number
;
2297 ASSERT_OK(env_
->GetFileSize(src
, &size
));
2298 ASSERT_GE(size
, new_manifest_size
);
2303 ASSERT_EQ(manifest_number
, new_manifest_number
);
2304 ASSERT_GT(new_manifest_size
, manifest_size
);
2306 // release file snapshot
2307 dbfull()->DisableFileDeletions();
2308 } while (ChangeCompactOptions());
2311 TEST_F(DBTest
, ReadonlyDBGetLiveManifestSize
) {
2313 Options options
= CurrentOptions();
2314 options
.level0_file_num_compaction_trigger
= 2;
2315 DestroyAndReopen(options
);
2317 ASSERT_OK(Put("foo", "bar"));
2319 ASSERT_OK(Put("foo", "bar"));
2321 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2324 ASSERT_OK(ReadOnlyReopen(options
));
2326 uint64_t manifest_size
= 0;
2327 std::vector
<std::string
> files
;
2328 dbfull()->GetLiveFiles(files
, &manifest_size
);
2330 for (const std::string
& f
: files
) {
2331 uint64_t number
= 0;
2333 if (ParseFileName(f
.substr(1), &number
, &type
)) {
2334 if (type
== kDescriptorFile
) {
2335 uint64_t size_on_disk
;
2336 env_
->GetFileSize(dbname_
+ "/" + f
, &size_on_disk
);
2337 ASSERT_EQ(manifest_size
, size_on_disk
);
2343 } while (ChangeCompactOptions());
2346 TEST_F(DBTest
, GetLiveBlobFiles
) {
2347 VersionSet
* const versions
= dbfull()->TEST_GetVersionSet();
2349 assert(versions
->GetColumnFamilySet());
2351 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
2354 Version
* const version
= cfd
->current();
2357 VersionStorageInfo
* const storage_info
= version
->storage_info();
2358 assert(storage_info
);
2360 // Add a live blob file.
2361 constexpr uint64_t blob_file_number
= 234;
2362 constexpr uint64_t total_blob_count
= 555;
2363 constexpr uint64_t total_blob_bytes
= 66666;
2364 constexpr char checksum_method
[] = "CRC32";
2365 constexpr char checksum_value
[] = "3d87ff57";
2367 auto shared_meta
= SharedBlobFileMetaData::Create(
2368 blob_file_number
, total_blob_count
, total_blob_bytes
, checksum_method
,
2371 constexpr uint64_t garbage_blob_count
= 0;
2372 constexpr uint64_t garbage_blob_bytes
= 0;
2374 auto meta
= BlobFileMetaData::Create(std::move(shared_meta
),
2375 BlobFileMetaData::LinkedSsts(),
2376 garbage_blob_count
, garbage_blob_bytes
);
2378 storage_info
->AddBlobFile(std::move(meta
));
2380 // Make sure it appears in the results returned by GetLiveFiles.
2381 uint64_t manifest_size
= 0;
2382 std::vector
<std::string
> files
;
2383 ASSERT_OK(dbfull()->GetLiveFiles(files
, &manifest_size
));
2385 ASSERT_FALSE(files
.empty());
2386 ASSERT_EQ(files
[0], BlobFileName("", blob_file_number
));
2390 TEST_F(DBTest
, PurgeInfoLogs
) {
2391 Options options
= CurrentOptions();
2392 options
.keep_log_file_num
= 5;
2393 options
.create_if_missing
= true;
2395 for (int mode
= 0; mode
<= 1; mode
++) {
2397 options
.db_log_dir
= dbname_
+ "_logs";
2398 env_
->CreateDirIfMissing(options
.db_log_dir
);
2400 options
.db_log_dir
= "";
2402 for (int i
= 0; i
< 8; i
++) {
2406 std::vector
<std::string
> files
;
2407 env_
->GetChildren(options
.db_log_dir
.empty() ? dbname_
: options
.db_log_dir
,
2409 int info_log_count
= 0;
2410 for (std::string file
: files
) {
2411 if (file
.find("LOG") != std::string::npos
) {
2415 ASSERT_EQ(5, info_log_count
);
2418 // For mode (1), test DestroyDB() to delete all the logs under DB dir.
2419 // For mode (2), no info log file should have been put under DB dir.
2420 std::vector
<std::string
> db_files
;
2421 env_
->GetChildren(dbname_
, &db_files
);
2422 for (std::string file
: db_files
) {
2423 ASSERT_TRUE(file
.find("LOG") == std::string::npos
);
2428 env_
->GetChildren(options
.db_log_dir
, &files
);
2429 for (std::string file
: files
) {
2430 env_
->DeleteFile(options
.db_log_dir
+ "/" + file
);
2432 env_
->DeleteDir(options
.db_log_dir
);
2437 #ifndef ROCKSDB_LITE
2438 // Multi-threaded test:
2441 static const int kColumnFamilies
= 10;
2442 static const int kNumThreads
= 10;
2443 static const int kTestSeconds
= 10;
2444 static const int kNumKeys
= 1000;
2448 std::atomic
<bool> stop
;
2449 std::atomic
<int> counter
[kNumThreads
];
2450 std::atomic
<bool> thread_done
[kNumThreads
];
2456 bool multiget_batched
;
2459 static void MTThreadBody(void* arg
) {
2460 MTThread
* t
= reinterpret_cast<MTThread
*>(arg
);
2462 DB
* db
= t
->state
->test
->db_
;
2464 fprintf(stderr
, "... starting thread %d\n", id
);
2465 Random
rnd(1000 + id
);
2467 while (t
->state
->stop
.load(std::memory_order_acquire
) == false) {
2468 t
->state
->counter
[id
].store(counter
, std::memory_order_release
);
2470 int key
= rnd
.Uniform(kNumKeys
);
2472 snprintf(keybuf
, sizeof(keybuf
), "%016d", key
);
2475 // Write values of the form <key, my id, counter, cf, unique_id>.
2476 // into each of the CFs
2477 // We add some padding for force compactions.
2478 int unique_id
= rnd
.Uniform(1000000);
2480 // Half of the time directly use WriteBatch. Half of the time use
2481 // WriteBatchWithIndex.
2484 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2485 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
2486 static_cast<int>(counter
), cf
, unique_id
);
2487 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
2489 ASSERT_OK(db
->Write(WriteOptions(), &batch
));
2491 WriteBatchWithIndex
batch(db
->GetOptions().comparator
);
2492 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2493 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
2494 static_cast<int>(counter
), cf
, unique_id
);
2495 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
2497 ASSERT_OK(db
->Write(WriteOptions(), batch
.GetWriteBatch()));
2500 // Read a value and verify that it matches the pattern written above
2501 // and that writes to all column families were atomic (unique_id is the
2503 std::vector
<Slice
> keys(kColumnFamilies
, Slice(keybuf
));
2504 std::vector
<std::string
> values
;
2505 std::vector
<Status
> statuses
;
2506 if (!t
->multiget_batched
) {
2507 statuses
= db
->MultiGet(ReadOptions(), t
->state
->test
->handles_
, keys
,
2510 std::vector
<PinnableSlice
> pin_values(keys
.size());
2511 statuses
.resize(keys
.size());
2512 const Snapshot
* snapshot
= db
->GetSnapshot();
2514 ro
.snapshot
= snapshot
;
2515 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2516 db
->MultiGet(ro
, t
->state
->test
->handles_
[cf
], 1, &keys
[cf
],
2517 &pin_values
[cf
], &statuses
[cf
]);
2519 db
->ReleaseSnapshot(snapshot
);
2520 values
.resize(keys
.size());
2521 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2522 if (statuses
[cf
].ok()) {
2523 values
[cf
].assign(pin_values
[cf
].data(), pin_values
[cf
].size());
2527 Status s
= statuses
[0];
2528 // all statuses have to be the same
2529 for (size_t i
= 1; i
< statuses
.size(); ++i
) {
2530 // they are either both ok or both not-found
2531 ASSERT_TRUE((s
.ok() && statuses
[i
].ok()) ||
2532 (s
.IsNotFound() && statuses
[i
].IsNotFound()));
2534 if (s
.IsNotFound()) {
2535 // Key has not yet been written
2537 // Check that the writer thread counter is >= the counter in the value
2540 for (int i
= 0; i
< kColumnFamilies
; ++i
) {
2542 ASSERT_EQ(5, sscanf(values
[i
].c_str(), "%d.%d.%d.%d.%d", &k
, &w
, &c
,
2547 ASSERT_LT(w
, kNumThreads
);
2548 ASSERT_LE(c
, t
->state
->counter
[w
].load(std::memory_order_acquire
));
2553 // this checks that updates across column families happened
2554 // atomically -- all unique ids are the same
2555 ASSERT_EQ(u
, unique_id
);
2562 t
->state
->thread_done
[id
].store(true, std::memory_order_release
);
2563 fprintf(stderr
, "... stopping thread %d after %d ops\n", id
, int(counter
));
2568 class MultiThreadedDBTest
2570 public ::testing::WithParamInterface
<std::tuple
<int, bool>> {
2572 void SetUp() override
{
2573 std::tie(option_config_
, multiget_batched_
) = GetParam();
2576 static std::vector
<int> GenerateOptionConfigs() {
2577 std::vector
<int> optionConfigs
;
2578 for (int optionConfig
= kDefault
; optionConfig
< kEnd
; ++optionConfig
) {
2579 optionConfigs
.push_back(optionConfig
);
2581 return optionConfigs
;
2584 bool multiget_batched_
;
2587 TEST_P(MultiThreadedDBTest
, MultiThreaded
) {
2588 if (option_config_
== kPipelinedWrite
) return;
2589 anon::OptionsOverride options_override
;
2590 options_override
.skip_policy
= kSkipNoSnapshot
;
2591 Options options
= CurrentOptions(options_override
);
2592 std::vector
<std::string
> cfs
;
2593 for (int i
= 1; i
< kColumnFamilies
; ++i
) {
2594 cfs
.push_back(ToString(i
));
2597 CreateAndReopenWithCF(cfs
, options
);
2601 mt
.stop
.store(false, std::memory_order_release
);
2602 for (int id
= 0; id
< kNumThreads
; id
++) {
2603 mt
.counter
[id
].store(0, std::memory_order_release
);
2604 mt
.thread_done
[id
].store(false, std::memory_order_release
);
2608 MTThread thread
[kNumThreads
];
2609 for (int id
= 0; id
< kNumThreads
; id
++) {
2610 thread
[id
].state
= &mt
;
2612 thread
[id
].multiget_batched
= multiget_batched_
;
2613 env_
->StartThread(MTThreadBody
, &thread
[id
]);
2616 // Let them run for a while
2617 env_
->SleepForMicroseconds(kTestSeconds
* 1000000);
2619 // Stop the threads and wait for them to finish
2620 mt
.stop
.store(true, std::memory_order_release
);
2621 for (int id
= 0; id
< kNumThreads
; id
++) {
2622 while (mt
.thread_done
[id
].load(std::memory_order_acquire
) == false) {
2623 env_
->SleepForMicroseconds(100000);
2628 INSTANTIATE_TEST_CASE_P(
2629 MultiThreaded
, MultiThreadedDBTest
,
2631 ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
2632 ::testing::Bool()));
2633 #endif // ROCKSDB_LITE
2635 // Group commit test:
2636 #if !defined(TRAVIS) && !defined(OS_WIN)
2637 // Disable this test temporarily on Travis and appveyor as it fails
2638 // intermittently. Github issue: #4151
2641 static const int kGCNumThreads
= 4;
2642 static const int kGCNumKeys
= 1000;
2647 std::atomic
<bool> done
;
2650 static void GCThreadBody(void* arg
) {
2651 GCThread
* t
= reinterpret_cast<GCThread
*>(arg
);
2656 for (int i
= 0; i
< kGCNumKeys
; ++i
) {
2657 std::string
kv(ToString(i
+ id
* kGCNumKeys
));
2658 ASSERT_OK(db
->Put(wo
, kv
, kv
));
2665 TEST_F(DBTest
, GroupCommitTest
) {
2667 Options options
= CurrentOptions();
2669 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
2672 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2673 {{"WriteThread::JoinBatchGroup:BeganWaiting",
2674 "DBImpl::WriteImpl:BeforeLeaderEnters"},
2675 {"WriteThread::AwaitState:BlockingWaiting",
2676 "WriteThread::EnterAsBatchGroupLeader:End"}});
2677 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2680 GCThread thread
[kGCNumThreads
];
2681 for (int id
= 0; id
< kGCNumThreads
; id
++) {
2683 thread
[id
].db
= db_
;
2684 thread
[id
].done
= false;
2685 env_
->StartThread(GCThreadBody
, &thread
[id
]);
2687 env_
->WaitForJoin();
2689 ASSERT_GT(TestGetTickerCount(options
, WRITE_DONE_BY_OTHER
), 0);
2691 std::vector
<std::string
> expected_db
;
2692 for (int i
= 0; i
< kGCNumThreads
* kGCNumKeys
; ++i
) {
2693 expected_db
.push_back(ToString(i
));
2695 std::sort(expected_db
.begin(), expected_db
.end());
2697 Iterator
* itr
= db_
->NewIterator(ReadOptions());
2699 for (auto x
: expected_db
) {
2700 ASSERT_TRUE(itr
->Valid());
2701 ASSERT_EQ(itr
->key().ToString(), x
);
2702 ASSERT_EQ(itr
->value().ToString(), x
);
2705 ASSERT_TRUE(!itr
->Valid());
2708 HistogramData hist_data
;
2709 options
.statistics
->histogramData(DB_WRITE
, &hist_data
);
2710 ASSERT_GT(hist_data
.average
, 0.0);
2711 } while (ChangeOptions(kSkipNoSeekToLast
));
// In-memory key/value mapping used by ModelDB as the reference "database".
using KVMap = std::map<std::string, std::string>;
2719 class ModelDB
: public DB
{
2721 class ModelSnapshot
: public Snapshot
{
2725 SequenceNumber
GetSequenceNumber() const override
{
2726 // no need to call this
2732 explicit ModelDB(const Options
& options
) : options_(options
) {}
2734 Status
Put(const WriteOptions
& o
, ColumnFamilyHandle
* cf
, const Slice
& k
,
2735 const Slice
& v
) override
{
2737 batch
.Put(cf
, k
, v
);
2738 return Write(o
, &batch
);
2741 Status
Close() override
{ return Status::OK(); }
2743 Status
Delete(const WriteOptions
& o
, ColumnFamilyHandle
* cf
,
2744 const Slice
& key
) override
{
2746 batch
.Delete(cf
, key
);
2747 return Write(o
, &batch
);
2749 using DB::SingleDelete
;
2750 Status
SingleDelete(const WriteOptions
& o
, ColumnFamilyHandle
* cf
,
2751 const Slice
& key
) override
{
2753 batch
.SingleDelete(cf
, key
);
2754 return Write(o
, &batch
);
2757 Status
Merge(const WriteOptions
& o
, ColumnFamilyHandle
* cf
, const Slice
& k
,
2758 const Slice
& v
) override
{
2760 batch
.Merge(cf
, k
, v
);
2761 return Write(o
, &batch
);
2764 Status
Get(const ReadOptions
& /*options*/, ColumnFamilyHandle
* /*cf*/,
2765 const Slice
& key
, PinnableSlice
* /*value*/) override
{
2766 return Status::NotSupported(key
);
2769 using DB::GetMergeOperands
;
2770 virtual Status
GetMergeOperands(
2771 const ReadOptions
& /*options*/, ColumnFamilyHandle
* /*column_family*/,
2772 const Slice
& key
, PinnableSlice
* /*slice*/,
2773 GetMergeOperandsOptions
* /*merge_operands_options*/,
2774 int* /*number_of_operands*/) override
{
2775 return Status::NotSupported(key
);
2779 std::vector
<Status
> MultiGet(
2780 const ReadOptions
& /*options*/,
2781 const std::vector
<ColumnFamilyHandle
*>& /*column_family*/,
2782 const std::vector
<Slice
>& keys
,
2783 std::vector
<std::string
>* /*values*/) override
{
2784 std::vector
<Status
> s(keys
.size(),
2785 Status::NotSupported("Not implemented."));
2789 #ifndef ROCKSDB_LITE
2790 using DB::IngestExternalFile
;
2791 Status
IngestExternalFile(
2792 ColumnFamilyHandle
* /*column_family*/,
2793 const std::vector
<std::string
>& /*external_files*/,
2794 const IngestExternalFileOptions
& /*options*/) override
{
2795 return Status::NotSupported("Not implemented.");
2798 using DB::IngestExternalFiles
;
2799 Status
IngestExternalFiles(
2800 const std::vector
<IngestExternalFileArg
>& /*args*/) override
{
2801 return Status::NotSupported("Not implemented");
2804 using DB::CreateColumnFamilyWithImport
;
2805 virtual Status
CreateColumnFamilyWithImport(
2806 const ColumnFamilyOptions
& /*options*/,
2807 const std::string
& /*column_family_name*/,
2808 const ImportColumnFamilyOptions
& /*import_options*/,
2809 const ExportImportFilesMetaData
& /*metadata*/,
2810 ColumnFamilyHandle
** /*handle*/) override
{
2811 return Status::NotSupported("Not implemented.");
2814 using DB::VerifyChecksum
;
2815 Status
VerifyChecksum(const ReadOptions
&) override
{
2816 return Status::NotSupported("Not implemented.");
2819 using DB::GetPropertiesOfAllTables
;
2820 Status
GetPropertiesOfAllTables(
2821 ColumnFamilyHandle
* /*column_family*/,
2822 TablePropertiesCollection
* /*props*/) override
{
2826 Status
GetPropertiesOfTablesInRange(
2827 ColumnFamilyHandle
* /*column_family*/, const Range
* /*range*/,
2828 std::size_t /*n*/, TablePropertiesCollection
* /*props*/) override
{
2831 #endif // ROCKSDB_LITE
2833 using DB::KeyMayExist
;
2834 bool KeyMayExist(const ReadOptions
& /*options*/,
2835 ColumnFamilyHandle
* /*column_family*/, const Slice
& /*key*/,
2836 std::string
* /*value*/,
2837 bool* value_found
= nullptr) override
{
2838 if (value_found
!= nullptr) {
2839 *value_found
= false;
2841 return true; // Not Supported directly
2843 using DB::NewIterator
;
2844 Iterator
* NewIterator(const ReadOptions
& options
,
2845 ColumnFamilyHandle
* /*column_family*/) override
{
2846 if (options
.snapshot
== nullptr) {
2847 KVMap
* saved
= new KVMap
;
2849 return new ModelIter(saved
, true);
2851 const KVMap
* snapshot_state
=
2852 &(reinterpret_cast<const ModelSnapshot
*>(options
.snapshot
)->map_
);
2853 return new ModelIter(snapshot_state
, false);
2856 Status
NewIterators(const ReadOptions
& /*options*/,
2857 const std::vector
<ColumnFamilyHandle
*>& /*column_family*/,
2858 std::vector
<Iterator
*>* /*iterators*/) override
{
2859 return Status::NotSupported("Not supported yet");
2861 const Snapshot
* GetSnapshot() override
{
2862 ModelSnapshot
* snapshot
= new ModelSnapshot
;
2863 snapshot
->map_
= map_
;
2867 void ReleaseSnapshot(const Snapshot
* snapshot
) override
{
2868 delete reinterpret_cast<const ModelSnapshot
*>(snapshot
);
2871 Status
Write(const WriteOptions
& /*options*/, WriteBatch
* batch
) override
{
2872 class Handler
: public WriteBatch::Handler
{
2875 void Put(const Slice
& key
, const Slice
& value
) override
{
2876 (*map_
)[key
.ToString()] = value
.ToString();
2878 void Merge(const Slice
& /*key*/, const Slice
& /*value*/) override
{
2879 // ignore merge for now
2880 // (*map_)[key.ToString()] = value.ToString();
2882 void Delete(const Slice
& key
) override
{ map_
->erase(key
.ToString()); }
2885 handler
.map_
= &map_
;
2886 return batch
->Iterate(&handler
);
2889 using DB::GetProperty
;
2890 bool GetProperty(ColumnFamilyHandle
* /*column_family*/,
2891 const Slice
& /*property*/, std::string
* /*value*/) override
{
2894 using DB::GetIntProperty
;
2895 bool GetIntProperty(ColumnFamilyHandle
* /*column_family*/,
2896 const Slice
& /*property*/, uint64_t* /*value*/) override
{
2899 using DB::GetMapProperty
;
2900 bool GetMapProperty(ColumnFamilyHandle
* /*column_family*/,
2901 const Slice
& /*property*/,
2902 std::map
<std::string
, std::string
>* /*value*/) override
{
2905 using DB::GetAggregatedIntProperty
;
2906 bool GetAggregatedIntProperty(const Slice
& /*property*/,
2907 uint64_t* /*value*/) override
{
2910 using DB::GetApproximateSizes
;
2911 Status
GetApproximateSizes(const SizeApproximationOptions
& /*options*/,
2912 ColumnFamilyHandle
* /*column_family*/,
2913 const Range
* /*range*/, int n
,
2914 uint64_t* sizes
) override
{
2915 for (int i
= 0; i
< n
; i
++) {
2918 return Status::OK();
2920 using DB::GetApproximateMemTableStats
;
2921 void GetApproximateMemTableStats(ColumnFamilyHandle
* /*column_family*/,
2922 const Range
& /*range*/,
2923 uint64_t* const count
,
2924 uint64_t* const size
) override
{
2928 using DB::CompactRange
;
2929 Status
CompactRange(const CompactRangeOptions
& /*options*/,
2930 ColumnFamilyHandle
* /*column_family*/,
2931 const Slice
* /*start*/, const Slice
* /*end*/) override
{
2932 return Status::NotSupported("Not supported operation.");
2935 Status
SetDBOptions(
2936 const std::unordered_map
<std::string
, std::string
>& /*new_options*/)
2938 return Status::NotSupported("Not supported operation.");
2941 using DB::CompactFiles
;
2942 Status
CompactFiles(
2943 const CompactionOptions
& /*compact_options*/,
2944 ColumnFamilyHandle
* /*column_family*/,
2945 const std::vector
<std::string
>& /*input_file_names*/,
2946 const int /*output_level*/, const int /*output_path_id*/ = -1,
2947 std::vector
<std::string
>* const /*output_file_names*/ = nullptr,
2948 CompactionJobInfo
* /*compaction_job_info*/ = nullptr) override
{
2949 return Status::NotSupported("Not supported operation.");
2952 Status
PauseBackgroundWork() override
{
2953 return Status::NotSupported("Not supported operation.");
2956 Status
ContinueBackgroundWork() override
{
2957 return Status::NotSupported("Not supported operation.");
2960 Status
EnableAutoCompaction(
2961 const std::vector
<ColumnFamilyHandle
*>& /*column_family_handles*/)
2963 return Status::NotSupported("Not supported operation.");
2966 void EnableManualCompaction() override
{ return; }
2968 void DisableManualCompaction() override
{ return; }
2970 using DB::NumberLevels
;
2971 int NumberLevels(ColumnFamilyHandle
* /*column_family*/) override
{ return 1; }
2973 using DB::MaxMemCompactionLevel
;
2974 int MaxMemCompactionLevel(ColumnFamilyHandle
* /*column_family*/) override
{
2978 using DB::Level0StopWriteTrigger
;
2979 int Level0StopWriteTrigger(ColumnFamilyHandle
* /*column_family*/) override
{
2983 const std::string
& GetName() const override
{ return name_
; }
2985 Env
* GetEnv() const override
{ return nullptr; }
2987 using DB::GetOptions
;
2988 Options
GetOptions(ColumnFamilyHandle
* /*column_family*/) const override
{
2992 using DB::GetDBOptions
;
2993 DBOptions
GetDBOptions() const override
{ return options_
; }
2996 Status
Flush(const ROCKSDB_NAMESPACE::FlushOptions
& /*options*/,
2997 ColumnFamilyHandle
* /*column_family*/) override
{
3002 const ROCKSDB_NAMESPACE::FlushOptions
& /*options*/,
3003 const std::vector
<ColumnFamilyHandle
*>& /*column_families*/) override
{
3004 return Status::OK();
3007 Status
SyncWAL() override
{ return Status::OK(); }
3009 Status
DisableFileDeletions() override
{ return Status::OK(); }
3011 Status
EnableFileDeletions(bool /*force*/) override
{ return Status::OK(); }
3012 #ifndef ROCKSDB_LITE
3014 Status
GetLiveFiles(std::vector
<std::string
>&, uint64_t* /*size*/,
3015 bool /*flush_memtable*/ = true) override
{
3016 return Status::OK();
3019 Status
GetLiveFilesChecksumInfo(
3020 FileChecksumList
* /*checksum_list*/) override
{
3021 return Status::OK();
3024 Status
GetSortedWalFiles(VectorLogPtr
& /*files*/) override
{
3025 return Status::OK();
3028 Status
GetCurrentWalFile(
3029 std::unique_ptr
<LogFile
>* /*current_log_file*/) override
{
3030 return Status::OK();
3033 virtual Status
GetCreationTimeOfOldestFile(
3034 uint64_t* /*creation_time*/) override
{
3035 return Status::NotSupported();
3038 Status
DeleteFile(std::string
/*name*/) override
{ return Status::OK(); }
3040 Status
GetUpdatesSince(
3041 ROCKSDB_NAMESPACE::SequenceNumber
,
3042 std::unique_ptr
<ROCKSDB_NAMESPACE::TransactionLogIterator
>*,
3043 const TransactionLogIterator::ReadOptions
& /*read_options*/ =
3044 TransactionLogIterator::ReadOptions()) override
{
3045 return Status::NotSupported("Not supported in Model DB");
3048 void GetColumnFamilyMetaData(ColumnFamilyHandle
* /*column_family*/,
3049 ColumnFamilyMetaData
* /*metadata*/) override
{}
3050 #endif // ROCKSDB_LITE
3052 Status
GetDbIdentity(std::string
& /*identity*/) const override
{
3053 return Status::OK();
3056 Status
GetDbSessionId(std::string
& /*session_id*/) const override
{
3057 return Status::OK();
3060 SequenceNumber
GetLatestSequenceNumber() const override
{ return 0; }
3062 bool SetPreserveDeletesSequenceNumber(SequenceNumber
/*seqnum*/) override
{
3066 ColumnFamilyHandle
* DefaultColumnFamily() const override
{ return nullptr; }
3069 class ModelIter
: public Iterator
{
3071 ModelIter(const KVMap
* map
, bool owned
)
3072 : map_(map
), owned_(owned
), iter_(map_
->end()) {}
3073 ~ModelIter() override
{
3074 if (owned_
) delete map_
;
3076 bool Valid() const override
{ return iter_
!= map_
->end(); }
3077 void SeekToFirst() override
{ iter_
= map_
->begin(); }
3078 void SeekToLast() override
{
3079 if (map_
->empty()) {
3080 iter_
= map_
->end();
3082 iter_
= map_
->find(map_
->rbegin()->first
);
3085 void Seek(const Slice
& k
) override
{
3086 iter_
= map_
->lower_bound(k
.ToString());
3088 void SeekForPrev(const Slice
& k
) override
{
3089 iter_
= map_
->upper_bound(k
.ToString());
3092 void Next() override
{ ++iter_
; }
3093 void Prev() override
{
3094 if (iter_
== map_
->begin()) {
3095 iter_
= map_
->end();
3101 Slice
key() const override
{ return iter_
->first
; }
3102 Slice
value() const override
{ return iter_
->second
; }
3103 Status
status() const override
{ return Status::OK(); }
3106 const KVMap
* const map_
;
3107 const bool owned_
; // Do we own map_
3108 KVMap::const_iterator iter_
;
3110 const Options options_
;
3112 std::string name_
= "";
3115 #ifndef ROCKSDB_VALGRIND_RUN
3116 static std::string
RandomKey(Random
* rnd
, int minimum
= 0) {
3119 len
= (rnd
->OneIn(3)
3120 ? 1 // Short sometimes to encourage collisions
3121 : (rnd
->OneIn(100) ? rnd
->Skewed(10) : rnd
->Uniform(10)));
3122 } while (len
< minimum
);
3123 return test::RandomKey(rnd
, len
);
3126 static bool CompareIterators(int step
, DB
* model
, DB
* db
,
3127 const Snapshot
* model_snap
,
3128 const Snapshot
* db_snap
) {
3129 ReadOptions options
;
3130 options
.snapshot
= model_snap
;
3131 Iterator
* miter
= model
->NewIterator(options
);
3132 options
.snapshot
= db_snap
;
3133 Iterator
* dbiter
= db
->NewIterator(options
);
3136 for (miter
->SeekToFirst(), dbiter
->SeekToFirst();
3137 ok
&& miter
->Valid() && dbiter
->Valid(); miter
->Next(), dbiter
->Next()) {
3139 if (miter
->key().compare(dbiter
->key()) != 0) {
3140 fprintf(stderr
, "step %d: Key mismatch: '%s' vs. '%s'\n", step
,
3141 EscapeString(miter
->key()).c_str(),
3142 EscapeString(dbiter
->key()).c_str());
3147 if (miter
->value().compare(dbiter
->value()) != 0) {
3148 fprintf(stderr
, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
3149 step
, EscapeString(miter
->key()).c_str(),
3150 EscapeString(miter
->value()).c_str(),
3151 EscapeString(dbiter
->value()).c_str());
3157 if (miter
->Valid() != dbiter
->Valid()) {
3158 fprintf(stderr
, "step %d: Mismatch at end of iterators: %d vs. %d\n",
3159 step
, miter
->Valid(), dbiter
->Valid());
3168 class DBTestRandomized
: public DBTest
,
3169 public ::testing::WithParamInterface
<int> {
3171 void SetUp() override
{ option_config_
= GetParam(); }
3173 static std::vector
<int> GenerateOptionConfigs() {
3174 std::vector
<int> option_configs
;
3175 // skip cuckoo hash as it does not support snapshot.
3176 for (int option_config
= kDefault
; option_config
< kEnd
; ++option_config
) {
3177 if (!ShouldSkipOptions(option_config
,
3178 kSkipDeletesFilterFirst
| kSkipNoSeekToLast
)) {
3179 option_configs
.push_back(option_config
);
3182 option_configs
.push_back(kBlockBasedTableWithIndexRestartInterval
);
3183 return option_configs
;
// Instantiate the parameterized Randomized test once per option config
// produced by GenerateOptionConfigs().
INSTANTIATE_TEST_CASE_P(
    DBTestRandomized, DBTestRandomized,
    ::testing::ValuesIn(DBTestRandomized::GenerateOptionConfigs()));
3191 TEST_P(DBTestRandomized
, Randomized
) {
3192 anon::OptionsOverride options_override
;
3193 options_override
.skip_policy
= kSkipNoSnapshot
;
3194 Options options
= CurrentOptions(options_override
);
3195 DestroyAndReopen(options
);
3197 Random
rnd(test::RandomSeed() + GetParam());
3198 ModelDB
model(options
);
3199 const int N
= 10000;
3200 const Snapshot
* model_snap
= nullptr;
3201 const Snapshot
* db_snap
= nullptr;
3203 for (int step
= 0; step
< N
; step
++) {
3204 // TODO(sanjay): Test Get() works
3205 int p
= rnd
.Uniform(100);
3207 if (option_config_
== kHashSkipList
|| option_config_
== kHashLinkList
||
3208 option_config_
== kPlainTableFirstBytePrefix
||
3209 option_config_
== kBlockBasedTableWithWholeKeyHashIndex
||
3210 option_config_
== kBlockBasedTableWithPrefixHashIndex
) {
3213 if (p
< 45) { // Put
3214 k
= RandomKey(&rnd
, minimum
);
3215 v
= rnd
.RandomString(rnd
.OneIn(20) ? 100 + rnd
.Uniform(100)
3217 ASSERT_OK(model
.Put(WriteOptions(), k
, v
));
3218 ASSERT_OK(db_
->Put(WriteOptions(), k
, v
));
3219 } else if (p
< 90) { // Delete
3220 k
= RandomKey(&rnd
, minimum
);
3221 ASSERT_OK(model
.Delete(WriteOptions(), k
));
3222 ASSERT_OK(db_
->Delete(WriteOptions(), k
));
3223 } else { // Multi-element batch
3225 const int num
= rnd
.Uniform(8);
3226 for (int i
= 0; i
< num
; i
++) {
3227 if (i
== 0 || !rnd
.OneIn(10)) {
3228 k
= RandomKey(&rnd
, minimum
);
3230 // Periodically re-use the same key from the previous iter, so
3231 // we have multiple entries in the write batch for the same key
3234 v
= rnd
.RandomString(rnd
.Uniform(10));
3240 ASSERT_OK(model
.Write(WriteOptions(), &b
));
3241 ASSERT_OK(db_
->Write(WriteOptions(), &b
));
3244 if ((step
% 100) == 0) {
3245 // For DB instances that use the hash index + block-based table, the
3246 // iterator will be invalid right when seeking a non-existent key, right
3247 // than return a key that is close to it.
3248 if (option_config_
!= kBlockBasedTableWithWholeKeyHashIndex
&&
3249 option_config_
!= kBlockBasedTableWithPrefixHashIndex
) {
3250 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, nullptr, nullptr));
3251 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, model_snap
, db_snap
));
3254 // Save a snapshot from each DB this time that we'll use next
3255 // time we compare things, to make sure the current state is
3256 // preserved with the snapshot
3257 if (model_snap
!= nullptr) model
.ReleaseSnapshot(model_snap
);
3258 if (db_snap
!= nullptr) db_
->ReleaseSnapshot(db_snap
);
3261 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, nullptr, nullptr));
3263 model_snap
= model
.GetSnapshot();
3264 db_snap
= db_
->GetSnapshot();
3267 if (model_snap
!= nullptr) model
.ReleaseSnapshot(model_snap
);
3268 if (db_snap
!= nullptr) db_
->ReleaseSnapshot(db_snap
);
3270 #endif // ROCKSDB_VALGRIND_RUN
3272 TEST_F(DBTest
, BlockBasedTablePrefixIndexTest
) {
3273 // create a DB with block prefix index
3274 BlockBasedTableOptions table_options
;
3275 Options options
= CurrentOptions();
3276 table_options
.index_type
= BlockBasedTableOptions::kHashSearch
;
3277 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3278 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3281 ASSERT_OK(Put("k1", "v1"));
3283 ASSERT_OK(Put("k2", "v2"));
3285 // Reopen it without prefix extractor, make sure everything still works.
3286 // RocksDB should just fall back to the binary index.
3287 table_options
.index_type
= BlockBasedTableOptions::kBinarySearch
;
3288 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3289 options
.prefix_extractor
.reset();
3292 ASSERT_EQ("v1", Get("k1"));
3293 ASSERT_EQ("v2", Get("k2"));
3296 TEST_F(DBTest
, BlockBasedTablePrefixIndexTotalOrderSeek
) {
3297 // create a DB with block prefix index
3298 BlockBasedTableOptions table_options
;
3299 Options options
= CurrentOptions();
3300 options
.max_open_files
= 10;
3301 table_options
.index_type
= BlockBasedTableOptions::kHashSearch
;
3302 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3303 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3305 // RocksDB sanitize max open files to at least 20. Modify it back.
3306 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3307 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg
) {
3308 int* max_open_files
= static_cast<int*>(arg
);
3309 *max_open_files
= 11;
3311 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3314 ASSERT_OK(Put("k1", "v1"));
3317 CompactRangeOptions cro
;
3318 cro
.change_level
= true;
3319 cro
.target_level
= 1;
3320 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
3322 // Force evict tables
3323 dbfull()->TEST_table_cache()->SetCapacity(0);
3324 // Make table cache to keep one entry.
3325 dbfull()->TEST_table_cache()->SetCapacity(1);
3327 ReadOptions read_options
;
3328 read_options
.total_order_seek
= true;
3330 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
3332 ASSERT_TRUE(iter
->Valid());
3333 ASSERT_EQ("k1", iter
->key().ToString());
3336 // After total order seek, prefix index should still be used.
3337 read_options
.total_order_seek
= false;
3339 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
3341 ASSERT_TRUE(iter
->Valid());
3342 ASSERT_EQ("k1", iter
->key().ToString());
3344 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3347 TEST_F(DBTest
, ChecksumTest
) {
3348 BlockBasedTableOptions table_options
;
3349 Options options
= CurrentOptions();
3351 table_options
.checksum
= kCRC32c
;
3352 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3354 ASSERT_OK(Put("a", "b"));
3355 ASSERT_OK(Put("c", "d"));
3356 ASSERT_OK(Flush()); // table with crc checksum
3358 table_options
.checksum
= kxxHash
;
3359 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3361 ASSERT_OK(Put("e", "f"));
3362 ASSERT_OK(Put("g", "h"));
3363 ASSERT_OK(Flush()); // table with xxhash checksum
3365 table_options
.checksum
= kCRC32c
;
3366 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3368 ASSERT_EQ("b", Get("a"));
3369 ASSERT_EQ("d", Get("c"));
3370 ASSERT_EQ("f", Get("e"));
3371 ASSERT_EQ("h", Get("g"));
3373 table_options
.checksum
= kCRC32c
;
3374 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3376 ASSERT_EQ("b", Get("a"));
3377 ASSERT_EQ("d", Get("c"));
3378 ASSERT_EQ("f", Get("e"));
3379 ASSERT_EQ("h", Get("g"));
3382 #ifndef ROCKSDB_LITE
3383 TEST_P(DBTestWithParam
, FIFOCompactionTest
) {
3384 for (int iter
= 0; iter
< 2; ++iter
) {
3385 // first iteration -- auto compaction
3386 // second iteration -- manual compaction
3388 options
.compaction_style
= kCompactionStyleFIFO
;
3389 options
.write_buffer_size
= 100 << 10; // 100KB
3390 options
.arena_block_size
= 4096;
3391 options
.compaction_options_fifo
.max_table_files_size
= 500 << 10; // 500KB
3392 options
.compression
= kNoCompression
;
3393 options
.create_if_missing
= true;
3394 options
.max_subcompactions
= max_subcompactions_
;
3396 options
.disable_auto_compactions
= true;
3398 options
= CurrentOptions(options
);
3399 DestroyAndReopen(options
);
3402 for (int i
= 0; i
< 6; ++i
) {
3403 for (int j
= 0; j
< 110; ++j
) {
3404 ASSERT_OK(Put(ToString(i
* 100 + j
), rnd
.RandomString(980)));
3406 // flush should happen here
3407 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
3410 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3412 CompactRangeOptions cro
;
3413 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
3414 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
3416 // only 5 files should survive
3417 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3418 for (int i
= 0; i
< 50; ++i
) {
3419 // these keys should be deleted in previous compaction
3420 ASSERT_EQ("NOT_FOUND", Get(ToString(i
)));
3425 TEST_F(DBTest
, FIFOCompactionTestWithCompaction
) {
3427 options
.compaction_style
= kCompactionStyleFIFO
;
3428 options
.write_buffer_size
= 20 << 10; // 20K
3429 options
.arena_block_size
= 4096;
3430 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1MB
3431 options
.compaction_options_fifo
.allow_compaction
= true;
3432 options
.level0_file_num_compaction_trigger
= 6;
3433 options
.compression
= kNoCompression
;
3434 options
.create_if_missing
= true;
3435 options
= CurrentOptions(options
);
3436 DestroyAndReopen(options
);
3439 for (int i
= 0; i
< 60; i
++) {
3440 // Generate and flush a file about 20KB.
3441 for (int j
= 0; j
< 20; j
++) {
3442 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3445 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3447 // It should be compacted to 10 files.
3448 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3450 for (int i
= 0; i
< 60; i
++) {
3451 // Generate and flush a file about 20KB.
3452 for (int j
= 0; j
< 20; j
++) {
3453 ASSERT_OK(Put(ToString(i
* 20 + j
+ 2000), rnd
.RandomString(980)));
3456 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3459 // It should be compacted to no more than 20 files.
3460 ASSERT_GT(NumTableFilesAtLevel(0), 10);
3461 ASSERT_LT(NumTableFilesAtLevel(0), 18);
3462 // Size limit is still guaranteed.
3463 ASSERT_LE(SizeAtLevel(0),
3464 options
.compaction_options_fifo
.max_table_files_size
);
3467 TEST_F(DBTest
, FIFOCompactionStyleWithCompactionAndDelete
) {
3469 options
.compaction_style
= kCompactionStyleFIFO
;
3470 options
.write_buffer_size
= 20 << 10; // 20K
3471 options
.arena_block_size
= 4096;
3472 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1MB
3473 options
.compaction_options_fifo
.allow_compaction
= true;
3474 options
.level0_file_num_compaction_trigger
= 3;
3475 options
.compression
= kNoCompression
;
3476 options
.create_if_missing
= true;
3477 options
= CurrentOptions(options
);
3478 DestroyAndReopen(options
);
3481 for (int i
= 0; i
< 3; i
++) {
3482 // Each file contains a different key which will be dropped later.
3483 ASSERT_OK(Put("a" + ToString(i
), rnd
.RandomString(500)));
3484 ASSERT_OK(Put("key" + ToString(i
), ""));
3485 ASSERT_OK(Put("z" + ToString(i
), rnd
.RandomString(500)));
3487 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3489 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3490 for (int i
= 0; i
< 3; i
++) {
3491 ASSERT_EQ("", Get("key" + ToString(i
)));
3493 for (int i
= 0; i
< 3; i
++) {
3494 // Each file contains a different key which will be dropped later.
3495 ASSERT_OK(Put("a" + ToString(i
), rnd
.RandomString(500)));
3496 ASSERT_OK(Delete("key" + ToString(i
)));
3497 ASSERT_OK(Put("z" + ToString(i
), rnd
.RandomString(500)));
3499 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3501 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
3502 for (int i
= 0; i
< 3; i
++) {
3503 ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i
)));
3507 // Check that FIFO-with-TTL is not supported with max_open_files != -1.
3508 TEST_F(DBTest
, FIFOCompactionWithTTLAndMaxOpenFilesTest
) {
3510 options
.compaction_style
= kCompactionStyleFIFO
;
3511 options
.create_if_missing
= true;
3512 options
.ttl
= 600; // seconds
3514 // TTL is now supported with max_open_files != -1.
3515 options
.max_open_files
= 100;
3516 options
= CurrentOptions(options
);
3517 ASSERT_OK(TryReopen(options
));
3519 options
.max_open_files
= -1;
3520 ASSERT_OK(TryReopen(options
));
3523 // Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
3524 TEST_F(DBTest
, FIFOCompactionWithTTLAndVariousTableFormatsTest
) {
3526 options
.compaction_style
= kCompactionStyleFIFO
;
3527 options
.create_if_missing
= true;
3528 options
.ttl
= 600; // seconds
3530 options
= CurrentOptions(options
);
3531 options
.table_factory
.reset(NewBlockBasedTableFactory());
3532 ASSERT_OK(TryReopen(options
));
3535 options
.table_factory
.reset(NewPlainTableFactory());
3536 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
3539 options
.table_factory
.reset(NewAdaptiveTableFactory());
3540 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
3543 TEST_F(DBTest
, FIFOCompactionWithTTLTest
) {
3545 options
.compaction_style
= kCompactionStyleFIFO
;
3546 options
.write_buffer_size
= 10 << 10; // 10KB
3547 options
.arena_block_size
= 4096;
3548 options
.compression
= kNoCompression
;
3549 options
.create_if_missing
= true;
3550 env_
->SetMockSleep();
3553 // Test to make sure that all files with expired ttl are deleted on next
3554 // manual compaction.
3556 // NOTE: Presumed unnecessary and removed: resetting mock time in env
3558 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3559 options
.compaction_options_fifo
.allow_compaction
= false;
3560 options
.ttl
= 1 * 60 * 60 ; // 1 hour
3561 options
= CurrentOptions(options
);
3562 DestroyAndReopen(options
);
3565 for (int i
= 0; i
< 10; i
++) {
3566 // Generate and flush a file about 10KB.
3567 for (int j
= 0; j
< 10; j
++) {
3568 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3571 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3573 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3575 // Sleep for 2 hours -- which is much greater than TTL.
3576 env_
->MockSleepForSeconds(2 * 60 * 60);
3578 // Since no flushes and compactions have run, the db should still be in
3579 // the same state even after considerable time has passed.
3580 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3581 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3583 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3584 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3587 // Test to make sure that all files with expired ttl are deleted on next
3588 // automatic compaction.
3590 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3591 options
.compaction_options_fifo
.allow_compaction
= false;
3592 options
.ttl
= 1 * 60 * 60; // 1 hour
3593 options
= CurrentOptions(options
);
3594 DestroyAndReopen(options
);
3597 for (int i
= 0; i
< 10; i
++) {
3598 // Generate and flush a file about 10KB.
3599 for (int j
= 0; j
< 10; j
++) {
3600 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3603 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3605 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3607 // Sleep for 2 hours -- which is much greater than TTL.
3608 env_
->MockSleepForSeconds(2 * 60 * 60);
3609 // Just to make sure that we are in the same state even after sleeping.
3610 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3611 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3613 // Create 1 more file to trigger TTL compaction. The old files are dropped.
3614 for (int i
= 0; i
< 1; i
++) {
3615 for (int j
= 0; j
< 10; j
++) {
3616 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3621 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3622 // Only the new 10 files remain.
3623 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3624 ASSERT_LE(SizeAtLevel(0),
3625 options
.compaction_options_fifo
.max_table_files_size
);
3628 // Test that shows the fall back to size-based FIFO compaction if TTL-based
3629 // deletion doesn't move the total size to be less than max_table_files_size.
3631 options
.write_buffer_size
= 10 << 10; // 10KB
3632 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3633 options
.compaction_options_fifo
.allow_compaction
= false;
3634 options
.ttl
= 1 * 60 * 60; // 1 hour
3635 options
= CurrentOptions(options
);
3636 DestroyAndReopen(options
);
3639 for (int i
= 0; i
< 3; i
++) {
3640 // Generate and flush a file about 10KB.
3641 for (int j
= 0; j
< 10; j
++) {
3642 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3645 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3647 ASSERT_EQ(NumTableFilesAtLevel(0), 3);
3649 // Sleep for 2 hours -- which is much greater than TTL.
3650 env_
->MockSleepForSeconds(2 * 60 * 60);
3651 // Just to make sure that we are in the same state even after sleeping.
3652 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3653 ASSERT_EQ(NumTableFilesAtLevel(0), 3);
3655 for (int i
= 0; i
< 5; i
++) {
3656 for (int j
= 0; j
< 140; j
++) {
3657 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3660 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3662 // Size limit is still guaranteed.
3663 ASSERT_LE(SizeAtLevel(0),
3664 options
.compaction_options_fifo
.max_table_files_size
);
3667 // Test with TTL + Intra-L0 compactions.
3669 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3670 options
.compaction_options_fifo
.allow_compaction
= true;
3671 options
.ttl
= 1 * 60 * 60; // 1 hour
3672 options
.level0_file_num_compaction_trigger
= 6;
3673 options
= CurrentOptions(options
);
3674 DestroyAndReopen(options
);
3677 for (int i
= 0; i
< 10; i
++) {
3678 // Generate and flush a file about 10KB.
3679 for (int j
= 0; j
< 10; j
++) {
3680 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3683 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3685 // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1
3686 // (due to level0_file_num_compaction_trigger = 6).
3687 // So total files = 1 + remaining 4 = 5.
3688 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3690 // Sleep for 2 hours -- which is much greater than TTL.
3691 env_
->MockSleepForSeconds(2 * 60 * 60);
3692 // Just to make sure that we are in the same state even after sleeping.
3693 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3694 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3696 // Create 10 more files. The old 5 files are dropped as their ttl expired.
3697 for (int i
= 0; i
< 10; i
++) {
3698 for (int j
= 0; j
< 10; j
++) {
3699 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3702 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3704 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3705 ASSERT_LE(SizeAtLevel(0),
3706 options
.compaction_options_fifo
.max_table_files_size
);
3709 // Test with large TTL + Intra-L0 compactions.
3710 // Files dropped based on size, as ttl doesn't kick in.
3712 options
.write_buffer_size
= 20 << 10; // 20K
3713 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1.5MB
3714 options
.compaction_options_fifo
.allow_compaction
= true;
3715 options
.ttl
= 1 * 60 * 60; // 1 hour
3716 options
.level0_file_num_compaction_trigger
= 6;
3717 options
= CurrentOptions(options
);
3718 DestroyAndReopen(options
);
3721 for (int i
= 0; i
< 60; i
++) {
3722 // Generate and flush a file about 20KB.
3723 for (int j
= 0; j
< 20; j
++) {
3724 ASSERT_OK(Put(ToString(i
* 20 + j
), rnd
.RandomString(980)));
3727 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3729 // It should be compacted to 10 files.
3730 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3732 for (int i
= 0; i
< 60; i
++) {
3733 // Generate and flush a file about 20KB.
3734 for (int j
= 0; j
< 20; j
++) {
3735 ASSERT_OK(Put(ToString(i
* 20 + j
+ 2000), rnd
.RandomString(980)));
3738 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3741 // It should be compacted to no more than 20 files.
3742 ASSERT_GT(NumTableFilesAtLevel(0), 10);
3743 ASSERT_LT(NumTableFilesAtLevel(0), 18);
3744 // Size limit is still guaranteed.
3745 ASSERT_LE(SizeAtLevel(0),
3746 options
.compaction_options_fifo
.max_table_files_size
);
3749 #endif // ROCKSDB_LITE
3751 #ifndef ROCKSDB_LITE
3753 * This test is not reliable enough as it heavily depends on disk behavior.
3754 * Disable as it is flaky.
3756 TEST_F(DBTest
, DISABLED_RateLimitingTest
) {
3757 Options options
= CurrentOptions();
3758 options
.write_buffer_size
= 1 << 20; // 1MB
3759 options
.level0_file_num_compaction_trigger
= 2;
3760 options
.target_file_size_base
= 1 << 20; // 1MB
3761 options
.max_bytes_for_level_base
= 4 << 20; // 4MB
3762 options
.max_bytes_for_level_multiplier
= 4;
3763 options
.compression
= kNoCompression
;
3764 options
.create_if_missing
= true;
3766 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
3767 options
.IncreaseParallelism(4);
3768 DestroyAndReopen(options
);
3771 wo
.disableWAL
= true;
3773 // # no rate limiting
3775 uint64_t start
= env_
->NowMicros();
3777 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3778 ASSERT_OK(Put(rnd
.RandomString(32), rnd
.RandomString((1 << 10) + 1), wo
));
3780 uint64_t elapsed
= env_
->NowMicros() - start
;
3781 double raw_rate
= env_
->bytes_written_
* 1000000.0 / elapsed
;
3782 uint64_t rate_limiter_drains
=
3783 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
);
3784 ASSERT_EQ(0, rate_limiter_drains
);
3787 // # rate limiting with 0.7 x threshold
3788 options
.rate_limiter
.reset(
3789 NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate
)));
3790 env_
->bytes_written_
= 0;
3791 DestroyAndReopen(options
);
3793 start
= env_
->NowMicros();
3795 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3796 ASSERT_OK(Put(rnd
.RandomString(32), rnd
.RandomString((1 << 10) + 1), wo
));
3798 rate_limiter_drains
=
3799 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
) -
3800 rate_limiter_drains
;
3801 elapsed
= env_
->NowMicros() - start
;
3803 ASSERT_EQ(options
.rate_limiter
->GetTotalBytesThrough(), env_
->bytes_written_
);
3804 // Most intervals should've been drained (interval time is 100ms, elapsed is
3806 ASSERT_GT(rate_limiter_drains
, 0);
3807 ASSERT_LE(rate_limiter_drains
, elapsed
/ 100000 + 1);
3808 double ratio
= env_
->bytes_written_
* 1000000 / elapsed
/ raw_rate
;
3809 fprintf(stderr
, "write rate ratio = %.2lf, expected 0.7\n", ratio
);
3810 ASSERT_TRUE(ratio
< 0.8);
3812 // # rate limiting with half of the raw_rate
3813 options
.rate_limiter
.reset(
3814 NewGenericRateLimiter(static_cast<int64_t>(raw_rate
/ 2)));
3815 env_
->bytes_written_
= 0;
3816 DestroyAndReopen(options
);
3818 start
= env_
->NowMicros();
3820 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3821 ASSERT_OK(Put(rnd
.RandomString(32), rnd
.RandomString((1 << 10) + 1), wo
));
3823 elapsed
= env_
->NowMicros() - start
;
3824 rate_limiter_drains
=
3825 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
) -
3826 rate_limiter_drains
;
3828 ASSERT_EQ(options
.rate_limiter
->GetTotalBytesThrough(), env_
->bytes_written_
);
3829 // Most intervals should've been drained (interval time is 100ms, elapsed is
3831 ASSERT_GT(rate_limiter_drains
, elapsed
/ 100000 / 2);
3832 ASSERT_LE(rate_limiter_drains
, elapsed
/ 100000 + 1);
3833 ratio
= env_
->bytes_written_
* 1000000 / elapsed
/ raw_rate
;
3834 fprintf(stderr
, "write rate ratio = %.2lf, expected 0.5\n", ratio
);
3835 ASSERT_LT(ratio
, 0.6);
3838 TEST_F(DBTest
, TableOptionsSanitizeTest
) {
3839 Options options
= CurrentOptions();
3840 options
.create_if_missing
= true;
3841 DestroyAndReopen(options
);
3842 ASSERT_EQ(db_
->GetOptions().allow_mmap_reads
, false);
3844 options
.table_factory
.reset(NewPlainTableFactory());
3845 options
.prefix_extractor
.reset(NewNoopTransform());
3847 ASSERT_TRUE(!TryReopen(options
).IsNotSupported());
3849 // Test for check of prefix_extractor when hash index is used for
3850 // block-based table
3851 BlockBasedTableOptions to
;
3852 to
.index_type
= BlockBasedTableOptions::kHashSearch
;
3853 options
= CurrentOptions();
3854 options
.create_if_missing
= true;
3855 options
.table_factory
.reset(NewBlockBasedTableFactory(to
));
3856 ASSERT_TRUE(TryReopen(options
).IsInvalidArgument());
3857 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3858 ASSERT_OK(TryReopen(options
));
3861 TEST_F(DBTest
, ConcurrentMemtableNotSupported
) {
3862 Options options
= CurrentOptions();
3863 options
.allow_concurrent_memtable_write
= true;
3864 options
.soft_pending_compaction_bytes_limit
= 0;
3865 options
.hard_pending_compaction_bytes_limit
= 100;
3866 options
.create_if_missing
= true;
3868 DestroyDB(dbname_
, options
);
3869 options
.memtable_factory
.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
3870 ASSERT_NOK(TryReopen(options
));
3872 options
.memtable_factory
.reset(new SkipListFactory
);
3873 ASSERT_OK(TryReopen(options
));
3875 ColumnFamilyOptions
cf_options(options
);
3876 cf_options
.memtable_factory
.reset(
3877 NewHashLinkListRepFactory(4, 0, 3, true, 4));
3878 ColumnFamilyHandle
* handle
;
3879 ASSERT_NOK(db_
->CreateColumnFamily(cf_options
, "name", &handle
));
3882 #endif // ROCKSDB_LITE
3884 TEST_F(DBTest
, SanitizeNumThreads
) {
3885 for (int attempt
= 0; attempt
< 2; attempt
++) {
3886 const size_t kTotalTasks
= 8;
3887 test::SleepingBackgroundTask sleeping_tasks
[kTotalTasks
];
3889 Options options
= CurrentOptions();
3891 options
.max_background_compactions
= 3;
3892 options
.max_background_flushes
= 2;
3894 options
.create_if_missing
= true;
3895 DestroyAndReopen(options
);
3897 for (size_t i
= 0; i
< kTotalTasks
; i
++) {
3898 // Insert 5 tasks to low priority queue and 5 tasks to high priority queue
3899 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
3901 (i
< 4) ? Env::Priority::LOW
: Env::Priority::HIGH
);
3904 // Wait until 10s for they are scheduled.
3905 for (int i
= 0; i
< 10000; i
++) {
3906 if (options
.env
->GetThreadPoolQueueLen(Env::Priority::LOW
) <= 1 &&
3907 options
.env
->GetThreadPoolQueueLen(Env::Priority::HIGH
) <= 2) {
3910 env_
->SleepForMicroseconds(1000);
3913 // pool size 3, total task 4. Queue size should be 1.
3914 ASSERT_EQ(1U, options
.env
->GetThreadPoolQueueLen(Env::Priority::LOW
));
3915 // pool size 2, total task 4. Queue size should be 2.
3916 ASSERT_EQ(2U, options
.env
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
3918 for (size_t i
= 0; i
< kTotalTasks
; i
++) {
3919 sleeping_tasks
[i
].WakeUp();
3920 sleeping_tasks
[i
].WaitUntilDone();
3923 ASSERT_OK(Put("abc", "def"));
3924 ASSERT_EQ("def", Get("abc"));
3926 ASSERT_EQ("def", Get("abc"));
3930 TEST_F(DBTest
, WriteSingleThreadEntry
) {
3931 std::vector
<port::Thread
> threads
;
3932 dbfull()->TEST_LockMutex();
3933 auto w
= dbfull()->TEST_BeginWrite();
3934 threads
.emplace_back([&] { Put("a", "b"); });
3935 env_
->SleepForMicroseconds(10000);
3936 threads
.emplace_back([&] { Flush(); });
3937 env_
->SleepForMicroseconds(10000);
3938 dbfull()->TEST_UnlockMutex();
3939 dbfull()->TEST_LockMutex();
3940 dbfull()->TEST_EndWrite(w
);
3941 dbfull()->TEST_UnlockMutex();
3943 for (auto& t
: threads
) {
3948 TEST_F(DBTest
, ConcurrentFlushWAL
) {
3949 const size_t cnt
= 100;
3954 for (bool two_write_queues
: {false, true}) {
3955 for (bool manual_wal_flush
: {false, true}) {
3956 options
.two_write_queues
= two_write_queues
;
3957 options
.manual_wal_flush
= manual_wal_flush
;
3958 options
.create_if_missing
= true;
3959 DestroyAndReopen(options
);
3960 std::vector
<port::Thread
> threads
;
3961 threads
.emplace_back([&] {
3962 for (size_t i
= 0; i
< cnt
; i
++) {
3963 auto istr
= ToString(i
);
3964 db_
->Put(wopt
, db_
->DefaultColumnFamily(), "a" + istr
, "b" + istr
);
3967 if (two_write_queues
) {
3968 threads
.emplace_back([&] {
3969 for (size_t i
= cnt
; i
< 2 * cnt
; i
++) {
3970 auto istr
= ToString(i
);
3972 batch
.Put("a" + istr
, "b" + istr
);
3973 dbfull()->WriteImpl(wopt
, &batch
, nullptr, nullptr, 0, true);
3977 threads
.emplace_back([&] {
3978 for (size_t i
= 0; i
< cnt
* 100; i
++) { // FlushWAL is faster than Put
3979 db_
->FlushWAL(false);
3982 for (auto& t
: threads
) {
3985 options
.create_if_missing
= false;
3986 // Recover from the wal and make sure that it is not corrupted
3988 for (size_t i
= 0; i
< cnt
; i
++) {
3990 auto istr
= ToString(i
);
3992 db_
->Get(ropt
, db_
->DefaultColumnFamily(), "a" + istr
, &pval
));
3993 ASSERT_TRUE(pval
== ("b" + istr
));
3999 #ifndef ROCKSDB_LITE
4000 TEST_F(DBTest
, DynamicMemtableOptions
) {
4001 const uint64_t k64KB
= 1 << 16;
4002 const uint64_t k128KB
= 1 << 17;
4003 const uint64_t k5KB
= 5 * 1024;
4006 options
.create_if_missing
= true;
4007 options
.compression
= kNoCompression
;
4008 options
.max_background_compactions
= 1;
4009 options
.write_buffer_size
= k64KB
;
4010 options
.arena_block_size
= 16 * 1024;
4011 options
.max_write_buffer_number
= 2;
4012 // Don't trigger compact/slowdown/stop
4013 options
.level0_file_num_compaction_trigger
= 1024;
4014 options
.level0_slowdown_writes_trigger
= 1024;
4015 options
.level0_stop_writes_trigger
= 1024;
4016 DestroyAndReopen(options
);
4018 auto gen_l0_kb
= [this](int size
) {
4019 const int kNumPutsBeforeWaitForFlush
= 64;
4021 for (int i
= 0; i
< size
; i
++) {
4022 ASSERT_OK(Put(Key(i
), rnd
.RandomString(1024)));
4024 // The following condition prevents a race condition between flush jobs
4025 // acquiring work and this thread filling up multiple memtables. Without
4026 // this, the flush might produce less files than expected because
4027 // multiple memtables are flushed into a single L0 file. This race
4028 // condition affects assertion (A).
4029 if (i
% kNumPutsBeforeWaitForFlush
== kNumPutsBeforeWaitForFlush
- 1) {
4030 dbfull()->TEST_WaitForFlushMemTable();
4033 dbfull()->TEST_WaitForFlushMemTable();
4036 // Test write_buffer_size
4038 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
4039 ASSERT_LT(SizeAtLevel(0), k64KB
+ k5KB
);
4040 ASSERT_GT(SizeAtLevel(0), k64KB
- k5KB
* 2);
4043 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4044 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4046 // Increase buffer size
4047 ASSERT_OK(dbfull()->SetOptions({
4048 {"write_buffer_size", "131072"},
4051 // The existing memtable inflated 64KB->128KB when we invoked SetOptions().
4052 // Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
4054 ASSERT_EQ(NumTableFilesAtLevel(0), 1); // (A)
4055 ASSERT_LT(SizeAtLevel(0), k128KB
+ 2 * k5KB
);
4056 ASSERT_GT(SizeAtLevel(0), k128KB
- 4 * k5KB
);
4058 // Decrease buffer size below current usage
4059 ASSERT_OK(dbfull()->SetOptions({
4060 {"write_buffer_size", "65536"},
4062 // The existing memtable became eligible for flush when we reduced its
4063 // capacity to 64KB. Two keys need to be added to trigger flush: first causes
4064 // memtable to be marked full, second schedules the flush. Then we should have
4065 // a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
4067 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
4068 ASSERT_LT(SizeAtLevel(0), k128KB
+ k64KB
+ 2 * k5KB
);
4069 ASSERT_GT(SizeAtLevel(0), k128KB
+ k64KB
- 4 * k5KB
);
4071 // Test max_write_buffer_number
4072 // Block compaction thread, which will also block the flushes because
4073 // max_background_flushes == 0, so flushes are getting executed by the
4074 // compaction thread
4075 env_
->SetBackgroundThreads(1, Env::LOW
);
4076 test::SleepingBackgroundTask sleeping_task_low
;
4077 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4078 Env::Priority::LOW
);
4079 // Start from scratch and disable compaction/flush. Flush can only happen
4080 // during compaction but trigger is pretty high
4081 options
.disable_auto_compactions
= true;
4082 DestroyAndReopen(options
);
4083 env_
->SetBackgroundThreads(0, Env::HIGH
);
4085 // Put until writes are stopped, bounded by 256 puts. We should see stop at
4090 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4091 "DBImpl::DelayWrite:Wait",
4092 [&](void* /*arg*/) { sleeping_task_low
.WakeUp(); });
4093 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4095 while (!sleeping_task_low
.WokenUp() && count
< 256) {
4096 ASSERT_OK(Put(Key(count
), rnd
.RandomString(1024), WriteOptions()));
4099 ASSERT_GT(static_cast<double>(count
), 128 * 0.8);
4100 ASSERT_LT(static_cast<double>(count
), 128 * 1.2);
4102 sleeping_task_low
.WaitUntilDone();
4105 ASSERT_OK(dbfull()->SetOptions({
4106 {"max_write_buffer_number", "8"},
4108 // Clean up memtable and L0
4109 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4111 sleeping_task_low
.Reset();
4112 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4113 Env::Priority::LOW
);
4115 while (!sleeping_task_low
.WokenUp() && count
< 1024) {
4116 ASSERT_OK(Put(Key(count
), rnd
.RandomString(1024), WriteOptions()));
4119 // Windows fails this test. Will tune in the future and figure out
4122 ASSERT_GT(static_cast<double>(count
), 512 * 0.8);
4123 ASSERT_LT(static_cast<double>(count
), 512 * 1.2);
4125 sleeping_task_low
.WaitUntilDone();
4128 ASSERT_OK(dbfull()->SetOptions({
4129 {"max_write_buffer_number", "4"},
4131 // Clean up memtable and L0
4132 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4134 sleeping_task_low
.Reset();
4135 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4136 Env::Priority::LOW
);
4139 while (!sleeping_task_low
.WokenUp() && count
< 1024) {
4140 ASSERT_OK(Put(Key(count
), rnd
.RandomString(1024), WriteOptions()));
4143 // Windows fails this test. Will tune in the future and figure out
4146 ASSERT_GT(static_cast<double>(count
), 256 * 0.8);
4147 ASSERT_LT(static_cast<double>(count
), 266 * 1.2);
4149 sleeping_task_low
.WaitUntilDone();
4151 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4153 #endif // ROCKSDB_LITE
4155 #ifdef ROCKSDB_USING_THREAD_STATUS
4157 void VerifyOperationCount(Env
* env
, ThreadStatus::OperationType op_type
,
4158 int expected_count
) {
4160 std::vector
<ThreadStatus
> thread_list
;
4161 ASSERT_OK(env
->GetThreadList(&thread_list
));
4162 for (auto thread
: thread_list
) {
4163 if (thread
.operation_type
== op_type
) {
4167 ASSERT_EQ(op_count
, expected_count
);
4171 TEST_F(DBTest
, GetThreadStatus
) {
4174 options
.enable_thread_tracking
= true;
4177 std::vector
<ThreadStatus
> thread_list
;
4178 Status s
= env_
->GetThreadList(&thread_list
);
4180 for (int i
= 0; i
< 2; ++i
) {
4181 // repeat the test with differet number of high / low priority threads
4182 const int kTestCount
= 3;
4183 const unsigned int kHighPriCounts
[kTestCount
] = {3, 2, 5};
4184 const unsigned int kLowPriCounts
[kTestCount
] = {10, 15, 3};
4185 const unsigned int kBottomPriCounts
[kTestCount
] = {2, 1, 4};
4186 for (int test
= 0; test
< kTestCount
; ++test
) {
4187 // Change the number of threads in high / low priority pool.
4188 env_
->SetBackgroundThreads(kHighPriCounts
[test
], Env::HIGH
);
4189 env_
->SetBackgroundThreads(kLowPriCounts
[test
], Env::LOW
);
4190 env_
->SetBackgroundThreads(kBottomPriCounts
[test
], Env::BOTTOM
);
4191 // Wait to ensure the all threads has been registered
4192 unsigned int thread_type_counts
[ThreadStatus::NUM_THREAD_TYPES
];
4193 // TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
4194 // all threads have been registered.
4195 // Try up to 60 seconds.
4196 for (int num_try
= 0; num_try
< 60000; num_try
++) {
4197 env_
->SleepForMicroseconds(1000);
4198 thread_list
.clear();
4199 s
= env_
->GetThreadList(&thread_list
);
4201 memset(thread_type_counts
, 0, sizeof(thread_type_counts
));
4202 for (auto thread
: thread_list
) {
4203 ASSERT_LT(thread
.thread_type
, ThreadStatus::NUM_THREAD_TYPES
);
4204 thread_type_counts
[thread
.thread_type
]++;
4206 if (thread_type_counts
[ThreadStatus::HIGH_PRIORITY
] ==
4207 kHighPriCounts
[test
] &&
4208 thread_type_counts
[ThreadStatus::LOW_PRIORITY
] ==
4209 kLowPriCounts
[test
] &&
4210 thread_type_counts
[ThreadStatus::BOTTOM_PRIORITY
] ==
4211 kBottomPriCounts
[test
]) {
4215 // Verify the number of high-priority threads
4216 ASSERT_EQ(thread_type_counts
[ThreadStatus::HIGH_PRIORITY
],
4217 kHighPriCounts
[test
]);
4218 // Verify the number of low-priority threads
4219 ASSERT_EQ(thread_type_counts
[ThreadStatus::LOW_PRIORITY
],
4220 kLowPriCounts
[test
]);
4221 // Verify the number of bottom-priority threads
4222 ASSERT_EQ(thread_type_counts
[ThreadStatus::BOTTOM_PRIORITY
],
4223 kBottomPriCounts
[test
]);
4226 // repeat the test with multiple column families
4227 CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options
);
4228 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4232 db_
->DropColumnFamily(handles_
[2]);
4234 handles_
.erase(handles_
.begin() + 2);
4235 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4238 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4242 TEST_F(DBTest
, DisableThreadStatus
) {
4245 options
.enable_thread_tracking
= false;
4247 CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options
);
4248 // Verify non of the column family info exists
4249 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4253 TEST_F(DBTest
, ThreadStatusFlush
) {
4256 options
.write_buffer_size
= 100000; // Small write buffer
4257 options
.enable_thread_tracking
= true;
4258 options
= CurrentOptions(options
);
4260 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
4261 {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
4262 {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
4264 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4266 CreateAndReopenWithCF({"pikachu"}, options
);
4267 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
4269 ASSERT_OK(Put(1, "foo", "v1"));
4270 ASSERT_EQ("v1", Get(1, "foo"));
4271 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
4273 uint64_t num_running_flushes
= 0;
4274 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
4275 ASSERT_EQ(num_running_flushes
, 0);
4277 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
4278 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
4280 // The first sync point is to make sure there's one flush job
4281 // running when we perform VerifyOperationCount().
4282 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
4283 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 1);
4284 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
4285 ASSERT_EQ(num_running_flushes
, 1);
4286 // This second sync point is to ensure the flush job will not
4287 // be completed until we already perform VerifyOperationCount().
4288 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
4289 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4292 TEST_P(DBTestWithParam
, ThreadStatusSingleCompaction
) {
4293 const int kTestKeySize
= 16;
4294 const int kTestValueSize
= 984;
4295 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4296 const int kEntriesPerBuffer
= 100;
4298 options
.create_if_missing
= true;
4299 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4300 options
.compaction_style
= kCompactionStyleLevel
;
4301 options
.target_file_size_base
= options
.write_buffer_size
;
4302 options
.max_bytes_for_level_base
= options
.target_file_size_base
* 2;
4303 options
.max_bytes_for_level_multiplier
= 2;
4304 options
.compression
= kNoCompression
;
4305 options
= CurrentOptions(options
);
4307 options
.enable_thread_tracking
= true;
4308 const int kNumL0Files
= 4;
4309 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4310 options
.max_subcompactions
= max_subcompactions_
;
4312 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
4313 {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
4314 {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
4315 {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
4317 for (int tests
= 0; tests
< 2; ++tests
) {
4318 DestroyAndReopen(options
);
4319 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
4320 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4324 for (int file
= 0; file
< kNumL0Files
; ++file
) {
4325 for (int key
= 0; key
< kEntriesPerBuffer
; ++key
) {
4326 ASSERT_OK(Put(ToString(key
+ file
* kEntriesPerBuffer
),
4327 rnd
.RandomString(kTestValueSize
)));
4331 // This makes sure a compaction won't be scheduled until
4332 // we have done with the above Put Phase.
4333 uint64_t num_running_compactions
= 0;
4334 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
4335 &num_running_compactions
);
4336 ASSERT_EQ(num_running_compactions
, 0);
4337 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
4338 ASSERT_GE(NumTableFilesAtLevel(0),
4339 options
.level0_file_num_compaction_trigger
);
4341 // This makes sure at least one compaction is running.
4342 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");
4344 if (options
.enable_thread_tracking
) {
4345 // expecting one single L0 to L1 compaction
4346 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 1);
4348 // If thread tracking is not enabled, compaction count should be 0.
4349 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 0);
4351 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
4352 &num_running_compactions
);
4353 ASSERT_EQ(num_running_compactions
, 1);
4354 // TODO(yhchiang): adding assert to verify each compaction stage.
4355 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");
4357 // repeat the test with disabling thread tracking.
4358 options
.enable_thread_tracking
= false;
4359 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4363 TEST_P(DBTestWithParam
, PreShutdownManualCompaction
) {
4364 Options options
= CurrentOptions();
4365 options
.max_subcompactions
= max_subcompactions_
;
4366 CreateAndReopenWithCF({"pikachu"}, options
);
4368 // iter - 0 with 7 levels
4369 // iter - 1 with 3 levels
4370 for (int iter
= 0; iter
< 2; ++iter
) {
4371 MakeTables(3, "p", "q", 1);
4372 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4374 // Compaction range falls before files
4375 Compact(1, "", "c");
4376 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4378 // Compaction range falls after files
4379 Compact(1, "r", "z");
4380 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4382 // Compaction range overlaps files
4383 Compact(1, "p", "q");
4384 ASSERT_EQ("0,0,1", FilesPerLevel(1));
4386 // Populate a different range
4387 MakeTables(3, "c", "e", 1);
4388 ASSERT_EQ("1,1,2", FilesPerLevel(1));
4390 // Compact just the new range
4391 Compact(1, "b", "f");
4392 ASSERT_EQ("0,0,2", FilesPerLevel(1));
4395 MakeTables(1, "a", "z", 1);
4396 ASSERT_EQ("1,0,2", FilesPerLevel(1));
4397 CancelAllBackgroundWork(db_
);
4398 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
4399 ASSERT_EQ("1,0,2", FilesPerLevel(1));
4402 options
= CurrentOptions();
4403 options
.num_levels
= 3;
4404 options
.create_if_missing
= true;
4405 DestroyAndReopen(options
);
4406 CreateAndReopenWithCF({"pikachu"}, options
);
4411 TEST_F(DBTest
, PreShutdownFlush
) {
4412 Options options
= CurrentOptions();
4413 CreateAndReopenWithCF({"pikachu"}, options
);
4414 ASSERT_OK(Put(1, "key", "value"));
4415 CancelAllBackgroundWork(db_
);
4417 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
4418 ASSERT_TRUE(s
.IsShutdownInProgress());
4421 TEST_P(DBTestWithParam
, PreShutdownMultipleCompaction
) {
4422 const int kTestKeySize
= 16;
4423 const int kTestValueSize
= 984;
4424 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4425 const int kEntriesPerBuffer
= 40;
4426 const int kNumL0Files
= 4;
4428 const int kHighPriCount
= 3;
4429 const int kLowPriCount
= 5;
4430 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
4431 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
4434 options
.create_if_missing
= true;
4435 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4436 options
.compaction_style
= kCompactionStyleLevel
;
4437 options
.target_file_size_base
= options
.write_buffer_size
;
4438 options
.max_bytes_for_level_base
=
4439 options
.target_file_size_base
* kNumL0Files
;
4440 options
.compression
= kNoCompression
;
4441 options
= CurrentOptions(options
);
4443 options
.enable_thread_tracking
= true;
4444 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4445 options
.max_bytes_for_level_multiplier
= 2;
4446 options
.max_background_compactions
= kLowPriCount
;
4447 options
.level0_stop_writes_trigger
= 1 << 10;
4448 options
.level0_slowdown_writes_trigger
= 1 << 10;
4449 options
.max_subcompactions
= max_subcompactions_
;
4454 std::vector
<ThreadStatus
> thread_list
;
4455 // Delay both flush and compaction
4456 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4457 {{"FlushJob::FlushJob()", "CompactionJob::Run():Start"},
4458 {"CompactionJob::Run():Start",
4459 "DBTest::PreShutdownMultipleCompaction:Preshutdown"},
4460 {"CompactionJob::Run():Start",
4461 "DBTest::PreShutdownMultipleCompaction:VerifyCompaction"},
4462 {"DBTest::PreShutdownMultipleCompaction:Preshutdown",
4463 "CompactionJob::Run():End"},
4464 {"CompactionJob::Run():End",
4465 "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});
4467 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4469 // Make rocksdb busy
4471 // check how many threads are doing compaction using GetThreadList
4472 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
4473 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
4474 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
4475 ASSERT_OK(Put(ToString(key
++), rnd
.RandomString(kTestValueSize
)));
4478 Status s
= env_
->GetThreadList(&thread_list
);
4479 for (auto thread
: thread_list
) {
4480 operation_count
[thread
.operation_type
]++;
4483 // Speed up the test
4484 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
4485 operation_count
[ThreadStatus::OP_COMPACTION
] >
4486 0.6 * options
.max_background_compactions
) {
4489 if (file
== 15 * kNumL0Files
) {
4490 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
4494 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
4495 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
4496 CancelAllBackgroundWork(db_
);
4497 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
4498 dbfull()->TEST_WaitForCompact();
4499 // Record the number of compactions at a time.
4500 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
4501 operation_count
[i
] = 0;
4503 Status s
= env_
->GetThreadList(&thread_list
);
4504 for (auto thread
: thread_list
) {
4505 operation_count
[thread
.operation_type
]++;
4507 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
4510 TEST_P(DBTestWithParam
, PreShutdownCompactionMiddle
) {
4511 const int kTestKeySize
= 16;
4512 const int kTestValueSize
= 984;
4513 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4514 const int kEntriesPerBuffer
= 40;
4515 const int kNumL0Files
= 4;
4517 const int kHighPriCount
= 3;
4518 const int kLowPriCount
= 5;
4519 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
4520 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
4523 options
.create_if_missing
= true;
4524 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4525 options
.compaction_style
= kCompactionStyleLevel
;
4526 options
.target_file_size_base
= options
.write_buffer_size
;
4527 options
.max_bytes_for_level_base
=
4528 options
.target_file_size_base
* kNumL0Files
;
4529 options
.compression
= kNoCompression
;
4530 options
= CurrentOptions(options
);
4532 options
.enable_thread_tracking
= true;
4533 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4534 options
.max_bytes_for_level_multiplier
= 2;
4535 options
.max_background_compactions
= kLowPriCount
;
4536 options
.level0_stop_writes_trigger
= 1 << 10;
4537 options
.level0_slowdown_writes_trigger
= 1 << 10;
4538 options
.max_subcompactions
= max_subcompactions_
;
4543 std::vector
<ThreadStatus
> thread_list
;
4544 // Delay both flush and compaction
4545 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4546 {{"DBTest::PreShutdownCompactionMiddle:Preshutdown",
4547 "CompactionJob::Run():Inprogress"},
4548 {"CompactionJob::Run():Start",
4549 "DBTest::PreShutdownCompactionMiddle:VerifyCompaction"},
4550 {"CompactionJob::Run():Inprogress", "CompactionJob::Run():End"},
4551 {"CompactionJob::Run():End",
4552 "DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown"}});
4554 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4556 // Make rocksdb busy
4558 // check how many threads are doing compaction using GetThreadList
4559 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
4560 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
4561 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
4562 ASSERT_OK(Put(ToString(key
++), rnd
.RandomString(kTestValueSize
)));
4565 Status s
= env_
->GetThreadList(&thread_list
);
4566 for (auto thread
: thread_list
) {
4567 operation_count
[thread
.operation_type
]++;
4570 // Speed up the test
4571 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
4572 operation_count
[ThreadStatus::OP_COMPACTION
] >
4573 0.6 * options
.max_background_compactions
) {
4576 if (file
== 15 * kNumL0Files
) {
4577 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyCompaction");
4581 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
4582 CancelAllBackgroundWork(db_
);
4583 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:Preshutdown");
4584 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown");
4585 dbfull()->TEST_WaitForCompact();
4586 // Record the number of compactions at a time.
4587 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
4588 operation_count
[i
] = 0;
4590 Status s
= env_
->GetThreadList(&thread_list
);
4591 for (auto thread
: thread_list
) {
4592 operation_count
[thread
.operation_type
]++;
4594 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
4597 #endif // ROCKSDB_USING_THREAD_STATUS
4599 #ifndef ROCKSDB_LITE
4600 TEST_F(DBTest
, FlushOnDestroy
) {
4602 wo
.disableWAL
= true;
4603 ASSERT_OK(Put("foo", "v1", wo
));
4604 CancelAllBackgroundWork(db_
);
4607 TEST_F(DBTest
, DynamicLevelCompressionPerLevel
) {
4608 if (!Snappy_Supported()) {
4611 const int kNKeys
= 120;
4613 for (int i
= 0; i
< kNKeys
; i
++) {
4616 RandomShuffle(std::begin(keys
), std::end(keys
));
4621 options
.create_if_missing
= true;
4622 options
.db_write_buffer_size
= 20480;
4623 options
.write_buffer_size
= 20480;
4624 options
.max_write_buffer_number
= 2;
4625 options
.level0_file_num_compaction_trigger
= 2;
4626 options
.level0_slowdown_writes_trigger
= 2;
4627 options
.level0_stop_writes_trigger
= 2;
4628 options
.target_file_size_base
= 20480;
4629 options
.level_compaction_dynamic_level_bytes
= true;
4630 options
.max_bytes_for_level_base
= 102400;
4631 options
.max_bytes_for_level_multiplier
= 4;
4632 options
.max_background_compactions
= 1;
4633 options
.num_levels
= 5;
4635 options
.compression_per_level
.resize(3);
4636 options
.compression_per_level
[0] = kNoCompression
;
4637 options
.compression_per_level
[1] = kNoCompression
;
4638 options
.compression_per_level
[2] = kSnappyCompression
;
4640 OnFileDeletionListener
* listener
= new OnFileDeletionListener();
4641 options
.listeners
.emplace_back(listener
);
4643 DestroyAndReopen(options
);
4645 // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
4646 // be compressed, so total data size should be more than 80K.
4647 for (int i
= 0; i
< 20; i
++) {
4648 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
4651 dbfull()->TEST_WaitForCompact();
4653 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4654 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4655 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4656 // Assuming each files' metadata is at least 50 bytes/
4657 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U + 50U * 4);
4659 // Insert 400KB. Some data will be compressed
4660 for (int i
= 21; i
< 120; i
++) {
4661 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
4664 dbfull()->TEST_WaitForCompact();
4665 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4666 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4667 ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4),
4668 120U * 4000U + 50U * 24);
4669 // Make sure data in files in L3 is not compacted by removing all files
4670 // in L4 and calculate number of rows
4671 ASSERT_OK(dbfull()->SetOptions({
4672 {"disable_auto_compactions", "true"},
4674 ColumnFamilyMetaData cf_meta
;
4675 db_
->GetColumnFamilyMetaData(&cf_meta
);
4676 for (auto file
: cf_meta
.levels
[4].files
) {
4677 listener
->SetExpectedFileName(dbname_
+ file
.name
);
4678 ASSERT_OK(dbfull()->DeleteFile(file
.name
));
4680 listener
->VerifyMatchedCount(cf_meta
.levels
[4].files
.size());
4683 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
4684 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
4687 ASSERT_OK(iter
->status());
4688 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys
* 4000U + num_keys
* 10U);
4691 TEST_F(DBTest
, DynamicLevelCompressionPerLevel2
) {
4692 if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
4695 const int kNKeys
= 500;
4697 for (int i
= 0; i
< kNKeys
; i
++) {
4700 RandomShuffle(std::begin(keys
), std::end(keys
));
4704 options
.create_if_missing
= true;
4705 options
.db_write_buffer_size
= 6000000;
4706 options
.write_buffer_size
= 600000;
4707 options
.max_write_buffer_number
= 2;
4708 options
.level0_file_num_compaction_trigger
= 2;
4709 options
.level0_slowdown_writes_trigger
= 2;
4710 options
.level0_stop_writes_trigger
= 2;
4711 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
4712 options
.target_file_size_base
= 20;
4714 options
.level_compaction_dynamic_level_bytes
= true;
4715 options
.max_bytes_for_level_base
= 200;
4716 options
.max_bytes_for_level_multiplier
= 8;
4717 options
.max_background_compactions
= 1;
4718 options
.num_levels
= 5;
4719 std::shared_ptr
<mock::MockTableFactory
> mtf(new mock::MockTableFactory
);
4720 options
.table_factory
= mtf
;
4722 options
.compression_per_level
.resize(3);
4723 options
.compression_per_level
[0] = kNoCompression
;
4724 options
.compression_per_level
[1] = kLZ4Compression
;
4725 options
.compression_per_level
[2] = kZlibCompression
;
4727 DestroyAndReopen(options
);
4728 // When base level is L4, L4 is LZ4.
4729 std::atomic
<int> num_zlib(0);
4730 std::atomic
<int> num_lz4(0);
4731 std::atomic
<int> num_no(0);
4732 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4733 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
4734 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
4735 if (compaction
->output_level() == 4) {
4736 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
4737 num_lz4
.fetch_add(1);
4740 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4741 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
4742 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
4743 ASSERT_TRUE(*compression
== kNoCompression
);
4744 num_no
.fetch_add(1);
4746 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4748 for (int i
= 0; i
< 100; i
++) {
4749 std::string value
= rnd
.RandomString(200);
4750 ASSERT_OK(Put(Key(keys
[i
]), value
));
4753 dbfull()->TEST_WaitForCompact();
4758 dbfull()->TEST_WaitForFlushMemTable();
4759 dbfull()->TEST_WaitForCompact();
4760 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4761 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4763 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4764 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4765 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4766 ASSERT_GT(NumTableFilesAtLevel(4), 0);
4767 ASSERT_GT(num_no
.load(), 2);
4768 ASSERT_GT(num_lz4
.load(), 0);
4769 int prev_num_files_l4
= NumTableFilesAtLevel(4);
4771 // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
4774 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4775 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
4776 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
4777 if (compaction
->output_level() == 4 && compaction
->start_level() == 3) {
4778 ASSERT_TRUE(compaction
->output_compression() == kZlibCompression
);
4779 num_zlib
.fetch_add(1);
4781 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
4782 num_lz4
.fetch_add(1);
4785 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4786 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
4787 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
4788 ASSERT_TRUE(*compression
== kNoCompression
);
4789 num_no
.fetch_add(1);
4791 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4793 for (int i
= 101; i
< 500; i
++) {
4794 std::string value
= rnd
.RandomString(200);
4795 ASSERT_OK(Put(Key(keys
[i
]), value
));
4796 if (i
% 100 == 99) {
4798 dbfull()->TEST_WaitForCompact();
4802 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4803 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4804 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4805 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4806 ASSERT_GT(NumTableFilesAtLevel(3), 0);
4807 ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4
);
4808 ASSERT_GT(num_no
.load(), 2);
4809 ASSERT_GT(num_lz4
.load(), 0);
4810 ASSERT_GT(num_zlib
.load(), 0);
4813 TEST_F(DBTest
, DynamicCompactionOptions
) {
4814 // minimum write buffer size is enforced at 64KB
4815 const uint64_t k32KB
= 1 << 15;
4816 const uint64_t k64KB
= 1 << 16;
4817 const uint64_t k128KB
= 1 << 17;
4818 const uint64_t k1MB
= 1 << 20;
4819 const uint64_t k4KB
= 1 << 12;
4822 options
.create_if_missing
= true;
4823 options
.compression
= kNoCompression
;
4824 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
4825 options
.write_buffer_size
= k64KB
;
4826 options
.arena_block_size
= 4 * k4KB
;
4827 options
.max_write_buffer_number
= 2;
4828 // Compaction related options
4829 options
.level0_file_num_compaction_trigger
= 3;
4830 options
.level0_slowdown_writes_trigger
= 4;
4831 options
.level0_stop_writes_trigger
= 8;
4832 options
.target_file_size_base
= k64KB
;
4833 options
.max_compaction_bytes
= options
.target_file_size_base
* 10;
4834 options
.target_file_size_multiplier
= 1;
4835 options
.max_bytes_for_level_base
= k128KB
;
4836 options
.max_bytes_for_level_multiplier
= 4;
4838 // Block flush thread and disable compaction thread
4839 env_
->SetBackgroundThreads(1, Env::LOW
);
4840 env_
->SetBackgroundThreads(1, Env::HIGH
);
4841 DestroyAndReopen(options
);
4843 auto gen_l0_kb
= [this](int start
, int size
, int stride
) {
4845 for (int i
= 0; i
< size
; i
++) {
4846 ASSERT_OK(Put(Key(start
+ stride
* i
), rnd
.RandomString(1024)));
4848 dbfull()->TEST_WaitForFlushMemTable();
4851 // Write 3 files that have the same key range.
4852 // Since level0_file_num_compaction_trigger is 3, compaction should be
4853 // triggered. The compaction should result in one L1 file
4854 gen_l0_kb(0, 64, 1);
4855 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
4856 gen_l0_kb(0, 64, 1);
4857 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
4858 gen_l0_kb(0, 64, 1);
4859 dbfull()->TEST_WaitForCompact();
4860 ASSERT_EQ("0,1", FilesPerLevel());
4861 std::vector
<LiveFileMetaData
> metadata
;
4862 db_
->GetLiveFilesMetaData(&metadata
);
4863 ASSERT_EQ(1U, metadata
.size());
4864 ASSERT_LE(metadata
[0].size
, k64KB
+ k4KB
);
4865 ASSERT_GE(metadata
[0].size
, k64KB
- k4KB
);
4867 // Test compaction trigger and target_file_size_base
4868 // Reduce compaction trigger to 2, and reduce L1 file size to 32KB.
4869 // Writing to 64KB L0 files should trigger a compaction. Since these
4870 // 2 L0 files have the same key range, compaction merge them and should
4871 // result in 2 32KB L1 files.
4872 ASSERT_OK(dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
4873 {"target_file_size_base", ToString(k32KB
)}}));
4875 gen_l0_kb(0, 64, 1);
4876 ASSERT_EQ("1,1", FilesPerLevel());
4877 gen_l0_kb(0, 64, 1);
4878 dbfull()->TEST_WaitForCompact();
4879 ASSERT_EQ("0,2", FilesPerLevel());
4881 db_
->GetLiveFilesMetaData(&metadata
);
4882 ASSERT_EQ(2U, metadata
.size());
4883 ASSERT_LE(metadata
[0].size
, k32KB
+ k4KB
);
4884 ASSERT_GE(metadata
[0].size
, k32KB
- k4KB
);
4885 ASSERT_LE(metadata
[1].size
, k32KB
+ k4KB
);
4886 ASSERT_GE(metadata
[1].size
, k32KB
- k4KB
);
4888 // Test max_bytes_for_level_base
4889 // Increase level base size to 256KB and write enough data that will
4890 // fill L1 and L2. L1 size should be around 256KB while L2 size should be
4891 // around 256KB x 4.
4893 dbfull()->SetOptions({{"max_bytes_for_level_base", ToString(k1MB
)}}));
4895 // writing 96 x 64KB => 6 * 1024KB
4896 // (L1 + L2) = (1 + 4) * 1024KB
4897 for (int i
= 0; i
< 96; ++i
) {
4898 gen_l0_kb(i
, 64, 96);
4900 dbfull()->TEST_WaitForCompact();
4901 ASSERT_GT(SizeAtLevel(1), k1MB
/ 2);
4902 ASSERT_LT(SizeAtLevel(1), k1MB
+ k1MB
/ 2);
4904 // Within (0.5, 1.5) of 4MB.
4905 ASSERT_GT(SizeAtLevel(2), 2 * k1MB
);
4906 ASSERT_LT(SizeAtLevel(2), 6 * k1MB
);
4908 // Test max_bytes_for_level_multiplier and
4909 // max_bytes_for_level_base. Now, reduce both mulitplier and level base,
4910 // After filling enough data that can fit in L1 - L3, we should see L1 size
4911 // reduces to 128KB from 256KB which was asserted previously. Same for L2.
4913 dbfull()->SetOptions({{"max_bytes_for_level_multiplier", "2"},
4914 {"max_bytes_for_level_base", ToString(k128KB
)}}));
4916 // writing 20 x 64KB = 10 x 128KB
4917 // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
4918 for (int i
= 0; i
< 20; ++i
) {
4919 gen_l0_kb(i
, 64, 32);
4921 dbfull()->TEST_WaitForCompact();
4922 uint64_t total_size
= SizeAtLevel(1) + SizeAtLevel(2) + SizeAtLevel(3);
4923 ASSERT_TRUE(total_size
< k128KB
* 7 * 1.5);
4925 // Test level0_stop_writes_trigger.
4926 // Clean up memtable and L0. Block compaction threads. If continue to write
4927 // and flush memtables. We should see put stop after 8 memtable flushes
4928 // since level0_stop_writes_trigger = 8
4929 dbfull()->TEST_FlushMemTable(true, true);
4930 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4932 test::SleepingBackgroundTask sleeping_task_low
;
4933 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4934 Env::Priority::LOW
);
4935 sleeping_task_low
.WaitUntilSleeping();
4936 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4940 while (count
< 64) {
4941 ASSERT_OK(Put(Key(count
), rnd
.RandomString(1024), wo
));
4942 dbfull()->TEST_FlushMemTable(true, true);
4944 if (dbfull()->TEST_write_controler().IsStopped()) {
4945 sleeping_task_low
.WakeUp();
4950 ASSERT_EQ(count
, 8);
4952 sleeping_task_low
.WaitUntilDone();
4954 // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
4955 // Block compaction thread again. Perform the put and memtable flushes
4956 // until we see the stop after 6 memtable flushes.
4957 ASSERT_OK(dbfull()->SetOptions({{"level0_stop_writes_trigger", "6"}}));
4958 dbfull()->TEST_FlushMemTable(true);
4959 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4960 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4962 // Block compaction again
4963 sleeping_task_low
.Reset();
4964 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4965 Env::Priority::LOW
);
4966 sleeping_task_low
.WaitUntilSleeping();
4968 while (count
< 64) {
4969 ASSERT_OK(Put(Key(count
), rnd
.RandomString(1024), wo
));
4970 dbfull()->TEST_FlushMemTable(true, true);
4972 if (dbfull()->TEST_write_controler().IsStopped()) {
4973 sleeping_task_low
.WakeUp();
4977 ASSERT_EQ(count
, 6);
4979 sleeping_task_low
.WaitUntilDone();
4981 // Test disable_auto_compactions
4982 // Compaction thread is unblocked but auto compaction is disabled. Write
4983 // 4 L0 files and compaction should be triggered. If auto compaction is
4984 // disabled, then TEST_WaitForCompact will be waiting for nothing. Number of
4985 // L0 files do not change after the call.
4986 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
4987 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4988 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4990 for (int i
= 0; i
< 4; ++i
) {
4991 ASSERT_OK(Put(Key(i
), rnd
.RandomString(1024)));
4992 // Wait for compaction so that put won't stop
4993 dbfull()->TEST_FlushMemTable(true);
4995 dbfull()->TEST_WaitForCompact();
4996 ASSERT_EQ(NumTableFilesAtLevel(0), 4);
4998 // Enable auto compaction and perform the same test, # of L0 files should be
4999 // reduced after compaction.
5000 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
5001 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
5002 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
5004 for (int i
= 0; i
< 4; ++i
) {
5005 ASSERT_OK(Put(Key(i
), rnd
.RandomString(1024)));
5006 // Wait for compaction so that put won't stop
5007 dbfull()->TEST_FlushMemTable(true);
5009 dbfull()->TEST_WaitForCompact();
5010 ASSERT_LT(NumTableFilesAtLevel(0), 4);
// Test dynamic FIFO compaction options.
// This test covers just option parsing and makes sure that the options are
// correctly assigned. Also look at DBOptionsTest.SetFIFOCompactionOptions
// test which makes sure that the FIFO compaction functionality is working
// as expected on dynamically changing the options.
// Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
5019 TEST_F(DBTest
, DynamicFIFOCompactionOptions
) {
5022 options
.create_if_missing
= true;
5024 DestroyAndReopen(options
);
5027 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5028 1024 * 1024 * 1024);
5029 ASSERT_EQ(dbfull()->GetOptions().ttl
, 0);
5030 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5033 ASSERT_OK(dbfull()->SetOptions(
5034 {{"compaction_options_fifo", "{max_table_files_size=23;}"}}));
5035 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5037 ASSERT_EQ(dbfull()->GetOptions().ttl
, 0);
5038 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5041 ASSERT_OK(dbfull()->SetOptions({{"ttl", "97"}}));
5042 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5044 ASSERT_EQ(dbfull()->GetOptions().ttl
, 97);
5045 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5048 ASSERT_OK(dbfull()->SetOptions({{"ttl", "203"}}));
5049 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5051 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
5052 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5055 ASSERT_OK(dbfull()->SetOptions(
5056 {{"compaction_options_fifo", "{allow_compaction=true;}"}}));
5057 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5059 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
5060 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5063 ASSERT_OK(dbfull()->SetOptions(
5064 {{"compaction_options_fifo", "{max_table_files_size=31;}"}}));
5065 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5067 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
5068 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5071 ASSERT_OK(dbfull()->SetOptions(
5072 {{"compaction_options_fifo",
5073 "{max_table_files_size=51;allow_compaction=true;}"}}));
5074 ASSERT_OK(dbfull()->SetOptions({{"ttl", "49"}}));
5075 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
5077 ASSERT_EQ(dbfull()->GetOptions().ttl
, 49);
5078 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
5082 TEST_F(DBTest
, DynamicUniversalCompactionOptions
) {
5084 options
.create_if_missing
= true;
5086 DestroyAndReopen(options
);
5089 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 1U);
5090 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5092 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5096 .compaction_options_universal
.max_size_amplification_percent
,
5100 .compaction_options_universal
.compression_size_percent
,
5102 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5103 kCompactionStopStyleTotalSize
);
5105 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5108 ASSERT_OK(dbfull()->SetOptions(
5109 {{"compaction_options_universal", "{size_ratio=7;}"}}));
5110 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 7u);
5111 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5113 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5117 .compaction_options_universal
.max_size_amplification_percent
,
5121 .compaction_options_universal
.compression_size_percent
,
5123 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5124 kCompactionStopStyleTotalSize
);
5126 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5129 ASSERT_OK(dbfull()->SetOptions(
5130 {{"compaction_options_universal", "{min_merge_width=11;}"}}));
5131 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 7u);
5132 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5134 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5138 .compaction_options_universal
.max_size_amplification_percent
,
5142 .compaction_options_universal
.compression_size_percent
,
5144 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5145 kCompactionStopStyleTotalSize
);
5147 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5150 #endif // ROCKSDB_LITE
5152 TEST_F(DBTest
, FileCreationRandomFailure
) {
5155 options
.create_if_missing
= true;
5156 options
.write_buffer_size
= 100000; // Small write buffer
5157 options
.target_file_size_base
= 200000;
5158 options
.max_bytes_for_level_base
= 1000000;
5159 options
.max_bytes_for_level_multiplier
= 2;
5161 DestroyAndReopen(options
);
5164 constexpr int kCDTKeysPerBuffer
= 4;
5165 constexpr int kTestSize
= kCDTKeysPerBuffer
* 4096;
5166 constexpr int kTotalIteration
= 20;
5167 // the second half of the test involves in random failure
5168 // of file creation.
5169 constexpr int kRandomFailureTest
= kTotalIteration
/ 2;
5171 std::vector
<std::string
> values
;
5172 for (int i
= 0; i
< kTestSize
; ++i
) {
5173 values
.push_back("NOT_FOUND");
5175 for (int j
= 0; j
< kTotalIteration
; ++j
) {
5176 if (j
== kRandomFailureTest
) {
5177 env_
->non_writeable_rate_
.store(90);
5179 for (int k
= 0; k
< kTestSize
; ++k
) {
5180 // here we expect some of the Put fails.
5181 std::string value
= rnd
.RandomString(100);
5182 Status s
= Put(Key(k
), Slice(value
));
5184 // update the latest successful put
5187 // But everything before we simulate the failure-test should succeed.
5188 if (j
< kRandomFailureTest
) {
5194 // If rocksdb does not do the correct job, internal assert will fail here.
5195 dbfull()->TEST_WaitForFlushMemTable();
5196 dbfull()->TEST_WaitForCompact();
5198 // verify we have the latest successful update
5199 for (int k
= 0; k
< kTestSize
; ++k
) {
5200 auto v
= Get(Key(k
));
5201 ASSERT_EQ(v
, values
[k
]);
5204 // reopen and reverify we have the latest successful update
5205 env_
->non_writeable_rate_
.store(0);
5207 for (int k
= 0; k
< kTestSize
; ++k
) {
5208 auto v
= Get(Key(k
));
5209 ASSERT_EQ(v
, values
[k
]);
5213 #ifndef ROCKSDB_LITE
5215 TEST_F(DBTest
, DynamicMiscOptions
) {
5216 // Test max_sequential_skip_in_iterations
5219 options
.create_if_missing
= true;
5220 options
.max_sequential_skip_in_iterations
= 16;
5221 options
.compression
= kNoCompression
;
5222 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5223 DestroyAndReopen(options
);
5225 auto assert_reseek_count
= [this, &options
](int key_start
, int num_reseek
) {
5226 int key0
= key_start
;
5227 int key1
= key_start
+ 1;
5228 int key2
= key_start
+ 2;
5230 ASSERT_OK(Put(Key(key0
), rnd
.RandomString(8)));
5231 for (int i
= 0; i
< 10; ++i
) {
5232 ASSERT_OK(Put(Key(key1
), rnd
.RandomString(8)));
5234 ASSERT_OK(Put(Key(key2
), rnd
.RandomString(8)));
5235 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
5236 iter
->Seek(Key(key1
));
5237 ASSERT_TRUE(iter
->Valid());
5238 ASSERT_EQ(iter
->key().compare(Key(key1
)), 0);
5240 ASSERT_TRUE(iter
->Valid());
5241 ASSERT_EQ(iter
->key().compare(Key(key2
)), 0);
5242 ASSERT_EQ(num_reseek
,
5243 TestGetTickerCount(options
, NUMBER_OF_RESEEKS_IN_ITERATION
));
5246 assert_reseek_count(100, 0);
5248 ASSERT_OK(dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "4"}}));
5249 // Clear memtable and make new option effective
5250 dbfull()->TEST_FlushMemTable(true);
5252 assert_reseek_count(200, 1);
5255 dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "16"}}));
5256 // Clear memtable and make new option effective
5257 dbfull()->TEST_FlushMemTable(true);
5259 assert_reseek_count(300, 1);
5261 MutableCFOptions mutable_cf_options
;
5262 CreateAndReopenWithCF({"pikachu"}, options
);
5263 // Test soft_pending_compaction_bytes_limit,
5264 // hard_pending_compaction_bytes_limit
5265 ASSERT_OK(dbfull()->SetOptions(
5266 handles_
[1], {{"soft_pending_compaction_bytes_limit", "200"},
5267 {"hard_pending_compaction_bytes_limit", "300"}}));
5268 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5269 &mutable_cf_options
));
5270 ASSERT_EQ(200, mutable_cf_options
.soft_pending_compaction_bytes_limit
);
5271 ASSERT_EQ(300, mutable_cf_options
.hard_pending_compaction_bytes_limit
);
5272 // Test report_bg_io_stats
5274 dbfull()->SetOptions(handles_
[1], {{"report_bg_io_stats", "true"}}));
5276 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5277 &mutable_cf_options
));
5278 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
5281 ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
5282 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
5283 &mutable_cf_options
));
5284 ASSERT_EQ(CompressionType::kNoCompression
, mutable_cf_options
.compression
);
5286 if (Snappy_Supported()) {
5287 ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
5288 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
5289 &mutable_cf_options
));
5290 ASSERT_EQ(CompressionType::kSnappyCompression
,
5291 mutable_cf_options
.compression
);
5294 // Test paranoid_file_checks already done in db_block_cache_test
5296 dbfull()->SetOptions(handles_
[1], {{"paranoid_file_checks", "true"}}));
5297 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5298 &mutable_cf_options
));
5299 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
5300 ASSERT_TRUE(mutable_cf_options
.check_flush_compaction_key_order
);
5302 ASSERT_OK(dbfull()->SetOptions(
5303 handles_
[1], {{"check_flush_compaction_key_order", "false"}}));
5304 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5305 &mutable_cf_options
));
5306 ASSERT_FALSE(mutable_cf_options
.check_flush_compaction_key_order
);
5308 #endif // ROCKSDB_LITE
5310 TEST_F(DBTest
, L0L1L2AndUpHitCounter
) {
5311 Options options
= CurrentOptions();
5312 options
.write_buffer_size
= 32 * 1024;
5313 options
.target_file_size_base
= 32 * 1024;
5314 options
.level0_file_num_compaction_trigger
= 2;
5315 options
.level0_slowdown_writes_trigger
= 2;
5316 options
.level0_stop_writes_trigger
= 4;
5317 options
.max_bytes_for_level_base
= 64 * 1024;
5318 options
.max_write_buffer_number
= 2;
5319 options
.max_background_compactions
= 8;
5320 options
.max_background_flushes
= 8;
5321 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5322 CreateAndReopenWithCF({"mypikachu"}, options
);
5324 int numkeys
= 20000;
5325 for (int i
= 0; i
< numkeys
; i
++) {
5326 ASSERT_OK(Put(1, Key(i
), "val"));
5328 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L0
));
5329 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L1
));
5330 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
5332 ASSERT_OK(Flush(1));
5333 dbfull()->TEST_WaitForCompact();
5335 for (int i
= 0; i
< numkeys
; i
++) {
5336 ASSERT_EQ(Get(1, Key(i
)), "val");
5339 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L0
), 100);
5340 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L1
), 100);
5341 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L2_AND_UP
), 100);
5343 ASSERT_EQ(numkeys
, TestGetTickerCount(options
, GET_HIT_L0
) +
5344 TestGetTickerCount(options
, GET_HIT_L1
) +
5345 TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
5348 TEST_F(DBTest
, EncodeDecompressedBlockSizeTest
) {
5354 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
5355 kLZ4Compression
, kLZ4HCCompression
,
5356 kXpressCompression
};
5357 for (auto comp
: compressions
) {
5358 if (!CompressionTypeSupported(comp
)) {
5361 // first_table_version 1 -- generate with table_version == 1, read with
5362 // table_version == 2
5363 // first_table_version 2 -- generate with table_version == 2, read with
5364 // table_version == 1
5365 for (int first_table_version
= 1; first_table_version
<= 2;
5366 ++first_table_version
) {
5367 BlockBasedTableOptions table_options
;
5368 table_options
.format_version
= first_table_version
;
5369 table_options
.filter_policy
.reset(NewBloomFilterPolicy(10));
5370 Options options
= CurrentOptions();
5371 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
5372 options
.create_if_missing
= true;
5373 options
.compression
= comp
;
5374 DestroyAndReopen(options
);
5376 int kNumKeysWritten
= 1000;
5379 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
5380 // compressible string
5381 ASSERT_OK(Put(Key(i
), rnd
.RandomString(128) + std::string(128, 'a')));
5384 table_options
.format_version
= first_table_version
== 1 ? 2 : 1;
5385 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
5387 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
5388 auto r
= Get(Key(i
));
5389 ASSERT_EQ(r
.substr(128), std::string(128, 'a'));
5395 TEST_F(DBTest
, CloseSpeedup
) {
5396 Options options
= CurrentOptions();
5397 options
.compaction_style
= kCompactionStyleLevel
;
5398 options
.write_buffer_size
= 110 << 10; // 110KB
5399 options
.arena_block_size
= 4 << 10;
5400 options
.level0_file_num_compaction_trigger
= 2;
5401 options
.num_levels
= 4;
5402 options
.max_bytes_for_level_base
= 400 * 1024;
5403 options
.max_write_buffer_number
= 16;
5405 // Block background threads
5406 env_
->SetBackgroundThreads(1, Env::LOW
);
5407 env_
->SetBackgroundThreads(1, Env::HIGH
);
5408 test::SleepingBackgroundTask sleeping_task_low
;
5409 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
5410 Env::Priority::LOW
);
5411 test::SleepingBackgroundTask sleeping_task_high
;
5412 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
5413 &sleeping_task_high
, Env::Priority::HIGH
);
5415 std::vector
<std::string
> filenames
;
5416 env_
->GetChildren(dbname_
, &filenames
);
5417 // Delete archival files.
5418 for (size_t i
= 0; i
< filenames
.size(); ++i
) {
5419 env_
->DeleteFile(dbname_
+ "/" + filenames
[i
]);
5421 env_
->DeleteDir(dbname_
);
5422 DestroyAndReopen(options
);
5424 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5425 env_
->SetBackgroundThreads(1, Env::LOW
);
5426 env_
->SetBackgroundThreads(1, Env::HIGH
);
5430 // First three 110KB files are not going to level 2
5431 // After that, (100K, 200K)
5432 for (int num
= 0; num
< 5; num
++) {
5433 GenerateNewFile(&rnd
, &key_idx
, true);
5436 ASSERT_EQ(0, GetSstFileCount(dbname_
));
5439 ASSERT_EQ(0, GetSstFileCount(dbname_
));
5441 // Unblock background threads
5442 sleeping_task_high
.WakeUp();
5443 sleeping_task_high
.WaitUntilDone();
5444 sleeping_task_low
.WakeUp();
5445 sleeping_task_low
.WaitUntilDone();
5450 class DelayedMergeOperator
: public MergeOperator
{
5455 explicit DelayedMergeOperator(DBTest
* d
) : db_test_(d
) {}
5457 bool FullMergeV2(const MergeOperationInput
& merge_in
,
5458 MergeOperationOutput
* merge_out
) const override
{
5459 db_test_
->env_
->MockSleepForMicroseconds(1000 *
5460 merge_in
.operand_list
.size());
5461 merge_out
->new_value
= "";
5465 const char* Name() const override
{ return "DelayedMergeOperator"; }
5468 TEST_F(DBTest
, MergeTestTime
) {
5469 std::string one
, two
, three
;
5470 PutFixed64(&one
, 1);
5471 PutFixed64(&two
, 2);
5472 PutFixed64(&three
, 3);
5474 // Enable time profiling
5475 SetPerfLevel(kEnableTime
);
5476 Options options
= CurrentOptions();
5477 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5478 options
.merge_operator
.reset(new DelayedMergeOperator(this));
5479 SetTimeElapseOnlySleepOnReopen(&options
);
5480 DestroyAndReopen(options
);
5482 // NOTE: Presumed unnecessary and removed: resetting mock time in env
5484 ASSERT_EQ(TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
), 0);
5485 db_
->Put(WriteOptions(), "foo", one
);
5487 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", two
));
5489 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", three
));
5493 opt
.verify_checksums
= true;
5494 opt
.snapshot
= nullptr;
5496 db_
->Get(opt
, "foo", &result
);
5498 ASSERT_EQ(2000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
5500 ReadOptions read_options
;
5501 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
5503 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
5504 ASSERT_OK(iter
->status());
5508 ASSERT_EQ(1, count
);
5509 ASSERT_EQ(4000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
5510 #ifdef ROCKSDB_USING_THREAD_STATUS
5511 ASSERT_GT(TestGetTickerCount(options
, FLUSH_WRITE_BYTES
), 0);
5512 #endif // ROCKSDB_USING_THREAD_STATUS
5515 #ifndef ROCKSDB_LITE
5516 TEST_P(DBTestWithParam
, MergeCompactionTimeTest
) {
5517 SetPerfLevel(kEnableTime
);
5518 Options options
= CurrentOptions();
5519 options
.compaction_filter_factory
= std::make_shared
<KeepFilterFactory
>();
5520 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5521 options
.merge_operator
.reset(new DelayedMergeOperator(this));
5522 options
.disable_auto_compactions
= true;
5523 options
.max_subcompactions
= max_subcompactions_
;
5524 SetTimeElapseOnlySleepOnReopen(&options
);
5525 DestroyAndReopen(options
);
5527 constexpr unsigned n
= 1000;
5528 for (unsigned i
= 0; i
< n
; i
++) {
5529 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", "TEST"));
5532 dbfull()->TEST_WaitForFlushMemTable();
5534 CompactRangeOptions cro
;
5535 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
5536 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
5538 ASSERT_EQ(uint64_t{n
} * 1000000U,
5539 TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
5542 TEST_P(DBTestWithParam
, FilterCompactionTimeTest
) {
5543 Options options
= CurrentOptions();
5544 options
.compaction_filter_factory
=
5545 std::make_shared
<DelayFilterFactory
>(this);
5546 options
.disable_auto_compactions
= true;
5547 options
.create_if_missing
= true;
5548 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5549 options
.statistics
->set_stats_level(kExceptTimeForMutex
);
5550 options
.max_subcompactions
= max_subcompactions_
;
5551 SetTimeElapseOnlySleepOnReopen(&options
);
5552 DestroyAndReopen(options
);
5556 for (int table
= 0; table
< 4; ++table
) {
5557 for (int i
= 0; i
< 10 + table
; ++i
) {
5558 Put(ToString(table
* 100 + i
), "val");
5564 CompactRangeOptions cro
;
5565 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
5566 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
5567 ASSERT_EQ(0U, CountLiveFiles());
5571 Iterator
* itr
= db_
->NewIterator(ReadOptions());
5573 ASSERT_EQ(uint64_t{n
} * 1000000U,
5574 TestGetTickerCount(options
, FILTER_OPERATION_TOTAL_TIME
));
5577 #endif // ROCKSDB_LITE
5579 TEST_F(DBTest
, TestLogCleanup
) {
5580 Options options
= CurrentOptions();
5581 options
.write_buffer_size
= 64 * 1024; // very small
5582 // only two memtables allowed ==> only two log files
5583 options
.max_write_buffer_number
= 2;
5586 for (int i
= 0; i
< 100000; ++i
) {
5588 // only 2 memtables will be alive, so logs_to_free needs to always be below
5590 ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
5594 #ifndef ROCKSDB_LITE
5595 TEST_F(DBTest
, EmptyCompactedDB
) {
5596 Options options
= CurrentOptions();
5597 options
.max_open_files
= -1;
5599 ASSERT_OK(ReadOnlyReopen(options
));
5600 Status s
= Put("new", "value");
5601 ASSERT_TRUE(s
.IsNotSupported());
5604 #endif // ROCKSDB_LITE
5606 #ifndef ROCKSDB_LITE
5607 TEST_F(DBTest
, DISABLED_SuggestCompactRangeTest
) {
5608 class CompactionFilterFactoryGetContext
: public CompactionFilterFactory
{
5610 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
5611 const CompactionFilter::Context
& context
) override
{
5612 saved_context
= context
;
5613 std::unique_ptr
<CompactionFilter
> empty_filter
;
5614 return empty_filter
;
5616 const char* Name() const override
{
5617 return "CompactionFilterFactoryGetContext";
5619 static bool IsManual(CompactionFilterFactory
* compaction_filter_factory
) {
5620 return reinterpret_cast<CompactionFilterFactoryGetContext
*>(
5621 compaction_filter_factory
)
5622 ->saved_context
.is_manual_compaction
;
5624 CompactionFilter::Context saved_context
;
5627 Options options
= CurrentOptions();
5628 options
.memtable_factory
.reset(
5629 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
5630 options
.compaction_style
= kCompactionStyleLevel
;
5631 options
.compaction_filter_factory
.reset(
5632 new CompactionFilterFactoryGetContext());
5633 options
.write_buffer_size
= 200 << 10;
5634 options
.arena_block_size
= 4 << 10;
5635 options
.level0_file_num_compaction_trigger
= 4;
5636 options
.num_levels
= 4;
5637 options
.compression
= kNoCompression
;
5638 options
.max_bytes_for_level_base
= 450 << 10;
5639 options
.target_file_size_base
= 98 << 10;
5640 options
.max_compaction_bytes
= static_cast<uint64_t>(1) << 60; // inf
5646 for (int num
= 0; num
< 3; num
++) {
5647 GenerateNewRandomFile(&rnd
);
5650 GenerateNewRandomFile(&rnd
);
5651 ASSERT_EQ("0,4", FilesPerLevel(0));
5652 ASSERT_TRUE(!CompactionFilterFactoryGetContext::IsManual(
5653 options
.compaction_filter_factory
.get()));
5655 GenerateNewRandomFile(&rnd
);
5656 ASSERT_EQ("1,4", FilesPerLevel(0));
5658 GenerateNewRandomFile(&rnd
);
5659 ASSERT_EQ("2,4", FilesPerLevel(0));
5661 GenerateNewRandomFile(&rnd
);
5662 ASSERT_EQ("3,4", FilesPerLevel(0));
5664 GenerateNewRandomFile(&rnd
);
5665 ASSERT_EQ("0,4,4", FilesPerLevel(0));
5667 GenerateNewRandomFile(&rnd
);
5668 ASSERT_EQ("1,4,4", FilesPerLevel(0));
5670 GenerateNewRandomFile(&rnd
);
5671 ASSERT_EQ("2,4,4", FilesPerLevel(0));
5673 GenerateNewRandomFile(&rnd
);
5674 ASSERT_EQ("3,4,4", FilesPerLevel(0));
5676 GenerateNewRandomFile(&rnd
);
5677 ASSERT_EQ("0,4,8", FilesPerLevel(0));
5679 GenerateNewRandomFile(&rnd
);
5680 ASSERT_EQ("1,4,8", FilesPerLevel(0));
5682 // compact it three times
5683 for (int i
= 0; i
< 3; ++i
) {
5684 ASSERT_OK(experimental::SuggestCompactRange(db_
, nullptr, nullptr));
5685 dbfull()->TEST_WaitForCompact();
5688 // All files are compacted
5689 ASSERT_EQ(0, NumTableFilesAtLevel(0));
5690 ASSERT_EQ(0, NumTableFilesAtLevel(1));
5692 GenerateNewRandomFile(&rnd
);
5693 ASSERT_EQ(1, NumTableFilesAtLevel(0));
5695 // nonoverlapping with the file on level 0
5696 Slice
start("a"), end("b");
5697 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
5698 dbfull()->TEST_WaitForCompact();
5700 // should not compact the level 0 file
5701 ASSERT_EQ(1, NumTableFilesAtLevel(0));
5705 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
5706 dbfull()->TEST_WaitForCompact();
5707 ASSERT_TRUE(CompactionFilterFactoryGetContext::IsManual(
5708 options
.compaction_filter_factory
.get()));
5710 // now it should compact the level 0 file
5711 ASSERT_EQ(0, NumTableFilesAtLevel(0));
5712 ASSERT_EQ(1, NumTableFilesAtLevel(1));
5716 TEST_F(DBTest
, PromoteL0
) {
5717 Options options
= CurrentOptions();
5718 options
.disable_auto_compactions
= true;
5719 options
.write_buffer_size
= 10 * 1024 * 1024;
5720 DestroyAndReopen(options
);
5722 // non overlapping ranges
5723 std::vector
<std::pair
<int32_t, int32_t>> ranges
= {
5724 {81, 160}, {0, 80}, {161, 240}, {241, 320}};
5726 int32_t value_size
= 10 * 1024; // 10 KB
5729 std::map
<int32_t, std::string
> values
;
5730 for (const auto& range
: ranges
) {
5731 for (int32_t j
= range
.first
; j
< range
.second
; j
++) {
5732 values
[j
] = rnd
.RandomString(value_size
);
5733 ASSERT_OK(Put(Key(j
), values
[j
]));
5738 int32_t level0_files
= NumTableFilesAtLevel(0, 0);
5739 ASSERT_EQ(level0_files
, ranges
.size());
5740 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
5742 // Promote L0 level to L2.
5743 ASSERT_OK(experimental::PromoteL0(db_
, db_
->DefaultColumnFamily(), 2));
5744 // We expect that all the files were trivially moved from L0 to L2
5745 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
5746 ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files
);
5748 for (const auto& kv
: values
) {
5749 ASSERT_EQ(Get(Key(kv
.first
)), kv
.second
);
5753 TEST_F(DBTest
, PromoteL0Failure
) {
5754 Options options
= CurrentOptions();
5755 options
.disable_auto_compactions
= true;
5756 options
.write_buffer_size
= 10 * 1024 * 1024;
5757 DestroyAndReopen(options
);
5759 // Produce two L0 files with overlapping ranges.
5760 ASSERT_OK(Put(Key(0), ""));
5761 ASSERT_OK(Put(Key(3), ""));
5763 ASSERT_OK(Put(Key(1), ""));
5767 // Fails because L0 has overlapping files.
5768 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
5769 ASSERT_TRUE(status
.IsInvalidArgument());
5771 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5772 // Now there is a file in L1.
5773 ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
5775 ASSERT_OK(Put(Key(5), ""));
5777 // Fails because L1 is non-empty.
5778 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
5779 ASSERT_TRUE(status
.IsInvalidArgument());
5782 // Github issue #596
5783 TEST_F(DBTest
, CompactRangeWithEmptyBottomLevel
) {
5784 const int kNumLevels
= 2;
5785 const int kNumL0Files
= 2;
5786 Options options
= CurrentOptions();
5787 options
.disable_auto_compactions
= true;
5788 options
.num_levels
= kNumLevels
;
5789 DestroyAndReopen(options
);
5792 for (int i
= 0; i
< kNumL0Files
; ++i
) {
5793 ASSERT_OK(Put(Key(0), rnd
.RandomString(1024)));
5796 ASSERT_EQ(NumTableFilesAtLevel(0), kNumL0Files
);
5797 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
5799 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5800 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
5801 ASSERT_EQ(NumTableFilesAtLevel(1), kNumL0Files
);
5803 #endif // ROCKSDB_LITE
5805 TEST_F(DBTest
, AutomaticConflictsWithManualCompaction
) {
5806 const int kNumL0Files
= 50;
5807 Options options
= CurrentOptions();
5808 options
.level0_file_num_compaction_trigger
= 4;
5809 // never slowdown / stop
5810 options
.level0_slowdown_writes_trigger
= 999999;
5811 options
.level0_stop_writes_trigger
= 999999;
5812 options
.max_background_compactions
= 10;
5813 DestroyAndReopen(options
);
5815 // schedule automatic compactions after the manual one starts, but before it
5816 // finishes to ensure conflict.
5817 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5818 {{"DBImpl::BackgroundCompaction:Start",
5819 "DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
5820 {"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
5821 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
5822 std::atomic
<int> callback_count(0);
5823 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5824 "DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
5825 [&](void* /*arg*/) { callback_count
.fetch_add(1); });
5826 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5829 for (int i
= 0; i
< 2; ++i
) {
5830 // put two keys to ensure no trivial move
5831 for (int j
= 0; j
< 2; ++j
) {
5832 ASSERT_OK(Put(Key(j
), rnd
.RandomString(1024)));
5836 port::Thread
manual_compaction_thread([this]() {
5837 CompactRangeOptions croptions
;
5838 croptions
.exclusive_manual_compaction
= true;
5839 ASSERT_OK(db_
->CompactRange(croptions
, nullptr, nullptr));
5842 TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
5843 for (int i
= 0; i
< kNumL0Files
; ++i
) {
5844 // put two keys to ensure no trivial move
5845 for (int j
= 0; j
< 2; ++j
) {
5846 ASSERT_OK(Put(Key(j
), rnd
.RandomString(1024)));
5850 TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");
5852 ASSERT_GE(callback_count
.load(), 1);
5853 for (int i
= 0; i
< 2; ++i
) {
5854 ASSERT_NE("NOT_FOUND", Get(Key(i
)));
5856 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5857 manual_compaction_thread
.join();
5858 dbfull()->TEST_WaitForCompact();
5861 #ifndef ROCKSDB_LITE
5862 TEST_F(DBTest
, CompactFilesShouldTriggerAutoCompaction
) {
5863 Options options
= CurrentOptions();
5864 options
.max_background_compactions
= 1;
5865 options
.level0_file_num_compaction_trigger
= 4;
5866 options
.level0_slowdown_writes_trigger
= 36;
5867 options
.level0_stop_writes_trigger
= 36;
5868 DestroyAndReopen(options
);
5870 // generate files for manual compaction
5872 for (int i
= 0; i
< 2; ++i
) {
5873 // put two keys to ensure no trivial move
5874 for (int j
= 0; j
< 2; ++j
) {
5875 ASSERT_OK(Put(Key(j
), rnd
.RandomString(1024)));
5880 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data
;
5881 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5883 std::vector
<std::string
> input_files
;
5884 input_files
.push_back(cf_meta_data
.levels
[0].files
[0].name
);
5886 SyncPoint::GetInstance()->LoadDependency({
5887 {"CompactFilesImpl:0",
5888 "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin"},
5889 {"DBTest::CompactFilesShouldTriggerAutoCompaction:End",
5890 "CompactFilesImpl:1"},
5893 SyncPoint::GetInstance()->EnableProcessing();
5895 port::Thread
manual_compaction_thread([&]() {
5896 auto s
= db_
->CompactFiles(CompactionOptions(),
5897 db_
->DefaultColumnFamily(), input_files
, 0);
5901 "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin");
5902 // generate enough files to trigger compaction
5903 for (int i
= 0; i
< 20; ++i
) {
5904 for (int j
= 0; j
< 2; ++j
) {
5905 ASSERT_OK(Put(Key(j
), rnd
.RandomString(1024)));
5909 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5910 ASSERT_GT(cf_meta_data
.levels
[0].files
.size(),
5911 options
.level0_file_num_compaction_trigger
);
5913 "DBTest::CompactFilesShouldTriggerAutoCompaction:End");
5915 manual_compaction_thread
.join();
5916 dbfull()->TEST_WaitForCompact();
5918 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5919 ASSERT_LE(cf_meta_data
.levels
[0].files
.size(),
5920 options
.level0_file_num_compaction_trigger
);
5922 #endif // ROCKSDB_LITE
5924 // Github issue #595
5925 // Large write batch with column families
5926 TEST_F(DBTest
, LargeBatchWithColumnFamilies
) {
5927 Options options
= CurrentOptions();
5929 options
.write_buffer_size
= 100000; // Small write buffer
5930 CreateAndReopenWithCF({"pikachu"}, options
);
5932 for (int i
= 0; i
< 5; i
++) {
5933 for (int pass
= 1; pass
<= 3; pass
++) {
5935 size_t write_size
= 1024 * 1024 * (5 + i
);
5936 fprintf(stderr
, "prepare: %" ROCKSDB_PRIszt
" MB, pass:%d\n",
5937 (write_size
/ 1024 / 1024), pass
);
5939 std::string
data(3000, j
++ % 127 + 20);
5940 data
+= ToString(j
);
5941 batch
.Put(handles_
[0], Slice(data
), Slice(data
));
5942 if (batch
.GetDataSize() > write_size
) {
5946 fprintf(stderr
, "write: %" ROCKSDB_PRIszt
" MB\n",
5947 (batch
.GetDataSize() / 1024 / 1024));
5948 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
5949 fprintf(stderr
, "done\n");
5952 // make sure we can re-open it.
5953 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
5956 // Make sure that Flushes can proceed in parallel with CompactRange()
5957 TEST_F(DBTest
, FlushesInParallelWithCompactRange
) {
5958 // iter == 0 -- leveled
5959 // iter == 1 -- leveled, but throw in a flush between two levels compacting
5960 // iter == 2 -- universal
5961 for (int iter
= 0; iter
< 3; ++iter
) {
5962 Options options
= CurrentOptions();
5964 options
.compaction_style
= kCompactionStyleLevel
;
5966 options
.compaction_style
= kCompactionStyleUniversal
;
5968 options
.write_buffer_size
= 110 << 10;
5969 options
.level0_file_num_compaction_trigger
= 4;
5970 options
.num_levels
= 4;
5971 options
.compression
= kNoCompression
;
5972 options
.max_bytes_for_level_base
= 450 << 10;
5973 options
.target_file_size_base
= 98 << 10;
5974 options
.max_write_buffer_number
= 2;
5976 DestroyAndReopen(options
);
5979 for (int num
= 0; num
< 14; num
++) {
5980 GenerateNewRandomFile(&rnd
);
5984 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5985 {{"DBImpl::RunManualCompaction()::1",
5986 "DBTest::FlushesInParallelWithCompactRange:1"},
5987 {"DBTest::FlushesInParallelWithCompactRange:2",
5988 "DBImpl::RunManualCompaction()::2"}});
5990 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5991 {{"CompactionJob::Run():Start",
5992 "DBTest::FlushesInParallelWithCompactRange:1"},
5993 {"DBTest::FlushesInParallelWithCompactRange:2",
5994 "CompactionJob::Run():End"}});
5996 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5998 std::vector
<port::Thread
> threads
;
5999 threads
.emplace_back([&]() { Compact("a", "z"); });
6001 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
6003 // this has to start a flush. if flushes are blocked, this will try to
6005 // 3 memtables, and that will fail because max_write_buffer_number is 2
6006 for (int num
= 0; num
< 3; num
++) {
6007 GenerateNewRandomFile(&rnd
, /* nowait */ true);
6010 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
6012 for (auto& t
: threads
) {
6015 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6019 TEST_F(DBTest
, DelayedWriteRate
) {
6020 const int kEntriesPerMemTable
= 100;
6021 const int kTotalFlushes
= 12;
6023 Options options
= CurrentOptions();
6024 env_
->SetBackgroundThreads(1, Env::LOW
);
6026 options
.write_buffer_size
= 100000000;
6027 options
.max_write_buffer_number
= 256;
6028 options
.max_background_compactions
= 1;
6029 options
.level0_file_num_compaction_trigger
= 3;
6030 options
.level0_slowdown_writes_trigger
= 3;
6031 options
.level0_stop_writes_trigger
= 999999;
6032 options
.delayed_write_rate
= 20000000; // Start with 200MB/s
6033 options
.memtable_factory
.reset(
6034 new SpecialSkipListFactory(kEntriesPerMemTable
));
6036 SetTimeElapseOnlySleepOnReopen(&options
);
6037 CreateAndReopenWithCF({"pikachu"}, options
);
6039 // Block compactions
6040 test::SleepingBackgroundTask sleeping_task_low
;
6041 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6042 Env::Priority::LOW
);
6044 for (int i
= 0; i
< 3; i
++) {
6045 Put(Key(i
), std::string(10000, 'x'));
6049 // These writes will be slowed down to 1KB/s
6050 uint64_t estimated_sleep_time
= 0;
6053 uint64_t cur_rate
= options
.delayed_write_rate
;
6054 for (int i
= 0; i
< kTotalFlushes
; i
++) {
6055 uint64_t size_memtable
= 0;
6056 for (int j
= 0; j
< kEntriesPerMemTable
; j
++) {
6057 auto rand_num
= rnd
.Uniform(20);
6058 // Spread the size range to more.
6059 size_t entry_size
= rand_num
* rand_num
* rand_num
;
6061 Put(Key(i
), std::string(entry_size
, 'x'), wo
);
6062 size_memtable
+= entry_size
+ 18;
6063 // Occasionally sleep a while
6064 if (rnd
.Uniform(20) == 6) {
6065 env_
->SleepForMicroseconds(2666);
6068 dbfull()->TEST_WaitForFlushMemTable();
6069 estimated_sleep_time
+= size_memtable
* 1000000u / cur_rate
;
6070 // Slow down twice. One for memtable switch and one for flush finishes.
6071 cur_rate
= static_cast<uint64_t>(static_cast<double>(cur_rate
) *
6072 kIncSlowdownRatio
* kIncSlowdownRatio
);
6074 // Estimate the total sleep time fall into the rough range.
6075 ASSERT_GT(env_
->NowMicros(), estimated_sleep_time
/ 2);
6076 ASSERT_LT(env_
->NowMicros(), estimated_sleep_time
* 2);
6078 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6079 sleeping_task_low
.WakeUp();
6080 sleeping_task_low
.WaitUntilDone();
6083 TEST_F(DBTest
, HardLimit
) {
6084 Options options
= CurrentOptions();
6086 env_
->SetBackgroundThreads(1, Env::LOW
);
6087 options
.max_write_buffer_number
= 256;
6088 options
.write_buffer_size
= 110 << 10; // 110KB
6089 options
.arena_block_size
= 4 * 1024;
6090 options
.level0_file_num_compaction_trigger
= 4;
6091 options
.level0_slowdown_writes_trigger
= 999999;
6092 options
.level0_stop_writes_trigger
= 999999;
6093 options
.hard_pending_compaction_bytes_limit
= 800 << 10;
6094 options
.max_bytes_for_level_base
= 10000000000u;
6095 options
.max_background_compactions
= 1;
6096 options
.memtable_factory
.reset(
6097 new SpecialSkipListFactory(KNumKeysByGenerateNewFile
- 1));
6099 env_
->SetBackgroundThreads(1, Env::LOW
);
6100 test::SleepingBackgroundTask sleeping_task_low
;
6101 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6102 Env::Priority::LOW
);
6104 CreateAndReopenWithCF({"pikachu"}, options
);
6106 std::atomic
<int> callback_count(0);
6107 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6108 "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
6109 callback_count
.fetch_add(1);
6110 sleeping_task_low
.WakeUp();
6112 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6116 for (int num
= 0; num
< 5; num
++) {
6117 GenerateNewFile(&rnd
, &key_idx
, true);
6118 dbfull()->TEST_WaitForFlushMemTable();
6121 ASSERT_EQ(0, callback_count
.load());
6123 for (int num
= 0; num
< 5; num
++) {
6124 GenerateNewFile(&rnd
, &key_idx
, true);
6125 dbfull()->TEST_WaitForFlushMemTable();
6127 ASSERT_GE(callback_count
.load(), 1);
6129 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6130 sleeping_task_low
.WaitUntilDone();
6133 #if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
6134 class WriteStallListener
: public EventListener
{
6136 WriteStallListener() : condition_(WriteStallCondition::kNormal
) {}
6137 void OnStallConditionsChanged(const WriteStallInfo
& info
) override
{
6138 MutexLock
l(&mutex_
);
6139 condition_
= info
.condition
.cur
;
6141 bool CheckCondition(WriteStallCondition expected
) {
6142 MutexLock
l(&mutex_
);
6143 return expected
== condition_
;
6147 WriteStallCondition condition_
;
6150 TEST_F(DBTest
, SoftLimit
) {
6151 Options options
= CurrentOptions();
6153 options
.write_buffer_size
= 100000; // Small write buffer
6154 options
.max_write_buffer_number
= 256;
6155 options
.level0_file_num_compaction_trigger
= 1;
6156 options
.level0_slowdown_writes_trigger
= 3;
6157 options
.level0_stop_writes_trigger
= 999999;
6158 options
.delayed_write_rate
= 20000; // About 200KB/s limited rate
6159 options
.soft_pending_compaction_bytes_limit
= 160000;
6160 options
.target_file_size_base
= 99999999; // All into one file
6161 options
.max_bytes_for_level_base
= 50000;
6162 options
.max_bytes_for_level_multiplier
= 10;
6163 options
.max_background_compactions
= 1;
6164 options
.compression
= kNoCompression
;
6165 WriteStallListener
* listener
= new WriteStallListener();
6166 options
.listeners
.emplace_back(listener
);
6168 // FlushMemtable with opt.wait=true does not wait for
6169 // `OnStallConditionsChanged` being called. The event listener is triggered
6170 // on `JobContext::Clean`, which happens after flush result is installed.
6171 // We use sync point to create a custom WaitForFlush that waits for
6173 port::Mutex flush_mutex
;
6174 port::CondVar
flush_cv(&flush_mutex
);
6175 bool flush_finished
= false;
6176 auto InstallFlushCallback
= [&]() {
6178 MutexLock
l(&flush_mutex
);
6179 flush_finished
= false;
6181 SyncPoint::GetInstance()->SetCallBack(
6182 "DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) {
6184 MutexLock
l(&flush_mutex
);
6185 flush_finished
= true;
6187 flush_cv
.SignalAll();
6190 auto WaitForFlush
= [&]() {
6192 MutexLock
l(&flush_mutex
);
6193 while (!flush_finished
) {
6197 SyncPoint::GetInstance()->ClearCallBack(
6198 "DBImpl::BackgroundCallFlush:ContextCleanedUp");
6201 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6205 // Generating 360KB in Level 3
6206 for (int i
= 0; i
< 72; i
++) {
6207 Put(Key(i
), std::string(5000, 'x'));
6209 dbfull()->TEST_FlushMemTable(true, true);
6212 dbfull()->TEST_WaitForCompact();
6213 MoveFilesToLevel(3);
6215 // Generating 360KB in Level 2
6216 for (int i
= 0; i
< 72; i
++) {
6217 Put(Key(i
), std::string(5000, 'x'));
6219 dbfull()->TEST_FlushMemTable(true, true);
6222 dbfull()->TEST_WaitForCompact();
6223 MoveFilesToLevel(2);
6227 test::SleepingBackgroundTask sleeping_task_low
;
6228 // Block compactions
6229 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6230 Env::Priority::LOW
);
6231 sleeping_task_low
.WaitUntilSleeping();
6233 // Create 3 L0 files, making score of L0 to be 3.
6234 for (int i
= 0; i
< 3; i
++) {
6235 Put(Key(i
), std::string(5000, 'x'));
6236 Put(Key(100 - i
), std::string(5000, 'x'));
6237 // Flush the file. File size is around 30KB.
6238 InstallFlushCallback();
6239 dbfull()->TEST_FlushMemTable(true, true);
6242 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6243 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6245 sleeping_task_low
.WakeUp();
6246 sleeping_task_low
.WaitUntilDone();
6247 sleeping_task_low
.Reset();
6248 dbfull()->TEST_WaitForCompact();
6250 // Now there is one L1 file but doesn't trigger soft_rate_limit
6251 // The L1 file size is around 30KB.
6252 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6253 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6254 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6256 // Only allow one compactin going through.
6257 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6258 "BackgroundCallCompaction:0", [&](void* /*arg*/) {
6259 // Schedule a sleeping task.
6260 sleeping_task_low
.Reset();
6261 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
6262 &sleeping_task_low
, Env::Priority::LOW
);
6265 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6266 Env::Priority::LOW
);
6267 sleeping_task_low
.WaitUntilSleeping();
6268 // Create 3 L0 files, making score of L0 to be 3
6269 for (int i
= 0; i
< 3; i
++) {
6270 Put(Key(10 + i
), std::string(5000, 'x'));
6271 Put(Key(90 - i
), std::string(5000, 'x'));
6272 // Flush the file. File size is around 30KB.
6273 InstallFlushCallback();
6274 dbfull()->TEST_FlushMemTable(true, true);
6278 // Wake up sleep task to enable compaction to run and waits
6279 // for it to go to sleep state again to make sure one compaction
6281 sleeping_task_low
.WakeUp();
6282 sleeping_task_low
.WaitUntilSleeping();
6284 // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
6285 // Given level multiplier 10, estimated pending compaction is around 100KB
6286 // doesn't trigger soft_pending_compaction_bytes_limit
6287 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6288 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6289 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6291 // Create 3 L0 files, making score of L0 to be 3, higher than L0.
6292 for (int i
= 0; i
< 3; i
++) {
6293 Put(Key(20 + i
), std::string(5000, 'x'));
6294 Put(Key(80 - i
), std::string(5000, 'x'));
6295 // Flush the file. File size is around 30KB.
6296 InstallFlushCallback();
6297 dbfull()->TEST_FlushMemTable(true, true);
6300 // Wake up sleep task to enable compaction to run and waits
6301 // for it to go to sleep state again to make sure one compaction
6303 sleeping_task_low
.WakeUp();
6304 sleeping_task_low
.WaitUntilSleeping();
6306 // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
6307 // L2 size is 360KB, so the estimated level fanout 4, estimated pending
6308 // compaction is around 200KB
6309 // triggerring soft_pending_compaction_bytes_limit
6310 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6311 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6312 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6314 sleeping_task_low
.WakeUp();
6315 sleeping_task_low
.WaitUntilSleeping();
6317 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6318 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6320 // shrink level base so L2 will hit soft limit easier.
6321 ASSERT_OK(dbfull()->SetOptions({
6322 {"max_bytes_for_level_base", "5000"},
6327 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6328 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6330 sleeping_task_low
.WaitUntilSleeping();
6331 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6332 sleeping_task_low
.WakeUp();
6333 sleeping_task_low
.WaitUntilDone();
6336 TEST_F(DBTest
, LastWriteBufferDelay
) {
6337 Options options
= CurrentOptions();
6339 options
.write_buffer_size
= 100000;
6340 options
.max_write_buffer_number
= 4;
6341 options
.delayed_write_rate
= 20000;
6342 options
.compression
= kNoCompression
;
6343 options
.disable_auto_compactions
= true;
6344 int kNumKeysPerMemtable
= 3;
6345 options
.memtable_factory
.reset(
6346 new SpecialSkipListFactory(kNumKeysPerMemtable
));
6349 test::SleepingBackgroundTask sleeping_task
;
6351 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task
,
6352 Env::Priority::HIGH
);
6353 sleeping_task
.WaitUntilSleeping();
6355 // Create 3 L0 files, making score of L0 to be 3.
6356 for (int i
= 0; i
< 3; i
++) {
6357 // Fill one mem table
6358 for (int j
= 0; j
< kNumKeysPerMemtable
; j
++) {
6361 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6363 // Inserting a new entry would create a new mem table, triggering slow down.
6365 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6367 sleeping_task
.WakeUp();
6368 sleeping_task
.WaitUntilDone();
6370 #endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
6372 TEST_F(DBTest
, FailWhenCompressionNotSupportedTest
) {
6373 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
6374 kLZ4Compression
, kLZ4HCCompression
,
6375 kXpressCompression
};
6376 for (auto comp
: compressions
) {
6377 if (!CompressionTypeSupported(comp
)) {
6378 // not supported, we should fail the Open()
6379 Options options
= CurrentOptions();
6380 options
.compression
= comp
;
6381 ASSERT_TRUE(!TryReopen(options
).ok());
6382 // Try if CreateColumnFamily also fails
6383 options
.compression
= kNoCompression
;
6384 ASSERT_OK(TryReopen(options
));
6385 ColumnFamilyOptions
cf_options(options
);
6386 cf_options
.compression
= comp
;
6387 ColumnFamilyHandle
* handle
;
6388 ASSERT_TRUE(!db_
->CreateColumnFamily(cf_options
, "name", &handle
).ok());
6393 TEST_F(DBTest
, CreateColumnFamilyShouldFailOnIncompatibleOptions
) {
6394 Options options
= CurrentOptions();
6395 options
.max_open_files
= 100;
6398 ColumnFamilyOptions
cf_options(options
);
6399 // ttl is now supported when max_open_files is -1.
6400 cf_options
.ttl
= 3600;
6401 ColumnFamilyHandle
* handle
;
6402 ASSERT_OK(db_
->CreateColumnFamily(cf_options
, "pikachu", &handle
));
6406 #ifndef ROCKSDB_LITE
6407 TEST_F(DBTest
, RowCache
) {
6408 Options options
= CurrentOptions();
6409 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
6410 options
.row_cache
= NewLRUCache(8192);
6411 DestroyAndReopen(options
);
6413 ASSERT_OK(Put("foo", "bar"));
6416 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
6417 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 0);
6418 ASSERT_EQ(Get("foo"), "bar");
6419 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
6420 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
6421 ASSERT_EQ(Get("foo"), "bar");
6422 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 1);
6423 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
6426 TEST_F(DBTest
, PinnableSliceAndRowCache
) {
6427 Options options
= CurrentOptions();
6428 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
6429 options
.row_cache
= NewLRUCache(8192);
6430 DestroyAndReopen(options
);
6432 ASSERT_OK(Put("foo", "bar"));
6435 ASSERT_EQ(Get("foo"), "bar");
6437 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6441 PinnableSlice pin_slice
;
6442 ASSERT_EQ(Get("foo", &pin_slice
), Status::OK());
6443 ASSERT_EQ(pin_slice
.ToString(), "bar");
6444 // Entry is already in cache, lookup will remove the element from lru
6446 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6449 // After PinnableSlice destruction element is added back in LRU
6451 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6455 #endif // ROCKSDB_LITE
6457 TEST_F(DBTest
, DeletingOldWalAfterDrop
) {
6458 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
6459 {{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
6460 {"DBImpl::BGWorkFlush:done", "Test:WaitForFlush"}});
6461 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
6463 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6464 Options options
= CurrentOptions();
6465 options
.max_total_wal_size
= 8192;
6466 options
.compression
= kNoCompression
;
6467 options
.write_buffer_size
= 1 << 20;
6468 options
.level0_file_num_compaction_trigger
= (1 << 30);
6469 options
.level0_slowdown_writes_trigger
= (1 << 30);
6470 options
.level0_stop_writes_trigger
= (1 << 30);
6471 options
.disable_auto_compactions
= true;
6472 DestroyAndReopen(options
);
6473 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6475 CreateColumnFamilies({"cf1", "cf2"}, options
);
6476 ASSERT_OK(Put(0, "key1", DummyString(8192)));
6477 ASSERT_OK(Put(0, "key2", DummyString(8192)));
6478 // the oldest wal should now be getting_flushed
6479 ASSERT_OK(db_
->DropColumnFamily(handles_
[0]));
6480 // all flushes should now do nothing because their CF is dropped
6481 TEST_SYNC_POINT("Test:AllowFlushes");
6482 TEST_SYNC_POINT("Test:WaitForFlush");
6483 uint64_t lognum1
= dbfull()->TEST_LogfileNumber();
6484 ASSERT_OK(Put(1, "key3", DummyString(8192)));
6485 ASSERT_OK(Put(1, "key4", DummyString(8192)));
6486 // new wal should have been created
6487 uint64_t lognum2
= dbfull()->TEST_LogfileNumber();
6488 EXPECT_GT(lognum2
, lognum1
);
6491 TEST_F(DBTest
, UnsupportedManualSync
) {
6492 DestroyAndReopen(CurrentOptions());
6493 env_
->is_wal_sync_thread_safe_
.store(false);
6494 Status s
= db_
->SyncWAL();
6495 ASSERT_TRUE(s
.IsNotSupported());
6498 INSTANTIATE_TEST_CASE_P(DBTestWithParam
, DBTestWithParam
,
6499 ::testing::Combine(::testing::Values(1, 4),
6500 ::testing::Bool()));
6502 TEST_F(DBTest
, PauseBackgroundWorkTest
) {
6503 Options options
= CurrentOptions();
6504 options
.write_buffer_size
= 100000; // Small write buffer
6507 std::vector
<port::Thread
> threads
;
6508 std::atomic
<bool> done(false);
6509 db_
->PauseBackgroundWork();
6510 threads
.emplace_back([&]() {
6512 for (int i
= 0; i
< 10000; ++i
) {
6513 Put(rnd
.RandomString(10), rnd
.RandomString(10));
6517 env_
->SleepForMicroseconds(200000);
6518 // make sure the thread is not done
6519 ASSERT_FALSE(done
.load());
6520 db_
->ContinueBackgroundWork();
6521 for (auto& t
: threads
) {
6525 ASSERT_TRUE(done
.load());
6528 // Keep spawning short-living threads that create an iterator and quit.
6529 // Meanwhile in another thread keep flushing memtables.
6530 // This used to cause a deadlock.
6531 TEST_F(DBTest
, ThreadLocalPtrDeadlock
) {
6532 std::atomic
<int> flushes_done
{0};
6533 std::atomic
<int> threads_destroyed
{0};
6535 return flushes_done
.load() > 10;
6538 port::Thread
flushing_thread([&] {
6539 for (int i
= 0; !done(); ++i
) {
6540 ASSERT_OK(db_
->Put(WriteOptions(), Slice("hi"),
6541 Slice(std::to_string(i
).c_str())));
6542 ASSERT_OK(db_
->Flush(FlushOptions()));
6543 int cnt
= ++flushes_done
;
6544 fprintf(stderr
, "Flushed %d times\n", cnt
);
6548 std::vector
<port::Thread
> thread_spawning_threads(10);
6549 for (auto& t
: thread_spawning_threads
) {
6550 t
= port::Thread([&] {
6553 port::Thread
tmp_thread([&] {
6554 auto it
= db_
->NewIterator(ReadOptions());
6559 ++threads_destroyed
;
6564 for (auto& t
: thread_spawning_threads
) {
6567 flushing_thread
.join();
6568 fprintf(stderr
, "Done. Flushed %d times, destroyed %d threads\n",
6569 flushes_done
.load(), threads_destroyed
.load());
6572 TEST_F(DBTest
, LargeBlockSizeTest
) {
6573 Options options
= CurrentOptions();
6574 CreateAndReopenWithCF({"pikachu"}, options
);
6575 ASSERT_OK(Put(0, "foo", "bar"));
6576 BlockBasedTableOptions table_options
;
6577 table_options
.block_size
= 8LL * 1024 * 1024 * 1024LL;
6578 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
6579 ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
6582 #ifndef ROCKSDB_LITE
6584 TEST_F(DBTest
, CreationTimeOfOldestFile
) {
6585 const int kNumKeysPerFile
= 32;
6586 const int kNumLevelFiles
= 2;
6587 const int kValueSize
= 100;
6589 Options options
= CurrentOptions();
6590 options
.max_open_files
= -1;
6591 env_
->SetMockSleep();
6594 // NOTE: Presumed unnecessary and removed: resetting mock time in env
6596 DestroyAndReopen(options
);
6598 bool set_file_creation_time_to_zero
= true;
6602 env_
->GetCurrentTime(&time_1
);
6603 const uint64_t uint_time_1
= static_cast<uint64_t>(time_1
);
6606 env_
->MockSleepForSeconds(50 * 60 * 60);
6609 env_
->GetCurrentTime(&time_2
);
6610 const uint64_t uint_time_2
= static_cast<uint64_t>(time_2
);
6612 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6613 "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg
) {
6614 TableProperties
* props
= reinterpret_cast<TableProperties
*>(arg
);
6615 if (set_file_creation_time_to_zero
) {
6617 props
->file_creation_time
= 0;
6619 } else if (idx
== 1) {
6620 props
->file_creation_time
= uint_time_1
;
6625 props
->file_creation_time
= uint_time_1
;
6627 } else if (idx
== 1) {
6628 props
->file_creation_time
= uint_time_2
;
6632 // Set file creation time in manifest all to 0.
6633 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6634 "FileMetaData::FileMetaData", [&](void* arg
) {
6635 FileMetaData
* meta
= static_cast<FileMetaData
*>(arg
);
6636 meta
->file_creation_time
= 0;
6638 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6641 for (int i
= 0; i
< kNumLevelFiles
; ++i
) {
6642 for (int j
= 0; j
< kNumKeysPerFile
; ++j
) {
6644 Put(Key(i
* kNumKeysPerFile
+ j
), rnd
.RandomString(kValueSize
)));
6649 // At this point there should be 2 files, one with file_creation_time = 0 and
6650 // the other non-zero. GetCreationTimeOfOldestFile API should return 0.
6651 uint64_t creation_time
;
6652 Status s1
= dbfull()->GetCreationTimeOfOldestFile(&creation_time
);
6653 ASSERT_EQ(0, creation_time
);
6654 ASSERT_EQ(s1
, Status::OK());
6656 // Testing with non-zero file creation time.
6657 set_file_creation_time_to_zero
= false;
6658 options
= CurrentOptions();
6659 options
.max_open_files
= -1;
6662 // NOTE: Presumed unnecessary and removed: resetting mock time in env
6664 DestroyAndReopen(options
);
6666 for (int i
= 0; i
< kNumLevelFiles
; ++i
) {
6667 for (int j
= 0; j
< kNumKeysPerFile
; ++j
) {
6669 Put(Key(i
* kNumKeysPerFile
+ j
), rnd
.RandomString(kValueSize
)));
6674 // At this point there should be 2 files with non-zero file creation time.
6675 // GetCreationTimeOfOldestFile API should return non-zero value.
6677 Status s2
= dbfull()->GetCreationTimeOfOldestFile(&ctime
);
6678 ASSERT_EQ(uint_time_1
, ctime
);
6679 ASSERT_EQ(s2
, Status::OK());
6681 // Testing with max_open_files != -1
6682 options
= CurrentOptions();
6683 options
.max_open_files
= 10;
6684 DestroyAndReopen(options
);
6685 Status s3
= dbfull()->GetCreationTimeOfOldestFile(&ctime
);
6686 ASSERT_EQ(s3
, Status::NotSupported());
6688 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6691 TEST_F(DBTest
, MemoryUsageWithMaxWriteBufferSizeToMaintain
) {
6692 Options options
= CurrentOptions();
6693 options
.max_write_buffer_size_to_maintain
= 10000;
6694 options
.write_buffer_size
= 160000;
6697 bool memory_limit_exceeded
= false;
6698 uint64_t size_all_mem_table
= 0;
6699 uint64_t cur_active_mem
= 0;
6700 for (int i
= 0; i
< 1000; i
++) {
6701 std::string value
= rnd
.RandomString(1000);
6702 ASSERT_OK(Put("keykey_" + std::to_string(i
), value
));
6704 dbfull()->TEST_WaitForFlushMemTable();
6706 ASSERT_TRUE(db_
->GetIntProperty(db_
->DefaultColumnFamily(),
6707 DB::Properties::kSizeAllMemTables
,
6708 &size_all_mem_table
));
6709 ASSERT_TRUE(db_
->GetIntProperty(db_
->DefaultColumnFamily(),
6710 DB::Properties::kCurSizeActiveMemTable
,
6713 // Errors out if memory usage keeps on increasing beyond the limit.
6714 // Once memory limit exceeds, memory_limit_exceeded is set and if
6715 // size_all_mem_table doesn't drop out in the next write then it errors out
6716 // (not expected behaviour). If memory usage drops then
6717 // memory_limit_exceeded is set to false.
6718 if ((size_all_mem_table
> cur_active_mem
) &&
6720 static_cast<uint64_t>(options
.max_write_buffer_size_to_maintain
)) &&
6721 (size_all_mem_table
> options
.max_write_buffer_size_to_maintain
+
6722 options
.write_buffer_size
)) {
6723 ASSERT_FALSE(memory_limit_exceeded
);
6724 memory_limit_exceeded
= true;
6726 memory_limit_exceeded
= false;
6733 } // namespace ROCKSDB_NAMESPACE
6735 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
6737 void RegisterCustomObjects(int argc
, char** argv
);
6740 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
6741 #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
6743 int main(int argc
, char** argv
) {
6744 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
6745 ::testing::InitGoogleTest(&argc
, argv
);
6746 RegisterCustomObjects(argc
, argv
);
6747 return RUN_ALL_TESTS();