]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_test.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / db_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 // Introduction of SyncPoint effectively disabled building and running this test
11 // in Release build.
12 // which is a pity, it is a good test
13 #include <fcntl.h>
14 #include <algorithm>
15 #include <set>
16 #include <thread>
17 #include <unordered_set>
18 #include <utility>
19 #ifndef OS_WIN
20 #include <unistd.h>
21 #endif
22 #ifdef OS_SOLARIS
23 #include <alloca.h>
24 #endif
25
26 #include "db/db_impl.h"
27 #include "db/db_test_util.h"
28 #include "db/dbformat.h"
29 #include "db/job_context.h"
30 #include "db/version_set.h"
31 #include "db/write_batch_internal.h"
32 #include "env/mock_env.h"
33 #include "memtable/hash_linklist_rep.h"
34 #include "monitoring/thread_status_util.h"
35 #include "port/port.h"
36 #include "port/stack_trace.h"
37 #include "rocksdb/cache.h"
38 #include "rocksdb/compaction_filter.h"
39 #include "rocksdb/convenience.h"
40 #include "rocksdb/db.h"
41 #include "rocksdb/env.h"
42 #include "rocksdb/experimental.h"
43 #include "rocksdb/filter_policy.h"
44 #include "rocksdb/options.h"
45 #include "rocksdb/perf_context.h"
46 #include "rocksdb/slice.h"
47 #include "rocksdb/slice_transform.h"
48 #include "rocksdb/snapshot.h"
49 #include "rocksdb/table.h"
50 #include "rocksdb/table_properties.h"
51 #include "rocksdb/thread_status.h"
52 #include "rocksdb/utilities/checkpoint.h"
53 #include "rocksdb/utilities/optimistic_transaction_db.h"
54 #include "rocksdb/utilities/write_batch_with_index.h"
55 #include "table/block_based_table_factory.h"
56 #include "table/mock_table.h"
57 #include "table/plain_table_factory.h"
58 #include "table/scoped_arena_iterator.h"
59 #include "util/compression.h"
60 #include "util/file_reader_writer.h"
61 #include "util/filename.h"
62 #include "util/hash.h"
63 #include "util/logging.h"
64 #include "util/mutexlock.h"
65 #include "util/rate_limiter.h"
66 #include "util/string_util.h"
67 #include "util/sync_point.h"
68 #include "util/testharness.h"
69 #include "util/testutil.h"
70 #include "utilities/merge_operators.h"
71
72 namespace rocksdb {
73
// Fixture for the general-purpose DB tests in this file.  DBTestBase owns
// the temporary database directory ("/db_test" under the test tmp dir) and
// supplies the Put/Get/Flush helpers and the option-matrix iteration
// (ChangeOptions / ChangeCompactOptions) used throughout.
class DBTest : public DBTestBase {
 public:
  DBTest() : DBTestBase("/db_test") {}
};
78
// Parameterized fixture: the tuple carries
// (max_subcompactions, exclusive_manual_compaction) so each test body runs
// under several compaction configurations.
class DBTestWithParam
    : public DBTest,
      public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
 public:
  DBTestWithParam() {
    max_subcompactions_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  uint32_t max_subcompactions_;       // compaction parallelism under test
  bool exclusive_manual_compaction_;  // CompactRange exclusivity under test
};
95
// Smoke-tests basic DB operation (Put/Get/iterate/flush) on top of MockEnv,
// an in-memory Env, verifying the DB does not depend on a real filesystem.
TEST_F(DBTest, MockEnvTest) {
  unique_ptr<MockEnv> env{new MockEnv(Env::Default())};
  Options options;
  options.create_if_missing = true;
  options.env = env.get();
  DB* db;

  const Slice keys[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
  const Slice vals[] = {Slice("foo"), Slice("bar"), Slice("baz")};

  // "/dir/db" is a path inside the mock env, not on the real filesystem.
  ASSERT_OK(DB::Open(options, "/dir/db", &db));
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_OK(db->Put(WriteOptions(), keys[i], vals[i]));
  }

  // Point lookups see all three values.
  for (size_t i = 0; i < 3; ++i) {
    std::string res;
    ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
    ASSERT_TRUE(res == vals[i]);
  }

  // A full forward scan yields the keys in sorted order.
  Iterator* iterator = db->NewIterator(ReadOptions());
  iterator->SeekToFirst();
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_TRUE(iterator->Valid());
    ASSERT_TRUE(keys[i] == iterator->key());
    ASSERT_TRUE(vals[i] == iterator->value());
    iterator->Next();
  }
  ASSERT_TRUE(!iterator->Valid());
  delete iterator;

// TEST_FlushMemTable() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
  DBImpl* dbi = reinterpret_cast<DBImpl*>(db);
  ASSERT_OK(dbi->TEST_FlushMemTable());

  // Values must survive a flush to (mock) SST files.
  for (size_t i = 0; i < 3; ++i) {
    std::string res;
    ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
    ASSERT_TRUE(res == vals[i]);
  }
#endif  // ROCKSDB_LITE

  delete db;
}
142
143 // NewMemEnv returns nullptr in ROCKSDB_LITE since class InMemoryEnv isn't
144 // defined.
145 #ifndef ROCKSDB_LITE
// Same smoke test as MockEnvTest but on NewMemEnv(); additionally verifies
// that the DB can be closed and reopened (create_if_missing = false) while
// retaining its data, since the Env — not the DB handle — owns the state.
TEST_F(DBTest, MemEnvTest) {
  unique_ptr<Env> env{NewMemEnv(Env::Default())};
  Options options;
  options.create_if_missing = true;
  options.env = env.get();
  DB* db;

  const Slice keys[] = {Slice("aaa"), Slice("bbb"), Slice("ccc")};
  const Slice vals[] = {Slice("foo"), Slice("bar"), Slice("baz")};

  // "/dir/db" lives inside the in-memory env.
  ASSERT_OK(DB::Open(options, "/dir/db", &db));
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_OK(db->Put(WriteOptions(), keys[i], vals[i]));
  }

  // Point lookups see all three values.
  for (size_t i = 0; i < 3; ++i) {
    std::string res;
    ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
    ASSERT_TRUE(res == vals[i]);
  }

  // Forward scan returns the keys in sorted order.
  Iterator* iterator = db->NewIterator(ReadOptions());
  iterator->SeekToFirst();
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_TRUE(iterator->Valid());
    ASSERT_TRUE(keys[i] == iterator->key());
    ASSERT_TRUE(vals[i] == iterator->value());
    iterator->Next();
  }
  ASSERT_TRUE(!iterator->Valid());
  delete iterator;

  // Data must remain readable after a flush to (in-memory) SST files.
  DBImpl* dbi = reinterpret_cast<DBImpl*>(db);
  ASSERT_OK(dbi->TEST_FlushMemTable());

  for (size_t i = 0; i < 3; ++i) {
    std::string res;
    ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
    ASSERT_TRUE(res == vals[i]);
  }

  delete db;

  // Reopen the existing in-memory DB and verify the data survived the
  // close/open cycle.
  options.create_if_missing = false;
  ASSERT_OK(DB::Open(options, "/dir/db", &db));
  for (size_t i = 0; i < 3; ++i) {
    std::string res;
    ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
    ASSERT_TRUE(res == vals[i]);
  }
  delete db;
}
198 #endif // ROCKSDB_LITE
199
200 TEST_F(DBTest, WriteEmptyBatch) {
201 Options options = CurrentOptions();
202 options.env = env_;
203 options.write_buffer_size = 100000;
204 CreateAndReopenWithCF({"pikachu"}, options);
205
206 ASSERT_OK(Put(1, "foo", "bar"));
207 WriteOptions wo;
208 wo.sync = true;
209 wo.disableWAL = false;
210 WriteBatch empty_batch;
211 ASSERT_OK(dbfull()->Write(wo, &empty_batch));
212
213 // make sure we can re-open it.
214 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
215 ASSERT_EQ("bar", Get(1, "foo"));
216 }
217
// Verifies WriteOptions::no_slowdown: while the write controller imposes a
// delay, a no_slowdown write must fail fast instead of sleeping, and a
// normal write must go through the delay (sleep) path.
TEST_F(DBTest, SkipDelay) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  CreateAndReopenWithCF({"pikachu"}, options);

  for (bool sync : {true, false}) {
    for (bool disableWAL : {true, false}) {
      // Use a small number to ensure a large delay that is still effective
      // when we do Put
      // TODO(myabandeh): this is time dependent and could potentially make
      // the test flaky
      auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
      // Count how often the write path sleeps vs. waits, via sync points.
      std::atomic<int> sleep_count(0);
      rocksdb::SyncPoint::GetInstance()->SetCallBack(
          "DBImpl::DelayWrite:Sleep",
          [&](void* arg) { sleep_count.fetch_add(1); });
      std::atomic<int> wait_count(0);
      rocksdb::SyncPoint::GetInstance()->SetCallBack(
          "DBImpl::DelayWrite:Wait",
          [&](void* arg) { wait_count.fetch_add(1); });
      rocksdb::SyncPoint::GetInstance()->EnableProcessing();

      WriteOptions wo;
      wo.sync = sync;
      wo.disableWAL = disableWAL;
      wo.no_slowdown = true;
      dbfull()->Put(wo, "foo", "bar");
      // We need the 2nd write to trigger delay. This is because delay is
      // estimated based on the last write size which is 0 for the first write.
      ASSERT_NOK(dbfull()->Put(wo, "foo2", "bar2"));
      // NOTE(review): ">= 0" is vacuously true for both counters; the intent
      // here is presumably "no sleep/wait happened" — confirm and consider
      // asserting equality with 0 instead.
      ASSERT_GE(sleep_count.load(), 0);
      ASSERT_GE(wait_count.load(), 0);
      token.reset();

      // With no_slowdown off, the same delay mechanism must force the write
      // through the sleep path at least once.
      token = dbfull()->TEST_write_controler().GetDelayToken(1000000000);
      wo.no_slowdown = false;
      ASSERT_OK(dbfull()->Put(wo, "foo3", "bar3"));
      ASSERT_GE(sleep_count.load(), 1);
      token.reset();
    }
  }
}
261
262 #ifndef ROCKSDB_LITE
263
264 TEST_F(DBTest, LevelLimitReopen) {
265 Options options = CurrentOptions();
266 CreateAndReopenWithCF({"pikachu"}, options);
267
268 const std::string value(1024 * 1024, ' ');
269 int i = 0;
270 while (NumTableFilesAtLevel(2, 1) == 0) {
271 ASSERT_OK(Put(1, Key(i++), value));
272 }
273
274 options.num_levels = 1;
275 options.max_bytes_for_level_multiplier_additional.resize(1, 1);
276 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
277 ASSERT_EQ(s.IsInvalidArgument(), true);
278 ASSERT_EQ(s.ToString(),
279 "Invalid argument: db has more levels than options.num_levels");
280
281 options.num_levels = 10;
282 options.max_bytes_for_level_multiplier_additional.resize(10, 1);
283 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
284 }
285 #endif // ROCKSDB_LITE
286
287
// A SingleDelete must hide exactly the matching Put while leaving other
// keys in the same column family untouched.
TEST_F(DBTest, PutSingleDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo2", "v2"));
    ASSERT_EQ("v2", Get(1, "foo2"));
    // Deleting "foo" must not disturb "foo2".
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because single delete does not get removed when it encounters a merge.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
303
// Exercises ReadOptions::read_tier == kPersistedTier (Get/MultiGet that only
// see data already persisted — SST files or the WAL):
//   - with the WAL enabled, unflushed writes count as persisted and are
//     visible;
//   - with the WAL disabled, writes only become visible after a flush.
// Runs both WAL modes across three rounds: put, flush+put, delete+flush.
TEST_F(DBTest, ReadFromPersistedTier) {
  do {
    Random rnd(301);
    Options options = CurrentOptions();
    for (int disableWAL = 0; disableWAL <= 1; ++disableWAL) {
      CreateAndReopenWithCF({"pikachu"}, options);
      WriteOptions wopt;
      wopt.disableWAL = (disableWAL == 1);
      // 1st round: put but not flush
      ASSERT_OK(db_->Put(wopt, handles_[1], "foo", "first"));
      ASSERT_OK(db_->Put(wopt, handles_[1], "bar", "one"));
      // Regular reads always see the memtable contents.
      ASSERT_EQ("first", Get(1, "foo"));
      ASSERT_EQ("one", Get(1, "bar"));

      // Read directly from persisted data.
      ReadOptions ropt;
      ropt.read_tier = kPersistedTier;
      std::string value;
      if (wopt.disableWAL) {
        // as data has not yet been flushed, we expect not found.
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound());
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound());
      } else {
        ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
        ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
      }

      // MultiGet must agree with Get under kPersistedTier.
      std::vector<ColumnFamilyHandle*> multiget_cfs;
      multiget_cfs.push_back(handles_[1]);
      multiget_cfs.push_back(handles_[1]);
      std::vector<Slice> multiget_keys;
      multiget_keys.push_back("foo");
      multiget_keys.push_back("bar");
      std::vector<std::string> multiget_values;
      auto statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[0].IsNotFound());
        ASSERT_TRUE(statuses[1].IsNotFound());
      } else {
        ASSERT_OK(statuses[0]);
        ASSERT_OK(statuses[1]);
      }

      // 2nd round: flush and put a new value in memtable.
      ASSERT_OK(Flush(1));
      ASSERT_OK(db_->Put(wopt, handles_[1], "rocksdb", "hello"));

      // once the data has been flushed, we are able to get the
      // data when kPersistedTier is used.
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).ok());
      ASSERT_EQ(value, "first");
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok());
      ASSERT_EQ(value, "one");
      // "rocksdb" is memtable-only, so visibility again depends on the WAL.
      if (wopt.disableWAL) {
        ASSERT_TRUE(
            db_->Get(ropt, handles_[1], "rocksdb", &value).IsNotFound());
      } else {
        ASSERT_OK(db_->Get(ropt, handles_[1], "rocksdb", &value));
        ASSERT_EQ(value, "hello");
      }

      // Expect same result in multiget
      multiget_cfs.push_back(handles_[1]);
      multiget_keys.push_back("rocksdb");
      statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      ASSERT_TRUE(statuses[0].ok());
      ASSERT_EQ("first", multiget_values[0]);
      ASSERT_TRUE(statuses[1].ok());
      ASSERT_EQ("one", multiget_values[1]);
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[2].IsNotFound());
      } else {
        ASSERT_OK(statuses[2]);
      }

      // 3rd round: delete and flush.  "foo"'s delete is flushed, "bar"'s
      // delete stays in the memtable.
      ASSERT_OK(db_->Delete(wopt, handles_[1], "foo"));
      Flush(1);
      ASSERT_OK(db_->Delete(wopt, handles_[1], "bar"));

      ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound());
      if (wopt.disableWAL) {
        // Still expect finding the value as its delete has not yet been
        // flushed.
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok());
        ASSERT_EQ(value, "one");
      } else {
        ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound());
      }
      ASSERT_TRUE(db_->Get(ropt, handles_[1], "rocksdb", &value).ok());
      ASSERT_EQ(value, "hello");

      statuses =
          db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values);
      ASSERT_TRUE(statuses[0].IsNotFound());
      if (wopt.disableWAL) {
        ASSERT_TRUE(statuses[1].ok());
        ASSERT_EQ("one", multiget_values[1]);
      } else {
        ASSERT_TRUE(statuses[1].IsNotFound());
      }
      ASSERT_TRUE(statuses[2].ok());
      ASSERT_EQ("hello", multiget_values[2]);
      // Start the second (disableWAL == 1) iteration from a clean DB.
      if (wopt.disableWAL == 0) {
        DestroyAndReopen(options);
      }
    }
  } while (ChangeOptions(kSkipHashCuckoo));
}
416
// Test to check whether flushing preserves a single delete hidden
// behind a put: after the final deletes are flushed and everything is
// compacted together, both keys must be gone.
TEST_F(DBTest, SingleDeleteFlush) {
  do {
    Random rnd(301);

    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Put values on second level (so that they will not be in the same
    // compaction as the other operations.
    Put(1, "foo", "first");
    Put(1, "bar", "one");
    ASSERT_OK(Flush(1));
    MoveFilesToLevel(2, 1);

    // (Single) delete hidden by a put
    SingleDelete(1, "foo");
    Put(1, "foo", "second");
    Delete(1, "bar");
    Put(1, "bar", "two");
    ASSERT_OK(Flush(1));

    // Delete the newer values too; these tombstones land in their own file.
    SingleDelete(1, "foo");
    Delete(1, "bar");
    ASSERT_OK(Flush(1));

    // Merge all files; the tombstones must still cancel out the puts.
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);

    ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because merges cannot be combined with single deletions.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
456
457 TEST_F(DBTest, SingleDeletePutFlush) {
458 // Single deletes that encounter the matching put in a flush should get
459 // removed.
460 do {
461 Random rnd(301);
462
463 Options options = CurrentOptions();
464 options.disable_auto_compactions = true;
465 CreateAndReopenWithCF({"pikachu"}, options);
466
467 Put(1, "foo", Slice());
468 Put(1, "a", Slice());
469 SingleDelete(1, "a");
470 ASSERT_OK(Flush(1));
471
472 ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
473 // Skip HashCuckooRep as it does not support single delete. FIFO and
474 // universal compaction do not apply to the test case. Skip MergePut
475 // because merges cannot be combined with single deletions.
476 } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
477 kSkipUniversalCompaction | kSkipMergePut));
478 }
479
// Disable because not all platform can run it.
// It requires more than 9GB memory to run it, With single allocation
// of more than 3GB.
TEST_F(DBTest, DISABLED_VeryLargeValue) {
  const size_t kValueSize = 3221225472u;  // 3GB value
  const size_t kKeySize = 8388608u;       // 8MB key
  std::string raw(kValueSize, 'v');
  std::string key1(kKeySize, 'c');
  std::string key2(kKeySize, 'd');

  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  ASSERT_OK(Put("boo", "v1"));
  ASSERT_OK(Put("foo", "v1"));
  // Two 3GB values under distinct 8MB keys; the second differs only in its
  // first byte ('w' vs 'v') so the Gets below can tell them apart.
  ASSERT_OK(Put(key1, raw));
  raw[0] = 'w';
  ASSERT_OK(Put(key2, raw));
  dbfull()->TEST_WaitForFlushMemTable();

  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  std::string value;
  Status s = db_->Get(ReadOptions(), key1, &value);
  ASSERT_OK(s);
  ASSERT_EQ(kValueSize, value.size());
  ASSERT_EQ('v', value[0]);

  s = db_->Get(ReadOptions(), key2, &value);
  ASSERT_OK(s);
  ASSERT_EQ(kValueSize, value.size());
  ASSERT_EQ('w', value[0]);

  // Compact all files.
  Flush();
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Check DB is not in read-only state.
  ASSERT_OK(Put("boo", "v1"));

  // Both huge values must still round-trip intact after compaction.
  s = db_->Get(ReadOptions(), key1, &value);
  ASSERT_OK(s);
  ASSERT_EQ(kValueSize, value.size());
  ASSERT_EQ('v', value[0]);

  s = db_->Get(ReadOptions(), key2, &value);
  ASSERT_OK(s);
  ASSERT_EQ(kValueSize, value.size());
  ASSERT_EQ('w', value[0]);
}
533
// While sstable syncs are artificially blocked a flush cannot complete, so
// reads must be served from the immutable memtable layer.
TEST_F(DBTest, GetFromImmutableLayer) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));

    // Block sync calls
    env_->delay_sstable_sync_.store(true, std::memory_order_release);
    Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
    Put(1, "k2", std::string(100000, 'y'));  // Trigger flush
    // "foo" now lives in an immutable memtable awaiting the stuck flush;
    // it must still be readable, and only in its own column family.
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
    // Release sync calls
    env_->delay_sstable_sync_.store(false, std::memory_order_release);
  } while (ChangeOptions());
}
553
554
TEST_F(DBTest, GetLevel0Ordering) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Check that we process level-0 files in correct order. The code
    // below generates two level-0 files where the earlier one comes
    // before the later one in the level-0 file list since the earlier
    // one has a smaller "smallest" key.
    ASSERT_OK(Put(1, "bar", "b"));
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_OK(Flush(1));
    // The newer L0 file must shadow the older one for "foo".
    ASSERT_EQ("v2", Get(1, "foo"));
  } while (ChangeOptions());
}
570
571 TEST_F(DBTest, WrongLevel0Config) {
572 Options options = CurrentOptions();
573 Close();
574 ASSERT_OK(DestroyDB(dbname_, options));
575 options.level0_stop_writes_trigger = 1;
576 options.level0_slowdown_writes_trigger = 2;
577 options.level0_file_num_compaction_trigger = 3;
578 ASSERT_OK(DB::Open(options, dbname_, &db_));
579 }
580
581 #ifndef ROCKSDB_LITE
// A value pushed to a lower level by compaction must be shadowed by any
// fresher value in the memtable or a higher level.
TEST_F(DBTest, GetOrderedByLevels) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    // Push v1 down via manual compaction.
    Compact(1, "a", "z");
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    // v2 must keep winning after it lands in its own L0 file.
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v2", Get(1, "foo"));
  } while (ChangeOptions());
}
594
595 TEST_F(DBTest, GetPicksCorrectFile) {
596 do {
597 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
598 // Arrange to have multiple files in a non-level-0 level.
599 ASSERT_OK(Put(1, "a", "va"));
600 Compact(1, "a", "b");
601 ASSERT_OK(Put(1, "x", "vx"));
602 Compact(1, "x", "y");
603 ASSERT_OK(Put(1, "f", "vf"));
604 Compact(1, "f", "g");
605 ASSERT_EQ("va", Get(1, "a"));
606 ASSERT_EQ("vf", Get(1, "f"));
607 ASSERT_EQ("vx", Get(1, "x"));
608 } while (ChangeOptions());
609 }
610
// Regression test: a read-triggered compaction of an L0 file must be
// attributed to level 0 even when level 1 is empty.
TEST_F(DBTest, GetEncountersEmptyLevel) {
  do {
    Options options = CurrentOptions();
    CreateAndReopenWithCF({"pikachu"}, options);
    // Arrange for the following to happen:
    //   * sstable A in level 0
    //   * nothing in level 1
    //   * sstable B in level 2
    // Then do enough Get() calls to arrange for an automatic compaction
    // of sstable A.  A bug would cause the compaction to be marked as
    // occurring at level 1 (instead of the correct level 0).

    // Step 1: First place sstables in levels 0 and 2
    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
    dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
    ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

    // Step 2: clear level 1 if necessary.
    dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
    ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
    ASSERT_EQ(NumTableFilesAtLevel(2, 1), 1);

    // Step 3: read a bunch of times to accumulate read-compaction credit.
    for (int i = 0; i < 1000; i++) {
      ASSERT_EQ("NOT_FOUND", Get(1, "missing"));
    }

    // Step 4: Wait for compaction to finish
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);  // XXX
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
}
652 #endif // ROCKSDB_LITE
653
654 TEST_F(DBTest, FlushMultipleMemtable) {
655 do {
656 Options options = CurrentOptions();
657 WriteOptions writeOpt = WriteOptions();
658 writeOpt.disableWAL = true;
659 options.max_write_buffer_number = 4;
660 options.min_write_buffer_number_to_merge = 3;
661 options.max_write_buffer_number_to_maintain = -1;
662 CreateAndReopenWithCF({"pikachu"}, options);
663 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
664 ASSERT_OK(Flush(1));
665 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
666
667 ASSERT_EQ("v1", Get(1, "foo"));
668 ASSERT_EQ("v1", Get(1, "bar"));
669 ASSERT_OK(Flush(1));
670 } while (ChangeCompactOptions());
671 }
672 #ifndef ROCKSDB_LITE
// Hammers two column families from ten threads to exercise the flush
// scheduler; each CF must end up with between 1 and 10 SST files.
TEST_F(DBTest, FlushSchedule) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  // Raise the L0 triggers so write stalls never interfere with the test.
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_number_to_maintain = 1;
  options.max_write_buffer_number = 2;
  options.write_buffer_size = 120 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<port::Thread> threads;

  std::atomic<int> thread_num(0);
  // each column family will have 5 thread, each thread generating 2 memtables.
  // each column family should end up with 10 table files
  std::function<void()> fill_memtable_func = [&]() {
    int a = thread_num.fetch_add(1);
    Random rnd(a);
    WriteOptions wo;
    // this should fill up 2 memtables
    for (int k = 0; k < 5000; ++k) {
      // a & 1 distributes the ten threads evenly over the two CFs.
      ASSERT_OK(db_->Put(wo, handles_[a & 1], RandomString(&rnd, 13), ""));
    }
  };

  for (int i = 0; i < 10; ++i) {
    threads.emplace_back(fill_memtable_func);
  }

  for (auto& t : threads) {
    t.join();
  }

  // Flushes may merge memtables, so exact counts vary; only bound them.
  auto default_tables = GetNumberOfSstFilesForColumnFamily(db_, "default");
  auto pikachu_tables = GetNumberOfSstFilesForColumnFamily(db_, "pikachu");
  ASSERT_LE(default_tables, static_cast<uint64_t>(10));
  ASSERT_GT(default_tables, static_cast<uint64_t>(0));
  ASSERT_LE(pikachu_tables, static_cast<uint64_t>(10));
  ASSERT_GT(pikachu_tables, static_cast<uint64_t>(0));
}
713 #endif // ROCKSDB_LITE
714
715 namespace {
716 class KeepFilter : public CompactionFilter {
717 public:
718 virtual bool Filter(int level, const Slice& key, const Slice& value,
719 std::string* new_value,
720 bool* value_changed) const override {
721 return false;
722 }
723
724 virtual const char* Name() const override { return "KeepFilter"; }
725 };
726
727 class KeepFilterFactory : public CompactionFilterFactory {
728 public:
729 explicit KeepFilterFactory(bool check_context = false)
730 : check_context_(check_context) {}
731
732 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
733 const CompactionFilter::Context& context) override {
734 if (check_context_) {
735 EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
736 EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
737 }
738 return std::unique_ptr<CompactionFilter>(new KeepFilter());
739 }
740
741 virtual const char* Name() const override { return "KeepFilterFactory"; }
742 bool check_context_;
743 std::atomic_bool expect_full_compaction_;
744 std::atomic_bool expect_manual_compaction_;
745 };
746
747 class DelayFilter : public CompactionFilter {
748 public:
749 explicit DelayFilter(DBTestBase* d) : db_test(d) {}
750 virtual bool Filter(int level, const Slice& key, const Slice& value,
751 std::string* new_value,
752 bool* value_changed) const override {
753 db_test->env_->addon_time_.fetch_add(1000);
754 return true;
755 }
756
757 virtual const char* Name() const override { return "DelayFilter"; }
758
759 private:
760 DBTestBase* db_test;
761 };
762
763 class DelayFilterFactory : public CompactionFilterFactory {
764 public:
765 explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
766 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
767 const CompactionFilter::Context& context) override {
768 return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
769 }
770
771 virtual const char* Name() const override { return "DelayFilterFactory"; }
772
773 private:
774 DBTestBase* db_test;
775 };
776 } // namespace
777
778 #ifndef ROCKSDB_LITE
779
780 static std::string CompressibleString(Random* rnd, int len) {
781 std::string r;
782 test::CompressibleString(rnd, 0.8, len, &r);
783 return r;
784 }
785 #endif // ROCKSDB_LITE
786
787 TEST_F(DBTest, FailMoreDbPaths) {
788 Options options = CurrentOptions();
789 options.db_paths.emplace_back(dbname_, 10000000);
790 options.db_paths.emplace_back(dbname_ + "_2", 1000000);
791 options.db_paths.emplace_back(dbname_ + "_3", 1000000);
792 options.db_paths.emplace_back(dbname_ + "_4", 1000000);
793 options.db_paths.emplace_back(dbname_ + "_5", 1000000);
794 ASSERT_TRUE(TryReopen(options).IsNotSupported());
795 }
796
797 void CheckColumnFamilyMeta(const ColumnFamilyMetaData& cf_meta) {
798 uint64_t cf_size = 0;
799 uint64_t cf_csize = 0;
800 size_t file_count = 0;
801 for (auto level_meta : cf_meta.levels) {
802 uint64_t level_size = 0;
803 uint64_t level_csize = 0;
804 file_count += level_meta.files.size();
805 for (auto file_meta : level_meta.files) {
806 level_size += file_meta.size;
807 }
808 ASSERT_EQ(level_meta.size, level_size);
809 cf_size += level_size;
810 cf_csize += level_csize;
811 }
812 ASSERT_EQ(cf_meta.file_count, file_count);
813 ASSERT_EQ(cf_meta.size, cf_size);
814 }
815
816 #ifndef ROCKSDB_LITE
817 TEST_F(DBTest, ColumnFamilyMetaDataTest) {
818 Options options = CurrentOptions();
819 options.create_if_missing = true;
820 DestroyAndReopen(options);
821
822 Random rnd(301);
823 int key_index = 0;
824 ColumnFamilyMetaData cf_meta;
825 for (int i = 0; i < 100; ++i) {
826 GenerateNewFile(&rnd, &key_index);
827 db_->GetColumnFamilyMetaData(&cf_meta);
828 CheckColumnFamilyMeta(cf_meta);
829 }
830 }
831
832 namespace {
// Fills level-0 with (trigger - 1) files of ~120KB each, then writes one
// more batch to trip the level-0 compaction and asserts everything ends up
// in a single level-1 file.
void MinLevelHelper(DBTest* self, Options& options) {
  Random rnd(301);

  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    std::vector<std::string> values;
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      values.push_back(DBTestBase::RandomString(&rnd, 10000));
      ASSERT_OK(self->Put(DBTestBase::Key(i), values[i]));
    }
    self->dbfull()->TEST_WaitForFlushMemTable();
    // One new L0 file per batch, no compaction yet.
    ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
  }

  // generate one more file in level-0, and should trigger level-0 compaction
  std::vector<std::string> values;
  for (int i = 0; i < 12; i++) {
    values.push_back(DBTestBase::RandomString(&rnd, 10000));
    ASSERT_OK(self->Put(DBTestBase::Key(i), values[i]));
  }
  self->dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
  ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}
