1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Introduction of SyncPoint effectively disabled building and running this test
12 // which is a pity, it is a good test
17 #include <unordered_set>
26 #include "cache/lru_cache.h"
27 #include "db/blob_index.h"
28 #include "db/db_impl/db_impl.h"
29 #include "db/db_test_util.h"
30 #include "db/dbformat.h"
31 #include "db/job_context.h"
32 #include "db/version_set.h"
33 #include "db/write_batch_internal.h"
34 #include "env/mock_env.h"
35 #include "file/filename.h"
36 #include "memtable/hash_linklist_rep.h"
37 #include "monitoring/thread_status_util.h"
38 #include "port/port.h"
39 #include "port/stack_trace.h"
40 #include "rocksdb/cache.h"
41 #include "rocksdb/compaction_filter.h"
42 #include "rocksdb/convenience.h"
43 #include "rocksdb/db.h"
44 #include "rocksdb/env.h"
45 #include "rocksdb/experimental.h"
46 #include "rocksdb/filter_policy.h"
47 #include "rocksdb/options.h"
48 #include "rocksdb/perf_context.h"
49 #include "rocksdb/slice.h"
50 #include "rocksdb/slice_transform.h"
51 #include "rocksdb/snapshot.h"
52 #include "rocksdb/table.h"
53 #include "rocksdb/table_properties.h"
54 #include "rocksdb/thread_status.h"
55 #include "rocksdb/utilities/checkpoint.h"
56 #include "rocksdb/utilities/optimistic_transaction_db.h"
57 #include "rocksdb/utilities/write_batch_with_index.h"
58 #include "table/block_based/block_based_table_factory.h"
59 #include "table/mock_table.h"
60 #include "table/plain/plain_table_factory.h"
61 #include "table/scoped_arena_iterator.h"
62 #include "test_util/sync_point.h"
63 #include "test_util/testharness.h"
64 #include "test_util/testutil.h"
65 #include "util/compression.h"
66 #include "util/mutexlock.h"
67 #include "util/rate_limiter.h"
68 #include "util/string_util.h"
69 #include "utilities/merge_operators.h"
71 namespace ROCKSDB_NAMESPACE
{
73 class DBTest
: public DBTestBase
{
75 DBTest() : DBTestBase("/db_test") {}
80 public testing::WithParamInterface
<std::tuple
<uint32_t, bool>> {
83 max_subcompactions_
= std::get
<0>(GetParam());
84 exclusive_manual_compaction_
= std::get
<1>(GetParam());
87 // Required if inheriting from testing::WithParamInterface<>
88 static void SetUpTestCase() {}
89 static void TearDownTestCase() {}
91 uint32_t max_subcompactions_
;
92 bool exclusive_manual_compaction_
;
95 TEST_F(DBTest
, MockEnvTest
) {
96 std::unique_ptr
<MockEnv
> env
{new MockEnv(Env::Default())};
98 options
.create_if_missing
= true;
99 options
.env
= env
.get();
102 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
103 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
105 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
106 for (size_t i
= 0; i
< 3; ++i
) {
107 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
110 for (size_t i
= 0; i
< 3; ++i
) {
112 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
113 ASSERT_TRUE(res
== vals
[i
]);
116 Iterator
* iterator
= db
->NewIterator(ReadOptions());
117 iterator
->SeekToFirst();
118 for (size_t i
= 0; i
< 3; ++i
) {
119 ASSERT_TRUE(iterator
->Valid());
120 ASSERT_TRUE(keys
[i
] == iterator
->key());
121 ASSERT_TRUE(vals
[i
] == iterator
->value());
124 ASSERT_TRUE(!iterator
->Valid());
127 // TEST_FlushMemTable() is not supported in ROCKSDB_LITE
129 DBImpl
* dbi
= reinterpret_cast<DBImpl
*>(db
);
130 ASSERT_OK(dbi
->TEST_FlushMemTable());
132 for (size_t i
= 0; i
< 3; ++i
) {
134 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
135 ASSERT_TRUE(res
== vals
[i
]);
137 #endif // ROCKSDB_LITE
142 // NewMemEnv returns nullptr in ROCKSDB_LITE since class InMemoryEnv isn't
145 TEST_F(DBTest
, MemEnvTest
) {
146 std::unique_ptr
<Env
> env
{NewMemEnv(Env::Default())};
148 options
.create_if_missing
= true;
149 options
.env
= env
.get();
152 const Slice keys
[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
153 const Slice vals
[] = {Slice("foo"), Slice("bar"), Slice("baz")};
155 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
156 for (size_t i
= 0; i
< 3; ++i
) {
157 ASSERT_OK(db
->Put(WriteOptions(), keys
[i
], vals
[i
]));
160 for (size_t i
= 0; i
< 3; ++i
) {
162 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
163 ASSERT_TRUE(res
== vals
[i
]);
166 Iterator
* iterator
= db
->NewIterator(ReadOptions());
167 iterator
->SeekToFirst();
168 for (size_t i
= 0; i
< 3; ++i
) {
169 ASSERT_TRUE(iterator
->Valid());
170 ASSERT_TRUE(keys
[i
] == iterator
->key());
171 ASSERT_TRUE(vals
[i
] == iterator
->value());
174 ASSERT_TRUE(!iterator
->Valid());
177 DBImpl
* dbi
= reinterpret_cast<DBImpl
*>(db
);
178 ASSERT_OK(dbi
->TEST_FlushMemTable());
180 for (size_t i
= 0; i
< 3; ++i
) {
182 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
183 ASSERT_TRUE(res
== vals
[i
]);
188 options
.create_if_missing
= false;
189 ASSERT_OK(DB::Open(options
, "/dir/db", &db
));
190 for (size_t i
= 0; i
< 3; ++i
) {
192 ASSERT_OK(db
->Get(ReadOptions(), keys
[i
], &res
));
193 ASSERT_TRUE(res
== vals
[i
]);
197 #endif // ROCKSDB_LITE
199 TEST_F(DBTest
, WriteEmptyBatch
) {
200 Options options
= CurrentOptions();
202 options
.write_buffer_size
= 100000;
203 CreateAndReopenWithCF({"pikachu"}, options
);
205 ASSERT_OK(Put(1, "foo", "bar"));
208 wo
.disableWAL
= false;
209 WriteBatch empty_batch
;
210 ASSERT_OK(dbfull()->Write(wo
, &empty_batch
));
212 // make sure we can re-open it.
213 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
214 ASSERT_EQ("bar", Get(1, "foo"));
217 TEST_F(DBTest
, SkipDelay
) {
218 Options options
= CurrentOptions();
220 options
.write_buffer_size
= 100000;
221 CreateAndReopenWithCF({"pikachu"}, options
);
223 for (bool sync
: {true, false}) {
224 for (bool disableWAL
: {true, false}) {
225 if (sync
&& disableWAL
) {
226 // sync and disableWAL are incompatible.
229 // Use a small number to ensure a large delay that is still effective
231 // TODO(myabandeh): this is time dependent and could potentially make
233 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
234 std::atomic
<int> sleep_count(0);
235 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
236 "DBImpl::DelayWrite:Sleep",
237 [&](void* /*arg*/) { sleep_count
.fetch_add(1); });
238 std::atomic
<int> wait_count(0);
239 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
240 "DBImpl::DelayWrite:Wait",
241 [&](void* /*arg*/) { wait_count
.fetch_add(1); });
242 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
246 wo
.disableWAL
= disableWAL
;
247 wo
.no_slowdown
= true;
248 dbfull()->Put(wo
, "foo", "bar");
249 // We need the 2nd write to trigger delay. This is because delay is
250 // estimated based on the last write size which is 0 for the first write.
251 ASSERT_NOK(dbfull()->Put(wo
, "foo2", "bar2"));
252 ASSERT_GE(sleep_count
.load(), 0);
253 ASSERT_GE(wait_count
.load(), 0);
256 token
= dbfull()->TEST_write_controler().GetDelayToken(1000000000);
257 wo
.no_slowdown
= false;
258 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar3"));
259 ASSERT_GE(sleep_count
.load(), 1);
265 TEST_F(DBTest
, MixedSlowdownOptions
) {
266 Options options
= CurrentOptions();
268 options
.write_buffer_size
= 100000;
269 CreateAndReopenWithCF({"pikachu"}, options
);
270 std::vector
<port::Thread
> threads
;
271 std::atomic
<int> thread_num(0);
273 std::function
<void()> write_slowdown_func
= [&]() {
274 int a
= thread_num
.fetch_add(1);
275 std::string key
= "foo" + std::to_string(a
);
277 wo
.no_slowdown
= false;
278 ASSERT_OK(dbfull()->Put(wo
, key
, "bar"));
280 std::function
<void()> write_no_slowdown_func
= [&]() {
281 int a
= thread_num
.fetch_add(1);
282 std::string key
= "foo" + std::to_string(a
);
284 wo
.no_slowdown
= true;
285 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
287 // Use a small number to ensure a large delay that is still effective
289 // TODO(myabandeh): this is time dependent and could potentially make
291 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
292 std::atomic
<int> sleep_count(0);
293 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
294 "DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) {
295 sleep_count
.fetch_add(1);
296 if (threads
.empty()) {
297 for (int i
= 0; i
< 2; ++i
) {
298 threads
.emplace_back(write_slowdown_func
);
300 for (int i
= 0; i
< 2; ++i
) {
301 threads
.emplace_back(write_no_slowdown_func
);
305 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
309 wo
.disableWAL
= false;
310 wo
.no_slowdown
= false;
311 dbfull()->Put(wo
, "foo", "bar");
312 // We need the 2nd write to trigger delay. This is because delay is
313 // estimated based on the last write size which is 0 for the first write.
314 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
317 for (auto& t
: threads
) {
320 ASSERT_GE(sleep_count
.load(), 1);
322 wo
.no_slowdown
= true;
323 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar"));
326 TEST_F(DBTest
, MixedSlowdownOptionsInQueue
) {
327 Options options
= CurrentOptions();
329 options
.write_buffer_size
= 100000;
330 CreateAndReopenWithCF({"pikachu"}, options
);
331 std::vector
<port::Thread
> threads
;
332 std::atomic
<int> thread_num(0);
334 std::function
<void()> write_no_slowdown_func
= [&]() {
335 int a
= thread_num
.fetch_add(1);
336 std::string key
= "foo" + std::to_string(a
);
338 wo
.no_slowdown
= true;
339 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
341 // Use a small number to ensure a large delay that is still effective
343 // TODO(myabandeh): this is time dependent and could potentially make
345 auto token
= dbfull()->TEST_write_controler().GetDelayToken(1);
346 std::atomic
<int> sleep_count(0);
347 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
348 "DBImpl::DelayWrite:Sleep", [&](void* /*arg*/) {
349 sleep_count
.fetch_add(1);
350 if (threads
.empty()) {
351 for (int i
= 0; i
< 2; ++i
) {
352 threads
.emplace_back(write_no_slowdown_func
);
354 // Sleep for 2s to allow the threads to insert themselves into the
356 env_
->SleepForMicroseconds(3000000ULL);
359 std::atomic
<int> wait_count(0);
360 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
361 "DBImpl::DelayWrite:Wait",
362 [&](void* /*arg*/) { wait_count
.fetch_add(1); });
363 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
367 wo
.disableWAL
= false;
368 wo
.no_slowdown
= false;
369 dbfull()->Put(wo
, "foo", "bar");
370 // We need the 2nd write to trigger delay. This is because delay is
371 // estimated based on the last write size which is 0 for the first write.
372 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
375 for (auto& t
: threads
) {
378 ASSERT_EQ(sleep_count
.load(), 1);
379 ASSERT_GE(wait_count
.load(), 0);
382 TEST_F(DBTest
, MixedSlowdownOptionsStop
) {
383 Options options
= CurrentOptions();
385 options
.write_buffer_size
= 100000;
386 CreateAndReopenWithCF({"pikachu"}, options
);
387 std::vector
<port::Thread
> threads
;
388 std::atomic
<int> thread_num(0);
390 std::function
<void()> write_slowdown_func
= [&]() {
391 int a
= thread_num
.fetch_add(1);
392 std::string key
= "foo" + std::to_string(a
);
394 wo
.no_slowdown
= false;
395 ASSERT_OK(dbfull()->Put(wo
, key
, "bar"));
397 std::function
<void()> write_no_slowdown_func
= [&]() {
398 int a
= thread_num
.fetch_add(1);
399 std::string key
= "foo" + std::to_string(a
);
401 wo
.no_slowdown
= true;
402 ASSERT_NOK(dbfull()->Put(wo
, key
, "bar"));
404 std::function
<void()> wakeup_writer
= [&]() {
405 dbfull()->mutex_
.Lock();
406 dbfull()->bg_cv_
.SignalAll();
407 dbfull()->mutex_
.Unlock();
409 // Use a small number to ensure a large delay that is still effective
411 // TODO(myabandeh): this is time dependent and could potentially make
413 auto token
= dbfull()->TEST_write_controler().GetStopToken();
414 std::atomic
<int> wait_count(0);
415 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
416 "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
417 wait_count
.fetch_add(1);
418 if (threads
.empty()) {
419 for (int i
= 0; i
< 2; ++i
) {
420 threads
.emplace_back(write_slowdown_func
);
422 for (int i
= 0; i
< 2; ++i
) {
423 threads
.emplace_back(write_no_slowdown_func
);
425 // Sleep for 2s to allow the threads to insert themselves into the
427 env_
->SleepForMicroseconds(3000000ULL);
430 threads
.emplace_back(wakeup_writer
);
432 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
436 wo
.disableWAL
= false;
437 wo
.no_slowdown
= false;
438 dbfull()->Put(wo
, "foo", "bar");
439 // We need the 2nd write to trigger delay. This is because delay is
440 // estimated based on the last write size which is 0 for the first write.
441 ASSERT_OK(dbfull()->Put(wo
, "foo2", "bar2"));
444 for (auto& t
: threads
) {
447 ASSERT_GE(wait_count
.load(), 1);
449 wo
.no_slowdown
= true;
450 ASSERT_OK(dbfull()->Put(wo
, "foo3", "bar"));
454 TEST_F(DBTest
, LevelLimitReopen
) {
455 Options options
= CurrentOptions();
456 CreateAndReopenWithCF({"pikachu"}, options
);
458 const std::string
value(1024 * 1024, ' ');
460 while (NumTableFilesAtLevel(2, 1) == 0) {
461 ASSERT_OK(Put(1, Key(i
++), value
));
464 options
.num_levels
= 1;
465 options
.max_bytes_for_level_multiplier_additional
.resize(1, 1);
466 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
467 ASSERT_EQ(s
.IsInvalidArgument(), true);
468 ASSERT_EQ(s
.ToString(),
469 "Invalid argument: db has more levels than options.num_levels");
471 options
.num_levels
= 10;
472 options
.max_bytes_for_level_multiplier_additional
.resize(10, 1);
473 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
475 #endif // ROCKSDB_LITE
478 TEST_F(DBTest
, PutSingleDeleteGet
) {
480 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
481 ASSERT_OK(Put(1, "foo", "v1"));
482 ASSERT_EQ("v1", Get(1, "foo"));
483 ASSERT_OK(Put(1, "foo2", "v2"));
484 ASSERT_EQ("v2", Get(1, "foo2"));
485 ASSERT_OK(SingleDelete(1, "foo"));
486 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
487 // Skip FIFO and universal compaction because they do not apply to the test
488 // case. Skip MergePut because single delete does not get removed when it
489 // encounters a merge.
490 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
494 TEST_F(DBTest
, ReadFromPersistedTier
) {
497 Options options
= CurrentOptions();
498 for (int disableWAL
= 0; disableWAL
<= 1; ++disableWAL
) {
499 CreateAndReopenWithCF({"pikachu"}, options
);
501 wopt
.disableWAL
= (disableWAL
== 1);
502 // 1st round: put but not flush
503 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "foo", "first"));
504 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "bar", "one"));
505 ASSERT_EQ("first", Get(1, "foo"));
506 ASSERT_EQ("one", Get(1, "bar"));
508 // Read directly from persisted data.
510 ropt
.read_tier
= kPersistedTier
;
512 if (wopt
.disableWAL
) {
513 // as data has not yet being flushed, we expect not found.
514 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
515 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
517 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "foo", &value
));
518 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "bar", &value
));
522 std::vector
<ColumnFamilyHandle
*> multiget_cfs
;
523 multiget_cfs
.push_back(handles_
[1]);
524 multiget_cfs
.push_back(handles_
[1]);
525 std::vector
<Slice
> multiget_keys
;
526 multiget_keys
.push_back("foo");
527 multiget_keys
.push_back("bar");
528 std::vector
<std::string
> multiget_values
;
530 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
531 if (wopt
.disableWAL
) {
532 ASSERT_TRUE(statuses
[0].IsNotFound());
533 ASSERT_TRUE(statuses
[1].IsNotFound());
535 ASSERT_OK(statuses
[0]);
536 ASSERT_OK(statuses
[1]);
539 // 2nd round: flush and put a new value in memtable.
541 ASSERT_OK(db_
->Put(wopt
, handles_
[1], "rocksdb", "hello"));
543 // once the data has been flushed, we are able to get the
544 // data when kPersistedTier is used.
545 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).ok());
546 ASSERT_EQ(value
, "first");
547 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
548 ASSERT_EQ(value
, "one");
549 if (wopt
.disableWAL
) {
551 db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).IsNotFound());
553 ASSERT_OK(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
));
554 ASSERT_EQ(value
, "hello");
557 // Expect same result in multiget
558 multiget_cfs
.push_back(handles_
[1]);
559 multiget_keys
.push_back("rocksdb");
561 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
562 ASSERT_TRUE(statuses
[0].ok());
563 ASSERT_EQ("first", multiget_values
[0]);
564 ASSERT_TRUE(statuses
[1].ok());
565 ASSERT_EQ("one", multiget_values
[1]);
566 if (wopt
.disableWAL
) {
567 ASSERT_TRUE(statuses
[2].IsNotFound());
569 ASSERT_OK(statuses
[2]);
572 // 3rd round: delete and flush
573 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "foo"));
575 ASSERT_OK(db_
->Delete(wopt
, handles_
[1], "bar"));
577 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "foo", &value
).IsNotFound());
578 if (wopt
.disableWAL
) {
579 // Still expect finding the value as its delete has not yet being
581 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).ok());
582 ASSERT_EQ(value
, "one");
584 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "bar", &value
).IsNotFound());
586 ASSERT_TRUE(db_
->Get(ropt
, handles_
[1], "rocksdb", &value
).ok());
587 ASSERT_EQ(value
, "hello");
590 db_
->MultiGet(ropt
, multiget_cfs
, multiget_keys
, &multiget_values
);
591 ASSERT_TRUE(statuses
[0].IsNotFound());
592 if (wopt
.disableWAL
) {
593 ASSERT_TRUE(statuses
[1].ok());
594 ASSERT_EQ("one", multiget_values
[1]);
596 ASSERT_TRUE(statuses
[1].IsNotFound());
598 ASSERT_TRUE(statuses
[2].ok());
599 ASSERT_EQ("hello", multiget_values
[2]);
600 if (wopt
.disableWAL
== 0) {
601 DestroyAndReopen(options
);
604 } while (ChangeOptions());
607 TEST_F(DBTest
, SingleDeleteFlush
) {
608 // Test to check whether flushing preserves a single delete hidden
613 Options options
= CurrentOptions();
614 options
.disable_auto_compactions
= true;
615 CreateAndReopenWithCF({"pikachu"}, options
);
617 // Put values on second level (so that they will not be in the same
618 // compaction as the other operations.)
619 Put(1, "foo", "first");
620 Put(1, "bar", "one");
622 MoveFilesToLevel(2, 1);
624 // (Single) delete hidden by a put
625 SingleDelete(1, "foo");
626 Put(1, "foo", "second");
628 Put(1, "bar", "two");
631 SingleDelete(1, "foo");
635 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
638 ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
639 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
640 // Skip FIFO and universal compaction because they do not apply to the test
641 // case. Skip MergePut because single delete does not get removed when it
642 // encounters a merge.
643 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
647 TEST_F(DBTest
, SingleDeletePutFlush
) {
648 // Single deletes that encounter the matching put in a flush should get
653 Options options
= CurrentOptions();
654 options
.disable_auto_compactions
= true;
655 CreateAndReopenWithCF({"pikachu"}, options
);
657 Put(1, "foo", Slice());
658 Put(1, "a", Slice());
659 SingleDelete(1, "a");
662 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
663 // Skip FIFO and universal compaction because they do not apply to the test
664 // case. Skip MergePut because single delete does not get removed when it
665 // encounters a merge.
666 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
670 // Disable because not all platform can run it.
671 // It requires more than 9GB memory to run it, With single allocation
673 TEST_F(DBTest
, DISABLED_SanitizeVeryVeryLargeValue
) {
674 const size_t kValueSize
= 4 * size_t{1024 * 1024 * 1024}; // 4GB value
675 std::string
raw(kValueSize
, 'v');
676 Options options
= CurrentOptions();
678 options
.merge_operator
= MergeOperators::CreatePutOperator();
679 options
.write_buffer_size
= 100000; // Small write buffer
680 options
.paranoid_checks
= true;
681 DestroyAndReopen(options
);
683 ASSERT_OK(Put("boo", "v1"));
684 ASSERT_TRUE(Put("foo", raw
).IsInvalidArgument());
685 ASSERT_TRUE(Merge("foo", raw
).IsInvalidArgument());
688 ASSERT_TRUE(wb
.Put("foo", raw
).IsInvalidArgument());
689 ASSERT_TRUE(wb
.Merge("foo", raw
).IsInvalidArgument());
691 Slice value_slice
= raw
;
692 Slice key_slice
= "foo";
693 SliceParts
sp_key(&key_slice
, 1);
694 SliceParts
sp_value(&value_slice
, 1);
696 ASSERT_TRUE(wb
.Put(sp_key
, sp_value
).IsInvalidArgument());
697 ASSERT_TRUE(wb
.Merge(sp_key
, sp_value
).IsInvalidArgument());
700 // Disable because not all platform can run it.
701 // It requires more than 9GB memory to run it, With single allocation
703 TEST_F(DBTest
, DISABLED_VeryLargeValue
) {
704 const size_t kValueSize
= 3221225472u; // 3GB value
705 const size_t kKeySize
= 8388608u; // 8MB key
706 std::string
raw(kValueSize
, 'v');
707 std::string
key1(kKeySize
, 'c');
708 std::string
key2(kKeySize
, 'd');
710 Options options
= CurrentOptions();
712 options
.write_buffer_size
= 100000; // Small write buffer
713 options
.paranoid_checks
= true;
714 DestroyAndReopen(options
);
716 ASSERT_OK(Put("boo", "v1"));
717 ASSERT_OK(Put("foo", "v1"));
718 ASSERT_OK(Put(key1
, raw
));
720 ASSERT_OK(Put(key2
, raw
));
721 dbfull()->TEST_WaitForFlushMemTable();
724 ASSERT_EQ(1, NumTableFilesAtLevel(0));
725 #endif // !ROCKSDB_LITE
728 Status s
= db_
->Get(ReadOptions(), key1
, &value
);
730 ASSERT_EQ(kValueSize
, value
.size());
731 ASSERT_EQ('v', value
[0]);
733 s
= db_
->Get(ReadOptions(), key2
, &value
);
735 ASSERT_EQ(kValueSize
, value
.size());
736 ASSERT_EQ('w', value
[0]);
738 // Compact all files.
740 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
742 // Check DB is not in read-only state.
743 ASSERT_OK(Put("boo", "v1"));
745 s
= db_
->Get(ReadOptions(), key1
, &value
);
747 ASSERT_EQ(kValueSize
, value
.size());
748 ASSERT_EQ('v', value
[0]);
750 s
= db_
->Get(ReadOptions(), key2
, &value
);
752 ASSERT_EQ(kValueSize
, value
.size());
753 ASSERT_EQ('w', value
[0]);
756 TEST_F(DBTest
, GetFromImmutableLayer
) {
758 Options options
= CurrentOptions();
760 CreateAndReopenWithCF({"pikachu"}, options
);
762 ASSERT_OK(Put(1, "foo", "v1"));
763 ASSERT_EQ("v1", Get(1, "foo"));
766 env_
->delay_sstable_sync_
.store(true, std::memory_order_release
);
767 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
768 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
769 ASSERT_EQ("v1", Get(1, "foo"));
770 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
771 // Release sync calls
772 env_
->delay_sstable_sync_
.store(false, std::memory_order_release
);
773 } while (ChangeOptions());
777 TEST_F(DBTest
, GetLevel0Ordering
) {
779 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
780 // Check that we process level-0 files in correct order. The code
781 // below generates two level-0 files where the earlier one comes
782 // before the later one in the level-0 file list since the earlier
783 // one has a smaller "smallest" key.
784 ASSERT_OK(Put(1, "bar", "b"));
785 ASSERT_OK(Put(1, "foo", "v1"));
787 ASSERT_OK(Put(1, "foo", "v2"));
789 ASSERT_EQ("v2", Get(1, "foo"));
790 } while (ChangeOptions());
793 TEST_F(DBTest
, WrongLevel0Config
) {
794 Options options
= CurrentOptions();
796 ASSERT_OK(DestroyDB(dbname_
, options
));
797 options
.level0_stop_writes_trigger
= 1;
798 options
.level0_slowdown_writes_trigger
= 2;
799 options
.level0_file_num_compaction_trigger
= 3;
800 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
804 TEST_F(DBTest
, GetOrderedByLevels
) {
806 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
807 ASSERT_OK(Put(1, "foo", "v1"));
808 Compact(1, "a", "z");
809 ASSERT_EQ("v1", Get(1, "foo"));
810 ASSERT_OK(Put(1, "foo", "v2"));
811 ASSERT_EQ("v2", Get(1, "foo"));
813 ASSERT_EQ("v2", Get(1, "foo"));
814 } while (ChangeOptions());
817 TEST_F(DBTest
, GetPicksCorrectFile
) {
819 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
820 // Arrange to have multiple files in a non-level-0 level.
821 ASSERT_OK(Put(1, "a", "va"));
822 Compact(1, "a", "b");
823 ASSERT_OK(Put(1, "x", "vx"));
824 Compact(1, "x", "y");
825 ASSERT_OK(Put(1, "f", "vf"));
826 Compact(1, "f", "g");
827 ASSERT_EQ("va", Get(1, "a"));
828 ASSERT_EQ("vf", Get(1, "f"));
829 ASSERT_EQ("vx", Get(1, "x"));
830 } while (ChangeOptions());
833 TEST_F(DBTest
, GetEncountersEmptyLevel
) {
835 Options options
= CurrentOptions();
836 CreateAndReopenWithCF({"pikachu"}, options
);
837 // Arrange for the following to happen:
838 // * sstable A in level 0
839 // * nothing in level 1
840 // * sstable B in level 2
841 // Then do enough Get() calls to arrange for an automatic compaction
842 // of sstable A. A bug would cause the compaction to be marked as
843 // occurring at level 1 (instead of the correct level 0).
845 // Step 1: First place sstables in levels 0 and 2
846 Put(1, "a", "begin");
849 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
850 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
851 Put(1, "a", "begin");
854 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
855 ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
857 // Step 2: clear level 1 if necessary.
858 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
859 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
860 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
861 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 1);
863 // Step 3: read a bunch of times
864 for (int i
= 0; i
< 1000; i
++) {
865 ASSERT_EQ("NOT_FOUND", Get(1, "missing"));
868 // Step 4: Wait for compaction to finish
869 dbfull()->TEST_WaitForCompact();
871 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
872 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
874 #endif // ROCKSDB_LITE
876 TEST_F(DBTest
, FlushMultipleMemtable
) {
878 Options options
= CurrentOptions();
879 WriteOptions writeOpt
= WriteOptions();
880 writeOpt
.disableWAL
= true;
881 options
.max_write_buffer_number
= 4;
882 options
.min_write_buffer_number_to_merge
= 3;
883 options
.max_write_buffer_size_to_maintain
= -1;
884 CreateAndReopenWithCF({"pikachu"}, options
);
885 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "foo", "v1"));
887 ASSERT_OK(dbfull()->Put(writeOpt
, handles_
[1], "bar", "v1"));
889 ASSERT_EQ("v1", Get(1, "foo"));
890 ASSERT_EQ("v1", Get(1, "bar"));
892 } while (ChangeCompactOptions());
895 TEST_F(DBTest
, FlushSchedule
) {
896 Options options
= CurrentOptions();
897 options
.disable_auto_compactions
= true;
898 options
.level0_stop_writes_trigger
= 1 << 10;
899 options
.level0_slowdown_writes_trigger
= 1 << 10;
900 options
.min_write_buffer_number_to_merge
= 1;
901 options
.max_write_buffer_size_to_maintain
=
902 static_cast<int64_t>(options
.write_buffer_size
);
903 options
.max_write_buffer_number
= 2;
904 options
.write_buffer_size
= 120 * 1024;
905 CreateAndReopenWithCF({"pikachu"}, options
);
906 std::vector
<port::Thread
> threads
;
908 std::atomic
<int> thread_num(0);
909 // each column family will have 5 threads, each thread generating 2 memtables.
910 // each column family should end up with 10 table files
911 std::function
<void()> fill_memtable_func
= [&]() {
912 int a
= thread_num
.fetch_add(1);
915 // this should fill up 2 memtables
916 for (int k
= 0; k
< 5000; ++k
) {
917 ASSERT_OK(db_
->Put(wo
, handles_
[a
& 1], RandomString(&rnd
, 13), ""));
921 for (int i
= 0; i
< 10; ++i
) {
922 threads
.emplace_back(fill_memtable_func
);
925 for (auto& t
: threads
) {
929 auto default_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "default");
930 auto pikachu_tables
= GetNumberOfSstFilesForColumnFamily(db_
, "pikachu");
931 ASSERT_LE(default_tables
, static_cast<uint64_t>(10));
932 ASSERT_GT(default_tables
, static_cast<uint64_t>(0));
933 ASSERT_LE(pikachu_tables
, static_cast<uint64_t>(10));
934 ASSERT_GT(pikachu_tables
, static_cast<uint64_t>(0));
936 #endif // ROCKSDB_LITE
939 class KeepFilter
: public CompactionFilter
{
941 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
942 std::string
* /*new_value*/,
943 bool* /*value_changed*/) const override
{
947 const char* Name() const override
{ return "KeepFilter"; }
950 class KeepFilterFactory
: public CompactionFilterFactory
{
952 explicit KeepFilterFactory(bool check_context
= false)
953 : check_context_(check_context
) {}
955 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
956 const CompactionFilter::Context
& context
) override
{
957 if (check_context_
) {
958 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
959 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
961 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
964 const char* Name() const override
{ return "KeepFilterFactory"; }
966 std::atomic_bool expect_full_compaction_
;
967 std::atomic_bool expect_manual_compaction_
;
970 class DelayFilter
: public CompactionFilter
{
972 explicit DelayFilter(DBTestBase
* d
) : db_test(d
) {}
973 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
974 std::string
* /*new_value*/,
975 bool* /*value_changed*/) const override
{
976 db_test
->env_
->addon_time_
.fetch_add(1000);
980 const char* Name() const override
{ return "DelayFilter"; }
986 class DelayFilterFactory
: public CompactionFilterFactory
{
988 explicit DelayFilterFactory(DBTestBase
* d
) : db_test(d
) {}
989 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
990 const CompactionFilter::Context
& /*context*/) override
{
991 return std::unique_ptr
<CompactionFilter
>(new DelayFilter(db_test
));
994 const char* Name() const override
{ return "DelayFilterFactory"; }
1001 #ifndef ROCKSDB_LITE
1003 static std::string
CompressibleString(Random
* rnd
, int len
) {
1005 test::CompressibleString(rnd
, 0.8, len
, &r
);
1008 #endif // ROCKSDB_LITE
1010 TEST_F(DBTest
, FailMoreDbPaths
) {
1011 Options options
= CurrentOptions();
1012 options
.db_paths
.emplace_back(dbname_
, 10000000);
1013 options
.db_paths
.emplace_back(dbname_
+ "_2", 1000000);
1014 options
.db_paths
.emplace_back(dbname_
+ "_3", 1000000);
1015 options
.db_paths
.emplace_back(dbname_
+ "_4", 1000000);
1016 options
.db_paths
.emplace_back(dbname_
+ "_5", 1000000);
1017 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
1020 void CheckColumnFamilyMeta(
1021 const ColumnFamilyMetaData
& cf_meta
,
1022 const std::vector
<std::vector
<FileMetaData
>>& files_by_level
,
1023 uint64_t start_time
, uint64_t end_time
) {
1024 ASSERT_EQ(cf_meta
.name
, kDefaultColumnFamilyName
);
1025 ASSERT_EQ(cf_meta
.levels
.size(), files_by_level
.size());
1027 uint64_t cf_size
= 0;
1028 size_t file_count
= 0;
1030 for (size_t i
= 0; i
< cf_meta
.levels
.size(); ++i
) {
1031 const auto& level_meta_from_cf
= cf_meta
.levels
[i
];
1032 const auto& level_meta_from_files
= files_by_level
[i
];
1034 ASSERT_EQ(level_meta_from_cf
.level
, i
);
1035 ASSERT_EQ(level_meta_from_cf
.files
.size(), level_meta_from_files
.size());
1037 file_count
+= level_meta_from_cf
.files
.size();
1039 uint64_t level_size
= 0;
1040 for (size_t j
= 0; j
< level_meta_from_cf
.files
.size(); ++j
) {
1041 const auto& file_meta_from_cf
= level_meta_from_cf
.files
[j
];
1042 const auto& file_meta_from_files
= level_meta_from_files
[j
];
1044 level_size
+= file_meta_from_cf
.size
;
1046 ASSERT_EQ(file_meta_from_cf
.file_number
,
1047 file_meta_from_files
.fd
.GetNumber());
1048 ASSERT_EQ(file_meta_from_cf
.file_number
,
1049 TableFileNameToNumber(file_meta_from_cf
.name
));
1050 ASSERT_EQ(file_meta_from_cf
.size
, file_meta_from_files
.fd
.file_size
);
1051 ASSERT_EQ(file_meta_from_cf
.smallest_seqno
,
1052 file_meta_from_files
.fd
.smallest_seqno
);
1053 ASSERT_EQ(file_meta_from_cf
.largest_seqno
,
1054 file_meta_from_files
.fd
.largest_seqno
);
1055 ASSERT_EQ(file_meta_from_cf
.smallestkey
,
1056 file_meta_from_files
.smallest
.user_key().ToString());
1057 ASSERT_EQ(file_meta_from_cf
.largestkey
,
1058 file_meta_from_files
.largest
.user_key().ToString());
1059 ASSERT_EQ(file_meta_from_cf
.oldest_blob_file_number
,
1060 file_meta_from_files
.oldest_blob_file_number
);
1061 ASSERT_EQ(file_meta_from_cf
.oldest_ancester_time
,
1062 file_meta_from_files
.oldest_ancester_time
);
1063 ASSERT_EQ(file_meta_from_cf
.file_creation_time
,
1064 file_meta_from_files
.file_creation_time
);
1065 ASSERT_GE(file_meta_from_cf
.file_creation_time
, start_time
);
1066 ASSERT_LE(file_meta_from_cf
.file_creation_time
, end_time
);
1067 ASSERT_GE(file_meta_from_cf
.oldest_ancester_time
, start_time
);
1068 ASSERT_LE(file_meta_from_cf
.oldest_ancester_time
, end_time
);
1071 ASSERT_EQ(level_meta_from_cf
.size
, level_size
);
1072 cf_size
+= level_size
;
1075 ASSERT_EQ(cf_meta
.file_count
, file_count
);
1076 ASSERT_EQ(cf_meta
.size
, cf_size
);
1079 void CheckLiveFilesMeta(
1080 const std::vector
<LiveFileMetaData
>& live_file_meta
,
1081 const std::vector
<std::vector
<FileMetaData
>>& files_by_level
) {
1082 size_t total_file_count
= 0;
1083 for (const auto& f
: files_by_level
) {
1084 total_file_count
+= f
.size();
1087 ASSERT_EQ(live_file_meta
.size(), total_file_count
);
1092 for (const auto& meta
: live_file_meta
) {
1093 if (level
!= meta
.level
) {
1098 ASSERT_LT(i
, files_by_level
[level
].size());
1100 const auto& expected_meta
= files_by_level
[level
][i
];
1102 ASSERT_EQ(meta
.column_family_name
, kDefaultColumnFamilyName
);
1103 ASSERT_EQ(meta
.file_number
, expected_meta
.fd
.GetNumber());
1104 ASSERT_EQ(meta
.file_number
, TableFileNameToNumber(meta
.name
));
1105 ASSERT_EQ(meta
.size
, expected_meta
.fd
.file_size
);
1106 ASSERT_EQ(meta
.smallest_seqno
, expected_meta
.fd
.smallest_seqno
);
1107 ASSERT_EQ(meta
.largest_seqno
, expected_meta
.fd
.largest_seqno
);
1108 ASSERT_EQ(meta
.smallestkey
, expected_meta
.smallest
.user_key().ToString());
1109 ASSERT_EQ(meta
.largestkey
, expected_meta
.largest
.user_key().ToString());
1110 ASSERT_EQ(meta
.oldest_blob_file_number
,
1111 expected_meta
.oldest_blob_file_number
);
1117 #ifndef ROCKSDB_LITE
1118 TEST_F(DBTest
, MetaDataTest
) {
1119 Options options
= CurrentOptions();
1120 options
.create_if_missing
= true;
1121 options
.disable_auto_compactions
= true;
1123 int64_t temp_time
= 0;
1124 options
.env
->GetCurrentTime(&temp_time
);
1125 uint64_t start_time
= static_cast<uint64_t>(temp_time
);
1127 DestroyAndReopen(options
);
1131 for (int i
= 0; i
< 100; ++i
) {
1132 // Add a single blob reference to each file
1133 std::string blob_index
;
1134 BlobIndex::EncodeBlob(&blob_index
, /* blob_file_number */ i
+ 1000,
1135 /* offset */ 1234, /* size */ 5678, kNoCompression
);
1138 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch
, 0, Key(key_index
),
1140 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
1144 // Fill up the rest of the file with random values.
1145 GenerateNewFile(&rnd
, &key_index
, /* nowait */ true);
1150 std::vector
<std::vector
<FileMetaData
>> files_by_level
;
1151 dbfull()->TEST_GetFilesMetaData(db_
->DefaultColumnFamily(), &files_by_level
);
1153 options
.env
->GetCurrentTime(&temp_time
);
1154 uint64_t end_time
= static_cast<uint64_t>(temp_time
);
1156 ColumnFamilyMetaData cf_meta
;
1157 db_
->GetColumnFamilyMetaData(&cf_meta
);
1158 CheckColumnFamilyMeta(cf_meta
, files_by_level
, start_time
, end_time
);
1160 std::vector
<LiveFileMetaData
> live_file_meta
;
1161 db_
->GetLiveFilesMetaData(&live_file_meta
);
1162 CheckLiveFilesMeta(live_file_meta
, files_by_level
);
1166 void MinLevelHelper(DBTest
* self
, Options
& options
) {
1169 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
1171 std::vector
<std::string
> values
;
1172 // Write 120KB (12 values, each 10K)
1173 for (int i
= 0; i
< 12; i
++) {
1174 values
.push_back(DBTestBase::RandomString(&rnd
, 10000));
1175 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
1177 self
->dbfull()->TEST_WaitForFlushMemTable();
1178 ASSERT_EQ(self
->NumTableFilesAtLevel(0), num
+ 1);
1181 // generate one more file in level-0, and should trigger level-0 compaction
1182 std::vector
<std::string
> values
;
1183 for (int i
= 0; i
< 12; i
++) {
1184 values
.push_back(DBTestBase::RandomString(&rnd
, 10000));
1185 ASSERT_OK(self
->Put(DBTestBase::Key(i
), values
[i
]));
1187 self
->dbfull()->TEST_WaitForCompact();
1189 ASSERT_EQ(self
->NumTableFilesAtLevel(0), 0);
1190 ASSERT_EQ(self
->NumTableFilesAtLevel(1), 1);
1193 // returns false if the calling-Test should be skipped
1194 bool MinLevelToCompress(CompressionType
& type
, Options
& options
, int wbits
,
1195 int lev
, int strategy
) {
1197 "Test with compression options : window_bits = %d, level = %d, "
1199 wbits
, lev
, strategy
);
1200 options
.write_buffer_size
= 100 << 10; // 100KB
1201 options
.arena_block_size
= 4096;
1202 options
.num_levels
= 3;
1203 options
.level0_file_num_compaction_trigger
= 3;
1204 options
.create_if_missing
= true;
1206 if (Snappy_Supported()) {
1207 type
= kSnappyCompression
;
1208 fprintf(stderr
, "using snappy\n");
1209 } else if (Zlib_Supported()) {
1210 type
= kZlibCompression
;
1211 fprintf(stderr
, "using zlib\n");
1212 } else if (BZip2_Supported()) {
1213 type
= kBZip2Compression
;
1214 fprintf(stderr
, "using bzip2\n");
1215 } else if (LZ4_Supported()) {
1216 type
= kLZ4Compression
;
1217 fprintf(stderr
, "using lz4\n");
1218 } else if (XPRESS_Supported()) {
1219 type
= kXpressCompression
;
1220 fprintf(stderr
, "using xpress\n");
1221 } else if (ZSTD_Supported()) {
1223 fprintf(stderr
, "using ZSTD\n");
1225 fprintf(stderr
, "skipping test, compression disabled\n");
1228 options
.compression_per_level
.resize(options
.num_levels
);
1230 // do not compress L0
1231 for (int i
= 0; i
< 1; i
++) {
1232 options
.compression_per_level
[i
] = kNoCompression
;
1234 for (int i
= 1; i
< options
.num_levels
; i
++) {
1235 options
.compression_per_level
[i
] = type
;
1241 TEST_F(DBTest
, MinLevelToCompress1
) {
1242 Options options
= CurrentOptions();
1243 CompressionType type
= kSnappyCompression
;
1244 if (!MinLevelToCompress(type
, options
, -14, -1, 0)) {
1248 MinLevelHelper(this, options
);
1250 // do not compress L0 and L1
1251 for (int i
= 0; i
< 2; i
++) {
1252 options
.compression_per_level
[i
] = kNoCompression
;
1254 for (int i
= 2; i
< options
.num_levels
; i
++) {
1255 options
.compression_per_level
[i
] = type
;
1257 DestroyAndReopen(options
);
1258 MinLevelHelper(this, options
);
1261 TEST_F(DBTest
, MinLevelToCompress2
) {
1262 Options options
= CurrentOptions();
1263 CompressionType type
= kSnappyCompression
;
1264 if (!MinLevelToCompress(type
, options
, 15, -1, 0)) {
1268 MinLevelHelper(this, options
);
1270 // do not compress L0 and L1
1271 for (int i
= 0; i
< 2; i
++) {
1272 options
.compression_per_level
[i
] = kNoCompression
;
1274 for (int i
= 2; i
< options
.num_levels
; i
++) {
1275 options
.compression_per_level
[i
] = type
;
1277 DestroyAndReopen(options
);
1278 MinLevelHelper(this, options
);
1281 // This test may fail because of a legit case that multiple L0 files
1282 // are trivial moved to L1.
1283 TEST_F(DBTest
, DISABLED_RepeatedWritesToSameKey
) {
1285 Options options
= CurrentOptions();
1287 options
.write_buffer_size
= 100000; // Small write buffer
1288 CreateAndReopenWithCF({"pikachu"}, options
);
1290 // We must have at most one file per level except for level-0,
1291 // which may have up to kL0_StopWritesTrigger files.
1292 const int kMaxFiles
=
1293 options
.num_levels
+ options
.level0_stop_writes_trigger
;
1297 RandomString(&rnd
, static_cast<int>(2 * options
.write_buffer_size
));
1298 for (int i
= 0; i
< 5 * kMaxFiles
; i
++) {
1299 ASSERT_OK(Put(1, "key", value
));
1300 ASSERT_LE(TotalTableFiles(1), kMaxFiles
);
1302 } while (ChangeCompactOptions());
1304 #endif // ROCKSDB_LITE
1306 TEST_F(DBTest
, SparseMerge
) {
1308 Options options
= CurrentOptions();
1309 options
.compression
= kNoCompression
;
1310 CreateAndReopenWithCF({"pikachu"}, options
);
1312 FillLevels("A", "Z", 1);
1314 // Suppose there is:
1315 // small amount of data with prefix A
1316 // large amount of data with prefix B
1317 // small amount of data with prefix C
1318 // and that recent updates have made small changes to all three prefixes.
1319 // Check that we do not do a compaction that merges all of B in one shot.
1320 const std::string
value(1000, 'x');
1322 // Write approximately 100MB of "B" values
1323 for (int i
= 0; i
< 100000; i
++) {
1325 snprintf(key
, sizeof(key
), "B%010d", i
);
1329 ASSERT_OK(Flush(1));
1330 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
1332 // Make sparse update
1334 Put(1, "B100", "bvalue2");
1336 ASSERT_OK(Flush(1));
1338 // Compactions should not cause us to create a situation where
1339 // a file overlaps too much data at the next level.
1340 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1342 dbfull()->TEST_CompactRange(0, nullptr, nullptr);
1343 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1345 dbfull()->TEST_CompactRange(1, nullptr, nullptr);
1346 ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_
[1]),
1348 } while (ChangeCompactOptions());
1351 #ifndef ROCKSDB_LITE
// Returns true iff val lies in the inclusive range [low, high]; logs the
// offending value to stderr on failure so test output explains the mismatch.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  bool result = (val >= low) && (val <= high);
  if (!result) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            (unsigned long long)(val), (unsigned long long)(low),
            (unsigned long long)(high));
  }
  return result;
}
1362 TEST_F(DBTest
, ApproximateSizesMemTable
) {
1363 Options options
= CurrentOptions();
1364 options
.write_buffer_size
= 100000000; // Large write buffer
1365 options
.compression
= kNoCompression
;
1366 options
.create_if_missing
= true;
1367 DestroyAndReopen(options
);
1368 auto default_cf
= db_
->DefaultColumnFamily();
1372 for (int i
= 0; i
< N
; i
++) {
1373 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1377 std::string start
= Key(50);
1378 std::string end
= Key(60);
1379 Range
r(start
, end
);
1380 SizeApproximationOptions size_approx_options
;
1381 size_approx_options
.include_memtabtles
= true;
1382 size_approx_options
.include_files
= true;
1383 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1384 ASSERT_GT(size
, 6000);
1385 ASSERT_LT(size
, 204800);
1386 // Zero if not including mem table
1387 db_
->GetApproximateSizes(&r
, 1, &size
);
1392 r
= Range(start
, end
);
1393 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1396 for (int i
= 0; i
< N
; i
++) {
1397 ASSERT_OK(Put(Key(1000 + i
), RandomString(&rnd
, 1024)));
1402 r
= Range(start
, end
);
1403 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1408 r
= Range(start
, end
);
1409 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1410 ASSERT_GT(size
, 6000);
1412 options
.max_write_buffer_number
= 8;
1413 options
.min_write_buffer_number_to_merge
= 5;
1414 options
.write_buffer_size
= 1024 * N
; // Not very large
1415 DestroyAndReopen(options
);
1416 default_cf
= db_
->DefaultColumnFamily();
1419 for (int i
= 0; i
< N
; i
++) {
1420 keys
[i
* 3] = i
* 5;
1421 keys
[i
* 3 + 1] = i
* 5 + 1;
1422 keys
[i
* 3 + 2] = i
* 5 + 2;
1424 std::random_shuffle(std::begin(keys
), std::end(keys
));
1426 for (int i
= 0; i
< N
* 3; i
++) {
1427 ASSERT_OK(Put(Key(keys
[i
] + 1000), RandomString(&rnd
, 1024)));
1432 r
= Range(start
, end
);
1433 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1438 r
= Range(start
, end
);
1439 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1440 ASSERT_GT(size
, 6000);
1444 r
= Range(start
, end
);
1445 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1450 r
= Range(start
, end
);
1451 uint64_t size_with_mt
, size_without_mt
;
1452 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1,
1454 ASSERT_GT(size_with_mt
, 6000);
1455 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1456 ASSERT_EQ(size_without_mt
, 0);
1460 for (int i
= 0; i
< N
; i
++) {
1461 ASSERT_OK(Put(Key(i
+ 1000), RandomString(&rnd
, 1024)));
1466 r
= Range(start
, end
);
1467 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1,
1469 db_
->GetApproximateSizes(&r
, 1, &size_without_mt
);
1470 ASSERT_GT(size_with_mt
, size_without_mt
);
1471 ASSERT_GT(size_without_mt
, 6000);
1473 // Check that include_memtabtles flag works as expected
1474 size_approx_options
.include_memtabtles
= false;
1475 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1476 ASSERT_EQ(size
, size_without_mt
);
1478 // Check that files_size_error_margin works as expected, when the heuristic
1479 // conditions are not met
1481 end
= Key(1000 + N
- 2);
1482 r
= Range(start
, end
);
1483 size_approx_options
.files_size_error_margin
= -1.0; // disabled
1484 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1486 size_approx_options
.files_size_error_margin
= 0.5; // enabled, but not used
1487 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size2
);
1488 ASSERT_EQ(size
, size2
);
1491 TEST_F(DBTest
, ApproximateSizesFilesWithErrorMargin
) {
1492 Options options
= CurrentOptions();
1493 options
.write_buffer_size
= 1024 * 1024;
1494 options
.compression
= kNoCompression
;
1495 options
.create_if_missing
= true;
1496 options
.target_file_size_base
= 1024 * 1024;
1497 DestroyAndReopen(options
);
1498 const auto default_cf
= db_
->DefaultColumnFamily();
1500 const int N
= 64000;
1502 for (int i
= 0; i
< N
; i
++) {
1503 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1505 // Flush everything to files
1507 // Compact the entire key space into the next level
1508 db_
->CompactRange(CompactRangeOptions(), default_cf
, nullptr, nullptr);
1511 for (int i
= N
; i
< (N
+ N
/ 4); i
++) {
1512 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1514 // Flush everything to files again
1517 // Wait for compaction to finish
1518 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1520 const std::string start
= Key(0);
1521 const std::string end
= Key(2 * N
);
1522 const Range
r(start
, end
);
1524 SizeApproximationOptions size_approx_options
;
1525 size_approx_options
.include_memtabtles
= false;
1526 size_approx_options
.include_files
= true;
1527 size_approx_options
.files_size_error_margin
= -1.0; // disabled
1529 // Get the precise size without any approximation heuristic
1531 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size
);
1534 // Get the size with an approximation heuristic
1536 const double error_margin
= 0.2;
1537 size_approx_options
.files_size_error_margin
= error_margin
;
1538 db_
->GetApproximateSizes(size_approx_options
, default_cf
, &r
, 1, &size2
);
1539 ASSERT_LT(size2
, size
* (1 + error_margin
));
1540 ASSERT_GT(size2
, size
* (1 - error_margin
));
1543 TEST_F(DBTest
, GetApproximateMemTableStats
) {
1544 Options options
= CurrentOptions();
1545 options
.write_buffer_size
= 100000000;
1546 options
.compression
= kNoCompression
;
1547 options
.create_if_missing
= true;
1548 DestroyAndReopen(options
);
1552 for (int i
= 0; i
< N
; i
++) {
1553 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
1559 std::string start
= Key(50);
1560 std::string end
= Key(60);
1561 Range
r(start
, end
);
1562 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1563 ASSERT_GT(count
, 0);
1564 ASSERT_LE(count
, N
);
1565 ASSERT_GT(size
, 6000);
1566 ASSERT_LT(size
, 204800);
1570 r
= Range(start
, end
);
1571 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1572 ASSERT_EQ(count
, 0);
1579 r
= Range(start
, end
);
1580 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1581 ASSERT_EQ(count
, 0);
1584 for (int i
= 0; i
< N
; i
++) {
1585 ASSERT_OK(Put(Key(1000 + i
), RandomString(&rnd
, 1024)));
1590 r
= Range(start
, end
);
1591 db_
->GetApproximateMemTableStats(r
, &count
, &size
);
1592 ASSERT_GT(count
, 20);
1593 ASSERT_GT(size
, 6000);
1596 TEST_F(DBTest
, ApproximateSizes
) {
1598 Options options
= CurrentOptions();
1599 options
.write_buffer_size
= 100000000; // Large write buffer
1600 options
.compression
= kNoCompression
;
1601 options
.create_if_missing
= true;
1602 DestroyAndReopen(options
);
1603 CreateAndReopenWithCF({"pikachu"}, options
);
1605 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1606 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1607 ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
1609 // Write 8MB (80 values, each 100K)
1610 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1612 static const int S1
= 100000;
1613 static const int S2
= 105000; // Allow some expansion from metadata
1615 for (int i
= 0; i
< N
; i
++) {
1616 ASSERT_OK(Put(1, Key(i
), RandomString(&rnd
, S1
)));
1619 // 0 because GetApproximateSizes() does not account for memtable space
1620 ASSERT_TRUE(Between(Size("", Key(50), 1), 0, 0));
1622 // Check sizes across recovery by reopening a few times
1623 for (int run
= 0; run
< 3; run
++) {
1624 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1626 for (int compact_start
= 0; compact_start
< N
; compact_start
+= 10) {
1627 for (int i
= 0; i
< N
; i
+= 10) {
1628 ASSERT_TRUE(Between(Size("", Key(i
), 1), S1
* i
, S2
* i
));
1629 ASSERT_TRUE(Between(Size("", Key(i
) + ".suffix", 1), S1
* (i
+ 1),
1631 ASSERT_TRUE(Between(Size(Key(i
), Key(i
+ 10), 1), S1
* 10, S2
* 10));
1633 ASSERT_TRUE(Between(Size("", Key(50), 1), S1
* 50, S2
* 50));
1635 Between(Size("", Key(50) + ".suffix", 1), S1
* 50, S2
* 50));
1637 std::string cstart_str
= Key(compact_start
);
1638 std::string cend_str
= Key(compact_start
+ 9);
1639 Slice cstart
= cstart_str
;
1640 Slice cend
= cend_str
;
1641 dbfull()->TEST_CompactRange(0, &cstart
, &cend
, handles_
[1]);
1644 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1645 ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
1647 // ApproximateOffsetOf() is not yet implemented in plain table format.
1648 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1649 kSkipPlainTable
| kSkipHashIndex
));
1652 TEST_F(DBTest
, ApproximateSizes_MixOfSmallAndLarge
) {
1654 Options options
= CurrentOptions();
1655 options
.compression
= kNoCompression
;
1656 CreateAndReopenWithCF({"pikachu"}, options
);
1659 std::string big1
= RandomString(&rnd
, 100000);
1660 ASSERT_OK(Put(1, Key(0), RandomString(&rnd
, 10000)));
1661 ASSERT_OK(Put(1, Key(1), RandomString(&rnd
, 10000)));
1662 ASSERT_OK(Put(1, Key(2), big1
));
1663 ASSERT_OK(Put(1, Key(3), RandomString(&rnd
, 10000)));
1664 ASSERT_OK(Put(1, Key(4), big1
));
1665 ASSERT_OK(Put(1, Key(5), RandomString(&rnd
, 10000)));
1666 ASSERT_OK(Put(1, Key(6), RandomString(&rnd
, 300000)));
1667 ASSERT_OK(Put(1, Key(7), RandomString(&rnd
, 10000)));
1669 // Check sizes across recovery by reopening a few times
1670 for (int run
= 0; run
< 3; run
++) {
1671 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1673 ASSERT_TRUE(Between(Size("", Key(0), 1), 0, 0));
1674 ASSERT_TRUE(Between(Size("", Key(1), 1), 10000, 11000));
1675 ASSERT_TRUE(Between(Size("", Key(2), 1), 20000, 21000));
1676 ASSERT_TRUE(Between(Size("", Key(3), 1), 120000, 121000));
1677 ASSERT_TRUE(Between(Size("", Key(4), 1), 130000, 131000));
1678 ASSERT_TRUE(Between(Size("", Key(5), 1), 230000, 231000));
1679 ASSERT_TRUE(Between(Size("", Key(6), 1), 240000, 241000));
1680 ASSERT_TRUE(Between(Size("", Key(7), 1), 540000, 541000));
1681 ASSERT_TRUE(Between(Size("", Key(8), 1), 550000, 560000));
1683 ASSERT_TRUE(Between(Size(Key(3), Key(5), 1), 110000, 111000));
1685 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
1687 // ApproximateOffsetOf() is not yet implemented in plain table format.
1688 } while (ChangeOptions(kSkipPlainTable
));
1690 #endif // ROCKSDB_LITE
1692 #ifndef ROCKSDB_LITE
1693 TEST_F(DBTest
, Snapshot
) {
1694 anon::OptionsOverride options_override
;
1695 options_override
.skip_policy
= kSkipNoSnapshot
;
1697 CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override
));
1698 Put(0, "foo", "0v1");
1699 Put(1, "foo", "1v1");
1701 const Snapshot
* s1
= db_
->GetSnapshot();
1702 ASSERT_EQ(1U, GetNumSnapshots());
1703 uint64_t time_snap1
= GetTimeOldestSnapshots();
1704 ASSERT_GT(time_snap1
, 0U);
1705 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1706 Put(0, "foo", "0v2");
1707 Put(1, "foo", "1v2");
1709 env_
->addon_time_
.fetch_add(1);
1711 const Snapshot
* s2
= db_
->GetSnapshot();
1712 ASSERT_EQ(2U, GetNumSnapshots());
1713 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1714 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1715 Put(0, "foo", "0v3");
1716 Put(1, "foo", "1v3");
1719 ManagedSnapshot
s3(db_
);
1720 ASSERT_EQ(3U, GetNumSnapshots());
1721 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1722 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1724 Put(0, "foo", "0v4");
1725 Put(1, "foo", "1v4");
1726 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1727 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1728 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1729 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1730 ASSERT_EQ("0v3", Get(0, "foo", s3
.snapshot()));
1731 ASSERT_EQ("1v3", Get(1, "foo", s3
.snapshot()));
1732 ASSERT_EQ("0v4", Get(0, "foo"));
1733 ASSERT_EQ("1v4", Get(1, "foo"));
1736 ASSERT_EQ(2U, GetNumSnapshots());
1737 ASSERT_EQ(time_snap1
, GetTimeOldestSnapshots());
1738 ASSERT_EQ(GetSequenceOldestSnapshots(), s1
->GetSequenceNumber());
1739 ASSERT_EQ("0v1", Get(0, "foo", s1
));
1740 ASSERT_EQ("1v1", Get(1, "foo", s1
));
1741 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1742 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1743 ASSERT_EQ("0v4", Get(0, "foo"));
1744 ASSERT_EQ("1v4", Get(1, "foo"));
1746 db_
->ReleaseSnapshot(s1
);
1747 ASSERT_EQ("0v2", Get(0, "foo", s2
));
1748 ASSERT_EQ("1v2", Get(1, "foo", s2
));
1749 ASSERT_EQ("0v4", Get(0, "foo"));
1750 ASSERT_EQ("1v4", Get(1, "foo"));
1751 ASSERT_EQ(1U, GetNumSnapshots());
1752 ASSERT_LT(time_snap1
, GetTimeOldestSnapshots());
1753 ASSERT_EQ(GetSequenceOldestSnapshots(), s2
->GetSequenceNumber());
1755 db_
->ReleaseSnapshot(s2
);
1756 ASSERT_EQ(0U, GetNumSnapshots());
1757 ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
1758 ASSERT_EQ("0v4", Get(0, "foo"));
1759 ASSERT_EQ("1v4", Get(1, "foo"));
1760 } while (ChangeOptions());
1763 TEST_F(DBTest
, HiddenValuesAreRemoved
) {
1764 anon::OptionsOverride options_override
;
1765 options_override
.skip_policy
= kSkipNoSnapshot
;
1767 Options options
= CurrentOptions(options_override
);
1768 CreateAndReopenWithCF({"pikachu"}, options
);
1770 FillLevels("a", "z", 1);
1772 std::string big
= RandomString(&rnd
, 50000);
1774 Put(1, "pastfoo", "v");
1775 const Snapshot
* snapshot
= db_
->GetSnapshot();
1776 Put(1, "foo", "tiny");
1777 Put(1, "pastfoo2", "v2"); // Advance sequence number one more
1779 ASSERT_OK(Flush(1));
1780 ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
1782 ASSERT_EQ(big
, Get(1, "foo", snapshot
));
1783 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 50000, 60000));
1784 db_
->ReleaseSnapshot(snapshot
);
1785 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny, " + big
+ " ]");
1787 dbfull()->TEST_CompactRange(0, nullptr, &x
, handles_
[1]);
1788 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1789 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1790 ASSERT_GE(NumTableFilesAtLevel(1, 1), 1);
1791 dbfull()->TEST_CompactRange(1, nullptr, &x
, handles_
[1]);
1792 ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
1794 ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
1795 // ApproximateOffsetOf() is not yet implemented in plain table format,
1796 // which is used by Size().
1797 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
|
1800 #endif // ROCKSDB_LITE
1802 TEST_F(DBTest
, UnremovableSingleDelete
) {
1805 // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
1807 // We do not want to end up with:
1809 // Put(A, v1) Snapshot Put(A, v2)
1811 // Because a subsequent SingleDelete(A) would delete the Put(A, v2)
1812 // but not Put(A, v1), so Get(A) would return v1.
1813 anon::OptionsOverride options_override
;
1814 options_override
.skip_policy
= kSkipNoSnapshot
;
1816 Options options
= CurrentOptions(options_override
);
1817 options
.disable_auto_compactions
= true;
1818 CreateAndReopenWithCF({"pikachu"}, options
);
1820 Put(1, "foo", "first");
1821 const Snapshot
* snapshot
= db_
->GetSnapshot();
1822 SingleDelete(1, "foo");
1823 Put(1, "foo", "second");
1824 ASSERT_OK(Flush(1));
1826 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1827 ASSERT_EQ("second", Get(1, "foo"));
1829 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1831 ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));
1833 SingleDelete(1, "foo");
1835 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1836 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1838 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
1841 ASSERT_EQ("first", Get(1, "foo", snapshot
));
1842 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
1843 db_
->ReleaseSnapshot(snapshot
);
1844 // Skip FIFO and universal compaction beccause they do not apply to the test
1845 // case. Skip MergePut because single delete does not get removed when it
1846 // encounters a merge.
1847 } while (ChangeOptions(kSkipFIFOCompaction
| kSkipUniversalCompaction
|
1851 #ifndef ROCKSDB_LITE
1852 TEST_F(DBTest
, DeletionMarkers1
) {
1853 Options options
= CurrentOptions();
1854 CreateAndReopenWithCF({"pikachu"}, options
);
1855 Put(1, "foo", "v1");
1856 ASSERT_OK(Flush(1));
1858 MoveFilesToLevel(last
, 1);
1859 // foo => v1 is now in last level
1860 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1862 // Place a table at level last-1 to prevent merging with preceding mutation
1863 Put(1, "a", "begin");
1866 MoveFilesToLevel(last
- 1, 1);
1867 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1868 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1871 Put(1, "foo", "v2");
1872 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
1873 ASSERT_OK(Flush(1)); // Moves to level last-2
1874 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1876 dbfull()->TEST_CompactRange(last
- 2, nullptr, &z
, handles_
[1]);
1877 // DEL eliminated, but v1 remains because we aren't compacting that level
1878 // (DEL can be eliminated because v2 hides v1).
1879 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
1880 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1881 // Merging last-1 w/ last, so we are the base level for "foo", so
1882 // DEL is removed. (as is v1).
1883 ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
1886 TEST_F(DBTest
, DeletionMarkers2
) {
1887 Options options
= CurrentOptions();
1888 CreateAndReopenWithCF({"pikachu"}, options
);
1889 Put(1, "foo", "v1");
1890 ASSERT_OK(Flush(1));
1892 MoveFilesToLevel(last
, 1);
1893 // foo => v1 is now in last level
1894 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1896 // Place a table at level last-1 to prevent merging with preceding mutation
1897 Put(1, "a", "begin");
1900 MoveFilesToLevel(last
- 1, 1);
1901 ASSERT_EQ(NumTableFilesAtLevel(last
, 1), 1);
1902 ASSERT_EQ(NumTableFilesAtLevel(last
- 1, 1), 1);
1905 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1906 ASSERT_OK(Flush(1)); // Moves to level last-2
1907 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1908 dbfull()->TEST_CompactRange(last
- 2, nullptr, nullptr, handles_
[1]);
1909 // DEL kept: "last" file overlaps
1910 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1911 dbfull()->TEST_CompactRange(last
- 1, nullptr, nullptr, handles_
[1]);
1912 // Merging last-1 w/ last, so we are the base level for "foo", so
1913 // DEL is removed. (as is v1).
1914 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
1917 TEST_F(DBTest
, OverlapInLevel0
) {
1919 Options options
= CurrentOptions();
1920 CreateAndReopenWithCF({"pikachu"}, options
);
1922 // Fill levels 1 and 2 to disable the pushing of new memtables to levels >
1924 ASSERT_OK(Put(1, "100", "v100"));
1925 ASSERT_OK(Put(1, "999", "v999"));
1927 MoveFilesToLevel(2, 1);
1928 ASSERT_OK(Delete(1, "100"));
1929 ASSERT_OK(Delete(1, "999"));
1931 MoveFilesToLevel(1, 1);
1932 ASSERT_EQ("0,1,1", FilesPerLevel(1));
1934 // Make files spanning the following ranges in level-0:
1935 // files[0] 200 .. 900
1936 // files[1] 300 .. 500
1937 // Note that files are sorted by smallest key.
1938 ASSERT_OK(Put(1, "300", "v300"));
1939 ASSERT_OK(Put(1, "500", "v500"));
1941 ASSERT_OK(Put(1, "200", "v200"));
1942 ASSERT_OK(Put(1, "600", "v600"));
1943 ASSERT_OK(Put(1, "900", "v900"));
1945 ASSERT_EQ("2,1,1", FilesPerLevel(1));
1947 // Compact away the placeholder files we created initially
1948 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
1949 dbfull()->TEST_CompactRange(2, nullptr, nullptr, handles_
[1]);
1950 ASSERT_EQ("2", FilesPerLevel(1));
1952 // Do a memtable compaction. Before bug-fix, the compaction would
1953 // not detect the overlap with level-0 files and would incorrectly place
1954 // the deletion in a deeper level.
1955 ASSERT_OK(Delete(1, "600"));
1957 ASSERT_EQ("3", FilesPerLevel(1));
1958 ASSERT_EQ("NOT_FOUND", Get(1, "600"));
1959 } while (ChangeOptions(kSkipUniversalCompaction
| kSkipFIFOCompaction
));
1961 #endif // ROCKSDB_LITE
1963 TEST_F(DBTest
, ComparatorCheck
) {
1964 class NewComparator
: public Comparator
{
1966 const char* Name() const override
{ return "rocksdb.NewComparator"; }
1967 int Compare(const Slice
& a
, const Slice
& b
) const override
{
1968 return BytewiseComparator()->Compare(a
, b
);
1970 void FindShortestSeparator(std::string
* s
, const Slice
& l
) const override
{
1971 BytewiseComparator()->FindShortestSeparator(s
, l
);
1973 void FindShortSuccessor(std::string
* key
) const override
{
1974 BytewiseComparator()->FindShortSuccessor(key
);
1977 Options new_options
, options
;
1980 options
= CurrentOptions();
1981 CreateAndReopenWithCF({"pikachu"}, options
);
1982 new_options
= CurrentOptions();
1983 new_options
.comparator
= &cmp
;
1984 // only the non-default column family has non-matching comparator
1985 Status s
= TryReopenWithColumnFamilies(
1986 {"default", "pikachu"}, std::vector
<Options
>({options
, new_options
}));
1987 ASSERT_TRUE(!s
.ok());
1988 ASSERT_TRUE(s
.ToString().find("comparator") != std::string::npos
)
1990 } while (ChangeCompactOptions());
1993 TEST_F(DBTest
, CustomComparator
) {
1994 class NumberComparator
: public Comparator
{
1996 const char* Name() const override
{ return "test.NumberComparator"; }
1997 int Compare(const Slice
& a
, const Slice
& b
) const override
{
1998 return ToNumber(a
) - ToNumber(b
);
2000 void FindShortestSeparator(std::string
* s
, const Slice
& l
) const override
{
2001 ToNumber(*s
); // Check format
2002 ToNumber(l
); // Check format
2004 void FindShortSuccessor(std::string
* key
) const override
{
2005 ToNumber(*key
); // Check format
2009 static int ToNumber(const Slice
& x
) {
2010 // Check that there are no extra characters.
2011 EXPECT_TRUE(x
.size() >= 2 && x
[0] == '[' && x
[x
.size() - 1] == ']')
2015 EXPECT_TRUE(sscanf(x
.ToString().c_str(), "[%i]%c", &val
, &ignored
) == 1)
2020 Options new_options
;
2021 NumberComparator cmp
;
2023 new_options
= CurrentOptions();
2024 new_options
.create_if_missing
= true;
2025 new_options
.comparator
= &cmp
;
2026 new_options
.write_buffer_size
= 4096; // Compact more often
2027 new_options
.arena_block_size
= 4096;
2028 new_options
= CurrentOptions(new_options
);
2029 DestroyAndReopen(new_options
);
2030 CreateAndReopenWithCF({"pikachu"}, new_options
);
2031 ASSERT_OK(Put(1, "[10]", "ten"));
2032 ASSERT_OK(Put(1, "[0x14]", "twenty"));
2033 for (int i
= 0; i
< 2; i
++) {
2034 ASSERT_EQ("ten", Get(1, "[10]"));
2035 ASSERT_EQ("ten", Get(1, "[0xa]"));
2036 ASSERT_EQ("twenty", Get(1, "[20]"));
2037 ASSERT_EQ("twenty", Get(1, "[0x14]"));
2038 ASSERT_EQ("NOT_FOUND", Get(1, "[15]"));
2039 ASSERT_EQ("NOT_FOUND", Get(1, "[0xf]"));
2040 Compact(1, "[0]", "[9999]");
2043 for (int run
= 0; run
< 2; run
++) {
2044 for (int i
= 0; i
< 1000; i
++) {
2046 snprintf(buf
, sizeof(buf
), "[%d]", i
* 10);
2047 ASSERT_OK(Put(1, buf
, buf
));
2049 Compact(1, "[0]", "[1000000]");
2051 } while (ChangeCompactOptions());
2054 TEST_F(DBTest
, DBOpen_Options
) {
2055 Options options
= CurrentOptions();
2056 std::string dbname
= test::PerThreadDBPath("db_options_test");
2057 ASSERT_OK(DestroyDB(dbname
, options
));
2059 // Does not exist, and create_if_missing == false: error
2061 options
.create_if_missing
= false;
2062 Status s
= DB::Open(options
, dbname
, &db
);
2063 ASSERT_TRUE(strstr(s
.ToString().c_str(), "does not exist") != nullptr);
2064 ASSERT_TRUE(db
== nullptr);
2066 // Does not exist, and create_if_missing == true: OK
2067 options
.create_if_missing
= true;
2068 s
= DB::Open(options
, dbname
, &db
);
2070 ASSERT_TRUE(db
!= nullptr);
2075 // Does exist, and error_if_exists == true: error
2076 options
.create_if_missing
= false;
2077 options
.error_if_exists
= true;
2078 s
= DB::Open(options
, dbname
, &db
);
2079 ASSERT_TRUE(strstr(s
.ToString().c_str(), "exists") != nullptr);
2080 ASSERT_TRUE(db
== nullptr);
2082 // Does exist, and error_if_exists == false: OK
2083 options
.create_if_missing
= true;
2084 options
.error_if_exists
= false;
2085 s
= DB::Open(options
, dbname
, &db
);
2087 ASSERT_TRUE(db
!= nullptr);
2093 TEST_F(DBTest
, DBOpen_Change_NumLevels
) {
2094 Options options
= CurrentOptions();
2095 options
.create_if_missing
= true;
2096 DestroyAndReopen(options
);
2097 ASSERT_TRUE(db_
!= nullptr);
2098 CreateAndReopenWithCF({"pikachu"}, options
);
2100 ASSERT_OK(Put(1, "a", "123"));
2101 ASSERT_OK(Put(1, "b", "234"));
2103 MoveFilesToLevel(3, 1);
2106 options
.create_if_missing
= false;
2107 options
.num_levels
= 2;
2108 Status s
= TryReopenWithColumnFamilies({"default", "pikachu"}, options
);
2109 ASSERT_TRUE(strstr(s
.ToString().c_str(), "Invalid argument") != nullptr);
2110 ASSERT_TRUE(db_
== nullptr);
2113 TEST_F(DBTest
, DestroyDBMetaDatabase
) {
2114 std::string dbname
= test::PerThreadDBPath("db_meta");
2115 ASSERT_OK(env_
->CreateDirIfMissing(dbname
));
2116 std::string metadbname
= MetaDatabaseName(dbname
, 0);
2117 ASSERT_OK(env_
->CreateDirIfMissing(metadbname
));
2118 std::string metametadbname
= MetaDatabaseName(metadbname
, 0);
2119 ASSERT_OK(env_
->CreateDirIfMissing(metametadbname
));
2121 // Destroy previous versions if they exist. Using the long way.
2122 Options options
= CurrentOptions();
2123 ASSERT_OK(DestroyDB(metametadbname
, options
));
2124 ASSERT_OK(DestroyDB(metadbname
, options
));
2125 ASSERT_OK(DestroyDB(dbname
, options
));
2129 ASSERT_OK(DB::Open(options
, dbname
, &db
));
2132 ASSERT_OK(DB::Open(options
, metadbname
, &db
));
2135 ASSERT_OK(DB::Open(options
, metametadbname
, &db
));
2140 ASSERT_OK(DestroyDB(dbname
, options
));
2142 // Check if deletion worked.
2143 options
.create_if_missing
= false;
2144 ASSERT_TRUE(!(DB::Open(options
, dbname
, &db
)).ok());
2145 ASSERT_TRUE(!(DB::Open(options
, metadbname
, &db
)).ok());
2146 ASSERT_TRUE(!(DB::Open(options
, metametadbname
, &db
)).ok());
2149 #ifndef ROCKSDB_LITE
2150 TEST_F(DBTest
, SnapshotFiles
) {
2152 Options options
= CurrentOptions();
2153 options
.write_buffer_size
= 100000000; // Large write buffer
2154 CreateAndReopenWithCF({"pikachu"}, options
);
2158 // Write 8MB (80 values, each 100K)
2159 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2160 std::vector
<std::string
> values
;
2161 for (int i
= 0; i
< 80; i
++) {
2162 values
.push_back(RandomString(&rnd
, 100000));
2163 ASSERT_OK(Put((i
< 40), Key(i
), values
[i
]));
2166 // assert that nothing makes it to disk yet.
2167 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2169 // get a file snapshot
2170 uint64_t manifest_number
= 0;
2171 uint64_t manifest_size
= 0;
2172 std::vector
<std::string
> files
;
2173 dbfull()->DisableFileDeletions();
2174 dbfull()->GetLiveFiles(files
, &manifest_size
);
2176 // CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
2177 ASSERT_EQ(files
.size(), 5U);
2179 uint64_t number
= 0;
2182 // copy these files to a new snapshot directory
2183 std::string snapdir
= dbname_
+ ".snapdir/";
2184 ASSERT_OK(env_
->CreateDirIfMissing(snapdir
));
2186 for (size_t i
= 0; i
< files
.size(); i
++) {
2187 // our clients require that GetLiveFiles returns
2188 // files with "/" as first character!
2189 ASSERT_EQ(files
[i
][0], '/');
2190 std::string src
= dbname_
+ files
[i
];
2191 std::string dest
= snapdir
+ files
[i
];
2194 ASSERT_OK(env_
->GetFileSize(src
, &size
));
2196 // record the number and the size of the
2197 // latest manifest file
2198 if (ParseFileName(files
[i
].substr(1), &number
, &type
)) {
2199 if (type
== kDescriptorFile
) {
2200 if (number
> manifest_number
) {
2201 manifest_number
= number
;
2202 ASSERT_GE(size
, manifest_size
);
2203 size
= manifest_size
; // copy only valid MANIFEST data
2207 CopyFile(src
, dest
, size
);
2210 // release file snapshot
2211 dbfull()->DisableFileDeletions();
2212 // overwrite one key, this key should not appear in the snapshot
2213 std::vector
<std::string
> extras
;
2214 for (unsigned int i
= 0; i
< 1; i
++) {
2215 extras
.push_back(RandomString(&rnd
, 100000));
2216 ASSERT_OK(Put(0, Key(i
), extras
[i
]));
2219 // verify that data in the snapshot are correct
2220 std::vector
<ColumnFamilyDescriptor
> column_families
;
2221 column_families
.emplace_back("default", ColumnFamilyOptions());
2222 column_families
.emplace_back("pikachu", ColumnFamilyOptions());
2223 std::vector
<ColumnFamilyHandle
*> cf_handles
;
2227 opts
.create_if_missing
= false;
2229 DB::Open(opts
, snapdir
, column_families
, &cf_handles
, &snapdb
);
2232 ReadOptions roptions
;
2234 for (unsigned int i
= 0; i
< 80; i
++) {
2235 stat
= snapdb
->Get(roptions
, cf_handles
[i
< 40], Key(i
), &val
);
2236 ASSERT_EQ(values
[i
].compare(val
), 0);
2238 for (auto cfh
: cf_handles
) {
2243 // look at the new live files after we added an 'extra' key
2244 // and after we took the first snapshot.
2245 uint64_t new_manifest_number
= 0;
2246 uint64_t new_manifest_size
= 0;
2247 std::vector
<std::string
> newfiles
;
2248 dbfull()->DisableFileDeletions();
2249 dbfull()->GetLiveFiles(newfiles
, &new_manifest_size
);
2251 // find the new manifest file. assert that this manifest file is
2252 // the same one as in the previous snapshot. But its size should be
2253 // larger because we added an extra key after taking the
2254 // previous shapshot.
2255 for (size_t i
= 0; i
< newfiles
.size(); i
++) {
2256 std::string src
= dbname_
+ "/" + newfiles
[i
];
2257 // record the lognumber and the size of the
2258 // latest manifest file
2259 if (ParseFileName(newfiles
[i
].substr(1), &number
, &type
)) {
2260 if (type
== kDescriptorFile
) {
2261 if (number
> new_manifest_number
) {
2263 new_manifest_number
= number
;
2264 ASSERT_OK(env_
->GetFileSize(src
, &size
));
2265 ASSERT_GE(size
, new_manifest_size
);
2270 ASSERT_EQ(manifest_number
, new_manifest_number
);
2271 ASSERT_GT(new_manifest_size
, manifest_size
);
2273 // release file snapshot
2274 dbfull()->DisableFileDeletions();
2275 } while (ChangeCompactOptions());
2278 TEST_F(DBTest
, ReadonlyDBGetLiveManifestSize
) {
2280 Options options
= CurrentOptions();
2281 options
.level0_file_num_compaction_trigger
= 2;
2282 DestroyAndReopen(options
);
2284 ASSERT_OK(Put("foo", "bar"));
2286 ASSERT_OK(Put("foo", "bar"));
2288 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2291 ASSERT_OK(ReadOnlyReopen(options
));
2293 uint64_t manifest_size
= 0;
2294 std::vector
<std::string
> files
;
2295 dbfull()->GetLiveFiles(files
, &manifest_size
);
2297 for (const std::string
& f
: files
) {
2298 uint64_t number
= 0;
2300 if (ParseFileName(f
.substr(1), &number
, &type
)) {
2301 if (type
== kDescriptorFile
) {
2302 uint64_t size_on_disk
;
2303 env_
->GetFileSize(dbname_
+ "/" + f
, &size_on_disk
);
2304 ASSERT_EQ(manifest_size
, size_on_disk
);
2310 } while (ChangeCompactOptions());
2314 TEST_F(DBTest
, PurgeInfoLogs
) {
2315 Options options
= CurrentOptions();
2316 options
.keep_log_file_num
= 5;
2317 options
.create_if_missing
= true;
2318 for (int mode
= 0; mode
<= 1; mode
++) {
2320 options
.db_log_dir
= dbname_
+ "_logs";
2321 env_
->CreateDirIfMissing(options
.db_log_dir
);
2323 options
.db_log_dir
= "";
2325 for (int i
= 0; i
< 8; i
++) {
2329 std::vector
<std::string
> files
;
2330 env_
->GetChildren(options
.db_log_dir
.empty() ? dbname_
: options
.db_log_dir
,
2332 int info_log_count
= 0;
2333 for (std::string file
: files
) {
2334 if (file
.find("LOG") != std::string::npos
) {
2338 ASSERT_EQ(5, info_log_count
);
2341 // For mode (1), test DestroyDB() to delete all the logs under DB dir.
2342 // For mode (2), no info log file should have been put under DB dir.
2343 std::vector
<std::string
> db_files
;
2344 env_
->GetChildren(dbname_
, &db_files
);
2345 for (std::string file
: db_files
) {
2346 ASSERT_TRUE(file
.find("LOG") == std::string::npos
);
2351 env_
->GetChildren(options
.db_log_dir
, &files
);
2352 for (std::string file
: files
) {
2353 env_
->DeleteFile(options
.db_log_dir
+ "/" + file
);
2355 env_
->DeleteDir(options
.db_log_dir
);
2360 #ifndef ROCKSDB_LITE
2361 // Multi-threaded test:
2364 static const int kColumnFamilies
= 10;
2365 static const int kNumThreads
= 10;
2366 static const int kTestSeconds
= 10;
2367 static const int kNumKeys
= 1000;
2371 std::atomic
<bool> stop
;
2372 std::atomic
<int> counter
[kNumThreads
];
2373 std::atomic
<bool> thread_done
[kNumThreads
];
2379 bool multiget_batched
;
2382 static void MTThreadBody(void* arg
) {
2383 MTThread
* t
= reinterpret_cast<MTThread
*>(arg
);
2385 DB
* db
= t
->state
->test
->db_
;
2387 fprintf(stderr
, "... starting thread %d\n", id
);
2388 Random
rnd(1000 + id
);
2390 while (t
->state
->stop
.load(std::memory_order_acquire
) == false) {
2391 t
->state
->counter
[id
].store(counter
, std::memory_order_release
);
2393 int key
= rnd
.Uniform(kNumKeys
);
2395 snprintf(keybuf
, sizeof(keybuf
), "%016d", key
);
2398 // Write values of the form <key, my id, counter, cf, unique_id>.
2399 // into each of the CFs
2400 // We add some padding for force compactions.
2401 int unique_id
= rnd
.Uniform(1000000);
2403 // Half of the time directly use WriteBatch. Half of the time use
2404 // WriteBatchWithIndex.
2407 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2408 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
2409 static_cast<int>(counter
), cf
, unique_id
);
2410 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
2412 ASSERT_OK(db
->Write(WriteOptions(), &batch
));
2414 WriteBatchWithIndex
batch(db
->GetOptions().comparator
);
2415 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2416 snprintf(valbuf
, sizeof(valbuf
), "%d.%d.%d.%d.%-1000d", key
, id
,
2417 static_cast<int>(counter
), cf
, unique_id
);
2418 batch
.Put(t
->state
->test
->handles_
[cf
], Slice(keybuf
), Slice(valbuf
));
2420 ASSERT_OK(db
->Write(WriteOptions(), batch
.GetWriteBatch()));
2423 // Read a value and verify that it matches the pattern written above
2424 // and that writes to all column families were atomic (unique_id is the
2426 std::vector
<Slice
> keys(kColumnFamilies
, Slice(keybuf
));
2427 std::vector
<std::string
> values
;
2428 std::vector
<Status
> statuses
;
2429 if (!t
->multiget_batched
) {
2430 statuses
= db
->MultiGet(ReadOptions(), t
->state
->test
->handles_
, keys
,
2433 std::vector
<PinnableSlice
> pin_values(keys
.size());
2434 statuses
.resize(keys
.size());
2435 const Snapshot
* snapshot
= db
->GetSnapshot();
2437 ro
.snapshot
= snapshot
;
2438 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2439 db
->MultiGet(ro
, t
->state
->test
->handles_
[cf
], 1, &keys
[cf
],
2440 &pin_values
[cf
], &statuses
[cf
]);
2442 db
->ReleaseSnapshot(snapshot
);
2443 values
.resize(keys
.size());
2444 for (int cf
= 0; cf
< kColumnFamilies
; ++cf
) {
2445 if (statuses
[cf
].ok()) {
2446 values
[cf
].assign(pin_values
[cf
].data(), pin_values
[cf
].size());
2450 Status s
= statuses
[0];
2451 // all statuses have to be the same
2452 for (size_t i
= 1; i
< statuses
.size(); ++i
) {
2453 // they are either both ok or both not-found
2454 ASSERT_TRUE((s
.ok() && statuses
[i
].ok()) ||
2455 (s
.IsNotFound() && statuses
[i
].IsNotFound()));
2457 if (s
.IsNotFound()) {
2458 // Key has not yet been written
2460 // Check that the writer thread counter is >= the counter in the value
2463 for (int i
= 0; i
< kColumnFamilies
; ++i
) {
2465 ASSERT_EQ(5, sscanf(values
[i
].c_str(), "%d.%d.%d.%d.%d", &k
, &w
, &c
,
2470 ASSERT_LT(w
, kNumThreads
);
2471 ASSERT_LE(c
, t
->state
->counter
[w
].load(std::memory_order_acquire
));
2476 // this checks that updates across column families happened
2477 // atomically -- all unique ids are the same
2478 ASSERT_EQ(u
, unique_id
);
2485 t
->state
->thread_done
[id
].store(true, std::memory_order_release
);
2486 fprintf(stderr
, "... stopping thread %d after %d ops\n", id
, int(counter
));
2491 class MultiThreadedDBTest
2493 public ::testing::WithParamInterface
<std::tuple
<int, bool>> {
2495 void SetUp() override
{
2496 std::tie(option_config_
, multiget_batched_
) = GetParam();
2499 static std::vector
<int> GenerateOptionConfigs() {
2500 std::vector
<int> optionConfigs
;
2501 for (int optionConfig
= kDefault
; optionConfig
< kEnd
; ++optionConfig
) {
2502 optionConfigs
.push_back(optionConfig
);
2504 return optionConfigs
;
2507 bool multiget_batched_
;
2510 TEST_P(MultiThreadedDBTest
, MultiThreaded
) {
2511 if (option_config_
== kPipelinedWrite
) return;
2512 anon::OptionsOverride options_override
;
2513 options_override
.skip_policy
= kSkipNoSnapshot
;
2514 Options options
= CurrentOptions(options_override
);
2515 std::vector
<std::string
> cfs
;
2516 for (int i
= 1; i
< kColumnFamilies
; ++i
) {
2517 cfs
.push_back(ToString(i
));
2520 CreateAndReopenWithCF(cfs
, options
);
2524 mt
.stop
.store(false, std::memory_order_release
);
2525 for (int id
= 0; id
< kNumThreads
; id
++) {
2526 mt
.counter
[id
].store(0, std::memory_order_release
);
2527 mt
.thread_done
[id
].store(false, std::memory_order_release
);
2531 MTThread thread
[kNumThreads
];
2532 for (int id
= 0; id
< kNumThreads
; id
++) {
2533 thread
[id
].state
= &mt
;
2535 thread
[id
].multiget_batched
= multiget_batched_
;
2536 env_
->StartThread(MTThreadBody
, &thread
[id
]);
2539 // Let them run for a while
2540 env_
->SleepForMicroseconds(kTestSeconds
* 1000000);
2542 // Stop the threads and wait for them to finish
2543 mt
.stop
.store(true, std::memory_order_release
);
2544 for (int id
= 0; id
< kNumThreads
; id
++) {
2545 while (mt
.thread_done
[id
].load(std::memory_order_acquire
) == false) {
2546 env_
->SleepForMicroseconds(100000);
2551 INSTANTIATE_TEST_CASE_P(
2552 MultiThreaded
, MultiThreadedDBTest
,
2554 ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
2555 ::testing::Bool()));
2556 #endif // ROCKSDB_LITE
2558 // Group commit test:
2559 #if !defined(TRAVIS) && !defined(OS_WIN)
2560 // Disable this test temporarily on Travis and appveyor as it fails
2561 // intermittently. Github issue: #4151
2564 static const int kGCNumThreads
= 4;
2565 static const int kGCNumKeys
= 1000;
2570 std::atomic
<bool> done
;
2573 static void GCThreadBody(void* arg
) {
2574 GCThread
* t
= reinterpret_cast<GCThread
*>(arg
);
2579 for (int i
= 0; i
< kGCNumKeys
; ++i
) {
2580 std::string
kv(ToString(i
+ id
* kGCNumKeys
));
2581 ASSERT_OK(db
->Put(wo
, kv
, kv
));
2588 TEST_F(DBTest
, GroupCommitTest
) {
2590 Options options
= CurrentOptions();
2592 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
2595 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2596 {{"WriteThread::JoinBatchGroup:BeganWaiting",
2597 "DBImpl::WriteImpl:BeforeLeaderEnters"},
2598 {"WriteThread::AwaitState:BlockingWaiting",
2599 "WriteThread::EnterAsBatchGroupLeader:End"}});
2600 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2603 GCThread thread
[kGCNumThreads
];
2604 for (int id
= 0; id
< kGCNumThreads
; id
++) {
2606 thread
[id
].db
= db_
;
2607 thread
[id
].done
= false;
2608 env_
->StartThread(GCThreadBody
, &thread
[id
]);
2610 env_
->WaitForJoin();
2612 ASSERT_GT(TestGetTickerCount(options
, WRITE_DONE_BY_OTHER
), 0);
2614 std::vector
<std::string
> expected_db
;
2615 for (int i
= 0; i
< kGCNumThreads
* kGCNumKeys
; ++i
) {
2616 expected_db
.push_back(ToString(i
));
2618 std::sort(expected_db
.begin(), expected_db
.end());
2620 Iterator
* itr
= db_
->NewIterator(ReadOptions());
2622 for (auto x
: expected_db
) {
2623 ASSERT_TRUE(itr
->Valid());
2624 ASSERT_EQ(itr
->key().ToString(), x
);
2625 ASSERT_EQ(itr
->value().ToString(), x
);
2628 ASSERT_TRUE(!itr
->Valid());
2631 HistogramData hist_data
;
2632 options
.statistics
->histogramData(DB_WRITE
, &hist_data
);
2633 ASSERT_GT(hist_data
.average
, 0.0);
2634 } while (ChangeOptions(kSkipNoSeekToLast
));
2639 typedef std::map
<std::string
, std::string
> KVMap
;
2642 class ModelDB
: public DB
{
2644 class ModelSnapshot
: public Snapshot
{
2648 SequenceNumber
GetSequenceNumber() const override
{
2649 // no need to call this
2655 explicit ModelDB(const Options
& options
) : options_(options
) {}
2657 Status
Put(const WriteOptions
& o
, ColumnFamilyHandle
* cf
, const Slice
& k
,
2658 const Slice
& v
) override
{
2660 batch
.Put(cf
, k
, v
);
2661 return Write(o
, &batch
);
2664 Status
Close() override
{ return Status::OK(); }
2666 Status
Delete(const WriteOptions
& o
, ColumnFamilyHandle
* cf
,
2667 const Slice
& key
) override
{
2669 batch
.Delete(cf
, key
);
2670 return Write(o
, &batch
);
2672 using DB::SingleDelete
;
2673 Status
SingleDelete(const WriteOptions
& o
, ColumnFamilyHandle
* cf
,
2674 const Slice
& key
) override
{
2676 batch
.SingleDelete(cf
, key
);
2677 return Write(o
, &batch
);
2680 Status
Merge(const WriteOptions
& o
, ColumnFamilyHandle
* cf
, const Slice
& k
,
2681 const Slice
& v
) override
{
2683 batch
.Merge(cf
, k
, v
);
2684 return Write(o
, &batch
);
2687 Status
Get(const ReadOptions
& /*options*/, ColumnFamilyHandle
* /*cf*/,
2688 const Slice
& key
, PinnableSlice
* /*value*/) override
{
2689 return Status::NotSupported(key
);
2692 using DB::GetMergeOperands
;
2693 virtual Status
GetMergeOperands(
2694 const ReadOptions
& /*options*/, ColumnFamilyHandle
* /*column_family*/,
2695 const Slice
& key
, PinnableSlice
* /*slice*/,
2696 GetMergeOperandsOptions
* /*merge_operands_options*/,
2697 int* /*number_of_operands*/) override
{
2698 return Status::NotSupported(key
);
2702 std::vector
<Status
> MultiGet(
2703 const ReadOptions
& /*options*/,
2704 const std::vector
<ColumnFamilyHandle
*>& /*column_family*/,
2705 const std::vector
<Slice
>& keys
,
2706 std::vector
<std::string
>* /*values*/) override
{
2707 std::vector
<Status
> s(keys
.size(),
2708 Status::NotSupported("Not implemented."));
2712 #ifndef ROCKSDB_LITE
2713 using DB::IngestExternalFile
;
2714 Status
IngestExternalFile(
2715 ColumnFamilyHandle
* /*column_family*/,
2716 const std::vector
<std::string
>& /*external_files*/,
2717 const IngestExternalFileOptions
& /*options*/) override
{
2718 return Status::NotSupported("Not implemented.");
2721 using DB::IngestExternalFiles
;
2722 Status
IngestExternalFiles(
2723 const std::vector
<IngestExternalFileArg
>& /*args*/) override
{
2724 return Status::NotSupported("Not implemented");
2727 using DB::CreateColumnFamilyWithImport
;
2728 virtual Status
CreateColumnFamilyWithImport(
2729 const ColumnFamilyOptions
& /*options*/,
2730 const std::string
& /*column_family_name*/,
2731 const ImportColumnFamilyOptions
& /*import_options*/,
2732 const ExportImportFilesMetaData
& /*metadata*/,
2733 ColumnFamilyHandle
** /*handle*/) override
{
2734 return Status::NotSupported("Not implemented.");
2737 using DB::VerifyChecksum
;
2738 Status
VerifyChecksum(const ReadOptions
&) override
{
2739 return Status::NotSupported("Not implemented.");
2742 using DB::GetPropertiesOfAllTables
;
2743 Status
GetPropertiesOfAllTables(
2744 ColumnFamilyHandle
* /*column_family*/,
2745 TablePropertiesCollection
* /*props*/) override
{
2749 Status
GetPropertiesOfTablesInRange(
2750 ColumnFamilyHandle
* /*column_family*/, const Range
* /*range*/,
2751 std::size_t /*n*/, TablePropertiesCollection
* /*props*/) override
{
2754 #endif // ROCKSDB_LITE
2756 using DB::KeyMayExist
;
2757 bool KeyMayExist(const ReadOptions
& /*options*/,
2758 ColumnFamilyHandle
* /*column_family*/, const Slice
& /*key*/,
2759 std::string
* /*value*/,
2760 bool* value_found
= nullptr) override
{
2761 if (value_found
!= nullptr) {
2762 *value_found
= false;
2764 return true; // Not Supported directly
2766 using DB::NewIterator
;
2767 Iterator
* NewIterator(const ReadOptions
& options
,
2768 ColumnFamilyHandle
* /*column_family*/) override
{
2769 if (options
.snapshot
== nullptr) {
2770 KVMap
* saved
= new KVMap
;
2772 return new ModelIter(saved
, true);
2774 const KVMap
* snapshot_state
=
2775 &(reinterpret_cast<const ModelSnapshot
*>(options
.snapshot
)->map_
);
2776 return new ModelIter(snapshot_state
, false);
2779 Status
NewIterators(const ReadOptions
& /*options*/,
2780 const std::vector
<ColumnFamilyHandle
*>& /*column_family*/,
2781 std::vector
<Iterator
*>* /*iterators*/) override
{
2782 return Status::NotSupported("Not supported yet");
2784 const Snapshot
* GetSnapshot() override
{
2785 ModelSnapshot
* snapshot
= new ModelSnapshot
;
2786 snapshot
->map_
= map_
;
2790 void ReleaseSnapshot(const Snapshot
* snapshot
) override
{
2791 delete reinterpret_cast<const ModelSnapshot
*>(snapshot
);
2794 Status
Write(const WriteOptions
& /*options*/, WriteBatch
* batch
) override
{
2795 class Handler
: public WriteBatch::Handler
{
2798 void Put(const Slice
& key
, const Slice
& value
) override
{
2799 (*map_
)[key
.ToString()] = value
.ToString();
2801 void Merge(const Slice
& /*key*/, const Slice
& /*value*/) override
{
2802 // ignore merge for now
2803 // (*map_)[key.ToString()] = value.ToString();
2805 void Delete(const Slice
& key
) override
{ map_
->erase(key
.ToString()); }
2808 handler
.map_
= &map_
;
2809 return batch
->Iterate(&handler
);
2812 using DB::GetProperty
;
2813 bool GetProperty(ColumnFamilyHandle
* /*column_family*/,
2814 const Slice
& /*property*/, std::string
* /*value*/) override
{
2817 using DB::GetIntProperty
;
2818 bool GetIntProperty(ColumnFamilyHandle
* /*column_family*/,
2819 const Slice
& /*property*/, uint64_t* /*value*/) override
{
2822 using DB::GetMapProperty
;
2823 bool GetMapProperty(ColumnFamilyHandle
* /*column_family*/,
2824 const Slice
& /*property*/,
2825 std::map
<std::string
, std::string
>* /*value*/) override
{
2828 using DB::GetAggregatedIntProperty
;
2829 bool GetAggregatedIntProperty(const Slice
& /*property*/,
2830 uint64_t* /*value*/) override
{
2833 using DB::GetApproximateSizes
;
2834 Status
GetApproximateSizes(const SizeApproximationOptions
& /*options*/,
2835 ColumnFamilyHandle
* /*column_family*/,
2836 const Range
* /*range*/, int n
,
2837 uint64_t* sizes
) override
{
2838 for (int i
= 0; i
< n
; i
++) {
2841 return Status::OK();
2843 using DB::GetApproximateMemTableStats
;
2844 void GetApproximateMemTableStats(ColumnFamilyHandle
* /*column_family*/,
2845 const Range
& /*range*/,
2846 uint64_t* const count
,
2847 uint64_t* const size
) override
{
2851 using DB::CompactRange
;
2852 Status
CompactRange(const CompactRangeOptions
& /*options*/,
2853 ColumnFamilyHandle
* /*column_family*/,
2854 const Slice
* /*start*/, const Slice
* /*end*/) override
{
2855 return Status::NotSupported("Not supported operation.");
2858 Status
SetDBOptions(
2859 const std::unordered_map
<std::string
, std::string
>& /*new_options*/)
2861 return Status::NotSupported("Not supported operation.");
2864 using DB::CompactFiles
;
2865 Status
CompactFiles(
2866 const CompactionOptions
& /*compact_options*/,
2867 ColumnFamilyHandle
* /*column_family*/,
2868 const std::vector
<std::string
>& /*input_file_names*/,
2869 const int /*output_level*/, const int /*output_path_id*/ = -1,
2870 std::vector
<std::string
>* const /*output_file_names*/ = nullptr,
2871 CompactionJobInfo
* /*compaction_job_info*/ = nullptr) override
{
2872 return Status::NotSupported("Not supported operation.");
2875 Status
PauseBackgroundWork() override
{
2876 return Status::NotSupported("Not supported operation.");
2879 Status
ContinueBackgroundWork() override
{
2880 return Status::NotSupported("Not supported operation.");
2883 Status
EnableAutoCompaction(
2884 const std::vector
<ColumnFamilyHandle
*>& /*column_family_handles*/)
2886 return Status::NotSupported("Not supported operation.");
2889 void EnableManualCompaction() override
{ return; }
2891 void DisableManualCompaction() override
{ return; }
2893 using DB::NumberLevels
;
2894 int NumberLevels(ColumnFamilyHandle
* /*column_family*/) override
{ return 1; }
2896 using DB::MaxMemCompactionLevel
;
2897 int MaxMemCompactionLevel(ColumnFamilyHandle
* /*column_family*/) override
{
2901 using DB::Level0StopWriteTrigger
;
2902 int Level0StopWriteTrigger(ColumnFamilyHandle
* /*column_family*/) override
{
2906 const std::string
& GetName() const override
{ return name_
; }
2908 Env
* GetEnv() const override
{ return nullptr; }
2910 using DB::GetOptions
;
2911 Options
GetOptions(ColumnFamilyHandle
* /*column_family*/) const override
{
2915 using DB::GetDBOptions
;
2916 DBOptions
GetDBOptions() const override
{ return options_
; }
2919 Status
Flush(const ROCKSDB_NAMESPACE::FlushOptions
& /*options*/,
2920 ColumnFamilyHandle
* /*column_family*/) override
{
2925 const ROCKSDB_NAMESPACE::FlushOptions
& /*options*/,
2926 const std::vector
<ColumnFamilyHandle
*>& /*column_families*/) override
{
2927 return Status::OK();
2930 Status
SyncWAL() override
{ return Status::OK(); }
2932 #ifndef ROCKSDB_LITE
2933 Status
DisableFileDeletions() override
{ return Status::OK(); }
2935 Status
EnableFileDeletions(bool /*force*/) override
{ return Status::OK(); }
2936 Status
GetLiveFiles(std::vector
<std::string
>&, uint64_t* /*size*/,
2937 bool /*flush_memtable*/ = true) override
{
2938 return Status::OK();
2941 Status
GetSortedWalFiles(VectorLogPtr
& /*files*/) override
{
2942 return Status::OK();
2945 Status
GetCurrentWalFile(
2946 std::unique_ptr
<LogFile
>* /*current_log_file*/) override
{
2947 return Status::OK();
2950 virtual Status
GetCreationTimeOfOldestFile(
2951 uint64_t* /*creation_time*/) override
{
2952 return Status::NotSupported();
2955 Status
DeleteFile(std::string
/*name*/) override
{ return Status::OK(); }
2957 Status
GetUpdatesSince(
2958 ROCKSDB_NAMESPACE::SequenceNumber
,
2959 std::unique_ptr
<ROCKSDB_NAMESPACE::TransactionLogIterator
>*,
2960 const TransactionLogIterator::ReadOptions
& /*read_options*/ =
2961 TransactionLogIterator::ReadOptions()) override
{
2962 return Status::NotSupported("Not supported in Model DB");
2965 void GetColumnFamilyMetaData(ColumnFamilyHandle
* /*column_family*/,
2966 ColumnFamilyMetaData
* /*metadata*/) override
{}
2967 #endif // ROCKSDB_LITE
2969 Status
GetDbIdentity(std::string
& /*identity*/) const override
{
2970 return Status::OK();
2973 SequenceNumber
GetLatestSequenceNumber() const override
{ return 0; }
2975 bool SetPreserveDeletesSequenceNumber(SequenceNumber
/*seqnum*/) override
{
2979 ColumnFamilyHandle
* DefaultColumnFamily() const override
{ return nullptr; }
2982 class ModelIter
: public Iterator
{
2984 ModelIter(const KVMap
* map
, bool owned
)
2985 : map_(map
), owned_(owned
), iter_(map_
->end()) {}
2986 ~ModelIter() override
{
2987 if (owned_
) delete map_
;
2989 bool Valid() const override
{ return iter_
!= map_
->end(); }
2990 void SeekToFirst() override
{ iter_
= map_
->begin(); }
2991 void SeekToLast() override
{
2992 if (map_
->empty()) {
2993 iter_
= map_
->end();
2995 iter_
= map_
->find(map_
->rbegin()->first
);
2998 void Seek(const Slice
& k
) override
{
2999 iter_
= map_
->lower_bound(k
.ToString());
3001 void SeekForPrev(const Slice
& k
) override
{
3002 iter_
= map_
->upper_bound(k
.ToString());
3005 void Next() override
{ ++iter_
; }
3006 void Prev() override
{
3007 if (iter_
== map_
->begin()) {
3008 iter_
= map_
->end();
3014 Slice
key() const override
{ return iter_
->first
; }
3015 Slice
value() const override
{ return iter_
->second
; }
3016 Status
status() const override
{ return Status::OK(); }
3019 const KVMap
* const map_
;
3020 const bool owned_
; // Do we own map_
3021 KVMap::const_iterator iter_
;
3023 const Options options_
;
3025 std::string name_
= "";
3028 #ifndef ROCKSDB_VALGRIND_RUN
3029 static std::string
RandomKey(Random
* rnd
, int minimum
= 0) {
3032 len
= (rnd
->OneIn(3)
3033 ? 1 // Short sometimes to encourage collisions
3034 : (rnd
->OneIn(100) ? rnd
->Skewed(10) : rnd
->Uniform(10)));
3035 } while (len
< minimum
);
3036 return test::RandomKey(rnd
, len
);
3039 static bool CompareIterators(int step
, DB
* model
, DB
* db
,
3040 const Snapshot
* model_snap
,
3041 const Snapshot
* db_snap
) {
3042 ReadOptions options
;
3043 options
.snapshot
= model_snap
;
3044 Iterator
* miter
= model
->NewIterator(options
);
3045 options
.snapshot
= db_snap
;
3046 Iterator
* dbiter
= db
->NewIterator(options
);
3049 for (miter
->SeekToFirst(), dbiter
->SeekToFirst();
3050 ok
&& miter
->Valid() && dbiter
->Valid(); miter
->Next(), dbiter
->Next()) {
3052 if (miter
->key().compare(dbiter
->key()) != 0) {
3053 fprintf(stderr
, "step %d: Key mismatch: '%s' vs. '%s'\n", step
,
3054 EscapeString(miter
->key()).c_str(),
3055 EscapeString(dbiter
->key()).c_str());
3060 if (miter
->value().compare(dbiter
->value()) != 0) {
3061 fprintf(stderr
, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
3062 step
, EscapeString(miter
->key()).c_str(),
3063 EscapeString(miter
->value()).c_str(),
3064 EscapeString(miter
->value()).c_str());
3070 if (miter
->Valid() != dbiter
->Valid()) {
3071 fprintf(stderr
, "step %d: Mismatch at end of iterators: %d vs. %d\n",
3072 step
, miter
->Valid(), dbiter
->Valid());
3081 class DBTestRandomized
: public DBTest
,
3082 public ::testing::WithParamInterface
<int> {
3084 void SetUp() override
{ option_config_
= GetParam(); }
3086 static std::vector
<int> GenerateOptionConfigs() {
3087 std::vector
<int> option_configs
;
3088 // skip cuckoo hash as it does not support snapshot.
3089 for (int option_config
= kDefault
; option_config
< kEnd
; ++option_config
) {
3090 if (!ShouldSkipOptions(option_config
,
3091 kSkipDeletesFilterFirst
| kSkipNoSeekToLast
)) {
3092 option_configs
.push_back(option_config
);
3095 option_configs
.push_back(kBlockBasedTableWithIndexRestartInterval
);
3096 return option_configs
;
3100 INSTANTIATE_TEST_CASE_P(
3101 DBTestRandomized
, DBTestRandomized
,
3102 ::testing::ValuesIn(DBTestRandomized::GenerateOptionConfigs()));
3104 TEST_P(DBTestRandomized
, Randomized
) {
3105 anon::OptionsOverride options_override
;
3106 options_override
.skip_policy
= kSkipNoSnapshot
;
3107 Options options
= CurrentOptions(options_override
);
3108 DestroyAndReopen(options
);
3110 Random
rnd(test::RandomSeed() + GetParam());
3111 ModelDB
model(options
);
3112 const int N
= 10000;
3113 const Snapshot
* model_snap
= nullptr;
3114 const Snapshot
* db_snap
= nullptr;
3116 for (int step
= 0; step
< N
; step
++) {
3117 // TODO(sanjay): Test Get() works
3118 int p
= rnd
.Uniform(100);
3120 if (option_config_
== kHashSkipList
|| option_config_
== kHashLinkList
||
3121 option_config_
== kPlainTableFirstBytePrefix
||
3122 option_config_
== kBlockBasedTableWithWholeKeyHashIndex
||
3123 option_config_
== kBlockBasedTableWithPrefixHashIndex
) {
3126 if (p
< 45) { // Put
3127 k
= RandomKey(&rnd
, minimum
);
3128 v
= RandomString(&rnd
,
3129 rnd
.OneIn(20) ? 100 + rnd
.Uniform(100) : rnd
.Uniform(8));
3130 ASSERT_OK(model
.Put(WriteOptions(), k
, v
));
3131 ASSERT_OK(db_
->Put(WriteOptions(), k
, v
));
3132 } else if (p
< 90) { // Delete
3133 k
= RandomKey(&rnd
, minimum
);
3134 ASSERT_OK(model
.Delete(WriteOptions(), k
));
3135 ASSERT_OK(db_
->Delete(WriteOptions(), k
));
3136 } else { // Multi-element batch
3138 const int num
= rnd
.Uniform(8);
3139 for (int i
= 0; i
< num
; i
++) {
3140 if (i
== 0 || !rnd
.OneIn(10)) {
3141 k
= RandomKey(&rnd
, minimum
);
3143 // Periodically re-use the same key from the previous iter, so
3144 // we have multiple entries in the write batch for the same key
3147 v
= RandomString(&rnd
, rnd
.Uniform(10));
3153 ASSERT_OK(model
.Write(WriteOptions(), &b
));
3154 ASSERT_OK(db_
->Write(WriteOptions(), &b
));
3157 if ((step
% 100) == 0) {
3158 // For DB instances that use the hash index + block-based table, the
3159 // iterator will be invalid right when seeking a non-existent key, right
3160 // than return a key that is close to it.
3161 if (option_config_
!= kBlockBasedTableWithWholeKeyHashIndex
&&
3162 option_config_
!= kBlockBasedTableWithPrefixHashIndex
) {
3163 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, nullptr, nullptr));
3164 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, model_snap
, db_snap
));
3167 // Save a snapshot from each DB this time that we'll use next
3168 // time we compare things, to make sure the current state is
3169 // preserved with the snapshot
3170 if (model_snap
!= nullptr) model
.ReleaseSnapshot(model_snap
);
3171 if (db_snap
!= nullptr) db_
->ReleaseSnapshot(db_snap
);
3174 ASSERT_TRUE(CompareIterators(step
, &model
, db_
, nullptr, nullptr));
3176 model_snap
= model
.GetSnapshot();
3177 db_snap
= db_
->GetSnapshot();
3180 if (model_snap
!= nullptr) model
.ReleaseSnapshot(model_snap
);
3181 if (db_snap
!= nullptr) db_
->ReleaseSnapshot(db_snap
);
3183 #endif // ROCKSDB_VALGRIND_RUN
3185 TEST_F(DBTest
, BlockBasedTablePrefixIndexTest
) {
3186 // create a DB with block prefix index
3187 BlockBasedTableOptions table_options
;
3188 Options options
= CurrentOptions();
3189 table_options
.index_type
= BlockBasedTableOptions::kHashSearch
;
3190 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3191 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3194 ASSERT_OK(Put("k1", "v1"));
3196 ASSERT_OK(Put("k2", "v2"));
3198 // Reopen it without prefix extractor, make sure everything still works.
3199 // RocksDB should just fall back to the binary index.
3200 table_options
.index_type
= BlockBasedTableOptions::kBinarySearch
;
3201 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3202 options
.prefix_extractor
.reset();
3205 ASSERT_EQ("v1", Get("k1"));
3206 ASSERT_EQ("v2", Get("k2"));
3209 TEST_F(DBTest
, BlockBasedTablePrefixIndexTotalOrderSeek
) {
3210 // create a DB with block prefix index
3211 BlockBasedTableOptions table_options
;
3212 Options options
= CurrentOptions();
3213 options
.max_open_files
= 10;
3214 table_options
.index_type
= BlockBasedTableOptions::kHashSearch
;
3215 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3216 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3218 // RocksDB sanitize max open files to at least 20. Modify it back.
3219 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3220 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg
) {
3221 int* max_open_files
= static_cast<int*>(arg
);
3222 *max_open_files
= 11;
3224 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3227 ASSERT_OK(Put("k1", "v1"));
3230 CompactRangeOptions cro
;
3231 cro
.change_level
= true;
3232 cro
.target_level
= 1;
3233 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
3235 // Force evict tables
3236 dbfull()->TEST_table_cache()->SetCapacity(0);
3237 // Make table cache to keep one entry.
3238 dbfull()->TEST_table_cache()->SetCapacity(1);
3240 ReadOptions read_options
;
3241 read_options
.total_order_seek
= true;
3243 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
3245 ASSERT_TRUE(iter
->Valid());
3246 ASSERT_EQ("k1", iter
->key().ToString());
3249 // After total order seek, prefix index should still be used.
3250 read_options
.total_order_seek
= false;
3252 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
3254 ASSERT_TRUE(iter
->Valid());
3255 ASSERT_EQ("k1", iter
->key().ToString());
3257 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3260 TEST_F(DBTest
, ChecksumTest
) {
3261 BlockBasedTableOptions table_options
;
3262 Options options
= CurrentOptions();
3264 table_options
.checksum
= kCRC32c
;
3265 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3267 ASSERT_OK(Put("a", "b"));
3268 ASSERT_OK(Put("c", "d"));
3269 ASSERT_OK(Flush()); // table with crc checksum
3271 table_options
.checksum
= kxxHash
;
3272 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3274 ASSERT_OK(Put("e", "f"));
3275 ASSERT_OK(Put("g", "h"));
3276 ASSERT_OK(Flush()); // table with xxhash checksum
3278 table_options
.checksum
= kCRC32c
;
3279 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3281 ASSERT_EQ("b", Get("a"));
3282 ASSERT_EQ("d", Get("c"));
3283 ASSERT_EQ("f", Get("e"));
3284 ASSERT_EQ("h", Get("g"));
3286 table_options
.checksum
= kCRC32c
;
3287 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
3289 ASSERT_EQ("b", Get("a"));
3290 ASSERT_EQ("d", Get("c"));
3291 ASSERT_EQ("f", Get("e"));
3292 ASSERT_EQ("h", Get("g"));
3295 #ifndef ROCKSDB_LITE
3296 TEST_P(DBTestWithParam
, FIFOCompactionTest
) {
3297 for (int iter
= 0; iter
< 2; ++iter
) {
3298 // first iteration -- auto compaction
3299 // second iteration -- manual compaction
3301 options
.compaction_style
= kCompactionStyleFIFO
;
3302 options
.write_buffer_size
= 100 << 10; // 100KB
3303 options
.arena_block_size
= 4096;
3304 options
.compaction_options_fifo
.max_table_files_size
= 500 << 10; // 500KB
3305 options
.compression
= kNoCompression
;
3306 options
.create_if_missing
= true;
3307 options
.max_subcompactions
= max_subcompactions_
;
3309 options
.disable_auto_compactions
= true;
3311 options
= CurrentOptions(options
);
3312 DestroyAndReopen(options
);
3315 for (int i
= 0; i
< 6; ++i
) {
3316 for (int j
= 0; j
< 110; ++j
) {
3317 ASSERT_OK(Put(ToString(i
* 100 + j
), RandomString(&rnd
, 980)));
3319 // flush should happen here
3320 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
3323 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3325 CompactRangeOptions cro
;
3326 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
3327 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
3329 // only 5 files should survive
3330 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3331 for (int i
= 0; i
< 50; ++i
) {
3332 // these keys should be deleted in previous compaction
3333 ASSERT_EQ("NOT_FOUND", Get(ToString(i
)));
3338 TEST_F(DBTest
, FIFOCompactionTestWithCompaction
) {
3340 options
.compaction_style
= kCompactionStyleFIFO
;
3341 options
.write_buffer_size
= 20 << 10; // 20K
3342 options
.arena_block_size
= 4096;
3343 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1MB
3344 options
.compaction_options_fifo
.allow_compaction
= true;
3345 options
.level0_file_num_compaction_trigger
= 6;
3346 options
.compression
= kNoCompression
;
3347 options
.create_if_missing
= true;
3348 options
= CurrentOptions(options
);
3349 DestroyAndReopen(options
);
3352 for (int i
= 0; i
< 60; i
++) {
3353 // Generate and flush a file about 20KB.
3354 for (int j
= 0; j
< 20; j
++) {
3355 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3358 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3360 // It should be compacted to 10 files.
3361 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3363 for (int i
= 0; i
< 60; i
++) {
3364 // Generate and flush a file about 20KB.
3365 for (int j
= 0; j
< 20; j
++) {
3366 ASSERT_OK(Put(ToString(i
* 20 + j
+ 2000), RandomString(&rnd
, 980)));
3369 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3372 // It should be compacted to no more than 20 files.
3373 ASSERT_GT(NumTableFilesAtLevel(0), 10);
3374 ASSERT_LT(NumTableFilesAtLevel(0), 18);
3375 // Size limit is still guaranteed.
3376 ASSERT_LE(SizeAtLevel(0),
3377 options
.compaction_options_fifo
.max_table_files_size
);
3380 TEST_F(DBTest
, FIFOCompactionStyleWithCompactionAndDelete
) {
3382 options
.compaction_style
= kCompactionStyleFIFO
;
3383 options
.write_buffer_size
= 20 << 10; // 20K
3384 options
.arena_block_size
= 4096;
3385 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1MB
3386 options
.compaction_options_fifo
.allow_compaction
= true;
3387 options
.level0_file_num_compaction_trigger
= 3;
3388 options
.compression
= kNoCompression
;
3389 options
.create_if_missing
= true;
3390 options
= CurrentOptions(options
);
3391 DestroyAndReopen(options
);
3394 for (int i
= 0; i
< 3; i
++) {
3395 // Each file contains a different key which will be dropped later.
3396 ASSERT_OK(Put("a" + ToString(i
), RandomString(&rnd
, 500)));
3397 ASSERT_OK(Put("key" + ToString(i
), ""));
3398 ASSERT_OK(Put("z" + ToString(i
), RandomString(&rnd
, 500)));
3400 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3402 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3403 for (int i
= 0; i
< 3; i
++) {
3404 ASSERT_EQ("", Get("key" + ToString(i
)));
3406 for (int i
= 0; i
< 3; i
++) {
3407 // Each file contains a different key which will be dropped later.
3408 ASSERT_OK(Put("a" + ToString(i
), RandomString(&rnd
, 500)));
3409 ASSERT_OK(Delete("key" + ToString(i
)));
3410 ASSERT_OK(Put("z" + ToString(i
), RandomString(&rnd
, 500)));
3412 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3414 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
3415 for (int i
= 0; i
< 3; i
++) {
3416 ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i
)));
3420 // Check that FIFO-with-TTL is not supported with max_open_files != -1.
3421 TEST_F(DBTest
, FIFOCompactionWithTTLAndMaxOpenFilesTest
) {
3423 options
.compaction_style
= kCompactionStyleFIFO
;
3424 options
.create_if_missing
= true;
3425 options
.ttl
= 600; // seconds
3427 // TTL is now supported with max_open_files != -1.
3428 options
.max_open_files
= 100;
3429 options
= CurrentOptions(options
);
3430 ASSERT_OK(TryReopen(options
));
3432 options
.max_open_files
= -1;
3433 ASSERT_OK(TryReopen(options
));
3436 // Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
3437 TEST_F(DBTest
, FIFOCompactionWithTTLAndVariousTableFormatsTest
) {
3439 options
.compaction_style
= kCompactionStyleFIFO
;
3440 options
.create_if_missing
= true;
3441 options
.ttl
= 600; // seconds
3443 options
= CurrentOptions(options
);
3444 options
.table_factory
.reset(NewBlockBasedTableFactory());
3445 ASSERT_OK(TryReopen(options
));
3448 options
.table_factory
.reset(NewPlainTableFactory());
3449 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
3452 options
.table_factory
.reset(NewAdaptiveTableFactory());
3453 ASSERT_TRUE(TryReopen(options
).IsNotSupported());
3456 TEST_F(DBTest
, FIFOCompactionWithTTLTest
) {
3458 options
.compaction_style
= kCompactionStyleFIFO
;
3459 options
.write_buffer_size
= 10 << 10; // 10KB
3460 options
.arena_block_size
= 4096;
3461 options
.compression
= kNoCompression
;
3462 options
.create_if_missing
= true;
3463 env_
->time_elapse_only_sleep_
= false;
3466 // Test to make sure that all files with expired ttl are deleted on next
3467 // manual compaction.
3469 env_
->addon_time_
.store(0);
3470 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3471 options
.compaction_options_fifo
.allow_compaction
= false;
3472 options
.ttl
= 1 * 60 * 60 ; // 1 hour
3473 options
= CurrentOptions(options
);
3474 DestroyAndReopen(options
);
3477 for (int i
= 0; i
< 10; i
++) {
3478 // Generate and flush a file about 10KB.
3479 for (int j
= 0; j
< 10; j
++) {
3480 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3483 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3485 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3487 // Sleep for 2 hours -- which is much greater than TTL.
3488 // Note: Couldn't use SleepForMicroseconds because it takes an int instead
3489 // of uint64_t. Hence used addon_time_ directly.
3490 // env_->SleepForMicroseconds(2 * 60 * 60 * 1000 * 1000);
3491 env_
->addon_time_
.fetch_add(2 * 60 * 60);
3493 // Since no flushes and compactions have run, the db should still be in
3494 // the same state even after considerable time has passed.
3495 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3496 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3498 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3499 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3502 // Test to make sure that all files with expired ttl are deleted on next
3503 // automatic compaction.
3505 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3506 options
.compaction_options_fifo
.allow_compaction
= false;
3507 options
.ttl
= 1 * 60 * 60; // 1 hour
3508 options
= CurrentOptions(options
);
3509 DestroyAndReopen(options
);
3512 for (int i
= 0; i
< 10; i
++) {
3513 // Generate and flush a file about 10KB.
3514 for (int j
= 0; j
< 10; j
++) {
3515 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3518 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3520 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3522 // Sleep for 2 hours -- which is much greater than TTL.
3523 env_
->addon_time_
.fetch_add(2 * 60 * 60);
3524 // Just to make sure that we are in the same state even after sleeping.
3525 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3526 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3528 // Create 1 more file to trigger TTL compaction. The old files are dropped.
3529 for (int i
= 0; i
< 1; i
++) {
3530 for (int j
= 0; j
< 10; j
++) {
3531 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3536 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3537 // Only the new 10 files remain.
3538 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3539 ASSERT_LE(SizeAtLevel(0),
3540 options
.compaction_options_fifo
.max_table_files_size
);
3543 // Test that shows the fall back to size-based FIFO compaction if TTL-based
3544 // deletion doesn't move the total size to be less than max_table_files_size.
3546 options
.write_buffer_size
= 10 << 10; // 10KB
3547 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3548 options
.compaction_options_fifo
.allow_compaction
= false;
3549 options
.ttl
= 1 * 60 * 60; // 1 hour
3550 options
= CurrentOptions(options
);
3551 DestroyAndReopen(options
);
3554 for (int i
= 0; i
< 3; i
++) {
3555 // Generate and flush a file about 10KB.
3556 for (int j
= 0; j
< 10; j
++) {
3557 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3560 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3562 ASSERT_EQ(NumTableFilesAtLevel(0), 3);
3564 // Sleep for 2 hours -- which is much greater than TTL.
3565 env_
->addon_time_
.fetch_add(2 * 60 * 60);
3566 // Just to make sure that we are in the same state even after sleeping.
3567 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3568 ASSERT_EQ(NumTableFilesAtLevel(0), 3);
3570 for (int i
= 0; i
< 5; i
++) {
3571 for (int j
= 0; j
< 140; j
++) {
3572 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3575 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3577 // Size limit is still guaranteed.
3578 ASSERT_LE(SizeAtLevel(0),
3579 options
.compaction_options_fifo
.max_table_files_size
);
3582 // Test with TTL + Intra-L0 compactions.
3584 options
.compaction_options_fifo
.max_table_files_size
= 150 << 10; // 150KB
3585 options
.compaction_options_fifo
.allow_compaction
= true;
3586 options
.ttl
= 1 * 60 * 60; // 1 hour
3587 options
.level0_file_num_compaction_trigger
= 6;
3588 options
= CurrentOptions(options
);
3589 DestroyAndReopen(options
);
3592 for (int i
= 0; i
< 10; i
++) {
3593 // Generate and flush a file about 10KB.
3594 for (int j
= 0; j
< 10; j
++) {
3595 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3598 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3600 // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1
3601 // (due to level0_file_num_compaction_trigger = 6).
3602 // So total files = 1 + remaining 4 = 5.
3603 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3605 // Sleep for 2 hours -- which is much greater than TTL.
3606 env_
->addon_time_
.fetch_add(2 * 60 * 60);
3607 // Just to make sure that we are in the same state even after sleeping.
3608 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3609 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3611 // Create 10 more files. The old 5 files are dropped as their ttl expired.
3612 for (int i
= 0; i
< 10; i
++) {
3613 for (int j
= 0; j
< 10; j
++) {
3614 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3617 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3619 ASSERT_EQ(NumTableFilesAtLevel(0), 5);
3620 ASSERT_LE(SizeAtLevel(0),
3621 options
.compaction_options_fifo
.max_table_files_size
);
3624 // Test with large TTL + Intra-L0 compactions.
3625 // Files dropped based on size, as ttl doesn't kick in.
3627 options
.write_buffer_size
= 20 << 10; // 20K
3628 options
.compaction_options_fifo
.max_table_files_size
= 1500 << 10; // 1.5MB
3629 options
.compaction_options_fifo
.allow_compaction
= true;
3630 options
.ttl
= 1 * 60 * 60; // 1 hour
3631 options
.level0_file_num_compaction_trigger
= 6;
3632 options
= CurrentOptions(options
);
3633 DestroyAndReopen(options
);
3636 for (int i
= 0; i
< 60; i
++) {
3637 // Generate and flush a file about 20KB.
3638 for (int j
= 0; j
< 20; j
++) {
3639 ASSERT_OK(Put(ToString(i
* 20 + j
), RandomString(&rnd
, 980)));
3642 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3644 // It should be compacted to 10 files.
3645 ASSERT_EQ(NumTableFilesAtLevel(0), 10);
3647 for (int i
= 0; i
< 60; i
++) {
3648 // Generate and flush a file about 20KB.
3649 for (int j
= 0; j
< 20; j
++) {
3650 ASSERT_OK(Put(ToString(i
* 20 + j
+ 2000), RandomString(&rnd
, 980)));
3653 ASSERT_OK(dbfull()->TEST_WaitForCompact());
3656 // It should be compacted to no more than 20 files.
3657 ASSERT_GT(NumTableFilesAtLevel(0), 10);
3658 ASSERT_LT(NumTableFilesAtLevel(0), 18);
3659 // Size limit is still guaranteed.
3660 ASSERT_LE(SizeAtLevel(0),
3661 options
.compaction_options_fifo
.max_table_files_size
);
3664 #endif // ROCKSDB_LITE
3666 #ifndef ROCKSDB_LITE
3668 * This test is not reliable enough as it heavily depends on disk behavior.
3669 * Disable as it is flaky.
3671 TEST_F(DBTest
, DISABLED_RateLimitingTest
) {
3672 Options options
= CurrentOptions();
3673 options
.write_buffer_size
= 1 << 20; // 1MB
3674 options
.level0_file_num_compaction_trigger
= 2;
3675 options
.target_file_size_base
= 1 << 20; // 1MB
3676 options
.max_bytes_for_level_base
= 4 << 20; // 4MB
3677 options
.max_bytes_for_level_multiplier
= 4;
3678 options
.compression
= kNoCompression
;
3679 options
.create_if_missing
= true;
3681 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
3682 options
.IncreaseParallelism(4);
3683 DestroyAndReopen(options
);
3686 wo
.disableWAL
= true;
3688 // # no rate limiting
3690 uint64_t start
= env_
->NowMicros();
3692 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3694 Put(RandomString(&rnd
, 32), RandomString(&rnd
, (1 << 10) + 1), wo
));
3696 uint64_t elapsed
= env_
->NowMicros() - start
;
3697 double raw_rate
= env_
->bytes_written_
* 1000000.0 / elapsed
;
3698 uint64_t rate_limiter_drains
=
3699 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
);
3700 ASSERT_EQ(0, rate_limiter_drains
);
3703 // # rate limiting with 0.7 x threshold
3704 options
.rate_limiter
.reset(
3705 NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate
)));
3706 env_
->bytes_written_
= 0;
3707 DestroyAndReopen(options
);
3709 start
= env_
->NowMicros();
3711 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3713 Put(RandomString(&rnd
, 32), RandomString(&rnd
, (1 << 10) + 1), wo
));
3715 rate_limiter_drains
=
3716 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
) -
3717 rate_limiter_drains
;
3718 elapsed
= env_
->NowMicros() - start
;
3720 ASSERT_EQ(options
.rate_limiter
->GetTotalBytesThrough(), env_
->bytes_written_
);
3721 // Most intervals should've been drained (interval time is 100ms, elapsed is
3723 ASSERT_GT(rate_limiter_drains
, 0);
3724 ASSERT_LE(rate_limiter_drains
, elapsed
/ 100000 + 1);
3725 double ratio
= env_
->bytes_written_
* 1000000 / elapsed
/ raw_rate
;
3726 fprintf(stderr
, "write rate ratio = %.2lf, expected 0.7\n", ratio
);
3727 ASSERT_TRUE(ratio
< 0.8);
3729 // # rate limiting with half of the raw_rate
3730 options
.rate_limiter
.reset(
3731 NewGenericRateLimiter(static_cast<int64_t>(raw_rate
/ 2)));
3732 env_
->bytes_written_
= 0;
3733 DestroyAndReopen(options
);
3735 start
= env_
->NowMicros();
3737 for (int64_t i
= 0; i
< (96 << 10); ++i
) {
3739 Put(RandomString(&rnd
, 32), RandomString(&rnd
, (1 << 10) + 1), wo
));
3741 elapsed
= env_
->NowMicros() - start
;
3742 rate_limiter_drains
=
3743 TestGetTickerCount(options
, NUMBER_RATE_LIMITER_DRAINS
) -
3744 rate_limiter_drains
;
3746 ASSERT_EQ(options
.rate_limiter
->GetTotalBytesThrough(), env_
->bytes_written_
);
3747 // Most intervals should've been drained (interval time is 100ms, elapsed is
3749 ASSERT_GT(rate_limiter_drains
, elapsed
/ 100000 / 2);
3750 ASSERT_LE(rate_limiter_drains
, elapsed
/ 100000 + 1);
3751 ratio
= env_
->bytes_written_
* 1000000 / elapsed
/ raw_rate
;
3752 fprintf(stderr
, "write rate ratio = %.2lf, expected 0.5\n", ratio
);
3753 ASSERT_LT(ratio
, 0.6);
3756 TEST_F(DBTest
, TableOptionsSanitizeTest
) {
3757 Options options
= CurrentOptions();
3758 options
.create_if_missing
= true;
3759 DestroyAndReopen(options
);
3760 ASSERT_EQ(db_
->GetOptions().allow_mmap_reads
, false);
3762 options
.table_factory
.reset(new PlainTableFactory());
3763 options
.prefix_extractor
.reset(NewNoopTransform());
3765 ASSERT_TRUE(!TryReopen(options
).IsNotSupported());
3767 // Test for check of prefix_extractor when hash index is used for
3768 // block-based table
3769 BlockBasedTableOptions to
;
3770 to
.index_type
= BlockBasedTableOptions::kHashSearch
;
3771 options
= CurrentOptions();
3772 options
.create_if_missing
= true;
3773 options
.table_factory
.reset(NewBlockBasedTableFactory(to
));
3774 ASSERT_TRUE(TryReopen(options
).IsInvalidArgument());
3775 options
.prefix_extractor
.reset(NewFixedPrefixTransform(1));
3776 ASSERT_OK(TryReopen(options
));
3779 TEST_F(DBTest
, ConcurrentMemtableNotSupported
) {
3780 Options options
= CurrentOptions();
3781 options
.allow_concurrent_memtable_write
= true;
3782 options
.soft_pending_compaction_bytes_limit
= 0;
3783 options
.hard_pending_compaction_bytes_limit
= 100;
3784 options
.create_if_missing
= true;
3786 DestroyDB(dbname_
, options
);
3787 options
.memtable_factory
.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
3788 ASSERT_NOK(TryReopen(options
));
3790 options
.memtable_factory
.reset(new SkipListFactory
);
3791 ASSERT_OK(TryReopen(options
));
3793 ColumnFamilyOptions
cf_options(options
);
3794 cf_options
.memtable_factory
.reset(
3795 NewHashLinkListRepFactory(4, 0, 3, true, 4));
3796 ColumnFamilyHandle
* handle
;
3797 ASSERT_NOK(db_
->CreateColumnFamily(cf_options
, "name", &handle
));
3800 #endif // ROCKSDB_LITE
3802 TEST_F(DBTest
, SanitizeNumThreads
) {
3803 for (int attempt
= 0; attempt
< 2; attempt
++) {
3804 const size_t kTotalTasks
= 8;
3805 test::SleepingBackgroundTask sleeping_tasks
[kTotalTasks
];
3807 Options options
= CurrentOptions();
3809 options
.max_background_compactions
= 3;
3810 options
.max_background_flushes
= 2;
3812 options
.create_if_missing
= true;
3813 DestroyAndReopen(options
);
3815 for (size_t i
= 0; i
< kTotalTasks
; i
++) {
3816 // Insert 5 tasks to low priority queue and 5 tasks to high priority queue
3817 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
3819 (i
< 4) ? Env::Priority::LOW
: Env::Priority::HIGH
);
3822 // Wait until 10s for they are scheduled.
3823 for (int i
= 0; i
< 10000; i
++) {
3824 if (options
.env
->GetThreadPoolQueueLen(Env::Priority::LOW
) <= 1 &&
3825 options
.env
->GetThreadPoolQueueLen(Env::Priority::HIGH
) <= 2) {
3828 env_
->SleepForMicroseconds(1000);
3831 // pool size 3, total task 4. Queue size should be 1.
3832 ASSERT_EQ(1U, options
.env
->GetThreadPoolQueueLen(Env::Priority::LOW
));
3833 // pool size 2, total task 4. Queue size should be 2.
3834 ASSERT_EQ(2U, options
.env
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
3836 for (size_t i
= 0; i
< kTotalTasks
; i
++) {
3837 sleeping_tasks
[i
].WakeUp();
3838 sleeping_tasks
[i
].WaitUntilDone();
3841 ASSERT_OK(Put("abc", "def"));
3842 ASSERT_EQ("def", Get("abc"));
3844 ASSERT_EQ("def", Get("abc"));
3848 TEST_F(DBTest
, WriteSingleThreadEntry
) {
3849 std::vector
<port::Thread
> threads
;
3850 dbfull()->TEST_LockMutex();
3851 auto w
= dbfull()->TEST_BeginWrite();
3852 threads
.emplace_back([&] { Put("a", "b"); });
3853 env_
->SleepForMicroseconds(10000);
3854 threads
.emplace_back([&] { Flush(); });
3855 env_
->SleepForMicroseconds(10000);
3856 dbfull()->TEST_UnlockMutex();
3857 dbfull()->TEST_LockMutex();
3858 dbfull()->TEST_EndWrite(w
);
3859 dbfull()->TEST_UnlockMutex();
3861 for (auto& t
: threads
) {
3866 TEST_F(DBTest
, ConcurrentFlushWAL
) {
3867 const size_t cnt
= 100;
3871 for (bool two_write_queues
: {false, true}) {
3872 for (bool manual_wal_flush
: {false, true}) {
3873 options
.two_write_queues
= two_write_queues
;
3874 options
.manual_wal_flush
= manual_wal_flush
;
3875 options
.create_if_missing
= true;
3876 DestroyAndReopen(options
);
3877 std::vector
<port::Thread
> threads
;
3878 threads
.emplace_back([&] {
3879 for (size_t i
= 0; i
< cnt
; i
++) {
3880 auto istr
= ToString(i
);
3881 db_
->Put(wopt
, db_
->DefaultColumnFamily(), "a" + istr
, "b" + istr
);
3884 if (two_write_queues
) {
3885 threads
.emplace_back([&] {
3886 for (size_t i
= cnt
; i
< 2 * cnt
; i
++) {
3887 auto istr
= ToString(i
);
3889 batch
.Put("a" + istr
, "b" + istr
);
3890 dbfull()->WriteImpl(wopt
, &batch
, nullptr, nullptr, 0, true);
3894 threads
.emplace_back([&] {
3895 for (size_t i
= 0; i
< cnt
* 100; i
++) { // FlushWAL is faster than Put
3896 db_
->FlushWAL(false);
3899 for (auto& t
: threads
) {
3902 options
.create_if_missing
= false;
3903 // Recover from the wal and make sure that it is not corrupted
3905 for (size_t i
= 0; i
< cnt
; i
++) {
3907 auto istr
= ToString(i
);
3909 db_
->Get(ropt
, db_
->DefaultColumnFamily(), "a" + istr
, &pval
));
3910 ASSERT_TRUE(pval
== ("b" + istr
));
3916 #ifndef ROCKSDB_LITE
3917 TEST_F(DBTest
, DynamicMemtableOptions
) {
3918 const uint64_t k64KB
= 1 << 16;
3919 const uint64_t k128KB
= 1 << 17;
3920 const uint64_t k5KB
= 5 * 1024;
3923 options
.create_if_missing
= true;
3924 options
.compression
= kNoCompression
;
3925 options
.max_background_compactions
= 1;
3926 options
.write_buffer_size
= k64KB
;
3927 options
.arena_block_size
= 16 * 1024;
3928 options
.max_write_buffer_number
= 2;
3929 // Don't trigger compact/slowdown/stop
3930 options
.level0_file_num_compaction_trigger
= 1024;
3931 options
.level0_slowdown_writes_trigger
= 1024;
3932 options
.level0_stop_writes_trigger
= 1024;
3933 DestroyAndReopen(options
);
3935 auto gen_l0_kb
= [this](int size
) {
3936 const int kNumPutsBeforeWaitForFlush
= 64;
3938 for (int i
= 0; i
< size
; i
++) {
3939 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
3941 // The following condition prevents a race condition between flush jobs
3942 // acquiring work and this thread filling up multiple memtables. Without
3943 // this, the flush might produce less files than expected because
3944 // multiple memtables are flushed into a single L0 file. This race
3945 // condition affects assertion (A).
3946 if (i
% kNumPutsBeforeWaitForFlush
== kNumPutsBeforeWaitForFlush
- 1) {
3947 dbfull()->TEST_WaitForFlushMemTable();
3950 dbfull()->TEST_WaitForFlushMemTable();
3953 // Test write_buffer_size
3955 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3956 ASSERT_LT(SizeAtLevel(0), k64KB
+ k5KB
);
3957 ASSERT_GT(SizeAtLevel(0), k64KB
- k5KB
* 2);
3960 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3961 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3963 // Increase buffer size
3964 ASSERT_OK(dbfull()->SetOptions({
3965 {"write_buffer_size", "131072"},
3968 // The existing memtable inflated 64KB->128KB when we invoked SetOptions().
3969 // Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
3971 ASSERT_EQ(NumTableFilesAtLevel(0), 1); // (A)
3972 ASSERT_LT(SizeAtLevel(0), k128KB
+ 2 * k5KB
);
3973 ASSERT_GT(SizeAtLevel(0), k128KB
- 4 * k5KB
);
3975 // Decrease buffer size below current usage
3976 ASSERT_OK(dbfull()->SetOptions({
3977 {"write_buffer_size", "65536"},
3979 // The existing memtable became eligible for flush when we reduced its
3980 // capacity to 64KB. Two keys need to be added to trigger flush: first causes
3981 // memtable to be marked full, second schedules the flush. Then we should have
3982 // a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
3984 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
3985 ASSERT_LT(SizeAtLevel(0), k128KB
+ k64KB
+ 2 * k5KB
);
3986 ASSERT_GT(SizeAtLevel(0), k128KB
+ k64KB
- 4 * k5KB
);
3988 // Test max_write_buffer_number
3989 // Block compaction thread, which will also block the flushes because
3990 // max_background_flushes == 0, so flushes are getting executed by the
3991 // compaction thread
3992 env_
->SetBackgroundThreads(1, Env::LOW
);
3993 test::SleepingBackgroundTask sleeping_task_low
;
3994 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
3995 Env::Priority::LOW
);
3996 // Start from scratch and disable compaction/flush. Flush can only happen
3997 // during compaction but trigger is pretty high
3998 options
.disable_auto_compactions
= true;
3999 DestroyAndReopen(options
);
4000 env_
->SetBackgroundThreads(0, Env::HIGH
);
4002 // Put until writes are stopped, bounded by 256 puts. We should see stop at
4007 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4008 "DBImpl::DelayWrite:Wait",
4009 [&](void* /*arg*/) { sleeping_task_low
.WakeUp(); });
4010 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4012 while (!sleeping_task_low
.WokenUp() && count
< 256) {
4013 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), WriteOptions()));
4016 ASSERT_GT(static_cast<double>(count
), 128 * 0.8);
4017 ASSERT_LT(static_cast<double>(count
), 128 * 1.2);
4019 sleeping_task_low
.WaitUntilDone();
4022 ASSERT_OK(dbfull()->SetOptions({
4023 {"max_write_buffer_number", "8"},
4025 // Clean up memtable and L0
4026 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4028 sleeping_task_low
.Reset();
4029 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4030 Env::Priority::LOW
);
4032 while (!sleeping_task_low
.WokenUp() && count
< 1024) {
4033 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), WriteOptions()));
4036 // Windows fails this test. Will tune in the future and figure out
4039 ASSERT_GT(static_cast<double>(count
), 512 * 0.8);
4040 ASSERT_LT(static_cast<double>(count
), 512 * 1.2);
4042 sleeping_task_low
.WaitUntilDone();
4045 ASSERT_OK(dbfull()->SetOptions({
4046 {"max_write_buffer_number", "4"},
4048 // Clean up memtable and L0
4049 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4051 sleeping_task_low
.Reset();
4052 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4053 Env::Priority::LOW
);
4056 while (!sleeping_task_low
.WokenUp() && count
< 1024) {
4057 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), WriteOptions()));
4060 // Windows fails this test. Will tune in the future and figure out
4063 ASSERT_GT(static_cast<double>(count
), 256 * 0.8);
4064 ASSERT_LT(static_cast<double>(count
), 266 * 1.2);
4066 sleeping_task_low
.WaitUntilDone();
4068 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4070 #endif // ROCKSDB_LITE
4072 #ifdef ROCKSDB_USING_THREAD_STATUS
4074 void VerifyOperationCount(Env
* env
, ThreadStatus::OperationType op_type
,
4075 int expected_count
) {
4077 std::vector
<ThreadStatus
> thread_list
;
4078 ASSERT_OK(env
->GetThreadList(&thread_list
));
4079 for (auto thread
: thread_list
) {
4080 if (thread
.operation_type
== op_type
) {
4084 ASSERT_EQ(op_count
, expected_count
);
4088 TEST_F(DBTest
, GetThreadStatus
) {
4091 options
.enable_thread_tracking
= true;
4094 std::vector
<ThreadStatus
> thread_list
;
4095 Status s
= env_
->GetThreadList(&thread_list
);
4097 for (int i
= 0; i
< 2; ++i
) {
4098 // repeat the test with differet number of high / low priority threads
4099 const int kTestCount
= 3;
4100 const unsigned int kHighPriCounts
[kTestCount
] = {3, 2, 5};
4101 const unsigned int kLowPriCounts
[kTestCount
] = {10, 15, 3};
4102 const unsigned int kBottomPriCounts
[kTestCount
] = {2, 1, 4};
4103 for (int test
= 0; test
< kTestCount
; ++test
) {
4104 // Change the number of threads in high / low priority pool.
4105 env_
->SetBackgroundThreads(kHighPriCounts
[test
], Env::HIGH
);
4106 env_
->SetBackgroundThreads(kLowPriCounts
[test
], Env::LOW
);
4107 env_
->SetBackgroundThreads(kBottomPriCounts
[test
], Env::BOTTOM
);
4108 // Wait to ensure the all threads has been registered
4109 unsigned int thread_type_counts
[ThreadStatus::NUM_THREAD_TYPES
];
4110 // TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
4111 // all threads have been registered.
4112 // Try up to 60 seconds.
4113 for (int num_try
= 0; num_try
< 60000; num_try
++) {
4114 env_
->SleepForMicroseconds(1000);
4115 thread_list
.clear();
4116 s
= env_
->GetThreadList(&thread_list
);
4118 memset(thread_type_counts
, 0, sizeof(thread_type_counts
));
4119 for (auto thread
: thread_list
) {
4120 ASSERT_LT(thread
.thread_type
, ThreadStatus::NUM_THREAD_TYPES
);
4121 thread_type_counts
[thread
.thread_type
]++;
4123 if (thread_type_counts
[ThreadStatus::HIGH_PRIORITY
] ==
4124 kHighPriCounts
[test
] &&
4125 thread_type_counts
[ThreadStatus::LOW_PRIORITY
] ==
4126 kLowPriCounts
[test
] &&
4127 thread_type_counts
[ThreadStatus::BOTTOM_PRIORITY
] ==
4128 kBottomPriCounts
[test
]) {
4132 // Verify the number of high-priority threads
4133 ASSERT_EQ(thread_type_counts
[ThreadStatus::HIGH_PRIORITY
],
4134 kHighPriCounts
[test
]);
4135 // Verify the number of low-priority threads
4136 ASSERT_EQ(thread_type_counts
[ThreadStatus::LOW_PRIORITY
],
4137 kLowPriCounts
[test
]);
4138 // Verify the number of bottom-priority threads
4139 ASSERT_EQ(thread_type_counts
[ThreadStatus::BOTTOM_PRIORITY
],
4140 kBottomPriCounts
[test
]);
4143 // repeat the test with multiple column families
4144 CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options
);
4145 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4149 db_
->DropColumnFamily(handles_
[2]);
4151 handles_
.erase(handles_
.begin() + 2);
4152 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4155 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4159 TEST_F(DBTest
, DisableThreadStatus
) {
4162 options
.enable_thread_tracking
= false;
4164 CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options
);
4165 // Verify non of the column family info exists
4166 env_
->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_
,
4170 TEST_F(DBTest
, ThreadStatusFlush
) {
4173 options
.write_buffer_size
= 100000; // Small write buffer
4174 options
.enable_thread_tracking
= true;
4175 options
= CurrentOptions(options
);
4177 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
4178 {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
4179 {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
4181 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4183 CreateAndReopenWithCF({"pikachu"}, options
);
4184 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
4186 ASSERT_OK(Put(1, "foo", "v1"));
4187 ASSERT_EQ("v1", Get(1, "foo"));
4188 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 0);
4190 uint64_t num_running_flushes
= 0;
4191 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
4192 ASSERT_EQ(num_running_flushes
, 0);
4194 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
4195 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
4197 // The first sync point is to make sure there's one flush job
4198 // running when we perform VerifyOperationCount().
4199 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
4200 VerifyOperationCount(env_
, ThreadStatus::OP_FLUSH
, 1);
4201 db_
->GetIntProperty(DB::Properties::kNumRunningFlushes
, &num_running_flushes
);
4202 ASSERT_EQ(num_running_flushes
, 1);
4203 // This second sync point is to ensure the flush job will not
4204 // be completed until we already perform VerifyOperationCount().
4205 TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
4206 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4209 TEST_P(DBTestWithParam
, ThreadStatusSingleCompaction
) {
4210 const int kTestKeySize
= 16;
4211 const int kTestValueSize
= 984;
4212 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4213 const int kEntriesPerBuffer
= 100;
4215 options
.create_if_missing
= true;
4216 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4217 options
.compaction_style
= kCompactionStyleLevel
;
4218 options
.target_file_size_base
= options
.write_buffer_size
;
4219 options
.max_bytes_for_level_base
= options
.target_file_size_base
* 2;
4220 options
.max_bytes_for_level_multiplier
= 2;
4221 options
.compression
= kNoCompression
;
4222 options
= CurrentOptions(options
);
4224 options
.enable_thread_tracking
= true;
4225 const int kNumL0Files
= 4;
4226 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4227 options
.max_subcompactions
= max_subcompactions_
;
4229 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
4230 {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
4231 {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
4232 {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
4234 for (int tests
= 0; tests
< 2; ++tests
) {
4235 DestroyAndReopen(options
);
4236 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
4237 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4241 for (int file
= 0; file
< kNumL0Files
; ++file
) {
4242 for (int key
= 0; key
< kEntriesPerBuffer
; ++key
) {
4243 ASSERT_OK(Put(ToString(key
+ file
* kEntriesPerBuffer
),
4244 RandomString(&rnd
, kTestValueSize
)));
4248 // This makes sure a compaction won't be scheduled until
4249 // we have done with the above Put Phase.
4250 uint64_t num_running_compactions
= 0;
4251 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
4252 &num_running_compactions
);
4253 ASSERT_EQ(num_running_compactions
, 0);
4254 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
4255 ASSERT_GE(NumTableFilesAtLevel(0),
4256 options
.level0_file_num_compaction_trigger
);
4258 // This makes sure at least one compaction is running.
4259 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");
4261 if (options
.enable_thread_tracking
) {
4262 // expecting one single L0 to L1 compaction
4263 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 1);
4265 // If thread tracking is not enabled, compaction count should be 0.
4266 VerifyOperationCount(env_
, ThreadStatus::OP_COMPACTION
, 0);
4268 db_
->GetIntProperty(DB::Properties::kNumRunningCompactions
,
4269 &num_running_compactions
);
4270 ASSERT_EQ(num_running_compactions
, 1);
4271 // TODO(yhchiang): adding assert to verify each compaction stage.
4272 TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");
4274 // repeat the test with disabling thread tracking.
4275 options
.enable_thread_tracking
= false;
4276 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4280 TEST_P(DBTestWithParam
, PreShutdownManualCompaction
) {
4281 Options options
= CurrentOptions();
4282 options
.max_subcompactions
= max_subcompactions_
;
4283 CreateAndReopenWithCF({"pikachu"}, options
);
4285 // iter - 0 with 7 levels
4286 // iter - 1 with 3 levels
4287 for (int iter
= 0; iter
< 2; ++iter
) {
4288 MakeTables(3, "p", "q", 1);
4289 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4291 // Compaction range falls before files
4292 Compact(1, "", "c");
4293 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4295 // Compaction range falls after files
4296 Compact(1, "r", "z");
4297 ASSERT_EQ("1,1,1", FilesPerLevel(1));
4299 // Compaction range overlaps files
4300 Compact(1, "p1", "p9");
4301 ASSERT_EQ("0,0,1", FilesPerLevel(1));
4303 // Populate a different range
4304 MakeTables(3, "c", "e", 1);
4305 ASSERT_EQ("1,1,2", FilesPerLevel(1));
4307 // Compact just the new range
4308 Compact(1, "b", "f");
4309 ASSERT_EQ("0,0,2", FilesPerLevel(1));
4312 MakeTables(1, "a", "z", 1);
4313 ASSERT_EQ("1,0,2", FilesPerLevel(1));
4314 CancelAllBackgroundWork(db_
);
4315 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
4316 ASSERT_EQ("1,0,2", FilesPerLevel(1));
4319 options
= CurrentOptions();
4320 options
.num_levels
= 3;
4321 options
.create_if_missing
= true;
4322 DestroyAndReopen(options
);
4323 CreateAndReopenWithCF({"pikachu"}, options
);
4328 TEST_F(DBTest
, PreShutdownFlush
) {
4329 Options options
= CurrentOptions();
4330 CreateAndReopenWithCF({"pikachu"}, options
);
4331 ASSERT_OK(Put(1, "key", "value"));
4332 CancelAllBackgroundWork(db_
);
4334 db_
->CompactRange(CompactRangeOptions(), handles_
[1], nullptr, nullptr);
4335 ASSERT_TRUE(s
.IsShutdownInProgress());
4338 TEST_P(DBTestWithParam
, PreShutdownMultipleCompaction
) {
4339 const int kTestKeySize
= 16;
4340 const int kTestValueSize
= 984;
4341 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4342 const int kEntriesPerBuffer
= 40;
4343 const int kNumL0Files
= 4;
4345 const int kHighPriCount
= 3;
4346 const int kLowPriCount
= 5;
4347 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
4348 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
4351 options
.create_if_missing
= true;
4352 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4353 options
.compaction_style
= kCompactionStyleLevel
;
4354 options
.target_file_size_base
= options
.write_buffer_size
;
4355 options
.max_bytes_for_level_base
=
4356 options
.target_file_size_base
* kNumL0Files
;
4357 options
.compression
= kNoCompression
;
4358 options
= CurrentOptions(options
);
4360 options
.enable_thread_tracking
= true;
4361 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4362 options
.max_bytes_for_level_multiplier
= 2;
4363 options
.max_background_compactions
= kLowPriCount
;
4364 options
.level0_stop_writes_trigger
= 1 << 10;
4365 options
.level0_slowdown_writes_trigger
= 1 << 10;
4366 options
.max_subcompactions
= max_subcompactions_
;
4371 std::vector
<ThreadStatus
> thread_list
;
4372 // Delay both flush and compaction
4373 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4374 {{"FlushJob::FlushJob()", "CompactionJob::Run():Start"},
4375 {"CompactionJob::Run():Start",
4376 "DBTest::PreShutdownMultipleCompaction:Preshutdown"},
4377 {"CompactionJob::Run():Start",
4378 "DBTest::PreShutdownMultipleCompaction:VerifyCompaction"},
4379 {"DBTest::PreShutdownMultipleCompaction:Preshutdown",
4380 "CompactionJob::Run():End"},
4381 {"CompactionJob::Run():End",
4382 "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});
4384 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4386 // Make rocksdb busy
4388 // check how many threads are doing compaction using GetThreadList
4389 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
4390 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
4391 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
4392 ASSERT_OK(Put(ToString(key
++), RandomString(&rnd
, kTestValueSize
)));
4395 Status s
= env_
->GetThreadList(&thread_list
);
4396 for (auto thread
: thread_list
) {
4397 operation_count
[thread
.operation_type
]++;
4400 // Speed up the test
4401 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
4402 operation_count
[ThreadStatus::OP_COMPACTION
] >
4403 0.6 * options
.max_background_compactions
) {
4406 if (file
== 15 * kNumL0Files
) {
4407 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
4411 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
4412 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
4413 CancelAllBackgroundWork(db_
);
4414 TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
4415 dbfull()->TEST_WaitForCompact();
4416 // Record the number of compactions at a time.
4417 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
4418 operation_count
[i
] = 0;
4420 Status s
= env_
->GetThreadList(&thread_list
);
4421 for (auto thread
: thread_list
) {
4422 operation_count
[thread
.operation_type
]++;
4424 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
4427 TEST_P(DBTestWithParam
, PreShutdownCompactionMiddle
) {
4428 const int kTestKeySize
= 16;
4429 const int kTestValueSize
= 984;
4430 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
4431 const int kEntriesPerBuffer
= 40;
4432 const int kNumL0Files
= 4;
4434 const int kHighPriCount
= 3;
4435 const int kLowPriCount
= 5;
4436 env_
->SetBackgroundThreads(kHighPriCount
, Env::HIGH
);
4437 env_
->SetBackgroundThreads(kLowPriCount
, Env::LOW
);
4440 options
.create_if_missing
= true;
4441 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
4442 options
.compaction_style
= kCompactionStyleLevel
;
4443 options
.target_file_size_base
= options
.write_buffer_size
;
4444 options
.max_bytes_for_level_base
=
4445 options
.target_file_size_base
* kNumL0Files
;
4446 options
.compression
= kNoCompression
;
4447 options
= CurrentOptions(options
);
4449 options
.enable_thread_tracking
= true;
4450 options
.level0_file_num_compaction_trigger
= kNumL0Files
;
4451 options
.max_bytes_for_level_multiplier
= 2;
4452 options
.max_background_compactions
= kLowPriCount
;
4453 options
.level0_stop_writes_trigger
= 1 << 10;
4454 options
.level0_slowdown_writes_trigger
= 1 << 10;
4455 options
.max_subcompactions
= max_subcompactions_
;
4460 std::vector
<ThreadStatus
> thread_list
;
4461 // Delay both flush and compaction
4462 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4463 {{"DBTest::PreShutdownCompactionMiddle:Preshutdown",
4464 "CompactionJob::Run():Inprogress"},
4465 {"CompactionJob::Run():Start",
4466 "DBTest::PreShutdownCompactionMiddle:VerifyCompaction"},
4467 {"CompactionJob::Run():Inprogress", "CompactionJob::Run():End"},
4468 {"CompactionJob::Run():End",
4469 "DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown"}});
4471 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4473 // Make rocksdb busy
4475 // check how many threads are doing compaction using GetThreadList
4476 int operation_count
[ThreadStatus::NUM_OP_TYPES
] = {0};
4477 for (int file
= 0; file
< 16 * kNumL0Files
; ++file
) {
4478 for (int k
= 0; k
< kEntriesPerBuffer
; ++k
) {
4479 ASSERT_OK(Put(ToString(key
++), RandomString(&rnd
, kTestValueSize
)));
4482 Status s
= env_
->GetThreadList(&thread_list
);
4483 for (auto thread
: thread_list
) {
4484 operation_count
[thread
.operation_type
]++;
4487 // Speed up the test
4488 if (operation_count
[ThreadStatus::OP_FLUSH
] > 1 &&
4489 operation_count
[ThreadStatus::OP_COMPACTION
] >
4490 0.6 * options
.max_background_compactions
) {
4493 if (file
== 15 * kNumL0Files
) {
4494 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyCompaction");
4498 ASSERT_GE(operation_count
[ThreadStatus::OP_COMPACTION
], 1);
4499 CancelAllBackgroundWork(db_
);
4500 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:Preshutdown");
4501 TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown");
4502 dbfull()->TEST_WaitForCompact();
4503 // Record the number of compactions at a time.
4504 for (int i
= 0; i
< ThreadStatus::NUM_OP_TYPES
; ++i
) {
4505 operation_count
[i
] = 0;
4507 Status s
= env_
->GetThreadList(&thread_list
);
4508 for (auto thread
: thread_list
) {
4509 operation_count
[thread
.operation_type
]++;
4511 ASSERT_EQ(operation_count
[ThreadStatus::OP_COMPACTION
], 0);
4514 #endif // ROCKSDB_USING_THREAD_STATUS
4516 #ifndef ROCKSDB_LITE
4517 TEST_F(DBTest
, FlushOnDestroy
) {
4519 wo
.disableWAL
= true;
4520 ASSERT_OK(Put("foo", "v1", wo
));
4521 CancelAllBackgroundWork(db_
);
4524 TEST_F(DBTest
, DynamicLevelCompressionPerLevel
) {
4525 if (!Snappy_Supported()) {
4528 const int kNKeys
= 120;
4530 for (int i
= 0; i
< kNKeys
; i
++) {
4533 std::random_shuffle(std::begin(keys
), std::end(keys
));
4537 options
.create_if_missing
= true;
4538 options
.db_write_buffer_size
= 20480;
4539 options
.write_buffer_size
= 20480;
4540 options
.max_write_buffer_number
= 2;
4541 options
.level0_file_num_compaction_trigger
= 2;
4542 options
.level0_slowdown_writes_trigger
= 2;
4543 options
.level0_stop_writes_trigger
= 2;
4544 options
.target_file_size_base
= 20480;
4545 options
.level_compaction_dynamic_level_bytes
= true;
4546 options
.max_bytes_for_level_base
= 102400;
4547 options
.max_bytes_for_level_multiplier
= 4;
4548 options
.max_background_compactions
= 1;
4549 options
.num_levels
= 5;
4551 options
.compression_per_level
.resize(3);
4552 options
.compression_per_level
[0] = kNoCompression
;
4553 options
.compression_per_level
[1] = kNoCompression
;
4554 options
.compression_per_level
[2] = kSnappyCompression
;
4556 OnFileDeletionListener
* listener
= new OnFileDeletionListener();
4557 options
.listeners
.emplace_back(listener
);
4559 DestroyAndReopen(options
);
4561 // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
4562 // be compressed, so total data size should be more than 80K.
4563 for (int i
= 0; i
< 20; i
++) {
4564 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
4567 dbfull()->TEST_WaitForCompact();
4569 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4570 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4571 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4572 // Assuming each files' metadata is at least 50 bytes/
4573 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U + 50U * 4);
4575 // Insert 400KB. Some data will be compressed
4576 for (int i
= 21; i
< 120; i
++) {
4577 ASSERT_OK(Put(Key(keys
[i
]), CompressibleString(&rnd
, 4000)));
4580 dbfull()->TEST_WaitForCompact();
4581 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4582 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4583 ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4),
4584 120U * 4000U + 50U * 24);
4585 // Make sure data in files in L3 is not compacted by removing all files
4586 // in L4 and calculate number of rows
4587 ASSERT_OK(dbfull()->SetOptions({
4588 {"disable_auto_compactions", "true"},
4590 ColumnFamilyMetaData cf_meta
;
4591 db_
->GetColumnFamilyMetaData(&cf_meta
);
4592 for (auto file
: cf_meta
.levels
[4].files
) {
4593 listener
->SetExpectedFileName(dbname_
+ file
.name
);
4594 ASSERT_OK(dbfull()->DeleteFile(file
.name
));
4596 listener
->VerifyMatchedCount(cf_meta
.levels
[4].files
.size());
4599 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
4600 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
4603 ASSERT_OK(iter
->status());
4604 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys
* 4000U + num_keys
* 10U);
4607 TEST_F(DBTest
, DynamicLevelCompressionPerLevel2
) {
4608 if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
4611 const int kNKeys
= 500;
4613 for (int i
= 0; i
< kNKeys
; i
++) {
4616 std::random_shuffle(std::begin(keys
), std::end(keys
));
4620 options
.create_if_missing
= true;
4621 options
.db_write_buffer_size
= 6000000;
4622 options
.write_buffer_size
= 600000;
4623 options
.max_write_buffer_number
= 2;
4624 options
.level0_file_num_compaction_trigger
= 2;
4625 options
.level0_slowdown_writes_trigger
= 2;
4626 options
.level0_stop_writes_trigger
= 2;
4627 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
4628 options
.target_file_size_base
= 20;
4630 options
.level_compaction_dynamic_level_bytes
= true;
4631 options
.max_bytes_for_level_base
= 200;
4632 options
.max_bytes_for_level_multiplier
= 8;
4633 options
.max_background_compactions
= 1;
4634 options
.num_levels
= 5;
4635 std::shared_ptr
<mock::MockTableFactory
> mtf(new mock::MockTableFactory
);
4636 options
.table_factory
= mtf
;
4638 options
.compression_per_level
.resize(3);
4639 options
.compression_per_level
[0] = kNoCompression
;
4640 options
.compression_per_level
[1] = kLZ4Compression
;
4641 options
.compression_per_level
[2] = kZlibCompression
;
4643 DestroyAndReopen(options
);
4644 // When base level is L4, L4 is LZ4.
4645 std::atomic
<int> num_zlib(0);
4646 std::atomic
<int> num_lz4(0);
4647 std::atomic
<int> num_no(0);
4648 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4649 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
4650 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
4651 if (compaction
->output_level() == 4) {
4652 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
4653 num_lz4
.fetch_add(1);
4656 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4657 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
4658 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
4659 ASSERT_TRUE(*compression
== kNoCompression
);
4660 num_no
.fetch_add(1);
4662 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4664 for (int i
= 0; i
< 100; i
++) {
4665 std::string value
= RandomString(&rnd
, 200);
4666 ASSERT_OK(Put(Key(keys
[i
]), value
));
4669 dbfull()->TEST_WaitForCompact();
4674 dbfull()->TEST_WaitForFlushMemTable();
4675 dbfull()->TEST_WaitForCompact();
4676 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4677 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4679 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4680 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4681 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4682 ASSERT_GT(NumTableFilesAtLevel(4), 0);
4683 ASSERT_GT(num_no
.load(), 2);
4684 ASSERT_GT(num_lz4
.load(), 0);
4685 int prev_num_files_l4
= NumTableFilesAtLevel(4);
4687 // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
4690 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4691 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg
) {
4692 Compaction
* compaction
= reinterpret_cast<Compaction
*>(arg
);
4693 if (compaction
->output_level() == 4 && compaction
->start_level() == 3) {
4694 ASSERT_TRUE(compaction
->output_compression() == kZlibCompression
);
4695 num_zlib
.fetch_add(1);
4697 ASSERT_TRUE(compaction
->output_compression() == kLZ4Compression
);
4698 num_lz4
.fetch_add(1);
4701 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4702 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg
) {
4703 auto* compression
= reinterpret_cast<CompressionType
*>(arg
);
4704 ASSERT_TRUE(*compression
== kNoCompression
);
4705 num_no
.fetch_add(1);
4707 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4709 for (int i
= 101; i
< 500; i
++) {
4710 std::string value
= RandomString(&rnd
, 200);
4711 ASSERT_OK(Put(Key(keys
[i
]), value
));
4712 if (i
% 100 == 99) {
4714 dbfull()->TEST_WaitForCompact();
4718 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4719 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4720 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4721 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4722 ASSERT_GT(NumTableFilesAtLevel(3), 0);
4723 ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4
);
4724 ASSERT_GT(num_no
.load(), 2);
4725 ASSERT_GT(num_lz4
.load(), 0);
4726 ASSERT_GT(num_zlib
.load(), 0);
4729 TEST_F(DBTest
, DynamicCompactionOptions
) {
4730 // minimum write buffer size is enforced at 64KB
4731 const uint64_t k32KB
= 1 << 15;
4732 const uint64_t k64KB
= 1 << 16;
4733 const uint64_t k128KB
= 1 << 17;
4734 const uint64_t k1MB
= 1 << 20;
4735 const uint64_t k4KB
= 1 << 12;
4738 options
.create_if_missing
= true;
4739 options
.compression
= kNoCompression
;
4740 options
.soft_pending_compaction_bytes_limit
= 1024 * 1024;
4741 options
.write_buffer_size
= k64KB
;
4742 options
.arena_block_size
= 4 * k4KB
;
4743 options
.max_write_buffer_number
= 2;
4744 // Compaction related options
4745 options
.level0_file_num_compaction_trigger
= 3;
4746 options
.level0_slowdown_writes_trigger
= 4;
4747 options
.level0_stop_writes_trigger
= 8;
4748 options
.target_file_size_base
= k64KB
;
4749 options
.max_compaction_bytes
= options
.target_file_size_base
* 10;
4750 options
.target_file_size_multiplier
= 1;
4751 options
.max_bytes_for_level_base
= k128KB
;
4752 options
.max_bytes_for_level_multiplier
= 4;
4754 // Block flush thread and disable compaction thread
4755 env_
->SetBackgroundThreads(1, Env::LOW
);
4756 env_
->SetBackgroundThreads(1, Env::HIGH
);
4757 DestroyAndReopen(options
);
4759 auto gen_l0_kb
= [this](int start
, int size
, int stride
) {
4761 for (int i
= 0; i
< size
; i
++) {
4762 ASSERT_OK(Put(Key(start
+ stride
* i
), RandomString(&rnd
, 1024)));
4764 dbfull()->TEST_WaitForFlushMemTable();
4767 // Write 3 files that have the same key range.
4768 // Since level0_file_num_compaction_trigger is 3, compaction should be
4769 // triggered. The compaction should result in one L1 file
4770 gen_l0_kb(0, 64, 1);
4771 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
4772 gen_l0_kb(0, 64, 1);
4773 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
4774 gen_l0_kb(0, 64, 1);
4775 dbfull()->TEST_WaitForCompact();
4776 ASSERT_EQ("0,1", FilesPerLevel());
4777 std::vector
<LiveFileMetaData
> metadata
;
4778 db_
->GetLiveFilesMetaData(&metadata
);
4779 ASSERT_EQ(1U, metadata
.size());
4780 ASSERT_LE(metadata
[0].size
, k64KB
+ k4KB
);
4781 ASSERT_GE(metadata
[0].size
, k64KB
- k4KB
);
4783 // Test compaction trigger and target_file_size_base
4784 // Reduce compaction trigger to 2, and reduce L1 file size to 32KB.
4785 // Writing to 64KB L0 files should trigger a compaction. Since these
4786 // 2 L0 files have the same key range, compaction merge them and should
4787 // result in 2 32KB L1 files.
4788 ASSERT_OK(dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
4789 {"target_file_size_base", ToString(k32KB
)}}));
4791 gen_l0_kb(0, 64, 1);
4792 ASSERT_EQ("1,1", FilesPerLevel());
4793 gen_l0_kb(0, 64, 1);
4794 dbfull()->TEST_WaitForCompact();
4795 ASSERT_EQ("0,2", FilesPerLevel());
4797 db_
->GetLiveFilesMetaData(&metadata
);
4798 ASSERT_EQ(2U, metadata
.size());
4799 ASSERT_LE(metadata
[0].size
, k32KB
+ k4KB
);
4800 ASSERT_GE(metadata
[0].size
, k32KB
- k4KB
);
4801 ASSERT_LE(metadata
[1].size
, k32KB
+ k4KB
);
4802 ASSERT_GE(metadata
[1].size
, k32KB
- k4KB
);
4804 // Test max_bytes_for_level_base
4805 // Increase level base size to 256KB and write enough data that will
4806 // fill L1 and L2. L1 size should be around 256KB while L2 size should be
4807 // around 256KB x 4.
4809 dbfull()->SetOptions({{"max_bytes_for_level_base", ToString(k1MB
)}}));
4811 // writing 96 x 64KB => 6 * 1024KB
4812 // (L1 + L2) = (1 + 4) * 1024KB
4813 for (int i
= 0; i
< 96; ++i
) {
4814 gen_l0_kb(i
, 64, 96);
4816 dbfull()->TEST_WaitForCompact();
4817 ASSERT_GT(SizeAtLevel(1), k1MB
/ 2);
4818 ASSERT_LT(SizeAtLevel(1), k1MB
+ k1MB
/ 2);
4820 // Within (0.5, 1.5) of 4MB.
4821 ASSERT_GT(SizeAtLevel(2), 2 * k1MB
);
4822 ASSERT_LT(SizeAtLevel(2), 6 * k1MB
);
4824 // Test max_bytes_for_level_multiplier and
4825 // max_bytes_for_level_base. Now, reduce both mulitplier and level base,
4826 // After filling enough data that can fit in L1 - L3, we should see L1 size
4827 // reduces to 128KB from 256KB which was asserted previously. Same for L2.
4829 dbfull()->SetOptions({{"max_bytes_for_level_multiplier", "2"},
4830 {"max_bytes_for_level_base", ToString(k128KB
)}}));
4832 // writing 20 x 64KB = 10 x 128KB
4833 // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
4834 for (int i
= 0; i
< 20; ++i
) {
4835 gen_l0_kb(i
, 64, 32);
4837 dbfull()->TEST_WaitForCompact();
4838 uint64_t total_size
= SizeAtLevel(1) + SizeAtLevel(2) + SizeAtLevel(3);
4839 ASSERT_TRUE(total_size
< k128KB
* 7 * 1.5);
4841 // Test level0_stop_writes_trigger.
4842 // Clean up memtable and L0. Block compaction threads. If continue to write
4843 // and flush memtables. We should see put stop after 8 memtable flushes
4844 // since level0_stop_writes_trigger = 8
4845 dbfull()->TEST_FlushMemTable(true, true);
4846 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4848 test::SleepingBackgroundTask sleeping_task_low
;
4849 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4850 Env::Priority::LOW
);
4851 sleeping_task_low
.WaitUntilSleeping();
4852 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4856 while (count
< 64) {
4857 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), wo
));
4858 dbfull()->TEST_FlushMemTable(true, true);
4860 if (dbfull()->TEST_write_controler().IsStopped()) {
4861 sleeping_task_low
.WakeUp();
4866 ASSERT_EQ(count
, 8);
4868 sleeping_task_low
.WaitUntilDone();
4870 // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
4871 // Block compaction thread again. Perform the put and memtable flushes
4872 // until we see the stop after 6 memtable flushes.
4873 ASSERT_OK(dbfull()->SetOptions({{"level0_stop_writes_trigger", "6"}}));
4874 dbfull()->TEST_FlushMemTable(true);
4875 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4876 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4878 // Block compaction again
4879 sleeping_task_low
.Reset();
4880 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
4881 Env::Priority::LOW
);
4882 sleeping_task_low
.WaitUntilSleeping();
4884 while (count
< 64) {
4885 ASSERT_OK(Put(Key(count
), RandomString(&rnd
, 1024), wo
));
4886 dbfull()->TEST_FlushMemTable(true, true);
4888 if (dbfull()->TEST_write_controler().IsStopped()) {
4889 sleeping_task_low
.WakeUp();
4893 ASSERT_EQ(count
, 6);
4895 sleeping_task_low
.WaitUntilDone();
4897 // Test disable_auto_compactions
4898 // Compaction thread is unblocked but auto compaction is disabled. Write
4899 // 4 L0 files and compaction should be triggered. If auto compaction is
4900 // disabled, then TEST_WaitForCompact will be waiting for nothing. Number of
4901 // L0 files do not change after the call.
4902 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
4903 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4904 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4906 for (int i
= 0; i
< 4; ++i
) {
4907 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
4908 // Wait for compaction so that put won't stop
4909 dbfull()->TEST_FlushMemTable(true);
4911 dbfull()->TEST_WaitForCompact();
4912 ASSERT_EQ(NumTableFilesAtLevel(0), 4);
4914 // Enable auto compaction and perform the same test, # of L0 files should be
4915 // reduced after compaction.
4916 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
4917 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4918 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
4920 for (int i
= 0; i
< 4; ++i
) {
4921 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 1024)));
4922 // Wait for compaction so that put won't stop
4923 dbfull()->TEST_FlushMemTable(true);
4925 dbfull()->TEST_WaitForCompact();
4926 ASSERT_LT(NumTableFilesAtLevel(0), 4);
// Test dynamic FIFO compaction options.
// This test covers just option parsing and makes sure that the options are
// correctly assigned. Also look at DBOptionsTest.SetFIFOCompactionOptions
// test which makes sure that the FIFO compaction functionality is working
// as expected on dynamically changing the options.
// Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
4935 TEST_F(DBTest
, DynamicFIFOCompactionOptions
) {
4938 options
.create_if_missing
= true;
4939 DestroyAndReopen(options
);
4942 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4943 1024 * 1024 * 1024);
4944 ASSERT_EQ(dbfull()->GetOptions().ttl
, 0);
4945 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4948 ASSERT_OK(dbfull()->SetOptions(
4949 {{"compaction_options_fifo", "{max_table_files_size=23;}"}}));
4950 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4952 ASSERT_EQ(dbfull()->GetOptions().ttl
, 0);
4953 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4956 ASSERT_OK(dbfull()->SetOptions({{"ttl", "97"}}));
4957 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4959 ASSERT_EQ(dbfull()->GetOptions().ttl
, 97);
4960 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4963 ASSERT_OK(dbfull()->SetOptions({{"ttl", "203"}}));
4964 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4966 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
4967 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4970 ASSERT_OK(dbfull()->SetOptions(
4971 {{"compaction_options_fifo", "{allow_compaction=true;}"}}));
4972 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4974 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
4975 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4978 ASSERT_OK(dbfull()->SetOptions(
4979 {{"compaction_options_fifo", "{max_table_files_size=31;}"}}));
4980 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4982 ASSERT_EQ(dbfull()->GetOptions().ttl
, 203);
4983 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4986 ASSERT_OK(dbfull()->SetOptions(
4987 {{"compaction_options_fifo",
4988 "{max_table_files_size=51;allow_compaction=true;}"}}));
4989 ASSERT_OK(dbfull()->SetOptions({{"ttl", "49"}}));
4990 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.max_table_files_size
,
4992 ASSERT_EQ(dbfull()->GetOptions().ttl
, 49);
4993 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo
.allow_compaction
,
4997 TEST_F(DBTest
, DynamicUniversalCompactionOptions
) {
4999 options
.create_if_missing
= true;
5000 DestroyAndReopen(options
);
5003 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 1U);
5004 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5006 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5010 .compaction_options_universal
.max_size_amplification_percent
,
5014 .compaction_options_universal
.compression_size_percent
,
5016 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5017 kCompactionStopStyleTotalSize
);
5019 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5022 ASSERT_OK(dbfull()->SetOptions(
5023 {{"compaction_options_universal", "{size_ratio=7;}"}}));
5024 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 7u);
5025 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5027 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5031 .compaction_options_universal
.max_size_amplification_percent
,
5035 .compaction_options_universal
.compression_size_percent
,
5037 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5038 kCompactionStopStyleTotalSize
);
5040 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5043 ASSERT_OK(dbfull()->SetOptions(
5044 {{"compaction_options_universal", "{min_merge_width=11;}"}}));
5045 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.size_ratio
, 7u);
5046 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.min_merge_width
,
5048 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.max_merge_width
,
5052 .compaction_options_universal
.max_size_amplification_percent
,
5056 .compaction_options_universal
.compression_size_percent
,
5058 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal
.stop_style
,
5059 kCompactionStopStyleTotalSize
);
5061 dbfull()->GetOptions().compaction_options_universal
.allow_trivial_move
,
5064 #endif // ROCKSDB_LITE
5066 TEST_F(DBTest
, FileCreationRandomFailure
) {
5069 options
.create_if_missing
= true;
5070 options
.write_buffer_size
= 100000; // Small write buffer
5071 options
.target_file_size_base
= 200000;
5072 options
.max_bytes_for_level_base
= 1000000;
5073 options
.max_bytes_for_level_multiplier
= 2;
5075 DestroyAndReopen(options
);
5078 const int kCDTKeysPerBuffer
= 4;
5079 const int kTestSize
= kCDTKeysPerBuffer
* 4096;
5080 const int kTotalIteration
= 100;
5081 // the second half of the test involves in random failure
5082 // of file creation.
5083 const int kRandomFailureTest
= kTotalIteration
/ 2;
5084 std::vector
<std::string
> values
;
5085 for (int i
= 0; i
< kTestSize
; ++i
) {
5086 values
.push_back("NOT_FOUND");
5088 for (int j
= 0; j
< kTotalIteration
; ++j
) {
5089 if (j
== kRandomFailureTest
) {
5090 env_
->non_writeable_rate_
.store(90);
5092 for (int k
= 0; k
< kTestSize
; ++k
) {
5093 // here we expect some of the Put fails.
5094 std::string value
= RandomString(&rnd
, 100);
5095 Status s
= Put(Key(k
), Slice(value
));
5097 // update the latest successful put
5100 // But everything before we simulate the failure-test should succeed.
5101 if (j
< kRandomFailureTest
) {
5107 // If rocksdb does not do the correct job, internal assert will fail here.
5108 dbfull()->TEST_WaitForFlushMemTable();
5109 dbfull()->TEST_WaitForCompact();
5111 // verify we have the latest successful update
5112 for (int k
= 0; k
< kTestSize
; ++k
) {
5113 auto v
= Get(Key(k
));
5114 ASSERT_EQ(v
, values
[k
]);
5117 // reopen and reverify we have the latest successful update
5118 env_
->non_writeable_rate_
.store(0);
5120 for (int k
= 0; k
< kTestSize
; ++k
) {
5121 auto v
= Get(Key(k
));
5122 ASSERT_EQ(v
, values
[k
]);
5126 #ifndef ROCKSDB_LITE
5128 TEST_F(DBTest
, DynamicMiscOptions
) {
5129 // Test max_sequential_skip_in_iterations
5132 options
.create_if_missing
= true;
5133 options
.max_sequential_skip_in_iterations
= 16;
5134 options
.compression
= kNoCompression
;
5135 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5136 DestroyAndReopen(options
);
5138 auto assert_reseek_count
= [this, &options
](int key_start
, int num_reseek
) {
5139 int key0
= key_start
;
5140 int key1
= key_start
+ 1;
5141 int key2
= key_start
+ 2;
5143 ASSERT_OK(Put(Key(key0
), RandomString(&rnd
, 8)));
5144 for (int i
= 0; i
< 10; ++i
) {
5145 ASSERT_OK(Put(Key(key1
), RandomString(&rnd
, 8)));
5147 ASSERT_OK(Put(Key(key2
), RandomString(&rnd
, 8)));
5148 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ReadOptions()));
5149 iter
->Seek(Key(key1
));
5150 ASSERT_TRUE(iter
->Valid());
5151 ASSERT_EQ(iter
->key().compare(Key(key1
)), 0);
5153 ASSERT_TRUE(iter
->Valid());
5154 ASSERT_EQ(iter
->key().compare(Key(key2
)), 0);
5155 ASSERT_EQ(num_reseek
,
5156 TestGetTickerCount(options
, NUMBER_OF_RESEEKS_IN_ITERATION
));
5159 assert_reseek_count(100, 0);
5161 ASSERT_OK(dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "4"}}));
5162 // Clear memtable and make new option effective
5163 dbfull()->TEST_FlushMemTable(true);
5165 assert_reseek_count(200, 1);
5168 dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "16"}}));
5169 // Clear memtable and make new option effective
5170 dbfull()->TEST_FlushMemTable(true);
5172 assert_reseek_count(300, 1);
5174 MutableCFOptions mutable_cf_options
;
5175 CreateAndReopenWithCF({"pikachu"}, options
);
5176 // Test soft_pending_compaction_bytes_limit,
5177 // hard_pending_compaction_bytes_limit
5178 ASSERT_OK(dbfull()->SetOptions(
5179 handles_
[1], {{"soft_pending_compaction_bytes_limit", "200"},
5180 {"hard_pending_compaction_bytes_limit", "300"}}));
5181 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5182 &mutable_cf_options
));
5183 ASSERT_EQ(200, mutable_cf_options
.soft_pending_compaction_bytes_limit
);
5184 ASSERT_EQ(300, mutable_cf_options
.hard_pending_compaction_bytes_limit
);
5185 // Test report_bg_io_stats
5187 dbfull()->SetOptions(handles_
[1], {{"report_bg_io_stats", "true"}}));
5189 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5190 &mutable_cf_options
));
5191 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
5194 ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
5195 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
5196 &mutable_cf_options
));
5197 ASSERT_EQ(CompressionType::kNoCompression
, mutable_cf_options
.compression
);
5199 if (Snappy_Supported()) {
5200 ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
5201 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[0],
5202 &mutable_cf_options
));
5203 ASSERT_EQ(CompressionType::kSnappyCompression
,
5204 mutable_cf_options
.compression
);
5207 // Test paranoid_file_checks already done in db_block_cache_test
5209 dbfull()->SetOptions(handles_
[1], {{"paranoid_file_checks", "true"}}));
5210 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_
[1],
5211 &mutable_cf_options
));
5212 ASSERT_TRUE(mutable_cf_options
.report_bg_io_stats
);
5214 #endif // ROCKSDB_LITE
5216 TEST_F(DBTest
, L0L1L2AndUpHitCounter
) {
5217 Options options
= CurrentOptions();
5218 options
.write_buffer_size
= 32 * 1024;
5219 options
.target_file_size_base
= 32 * 1024;
5220 options
.level0_file_num_compaction_trigger
= 2;
5221 options
.level0_slowdown_writes_trigger
= 2;
5222 options
.level0_stop_writes_trigger
= 4;
5223 options
.max_bytes_for_level_base
= 64 * 1024;
5224 options
.max_write_buffer_number
= 2;
5225 options
.max_background_compactions
= 8;
5226 options
.max_background_flushes
= 8;
5227 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5228 CreateAndReopenWithCF({"mypikachu"}, options
);
5230 int numkeys
= 20000;
5231 for (int i
= 0; i
< numkeys
; i
++) {
5232 ASSERT_OK(Put(1, Key(i
), "val"));
5234 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L0
));
5235 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L1
));
5236 ASSERT_EQ(0, TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
5238 ASSERT_OK(Flush(1));
5239 dbfull()->TEST_WaitForCompact();
5241 for (int i
= 0; i
< numkeys
; i
++) {
5242 ASSERT_EQ(Get(1, Key(i
)), "val");
5245 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L0
), 100);
5246 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L1
), 100);
5247 ASSERT_GT(TestGetTickerCount(options
, GET_HIT_L2_AND_UP
), 100);
5249 ASSERT_EQ(numkeys
, TestGetTickerCount(options
, GET_HIT_L0
) +
5250 TestGetTickerCount(options
, GET_HIT_L1
) +
5251 TestGetTickerCount(options
, GET_HIT_L2_AND_UP
));
5254 TEST_F(DBTest
, EncodeDecompressedBlockSizeTest
) {
5260 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
5261 kLZ4Compression
, kLZ4HCCompression
,
5262 kXpressCompression
};
5263 for (auto comp
: compressions
) {
5264 if (!CompressionTypeSupported(comp
)) {
5267 // first_table_version 1 -- generate with table_version == 1, read with
5268 // table_version == 2
5269 // first_table_version 2 -- generate with table_version == 2, read with
5270 // table_version == 1
5271 for (int first_table_version
= 1; first_table_version
<= 2;
5272 ++first_table_version
) {
5273 BlockBasedTableOptions table_options
;
5274 table_options
.format_version
= first_table_version
;
5275 table_options
.filter_policy
.reset(NewBloomFilterPolicy(10));
5276 Options options
= CurrentOptions();
5277 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
5278 options
.create_if_missing
= true;
5279 options
.compression
= comp
;
5280 DestroyAndReopen(options
);
5282 int kNumKeysWritten
= 1000;
5285 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
5286 // compressible string
5287 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 128) + std::string(128, 'a')));
5290 table_options
.format_version
= first_table_version
== 1 ? 2 : 1;
5291 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
5293 for (int i
= 0; i
< kNumKeysWritten
; ++i
) {
5294 auto r
= Get(Key(i
));
5295 ASSERT_EQ(r
.substr(128), std::string(128, 'a'));
5301 TEST_F(DBTest
, CloseSpeedup
) {
5302 Options options
= CurrentOptions();
5303 options
.compaction_style
= kCompactionStyleLevel
;
5304 options
.write_buffer_size
= 110 << 10; // 110KB
5305 options
.arena_block_size
= 4 << 10;
5306 options
.level0_file_num_compaction_trigger
= 2;
5307 options
.num_levels
= 4;
5308 options
.max_bytes_for_level_base
= 400 * 1024;
5309 options
.max_write_buffer_number
= 16;
5311 // Block background threads
5312 env_
->SetBackgroundThreads(1, Env::LOW
);
5313 env_
->SetBackgroundThreads(1, Env::HIGH
);
5314 test::SleepingBackgroundTask sleeping_task_low
;
5315 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
5316 Env::Priority::LOW
);
5317 test::SleepingBackgroundTask sleeping_task_high
;
5318 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
5319 &sleeping_task_high
, Env::Priority::HIGH
);
5321 std::vector
<std::string
> filenames
;
5322 env_
->GetChildren(dbname_
, &filenames
);
5323 // Delete archival files.
5324 for (size_t i
= 0; i
< filenames
.size(); ++i
) {
5325 env_
->DeleteFile(dbname_
+ "/" + filenames
[i
]);
5327 env_
->DeleteDir(dbname_
);
5328 DestroyAndReopen(options
);
5330 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5331 env_
->SetBackgroundThreads(1, Env::LOW
);
5332 env_
->SetBackgroundThreads(1, Env::HIGH
);
5336 // First three 110KB files are not going to level 2
5337 // After that, (100K, 200K)
5338 for (int num
= 0; num
< 5; num
++) {
5339 GenerateNewFile(&rnd
, &key_idx
, true);
5342 ASSERT_EQ(0, GetSstFileCount(dbname_
));
5345 ASSERT_EQ(0, GetSstFileCount(dbname_
));
5347 // Unblock background threads
5348 sleeping_task_high
.WakeUp();
5349 sleeping_task_high
.WaitUntilDone();
5350 sleeping_task_low
.WakeUp();
5351 sleeping_task_low
.WaitUntilDone();
5356 class DelayedMergeOperator
: public MergeOperator
{
5361 explicit DelayedMergeOperator(DBTest
* d
) : db_test_(d
) {}
5363 bool FullMergeV2(const MergeOperationInput
& /*merge_in*/,
5364 MergeOperationOutput
* merge_out
) const override
{
5365 db_test_
->env_
->addon_time_
.fetch_add(1000);
5366 merge_out
->new_value
= "";
5370 const char* Name() const override
{ return "DelayedMergeOperator"; }
5373 TEST_F(DBTest
, MergeTestTime
) {
5374 std::string one
, two
, three
;
5375 PutFixed64(&one
, 1);
5376 PutFixed64(&two
, 2);
5377 PutFixed64(&three
, 3);
5379 // Enable time profiling
5380 SetPerfLevel(kEnableTime
);
5381 this->env_
->addon_time_
.store(0);
5382 this->env_
->time_elapse_only_sleep_
= true;
5383 this->env_
->no_slowdown_
= true;
5384 Options options
= CurrentOptions();
5385 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5386 options
.merge_operator
.reset(new DelayedMergeOperator(this));
5387 DestroyAndReopen(options
);
5389 ASSERT_EQ(TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
), 0);
5390 db_
->Put(WriteOptions(), "foo", one
);
5392 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", two
));
5394 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", three
));
5398 opt
.verify_checksums
= true;
5399 opt
.snapshot
= nullptr;
5401 db_
->Get(opt
, "foo", &result
);
5403 ASSERT_EQ(1000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
5405 ReadOptions read_options
;
5406 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
5408 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
5409 ASSERT_OK(iter
->status());
5413 ASSERT_EQ(1, count
);
5414 ASSERT_EQ(2000000, TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
));
5415 #ifdef ROCKSDB_USING_THREAD_STATUS
5416 ASSERT_GT(TestGetTickerCount(options
, FLUSH_WRITE_BYTES
), 0);
5417 #endif // ROCKSDB_USING_THREAD_STATUS
5418 this->env_
->time_elapse_only_sleep_
= false;
5421 #ifndef ROCKSDB_LITE
5422 TEST_P(DBTestWithParam
, MergeCompactionTimeTest
) {
5423 SetPerfLevel(kEnableTime
);
5424 Options options
= CurrentOptions();
5425 options
.compaction_filter_factory
= std::make_shared
<KeepFilterFactory
>();
5426 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5427 options
.merge_operator
.reset(new DelayedMergeOperator(this));
5428 options
.compaction_style
= kCompactionStyleUniversal
;
5429 options
.max_subcompactions
= max_subcompactions_
;
5430 DestroyAndReopen(options
);
5432 for (int i
= 0; i
< 1000; i
++) {
5433 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", "TEST"));
5436 dbfull()->TEST_WaitForFlushMemTable();
5437 dbfull()->TEST_WaitForCompact();
5439 ASSERT_NE(TestGetTickerCount(options
, MERGE_OPERATION_TOTAL_TIME
), 0);
5442 TEST_P(DBTestWithParam
, FilterCompactionTimeTest
) {
5443 Options options
= CurrentOptions();
5444 options
.compaction_filter_factory
=
5445 std::make_shared
<DelayFilterFactory
>(this);
5446 options
.disable_auto_compactions
= true;
5447 options
.create_if_missing
= true;
5448 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
5449 options
.statistics
->set_stats_level(kExceptTimeForMutex
);
5450 options
.max_subcompactions
= max_subcompactions_
;
5451 DestroyAndReopen(options
);
5454 for (int table
= 0; table
< 4; ++table
) {
5455 for (int i
= 0; i
< 10 + table
; ++i
) {
5456 Put(ToString(table
* 100 + i
), "val");
5461 CompactRangeOptions cro
;
5462 cro
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
5463 ASSERT_OK(db_
->CompactRange(cro
, nullptr, nullptr));
5464 ASSERT_EQ(0U, CountLiveFiles());
5468 Iterator
* itr
= db_
->NewIterator(ReadOptions());
5470 ASSERT_NE(TestGetTickerCount(options
, FILTER_OPERATION_TOTAL_TIME
), 0);
5473 #endif // ROCKSDB_LITE
5475 TEST_F(DBTest
, TestLogCleanup
) {
5476 Options options
= CurrentOptions();
5477 options
.write_buffer_size
= 64 * 1024; // very small
5478 // only two memtables allowed ==> only two log files
5479 options
.max_write_buffer_number
= 2;
5482 for (int i
= 0; i
< 100000; ++i
) {
5484 // only 2 memtables will be alive, so logs_to_free needs to always be below
5486 ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
5490 #ifndef ROCKSDB_LITE
5491 TEST_F(DBTest
, EmptyCompactedDB
) {
5492 Options options
= CurrentOptions();
5493 options
.max_open_files
= -1;
5495 ASSERT_OK(ReadOnlyReopen(options
));
5496 Status s
= Put("new", "value");
5497 ASSERT_TRUE(s
.IsNotSupported());
5500 #endif // ROCKSDB_LITE
5502 #ifndef ROCKSDB_LITE
5503 TEST_F(DBTest
, SuggestCompactRangeTest
) {
5504 class CompactionFilterFactoryGetContext
: public CompactionFilterFactory
{
5506 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
5507 const CompactionFilter::Context
& context
) override
{
5508 saved_context
= context
;
5509 std::unique_ptr
<CompactionFilter
> empty_filter
;
5510 return empty_filter
;
5512 const char* Name() const override
{
5513 return "CompactionFilterFactoryGetContext";
5515 static bool IsManual(CompactionFilterFactory
* compaction_filter_factory
) {
5516 return reinterpret_cast<CompactionFilterFactoryGetContext
*>(
5517 compaction_filter_factory
)
5518 ->saved_context
.is_manual_compaction
;
5520 CompactionFilter::Context saved_context
;
5523 Options options
= CurrentOptions();
5524 options
.memtable_factory
.reset(
5525 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile
));
5526 options
.compaction_style
= kCompactionStyleLevel
;
5527 options
.compaction_filter_factory
.reset(
5528 new CompactionFilterFactoryGetContext());
5529 options
.write_buffer_size
= 200 << 10;
5530 options
.arena_block_size
= 4 << 10;
5531 options
.level0_file_num_compaction_trigger
= 4;
5532 options
.num_levels
= 4;
5533 options
.compression
= kNoCompression
;
5534 options
.max_bytes_for_level_base
= 450 << 10;
5535 options
.target_file_size_base
= 98 << 10;
5536 options
.max_compaction_bytes
= static_cast<uint64_t>(1) << 60; // inf
5542 for (int num
= 0; num
< 3; num
++) {
5543 GenerateNewRandomFile(&rnd
);
5546 GenerateNewRandomFile(&rnd
);
5547 ASSERT_EQ("0,4", FilesPerLevel(0));
5548 ASSERT_TRUE(!CompactionFilterFactoryGetContext::IsManual(
5549 options
.compaction_filter_factory
.get()));
5551 GenerateNewRandomFile(&rnd
);
5552 ASSERT_EQ("1,4", FilesPerLevel(0));
5554 GenerateNewRandomFile(&rnd
);
5555 ASSERT_EQ("2,4", FilesPerLevel(0));
5557 GenerateNewRandomFile(&rnd
);
5558 ASSERT_EQ("3,4", FilesPerLevel(0));
5560 GenerateNewRandomFile(&rnd
);
5561 ASSERT_EQ("0,4,4", FilesPerLevel(0));
5563 GenerateNewRandomFile(&rnd
);
5564 ASSERT_EQ("1,4,4", FilesPerLevel(0));
5566 GenerateNewRandomFile(&rnd
);
5567 ASSERT_EQ("2,4,4", FilesPerLevel(0));
5569 GenerateNewRandomFile(&rnd
);
5570 ASSERT_EQ("3,4,4", FilesPerLevel(0));
5572 GenerateNewRandomFile(&rnd
);
5573 ASSERT_EQ("0,4,8", FilesPerLevel(0));
5575 GenerateNewRandomFile(&rnd
);
5576 ASSERT_EQ("1,4,8", FilesPerLevel(0));
5578 // compact it three times
5579 for (int i
= 0; i
< 3; ++i
) {
5580 ASSERT_OK(experimental::SuggestCompactRange(db_
, nullptr, nullptr));
5581 dbfull()->TEST_WaitForCompact();
5584 // All files are compacted
5585 ASSERT_EQ(0, NumTableFilesAtLevel(0));
5586 ASSERT_EQ(0, NumTableFilesAtLevel(1));
5588 GenerateNewRandomFile(&rnd
);
5589 ASSERT_EQ(1, NumTableFilesAtLevel(0));
5591 // nonoverlapping with the file on level 0
5592 Slice
start("a"), end("b");
5593 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
5594 dbfull()->TEST_WaitForCompact();
5596 // should not compact the level 0 file
5597 ASSERT_EQ(1, NumTableFilesAtLevel(0));
5601 ASSERT_OK(experimental::SuggestCompactRange(db_
, &start
, &end
));
5602 dbfull()->TEST_WaitForCompact();
5603 ASSERT_TRUE(CompactionFilterFactoryGetContext::IsManual(
5604 options
.compaction_filter_factory
.get()));
5606 // now it should compact the level 0 file
5607 ASSERT_EQ(0, NumTableFilesAtLevel(0));
5608 ASSERT_EQ(1, NumTableFilesAtLevel(1));
5611 TEST_F(DBTest
, PromoteL0
) {
5612 Options options
= CurrentOptions();
5613 options
.disable_auto_compactions
= true;
5614 options
.write_buffer_size
= 10 * 1024 * 1024;
5615 DestroyAndReopen(options
);
5617 // non overlapping ranges
5618 std::vector
<std::pair
<int32_t, int32_t>> ranges
= {
5619 {81, 160}, {0, 80}, {161, 240}, {241, 320}};
5621 int32_t value_size
= 10 * 1024; // 10 KB
5624 std::map
<int32_t, std::string
> values
;
5625 for (const auto& range
: ranges
) {
5626 for (int32_t j
= range
.first
; j
< range
.second
; j
++) {
5627 values
[j
] = RandomString(&rnd
, value_size
);
5628 ASSERT_OK(Put(Key(j
), values
[j
]));
5633 int32_t level0_files
= NumTableFilesAtLevel(0, 0);
5634 ASSERT_EQ(level0_files
, ranges
.size());
5635 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
5637 // Promote L0 level to L2.
5638 ASSERT_OK(experimental::PromoteL0(db_
, db_
->DefaultColumnFamily(), 2));
5639 // We expect that all the files were trivially moved from L0 to L2
5640 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
5641 ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files
);
5643 for (const auto& kv
: values
) {
5644 ASSERT_EQ(Get(Key(kv
.first
)), kv
.second
);
5648 TEST_F(DBTest
, PromoteL0Failure
) {
5649 Options options
= CurrentOptions();
5650 options
.disable_auto_compactions
= true;
5651 options
.write_buffer_size
= 10 * 1024 * 1024;
5652 DestroyAndReopen(options
);
5654 // Produce two L0 files with overlapping ranges.
5655 ASSERT_OK(Put(Key(0), ""));
5656 ASSERT_OK(Put(Key(3), ""));
5658 ASSERT_OK(Put(Key(1), ""));
5662 // Fails because L0 has overlapping files.
5663 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
5664 ASSERT_TRUE(status
.IsInvalidArgument());
5666 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5667 // Now there is a file in L1.
5668 ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
5670 ASSERT_OK(Put(Key(5), ""));
5672 // Fails because L1 is non-empty.
5673 status
= experimental::PromoteL0(db_
, db_
->DefaultColumnFamily());
5674 ASSERT_TRUE(status
.IsInvalidArgument());
5677 // Github issue #596
5678 TEST_F(DBTest
, CompactRangeWithEmptyBottomLevel
) {
5679 const int kNumLevels
= 2;
5680 const int kNumL0Files
= 2;
5681 Options options
= CurrentOptions();
5682 options
.disable_auto_compactions
= true;
5683 options
.num_levels
= kNumLevels
;
5684 DestroyAndReopen(options
);
5687 for (int i
= 0; i
< kNumL0Files
; ++i
) {
5688 ASSERT_OK(Put(Key(0), RandomString(&rnd
, 1024)));
5691 ASSERT_EQ(NumTableFilesAtLevel(0), kNumL0Files
);
5692 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
5694 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5695 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
5696 ASSERT_EQ(NumTableFilesAtLevel(1), kNumL0Files
);
5698 #endif // ROCKSDB_LITE
5700 TEST_F(DBTest
, AutomaticConflictsWithManualCompaction
) {
5701 const int kNumL0Files
= 50;
5702 Options options
= CurrentOptions();
5703 options
.level0_file_num_compaction_trigger
= 4;
5704 // never slowdown / stop
5705 options
.level0_slowdown_writes_trigger
= 999999;
5706 options
.level0_stop_writes_trigger
= 999999;
5707 options
.max_background_compactions
= 10;
5708 DestroyAndReopen(options
);
5710 // schedule automatic compactions after the manual one starts, but before it
5711 // finishes to ensure conflict.
5712 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5713 {{"DBImpl::BackgroundCompaction:Start",
5714 "DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
5715 {"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
5716 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
5717 std::atomic
<int> callback_count(0);
5718 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5719 "DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
5720 [&](void* /*arg*/) { callback_count
.fetch_add(1); });
5721 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5724 for (int i
= 0; i
< 2; ++i
) {
5725 // put two keys to ensure no trivial move
5726 for (int j
= 0; j
< 2; ++j
) {
5727 ASSERT_OK(Put(Key(j
), RandomString(&rnd
, 1024)));
5731 port::Thread
manual_compaction_thread([this]() {
5732 CompactRangeOptions croptions
;
5733 croptions
.exclusive_manual_compaction
= true;
5734 ASSERT_OK(db_
->CompactRange(croptions
, nullptr, nullptr));
5737 TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
5738 for (int i
= 0; i
< kNumL0Files
; ++i
) {
5739 // put two keys to ensure no trivial move
5740 for (int j
= 0; j
< 2; ++j
) {
5741 ASSERT_OK(Put(Key(j
), RandomString(&rnd
, 1024)));
5745 TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");
5747 ASSERT_GE(callback_count
.load(), 1);
5748 for (int i
= 0; i
< 2; ++i
) {
5749 ASSERT_NE("NOT_FOUND", Get(Key(i
)));
5751 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5752 manual_compaction_thread
.join();
5753 dbfull()->TEST_WaitForCompact();
5756 #ifndef ROCKSDB_LITE
5757 TEST_F(DBTest
, CompactFilesShouldTriggerAutoCompaction
) {
5758 Options options
= CurrentOptions();
5759 options
.max_background_compactions
= 1;
5760 options
.level0_file_num_compaction_trigger
= 4;
5761 options
.level0_slowdown_writes_trigger
= 36;
5762 options
.level0_stop_writes_trigger
= 36;
5763 DestroyAndReopen(options
);
5765 // generate files for manual compaction
5767 for (int i
= 0; i
< 2; ++i
) {
5768 // put two keys to ensure no trivial move
5769 for (int j
= 0; j
< 2; ++j
) {
5770 ASSERT_OK(Put(Key(j
), RandomString(&rnd
, 1024)));
5775 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data
;
5776 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5778 std::vector
<std::string
> input_files
;
5779 input_files
.push_back(cf_meta_data
.levels
[0].files
[0].name
);
5781 SyncPoint::GetInstance()->LoadDependency({
5782 {"CompactFilesImpl:0",
5783 "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin"},
5784 {"DBTest::CompactFilesShouldTriggerAutoCompaction:End",
5785 "CompactFilesImpl:1"},
5788 SyncPoint::GetInstance()->EnableProcessing();
5790 port::Thread
manual_compaction_thread([&]() {
5791 auto s
= db_
->CompactFiles(CompactionOptions(),
5792 db_
->DefaultColumnFamily(), input_files
, 0);
5796 "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin");
5797 // generate enough files to trigger compaction
5798 for (int i
= 0; i
< 20; ++i
) {
5799 for (int j
= 0; j
< 2; ++j
) {
5800 ASSERT_OK(Put(Key(j
), RandomString(&rnd
, 1024)));
5804 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5805 ASSERT_GT(cf_meta_data
.levels
[0].files
.size(),
5806 options
.level0_file_num_compaction_trigger
);
5808 "DBTest::CompactFilesShouldTriggerAutoCompaction:End");
5810 manual_compaction_thread
.join();
5811 dbfull()->TEST_WaitForCompact();
5813 db_
->GetColumnFamilyMetaData(db_
->DefaultColumnFamily(), &cf_meta_data
);
5814 ASSERT_LE(cf_meta_data
.levels
[0].files
.size(),
5815 options
.level0_file_num_compaction_trigger
);
5817 #endif // ROCKSDB_LITE
5819 // Github issue #595
5820 // Large write batch with column families
5821 TEST_F(DBTest
, LargeBatchWithColumnFamilies
) {
5822 Options options
= CurrentOptions();
5824 options
.write_buffer_size
= 100000; // Small write buffer
5825 CreateAndReopenWithCF({"pikachu"}, options
);
5827 for (int i
= 0; i
< 5; i
++) {
5828 for (int pass
= 1; pass
<= 3; pass
++) {
5830 size_t write_size
= 1024 * 1024 * (5 + i
);
5831 fprintf(stderr
, "prepare: %" ROCKSDB_PRIszt
" MB, pass:%d\n",
5832 (write_size
/ 1024 / 1024), pass
);
5834 std::string
data(3000, j
++ % 127 + 20);
5835 data
+= ToString(j
);
5836 batch
.Put(handles_
[0], Slice(data
), Slice(data
));
5837 if (batch
.GetDataSize() > write_size
) {
5841 fprintf(stderr
, "write: %" ROCKSDB_PRIszt
" MB\n",
5842 (batch
.GetDataSize() / 1024 / 1024));
5843 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch
));
5844 fprintf(stderr
, "done\n");
5847 // make sure we can re-open it.
5848 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
5851 // Make sure that Flushes can proceed in parallel with CompactRange()
5852 TEST_F(DBTest
, FlushesInParallelWithCompactRange
) {
5853 // iter == 0 -- leveled
5854 // iter == 1 -- leveled, but throw in a flush between two levels compacting
5855 // iter == 2 -- universal
5856 for (int iter
= 0; iter
< 3; ++iter
) {
5857 Options options
= CurrentOptions();
5859 options
.compaction_style
= kCompactionStyleLevel
;
5861 options
.compaction_style
= kCompactionStyleUniversal
;
5863 options
.write_buffer_size
= 110 << 10;
5864 options
.level0_file_num_compaction_trigger
= 4;
5865 options
.num_levels
= 4;
5866 options
.compression
= kNoCompression
;
5867 options
.max_bytes_for_level_base
= 450 << 10;
5868 options
.target_file_size_base
= 98 << 10;
5869 options
.max_write_buffer_number
= 2;
5871 DestroyAndReopen(options
);
5874 for (int num
= 0; num
< 14; num
++) {
5875 GenerateNewRandomFile(&rnd
);
5879 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5880 {{"DBImpl::RunManualCompaction()::1",
5881 "DBTest::FlushesInParallelWithCompactRange:1"},
5882 {"DBTest::FlushesInParallelWithCompactRange:2",
5883 "DBImpl::RunManualCompaction()::2"}});
5885 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5886 {{"CompactionJob::Run():Start",
5887 "DBTest::FlushesInParallelWithCompactRange:1"},
5888 {"DBTest::FlushesInParallelWithCompactRange:2",
5889 "CompactionJob::Run():End"}});
5891 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5893 std::vector
<port::Thread
> threads
;
5894 threads
.emplace_back([&]() { Compact("a", "z"); });
5896 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
5898 // this has to start a flush. if flushes are blocked, this will try to
5900 // 3 memtables, and that will fail because max_write_buffer_number is 2
5901 for (int num
= 0; num
< 3; num
++) {
5902 GenerateNewRandomFile(&rnd
, /* nowait */ true);
5905 TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
5907 for (auto& t
: threads
) {
5910 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Verifies that the write controller's delayed-write mechanism slows writes
// at roughly the configured delayed_write_rate: compactions are blocked with a
// sleeping LOW-priority task, L0 builds past level0_slowdown_writes_trigger,
// and the accumulated mock-env sleep time (env_->addon_time_) is asserted to
// fall within [estimate/2, estimate*2] of the analytically expected delay.
// NOTE(review): this chunk is a garbled extraction — statements are split
// across lines, the original file's line numbers are embedded in the text,
// and several original lines are elided (e.g. `Random rnd`, `WriteOptions wo`,
// closing braces). Restore from the full db_test.cc before building.
5914 TEST_F(DBTest
, DelayedWriteRate
) {
5915 const int kEntriesPerMemTable
= 100;
5916 const int kTotalFlushes
= 12;
5918 Options options
= CurrentOptions();
5919 env_
->SetBackgroundThreads(1, Env::LOW
);
5921 env_
->no_slowdown_
= true;
5922 options
.write_buffer_size
= 100000000;
5923 options
.max_write_buffer_number
= 256;
5924 options
.max_background_compactions
= 1;
5925 options
.level0_file_num_compaction_trigger
= 3;
5926 options
.level0_slowdown_writes_trigger
= 3;
5927 options
.level0_stop_writes_trigger
= 999999;
5928 options
.delayed_write_rate
= 20000000; // Start with 200MB/s
5929 options
.memtable_factory
.reset(
5930 new SpecialSkipListFactory(kEntriesPerMemTable
));
5932 CreateAndReopenWithCF({"pikachu"}, options
);
5934 // Block compactions
5935 test::SleepingBackgroundTask sleeping_task_low
;
5936 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
5937 Env::Priority::LOW
);
// Seed 3 L0 files to reach level0_slowdown_writes_trigger.
5939 for (int i
= 0; i
< 3; i
++) {
5940 Put(Key(i
), std::string(10000, 'x'));
5944 // These writes will be slowed down to 1KB/s
5945 uint64_t estimated_sleep_time
= 0;
5948 uint64_t cur_rate
= options
.delayed_write_rate
;
5949 for (int i
= 0; i
< kTotalFlushes
; i
++) {
5950 uint64_t size_memtable
= 0;
5951 for (int j
= 0; j
< kEntriesPerMemTable
; j
++) {
5952 auto rand_num
= rnd
.Uniform(20);
5953 // Spread the size range to more.
5954 size_t entry_size
= rand_num
* rand_num
* rand_num
;
5956 Put(Key(i
), std::string(entry_size
, 'x'), wo
);
// +18 approximates per-entry key/metadata overhead in the memtable.
5957 size_memtable
+= entry_size
+ 18;
5958 // Occasionally sleep a while
5959 if (rnd
.Uniform(20) == 6) {
5960 env_
->SleepForMicroseconds(2666);
5963 dbfull()->TEST_WaitForFlushMemTable();
5964 estimated_sleep_time
+= size_memtable
* 1000000u / cur_rate
;
5965 // Slow down twice. One for memtable switch and one for flush finishes.
5966 cur_rate
= static_cast<uint64_t>(static_cast<double>(cur_rate
) *
5967 kIncSlowdownRatio
* kIncSlowdownRatio
);
5969 // Estimate the total sleep time fall into the rough range.
5970 ASSERT_GT(env_
->addon_time_
.load(),
5971 static_cast<int64_t>(estimated_sleep_time
/ 2));
5972 ASSERT_LT(env_
->addon_time_
.load(),
5973 static_cast<int64_t>(estimated_sleep_time
* 2));
5975 env_
->no_slowdown_
= false;
5976 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5977 sleeping_task_low
.WakeUp();
5978 sleeping_task_low
.WaitUntilDone();
// Verifies hard_pending_compaction_bytes_limit stalls writes: with compaction
// blocked by a sleeping LOW task, the first 5 generated files stay under the
// 800KB limit (no "DBImpl::DelayWrite:Wait" callbacks), while the next 5 push
// pending compaction bytes over it and the callback fires at least once.
// NOTE(review): garbled extraction — embedded original line numbers, elided
// lines (e.g. `Random rnd`, `int key_idx`, closing braces). Restore from the
// full db_test.cc before building.
5981 TEST_F(DBTest
, HardLimit
) {
5982 Options options
= CurrentOptions();
5984 env_
->SetBackgroundThreads(1, Env::LOW
);
5985 options
.max_write_buffer_number
= 256;
5986 options
.write_buffer_size
= 110 << 10; // 110KB
5987 options
.arena_block_size
= 4 * 1024;
5988 options
.level0_file_num_compaction_trigger
= 4;
5989 options
.level0_slowdown_writes_trigger
= 999999;
5990 options
.level0_stop_writes_trigger
= 999999;
5991 options
.hard_pending_compaction_bytes_limit
= 800 << 10;
5992 options
.max_bytes_for_level_base
= 10000000000u;
5993 options
.max_background_compactions
= 1;
5994 options
.memtable_factory
.reset(
5995 new SpecialSkipListFactory(KNumKeysByGenerateNewFile
- 1));
5997 env_
->SetBackgroundThreads(1, Env::LOW
);
5998 test::SleepingBackgroundTask sleeping_task_low
;
5999 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6000 Env::Priority::LOW
);
6002 CreateAndReopenWithCF({"pikachu"}, options
);
// Counts how many times a write was stalled on the hard limit; the callback
// also wakes the sleeping task so the test can finish.
6004 std::atomic
<int> callback_count(0);
6005 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6006 "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
6007 callback_count
.fetch_add(1);
6008 sleeping_task_low
.WakeUp();
6010 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6014 for (int num
= 0; num
< 5; num
++) {
6015 GenerateNewFile(&rnd
, &key_idx
, true);
6016 dbfull()->TEST_WaitForFlushMemTable();
// Still under the 800KB pending-compaction-bytes hard limit: no stall yet.
6019 ASSERT_EQ(0, callback_count
.load());
6021 for (int num
= 0; num
< 5; num
++) {
6022 GenerateNewFile(&rnd
, &key_idx
, true);
6023 dbfull()->TEST_WaitForFlushMemTable();
// Limit exceeded: at least one write must have hit DelayWrite:Wait.
6025 ASSERT_GE(callback_count
.load(), 1);
6027 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6028 sleeping_task_low
.WaitUntilDone();
6031 #if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
// Test-only EventListener that records the most recent write-stall condition
// reported via OnStallConditionsChanged, guarded by a mutex, so tests can poll
// it with CheckCondition().
// NOTE(review): garbled extraction — access specifiers, the `port::Mutex
// mutex_` member, and the closing brace are elided here. Restore from the
// full db_test.cc before building.
6032 class WriteStallListener
: public EventListener
{
6034 WriteStallListener() : condition_(WriteStallCondition::kNormal
) {}
// Callback from the DB: remember the new (current) stall condition.
6035 void OnStallConditionsChanged(const WriteStallInfo
& info
) override
{
6036 MutexLock
l(&mutex_
);
6037 condition_
= info
.condition
.cur
;
// Returns true iff the last observed condition equals `expected`.
6039 bool CheckCondition(WriteStallCondition expected
) {
6040 MutexLock
l(&mutex_
);
6041 return expected
== condition_
;
// Last stall condition delivered by the DB (guarded by mutex_).
6045 WriteStallCondition condition_
;
// Exercises the soft write-stall limits: L0 file count
// (level0_slowdown_writes_trigger) and soft_pending_compaction_bytes_limit.
// Repeatedly builds 3 L0 files with compaction blocked, asserts the write
// controller needs delay and the WriteStallListener saw kDelayed, then lets
// one compaction run and asserts the stall clears (kNormal). Finally shrinks
// max_bytes_for_level_base via SetOptions to re-trigger the soft limit.
// NOTE(review): garbled extraction — embedded original line numbers and
// elided lines (closing braces, parts of the flush-wait machinery). Restore
// from the full db_test.cc before building.
6048 TEST_F(DBTest
, SoftLimit
) {
6049 Options options
= CurrentOptions();
6051 options
.write_buffer_size
= 100000; // Small write buffer
6052 options
.max_write_buffer_number
= 256;
6053 options
.level0_file_num_compaction_trigger
= 1;
6054 options
.level0_slowdown_writes_trigger
= 3;
6055 options
.level0_stop_writes_trigger
= 999999;
6056 options
.delayed_write_rate
= 20000; // About 200KB/s limited rate
6057 options
.soft_pending_compaction_bytes_limit
= 160000;
6058 options
.target_file_size_base
= 99999999; // All into one file
6059 options
.max_bytes_for_level_base
= 50000;
6060 options
.max_bytes_for_level_multiplier
= 10;
6061 options
.max_background_compactions
= 1;
6062 options
.compression
= kNoCompression
;
// Listener ownership passes to options.listeners (shared_ptr) below.
6063 WriteStallListener
* listener
= new WriteStallListener();
6064 options
.listeners
.emplace_back(listener
);
6066 // FlushMemtable with opt.wait=true does not wait for
6067 // `OnStallConditionsChanged` being called. The event listener is triggered
6068 // on `JobContext::Clean`, which happens after flush result is installed.
6069 // We use sync point to create a custom WaitForFlush that waits for
6071 port::Mutex flush_mutex
;
6072 port::CondVar
flush_cv(&flush_mutex
);
6073 bool flush_finished
= false;
// Arms the sync-point callback that signals flush_cv once the flush job's
// context has been cleaned up (i.e. after the listener has fired).
6074 auto InstallFlushCallback
= [&]() {
6076 MutexLock
l(&flush_mutex
);
6077 flush_finished
= false;
6079 SyncPoint::GetInstance()->SetCallBack(
6080 "DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) {
6082 MutexLock
l(&flush_mutex
);
6083 flush_finished
= true;
6085 flush_cv
.SignalAll();
// Blocks until the armed callback fires, then disarms it.
6088 auto WaitForFlush
= [&]() {
6090 MutexLock
l(&flush_mutex
);
6091 while (!flush_finished
) {
6095 SyncPoint::GetInstance()->ClearCallBack(
6096 "DBImpl::BackgroundCallFlush:ContextCleanedUp");
6099 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6103 // Generating 360KB in Level 3
6104 for (int i
= 0; i
< 72; i
++) {
6105 Put(Key(i
), std::string(5000, 'x'));
6107 dbfull()->TEST_FlushMemTable(true, true);
6110 dbfull()->TEST_WaitForCompact();
6111 MoveFilesToLevel(3);
6113 // Generating 360KB in Level 2
6114 for (int i
= 0; i
< 72; i
++) {
6115 Put(Key(i
), std::string(5000, 'x'));
6117 dbfull()->TEST_FlushMemTable(true, true);
6120 dbfull()->TEST_WaitForCompact();
6121 MoveFilesToLevel(2);
6125 test::SleepingBackgroundTask sleeping_task_low
;
6126 // Block compactions
6127 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6128 Env::Priority::LOW
);
6129 sleeping_task_low
.WaitUntilSleeping();
6131 // Create 3 L0 files, making score of L0 to be 3.
6132 for (int i
= 0; i
< 3; i
++) {
6133 Put(Key(i
), std::string(5000, 'x'));
6134 Put(Key(100 - i
), std::string(5000, 'x'));
6135 // Flush the file. File size is around 30KB.
6136 InstallFlushCallback();
6137 dbfull()->TEST_FlushMemTable(true, true);
// 3 L0 files == level0_slowdown_writes_trigger: expect a delayed-write stall.
6140 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6141 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6143 sleeping_task_low
.WakeUp();
6144 sleeping_task_low
.WaitUntilDone();
6145 sleeping_task_low
.Reset();
6146 dbfull()->TEST_WaitForCompact();
6148 // Now there is one L1 file but doesn't trigger soft_rate_limit
6149 // The L1 file size is around 30KB.
6150 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6151 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6152 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6154 // Only allow one compaction going through.
6155 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6156 "BackgroundCallCompaction:0", [&](void* /*arg*/) {
6157 // Schedule a sleeping task.
6158 sleeping_task_low
.Reset();
6159 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
,
6160 &sleeping_task_low
, Env::Priority::LOW
);
6163 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
6164 Env::Priority::LOW
);
6165 sleeping_task_low
.WaitUntilSleeping();
6166 // Create 3 L0 files, making score of L0 to be 3
6167 for (int i
= 0; i
< 3; i
++) {
6168 Put(Key(10 + i
), std::string(5000, 'x'));
6169 Put(Key(90 - i
), std::string(5000, 'x'));
6170 // Flush the file. File size is around 30KB.
6171 InstallFlushCallback();
6172 dbfull()->TEST_FlushMemTable(true, true);
6176 // Wake up sleep task to enable compaction to run and waits
6177 // for it to go to sleep state again to make sure one compaction
6179 sleeping_task_low
.WakeUp();
6180 sleeping_task_low
.WaitUntilSleeping();
6182 // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
6183 // Given level multiplier 10, estimated pending compaction is around 100KB
6184 // doesn't trigger soft_pending_compaction_bytes_limit
6185 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6186 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6187 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6189 // Create 3 L0 files, making score of L0 to be 3, higher than L0.
6190 for (int i
= 0; i
< 3; i
++) {
6191 Put(Key(20 + i
), std::string(5000, 'x'));
6192 Put(Key(80 - i
), std::string(5000, 'x'));
6193 // Flush the file. File size is around 30KB.
6194 InstallFlushCallback();
6195 dbfull()->TEST_FlushMemTable(true, true);
6198 // Wake up sleep task to enable compaction to run and waits
6199 // for it to go to sleep state again to make sure one compaction
6201 sleeping_task_low
.WakeUp();
6202 sleeping_task_low
.WaitUntilSleeping();
6204 // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
6205 // L2 size is 360KB, so the estimated level fanout 4, estimated pending
6206 // compaction is around 200KB
6207 // triggering soft_pending_compaction_bytes_limit
6208 ASSERT_EQ(NumTableFilesAtLevel(1), 1);
6209 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6210 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6212 sleeping_task_low
.WakeUp();
6213 sleeping_task_low
.WaitUntilSleeping();
6215 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6216 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kNormal
));
6218 // shrink level base so L2 will hit soft limit easier.
6219 ASSERT_OK(dbfull()->SetOptions({
6220 {"max_bytes_for_level_base", "5000"},
6225 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6226 ASSERT_TRUE(listener
->CheckCondition(WriteStallCondition::kDelayed
));
6228 sleeping_task_low
.WaitUntilSleeping();
6229 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6230 sleeping_task_low
.WakeUp();
6231 sleeping_task_low
.WaitUntilDone();
// Verifies that writes are delayed when the last allowed write buffer is
// about to be used: with flushes blocked (HIGH-priority sleeping task) and
// max_write_buffer_number = 4, filling 3 memtables causes no delay, but the
// write that would start the 4th memtable triggers the write controller.
// NOTE(review): garbled extraction — elided lines include the Put() inside
// the inner loop, the extra-entry write, and closing braces. Restore from the
// full db_test.cc before building.
6234 TEST_F(DBTest
, LastWriteBufferDelay
) {
6235 Options options
= CurrentOptions();
6237 options
.write_buffer_size
= 100000;
6238 options
.max_write_buffer_number
= 4;
6239 options
.delayed_write_rate
= 20000;
6240 options
.compression
= kNoCompression
;
6241 options
.disable_auto_compactions
= true;
6242 int kNumKeysPerMemtable
= 3;
6243 options
.memtable_factory
.reset(
6244 new SpecialSkipListFactory(kNumKeysPerMemtable
));
// Block flushes so memtables accumulate.
6247 test::SleepingBackgroundTask sleeping_task
;
6249 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task
,
6250 Env::Priority::HIGH
);
6251 sleeping_task
.WaitUntilSleeping();
6253 // Create 3 L0 files, making score of L0 to be 3.
6254 for (int i
= 0; i
< 3; i
++) {
6255 // Fill one mem table
6256 for (int j
= 0; j
< kNumKeysPerMemtable
; j
++) {
6259 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6261 // Inserting a new entry would create a new mem table, triggering slow down.
6263 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6265 sleeping_task
.WakeUp();
6266 sleeping_task
.WaitUntilDone();
6268 #endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
// For each compression type not compiled into this build, verifies that both
// DB::Open (via TryReopen) and CreateColumnFamily fail when that type is
// requested, while reopening with kNoCompression succeeds.
// NOTE(review): garbled extraction — embedded original line numbers and
// elided closing braces. Restore from the full db_test.cc before building.
6270 TEST_F(DBTest
, FailWhenCompressionNotSupportedTest
) {
6271 CompressionType compressions
[] = {kZlibCompression
, kBZip2Compression
,
6272 kLZ4Compression
, kLZ4HCCompression
,
6273 kXpressCompression
};
6274 for (auto comp
: compressions
) {
6275 if (!CompressionTypeSupported(comp
)) {
6276 // not supported, we should fail the Open()
6277 Options options
= CurrentOptions();
6278 options
.compression
= comp
;
6279 ASSERT_TRUE(!TryReopen(options
).ok());
6280 // Try if CreateColumnFamily also fails
6281 options
.compression
= kNoCompression
;
6282 ASSERT_OK(TryReopen(options
));
6283 ColumnFamilyOptions
cf_options(options
);
6284 cf_options
.compression
= comp
;
6285 ColumnFamilyHandle
* handle
;
6286 ASSERT_TRUE(!db_
->CreateColumnFamily(cf_options
, "name", &handle
).ok());
// Despite its name, this test now verifies the positive case: setting a TTL
// on a column family succeeds even with a bounded max_open_files, per the
// in-code comment that ttl is supported when max_open_files is -1 (and this
// combination no longer fails).
// NOTE(review): garbled extraction — elided lines likely include DB reopen,
// cleanup of `handle`, and closing braces. Restore from the full db_test.cc
// before building.
6291 TEST_F(DBTest
, CreateColumnFamilyShouldFailOnIncompatibleOptions
) {
6292 Options options
= CurrentOptions();
6293 options
.max_open_files
= 100;
6296 ColumnFamilyOptions
cf_options(options
);
6297 // ttl is now supported when max_open_files is -1.
6298 cf_options
.ttl
= 3600;
6299 ColumnFamilyHandle
* handle
;
6300 ASSERT_OK(db_
->CreateColumnFamily(cf_options
, "pikachu", &handle
));
6304 #ifndef ROCKSDB_LITE
// Verifies row-cache hit/miss accounting: after a flush, the first Get is a
// ROW_CACHE_MISS (populating the cache) and the second Get is a
// ROW_CACHE_HIT, with ticker counts checked at each step.
// NOTE(review): garbled extraction — the Flush() between Put and Get and the
// closing brace are elided. Restore from the full db_test.cc before building.
6305 TEST_F(DBTest
, RowCache
) {
6306 Options options
= CurrentOptions();
6307 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
6308 options
.row_cache
= NewLRUCache(8192);
6309 DestroyAndReopen(options
);
6311 ASSERT_OK(Put("foo", "bar"));
// Nothing read yet: both tickers start at zero.
6314 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
6315 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 0);
6316 ASSERT_EQ(Get("foo"), "bar");
// First read misses the row cache and inserts the row.
6317 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 0);
6318 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
6319 ASSERT_EQ(Get("foo"), "bar");
// Second read is served from the row cache.
6320 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_HIT
), 1);
6321 ASSERT_EQ(TestGetTickerCount(options
, ROW_CACHE_MISS
), 1);
// Verifies row-cache interaction with PinnableSlice: looking up a cached row
// into a live PinnableSlice removes the entry from the LRU list (pinned), and
// destroying the slice returns it to the LRU (TEST_GetLRUSize checked before,
// during, and after the pin).
// NOTE(review): garbled extraction — the Flush(), the ASSERT_EQ wrappers
// around TEST_GetLRUSize, expected values, and braces are partially elided.
// Restore from the full db_test.cc before building.
6324 TEST_F(DBTest
, PinnableSliceAndRowCache
) {
6325 Options options
= CurrentOptions();
6326 options
.statistics
= ROCKSDB_NAMESPACE::CreateDBStatistics();
6327 options
.row_cache
= NewLRUCache(8192);
6328 DestroyAndReopen(options
);
6330 ASSERT_OK(Put("foo", "bar"));
6333 ASSERT_EQ(Get("foo"), "bar");
6335 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6339 PinnableSlice pin_slice
;
6340 ASSERT_EQ(Get("foo", &pin_slice
), Status::OK());
6341 ASSERT_EQ(pin_slice
.ToString(), "bar");
6342 // Entry is already in cache, lookup will remove the element from lru
6344 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6347 // After PinnableSlice destruction element is added back in LRU
6349 reinterpret_cast<LRUCache
*>(options
.row_cache
.get())->TEST_GetLRUSize(),
6353 #endif // ROCKSDB_LITE
6355 TEST_F(DBTest
, DeletingOldWalAfterDrop
) {
6356 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
6357 {{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
6358 {"DBImpl::BGWorkFlush:done", "Test:WaitForFlush"}});
6359 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
6361 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6362 Options options
= CurrentOptions();
6363 options
.max_total_wal_size
= 8192;
6364 options
.compression
= kNoCompression
;
6365 options
.write_buffer_size
= 1 << 20;
6366 options
.level0_file_num_compaction_trigger
= (1 << 30);
6367 options
.level0_slowdown_writes_trigger
= (1 << 30);
6368 options
.level0_stop_writes_trigger
= (1 << 30);
6369 options
.disable_auto_compactions
= true;
6370 DestroyAndReopen(options
);
6371 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6373 CreateColumnFamilies({"cf1", "cf2"}, options
);
6374 ASSERT_OK(Put(0, "key1", DummyString(8192)));
6375 ASSERT_OK(Put(0, "key2", DummyString(8192)));
6376 // the oldest wal should now be getting_flushed
6377 ASSERT_OK(db_
->DropColumnFamily(handles_
[0]));
6378 // all flushes should now do nothing because their CF is dropped
6379 TEST_SYNC_POINT("Test:AllowFlushes");
6380 TEST_SYNC_POINT("Test:WaitForFlush");
6381 uint64_t lognum1
= dbfull()->TEST_LogfileNumber();
6382 ASSERT_OK(Put(1, "key3", DummyString(8192)));
6383 ASSERT_OK(Put(1, "key4", DummyString(8192)));
6384 // new wal should have been created
6385 uint64_t lognum2
= dbfull()->TEST_LogfileNumber();
6386 EXPECT_GT(lognum2
, lognum1
);
// Verifies SyncWAL() returns Status::NotSupported when the (mock) env reports
// that WAL sync is not thread-safe.
// NOTE(review): garbled extraction — closing brace elided. Restore from the
// full db_test.cc before building.
6389 TEST_F(DBTest
, UnsupportedManualSync
) {
6390 DestroyAndReopen(CurrentOptions());
6391 env_
->is_wal_sync_thread_safe_
.store(false);
6392 Status s
= db_
->SyncWAL();
6393 ASSERT_TRUE(s
.IsNotSupported());
// Instantiates DBTestWithParam over (num subcompactions in {1, 4}) x
// (exclusive manual compaction in {false, true}).
6396 INSTANTIATE_TEST_CASE_P(DBTestWithParam
, DBTestWithParam
,
6397 ::testing::Combine(::testing::Values(1, 4),
6398 ::testing::Bool()));
// Verifies PauseBackgroundWork()/ContinueBackgroundWork(): a writer thread
// issuing 10000 Puts (enough to force flushes past the small write buffer)
// cannot finish while background work is paused, and completes after it is
// resumed.
// NOTE(review): garbled extraction — elided lines include DestroyAndReopen,
// `Random rnd`, `done = true;` at the end of the lambda, and closing braces.
// Restore from the full db_test.cc before building.
6400 TEST_F(DBTest
, PauseBackgroundWorkTest
) {
6401 Options options
= CurrentOptions();
6402 options
.write_buffer_size
= 100000; // Small write buffer
6405 std::vector
<port::Thread
> threads
;
6406 std::atomic
<bool> done(false);
6407 db_
->PauseBackgroundWork();
6408 threads
.emplace_back([&]() {
6410 for (int i
= 0; i
< 10000; ++i
) {
6411 Put(RandomString(&rnd
, 10), RandomString(&rnd
, 10));
// Give the writer thread time to run; it must still be blocked on flush.
6415 env_
->SleepForMicroseconds(200000);
6416 // make sure the thread is not done
6417 ASSERT_FALSE(done
.load());
6418 db_
->ContinueBackgroundWork();
6419 for (auto& t
: threads
) {
6423 ASSERT_TRUE(done
.load());
6426 // Keep spawning short-living threads that create an iterator and quit.
6427 // Meanwhile in another thread keep flushing memtables.
6428 // This used to cause a deadlock.
// Regression test for a ThreadLocalPtr deadlock: one thread flushes in a loop
// until flushes_done > 10 while ten spawner threads continuously create
// short-lived threads that open (and destroy) an iterator.
// NOTE(review): garbled extraction — elided lines include the `done` lambda
// header, `delete it;`, thread joins inside loops, and closing braces.
// Restore from the full db_test.cc before building.
6429 TEST_F(DBTest
, ThreadLocalPtrDeadlock
) {
6430 std::atomic
<int> flushes_done
{0};
6431 std::atomic
<int> threads_destroyed
{0};
// Termination predicate for the flushing thread (body of a `done` lambda
// whose header is elided in this extraction).
6433 return flushes_done
.load() > 10;
6436 port::Thread
flushing_thread([&] {
6437 for (int i
= 0; !done(); ++i
) {
6438 ASSERT_OK(db_
->Put(WriteOptions(), Slice("hi"),
6439 Slice(std::to_string(i
).c_str())));
6440 ASSERT_OK(db_
->Flush(FlushOptions()));
6441 int cnt
= ++flushes_done
;
6442 fprintf(stderr
, "Flushed %d times\n", cnt
);
6446 std::vector
<port::Thread
> thread_spawning_threads(10);
6447 for (auto& t
: thread_spawning_threads
) {
6448 t
= port::Thread([&] {
6451 port::Thread
tmp_thread([&] {
6452 auto it
= db_
->NewIterator(ReadOptions());
6457 ++threads_destroyed
;
6462 for (auto& t
: thread_spawning_threads
) {
6465 flushing_thread
.join();
6466 fprintf(stderr
, "Done. Flushed %d times, destroyed %d threads\n",
6467 flushes_done
.load(), threads_destroyed
.load());
// Verifies that reopening with an absurdly large block_size (8GB) fails:
// TryReopenWithColumnFamilies must return a non-OK status.
// NOTE(review): garbled extraction — closing brace elided. Restore from the
// full db_test.cc before building.
6470 TEST_F(DBTest
, LargeBlockSizeTest
) {
6471 Options options
= CurrentOptions();
6472 CreateAndReopenWithCF({"pikachu"}, options
);
6473 ASSERT_OK(Put(0, "foo", "bar"));
6474 BlockBasedTableOptions table_options
;
6475 table_options
.block_size
= 8LL * 1024 * 1024 * 1024LL;
6476 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
6477 ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options
));
6480 #ifndef ROCKSDB_LITE
// Verifies GetCreationTimeOfOldestFile():
//  1) with one file's table-property file_creation_time forced to 0 (and all
//     manifest creation times zeroed), the API returns 0 with Status::OK;
//  2) with distinct non-zero creation times injected per file, it returns the
//     older one (uint_time_1);
//  3) with max_open_files != -1, it returns Status::NotSupported.
// NOTE(review): garbled extraction — elided lines include `Random rnd`,
// `int64_t time_1/time_2`, the `int idx` counter used in the sync-point
// callback, `uint64_t ctime`, Flush() calls, and closing braces. Restore from
// the full db_test.cc before building.
6482 TEST_F(DBTest
, CreationTimeOfOldestFile
) {
6483 const int kNumKeysPerFile
= 32;
6484 const int kNumLevelFiles
= 2;
6485 const int kValueSize
= 100;
6487 Options options
= CurrentOptions();
6488 options
.max_open_files
= -1;
6489 env_
->time_elapse_only_sleep_
= false;
6492 env_
->addon_time_
.store(0);
6493 DestroyAndReopen(options
);
6495 bool set_file_creation_time_to_zero
= true;
// Capture mock time before and after advancing the clock by 50 hours.
6499 env_
->GetCurrentTime(&time_1
);
6500 const uint64_t uint_time_1
= static_cast<uint64_t>(time_1
);
6503 env_
->addon_time_
.fetch_add(50 * 60 * 60);
6506 env_
->GetCurrentTime(&time_2
);
6507 const uint64_t uint_time_2
= static_cast<uint64_t>(time_2
);
// Inject per-file creation times into table properties as files are built.
6509 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6510 "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg
) {
6511 TableProperties
* props
= reinterpret_cast<TableProperties
*>(arg
);
6512 if (set_file_creation_time_to_zero
) {
6514 props
->file_creation_time
= 0;
6516 } else if (idx
== 1) {
6517 props
->file_creation_time
= uint_time_1
;
6522 props
->file_creation_time
= uint_time_1
;
6524 } else if (idx
== 1) {
6525 props
->file_creation_time
= uint_time_2
;
6529 // Set file creation time in manifest all to 0.
6530 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
6531 "FileMetaData::FileMetaData", [&](void* arg
) {
6532 FileMetaData
* meta
= static_cast<FileMetaData
*>(arg
);
6533 meta
->file_creation_time
= 0;
6535 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
6538 for (int i
= 0; i
< kNumLevelFiles
; ++i
) {
6539 for (int j
= 0; j
< kNumKeysPerFile
; ++j
) {
6541 Put(Key(i
* kNumKeysPerFile
+ j
), RandomString(&rnd
, kValueSize
)));
6546 // At this point there should be 2 files, one with file_creation_time = 0 and
6547 // the other non-zero. GetCreationTimeOfOldestFile API should return 0.
6548 uint64_t creation_time
;
6549 Status s1
= dbfull()->GetCreationTimeOfOldestFile(&creation_time
);
6550 ASSERT_EQ(0, creation_time
);
6551 ASSERT_EQ(s1
, Status::OK());
6553 // Testing with non-zero file creation time.
6554 set_file_creation_time_to_zero
= false;
6555 options
= CurrentOptions();
6556 options
.max_open_files
= -1;
6557 env_
->time_elapse_only_sleep_
= false;
6560 env_
->addon_time_
.store(0);
6561 DestroyAndReopen(options
);
6563 for (int i
= 0; i
< kNumLevelFiles
; ++i
) {
6564 for (int j
= 0; j
< kNumKeysPerFile
; ++j
) {
6566 Put(Key(i
* kNumKeysPerFile
+ j
), RandomString(&rnd
, kValueSize
)));
6571 // At this point there should be 2 files with non-zero file creation time.
6572 // GetCreationTimeOfOldestFile API should return non-zero value.
6574 Status s2
= dbfull()->GetCreationTimeOfOldestFile(&ctime
);
6575 ASSERT_EQ(uint_time_1
, ctime
);
6576 ASSERT_EQ(s2
, Status::OK());
6578 // Testing with max_open_files != -1
6579 options
= CurrentOptions();
6580 options
.max_open_files
= 10;
6581 DestroyAndReopen(options
);
6582 Status s3
= dbfull()->GetCreationTimeOfOldestFile(&ctime
);
6583 ASSERT_EQ(s3
, Status::NotSupported());
6585 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6590 } // namespace ROCKSDB_NAMESPACE
// Hook for static-lib builds to register custom objects before tests run;
// builds without the macro get a no-op stub.
// NOTE(review): the `#else` line between the declaration and the stub appears
// to be elided in this extraction — confirm against the full db_test.cc.
6592 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
6594 void RegisterCustomObjects(int argc
, char** argv
);
6597 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
6598 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
6600 int main(int argc
, char** argv
) {
6601 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
6602 ::testing::InitGoogleTest(&argc
, argv
);
6603 RegisterCustomObjects(argc
, argv
);
6604 return RUN_ALL_TESTS();