// ceph/src/rocksdb/db/db_test.cc — RocksDB unit tests as vendored in the
// Ceph Pacific (16.2.2) source tree.
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 // Introduction of SyncPoint effectively disabled building and running this test
11 // in Release build.
12 // which is a pity, it is a good test
13 #include <fcntl.h>
14 #include <algorithm>
15 #include <set>
16 #include <thread>
17 #include <unordered_set>
18 #include <utility>
19 #ifndef OS_WIN
20 #include <unistd.h>
21 #endif
22 #ifdef OS_SOLARIS
23 #include <alloca.h>
24 #endif
25
26 #include "cache/lru_cache.h"
27 #include "db/blob_index.h"
28 #include "db/db_impl/db_impl.h"
29 #include "db/db_test_util.h"
30 #include "db/dbformat.h"
31 #include "db/job_context.h"
32 #include "db/version_set.h"
33 #include "db/write_batch_internal.h"
34 #include "env/mock_env.h"
35 #include "file/filename.h"
36 #include "memtable/hash_linklist_rep.h"
37 #include "monitoring/thread_status_util.h"
38 #include "port/port.h"
39 #include "port/stack_trace.h"
40 #include "rocksdb/cache.h"
41 #include "rocksdb/compaction_filter.h"
42 #include "rocksdb/convenience.h"
43 #include "rocksdb/db.h"
44 #include "rocksdb/env.h"
45 #include "rocksdb/experimental.h"
46 #include "rocksdb/filter_policy.h"
47 #include "rocksdb/options.h"
48 #include "rocksdb/perf_context.h"
49 #include "rocksdb/slice.h"
50 #include "rocksdb/slice_transform.h"
51 #include "rocksdb/snapshot.h"
52 #include "rocksdb/table.h"
53 #include "rocksdb/table_properties.h"
54 #include "rocksdb/thread_status.h"
55 #include "rocksdb/utilities/checkpoint.h"
56 #include "rocksdb/utilities/optimistic_transaction_db.h"
57 #include "rocksdb/utilities/write_batch_with_index.h"
58 #include "table/block_based/block_based_table_factory.h"
59 #include "table/mock_table.h"
60 #include "table/plain/plain_table_factory.h"
61 #include "table/scoped_arena_iterator.h"
62 #include "test_util/sync_point.h"
63 #include "test_util/testharness.h"
64 #include "test_util/testutil.h"
65 #include "util/compression.h"
66 #include "util/mutexlock.h"
67 #include "util/rate_limiter.h"
68 #include "util/string_util.h"
69 #include "utilities/merge_operators.h"
70
71 namespace ROCKSDB_NAMESPACE {
72
// Plain test fixture: each test runs against a fresh database rooted at the
// "/db_test" path prefix that DBTestBase manages.
class DBTest : public DBTestBase {
 public:
  DBTest() : DBTestBase("/db_test") {}
};
77
// Parameterized fixture: runs each test over a
// (max_subcompactions, exclusive_manual_compaction) tuple.
class DBTestWithParam
    : public DBTest,
      public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
 public:
  DBTestWithParam() {
    max_subcompactions_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  // First tuple element of GetParam().
  uint32_t max_subcompactions_;
  // Second tuple element of GetParam().
  bool exclusive_manual_compaction_;
};
94
95 TEST_F(DBTest, MockEnvTest) {
96 std::unique_ptr<MockEnv> env{new MockEnv(Env::Default())};
97 Options options;
98 options.create_if_missing = true;
99 options.env = env.get();
100 DB* db;
101
102 const Slice keys[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
103 const Slice vals[] = {Slice("foo"), Slice("bar"), Slice("baz")};
104
105 ASSERT_OK(DB::Open(options, "/dir/db", &db));
106 for (size_t i = 0; i < 3; ++i) {
107 ASSERT_OK(db->Put(WriteOptions(), keys[i], vals[i]));
108 }
109
110 for (size_t i = 0; i < 3; ++i) {
111 std::string res;
112 ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
113 ASSERT_TRUE(res == vals[i]);
114 }
115
116 Iterator* iterator = db->NewIterator(ReadOptions());
117 iterator->SeekToFirst();
118 for (size_t i = 0; i < 3; ++i) {
119 ASSERT_TRUE(iterator->Valid());
120 ASSERT_TRUE(keys[i] == iterator->key());
121 ASSERT_TRUE(vals[i] == iterator->value());
122 iterator->Next();
123 }
124 ASSERT_TRUE(!iterator->Valid());
125 delete iterator;
126
127 // TEST_FlushMemTable() is not supported in ROCKSDB_LITE
128 #ifndef ROCKSDB_LITE
129 DBImpl* dbi = reinterpret_cast<DBImpl*>(db);
130 ASSERT_OK(dbi->TEST_FlushMemTable());
131
132 for (size_t i = 0; i < 3; ++i) {
133 std::string res;
134 ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
135 ASSERT_TRUE(res == vals[i]);
136 }
137 #endif // ROCKSDB_LITE
138
139 delete db;
140 }
141
142 // NewMemEnv returns nullptr in ROCKSDB_LITE since class InMemoryEnv isn't
143 // defined.
144 #ifndef ROCKSDB_LITE
145 TEST_F(DBTest, MemEnvTest) {
146 std::unique_ptr<Env> env{NewMemEnv(Env::Default())};
147 Options options;
148 options.create_if_missing = true;
149 options.env = env.get();
150 DB* db;
151
152 const Slice keys[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
153 const Slice vals[] = {Slice("foo"), Slice("bar"), Slice("baz")};
154
155 ASSERT_OK(DB::Open(options, "/dir/db", &db));
156 for (size_t i = 0; i < 3; ++i) {
157 ASSERT_OK(db->Put(WriteOptions(), keys[i], vals[i]));
158 }
159
160 for (size_t i = 0; i < 3; ++i) {
161 std::string res;
162 ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
163 ASSERT_TRUE(res == vals[i]);
164 }
165
166 Iterator* iterator = db->NewIterator(ReadOptions());
167 iterator->SeekToFirst();
168 for (size_t i = 0; i < 3; ++i) {
169 ASSERT_TRUE(iterator->Valid());
170 ASSERT_TRUE(keys[i] == iterator->key());
171 ASSERT_TRUE(vals[i] == iterator->value());
172 iterator->Next();
173 }
174 ASSERT_TRUE(!iterator->Valid());
175 delete iterator;
176
177 DBImpl* dbi = reinterpret_cast<DBImpl*>(db);
178 ASSERT_OK(dbi->TEST_FlushMemTable());
179
180 for (size_t i = 0; i < 3; ++i) {
181 std::string res;
182 ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
183 ASSERT_TRUE(res == vals[i]);
184 }
185
186 delete db;
187
188 options.create_if_missing = false;
189 ASSERT_OK(DB::Open(options, "/dir/db", &db));
190 for (size_t i = 0; i < 3; ++i) {
191 std::string res;
192 ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
193 ASSERT_TRUE(res == vals[i]);
194 }
195 delete db;
196 }
197 #endif // ROCKSDB_LITE
198
199 TEST_F(DBTest, WriteEmptyBatch) {
200 Options options = CurrentOptions();
201 options.env = env_;
202 options.write_buffer_size = 100000;
203 CreateAndReopenWithCF({"pikachu"}, options);
204
205 ASSERT_OK(Put(1, "foo", "bar"));
206 WriteOptions wo;
207 wo.sync = true;
208 wo.disableWAL = false;
209 WriteBatch empty_batch;
210 ASSERT_OK(dbfull()->Write(wo, &empty_batch));
211
212 // make sure we can re-open it.
213 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
214 ASSERT_EQ("bar", Get(1, "foo"));
215 }
216
// Verify WriteOptions::no_slowdown: while a delay token is active, a write
// with no_slowdown=true must fail (ASSERT_NOK) rather than sleep/wait, and
// once no_slowdown is cleared the write goes through the delay path
// (sleep_count becomes >= 1).
TEST_F(DBTest, SkipDelay) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  CreateAndReopenWithCF({"pikachu"}, options);

  for (bool sync : {true, false}) {
    for (bool disableWAL : {true, false}) {
      if (sync && disableWAL) {
        // sync and disableWAL is incompatible.
        continue;
      }
      // Use a small number to ensure a large delay that is still effective
      // when we do Put
      // TODO(myabandeh): this is time dependent and could potentially make
      // the test flaky
      auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
      std::atomic<int> sleep_count(0);
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "DBImpl::DelayWrite:Sleep",
          [&](void* /*arg*/) { sleep_count.fetch_add(1); });
      std::atomic<int> wait_count(0);
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "DBImpl::DelayWrite:Wait",
          [&](void* /*arg*/) { wait_count.fetch_add(1); });
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

      WriteOptions wo;
      wo.sync = sync;
      wo.disableWAL = disableWAL;
      wo.no_slowdown = true;
      // NOTE(review): status deliberately(?) ignored — this first write only
      // seeds the write controller's delay estimate; confirm intended.
      dbfull()->Put(wo, "foo", "bar");
      // We need the 2nd write to trigger delay. This is because delay is
      // estimated based on the last write size which is 0 for the first write.
      ASSERT_NOK(dbfull()->Put(wo, "foo2", "bar2"));
      // NOTE(review): these two GE-0 assertions are trivially true for int
      // counters; presumably they were meant to document "no sleeping/waiting
      // happened" (== 0) — confirm intent before tightening.
      ASSERT_GE(sleep_count.load(), 0);
      ASSERT_GE(wait_count.load(), 0);
      token.reset();

      token = dbfull()->TEST_write_controler().GetDelayToken(1000000000);
      wo.no_slowdown = false;
      ASSERT_OK(dbfull()->Put(wo, "foo3", "bar3"));
      // With no_slowdown off, the write must have slept at least once.
      ASSERT_GE(sleep_count.load(), 1);
      token.reset();
    }
  }
}
264
// Mix writers with and without no_slowdown while a delay token is active:
// no_slowdown=false writers must eventually succeed (ASSERT_OK) and
// no_slowdown=true writers must fail fast (ASSERT_NOK). The extra writer
// threads are spawned from inside the DelayWrite sync-point callback so they
// arrive while the lead writer is stalled.
TEST_F(DBTest, MixedSlowdownOptions) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);

  // Writer that tolerates stalls: the Put must succeed.
  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  // Writer that refuses stalls: the Put must fail while the DB is stalled.
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
  };
  // Use a small number to ensure a large delay that is still effective
  // when we do Put
  // TODO(myabandeh): this is time dependent and could potentially make
  // the test flaky
  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
  std::atomic<int> sleep_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) {
        sleep_count.fetch_add(1);
        // Spawn the competing writers exactly once, from within the stall.
        if (threads.empty()) {
          for (int i = 0; i < 2; ++i) {
            threads.emplace_back(write_slowdown_func);
          }
          for (int i = 0; i < 2; ++i) {
            threads.emplace_back(write_no_slowdown_func);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.sync = false;
  wo.disableWAL = false;
  wo.no_slowdown = false;
  // NOTE(review): status ignored — this write only seeds the delay estimate.
  dbfull()->Put(wo, "foo", "bar");
  // We need the 2nd write to trigger delay. This is because delay is
  // estimated based on the last write size which is 0 for the first write.
  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
  token.reset();

  for (auto& t : threads) {
    t.join();
  }
  ASSERT_GE(sleep_count.load(), 1);

  // With the token gone there is no stall, so no_slowdown writes succeed.
  wo.no_slowdown = true;
  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
}
325
// Like MixedSlowdownOptions, but the no_slowdown writers are enqueued while
// the lead writer is inside its delay sleep: they must fail without causing
// any additional sleeps (sleep_count stays exactly 1).
TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);

  // Writer that refuses stalls: the Put must fail while the DB is stalled.
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
  };
  // Use a small number to ensure a large delay that is still effective
  // when we do Put
  // TODO(myabandeh): this is time dependent and could potentially make
  // the test flaky
  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
  std::atomic<int> sleep_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Sleep", [&](void* /*arg*/) {
        sleep_count.fetch_add(1);
        if (threads.empty()) {
          for (int i = 0; i < 2; ++i) {
            threads.emplace_back(write_no_slowdown_func);
          }
          // Sleep for 3s to allow the threads to insert themselves into the
          // write queue
          env_->SleepForMicroseconds(3000000ULL);
        }
      });
  std::atomic<int> wait_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Wait",
      [&](void* /*arg*/) { wait_count.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.sync = false;
  wo.disableWAL = false;
  wo.no_slowdown = false;
  // NOTE(review): status ignored — this write only seeds the delay estimate.
  dbfull()->Put(wo, "foo", "bar");
  // We need the 2nd write to trigger delay. This is because delay is
  // estimated based on the last write size which is 0 for the first write.
  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
  token.reset();

  for (auto& t : threads) {
    t.join();
  }
  // Only the lead writer may have slept; the queued no_slowdown writers must
  // not add sleeps of their own.
  ASSERT_EQ(sleep_count.load(), 1);
  ASSERT_GE(wait_count.load(), 0);
}
381
// Same mix of writers but under a full write *stop* (GetStopToken): waiting
// writers block on bg_cv_ until the token is released and the condvar is
// signalled; no_slowdown writers must still fail immediately.
TEST_F(DBTest, MixedSlowdownOptionsStop) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);

  // Writer that tolerates the stop: the Put must succeed once unblocked.
  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  // Writer that refuses to block: the Put must fail during the stop.
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
  };
  // Wake up the writers blocked on bg_cv_ after the stop token is released.
  std::function<void()> wakeup_writer = [&]() {
    dbfull()->mutex_.Lock();
    dbfull()->bg_cv_.SignalAll();
    dbfull()->mutex_.Unlock();
  };
  // Use a small number to ensure a large delay that is still effective
  // when we do Put
  // TODO(myabandeh): this is time dependent and could potentially make
  // the test flaky
  auto token = dbfull()->TEST_write_controler().GetStopToken();
  std::atomic<int> wait_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
        wait_count.fetch_add(1);
        if (threads.empty()) {
          for (int i = 0; i < 2; ++i) {
            threads.emplace_back(write_slowdown_func);
          }
          for (int i = 0; i < 2; ++i) {
            threads.emplace_back(write_no_slowdown_func);
          }
          // Sleep for 3s to allow the threads to insert themselves into the
          // write queue
          env_->SleepForMicroseconds(3000000ULL);
        }
        // Release the stop and signal so the blocked writers can proceed.
        token.reset();
        threads.emplace_back(wakeup_writer);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.sync = false;
  wo.disableWAL = false;
  wo.no_slowdown = false;
  // NOTE(review): status ignored — this write only seeds the delay estimate.
  dbfull()->Put(wo, "foo", "bar");
  // We need the 2nd write to trigger delay. This is because delay is
  // estimated based on the last write size which is 0 for the first write.
  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
  token.reset();

  for (auto& t : threads) {
    t.join();
  }
  ASSERT_GE(wait_count.load(), 1);

  // With the stop lifted, a no_slowdown write succeeds.
  wo.no_slowdown = true;
  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
}
452 #ifndef ROCKSDB_LITE
453
454 TEST_F(DBTest, LevelLimitReopen) {
455 Options options = CurrentOptions();
456 CreateAndReopenWithCF({"pikachu"}, options);
457
458 const std::string value(1024 * 1024, ' ');
459 int i = 0;
460 while (NumTableFilesAtLevel(2, 1) == 0) {
461 ASSERT_OK(Put(1, Key(i++), value));
462 }
463
464 options.num_levels = 1;
465 options.max_bytes_for_level_multiplier_additional.resize(1, 1);
466 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
467 ASSERT_EQ(s.IsInvalidArgument(), true);
468 ASSERT_EQ(s.ToString(),
469 "Invalid argument: db has more levels than options.num_levels");
470
471 options.num_levels = 10;
472 options.max_bytes_for_level_multiplier_additional.resize(10, 1);
473 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
474 }
475 #endif // ROCKSDB_LITE
476
477
478 TEST_F(DBTest, PutSingleDeleteGet) {
479 do {
480 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
481 ASSERT_OK(Put(1, "foo", "v1"));
482 ASSERT_EQ("v1", Get(1, "foo"));
483 ASSERT_OK(Put(1, "foo2", "v2"));
484 ASSERT_EQ("v2", Get(1, "foo2"));
485 ASSERT_OK(SingleDelete(1, "foo"));
486 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
487 // Skip FIFO and universal compaction beccause they do not apply to the test
488 // case. Skip MergePut because single delete does not get removed when it
489 // encounters a merge.
490 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
491 kSkipMergePut));
492 }
493
// Exercise ReadOptions::read_tier == kPersistedTier: reads must only see
// data that has been persisted (flushed, or written through the WAL when the
// WAL is enabled), both via Get and MultiGet, through three rounds of
// put / flush+put / delete+flush.
TEST_F(DBTest, ReadFromPersistedTier) {
  do {
    Random rnd(301);
    Options options = CurrentOptions();
    for (int disableWAL = 0; disableWAL <= 1; ++disableWAL) {
      CreateAndReopenWithCF({"pikachu"}, options);
      WriteOptions wopt;
      wopt.disableWAL = (disableWAL == 1);
      // 1st round: put but not flush
      ASSERT_OK(db_->Put(wopt, handles_[1], "foo", "first"));
      ASSERT_OK(db_->Put(wopt, handles_[1], "bar", "one"));
      ASSERT_EQ("first", Get(1, "foo"));
      ASSERT_EQ("one", Get(1, "bar"));

      // Read directly from persisted data.
      ReadOptions ropt;
      ropt.read_tier = kPersistedTier;
      std::string value;
      if (wopt.disableWAL) {
        // as data has not yet been flushed, we expect not found.
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound());
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound());
      } else {
        // With the WAL on, the writes count as persisted.
        ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
        ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
      }

      // Multiget
      std::vector<ColumnFamilyHandle*> multiget_cfs;
      multiget_cfs.push_back(handles_[1]);
      multiget_cfs.push_back(handles_[1]);
      std::vector<Slice> multiget_keys;
      multiget_keys.push_back("foo");
      multiget_keys.push_back("bar");
      std::vector<std::string> multiget_values;
      auto statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[0].IsNotFound());
        ASSERT_TRUE(statuses[1].IsNotFound());
      } else {
        ASSERT_OK(statuses[0]);
        ASSERT_OK(statuses[1]);
      }

      // 2nd round: flush and put a new value in memtable.
      ASSERT_OK(Flush(1));
      ASSERT_OK(db_->Put(wopt, handles_[1], "rocksdb", "hello"));

      // once the data has been flushed, we are able to get the
      // data when kPersistedTier is used.
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).ok());
      ASSERT_EQ(value, "first");
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok());
      ASSERT_EQ(value, "one");
      if (wopt.disableWAL) {
        // "rocksdb" only lives in the memtable, so it is not persisted yet.
        ASSERT_TRUE(
            db_->Get(ropt, handles_[1], "rocksdb", &value).IsNotFound());
      } else {
        ASSERT_OK(db_->Get(ropt, handles_[1], "rocksdb", &value));
        ASSERT_EQ(value, "hello");
      }

      // Expect same result in multiget
      multiget_cfs.push_back(handles_[1]);
      multiget_keys.push_back("rocksdb");
      statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      ASSERT_TRUE(statuses[0].ok());
      ASSERT_EQ("first", multiget_values[0]);
      ASSERT_TRUE(statuses[1].ok());
      ASSERT_EQ("one", multiget_values[1]);
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[2].IsNotFound());
      } else {
        ASSERT_OK(statuses[2]);
      }

      // 3rd round: delete and flush
      ASSERT_OK(db_->Delete(wopt, handles_[1], "foo"));
      // NOTE(review): Flush status unchecked here, unlike the other Flush
      // call above — likely an oversight.
      Flush(1);
      ASSERT_OK(db_->Delete(wopt, handles_[1], "bar"));

      ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound());
      if (wopt.disableWAL) {
        // Still expect finding the value as its delete has not yet been
        // flushed.
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok());
        ASSERT_EQ(value, "one");
      } else {
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound());
      }
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "rocksdb", &value).ok());
      ASSERT_EQ(value, "hello");

      statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      ASSERT_TRUE(statuses[0].IsNotFound());
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[1].ok());
        ASSERT_EQ("one", multiget_values[1]);
      } else {
        ASSERT_TRUE(statuses[1].IsNotFound());
      }
      ASSERT_TRUE(statuses[2].ok());
      ASSERT_EQ("hello", multiget_values[2]);
      if (wopt.disableWAL == 0) {
        // Start the WAL-disabled iteration from a clean DB.
        DestroyAndReopen(options);
      }
    }
  } while (ChangeOptions());
}
606
607 TEST_F(DBTest, SingleDeleteFlush) {
608 // Test to check whether flushing preserves a single delete hidden
609 // behind a put.
610 do {
611 Random rnd(301);
612
613 Options options = CurrentOptions();
614 options.disable_auto_compactions = true;
615 CreateAndReopenWithCF({"pikachu"}, options);
616
617 // Put values on second level (so that they will not be in the same
618 // compaction as the other operations.
619 Put(1, "foo", "first");
620 Put(1, "bar", "one");
621 ASSERT_OK(Flush(1));
622 MoveFilesToLevel(2, 1);
623
624 // (Single) delete hidden by a put
625 SingleDelete(1, "foo");
626 Put(1, "foo", "second");
627 Delete(1, "bar");
628 Put(1, "bar", "two");
629 ASSERT_OK(Flush(1));
630
631 SingleDelete(1, "foo");
632 Delete(1, "bar");
633 ASSERT_OK(Flush(1));
634
635 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
636 nullptr);
637
638 ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
639 ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
640 // Skip FIFO and universal compaction beccause they do not apply to the test
641 // case. Skip MergePut because single delete does not get removed when it
642 // encounters a merge.
643 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
644 kSkipMergePut));
645 }
646
647 TEST_F(DBTest, SingleDeletePutFlush) {
648 // Single deletes that encounter the matching put in a flush should get
649 // removed.
650 do {
651 Random rnd(301);
652
653 Options options = CurrentOptions();
654 options.disable_auto_compactions = true;
655 CreateAndReopenWithCF({"pikachu"}, options);
656
657 Put(1, "foo", Slice());
658 Put(1, "a", Slice());
659 SingleDelete(1, "a");
660 ASSERT_OK(Flush(1));
661
662 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
663 // Skip FIFO and universal compaction beccause they do not apply to the test
664 // case. Skip MergePut because single delete does not get removed when it
665 // encounters a merge.
666 } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
667 kSkipMergePut));
668 }
669
670 // Disable because not all platform can run it.
671 // It requires more than 9GB memory to run it, With single allocation
672 // of more than 3GB.
// A 4GB value must be rejected with InvalidArgument by every write entry
// point: DB::Put/Merge and WriteBatch Put/Merge, including the SliceParts
// overloads.
TEST_F(DBTest, DISABLED_SanitizeVeryVeryLargeValue) {
  const size_t kValueSize = 4 * size_t{1024 * 1024 * 1024};  // 4GB value
  std::string raw(kValueSize, 'v');
  Options options = CurrentOptions();
  options.env = env_;
  options.merge_operator = MergeOperators::CreatePutOperator();
  options.write_buffer_size = 100000;  // Small write buffer
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // A normal-sized write still works.
  ASSERT_OK(Put("boo", "v1"));
  ASSERT_TRUE(Put("foo", raw).IsInvalidArgument());
  ASSERT_TRUE(Merge("foo", raw).IsInvalidArgument());

  WriteBatch wb;
  ASSERT_TRUE(wb.Put("foo", raw).IsInvalidArgument());
  ASSERT_TRUE(wb.Merge("foo", raw).IsInvalidArgument());

  // Same checks through the SliceParts overloads.
  Slice value_slice = raw;
  Slice key_slice = "foo";
  SliceParts sp_key(&key_slice, 1);
  SliceParts sp_value(&value_slice, 1);

  ASSERT_TRUE(wb.Put(sp_key, sp_value).IsInvalidArgument());
  ASSERT_TRUE(wb.Merge(sp_key, sp_value).IsInvalidArgument());
}
699
700 // Disable because not all platform can run it.
701 // It requires more than 9GB memory to run it, With single allocation
702 // of more than 3GB.
703 TEST_F(DBTest, DISABLED_VeryLargeValue) {
704 const size_t kValueSize = 3221225472u; // 3GB value
705 const size_t kKeySize = 8388608u; // 8MB key
706 std::string raw(kValueSize, 'v');
707 std::string key1(kKeySize, 'c');
708 std::string key2(kKeySize, 'd');
709
710 Options options = CurrentOptions();
711 options.env = env_;
712 options.write_buffer_size = 100000; // Small write buffer
713 options.paranoid_checks = true;
714 DestroyAndReopen(options);
715
716 ASSERT_OK(Put("boo", "v1"));
717 ASSERT_OK(Put("foo", "v1"));
718 ASSERT_OK(Put(key1, raw));
719 raw[0] = 'w';
720 ASSERT_OK(Put(key2, raw));
721 dbfull()->TEST_WaitForFlushMemTable();
722
723 #ifndef ROCKSDB_LITE
724 ASSERT_EQ(1, NumTableFilesAtLevel(0));
725 #endif // !ROCKSDB_LITE
726
727 std::string value;
728 Status s = db_->Get(ReadOptions(), key1, &value);
729 ASSERT_OK(s);
730 ASSERT_EQ(kValueSize, value.size());
731 ASSERT_EQ('v', value[0]);
732
733 s = db_->Get(ReadOptions(), key2, &value);
734 ASSERT_OK(s);
735 ASSERT_EQ(kValueSize, value.size());
736 ASSERT_EQ('w', value[0]);
737
738 // Compact all files.
739 Flush();
740 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
741
742 // Check DB is not in read-only state.
743 ASSERT_OK(Put("boo", "v1"));
744
745 s = db_->Get(ReadOptions(), key1, &value);
746 ASSERT_OK(s);
747 ASSERT_EQ(kValueSize, value.size());
748 ASSERT_EQ('v', value[0]);
749
750 s = db_->Get(ReadOptions(), key2, &value);
751 ASSERT_OK(s);
752 ASSERT_EQ(kValueSize, value.size());
753 ASSERT_EQ('w', value[0]);
754 }
755
756 TEST_F(DBTest, GetFromImmutableLayer) {
757 do {
758 Options options = CurrentOptions();
759 options.env = env_;
760 CreateAndReopenWithCF({"pikachu"}, options);
761
762 ASSERT_OK(Put(1, "foo", "v1"));
763 ASSERT_EQ("v1", Get(1, "foo"));
764
765 // Block sync calls
766 env_->delay_sstable_sync_.store(true, std::memory_order_release);
767 Put(1, "k1", std::string(100000, 'x')); // Fill memtable
768 Put(1, "k2", std::string(100000, 'y')); // Trigger flush
769 ASSERT_EQ("v1", Get(1, "foo"));
770 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
771 // Release sync calls
772 env_->delay_sstable_sync_.store(false, std::memory_order_release);
773 } while (ChangeOptions());
774 }
775
776
TEST_F(DBTest, GetLevel0Ordering) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Check that we process level-0 files in correct order. The code
    // below generates two level-0 files where the earlier one comes
    // before the later one in the level-0 file list since the earlier
    // one has a smaller "smallest" key.
    ASSERT_OK(Put(1, "bar", "b"));
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_OK(Flush(1));
    // The newer file's value must win even though the older file sorts first.
    ASSERT_EQ("v2", Get(1, "foo"));
  } while (ChangeOptions());
}
792
// Level-0 triggers set in non-canonical order (stop < slowdown < compaction
// trigger). The test only asserts that DB::Open still succeeds — presumably
// the options are sanitized on open; confirm against SanitizeOptions.
TEST_F(DBTest, WrongLevel0Config) {
  Options options = CurrentOptions();
  Close();
  ASSERT_OK(DestroyDB(dbname_, options));
  options.level0_stop_writes_trigger = 1;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_file_num_compaction_trigger = 3;
  ASSERT_OK(DB::Open(options, dbname_, &db_));
}
802
803 #ifndef ROCKSDB_LITE
// A fresh memtable write must shadow an older compacted value, both before
// and after it is flushed to level 0.
TEST_F(DBTest, GetOrderedByLevels) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    Compact(1, "a", "z");
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v2", Get(1, "foo"));
  } while (ChangeOptions());
}
816
// With several disjoint files compacted into a non-level-0 level, Get must
// pick the file that actually covers each key.
TEST_F(DBTest, GetPicksCorrectFile) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Arrange to have multiple files in a non-level-0 level.
    ASSERT_OK(Put(1, "a", "va"));
    Compact(1, "a", "b");
    ASSERT_OK(Put(1, "x", "vx"));
    Compact(1, "x", "y");
    ASSERT_OK(Put(1, "f", "vf"));
    Compact(1, "f", "g");
    ASSERT_EQ("va", Get(1, "a"));
    ASSERT_EQ("vf", Get(1, "f"));
    ASSERT_EQ("vx", Get(1, "x"));
  } while (ChangeOptions());
}
832
TEST_F(DBTest, GetEncountersEmptyLevel) {
  do {
    Options options = CurrentOptions();
    CreateAndReopenWithCF({"pikachu"}, options);
    // Arrange for the following to happen:
    //   * sstable A in level 0
    //   * nothing in level 1
    //   * sstable B in level 2
    // Then do enough Get() calls to arrange for an automatic compaction
    // of sstable A. A bug would cause the compaction to be marked as
    // occurring at level 1 (instead of the correct level 0).

    // Step 1: First place sstables in levels 0 and 2
    // NOTE(review): Put/TEST_CompactRange statuses ignored throughout this
    // setup; the level-count assertions below catch gross failures.
    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
    dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
    ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

    // Step 2: clear level 1 if necessary.
    dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
    ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
    ASSERT_EQ(NumTableFilesAtLevel(2, 1), 1);

    // Step 3: read a bunch of times
    for (int i = 0; i < 1000; i++) {
      ASSERT_EQ("NOT_FOUND", Get(1, "missing"));
    }

    // Step 4: Wait for compaction to finish
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);  // XXX
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
}
874 #endif // ROCKSDB_LITE
875
876 TEST_F(DBTest, FlushMultipleMemtable) {
877 do {
878 Options options = CurrentOptions();
879 WriteOptions writeOpt = WriteOptions();
880 writeOpt.disableWAL = true;
881 options.max_write_buffer_number = 4;
882 options.min_write_buffer_number_to_merge = 3;
883 options.max_write_buffer_size_to_maintain = -1;
884 CreateAndReopenWithCF({"pikachu"}, options);
885 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
886 ASSERT_OK(Flush(1));
887 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
888
889 ASSERT_EQ("v1", Get(1, "foo"));
890 ASSERT_EQ("v1", Get(1, "bar"));
891 ASSERT_OK(Flush(1));
892 } while (ChangeCompactOptions());
893 }
894 #ifndef ROCKSDB_LITE
// Hammer two column families from ten threads with auto-compaction off and
// tiny write buffers, then check each CF produced between 1 and 10 SST
// files — i.e. flushes were scheduled, and not more than one per memtable.
TEST_F(DBTest, FlushSchedule) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  // Triggers set very high so write stalls never interfere.
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(options.write_buffer_size);
  options.max_write_buffer_number = 2;
  options.write_buffer_size = 120 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<port::Thread> threads;

  std::atomic<int> thread_num(0);
  // each column family will have 5 threads, each thread generating 2
  // memtables. each column family should end up with 10 table files
  std::function<void()> fill_memtable_func = [&]() {
    int a = thread_num.fetch_add(1);
    Random rnd(a);
    WriteOptions wo;
    // this should fill up 2 memtables
    for (int k = 0; k < 5000; ++k) {
      // a & 1 alternates threads between the two column families.
      ASSERT_OK(db_->Put(wo, handles_[a & 1], RandomString(&rnd, 13), ""));
    }
  };

  for (int i = 0; i < 10; ++i) {
    threads.emplace_back(fill_memtable_func);
  }

  for (auto& t : threads) {
    t.join();
  }

  auto default_tables = GetNumberOfSstFilesForColumnFamily(db_, "default");
  auto pikachu_tables = GetNumberOfSstFilesForColumnFamily(db_, "pikachu");
  ASSERT_LE(default_tables, static_cast<uint64_t>(10));
  ASSERT_GT(default_tables, static_cast<uint64_t>(0));
  ASSERT_LE(pikachu_tables, static_cast<uint64_t>(10));
  ASSERT_GT(pikachu_tables, static_cast<uint64_t>(0));
}
936 #endif // ROCKSDB_LITE
937
938 namespace {
// Compaction filter that never drops an entry (Filter() always returns
// false).
class KeepFilter : public CompactionFilter {
 public:
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return false;
  }

  const char* Name() const override { return "KeepFilter"; }
};
949
// Factory for KeepFilter. With check_context == true it additionally
// verifies, on every filter creation, that the compaction context matches
// the expectations published through the two atomics below.
class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  // NOTE(review): intentionally left uninitialized; tests using
  // check_context_ must store expected values before any compaction runs.
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};
969
970 class DelayFilter : public CompactionFilter {
971 public:
972 explicit DelayFilter(DBTestBase* d) : db_test(d) {}
973 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
974 std::string* /*new_value*/,
975 bool* /*value_changed*/) const override {
976 db_test->env_->addon_time_.fetch_add(1000);
977 return true;
978 }
979
980 const char* Name() const override { return "DelayFilter"; }
981
982 private:
983 DBTestBase* db_test;
984 };
985
986 class DelayFilterFactory : public CompactionFilterFactory {
987 public:
988 explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
989 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
990 const CompactionFilter::Context& /*context*/) override {
991 return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
992 }
993
994 const char* Name() const override { return "DelayFilterFactory"; }
995
996 private:
997 DBTestBase* db_test;
998 };
999 } // namespace
1000
1001 #ifndef ROCKSDB_LITE
1002
1003 static std::string CompressibleString(Random* rnd, int len) {
1004 std::string r;
1005 test::CompressibleString(rnd, 0.8, len, &r);
1006 return r;
1007 }
1008 #endif // ROCKSDB_LITE
1009
1010 TEST_F(DBTest, FailMoreDbPaths) {
1011 Options options = CurrentOptions();
1012 options.db_paths.emplace_back(dbname_, 10000000);
1013 options.db_paths.emplace_back(dbname_ + "_2", 1000000);
1014 options.db_paths.emplace_back(dbname_ + "_3", 1000000);
1015 options.db_paths.emplace_back(dbname_ + "_4", 1000000);
1016 options.db_paths.emplace_back(dbname_ + "_5", 1000000);
1017 ASSERT_TRUE(TryReopen(options).IsNotSupported());
1018 }
1019
// Cross-checks the user-facing ColumnFamilyMetaData against the internal
// per-level FileMetaData: per-file identity, size, seqno range, key range,
// blob linkage, and timestamps (which must lie within
// [start_time, end_time]), plus the per-level and per-CF aggregates.
void CheckColumnFamilyMeta(
    const ColumnFamilyMetaData& cf_meta,
    const std::vector<std::vector<FileMetaData>>& files_by_level,
    uint64_t start_time, uint64_t end_time) {
  ASSERT_EQ(cf_meta.name, kDefaultColumnFamilyName);
  ASSERT_EQ(cf_meta.levels.size(), files_by_level.size());

  // Aggregates recomputed from per-file data; compared against the
  // pre-aggregated fields at the end.
  uint64_t cf_size = 0;
  size_t file_count = 0;

  for (size_t i = 0; i < cf_meta.levels.size(); ++i) {
    const auto& level_meta_from_cf = cf_meta.levels[i];
    const auto& level_meta_from_files = files_by_level[i];

    ASSERT_EQ(level_meta_from_cf.level, i);
    ASSERT_EQ(level_meta_from_cf.files.size(), level_meta_from_files.size());

    file_count += level_meta_from_cf.files.size();

    uint64_t level_size = 0;
    for (size_t j = 0; j < level_meta_from_cf.files.size(); ++j) {
      const auto& file_meta_from_cf = level_meta_from_cf.files[j];
      const auto& file_meta_from_files = level_meta_from_files[j];

      level_size += file_meta_from_cf.size;

      // File identity and size must agree with the internal descriptor.
      ASSERT_EQ(file_meta_from_cf.file_number,
                file_meta_from_files.fd.GetNumber());
      ASSERT_EQ(file_meta_from_cf.file_number,
                TableFileNameToNumber(file_meta_from_cf.name));
      ASSERT_EQ(file_meta_from_cf.size, file_meta_from_files.fd.file_size);
      ASSERT_EQ(file_meta_from_cf.smallest_seqno,
                file_meta_from_files.fd.smallest_seqno);
      ASSERT_EQ(file_meta_from_cf.largest_seqno,
                file_meta_from_files.fd.largest_seqno);
      ASSERT_EQ(file_meta_from_cf.smallestkey,
                file_meta_from_files.smallest.user_key().ToString());
      ASSERT_EQ(file_meta_from_cf.largestkey,
                file_meta_from_files.largest.user_key().ToString());
      ASSERT_EQ(file_meta_from_cf.oldest_blob_file_number,
                file_meta_from_files.oldest_blob_file_number);
      ASSERT_EQ(file_meta_from_cf.oldest_ancester_time,
                file_meta_from_files.oldest_ancester_time);
      ASSERT_EQ(file_meta_from_cf.file_creation_time,
                file_meta_from_files.file_creation_time);
      // Creation/ancestor times must fall inside the caller's observation
      // window.
      ASSERT_GE(file_meta_from_cf.file_creation_time, start_time);
      ASSERT_LE(file_meta_from_cf.file_creation_time, end_time);
      ASSERT_GE(file_meta_from_cf.oldest_ancester_time, start_time);
      ASSERT_LE(file_meta_from_cf.oldest_ancester_time, end_time);
    }

    ASSERT_EQ(level_meta_from_cf.size, level_size);
    cf_size += level_size;
  }

  ASSERT_EQ(cf_meta.file_count, file_count);
  ASSERT_EQ(cf_meta.size, cf_size);
}
1078
1079 void CheckLiveFilesMeta(
1080 const std::vector<LiveFileMetaData>& live_file_meta,
1081 const std::vector<std::vector<FileMetaData>>& files_by_level) {
1082 size_t total_file_count = 0;
1083 for (const auto& f : files_by_level) {
1084 total_file_count += f.size();
1085 }
1086
1087 ASSERT_EQ(live_file_meta.size(), total_file_count);
1088
1089 int level = 0;
1090 int i = 0;
1091
1092 for (const auto& meta : live_file_meta) {
1093 if (level != meta.level) {
1094 level = meta.level;
1095 i = 0;
1096 }
1097
1098 ASSERT_LT(i, files_by_level[level].size());
1099
1100 const auto& expected_meta = files_by_level[level][i];
1101
1102 ASSERT_EQ(meta.column_family_name, kDefaultColumnFamilyName);
1103 ASSERT_EQ(meta.file_number, expected_meta.fd.GetNumber());
1104 ASSERT_EQ(meta.file_number, TableFileNameToNumber(meta.name));
1105 ASSERT_EQ(meta.size, expected_meta.fd.file_size);
1106 ASSERT_EQ(meta.smallest_seqno, expected_meta.fd.smallest_seqno);
1107 ASSERT_EQ(meta.largest_seqno, expected_meta.fd.largest_seqno);
1108 ASSERT_EQ(meta.smallestkey, expected_meta.smallest.user_key().ToString());
1109 ASSERT_EQ(meta.largestkey, expected_meta.largest.user_key().ToString());
1110 ASSERT_EQ(meta.oldest_blob_file_number,
1111 expected_meta.oldest_blob_file_number);
1112
1113 ++i;
1114 }
1115 }
1116
1117 #ifndef ROCKSDB_LITE
// Builds 100 SST files, each containing one blob reference plus random
// filler, then verifies both GetColumnFamilyMetaData() and
// GetLiveFilesMetaData() against the internal FileMetaData via the helpers
// above.
TEST_F(DBTest, MetaDataTest) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.disable_auto_compactions = true;

  // Record wall-clock time before any file is created, so file timestamps
  // can be bounded below.
  int64_t temp_time = 0;
  options.env->GetCurrentTime(&temp_time);
  uint64_t start_time = static_cast<uint64_t>(temp_time);

  DestroyAndReopen(options);

  Random rnd(301);
  int key_index = 0;
  for (int i = 0; i < 100; ++i) {
    // Add a single blob reference to each file
    std::string blob_index;
    BlobIndex::EncodeBlob(&blob_index, /* blob_file_number */ i + 1000,
                          /* offset */ 1234, /* size */ 5678, kNoCompression);

    WriteBatch batch;
    ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, Key(key_index),
                                               blob_index));
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));

    ++key_index;

    // Fill up the rest of the file with random values.
    GenerateNewFile(&rnd, &key_index, /* nowait */ true);

    Flush();
  }

  std::vector<std::vector<FileMetaData>> files_by_level;
  dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files_by_level);

  // Upper bound for the file timestamps checked in CheckColumnFamilyMeta.
  options.env->GetCurrentTime(&temp_time);
  uint64_t end_time = static_cast<uint64_t>(temp_time);

  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(&cf_meta);
  CheckColumnFamilyMeta(cf_meta, files_by_level, start_time, end_time);

  std::vector<LiveFileMetaData> live_file_meta;
  db_->GetLiveFilesMetaData(&live_file_meta);
  CheckLiveFilesMeta(live_file_meta, files_by_level);
}
1164
1165 namespace {
// Fills level-0 up to one file short of the compaction trigger, then writes
// one more file and expects the resulting L0 compaction to leave exactly one
// file in level-1 and none in level-0.
void MinLevelHelper(DBTest* self, Options& options) {
  Random rnd(301);

  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    std::vector<std::string> values;
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      values.push_back(DBTestBase::RandomString(&rnd, 10000));
      ASSERT_OK(self->Put(DBTestBase::Key(i), values[i]));
    }
    // Each batch exceeds the write buffer, so it flushes into its own L0
    // file; no compaction should run yet.
    self->dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
  }

  // generate one more file in level-0, and should trigger level-0 compaction
  std::vector<std::string> values;
  for (int i = 0; i < 12; i++) {
    values.push_back(DBTestBase::RandomString(&rnd, 10000));
    ASSERT_OK(self->Put(DBTestBase::Key(i), values[i]));
  }
  self->dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
  ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}