859
860 // returns false if the calling-Test should be skipped
861 bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
862 int lev, int strategy) {
863 fprintf(stderr,
864 "Test with compression options : window_bits = %d, level = %d, "
865 "strategy = %d}\n",
866 wbits, lev, strategy);
867 options.write_buffer_size = 100 << 10; // 100KB
868 options.arena_block_size = 4096;
869 options.num_levels = 3;
870 options.level0_file_num_compaction_trigger = 3;
871 options.create_if_missing = true;
872
873 if (Snappy_Supported()) {
874 type = kSnappyCompression;
875 fprintf(stderr, "using snappy\n");
876 } else if (Zlib_Supported()) {
877 type = kZlibCompression;
878 fprintf(stderr, "using zlib\n");
879 } else if (BZip2_Supported()) {
880 type = kBZip2Compression;
881 fprintf(stderr, "using bzip2\n");
882 } else if (LZ4_Supported()) {
883 type = kLZ4Compression;
884 fprintf(stderr, "using lz4\n");
885 } else if (XPRESS_Supported()) {
886 type = kXpressCompression;
887 fprintf(stderr, "using xpress\n");
888 } else if (ZSTD_Supported()) {
889 type = kZSTD;
890 fprintf(stderr, "using ZSTD\n");
891 } else {
892 fprintf(stderr, "skipping test, compression disabled\n");
893 return false;
894 }
895 options.compression_per_level.resize(options.num_levels);
896
897 // do not compress L0
898 for (int i = 0; i < 1; i++) {
899 options.compression_per_level[i] = kNoCompression;
900 }
901 for (int i = 1; i < options.num_levels; i++) {
902 options.compression_per_level[i] = type;
903 }
904 return true;
905 }
906 } // namespace
907
908 TEST_F(DBTest, MinLevelToCompress1) {
909 Options options = CurrentOptions();
910 CompressionType type = kSnappyCompression;
911 if (!MinLevelToCompress(type, options, -14, -1, 0)) {
912 return;
913 }
914 Reopen(options);
915 MinLevelHelper(this, options);
916
917 // do not compress L0 and L1
918 for (int i = 0; i < 2; i++) {
919 options.compression_per_level[i] = kNoCompression;
920 }
921 for (int i = 2; i < options.num_levels; i++) {
922 options.compression_per_level[i] = type;
923 }
924 DestroyAndReopen(options);
925 MinLevelHelper(this, options);
926 }
927
928 TEST_F(DBTest, MinLevelToCompress2) {
929 Options options = CurrentOptions();
930 CompressionType type = kSnappyCompression;
931 if (!MinLevelToCompress(type, options, 15, -1, 0)) {
932 return;
933 }
934 Reopen(options);
935 MinLevelHelper(this, options);
936
937 // do not compress L0 and L1
938 for (int i = 0; i < 2; i++) {
939 options.compression_per_level[i] = kNoCompression;
940 }
941 for (int i = 2; i < options.num_levels; i++) {
942 options.compression_per_level[i] = type;
943 }
944 DestroyAndReopen(options);
945 MinLevelHelper(this, options);
946 }
947
// This test may fail because of a legit case that multiple L0 files
// are trivial moved to L1.
// Intent: repeated overwrites of one key must not let the file count grow
// beyond one file per level plus the L0 stop trigger.
TEST_F(DBTest, DISABLED_RepeatedWritesToSameKey) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.write_buffer_size = 100000;  // Small write buffer
    CreateAndReopenWithCF({"pikachu"}, options);

    // We must have at most one file per level except for level-0,
    // which may have up to kL0_StopWritesTrigger files.
    const int kMaxFiles =
        options.num_levels + options.level0_stop_writes_trigger;

    Random rnd(301);
    // Each value is ~2 write buffers, so every Put forces a flush.
    std::string value =
        RandomString(&rnd, static_cast<int>(2 * options.write_buffer_size));
    for (int i = 0; i < 5 * kMaxFiles; i++) {
      ASSERT_OK(Put(1, "key", value));
      ASSERT_LE(TotalTableFiles(1), kMaxFiles);
    }
  } while (ChangeCompactOptions());
}
971 #endif // ROCKSDB_LITE
972
// Verifies that compaction never produces a file that overlaps too much
// data in the next level: after a large contiguous run of "B" keys plus a
// sparse follow-up update, TEST_MaxNextLevelOverlappingBytes() must stay
// at or below ~20MB at every compaction step.
TEST_F(DBTest, SparseMerge) {
  do {
    Options options = CurrentOptions();
    options.compression = kNoCompression;
    CreateAndReopenWithCF({"pikachu"}, options);

    FillLevels("A", "Z", 1);

    // Suppose there is:
    //    small amount of data with prefix A
    //    large amount of data with prefix B
    //    small amount of data with prefix C
    // and that recent updates have made small changes to all three prefixes.
    // Check that we do not do a compaction that merges all of B in one shot.
    const std::string value(1000, 'x');
    Put(1, "A", "va");
    // Write approximately 100MB of "B" values
    for (int i = 0; i < 100000; i++) {
      char key[100];
      snprintf(key, sizeof(key), "B%010d", i);
      Put(1, key, value);
    }
    Put(1, "C", "vc");
    ASSERT_OK(Flush(1));
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);

    // Make sparse update
    Put(1, "A", "va2");
    Put(1, "B100", "bvalue2");
    Put(1, "C", "vc2");
    ASSERT_OK(Flush(1));

    // Compactions should not cause us to create a situation where
    // a file overlaps too much data at the next level.
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
    dbfull()->TEST_CompactRange(0, nullptr, nullptr);
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
    dbfull()->TEST_CompactRange(1, nullptr, nullptr);
    ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(handles_[1]),
              20 * 1048576);
  } while (ChangeCompactOptions());
}
1017
1018 #ifndef ROCKSDB_LITE
// Returns true iff `val` lies in the inclusive range [low, high].
// On a miss the violated bounds are logged to stderr so the failing
// ASSERT in the caller is easier to diagnose.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  const bool in_range = (low <= val) && (val <= high);
  if (!in_range) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            static_cast<unsigned long long>(val),
            static_cast<unsigned long long>(low),
            static_cast<unsigned long long>(high));
  }
  return in_range;
}
1028
1029 TEST_F(DBTest, ApproximateSizesMemTable) {
1030 Options options = CurrentOptions();
1031 options.write_buffer_size = 100000000; // Large write buffer
1032 options.compression = kNoCompression;
1033 options.create_if_missing = true;
1034 DestroyAndReopen(options);
1035
1036 const int N = 128;
1037 Random rnd(301);
1038 for (int i = 0; i < N; i++) {
1039 ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
1040 }
1041
1042 uint64_t size;
1043 std::string start = Key(50);
1044 std::string end = Key(60);
1045 Range r(start, end);
1046 uint8_t include_both = DB::SizeApproximationFlags::INCLUDE_FILES |
1047 DB::SizeApproximationFlags::INCLUDE_MEMTABLES;
1048 db_->GetApproximateSizes(&r, 1, &size, include_both);
1049 ASSERT_GT(size, 6000);
1050 ASSERT_LT(size, 204800);
1051 // Zero if not including mem table
1052 db_->GetApproximateSizes(&r, 1, &size);
1053 ASSERT_EQ(size, 0);
1054
1055 start = Key(500);
1056 end = Key(600);
1057 r = Range(start, end);
1058 db_->GetApproximateSizes(&r, 1, &size, include_both);
1059 ASSERT_EQ(size, 0);
1060
1061 for (int i = 0; i < N; i++) {
1062 ASSERT_OK(Put(Key(1000 + i), RandomString(&rnd, 1024)));
1063 }
1064
1065 start = Key(500);
1066 end = Key(600);
1067 r = Range(start, end);
1068 db_->GetApproximateSizes(&r, 1, &size, include_both);
1069 ASSERT_EQ(size, 0);
1070
1071 start = Key(100);
1072 end = Key(1020);
1073 r = Range(start, end);
1074 db_->GetApproximateSizes(&r, 1, &size, include_both);
1075 ASSERT_GT(size, 6000);
1076
1077 options.max_write_buffer_number = 8;
1078 options.min_write_buffer_number_to_merge = 5;
1079 options.write_buffer_size = 1024 * N; // Not very large
1080 DestroyAndReopen(options);
1081
1082 int keys[N * 3];
1083 for (int i = 0; i < N; i++) {
1084 keys[i * 3] = i * 5;
1085 keys[i * 3 + 1] = i * 5 + 1;
1086 keys[i * 3 + 2] = i * 5 + 2;
1087 }
1088 std::random_shuffle(std::begin(keys), std::end(keys));
1089
1090 for (int i = 0; i < N * 3; i++) {
1091 ASSERT_OK(Put(Key(keys[i] + 1000), RandomString(&rnd, 1024)));
1092 }
1093
1094 start = Key(100);
1095 end = Key(300);
1096 r = Range(start, end);
1097 db_->GetApproximateSizes(&r, 1, &size, include_both);
1098 ASSERT_EQ(size, 0);
1099
1100 start = Key(1050);
1101 end = Key(1080);
1102 r = Range(start, end);
1103 db_->GetApproximateSizes(&r, 1, &size, include_both);
1104 ASSERT_GT(size, 6000);
1105
1106 start = Key(2100);
1107 end = Key(2300);
1108 r = Range(start, end);
1109 db_->GetApproximateSizes(&r, 1, &size, include_both);
1110 ASSERT_EQ(size, 0);
1111
1112 start = Key(1050);
1113 end = Key(1080);
1114 r = Range(start, end);
1115 uint64_t size_with_mt, size_without_mt;
1116 db_->GetApproximateSizes(&r, 1, &size_with_mt, include_both);
1117 ASSERT_GT(size_with_mt, 6000);
1118 db_->GetApproximateSizes(&r, 1, &size_without_mt);
1119 ASSERT_EQ(size_without_mt, 0);
1120
1121 Flush();
1122
1123 for (int i = 0; i < N; i++) {
1124 ASSERT_OK(Put(Key(i + 1000), RandomString(&rnd, 1024)));
1125 }
1126
1127 start = Key(1050);
1128 end = Key(1080);
1129 r = Range(start, end);
1130 db_->GetApproximateSizes(&r, 1, &size_with_mt, include_both);
1131 db_->GetApproximateSizes(&r, 1, &size_without_mt);
1132 ASSERT_GT(size_with_mt, size_without_mt);
1133 ASSERT_GT(size_without_mt, 6000);
1134 }
1135
// Exercises DB::GetApproximateMemTableStats(): the entry-count and byte
// estimates for a key range are non-zero only while matching data is in
// the memtable (Flush() empties it), and zero for ranges with no data.
TEST_F(DBTest, GetApproximateMemTableStats) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;  // large enough that nothing flushes
  options.compression = kNoCompression;
  options.create_if_missing = true;
  DestroyAndReopen(options);

  const int N = 128;
  Random rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
  }

  uint64_t count;
  uint64_t size;

  // Keys [50, 60) are entirely in the memtable: both stats are positive.
  std::string start = Key(50);
  std::string end = Key(60);
  Range r(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_GT(count, 0);
  ASSERT_LE(count, N);
  ASSERT_GT(size, 6000);
  ASSERT_LT(size, 204800);

  // Range [500, 600) holds no keys at all.
  start = Key(500);
  end = Key(600);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_EQ(count, 0);
  ASSERT_EQ(size, 0);

  Flush();

  // After flushing, the same populated range reports zero: these stats
  // cover memtable contents only, not SST files.
  start = Key(50);
  end = Key(60);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_EQ(count, 0);
  ASSERT_EQ(size, 0);

  for (int i = 0; i < N; i++) {
    ASSERT_OK(Put(Key(1000 + i), RandomString(&rnd, 1024)));
  }

  // A wide range covering the freshly written keys sees them again.
  start = Key(100);
  end = Key(1020);
  r = Range(start, end);
  db_->GetApproximateMemTableStats(r, &count, &size);
  ASSERT_GT(count, 20);
  ASSERT_GT(size, 6000);
}
1188
// Size() (GetApproximateSizes over table files) should scale roughly
// linearly with the amount of flushed data: each of the 80 ~100KB values
// contributes between S1 and S2 bytes to any range estimate covering it,
// and the estimates must stay correct across reopens and compactions.
TEST_F(DBTest, ApproximateSizes) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 100000000;  // Large write buffer
    options.compression = kNoCompression;
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_TRUE(Between(Size("", "xyz", 1), 0, 0));

    // Write 8MB (80 values, each 100K)
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    const int N = 80;
    static const int S1 = 100000;
    static const int S2 = 105000;  // Allow some expansion from metadata
    Random rnd(301);
    for (int i = 0; i < N; i++) {
      ASSERT_OK(Put(1, Key(i), RandomString(&rnd, S1)));
    }

    // 0 because GetApproximateSizes() does not account for memtable space
    ASSERT_TRUE(Between(Size("", Key(50), 1), 0, 0));

    // Check sizes across recovery by reopening a few times
    for (int run = 0; run < 3; run++) {
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      for (int compact_start = 0; compact_start < N; compact_start += 10) {
        for (int i = 0; i < N; i += 10) {
          ASSERT_TRUE(Between(Size("", Key(i), 1), S1 * i, S2 * i));
          ASSERT_TRUE(Between(Size("", Key(i) + ".suffix", 1), S1 * (i + 1),
                              S2 * (i + 1)));
          ASSERT_TRUE(Between(Size(Key(i), Key(i + 10), 1), S1 * 10, S2 * 10));
        }
        ASSERT_TRUE(Between(Size("", Key(50), 1), S1 * 50, S2 * 50));
        ASSERT_TRUE(
            Between(Size("", Key(50) + ".suffix", 1), S1 * 50, S2 * 50));

        // Compact a rolling 10-key window so that the estimates are also
        // validated while files migrate between levels.
        std::string cstart_str = Key(compact_start);
        std::string cend_str = Key(compact_start + 9);
        Slice cstart = cstart_str;
        Slice cend = cend_str;
        dbfull()->TEST_CompactRange(0, &cstart, &cend, handles_[1]);
      }

      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
    }
    // ApproximateOffsetOf() is not yet implemented in plain table format.
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
                         kSkipPlainTable | kSkipHashIndex));
}
1244
// Range-size estimates must track cumulative value sizes correctly when
// values of very different sizes (10KB, 100KB, 300KB) are interleaved,
// across reopens and level-0 compactions.
TEST_F(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
  do {
    Options options = CurrentOptions();
    options.compression = kNoCompression;
    CreateAndReopenWithCF({"pikachu"}, options);

    Random rnd(301);
    std::string big1 = RandomString(&rnd, 100000);
    ASSERT_OK(Put(1, Key(0), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(1), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(2), big1));
    ASSERT_OK(Put(1, Key(3), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(4), big1));
    ASSERT_OK(Put(1, Key(5), RandomString(&rnd, 10000)));
    ASSERT_OK(Put(1, Key(6), RandomString(&rnd, 300000)));
    ASSERT_OK(Put(1, Key(7), RandomString(&rnd, 10000)));

    // Check sizes across recovery by reopening a few times
    for (int run = 0; run < 3; run++) {
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      // Size("", Key(k)) is the running sum of the value sizes written
      // before key k, within a small metadata allowance.
      ASSERT_TRUE(Between(Size("", Key(0), 1), 0, 0));
      ASSERT_TRUE(Between(Size("", Key(1), 1), 10000, 11000));
      ASSERT_TRUE(Between(Size("", Key(2), 1), 20000, 21000));
      ASSERT_TRUE(Between(Size("", Key(3), 1), 120000, 121000));
      ASSERT_TRUE(Between(Size("", Key(4), 1), 130000, 131000));
      ASSERT_TRUE(Between(Size("", Key(5), 1), 230000, 231000));
      ASSERT_TRUE(Between(Size("", Key(6), 1), 240000, 241000));
      ASSERT_TRUE(Between(Size("", Key(7), 1), 540000, 541000));
      ASSERT_TRUE(Between(Size("", Key(8), 1), 550000, 560000));

      ASSERT_TRUE(Between(Size(Key(3), Key(5), 1), 110000, 111000));

      dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
    }
    // ApproximateOffsetOf() is not yet implemented in plain table format.
  } while (ChangeOptions(kSkipPlainTable));
}
1283 #endif // ROCKSDB_LITE
1284
1285 #ifndef ROCKSDB_LITE
// Covers GetSnapshot()/ReleaseSnapshot() bookkeeping across two column
// families: a read at a snapshot sees the values current when it was
// taken, GetNumSnapshots() tracks the live count (including
// ManagedSnapshot's RAII release), and GetTimeOldestSnapshots() advances
// once the oldest snapshot is released.
TEST_F(DBTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    Put(0, "foo", "0v1");
    Put(1, "foo", "1v1");

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    Put(0, "foo", "0v2");
    Put(1, "foo", "1v2");

    // Advance the mock clock so s2 gets a later timestamp than s1.
    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    Put(0, "foo", "0v3");
    Put(1, "foo", "1v3");

    {
      // s3 is scoped: ManagedSnapshot releases it at the closing brace.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());

      Put(0, "foo", "0v4");
      Put(1, "foo", "1v4");
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    // Releasing the oldest snapshot must advance the oldest-snapshot time.
    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions(kSkipHashCuckoo));
}
1349
// Once the only snapshot pinning an overwritten value is released,
// compaction must drop the hidden (older) value: the entry list for
// "foo" shrinks from "[ tiny, <big> ]" to "[ tiny ]" and the approximate
// size of the range drops accordingly.
TEST_F(DBTest, HiddenValuesAreRemoved) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    CreateAndReopenWithCF({"pikachu"}, options);
    Random rnd(301);
    FillLevels("a", "z", 1);

    std::string big = RandomString(&rnd, 50000);
    Put(1, "foo", big);
    Put(1, "pastfoo", "v");
    const Snapshot* snapshot = db_->GetSnapshot();
    Put(1, "foo", "tiny");
    Put(1, "pastfoo2", "v2");  // Advance sequence number one more

    ASSERT_OK(Flush(1));
    ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);

    // While the snapshot is live, the big value is still readable and
    // still accounted for in the size estimate.
    ASSERT_EQ(big, Get(1, "foo", snapshot));
    ASSERT_TRUE(Between(Size("", "pastfoo", 1), 50000, 60000));
    db_->ReleaseSnapshot(snapshot);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny, " + big + " ]");
    Slice x("x");
    dbfull()->TEST_CompactRange(0, nullptr, &x, handles_[1]);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    ASSERT_GE(NumTableFilesAtLevel(1, 1), 1);
    dbfull()->TEST_CompactRange(1, nullptr, &x, handles_[1]);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ tiny ]");

    ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
    // ApproximateOffsetOf() is not yet implemented in plain table format,
    // which is used by Size().
    // skip HashCuckooRep as it does not support snapshot
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
                         kSkipPlainTable | kSkipHashCuckoo));
}
1388 #endif // ROCKSDB_LITE
1389
// If we compact:
//
//     Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
//
// We do not want to end up with:
//
//     Put(A, v1) Snapshot Put(A, v2)
//
// Because a subsequent SingleDelete(A) would delete the Put(A, v2)
// but not Put(A, v1), so Get(A) would return v1.
TEST_F(DBTest, UnremovableSingleDelete) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    Put(1, "foo", "first");
    const Snapshot* snapshot = db_->GetSnapshot();
    SingleDelete(1, "foo");
    Put(1, "foo", "second");
    ASSERT_OK(Flush(1));

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("second", Get(1, "foo"));

    // All three entries must survive this compaction: the snapshot pins
    // "first", so the SingleDelete cannot be collapsed away yet.
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));

    SingleDelete(1, "foo");

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));

    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);

    // Even after the second compaction the snapshot read still sees v1.
    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    db_->ReleaseSnapshot(snapshot);
    // Skip HashCuckooRep as it does not support single delete.  FIFO and
    // universal compaction do not apply to the test case.  Skip MergePut
    // because single delete does not get removed when it encounters a merge.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