1192
1193 // returns false if the calling-Test should be skipped
1194 bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
1195 int lev, int strategy) {
1196 fprintf(stderr,
1197 "Test with compression options : window_bits = %d, level = %d, "
1198 "strategy = %d}\n",
1199 wbits, lev, strategy);
1200 options.write_buffer_size = 100 << 10; // 100KB
1201 options.arena_block_size = 4096;
1202 options.num_levels = 3;
1203 options.level0_file_num_compaction_trigger = 3;
1204 options.create_if_missing = true;
1205
1206 if (Snappy_Supported()) {
1207 type = kSnappyCompression;
1208 fprintf(stderr, "using snappy\n");
1209 } else if (Zlib_Supported()) {
1210 type = kZlibCompression;
1211 fprintf(stderr, "using zlib\n");
1212 } else if (BZip2_Supported()) {
1213 type = kBZip2Compression;
1214 fprintf(stderr, "using bzip2\n");
1215 } else if (LZ4_Supported()) {
1216 type = kLZ4Compression;
1217 fprintf(stderr, "using lz4\n");
1218 } else if (XPRESS_Supported()) {
1219 type = kXpressCompression;
1220 fprintf(stderr, "using xpress\n");
1221 } else if (ZSTD_Supported()) {
1222 type = kZSTD;
1223 fprintf(stderr, "using ZSTD\n");
1224 } else {
1225 fprintf(stderr, "skipping test, compression disabled\n");
1226 return false;
1227 }
1228 options.compression_per_level.resize(options.num_levels);
1229
1230 // do not compress L0
1231 for (int i = 0; i < 1; i++) {
1232 options.compression_per_level[i] = kNoCompression;
1233 }
1234 for (int i = 1; i < options.num_levels; i++) {
1235 options.compression_per_level[i] = type;
1236 }
1237 return true;
1238 }
1239 } // namespace
1240
1241 TEST_F(DBTest, MinLevelToCompress1) {
1242 Options options = CurrentOptions();
1243 CompressionType type = kSnappyCompression;
1244 if (!MinLevelToCompress(type, options, -14, -1, 0)) {
1245 return;
1246 }
1247 Reopen(options);
1248 MinLevelHelper(this, options);
1249
1250 // do not compress L0 and L1
1251 for (int i = 0; i < 2; i++) {
1252 options.compression_per_level[i] = kNoCompression;
1253 }
1254 for (int i = 2; i < options.num_levels; i++) {
1255 options.compression_per_level[i] = type;
1256 }
1257 DestroyAndReopen(options);
1258 MinLevelHelper(this, options);
1259 }
1260
1261 TEST_F(DBTest, MinLevelToCompress2) {
1262 Options options = CurrentOptions();
1263 CompressionType type = kSnappyCompression;
1264 if (!MinLevelToCompress(type, options, 15, -1, 0)) {
1265 return;
1266 }
1267 Reopen(options);
1268 MinLevelHelper(this, options);
1269
1270 // do not compress L0 and L1
1271 for (int i = 0; i < 2; i++) {
1272 options.compression_per_level[i] = kNoCompression;
1273 }
1274 for (int i = 2; i < options.num_levels; i++) {
1275 options.compression_per_level[i] = type;
1276 }
1277 DestroyAndReopen(options);
1278 MinLevelHelper(this, options);
1279 }
1280
1281 // This test may fail because of a legit case that multiple L0 files
1282 // are trivial moved to L1.
// Repeatedly overwrites a single key with a value larger than the write
// buffer and checks that compaction keeps the total file count bounded.
TEST_F(DBTest, DISABLED_RepeatedWritesToSameKey) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.write_buffer_size = 100000;  // Small write buffer
    CreateAndReopenWithCF({"pikachu"}, options);

    // We must have at most one file per level except for level-0,
    // which may have up to kL0_StopWritesTrigger files.
    const int kMaxFiles =
        options.num_levels + options.level0_stop_writes_trigger;

    Random rnd(301);
    // Each value is twice the write buffer, so every Put forces a flush.
    std::string value =
        RandomString(&rnd, static_cast<int>(2 * options.write_buffer_size));
    for (int i = 0; i < 5 * kMaxFiles; i++) {
      ASSERT_OK(Put(1, "key", value));
      ASSERT_LE(TotalTableFiles(1), kMaxFiles);
    }
  } while (ChangeCompactOptions());
}
1304 #endif // ROCKSDB_LITE
1305
// After sparse updates to a key space dominated by one prefix, compaction
// must not create a file that overlaps too much data in the next level.
TEST_F(DBTest, SparseMerge) {
  do {
    Options options = CurrentOptions();
    options.compression = kNoCompression;
    CreateAndReopenWithCF({"pikachu"}, options);

    FillLevels("A", "Z", 1);

    // Suppose there is:
    //    small amount of data with prefix A
    //    large amount of data with prefix B
    //    small amount of data with prefix C
    // and that recent updates have made small changes to all three prefixes.
    // Check that we do not do a compaction that merges all of B in one shot.
    const std::string value(1000, 'x');
    Put(1, "A", "va");
    // Write approximately 100MB of "B" values
    for (int i = 0; i < 100000; i++) {
      char key[100];
      snprintf(key, sizeof(key), "B%010d", i);
      Put(1, key, value);
    }
    Put(1, "C", "vc");
    ASSERT_OK(Flush(1));
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);

    // Make sparse update
    Put(1, "A", "va2");
    Put(1, "B100", "bvalue2");
    Put(1, "C", "vc2");
    ASSERT_OK(Flush(1));

    // Compactions should not cause us to create a situation where
    // a file overlaps too much data at the next level.
    // The 20MB bound is re-checked after each manual compaction step.
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
    dbfull()->TEST_CompactRange(0, nullptr, nullptr);
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
    dbfull()->TEST_CompactRange(1, nullptr, nullptr);
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
  } while (ChangeCompactOptions());
}
1350
1351 #ifndef ROCKSDB_LITE
// Return true iff `val` lies within the inclusive range [low, high]; on
// failure, log the offending value and bounds to stderr to aid debugging.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  const bool in_range = (low <= val) && (val <= high);
  if (!in_range) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            static_cast<unsigned long long>(val),
            static_cast<unsigned long long>(low),
            static_cast<unsigned long long>(high));
  }
  return in_range;
}
1361
1362 TEST_F(DBTest, ApproximateSizesMemTable) {
1363 Options options = CurrentOptions();
1364 options.write_buffer_size = 100000000; // Large write buffer
1365 options.compression = kNoCompression;
1366 options.create_if_missing = true;
1367 DestroyAndReopen(options);
1368 auto default_cf = db_->DefaultColumnFamily();
1369
1370 const int N = 128;
1371 Random rnd(301);
1372 for (int i = 0; i < N; i++) {
1373 ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
1374 }
1375
1376 uint64_t size;
1377 std::string start = Key(50);
1378 std::string end = Key(60);
1379 Range r(start, end);
1380 SizeApproximationOptions size_approx_options;
1381 size_approx_options.include_memtabtles = true;
1382 size_approx_options.include_files = true;
1383 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1384 ASSERT_GT(size, 6000);
1385 ASSERT_LT(size, 204800);
1386 // Zero if not including mem table
1387 db_->GetApproximateSizes(&r, 1, &size);
1388 ASSERT_EQ(size, 0);
1389
1390 start = Key(500);
1391 end = Key(600);
1392 r = Range(start, end);
1393 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1394 ASSERT_EQ(size, 0);
1395
1396 for (int i = 0; i < N; i++) {
1397 ASSERT_OK(Put(Key(1000 + i), RandomString(&rnd, 1024)));
1398 }
1399
1400 start = Key(500);
1401 end = Key(600);
1402 r = Range(start, end);
1403 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1404 ASSERT_EQ(size, 0);
1405
1406 start = Key(100);
1407 end = Key(1020);
1408 r = Range(start, end);
1409 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1410 ASSERT_GT(size, 6000);
1411
1412 options.max_write_buffer_number = 8;
1413 options.min_write_buffer_number_to_merge = 5;
1414 options.write_buffer_size = 1024 * N; // Not very large
1415 DestroyAndReopen(options);
1416 default_cf = db_->DefaultColumnFamily();
1417
1418 int keys[N * 3];
1419 for (int i = 0; i < N; i++) {
1420 keys[i * 3] = i * 5;
1421 keys[i * 3 + 1] = i * 5 + 1;
1422 keys[i * 3 + 2] = i * 5 + 2;
1423 }
1424 std::random_shuffle(std::begin(keys), std::end(keys));
1425
1426 for (int i = 0; i < N * 3; i++) {
1427 ASSERT_OK(Put(Key(keys[i] + 1000), RandomString(&rnd, 1024)));
1428 }
1429
1430 start = Key(100);
1431 end = Key(300);
1432 r = Range(start, end);
1433 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1434 ASSERT_EQ(size, 0);
1435
1436 start = Key(1050);
1437 end = Key(1080);
1438 r = Range(start, end);
1439 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1440 ASSERT_GT(size, 6000);
1441
1442 start = Key(2100);
1443 end = Key(2300);
1444 r = Range(start, end);
1445 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1446 ASSERT_EQ(size, 0);
1447
1448 start = Key(1050);
1449 end = Key(1080);
1450 r = Range(start, end);
1451 uint64_t size_with_mt, size_without_mt;
1452 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1,
1453 &size_with_mt);
1454 ASSERT_GT(size_with_mt, 6000);
1455 db_->GetApproximateSizes(&r, 1, &size_without_mt);
1456 ASSERT_EQ(size_without_mt, 0);
1457
1458 Flush();
1459
1460 for (int i = 0; i < N; i++) {
1461 ASSERT_OK(Put(Key(i + 1000), RandomString(&rnd, 1024)));
1462 }
1463
1464 start = Key(1050);
1465 end = Key(1080);
1466 r = Range(start, end);
1467 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1,
1468 &size_with_mt);
1469 db_->GetApproximateSizes(&r, 1, &size_without_mt);
1470 ASSERT_GT(size_with_mt, size_without_mt);
1471 ASSERT_GT(size_without_mt, 6000);
1472
1473 // Check that include_memtabtles flag works as expected
1474 size_approx_options.include_memtabtles = false;
1475 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1476 ASSERT_EQ(size, size_without_mt);
1477
1478 // Check that files_size_error_margin works as expected, when the heuristic
1479 // conditions are not met
1480 start = Key(1);
1481 end = Key(1000 + N - 2);
1482 r = Range(start, end);
1483 size_approx_options.files_size_error_margin = -1.0; // disabled
1484 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
1485 uint64_t size2;
1486 size_approx_options.files_size_error_margin = 0.5; // enabled, but not used
1487 db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size2);
1488 ASSERT_EQ(size, size2);
1489 }
1490
// Verifies that when files_size_error_margin is enabled, the approximated
// size stays within that margin of the precise (margin-disabled) size.
TEST_F(DBTest, ApproximateSizesFilesWithErrorMargin) {
  Options options = CurrentOptions();
  options.write_buffer_size = 1024 * 1024;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options.target_file_size_base = 1024 * 1024;
  DestroyAndReopen(options);
  const auto default_cf = db_->DefaultColumnFamily();

  const int N = 64000;
  Random rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
  }
  // Flush everything to files
  Flush();
  // Compact the entire key space into the next level
  db_->CompactRange(CompactRangeOptions(), default_cf, nullptr, nullptr);

  // Write more keys
  for (int i = N; i < (N + N / 4); i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
  }
  // Flush everything to files again
  Flush();

  // Wait for compaction to finish
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  const std::string start = Key(0);
  const std::string end = Key(2 * N);
  const Range r(start, end);

  // Only file data, no memtables; margin disabled for the baseline query.
  SizeApproximationOptions size_approx_options;
  size_approx_options.include_memtabtles = false;
  size_approx_options.include_files = true;
  size_approx_options.files_size_error_margin = -1.0;  // disabled

  // Get the precise size without any approximation heuristic
  uint64_t size;
  db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size);
  ASSERT_NE(size, 0);

  // Get the size with an approximation heuristic
  uint64_t size2;
  const double error_margin = 0.2;
  size_approx_options.files_size_error_margin = error_margin;
  db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size2);
  ASSERT_LT(size2, size * (1 + error_margin));
  ASSERT_GT(size2, size * (1 - error_margin));
}
1542
// Verifies GetApproximateMemTableStats(): populated ranges report plausible
// entry counts/sizes, empty ranges report zero, and flushed data no longer
// contributes (the stats cover memtables only).
TEST_F(DBTest, GetApproximateMemTableStats) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  DestroyAndReopen(options);

  const int N = 128;
  Random rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
  }

  uint64_t count;
  uint64_t size;

  // Range covered by the writes above: non-zero count and size expected.
  std::string start = Key(50);
  std::string end = Key(60);
  Range r(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_GT(count, 0);
  ASSERT_LE(count, N);
  ASSERT_GT(size, 6000);
  ASSERT_LT(size, 204800);

  // Range with no keys: both stats must be zero.
  start = Key(500);
  end = Key(600);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_EQ(count, 0);
  ASSERT_EQ(size, 0);

  Flush();

  // After flushing, the same populated range reports zero because the data
  // now lives in SST files, not memtables.
  start = Key(50);
  end = Key(60);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_EQ(count, 0);
  ASSERT_EQ(size, 0);

  for (int i = 0; i < N; i++) {
    ASSERT_OK(Put(Key(1000 + i), RandomString(&rnd, 1024)));
  }

  start = Key(100);
  end = Key(1020);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_GT(count, 20);
  ASSERT_GT(size, 6000);
}
1595
// Verifies GetApproximateSizes() over flushed data: sizes of key-range
// prefixes must scale linearly with the number of 100KB values written
// (within a 5% metadata allowance), and must survive reopen and compaction.
TEST_F(DBTest, ApproximateSizes) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 100000000;  // Large write buffer
    options.compression = kNoCompression;
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));

    // Write 8MB (80 values, each 100K)
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    const int N = 80;
    static const int S1 = 100000;
    static const int S2 = 105000;  // Allow some expansion from metadata
    Random rnd(301);
    for (int i = 0; i < N; i++) {
      ASSERT_OK(Put(1, Key(i), RandomString(&rnd, S1)));
    }

    // 0 because GetApproximateSizes() does not account for memtable space
    ASSERT_TRUE(Between(Size("", Key(50), 1), 0, 0));

    // Check sizes across recovery by reopening a few times
    for (int run = 0; run < 3; run++) {
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      for (int compact_start = 0; compact_start < N; compact_start += 10) {
        for (int i = 0; i < N; i += 10) {
          ASSERT_TRUE(Between(Size("", Key(i), 1), S1 * i, S2 * i));
          ASSERT_TRUE(Between(Size("", Key(i) + ".suffix", 1), S1 * (i + 1),
                              S2 * (i + 1)));
          ASSERT_TRUE(Between(Size(Key(i), Key(i + 10), 1), S1 * 10, S2 * 10));
        }
        ASSERT_TRUE(Between(Size("", Key(50), 1), S1 * 50, S2 * 50));
        ASSERT_TRUE(
            Between(Size("", Key(50) + ".suffix", 1), S1 * 50, S2 * 50));

        // Compact a 10-key slice; size estimates must stay stable.
        std::string cstart_str = Key(compact_start);
        std::string cend_str = Key(compact_start + 9);
        Slice cstart = cstart_str;
        Slice cend = cend_str;
        dbfull()->TEST_CompactRange(0, &cstart, &cend, handles_[1]);
      }

      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
    }
    // ApproximateOffsetOf() is not yet implemented in plain table format.
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
                         kSkipPlainTable | kSkipHashIndex));
}
1651
// Verifies GetApproximateSizes() with a mix of small (10K) and large
// (100K/300K) values: cumulative range sizes must track the bytes written,
// and stay stable across reopen and L0 compaction.
TEST_F(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
  do {
    Options options = CurrentOptions();
    options.compression = kNoCompression;
    CreateAndReopenWithCF({"pikachu"}, options);

    Random rnd(301);
    std::string big1 = RandomString(&rnd, 100000);
    ASSERT_OK(Put(1, Key(0), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(1), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(2), big1));
    ASSERT_OK(Put(1, Key(3), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(4), big1));
    ASSERT_OK(Put(1, Key(5), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(6), RandomString(&rnd, 300000)));
    ASSERT_OK(Put(1, Key(7), RandomString(&rnd, 10000)));

    // Check sizes across recovery by reopening a few times
    for (int run = 0; run < 3; run++) {
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      // Each bound below is the running total of the value sizes written
      // above, with ~1000 bytes of slack for per-key metadata.
      ASSERT_TRUE(Between(Size("", Key(0), 1), 0, 0));
      ASSERT_TRUE(Between(Size("", Key(1), 1), 10000, 11000));
      ASSERT_TRUE(Between(Size("", Key(2), 1), 20000, 21000));
      ASSERT_TRUE(Between(Size("", Key(3), 1), 120000, 121000));
      ASSERT_TRUE(Between(Size("", Key(4), 1), 130000, 131000));
      ASSERT_TRUE(Between(Size("", Key(5), 1), 230000, 231000));
      ASSERT_TRUE(Between(Size("", Key(6), 1), 240000, 241000));
      ASSERT_TRUE(Between(Size("", Key(7), 1), 540000, 541000));
      ASSERT_TRUE(Between(Size("", Key(8), 1), 550000, 560000));

      ASSERT_TRUE(Between(Size(Key(3), Key(5), 1), 110000, 111000));

      dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
    }
    // ApproximateOffsetOf() is not yet implemented in plain table format.
  } while (ChangeOptions(kSkipPlainTable));
}
1690 #endif // ROCKSDB_LITE
1691
1692 #ifndef ROCKSDB_LITE
// Exercises snapshot lifetime accounting: GetSnapshot()/ReleaseSnapshot()
// must keep GetNumSnapshots(), the oldest-snapshot timestamp, and the
// oldest-snapshot sequence number consistent, and reads through each
// snapshot must observe the values that were current when it was taken.
TEST_F(DBTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    Put(0, "foo", "0v1");
    Put(1, "foo", "1v1");

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v2");
    Put(1, "foo", "1v2");

    // Advance the mock clock so a later snapshot gets a distinct timestamp.
    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v3");
    Put(1, "foo", "1v3");

    {
      // RAII snapshot, released automatically at end of scope.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      Put(0, "foo", "0v4");
      Put(1, "foo", "1v4");
      // Each snapshot sees the version current when it was taken.
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    // s3 has been released; s1 and s2 remain, s1 still the oldest.
    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    // After releasing s1, s2 becomes the oldest snapshot.
    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}
1762
// Once the snapshot protecting an old version is released, compaction must
// drop that hidden version, shrinking the data actually stored on disk.
TEST_F(DBTest, HiddenValuesAreRemoved) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    CreateAndReopenWithCF({"pikachu"}, options);
    Random rnd(301);
    FillLevels("a", "z", 1);

    std::string big = RandomString(&rnd, 50000);
    Put(1, "foo", big);
    Put(1, "pastfoo", "v");
    const Snapshot* snapshot = db_->GetSnapshot();
    Put(1, "foo", "tiny");
    Put(1, "pastfoo2", "v2");  // Advance sequence number one more

    ASSERT_OK(Flush(1));
    ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);

    // While the snapshot is live, the big value is still readable through it
    // and accounted for in the approximate size.
    ASSERT_EQ(big, Get(1, "foo", snapshot));
    ASSERT_TRUE(Between(Size("", "pastfoo", 1), 50000, 60000));
    db_->ReleaseSnapshot(snapshot);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny, " + big + " ]");
    Slice x("x");
    // Compacting drops the now-unprotected big version of "foo".
    dbfull()->TEST_CompactRange(0, nullptr, &x, handles_[1]);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    ASSERT_GE(NumTableFilesAtLevel(1, 1), 1);
    dbfull()->TEST_CompactRange(1, nullptr, &x, handles_[1]);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");

    ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
    // ApproximateOffsetOf() is not yet implemented in plain table format,
    // which is used by Size().
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
                         kSkipPlainTable));
}
1800 #endif // ROCKSDB_LITE
1801
TEST_F(DBTest, UnremovableSingleDelete) {
  // If we compact:
  //
  // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
  //
  // We do not want to end up with:
  //
  // Put(A, v1) Snapshot Put(A, v2)
  //
  // Because a subsequent SingleDelete(A) would delete the Put(A, v2)
  // but not Put(A, v1), so Get(A) would return v1.
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    Put(1, "foo", "first");
    const Snapshot* snapshot = db_->GetSnapshot();
    SingleDelete(1, "foo");
    Put(1, "foo", "second");
    ASSERT_OK(Flush(1));

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("second", Get(1, "foo"));

    // Compaction must keep the SDEL entry: the snapshot protects "first",
    // so all three entries survive.
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));

    SingleDelete(1, "foo");

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));

    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    db_->ReleaseSnapshot(snapshot);
    // Skip FIFO and universal compaction because they do not apply to the test
    // case. Skip MergePut because single delete does not get removed when it
    // encounters a merge.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
1850
1851 #ifndef ROCKSDB_LITE
// A deletion marker can be dropped during compaction once a newer value
// (v2) hides the older one, even before reaching the bottommost level.
TEST_F(DBTest, DeletionMarkers1) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);
  Put(1, "foo", "v1");
  ASSERT_OK(Flush(1));
  const int last = 2;
  MoveFilesToLevel(last, 1);
  // foo => v1 is now in last level
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);

  // Place a table at level last-1 to prevent merging with preceding mutation
  Put(1, "a", "begin");
  Put(1, "z", "end");
  Flush(1);
  MoveFilesToLevel(last - 1, 1);
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
  ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);

  Delete(1, "foo");
  Put(1, "foo", "v2");
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
  ASSERT_OK(Flush(1));  // Moves to level last-2
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
  Slice z("z");
  dbfull()->TEST_CompactRange(last - 2, nullptr, &z, handles_[1]);
  // DEL eliminated, but v1 remains because we aren't compacting that level
  // (DEL can be eliminated because v2 hides v1).
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
  dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, handles_[1]);
  // Merging last-1 w/ last, so we are the base level for "foo", so
  // DEL is removed. (as is v1).
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
}
1885
1886 TEST_F(DBTest, DeletionMarkers2) {
1887 Options options = CurrentOptions();
1888 CreateAndReopenWithCF({"pikachu"}, options);
1889 Put(1, "foo", "v1");
1890 ASSERT_OK(Flush(1));
1891 const int last = 2;
1892 MoveFilesToLevel(last, 1);
1893 // foo => v1 is now in last level
1894 ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
1895
1896 // Place a table at level last-1 to prevent merging with preceding mutation
1897 Put(1, "a", "begin");
1898 Put(1, "z", "end");
1899 Flush(1);
1900 MoveFilesToLevel(last - 1, 1);
1901 ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
1902 ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);
1903
1904 Delete(1, "foo");
1905 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1906 ASSERT_OK(Flush(1)); // Moves to level last-2
1907 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1908 dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr, handles_[1]);
1909 // DEL kept: "last" file overlaps
1910 ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
1911 dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, handles_[1]);
1912 // Merging last-1 w/ last, so we are the base level for "foo", so
1913 // DEL is removed. (as is v1).
1914 ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
1915 }
1916
1917 TEST_F(DBTest, OverlapInLevel0) {
1918 do {
1919 Options options = CurrentOptions();
1920 CreateAndReopenWithCF({"pikachu"}, options);
1921
1922 // Fill levels 1 and 2 to disable the pushing of new memtables to levels >
1923 // 0.
1924 ASSERT_OK(Put(1, "100", "v100"));
1925 ASSERT_OK(Put(1, "999", "v999"));
1926 Flush(1);
1927 MoveFilesToLevel(2, 1);
1928 ASSERT_OK(Delete(1, "100"));
1929 ASSERT_OK(Delete(1, "999"));
1930 Flush(1);
1931 MoveFilesToLevel(1, 1);
1932 ASSERT_EQ("0,1,1", FilesPerLevel(1));
1933
1934 // Make files spanning the following ranges in level-0:
1935 // files[0] 200 .. 900
1936 // files[1] 300 .. 500
1937 // Note that files are sorted by smallest key.
1938 ASSERT_OK(Put(1, "300", "v300"));
1939 ASSERT_OK(Put(1, "500", "v500"));
1940 Flush(1);
1941 ASSERT_OK(Put(1, "200", "v200"));
1942 ASSERT_OK(Put(1, "600", "v600"));
1943 ASSERT_OK(Put(1, "900", "v900"));
1944 Flush(1);
1945 ASSERT_EQ("2,1,1", FilesPerLevel(1));
1946
1947 // Compact away the placeholder files we created initially
1948 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
1949 dbfull()->TEST_CompactRange(2, nullptr, nullptr, handles_[1]);
1950 ASSERT_EQ("2", FilesPerLevel(1));
1951
1952 // Do a memtable compaction. Before bug-fix, the compaction would
1953 // not detect the overlap with level-0 files and would incorrectly place
1954 // the deletion in a deeper level.
1955 ASSERT_OK(Delete(1, "600"));
1956 Flush(1);
1957 ASSERT_EQ("3", FilesPerLevel(1));
1958 ASSERT_EQ("NOT_FOUND", Get(1, "600"));
1959 } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
1960 }
1961 #endif // ROCKSDB_LITE
1962
// Opening a column family with a comparator whose Name() differs from the
// one the CF was created with must fail, and the error message must mention
// "comparator".
TEST_F(DBTest, ComparatorCheck) {
  // Behaves exactly like BytewiseComparator() but reports a different name,
  // which is what the comparator-consistency check at open time keys off of.
  class NewComparator : public Comparator {
   public:
    const char* Name() const override { return "rocksdb.NewComparator"; }
    int Compare(const Slice& a, const Slice& b) const override {
      return BytewiseComparator()->Compare(a, b);
    }
    void FindShortestSeparator(std::string* s, const Slice& l) const override {
      BytewiseComparator()->FindShortestSeparator(s, l);
    }
    void FindShortSuccessor(std::string* key) const override {
      BytewiseComparator()->FindShortSuccessor(key);
    }
  };
  Options new_options, options;
  NewComparator cmp;
  do {
    options = CurrentOptions();
    CreateAndReopenWithCF({"pikachu"}, options);
    new_options = CurrentOptions();
    new_options.comparator = &cmp;
    // only the non-default column family has non-matching comparator
    Status s = TryReopenWithColumnFamilies(
        {"default", "pikachu"}, std::vector<Options>({options, new_options}));
    ASSERT_TRUE(!s.ok());
    ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
        << s.ToString();
  } while (ChangeCompactOptions());
}
1992
// Exercise a user-defined comparator end to end: keys are strings of the
// form "[<number>]" ordered numerically, so "[10]", "[0xa]" are the same
// key. Small write buffers force frequent compactions under the custom
// ordering.
TEST_F(DBTest, CustomComparator) {
  class NumberComparator : public Comparator {
   public:
    const char* Name() const override { return "test.NumberComparator"; }
    int Compare(const Slice& a, const Slice& b) const override {
      return ToNumber(a) - ToNumber(b);
    }
    void FindShortestSeparator(std::string* s, const Slice& l) const override {
      ToNumber(*s);  // Check format
      ToNumber(l);   // Check format
    }
    void FindShortSuccessor(std::string* key) const override {
      ToNumber(*key);  // Check format
    }

   private:
    // Parse "[<number>]" (%i accepts decimal, hex "0x..", octal) and fail
    // the test on any malformed key.
    static int ToNumber(const Slice& x) {
      // Check that there are no extra characters.
      EXPECT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size() - 1] == ']')
          << EscapeString(x);
      int val;
      char ignored;
      // Exactly one conversion must succeed: the trailing %c is expected NOT
      // to match, proving there is nothing after the closing ']'.
      EXPECT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
          << EscapeString(x);
      return val;
    }
  };
  Options new_options;
  NumberComparator cmp;
  do {
    new_options = CurrentOptions();
    new_options.create_if_missing = true;
    new_options.comparator = &cmp;
    new_options.write_buffer_size = 4096;  // Compact more often
    new_options.arena_block_size = 4096;
    new_options = CurrentOptions(new_options);
    DestroyAndReopen(new_options);
    CreateAndReopenWithCF({"pikachu"}, new_options);
    ASSERT_OK(Put(1, "[10]", "ten"));
    ASSERT_OK(Put(1, "[0x14]", "twenty"));
    for (int i = 0; i < 2; i++) {
      // Each key must be reachable through every equivalent spelling, both
      // before and after a compaction (second loop iteration).
      ASSERT_EQ("ten", Get(1, "[10]"));
      ASSERT_EQ("ten", Get(1, "[0xa]"));
      ASSERT_EQ("twenty", Get(1, "[20]"));
      ASSERT_EQ("twenty", Get(1, "[0x14]"));
      ASSERT_EQ("NOT_FOUND", Get(1, "[15]"));
      ASSERT_EQ("NOT_FOUND", Get(1, "[0xf]"));
      Compact(1, "[0]", "[9999]");
    }

    // Bulk-load 1000 keys twice to push data through flushes/compactions.
    for (int run = 0; run < 2; run++) {
      for (int i = 0; i < 1000; i++) {
        char buf[100];
        snprintf(buf, sizeof(buf), "[%d]", i * 10);
        ASSERT_OK(Put(1, buf, buf));
      }
      Compact(1, "[0]", "[1000000]");
    }
  } while (ChangeCompactOptions());
}
2053
// Exercise the create_if_missing / error_if_exists combinations of
// DB::Open on a fresh path.
TEST_F(DBTest, DBOpen_Options) {
  Options options = CurrentOptions();
  std::string dbname = test::PerThreadDBPath("db_options_test");
  ASSERT_OK(DestroyDB(dbname, options));

  // Does not exist, and create_if_missing == false: error
  DB* db = nullptr;
  options.create_if_missing = false;
  Status s = DB::Open(options, dbname, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does not exist, and create_if_missing == true: OK
  options.create_if_missing = true;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;

  // Does exist, and error_if_exists == true: error
  options.create_if_missing = false;
  options.error_if_exists = true;
  s = DB::Open(options, dbname, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does exist, and error_if_exists == false: OK
  options.create_if_missing = true;
  options.error_if_exists = false;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;
}
2092
2093 TEST_F(DBTest, DBOpen_Change_NumLevels) {
2094 Options options = CurrentOptions();
2095 options.create_if_missing = true;
2096 DestroyAndReopen(options);
2097 ASSERT_TRUE(db_ != nullptr);
2098 CreateAndReopenWithCF({"pikachu"}, options);
2099
2100 ASSERT_OK(Put(1, "a", "123"));
2101 ASSERT_OK(Put(1, "b", "234"));
2102 Flush(1);
2103 MoveFilesToLevel(3, 1);
2104 Close();
2105
2106 options.create_if_missing = false;
2107 options.num_levels = 2;
2108 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
2109 ASSERT_TRUE(strstr(s.ToString().c_str(), "Invalid argument") != nullptr);
2110 ASSERT_TRUE(db_ == nullptr);
2111 }
2112
// DestroyDB() on a DB directory must also remove the meta databases nested
// under it (two MetaDatabaseName levels deep here), so subsequent opens
// with create_if_missing == false fail for all three paths.
TEST_F(DBTest, DestroyDBMetaDatabase) {
  std::string dbname = test::PerThreadDBPath("db_meta");
  ASSERT_OK(env_->CreateDirIfMissing(dbname));
  std::string metadbname = MetaDatabaseName(dbname, 0);
  ASSERT_OK(env_->CreateDirIfMissing(metadbname));
  std::string metametadbname = MetaDatabaseName(metadbname, 0);
  ASSERT_OK(env_->CreateDirIfMissing(metametadbname));

  // Destroy previous versions if they exist. Using the long way.
  Options options = CurrentOptions();
  ASSERT_OK(DestroyDB(metametadbname, options));
  ASSERT_OK(DestroyDB(metadbname, options));
  ASSERT_OK(DestroyDB(dbname, options));

  // Setup databases
  DB* db = nullptr;
  ASSERT_OK(DB::Open(options, dbname, &db));
  delete db;
  db = nullptr;
  ASSERT_OK(DB::Open(options, metadbname, &db));
  delete db;
  db = nullptr;
  ASSERT_OK(DB::Open(options, metametadbname, &db));
  delete db;
  db = nullptr;

  // Delete databases
  ASSERT_OK(DestroyDB(dbname, options));

  // Check if deletion worked.
  options.create_if_missing = false;
  ASSERT_TRUE(!(DB::Open(options, dbname, &db)).ok());
  ASSERT_TRUE(!(DB::Open(options, metadbname, &db)).ok());
  ASSERT_TRUE(!(DB::Open(options, metametadbname, &db)).ok());
}
2148
2149 #ifndef ROCKSDB_LITE
2150 TEST_F(DBTest, SnapshotFiles) {
2151 do {
2152 Options options = CurrentOptions();
2153 options.write_buffer_size = 100000000; // Large write buffer
2154 CreateAndReopenWithCF({"pikachu"}, options);
2155
2156 Random rnd(301);
2157
2158 // Write 8MB (80 values, each 100K)
2159 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2160 std::vector<std::string> values;
2161 for (int i = 0; i < 80; i++) {
2162 values.push_back(RandomString(&rnd, 100000));
2163 ASSERT_OK(Put((i < 40), Key(i), values[i]));
2164 }
2165
2166 // assert that nothing makes it to disk yet.
2167 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
2168
2169 // get a file snapshot
2170 uint64_t manifest_number = 0;
2171 uint64_t manifest_size = 0;
2172 std::vector<std::string> files;
2173 dbfull()->DisableFileDeletions();
2174 dbfull()->GetLiveFiles(files, &manifest_size);
2175
2176 // CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
2177 ASSERT_EQ(files.size(), 5U);
2178
2179 uint64_t number = 0;
2180 FileType type;
2181
2182 // copy these files to a new snapshot directory
2183 std::string snapdir = dbname_ + ".snapdir/";
2184 ASSERT_OK(env_->CreateDirIfMissing(snapdir));
2185
2186 for (size_t i = 0; i < files.size(); i++) {
2187 // our clients require that GetLiveFiles returns
2188 // files with "/" as first character!
2189 ASSERT_EQ(files[i][0], '/');
2190 std::string src = dbname_ + files[i];
2191 std::string dest = snapdir + files[i];
2192
2193 uint64_t size;
2194 ASSERT_OK(env_->GetFileSize(src, &size));
2195
2196 // record the number and the size of the
2197 // latest manifest file
2198 if (ParseFileName(files[i].substr(1), &number, &type)) {
2199 if (type == kDescriptorFile) {
2200 if (number > manifest_number) {
2201 manifest_number = number;
2202 ASSERT_GE(size, manifest_size);
2203 size = manifest_size; // copy only valid MANIFEST data
2204 }
2205 }
2206 }
2207 CopyFile(src, dest, size);
2208 }
2209
2210 // release file snapshot
2211 dbfull()->DisableFileDeletions();
2212 // overwrite one key, this key should not appear in the snapshot
2213 std::vector<std::string> extras;
2214 for (unsigned int i = 0; i < 1; i++) {
2215 extras.push_back(RandomString(&rnd, 100000));
2216 ASSERT_OK(Put(0, Key(i), extras[i]));
2217 }
2218
2219 // verify that data in the snapshot are correct
2220 std::vector<ColumnFamilyDescriptor> column_families;
2221 column_families.emplace_back("default", ColumnFamilyOptions());
2222 column_families.emplace_back("pikachu", ColumnFamilyOptions());
2223 std::vector<ColumnFamilyHandle*> cf_handles;
2224 DB* snapdb;
2225 DBOptions opts;
2226 opts.env = env_;
2227 opts.create_if_missing = false;
2228 Status stat =
2229 DB::Open(opts, snapdir, column_families, &cf_handles, &snapdb);
2230 ASSERT_OK(stat);
2231
2232 ReadOptions roptions;
2233 std::string val;
2234 for (unsigned int i = 0; i < 80; i++) {
2235 stat = snapdb->Get(roptions, cf_handles[i < 40], Key(i), &val);
2236 ASSERT_EQ(values[i].compare(val), 0);
2237 }
2238 for (auto cfh : cf_handles) {
2239 delete cfh;
2240 }
2241 delete snapdb;
2242
2243 // look at the new live files after we added an 'extra' key
2244 // and after we took the first snapshot.
2245 uint64_t new_manifest_number = 0;
2246 uint64_t new_manifest_size = 0;
2247 std::vector<std::string> newfiles;
2248 dbfull()->DisableFileDeletions();
2249 dbfull()->GetLiveFiles(newfiles, &new_manifest_size);
2250
2251 // find the new manifest file. assert that this manifest file is
2252 // the same one as in the previous snapshot. But its size should be
2253 // larger because we added an extra key after taking the
2254 // previous shapshot.
2255 for (size_t i = 0; i < newfiles.size(); i++) {
2256 std::string src = dbname_ + "/" + newfiles[i];
2257 // record the lognumber and the size of the
2258 // latest manifest file
2259 if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
2260 if (type == kDescriptorFile) {
2261 if (number > new_manifest_number) {
2262 uint64_t size;
2263 new_manifest_number = number;
2264 ASSERT_OK(env_->GetFileSize(src, &size));
2265 ASSERT_GE(size, new_manifest_size);
2266 }
2267 }
2268 }
2269 }
2270 ASSERT_EQ(manifest_number, new_manifest_number);
2271 ASSERT_GT(new_manifest_size, manifest_size);
2272
2273 // release file snapshot
2274 dbfull()->DisableFileDeletions();
2275 } while (ChangeCompactOptions());
2276 }
2277
2278 TEST_F(DBTest, ReadonlyDBGetLiveManifestSize) {
2279 do {
2280 Options options = CurrentOptions();
2281 options.level0_file_num_compaction_trigger = 2;
2282 DestroyAndReopen(options);
2283
2284 ASSERT_OK(Put("foo", "bar"));
2285 ASSERT_OK(Flush());
2286 ASSERT_OK(Put("foo", "bar"));
2287 ASSERT_OK(Flush());
2288 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2289
2290 Close();
2291 ASSERT_OK(ReadOnlyReopen(options));
2292
2293 uint64_t manifest_size = 0;
2294 std::vector<std::string> files;
2295 dbfull()->GetLiveFiles(files, &manifest_size);
2296
2297 for (const std::string& f : files) {
2298 uint64_t number = 0;
2299 FileType type;
2300 if (ParseFileName(f.substr(1), &number, &type)) {
2301 if (type == kDescriptorFile) {
2302 uint64_t size_on_disk;
2303 env_->GetFileSize(dbname_ + "/" + f, &size_on_disk);
2304 ASSERT_EQ(manifest_size, size_on_disk);
2305 break;
2306 }
2307 }
2308 }
2309 Close();
2310 } while (ChangeCompactOptions());
2311 }
2312 #endif
2313
// After repeated reopens, at most keep_log_file_num (5) info log files may
// remain — both with logs in the DB dir and with a separate db_log_dir.
TEST_F(DBTest, PurgeInfoLogs) {
  Options options = CurrentOptions();
  options.keep_log_file_num = 5;
  options.create_if_missing = true;
  // mode 0: info logs live in the DB dir; mode 1: in a separate db_log_dir.
  for (int mode = 0; mode <= 1; mode++) {
    if (mode == 1) {
      options.db_log_dir = dbname_ + "_logs";
      env_->CreateDirIfMissing(options.db_log_dir);
    } else {
      options.db_log_dir = "";
    }
    // Each reopen creates a fresh info log; 8 > keep_log_file_num, so the
    // oldest logs must get purged.
    for (int i = 0; i < 8; i++) {
      Reopen(options);
    }

    std::vector<std::string> files;
    env_->GetChildren(options.db_log_dir.empty() ? dbname_ : options.db_log_dir,
                      &files);
    int info_log_count = 0;
    for (std::string file : files) {
      if (file.find("LOG") != std::string::npos) {
        info_log_count++;
      }
    }
    ASSERT_EQ(5, info_log_count);

    Destroy(options);
    // For mode 0, Destroy() must have deleted all the logs under the DB dir.
    // For mode 1, no info log file should have been put under the DB dir in
    // the first place.
    std::vector<std::string> db_files;
    env_->GetChildren(dbname_, &db_files);
    for (std::string file : db_files) {
      ASSERT_TRUE(file.find("LOG") == std::string::npos);
    }

    if (mode == 1) {
      // Cleaning up
      env_->GetChildren(options.db_log_dir, &files);
      for (std::string file : files) {
        env_->DeleteFile(options.db_log_dir + "/" + file);
      }
      env_->DeleteDir(options.db_log_dir);
    }
  }
}
2359
2360 #ifndef ROCKSDB_LITE
2361 // Multi-threaded test:
// Multi-threaded workload: kNumThreads workers randomly write or read the
// same key across kColumnFamilies column families and verify that multi-CF
// batch writes are applied atomically (all CFs see the same unique_id).
namespace {

static const int kColumnFamilies = 10;
static const int kNumThreads = 10;
static const int kTestSeconds = 10;
static const int kNumKeys = 1000;

// State shared by all worker threads.
struct MTState {
  DBTest* test;
  std::atomic<bool> stop;                      // signals workers to exit
  std::atomic<int> counter[kNumThreads];       // ops published per thread
  std::atomic<bool> thread_done[kNumThreads];  // set when a worker returns
};

// Per-thread arguments passed to MTThreadBody.
struct MTThread {
  MTState* state;
  int id;
  bool multiget_batched;  // use the batched MultiGet API for reads
};

// Worker loop: each iteration either writes one key into every CF (via
// WriteBatch or WriteBatchWithIndex) or reads it from every CF and checks
// cross-CF consistency.
static void MTThreadBody(void* arg) {
  MTThread* t = reinterpret_cast<MTThread*>(arg);
  int id = t->id;
  DB* db = t->state->test->db_;
  int counter = 0;
  fprintf(stderr, "... starting thread %d\n", id);
  Random rnd(1000 + id);
  char valbuf[1500];
  while (t->state->stop.load(std::memory_order_acquire) == false) {
    // Publish our progress so readers can bound the counters they observe.
    t->state->counter[id].store(counter, std::memory_order_release);

    int key = rnd.Uniform(kNumKeys);
    char keybuf[20];
    snprintf(keybuf, sizeof(keybuf), "%016d", key);

    if (rnd.OneIn(2)) {
      // Write values of the form <key, my id, counter, cf, unique_id>.
      // into each of the CFs
      // We add some padding for force compactions.
      int unique_id = rnd.Uniform(1000000);

      // Half of the time directly use WriteBatch. Half of the time use
      // WriteBatchWithIndex.
      if (rnd.OneIn(2)) {
        WriteBatch batch;
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
                   static_cast<int>(counter), cf, unique_id);
          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
        }
        ASSERT_OK(db->Write(WriteOptions(), &batch));
      } else {
        WriteBatchWithIndex batch(db->GetOptions().comparator);
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
                   static_cast<int>(counter), cf, unique_id);
          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
        }
        ASSERT_OK(db->Write(WriteOptions(), batch.GetWriteBatch()));
      }
    } else {
      // Read a value and verify that it matches the pattern written above
      // and that writes to all column families were atomic (unique_id is the
      // same)
      std::vector<Slice> keys(kColumnFamilies, Slice(keybuf));
      std::vector<std::string> values;
      std::vector<Status> statuses;
      if (!t->multiget_batched) {
        statuses = db->MultiGet(ReadOptions(), t->state->test->handles_, keys,
                                &values);
      } else {
        // Batched API: one call per CF, all under a single snapshot so the
        // atomicity check below still sees one consistent view.
        std::vector<PinnableSlice> pin_values(keys.size());
        statuses.resize(keys.size());
        const Snapshot* snapshot = db->GetSnapshot();
        ReadOptions ro;
        ro.snapshot = snapshot;
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          db->MultiGet(ro, t->state->test->handles_[cf], 1, &keys[cf],
                       &pin_values[cf], &statuses[cf]);
        }
        db->ReleaseSnapshot(snapshot);
        values.resize(keys.size());
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          if (statuses[cf].ok()) {
            values[cf].assign(pin_values[cf].data(), pin_values[cf].size());
          }
        }
      }
      Status s = statuses[0];
      // all statuses have to be the same
      for (size_t i = 1; i < statuses.size(); ++i) {
        // they are either both ok or both not-found
        ASSERT_TRUE((s.ok() && statuses[i].ok()) ||
                    (s.IsNotFound() && statuses[i].IsNotFound()));
      }
      if (s.IsNotFound()) {
        // Key has not yet been written
      } else {
        // Check that the writer thread counter is >= the counter in the value
        ASSERT_OK(s);
        int unique_id = -1;
        for (int i = 0; i < kColumnFamilies; ++i) {
          int k, w, c, cf, u;
          ASSERT_EQ(5, sscanf(values[i].c_str(), "%d.%d.%d.%d.%d", &k, &w, &c,
                              &cf, &u))
              << values[i];
          ASSERT_EQ(k, key);
          ASSERT_GE(w, 0);
          ASSERT_LT(w, kNumThreads);
          ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
          ASSERT_EQ(cf, i);
          if (i == 0) {
            unique_id = u;
          } else {
            // this checks that updates across column families happened
            // atomically -- all unique ids are the same
            ASSERT_EQ(u, unique_id);
          }
        }
      }
    }
    counter++;
  }
  t->state->thread_done[id].store(true, std::memory_order_release);
  fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}

} // namespace
2490
2491 class MultiThreadedDBTest
2492 : public DBTest,
2493 public ::testing::WithParamInterface<std::tuple<int, bool>> {
2494 public:
2495 void SetUp() override {
2496 std::tie(option_config_, multiget_batched_) = GetParam();
2497 }
2498
2499 static std::vector<int> GenerateOptionConfigs() {
2500 std::vector<int> optionConfigs;
2501 for (int optionConfig = kDefault; optionConfig < kEnd; ++optionConfig) {
2502 optionConfigs.push_back(optionConfig);
2503 }
2504 return optionConfigs;
2505 }
2506
2507 bool multiget_batched_;
2508 };
2509
// Run the multi-threaded workload (MTThreadBody) for kTestSeconds under the
// option configuration and MultiGet flavor supplied by the test parameter.
TEST_P(MultiThreadedDBTest, MultiThreaded) {
  // The pipelined-write configuration is excluded from this test.
  if (option_config_ == kPipelinedWrite) return;
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  // CFs are named "1".."9"; the default CF brings the total to
  // kColumnFamilies.
  std::vector<std::string> cfs;
  for (int i = 1; i < kColumnFamilies; ++i) {
    cfs.push_back(ToString(i));
  }
  Reopen(options);
  CreateAndReopenWithCF(cfs, options);
  // Initialize state
  MTState mt;
  mt.test = this;
  mt.stop.store(false, std::memory_order_release);
  for (int id = 0; id < kNumThreads; id++) {
    mt.counter[id].store(0, std::memory_order_release);
    mt.thread_done[id].store(false, std::memory_order_release);
  }

  // Start threads
  MTThread thread[kNumThreads];
  for (int id = 0; id < kNumThreads; id++) {
    thread[id].state = &mt;
    thread[id].id = id;
    thread[id].multiget_batched = multiget_batched_;
    env_->StartThread(MTThreadBody, &thread[id]);
  }

  // Let them run for a while
  env_->SleepForMicroseconds(kTestSeconds * 1000000);

  // Stop the threads and wait for them to finish
  mt.stop.store(true, std::memory_order_release);
  for (int id = 0; id < kNumThreads; id++) {
    while (mt.thread_done[id].load(std::memory_order_acquire) == false) {
      env_->SleepForMicroseconds(100000);
    }
  }
}
2550
// Instantiate MultiThreaded for every option configuration, with both the
// vector-based and the batched MultiGet code paths.
INSTANTIATE_TEST_CASE_P(
    MultiThreaded, MultiThreadedDBTest,
    ::testing::Combine(
        ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
        ::testing::Bool()));