1438
1439 #ifndef ROCKSDB_LITE
// A deletion marker may be dropped during compaction as soon as a newer
// Put for the same key hides the old value, even though the old value
// itself still survives in a deeper (uncompacted) level.
TEST_F(DBTest, DeletionMarkers1) {
  Options options = CurrentOptions();
  options.max_background_flushes = 0;
  CreateAndReopenWithCF({"pikachu"}, options);
  Put(1, "foo", "v1");
  ASSERT_OK(Flush(1));
  const int last = 2;
  MoveFilesToLevel(last, 1);
  // foo => v1 is now in last level
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);

  // Place a table at level last-1 to prevent merging with preceding mutation
  Put(1, "a", "begin");
  Put(1, "z", "end");
  Flush(1);
  MoveFilesToLevel(last - 1, 1);
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
  ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);

  Delete(1, "foo");
  Put(1, "foo", "v2");
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
  ASSERT_OK(Flush(1));  // Moves to level last-2
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
  Slice z("z");
  dbfull()->TEST_CompactRange(last - 2, nullptr, &z, handles_[1]);
  // DEL eliminated, but v1 remains because we aren't compacting that level
  // (DEL can be eliminated because v2 hides v1).
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
  dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, handles_[1]);
  // Merging last-1 w/ last, so we are the base level for "foo", so
  // DEL is removed.  (as is v1).
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
}
1474
// A deletion marker must be retained while a deeper level still holds a
// value for the key; both the marker and the value disappear only once
// the compaction reaches the bottommost data for that key.
TEST_F(DBTest, DeletionMarkers2) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);
  Put(1, "foo", "v1");
  ASSERT_OK(Flush(1));
  const int last = 2;
  MoveFilesToLevel(last, 1);
  // foo => v1 is now in last level
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);

  // Place a table at level last-1 to prevent merging with preceding mutation
  Put(1, "a", "begin");
  Put(1, "z", "end");
  Flush(1);
  MoveFilesToLevel(last - 1, 1);
  ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
  ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);

  Delete(1, "foo");
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
  ASSERT_OK(Flush(1));  // Moves to level last-2
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
  dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr, handles_[1]);
  // DEL kept: "last" file overlaps
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v1 ]");
  dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, handles_[1]);
  // Merging last-1 w/ last, so we are the base level for "foo", so
  // DEL is removed.  (as is v1).
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
}
1505
// Regression test: a memtable flush must detect overlap with existing
// level-0 files and keep the new file in L0 instead of placing a
// deletion in a deeper level, where it would be masked by the L0 data.
TEST_F(DBTest, OverlapInLevel0) {
  do {
    Options options = CurrentOptions();
    CreateAndReopenWithCF({"pikachu"}, options);

    // Fill levels 1 and 2 to disable the pushing of new memtables to levels >
    // 0.
    ASSERT_OK(Put(1, "100", "v100"));
    ASSERT_OK(Put(1, "999", "v999"));
    Flush(1);
    MoveFilesToLevel(2, 1);
    ASSERT_OK(Delete(1, "100"));
    ASSERT_OK(Delete(1, "999"));
    Flush(1);
    MoveFilesToLevel(1, 1);
    ASSERT_EQ("0,1,1", FilesPerLevel(1));

    // Make files spanning the following ranges in level-0:
    //  files[0]  200 .. 900
    //  files[1]  300 .. 500
    // Note that files are sorted by smallest key.
    ASSERT_OK(Put(1, "300", "v300"));
    ASSERT_OK(Put(1, "500", "v500"));
    Flush(1);
    ASSERT_OK(Put(1, "200", "v200"));
    ASSERT_OK(Put(1, "600", "v600"));
    ASSERT_OK(Put(1, "900", "v900"));
    Flush(1);
    ASSERT_EQ("2,1,1", FilesPerLevel(1));

    // Compact away the placeholder files we created initially
    dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
    dbfull()->TEST_CompactRange(2, nullptr, nullptr, handles_[1]);
    ASSERT_EQ("2", FilesPerLevel(1));

    // Do a memtable compaction.  Before bug-fix, the compaction would
    // not detect the overlap with level-0 files and would incorrectly place
    // the deletion in a deeper level.
    ASSERT_OK(Delete(1, "600"));
    Flush(1);
    ASSERT_EQ("3", FilesPerLevel(1));
    ASSERT_EQ("NOT_FOUND", Get(1, "600"));
  } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
}
1550 #endif // ROCKSDB_LITE
1551
// Reopening a column family with a comparator whose Name() differs from
// the one persisted in the DB must fail, and the error text must mention
// the comparator mismatch. NewComparator orders keys exactly like
// BytewiseComparator but reports a different name.
TEST_F(DBTest, ComparatorCheck) {
  class NewComparator : public Comparator {
   public:
    virtual const char* Name() const override {
      return "rocksdb.NewComparator";
    }
    virtual int Compare(const Slice& a, const Slice& b) const override {
      return BytewiseComparator()->Compare(a, b);
    }
    virtual void FindShortestSeparator(std::string* s,
                                       const Slice& l) const override {
      BytewiseComparator()->FindShortestSeparator(s, l);
    }
    virtual void FindShortSuccessor(std::string* key) const override {
      BytewiseComparator()->FindShortSuccessor(key);
    }
  };
  Options new_options, options;
  NewComparator cmp;
  do {
    options = CurrentOptions();
    CreateAndReopenWithCF({"pikachu"}, options);
    new_options = CurrentOptions();
    new_options.comparator = &cmp;
    // only the non-default column family has non-matching comparator
    Status s = TryReopenWithColumnFamilies(
        {"default", "pikachu"}, std::vector<Options>({options, new_options}));
    ASSERT_TRUE(!s.ok());
    ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
        << s.ToString();
  } while (ChangeCompactOptions());
}
1584
// Runs the DB with a user-defined comparator that orders keys of the
// form "[<number>]" numerically (so "[10]" and "[0xa]" are the same key),
// and verifies that lookups and compactions respect that ordering.
TEST_F(DBTest, CustomComparator) {
  class NumberComparator : public Comparator {
   public:
    virtual const char* Name() const override {
      return "test.NumberComparator";
    }
    virtual int Compare(const Slice& a, const Slice& b) const override {
      return ToNumber(a) - ToNumber(b);
    }
    virtual void FindShortestSeparator(std::string* s,
                                       const Slice& l) const override {
      ToNumber(*s);  // Check format
      ToNumber(l);   // Check format
    }
    virtual void FindShortSuccessor(std::string* key) const override {
      ToNumber(*key);  // Check format
    }

   private:
    // Parses "[<number>]" (decimal or 0x-hex via %i) and fails the test
    // on any malformed key fed to the comparator.
    static int ToNumber(const Slice& x) {
      // Check that there are no extra characters.
      EXPECT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size() - 1] == ']')
          << EscapeString(x);
      int val;
      char ignored;
      // sscanf must match exactly one item: a trailing character after
      // "]" would make it return 2 and fail this format check.
      EXPECT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
          << EscapeString(x);
      return val;
    }
  };
  Options new_options;
  NumberComparator cmp;
  do {
    new_options = CurrentOptions();
    new_options.create_if_missing = true;
    new_options.comparator = &cmp;
    new_options.write_buffer_size = 4096;  // Compact more often
    new_options.arena_block_size = 4096;
    new_options = CurrentOptions(new_options);
    DestroyAndReopen(new_options);
    CreateAndReopenWithCF({"pikachu"}, new_options);
    ASSERT_OK(Put(1, "[10]", "ten"));
    ASSERT_OK(Put(1, "[0x14]", "twenty"));
    for (int i = 0; i < 2; i++) {
      // Decimal and hex spellings of the same number alias to one key.
      ASSERT_EQ("ten", Get(1, "[10]"));
      ASSERT_EQ("ten", Get(1, "[0xa]"));
      ASSERT_EQ("twenty", Get(1, "[20]"));
      ASSERT_EQ("twenty", Get(1, "[0x14]"));
      ASSERT_EQ("NOT_FOUND", Get(1, "[15]"));
      ASSERT_EQ("NOT_FOUND", Get(1, "[0xf]"));
      Compact(1, "[0]", "[9999]");
    }

    for (int run = 0; run < 2; run++) {
      for (int i = 0; i < 1000; i++) {
        char buf[100];
        snprintf(buf, sizeof(buf), "[%d]", i * 10);
        ASSERT_OK(Put(1, buf, buf));
      }
      Compact(1, "[0]", "[1000000]");
    }
  } while (ChangeCompactOptions());
}
1648
1649 TEST_F(DBTest, DBOpen_Options) {
1650 Options options = CurrentOptions();
1651 std::string dbname = test::TmpDir(env_) + "/db_options_test";
1652 ASSERT_OK(DestroyDB(dbname, options));
1653
1654 // Does not exist, and create_if_missing == false: error
1655 DB* db = nullptr;
1656 options.create_if_missing = false;
1657 Status s = DB::Open(options, dbname, &db);
1658 ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
1659 ASSERT_TRUE(db == nullptr);
1660
1661 // Does not exist, and create_if_missing == true: OK
1662 options.create_if_missing = true;
1663 s = DB::Open(options, dbname, &db);
1664 ASSERT_OK(s);
1665 ASSERT_TRUE(db != nullptr);
1666
1667 delete db;
1668 db = nullptr;
1669
1670 // Does exist, and error_if_exists == true: error
1671 options.create_if_missing = false;
1672 options.error_if_exists = true;
1673 s = DB::Open(options, dbname, &db);
1674 ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
1675 ASSERT_TRUE(db == nullptr);
1676
1677 // Does exist, and error_if_exists == false: OK
1678 options.create_if_missing = true;
1679 options.error_if_exists = false;
1680 s = DB::Open(options, dbname, &db);
1681 ASSERT_OK(s);
1682 ASSERT_TRUE(db != nullptr);
1683
1684 delete db;
1685 db = nullptr;
1686 }
1687
// Reopening a DB whose data has been moved to level 3 with num_levels
// reduced to 2 must fail with an Invalid argument error rather than open.
TEST_F(DBTest, DBOpen_Change_NumLevels) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  ASSERT_TRUE(db_ != nullptr);
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "a", "123"));
  ASSERT_OK(Put(1, "b", "234"));
  Flush(1);
  // Push the data down to level 3, beyond the reach of num_levels == 2.
  MoveFilesToLevel(3, 1);
  Close();

  options.create_if_missing = false;
  options.num_levels = 2;
  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "Invalid argument") != nullptr);
  ASSERT_TRUE(db_ == nullptr);
}
1707
// DestroyDB() on a database must also remove its meta databases
// (and recursively their meta databases): after destroying only the
// top-level dbname, none of the three paths should open any more.
TEST_F(DBTest, DestroyDBMetaDatabase) {
  std::string dbname = test::TmpDir(env_) + "/db_meta";
  ASSERT_OK(env_->CreateDirIfMissing(dbname));
  std::string metadbname = MetaDatabaseName(dbname, 0);
  ASSERT_OK(env_->CreateDirIfMissing(metadbname));
  std::string metametadbname = MetaDatabaseName(metadbname, 0);
  ASSERT_OK(env_->CreateDirIfMissing(metametadbname));

  // Destroy previous versions if they exist. Using the long way.
  Options options = CurrentOptions();
  ASSERT_OK(DestroyDB(metametadbname, options));
  ASSERT_OK(DestroyDB(metadbname, options));
  ASSERT_OK(DestroyDB(dbname, options));

  // Setup databases: create each one and close it again immediately.
  DB* db = nullptr;
  ASSERT_OK(DB::Open(options, dbname, &db));
  delete db;
  db = nullptr;
  ASSERT_OK(DB::Open(options, metadbname, &db));
  delete db;
  db = nullptr;
  ASSERT_OK(DB::Open(options, metametadbname, &db));
  delete db;
  db = nullptr;

  // Delete databases
  ASSERT_OK(DestroyDB(dbname, options));

  // Check if deletion worked.
  options.create_if_missing = false;
  ASSERT_TRUE(!(DB::Open(options, dbname, &db)).ok());
  ASSERT_TRUE(!(DB::Open(options, metadbname, &db)).ok());
  ASSERT_TRUE(!(DB::Open(options, metametadbname, &db)).ok());
}
1743
1744 #ifndef ROCKSDB_LITE
// Takes a "file snapshot" (DisableFileDeletions + GetLiveFiles), copies
// the live files into a side directory, keeps writing to the original
// DB, and then opens the copy to verify it reflects exactly the state at
// snapshot time. Also checks that the MANIFEST grows (same file number,
// larger size) after post-snapshot writes.
TEST_F(DBTest, SnapshotFiles) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 100000000;  // Large write buffer
    CreateAndReopenWithCF({"pikachu"}, options);

    Random rnd(301);

    // Write 8MB (80 values, each 100K)
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    std::vector<std::string> values;
    for (int i = 0; i < 80; i++) {
      values.push_back(RandomString(&rnd, 100000));
      // First 40 keys go to CF 1 ("pikachu"), the rest to CF 0.
      ASSERT_OK(Put((i < 40), Key(i), values[i]));
    }

    // assert that nothing makes it to disk yet.
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);

    // get a file snapshot
    uint64_t manifest_number = 0;
    uint64_t manifest_size = 0;
    std::vector<std::string> files;
    dbfull()->DisableFileDeletions();
    dbfull()->GetLiveFiles(files, &manifest_size);

    // CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
    ASSERT_EQ(files.size(), 5U);

    uint64_t number = 0;
    FileType type;

    // copy these files to a new snapshot directory
    std::string snapdir = dbname_ + ".snapdir/";
    ASSERT_OK(env_->CreateDirIfMissing(snapdir));

    for (size_t i = 0; i < files.size(); i++) {
      // our clients require that GetLiveFiles returns
      // files with "/" as first character!
      ASSERT_EQ(files[i][0], '/');
      std::string src = dbname_ + files[i];
      std::string dest = snapdir + files[i];

      uint64_t size;
      ASSERT_OK(env_->GetFileSize(src, &size));

      // record the number and the size of the
      // latest manifest file
      if (ParseFileName(files[i].substr(1), &number, &type)) {
        if (type == kDescriptorFile) {
          if (number > manifest_number) {
            manifest_number = number;
            ASSERT_GE(size, manifest_size);
            size = manifest_size;  // copy only valid MANIFEST data
          }
        }
      }
      CopyFile(src, dest, size);
    }

    // release file snapshot
    // NOTE(review): the comment above says "release", but this calls
    // DisableFileDeletions() again — EnableFileDeletions() is presumably
    // intended; confirm against upstream before changing behavior.
    dbfull()->DisableFileDeletions();
    // overwrite one key, this key should not appear in the snapshot
    std::vector<std::string> extras;
    for (unsigned int i = 0; i < 1; i++) {
      extras.push_back(RandomString(&rnd, 100000));
      ASSERT_OK(Put(0, Key(i), extras[i]));
    }

    // verify that data in the snapshot are correct
    std::vector<ColumnFamilyDescriptor> column_families;
    column_families.emplace_back("default", ColumnFamilyOptions());
    column_families.emplace_back("pikachu", ColumnFamilyOptions());
    std::vector<ColumnFamilyHandle*> cf_handles;
    DB* snapdb;
    DBOptions opts;
    opts.env = env_;
    opts.create_if_missing = false;
    Status stat =
        DB::Open(opts, snapdir, column_families, &cf_handles, &snapdb);
    ASSERT_OK(stat);

    ReadOptions roptions;
    std::string val;
    for (unsigned int i = 0; i < 80; i++) {
      // Each key must still carry its pre-snapshot value in the copy.
      stat = snapdb->Get(roptions, cf_handles[i < 40], Key(i), &val);
      ASSERT_EQ(values[i].compare(val), 0);
    }
    for (auto cfh : cf_handles) {
      delete cfh;
    }
    delete snapdb;

    // look at the new live files after we added an 'extra' key
    // and after we took the first snapshot.
    uint64_t new_manifest_number = 0;
    uint64_t new_manifest_size = 0;
    std::vector<std::string> newfiles;
    dbfull()->DisableFileDeletions();
    dbfull()->GetLiveFiles(newfiles, &new_manifest_size);

    // find the new manifest file. assert that this manifest file is
    // the same one as in the previous snapshot. But its size should be
    // larger because we added an extra key after taking the
    // previous shapshot.
    for (size_t i = 0; i < newfiles.size(); i++) {
      std::string src = dbname_ + "/" + newfiles[i];
      // record the lognumber and the size of the
      // latest manifest file
      if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
        if (type == kDescriptorFile) {
          if (number > new_manifest_number) {
            uint64_t size;
            new_manifest_number = number;
            ASSERT_OK(env_->GetFileSize(src, &size));
            ASSERT_GE(size, new_manifest_size);
          }
        }
      }
    }
    ASSERT_EQ(manifest_number, new_manifest_number);
    ASSERT_GT(new_manifest_size, manifest_size);

    // release file snapshot
    // NOTE(review): same as above — this looks like it should be
    // EnableFileDeletions(); verify against upstream RocksDB.
    dbfull()->DisableFileDeletions();
  } while (ChangeCompactOptions());
}
1872 #endif
1873
// With keep_log_file_num == 5, repeated reopens must leave exactly five
// info LOG files. Mode 0 keeps logs in the DB directory; mode 1 redirects
// them via db_log_dir. Destroy()/DestroyDB() must remove any logs that
// live under the DB directory itself.
TEST_F(DBTest, PurgeInfoLogs) {
  Options options = CurrentOptions();
  options.keep_log_file_num = 5;
  options.create_if_missing = true;
  for (int mode = 0; mode <= 1; mode++) {
    if (mode == 1) {
      options.db_log_dir = dbname_ + "_logs";
      env_->CreateDirIfMissing(options.db_log_dir);
    } else {
      options.db_log_dir = "";
    }
    // Each reopen creates a fresh info log; 8 > 5 forces purging.
    for (int i = 0; i < 8; i++) {
      Reopen(options);
    }

    std::vector<std::string> files;
    env_->GetChildren(options.db_log_dir.empty() ? dbname_ : options.db_log_dir,
                      &files);
    int info_log_count = 0;
    for (std::string file : files) {
      if (file.find("LOG") != std::string::npos) {
        info_log_count++;
      }
    }
    ASSERT_EQ(5, info_log_count);

    Destroy(options);
    // For mode (1), test DestroyDB() to delete all the logs under DB dir.
    // For mode (2), no info log file should have been put under DB dir.
    std::vector<std::string> db_files;
    env_->GetChildren(dbname_, &db_files);
    for (std::string file : db_files) {
      ASSERT_TRUE(file.find("LOG") == std::string::npos);
    }

    if (mode == 1) {
      // Cleaning up
      env_->GetChildren(options.db_log_dir, &files);
      for (std::string file : files) {
        env_->DeleteFile(options.db_log_dir + "/" + file);
      }
      env_->DeleteDir(options.db_log_dir);
    }
  }
}
1919
1920 #ifndef ROCKSDB_LITE
1921 // Multi-threaded test:
1922 namespace {
1923
static const int kColumnFamilies = 10;  // CFs touched by every write
static const int kNumThreads = 10;      // concurrent worker threads
static const int kTestSeconds = 10;     // wall-clock run time of the test
static const int kNumKeys = 1000;       // shared key space

// Shared state for the MultiThreaded test; read and written by all
// worker threads plus the main test thread.
struct MTState {
  DBTest* test;
  std::atomic<bool> stop;        // set by the main thread to stop workers
  std::atomic<int> counter[kNumThreads];       // per-thread op counters
  std::atomic<bool> thread_done[kNumThreads];  // per-thread completion flags
};

// Argument passed to each worker: the shared state plus this thread's id.
struct MTThread {
  MTState* state;
  int id;
};
1940
// Worker loop for the MultiThreaded test. Until `stop` is set, each
// iteration randomly either writes the same key into every column
// family through a single (atomic) write batch, or reads the key from
// all CFs and verifies the values are mutually consistent — identical
// unique_id across CFs proves the multi-CF write was applied atomically.
static void MTThreadBody(void* arg) {
  MTThread* t = reinterpret_cast<MTThread*>(arg);
  int id = t->id;
  DB* db = t->state->test->db_;
  int counter = 0;
  fprintf(stderr, "... starting thread %d\n", id);
  Random rnd(1000 + id);
  char valbuf[1500];
  while (t->state->stop.load(std::memory_order_acquire) == false) {
    // Publish our progress so readers can upper-bound the counters they see.
    t->state->counter[id].store(counter, std::memory_order_release);

    int key = rnd.Uniform(kNumKeys);
    char keybuf[20];
    snprintf(keybuf, sizeof(keybuf), "%016d", key);

    if (rnd.OneIn(2)) {
      // Write values of the form <key, my id, counter, cf, unique_id>.
      // into each of the CFs
      // We add some padding for force compactions.
      int unique_id = rnd.Uniform(1000000);

      // Half of the time directly use WriteBatch. Half of the time use
      // WriteBatchWithIndex.
      if (rnd.OneIn(2)) {
        WriteBatch batch;
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
                   static_cast<int>(counter), cf, unique_id);
          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
        }
        ASSERT_OK(db->Write(WriteOptions(), &batch));
      } else {
        WriteBatchWithIndex batch(db->GetOptions().comparator);
        for (int cf = 0; cf < kColumnFamilies; ++cf) {
          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
                   static_cast<int>(counter), cf, unique_id);
          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
        }
        ASSERT_OK(db->Write(WriteOptions(), batch.GetWriteBatch()));
      }
    } else {
      // Read a value and verify that it matches the pattern written above
      // and that writes to all column families were atomic (unique_id is the
      // same)
      std::vector<Slice> keys(kColumnFamilies, Slice(keybuf));
      std::vector<std::string> values;
      std::vector<Status> statuses =
          db->MultiGet(ReadOptions(), t->state->test->handles_, keys, &values);
      Status s = statuses[0];
      // all statuses have to be the same
      for (size_t i = 1; i < statuses.size(); ++i) {
        // they are either both ok or both not-found
        ASSERT_TRUE((s.ok() && statuses[i].ok()) ||
                    (s.IsNotFound() && statuses[i].IsNotFound()));
      }
      if (s.IsNotFound()) {
        // Key has not yet been written
      } else {
        // Check that the writer thread counter is >= the counter in the value
        ASSERT_OK(s);
        int unique_id = -1;
        for (int i = 0; i < kColumnFamilies; ++i) {
          int k, w, c, cf, u;
          ASSERT_EQ(5, sscanf(values[i].c_str(), "%d.%d.%d.%d.%d", &k, &w, &c,
                              &cf, &u))
              << values[i];
          ASSERT_EQ(k, key);
          ASSERT_GE(w, 0);
          ASSERT_LT(w, kNumThreads);
          // The writer published its counter before writing, so the
          // embedded counter can never exceed the published one.
          ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
          ASSERT_EQ(cf, i);
          if (i == 0) {
            unique_id = u;
          } else {
            // this checks that updates across column families happened
            // atomically -- all unique ids are the same
            ASSERT_EQ(u, unique_id);
          }
        }
      }
    }
    counter++;
  }
  t->state->thread_done[id].store(true, std::memory_order_release);
  fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}