2556 #endif // ROCKSDB_LITE
2557
2558 // Group commit test:
2559 #if !defined(TRAVIS) && !defined(OS_WIN)
2560 // Disable this test temporarily on Travis and appveyor as it fails
2561 // intermittently. Github issue: #4151
namespace {

static const int kGCNumThreads = 4;
static const int kGCNumKeys = 1000;

// Per-thread state for the group-commit test.
struct GCThread {
  DB* db;
  int id;
  std::atomic<bool> done;  // set once the thread finished all its writes
};

// Writes kGCNumKeys entries with key == value, each thread using its own
// disjoint numeric range [id * kGCNumKeys, (id + 1) * kGCNumKeys).
static void GCThreadBody(void* arg) {
  GCThread* t = reinterpret_cast<GCThread*>(arg);
  int id = t->id;
  DB* db = t->db;
  WriteOptions wo;

  for (int i = 0; i < kGCNumKeys; ++i) {
    std::string kv(ToString(i + id * kGCNumKeys));
    ASSERT_OK(db->Put(wo, kv, kv));
  }
  t->done = true;
}

} // namespace
2587
// Verify group commit: sync-point dependencies order the marked write-thread
// events so that concurrent writers form batch groups; afterwards some
// writes must have been completed by another thread's group leader
// (WRITE_DONE_BY_OTHER > 0) and every key must be present, in order.
TEST_F(DBTest, GroupCommitTest) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    Reopen(options);

    // Each {A, B} pair forces point A to fire before point B, making writers
    // pile up behind the batch-group leader (see the ticker check below).
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{"WriteThread::JoinBatchGroup:BeganWaiting",
          "DBImpl::WriteImpl:BeforeLeaderEnters"},
         {"WriteThread::AwaitState:BlockingWaiting",
          "WriteThread::EnterAsBatchGroupLeader:End"}});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Start threads
    GCThread thread[kGCNumThreads];
    for (int id = 0; id < kGCNumThreads; id++) {
      thread[id].id = id;
      thread[id].db = db_;
      thread[id].done = false;
      env_->StartThread(GCThreadBody, &thread[id]);
    }
    env_->WaitForJoin();

    ASSERT_GT(TestGetTickerCount(options, WRITE_DONE_BY_OTHER), 0);

    // Every key written by every thread, in lexicographic order.
    std::vector<std::string> expected_db;
    for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) {
      expected_db.push_back(ToString(i));
    }
    std::sort(expected_db.begin(), expected_db.end());

    Iterator* itr = db_->NewIterator(ReadOptions());
    itr->SeekToFirst();
    for (auto x : expected_db) {
      ASSERT_TRUE(itr->Valid());
      ASSERT_EQ(itr->key().ToString(), x);
      ASSERT_EQ(itr->value().ToString(), x);
      itr->Next();
    }
    ASSERT_TRUE(!itr->Valid());
    delete itr;

    HistogramData hist_data;
    options.statistics->histogramData(DB_WRITE, &hist_data);
    ASSERT_GT(hist_data.average, 0.0);
  } while (ChangeOptions(kSkipNoSeekToLast));
}
2636 #endif // TRAVIS
2637
namespace {
// Shorthand for the ordered key/value map backing ModelDB below.
using KVMap = std::map<std::string, std::string>;
}  // namespace
2641
2642 class ModelDB : public DB {
2643 public:
  // A Snapshot that simply holds a full copy of the model's map taken at
  // GetSnapshot() time.
  class ModelSnapshot : public Snapshot {
   public:
    KVMap map_;  // entire DB state captured when the snapshot was taken

    // Never called by the tests; the model does not track sequence numbers.
    SequenceNumber GetSequenceNumber() const override {
      // no need to call this
      assert(false);
      return 0;
    }
  };
2654
  // The model keeps all state in map_; only the options are stored here.
  explicit ModelDB(const Options& options) : options_(options) {}
  using DB::Put;
  // Routed through Write() so a single code path mutates map_.
  Status Put(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& k,
             const Slice& v) override {
    WriteBatch batch;
    batch.Put(cf, k, v);
    return Write(o, &batch);
  }
  using DB::Close;
  // Nothing to tear down; the model lives entirely in memory.
  Status Close() override { return Status::OK(); }
  using DB::Delete;
  // Routed through Write() like Put, so map_ is only mutated in one place.
  Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
                const Slice& key) override {
    WriteBatch batch;
    batch.Delete(cf, key);
    return Write(o, &batch);
  }
  using DB::SingleDelete;
  // Routed through Write(); the batch handler decides the map_ effect.
  Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf,
                      const Slice& key) override {
    WriteBatch batch;
    batch.SingleDelete(cf, key);
    return Write(o, &batch);
  }
  using DB::Merge;
  // Routed through Write(); note the handler currently ignores merges.
  Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& k,
               const Slice& v) override {
    WriteBatch batch;
    batch.Merge(cf, k, v);
    return Write(o, &batch);
  }
  using DB::Get;
  // Point lookups are not supported by the model; tests read via iterators.
  Status Get(const ReadOptions& /*options*/, ColumnFamilyHandle* /*cf*/,
             const Slice& key, PinnableSlice* /*value*/) override {
    return Status::NotSupported(key);
  }

  using DB::GetMergeOperands;
  virtual Status GetMergeOperands(
      const ReadOptions& /*options*/, ColumnFamilyHandle* /*column_family*/,
      const Slice& key, PinnableSlice* /*slice*/,
      GetMergeOperandsOptions* /*merge_operands_options*/,
      int* /*number_of_operands*/) override {
    return Status::NotSupported(key);
  }

  using DB::MultiGet;
  // Returns one NotSupported status per requested key.
  std::vector<Status> MultiGet(
      const ReadOptions& /*options*/,
      const std::vector<ColumnFamilyHandle*>& /*column_family*/,
      const std::vector<Slice>& keys,
      std::vector<std::string>* /*values*/) override {
    std::vector<Status> s(keys.size(),
                          Status::NotSupported("Not implemented."));
    return s;
  }
2711
2712 #ifndef ROCKSDB_LITE
  using DB::IngestExternalFile;
  // File ingestion, import, and checksum verification are not part of what
  // the model-based tests exercise; these overrides are stubs.
  Status IngestExternalFile(
      ColumnFamilyHandle* /*column_family*/,
      const std::vector<std::string>& /*external_files*/,
      const IngestExternalFileOptions& /*options*/) override {
    return Status::NotSupported("Not implemented.");
  }

  using DB::IngestExternalFiles;
  Status IngestExternalFiles(
      const std::vector<IngestExternalFileArg>& /*args*/) override {
    return Status::NotSupported("Not implemented");
  }

  using DB::CreateColumnFamilyWithImport;
  virtual Status CreateColumnFamilyWithImport(
      const ColumnFamilyOptions& /*options*/,
      const std::string& /*column_family_name*/,
      const ImportColumnFamilyOptions& /*import_options*/,
      const ExportImportFilesMetaData& /*metadata*/,
      ColumnFamilyHandle** /*handle*/) override {
    return Status::NotSupported("Not implemented.");
  }

  using DB::VerifyChecksum;
  Status VerifyChecksum(const ReadOptions&) override {
    return Status::NotSupported("Not implemented.");
  }

  using DB::GetPropertiesOfAllTables;
  // Returns a default (OK) status without filling props.
  Status GetPropertiesOfAllTables(
      ColumnFamilyHandle* /*column_family*/,
      TablePropertiesCollection* /*props*/) override {
    return Status();
  }

  Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* /*column_family*/, const Range* /*range*/,
      std::size_t /*n*/, TablePropertiesCollection* /*props*/) override {
    return Status();
  }
2754 #endif // ROCKSDB_LITE
2755
  using DB::KeyMayExist;
  // Always answers "maybe" (true) with value_found = false, which is a
  // conservative, valid response for any KeyMayExist caller.
  bool KeyMayExist(const ReadOptions& /*options*/,
                   ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
                   std::string* /*value*/,
                   bool* value_found = nullptr) override {
    if (value_found != nullptr) {
      *value_found = false;
    }
    return true;  // Not Supported directly
  }
  using DB::NewIterator;
  Iterator* NewIterator(const ReadOptions& options,
                        ColumnFamilyHandle* /*column_family*/) override {
    if (options.snapshot == nullptr) {
      // No snapshot: iterate over a private copy of the current state. The
      // 'true' flag presumably tells ModelIter (defined later in this file)
      // that it owns and must delete the copy -- verify against ModelIter.
      KVMap* saved = new KVMap;
      *saved = map_;
      return new ModelIter(saved, true);
    } else {
      // Snapshot reads borrow the map stored inside the ModelSnapshot.
      const KVMap* snapshot_state =
          &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
      return new ModelIter(snapshot_state, false);
    }
  }
  Status NewIterators(const ReadOptions& /*options*/,
                      const std::vector<ColumnFamilyHandle*>& /*column_family*/,
                      std::vector<Iterator*>* /*iterators*/) override {
    return Status::NotSupported("Not supported yet");
  }
  // A model snapshot is a full copy of map_; released in ReleaseSnapshot().
  const Snapshot* GetSnapshot() override {
    ModelSnapshot* snapshot = new ModelSnapshot;
    snapshot->map_ = map_;
    return snapshot;
  }
2789
  // Snapshots are heap-allocated ModelSnapshots (see GetSnapshot()); free
  // them here.
  void ReleaseSnapshot(const Snapshot* snapshot) override {
    delete reinterpret_cast<const ModelSnapshot*>(snapshot);
  }

  // The single mutation entry point: replays the batch against map_ through
  // a WriteBatch::Handler. All Put/Delete/SingleDelete/Merge overrides above
  // funnel into this.
  Status Write(const WriteOptions& /*options*/, WriteBatch* batch) override {
    class Handler : public WriteBatch::Handler {
     public:
      KVMap* map_;
      void Put(const Slice& key, const Slice& value) override {
        (*map_)[key.ToString()] = value.ToString();
      }
      void Merge(const Slice& /*key*/, const Slice& /*value*/) override {
        // ignore merge for now
        // (*map_)[key.ToString()] = value.ToString();
      }
      void Delete(const Slice& key) override { map_->erase(key.ToString()); }
    };
    Handler handler;
    handler.map_ = &map_;
    return batch->Iterate(&handler);
  }
2811
  using DB::GetProperty;
  // The model exposes no properties; every getter reports failure.
  bool GetProperty(ColumnFamilyHandle* /*column_family*/,
                   const Slice& /*property*/, std::string* /*value*/) override {
    return false;
  }
  using DB::GetIntProperty;
  bool GetIntProperty(ColumnFamilyHandle* /*column_family*/,
                      const Slice& /*property*/, uint64_t* /*value*/) override {
    return false;
  }
  using DB::GetMapProperty;
  bool GetMapProperty(ColumnFamilyHandle* /*column_family*/,
                      const Slice& /*property*/,
                      std::map<std::string, std::string>* /*value*/) override {
    return false;
  }
  using DB::GetAggregatedIntProperty;
  bool GetAggregatedIntProperty(const Slice& /*property*/,
                                uint64_t* /*value*/) override {
    return false;
  }
2833 using DB::GetApproximateSizes;
2834 Status GetApproximateSizes(const SizeApproximationOptions& /*options*/,
2835 ColumnFamilyHandle* /*column_family*/,
2836 const Range* /*range*/, int n,
2837 uint64_t* sizes) override {
2838 for (int i = 0; i < n; i++) {
2839 sizes[i] = 0;
2840 }
2841 return Status::OK();
2842 }
2843 using DB::GetApproximateMemTableStats;
2844 void GetApproximateMemTableStats(ColumnFamilyHandle* /*column_family*/,
2845 const Range& /*range*/,
2846 uint64_t* const count,
2847 uint64_t* const size) override {
2848 *count = 0;
2849 *size = 0;
2850 }
2851 using DB::CompactRange;
2852 Status CompactRange(const CompactRangeOptions& /*options*/,
2853 ColumnFamilyHandle* /*column_family*/,
2854 const Slice* /*start*/, const Slice* /*end*/) override {
2855 return Status::NotSupported("Not supported operation.");
2856 }
2857
  // Dynamic option changes, manual compaction control and background-work
  // toggles are all unsupported by the model; level/trigger queries return
  // fixed values consistent with a single-level, never-stalling store.
  Status SetDBOptions(
      const std::unordered_map<std::string, std::string>& /*new_options*/)
      override {
    return Status::NotSupported("Not supported operation.");
  }

  using DB::CompactFiles;
  Status CompactFiles(
      const CompactionOptions& /*compact_options*/,
      ColumnFamilyHandle* /*column_family*/,
      const std::vector<std::string>& /*input_file_names*/,
      const int /*output_level*/, const int /*output_path_id*/ = -1,
      std::vector<std::string>* const /*output_file_names*/ = nullptr,
      CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
    return Status::NotSupported("Not supported operation.");
  }

  Status PauseBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  }

  Status ContinueBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  }

  Status EnableAutoCompaction(
      const std::vector<ColumnFamilyHandle*>& /*column_family_handles*/)
      override {
    return Status::NotSupported("Not supported operation.");
  }

  void EnableManualCompaction() override { return; }

  void DisableManualCompaction() override { return; }

  using DB::NumberLevels;
  int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; }

  using DB::MaxMemCompactionLevel;
  int MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) override {
    return 1;
  }

  using DB::Level0StopWriteTrigger;
  // -1: the model never stalls writes.
  int Level0StopWriteTrigger(ColumnFamilyHandle* /*column_family*/) override {
    return -1;
  }

  const std::string& GetName() const override { return name_; }

  Env* GetEnv() const override { return nullptr; }

  using DB::GetOptions;
  Options GetOptions(ColumnFamilyHandle* /*column_family*/) const override {
    return options_;
  }

  using DB::GetDBOptions;
  DBOptions GetDBOptions() const override { return options_; }

  using DB::Flush;
  // Everything is already "persistent" in memory, so Flush is a no-op
  // success.
  Status Flush(const ROCKSDB_NAMESPACE::FlushOptions& /*options*/,
               ColumnFamilyHandle* /*column_family*/) override {
    Status ret;
    return ret;
  }
  Status Flush(
      const ROCKSDB_NAMESPACE::FlushOptions& /*options*/,
      const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
    return Status::OK();
  }
2929
  // WAL/file-management entry points: there are no real files, so most of
  // these succeed trivially without doing anything.
  Status SyncWAL() override { return Status::OK(); }

#ifndef ROCKSDB_LITE
  Status DisableFileDeletions() override { return Status::OK(); }

  Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); }
  Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
                      bool /*flush_memtable*/ = true) override {
    return Status::OK();
  }

  Status GetSortedWalFiles(VectorLogPtr& /*files*/) override {
    return Status::OK();
  }

  Status GetCurrentWalFile(
      std::unique_ptr<LogFile>* /*current_log_file*/) override {
    return Status::OK();
  }

  virtual Status GetCreationTimeOfOldestFile(
      uint64_t* /*creation_time*/) override {
    return Status::NotSupported();
  }

  Status DeleteFile(std::string /*name*/) override { return Status::OK(); }

  Status GetUpdatesSince(
      ROCKSDB_NAMESPACE::SequenceNumber,
      std::unique_ptr<ROCKSDB_NAMESPACE::TransactionLogIterator>*,
      const TransactionLogIterator::ReadOptions& /*read_options*/ =
          TransactionLogIterator::ReadOptions()) override {
    return Status::NotSupported("Not supported in Model DB");
  }

  void GetColumnFamilyMetaData(ColumnFamilyHandle* /*column_family*/,
                               ColumnFamilyMetaData* /*metadata*/) override {}
#endif  // ROCKSDB_LITE

  Status GetDbIdentity(std::string& /*identity*/) const override {
    return Status::OK();
  }

  SequenceNumber GetLatestSequenceNumber() const override { return 0; }

  bool SetPreserveDeletesSequenceNumber(SequenceNumber /*seqnum*/) override {
    return true;
  }

  ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }
2980
 private:
  // In-memory iterator over a KVMap. Optionally owns the map: the
  // non-snapshot NewIterator path hands over a private copy, which must be
  // freed when the iterator is destroyed.
  class ModelIter : public Iterator {
   public:
    ModelIter(const KVMap* map, bool owned)
        : map_(map), owned_(owned), iter_(map_->end()) {}
    ~ModelIter() override {
      if (owned_) delete map_;
    }
    bool Valid() const override { return iter_ != map_->end(); }
    void SeekToFirst() override { iter_ = map_->begin(); }
    void SeekToLast() override {
      if (map_->empty()) {
        iter_ = map_->end();
      } else {
        iter_ = map_->find(map_->rbegin()->first);
      }
    }
    void Seek(const Slice& k) override {
      iter_ = map_->lower_bound(k.ToString());
    }
    void SeekForPrev(const Slice& k) override {
      // First entry strictly greater than k, then step back: lands on the
      // last entry <= k, or becomes invalid if none exists.
      iter_ = map_->upper_bound(k.ToString());
      Prev();
    }
    void Next() override { ++iter_; }
    void Prev() override {
      // Stepping before the first entry invalidates the iterator, matching
      // RocksDB iterator semantics.
      if (iter_ == map_->begin()) {
        iter_ = map_->end();
        return;
      }
      --iter_;
    }

    Slice key() const override { return iter_->first; }
    Slice value() const override { return iter_->second; }
    Status status() const override { return Status::OK(); }

   private:
    const KVMap* const map_;
    const bool owned_;  // Do we own map_
    KVMap::const_iterator iter_;
  };
  const Options options_;
  KVMap map_;  // current key/value state of the model
  std::string name_ = "";
};
3027
3028 #ifndef ROCKSDB_VALGRIND_RUN
3029 static std::string RandomKey(Random* rnd, int minimum = 0) {
3030 int len;
3031 do {
3032 len = (rnd->OneIn(3)
3033 ? 1 // Short sometimes to encourage collisions
3034 : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
3035 } while (len < minimum);
3036 return test::RandomKey(rnd, len);
3037 }
3038
3039 static bool CompareIterators(int step, DB* model, DB* db,
3040 const Snapshot* model_snap,
3041 const Snapshot* db_snap) {
3042 ReadOptions options;
3043 options.snapshot = model_snap;
3044 Iterator* miter = model->NewIterator(options);
3045 options.snapshot = db_snap;
3046 Iterator* dbiter = db->NewIterator(options);
3047 bool ok = true;
3048 int count = 0;
3049 for (miter->SeekToFirst(), dbiter->SeekToFirst();
3050 ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) {
3051 count++;
3052 if (miter->key().compare(dbiter->key()) != 0) {
3053 fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step,
3054 EscapeString(miter->key()).c_str(),
3055 EscapeString(dbiter->key()).c_str());
3056 ok = false;
3057 break;
3058 }
3059
3060 if (miter->value().compare(dbiter->value()) != 0) {
3061 fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
3062 step, EscapeString(miter->key()).c_str(),
3063 EscapeString(miter->value()).c_str(),
3064 EscapeString(miter->value()).c_str());
3065 ok = false;
3066 }
3067 }
3068
3069 if (ok) {
3070 if (miter->Valid() != dbiter->Valid()) {
3071 fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
3072 step, miter->Valid(), dbiter->Valid());
3073 ok = false;
3074 }
3075 }
3076 delete miter;
3077 delete dbiter;
3078 return ok;
3079 }
3080
// Parameterized fixture for the randomized model-vs-real comparison; the
// parameter selects the option_config_ under test.
class DBTestRandomized : public DBTest,
                         public ::testing::WithParamInterface<int> {
 public:
  void SetUp() override { option_config_ = GetParam(); }

  // Enumerates every option config except those incompatible with the
  // test's use of deletes-first filtering and SeekToLast.
  static std::vector<int> GenerateOptionConfigs() {
    std::vector<int> option_configs;
    // skip cuckoo hash as it does not support snapshot.
    for (int option_config = kDefault; option_config < kEnd; ++option_config) {
      if (!ShouldSkipOptions(option_config,
                             kSkipDeletesFilterFirst | kSkipNoSeekToLast)) {
        option_configs.push_back(option_config);
      }
    }
    option_configs.push_back(kBlockBasedTableWithIndexRestartInterval);
    return option_configs;
  }
};

INSTANTIATE_TEST_CASE_P(
    DBTestRandomized, DBTestRandomized,
    ::testing::ValuesIn(DBTestRandomized::GenerateOptionConfigs()));
3103
// Applies a long random sequence of Puts, Deletes and WriteBatches to both
// the real DB and the in-memory ModelDB, periodically verifying — via
// iterator comparison, snapshots, and a DB reopen — that the two agree.
TEST_P(DBTestRandomized, Randomized) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  DestroyAndReopen(options);

  Random rnd(test::RandomSeed() + GetParam());
  ModelDB model(options);
  const int N = 10000;
  const Snapshot* model_snap = nullptr;
  const Snapshot* db_snap = nullptr;
  std::string k, v;
  for (int step = 0; step < N; step++) {
    // TODO(sanjay): Test Get() works
    int p = rnd.Uniform(100);
    int minimum = 0;
    // Prefix-based configs cannot handle empty keys; force length >= 1.
    if (option_config_ == kHashSkipList || option_config_ == kHashLinkList ||
        option_config_ == kPlainTableFirstBytePrefix ||
        option_config_ == kBlockBasedTableWithWholeKeyHashIndex ||
        option_config_ == kBlockBasedTableWithPrefixHashIndex) {
      minimum = 1;
    }
    if (p < 45) {  // Put
      k = RandomKey(&rnd, minimum);
      v = RandomString(&rnd,
                       rnd.OneIn(20) ? 100 + rnd.Uniform(100) : rnd.Uniform(8));
      ASSERT_OK(model.Put(WriteOptions(), k, v));
      ASSERT_OK(db_->Put(WriteOptions(), k, v));
    } else if (p < 90) {  // Delete
      k = RandomKey(&rnd, minimum);
      ASSERT_OK(model.Delete(WriteOptions(), k));
      ASSERT_OK(db_->Delete(WriteOptions(), k));
    } else {  // Multi-element batch
      WriteBatch b;
      const int num = rnd.Uniform(8);
      for (int i = 0; i < num; i++) {
        if (i == 0 || !rnd.OneIn(10)) {
          k = RandomKey(&rnd, minimum);
        } else {
          // Periodically re-use the same key from the previous iter, so
          // we have multiple entries in the write batch for the same key
        }
        if (rnd.OneIn(2)) {
          v = RandomString(&rnd, rnd.Uniform(10));
          b.Put(k, v);
        } else {
          b.Delete(k);
        }
      }
      ASSERT_OK(model.Write(WriteOptions(), &b));
      ASSERT_OK(db_->Write(WriteOptions(), &b));
    }

    if ((step % 100) == 0) {
      // For DB instances that use the hash index + block-based table, the
      // iterator will be invalid right when seeking a non-existent key,
      // rather than return a key that is close to it.
      if (option_config_ != kBlockBasedTableWithWholeKeyHashIndex &&
          option_config_ != kBlockBasedTableWithPrefixHashIndex) {
        ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));
        ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
      }

      // Save a snapshot from each DB this time that we'll use next
      // time we compare things, to make sure the current state is
      // preserved with the snapshot
      if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
      if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);

      Reopen(options);
      ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));

      model_snap = model.GetSnapshot();
      db_snap = db_->GetSnapshot();
    }
  }
  if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
  if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
}
3183 #endif // ROCKSDB_VALGRIND_RUN
3184
// Writes data under a hash-search (prefix) index, then reopens without a
// prefix extractor to check RocksDB falls back to binary search and reads
// still succeed.
TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
  // create a DB with block prefix index
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));

  Reopen(options);
  ASSERT_OK(Put("k1", "v1"));
  Flush();  // k1 lands in an SST built with the hash index
  ASSERT_OK(Put("k2", "v2"));  // k2 stays in the memtable

  // Reopen it without prefix extractor, make sure everything still works.
  // RocksDB should just fall back to the binary index.
  table_options.index_type = BlockBasedTableOptions::kBinarySearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset();

  Reopen(options);
  ASSERT_EQ("v1", Get("k1"));
  ASSERT_EQ("v2", Get("k2"));
}
3208
// Verifies that a total_order_seek read on a hash-index table works even
// when the table cache is so small the table must be re-opened, and that a
// subsequent prefix seek still functions afterwards.
TEST_F(DBTest, BlockBasedTablePrefixIndexTotalOrderSeek) {
  // create a DB with block prefix index
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  options.max_open_files = 10;
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));

  // RocksDB sanitize max open files to at least 20. Modify it back.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("k1", "v1"));
  Flush();

  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 1;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // Force evict tables
  dbfull()->TEST_table_cache()->SetCapacity(0);
  // Make table cache to keep one entry.
  dbfull()->TEST_table_cache()->SetCapacity(1);

  ReadOptions read_options;
  read_options.total_order_seek = true;
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("k1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("k1", iter->key().ToString());
  }

  // After total order seek, prefix index should still be used.
  read_options.total_order_seek = false;
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("k1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("k1", iter->key().ToString());
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3259
// Writes SST files under different checksum types and verifies all of them
// remain readable after reopening with a different checksum setting (the
// checksum type is a per-file property, not a DB-wide one).
TEST_F(DBTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();

  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Flush());  // table with crc checksum

  table_options.checksum = kxxHash;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_OK(Put("e", "f"));
  ASSERT_OK(Put("g", "h"));
  ASSERT_OK(Flush());  // table with xxhash checksum

  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));

  // NOTE(review): this paragraph repeats the kCRC32c verification above;
  // presumably it was meant to exercise another checksum type (e.g.
  // kxxHash64) — confirm against upstream.
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));
}
3294
3295 #ifndef ROCKSDB_LITE
// FIFO compaction drops the oldest files once the total size exceeds
// max_table_files_size; checked under both automatic (iter 0) and manual
// (iter 1) compaction.
TEST_P(DBTestWithParam, FIFOCompactionTest) {
  for (int iter = 0; iter < 2; ++iter) {
    // first iteration -- auto compaction
    // second iteration -- manual compaction
    Options options;
    options.compaction_style = kCompactionStyleFIFO;
    options.write_buffer_size = 100 << 10;  // 100KB
    options.arena_block_size = 4096;
    options.compaction_options_fifo.max_table_files_size = 500 << 10;  // 500KB
    options.compression = kNoCompression;
    options.create_if_missing = true;
    options.max_subcompactions = max_subcompactions_;
    if (iter == 1) {
      options.disable_auto_compactions = true;
    }
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 6; ++i) {
      // Each outer iteration writes ~107KB, exceeding the 100KB memtable.
      for (int j = 0; j < 110; ++j) {
        ASSERT_OK(Put(ToString(i * 100 + j), RandomString(&rnd, 980)));
      }
      // flush should happen here
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    if (iter == 0) {
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    } else {
      CompactRangeOptions cro;
      cro.exclusive_manual_compaction = exclusive_manual_compaction_;
      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
    }
    // only 5 files should survive
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
    for (int i = 0; i < 50; ++i) {
      // these keys should be deleted in previous compaction
      ASSERT_EQ("NOT_FOUND", Get(ToString(i)));
    }
  }
}
3337
// With allow_compaction, FIFO also runs intra-L0 compactions (merging
// small files) while still honoring the total-size cap.
TEST_F(DBTest, FIFOCompactionTestWithCompaction) {
  Options options;
  options.compaction_style = kCompactionStyleFIFO;
  options.write_buffer_size = 20 << 10;  // 20K
  options.arena_block_size = 4096;
  options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1MB
  options.compaction_options_fifo.allow_compaction = true;
  options.level0_file_num_compaction_trigger = 6;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options = CurrentOptions(options);
  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < 60; i++) {
    // Generate and flush a file about 20KB.
    for (int j = 0; j < 20; j++) {
      ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
    }
    Flush();
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  // It should be compacted to 10 files.
  ASSERT_EQ(NumTableFilesAtLevel(0), 10);

  for (int i = 0; i < 60; i++) {
    // Generate and flush a file about 20KB.
    for (int j = 0; j < 20; j++) {
      ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
    }
    Flush();
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  // It should be compacted to no more than 20 files.
  ASSERT_GT(NumTableFilesAtLevel(0), 10);
  ASSERT_LT(NumTableFilesAtLevel(0), 18);
  // Size limit is still guaranteed.
  ASSERT_LE(SizeAtLevel(0),
            options.compaction_options_fifo.max_table_files_size);
}
3379
// Checks that intra-L0 FIFO compaction correctly applies both overwrites
// (empty-value puts remain visible) and deletes (keys become NOT_FOUND).
TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
  Options options;
  options.compaction_style = kCompactionStyleFIFO;
  options.write_buffer_size = 20 << 10;  // 20K
  options.arena_block_size = 4096;
  options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1MB
  options.compaction_options_fifo.allow_compaction = true;
  options.level0_file_num_compaction_trigger = 3;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options = CurrentOptions(options);
  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < 3; i++) {
    // Each file contains a different key which will be dropped later.
    ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
    ASSERT_OK(Put("key" + ToString(i), ""));
    ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
    Flush();
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  for (int i = 0; i < 3; i++) {
    ASSERT_EQ("", Get("key" + ToString(i)));
  }
  for (int i = 0; i < 3; i++) {
    // Each file contains a different key which will be dropped later.
    ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
    ASSERT_OK(Delete("key" + ToString(i)));
    ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
    Flush();
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
  for (int i = 0; i < 3; i++) {
    ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
  }
}
3419
// Check that FIFO-with-TTL is supported regardless of max_open_files
// (historically it required max_open_files == -1; both settings must now
// open successfully).
TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
  Options options;
  options.compaction_style = kCompactionStyleFIFO;
  options.create_if_missing = true;
  options.ttl = 600;  // seconds

  // TTL is now supported with max_open_files != -1.
  options.max_open_files = 100;
  options = CurrentOptions(options);
  ASSERT_OK(TryReopen(options));

  options.max_open_files = -1;
  ASSERT_OK(TryReopen(options));
}
3435
// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory;
// plain and adaptive table factories must be rejected with NotSupported.
TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
  Options options;
  options.compaction_style = kCompactionStyleFIFO;
  options.create_if_missing = true;
  options.ttl = 600;  // seconds

  options = CurrentOptions(options);
  options.table_factory.reset(NewBlockBasedTableFactory());
  ASSERT_OK(TryReopen(options));

  Destroy(options);
  options.table_factory.reset(NewPlainTableFactory());
  ASSERT_TRUE(TryReopen(options).IsNotSupported());

  Destroy(options);
  options.table_factory.reset(NewAdaptiveTableFactory());
  ASSERT_TRUE(TryReopen(options).IsNotSupported());
}
3455
// Exercises FIFO compaction's TTL behavior using a mocked clock
// (env_->addon_time_): expired files are dropped by manual and automatic
// compaction, size-based FIFO still applies when TTL alone is not enough,
// and TTL composes with intra-L0 compaction.
TEST_F(DBTest, FIFOCompactionWithTTLTest) {
  Options options;
  options.compaction_style = kCompactionStyleFIFO;
  options.write_buffer_size = 10 << 10;  // 10KB
  options.arena_block_size = 4096;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  // Advance time only via addon_time_, never by real sleeping.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  // Test to make sure that all files with expired ttl are deleted on next
  // manual compaction.
  {
    env_->addon_time_.store(0);
    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
    options.compaction_options_fifo.allow_compaction = false;
    options.ttl = 1 * 60 * 60;  // 1 hour
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 10; i++) {
      // Generate and flush a file about 10KB.
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    ASSERT_EQ(NumTableFilesAtLevel(0), 10);

    // Sleep for 2 hours -- which is much greater than TTL.
    // Note: Couldn't use SleepForMicroseconds because it takes an int instead
    // of uint64_t. Hence used addon_time_ directly.
    // env_->SleepForMicroseconds(2 * 60 * 60 * 1000 * 1000);
    env_->addon_time_.fetch_add(2 * 60 * 60);

    // Since no flushes and compactions have run, the db should still be in
    // the same state even after considerable time has passed.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0), 10);

    dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  }

  // Test to make sure that all files with expired ttl are deleted on next
  // automatic compaction.
  {
    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
    options.compaction_options_fifo.allow_compaction = false;
    options.ttl = 1 * 60 * 60;  // 1 hour
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 10; i++) {
      // Generate and flush a file about 10KB.
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    ASSERT_EQ(NumTableFilesAtLevel(0), 10);

    // Sleep for 2 hours -- which is much greater than TTL.
    env_->addon_time_.fetch_add(2 * 60 * 60);
    // Just to make sure that we are in the same state even after sleeping.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0), 10);

    // Create 1 more file to trigger TTL compaction. The old files are dropped.
    for (int i = 0; i < 1; i++) {
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
    }

    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    // Only the newly created file remains; the 10 expired files are gone.
    ASSERT_EQ(NumTableFilesAtLevel(0), 1);
    ASSERT_LE(SizeAtLevel(0),
              options.compaction_options_fifo.max_table_files_size);
  }

  // Test that shows the fall back to size-based FIFO compaction if TTL-based
  // deletion doesn't move the total size to be less than max_table_files_size.
  {
    options.write_buffer_size = 10 << 10;  // 10KB
    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
    options.compaction_options_fifo.allow_compaction = false;
    options.ttl = 1 * 60 * 60;  // 1 hour
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 3; i++) {
      // Generate and flush a file about 10KB.
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    ASSERT_EQ(NumTableFilesAtLevel(0), 3);

    // Sleep for 2 hours -- which is much greater than TTL.
    env_->addon_time_.fetch_add(2 * 60 * 60);
    // Just to make sure that we are in the same state even after sleeping.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0), 3);

    for (int i = 0; i < 5; i++) {
      for (int j = 0; j < 140; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    // Size limit is still guaranteed.
    ASSERT_LE(SizeAtLevel(0),
              options.compaction_options_fifo.max_table_files_size);
  }

  // Test with TTL + Intra-L0 compactions.
  {
    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
    options.compaction_options_fifo.allow_compaction = true;
    options.ttl = 1 * 60 * 60;  // 1 hour
    options.level0_file_num_compaction_trigger = 6;
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 10; i++) {
      // Generate and flush a file about 10KB.
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1
    // (due to level0_file_num_compaction_trigger = 6).
    // So total files = 1 + remaining 4 = 5.
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);

    // Sleep for 2 hours -- which is much greater than TTL.
    env_->addon_time_.fetch_add(2 * 60 * 60);
    // Just to make sure that we are in the same state even after sleeping.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);

    // Create 10 more files. The old 5 files are dropped as their ttl expired.
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
    ASSERT_LE(SizeAtLevel(0),
              options.compaction_options_fifo.max_table_files_size);
  }

  // Test with large TTL + Intra-L0 compactions.
  // Files dropped based on size, as ttl doesn't kick in.
  {
    options.write_buffer_size = 20 << 10;  // 20K
    options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1.5MB
    options.compaction_options_fifo.allow_compaction = true;
    options.ttl = 1 * 60 * 60;  // 1 hour
    options.level0_file_num_compaction_trigger = 6;
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    for (int i = 0; i < 60; i++) {
      // Generate and flush a file about 20KB.
      for (int j = 0; j < 20; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
    // It should be compacted to 10 files.
    ASSERT_EQ(NumTableFilesAtLevel(0), 10);

    for (int i = 0; i < 60; i++) {
      // Generate and flush a file about 20KB.
      for (int j = 0; j < 20; j++) {
        ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
      }
      Flush();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }

    // It should be compacted to no more than 20 files.
    ASSERT_GT(NumTableFilesAtLevel(0), 10);
    ASSERT_LT(NumTableFilesAtLevel(0), 18);
    // Size limit is still guaranteed.
    ASSERT_LE(SizeAtLevel(0),
              options.compaction_options_fifo.max_table_files_size);
  }
}
3664 #endif // ROCKSDB_LITE
3665
3666 #ifndef ROCKSDB_LITE
/*
 * This test is not reliable enough as it heavily depends on disk behavior.
 * Disable as it is flaky.
 */
// Measures the raw (unlimited) write rate, then re-runs the same workload
// under generic rate limiters set to 0.7x and 0.5x of that rate, asserting
// the observed throughput ratio and drain counts roughly match.
TEST_F(DBTest, DISABLED_RateLimitingTest) {
  Options options = CurrentOptions();
  options.write_buffer_size = 1 << 20;         // 1MB
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = 1 << 20;     // 1MB
  options.max_bytes_for_level_base = 4 << 20;  // 4MB
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options.env = env_;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.IncreaseParallelism(4);
  DestroyAndReopen(options);

  WriteOptions wo;
  wo.disableWAL = true;

  // # no rate limiting
  Random rnd(301);
  uint64_t start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  uint64_t elapsed = env_->NowMicros() - start;
  double raw_rate = env_->bytes_written_ * 1000000.0 / elapsed;
  uint64_t rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS);
  ASSERT_EQ(0, rate_limiter_drains);
  Close();

  // # rate limiting with 0.7 x threshold
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);

  start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  elapsed = env_->NowMicros() - start;
  Close();
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  // micros)
  ASSERT_GT(rate_limiter_drains, 0);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
  ASSERT_TRUE(ratio < 0.8);

  // # rate limiting with half of the raw_rate
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(raw_rate / 2)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);

  start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  elapsed = env_->NowMicros() - start;
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  Close();
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  // micros)
  ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
  ASSERT_LT(ratio, 0.6);
}
3755
// Verifies option sanitization at open time: mmap reads default to off,
// a PlainTable open is not rejected as unsupported, and a block-based table
// with a hash index requires a prefix extractor.
TEST_F(DBTest, TableOptionsSanitizeTest) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  // Default must remain allow_mmap_reads == false.
  ASSERT_EQ(db_->GetOptions().allow_mmap_reads, false);

  options.table_factory.reset(new PlainTableFactory());
  options.prefix_extractor.reset(NewNoopTransform());
  Destroy(options);
  // Reopen with PlainTable must not fail with NotSupported.
  ASSERT_TRUE(!TryReopen(options).IsNotSupported());

  // Test for check of prefix_extractor when hash index is used for
  // block-based table
  BlockBasedTableOptions to;
  to.index_type = BlockBasedTableOptions::kHashSearch;
  options = CurrentOptions();
  options.create_if_missing = true;
  options.table_factory.reset(NewBlockBasedTableFactory(to));
  // Hash index without a prefix extractor is rejected ...
  ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
  // ... and accepted once one is supplied.
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  ASSERT_OK(TryReopen(options));
}
3778
// Memtable factories that do not support concurrent inserts (HashLinkList)
// must be rejected when allow_concurrent_memtable_write is set -- both at
// DB open and at CreateColumnFamily -- while SkipList keeps working.
TEST_F(DBTest, ConcurrentMemtableNotSupported) {
  Options options = CurrentOptions();
  options.allow_concurrent_memtable_write = true;
  options.soft_pending_compaction_bytes_limit = 0;
  options.hard_pending_compaction_bytes_limit = 100;
  options.create_if_missing = true;

  DestroyDB(dbname_, options);
  // HashLinkList does not support concurrent writes -> open must fail.
  options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
  ASSERT_NOK(TryReopen(options));

  // SkipList supports concurrent writes -> open succeeds.
  options.memtable_factory.reset(new SkipListFactory);
  ASSERT_OK(TryReopen(options));

  // The same check must apply per column family.
  ColumnFamilyOptions cf_options(options);
  cf_options.memtable_factory.reset(
      NewHashLinkListRepFactory(4, 0, 3, true, 4));
  ColumnFamilyHandle* handle;
  ASSERT_NOK(db_->CreateColumnFamily(cf_options, "name", &handle));
}
3799
3800 #endif // ROCKSDB_LITE
3801
// Checks that background thread pool sizes are sanitized from
// max_background_compactions/flushes (attempt 0) and that the sanitized
// sizes persist for a later reopen with defaults (attempt 1): with 4 tasks
// per pool, a LOW pool of 3 leaves 1 queued and a HIGH pool of 2 leaves 2.
TEST_F(DBTest, SanitizeNumThreads) {
  for (int attempt = 0; attempt < 2; attempt++) {
    const size_t kTotalTasks = 8;
    test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];

    Options options = CurrentOptions();
    if (attempt == 0) {
      options.max_background_compactions = 3;
      options.max_background_flushes = 2;
    }
    options.create_if_missing = true;
    DestroyAndReopen(options);

    for (size_t i = 0; i < kTotalTasks; i++) {
      // Insert 4 tasks into the low priority queue and 4 tasks into the
      // high priority queue.
      env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                     &sleeping_tasks[i],
                     (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
    }

    // Wait up to 10s for the tasks to be scheduled.
    for (int i = 0; i < 10000; i++) {
      if (options.env->GetThreadPoolQueueLen(Env::Priority::LOW) <= 1 &&
          options.env->GetThreadPoolQueueLen(Env::Priority::HIGH) <= 2) {
        break;
      }
      env_->SleepForMicroseconds(1000);
    }

    // pool size 3, total task 4. Queue size should be 1.
    ASSERT_EQ(1U, options.env->GetThreadPoolQueueLen(Env::Priority::LOW));
    // pool size 2, total task 4. Queue size should be 2.
    ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH));

    // Release all sleeping tasks so the pools drain before the next round.
    for (size_t i = 0; i < kTotalTasks; i++) {
      sleeping_tasks[i].WakeUp();
      sleeping_tasks[i].WaitUntilDone();
    }

    // The DB must still be fully functional afterwards.
    ASSERT_OK(Put("abc", "def"));
    ASSERT_EQ("def", Get("abc"));
    Flush();
    ASSERT_EQ("def", Get("abc"));
  }
}
3847
// Regression test for the write-thread entry path: with the DB mutex held
// and a write in progress, concurrently issued Put/Flush calls must queue
// behind the leader and complete once the write finishes, without deadlock.
// NOTE: statement order here (lock/begin-write/unlock/end-write) is the
// scenario under test -- do not reorder.
TEST_F(DBTest, WriteSingleThreadEntry) {
  std::vector<port::Thread> threads;
  dbfull()->TEST_LockMutex();
  auto w = dbfull()->TEST_BeginWrite();
  threads.emplace_back([&] { Put("a", "b"); });
  env_->SleepForMicroseconds(10000);
  threads.emplace_back([&] { Flush(); });
  env_->SleepForMicroseconds(10000);
  dbfull()->TEST_UnlockMutex();
  dbfull()->TEST_LockMutex();
  dbfull()->TEST_EndWrite(w);
  dbfull()->TEST_UnlockMutex();

  // Both blocked operations must now be able to finish.
  for (auto& t : threads) {
    t.join();
  }
}
3865
// Stresses FlushWAL(false) racing with concurrent writers (and, with
// two_write_queues, a second writer on the nonmem queue), for all four
// combinations of two_write_queues x manual_wal_flush. After the race, a
// reopen must recover every key written by the first writer from the WAL.
TEST_F(DBTest, ConcurrentFlushWAL) {
  const size_t cnt = 100;
  Options options;
  WriteOptions wopt;
  ReadOptions ropt;
  for (bool two_write_queues : {false, true}) {
    for (bool manual_wal_flush : {false, true}) {
      options.two_write_queues = two_write_queues;
      options.manual_wal_flush = manual_wal_flush;
      options.create_if_missing = true;
      DestroyAndReopen(options);
      std::vector<port::Thread> threads;
      // Writer on the normal write path.
      threads.emplace_back([&] {
        for (size_t i = 0; i < cnt; i++) {
          auto istr = ToString(i);
          db_->Put(wopt, db_->DefaultColumnFamily(), "a" + istr, "b" + istr);
        }
      });
      if (two_write_queues) {
        // Second writer goes through WriteImpl with disable_memtable=true,
        // exercising the WAL-only (nonmem) write queue.
        threads.emplace_back([&] {
          for (size_t i = cnt; i < 2 * cnt; i++) {
            auto istr = ToString(i);
            WriteBatch batch;
            batch.Put("a" + istr, "b" + istr);
            dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true);
          }
        });
      }
      // Concurrent WAL flusher racing with the writers.
      threads.emplace_back([&] {
        for (size_t i = 0; i < cnt * 100; i++) {  // FlushWAL is faster than Put
          db_->FlushWAL(false);
        }
      });
      for (auto& t : threads) {
        t.join();
      }
      options.create_if_missing = false;
      // Recover from the wal and make sure that it is not corrupted
      Reopen(options);
      for (size_t i = 0; i < cnt; i++) {
        PinnableSlice pval;
        auto istr = ToString(i);
        ASSERT_OK(
            db_->Get(ropt, db_->DefaultColumnFamily(), "a" + istr, &pval));
        ASSERT_TRUE(pval == ("b" + istr));
      }
    }
  }
}
3915
3916 #ifndef ROCKSDB_LITE
3917 TEST_F(DBTest, DynamicMemtableOptions) {
3918 const uint64_t k64KB = 1 << 16;
3919 const uint64_t k128KB = 1 << 17;
3920 const uint64_t k5KB = 5 * 1024;
3921 Options options;
3922 options.env = env_;
3923 options.create_if_missing = true;
3924 options.compression = kNoCompression;
3925 options.max_background_compactions = 1;
3926 options.write_buffer_size = k64KB;
3927 options.arena_block_size = 16 * 1024;
3928 options.max_write_buffer_number = 2;
3929 // Don't trigger compact/slowdown/stop
3930 options.level0_file_num_compaction_trigger = 1024;
3931 options.level0_slowdown_writes_trigger = 1024;
3932 options.level0_stop_writes_trigger = 1024;
3933 DestroyAndReopen(options);
3934
3935 auto gen_l0_kb = [this](int size) {
3936 const int kNumPutsBeforeWaitForFlush = 64;
3937 Random rnd(301);
3938 for (int i = 0; i < size; i++) {
3939 ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
3940
3941 // The following condition prevents a race condition between flush jobs
3942 // acquiring work and this thread filling up multiple memtables. Without
3943 // this, the flush might produce less files than expected because
3944 // multiple memtables are flushed into a single L0 file. This race
3945 // condition affects assertion (A).
3946 if (i % kNumPutsBeforeWaitForFlush == kNumPutsBeforeWaitForFlush - 1) {
3947 dbfull()->TEST_WaitForFlushMemTable();
3948 }
3949 }
3950 dbfull()->TEST_WaitForFlushMemTable();
3951 };
3952
3953 // Test write_buffer_size
3954 gen_l0_kb(64);
3955 ASSERT_EQ(NumTableFilesAtLevel(0), 1);
3956 ASSERT_LT(SizeAtLevel(0), k64KB + k5KB);
3957 ASSERT_GT(SizeAtLevel(0), k64KB - k5KB * 2);
3958
3959 // Clean up L0
3960 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3961 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3962
3963 // Increase buffer size
3964 ASSERT_OK(dbfull()->SetOptions({
3965 {"write_buffer_size", "131072"},
3966 }));
3967
3968 // The existing memtable inflated 64KB->128KB when we invoked SetOptions().
3969 // Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
3970 gen_l0_kb(192);
3971 ASSERT_EQ(NumTableFilesAtLevel(0), 1); // (A)
3972 ASSERT_LT(SizeAtLevel(0), k128KB + 2 * k5KB);
3973 ASSERT_GT(SizeAtLevel(0), k128KB - 4 * k5KB);
3974
3975 // Decrease buffer size below current usage
3976 ASSERT_OK(dbfull()->SetOptions({
3977 {"write_buffer_size", "65536"},
3978 }));
3979 // The existing memtable became eligible for flush when we reduced its
3980 // capacity to 64KB. Two keys need to be added to trigger flush: first causes
3981 // memtable to be marked full, second schedules the flush. Then we should have
3982 // a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
3983 gen_l0_kb(2);
3984 ASSERT_EQ(NumTableFilesAtLevel(0), 2);
3985 ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
3986 ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
3987
3988 // Test max_write_buffer_number
3989 // Block compaction thread, which will also block the flushes because
3990 // max_background_flushes == 0, so flushes are getting executed by the
3991 // compaction thread
3992 env_->SetBackgroundThreads(1, Env::LOW);
3993 test::SleepingBackgroundTask sleeping_task_low;
3994 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
3995 Env::Priority::LOW);
3996 // Start from scratch and disable compaction/flush. Flush can only happen
3997 // during compaction but trigger is pretty high
3998 options.disable_auto_compactions = true;
3999 DestroyAndReopen(options);
4000 env_->SetBackgroundThreads(0, Env::HIGH);
4001
4002 // Put until writes are stopped, bounded by 256 puts. We should see stop at
4003 // ~128KB
4004 int count = 0;
4005 Random rnd(301);
4006
4007 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4008 "DBImpl::DelayWrite:Wait",
4009 [&](void* /*arg*/) { sleeping_task_low.WakeUp(); });
4010 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4011
4012 while (!sleeping_task_low.WokenUp() && count < 256) {
4013 ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
4014 count++;
4015 }
4016 ASSERT_GT(static_cast<double>(count), 128 * 0.8);
4017 ASSERT_LT(static_cast<double>(count), 128 * 1.2);
4018
4019 sleeping_task_low.WaitUntilDone();
4020
4021 // Increase
4022 ASSERT_OK(dbfull()->SetOptions({
4023 {"max_write_buffer_number", "8"},
4024 }));
4025 // Clean up memtable and L0
4026 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4027
4028 sleeping_task_low.Reset();
4029 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
4030 Env::Priority::LOW);
4031 count = 0;
4032 while (!sleeping_task_low.WokenUp() && count < 1024) {
4033 ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
4034 count++;
4035 }
4036 // Windows fails this test. Will tune in the future and figure out
4037 // approp number
4038 #ifndef OS_WIN
4039 ASSERT_GT(static_cast<double>(count), 512 * 0.8);
4040 ASSERT_LT(static_cast<double>(count), 512 * 1.2);
4041 #endif
4042 sleeping_task_low.WaitUntilDone();
4043
4044 // Decrease
4045 ASSERT_OK(dbfull()->SetOptions({
4046 {"max_write_buffer_number", "4"},
4047 }));
4048 // Clean up memtable and L0
4049 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4050
4051 sleeping_task_low.Reset();
4052 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
4053 Env::Priority::LOW);
4054
4055 count = 0;
4056 while (!sleeping_task_low.WokenUp() && count < 1024) {
4057 ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
4058 count++;
4059 }
4060 // Windows fails this test. Will tune in the future and figure out
4061 // approp number
4062 #ifndef OS_WIN
4063 ASSERT_GT(static_cast<double>(count), 256 * 0.8);
4064 ASSERT_LT(static_cast<double>(count), 266 * 1.2);
4065 #endif
4066 sleeping_task_low.WaitUntilDone();
4067
4068 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4069 }
4070 #endif // ROCKSDB_LITE
4071
4072 #ifdef ROCKSDB_USING_THREAD_STATUS
4073 namespace {
4074 void VerifyOperationCount(Env* env, ThreadStatus::OperationType op_type,
4075 int expected_count) {
4076 int op_count = 0;
4077 std::vector<ThreadStatus> thread_list;
4078 ASSERT_OK(env->GetThreadList(&thread_list));
4079 for (auto thread : thread_list) {
4080 if (thread.operation_type == op_type) {
4081 op_count++;
4082 }
4083 }
4084 ASSERT_EQ(op_count, expected_count);
4085 }
4086 } // namespace
4087
// Verifies GetThreadList() reports the correct number of HIGH/LOW/BOTTOM
// priority background threads as the pools are resized, and that the
// per-thread column family info map stays consistent across CF creation,
// drop, and DB close.
TEST_F(DBTest, GetThreadStatus) {
  Options options;
  options.env = env_;
  options.enable_thread_tracking = true;
  TryReopen(options);

  std::vector<ThreadStatus> thread_list;
  Status s = env_->GetThreadList(&thread_list);

  // Outer loop: iteration 0 with a single CF, iteration 1 after adding CFs.
  for (int i = 0; i < 2; ++i) {
    // repeat the test with different numbers of high / low priority threads
    const int kTestCount = 3;
    const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
    const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
    const unsigned int kBottomPriCounts[kTestCount] = {2, 1, 4};
    for (int test = 0; test < kTestCount; ++test) {
      // Change the number of threads in high / low priority pool.
      env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
      env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
      env_->SetBackgroundThreads(kBottomPriCounts[test], Env::BOTTOM);
      // Wait to ensure that all threads have been registered
      unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
      // TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
      // all threads have been registered.
      // Try up to 60 seconds.
      for (int num_try = 0; num_try < 60000; num_try++) {
        env_->SleepForMicroseconds(1000);
        thread_list.clear();
        s = env_->GetThreadList(&thread_list);
        ASSERT_OK(s);
        // Tally threads by type for this poll.
        memset(thread_type_counts, 0, sizeof(thread_type_counts));
        for (auto thread : thread_list) {
          ASSERT_LT(thread.thread_type, ThreadStatus::NUM_THREAD_TYPES);
          thread_type_counts[thread.thread_type]++;
        }
        // Stop polling once every pool has reached its target size.
        if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
                kHighPriCounts[test] &&
            thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
                kLowPriCounts[test] &&
            thread_type_counts[ThreadStatus::BOTTOM_PRIORITY] ==
                kBottomPriCounts[test]) {
          break;
        }
      }
      // Verify the number of high-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
                kHighPriCounts[test]);
      // Verify the number of low-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
                kLowPriCounts[test]);
      // Verify the number of bottom-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::BOTTOM_PRIORITY],
                kBottomPriCounts[test]);
    }
    if (i == 0) {
      // repeat the test with multiple column families
      CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
      env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                    true);
    }
  }
  // Dropping a CF must remove it from the thread-status CF info map.
  db_->DropColumnFamily(handles_[2]);
  delete handles_[2];
  handles_.erase(handles_.begin() + 2);
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 true);
  Close();
  // After Close() the map must be consistent with no live handles.
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 true);
}
4158
// With enable_thread_tracking off, no column family info should be
// registered in the thread status updater.
TEST_F(DBTest, DisableThreadStatus) {
  Options options;
  options.env = env_;
  options.enable_thread_tracking = false;
  TryReopen(options);
  CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
  // Verify none of the column family info exists
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 false);
}
4169
// Verifies that a running flush is visible both through GetThreadList()
// (OP_FLUSH count) and through the kNumRunningFlushes DB property, using
// sync points to hold the flush job open while the checks run.
TEST_F(DBTest, ThreadStatusFlush) {
  Options options;
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  options.enable_thread_tracking = true;
  options = CurrentOptions(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
      {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  // No flush should be running before any data is written.
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);

  ASSERT_OK(Put(1, "foo", "v1"));
  ASSERT_EQ("v1", Get(1, "foo"));
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);

  uint64_t num_running_flushes = 0;
  db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
  ASSERT_EQ(num_running_flushes, 0);

  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush

  // The first sync point is to make sure there's one flush job
  // running when we perform VerifyOperationCount().
  TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1);
  db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
  ASSERT_EQ(num_running_flushes, 1);
  // This second sync point is to ensure the flush job will not
  // be completed until we already perform VerifyOperationCount().
  TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4208