2027
2028 } // namespace
2029
2030 class MultiThreadedDBTest : public DBTest,
2031 public ::testing::WithParamInterface<int> {
2032 public:
2033 virtual void SetUp() override { option_config_ = GetParam(); }
2034
2035 static std::vector<int> GenerateOptionConfigs() {
2036 std::vector<int> optionConfigs;
2037 for (int optionConfig = kDefault; optionConfig < kEnd; ++optionConfig) {
2038 // skip as HashCuckooRep does not support snapshot
2039 if (optionConfig != kHashCuckoo) {
2040 optionConfigs.push_back(optionConfig);
2041 }
2042 }
2043 return optionConfigs;
2044 }
2045 };
2046
// Spawns kNumThreads workers that concurrently read and write the same
// keys across kColumnFamilies column families (see MTThreadBody), lets
// them run for kTestSeconds, then stops them and waits for completion.
// Correctness checks live inside the worker body.
TEST_P(MultiThreadedDBTest, MultiThreaded) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  std::vector<std::string> cfs;
  // CF 0 is the default column family; create the remaining ones.
  for (int i = 1; i < kColumnFamilies; ++i) {
    cfs.push_back(ToString(i));
  }
  Reopen(options);
  CreateAndReopenWithCF(cfs, options);
  // Initialize state
  MTState mt;
  mt.test = this;
  mt.stop.store(false, std::memory_order_release);
  for (int id = 0; id < kNumThreads; id++) {
    mt.counter[id].store(0, std::memory_order_release);
    mt.thread_done[id].store(false, std::memory_order_release);
  }

  // Start threads
  MTThread thread[kNumThreads];
  for (int id = 0; id < kNumThreads; id++) {
    thread[id].state = &mt;
    thread[id].id = id;
    env_->StartThread(MTThreadBody, &thread[id]);
  }

  // Let them run for a while
  env_->SleepForMicroseconds(kTestSeconds * 1000000);

  // Stop the threads and wait for them to finish
  mt.stop.store(true, std::memory_order_release);
  for (int id = 0; id < kNumThreads; id++) {
    while (mt.thread_done[id].load(std::memory_order_acquire) == false) {
      env_->SleepForMicroseconds(100000);
    }
  }
}
2085
// Instantiate MultiThreaded once per option config produced above.
INSTANTIATE_TEST_CASE_P(
    MultiThreaded, MultiThreadedDBTest,
    ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()));
2089 #endif // ROCKSDB_LITE
2090
2091 // Group commit test:
2092 namespace {
2093
static const int kGCNumThreads = 4;  // concurrent writer threads
static const int kGCNumKeys = 1000;  // keys written by each thread

// Per-thread argument/state for the GroupCommitTest writers.
struct GCThread {
  DB* db;
  int id;
  std::atomic<bool> done;  // set by the worker once all its puts finished
};
2102
2103 static void GCThreadBody(void* arg) {
2104 GCThread* t = reinterpret_cast<GCThread*>(arg);
2105 int id = t->id;
2106 DB* db = t->db;
2107 WriteOptions wo;
2108
2109 for (int i = 0; i < kGCNumKeys; ++i) {
2110 std::string kv(ToString(i + id * kGCNumKeys));
2111 ASSERT_OK(db->Put(wo, kv, kv));
2112 }
2113 t->done = true;
2114 }
2115
2116 } // namespace
2117
// Slows down WAL writes so that concurrent writers pile up and get
// group-committed, then verifies (a) at least one write completed via
// the group-commit path (WRITE_DONE_BY_OTHER > 0) and (b) every key
// written by every thread is present and correct.
TEST_F(DBTest, GroupCommitTest) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    // Artificial per-log-write delay to force writer queuing.
    env_->log_write_slowdown_.store(100);
    options.statistics = rocksdb::CreateDBStatistics();
    Reopen(options);

    // Start threads
    GCThread thread[kGCNumThreads];
    for (int id = 0; id < kGCNumThreads; id++) {
      thread[id].id = id;
      thread[id].db = db_;
      thread[id].done = false;
      env_->StartThread(GCThreadBody, &thread[id]);
    }

    // Wait for all writers to finish before checking results.
    for (int id = 0; id < kGCNumThreads; id++) {
      while (thread[id].done == false) {
        env_->SleepForMicroseconds(100000);
      }
    }
    env_->log_write_slowdown_.store(0);

    ASSERT_GT(TestGetTickerCount(options, WRITE_DONE_BY_OTHER), 0);

    // Expected contents: keys 0 .. kGCNumThreads*kGCNumKeys-1, compared
    // in the DB's (lexicographic) key order.
    std::vector<std::string> expected_db;
    for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) {
      expected_db.push_back(ToString(i));
    }
    std::sort(expected_db.begin(), expected_db.end());

    Iterator* itr = db_->NewIterator(ReadOptions());
    itr->SeekToFirst();
    for (auto x : expected_db) {
      ASSERT_TRUE(itr->Valid());
      ASSERT_EQ(itr->key().ToString(), x);
      ASSERT_EQ(itr->value().ToString(), x);
      itr->Next();
    }
    ASSERT_TRUE(!itr->Valid());
    delete itr;

    HistogramData hist_data;
    options.statistics->histogramData(DB_WRITE, &hist_data);
    ASSERT_GT(hist_data.average, 0.0);
  } while (ChangeOptions(kSkipNoSeekToLast));
}
2166
namespace {
// Ordered key -> value map used by ModelDB below as the in-memory
// reference store for the Randomized model-checking test.
// `using` alias instead of `typedef`, consistent with modern C++ style
// already used elsewhere in this file (C++11 features are pervasive).
using KVMap = std::map<std::string, std::string>;
}  // namespace
2170
// In-memory reference implementation of the DB interface backed by a
// std::map (KVMap). The Randomized test runs the same operations against
// a ModelDB and a real DB and compares the results via iterators. Only
// the operations that test exercises are meaningfully implemented;
// everything else returns NotSupported or a trivial value.
class ModelDB : public DB {
 public:
  // A snapshot is simply a full copy of the map at the time it was taken.
  class ModelSnapshot : public Snapshot {
   public:
    KVMap map_;

    virtual SequenceNumber GetSequenceNumber() const override {
      // no need to call this
      assert(false);
      return 0;
    }
  };

  explicit ModelDB(const Options& options) : options_(options) {}
  // All mutations funnel through Write() via a WriteBatch so the model
  // mirrors the real DB's batch semantics.
  using DB::Put;
  virtual Status Put(const WriteOptions& o, ColumnFamilyHandle* cf,
                     const Slice& k, const Slice& v) override {
    WriteBatch batch;
    batch.Put(cf, k, v);
    return Write(o, &batch);
  }
  using DB::Delete;
  virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
                        const Slice& key) override {
    WriteBatch batch;
    batch.Delete(cf, key);
    return Write(o, &batch);
  }
  using DB::SingleDelete;
  virtual Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf,
                              const Slice& key) override {
    WriteBatch batch;
    batch.SingleDelete(cf, key);
    return Write(o, &batch);
  }
  using DB::Merge;
  virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf,
                       const Slice& k, const Slice& v) override {
    WriteBatch batch;
    batch.Merge(cf, k, v);
    return Write(o, &batch);
  }
  // Point lookups are not supported; the test compares state via
  // iterators instead (see CompareIterators).
  using DB::Get;
  virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
                     const Slice& key, PinnableSlice* value) override {
    return Status::NotSupported(key);
  }

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override {
    std::vector<Status> s(keys.size(),
                          Status::NotSupported("Not implemented."));
    return s;
  }

#ifndef ROCKSDB_LITE
  using DB::IngestExternalFile;
  virtual Status IngestExternalFile(
      ColumnFamilyHandle* column_family,
      const std::vector<std::string>& external_files,
      const IngestExternalFileOptions& options) override {
    return Status::NotSupported("Not implemented.");
  }

  using DB::GetPropertiesOfAllTables;
  virtual Status GetPropertiesOfAllTables(
      ColumnFamilyHandle* column_family,
      TablePropertiesCollection* props) override {
    return Status();
  }

  virtual Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
      TablePropertiesCollection* props) override {
    return Status();
  }
#endif  // ROCKSDB_LITE

  using DB::KeyMayExist;
  virtual bool KeyMayExist(const ReadOptions& options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           std::string* value,
                           bool* value_found = nullptr) override {
    if (value_found != nullptr) {
      *value_found = false;
    }
    return true;  // Not Supported directly
  }
  // With no snapshot, iterate over a private copy of the current map
  // (owned by the iterator); with a snapshot, iterate over the
  // snapshot's map without taking ownership.
  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override {
    if (options.snapshot == nullptr) {
      KVMap* saved = new KVMap;
      *saved = map_;
      return new ModelIter(saved, true);
    } else {
      const KVMap* snapshot_state =
          &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
      return new ModelIter(snapshot_state, false);
    }
  }
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      std::vector<Iterator*>* iterators) override {
    return Status::NotSupported("Not supported yet");
  }
  virtual const Snapshot* GetSnapshot() override {
    ModelSnapshot* snapshot = new ModelSnapshot;
    snapshot->map_ = map_;
    return snapshot;
  }

  virtual void ReleaseSnapshot(const Snapshot* snapshot) override {
    // Snapshots handed out by this class are always ModelSnapshots.
    delete reinterpret_cast<const ModelSnapshot*>(snapshot);
  }

  // Applies a batch to the in-memory map via WriteBatch::Handler.
  // Note: Merge is deliberately ignored (see handler below).
  virtual Status Write(const WriteOptions& options,
                       WriteBatch* batch) override {
    class Handler : public WriteBatch::Handler {
     public:
      KVMap* map_;
      virtual void Put(const Slice& key, const Slice& value) override {
        (*map_)[key.ToString()] = value.ToString();
      }
      virtual void Merge(const Slice& key, const Slice& value) override {
        // ignore merge for now
        // (*map_)[key.ToString()] = value.ToString();
      }
      virtual void Delete(const Slice& key) override {
        map_->erase(key.ToString());
      }
    };
    Handler handler;
    handler.map_ = &map_;
    return batch->Iterate(&handler);
  }

  using DB::GetProperty;
  virtual bool GetProperty(ColumnFamilyHandle* column_family,
                           const Slice& property, std::string* value) override {
    return false;
  }
  using DB::GetIntProperty;
  virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
                              const Slice& property, uint64_t* value) override {
    return false;
  }
  using DB::GetMapProperty;
  virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
                              const Slice& property,
                              std::map<std::string, double>* value) override {
    return false;
  }
  using DB::GetAggregatedIntProperty;
  virtual bool GetAggregatedIntProperty(const Slice& property,
                                        uint64_t* value) override {
    return false;
  }
  using DB::GetApproximateSizes;
  virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes,
                                   uint8_t include_flags
                                   = INCLUDE_FILES) override {
    for (int i = 0; i < n; i++) {
      sizes[i] = 0;
    }
  }
  using DB::GetApproximateMemTableStats;
  virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                           const Range& range,
                                           uint64_t* const count,
                                           uint64_t* const size) override {
    *count = 0;
    *size = 0;
  }
  using DB::CompactRange;
  virtual Status CompactRange(const CompactRangeOptions& options,
                              ColumnFamilyHandle* column_family,
                              const Slice* start, const Slice* end) override {
    return Status::NotSupported("Not supported operation.");
  }

  virtual Status SetDBOptions(
      const std::unordered_map<std::string, std::string>& new_options)
      override {
    return Status::NotSupported("Not supported operation.");
  }

  using DB::CompactFiles;
  virtual Status CompactFiles(const CompactionOptions& compact_options,
                              ColumnFamilyHandle* column_family,
                              const std::vector<std::string>& input_file_names,
                              const int output_level,
                              const int output_path_id = -1) override {
    return Status::NotSupported("Not supported operation.");
  }

  Status PauseBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  }

  Status ContinueBackgroundWork() override {
    return Status::NotSupported("Not supported operation.");
  }

  Status EnableAutoCompaction(
      const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
    return Status::NotSupported("Not supported operation.");
  }

  using DB::NumberLevels;
  virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
    return 1;
  }

  using DB::MaxMemCompactionLevel;
  virtual int MaxMemCompactionLevel(
      ColumnFamilyHandle* column_family) override {
    return 1;
  }

  using DB::Level0StopWriteTrigger;
  virtual int Level0StopWriteTrigger(
      ColumnFamilyHandle* column_family) override {
    return -1;
  }

  virtual const std::string& GetName() const override { return name_; }

  virtual Env* GetEnv() const override { return nullptr; }

  using DB::GetOptions;
  virtual Options GetOptions(ColumnFamilyHandle* column_family) const override {
    return options_;
  }

  using DB::GetDBOptions;
  virtual DBOptions GetDBOptions() const override { return options_; }

  // Flush is a no-op: the model has no memtable/SST split.
  using DB::Flush;
  virtual Status Flush(const rocksdb::FlushOptions& options,
                       ColumnFamilyHandle* column_family) override {
    Status ret;
    return ret;
  }

  virtual Status SyncWAL() override { return Status::OK(); }

#ifndef ROCKSDB_LITE
  virtual Status DisableFileDeletions() override { return Status::OK(); }

  virtual Status EnableFileDeletions(bool force) override {
    return Status::OK();
  }
  virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size,
                              bool flush_memtable = true) override {
    return Status::OK();
  }

  virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
    return Status::OK();
  }

  virtual Status DeleteFile(std::string name) override { return Status::OK(); }

  virtual Status GetUpdatesSince(
      rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
      const TransactionLogIterator::ReadOptions& read_options =
          TransactionLogIterator::ReadOptions()) override {
    return Status::NotSupported("Not supported in Model DB");
  }

  virtual void GetColumnFamilyMetaData(
      ColumnFamilyHandle* column_family,
      ColumnFamilyMetaData* metadata) override {}
#endif  // ROCKSDB_LITE

  virtual Status GetDbIdentity(std::string& identity) const override {
    return Status::OK();
  }

  virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; }

  virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
    return nullptr;
  }

 private:
  // Iterator over a KVMap; optionally owns the map (used when iterating
  // a private copy taken at NewIterator() time).
  class ModelIter : public Iterator {
   public:
    ModelIter(const KVMap* map, bool owned)
        : map_(map), owned_(owned), iter_(map_->end()) {}
    ~ModelIter() {
      if (owned_) delete map_;
    }
    virtual bool Valid() const override { return iter_ != map_->end(); }
    virtual void SeekToFirst() override { iter_ = map_->begin(); }
    virtual void SeekToLast() override {
      if (map_->empty()) {
        iter_ = map_->end();
      } else {
        iter_ = map_->find(map_->rbegin()->first);
      }
    }
    virtual void Seek(const Slice& k) override {
      iter_ = map_->lower_bound(k.ToString());
    }
    virtual void SeekForPrev(const Slice& k) override {
      iter_ = map_->upper_bound(k.ToString());
      Prev();
    }
    virtual void Next() override { ++iter_; }
    virtual void Prev() override {
      // Mirror RocksDB iterator semantics: stepping before the first
      // entry leaves the iterator invalid.
      if (iter_ == map_->begin()) {
        iter_ = map_->end();
        return;
      }
      --iter_;
    }

    virtual Slice key() const override { return iter_->first; }
    virtual Slice value() const override { return iter_->second; }
    virtual Status status() const override { return Status::OK(); }

   private:
    const KVMap* const map_;
    const bool owned_;  // Do we own map_
    KVMap::const_iterator iter_;
  };
  const Options options_;
  KVMap map_;
  std::string name_ = "";
};
2509
2510 static std::string RandomKey(Random* rnd, int minimum = 0) {
2511 int len;
2512 do {
2513 len = (rnd->OneIn(3)
2514 ? 1 // Short sometimes to encourage collisions
2515 : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
2516 } while (len < minimum);
2517 return test::RandomKey(rnd, len);
2518 }
2519
2520 static bool CompareIterators(int step, DB* model, DB* db,
2521 const Snapshot* model_snap,
2522 const Snapshot* db_snap) {
2523 ReadOptions options;
2524 options.snapshot = model_snap;
2525 Iterator* miter = model->NewIterator(options);
2526 options.snapshot = db_snap;
2527 Iterator* dbiter = db->NewIterator(options);
2528 bool ok = true;
2529 int count = 0;
2530 for (miter->SeekToFirst(), dbiter->SeekToFirst();
2531 ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) {
2532 count++;
2533 if (miter->key().compare(dbiter->key()) != 0) {
2534 fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step,
2535 EscapeString(miter->key()).c_str(),
2536 EscapeString(dbiter->key()).c_str());
2537 ok = false;
2538 break;
2539 }
2540
2541 if (miter->value().compare(dbiter->value()) != 0) {
2542 fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
2543 step, EscapeString(miter->key()).c_str(),
2544 EscapeString(miter->value()).c_str(),
2545 EscapeString(miter->value()).c_str());
2546 ok = false;
2547 }
2548 }
2549
2550 if (ok) {
2551 if (miter->Valid() != dbiter->Valid()) {
2552 fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
2553 step, miter->Valid(), dbiter->Valid());
2554 ok = false;
2555 }
2556 }
2557 delete miter;
2558 delete dbiter;
2559 return ok;
2560 }
2561
2562 class DBTestRandomized : public DBTest,
2563 public ::testing::WithParamInterface<int> {
2564 public:
2565 virtual void SetUp() override { option_config_ = GetParam(); }
2566
2567 static std::vector<int> GenerateOptionConfigs() {
2568 std::vector<int> option_configs;
2569 // skip cuckoo hash as it does not support snapshot.
2570 for (int option_config = kDefault; option_config < kEnd; ++option_config) {
2571 if (!ShouldSkipOptions(option_config, kSkipDeletesFilterFirst |
2572 kSkipNoSeekToLast |
2573 kSkipHashCuckoo)) {
2574 option_configs.push_back(option_config);
2575 }
2576 }
2577 option_configs.push_back(kBlockBasedTableWithIndexRestartInterval);
2578 return option_configs;
2579 }
2580 };
2581
// Instantiate Randomized once per option config produced above.
INSTANTIATE_TEST_CASE_P(
    DBTestRandomized, DBTestRandomized,
    ::testing::ValuesIn(DBTestRandomized::GenerateOptionConfigs()));