// Verifies that a single running L0->L1 compaction is reported via
// GetThreadList() (OP_COMPACTION) and the kNumRunningCompactions property,
// and that the OP_COMPACTION count drops to 0 when thread tracking is off.
// Sync points hold the compaction open during the checks.
TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  const int kNumL0Files = 4;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_subcompactions = max_subcompactions_;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
      {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
      {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
  });
  // Round 0 runs with thread tracking enabled, round 1 with it disabled.
  for (int tests = 0; tests < 2; ++tests) {
    DestroyAndReopen(options);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    // The Put Phase.
    for (int file = 0; file < kNumL0Files; ++file) {
      for (int key = 0; key < kEntriesPerBuffer; ++key) {
        ASSERT_OK(Put(ToString(key + file * kEntriesPerBuffer),
                      RandomString(&rnd, kTestValueSize)));
      }
      Flush();
    }
    // This makes sure a compaction won't be scheduled until
    // we have done with the above Put Phase.
    uint64_t num_running_compactions = 0;
    db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
                        &num_running_compactions);
    ASSERT_EQ(num_running_compactions, 0);
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
    ASSERT_GE(NumTableFilesAtLevel(0),
              options.level0_file_num_compaction_trigger);

    // This makes sure at least one compaction is running.
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");

    if (options.enable_thread_tracking) {
      // expecting one single L0 to L1 compaction
      VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 1);
    } else {
      // If thread tracking is not enabled, compaction count should be 0.
      VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0);
    }
    db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
                        &num_running_compactions);
    ASSERT_EQ(num_running_compactions, 1);
    // TODO(yhchiang): adding assert to verify each compaction stage.
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");

    // repeat the test with disabling thread tracking.
    options.enable_thread_tracking = false;
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4279
// Exercises manual compaction over various key ranges (before, after, and
// overlapping existing files), then verifies that after
// CancelAllBackgroundWork() a manual CompactRange() is a no-op: the file
// layout must be unchanged. Run once with 7 levels and once with 3.
TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
  Options options = CurrentOptions();
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    MakeTables(3, "p", "q", 1);
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range overlaps files
    Compact(1, "p1", "p9");
    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    // Populate a different range
    MakeTables(3, "c", "e", 1);
    ASSERT_EQ("1,1,2", FilesPerLevel(1));

    // Compact just the new range
    Compact(1, "b", "f");
    ASSERT_EQ("0,0,2", FilesPerLevel(1));

    // Compact all
    MakeTables(1, "a", "z", 1);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));
    // After cancelling background work, manual compaction must not run:
    // the file layout stays exactly as before.
    CancelAllBackgroundWork(db_);
    db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));

    if (iter == 0) {
      options = CurrentOptions();
      options.num_levels = 3;
      options.create_if_missing = true;
      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
4327
4328 TEST_F(DBTest, PreShutdownFlush) {
4329 Options options = CurrentOptions();
4330 CreateAndReopenWithCF({"pikachu"}, options);
4331 ASSERT_OK(Put(1, "key", "value"));
4332 CancelAllBackgroundWork(db_);
4333 Status s =
4334 db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
4335 ASSERT_TRUE(s.IsShutdownInProgress());
4336 }
4337
// Drives enough writes to keep several compactions running concurrently,
// then calls CancelAllBackgroundWork() mid-flight and verifies (via
// GetThreadList) that no compaction remains in progress afterwards.
// Sync points gate compaction start/end around the preshutdown marker.
TEST_P(DBTestWithParam, PreShutdownMultipleCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 40;
  const int kNumL0Files = 4;

  const int kHighPriCount = 3;
  const int kLowPriCount = 5;
  env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
  env_->SetBackgroundThreads(kLowPriCount, Env::LOW);

  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base =
      options.target_file_size_base * kNumL0Files;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_bytes_for_level_multiplier = 2;
  options.max_background_compactions = kLowPriCount;
  // Very high triggers so the writer is never stalled during the test.
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.max_subcompactions = max_subcompactions_;

  TryReopen(options);
  Random rnd(301);

  std::vector<ThreadStatus> thread_list;
  // Delay both flush and compaction
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"FlushJob::FlushJob()", "CompactionJob::Run():Start"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownMultipleCompaction:Preshutdown"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownMultipleCompaction:VerifyCompaction"},
       {"DBTest::PreShutdownMultipleCompaction:Preshutdown",
        "CompactionJob::Run():End"},
       {"CompactionJob::Run():End",
        "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Make rocksdb busy
  int key = 0;
  // check how many threads are doing compaction using GetThreadList
  int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
  for (int file = 0; file < 16 * kNumL0Files; ++file) {
    for (int k = 0; k < kEntriesPerBuffer; ++k) {
      ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
    }

    Status s = env_->GetThreadList(&thread_list);
    for (auto thread : thread_list) {
      operation_count[thread.operation_type]++;
    }

    // Speed up the test
    if (operation_count[ThreadStatus::OP_FLUSH] > 1 &&
        operation_count[ThreadStatus::OP_COMPACTION] >
            0.6 * options.max_background_compactions) {
      break;
    }
    // Safety valve: if enough concurrency was never observed, unblock the
    // sync-point chain so the test cannot hang.
    if (file == 15 * kNumL0Files) {
      TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
    }
  }

  TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
  ASSERT_GE(operation_count[ThreadStatus::OP_COMPACTION], 1);
  CancelAllBackgroundWork(db_);
  TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
  dbfull()->TEST_WaitForCompact();
  // Record the number of compactions at a time.
  for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
    operation_count[i] = 0;
  }
  Status s = env_->GetThreadList(&thread_list);
  for (auto thread : thread_list) {
    operation_count[thread.operation_type]++;
  }
  // After cancellation, no compaction may still be running.
  ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
4426
// Like PreShutdownMultipleCompaction, but issues CancelAllBackgroundWork()
// while a compaction job is in the middle of Run() (gated by the
// "Inprogress" sync point), then verifies no compaction is still running.
TEST_P(DBTestWithParam, PreShutdownCompactionMiddle) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 40;
  const int kNumL0Files = 4;

  const int kHighPriCount = 3;
  const int kLowPriCount = 5;
  env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
  env_->SetBackgroundThreads(kLowPriCount, Env::LOW);

  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base =
      options.target_file_size_base * kNumL0Files;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_bytes_for_level_multiplier = 2;
  options.max_background_compactions = kLowPriCount;
  // Very high triggers so the writer is never stalled during the test.
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.max_subcompactions = max_subcompactions_;

  TryReopen(options);
  Random rnd(301);

  std::vector<ThreadStatus> thread_list;
  // Delay both flush and compaction
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTest::PreShutdownCompactionMiddle:Preshutdown",
        "CompactionJob::Run():Inprogress"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownCompactionMiddle:VerifyCompaction"},
       {"CompactionJob::Run():Inprogress", "CompactionJob::Run():End"},
       {"CompactionJob::Run():End",
        "DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Make rocksdb busy
  int key = 0;
  // check how many threads are doing compaction using GetThreadList
  int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
  for (int file = 0; file < 16 * kNumL0Files; ++file) {
    for (int k = 0; k < kEntriesPerBuffer; ++k) {
      ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
    }

    Status s = env_->GetThreadList(&thread_list);
    for (auto thread : thread_list) {
      operation_count[thread.operation_type]++;
    }

    // Speed up the test
    if (operation_count[ThreadStatus::OP_FLUSH] > 1 &&
        operation_count[ThreadStatus::OP_COMPACTION] >
            0.6 * options.max_background_compactions) {
      break;
    }
    // Safety valve: unblock the sync-point chain if enough concurrency was
    // never observed, so the test cannot hang.
    if (file == 15 * kNumL0Files) {
      TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyCompaction");
    }
  }

  ASSERT_GE(operation_count[ThreadStatus::OP_COMPACTION], 1);
  // Cancel while the compaction job is held inside Run().
  CancelAllBackgroundWork(db_);
  TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:Preshutdown");
  TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown");
  dbfull()->TEST_WaitForCompact();
  // Record the number of compactions at a time.
  for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
    operation_count[i] = 0;
  }
  Status s = env_->GetThreadList(&thread_list);
  for (auto thread : thread_list) {
    operation_count[thread.operation_type]++;
  }
  // After cancellation, no compaction may still be running.
  ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
4513
4514 #endif // ROCKSDB_USING_THREAD_STATUS
4515
4516 #ifndef ROCKSDB_LITE
4517 TEST_F(DBTest, FlushOnDestroy) {
4518 WriteOptions wo;
4519 wo.disableWAL = true;
4520 ASSERT_OK(Put("foo", "v1", wo));
4521 CancelAllBackgroundWork(db_);
4522 }
4523
4524 TEST_F(DBTest, DynamicLevelCompressionPerLevel) {
4525 if (!Snappy_Supported()) {
4526 return;
4527 }
4528 const int kNKeys = 120;
4529 int keys[kNKeys];
4530 for (int i = 0; i < kNKeys; i++) {
4531 keys[i] = i;
4532 }
4533 std::random_shuffle(std::begin(keys), std::end(keys));
4534
4535 Random rnd(301);
4536 Options options;
4537 options.create_if_missing = true;
4538 options.db_write_buffer_size = 20480;
4539 options.write_buffer_size = 20480;
4540 options.max_write_buffer_number = 2;
4541 options.level0_file_num_compaction_trigger = 2;
4542 options.level0_slowdown_writes_trigger = 2;
4543 options.level0_stop_writes_trigger = 2;
4544 options.target_file_size_base = 20480;
4545 options.level_compaction_dynamic_level_bytes = true;
4546 options.max_bytes_for_level_base = 102400;
4547 options.max_bytes_for_level_multiplier = 4;
4548 options.max_background_compactions = 1;
4549 options.num_levels = 5;
4550
4551 options.compression_per_level.resize(3);
4552 options.compression_per_level[0] = kNoCompression;
4553 options.compression_per_level[1] = kNoCompression;
4554 options.compression_per_level[2] = kSnappyCompression;
4555
4556 OnFileDeletionListener* listener = new OnFileDeletionListener();
4557 options.listeners.emplace_back(listener);
4558
4559 DestroyAndReopen(options);
4560
4561 // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
4562 // be compressed, so total data size should be more than 80K.
4563 for (int i = 0; i < 20; i++) {
4564 ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
4565 }
4566 Flush();
4567 dbfull()->TEST_WaitForCompact();
4568
4569 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4570 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4571 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4572 // Assuming each files' metadata is at least 50 bytes/
4573 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U + 50U * 4);
4574
4575 // Insert 400KB. Some data will be compressed
4576 for (int i = 21; i < 120; i++) {
4577 ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
4578 }
4579 Flush();
4580 dbfull()->TEST_WaitForCompact();
4581 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4582 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4583 ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4),
4584 120U * 4000U + 50U * 24);
4585 // Make sure data in files in L3 is not compacted by removing all files
4586 // in L4 and calculate number of rows
4587 ASSERT_OK(dbfull()->SetOptions({
4588 {"disable_auto_compactions", "true"},
4589 }));
4590 ColumnFamilyMetaData cf_meta;
4591 db_->GetColumnFamilyMetaData(&cf_meta);
4592 for (auto file : cf_meta.levels[4].files) {
4593 listener->SetExpectedFileName(dbname_ + file.name);
4594 ASSERT_OK(dbfull()->DeleteFile(file.name));
4595 }
4596 listener->VerifyMatchedCount(cf_meta.levels[4].files.size());
4597
4598 int num_keys = 0;
4599 std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
4600 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
4601 num_keys++;
4602 }
4603 ASSERT_OK(iter->status());
4604 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U + num_keys * 10U);
4605 }
4606
4607 TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
4608 if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
4609 return;
4610 }
4611 const int kNKeys = 500;
4612 int keys[kNKeys];
4613 for (int i = 0; i < kNKeys; i++) {
4614 keys[i] = i;
4615 }
4616 std::random_shuffle(std::begin(keys), std::end(keys));
4617
4618 Random rnd(301);
4619 Options options;
4620 options.create_if_missing = true;
4621 options.db_write_buffer_size = 6000000;
4622 options.write_buffer_size = 600000;
4623 options.max_write_buffer_number = 2;
4624 options.level0_file_num_compaction_trigger = 2;
4625 options.level0_slowdown_writes_trigger = 2;
4626 options.level0_stop_writes_trigger = 2;
4627 options.soft_pending_compaction_bytes_limit = 1024 * 1024;
4628 options.target_file_size_base = 20;
4629
4630 options.level_compaction_dynamic_level_bytes = true;
4631 options.max_bytes_for_level_base = 200;
4632 options.max_bytes_for_level_multiplier = 8;
4633 options.max_background_compactions = 1;
4634 options.num_levels = 5;
4635 std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
4636 options.table_factory = mtf;
4637
4638 options.compression_per_level.resize(3);
4639 options.compression_per_level[0] = kNoCompression;
4640 options.compression_per_level[1] = kLZ4Compression;
4641 options.compression_per_level[2] = kZlibCompression;
4642
4643 DestroyAndReopen(options);
4644 // When base level is L4, L4 is LZ4.
4645 std::atomic<int> num_zlib(0);
4646 std::atomic<int> num_lz4(0);
4647 std::atomic<int> num_no(0);
4648 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4649 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
4650 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
4651 if (compaction->output_level() == 4) {
4652 ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
4653 num_lz4.fetch_add(1);
4654 }
4655 });
4656 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4657 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
4658 auto* compression = reinterpret_cast<CompressionType*>(arg);
4659 ASSERT_TRUE(*compression == kNoCompression);
4660 num_no.fetch_add(1);
4661 });
4662 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4663
4664 for (int i = 0; i < 100; i++) {
4665 std::string value = RandomString(&rnd, 200);
4666 ASSERT_OK(Put(Key(keys[i]), value));
4667 if (i % 25 == 24) {
4668 Flush();
4669 dbfull()->TEST_WaitForCompact();
4670 }
4671 }
4672
4673 Flush();
4674 dbfull()->TEST_WaitForFlushMemTable();
4675 dbfull()->TEST_WaitForCompact();
4676 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4677 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4678
4679 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4680 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4681 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
4682 ASSERT_GT(NumTableFilesAtLevel(4), 0);
4683 ASSERT_GT(num_no.load(), 2);
4684 ASSERT_GT(num_lz4.load(), 0);
4685 int prev_num_files_l4 = NumTableFilesAtLevel(4);
4686
4687 // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
4688 num_lz4.store(0);
4689 num_no.store(0);
4690 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4691 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
4692 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
4693 if (compaction->output_level() == 4 && compaction->start_level() == 3) {
4694 ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
4695 num_zlib.fetch_add(1);
4696 } else {
4697 ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
4698 num_lz4.fetch_add(1);
4699 }
4700 });
4701 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4702 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
4703 auto* compression = reinterpret_cast<CompressionType*>(arg);
4704 ASSERT_TRUE(*compression == kNoCompression);
4705 num_no.fetch_add(1);
4706 });
4707 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4708
4709 for (int i = 101; i < 500; i++) {
4710 std::string value = RandomString(&rnd, 200);
4711 ASSERT_OK(Put(Key(keys[i]), value));
4712 if (i % 100 == 99) {
4713 Flush();
4714 dbfull()->TEST_WaitForCompact();
4715 }
4716 }
4717
4718 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4719 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4720 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
4721 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
4722 ASSERT_GT(NumTableFilesAtLevel(3), 0);
4723 ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
4724 ASSERT_GT(num_no.load(), 2);
4725 ASSERT_GT(num_lz4.load(), 0);
4726 ASSERT_GT(num_zlib.load(), 0);
4727 }
4728
// End-to-end check that compaction-related options changed at runtime via
// DB::SetOptions() actually take effect: target_file_size_base,
// level0_file_num_compaction_trigger, max_bytes_for_level_base/_multiplier,
// level0_stop_writes_trigger, and disable_auto_compactions.
TEST_F(DBTest, DynamicCompactionOptions) {
  // minimum write buffer size is enforced at 64KB
  const uint64_t k32KB = 1 << 15;
  const uint64_t k64KB = 1 << 16;
  const uint64_t k128KB = 1 << 17;
  const uint64_t k1MB = 1 << 20;
  const uint64_t k4KB = 1 << 12;
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.soft_pending_compaction_bytes_limit = 1024 * 1024;
  options.write_buffer_size = k64KB;
  options.arena_block_size = 4 * k4KB;
  options.max_write_buffer_number = 2;
  // Compaction related options
  options.level0_file_num_compaction_trigger = 3;
  options.level0_slowdown_writes_trigger = 4;
  options.level0_stop_writes_trigger = 8;
  options.target_file_size_base = k64KB;
  options.max_compaction_bytes = options.target_file_size_base * 10;
  options.target_file_size_multiplier = 1;
  options.max_bytes_for_level_base = k128KB;
  options.max_bytes_for_level_multiplier = 4;

  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  DestroyAndReopen(options);

  // Writes `size` 1KB values starting at Key(start), stepping the key by
  // `stride`, then waits for the resulting memtable flush to finish.
  auto gen_l0_kb = [this](int start, int size, int stride) {
    Random rnd(301);
    for (int i = 0; i < size; i++) {
      ASSERT_OK(Put(Key(start + stride * i), RandomString(&rnd, 1024)));
    }
    dbfull()->TEST_WaitForFlushMemTable();
  };

  // Write 3 files that have the same key range.
  // Since level0_file_num_compaction_trigger is 3, compaction should be
  // triggered. The compaction should result in one L1 file
  gen_l0_kb(0, 64, 1);
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  gen_l0_kb(0, 64, 1);
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
  gen_l0_kb(0, 64, 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,1", FilesPerLevel());
  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(1U, metadata.size());
  // The single L1 file should be within 4KB of the 64KB target.
  ASSERT_LE(metadata[0].size, k64KB + k4KB);
  ASSERT_GE(metadata[0].size, k64KB - k4KB);

  // Test compaction trigger and target_file_size_base
  // Reduce compaction trigger to 2, and reduce L1 file size to 32KB.
  // Writing to 64KB L0 files should trigger a compaction. Since these
  // 2 L0 files have the same key range, compaction merge them and should
  // result in 2 32KB L1 files.
  ASSERT_OK(dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
                                  {"target_file_size_base", ToString(k32KB)}}));

  gen_l0_kb(0, 64, 1);
  ASSERT_EQ("1,1", FilesPerLevel());
  gen_l0_kb(0, 64, 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,2", FilesPerLevel());
  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(2U, metadata.size());
  ASSERT_LE(metadata[0].size, k32KB + k4KB);
  ASSERT_GE(metadata[0].size, k32KB - k4KB);
  ASSERT_LE(metadata[1].size, k32KB + k4KB);
  ASSERT_GE(metadata[1].size, k32KB - k4KB);

  // Test max_bytes_for_level_base
  // Increase level base size to 1MB and write enough data that will
  // fill L1 and L2. L1 size should be around 1MB while L2 size should be
  // around 1MB x 4 (the multiplier).
  ASSERT_OK(
      dbfull()->SetOptions({{"max_bytes_for_level_base", ToString(k1MB)}}));

  // writing 96 x 64KB => 6 * 1024KB
  // (L1 + L2) = (1 + 4) * 1024KB
  for (int i = 0; i < 96; ++i) {
    gen_l0_kb(i, 64, 96);
  }
  dbfull()->TEST_WaitForCompact();
  // L1 within (0.5, 1.5) of 1MB.
  ASSERT_GT(SizeAtLevel(1), k1MB / 2);
  ASSERT_LT(SizeAtLevel(1), k1MB + k1MB / 2);

  // Within (0.5, 1.5) of 4MB.
  ASSERT_GT(SizeAtLevel(2), 2 * k1MB);
  ASSERT_LT(SizeAtLevel(2), 6 * k1MB);

  // Test max_bytes_for_level_multiplier and
  // max_bytes_for_level_base. Now, reduce both mulitplier and level base,
  // After filling enough data that can fit in L1 - L3, we should see L1 size
  // shrink toward the new 128KB base. Same for L2.
  ASSERT_OK(
      dbfull()->SetOptions({{"max_bytes_for_level_multiplier", "2"},
                            {"max_bytes_for_level_base", ToString(k128KB)}}));

  // writing 20 x 64KB = 10 x 128KB
  // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
  for (int i = 0; i < 20; ++i) {
    gen_l0_kb(i, 64, 32);
  }
  dbfull()->TEST_WaitForCompact();
  uint64_t total_size = SizeAtLevel(1) + SizeAtLevel(2) + SizeAtLevel(3);
  ASSERT_TRUE(total_size < k128KB * 7 * 1.5);

  // Test level0_stop_writes_trigger.
  // Clean up memtable and L0. Block compaction threads. If continue to write
  // and flush memtables. We should see put stop after 8 memtable flushes
  // since level0_stop_writes_trigger = 8
  dbfull()->TEST_FlushMemTable(true, true);
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // Block compaction
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  int count = 0;
  Random rnd(301);
  WriteOptions wo;
  // Flush after each put; the write controller flips to "stopped" once L0
  // reaches the stop trigger, because compactions are blocked.
  while (count < 64) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
    dbfull()->TEST_FlushMemTable(true, true);
    count++;
    if (dbfull()->TEST_write_controler().IsStopped()) {
      sleeping_task_low.WakeUp();
      break;
    }
  }
  // Stop trigger = 8
  ASSERT_EQ(count, 8);
  // Unblock
  sleeping_task_low.WaitUntilDone();

  // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
  // Block compaction thread again. Perform the put and memtable flushes
  // until we see the stop after 6 memtable flushes.
  ASSERT_OK(dbfull()->SetOptions({{"level0_stop_writes_trigger", "6"}}));
  dbfull()->TEST_FlushMemTable(true);
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  // Block compaction again
  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  count = 0;
  while (count < 64) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
    dbfull()->TEST_FlushMemTable(true, true);
    count++;
    if (dbfull()->TEST_write_controler().IsStopped()) {
      sleeping_task_low.WakeUp();
      break;
    }
  }
  ASSERT_EQ(count, 6);
  // Unblock
  sleeping_task_low.WaitUntilDone();

  // Test disable_auto_compactions
  // Compaction thread is unblocked but auto compaction is disabled. Write
  // 4 L0 files and compaction should be triggered. If auto compaction is
  // disabled, then TEST_WaitForCompact will be waiting for nothing. Number of
  // L0 files do not change after the call.
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
    // Wait for compaction so that put won't stop
    dbfull()->TEST_FlushMemTable(true);
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumTableFilesAtLevel(0), 4);

  // Enable auto compaction and perform the same test, # of L0 files should be
  // reduced after compaction.
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
    // Wait for compaction so that put won't stop
    dbfull()->TEST_FlushMemTable(true);
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_LT(NumTableFilesAtLevel(0), 4);
}
4928
4929 // Test dynamic FIFO compaction options.
4930 // This test covers just option parsing and makes sure that the options are
4931 // correctly assigned. Also look at DBOptionsTest.SetFIFOCompactionOptions
4932 // test which makes sure that the FIFO compaction funcionality is working
4933 // as expected on dynamically changing the options.
4934 // Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
4935 TEST_F(DBTest, DynamicFIFOCompactionOptions) {
4936 Options options;
4937 options.ttl = 0;
4938 options.create_if_missing = true;
4939 DestroyAndReopen(options);
4940
4941 // Initial defaults
4942 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4943 1024 * 1024 * 1024);
4944 ASSERT_EQ(dbfull()->GetOptions().ttl, 0);
4945 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4946 false);
4947
4948 ASSERT_OK(dbfull()->SetOptions(
4949 {{"compaction_options_fifo", "{max_table_files_size=23;}"}}));
4950 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4951 23);
4952 ASSERT_EQ(dbfull()->GetOptions().ttl, 0);
4953 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4954 false);
4955
4956 ASSERT_OK(dbfull()->SetOptions({{"ttl", "97"}}));
4957 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4958 23);
4959 ASSERT_EQ(dbfull()->GetOptions().ttl, 97);
4960 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4961 false);
4962
4963 ASSERT_OK(dbfull()->SetOptions({{"ttl", "203"}}));
4964 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4965 23);
4966 ASSERT_EQ(dbfull()->GetOptions().ttl, 203);
4967 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4968 false);
4969
4970 ASSERT_OK(dbfull()->SetOptions(
4971 {{"compaction_options_fifo", "{allow_compaction=true;}"}}));
4972 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4973 23);
4974 ASSERT_EQ(dbfull()->GetOptions().ttl, 203);
4975 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4976 true);
4977
4978 ASSERT_OK(dbfull()->SetOptions(
4979 {{"compaction_options_fifo", "{max_table_files_size=31;}"}}));
4980 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4981 31);
4982 ASSERT_EQ(dbfull()->GetOptions().ttl, 203);
4983 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4984 true);
4985
4986 ASSERT_OK(dbfull()->SetOptions(
4987 {{"compaction_options_fifo",
4988 "{max_table_files_size=51;allow_compaction=true;}"}}));
4989 ASSERT_OK(dbfull()->SetOptions({{"ttl", "49"}}));
4990 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
4991 51);
4992 ASSERT_EQ(dbfull()->GetOptions().ttl, 49);
4993 ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
4994 true);
4995 }
4996
4997 TEST_F(DBTest, DynamicUniversalCompactionOptions) {
4998 Options options;
4999 options.create_if_missing = true;
5000 DestroyAndReopen(options);
5001
5002 // Initial defaults
5003 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 1U);
5004 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
5005 2u);
5006 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
5007 UINT_MAX);
5008 ASSERT_EQ(dbfull()
5009 ->GetOptions()
5010 .compaction_options_universal.max_size_amplification_percent,
5011 200u);
5012 ASSERT_EQ(dbfull()
5013 ->GetOptions()
5014 .compaction_options_universal.compression_size_percent,
5015 -1);
5016 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
5017 kCompactionStopStyleTotalSize);
5018 ASSERT_EQ(
5019 dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
5020 false);
5021
5022 ASSERT_OK(dbfull()->SetOptions(
5023 {{"compaction_options_universal", "{size_ratio=7;}"}}));
5024 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7u);
5025 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
5026 2u);
5027 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
5028 UINT_MAX);
5029 ASSERT_EQ(dbfull()
5030 ->GetOptions()
5031 .compaction_options_universal.max_size_amplification_percent,
5032 200u);
5033 ASSERT_EQ(dbfull()
5034 ->GetOptions()
5035 .compaction_options_universal.compression_size_percent,
5036 -1);
5037 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
5038 kCompactionStopStyleTotalSize);
5039 ASSERT_EQ(
5040 dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
5041 false);
5042
5043 ASSERT_OK(dbfull()->SetOptions(
5044 {{"compaction_options_universal", "{min_merge_width=11;}"}}));
5045 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7u);
5046 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
5047 11u);
5048 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
5049 UINT_MAX);
5050 ASSERT_EQ(dbfull()
5051 ->GetOptions()
5052 .compaction_options_universal.max_size_amplification_percent,
5053 200u);
5054 ASSERT_EQ(dbfull()
5055 ->GetOptions()
5056 .compaction_options_universal.compression_size_percent,
5057 -1);
5058 ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
5059 kCompactionStopStyleTotalSize);
5060 ASSERT_EQ(
5061 dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
5062 false);
5063 }
5064 #endif // ROCKSDB_LITE
5065
5066 TEST_F(DBTest, FileCreationRandomFailure) {
5067 Options options;
5068 options.env = env_;
5069 options.create_if_missing = true;
5070 options.write_buffer_size = 100000; // Small write buffer
5071 options.target_file_size_base = 200000;
5072 options.max_bytes_for_level_base = 1000000;
5073 options.max_bytes_for_level_multiplier = 2;
5074
5075 DestroyAndReopen(options);
5076 Random rnd(301);
5077
5078 const int kCDTKeysPerBuffer = 4;
5079 const int kTestSize = kCDTKeysPerBuffer * 4096;
5080 const int kTotalIteration = 100;
5081 // the second half of the test involves in random failure
5082 // of file creation.
5083 const int kRandomFailureTest = kTotalIteration / 2;
5084 std::vector<std::string> values;
5085 for (int i = 0; i < kTestSize; ++i) {
5086 values.push_back("NOT_FOUND");
5087 }
5088 for (int j = 0; j < kTotalIteration; ++j) {
5089 if (j == kRandomFailureTest) {
5090 env_->non_writeable_rate_.store(90);
5091 }
5092 for (int k = 0; k < kTestSize; ++k) {
5093 // here we expect some of the Put fails.
5094 std::string value = RandomString(&rnd, 100);
5095 Status s = Put(Key(k), Slice(value));
5096 if (s.ok()) {
5097 // update the latest successful put
5098 values[k] = value;
5099 }
5100 // But everything before we simulate the failure-test should succeed.
5101 if (j < kRandomFailureTest) {
5102 ASSERT_OK(s);
5103 }
5104 }
5105 }
5106
5107 // If rocksdb does not do the correct job, internal assert will fail here.
5108 dbfull()->TEST_WaitForFlushMemTable();
5109 dbfull()->TEST_WaitForCompact();
5110
5111 // verify we have the latest successful update
5112 for (int k = 0; k < kTestSize; ++k) {
5113 auto v = Get(Key(k));
5114 ASSERT_EQ(v, values[k]);
5115 }
5116
5117 // reopen and reverify we have the latest successful update
5118 env_->non_writeable_rate_.store(0);
5119 Reopen(options);
5120 for (int k = 0; k < kTestSize; ++k) {
5121 auto v = Get(Key(k));
5122 ASSERT_EQ(v, values[k]);
5123 }
5124 }
5125
5126 #ifndef ROCKSDB_LITE
5127
// Exercises miscellaneous mutable options applied via DB::SetOptions():
// max_sequential_skip_in_iterations, soft/hard pending-compaction byte
// limits, report_bg_io_stats, compression, and paranoid_file_checks.
TEST_F(DBTest, DynamicMiscOptions) {
  // Test max_sequential_skip_in_iterations
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.max_sequential_skip_in_iterations = 16;
  options.compression = kNoCompression;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // Writes key1 ten times (10 hidden versions), then steps an iterator from
  // key1 to key2 and checks the cumulative reseek ticker equals num_reseek.
  // A reseek happens when Next() has to skip more hidden entries than
  // max_sequential_skip_in_iterations allows.
  auto assert_reseek_count = [this, &options](int key_start, int num_reseek) {
    int key0 = key_start;
    int key1 = key_start + 1;
    int key2 = key_start + 2;
    Random rnd(301);
    ASSERT_OK(Put(Key(key0), RandomString(&rnd, 8)));
    for (int i = 0; i < 10; ++i) {
      ASSERT_OK(Put(Key(key1), RandomString(&rnd, 8)));
    }
    ASSERT_OK(Put(Key(key2), RandomString(&rnd, 8)));
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek(Key(key1));
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Key(key1)), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Key(key2)), 0);
    // Note: the ticker is cumulative across invocations of this lambda.
    ASSERT_EQ(num_reseek,
              TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION));
  };
  // No reseek
  assert_reseek_count(100, 0);

  ASSERT_OK(dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "4"}}));
  // Clear memtable and make new option effective
  dbfull()->TEST_FlushMemTable(true);
  // Trigger reseek
  assert_reseek_count(200, 1);

  ASSERT_OK(
      dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "16"}}));
  // Clear memtable and make new option effective
  dbfull()->TEST_FlushMemTable(true);
  // No reseek (cumulative count stays at 1)
  assert_reseek_count(300, 1);

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);
  // Test soft_pending_compaction_bytes_limit,
  // hard_pending_compaction_bytes_limit
  ASSERT_OK(dbfull()->SetOptions(
      handles_[1], {{"soft_pending_compaction_bytes_limit", "200"},
                    {"hard_pending_compaction_bytes_limit", "300"}}));
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(200, mutable_cf_options.soft_pending_compaction_bytes_limit);
  ASSERT_EQ(300, mutable_cf_options.hard_pending_compaction_bytes_limit);
  // Test report_bg_io_stats
  ASSERT_OK(
      dbfull()->SetOptions(handles_[1], {{"report_bg_io_stats", "true"}}));
  // sanity check
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_TRUE(mutable_cf_options.report_bg_io_stats);
  // Test compression
  // sanity check
  ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
                                                     &mutable_cf_options));
  ASSERT_EQ(CompressionType::kNoCompression, mutable_cf_options.compression);

  if (Snappy_Supported()) {
    ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
    ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
                                                       &mutable_cf_options));
    ASSERT_EQ(CompressionType::kSnappyCompression,
              mutable_cf_options.compression);
  }

  // Test paranoid_file_checks already done in db_block_cache_test
  // NOTE(review): the assertion below re-checks report_bg_io_stats rather
  // than paranoid_file_checks - presumably relying on the dedicated
  // db_block_cache_test coverage; confirm if tightening is desired.
  ASSERT_OK(
      dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "true"}}));
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_TRUE(mutable_cf_options.report_bg_io_stats);
}
5214 #endif // ROCKSDB_LITE
5215
5216 TEST_F(DBTest, L0L1L2AndUpHitCounter) {
5217 Options options = CurrentOptions();
5218 options.write_buffer_size = 32 * 1024;
5219 options.target_file_size_base = 32 * 1024;
5220 options.level0_file_num_compaction_trigger = 2;
5221 options.level0_slowdown_writes_trigger = 2;
5222 options.level0_stop_writes_trigger = 4;
5223 options.max_bytes_for_level_base = 64 * 1024;
5224 options.max_write_buffer_number = 2;
5225 options.max_background_compactions = 8;
5226 options.max_background_flushes = 8;
5227 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
5228 CreateAndReopenWithCF({"mypikachu"}, options);
5229
5230 int numkeys = 20000;
5231 for (int i = 0; i < numkeys; i++) {
5232 ASSERT_OK(Put(1, Key(i), "val"));
5233 }
5234 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L0));
5235 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L1));
5236 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L2_AND_UP));
5237
5238 ASSERT_OK(Flush(1));
5239 dbfull()->TEST_WaitForCompact();
5240
5241 for (int i = 0; i < numkeys; i++) {
5242 ASSERT_EQ(Get(1, Key(i)), "val");
5243 }
5244
5245 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L0), 100);
5246 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L1), 100);
5247 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L2_AND_UP), 100);
5248
5249 ASSERT_EQ(numkeys, TestGetTickerCount(options, GET_HIT_L0) +
5250 TestGetTickerCount(options, GET_HIT_L1) +
5251 TestGetTickerCount(options, GET_HIT_L2_AND_UP));
5252 }
5253
5254 TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
5255 // iter 0 -- zlib
5256 // iter 1 -- bzip2
5257 // iter 2 -- lz4
5258 // iter 3 -- lz4HC
5259 // iter 4 -- xpress
5260 CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
5261 kLZ4Compression, kLZ4HCCompression,
5262 kXpressCompression};
5263 for (auto comp : compressions) {
5264 if (!CompressionTypeSupported(comp)) {
5265 continue;
5266 }
5267 // first_table_version 1 -- generate with table_version == 1, read with
5268 // table_version == 2
5269 // first_table_version 2 -- generate with table_version == 2, read with
5270 // table_version == 1
5271 for (int first_table_version = 1; first_table_version <= 2;
5272 ++first_table_version) {
5273 BlockBasedTableOptions table_options;
5274 table_options.format_version = first_table_version;
5275 table_options.filter_policy.reset(NewBloomFilterPolicy(10));
5276 Options options = CurrentOptions();
5277 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
5278 options.create_if_missing = true;
5279 options.compression = comp;
5280 DestroyAndReopen(options);
5281
5282 int kNumKeysWritten = 1000;
5283
5284 Random rnd(301);
5285 for (int i = 0; i < kNumKeysWritten; ++i) {
5286 // compressible string
5287 ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
5288 }
5289
5290 table_options.format_version = first_table_version == 1 ? 2 : 1;
5291 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
5292 Reopen(options);
5293 for (int i = 0; i < kNumKeysWritten; ++i) {
5294 auto r = Get(Key(i));
5295 ASSERT_EQ(r.substr(128), std::string(128, 'a'));
5296 }
5297 }
5298 }
5299 }
5300
// Verifies that Close() returns promptly even while all background flush
// and compaction threads are blocked: no SST files should appear in the DB
// directory before or after Close() while the workers are asleep.
TEST_F(DBTest, CloseSpeedup) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_write_buffer_number = 16;

  // Block background threads
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  // Start from a completely empty DB directory so the SST file counts
  // below are meaningful.
  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(dbname_ + "/" + filenames[i]);
  }
  env_->DeleteDir(dbname_);
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to level 2
  // After that, (100K, 200K)
  for (int num = 0; num < 5; num++) {
    GenerateNewFile(&rnd, &key_idx, true);
  }

  // With both background threads asleep nothing could be flushed to disk.
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  Close();
  // Close() must not have waited on the blocked background work to
  // materialize any files.
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // Unblock background threads
  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();

  Destroy(options);
}
5355
5356 class DelayedMergeOperator : public MergeOperator {
5357 private:
5358 DBTest* db_test_;
5359
5360 public:
5361 explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
5362
5363 bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
5364 MergeOperationOutput* merge_out) const override {
5365 db_test_->env_->addon_time_.fetch_add(1000);
5366 merge_out->new_value = "";
5367 return true;
5368 }
5369
5370 const char* Name() const override { return "DelayedMergeOperator"; }
5371 };
5372
5373 TEST_F(DBTest, MergeTestTime) {
5374 std::string one, two, three;
5375 PutFixed64(&one, 1);
5376 PutFixed64(&two, 2);
5377 PutFixed64(&three, 3);
5378
5379 // Enable time profiling
5380 SetPerfLevel(kEnableTime);
5381 this->env_->addon_time_.store(0);
5382 this->env_->time_elapse_only_sleep_ = true;
5383 this->env_->no_slowdown_ = true;
5384 Options options = CurrentOptions();
5385 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
5386 options.merge_operator.reset(new DelayedMergeOperator(this));
5387 DestroyAndReopen(options);
5388
5389 ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
5390 db_->Put(WriteOptions(), "foo", one);
5391 ASSERT_OK(Flush());
5392 ASSERT_OK(db_->Merge(WriteOptions(), "foo", two));
5393 ASSERT_OK(Flush());
5394 ASSERT_OK(db_->Merge(WriteOptions(), "foo", three));
5395 ASSERT_OK(Flush());
5396
5397 ReadOptions opt;
5398 opt.verify_checksums = true;
5399 opt.snapshot = nullptr;
5400 std::string result;
5401 db_->Get(opt, "foo", &result);
5402
5403 ASSERT_EQ(1000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
5404
5405 ReadOptions read_options;
5406 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
5407 int count = 0;
5408 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
5409 ASSERT_OK(iter->status());
5410 ++count;
5411 }
5412
5413 ASSERT_EQ(1, count);
5414 ASSERT_EQ(2000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
5415 #ifdef ROCKSDB_USING_THREAD_STATUS
5416 ASSERT_GT(TestGetTickerCount(options, FLUSH_WRITE_BYTES), 0);
5417 #endif // ROCKSDB_USING_THREAD_STATUS
5418 this->env_->time_elapse_only_sleep_ = false;
5419 }
5420
5421 #ifndef ROCKSDB_LITE
5422 TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
5423 SetPerfLevel(kEnableTime);
5424 Options options = CurrentOptions();
5425 options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
5426 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
5427 options.merge_operator.reset(new DelayedMergeOperator(this));
5428 options.compaction_style = kCompactionStyleUniversal;
5429 options.max_subcompactions = max_subcompactions_;
5430 DestroyAndReopen(options);
5431
5432 for (int i = 0; i < 1000; i++) {
5433 ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST"));
5434 ASSERT_OK(Flush());
5435 }
5436 dbfull()->TEST_WaitForFlushMemTable();
5437 dbfull()->TEST_WaitForCompact();
5438
5439 ASSERT_NE(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
5440 }
5441
5442 TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
5443 Options options = CurrentOptions();
5444 options.compaction_filter_factory =
5445 std::make_shared<DelayFilterFactory>(this);
5446 options.disable_auto_compactions = true;
5447 options.create_if_missing = true;
5448 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
5449 options.statistics->set_stats_level(kExceptTimeForMutex);
5450 options.max_subcompactions = max_subcompactions_;
5451 DestroyAndReopen(options);
5452
5453 // put some data
5454 for (int table = 0; table < 4; ++table) {
5455 for (int i = 0; i < 10 + table; ++i) {
5456 Put(ToString(table * 100 + i), "val");
5457 }
5458 Flush();
5459 }
5460
5461 CompactRangeOptions cro;
5462 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
5463 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
5464 ASSERT_EQ(0U, CountLiveFiles());
5465
5466 Reopen(options);
5467
5468 Iterator* itr = db_->NewIterator(ReadOptions());
5469 itr->SeekToFirst();
5470 ASSERT_NE(TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME), 0);
5471 delete itr;
5472 }
5473 #endif // ROCKSDB_LITE
5474
5475 TEST_F(DBTest, TestLogCleanup) {
5476 Options options = CurrentOptions();
5477 options.write_buffer_size = 64 * 1024; // very small
5478 // only two memtables allowed ==> only two log files
5479 options.max_write_buffer_number = 2;
5480 Reopen(options);
5481
5482 for (int i = 0; i < 100000; ++i) {
5483 Put(Key(i), "val");
5484 // only 2 memtables will be alive, so logs_to_free needs to always be below
5485 // 2
5486 ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
5487 }
5488 }
5489
5490 #ifndef ROCKSDB_LITE
5491 TEST_F(DBTest, EmptyCompactedDB) {
5492 Options options = CurrentOptions();
5493 options.max_open_files = -1;
5494 Close();
5495 ASSERT_OK(ReadOnlyReopen(options));
5496 Status s = Put("new", "value");
5497 ASSERT_TRUE(s.IsNotSupported());
5498 Close();
5499 }
5500 #endif // ROCKSDB_LITE
5501
5502 #ifndef ROCKSDB_LITE
// Exercises experimental::SuggestCompactRange(): files are generated until a
// known LSM shape is reached, then suggested compactions must drain the upper
// levels, and a suggestion overlapping the L0 file must compact it (and be
// reported to the compaction filter as a manual compaction).
TEST_F(DBTest, SuggestCompactRangeTest) {
  // Factory that stashes the Context of the most recent
  // CreateCompactionFilter() call so the test can check whether the last
  // compaction was flagged as manual.
  class CompactionFilterFactoryGetContext : public CompactionFilterFactory {
   public:
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context& context) override {
      saved_context = context;
      // Returning a null filter leaves compaction behavior unchanged.
      std::unique_ptr<CompactionFilter> empty_filter;
      return empty_filter;
    }
    const char* Name() const override {
      return "CompactionFilterFactoryGetContext";
    }
    static bool IsManual(CompactionFilterFactory* compaction_filter_factory) {
      return reinterpret_cast<CompactionFilterFactoryGetContext*>(
                 compaction_filter_factory)
          ->saved_context.is_manual_compaction;
    }
    // Context captured by the last CreateCompactionFilter() call.
    CompactionFilter::Context saved_context;
  };

  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
  options.compaction_style = kCompactionStyleLevel;
  options.compaction_filter_factory.reset(
      new CompactionFilterFactoryGetContext());
  options.write_buffer_size = 200 << 10;
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.compression = kNoCompression;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 98 << 10;
  options.max_compaction_bytes = static_cast<uint64_t>(1) << 60;  // inf

  Reopen(options);

  Random rnd(301);

  for (int num = 0; num < 3; num++) {
    GenerateNewRandomFile(&rnd);
  }

  // The 4th file reaches level0_file_num_compaction_trigger; the resulting
  // automatic compaction must not be flagged as manual.
  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("0,4", FilesPerLevel(0));
  ASSERT_TRUE(!CompactionFilterFactoryGetContext::IsManual(
      options.compaction_filter_factory.get()));

  // Keep generating files, checking the expected per-level file counts as
  // the LSM fills up.
  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("1,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("2,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("3,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("0,4,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("1,4,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("2,4,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("3,4,4", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("0,4,8", FilesPerLevel(0));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ("1,4,8", FilesPerLevel(0));

  // compact it three times
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
    dbfull()->TEST_WaitForCompact();
  }

  // All files are compacted
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  GenerateNewRandomFile(&rnd);
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // nonoverlapping with the file on level 0
  Slice start("a"), end("b");
  ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
  dbfull()->TEST_WaitForCompact();

  // should not compact the level 0 file
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // Range [j, m) overlaps the generated keys, so this suggestion must pick
  // up the L0 file.
  start = Slice("j");
  end = Slice("m");
  ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
  dbfull()->TEST_WaitForCompact();
  ASSERT_TRUE(CompactionFilterFactoryGetContext::IsManual(
      options.compaction_filter_factory.get()));

  // now it should compact the level 0 file
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(1, NumTableFilesAtLevel(1));
}
5610
5611 TEST_F(DBTest, PromoteL0) {
5612 Options options = CurrentOptions();
5613 options.disable_auto_compactions = true;
5614 options.write_buffer_size = 10 * 1024 * 1024;
5615 DestroyAndReopen(options);
5616
5617 // non overlapping ranges
5618 std::vector<std::pair<int32_t, int32_t>> ranges = {
5619 {81, 160}, {0, 80}, {161, 240}, {241, 320}};
5620
5621 int32_t value_size = 10 * 1024; // 10 KB
5622
5623 Random rnd(301);
5624 std::map<int32_t, std::string> values;
5625 for (const auto& range : ranges) {
5626 for (int32_t j = range.first; j < range.second; j++) {
5627 values[j] = RandomString(&rnd, value_size);
5628 ASSERT_OK(Put(Key(j), values[j]));
5629 }
5630 ASSERT_OK(Flush());
5631 }
5632
5633 int32_t level0_files = NumTableFilesAtLevel(0, 0);
5634 ASSERT_EQ(level0_files, ranges.size());
5635 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
5636
5637 // Promote L0 level to L2.
5638 ASSERT_OK(experimental::PromoteL0(db_, db_->DefaultColumnFamily(), 2));
5639 // We expect that all the files were trivially moved from L0 to L2
5640 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
5641 ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files);
5642
5643 for (const auto& kv : values) {
5644 ASSERT_EQ(Get(Key(kv.first)), kv.second);
5645 }
5646 }
5647
5648 TEST_F(DBTest, PromoteL0Failure) {
5649 Options options = CurrentOptions();
5650 options.disable_auto_compactions = true;
5651 options.write_buffer_size = 10 * 1024 * 1024;
5652 DestroyAndReopen(options);
5653
5654 // Produce two L0 files with overlapping ranges.
5655 ASSERT_OK(Put(Key(0), ""));
5656 ASSERT_OK(Put(Key(3), ""));
5657 ASSERT_OK(Flush());
5658 ASSERT_OK(Put(Key(1), ""));
5659 ASSERT_OK(Flush());
5660
5661 Status status;
5662 // Fails because L0 has overlapping files.
5663 status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
5664 ASSERT_TRUE(status.IsInvalidArgument());
5665
5666 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5667 // Now there is a file in L1.
5668 ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
5669
5670 ASSERT_OK(Put(Key(5), ""));
5671 ASSERT_OK(Flush());
5672 // Fails because L1 is non-empty.
5673 status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
5674 ASSERT_TRUE(status.IsInvalidArgument());
5675 }
5676
5677 // Github issue #596
5678 TEST_F(DBTest, CompactRangeWithEmptyBottomLevel) {
5679 const int kNumLevels = 2;
5680 const int kNumL0Files = 2;
5681 Options options = CurrentOptions();
5682 options.disable_auto_compactions = true;
5683 options.num_levels = kNumLevels;
5684 DestroyAndReopen(options);
5685
5686 Random rnd(301);
5687 for (int i = 0; i < kNumL0Files; ++i) {
5688 ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
5689 Flush();
5690 }
5691 ASSERT_EQ(NumTableFilesAtLevel(0), kNumL0Files);
5692 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
5693
5694 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
5695 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
5696 ASSERT_EQ(NumTableFilesAtLevel(1), kNumL0Files);
5697 }
5698 #endif // ROCKSDB_LITE
5699
// Verifies that automatic compactions scheduled while an exclusive manual
// compaction is running hit the conflict path
// (DBImpl::MaybeScheduleFlushOrCompaction:Conflict) instead of running
// concurrently. Ordering is enforced entirely through sync points.
TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
  const int kNumL0Files = 50;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  // never slowdown / stop
  options.level0_slowdown_writes_trigger = 999999;
  options.level0_stop_writes_trigger = 999999;
  options.max_background_compactions = 10;
  DestroyAndReopen(options);

  // schedule automatic compactions after the manual one starts, but before it
  // finishes to ensure conflict.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCompaction:Start",
        "DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
       {"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
  std::atomic<int> callback_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
      [&](void* /*arg*/) { callback_count.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    // put two keys to ensure no trivial move
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }
  // Exclusive manual compaction runs on a separate thread so the main thread
  // can flood L0 with files while it is in progress.
  port::Thread manual_compaction_thread([this]() {
    CompactRangeOptions croptions;
    croptions.exclusive_manual_compaction = true;
    ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
  });

  // Blocks until the manual compaction has actually started (see the
  // LoadDependency pairs above).
  TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
  for (int i = 0; i < kNumL0Files; ++i) {
    // put two keys to ensure no trivial move
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }
  // Releases the manual compaction, which was held before AfterRun.
  TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");

  // At least one automatic compaction attempt must have conflicted.
  ASSERT_GE(callback_count.load(), 1);
  for (int i = 0; i < 2; ++i) {
    ASSERT_NE("NOT_FOUND", Get(Key(i)));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  manual_compaction_thread.join();
  dbfull()->TEST_WaitForCompact();
}
5755
5756 #ifndef ROCKSDB_LITE
// Verifies that while a manual CompactFiles() call is paused (via sync
// points), accumulating L0 files above the trigger still results in an
// automatic compaction once CompactFiles finishes: the final L0 file count
// must be back at or below the trigger.
TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) {
  Options options = CurrentOptions();
  options.max_background_compactions = 1;
  options.level0_file_num_compaction_trigger = 4;
  options.level0_slowdown_writes_trigger = 36;
  options.level0_stop_writes_trigger = 36;
  DestroyAndReopen(options);

  // generate files for manual compaction
  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    // put two keys to ensure no trivial move
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }

  ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
  db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data);

  // Compact just the first L0 file.
  std::vector<std::string> input_files;
  input_files.push_back(cf_meta_data.levels[0].files[0].name);

  // Hold CompactFilesImpl between markers 0 and 1 so the main thread can
  // pile up L0 files while the manual compaction is mid-flight.
  SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:0",
       "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin"},
      {"DBTest::CompactFilesShouldTriggerAutoCompaction:End",
       "CompactFilesImpl:1"},
  });

  SyncPoint::GetInstance()->EnableProcessing();

  port::Thread manual_compaction_thread([&]() {
    // NOTE(review): the CompactFiles() status is captured but never
    // asserted; consider ASSERT_OK here — verify it cannot legitimately
    // fail under this schedule.
    auto s = db_->CompactFiles(CompactionOptions(),
                               db_->DefaultColumnFamily(), input_files, 0);
  });

  TEST_SYNC_POINT(
      "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin");
  // generate enough files to trigger compaction
  for (int i = 0; i < 20; ++i) {
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }
  db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data);
  // L0 now exceeds the trigger while the manual compaction is still paused.
  ASSERT_GT(cf_meta_data.levels[0].files.size(),
            options.level0_file_num_compaction_trigger);
  TEST_SYNC_POINT(
      "DBTest::CompactFilesShouldTriggerAutoCompaction:End");

  manual_compaction_thread.join();
  dbfull()->TEST_WaitForCompact();

  // The automatic compaction scheduled after CompactFiles must have brought
  // L0 back under control.
  db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data);
  ASSERT_LE(cf_meta_data.levels[0].files.size(),
            options.level0_file_num_compaction_trigger);
}
5817 #endif // ROCKSDB_LITE
5818
5819 // Github issue #595
5820 // Large write batch with column families
5821 TEST_F(DBTest, LargeBatchWithColumnFamilies) {
5822 Options options = CurrentOptions();
5823 options.env = env_;
5824 options.write_buffer_size = 100000; // Small write buffer
5825 CreateAndReopenWithCF({"pikachu"}, options);
5826 int64_t j = 0;
5827 for (int i = 0; i < 5; i++) {
5828 for (int pass = 1; pass <= 3; pass++) {
5829 WriteBatch batch;
5830 size_t write_size = 1024 * 1024 * (5 + i);
5831 fprintf(stderr, "prepare: %" ROCKSDB_PRIszt " MB, pass:%d\n",
5832 (write_size / 1024 / 1024), pass);
5833 for (;;) {
5834 std::string data(3000, j++ % 127 + 20);
5835 data += ToString(j);
5836 batch.Put(handles_[0], Slice(data), Slice(data));
5837 if (batch.GetDataSize() > write_size) {
5838 break;
5839 }
5840 }
5841 fprintf(stderr, "write: %" ROCKSDB_PRIszt " MB\n",
5842 (batch.GetDataSize() / 1024 / 1024));
5843 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
5844 fprintf(stderr, "done\n");
5845 }
5846 }
5847 // make sure we can re-open it.
5848 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
5849 }
5850
5851 // Make sure that Flushes can proceed in parallel with CompactRange()
// Make sure that Flushes can proceed in parallel with CompactRange(): while
// a compaction is held open via sync points, three new memtables must still
// be flushable (max_write_buffer_number is 2, so blocked flushes would
// stall the third fill).
TEST_F(DBTest, FlushesInParallelWithCompactRange) {
  // iter == 0 -- leveled
  // iter == 1 -- leveled, but throw in a flush between two levels compacting
  // iter == 2 -- universal
  for (int iter = 0; iter < 3; ++iter) {
    Options options = CurrentOptions();
    if (iter < 2) {
      options.compaction_style = kCompactionStyleLevel;
    } else {
      options.compaction_style = kCompactionStyleUniversal;
    }
    options.write_buffer_size = 110 << 10;
    options.level0_file_num_compaction_trigger = 4;
    options.num_levels = 4;
    options.compression = kNoCompression;
    options.max_bytes_for_level_base = 450 << 10;
    options.target_file_size_base = 98 << 10;
    options.max_write_buffer_number = 2;

    DestroyAndReopen(options);

    Random rnd(301);
    for (int num = 0; num < 14; num++) {
      GenerateNewRandomFile(&rnd);
    }

    // Pick where to hold the compaction open: inside RunManualCompaction for
    // iter 1, around CompactionJob::Run otherwise.
    if (iter == 1) {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::RunManualCompaction()::1",
            "DBTest::FlushesInParallelWithCompactRange:1"},
           {"DBTest::FlushesInParallelWithCompactRange:2",
            "DBImpl::RunManualCompaction()::2"}});
    } else {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"CompactionJob::Run():Start",
            "DBTest::FlushesInParallelWithCompactRange:1"},
           {"DBTest::FlushesInParallelWithCompactRange:2",
            "CompactionJob::Run():End"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    std::vector<port::Thread> threads;
    threads.emplace_back([&]() { Compact("a", "z"); });

    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");

    // this has to start a flush. if flushes are blocked, this will try to
    // create
    // 3 memtables, and that will fail because max_write_buffer_number is 2
    for (int num = 0; num < 3; num++) {
      GenerateNewRandomFile(&rnd, /* nowait */ true);
    }

    // Release the held compaction.
    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");

    for (auto& t : threads) {
      t.join();
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
5913
// Verifies the delayed-write throttle: with compactions blocked and L0 above
// the slowdown trigger, writes are rate-limited, the rate shrinking by
// kIncSlowdownRatio^2 per flush. The total (fake-clock) sleep time must land
// within 2x of the analytically estimated value.
TEST_F(DBTest, DelayedWriteRate) {
  const int kEntriesPerMemTable = 100;
  const int kTotalFlushes = 12;

  Options options = CurrentOptions();
  env_->SetBackgroundThreads(1, Env::LOW);
  options.env = env_;
  // Sleeps only advance the fake clock; no real waiting.
  env_->no_slowdown_ = true;
  options.write_buffer_size = 100000000;
  options.max_write_buffer_number = 256;
  options.max_background_compactions = 1;
  options.level0_file_num_compaction_trigger = 3;
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 999999;
  options.delayed_write_rate = 20000000;  // Start with 200MB/s
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kEntriesPerMemTable));

  CreateAndReopenWithCF({"pikachu"}, options);

  // Block compactions
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  // Three L0 files reach level0_slowdown_writes_trigger, starting the delay.
  for (int i = 0; i < 3; i++) {
    Put(Key(i), std::string(10000, 'x'));
    Flush();
  }

  // These writes will be slowed down to 1KB/s
  uint64_t estimated_sleep_time = 0;
  Random rnd(301);
  Put("", "");
  uint64_t cur_rate = options.delayed_write_rate;
  for (int i = 0; i < kTotalFlushes; i++) {
    uint64_t size_memtable = 0;
    for (int j = 0; j < kEntriesPerMemTable; j++) {
      auto rand_num = rnd.Uniform(20);
      // Spread the size range to more.
      size_t entry_size = rand_num * rand_num * rand_num;
      WriteOptions wo;
      Put(Key(i), std::string(entry_size, 'x'), wo);
      // +18 approximates per-entry memtable overhead for the estimate.
      size_memtable += entry_size + 18;
      // Occasionally sleep a while
      if (rnd.Uniform(20) == 6) {
        env_->SleepForMicroseconds(2666);
      }
    }
    dbfull()->TEST_WaitForFlushMemTable();
    estimated_sleep_time += size_memtable * 1000000u / cur_rate;
    // Slow down twice. One for memtable switch and one for flush finishes.
    cur_rate = static_cast<uint64_t>(static_cast<double>(cur_rate) *
                                     kIncSlowdownRatio * kIncSlowdownRatio);
  }
  // Estimate the total sleep time fall into the rough range.
  ASSERT_GT(env_->addon_time_.load(),
            static_cast<int64_t>(estimated_sleep_time / 2));
  ASSERT_LT(env_->addon_time_.load(),
            static_cast<int64_t>(estimated_sleep_time * 2));

  env_->no_slowdown_ = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
5980
// Verifies hard_pending_compaction_bytes_limit: with compaction blocked, the
// first 5 generated files stay under the 800KB limit (no write stall); the
// next 5 push pending compaction bytes over it, so DelayWrite:Wait must
// fire. The sync-point callback doubles as the wake-up for the blocker so
// the stalled write can complete.
TEST_F(DBTest, HardLimit) {
  Options options = CurrentOptions();
  options.env = env_;
  env_->SetBackgroundThreads(1, Env::LOW);
  options.max_write_buffer_number = 256;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 * 1024;
  options.level0_file_num_compaction_trigger = 4;
  // Disable the slowdown/stop triggers so only the pending-bytes hard limit
  // can stall writes.
  options.level0_slowdown_writes_trigger = 999999;
  options.level0_stop_writes_trigger = 999999;
  options.hard_pending_compaction_bytes_limit = 800 << 10;
  options.max_bytes_for_level_base = 10000000000u;
  options.max_background_compactions = 1;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));

  // Occupy the single LOW-priority thread so no compaction can run.
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  CreateAndReopenWithCF({"pikachu"}, options);

  std::atomic<int> callback_count(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
        callback_count.fetch_add(1);
        // Unblock compaction so the stalled write can make progress.
        sleeping_task_low.WakeUp();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  int key_idx = 0;
  for (int num = 0; num < 5; num++) {
    GenerateNewFile(&rnd, &key_idx, true);
    dbfull()->TEST_WaitForFlushMemTable();
  }

  // Still under the hard limit: no stall observed yet.
  ASSERT_EQ(0, callback_count.load());

  for (int num = 0; num < 5; num++) {
    GenerateNewFile(&rnd, &key_idx, true);
    dbfull()->TEST_WaitForFlushMemTable();
  }
  // Pending compaction bytes now exceed the hard limit at least once.
  ASSERT_GE(callback_count.load(), 1);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  sleeping_task_low.WaitUntilDone();
}
6030
6031 #if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
6032 class WriteStallListener : public EventListener {
6033 public:
6034 WriteStallListener() : condition_(WriteStallCondition::kNormal) {}
6035 void OnStallConditionsChanged(const WriteStallInfo& info) override {
6036 MutexLock l(&mutex_);
6037 condition_ = info.condition.cur;
6038 }
6039 bool CheckCondition(WriteStallCondition expected) {
6040 MutexLock l(&mutex_);
6041 return expected == condition_;
6042 }
6043 private:
6044 port::Mutex mutex_;
6045 WriteStallCondition condition_;
6046 };
6047
// Verifies soft_pending_compaction_bytes_limit / L0-slowdown write stalls:
// the delay must engage when L0 score or estimated pending compaction bytes
// cross the soft limits, disengage once compaction catches up, and the
// WriteStallListener must observe every transition. Assertion order is
// tightly coupled to the staged flushes and the blocked compaction thread.
TEST_F(DBTest, SoftLimit) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  options.max_write_buffer_number = 256;
  options.level0_file_num_compaction_trigger = 1;
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 999999;
  options.delayed_write_rate = 20000;  // About 200KB/s limited rate
  options.soft_pending_compaction_bytes_limit = 160000;
  options.target_file_size_base = 99999999;  // All into one file
  options.max_bytes_for_level_base = 50000;
  options.max_bytes_for_level_multiplier = 10;
  options.max_background_compactions = 1;
  options.compression = kNoCompression;
  WriteStallListener* listener = new WriteStallListener();
  options.listeners.emplace_back(listener);

  // FlushMemtable with opt.wait=true does not wait for
  // `OnStallConditionsChanged` being called. The event listener is triggered
  // on `JobContext::Clean`, which happens after flush result is installed.
  // We use sync point to create a custom WaitForFlush that waits for
  // context cleanup.
  port::Mutex flush_mutex;
  port::CondVar flush_cv(&flush_mutex);
  bool flush_finished = false;
  // Arms the ContextCleanedUp callback; must be called before each flush
  // whose stall transition we need to observe.
  auto InstallFlushCallback = [&]() {
    {
      MutexLock l(&flush_mutex);
      flush_finished = false;
    }
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) {
          {
            MutexLock l(&flush_mutex);
            flush_finished = true;
          }
          flush_cv.SignalAll();
        });
  };
  // Blocks until the armed flush has cleaned up its context, then disarms.
  auto WaitForFlush = [&]() {
    {
      MutexLock l(&flush_mutex);
      while (!flush_finished) {
        flush_cv.Wait();
      }
    }
    SyncPoint::GetInstance()->ClearCallBack(
        "DBImpl::BackgroundCallFlush:ContextCleanedUp");
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);

  // Generating 360KB in Level 3
  for (int i = 0; i < 72; i++) {
    Put(Key(i), std::string(5000, 'x'));
    if (i % 10 == 0) {
      dbfull()->TEST_FlushMemTable(true, true);
    }
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);

  // Generating 360KB in Level 2
  for (int i = 0; i < 72; i++) {
    Put(Key(i), std::string(5000, 'x'));
    if (i % 10 == 0) {
      dbfull()->TEST_FlushMemTable(true, true);
    }
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(2);

  Put(Key(0), "");

  test::SleepingBackgroundTask sleeping_task_low;
  // Block compactions
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();

  // Create 3 L0 files, making score of L0 to be 3.
  for (int i = 0; i < 3; i++) {
    Put(Key(i), std::string(5000, 'x'));
    Put(Key(100 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    InstallFlushCallback();
    dbfull()->TEST_FlushMemTable(true, true);
    WaitForFlush();
  }
  // L0 score 3 == level0_slowdown_writes_trigger: delay engages.
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
  sleeping_task_low.Reset();
  dbfull()->TEST_WaitForCompact();

  // Now there is one L1 file but doesn't trigger soft_rate_limit
  // The L1 file size is around 30KB.
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));

  // Only allow one compaction to go through: each compaction immediately
  // re-blocks the LOW pool by scheduling a fresh sleeping task.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void* /*arg*/) {
        // Schedule a sleeping task.
        sleeping_task_low.Reset();
        env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                       &sleeping_task_low, Env::Priority::LOW);
      });

  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  // Create 3 L0 files, making score of L0 to be 3
  for (int i = 0; i < 3; i++) {
    Put(Key(10 + i), std::string(5000, 'x'));
    Put(Key(90 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    InstallFlushCallback();
    dbfull()->TEST_FlushMemTable(true, true);
    WaitForFlush();
  }

  // Wake up sleep task to enable compaction to run and waits
  // for it to go to sleep state again to make sure one compaction
  // goes through.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
  // Given level multiplier 10, estimated pending compaction is around 100KB
  // doesn't trigger soft_pending_compaction_bytes_limit
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));

  // Create 3 L0 files, making score of L0 to be 3, higher than L0.
  for (int i = 0; i < 3; i++) {
    Put(Key(20 + i), std::string(5000, 'x'));
    Put(Key(80 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    InstallFlushCallback();
    dbfull()->TEST_FlushMemTable(true, true);
    WaitForFlush();
  }
  // Wake up sleep task to enable compaction to run and waits
  // for it to go to sleep state again to make sure one compaction
  // goes through.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
  // L2 size is 360KB, so the estimated level fanout 4, estimated pending
  // compaction is around 200KB
  // triggering soft_pending_compaction_bytes_limit
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // After one more compaction, pending bytes drop back under the limit.
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));

  // shrink level base so L2 will hit soft limit easier.
  ASSERT_OK(dbfull()->SetOptions({
      {"max_bytes_for_level_base", "5000"},
  }));

  Put("", "");
  Flush();
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));

  sleeping_task_low.WaitUntilSleeping();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