2585
// Model-checking test: applies the same random sequence of Puts,
// Deletes, and multi-element batches to both a ModelDB and the real DB,
// and periodically verifies they agree — on current state, on state
// captured in snapshots, and across a Reopen.
TEST_P(DBTestRandomized, Randomized) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  DestroyAndReopen(options);

  Random rnd(test::RandomSeed() + GetParam());
  ModelDB model(options);
  const int N = 10000;
  const Snapshot* model_snap = nullptr;
  const Snapshot* db_snap = nullptr;
  std::string k, v;
  for (int step = 0; step < N; step++) {
    // TODO(sanjay): Test Get() works
    int p = rnd.Uniform(100);
    int minimum = 0;
    // Prefix/hash-based configs cannot handle empty keys, so force a
    // minimum key length of 1 for them.
    if (option_config_ == kHashSkipList || option_config_ == kHashLinkList ||
        option_config_ == kHashCuckoo ||
        option_config_ == kPlainTableFirstBytePrefix ||
        option_config_ == kBlockBasedTableWithWholeKeyHashIndex ||
        option_config_ == kBlockBasedTableWithPrefixHashIndex) {
      minimum = 1;
    }
    if (p < 45) {  // Put
      k = RandomKey(&rnd, minimum);
      v = RandomString(&rnd,
                       rnd.OneIn(20) ? 100 + rnd.Uniform(100) : rnd.Uniform(8));
      ASSERT_OK(model.Put(WriteOptions(), k, v));
      ASSERT_OK(db_->Put(WriteOptions(), k, v));
    } else if (p < 90) {  // Delete
      k = RandomKey(&rnd, minimum);
      ASSERT_OK(model.Delete(WriteOptions(), k));
      ASSERT_OK(db_->Delete(WriteOptions(), k));
    } else {  // Multi-element batch
      WriteBatch b;
      const int num = rnd.Uniform(8);
      for (int i = 0; i < num; i++) {
        if (i == 0 || !rnd.OneIn(10)) {
          k = RandomKey(&rnd, minimum);
        } else {
          // Periodically re-use the same key from the previous iter, so
          // we have multiple entries in the write batch for the same key
        }
        if (rnd.OneIn(2)) {
          v = RandomString(&rnd, rnd.Uniform(10));
          b.Put(k, v);
        } else {
          b.Delete(k);
        }
      }
      ASSERT_OK(model.Write(WriteOptions(), &b));
      ASSERT_OK(db_->Write(WriteOptions(), &b));
    }

    if ((step % 100) == 0) {
      // For DB instances that use the hash index + block-based table, the
      // iterator will be invalid right when seeking a non-existent key, right
      // than return a key that is close to it.
      if (option_config_ != kBlockBasedTableWithWholeKeyHashIndex &&
          option_config_ != kBlockBasedTableWithPrefixHashIndex) {
        ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));
        ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
      }

      // Save a snapshot from each DB this time that we'll use next
      // time we compare things, to make sure the current state is
      // preserved with the snapshot
      if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
      if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);

      // Reopen to verify the state survives recovery as well.
      Reopen(options);
      ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));

      model_snap = model.GetSnapshot();
      db_snap = db_->GetSnapshot();
    }
  }
  if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
  if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
}
2666
// Writes data through a block-based table with a hash (prefix) index,
// then reopens without a prefix extractor and verifies reads still work
// (RocksDB falls back to the binary-search index).
TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
  // create a DB with block prefix index
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));

  Reopen(options);
  ASSERT_OK(Put("k1", "v1"));
  Flush();
  // k2 stays in the memtable (no flush) so both SST and memtable paths
  // are exercised after the reopen below.
  ASSERT_OK(Put("k2", "v2"));

  // Reopen it without prefix extractor, make sure everything still works.
  // RocksDB should just fall back to the binary index.
  table_options.index_type = BlockBasedTableOptions::kBinarySearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset();

  Reopen(options);
  ASSERT_EQ("v1", Get("k1"));
  ASSERT_EQ("v2", Get("k2"));
}
2690
// Creates SST files under different checksum types (CRC32c, xxHash) and
// verifies that a DB whose current checksum option differs from the one
// a table was written with can still read that table.
TEST_F(DBTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();

  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Flush());  // table with crc checksum

  table_options.checksum = kxxHash;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_OK(Put("e", "f"));
  ASSERT_OK(Put("g", "h"));
  ASSERT_OK(Flush());  // table with xxhash checksum

  // Reopen with CRC32c configured: both the crc and xxhash tables must
  // remain readable.
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));

  // NOTE(review): this block repeats the kCRC32c verification above
  // verbatim; it was presumably intended to reopen with kxxHash — TODO
  // confirm against upstream history before changing.
  table_options.checksum = kCRC32c;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);
  ASSERT_EQ("b", Get("a"));
  ASSERT_EQ("d", Get("c"));
  ASSERT_EQ("f", Get("e"));
  ASSERT_EQ("h", Get("g"));
}
2725
2726 #ifndef ROCKSDB_LITE
// Verifies FIFO compaction: once total table size exceeds
// max_table_files_size, the oldest files are dropped. Runs once with
// automatic compaction and once with a manual CompactRange().
TEST_P(DBTestWithParam, FIFOCompactionTest) {
  for (int iter = 0; iter < 2; ++iter) {
    // first iteration -- auto compaction
    // second iteration -- manual compaction
    Options options;
    options.compaction_style = kCompactionStyleFIFO;
    options.write_buffer_size = 100 << 10;                             // 100KB
    options.arena_block_size = 4096;
    options.compaction_options_fifo.max_table_files_size = 500 << 10;  // 500KB
    options.compression = kNoCompression;
    options.create_if_missing = true;
    options.max_subcompactions = max_subcompactions_;
    if (iter == 1) {
      options.disable_auto_compactions = true;
    }
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    Random rnd(301);
    // Generate 6 files of ~110KB each; total exceeds the 500KB cap.
    for (int i = 0; i < 6; ++i) {
      for (int j = 0; j < 110; ++j) {
        ASSERT_OK(Put(ToString(i * 100 + j), RandomString(&rnd, 980)));
      }
      // flush should happen here
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    if (iter == 0) {
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    } else {
      CompactRangeOptions cro;
      cro.exclusive_manual_compaction = exclusive_manual_compaction_;
      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
    }
    // only 5 files should survive
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
    for (int i = 0; i < 50; ++i) {
      // these keys should be deleted in previous compaction
      ASSERT_EQ("NOT_FOUND", Get(ToString(i)));
    }
  }
}
2768 #endif // ROCKSDB_LITE
2769
2770 #ifndef ROCKSDB_LITE
2771 /*
2772 * This test is not reliable enough as it heavily depends on disk behavior.
2773 * Disable as it is flaky.
2774 */
// Measures the unthrottled write rate, then re-runs the same workload
// with a GenericRateLimiter at 0.7x and 0.5x of that rate and checks the
// achieved throughput ratio and the rate-limiter drain counters.
// Disabled because the measured rates depend heavily on disk behavior.
TEST_F(DBTest, DISABLED_RateLimitingTest) {
  Options options = CurrentOptions();
  options.write_buffer_size = 1 << 20;         // 1MB
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = 1 << 20;     // 1MB
  options.max_bytes_for_level_base = 4 << 20;  // 4MB
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;
  options.create_if_missing = true;
  options.env = env_;
  options.statistics = rocksdb::CreateDBStatistics();
  options.IncreaseParallelism(4);
  DestroyAndReopen(options);

  WriteOptions wo;
  wo.disableWAL = true;

  // # no rate limiting
  Random rnd(301);
  uint64_t start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  uint64_t elapsed = env_->NowMicros() - start;
  // Baseline rate in bytes/second; the throttled runs below are sized
  // relative to this.
  double raw_rate = env_->bytes_written_ * 1000000.0 / elapsed;
  uint64_t rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS);
  ASSERT_EQ(0, rate_limiter_drains);
  Close();

  // # rate limiting with 0.7 x threshold
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);

  start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  elapsed = env_->NowMicros() - start;
  Close();
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  // micros)
  ASSERT_GT(rate_limiter_drains, 0);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
  ASSERT_TRUE(ratio < 0.8);

  // # rate limiting with half of the raw_rate
  options.rate_limiter.reset(
      NewGenericRateLimiter(static_cast<int64_t>(raw_rate / 2)));
  env_->bytes_written_ = 0;
  DestroyAndReopen(options);

  start = env_->NowMicros();
  // Write ~96M data
  for (int64_t i = 0; i < (96 << 10); ++i) {
    ASSERT_OK(
        Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
  }
  elapsed = env_->NowMicros() - start;
  rate_limiter_drains =
      TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
      rate_limiter_drains;
  Close();
  ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
  // Most intervals should've been drained (interval time is 100ms, elapsed is
  // micros)
  ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2);
  ASSERT_LE(rate_limiter_drains, elapsed / 100000 + 1);
  ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
  fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
  ASSERT_LT(ratio, 0.6);
}
2859
2860 TEST_F(DBTest, TableOptionsSanitizeTest) {
2861 Options options = CurrentOptions();
2862 options.create_if_missing = true;
2863 DestroyAndReopen(options);
2864 ASSERT_EQ(db_->GetOptions().allow_mmap_reads, false);
2865
2866 options.table_factory.reset(new PlainTableFactory());
2867 options.prefix_extractor.reset(NewNoopTransform());
2868 Destroy(options);
2869 ASSERT_TRUE(!TryReopen(options).IsNotSupported());
2870
2871 // Test for check of prefix_extractor when hash index is used for
2872 // block-based table
2873 BlockBasedTableOptions to;
2874 to.index_type = BlockBasedTableOptions::kHashSearch;
2875 options = CurrentOptions();
2876 options.create_if_missing = true;
2877 options.table_factory.reset(NewBlockBasedTableFactory(to));
2878 ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
2879 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
2880 ASSERT_OK(TryReopen(options));
2881 }
2882
2883 TEST_F(DBTest, ConcurrentMemtableNotSupported) {
2884 Options options = CurrentOptions();
2885 options.allow_concurrent_memtable_write = true;
2886 options.soft_pending_compaction_bytes_limit = 0;
2887 options.hard_pending_compaction_bytes_limit = 100;
2888 options.create_if_missing = true;
2889
2890 DestroyDB(dbname_, options);
2891 options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
2892 ASSERT_NOK(TryReopen(options));
2893
2894 options.memtable_factory.reset(new SkipListFactory);
2895 ASSERT_OK(TryReopen(options));
2896
2897 ColumnFamilyOptions cf_options(options);
2898 cf_options.memtable_factory.reset(
2899 NewHashLinkListRepFactory(4, 0, 3, true, 4));
2900 ColumnFamilyHandle* handle;
2901 ASSERT_NOK(db_->CreateColumnFamily(cf_options, "name", &handle));
2902 }
2903
2904 #endif // ROCKSDB_LITE
2905
// Verifies that background tasks scheduled beyond the thread-pool capacity
// are queued. Attempt 0 sets the pool sizes explicitly
// (max_background_compactions = 3, max_background_flushes = 2); attempt 1
// relies on the sanitized defaults producing the same queue lengths.
TEST_F(DBTest, SanitizeNumThreads) {
  for (int attempt = 0; attempt < 2; attempt++) {
    const size_t kTotalTasks = 8;
    test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];

    Options options = CurrentOptions();
    if (attempt == 0) {
      options.max_background_compactions = 3;
      options.max_background_flushes = 2;
    }
    options.create_if_missing = true;
    DestroyAndReopen(options);

    for (size_t i = 0; i < kTotalTasks; i++) {
      // Insert 4 tasks into the low priority queue and 4 tasks into the
      // high priority queue.
      env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                     &sleeping_tasks[i],
                     (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
    }

    // Wait 100 milliseconds for the tasks to be scheduled.
    env_->SleepForMicroseconds(100000);

    // pool size 3, total task 4. Queue size should be 1.
    ASSERT_EQ(1U, options.env->GetThreadPoolQueueLen(Env::Priority::LOW));
    // pool size 2, total task 4. Queue size should be 2.
    ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH));

    // Release all sleeping tasks and wait for the pools to drain.
    for (size_t i = 0; i < kTotalTasks; i++) {
      sleeping_tasks[i].WakeUp();
      sleeping_tasks[i].WaitUntilDone();
    }

    // The DB should still function normally afterwards.
    ASSERT_OK(Put("abc", "def"));
    ASSERT_EQ("def", Get("abc"));
    Flush();
    ASSERT_EQ("def", Get("abc"));
  }
}
2945
2946 TEST_F(DBTest, WriteSingleThreadEntry) {
2947 std::vector<port::Thread> threads;
2948 dbfull()->TEST_LockMutex();
2949 auto w = dbfull()->TEST_BeginWrite();
2950 threads.emplace_back([&] { Put("a", "b"); });
2951 env_->SleepForMicroseconds(10000);
2952 threads.emplace_back([&] { Flush(); });
2953 env_->SleepForMicroseconds(10000);
2954 dbfull()->TEST_UnlockMutex();
2955 dbfull()->TEST_LockMutex();
2956 dbfull()->TEST_EndWrite(w);
2957 dbfull()->TEST_UnlockMutex();
2958
2959 for (auto& t : threads) {
2960 t.join();
2961 }
2962 }
2963
2964 #ifndef ROCKSDB_LITE
// Exercises changing memtable options at runtime via SetOptions():
// write_buffer_size first (L0 file sizes should track the new value), then
// max_write_buffer_number (the write-stall point should move accordingly).
TEST_F(DBTest, DynamicMemtableOptions) {
  const uint64_t k64KB = 1 << 16;
  const uint64_t k128KB = 1 << 17;
  const uint64_t k5KB = 5 * 1024;
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.max_background_compactions = 1;
  options.write_buffer_size = k64KB;
  options.arena_block_size = 16 * 1024;
  options.max_write_buffer_number = 2;
  // Don't trigger compact/slowdown/stop
  options.level0_file_num_compaction_trigger = 1024;
  options.level0_slowdown_writes_trigger = 1024;
  options.level0_stop_writes_trigger = 1024;
  DestroyAndReopen(options);

  // Writes `size` KB of ~1KB values; waits for flushes periodically so that
  // each filled memtable becomes exactly one L0 file.
  auto gen_l0_kb = [this](int size) {
    const int kNumPutsBeforeWaitForFlush = 64;
    Random rnd(301);
    for (int i = 0; i < size; i++) {
      ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));

      // The following condition prevents a race condition between flush jobs
      // acquiring work and this thread filling up multiple memtables. Without
      // this, the flush might produce less files than expected because
      // multiple memtables are flushed into a single L0 file. This race
      // condition affects assertion (A).
      if (i % kNumPutsBeforeWaitForFlush == kNumPutsBeforeWaitForFlush - 1) {
        dbfull()->TEST_WaitForFlushMemTable();
      }
    }
    dbfull()->TEST_WaitForFlushMemTable();
  };

  // Test write_buffer_size
  gen_l0_kb(64);
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  ASSERT_LT(SizeAtLevel(0), k64KB + k5KB);
  ASSERT_GT(SizeAtLevel(0), k64KB - k5KB * 2);

  // Clean up L0
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  // Increase buffer size
  ASSERT_OK(dbfull()->SetOptions({
      {"write_buffer_size", "131072"},
  }));

  // The existing memtable is still 64KB in size, after it becomes immutable,
  // the next memtable will be 128KB in size. Write 256KB total, we should
  // have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
  gen_l0_kb(256);
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);  // (A)
  ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
  ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);

  // Test max_write_buffer_number
  // Block compaction thread, which will also block the flushes because
  // max_background_flushes == 0, so flushes are getting executed by the
  // compaction thread
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  // Start from scratch and disable compaction/flush. Flush can only happen
  // during compaction but trigger is pretty high
  options.max_background_flushes = 0;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Put until writes are stopped, bounded by 256 puts. We should see stop at
  // ~128KB
  int count = 0;
  Random rnd(301);

  // Wake the sleeping LOW-priority task as soon as a write actually stalls,
  // so the loop below terminates at the stall point.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DelayWrite:Wait",
      [&](void* arg) { sleeping_task_low.WakeUp(); });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  while (!sleeping_task_low.WokenUp() && count < 256) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
    count++;
  }
  // With 2 memtables of 64KB each, the stall should come at roughly 128 puts.
  ASSERT_GT(static_cast<double>(count), 128 * 0.8);
  ASSERT_LT(static_cast<double>(count), 128 * 1.2);

  sleeping_task_low.WaitUntilDone();

  // Increase
  ASSERT_OK(dbfull()->SetOptions({
      {"max_write_buffer_number", "8"},
  }));
  // Clean up memtable and L0
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  count = 0;
  while (!sleeping_task_low.WokenUp() && count < 1024) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
    count++;
  }
  // Windows fails this test. Will tune in the future and figure out
  // approp number
#ifndef OS_WIN
  ASSERT_GT(static_cast<double>(count), 512 * 0.8);
  ASSERT_LT(static_cast<double>(count), 512 * 1.2);
#endif
  sleeping_task_low.WaitUntilDone();

  // Decrease
  ASSERT_OK(dbfull()->SetOptions({
      {"max_write_buffer_number", "4"},
  }));
  // Clean up memtable and L0
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  count = 0;
  while (!sleeping_task_low.WokenUp() && count < 1024) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
    count++;
  }
  // Windows fails this test. Will tune in the future and figure out
  // approp number
#ifndef OS_WIN
  ASSERT_GT(static_cast<double>(count), 256 * 0.8);
  // NOTE(review): "266" below looks like a typo for 256 (it should pair with
  // the 256 * 0.8 lower bound above) -- confirm before tightening.
  ASSERT_LT(static_cast<double>(count), 266 * 1.2);
#endif
  sleeping_task_low.WaitUntilDone();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
3106 #endif // ROCKSDB_LITE
3107
3108 #ifdef ROCKSDB_USING_THREAD_STATUS
3109 namespace {
3110 void VerifyOperationCount(Env* env, ThreadStatus::OperationType op_type,
3111 int expected_count) {
3112 int op_count = 0;
3113 std::vector<ThreadStatus> thread_list;
3114 ASSERT_OK(env->GetThreadList(&thread_list));
3115 for (auto thread : thread_list) {
3116 if (thread.operation_type == op_type) {
3117 op_count++;
3118 }
3119 }
3120 ASSERT_EQ(op_count, expected_count);
3121 }
3122 } // namespace
3123
// Verifies that Env::GetThreadList() reports the expected number of HIGH and
// LOW priority background threads as the pool sizes change, and that the
// thread-status updater tracks column families correctly through creation,
// drop, and Close().
TEST_F(DBTest, GetThreadStatus) {
  Options options;
  options.env = env_;
  options.enable_thread_tracking = true;
  TryReopen(options);

  std::vector<ThreadStatus> thread_list;
  Status s = env_->GetThreadList(&thread_list);

  for (int i = 0; i < 2; ++i) {
    // repeat the test with different numbers of high / low priority threads
    const int kTestCount = 3;
    const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
    const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
    for (int test = 0; test < kTestCount; ++test) {
      // Change the number of threads in high / low priority pool.
      env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
      env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
      // Wait to ensure that all threads have been registered
      unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
      // Try up to 60 seconds.
      for (int num_try = 0; num_try < 60000; num_try++) {
        env_->SleepForMicroseconds(1000);
        thread_list.clear();
        s = env_->GetThreadList(&thread_list);
        ASSERT_OK(s);
        memset(thread_type_counts, 0, sizeof(thread_type_counts));
        for (auto thread : thread_list) {
          ASSERT_LT(thread.thread_type, ThreadStatus::NUM_THREAD_TYPES);
          thread_type_counts[thread.thread_type]++;
        }
        // Stop polling once both pools report the expected sizes.
        if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
                kHighPriCounts[test] &&
            thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
                kLowPriCounts[test]) {
          break;
        }
      }
      // Verify the total number of threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY] +
                    thread_type_counts[ThreadStatus::LOW_PRIORITY],
                kHighPriCounts[test] + kLowPriCounts[test]);
      // Verify the number of high-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
                kHighPriCounts[test]);
      // Verify the number of low-priority threads
      ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
                kLowPriCounts[test]);
    }
    if (i == 0) {
      // repeat the test with multiple column families
      CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
      env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                     true);
    }
  }
  // Dropping a column family must keep the info map consistent.
  db_->DropColumnFamily(handles_[2]);
  delete handles_[2];
  handles_.erase(handles_.begin() + 2);
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 true);
  Close();
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 true);
}
3189
// With enable_thread_tracking == false, the thread status updater must not
// register any per-column-family information.
TEST_F(DBTest, DisableThreadStatus) {
  Options options;
  options.env = env_;
  options.enable_thread_tracking = false;
  TryReopen(options);
  CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
  // Verify none of the column family info exists
  env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(handles_,
                                                                 false);
}
3200
// Verifies that a running flush is reported both by GetThreadList()
// (exactly one OP_FLUSH) and by the kNumRunningFlushes DB property. The two
// sync-point dependencies hold the flush job open while the counts are
// checked.
TEST_F(DBTest, ThreadStatusFlush) {
  Options options;
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  options.enable_thread_tracking = true;
  options = CurrentOptions(options);

  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
      {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);

  ASSERT_OK(Put(1, "foo", "v1"));
  ASSERT_EQ("v1", Get(1, "foo"));
  // No flush has been triggered yet.
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);

  uint64_t num_running_flushes = 0;
  db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
  ASSERT_EQ(num_running_flushes, 0);

  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush

  // The first sync point is to make sure there's one flush job
  // running when we perform VerifyOperationCount().
  TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
  VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1);
  db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
  ASSERT_EQ(num_running_flushes, 1);
  // This second sync point is to ensure the flush job will not
  // be completed until we already perform VerifyOperationCount().
  TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