6233
6234 TEST_F(DBTest, LastWriteBufferDelay) {
6235 Options options = CurrentOptions();
6236 options.env = env_;
6237 options.write_buffer_size = 100000;
6238 options.max_write_buffer_number = 4;
6239 options.delayed_write_rate = 20000;
6240 options.compression = kNoCompression;
6241 options.disable_auto_compactions = true;
6242 int kNumKeysPerMemtable = 3;
6243 options.memtable_factory.reset(
6244 new SpecialSkipListFactory(kNumKeysPerMemtable));
6245
6246 Reopen(options);
6247 test::SleepingBackgroundTask sleeping_task;
6248 // Block flushes
6249 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
6250 Env::Priority::HIGH);
6251 sleeping_task.WaitUntilSleeping();
6252
6253 // Create 3 L0 files, making score of L0 to be 3.
6254 for (int i = 0; i < 3; i++) {
6255 // Fill one mem table
6256 for (int j = 0; j < kNumKeysPerMemtable; j++) {
6257 Put(Key(j), "");
6258 }
6259 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
6260 }
6261 // Inserting a new entry would create a new mem table, triggering slow down.
6262 Put(Key(0), "");
6263 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
6264
6265 sleeping_task.WakeUp();
6266 sleeping_task.WaitUntilDone();
6267 }
6268 #endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
6269
6270 TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
6271 CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
6272 kLZ4Compression, kLZ4HCCompression,
6273 kXpressCompression};
6274 for (auto comp : compressions) {
6275 if (!CompressionTypeSupported(comp)) {
6276 // not supported, we should fail the Open()
6277 Options options = CurrentOptions();
6278 options.compression = comp;
6279 ASSERT_TRUE(!TryReopen(options).ok());
6280 // Try if CreateColumnFamily also fails
6281 options.compression = kNoCompression;
6282 ASSERT_OK(TryReopen(options));
6283 ColumnFamilyOptions cf_options(options);
6284 cf_options.compression = comp;
6285 ColumnFamilyHandle* handle;
6286 ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok());
6287 }
6288 }
6289 }
6290
6291 TEST_F(DBTest, CreateColumnFamilyShouldFailOnIncompatibleOptions) {
6292 Options options = CurrentOptions();
6293 options.max_open_files = 100;
6294 Reopen(options);
6295
6296 ColumnFamilyOptions cf_options(options);
6297 // ttl is now supported when max_open_files is -1.
6298 cf_options.ttl = 3600;
6299 ColumnFamilyHandle* handle;
6300 ASSERT_OK(db_->CreateColumnFamily(cf_options, "pikachu", &handle));
6301 delete handle;
6302 }
6303
6304 #ifndef ROCKSDB_LITE
6305 TEST_F(DBTest, RowCache) {
6306 Options options = CurrentOptions();
6307 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
6308 options.row_cache = NewLRUCache(8192);
6309 DestroyAndReopen(options);
6310
6311 ASSERT_OK(Put("foo", "bar"));
6312 ASSERT_OK(Flush());
6313
6314 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
6315 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
6316 ASSERT_EQ(Get("foo"), "bar");
6317 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
6318 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
6319 ASSERT_EQ(Get("foo"), "bar");
6320 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
6321 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
6322 }
6323
6324 TEST_F(DBTest, PinnableSliceAndRowCache) {
6325 Options options = CurrentOptions();
6326 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
6327 options.row_cache = NewLRUCache(8192);
6328 DestroyAndReopen(options);
6329
6330 ASSERT_OK(Put("foo", "bar"));
6331 ASSERT_OK(Flush());
6332
6333 ASSERT_EQ(Get("foo"), "bar");
6334 ASSERT_EQ(
6335 reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
6336 1);
6337
6338 {
6339 PinnableSlice pin_slice;
6340 ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
6341 ASSERT_EQ(pin_slice.ToString(), "bar");
6342 // Entry is already in cache, lookup will remove the element from lru
6343 ASSERT_EQ(
6344 reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
6345 0);
6346 }
6347 // After PinnableSlice destruction element is added back in LRU
6348 ASSERT_EQ(
6349 reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
6350 1);
6351 }
6352
6353 #endif // ROCKSDB_LITE
6354
// Regression test: after a column family is dropped, its pending flushes
// become no-ops; writes to a surviving CF must still roll to a new WAL
// (lognum2 > lognum1) rather than being stuck behind the dropped CF's WAL.
TEST_F(DBTest, DeletingOldWalAfterDrop) {
  // Gate background flushes on "Test:AllowFlushes", and let the test wait
  // until a flush attempt finished via "Test:WaitForFlush".
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
       {"DBImpl::BGWorkFlush:done", "Test:WaitForFlush"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Keep sync points disabled during DB (re)creation; enable after reopen.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  Options options = CurrentOptions();
  // Small WAL budget so a couple of 8KB writes force a WAL switch.
  options.max_total_wal_size = 8192;
  options.compression = kNoCompression;
  options.write_buffer_size = 1 << 20;
  // Effectively disable compaction triggers and write stalls for this test.
  options.level0_file_num_compaction_trigger = (1 << 30);
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  CreateColumnFamilies({"cf1", "cf2"}, options);
  ASSERT_OK(Put(0, "key1", DummyString(8192)));
  ASSERT_OK(Put(0, "key2", DummyString(8192)));
  // the oldest wal should now be getting_flushed
  ASSERT_OK(db_->DropColumnFamily(handles_[0]));
  // all flushes should now do nothing because their CF is dropped
  TEST_SYNC_POINT("Test:AllowFlushes");
  TEST_SYNC_POINT("Test:WaitForFlush");
  uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
  // Write to the surviving CF; this should force a switch to a new WAL.
  ASSERT_OK(Put(1, "key3", DummyString(8192)));
  ASSERT_OK(Put(1, "key4", DummyString(8192)));
  // new wal should have been created
  uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
  EXPECT_GT(lognum2, lognum1);
}
6388
6389 TEST_F(DBTest, UnsupportedManualSync) {
6390 DestroyAndReopen(CurrentOptions());
6391 env_->is_wal_sync_thread_safe_.store(false);
6392 Status s = db_->SyncWAL();
6393 ASSERT_TRUE(s.IsNotSupported());
6394 }
6395
// Run every DBTestWithParam test over the cross product of an integer
// parameter {1, 4} and a bool parameter. (Parameter semantics are defined by
// the DBTestWithParam fixture, declared elsewhere — see db_test_util.)
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
                        ::testing::Combine(::testing::Values(1, 4),
                                           ::testing::Bool()));