3239
// Verifies that a single L0->L1 compaction is visible through GetThreadList()
// (one OP_COMPACTION) and the kNumRunningCompactions property -- but only
// when enable_thread_tracking is on. The second loop iteration repeats the
// scenario with tracking disabled and expects an OP_COMPACTION count of 0.
TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  const int kNumL0Files = 4;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_subcompactions = max_subcompactions_;

  // Sync points: (0) hold compaction scheduling until all puts are done,
  // (1) wait until the compaction has started, (2) keep it running until the
  // checks below are finished.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
      {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
      {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
  });
  for (int tests = 0; tests < 2; ++tests) {
    DestroyAndReopen(options);
    rocksdb::SyncPoint::GetInstance()->ClearTrace();
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    // The Put Phase.
    for (int file = 0; file < kNumL0Files; ++file) {
      for (int key = 0; key < kEntriesPerBuffer; ++key) {
        ASSERT_OK(Put(ToString(key + file * kEntriesPerBuffer),
                      RandomString(&rnd, kTestValueSize)));
      }
      Flush();
    }
    // This makes sure a compaction won't be scheduled until
    // we have done with the above Put Phase.
    uint64_t num_running_compactions = 0;
    db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
                        &num_running_compactions);
    ASSERT_EQ(num_running_compactions, 0);
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
    ASSERT_GE(NumTableFilesAtLevel(0),
              options.level0_file_num_compaction_trigger);

    // This makes sure at least one compaction is running.
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");

    if (options.enable_thread_tracking) {
      // expecting one single L0 to L1 compaction
      VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 1);
    } else {
      // If thread tracking is not enabled, compaction count should be 0.
      VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0);
    }
    db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
                        &num_running_compactions);
    ASSERT_EQ(num_running_compactions, 1);
    // TODO(yhchiang): adding assert to verify each compaction stage.
    TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");

    // repeat the test with disabling thread tracking.
    options.enable_thread_tracking = false;
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }
}
3310
// Runs a series of manual compactions on column family 1 and then verifies
// that, once CancelAllBackgroundWork() has been called, a further
// CompactRange() leaves the file layout unchanged. Iteration 0 uses the
// default 7 levels; iteration 1 repeats with num_levels = 3.
TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
  Options options = CurrentOptions();
  options.max_background_flushes = 0;
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    MakeTables(3, "p", "q", 1);
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range overlaps files
    Compact(1, "p1", "p9");
    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    // Populate a different range
    MakeTables(3, "c", "e", 1);
    ASSERT_EQ("1,1,2", FilesPerLevel(1));

    // Compact just the new range
    Compact(1, "b", "f");
    ASSERT_EQ("0,0,2", FilesPerLevel(1));

    // Compact all
    MakeTables(1, "a", "z", 1);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));
    // After background work is cancelled, the manual compaction must not
    // move any files.
    CancelAllBackgroundWork(db_);
    db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));

    if (iter == 0) {
      options = CurrentOptions();
      options.max_background_flushes = 0;
      options.num_levels = 3;
      options.create_if_missing = true;
      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
3360
3361 TEST_F(DBTest, PreShutdownFlush) {
3362 Options options = CurrentOptions();
3363 options.max_background_flushes = 0;
3364 CreateAndReopenWithCF({"pikachu"}, options);
3365 ASSERT_OK(Put(1, "key", "value"));
3366 CancelAllBackgroundWork(db_);
3367 Status s =
3368 db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
3369 ASSERT_TRUE(s.IsShutdownInProgress());
3370 }
3371
// Loads the DB until several flushes and compactions are in flight (observed
// via GetThreadList()), then calls CancelAllBackgroundWork() while
// compactions are held open by sync points, and finally verifies that no
// OP_COMPACTION thread remains after the shutdown drains.
TEST_P(DBTestWithParam, PreShutdownMultipleCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 40;
  const int kNumL0Files = 4;

  const int kHighPriCount = 3;
  const int kLowPriCount = 5;
  env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
  env_->SetBackgroundThreads(kLowPriCount, Env::LOW);

  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base =
      options.target_file_size_base * kNumL0Files;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_bytes_for_level_multiplier = 2;
  options.max_background_compactions = kLowPriCount;
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.max_subcompactions = max_subcompactions_;

  TryReopen(options);
  Random rnd(301);

  std::vector<ThreadStatus> thread_list;
  // Delay both flush and compaction
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"FlushJob::FlushJob()", "CompactionJob::Run():Start"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownMultipleCompaction:Preshutdown"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownMultipleCompaction:VerifyCompaction"},
       {"DBTest::PreShutdownMultipleCompaction:Preshutdown",
        "CompactionJob::Run():End"},
       {"CompactionJob::Run():End",
        "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Make rocksdb busy
  int key = 0;
  // check how many threads are doing compaction using GetThreadList
  int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
  for (int file = 0; file < 16 * kNumL0Files; ++file) {
    for (int k = 0; k < kEntriesPerBuffer; ++k) {
      ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
    }

    Status s = env_->GetThreadList(&thread_list);
    for (auto thread : thread_list) {
      operation_count[thread.operation_type]++;
    }

    // Speed up the test: stop loading once enough flushes and compactions
    // have been observed.
    if (operation_count[ThreadStatus::OP_FLUSH] > 1 &&
        operation_count[ThreadStatus::OP_COMPACTION] >
            0.6 * options.max_background_compactions) {
      break;
    }
    // If we never hit the early-exit condition, fire the Preshutdown sync
    // point on the last iteration so the blocked compaction can proceed.
    if (file == 15 * kNumL0Files) {
      TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
    }
  }

  TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
  ASSERT_GE(operation_count[ThreadStatus::OP_COMPACTION], 1);
  CancelAllBackgroundWork(db_);
  TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
  dbfull()->TEST_WaitForCompact();
  // Record the number of compactions at a time.
  for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
    operation_count[i] = 0;
  }
  Status s = env_->GetThreadList(&thread_list);
  for (auto thread : thread_list) {
    operation_count[thread.operation_type]++;
  }
  // No compaction may still be running after the cancellation drained.
  ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
3460
// Like PreShutdownMultipleCompaction, but the sync-point graph arranges for
// CancelAllBackgroundWork() to land in the MIDDLE of a compaction (between
// its Start and Inprogress points); afterwards no OP_COMPACTION thread may
// remain.
TEST_P(DBTestWithParam, PreShutdownCompactionMiddle) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 40;
  const int kNumL0Files = 4;

  const int kHighPriCount = 3;
  const int kLowPriCount = 5;
  env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
  env_->SetBackgroundThreads(kLowPriCount, Env::LOW);

  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base =
      options.target_file_size_base * kNumL0Files;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.env = env_;
  options.enable_thread_tracking = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.max_bytes_for_level_multiplier = 2;
  options.max_background_compactions = kLowPriCount;
  options.level0_stop_writes_trigger = 1 << 10;
  options.level0_slowdown_writes_trigger = 1 << 10;
  options.max_subcompactions = max_subcompactions_;

  TryReopen(options);
  Random rnd(301);

  std::vector<ThreadStatus> thread_list;
  // Delay both flush and compaction
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTest::PreShutdownCompactionMiddle:Preshutdown",
        "CompactionJob::Run():Inprogress"},
       {"CompactionJob::Run():Start",
        "DBTest::PreShutdownCompactionMiddle:VerifyCompaction"},
       {"CompactionJob::Run():Inprogress", "CompactionJob::Run():End"},
       {"CompactionJob::Run():End",
        "DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Make rocksdb busy
  int key = 0;
  // check how many threads are doing compaction using GetThreadList
  int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
  for (int file = 0; file < 16 * kNumL0Files; ++file) {
    for (int k = 0; k < kEntriesPerBuffer; ++k) {
      ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
    }

    Status s = env_->GetThreadList(&thread_list);
    for (auto thread : thread_list) {
      operation_count[thread.operation_type]++;
    }

    // Speed up the test: stop loading once enough flushes and compactions
    // have been observed.
    if (operation_count[ThreadStatus::OP_FLUSH] > 1 &&
        operation_count[ThreadStatus::OP_COMPACTION] >
            0.6 * options.max_background_compactions) {
      break;
    }
    // If we never hit the early-exit condition, fire the VerifyCompaction
    // sync point on the last iteration so the blocked compaction can proceed.
    if (file == 15 * kNumL0Files) {
      TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyCompaction");
    }
  }

  ASSERT_GE(operation_count[ThreadStatus::OP_COMPACTION], 1);
  CancelAllBackgroundWork(db_);
  TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:Preshutdown");
  TEST_SYNC_POINT("DBTest::PreShutdownCompactionMiddle:VerifyPreshutdown");
  dbfull()->TEST_WaitForCompact();
  // Record the number of compactions at a time.
  for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
    operation_count[i] = 0;
  }
  Status s = env_->GetThreadList(&thread_list);
  for (auto thread : thread_list) {
    operation_count[thread.operation_type]++;
  }
  // No compaction may still be running after the cancellation drained.
  ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