6399
// PauseBackgroundWork() should stall the writer thread (small write buffers
// fill up and cannot be flushed), and ContinueBackgroundWork() should let it
// finish. NOTE(review): relies on 200ms being too short for 10k puts to
// complete while paused — timing-sensitive by design; do not "fix" the sleep.
TEST_F(DBTest, PauseBackgroundWorkTest) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000;  // Small write buffer
  Reopen(options);

  std::vector<port::Thread> threads;
  std::atomic<bool> done(false);
  db_->PauseBackgroundWork();
  // Writer thread: 10k random puts, then flags completion.
  threads.emplace_back([&]() {
    Random rnd(301);
    for (int i = 0; i < 10000; ++i) {
      Put(RandomString(&rnd, 10), RandomString(&rnd, 10));
    }
    done.store(true);
  });
  env_->SleepForMicroseconds(200000);
  // make sure the thread is not done (it should be stalled on full
  // memtables because flushes are paused)
  ASSERT_FALSE(done.load());
  db_->ContinueBackgroundWork();
  for (auto& t : threads) {
    t.join();
  }
  // now it's done
  ASSERT_TRUE(done.load());
}
6425
// Keep spawning short-living threads that create an iterator and quit.
// Meanwhile in another thread keep flushing memtables.
// This used to cause a deadlock (regression test for ThreadLocalPtr teardown
// racing with flushes).
TEST_F(DBTest, ThreadLocalPtrDeadlock) {
  std::atomic<int> flushes_done{0};
  std::atomic<int> threads_destroyed{0};
  // Shared stop condition for all threads: run until >10 flushes completed.
  auto done = [&] {
    return flushes_done.load() > 10;
  };

  // Flusher: write a key and flush, repeatedly, until done().
  port::Thread flushing_thread([&] {
    for (int i = 0; !done(); ++i) {
      ASSERT_OK(db_->Put(WriteOptions(), Slice("hi"),
                         Slice(std::to_string(i).c_str())));
      ASSERT_OK(db_->Flush(FlushOptions()));
      int cnt = ++flushes_done;
      fprintf(stderr, "Flushed %d times\n", cnt);
    }
  });

  // 10 spawner threads, each repeatedly creating a short-lived thread whose
  // only job is to construct and destroy an iterator — exercising per-thread
  // local storage destruction concurrently with the flushes above.
  std::vector<port::Thread> thread_spawning_threads(10);
  for (auto& t: thread_spawning_threads) {
    t = port::Thread([&] {
      while (!done()) {
        {
          port::Thread tmp_thread([&] {
            auto it = db_->NewIterator(ReadOptions());
            delete it;
          });
          tmp_thread.join();
        }
        ++threads_destroyed;
      }
    });
  }

  for (auto& t: thread_spawning_threads) {
    t.join();
  }
  flushing_thread.join();
  fprintf(stderr, "Done. Flushed %d times, destroyed %d threads\n",
          flushes_done.load(), threads_destroyed.load());
}
6469
6470 TEST_F(DBTest, LargeBlockSizeTest) {
6471 Options options = CurrentOptions();
6472 CreateAndReopenWithCF({"pikachu"}, options);
6473 ASSERT_OK(Put(0, "foo", "bar"));
6474 BlockBasedTableOptions table_options;
6475 table_options.block_size = 8LL * 1024 * 1024 * 1024LL;
6476 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
6477 ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
6478 }
6479
6480 #ifndef ROCKSDB_LITE
6481
// GetCreationTimeOfOldestFile():
//  1) with one file's creation time forced to 0, the API returns 0;
//  2) with two distinct nonzero creation times, it returns the older one;
//  3) with max_open_files != -1, it returns NotSupported.
// File creation times are injected via the table-properties sync point, and
// manifest-recorded times are zeroed so only the property-block values count.
TEST_F(DBTest, CreationTimeOfOldestFile) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  options.max_open_files = -1;
  // Use the mock clock additively, not sleep-driven.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Phase flag for the property callback below: first phase forces one
  // file's creation time to 0; second phase uses two nonzero times.
  bool set_file_creation_time_to_zero = true;
  int idx = 0;

  int64_t time_1 = 0;
  env_->GetCurrentTime(&time_1);
  const uint64_t uint_time_1 = static_cast<uint64_t>(time_1);

  // Add 50 hours
  env_->addon_time_.fetch_add(50 * 60 * 60);

  int64_t time_2 = 0;
  env_->GetCurrentTime(&time_2);
  const uint64_t uint_time_2 = static_cast<uint64_t>(time_2);

  // Override file_creation_time in the table properties as each SST's
  // property block is built; idx alternates 0/1 across the two files.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
        TableProperties* props = reinterpret_cast<TableProperties*>(arg);
        if (set_file_creation_time_to_zero) {
          if (idx == 0) {
            props->file_creation_time = 0;
            idx++;
          } else if (idx == 1) {
            props->file_creation_time = uint_time_1;
            idx = 0;
          }
        } else {
          if (idx == 0) {
            props->file_creation_time = uint_time_1;
            idx++;
          } else if (idx == 1) {
            props->file_creation_time = uint_time_2;
          }
        }
      });
  // Set file creation time in manifest all to 0.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FileMetaData::FileMetaData", [&](void* arg) {
        FileMetaData* meta = static_cast<FileMetaData*>(arg);
        meta->file_creation_time = 0;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Create kNumLevelFiles SST files, one flush each.
  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }

  // At this point there should be 2 files, one with file_creation_time = 0 and
  // the other non-zero. GetCreationTimeOfOldestFile API should return 0.
  uint64_t creation_time;
  Status s1 = dbfull()->GetCreationTimeOfOldestFile(&creation_time);
  ASSERT_EQ(0, creation_time);
  ASSERT_EQ(s1, Status::OK());

  // Testing with non-zero file creation time.
  set_file_creation_time_to_zero = false;
  options = CurrentOptions();
  options.max_open_files = -1;
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Recreate the two SST files; this time both get nonzero creation times.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }

  // At this point there should be 2 files with non-zero file creation time.
  // GetCreationTimeOfOldestFile API should return non-zero value.
  uint64_t ctime;
  Status s2 = dbfull()->GetCreationTimeOfOldestFile(&ctime);
  ASSERT_EQ(uint_time_1, ctime);
  ASSERT_EQ(s2, Status::OK());

  // Testing with max_open_files != -1
  options = CurrentOptions();
  options.max_open_files = 10;
  DestroyAndReopen(options);
  Status s3 = dbfull()->GetCreationTimeOfOldestFile(&ctime);
  ASSERT_EQ(s3, Status::NotSupported());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
6587
6588 #endif
6589
6590 } // namespace ROCKSDB_NAMESPACE
6591
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Hook implemented by a statically linked library to register custom objects
// before the tests run.
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
// Default no-op when no custom-object static library is linked in.
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
6599
6600 int main(int argc, char** argv) {
6601 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
6602 ::testing::InitGoogleTest(&argc, argv);
6603 RegisterCustomObjects(argc, argv);
6604 return RUN_ALL_TESTS();
6605 }