3547
3548 #endif // ROCKSDB_USING_THREAD_STATUS
3549
3550 #ifndef ROCKSDB_LITE
3551 TEST_F(DBTest, FlushOnDestroy) {
3552 WriteOptions wo;
3553 wo.disableWAL = true;
3554 ASSERT_OK(Put("foo", "v1", wo));
3555 CancelAllBackgroundWork(db_);
3556 }
3557
3558 TEST_F(DBTest, DynamicLevelCompressionPerLevel) {
3559 if (!Snappy_Supported()) {
3560 return;
3561 }
3562 const int kNKeys = 120;
3563 int keys[kNKeys];
3564 for (int i = 0; i < kNKeys; i++) {
3565 keys[i] = i;
3566 }
3567 std::random_shuffle(std::begin(keys), std::end(keys));
3568
3569 Random rnd(301);
3570 Options options;
3571 options.create_if_missing = true;
3572 options.db_write_buffer_size = 20480;
3573 options.write_buffer_size = 20480;
3574 options.max_write_buffer_number = 2;
3575 options.level0_file_num_compaction_trigger = 2;
3576 options.level0_slowdown_writes_trigger = 2;
3577 options.level0_stop_writes_trigger = 2;
3578 options.target_file_size_base = 20480;
3579 options.level_compaction_dynamic_level_bytes = true;
3580 options.max_bytes_for_level_base = 102400;
3581 options.max_bytes_for_level_multiplier = 4;
3582 options.max_background_compactions = 1;
3583 options.num_levels = 5;
3584
3585 options.compression_per_level.resize(3);
3586 options.compression_per_level[0] = kNoCompression;
3587 options.compression_per_level[1] = kNoCompression;
3588 options.compression_per_level[2] = kSnappyCompression;
3589
3590 OnFileDeletionListener* listener = new OnFileDeletionListener();
3591 options.listeners.emplace_back(listener);
3592
3593 DestroyAndReopen(options);
3594
3595 // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
3596 // be compressed, so total data size should be more than 80K.
3597 for (int i = 0; i < 20; i++) {
3598 ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
3599 }
3600 Flush();
3601 dbfull()->TEST_WaitForCompact();
3602
3603 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3604 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3605 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
3606 // Assuming each files' metadata is at least 50 bytes/
3607 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U + 50U * 4);
3608
3609 // Insert 400KB. Some data will be compressed
3610 for (int i = 21; i < 120; i++) {
3611 ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
3612 }
3613 Flush();
3614 dbfull()->TEST_WaitForCompact();
3615 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3616 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3617 ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4),
3618 120U * 4000U + 50U * 24);
3619 // Make sure data in files in L3 is not compacted by removing all files
3620 // in L4 and calculate number of rows
3621 ASSERT_OK(dbfull()->SetOptions({
3622 {"disable_auto_compactions", "true"},
3623 }));
3624 ColumnFamilyMetaData cf_meta;
3625 db_->GetColumnFamilyMetaData(&cf_meta);
3626 for (auto file : cf_meta.levels[4].files) {
3627 listener->SetExpectedFileName(dbname_ + file.name);
3628 ASSERT_OK(dbfull()->DeleteFile(file.name));
3629 }
3630 listener->VerifyMatchedCount(cf_meta.levels[4].files.size());
3631
3632 int num_keys = 0;
3633 std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
3634 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
3635 num_keys++;
3636 }
3637 ASSERT_OK(iter->status());
3638 ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U + num_keys * 10U);
3639 }
3640
3641 TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
3642 if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
3643 return;
3644 }
3645 const int kNKeys = 500;
3646 int keys[kNKeys];
3647 for (int i = 0; i < kNKeys; i++) {
3648 keys[i] = i;
3649 }
3650 std::random_shuffle(std::begin(keys), std::end(keys));
3651
3652 Random rnd(301);
3653 Options options;
3654 options.create_if_missing = true;
3655 options.db_write_buffer_size = 6000;
3656 options.write_buffer_size = 6000;
3657 options.max_write_buffer_number = 2;
3658 options.level0_file_num_compaction_trigger = 2;
3659 options.level0_slowdown_writes_trigger = 2;
3660 options.level0_stop_writes_trigger = 2;
3661 options.soft_pending_compaction_bytes_limit = 1024 * 1024;
3662
3663 // Use file size to distinguish levels
3664 // L1: 10, L2: 20, L3 40, L4 80
3665 // L0 is less than 30
3666 options.target_file_size_base = 10;
3667 options.target_file_size_multiplier = 2;
3668
3669 options.level_compaction_dynamic_level_bytes = true;
3670 options.max_bytes_for_level_base = 200;
3671 options.max_bytes_for_level_multiplier = 8;
3672 options.max_background_compactions = 1;
3673 options.num_levels = 5;
3674 std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
3675 options.table_factory = mtf;
3676
3677 options.compression_per_level.resize(3);
3678 options.compression_per_level[0] = kNoCompression;
3679 options.compression_per_level[1] = kLZ4Compression;
3680 options.compression_per_level[2] = kZlibCompression;
3681
3682 DestroyAndReopen(options);
3683 // When base level is L4, L4 is LZ4.
3684 std::atomic<int> num_zlib(0);
3685 std::atomic<int> num_lz4(0);
3686 std::atomic<int> num_no(0);
3687 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3688 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3689 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3690 if (compaction->output_level() == 4) {
3691 ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
3692 num_lz4.fetch_add(1);
3693 }
3694 });
3695 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3696 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
3697 auto* compression = reinterpret_cast<CompressionType*>(arg);
3698 ASSERT_TRUE(*compression == kNoCompression);
3699 num_no.fetch_add(1);
3700 });
3701 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3702
3703 for (int i = 0; i < 100; i++) {
3704 ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
3705
3706 if (i % 25 == 0) {
3707 dbfull()->TEST_WaitForFlushMemTable();
3708 }
3709 }
3710
3711 Flush();
3712 dbfull()->TEST_WaitForFlushMemTable();
3713 dbfull()->TEST_WaitForCompact();
3714 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3715 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
3716
3717 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3718 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3719 ASSERT_EQ(NumTableFilesAtLevel(3), 0);
3720 ASSERT_GT(NumTableFilesAtLevel(4), 0);
3721 ASSERT_GT(num_no.load(), 2);
3722 ASSERT_GT(num_lz4.load(), 0);
3723 int prev_num_files_l4 = NumTableFilesAtLevel(4);
3724
3725 // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
3726 num_lz4.store(0);
3727 num_no.store(0);
3728 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3729 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3730 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3731 if (compaction->output_level() == 4 && compaction->start_level() == 3) {
3732 ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
3733 num_zlib.fetch_add(1);
3734 } else {
3735 ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
3736 num_lz4.fetch_add(1);
3737 }
3738 });
3739 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3740 "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
3741 auto* compression = reinterpret_cast<CompressionType*>(arg);
3742 ASSERT_TRUE(*compression == kNoCompression);
3743 num_no.fetch_add(1);
3744 });
3745 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3746
3747 for (int i = 101; i < 500; i++) {
3748 ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
3749 if (i % 100 == 99) {
3750 Flush();
3751 dbfull()->TEST_WaitForCompact();
3752 }
3753 }
3754
3755 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
3756 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3757 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3758 ASSERT_EQ(NumTableFilesAtLevel(2), 0);
3759 ASSERT_GT(NumTableFilesAtLevel(3), 0);
3760 ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
3761 ASSERT_GT(num_no.load(), 2);
3762 ASSERT_GT(num_lz4.load(), 0);
3763 ASSERT_GT(num_zlib.load(), 0);
3764 }
3765
// End-to-end check that compaction-related options changed at runtime via
// DB::SetOptions() actually take effect: target file size, level sizing,
// L0 stop trigger, and disable_auto_compactions. Relies on exact write
// sizes and on parking the single LOW-priority thread to freeze
// compactions, so statement order matters throughout.
TEST_F(DBTest, DynamicCompactionOptions) {
  // minimum write buffer size is enforced at 64KB
  const uint64_t k32KB = 1 << 15;
  const uint64_t k64KB = 1 << 16;
  const uint64_t k128KB = 1 << 17;
  const uint64_t k1MB = 1 << 20;
  const uint64_t k4KB = 1 << 12;
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.soft_pending_compaction_bytes_limit = 1024 * 1024;
  options.write_buffer_size = k64KB;
  options.arena_block_size = 4 * k4KB;
  options.max_write_buffer_number = 2;
  // Compaction related options
  options.level0_file_num_compaction_trigger = 3;
  options.level0_slowdown_writes_trigger = 4;
  options.level0_stop_writes_trigger = 8;
  options.target_file_size_base = k64KB;
  options.max_compaction_bytes = options.target_file_size_base * 10;
  options.target_file_size_multiplier = 1;
  options.max_bytes_for_level_base = k128KB;
  options.max_bytes_for_level_multiplier = 4;

  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  DestroyAndReopen(options);

  // Writes `size` KB-sized values at keys start, start+stride, ... and
  // waits until the resulting memtable flush finishes, producing one
  // roughly size-KB L0 file.
  auto gen_l0_kb = [this](int start, int size, int stride) {
    Random rnd(301);
    for (int i = 0; i < size; i++) {
      ASSERT_OK(Put(Key(start + stride * i), RandomString(&rnd, 1024)));
    }
    dbfull()->TEST_WaitForFlushMemTable();
  };

  // Write 3 files that have the same key range.
  // Since level0_file_num_compaction_trigger is 3, compaction should be
  // triggered. The compaction should result in one L1 file
  gen_l0_kb(0, 64, 1);
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  gen_l0_kb(0, 64, 1);
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
  gen_l0_kb(0, 64, 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,1", FilesPerLevel());
  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(1U, metadata.size());
  // Single L1 file should be within 4KB of the 64KB target.
  ASSERT_LE(metadata[0].size, k64KB + k4KB);
  ASSERT_GE(metadata[0].size, k64KB - k4KB);

  // Test compaction trigger and target_file_size_base
  // Reduce compaction trigger to 2, and reduce L1 file size to 32KB.
  // Writing to 64KB L0 files should trigger a compaction. Since these
  // 2 L0 files have the same key range, compaction merge them and should
  // result in 2 32KB L1 files.
  ASSERT_OK(dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
                                  {"target_file_size_base", ToString(k32KB)}}));

  gen_l0_kb(0, 64, 1);
  ASSERT_EQ("1,1", FilesPerLevel());
  gen_l0_kb(0, 64, 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,2", FilesPerLevel());
  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(2U, metadata.size());
  // Both output files should now honor the reduced 32KB target.
  ASSERT_LE(metadata[0].size, k32KB + k4KB);
  ASSERT_GE(metadata[0].size, k32KB - k4KB);
  ASSERT_LE(metadata[1].size, k32KB + k4KB);
  ASSERT_GE(metadata[1].size, k32KB - k4KB);

  // Test max_bytes_for_level_base
  // Increase level base size to 256KB and write enough data that will
  // fill L1 and L2. L1 size should be around 256KB while L2 size should be
  // around 256KB x 4.
  ASSERT_OK(
      dbfull()->SetOptions({{"max_bytes_for_level_base", ToString(k1MB)}}));

  // writing 96 x 64KB => 6 * 1024KB
  // (L1 + L2) = (1 + 4) * 1024KB
  for (int i = 0; i < 96; ++i) {
    gen_l0_kb(i, 64, 96);
  }
  dbfull()->TEST_WaitForCompact();
  // L1 within (0.5, 1.5) of the 1MB base.
  ASSERT_GT(SizeAtLevel(1), k1MB / 2);
  ASSERT_LT(SizeAtLevel(1), k1MB + k1MB / 2);

  // Within (0.5, 1.5) of 4MB.
  ASSERT_GT(SizeAtLevel(2), 2 * k1MB);
  ASSERT_LT(SizeAtLevel(2), 6 * k1MB);

  // Test max_bytes_for_level_multiplier and
  // max_bytes_for_level_base. Now, reduce both multiplier and level base,
  // After filling enough data that can fit in L1 - L3, we should see L1 size
  // reduces to 128KB from 256KB which was asserted previously. Same for L2.
  ASSERT_OK(
      dbfull()->SetOptions({{"max_bytes_for_level_multiplier", "2"},
                            {"max_bytes_for_level_base", ToString(k128KB)}}));

  // writing 20 x 64KB = 10 x 128KB
  // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
  for (int i = 0; i < 20; ++i) {
    gen_l0_kb(i, 64, 32);
  }
  dbfull()->TEST_WaitForCompact();
  uint64_t total_size = SizeAtLevel(1) + SizeAtLevel(2) + SizeAtLevel(3);
  ASSERT_TRUE(total_size < k128KB * 7 * 1.5);

  // Test level0_stop_writes_trigger.
  // Clean up memtable and L0. Block compaction threads. If continue to write
  // and flush memtables. We should see put stop after 8 memtable flushes
  // since level0_stop_writes_trigger = 8
  dbfull()->TEST_FlushMemTable(true);
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // Block compaction
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  int count = 0;
  Random rnd(301);
  WriteOptions wo;
  // Each iteration flushes one memtable into a new L0 file; the write
  // controller should report Stopped once L0 reaches the trigger.
  while (count < 64) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
    dbfull()->TEST_FlushMemTable(true);
    count++;
    if (dbfull()->TEST_write_controler().IsStopped()) {
      sleeping_task_low.WakeUp();
      break;
    }
  }
  // Stop trigger = 8
  ASSERT_EQ(count, 8);
  // Unblock
  sleeping_task_low.WaitUntilDone();

  // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
  // Block compaction thread again. Perform the put and memtable flushes
  // until we see the stop after 6 memtable flushes.
  ASSERT_OK(dbfull()->SetOptions({{"level0_stop_writes_trigger", "6"}}));
  dbfull()->TEST_FlushMemTable(true);
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  // Block compaction again
  sleeping_task_low.Reset();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  count = 0;
  while (count < 64) {
    ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
    dbfull()->TEST_FlushMemTable(true);
    count++;
    if (dbfull()->TEST_write_controler().IsStopped()) {
      sleeping_task_low.WakeUp();
      break;
    }
  }
  ASSERT_EQ(count, 6);
  // Unblock
  sleeping_task_low.WaitUntilDone();

  // Test disable_auto_compactions
  // Compaction thread is unblocked but auto compaction is disabled. Write
  // 4 L0 files and compaction should be triggered. If auto compaction is
  // disabled, then TEST_WaitForCompact will be waiting for nothing. Number of
  // L0 files do not change after the call.
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
    // Wait for compaction so that put won't stop
    dbfull()->TEST_FlushMemTable(true);
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumTableFilesAtLevel(0), 4);

  // Enable auto compaction and perform the same test, # of L0 files should be
  // reduced after compaction.
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
    // Wait for compaction so that put won't stop
    dbfull()->TEST_FlushMemTable(true);
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_LT(NumTableFilesAtLevel(0), 4);
}
3965 #endif // ROCKSDB_LITE
3966
3967 TEST_F(DBTest, FileCreationRandomFailure) {
3968 Options options;
3969 options.env = env_;
3970 options.create_if_missing = true;
3971 options.write_buffer_size = 100000; // Small write buffer
3972 options.target_file_size_base = 200000;
3973 options.max_bytes_for_level_base = 1000000;
3974 options.max_bytes_for_level_multiplier = 2;
3975
3976 DestroyAndReopen(options);
3977 Random rnd(301);
3978
3979 const int kCDTKeysPerBuffer = 4;
3980 const int kTestSize = kCDTKeysPerBuffer * 4096;
3981 const int kTotalIteration = 100;
3982 // the second half of the test involves in random failure
3983 // of file creation.
3984 const int kRandomFailureTest = kTotalIteration / 2;
3985 std::vector<std::string> values;
3986 for (int i = 0; i < kTestSize; ++i) {
3987 values.push_back("NOT_FOUND");
3988 }
3989 for (int j = 0; j < kTotalIteration; ++j) {
3990 if (j == kRandomFailureTest) {
3991 env_->non_writeable_rate_.store(90);
3992 }
3993 for (int k = 0; k < kTestSize; ++k) {
3994 // here we expect some of the Put fails.
3995 std::string value = RandomString(&rnd, 100);
3996 Status s = Put(Key(k), Slice(value));
3997 if (s.ok()) {
3998 // update the latest successful put
3999 values[k] = value;
4000 }
4001 // But everything before we simulate the failure-test should succeed.
4002 if (j < kRandomFailureTest) {
4003 ASSERT_OK(s);
4004 }
4005 }
4006 }
4007
4008 // If rocksdb does not do the correct job, internal assert will fail here.
4009 dbfull()->TEST_WaitForFlushMemTable();
4010 dbfull()->TEST_WaitForCompact();
4011
4012 // verify we have the latest successful update
4013 for (int k = 0; k < kTestSize; ++k) {
4014 auto v = Get(Key(k));
4015 ASSERT_EQ(v, values[k]);
4016 }
4017
4018 // reopen and reverify we have the latest successful update
4019 env_->non_writeable_rate_.store(0);
4020 Reopen(options);
4021 for (int k = 0; k < kTestSize; ++k) {
4022 auto v = Get(Key(k));
4023 ASSERT_EQ(v, values[k]);
4024 }
4025 }
4026
4027 #ifndef ROCKSDB_LITE
4028 TEST_F(DBTest, DynamicMiscOptions) {
4029 // Test max_sequential_skip_in_iterations
4030 Options options;
4031 options.env = env_;
4032 options.create_if_missing = true;
4033 options.max_sequential_skip_in_iterations = 16;
4034 options.compression = kNoCompression;
4035 options.statistics = rocksdb::CreateDBStatistics();
4036 DestroyAndReopen(options);
4037
4038 auto assert_reseek_count = [this, &options](int key_start, int num_reseek) {
4039 int key0 = key_start;
4040 int key1 = key_start + 1;
4041 int key2 = key_start + 2;
4042 Random rnd(301);
4043 ASSERT_OK(Put(Key(key0), RandomString(&rnd, 8)));
4044 for (int i = 0; i < 10; ++i) {
4045 ASSERT_OK(Put(Key(key1), RandomString(&rnd, 8)));
4046 }
4047 ASSERT_OK(Put(Key(key2), RandomString(&rnd, 8)));
4048 std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
4049 iter->Seek(Key(key1));
4050 ASSERT_TRUE(iter->Valid());
4051 ASSERT_EQ(iter->key().compare(Key(key1)), 0);
4052 iter->Next();
4053 ASSERT_TRUE(iter->Valid());
4054 ASSERT_EQ(iter->key().compare(Key(key2)), 0);
4055 ASSERT_EQ(num_reseek,
4056 TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION));
4057 };
4058 // No reseek
4059 assert_reseek_count(100, 0);
4060
4061 ASSERT_OK(dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "4"}}));
4062 // Clear memtable and make new option effective
4063 dbfull()->TEST_FlushMemTable(true);
4064 // Trigger reseek
4065 assert_reseek_count(200, 1);
4066
4067 ASSERT_OK(
4068 dbfull()->SetOptions({{"max_sequential_skip_in_iterations", "16"}}));
4069 // Clear memtable and make new option effective
4070 dbfull()->TEST_FlushMemTable(true);
4071 // No reseek
4072 assert_reseek_count(300, 1);
4073
4074 MutableCFOptions mutable_cf_options;
4075 CreateAndReopenWithCF({"pikachu"}, options);
4076 // Test soft_pending_compaction_bytes_limit,
4077 // hard_pending_compaction_bytes_limit
4078 ASSERT_OK(dbfull()->SetOptions(
4079 handles_[1], {{"soft_pending_compaction_bytes_limit", "200"},
4080 {"hard_pending_compaction_bytes_limit", "300"}}));
4081 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
4082 &mutable_cf_options));
4083 ASSERT_EQ(200, mutable_cf_options.soft_pending_compaction_bytes_limit);
4084 ASSERT_EQ(300, mutable_cf_options.hard_pending_compaction_bytes_limit);
4085 // Test report_bg_io_stats
4086 ASSERT_OK(
4087 dbfull()->SetOptions(handles_[1], {{"report_bg_io_stats", "true"}}));
4088 // sanity check
4089 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
4090 &mutable_cf_options));
4091 ASSERT_TRUE(mutable_cf_options.report_bg_io_stats);
4092 // Test compression
4093 // sanity check
4094 ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
4095 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
4096 &mutable_cf_options));
4097 ASSERT_EQ(CompressionType::kNoCompression, mutable_cf_options.compression);
4098 ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
4099 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
4100 &mutable_cf_options));
4101 ASSERT_EQ(CompressionType::kSnappyCompression,
4102 mutable_cf_options.compression);
4103 // Test paranoid_file_checks already done in db_block_cache_test
4104 ASSERT_OK(
4105 dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "true"}}));
4106 ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
4107 &mutable_cf_options));
4108 ASSERT_TRUE(mutable_cf_options.report_bg_io_stats);
4109 }
4110 #endif // ROCKSDB_LITE
4111
4112 TEST_F(DBTest, L0L1L2AndUpHitCounter) {
4113 Options options = CurrentOptions();
4114 options.write_buffer_size = 32 * 1024;
4115 options.target_file_size_base = 32 * 1024;
4116 options.level0_file_num_compaction_trigger = 2;
4117 options.level0_slowdown_writes_trigger = 2;
4118 options.level0_stop_writes_trigger = 4;
4119 options.max_bytes_for_level_base = 64 * 1024;
4120 options.max_write_buffer_number = 2;
4121 options.max_background_compactions = 8;
4122 options.max_background_flushes = 8;
4123 options.statistics = rocksdb::CreateDBStatistics();
4124 CreateAndReopenWithCF({"mypikachu"}, options);
4125
4126 int numkeys = 20000;
4127 for (int i = 0; i < numkeys; i++) {
4128 ASSERT_OK(Put(1, Key(i), "val"));
4129 }
4130 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L0));
4131 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L1));
4132 ASSERT_EQ(0, TestGetTickerCount(options, GET_HIT_L2_AND_UP));
4133
4134 ASSERT_OK(Flush(1));
4135 dbfull()->TEST_WaitForCompact();
4136
4137 for (int i = 0; i < numkeys; i++) {
4138 ASSERT_EQ(Get(1, Key(i)), "val");
4139 }
4140
4141 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L0), 100);
4142 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L1), 100);
4143 ASSERT_GT(TestGetTickerCount(options, GET_HIT_L2_AND_UP), 100);
4144
4145 ASSERT_EQ(numkeys, TestGetTickerCount(options, GET_HIT_L0) +
4146 TestGetTickerCount(options, GET_HIT_L1) +
4147 TestGetTickerCount(options, GET_HIT_L2_AND_UP));
4148 }
4149
4150 TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
4151 // iter 0 -- zlib
4152 // iter 1 -- bzip2
4153 // iter 2 -- lz4
4154 // iter 3 -- lz4HC
4155 // iter 4 -- xpress
4156 CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
4157 kLZ4Compression, kLZ4HCCompression,
4158 kXpressCompression};
4159 for (auto comp : compressions) {
4160 if (!CompressionTypeSupported(comp)) {
4161 continue;
4162 }
4163 // first_table_version 1 -- generate with table_version == 1, read with
4164 // table_version == 2
4165 // first_table_version 2 -- generate with table_version == 2, read with
4166 // table_version == 1
4167 for (int first_table_version = 1; first_table_version <= 2;
4168 ++first_table_version) {
4169 BlockBasedTableOptions table_options;
4170 table_options.format_version = first_table_version;
4171 table_options.filter_policy.reset(NewBloomFilterPolicy(10));
4172 Options options = CurrentOptions();
4173 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4174 options.create_if_missing = true;
4175 options.compression = comp;
4176 DestroyAndReopen(options);
4177
4178 int kNumKeysWritten = 100000;
4179
4180 Random rnd(301);
4181 for (int i = 0; i < kNumKeysWritten; ++i) {
4182 // compressible string
4183 ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
4184 }
4185
4186 table_options.format_version = first_table_version == 1 ? 2 : 1;
4187 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4188 Reopen(options);
4189 for (int i = 0; i < kNumKeysWritten; ++i) {
4190 auto r = Get(Key(i));
4191 ASSERT_EQ(r.substr(128), std::string(128, 'a'));
4192 }
4193 }
4194 }
4195 }
4196
// With both background threads parked, writes accumulate in memtables
// and no SST file is ever produced; Close() must still return promptly
// instead of waiting on the parked flush/compaction work.
TEST_F(DBTest, CloseSpeedup) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  // Many memtables allowed so writes never stall while flush is blocked.
  options.max_write_buffer_number = 16;

  // Block background threads
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  // Start from a completely empty directory so GetSstFileCount below
  // only sees files created by this test.
  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(dbname_ + "/" + filenames[i]);
  }
  env_->DeleteDir(dbname_);
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);
  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to level 2
  // After that, (100K, 200K)
  for (int num = 0; num < 5; num++) {
    GenerateNewFile(&rnd, &key_idx, true);  // nosync: don't wait for flush
  }

  // Flush thread is parked, so nothing reached disk yet.
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  Close();
  // Close() must not have forced the pending flushes to run.
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // Unblock background threads
  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();

  Destroy(options);
}
4251
// Test-only merge operator that advances the fixture's mock clock
// (env_->addon_time_) by 1000 units on every full merge and always
// succeeds with an empty result. Tests use it to assert how much
// "time" statistics attribute to merge operations.
class DelayedMergeOperator : public MergeOperator {
 private:
  DBTest* db_test_;  // grants access to the test fixture's special Env

 public:
  explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}

  // Simulates a slow merge: bump the fake clock, then produce "".
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    db_test_->env_->addon_time_.fetch_add(1000);
    merge_out->new_value = "";
    return true;
  }

  virtual const char* Name() const override { return "DelayedMergeOperator"; }
};
4268
4269 TEST_F(DBTest, MergeTestTime) {
4270 std::string one, two, three;
4271 PutFixed64(&one, 1);
4272 PutFixed64(&two, 2);
4273 PutFixed64(&three, 3);
4274
4275 // Enable time profiling
4276 SetPerfLevel(kEnableTime);
4277 this->env_->addon_time_.store(0);
4278 this->env_->time_elapse_only_sleep_ = true;
4279 this->env_->no_slowdown_ = true;
4280 Options options = CurrentOptions();
4281 options.statistics = rocksdb::CreateDBStatistics();
4282 options.merge_operator.reset(new DelayedMergeOperator(this));
4283 DestroyAndReopen(options);
4284
4285 ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
4286 db_->Put(WriteOptions(), "foo", one);
4287 ASSERT_OK(Flush());
4288 ASSERT_OK(db_->Merge(WriteOptions(), "foo", two));
4289 ASSERT_OK(Flush());
4290 ASSERT_OK(db_->Merge(WriteOptions(), "foo", three));
4291 ASSERT_OK(Flush());
4292
4293 ReadOptions opt;
4294 opt.verify_checksums = true;
4295 opt.snapshot = nullptr;
4296 std::string result;
4297 db_->Get(opt, "foo", &result);
4298
4299 ASSERT_EQ(1000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
4300
4301 ReadOptions read_options;
4302 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
4303 int count = 0;
4304 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
4305 ASSERT_OK(iter->status());
4306 ++count;
4307 }
4308
4309 ASSERT_EQ(1, count);
4310 ASSERT_EQ(2000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
4311 #ifdef ROCKSDB_USING_THREAD_STATUS
4312 ASSERT_GT(TestGetTickerCount(options, FLUSH_WRITE_BYTES), 0);
4313 #endif // ROCKSDB_USING_THREAD_STATUS
4314 this->env_->time_elapse_only_sleep_ = false;
4315 }
4316
4317 #ifndef ROCKSDB_LITE
4318 TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
4319 SetPerfLevel(kEnableTime);
4320 Options options = CurrentOptions();
4321 options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
4322 options.statistics = rocksdb::CreateDBStatistics();
4323 options.merge_operator.reset(new DelayedMergeOperator(this));
4324 options.compaction_style = kCompactionStyleUniversal;
4325 options.max_subcompactions = max_subcompactions_;
4326 DestroyAndReopen(options);
4327
4328 for (int i = 0; i < 1000; i++) {
4329 ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST"));
4330 ASSERT_OK(Flush());
4331 }
4332 dbfull()->TEST_WaitForFlushMemTable();
4333 dbfull()->TEST_WaitForCompact();
4334
4335 ASSERT_NE(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
4336 }
4337
4338 TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
4339 Options options = CurrentOptions();
4340 options.compaction_filter_factory =
4341 std::make_shared<DelayFilterFactory>(this);
4342 options.disable_auto_compactions = true;
4343 options.create_if_missing = true;
4344 options.statistics = rocksdb::CreateDBStatistics();
4345 options.max_subcompactions = max_subcompactions_;
4346 DestroyAndReopen(options);
4347
4348 // put some data
4349 for (int table = 0; table < 4; ++table) {
4350 for (int i = 0; i < 10 + table; ++i) {
4351 Put(ToString(table * 100 + i), "val");
4352 }
4353 Flush();
4354 }
4355
4356 CompactRangeOptions cro;
4357 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
4358 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4359 ASSERT_EQ(0U, CountLiveFiles());
4360
4361 Reopen(options);
4362
4363 Iterator* itr = db_->NewIterator(ReadOptions());
4364 itr->SeekToFirst();
4365 ASSERT_NE(TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME), 0);
4366 delete itr;
4367 }
4368 #endif // ROCKSDB_LITE
4369
4370 TEST_F(DBTest, TestLogCleanup) {
4371 Options options = CurrentOptions();
4372 options.write_buffer_size = 64 * 1024; // very small
4373 // only two memtables allowed ==> only two log files
4374 options.max_write_buffer_number = 2;
4375 Reopen(options);
4376
4377 for (int i = 0; i < 100000; ++i) {
4378 Put(Key(i), "val");
4379 // only 2 memtables will be alive, so logs_to_free needs to always be below
4380 // 2
4381 ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
4382 }
4383 }
4384
4385 #ifndef ROCKSDB_LITE
// A DB reopened read-only must reject writes with Status::NotSupported.
// NOTE(review): max_open_files == -1 here presumably selects the
// compacted-DB read-only path, per the test name — confirm against
// ReadOnlyReopen's implementation.
TEST_F(DBTest, EmptyCompactedDB) {
  Options options = CurrentOptions();
  options.max_open_files = -1;
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  // Any write must fail with NotSupported, not crash or silently drop.
  Status s = Put("new", "value");
  ASSERT_TRUE(s.IsNotSupported());
  Close();
}
4395 #endif // ROCKSDB_LITE
4396
4397 #ifndef ROCKSDB_LITE
4398 TEST_F(DBTest, SuggestCompactRangeTest) {
4399 class CompactionFilterFactoryGetContext : public CompactionFilterFactory {
4400 public:
4401 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
4402 const CompactionFilter::Context& context) override {
4403 saved_context = context;
4404 std::unique_ptr<CompactionFilter> empty_filter;
4405 return empty_filter;
4406 }
4407 const char* Name() const override {
4408 return "CompactionFilterFactoryGetContext";
4409 }
4410 static bool IsManual(CompactionFilterFactory* compaction_filter_factory) {
4411 return reinterpret_cast<CompactionFilterFactoryGetContext*>(
4412 compaction_filter_factory)
4413 ->saved_context.is_manual_compaction;
4414 }
4415 CompactionFilter::Context saved_context;
4416 };
4417
4418 Options options = CurrentOptions();
4419 options.memtable_factory.reset(
4420 new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
4421 options.compaction_style = kCompactionStyleLevel;
4422 options.compaction_filter_factory.reset(
4423 new CompactionFilterFactoryGetContext());
4424 options.write_buffer_size = 200 << 10;
4425 options.arena_block_size = 4 << 10;
4426 options.level0_file_num_compaction_trigger = 4;
4427 options.num_levels = 4;
4428 options.compression = kNoCompression;
4429 options.max_bytes_for_level_base = 450 << 10;
4430 options.target_file_size_base = 98 << 10;
4431 options.max_compaction_bytes = static_cast<uint64_t>(1) << 60; // inf
4432
4433 Reopen(options);
4434
4435 Random rnd(301);
4436
4437 for (int num = 0; num < 3; num++) {
4438 GenerateNewRandomFile(&rnd);
4439 }
4440
4441 GenerateNewRandomFile(&rnd);
4442 ASSERT_EQ("0,4", FilesPerLevel(0));
4443 ASSERT_TRUE(!CompactionFilterFactoryGetContext::IsManual(
4444 options.compaction_filter_factory.get()));
4445
4446 GenerateNewRandomFile(&rnd);
4447 ASSERT_EQ("1,4", FilesPerLevel(0));
4448
4449 GenerateNewRandomFile(&rnd);
4450 ASSERT_EQ("2,4", FilesPerLevel(0));
4451
4452 GenerateNewRandomFile(&rnd);
4453 ASSERT_EQ("3,4", FilesPerLevel(0));
4454
4455 GenerateNewRandomFile(&rnd);
4456 ASSERT_EQ("0,4,4", FilesPerLevel(0));
4457
4458 GenerateNewRandomFile(&rnd);
4459 ASSERT_EQ("1,4,4", FilesPerLevel(0));
4460
4461 GenerateNewRandomFile(&rnd);
4462 ASSERT_EQ("2,4,4", FilesPerLevel(0));
4463
4464 GenerateNewRandomFile(&rnd);
4465 ASSERT_EQ("3,4,4", FilesPerLevel(0));
4466
4467 GenerateNewRandomFile(&rnd);
4468 ASSERT_EQ("0,4,8", FilesPerLevel(0));
4469
4470 GenerateNewRandomFile(&rnd);
4471 ASSERT_EQ("1,4,8", FilesPerLevel(0));
4472
4473 // compact it three times
4474 for (int i = 0; i < 3; ++i) {
4475 ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
4476 dbfull()->TEST_WaitForCompact();
4477 }
4478
4479 // All files are compacted
4480 ASSERT_EQ(0, NumTableFilesAtLevel(0));
4481 ASSERT_EQ(0, NumTableFilesAtLevel(1));
4482
4483 GenerateNewRandomFile(&rnd);
4484 ASSERT_EQ(1, NumTableFilesAtLevel(0));
4485
4486 // nonoverlapping with the file on level 0
4487 Slice start("a"), end("b");
4488 ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
4489 dbfull()->TEST_WaitForCompact();
4490
4491 // should not compact the level 0 file
4492 ASSERT_EQ(1, NumTableFilesAtLevel(0));
4493
4494 start = Slice("j");
4495 end = Slice("m");
4496 ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
4497 dbfull()->TEST_WaitForCompact();
4498 ASSERT_TRUE(CompactionFilterFactoryGetContext::IsManual(
4499 options.compaction_filter_factory.get()));
4500
4501 // now it should compact the level 0 file
4502 ASSERT_EQ(0, NumTableFilesAtLevel(0));
4503 ASSERT_EQ(1, NumTableFilesAtLevel(1));
4504 }
4505
4506 TEST_F(DBTest, PromoteL0) {
4507 Options options = CurrentOptions();
4508 options.disable_auto_compactions = true;
4509 options.write_buffer_size = 10 * 1024 * 1024;
4510 DestroyAndReopen(options);
4511
4512 // non overlapping ranges
4513 std::vector<std::pair<int32_t, int32_t>> ranges = {
4514 {81, 160}, {0, 80}, {161, 240}, {241, 320}};
4515
4516 int32_t value_size = 10 * 1024; // 10 KB
4517
4518 Random rnd(301);
4519 std::map<int32_t, std::string> values;
4520 for (const auto& range : ranges) {
4521 for (int32_t j = range.first; j < range.second; j++) {
4522 values[j] = RandomString(&rnd, value_size);
4523 ASSERT_OK(Put(Key(j), values[j]));
4524 }
4525 ASSERT_OK(Flush());
4526 }
4527
4528 int32_t level0_files = NumTableFilesAtLevel(0, 0);
4529 ASSERT_EQ(level0_files, ranges.size());
4530 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
4531
4532 // Promote L0 level to L2.
4533 ASSERT_OK(experimental::PromoteL0(db_, db_->DefaultColumnFamily(), 2));
4534 // We expect that all the files were trivially moved from L0 to L2
4535 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
4536 ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files);
4537
4538 for (const auto& kv : values) {
4539 ASSERT_EQ(Get(Key(kv.first)), kv.second);
4540 }
4541 }
4542
4543 TEST_F(DBTest, PromoteL0Failure) {
4544 Options options = CurrentOptions();
4545 options.disable_auto_compactions = true;
4546 options.write_buffer_size = 10 * 1024 * 1024;
4547 DestroyAndReopen(options);
4548
4549 // Produce two L0 files with overlapping ranges.
4550 ASSERT_OK(Put(Key(0), ""));
4551 ASSERT_OK(Put(Key(3), ""));
4552 ASSERT_OK(Flush());
4553 ASSERT_OK(Put(Key(1), ""));
4554 ASSERT_OK(Flush());
4555
4556 Status status;
4557 // Fails because L0 has overlapping files.
4558 status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
4559 ASSERT_TRUE(status.IsInvalidArgument());
4560
4561 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
4562 // Now there is a file in L1.
4563 ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
4564
4565 ASSERT_OK(Put(Key(5), ""));
4566 ASSERT_OK(Flush());
4567 // Fails because L1 is non-empty.
4568 status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
4569 ASSERT_TRUE(status.IsInvalidArgument());
4570 }
4571 #endif // ROCKSDB_LITE
4572
4573 // Github issue #596
4574 TEST_F(DBTest, HugeNumberOfLevels) {
4575 Options options = CurrentOptions();
4576 options.write_buffer_size = 2 * 1024 * 1024; // 2MB
4577 options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
4578 options.num_levels = 12;
4579 options.max_background_compactions = 10;
4580 options.max_bytes_for_level_multiplier = 2;
4581 options.level_compaction_dynamic_level_bytes = true;
4582 DestroyAndReopen(options);
4583
4584 Random rnd(301);
4585 for (int i = 0; i < 300000; ++i) {
4586 ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
4587 }
4588
4589 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
4590 }
4591
4592 TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
4593 Options options = CurrentOptions();
4594 options.write_buffer_size = 2 * 1024 * 1024; // 2MB
4595 options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
4596 options.num_levels = 12;
4597 options.max_background_compactions = 10;
4598 options.max_bytes_for_level_multiplier = 2;
4599 options.level_compaction_dynamic_level_bytes = true;
4600 DestroyAndReopen(options);
4601
4602 Random rnd(301);
4603 for (int i = 0; i < 300000; ++i) {
4604 ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
4605 }
4606
4607 std::atomic<int> callback_count(0);
4608 rocksdb::SyncPoint::GetInstance()->SetCallBack(
4609 "DBImpl::BackgroundCompaction()::Conflict",
4610 [&](void* arg) { callback_count.fetch_add(1); });
4611 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4612 CompactRangeOptions croptions;
4613 croptions.exclusive_manual_compaction = false;
4614 ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
4615 ASSERT_GE(callback_count.load(), 1);
4616 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4617 for (int i = 0; i < 300000; ++i) {
4618 ASSERT_NE("NOT_FOUND", Get(Key(i)));
4619 }
4620 }
4621
// Github issue #595
// Large write batch with column families
TEST_F(DBTest, LargeBatchWithColumnFamilies) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  CreateAndReopenWithCF({"pikachu"}, options);
  // j makes each key/value unique across every batch and pass.
  int64_t j = 0;
  for (int i = 0; i < 5; i++) {
    for (int pass = 1; pass <= 3; pass++) {
      WriteBatch batch;
      // Batch sizes grow from 5MB to 9MB across the outer iterations.
      size_t write_size = 1024 * 1024 * (5 + i);
      fprintf(stderr, "prepare: %" ROCKSDB_PRIszt " MB, pass:%d\n",
              (write_size / 1024 / 1024), pass);
      for (;;) {
        // ~3KB of printable filler plus a unique suffix, used as both key
        // and value.
        std::string data(3000, j++ % 127 + 20);
        data += ToString(j);
        batch.Put(handles_[0], Slice(data), Slice(data));
        if (batch.GetDataSize() > write_size) {
          break;
        }
      }
      fprintf(stderr, "write: %" ROCKSDB_PRIszt " MB\n",
              (batch.GetDataSize() / 1024 / 1024));
      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
      fprintf(stderr, "done\n");
    }
  }
  // make sure we can re-open it.
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
4653
// Make sure that Flushes can proceed in parallel with CompactRange()
TEST_F(DBTest, FlushesInParallelWithCompactRange) {
  // iter == 0 -- leveled
  // iter == 1 -- leveled, but throw in a flush between two levels compacting
  // iter == 2 -- universal
  for (int iter = 0; iter < 3; ++iter) {
    Options options = CurrentOptions();
    if (iter < 2) {
      options.compaction_style = kCompactionStyleLevel;
    } else {
      options.compaction_style = kCompactionStyleUniversal;
    }
    options.write_buffer_size = 110 << 10;
    options.level0_file_num_compaction_trigger = 4;
    options.num_levels = 4;
    options.compression = kNoCompression;
    options.max_bytes_for_level_base = 450 << 10;
    options.target_file_size_base = 98 << 10;
    // Only two memtables allowed: if flushes were blocked by the compaction,
    // the writes below would fail to get a third memtable.
    options.max_write_buffer_number = 2;

    DestroyAndReopen(options);

    Random rnd(301);
    for (int num = 0; num < 14; num++) {
      GenerateNewRandomFile(&rnd);
    }

    // Hold the compaction open between test sync points :1 and :2 so the
    // flushes below run while the compaction is in progress. iter == 1 holds
    // it inside RunManualCompaction (between two level compactions); the
    // other iterations hold it inside CompactionJob::Run.
    if (iter == 1) {
      rocksdb::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::RunManualCompaction()::1",
            "DBTest::FlushesInParallelWithCompactRange:1"},
           {"DBTest::FlushesInParallelWithCompactRange:2",
            "DBImpl::RunManualCompaction()::2"}});
    } else {
      rocksdb::SyncPoint::GetInstance()->LoadDependency(
          {{"CompactionJob::Run():Start",
            "DBTest::FlushesInParallelWithCompactRange:1"},
           {"DBTest::FlushesInParallelWithCompactRange:2",
            "CompactionJob::Run():End"}});
    }
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();

    // Run the manual compaction on a separate thread so this thread can
    // issue flushes concurrently.
    std::vector<port::Thread> threads;
    threads.emplace_back([&]() { Compact("a", "z"); });

    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");

    // this has to start a flush. if flushes are blocked, this will try to
    // create
    // 3 memtables, and that will fail because max_write_buffer_number is 2
    for (int num = 0; num < 3; num++) {
      GenerateNewRandomFile(&rnd, /* nowait */ true);
    }

    TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");

    for (auto& t : threads) {
      t.join();
    }
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4716
TEST_F(DBTest, DelayedWriteRate) {
  const int kEntriesPerMemTable = 100;
  const int kTotalFlushes = 12;

  Options options = CurrentOptions();
  env_->SetBackgroundThreads(1, Env::LOW);
  options.env = env_;
  // NOTE(review): presumably makes write delays accrue into addon_time_
  // instead of real sleeps (see SpecialEnv) -- confirm in db_test_util.h.
  env_->no_slowdown_ = true;
  options.write_buffer_size = 100000000;
  options.max_write_buffer_number = 256;
  options.max_background_compactions = 1;
  options.level0_file_num_compaction_trigger = 3;
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 999999;
  options.delayed_write_rate = 20000000;  // Start with 20MB/s (bytes/sec)
  // Switch memtables after a fixed entry count rather than by byte size.
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kEntriesPerMemTable));

  CreateAndReopenWithCF({"pikachu"}, options);

  // Block compactions
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  // Create enough L0 files to hit level0_slowdown_writes_trigger (3).
  for (int i = 0; i < 3; i++) {
    Put(Key(i), std::string(10000, 'x'));
    Flush();
  }

  // These writes will be slowed down by the delayed write rate.
  uint64_t estimated_sleep_time = 0;
  Random rnd(301);
  Put("", "");
  uint64_t cur_rate = options.delayed_write_rate;
  for (int i = 0; i < kTotalFlushes; i++) {
    uint64_t size_memtable = 0;
    for (int j = 0; j < kEntriesPerMemTable; j++) {
      auto rand_num = rnd.Uniform(20);
      // Spread the size range to more.
      size_t entry_size = rand_num * rand_num * rand_num;
      WriteOptions wo;
      Put(Key(i), std::string(entry_size, 'x'), wo);
      // 18 bytes approximates the per-entry overhead beyond the value.
      size_memtable += entry_size + 18;
      // Occasionally sleep a while
      if (rnd.Uniform(20) == 6) {
        env_->SleepForMicroseconds(2666);
      }
    }
    dbfull()->TEST_WaitForFlushMemTable();
    estimated_sleep_time += size_memtable * 1000000u / cur_rate;
    // Slow down twice. One for memtable switch and one for flush finishes.
    cur_rate = static_cast<uint64_t>(static_cast<double>(cur_rate) *
                                     kIncSlowdownRatio * kIncSlowdownRatio);
  }
  // Estimate the total sleep time fall into the rough range (within 2x of
  // the expectation either way).
  ASSERT_GT(env_->addon_time_.load(),
            static_cast<int64_t>(estimated_sleep_time / 2));
  ASSERT_LT(env_->addon_time_.load(),
            static_cast<int64_t>(estimated_sleep_time * 2));

  env_->no_slowdown_ = false;
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
4783
4784 TEST_F(DBTest, HardLimit) {
4785 Options options = CurrentOptions();
4786 options.env = env_;
4787 env_->SetBackgroundThreads(1, Env::LOW);
4788 options.max_write_buffer_number = 256;
4789 options.write_buffer_size = 110 << 10; // 110KB
4790 options.arena_block_size = 4 * 1024;
4791 options.level0_file_num_compaction_trigger = 4;
4792 options.level0_slowdown_writes_trigger = 999999;
4793 options.level0_stop_writes_trigger = 999999;
4794 options.hard_pending_compaction_bytes_limit = 800 << 10;
4795 options.max_bytes_for_level_base = 10000000000u;
4796 options.max_background_compactions = 1;
4797 options.memtable_factory.reset(
4798 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
4799
4800 env_->SetBackgroundThreads(1, Env::LOW);
4801 test::SleepingBackgroundTask sleeping_task_low;
4802 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
4803 Env::Priority::LOW);
4804
4805 CreateAndReopenWithCF({"pikachu"}, options);
4806
4807 std::atomic<int> callback_count(0);
4808 rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait",
4809 [&](void* arg) {
4810 callback_count.fetch_add(1);
4811 sleeping_task_low.WakeUp();
4812 });
4813 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
4814
4815 Random rnd(301);
4816 int key_idx = 0;
4817 for (int num = 0; num < 5; num++) {
4818 GenerateNewFile(&rnd, &key_idx, true);
4819 dbfull()->TEST_WaitForFlushMemTable();
4820 }
4821
4822 ASSERT_EQ(0, callback_count.load());
4823
4824 for (int num = 0; num < 5; num++) {
4825 GenerateNewFile(&rnd, &key_idx, true);
4826 dbfull()->TEST_WaitForFlushMemTable();
4827 }
4828 ASSERT_GE(callback_count.load(), 1);
4829
4830 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
4831 sleeping_task_low.WaitUntilDone();
4832 }
4833
4834 #ifndef ROCKSDB_LITE
TEST_F(DBTest, SoftLimit) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;  // Small write buffer
  options.max_write_buffer_number = 256;
  options.level0_file_num_compaction_trigger = 1;
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 999999;
  options.delayed_write_rate = 20000;  // About 20KB/s limited rate
  options.soft_pending_compaction_bytes_limit = 160000;
  options.target_file_size_base = 99999999;  // All into one file
  options.max_bytes_for_level_base = 50000;
  options.max_bytes_for_level_multiplier = 10;
  options.max_background_compactions = 1;
  options.compression = kNoCompression;

  Reopen(options);

  // Generating 360KB in Level 3
  for (int i = 0; i < 72; i++) {
    Put(Key(i), std::string(5000, 'x'));
    if (i % 10 == 0) {
      Flush();
    }
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);

  // Generating 360KB in Level 2
  for (int i = 0; i < 72; i++) {
    Put(Key(i), std::string(5000, 'x'));
    if (i % 10 == 0) {
      Flush();
    }
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(2);

  Put(Key(0), "");

  test::SleepingBackgroundTask sleeping_task_low;
  // Block compactions
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();

  // Create 3 L0 files, making score of L0 to be 3.
  for (int i = 0; i < 3; i++) {
    Put(Key(i), std::string(5000, 'x'));
    Put(Key(100 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    Flush();
  }
  // L0 score 3 reaches level0_slowdown_writes_trigger: writes are delayed.
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
  sleeping_task_low.Reset();
  dbfull()->TEST_WaitForCompact();

  // Now there is one L1 file but doesn't trigger soft_rate_limit
  // The L1 file size is around 30KB.
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Only allow one compaction going through.
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void* arg) {
        // Schedule a sleeping task so the next compaction is blocked again.
        sleeping_task_low.Reset();
        env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                       &sleeping_task_low, Env::Priority::LOW);
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();
  // Create 3 L0 files, making score of L0 to be 3
  for (int i = 0; i < 3; i++) {
    Put(Key(10 + i), std::string(5000, 'x'));
    Put(Key(90 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    Flush();
  }

  // Wake up sleep task to enable compaction to run and waits
  // for it to go to sleep state again to make sure one compaction
  // goes through.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
  // Given level multiplier 10, estimated pending compaction is around 100KB
  // doesn't trigger soft_pending_compaction_bytes_limit
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // Create 3 L0 files, making score of L0 to be 3, higher than L0.
  for (int i = 0; i < 3; i++) {
    Put(Key(20 + i), std::string(5000, 'x'));
    Put(Key(80 - i), std::string(5000, 'x'));
    // Flush the file. File size is around 30KB.
    Flush();
  }
  // Wake up sleep task to enable compaction to run and waits
  // for it to go to sleep state again to make sure one compaction
  // goes through.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
  // L2 size is 360KB, so the estimated level fanout 4, estimated pending
  // compaction is around 200KB
  // triggering soft_pending_compaction_bytes_limit
  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilSleeping();

  // After the compaction works off the pending debt the delay must clear.
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

  // shrink level base so L2 will hit soft limit easier.
  ASSERT_OK(dbfull()->SetOptions({
      {"max_bytes_for_level_base", "5000"},
  }));

  Put("", "");
  Flush();
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());

  sleeping_task_low.WaitUntilSleeping();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
4973
4974 TEST_F(DBTest, LastWriteBufferDelay) {
4975 Options options = CurrentOptions();
4976 options.env = env_;
4977 options.write_buffer_size = 100000;
4978 options.max_write_buffer_number = 4;
4979 options.delayed_write_rate = 20000;
4980 options.compression = kNoCompression;
4981 options.disable_auto_compactions = true;
4982 int kNumKeysPerMemtable = 3;
4983 options.memtable_factory.reset(
4984 new SpecialSkipListFactory(kNumKeysPerMemtable));
4985
4986 Reopen(options);
4987 test::SleepingBackgroundTask sleeping_task;
4988 // Block flushes
4989 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
4990 Env::Priority::HIGH);
4991 sleeping_task.WaitUntilSleeping();
4992
4993 // Create 3 L0 files, making score of L0 to be 3.
4994 for (int i = 0; i < 3; i++) {
4995 // Fill one mem table
4996 for (int j = 0; j < kNumKeysPerMemtable; j++) {
4997 Put(Key(j), "");
4998 }
4999 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
5000 }
5001 // Inserting a new entry would create a new mem table, triggering slow down.
5002 Put(Key(0), "");
5003 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
5004
5005 sleeping_task.WakeUp();
5006 sleeping_task.WaitUntilDone();
5007 }
5008 #endif // ROCKSDB_LITE
5009
5010 TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
5011 CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
5012 kLZ4Compression, kLZ4HCCompression,
5013 kXpressCompression};
5014 for (auto comp : compressions) {
5015 if (!CompressionTypeSupported(comp)) {
5016 // not supported, we should fail the Open()
5017 Options options = CurrentOptions();
5018 options.compression = comp;
5019 ASSERT_TRUE(!TryReopen(options).ok());
5020 // Try if CreateColumnFamily also fails
5021 options.compression = kNoCompression;
5022 ASSERT_OK(TryReopen(options));
5023 ColumnFamilyOptions cf_options(options);
5024 cf_options.compression = comp;
5025 ColumnFamilyHandle* handle;
5026 ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok());
5027 }
5028 }
5029 }
5030
5031 #ifndef ROCKSDB_LITE
5032 TEST_F(DBTest, RowCache) {
5033 Options options = CurrentOptions();
5034 options.statistics = rocksdb::CreateDBStatistics();
5035 options.row_cache = NewLRUCache(8192);
5036 DestroyAndReopen(options);
5037
5038 ASSERT_OK(Put("foo", "bar"));
5039 ASSERT_OK(Flush());
5040
5041 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
5042 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
5043 ASSERT_EQ(Get("foo"), "bar");
5044 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
5045 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
5046 ASSERT_EQ(Get("foo"), "bar");
5047 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
5048 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
5049 }
5050 #endif // ROCKSDB_LITE
5051
TEST_F(DBTest, DeletingOldWalAfterDrop) {
  // Hold background flushes until Test:AllowFlushes fires, and make
  // Test:WaitForFlush wait until the flush work has completed.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"Test:AllowFlushes", "DBImpl::BGWorkFlush"},
       {"DBImpl::BGWorkFlush:done", "Test:WaitForFlush"}});
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  Options options = CurrentOptions();
  // Tiny WAL cap so two 8KB writes force a WAL switch.
  options.max_total_wal_size = 8192;
  options.compression = kNoCompression;
  options.write_buffer_size = 1 << 20;
  // Effectively disable the L0-based compaction/slowdown triggers.
  options.level0_file_num_compaction_trigger = (1 << 30);
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  CreateColumnFamilies({"cf1", "cf2"}, options);
  ASSERT_OK(Put(0, "key1", DummyString(8192)));
  ASSERT_OK(Put(0, "key2", DummyString(8192)));
  // the oldest wal should now be getting_flushed
  ASSERT_OK(db_->DropColumnFamily(handles_[0]));
  // all flushes should now do nothing because their CF is dropped
  TEST_SYNC_POINT("Test:AllowFlushes");
  TEST_SYNC_POINT("Test:WaitForFlush");
  uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
  ASSERT_OK(Put(1, "key3", DummyString(8192)));
  ASSERT_OK(Put(1, "key4", DummyString(8192)));
  // new wal should have been created
  uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
  EXPECT_GT(lognum2, lognum1);
}
5085
5086 TEST_F(DBTest, UnsupportedManualSync) {
5087 DestroyAndReopen(CurrentOptions());
5088 env_->is_wal_sync_thread_safe_.store(false);
5089 Status s = db_->SyncWAL();
5090 ASSERT_TRUE(s.IsNotSupported());
5091 }
5092
// Instantiate DBTestWithParam with the four combinations of (1, 4) x
// (false, true). NOTE(review): parameter semantics are defined by the
// fixture (presumably max_subcompactions and exclusive manual compaction --
// confirm in db_test_util.h).
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
                        ::testing::Combine(::testing::Values(1, 4),
                                           ::testing::Bool()));
5096
5097 TEST_F(DBTest, PauseBackgroundWorkTest) {
5098 Options options = CurrentOptions();
5099 options.write_buffer_size = 100000; // Small write buffer
5100 Reopen(options);
5101
5102 std::vector<port::Thread> threads;
5103 std::atomic<bool> done(false);
5104 db_->PauseBackgroundWork();
5105 threads.emplace_back([&]() {
5106 Random rnd(301);
5107 for (int i = 0; i < 10000; ++i) {
5108 Put(RandomString(&rnd, 10), RandomString(&rnd, 10));
5109 }
5110 done.store(true);
5111 });
5112 env_->SleepForMicroseconds(200000);
5113 // make sure the thread is not done
5114 ASSERT_FALSE(done.load());
5115 db_->ContinueBackgroundWork();
5116 for (auto& t : threads) {
5117 t.join();
5118 }
5119 // now it's done
5120 ASSERT_TRUE(done.load());
5121 }
5122
5123 } // namespace rocksdb
5124
int main(int argc, char** argv) {
  // Print a stack trace on crashes to ease debugging of test failures.
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}